diff --git a/CHANGES b/CHANGES index c731c73bfe..1bb44d6840 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,25 @@ +8.0.0-dev.546 | 2025-06-26 14:30:37 +0200 + + * btest/cluster/telemetry: Add smoke testing for telemetry (Arne Welzel, Corelight) + + * cluster/WebSocket: Fetch X-Application-Name header as app label (Arne Welzel, Corelight) + + * cluster/WebSocket: Pass X-Application-Name to dispatcher (Arne Welzel, Corelight) + + This is going to be used to add labels to telemetry if the + X-Application-Name header is set. + + * broker/WebSocketShim: Add calls to Telemetry hooks (Arne Welzel, Corelight) + + WebSocket clients with Broker do not use the normal Broker backend, so + we need to add the telemetry invocations explicitly. + + * cluster/WebSocket: Configure telemetry for WebSocket backends (Arne Welzel, Corelight) + + * broker: Hook up generic cluster telemetry (Arne Welzel, Corelight) + + * cluster: Introduce telemetry component (Arne Welzel, Corelight) + 8.0.0-dev.538 | 2025-06-26 09:58:39 +0100 * Only pass session ticket data in ssl_session_ticket_handshake event (Johanna Amann, Corelight) diff --git a/NEWS b/NEWS index a9ac806269..17c987438d 100644 --- a/NEWS +++ b/NEWS @@ -101,6 +101,29 @@ New Functionality implementation in the ``src/packet_analysis/protocol/ip/conn_key/vlan_fivetuple`` directory for an example. +- Cluster telemetry improvements. Zeek now exposes a configurable number of + metrics regarding outgoing and incoming cluster events. By default, the number + of events sent and received by a Zeek cluster node and any attached WebSocket + clients is tracked as four individual counters. It's possible to gather more + detailed information by adding ``Cluster::Telemetry::VERBOSE`` and + ``Cluster::Telemetry::DEBUG`` to the variables ``Cluster::core_metrics`` and + ``Cluster::webscoket_metrics``: + + redef Cluster::core_metrics += { Cluster::Telemetry::VERBOSE }; + redef Cluster::websocket_metrics += { Cluster::Telemetry::DEBUG }; + + Configuring verbose, adds metrics that are labeled with the event handler + and topic name. Configuring debug, uses histogram metrics to additionally track + the distribution of the serialized event size. Additionally, when debug is selected, + outgoing events are labeled with the script location from where they were published. + +- Support for the X-Application-Name HTTP header was added to the WebSocket API at + ``v1/messages/json``. A WebSocket application connecting to Zeek may set the + X-Application-Name header to a descriptive identifier. The value of this header + will be added to the cluster metrics as ``app`` label. This allows to gather + incoming and outgoing event metrics of a specific WebSocket application, simply + by setting the X-Application-Name header. + - Generic event metadata support. A new ``EventMetadata`` module was added allowing to register generic event metadata types and accessing the current event's metadata using the functions ``current()`` and ``current_all()`` of this module. diff --git a/VERSION b/VERSION index 57099acec3..aa43a6b471 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -8.0.0-dev.538 +8.0.0-dev.546 diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index 85fef40c5f..2072b457a7 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -1,6 +1,7 @@ # Load the core cluster support. @load ./main @load ./pools +@load ./telemetry @if ( Cluster::is_enabled() ) diff --git a/scripts/base/frameworks/cluster/telemetry.zeek b/scripts/base/frameworks/cluster/telemetry.zeek new file mode 100644 index 0000000000..879f8a9770 --- /dev/null +++ b/scripts/base/frameworks/cluster/telemetry.zeek @@ -0,0 +1,40 @@ +## Module for cluster telemetry. +module Cluster::Telemetry; + +export { + type Type: enum { + ## Creates counter metrics for incoming and for outgoing + ## events without labels. + INFO, + ## Creates counter metrics for incoming and outgoing events + ## labeled with handler and normalized topic names. + VERBOSE, + ## Creates histogram metrics using the serialized message size + ## for events, labeled by topic, handler and script location + ## (outgoing only). + DEBUG, + }; + + ## The telemetry types to enable for the core backend. + const core_metrics: set[Type] = { + INFO, + } &redef; + + ## The telemetry types to enable for WebSocket backends. + const websocket_metrics: set[Type] = { + INFO, + } &redef; + + ## Table used for normalizing topic names that contain random parts. + ## Map to an empty string to skip recording a specific metric + ## completely. + const topic_normalizations: table[pattern] of string = { + [/^zeek\.cluster\.nodeid\..*/] = "zeek.cluster.nodeid.__normalized__", + [/^zeek\/cluster\/nodeid\/.*/] = "zeek/cluster/nodeid/__normalized__", + } &ordered &redef; + + ## For the DEBUG metrics, the histogram buckets to use. + const message_size_bounds: vector of double = { + 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0, + } &redef; +} diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 06cbb431be..67204893f7 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -34,6 +34,7 @@ #include "zeek/broker/comm.bif.h" #include "zeek/broker/messaging.bif.h" #include "zeek/broker/store.bif.h" +#include "zeek/cluster/Telemetry.h" #include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/iosource/Manager.h" #include "zeek/logging/Manager.h" @@ -838,6 +839,9 @@ bool Manager::DoPublishEvent(const std::string& topic, cluster::detail::Event& e auto& ev = maybe_ev.value(); + size_t size = ev.as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnOutgoingEvent(topic, event.HandlerName(), cluster::detail::SerializationInfo{size}); + DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); bstate->endpoint.publish(topic, ev.move_data()); num_events_outgoing_metric->Inc(); @@ -859,6 +863,10 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, } broker::zeek::Event ev(name, args, meta); + + size_t size = ev.as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnOutgoingEvent(topic, name, cluster::detail::SerializationInfo{size}); + DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); bstate->endpoint.publish(std::move(topic), ev.move_data()); num_events_outgoing_metric->Inc(); @@ -1583,6 +1591,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(), meta ? meta->size() : 0, RenderMessage(args).c_str()); num_events_incoming_metric->Inc(); + size_t size = ev.as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnIncomingEvent(topic, name, cluster::detail::SerializationInfo{size}); auto handler = event_registry->Lookup(name); if ( ! handler ) diff --git a/src/broker/WebSocketShim.cc b/src/broker/WebSocketShim.cc index 552edb749f..dc9c8ba43d 100644 --- a/src/broker/WebSocketShim.cc +++ b/src/broker/WebSocketShim.cc @@ -118,6 +118,9 @@ bool WebSocketShim::DoPublishEvent(const std::string& topic, zeek::cluster::deta return false; } + size_t size = r->as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnOutgoingEvent(topic, r->name(), cluster::detail::SerializationInfo{size}); + auto msg = broker::data_envelope::make(broker::topic(topic), r->as_data()); state->hub.publish(std::move(msg)); return true; @@ -176,6 +179,9 @@ void WebSocketShim::ProcessMessage(std::string_view topic, broker::zeek::Event& return; } + size_t size = ev.as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnIncomingEvent(topic, r->HandlerName(), cluster::detail::SerializationInfo{size}); + ProcessEvent(topic, std::move(*r)); } void WebSocketShim::ProcessMessage(std::string_view topic, broker::zeek::Invalid& invalid) { diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index b0a86ef600..ffa4872e60 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -16,6 +16,7 @@ #include "zeek/cluster/Manager.h" #include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" +#include "zeek/cluster/Telemetry.h" #include "zeek/cluster/cluster.bif.h" #include "zeek/logging/Manager.h" #include "zeek/plugin/Manager.h" @@ -128,6 +129,9 @@ Backend::Backend(std::string_view arg_name, std::unique_ptr es, tag = zeek::cluster::manager->Backends().GetComponentTag(name); if ( ! tag ) reporter->InternalError("unknown cluster backend name '%s'; mismatch with tag component?", name.c_str()); + + // No telemetry by default. + telemetry = std::make_unique(); } bool Backend::Init(std::string nid) { @@ -185,6 +189,8 @@ bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& e if ( ! event_serializer->SerializeEvent(buf, event) ) return false; + Telemetry().OnOutgoingEvent(topic, event.HandlerName(), detail::SerializationInfo{buf.size()}); + return DoPublishEvent(topic, event_serializer->Name(), buf); } @@ -227,6 +233,8 @@ bool Backend::ProcessEventMessage(std::string_view topic, std::string_view forma return false; } + Telemetry().OnIncomingEvent(topic, r->HandlerName(), detail::SerializationInfo{payload.size()}); + return ProcessEvent(topic, std::move(*r)); } diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 91be697441..f401c02b9d 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -17,6 +17,7 @@ #include "zeek/ZeekArgs.h" #include "zeek/cluster/BifSupport.h" #include "zeek/cluster/Serializer.h" +#include "zeek/cluster/Telemetry.h" #include "zeek/logging/Types.h" namespace zeek { @@ -341,6 +342,13 @@ public: */ const std::string& NodeId() const { return node_id; } + /** + * Set the telemetry handle to be used by this backend. + * + * @param The new telemetry instance to use. + */ + void SetTelemetry(detail::TelemetryPtr new_telemetry) { telemetry = std::move(new_telemetry); } + protected: /** * Constructor. @@ -410,6 +418,14 @@ protected: */ void SetNodeId(std::string nid); + /** + * Provides access to the detail::Telemetry handle. + */ + detail::Telemetry& Telemetry() { + assert(telemetry); + return *telemetry; + } + private: /** * Called after all Zeek scripts have been loaded. @@ -550,6 +566,8 @@ private: * The backend's instance cluster node identifier. */ std::string node_id; + + detail::TelemetryPtr telemetry; }; /** diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index fa2093be63..29f8a9debe 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -4,10 +4,11 @@ zeek_add_subdir_library( ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} SOURCES - Component.cc Backend.cc BifSupport.cc + Component.cc Manager.cc + Telemetry.cc BIFS cluster.bif) diff --git a/src/cluster/Telemetry.cc b/src/cluster/Telemetry.cc new file mode 100644 index 0000000000..0d21721b30 --- /dev/null +++ b/src/cluster/Telemetry.cc @@ -0,0 +1,268 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "zeek/cluster/Telemetry.h" + +#include + +#include "zeek/Desc.h" +#include "zeek/Expr.h" +#include "zeek/Func.h" +#include "zeek/IntrusivePtr.h" +#include "zeek/Reporter.h" +#include "zeek/Val.h" +#include "zeek/ZeekString.h" +#include "zeek/cluster/Backend.h" +#include "zeek/telemetry/Manager.h" +#include "zeek/util.h" + +namespace zeek::cluster::detail { + +TableTopicNormalizer::TableTopicNormalizer() { + topic_normalizations = zeek::id::find_val("Cluster::Telemetry::topic_normalizations"); +} + +std::string_view TableTopicNormalizer::operator()(std::string_view topic) { + // TODO: It'd be nice if we could just lookup via string_view so we can + // avoid the allocation of the intermediary StringVal just to match + // against the patterns. + auto sv = zeek::make_intrusive(topic); + VectorValPtr r = topic_normalizations->LookupPattern(sv); + + if ( r->Size() == 0 ) + return topic; + + // I think this is safe: It returns a string_view to a StringVal that's stored + // persistently in a table[pattern] of string. We only need the string_view for + // looking up the right counter. + return r->StringValAt(0)->ToStdStringView(); +} + +namespace { + +std::vector to_label_view_vec(const LabelList& static_label_list) { + std::vector labels_view_vec; + labels_view_vec.reserve(static_label_list.size()); + + for ( const auto& [name, value] : static_label_list ) + labels_view_vec.emplace_back(name, value); + + return labels_view_vec; +} + +std::vector to_label_names_vec(const LabelList& static_label_list) { + std::vector label_names_vec; + label_names_vec.reserve(static_label_list.size()); + + for ( const auto& [name, value] : static_label_list ) + label_names_vec.emplace_back(name); + + return label_names_vec; +} + +} // namespace + +InfoTelemetry::InfoTelemetry(std::string_view name, LabelList static_labels, std::string_view prefix) { + if ( name != "core" && name != "websocket" ) + zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str()); + + std::string out_name = util::fmt("cluster_%s_outgoing_events", std::string(name).c_str()); + std::string in_name = util::fmt("cluster_%s_incoming_events", std::string(name).c_str()); + + auto label_view_vec = to_label_view_vec(static_labels); + + out = zeek::telemetry_mgr->CounterInstance(prefix, out_name, label_view_vec, "Number of outgoing events"); + in = zeek::telemetry_mgr->CounterInstance(prefix, in_name, label_view_vec, "Number of incoming events"); +} + +void InfoTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) { + out->Inc(); +} + +void InfoTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) { + in->Inc(); +} + +VerboseTelemetry::VerboseTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList arg_static_labels, + std::string_view prefix) + : topic_normalizer(std::move(topic_normalizer)), labels(std::move(arg_static_labels)) { + if ( name != "core" && name != "websocket" ) + zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str()); + + // Add topic and handler to the labels. This assumes the caller didn't provide them already. + topic_idx = labels.size(); + labels.emplace_back("topic", ""); + handler_idx = labels.size(); + labels.emplace_back("handler", ""); + + labels_view = to_label_view_vec(labels); + + auto label_names = to_label_names_vec(labels); + + std::string out_name = util::fmt("cluster_%s_verbose_outgoing_events", std::string(name).c_str()); + std::string in_name = util::fmt("cluster_%s_verbose_incoming_events", std::string(name).c_str()); + + out = zeek::telemetry_mgr->CounterFamily(prefix, out_name, label_names, + "Number of outgoing events with topic and handler information"); + in = zeek::telemetry_mgr->CounterFamily(prefix, in_name, label_names, + "Number of incoming events with topic and handler information"); +} + +void VerboseTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) { + auto normalized_topic = topic_normalizer(topic); + + labels_view[topic_idx].second = normalized_topic; + labels_view[handler_idx].second = handler_name; + + out->GetOrAdd(labels_view)->Inc(); +} + +void VerboseTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) { + auto normalized_topic = topic_normalizer(topic); + + labels_view[topic_idx].second = normalized_topic; + labels_view[handler_idx].second = handler_name; + + in->GetOrAdd(labels_view)->Inc(); +} + +namespace { + +std::string determine_script_location() { + std::string result = "none"; + + if ( zeek::detail::call_stack.empty() ) + return result; + + ssize_t sidx = static_cast(zeek::detail::call_stack.size()) - 1; + + while ( sidx >= 0 ) { + const auto* func = zeek::detail::call_stack[sidx].func; + const auto* ce = zeek::detail::call_stack[sidx].call; + + // without_zeekpath_component looks pretty expensive and might be + // better to cache the result using the ce pointer instead of computing + // it over and over again. + const auto* loc = ce->GetLocationInfo(); + std::string normalized_location = zeek::util::detail::without_zeekpath_component(loc->filename); + result = normalized_location + ":" + std::to_string(loc->first_line); + break; + } + + return result; +} + +} // namespace + + +DebugTelemetry::DebugTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels, + std::vector arg_size_bounds, std::string_view prefix) + : topic_normalizer(std::move(topic_normalizer)), + size_bounds(std::move(arg_size_bounds)), + labels(std::move(static_labels)) { + if ( name != "core" && name != "websocket" ) + zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str()); + + // Add topic, handler and script_location to the labels. This assumes the caller didn't provide them already. + topic_idx = labels.size(); + labels.emplace_back("topic", ""); + handler_idx = labels.size(); + labels.emplace_back("handler", ""); + script_location_idx = labels.size(); + labels.emplace_back("script_location", ""); + + labels_view = to_label_view_vec(labels); + labels_view_no_location = zeek::Span{labels_view.data(), labels_view.size() - 1}; + + auto label_names = to_label_names_vec(labels); + + std::string out_name = util::fmt("cluster_%s_debug_outgoing_event_sizes", std::string(name).c_str()); + std::string in_name = util::fmt("cluster_%s_debug_incoming_event_sizes", std::string(name).c_str()); + + out = zeek::telemetry_mgr->HistogramFamily( + prefix, out_name, label_names, size_bounds, + "The number and size distribution of outgoing events with topic, handler and script location information"); + + // Remove script-location from incoming metrics + label_names.pop_back(); + + in = + zeek::telemetry_mgr + ->HistogramFamily(prefix, in_name, label_names, size_bounds, + "The number and size distribution of incoming events with topic and handler information"); +} + +void DebugTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) { + auto normalized_topic = topic_normalizer(topic); + std::string script_location = determine_script_location(); + + labels_view[topic_idx].second = normalized_topic; + labels_view[handler_idx].second = handler_name; + labels_view[script_location_idx].second = script_location; + + const auto& hist = out->GetOrAdd(labels_view); + hist->Observe(static_cast(info.Size())); +} + +void DebugTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) { + auto normalized_topic = topic_normalizer(topic); + + labels_view[topic_idx].second = normalized_topic; + labels_view[handler_idx].second = handler_name; + + const auto& hist = in->GetOrAdd(labels_view_no_location); + hist->Observe(static_cast(info.Size())); +} + +// Reads Cluster::Telemetry consts, instantiates and appropriate Telemetry instance and configures +// the given backend with it. +void configure_backend_telemetry(Backend& backend, std::string_view name, LabelList static_labels) { + if ( name != "core" && name != "websocket" ) + zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str()); + + static const auto& info = zeek::id::find_val("Cluster::Telemetry::INFO"); + static const auto& verbose = zeek::id::find_val("Cluster::Telemetry::VERBOSE"); + static const auto& debug = zeek::id::find_val("Cluster::Telemetry::DEBUG"); + + std::string var_name = util::fmt("Cluster::Telemetry::%s_metrics", std::string(name).c_str()); + const auto& metrics = zeek::id::find_val(var_name); + + auto composite = std::make_unique(); + + for ( const auto& [k, v] : metrics->ToMap() ) { + detail::TelemetryPtr child; + // Keys are (always?) returned as ListVal, take the first one. + auto metric_type = zeek::cast_intrusive(k->AsListVal()->Idx(0)); + + if ( metric_type == info ) { + child = std::make_unique(name, static_labels); + } + else if ( metric_type == verbose ) { + child = std::make_unique(cluster::detail::TableTopicNormalizer(), name, + static_labels); + } + else if ( metric_type == debug ) { + auto bound_val_vec = zeek::id::find_val("Cluster::Telemetry::message_size_bounds"); + std::vector bounds_vec(bound_val_vec->Size()); + for ( unsigned int i = 0; i < bound_val_vec->Size(); i++ ) + bounds_vec[i] = bound_val_vec->DoubleAt(i); + child = std::make_unique(cluster::detail::TableTopicNormalizer(), name, + static_labels, std::move(bounds_vec)); + } + else { + zeek::reporter->FatalError("Invalid metric_type %s %" PRIu64, obj_desc_short(metric_type).c_str(), + metric_type->Get()); + } + + composite->Add(std::move(child)); + } + + backend.SetTelemetry(std::move(composite)); +} + +} // namespace zeek::cluster::detail diff --git a/src/cluster/Telemetry.h b/src/cluster/Telemetry.h new file mode 100644 index 0000000000..f0462ab8ed --- /dev/null +++ b/src/cluster/Telemetry.h @@ -0,0 +1,195 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "zeek/IntrusivePtr.h" +#include "zeek/Span.h" + + +namespace zeek { + +class TableVal; +using TableValPtr = zeek::IntrusivePtr; + +namespace telemetry { +class Counter; +using CounterPtr = std::shared_ptr; + +class CounterFamily; +using CounterFamilyPtr = std::shared_ptr; + +class HistogramFamily; +using HistogramFamilyPtr = std::shared_ptr; + +using LabelView = std::pair; + +} // namespace telemetry + +namespace cluster { + +class Backend; + +namespace detail { + +enum class TelemetryScope { + Core, + WebSocket, +}; + +/** + * Extra information of the serialized version of an Event. + */ +class SerializationInfo { +public: + explicit SerializationInfo(size_t size) : size(size) {} + + size_t Size() const { return size; } + +private: + size_t size; +}; + +using TopicNormalizer = std::function; +using LabelList = std::vector>; +using LabelViewList = std::vector>; + +/** + * A topic normalizer using the Cluster::Telemetry::topic_normalizations table. + */ +class TableTopicNormalizer { +public: + TableTopicNormalizer(); + std::string_view operator()(std::string_view topic); + +private: + zeek::TableValPtr topic_normalizations; +}; + +class Telemetry { +public: + virtual ~Telemetry() = default; + + virtual void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) = 0; + virtual void OnIncomingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) = 0; +}; + +using TelemetryPtr = std::unique_ptr; + +// Reporting nothing. +class NullTelemetry : public Telemetry { + void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) override {} + void OnIncomingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) override {} +}; + + +// A container for telemetry instances, delegating to its children. +class CompositeTelemetry : public Telemetry { +public: + void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) override { + for ( const auto& c : children ) + c->OnOutgoingEvent(topic, handler_name, info); + } + + void OnIncomingEvent(std::string_view topic, std::string_view handler_name, + const SerializationInfo& info) override { + for ( const auto& c : children ) + c->OnIncomingEvent(topic, handler_name, info); + } + + void Add(TelemetryPtr child) { children.push_back(std::move(child)); } + +private: + std::vector children; +}; + +/** + * Just one metric for incoming and one for outgoing metrics. + */ +class InfoTelemetry : public Telemetry { +public: + /** + * + * @param name The metric name without prefix. + * @param static_labels Labels to add on all metrics. + * @param prefix The metric prefix. + */ + InfoTelemetry(std::string_view name, LabelList static_labels, std::string_view prefix = "zeek"); + + void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override; + void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override; + +private: + telemetry::CounterPtr in, out; +}; + +/** + * A telemetry class producing metrics labeled with handler names and topics. + * + * Note that randomly generated topic names will cause unbounded + * metrics growth. A topic_normalizer should be injected to normalize + * topic names. + */ +class VerboseTelemetry : public Telemetry { +public: + VerboseTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels, + std::string_view prefix = "zeek"); + + void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override; + void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override; + +private: + TopicNormalizer topic_normalizer; + LabelList labels; + LabelViewList labels_view; + size_t topic_idx, handler_idx; // Index of topic and handler labels in labels_view + telemetry::CounterFamilyPtr in, out; +}; + +/** + * A telemetry class producing metrics labeled with topics + * and the script layer location for outgoing metrics. + */ +class DebugTelemetry : public Telemetry { +public: + DebugTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels, + std::vector size_bounds, std::string_view prefix = "zeek"); + + void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override; + void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override; + +private: + TopicNormalizer topic_normalizer; + std::vector size_bounds; + LabelList labels; + LabelViewList labels_view; + zeek::Span labels_view_no_location; + size_t topic_idx, handler_idx, + script_location_idx; // Index of topic, handler and script_location labels in labels_view + telemetry::HistogramFamilyPtr in, out; +}; + +/** + * Reads Cluster::Telemetry consts, instantiates and appropriate Telemetry instance + * set it on the given backend. + * + * @param backend The cluster backend to configure. + * @param name The name used in the metric names. Either core or websocket at this point. + * @param static_labels Static labels to attach to metrics. + */ +void configure_backend_telemetry(Backend& backend, std::string_view name, LabelList static_labels = {}); + +} // namespace detail +} // namespace cluster +} // namespace zeek diff --git a/src/cluster/websocket/WebSocket-IXWebSocket.cc b/src/cluster/websocket/WebSocket-IXWebSocket.cc index 36d4864236..3c575abf16 100644 --- a/src/cluster/websocket/WebSocket-IXWebSocket.cc +++ b/src/cluster/websocket/WebSocket-IXWebSocket.cc @@ -138,8 +138,13 @@ std::unique_ptr StartServer(std::unique_ptrtype == ix::WebSocketMessageType::Open ) { - dispatcher->QueueForProcessing( - WebSocketOpen{id, msg->openInfo.uri, msg->openInfo.protocol, std::move(ixws)}); + std::optional application_name; + auto it = msg->openInfo.headers.find("X-Application-Name"); + if ( it != msg->openInfo.headers.end() ) + application_name = it->second; + + dispatcher->QueueForProcessing(WebSocketOpen{id, msg->openInfo.uri, msg->openInfo.protocol, + std::move(application_name), std::move(ixws)}); } else if ( msg->type == ix::WebSocketMessageType::Message ) { dispatcher->QueueForProcessing(WebSocketMessage{id, msg->str}); diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index e66f0854b2..1063ab8b6f 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -4,6 +4,8 @@ #include "zeek/cluster/websocket/WebSocket.h" +#include +#include #include #include #include @@ -14,6 +16,7 @@ #include "zeek/cluster/Manager.h" #include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" +#include "zeek/cluster/Telemetry.h" #include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/cluster/websocket/Plugin.h" #include "zeek/cluster/websocket/events.bif.h" @@ -291,6 +294,24 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) { return; } + std::string application_name = open.application_name.value_or("unknown"); + + // A bit ad-hoc + bool good_application_name = std::all_of(application_name.begin(), application_name.end(), [](auto c) { + return std::isalnum(c) || c == '/' || c == '_' || c == '-' || c == '.' || c == '=' || c == ':' || c == '*' || + c == '@'; + }); + + if ( ! good_application_name ) { + QueueReply(WebSocketCloseReply{wsc, 1001, "Internal error"}); + open.wsc->SendError("invalid_application_name", "Invalid X-Application-Name"); + open.wsc->Close(1008, "Invalid X-Application-Name"); + + // Still create an entry as we might see messages and close events coming in. + clients[id] = WebSocketClientEntry{id, wsc, nullptr}; + return; + } + // Generate an ID for this client. auto ws_id = cluster::backend->NodeId() + "-websocket-" + id; @@ -321,6 +342,8 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) { return; } + cluster::detail::configure_backend_telemetry(*backend, "websocket", {{"app", application_name}}); + WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), ws_id.c_str(), backend.get()); diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index eff83c11f2..ab5cebcec0 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -125,6 +125,7 @@ struct WebSocketOpen { std::string id; std::string uri; std::string protocol; + std::optional application_name; std::shared_ptr wsc; }; diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index d3c6000f84..fe9ae50be8 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -54,6 +54,7 @@ #include "zeek/broker/Manager.h" #include "zeek/cluster/Backend.h" #include "zeek/cluster/Manager.h" +#include "zeek/cluster/Telemetry.h" #include "zeek/conn_key/Manager.h" #include "zeek/file_analysis/Manager.h" #include "zeek/input.h" @@ -891,6 +892,8 @@ SetupResult setup(int argc, char** argv, Options* zopts) { cluster::backend = backend.release(); } + cluster::detail::configure_backend_telemetry(*cluster::backend, "core"); + broker_mgr->InitPostScript(); if ( cluster::backend != broker_mgr ) cluster::backend->InitPostScript(); diff --git a/testing/btest/Baseline/cluster.telemetry.two-nodes/..manager.out b/testing/btest/Baseline/cluster.telemetry.two-nodes/..manager.out new file mode 100644 index 0000000000..a67c185653 --- /dev/null +++ b/testing/btest/Baseline/cluster.telemetry.two-nodes/..manager.out @@ -0,0 +1,9 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +zeek, zeek_cluster_core_incoming_events_total, [node], [manager], 2.0 +zeek, zeek_cluster_core_outgoing_events_total, [node], [manager], 3.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [finish, manager, zeek.cluster.nodeid.__normalized__], 1.0 diff --git a/testing/btest/Baseline/cluster.telemetry.two-nodes/..worker.out b/testing/btest/Baseline/cluster.telemetry.two-nodes/..worker.out new file mode 100644 index 0000000000..98723c5791 --- /dev/null +++ b/testing/btest/Baseline/cluster.telemetry.two-nodes/..worker.out @@ -0,0 +1,9 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +zeek, zeek_cluster_core_incoming_events_total, [node], [worker-1], 3.0 +zeek, zeek_cluster_core_outgoing_events_total, [node], [worker-1], 2.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [finish, worker-1, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0 diff --git a/testing/btest/Baseline/cluster.telemetry.ws-app/..manager..stderr b/testing/btest/Baseline/cluster.telemetry.ws-app/..manager..stderr new file mode 100644 index 0000000000..70b881ebd5 --- /dev/null +++ b/testing/btest/Baseline/cluster.telemetry.ws-app/..manager..stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +received termination signal diff --git a/testing/btest/Baseline/cluster.telemetry.ws-app/..manager..stdout b/testing/btest/Baseline/cluster.telemetry.ws-app/..manager..stdout new file mode 100644 index 0000000000..0e181ab2bb --- /dev/null +++ b/testing/btest/Baseline/cluster.telemetry.ws-app/..manager..stdout @@ -0,0 +1,33 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +Cluster::websocket_client_added, [/test/pings, /zeek/wstest/ws1/] +zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/1], [0.0, 0.0, 1.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/3], [0.0, 0.0, 0.0, 3.0, 4.0, 18.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_core_debug_outgoing_event_sizes, [handler, node, script_location, topic], [ping, manager, ./manager.zeek:32, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_core_debug_outgoing_event_sizes, [handler, node, script_location, topic], [ping, manager, ./manager.zeek:32, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_core_incoming_events_total, [node], [manager], 100.0 +zeek, zeek_cluster_core_outgoing_events_total, [node], [manager], 50.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/0], 25.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/1], 25.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/2], 25.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/3], 25.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [ping, manager, /test/pings/0], 25.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [ping, manager, /test/pings/2], 25.0 +zeek, zeek_cluster_websocket_debug_incoming_event_sizes, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_websocket_debug_incoming_event_sizes, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/1], [0.0, 0.0, 1.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/3], [0.0, 0.0, 0.0, 3.0, 4.0, 18.0, 0.0, 0.0, 0.0] +zeek, zeek_cluster_websocket_incoming_events_total, [app, node], [btest-python-client, manager], 50.0 +zeek, zeek_cluster_websocket_outgoing_events_total, [app, node], [btest-python-client, manager], 100.0 +zeek, zeek_cluster_websocket_verbose_incoming_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/0], 25.0 +zeek, zeek_cluster_websocket_verbose_incoming_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/2], 25.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/0], 25.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/1], 25.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/2], 25.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/3], 25.0 +zeek_cluster_* histogram metrics, 12 +zeek_cluster_* metrics, 16 diff --git a/testing/btest/Baseline/cluster.telemetry.ws/..manager..stderr b/testing/btest/Baseline/cluster.telemetry.ws/..manager..stderr new file mode 100644 index 0000000000..70b881ebd5 --- /dev/null +++ b/testing/btest/Baseline/cluster.telemetry.ws/..manager..stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +received termination signal diff --git a/testing/btest/Baseline/cluster.telemetry.ws/..manager..stdout b/testing/btest/Baseline/cluster.telemetry.ws/..manager..stdout new file mode 100644 index 0000000000..682f29828e --- /dev/null +++ b/testing/btest/Baseline/cluster.telemetry.ws/..manager..stdout @@ -0,0 +1,18 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +Cluster::websocket_client_added, [/test/pings, /zeek/wstest/ws1/] +zeek, zeek_cluster_core_incoming_events_total, [node], [manager], 100.0 +zeek, zeek_cluster_core_outgoing_events_total, [node], [manager], 50.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/0], 25.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/1], 25.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/2], 25.0 +zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/3], 25.0 +zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [ping, manager, /test/pings], 50.0 +zeek, zeek_cluster_websocket_incoming_events_total, [app, node], [unknown, manager], 50.0 +zeek, zeek_cluster_websocket_outgoing_events_total, [app, node], [unknown, manager], 100.0 +zeek, zeek_cluster_websocket_verbose_incoming_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings], 50.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/0], 25.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/1], 25.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/2], 25.0 +zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/3], 25.0 +zeek_cluster_* metrics, 14 diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index c0a0da839f..4bb0fdb685 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -140,6 +140,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek + scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek scripts/base/frameworks/config/input.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index e13fb0109d..4ed3a0c33b 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -140,6 +140,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek + scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek scripts/base/frameworks/config/input.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 6960513cb6..d8dafb81e9 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -491,6 +491,7 @@ 0.000000 MetaHookPost LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./telemetry, <...>/telemetry.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> -1 @@ -805,6 +806,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./telemetry, <...>/telemetry.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> (-1, ) @@ -1430,6 +1432,7 @@ 0.000000 MetaHookPre LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) +0.000000 MetaHookPre LoadFile(0, ./telemetry, <...>/telemetry.zeek) 0.000000 MetaHookPre LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) @@ -1744,6 +1747,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) +0.000000 MetaHookPre LoadFileExtended(0, ./telemetry, <...>/telemetry.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) @@ -2380,6 +2384,7 @@ 0.000000 | HookLoadFile ./store.bif.zeek <...>/store.bif.zeek 0.000000 | HookLoadFile ./strings.bif.zeek <...>/strings.bif.zeek 0.000000 | HookLoadFile ./supervisor.bif.zeek <...>/supervisor.bif.zeek +0.000000 | HookLoadFile ./telemetry <...>/telemetry.zeek 0.000000 | HookLoadFile ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek 0.000000 | HookLoadFile ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek 0.000000 | HookLoadFile ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek @@ -2694,6 +2699,7 @@ 0.000000 | HookLoadFileExtended ./store.bif.zeek <...>/store.bif.zeek 0.000000 | HookLoadFileExtended ./strings.bif.zeek <...>/strings.bif.zeek 0.000000 | HookLoadFileExtended ./supervisor.bif.zeek <...>/supervisor.bif.zeek +0.000000 | HookLoadFileExtended ./telemetry <...>/telemetry.zeek 0.000000 | HookLoadFileExtended ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek 0.000000 | HookLoadFileExtended ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek 0.000000 | HookLoadFileExtended ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek diff --git a/testing/btest/Files/ws/wstest.py b/testing/btest/Files/ws/wstest.py index 9373f006d2..f01ec3d9f6 100644 --- a/testing/btest/Files/ws/wstest.py +++ b/testing/btest/Files/ws/wstest.py @@ -90,14 +90,18 @@ class TestClient: return self.__name -def connect(name: str, url: Optional[str] = None) -> TestClient: +def connect( + name: str, + url: Optional[str] = None, + additional_headers: Optional[dict[str, str]] = None, +) -> TestClient: """ Connect to a WebSocket server and return a TestClient instance. """ if url is None: url = WS4_URL_V1 - cc = websockets.sync.client.connect(url) + cc = websockets.sync.client.connect(url, additional_headers=additional_headers) return TestClient(name, cc) diff --git a/testing/btest/cluster/telemetry/two-nodes.zeek b/testing/btest/cluster/telemetry/two-nodes.zeek new file mode 100644 index 0000000000..8eceea0b6e --- /dev/null +++ b/testing/btest/cluster/telemetry/two-nodes.zeek @@ -0,0 +1,68 @@ +# @TEST-DOC: All parties log their cluster metrics at zeek_done() time. +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker/out + + +# @TEST-START-FILE common.zeek +@load base/frameworks/telemetry + +@load ./zeromq-test-bootstrap + +redef Cluster::Telemetry::core_metrics += { + Cluster::Telemetry::VERBOSE, +}; + +redef Cluster::Telemetry::websocket_metrics += { + Cluster::Telemetry::VERBOSE, +}; + +global finish: event(name: string); + +event zeek_done() + { + local ms = Telemetry::collect_metrics("zeek", "cluster_core_*"); + ms += Telemetry::collect_metrics("zeek", "cluster_websocket_*"); + for ( _, m in ms ) + print m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value; + } +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek +# If a node comes up that isn't us, send it a finish event. +event Cluster::node_up(name: string, id: string) { + Cluster::publish(Cluster::nodeid_topic(id), finish, Cluster::node); +} + +# If the worker vanishes, finish the test. +event Cluster::node_down(name: string, id: string) { + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event Cluster::node_up(name: string, id: string) { +} + +event finish(name: string) &is_used { + terminate(); +} +# @TEST-END-FILE diff --git a/testing/btest/cluster/telemetry/ws-app.zeek b/testing/btest/cluster/telemetry/ws-app.zeek new file mode 100644 index 0000000000..59064a2be3 --- /dev/null +++ b/testing/btest/cluster/telemetry/ws-app.zeek @@ -0,0 +1,105 @@ +# @TEST-DOC: Output cluster telemetry after working with a WebSocket client. The WebSocket client sends an X-Application-Name header. Also include debug metrics as histograms in the output. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: python3 client.py +# +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stdout +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stderr + +# @TEST-START-FILE manager.zeek +@load base/frameworks/telemetry + +@load ./zeromq-test-bootstrap + +redef Cluster::Telemetry::core_metrics += { + Cluster::Telemetry::VERBOSE, + Cluster::Telemetry::DEBUG, +}; + +redef Cluster::Telemetry::websocket_metrics += { + Cluster::Telemetry::VERBOSE, + Cluster::Telemetry::DEBUG, +}; + +redef exit_only_after_terminate = T; + +global expected_ping_count = 100; +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_addr=127.0.0.1, $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + if ( ping_count % 2 == 0) # Reply every other ping. + { + Cluster::publish(fmt("/test/pings/%s", ping_count % 4), ping, msg, n); + } + + ++ping_count; + + if ( ping_count == expected_ping_count ) + terminate(); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } + +event zeek_done() + { + local ms = Telemetry::collect_metrics("zeek", "cluster_core_*"); + ms += Telemetry::collect_metrics("zeek", "cluster_websocket_*"); + print "zeek_cluster_* metrics", |ms|; + for ( _, m in ms ) + print m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value; + + local hms = Telemetry::collect_histogram_metrics("zeek", "cluster_core_*"); + hms += Telemetry::collect_histogram_metrics("zeek", "cluster_websocket_*"); + + print "zeek_cluster_* histogram metrics", |hms|; + for ( _, hm in hms ) + print hm$opts$prefix, hm$opts$name, hm$label_names, hm$label_values, hm$values; + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url, additional_headers={"X-Application-Name": "btest-python-client"}) as tc: + tc.hello_v1(["/test/pings"]) + for i in range(0, 100): + msg = f"ping {i}" + (i * 32 * "A") + tc.send_json(wstest.build_event_v1(f"/test/pings/{i % 4}", "ping", [msg, i])) + if i % 2 == 0: # Wait for a reply for every other ping + tc.recv_json() + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE diff --git a/testing/btest/cluster/telemetry/ws.zeek b/testing/btest/cluster/telemetry/ws.zeek new file mode 100644 index 0000000000..601441c66c --- /dev/null +++ b/testing/btest/cluster/telemetry/ws.zeek @@ -0,0 +1,93 @@ +# @TEST-DOC: Output cluster telemetry after working with a WebSocket client. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: python3 client.py +# +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stdout +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stderr + +# @TEST-START-FILE manager.zeek +@load base/frameworks/telemetry + +@load ./zeromq-test-bootstrap + +redef Cluster::Telemetry::core_metrics += { + Cluster::Telemetry::VERBOSE, +}; + +redef Cluster::Telemetry::websocket_metrics += { + Cluster::Telemetry::VERBOSE, +}; + +redef exit_only_after_terminate = T; + +global expected_ping_count = 100; +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_addr=127.0.0.1, $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + if ( ping_count % 2 == 0) # Reply every other ping. + Cluster::publish("/test/pings", ping, msg, n); + + ++ping_count; + + if ( ping_count == expected_ping_count ) + terminate(); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } + +event zeek_done() + { + local ms = Telemetry::collect_metrics("zeek", "cluster_core_*"); + ms += Telemetry::collect_metrics("zeek", "cluster_websocket_*"); + print "zeek_cluster_* metrics", |ms|; + for ( _, m in ms ) + print m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value; + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + tc.hello_v1(["/test/pings"]) + for i in range(0, 100): + tc.send_json(wstest.build_event_v1(f"/test/pings/{i % 4}", "ping", [f"ping {i}", i])) + if i % 2 == 0: # Wait for a reply for every other ping + tc.recv_json() + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE