mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
cluster: Introduce telemetry component
This commit is contained in:
parent
d3593e0489
commit
4c34274a6c
11 changed files with 543 additions and 1 deletions
|
@ -1,6 +1,7 @@
|
||||||
# Load the core cluster support.
|
# Load the core cluster support.
|
||||||
@load ./main
|
@load ./main
|
||||||
@load ./pools
|
@load ./pools
|
||||||
|
@load ./telemetry
|
||||||
|
|
||||||
@if ( Cluster::is_enabled() )
|
@if ( Cluster::is_enabled() )
|
||||||
|
|
||||||
|
|
40
scripts/base/frameworks/cluster/telemetry.zeek
Normal file
40
scripts/base/frameworks/cluster/telemetry.zeek
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
## Module for cluster telemetry.
|
||||||
|
module Cluster::Telemetry;
|
||||||
|
|
||||||
|
export {
|
||||||
|
type Type: enum {
|
||||||
|
## Creates counter metrics for incoming and for outgoing
|
||||||
|
## events without labels.
|
||||||
|
INFO,
|
||||||
|
## Creates counter metrics for incoming and outgoing events
|
||||||
|
## labeled with handler and normalized topic names.
|
||||||
|
VERBOSE,
|
||||||
|
## Creates histogram metrics using the serialized message size
|
||||||
|
## for events, labeled by topic, handler and script location
|
||||||
|
## (outgoing only).
|
||||||
|
DEBUG,
|
||||||
|
};
|
||||||
|
|
||||||
|
## The telemetry types to enable for the core backend.
|
||||||
|
const core_metrics: set[Type] = {
|
||||||
|
INFO,
|
||||||
|
} &redef;
|
||||||
|
|
||||||
|
## The telemetry types to enable for WebSocket backends.
|
||||||
|
const websocket_metrics: set[Type] = {
|
||||||
|
INFO,
|
||||||
|
} &redef;
|
||||||
|
|
||||||
|
## Table used for normalizing topic names that contain random parts.
|
||||||
|
## Map to an empty string to skip recording a specific metric
|
||||||
|
## completely.
|
||||||
|
const topic_normalizations: table[pattern] of string = {
|
||||||
|
[/^zeek\.cluster\.nodeid\..*/] = "zeek.cluster.nodeid.__normalized__",
|
||||||
|
[/^zeek\/cluster\/nodeid\/.*/] = "zeek/cluster/nodeid/__normalized__",
|
||||||
|
} &ordered &redef;
|
||||||
|
|
||||||
|
## For the DEBUG metrics, the histogram buckets to use.
|
||||||
|
const message_size_bounds: vector of double = {
|
||||||
|
10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0,
|
||||||
|
} &redef;
|
||||||
|
}
|
|
@ -16,6 +16,7 @@
|
||||||
#include "zeek/cluster/Manager.h"
|
#include "zeek/cluster/Manager.h"
|
||||||
#include "zeek/cluster/OnLoop.h"
|
#include "zeek/cluster/OnLoop.h"
|
||||||
#include "zeek/cluster/Serializer.h"
|
#include "zeek/cluster/Serializer.h"
|
||||||
|
#include "zeek/cluster/Telemetry.h"
|
||||||
#include "zeek/cluster/cluster.bif.h"
|
#include "zeek/cluster/cluster.bif.h"
|
||||||
#include "zeek/logging/Manager.h"
|
#include "zeek/logging/Manager.h"
|
||||||
#include "zeek/plugin/Manager.h"
|
#include "zeek/plugin/Manager.h"
|
||||||
|
@ -128,6 +129,9 @@ Backend::Backend(std::string_view arg_name, std::unique_ptr<EventSerializer> es,
|
||||||
tag = zeek::cluster::manager->Backends().GetComponentTag(name);
|
tag = zeek::cluster::manager->Backends().GetComponentTag(name);
|
||||||
if ( ! tag )
|
if ( ! tag )
|
||||||
reporter->InternalError("unknown cluster backend name '%s'; mismatch with tag component?", name.c_str());
|
reporter->InternalError("unknown cluster backend name '%s'; mismatch with tag component?", name.c_str());
|
||||||
|
|
||||||
|
// No telemetry by default.
|
||||||
|
telemetry = std::make_unique<detail::NullTelemetry>();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Backend::Init(std::string nid) {
|
bool Backend::Init(std::string nid) {
|
||||||
|
@ -185,6 +189,8 @@ bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& e
|
||||||
if ( ! event_serializer->SerializeEvent(buf, event) )
|
if ( ! event_serializer->SerializeEvent(buf, event) )
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
Telemetry().OnOutgoingEvent(topic, event.HandlerName(), detail::SerializationInfo{buf.size()});
|
||||||
|
|
||||||
return DoPublishEvent(topic, event_serializer->Name(), buf);
|
return DoPublishEvent(topic, event_serializer->Name(), buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,6 +233,8 @@ bool Backend::ProcessEventMessage(std::string_view topic, std::string_view forma
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Telemetry().OnIncomingEvent(topic, r->HandlerName(), detail::SerializationInfo{payload.size()});
|
||||||
|
|
||||||
return ProcessEvent(topic, std::move(*r));
|
return ProcessEvent(topic, std::move(*r));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "zeek/ZeekArgs.h"
|
#include "zeek/ZeekArgs.h"
|
||||||
#include "zeek/cluster/BifSupport.h"
|
#include "zeek/cluster/BifSupport.h"
|
||||||
#include "zeek/cluster/Serializer.h"
|
#include "zeek/cluster/Serializer.h"
|
||||||
|
#include "zeek/cluster/Telemetry.h"
|
||||||
#include "zeek/logging/Types.h"
|
#include "zeek/logging/Types.h"
|
||||||
|
|
||||||
namespace zeek {
|
namespace zeek {
|
||||||
|
@ -341,6 +342,13 @@ public:
|
||||||
*/
|
*/
|
||||||
const std::string& NodeId() const { return node_id; }
|
const std::string& NodeId() const { return node_id; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the telemetry handle to be used by this backend.
|
||||||
|
*
|
||||||
|
* @param The new telemetry instance to use.
|
||||||
|
*/
|
||||||
|
void SetTelemetry(detail::TelemetryPtr new_telemetry) { telemetry = std::move(new_telemetry); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
|
@ -410,6 +418,14 @@ protected:
|
||||||
*/
|
*/
|
||||||
void SetNodeId(std::string nid);
|
void SetNodeId(std::string nid);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides access to the detail::Telemetry handle.
|
||||||
|
*/
|
||||||
|
detail::Telemetry& Telemetry() {
|
||||||
|
assert(telemetry);
|
||||||
|
return *telemetry;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
* Called after all Zeek scripts have been loaded.
|
* Called after all Zeek scripts have been loaded.
|
||||||
|
@ -550,6 +566,8 @@ private:
|
||||||
* The backend's instance cluster node identifier.
|
* The backend's instance cluster node identifier.
|
||||||
*/
|
*/
|
||||||
std::string node_id;
|
std::string node_id;
|
||||||
|
|
||||||
|
detail::TelemetryPtr telemetry;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -4,10 +4,11 @@ zeek_add_subdir_library(
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}
|
${CMAKE_CURRENT_SOURCE_DIR}
|
||||||
${CMAKE_CURRENT_BINARY_DIR}
|
${CMAKE_CURRENT_BINARY_DIR}
|
||||||
SOURCES
|
SOURCES
|
||||||
Component.cc
|
|
||||||
Backend.cc
|
Backend.cc
|
||||||
BifSupport.cc
|
BifSupport.cc
|
||||||
|
Component.cc
|
||||||
Manager.cc
|
Manager.cc
|
||||||
|
Telemetry.cc
|
||||||
BIFS
|
BIFS
|
||||||
cluster.bif)
|
cluster.bif)
|
||||||
|
|
||||||
|
|
268
src/cluster/Telemetry.cc
Normal file
268
src/cluster/Telemetry.cc
Normal file
|
@ -0,0 +1,268 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
|
||||||
|
|
||||||
|
#include "zeek/cluster/Telemetry.h"
|
||||||
|
|
||||||
|
#include <cinttypes>
|
||||||
|
|
||||||
|
#include "zeek/Desc.h"
|
||||||
|
#include "zeek/Expr.h"
|
||||||
|
#include "zeek/Func.h"
|
||||||
|
#include "zeek/IntrusivePtr.h"
|
||||||
|
#include "zeek/Reporter.h"
|
||||||
|
#include "zeek/Val.h"
|
||||||
|
#include "zeek/ZeekString.h"
|
||||||
|
#include "zeek/cluster/Backend.h"
|
||||||
|
#include "zeek/telemetry/Manager.h"
|
||||||
|
#include "zeek/util.h"
|
||||||
|
|
||||||
|
namespace zeek::cluster::detail {
|
||||||
|
|
||||||
|
TableTopicNormalizer::TableTopicNormalizer() {
|
||||||
|
topic_normalizations = zeek::id::find_val<zeek::TableVal>("Cluster::Telemetry::topic_normalizations");
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string_view TableTopicNormalizer::operator()(std::string_view topic) {
|
||||||
|
// TODO: It'd be nice if we could just lookup via string_view so we can
|
||||||
|
// avoid the allocation of the intermediary StringVal just to match
|
||||||
|
// against the patterns.
|
||||||
|
auto sv = zeek::make_intrusive<zeek::StringVal>(topic);
|
||||||
|
VectorValPtr r = topic_normalizations->LookupPattern(sv);
|
||||||
|
|
||||||
|
if ( r->Size() == 0 )
|
||||||
|
return topic;
|
||||||
|
|
||||||
|
// I think this is safe: It returns a string_view to a StringVal that's stored
|
||||||
|
// persistently in a table[pattern] of string. We only need the string_view for
|
||||||
|
// looking up the right counter.
|
||||||
|
return r->StringValAt(0)->ToStdStringView();
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
std::vector<telemetry::LabelView> to_label_view_vec(const LabelList& static_label_list) {
|
||||||
|
std::vector<telemetry::LabelView> labels_view_vec;
|
||||||
|
labels_view_vec.reserve(static_label_list.size());
|
||||||
|
|
||||||
|
for ( const auto& [name, value] : static_label_list )
|
||||||
|
labels_view_vec.emplace_back(name, value);
|
||||||
|
|
||||||
|
return labels_view_vec;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::string_view> to_label_names_vec(const LabelList& static_label_list) {
|
||||||
|
std::vector<std::string_view> label_names_vec;
|
||||||
|
label_names_vec.reserve(static_label_list.size());
|
||||||
|
|
||||||
|
for ( const auto& [name, value] : static_label_list )
|
||||||
|
label_names_vec.emplace_back(name);
|
||||||
|
|
||||||
|
return label_names_vec;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
InfoTelemetry::InfoTelemetry(std::string_view name, LabelList static_labels, std::string_view prefix) {
|
||||||
|
if ( name != "core" && name != "websocket" )
|
||||||
|
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
|
||||||
|
|
||||||
|
std::string out_name = util::fmt("cluster_%s_outgoing_events", std::string(name).c_str());
|
||||||
|
std::string in_name = util::fmt("cluster_%s_incoming_events", std::string(name).c_str());
|
||||||
|
|
||||||
|
auto label_view_vec = to_label_view_vec(static_labels);
|
||||||
|
|
||||||
|
out = zeek::telemetry_mgr->CounterInstance(prefix, out_name, label_view_vec, "Number of outgoing events");
|
||||||
|
in = zeek::telemetry_mgr->CounterInstance(prefix, in_name, label_view_vec, "Number of incoming events");
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfoTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) {
|
||||||
|
out->Inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfoTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) {
|
||||||
|
in->Inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
VerboseTelemetry::VerboseTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList arg_static_labels,
|
||||||
|
std::string_view prefix)
|
||||||
|
: topic_normalizer(std::move(topic_normalizer)), labels(std::move(arg_static_labels)) {
|
||||||
|
if ( name != "core" && name != "websocket" )
|
||||||
|
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
|
||||||
|
|
||||||
|
// Add topic and handler to the labels. This assumes the caller didn't provide them already.
|
||||||
|
topic_idx = labels.size();
|
||||||
|
labels.emplace_back("topic", "");
|
||||||
|
handler_idx = labels.size();
|
||||||
|
labels.emplace_back("handler", "");
|
||||||
|
|
||||||
|
labels_view = to_label_view_vec(labels);
|
||||||
|
|
||||||
|
auto label_names = to_label_names_vec(labels);
|
||||||
|
|
||||||
|
std::string out_name = util::fmt("cluster_%s_verbose_outgoing_events", std::string(name).c_str());
|
||||||
|
std::string in_name = util::fmt("cluster_%s_verbose_incoming_events", std::string(name).c_str());
|
||||||
|
|
||||||
|
out = zeek::telemetry_mgr->CounterFamily(prefix, out_name, label_names,
|
||||||
|
"Number of outgoing events with topic and handler information");
|
||||||
|
in = zeek::telemetry_mgr->CounterFamily(prefix, in_name, label_names,
|
||||||
|
"Number of incoming events with topic and handler information");
|
||||||
|
}
|
||||||
|
|
||||||
|
void VerboseTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) {
|
||||||
|
auto normalized_topic = topic_normalizer(topic);
|
||||||
|
|
||||||
|
labels_view[topic_idx].second = normalized_topic;
|
||||||
|
labels_view[handler_idx].second = handler_name;
|
||||||
|
|
||||||
|
out->GetOrAdd(labels_view)->Inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
void VerboseTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) {
|
||||||
|
auto normalized_topic = topic_normalizer(topic);
|
||||||
|
|
||||||
|
labels_view[topic_idx].second = normalized_topic;
|
||||||
|
labels_view[handler_idx].second = handler_name;
|
||||||
|
|
||||||
|
in->GetOrAdd(labels_view)->Inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
std::string determine_script_location() {
|
||||||
|
std::string result = "none";
|
||||||
|
|
||||||
|
if ( zeek::detail::call_stack.empty() )
|
||||||
|
return result;
|
||||||
|
|
||||||
|
ssize_t sidx = static_cast<ssize_t>(zeek::detail::call_stack.size()) - 1;
|
||||||
|
|
||||||
|
while ( sidx >= 0 ) {
|
||||||
|
const auto* func = zeek::detail::call_stack[sidx].func;
|
||||||
|
const auto* ce = zeek::detail::call_stack[sidx].call;
|
||||||
|
|
||||||
|
// without_zeekpath_component looks pretty expensive and might be
|
||||||
|
// better to cache the result using the ce pointer instead of computing
|
||||||
|
// it over and over again.
|
||||||
|
const auto* loc = ce->GetLocationInfo();
|
||||||
|
std::string normalized_location = zeek::util::detail::without_zeekpath_component(loc->filename);
|
||||||
|
result = normalized_location + ":" + std::to_string(loc->first_line);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
|
||||||
|
DebugTelemetry::DebugTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels,
|
||||||
|
std::vector<double> arg_size_bounds, std::string_view prefix)
|
||||||
|
: topic_normalizer(std::move(topic_normalizer)),
|
||||||
|
size_bounds(std::move(arg_size_bounds)),
|
||||||
|
labels(std::move(static_labels)) {
|
||||||
|
if ( name != "core" && name != "websocket" )
|
||||||
|
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
|
||||||
|
|
||||||
|
// Add topic, handler and script_location to the labels. This assumes the caller didn't provide them already.
|
||||||
|
topic_idx = labels.size();
|
||||||
|
labels.emplace_back("topic", "");
|
||||||
|
handler_idx = labels.size();
|
||||||
|
labels.emplace_back("handler", "");
|
||||||
|
script_location_idx = labels.size();
|
||||||
|
labels.emplace_back("script_location", "");
|
||||||
|
|
||||||
|
labels_view = to_label_view_vec(labels);
|
||||||
|
labels_view_no_location = zeek::Span{labels_view.data(), labels_view.size() - 1};
|
||||||
|
|
||||||
|
auto label_names = to_label_names_vec(labels);
|
||||||
|
|
||||||
|
std::string out_name = util::fmt("cluster_%s_debug_outgoing_event_sizes", std::string(name).c_str());
|
||||||
|
std::string in_name = util::fmt("cluster_%s_debug_incoming_event_sizes", std::string(name).c_str());
|
||||||
|
|
||||||
|
out = zeek::telemetry_mgr->HistogramFamily(
|
||||||
|
prefix, out_name, label_names, size_bounds,
|
||||||
|
"The number and size distribution of outgoing events with topic, handler and script location information");
|
||||||
|
|
||||||
|
// Remove script-location from incoming metrics
|
||||||
|
label_names.pop_back();
|
||||||
|
|
||||||
|
in =
|
||||||
|
zeek::telemetry_mgr
|
||||||
|
->HistogramFamily(prefix, in_name, label_names, size_bounds,
|
||||||
|
"The number and size distribution of incoming events with topic and handler information");
|
||||||
|
}
|
||||||
|
|
||||||
|
void DebugTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) {
|
||||||
|
auto normalized_topic = topic_normalizer(topic);
|
||||||
|
std::string script_location = determine_script_location();
|
||||||
|
|
||||||
|
labels_view[topic_idx].second = normalized_topic;
|
||||||
|
labels_view[handler_idx].second = handler_name;
|
||||||
|
labels_view[script_location_idx].second = script_location;
|
||||||
|
|
||||||
|
const auto& hist = out->GetOrAdd(labels_view);
|
||||||
|
hist->Observe(static_cast<double>(info.Size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
void DebugTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) {
|
||||||
|
auto normalized_topic = topic_normalizer(topic);
|
||||||
|
|
||||||
|
labels_view[topic_idx].second = normalized_topic;
|
||||||
|
labels_view[handler_idx].second = handler_name;
|
||||||
|
|
||||||
|
const auto& hist = in->GetOrAdd(labels_view_no_location);
|
||||||
|
hist->Observe(static_cast<double>(info.Size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reads Cluster::Telemetry consts, instantiates and appropriate Telemetry instance and configures
|
||||||
|
// the given backend with it.
|
||||||
|
void configure_backend_telemetry(Backend& backend, std::string_view name, LabelList static_labels) {
|
||||||
|
if ( name != "core" && name != "websocket" )
|
||||||
|
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
|
||||||
|
|
||||||
|
static const auto& info = zeek::id::find_val<zeek::EnumVal>("Cluster::Telemetry::INFO");
|
||||||
|
static const auto& verbose = zeek::id::find_val<zeek::EnumVal>("Cluster::Telemetry::VERBOSE");
|
||||||
|
static const auto& debug = zeek::id::find_val<zeek::EnumVal>("Cluster::Telemetry::DEBUG");
|
||||||
|
|
||||||
|
std::string var_name = util::fmt("Cluster::Telemetry::%s_metrics", std::string(name).c_str());
|
||||||
|
static const auto& metrics = zeek::id::find_val<zeek::TableVal>(var_name);
|
||||||
|
|
||||||
|
auto composite = std::make_unique<detail::CompositeTelemetry>();
|
||||||
|
|
||||||
|
for ( const auto& [k, v] : metrics->ToMap() ) {
|
||||||
|
detail::TelemetryPtr child;
|
||||||
|
// Keys are (always?) returned as ListVal, take the first one.
|
||||||
|
auto metric_type = zeek::cast_intrusive<zeek::EnumVal>(k->AsListVal()->Idx(0));
|
||||||
|
|
||||||
|
if ( metric_type == info ) {
|
||||||
|
child = std::make_unique<detail::InfoTelemetry>(name, static_labels);
|
||||||
|
}
|
||||||
|
else if ( metric_type == verbose ) {
|
||||||
|
child = std::make_unique<detail::VerboseTelemetry>(cluster::detail::TableTopicNormalizer(), name,
|
||||||
|
static_labels);
|
||||||
|
}
|
||||||
|
else if ( metric_type == debug ) {
|
||||||
|
auto bound_val_vec = zeek::id::find_val<zeek::VectorVal>("Cluster::Telemetry::message_size_bounds");
|
||||||
|
std::vector<double> bounds_vec(bound_val_vec->Size());
|
||||||
|
for ( unsigned int i = 0; i < bound_val_vec->Size(); i++ )
|
||||||
|
bounds_vec[i] = bound_val_vec->DoubleAt(i);
|
||||||
|
child = std::make_unique<detail::DebugTelemetry>(cluster::detail::TableTopicNormalizer(), name,
|
||||||
|
static_labels, std::move(bounds_vec));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
zeek::reporter->FatalError("Invalid metric_type %s %" PRIu64, obj_desc_short(metric_type).c_str(),
|
||||||
|
metric_type->Get());
|
||||||
|
}
|
||||||
|
|
||||||
|
composite->Add(std::move(child));
|
||||||
|
}
|
||||||
|
|
||||||
|
backend.SetTelemetry(std::move(composite));
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace zeek::cluster::detail
|
195
src/cluster/Telemetry.h
Normal file
195
src/cluster/Telemetry.h
Normal file
|
@ -0,0 +1,195 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cstddef>
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
#include <string>
|
||||||
|
#include <string_view>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "zeek/IntrusivePtr.h"
|
||||||
|
#include "zeek/Span.h"
|
||||||
|
|
||||||
|
|
||||||
|
namespace zeek {
|
||||||
|
|
||||||
|
class TableVal;
|
||||||
|
using TableValPtr = zeek::IntrusivePtr<TableVal>;
|
||||||
|
|
||||||
|
namespace telemetry {
|
||||||
|
class Counter;
|
||||||
|
using CounterPtr = std::shared_ptr<Counter>;
|
||||||
|
|
||||||
|
class CounterFamily;
|
||||||
|
using CounterFamilyPtr = std::shared_ptr<CounterFamily>;
|
||||||
|
|
||||||
|
class HistogramFamily;
|
||||||
|
using HistogramFamilyPtr = std::shared_ptr<HistogramFamily>;
|
||||||
|
|
||||||
|
using LabelView = std::pair<std::string_view, std::string_view>;
|
||||||
|
|
||||||
|
} // namespace telemetry
|
||||||
|
|
||||||
|
namespace cluster {
|
||||||
|
|
||||||
|
class Backend;
|
||||||
|
|
||||||
|
namespace detail {
|
||||||
|
|
||||||
|
enum class TelemetryScope {
|
||||||
|
Core,
|
||||||
|
WebSocket,
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extra information of the serialized version of an Event.
|
||||||
|
*/
|
||||||
|
class SerializationInfo {
|
||||||
|
public:
|
||||||
|
explicit SerializationInfo(size_t size) : size(size) {}
|
||||||
|
|
||||||
|
size_t Size() const { return size; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
size_t size;
|
||||||
|
};
|
||||||
|
|
||||||
|
using TopicNormalizer = std::function<std::string_view(std::string_view)>;
|
||||||
|
using LabelList = std::vector<std::pair<std::string, std::string>>;
|
||||||
|
using LabelViewList = std::vector<std::pair<std::string_view, std::string_view>>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A topic normalizer using the Cluster::Telemetry::topic_normalizations table.
|
||||||
|
*/
|
||||||
|
class TableTopicNormalizer {
|
||||||
|
public:
|
||||||
|
TableTopicNormalizer();
|
||||||
|
std::string_view operator()(std::string_view topic);
|
||||||
|
|
||||||
|
private:
|
||||||
|
zeek::TableValPtr topic_normalizations;
|
||||||
|
};
|
||||||
|
|
||||||
|
class Telemetry {
|
||||||
|
public:
|
||||||
|
virtual ~Telemetry() = default;
|
||||||
|
|
||||||
|
virtual void OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) = 0;
|
||||||
|
virtual void OnIncomingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
using TelemetryPtr = std::unique_ptr<Telemetry>;
|
||||||
|
|
||||||
|
// Reporting nothing.
|
||||||
|
class NullTelemetry : public Telemetry {
|
||||||
|
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) override {}
|
||||||
|
void OnIncomingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) override {}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// A container for telemetry instances, delegating to its children.
|
||||||
|
class CompositeTelemetry : public Telemetry {
|
||||||
|
public:
|
||||||
|
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) override {
|
||||||
|
for ( const auto& c : children )
|
||||||
|
c->OnOutgoingEvent(topic, handler_name, info);
|
||||||
|
}
|
||||||
|
|
||||||
|
void OnIncomingEvent(std::string_view topic, std::string_view handler_name,
|
||||||
|
const SerializationInfo& info) override {
|
||||||
|
for ( const auto& c : children )
|
||||||
|
c->OnIncomingEvent(topic, handler_name, info);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Add(TelemetryPtr child) { children.push_back(std::move(child)); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<TelemetryPtr> children;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Just one metric for incoming and one for outgoing metrics.
|
||||||
|
*/
|
||||||
|
class InfoTelemetry : public Telemetry {
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param name The metric name without prefix.
|
||||||
|
* @param static_labels Labels to add on all metrics.
|
||||||
|
* @param prefix The metric prefix.
|
||||||
|
*/
|
||||||
|
InfoTelemetry(std::string_view name, LabelList static_labels, std::string_view prefix = "zeek");
|
||||||
|
|
||||||
|
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
|
||||||
|
void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
telemetry::CounterPtr in, out;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A telemetry class producing metrics labeled with handler names and topics.
|
||||||
|
*
|
||||||
|
* Note that randomly generated topic names will cause unbounded
|
||||||
|
* metrics growth. A topic_normalizer should be injected to normalize
|
||||||
|
* topic names.
|
||||||
|
*/
|
||||||
|
class VerboseTelemetry : public Telemetry {
|
||||||
|
public:
|
||||||
|
VerboseTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels,
|
||||||
|
std::string_view prefix = "zeek");
|
||||||
|
|
||||||
|
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
|
||||||
|
void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
TopicNormalizer topic_normalizer;
|
||||||
|
LabelList labels;
|
||||||
|
LabelViewList labels_view;
|
||||||
|
size_t topic_idx, handler_idx; // Index of topic and handler labels in labels_view
|
||||||
|
telemetry::CounterFamilyPtr in, out;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A telemetry class producing metrics labeled with topics
|
||||||
|
* and the script layer location for outgoing metrics.
|
||||||
|
*/
|
||||||
|
class DebugTelemetry : public Telemetry {
|
||||||
|
public:
|
||||||
|
DebugTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels,
|
||||||
|
std::vector<double> size_bounds, std::string_view prefix = "zeek");
|
||||||
|
|
||||||
|
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
|
||||||
|
void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
TopicNormalizer topic_normalizer;
|
||||||
|
std::vector<double> size_bounds;
|
||||||
|
LabelList labels;
|
||||||
|
LabelViewList labels_view;
|
||||||
|
zeek::Span<telemetry::LabelView> labels_view_no_location;
|
||||||
|
size_t topic_idx, handler_idx,
|
||||||
|
script_location_idx; // Index of topic, handler and script_location labels in labels_view
|
||||||
|
telemetry::HistogramFamilyPtr in, out;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads Cluster::Telemetry consts, instantiates and appropriate Telemetry instance
|
||||||
|
* set it on the given backend.
|
||||||
|
*
|
||||||
|
* @param backend The cluster backend to configure.
|
||||||
|
* @param name The name used in the metric names. Either core or websocket at this point.
|
||||||
|
* @param static_labels Static labels to attach to metrics.
|
||||||
|
*/
|
||||||
|
void configure_backend_telemetry(Backend& backend, std::string_view name, LabelList static_labels = {});
|
||||||
|
|
||||||
|
} // namespace detail
|
||||||
|
} // namespace cluster
|
||||||
|
} // namespace zeek
|
|
@ -54,6 +54,7 @@
|
||||||
#include "zeek/broker/Manager.h"
|
#include "zeek/broker/Manager.h"
|
||||||
#include "zeek/cluster/Backend.h"
|
#include "zeek/cluster/Backend.h"
|
||||||
#include "zeek/cluster/Manager.h"
|
#include "zeek/cluster/Manager.h"
|
||||||
|
#include "zeek/cluster/Telemetry.h"
|
||||||
#include "zeek/conn_key/Manager.h"
|
#include "zeek/conn_key/Manager.h"
|
||||||
#include "zeek/file_analysis/Manager.h"
|
#include "zeek/file_analysis/Manager.h"
|
||||||
#include "zeek/input.h"
|
#include "zeek/input.h"
|
||||||
|
@ -891,6 +892,8 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
|
||||||
cluster::backend = backend.release();
|
cluster::backend = backend.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cluster::detail::configure_backend_telemetry(*cluster::backend, "core");
|
||||||
|
|
||||||
broker_mgr->InitPostScript();
|
broker_mgr->InitPostScript();
|
||||||
if ( cluster::backend != broker_mgr )
|
if ( cluster::backend != broker_mgr )
|
||||||
cluster::backend->InitPostScript();
|
cluster::backend->InitPostScript();
|
||||||
|
|
|
@ -140,6 +140,7 @@ scripts/base/init-frameworks-and-bifs.zeek
|
||||||
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
|
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
|
||||||
scripts/base/frameworks/cluster/pools.zeek
|
scripts/base/frameworks/cluster/pools.zeek
|
||||||
scripts/base/utils/hash_hrw.zeek
|
scripts/base/utils/hash_hrw.zeek
|
||||||
|
scripts/base/frameworks/cluster/telemetry.zeek
|
||||||
scripts/base/frameworks/config/__load__.zeek
|
scripts/base/frameworks/config/__load__.zeek
|
||||||
scripts/base/frameworks/config/main.zeek
|
scripts/base/frameworks/config/main.zeek
|
||||||
scripts/base/frameworks/config/input.zeek
|
scripts/base/frameworks/config/input.zeek
|
||||||
|
|
|
@ -140,6 +140,7 @@ scripts/base/init-frameworks-and-bifs.zeek
|
||||||
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
|
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
|
||||||
scripts/base/frameworks/cluster/pools.zeek
|
scripts/base/frameworks/cluster/pools.zeek
|
||||||
scripts/base/utils/hash_hrw.zeek
|
scripts/base/utils/hash_hrw.zeek
|
||||||
|
scripts/base/frameworks/cluster/telemetry.zeek
|
||||||
scripts/base/frameworks/config/__load__.zeek
|
scripts/base/frameworks/config/__load__.zeek
|
||||||
scripts/base/frameworks/config/main.zeek
|
scripts/base/frameworks/config/main.zeek
|
||||||
scripts/base/frameworks/config/input.zeek
|
scripts/base/frameworks/config/input.zeek
|
||||||
|
|
|
@ -491,6 +491,7 @@
|
||||||
0.000000 MetaHookPost LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek) -> -1
|
0.000000 MetaHookPost LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek) -> -1
|
||||||
0.000000 MetaHookPost LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> -1
|
0.000000 MetaHookPost LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> -1
|
||||||
0.000000 MetaHookPost LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> -1
|
0.000000 MetaHookPost LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> -1
|
||||||
|
0.000000 MetaHookPost LoadFile(0, ./telemetry, <...>/telemetry.zeek) -> -1
|
||||||
0.000000 MetaHookPost LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> -1
|
0.000000 MetaHookPost LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> -1
|
||||||
0.000000 MetaHookPost LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> -1
|
0.000000 MetaHookPost LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> -1
|
||||||
0.000000 MetaHookPost LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> -1
|
0.000000 MetaHookPost LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> -1
|
||||||
|
@ -805,6 +806,7 @@
|
||||||
0.000000 MetaHookPost LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek) -> (-1, <no content>)
|
0.000000 MetaHookPost LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek) -> (-1, <no content>)
|
||||||
0.000000 MetaHookPost LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> (-1, <no content>)
|
0.000000 MetaHookPost LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> (-1, <no content>)
|
||||||
0.000000 MetaHookPost LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> (-1, <no content>)
|
0.000000 MetaHookPost LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> (-1, <no content>)
|
||||||
|
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry, <...>/telemetry.zeek) -> (-1, <no content>)
|
||||||
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> (-1, <no content>)
|
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> (-1, <no content>)
|
||||||
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> (-1, <no content>)
|
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> (-1, <no content>)
|
||||||
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> (-1, <no content>)
|
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> (-1, <no content>)
|
||||||
|
@ -1430,6 +1432,7 @@
|
||||||
0.000000 MetaHookPre LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek)
|
0.000000 MetaHookPre LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek)
|
0.000000 MetaHookPre LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek)
|
0.000000 MetaHookPre LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek)
|
||||||
|
0.000000 MetaHookPre LoadFile(0, ./telemetry, <...>/telemetry.zeek)
|
||||||
0.000000 MetaHookPre LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek)
|
0.000000 MetaHookPre LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek)
|
0.000000 MetaHookPre LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek)
|
0.000000 MetaHookPre LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek)
|
||||||
|
@ -1744,6 +1747,7 @@
|
||||||
0.000000 MetaHookPre LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek)
|
0.000000 MetaHookPre LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek)
|
0.000000 MetaHookPre LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek)
|
0.000000 MetaHookPre LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek)
|
||||||
|
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry, <...>/telemetry.zeek)
|
||||||
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek)
|
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek)
|
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek)
|
||||||
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek)
|
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek)
|
||||||
|
@ -2380,6 +2384,7 @@
|
||||||
0.000000 | HookLoadFile ./store.bif.zeek <...>/store.bif.zeek
|
0.000000 | HookLoadFile ./store.bif.zeek <...>/store.bif.zeek
|
||||||
0.000000 | HookLoadFile ./strings.bif.zeek <...>/strings.bif.zeek
|
0.000000 | HookLoadFile ./strings.bif.zeek <...>/strings.bif.zeek
|
||||||
0.000000 | HookLoadFile ./supervisor.bif.zeek <...>/supervisor.bif.zeek
|
0.000000 | HookLoadFile ./supervisor.bif.zeek <...>/supervisor.bif.zeek
|
||||||
|
0.000000 | HookLoadFile ./telemetry <...>/telemetry.zeek
|
||||||
0.000000 | HookLoadFile ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek
|
0.000000 | HookLoadFile ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek
|
||||||
0.000000 | HookLoadFile ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek
|
0.000000 | HookLoadFile ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek
|
||||||
0.000000 | HookLoadFile ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek
|
0.000000 | HookLoadFile ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek
|
||||||
|
@ -2694,6 +2699,7 @@
|
||||||
0.000000 | HookLoadFileExtended ./store.bif.zeek <...>/store.bif.zeek
|
0.000000 | HookLoadFileExtended ./store.bif.zeek <...>/store.bif.zeek
|
||||||
0.000000 | HookLoadFileExtended ./strings.bif.zeek <...>/strings.bif.zeek
|
0.000000 | HookLoadFileExtended ./strings.bif.zeek <...>/strings.bif.zeek
|
||||||
0.000000 | HookLoadFileExtended ./supervisor.bif.zeek <...>/supervisor.bif.zeek
|
0.000000 | HookLoadFileExtended ./supervisor.bif.zeek <...>/supervisor.bif.zeek
|
||||||
|
0.000000 | HookLoadFileExtended ./telemetry <...>/telemetry.zeek
|
||||||
0.000000 | HookLoadFileExtended ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek
|
0.000000 | HookLoadFileExtended ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek
|
||||||
0.000000 | HookLoadFileExtended ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek
|
0.000000 | HookLoadFileExtended ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek
|
||||||
0.000000 | HookLoadFileExtended ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek
|
0.000000 | HookLoadFileExtended ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue