Move broker statistics to be telemetry metrics

This commit is contained in:
Tim Wojtulewicz 2024-06-06 14:51:09 -07:00
parent 44860676a2
commit 206f5cd522
2 changed files with 69 additions and 11 deletions

View file

@ -340,6 +340,43 @@ void Manager::InitPostScript() {
bstate->subscriber.add_topic(broker::topic::store_events(), true);
InitializeBrokerStoreForwarding();
num_peers_metric =
telemetry_mgr->GaugeInstance("zeek", "broker_peers", {}, "Current number of peers connected via broker", "",
[]() -> prometheus::ClientMetric {
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(broker_mgr->peer_count);
return metric;
});
num_stores_metric =
telemetry_mgr->GaugeInstance("zeek", "broker_stores", {}, "Current number of stores connected via broker", "",
[]() -> prometheus::ClientMetric {
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(broker_mgr->data_stores.size());
return metric;
});
num_pending_queries_metric =
telemetry_mgr->GaugeInstance("zeek", "broker_pending_queries", {}, "Current number of pending broker queries",
"", []() -> prometheus::ClientMetric {
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(broker_mgr->pending_queries.size());
return metric;
});
num_events_incoming_metric = telemetry_mgr->CounterInstance("zeek", "broker_incoming_events", {},
"Total number of incoming events via broker");
num_events_outgoing_metric = telemetry_mgr->CounterInstance("zeek", "broker_outgoing_events", {},
"Total number of outgoing events via broker");
num_logs_incoming_metric =
telemetry_mgr->CounterInstance("zeek", "broker_incoming_logs", {}, "Total number of incoming logs via broker");
num_logs_outgoing_metric =
telemetry_mgr->CounterInstance("zeek", "broker_outgoing_logs", {}, "Total number of outgoing logs via broker");
num_ids_incoming_metric =
telemetry_mgr->CounterInstance("zeek", "broker_incoming_ids", {}, "Total number of incoming ids via broker");
num_ids_outgoing_metric =
telemetry_mgr->CounterInstance("zeek", "broker_outgoing_ids", {}, "Total number of outgoing ids via broker");
}
void Manager::InitializeBrokerStoreForwarding() {
@ -528,7 +565,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args,
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str());
broker::zeek::Event ev(std::move(name), std::move(args), broker::to_timestamp(ts));
bstate->endpoint.publish(std::move(topic), ev.move_data());
++statistics.num_events_outgoing;
num_events_outgoing_metric->Inc();
return true;
}
@ -588,7 +625,7 @@ bool Manager::PublishIdentifier(std::string topic, std::string id) {
broker::zeek::IdentifierUpdate msg(std::move(id), std::move(data.value_));
DBG_LOG(DBG_BROKER, "Publishing id-update: %s", RenderMessage(topic, msg.as_data()).c_str());
bstate->endpoint.publish(std::move(topic), msg.move_data());
++statistics.num_ids_outgoing;
num_ids_outgoing_metric->Inc();
return true;
}
@ -715,8 +752,10 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
++lb.message_count;
lb.msgs[topic].add(std::move(msg));
if ( lb.message_count >= log_batch_size )
statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size);
if ( lb.message_count >= log_batch_size ) {
auto outgoing_logs = static_cast<double>(lb.Flush(bstate->endpoint, log_batch_size));
num_logs_outgoing_metric->Inc(outgoing_logs);
}
return true;
}
@ -746,7 +785,8 @@ size_t Manager::FlushLogBuffers() {
for ( auto& lb : log_buffers )
rval += lb.Flush(bstate->endpoint, log_batch_size);
statistics.num_logs_outgoing += rval;
num_logs_outgoing_metric->Inc(rval);
return rval;
}
@ -1141,7 +1181,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
ts = run_state::network_time;
DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", c_str_safe(name).c_str(), ts, RenderMessage(args).c_str());
++statistics.num_events_incoming;
num_events_incoming_metric->Inc();
auto handler = event_registry->Lookup(name);
if ( ! handler )
@ -1286,7 +1326,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
return false;
}
++statistics.num_logs_incoming;
num_logs_incoming_metric->Inc();
auto&& stream_id_name = lw.stream_id().name;
// Get stream ID.
@ -1352,7 +1392,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i
return false;
}
++statistics.num_ids_incoming;
num_ids_incoming_metric->Inc();
auto id_name = c_str_safe(iu.id_name());
auto id_value = convert_if_broker_variant_or_move(iu.id_value());
const auto& id = zeek::detail::global_scope()->Find(id_name);
@ -1706,7 +1746,12 @@ const Stats& Manager::GetStatistics() {
statistics.num_stores = data_stores.size();
statistics.num_pending_queries = pending_queries.size();
// The other attributes are set as activity happens.
statistics.num_events_incoming = static_cast<size_t>(num_events_incoming_metric->Value());
statistics.num_events_outgoing = static_cast<size_t>(num_events_outgoing_metric->Value());
statistics.num_logs_incoming = static_cast<size_t>(num_logs_incoming_metric->Value());
statistics.num_logs_outgoing = static_cast<size_t>(num_logs_outgoing_metric->Value());
statistics.num_ids_incoming = static_cast<size_t>(num_ids_incoming_metric->Value());
statistics.num_ids_outgoing = static_cast<size_t>(num_ids_outgoing_metric->Value());
return statistics;
}