mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Move thread manager stats to telemetry metric
This commit is contained in:
parent
a81f6ab9a6
commit
4face43462
6 changed files with 105 additions and 3 deletions
|
@ -188,7 +188,7 @@ void ProfileLogger::Log() {
|
||||||
timer_type_to_string(static_cast<TimerType>(i)), current_timers[i]));
|
timer_type_to_string(static_cast<TimerType>(i)), current_timers[i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
file->Write(util::fmt("%0.6f Threads: current=%d\n", run_state::network_time, thread_mgr->NumThreads()));
|
file->Write(util::fmt("%0.6f Threads: current=%zu\n", run_state::network_time, thread_mgr->NumThreads()));
|
||||||
|
|
||||||
const threading::Manager::msg_stats_list& thread_stats = thread_mgr->GetMsgThreadStats();
|
const threading::Manager::msg_stats_list& thread_stats = thread_mgr->GetMsgThreadStats();
|
||||||
for ( threading::Manager::msg_stats_list::const_iterator i = thread_stats.begin(); i != thread_stats.end(); ++i ) {
|
for ( threading::Manager::msg_stats_list::const_iterator i = thread_stats.begin(); i != thread_stats.end(); ++i ) {
|
||||||
|
|
|
@ -337,7 +337,7 @@ function get_thread_stats%(%): ThreadStats
|
||||||
auto r = zeek::make_intrusive<zeek::RecordVal>(ThreadStats);
|
auto r = zeek::make_intrusive<zeek::RecordVal>(ThreadStats);
|
||||||
int n = 0;
|
int n = 0;
|
||||||
|
|
||||||
r->Assign(n++, zeek::thread_mgr->NumThreads());
|
r->Assign(n++, static_cast<uint64_t>(zeek::thread_mgr->NumThreads()));
|
||||||
|
|
||||||
return std::move(r);
|
return std::move(r);
|
||||||
%}
|
%}
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
#include "zeek/NetVar.h"
|
#include "zeek/NetVar.h"
|
||||||
#include "zeek/RunState.h"
|
#include "zeek/RunState.h"
|
||||||
#include "zeek/iosource/Manager.h"
|
#include "zeek/iosource/Manager.h"
|
||||||
|
#include "zeek/telemetry/Manager.h"
|
||||||
|
|
||||||
namespace zeek::threading {
|
namespace zeek::threading {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
@ -36,6 +37,17 @@ Manager::~Manager() {
|
||||||
Terminate();
|
Terminate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Manager::InitPostScript() {
|
||||||
|
num_threads_metric =
|
||||||
|
telemetry_mgr->GaugeInstance("zeek", "active_threads", {}, "Number of active threads", "",
|
||||||
|
[]() -> prometheus::ClientMetric {
|
||||||
|
prometheus::ClientMetric metric;
|
||||||
|
metric.gauge.value =
|
||||||
|
thread_mgr ? static_cast<double>(thread_mgr->all_threads.size()) : 0.0;
|
||||||
|
return metric;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
void Manager::Terminate() {
|
void Manager::Terminate() {
|
||||||
DBG_LOG(DBG_THREADING, "Terminating thread manager ...");
|
DBG_LOG(DBG_THREADING, "Terminating thread manager ...");
|
||||||
terminating = true;
|
terminating = true;
|
||||||
|
|
|
@ -7,6 +7,11 @@
|
||||||
#include "zeek/threading/MsgThread.h"
|
#include "zeek/threading/MsgThread.h"
|
||||||
|
|
||||||
namespace zeek {
|
namespace zeek {
|
||||||
|
|
||||||
|
namespace telemetry {
|
||||||
|
class Gauge;
|
||||||
|
}
|
||||||
|
|
||||||
namespace threading {
|
namespace threading {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
|
@ -46,6 +51,12 @@ public:
|
||||||
*/
|
*/
|
||||||
~Manager();
|
~Manager();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs initialization that can only happen after script parsing has
|
||||||
|
* completed.
|
||||||
|
*/
|
||||||
|
void InitPostScript();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Terminates the manager's processor. The method signals all threads
|
* Terminates the manager's processor. The method signals all threads
|
||||||
* to terminates and wait for them to do so. It then joins them and
|
* to terminates and wait for them to do so. It then joins them and
|
||||||
|
@ -77,7 +88,7 @@ public:
|
||||||
* threads that are not yet joined, including any potentially in
|
* threads that are not yet joined, including any potentially in
|
||||||
* Terminating() state.
|
* Terminating() state.
|
||||||
*/
|
*/
|
||||||
int NumThreads() const { return all_threads.size(); }
|
size_t NumThreads() const { return all_threads.size(); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signals a specific threads to terminate immediately.
|
* Signals a specific threads to terminate immediately.
|
||||||
|
@ -151,6 +162,7 @@ private:
|
||||||
msg_stats_list stats;
|
msg_stats_list stats;
|
||||||
|
|
||||||
bool heartbeat_timer_running = false;
|
bool heartbeat_timer_running = false;
|
||||||
|
std::shared_ptr<telemetry::Gauge> num_threads_metric;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace threading
|
} // namespace threading
|
||||||
|
|
|
@ -229,6 +229,83 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp
|
||||||
|
|
||||||
// Register IOSource as non-counting lifetime managed IO source.
|
// Register IOSource as non-counting lifetime managed IO source.
|
||||||
iosource_mgr->Register(io_source, true);
|
iosource_mgr->Register(io_source, true);
|
||||||
|
|
||||||
|
cnt_sent_in_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_in", {{"thread_name", Name()}},
|
||||||
|
"Number of messages sent into thread");
|
||||||
|
cnt_sent_out_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_out", {{"thread_name", Name()}},
|
||||||
|
"Number of messages sent from thread");
|
||||||
|
pending_in_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}},
|
||||||
|
"Number of pending messages sent into thread", "",
|
||||||
|
[this]() -> prometheus::ClientMetric {
|
||||||
|
prometheus::ClientMetric metric;
|
||||||
|
metric.gauge.value = static_cast<double>(queue_in.Size());
|
||||||
|
return metric;
|
||||||
|
});
|
||||||
|
pending_out_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}},
|
||||||
|
"Number of pending messages sent from thread", "",
|
||||||
|
[this]() -> prometheus::ClientMetric {
|
||||||
|
prometheus::ClientMetric metric;
|
||||||
|
metric.gauge.value = static_cast<double>(queue_out.Size());
|
||||||
|
return metric;
|
||||||
|
});
|
||||||
|
|
||||||
|
static auto get_queue_in_stats = [this]() -> const Queue<BasicInputMessage*>::Stats {
|
||||||
|
double now = util::current_time();
|
||||||
|
if ( this->queue_in_stats_last_updated < now - 0.01 ) {
|
||||||
|
queue_in.GetStats(&queue_in_last_stats);
|
||||||
|
this->queue_in_stats_last_updated = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue_in_last_stats;
|
||||||
|
};
|
||||||
|
|
||||||
|
queue_in_num_reads_metric =
|
||||||
|
telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_reads", {{"thread_name", Name()}},
|
||||||
|
"Number of reads from msg thread input queue", "",
|
||||||
|
[]() -> prometheus::ClientMetric {
|
||||||
|
prometheus::ClientMetric metric;
|
||||||
|
auto stats = get_queue_in_stats();
|
||||||
|
metric.gauge.value = static_cast<double>(stats.num_reads);
|
||||||
|
return metric;
|
||||||
|
});
|
||||||
|
queue_in_num_writes_metric =
|
||||||
|
telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_writes", {{"thread_name", Name()}},
|
||||||
|
"Number of writes from msg thread input queue", "",
|
||||||
|
[]() -> prometheus::ClientMetric {
|
||||||
|
prometheus::ClientMetric metric;
|
||||||
|
auto stats = get_queue_in_stats();
|
||||||
|
metric.gauge.value = static_cast<double>(stats.num_writes);
|
||||||
|
return metric;
|
||||||
|
});
|
||||||
|
|
||||||
|
static auto get_queue_out_stats = [this]() -> const Queue<BasicOutputMessage*>::Stats {
|
||||||
|
double now = util::current_time();
|
||||||
|
if ( this->queue_out_stats_last_updated < now - 0.01 ) {
|
||||||
|
queue_out.GetStats(&queue_out_last_stats);
|
||||||
|
this->queue_out_stats_last_updated = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue_out_last_stats;
|
||||||
|
};
|
||||||
|
|
||||||
|
queue_out_num_reads_metric =
|
||||||
|
telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_reads", {{"thread_name", Name()}},
|
||||||
|
"Number of reads from msg thread input queue", "",
|
||||||
|
[]() -> prometheus::ClientMetric {
|
||||||
|
prometheus::ClientMetric metric;
|
||||||
|
auto stats = get_queue_out_stats();
|
||||||
|
metric.gauge.value = static_cast<double>(stats.num_reads);
|
||||||
|
return metric;
|
||||||
|
});
|
||||||
|
queue_out_num_writes_metric =
|
||||||
|
telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_writes", {{"thread_name", Name()}},
|
||||||
|
"Number of writes from msg thread input queue", "",
|
||||||
|
[]() -> prometheus::ClientMetric {
|
||||||
|
prometheus::ClientMetric metric;
|
||||||
|
auto stats = get_queue_out_stats();
|
||||||
|
metric.gauge.value = static_cast<double>(stats.num_writes);
|
||||||
|
return metric;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
MsgThread::~MsgThread() {
|
MsgThread::~MsgThread() {
|
||||||
|
|
|
@ -802,6 +802,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
|
||||||
RecordType::InitPostScript();
|
RecordType::InitPostScript();
|
||||||
|
|
||||||
telemetry_mgr->InitPostScript();
|
telemetry_mgr->InitPostScript();
|
||||||
|
thread_mgr->InitPostScript();
|
||||||
iosource_mgr->InitPostScript();
|
iosource_mgr->InitPostScript();
|
||||||
log_mgr->InitPostScript();
|
log_mgr->InitPostScript();
|
||||||
plugin_mgr->InitPostScript();
|
plugin_mgr->InitPostScript();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue