Merge remote-tracking branch 'origin/topic/awelzel/1474-cluster-telemetry'

* origin/topic/awelzel/1474-cluster-telemetry:
  btest/cluster/telemetry: Add smoke testing for telemetry
  cluster/WebSocket: Fetch X-Application-Name header as app label
  cluster/WebSocket: Pass X-Application-Name to dispatcher
  broker/WebSocketShim: Add calls to Telemetry hooks
  cluster/WebSocket: Configure telemetry for WebSocket backends
  broker: Hook up generic cluster telemetry
  cluster: Introduce telemetry component

One bug fix removing static from a variable that shouldn't be static.
This commit is contained in:
Arne Welzel 2025-06-26 14:30:37 +02:00
commit 22958f7cdf
29 changed files with 983 additions and 6 deletions

22
CHANGES
View file

@ -1,3 +1,25 @@
8.0.0-dev.546 | 2025-06-26 14:30:37 +0200
* btest/cluster/telemetry: Add smoke testing for telemetry (Arne Welzel, Corelight)
* cluster/WebSocket: Fetch X-Application-Name header as app label (Arne Welzel, Corelight)
* cluster/WebSocket: Pass X-Application-Name to dispatcher (Arne Welzel, Corelight)
This is going to be used to add labels to telemetry if the
X-Application-Name header is set.
* broker/WebSocketShim: Add calls to Telemetry hooks (Arne Welzel, Corelight)
WebSocket clients with Broker do not use the normal Broker backend, so
we need to add the telemetry invocations explicitly.
* cluster/WebSocket: Configure telemetry for WebSocket backends (Arne Welzel, Corelight)
* broker: Hook up generic cluster telemetry (Arne Welzel, Corelight)
* cluster: Introduce telemetry component (Arne Welzel, Corelight)
8.0.0-dev.538 | 2025-06-26 09:58:39 +0100 8.0.0-dev.538 | 2025-06-26 09:58:39 +0100
* Only pass session ticket data in ssl_session_ticket_handshake event (Johanna Amann, Corelight) * Only pass session ticket data in ssl_session_ticket_handshake event (Johanna Amann, Corelight)

23
NEWS
View file

@ -101,6 +101,29 @@ New Functionality
implementation in the ``src/packet_analysis/protocol/ip/conn_key/vlan_fivetuple`` implementation in the ``src/packet_analysis/protocol/ip/conn_key/vlan_fivetuple``
directory for an example. directory for an example.
- Cluster telemetry improvements. Zeek now exposes a configurable number of
metrics regarding outgoing and incoming cluster events. By default, the number
of events sent and received by a Zeek cluster node and any attached WebSocket
clients is tracked as four individual counters. It's possible to gather more
detailed information by adding ``Cluster::Telemetry::VERBOSE`` and
``Cluster::Telemetry::DEBUG`` to the variables ``Cluster::core_metrics`` and
``Cluster::webscoket_metrics``:
redef Cluster::core_metrics += { Cluster::Telemetry::VERBOSE };
redef Cluster::websocket_metrics += { Cluster::Telemetry::DEBUG };
Configuring verbose, adds metrics that are labeled with the event handler
and topic name. Configuring debug, uses histogram metrics to additionally track
the distribution of the serialized event size. Additionally, when debug is selected,
outgoing events are labeled with the script location from where they were published.
- Support for the X-Application-Name HTTP header was added to the WebSocket API at
``v1/messages/json``. A WebSocket application connecting to Zeek may set the
X-Application-Name header to a descriptive identifier. The value of this header
will be added to the cluster metrics as ``app`` label. This allows to gather
incoming and outgoing event metrics of a specific WebSocket application, simply
by setting the X-Application-Name header.
- Generic event metadata support. A new ``EventMetadata`` module was added allowing - Generic event metadata support. A new ``EventMetadata`` module was added allowing
to register generic event metadata types and accessing the current event's metadata to register generic event metadata types and accessing the current event's metadata
using the functions ``current()`` and ``current_all()`` of this module. using the functions ``current()`` and ``current_all()`` of this module.

View file

@ -1 +1 @@
8.0.0-dev.538 8.0.0-dev.546

View file

@ -1,6 +1,7 @@
# Load the core cluster support. # Load the core cluster support.
@load ./main @load ./main
@load ./pools @load ./pools
@load ./telemetry
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )

View file

@ -0,0 +1,40 @@
## Module for cluster telemetry.
module Cluster::Telemetry;
export {
type Type: enum {
## Creates counter metrics for incoming and for outgoing
## events without labels.
INFO,
## Creates counter metrics for incoming and outgoing events
## labeled with handler and normalized topic names.
VERBOSE,
## Creates histogram metrics using the serialized message size
## for events, labeled by topic, handler and script location
## (outgoing only).
DEBUG,
};
## The telemetry types to enable for the core backend.
const core_metrics: set[Type] = {
INFO,
} &redef;
## The telemetry types to enable for WebSocket backends.
const websocket_metrics: set[Type] = {
INFO,
} &redef;
## Table used for normalizing topic names that contain random parts.
## Map to an empty string to skip recording a specific metric
## completely.
const topic_normalizations: table[pattern] of string = {
[/^zeek\.cluster\.nodeid\..*/] = "zeek.cluster.nodeid.__normalized__",
[/^zeek\/cluster\/nodeid\/.*/] = "zeek/cluster/nodeid/__normalized__",
} &ordered &redef;
## For the DEBUG metrics, the histogram buckets to use.
const message_size_bounds: vector of double = {
10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0,
} &redef;
}

View file

@ -34,6 +34,7 @@
#include "zeek/broker/comm.bif.h" #include "zeek/broker/comm.bif.h"
#include "zeek/broker/messaging.bif.h" #include "zeek/broker/messaging.bif.h"
#include "zeek/broker/store.bif.h" #include "zeek/broker/store.bif.h"
#include "zeek/cluster/Telemetry.h"
#include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/cluster/serializer/broker/Serializer.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
@ -838,6 +839,9 @@ bool Manager::DoPublishEvent(const std::string& topic, cluster::detail::Event& e
auto& ev = maybe_ev.value(); auto& ev = maybe_ev.value();
size_t size = ev.as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnOutgoingEvent(topic, event.HandlerName(), cluster::detail::SerializationInfo{size});
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str());
bstate->endpoint.publish(topic, ev.move_data()); bstate->endpoint.publish(topic, ev.move_data());
num_events_outgoing_metric->Inc(); num_events_outgoing_metric->Inc();
@ -859,6 +863,10 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args,
} }
broker::zeek::Event ev(name, args, meta); broker::zeek::Event ev(name, args, meta);
size_t size = ev.as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnOutgoingEvent(topic, name, cluster::detail::SerializationInfo{size});
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str());
bstate->endpoint.publish(std::move(topic), ev.move_data()); bstate->endpoint.publish(std::move(topic), ev.move_data());
num_events_outgoing_metric->Inc(); num_events_outgoing_metric->Inc();
@ -1583,6 +1591,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(), DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(),
meta ? meta->size() : 0, RenderMessage(args).c_str()); meta ? meta->size() : 0, RenderMessage(args).c_str());
num_events_incoming_metric->Inc(); num_events_incoming_metric->Inc();
size_t size = ev.as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnIncomingEvent(topic, name, cluster::detail::SerializationInfo{size});
auto handler = event_registry->Lookup(name); auto handler = event_registry->Lookup(name);
if ( ! handler ) if ( ! handler )

View file

@ -118,6 +118,9 @@ bool WebSocketShim::DoPublishEvent(const std::string& topic, zeek::cluster::deta
return false; return false;
} }
size_t size = r->as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnOutgoingEvent(topic, r->name(), cluster::detail::SerializationInfo{size});
auto msg = broker::data_envelope::make(broker::topic(topic), r->as_data()); auto msg = broker::data_envelope::make(broker::topic(topic), r->as_data());
state->hub.publish(std::move(msg)); state->hub.publish(std::move(msg));
return true; return true;
@ -176,6 +179,9 @@ void WebSocketShim::ProcessMessage(std::string_view topic, broker::zeek::Event&
return; return;
} }
size_t size = ev.as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnIncomingEvent(topic, r->HandlerName(), cluster::detail::SerializationInfo{size});
ProcessEvent(topic, std::move(*r)); ProcessEvent(topic, std::move(*r));
} }
void WebSocketShim::ProcessMessage(std::string_view topic, broker::zeek::Invalid& invalid) { void WebSocketShim::ProcessMessage(std::string_view topic, broker::zeek::Invalid& invalid) {

View file

@ -16,6 +16,7 @@
#include "zeek/cluster/Manager.h" #include "zeek/cluster/Manager.h"
#include "zeek/cluster/OnLoop.h" #include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/cluster/Telemetry.h"
#include "zeek/cluster/cluster.bif.h" #include "zeek/cluster/cluster.bif.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/plugin/Manager.h" #include "zeek/plugin/Manager.h"
@ -128,6 +129,9 @@ Backend::Backend(std::string_view arg_name, std::unique_ptr<EventSerializer> es,
tag = zeek::cluster::manager->Backends().GetComponentTag(name); tag = zeek::cluster::manager->Backends().GetComponentTag(name);
if ( ! tag ) if ( ! tag )
reporter->InternalError("unknown cluster backend name '%s'; mismatch with tag component?", name.c_str()); reporter->InternalError("unknown cluster backend name '%s'; mismatch with tag component?", name.c_str());
// No telemetry by default.
telemetry = std::make_unique<detail::NullTelemetry>();
} }
bool Backend::Init(std::string nid) { bool Backend::Init(std::string nid) {
@ -185,6 +189,8 @@ bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& e
if ( ! event_serializer->SerializeEvent(buf, event) ) if ( ! event_serializer->SerializeEvent(buf, event) )
return false; return false;
Telemetry().OnOutgoingEvent(topic, event.HandlerName(), detail::SerializationInfo{buf.size()});
return DoPublishEvent(topic, event_serializer->Name(), buf); return DoPublishEvent(topic, event_serializer->Name(), buf);
} }
@ -227,6 +233,8 @@ bool Backend::ProcessEventMessage(std::string_view topic, std::string_view forma
return false; return false;
} }
Telemetry().OnIncomingEvent(topic, r->HandlerName(), detail::SerializationInfo{payload.size()});
return ProcessEvent(topic, std::move(*r)); return ProcessEvent(topic, std::move(*r));
} }

View file

@ -17,6 +17,7 @@
#include "zeek/ZeekArgs.h" #include "zeek/ZeekArgs.h"
#include "zeek/cluster/BifSupport.h" #include "zeek/cluster/BifSupport.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/cluster/Telemetry.h"
#include "zeek/logging/Types.h" #include "zeek/logging/Types.h"
namespace zeek { namespace zeek {
@ -341,6 +342,13 @@ public:
*/ */
const std::string& NodeId() const { return node_id; } const std::string& NodeId() const { return node_id; }
/**
* Set the telemetry handle to be used by this backend.
*
* @param The new telemetry instance to use.
*/
void SetTelemetry(detail::TelemetryPtr new_telemetry) { telemetry = std::move(new_telemetry); }
protected: protected:
/** /**
* Constructor. * Constructor.
@ -410,6 +418,14 @@ protected:
*/ */
void SetNodeId(std::string nid); void SetNodeId(std::string nid);
/**
* Provides access to the detail::Telemetry handle.
*/
detail::Telemetry& Telemetry() {
assert(telemetry);
return *telemetry;
}
private: private:
/** /**
* Called after all Zeek scripts have been loaded. * Called after all Zeek scripts have been loaded.
@ -550,6 +566,8 @@ private:
* The backend's instance cluster node identifier. * The backend's instance cluster node identifier.
*/ */
std::string node_id; std::string node_id;
detail::TelemetryPtr telemetry;
}; };
/** /**

View file

@ -4,10 +4,11 @@ zeek_add_subdir_library(
${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_BINARY_DIR}
SOURCES SOURCES
Component.cc
Backend.cc Backend.cc
BifSupport.cc BifSupport.cc
Component.cc
Manager.cc Manager.cc
Telemetry.cc
BIFS BIFS
cluster.bif) cluster.bif)

268
src/cluster/Telemetry.cc Normal file
View file

@ -0,0 +1,268 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/cluster/Telemetry.h"
#include <cinttypes>
#include "zeek/Desc.h"
#include "zeek/Expr.h"
#include "zeek/Func.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
#include "zeek/Val.h"
#include "zeek/ZeekString.h"
#include "zeek/cluster/Backend.h"
#include "zeek/telemetry/Manager.h"
#include "zeek/util.h"
namespace zeek::cluster::detail {
TableTopicNormalizer::TableTopicNormalizer() {
topic_normalizations = zeek::id::find_val<zeek::TableVal>("Cluster::Telemetry::topic_normalizations");
}
std::string_view TableTopicNormalizer::operator()(std::string_view topic) {
// TODO: It'd be nice if we could just lookup via string_view so we can
// avoid the allocation of the intermediary StringVal just to match
// against the patterns.
auto sv = zeek::make_intrusive<zeek::StringVal>(topic);
VectorValPtr r = topic_normalizations->LookupPattern(sv);
if ( r->Size() == 0 )
return topic;
// I think this is safe: It returns a string_view to a StringVal that's stored
// persistently in a table[pattern] of string. We only need the string_view for
// looking up the right counter.
return r->StringValAt(0)->ToStdStringView();
}
namespace {
std::vector<telemetry::LabelView> to_label_view_vec(const LabelList& static_label_list) {
std::vector<telemetry::LabelView> labels_view_vec;
labels_view_vec.reserve(static_label_list.size());
for ( const auto& [name, value] : static_label_list )
labels_view_vec.emplace_back(name, value);
return labels_view_vec;
}
std::vector<std::string_view> to_label_names_vec(const LabelList& static_label_list) {
std::vector<std::string_view> label_names_vec;
label_names_vec.reserve(static_label_list.size());
for ( const auto& [name, value] : static_label_list )
label_names_vec.emplace_back(name);
return label_names_vec;
}
} // namespace
InfoTelemetry::InfoTelemetry(std::string_view name, LabelList static_labels, std::string_view prefix) {
if ( name != "core" && name != "websocket" )
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
std::string out_name = util::fmt("cluster_%s_outgoing_events", std::string(name).c_str());
std::string in_name = util::fmt("cluster_%s_incoming_events", std::string(name).c_str());
auto label_view_vec = to_label_view_vec(static_labels);
out = zeek::telemetry_mgr->CounterInstance(prefix, out_name, label_view_vec, "Number of outgoing events");
in = zeek::telemetry_mgr->CounterInstance(prefix, in_name, label_view_vec, "Number of incoming events");
}
void InfoTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) {
out->Inc();
}
void InfoTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) {
in->Inc();
}
VerboseTelemetry::VerboseTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList arg_static_labels,
std::string_view prefix)
: topic_normalizer(std::move(topic_normalizer)), labels(std::move(arg_static_labels)) {
if ( name != "core" && name != "websocket" )
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
// Add topic and handler to the labels. This assumes the caller didn't provide them already.
topic_idx = labels.size();
labels.emplace_back("topic", "");
handler_idx = labels.size();
labels.emplace_back("handler", "");
labels_view = to_label_view_vec(labels);
auto label_names = to_label_names_vec(labels);
std::string out_name = util::fmt("cluster_%s_verbose_outgoing_events", std::string(name).c_str());
std::string in_name = util::fmt("cluster_%s_verbose_incoming_events", std::string(name).c_str());
out = zeek::telemetry_mgr->CounterFamily(prefix, out_name, label_names,
"Number of outgoing events with topic and handler information");
in = zeek::telemetry_mgr->CounterFamily(prefix, in_name, label_names,
"Number of incoming events with topic and handler information");
}
void VerboseTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) {
auto normalized_topic = topic_normalizer(topic);
labels_view[topic_idx].second = normalized_topic;
labels_view[handler_idx].second = handler_name;
out->GetOrAdd(labels_view)->Inc();
}
void VerboseTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) {
auto normalized_topic = topic_normalizer(topic);
labels_view[topic_idx].second = normalized_topic;
labels_view[handler_idx].second = handler_name;
in->GetOrAdd(labels_view)->Inc();
}
namespace {
std::string determine_script_location() {
std::string result = "none";
if ( zeek::detail::call_stack.empty() )
return result;
ssize_t sidx = static_cast<ssize_t>(zeek::detail::call_stack.size()) - 1;
while ( sidx >= 0 ) {
const auto* func = zeek::detail::call_stack[sidx].func;
const auto* ce = zeek::detail::call_stack[sidx].call;
// without_zeekpath_component looks pretty expensive and might be
// better to cache the result using the ce pointer instead of computing
// it over and over again.
const auto* loc = ce->GetLocationInfo();
std::string normalized_location = zeek::util::detail::without_zeekpath_component(loc->filename);
result = normalized_location + ":" + std::to_string(loc->first_line);
break;
}
return result;
}
} // namespace
DebugTelemetry::DebugTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels,
std::vector<double> arg_size_bounds, std::string_view prefix)
: topic_normalizer(std::move(topic_normalizer)),
size_bounds(std::move(arg_size_bounds)),
labels(std::move(static_labels)) {
if ( name != "core" && name != "websocket" )
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
// Add topic, handler and script_location to the labels. This assumes the caller didn't provide them already.
topic_idx = labels.size();
labels.emplace_back("topic", "");
handler_idx = labels.size();
labels.emplace_back("handler", "");
script_location_idx = labels.size();
labels.emplace_back("script_location", "");
labels_view = to_label_view_vec(labels);
labels_view_no_location = zeek::Span{labels_view.data(), labels_view.size() - 1};
auto label_names = to_label_names_vec(labels);
std::string out_name = util::fmt("cluster_%s_debug_outgoing_event_sizes", std::string(name).c_str());
std::string in_name = util::fmt("cluster_%s_debug_incoming_event_sizes", std::string(name).c_str());
out = zeek::telemetry_mgr->HistogramFamily(
prefix, out_name, label_names, size_bounds,
"The number and size distribution of outgoing events with topic, handler and script location information");
// Remove script-location from incoming metrics
label_names.pop_back();
in =
zeek::telemetry_mgr
->HistogramFamily(prefix, in_name, label_names, size_bounds,
"The number and size distribution of incoming events with topic and handler information");
}
void DebugTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) {
auto normalized_topic = topic_normalizer(topic);
std::string script_location = determine_script_location();
labels_view[topic_idx].second = normalized_topic;
labels_view[handler_idx].second = handler_name;
labels_view[script_location_idx].second = script_location;
const auto& hist = out->GetOrAdd(labels_view);
hist->Observe(static_cast<double>(info.Size()));
}
void DebugTelemetry::OnIncomingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) {
auto normalized_topic = topic_normalizer(topic);
labels_view[topic_idx].second = normalized_topic;
labels_view[handler_idx].second = handler_name;
const auto& hist = in->GetOrAdd(labels_view_no_location);
hist->Observe(static_cast<double>(info.Size()));
}
// Reads Cluster::Telemetry consts, instantiates and appropriate Telemetry instance and configures
// the given backend with it.
void configure_backend_telemetry(Backend& backend, std::string_view name, LabelList static_labels) {
if ( name != "core" && name != "websocket" )
zeek::reporter->FatalError("name can only be backend or websocket, got '%s'", std::string(name).c_str());
static const auto& info = zeek::id::find_val<zeek::EnumVal>("Cluster::Telemetry::INFO");
static const auto& verbose = zeek::id::find_val<zeek::EnumVal>("Cluster::Telemetry::VERBOSE");
static const auto& debug = zeek::id::find_val<zeek::EnumVal>("Cluster::Telemetry::DEBUG");
std::string var_name = util::fmt("Cluster::Telemetry::%s_metrics", std::string(name).c_str());
const auto& metrics = zeek::id::find_val<zeek::TableVal>(var_name);
auto composite = std::make_unique<detail::CompositeTelemetry>();
for ( const auto& [k, v] : metrics->ToMap() ) {
detail::TelemetryPtr child;
// Keys are (always?) returned as ListVal, take the first one.
auto metric_type = zeek::cast_intrusive<zeek::EnumVal>(k->AsListVal()->Idx(0));
if ( metric_type == info ) {
child = std::make_unique<detail::InfoTelemetry>(name, static_labels);
}
else if ( metric_type == verbose ) {
child = std::make_unique<detail::VerboseTelemetry>(cluster::detail::TableTopicNormalizer(), name,
static_labels);
}
else if ( metric_type == debug ) {
auto bound_val_vec = zeek::id::find_val<zeek::VectorVal>("Cluster::Telemetry::message_size_bounds");
std::vector<double> bounds_vec(bound_val_vec->Size());
for ( unsigned int i = 0; i < bound_val_vec->Size(); i++ )
bounds_vec[i] = bound_val_vec->DoubleAt(i);
child = std::make_unique<detail::DebugTelemetry>(cluster::detail::TableTopicNormalizer(), name,
static_labels, std::move(bounds_vec));
}
else {
zeek::reporter->FatalError("Invalid metric_type %s %" PRIu64, obj_desc_short(metric_type).c_str(),
metric_type->Get());
}
composite->Add(std::move(child));
}
backend.SetTelemetry(std::move(composite));
}
} // namespace zeek::cluster::detail

195
src/cluster/Telemetry.h Normal file
View file

@ -0,0 +1,195 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include "zeek/IntrusivePtr.h"
#include "zeek/Span.h"
namespace zeek {
class TableVal;
using TableValPtr = zeek::IntrusivePtr<TableVal>;
namespace telemetry {
class Counter;
using CounterPtr = std::shared_ptr<Counter>;
class CounterFamily;
using CounterFamilyPtr = std::shared_ptr<CounterFamily>;
class HistogramFamily;
using HistogramFamilyPtr = std::shared_ptr<HistogramFamily>;
using LabelView = std::pair<std::string_view, std::string_view>;
} // namespace telemetry
namespace cluster {
class Backend;
namespace detail {
enum class TelemetryScope {
Core,
WebSocket,
};
/**
* Extra information of the serialized version of an Event.
*/
class SerializationInfo {
public:
explicit SerializationInfo(size_t size) : size(size) {}
size_t Size() const { return size; }
private:
size_t size;
};
using TopicNormalizer = std::function<std::string_view(std::string_view)>;
using LabelList = std::vector<std::pair<std::string, std::string>>;
using LabelViewList = std::vector<std::pair<std::string_view, std::string_view>>;
/**
* A topic normalizer using the Cluster::Telemetry::topic_normalizations table.
*/
class TableTopicNormalizer {
public:
TableTopicNormalizer();
std::string_view operator()(std::string_view topic);
private:
zeek::TableValPtr topic_normalizations;
};
class Telemetry {
public:
virtual ~Telemetry() = default;
virtual void OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) = 0;
virtual void OnIncomingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) = 0;
};
using TelemetryPtr = std::unique_ptr<Telemetry>;
// Reporting nothing.
class NullTelemetry : public Telemetry {
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) override {}
void OnIncomingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) override {}
};
// A container for telemetry instances, delegating to its children.
class CompositeTelemetry : public Telemetry {
public:
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) override {
for ( const auto& c : children )
c->OnOutgoingEvent(topic, handler_name, info);
}
void OnIncomingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) override {
for ( const auto& c : children )
c->OnIncomingEvent(topic, handler_name, info);
}
void Add(TelemetryPtr child) { children.push_back(std::move(child)); }
private:
std::vector<TelemetryPtr> children;
};
/**
* Just one metric for incoming and one for outgoing metrics.
*/
class InfoTelemetry : public Telemetry {
public:
/**
*
* @param name The metric name without prefix.
* @param static_labels Labels to add on all metrics.
* @param prefix The metric prefix.
*/
InfoTelemetry(std::string_view name, LabelList static_labels, std::string_view prefix = "zeek");
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
private:
telemetry::CounterPtr in, out;
};
/**
* A telemetry class producing metrics labeled with handler names and topics.
*
* Note that randomly generated topic names will cause unbounded
* metrics growth. A topic_normalizer should be injected to normalize
* topic names.
*/
class VerboseTelemetry : public Telemetry {
public:
VerboseTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels,
std::string_view prefix = "zeek");
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
private:
TopicNormalizer topic_normalizer;
LabelList labels;
LabelViewList labels_view;
size_t topic_idx, handler_idx; // Index of topic and handler labels in labels_view
telemetry::CounterFamilyPtr in, out;
};
/**
* A telemetry class producing metrics labeled with topics
* and the script layer location for outgoing metrics.
*/
class DebugTelemetry : public Telemetry {
public:
DebugTelemetry(TopicNormalizer topic_normalizer, std::string_view name, LabelList static_labels,
std::vector<double> size_bounds, std::string_view prefix = "zeek");
void OnOutgoingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
void OnIncomingEvent(std::string_view topic, std::string_view handler_name, const SerializationInfo& info) override;
private:
TopicNormalizer topic_normalizer;
std::vector<double> size_bounds;
LabelList labels;
LabelViewList labels_view;
zeek::Span<telemetry::LabelView> labels_view_no_location;
size_t topic_idx, handler_idx,
script_location_idx; // Index of topic, handler and script_location labels in labels_view
telemetry::HistogramFamilyPtr in, out;
};
/**
* Reads Cluster::Telemetry consts, instantiates and appropriate Telemetry instance
* set it on the given backend.
*
* @param backend The cluster backend to configure.
* @param name The name used in the metric names. Either core or websocket at this point.
* @param static_labels Static labels to attach to metrics.
*/
void configure_backend_telemetry(Backend& backend, std::string_view name, LabelList static_labels = {});
} // namespace detail
} // namespace cluster
} // namespace zeek

View file

@ -138,8 +138,13 @@ std::unique_ptr<WebSocketServer> StartServer(std::unique_ptr<WebSocketEventDispa
remoteIp = std::move(remoteIp), remoteIp = std::move(remoteIp),
ixws = std::move(ixws)](const ix::WebSocketMessagePtr& msg) mutable { ixws = std::move(ixws)](const ix::WebSocketMessagePtr& msg) mutable {
if ( msg->type == ix::WebSocketMessageType::Open ) { if ( msg->type == ix::WebSocketMessageType::Open ) {
dispatcher->QueueForProcessing( std::optional<std::string> application_name;
WebSocketOpen{id, msg->openInfo.uri, msg->openInfo.protocol, std::move(ixws)}); auto it = msg->openInfo.headers.find("X-Application-Name");
if ( it != msg->openInfo.headers.end() )
application_name = it->second;
dispatcher->QueueForProcessing(WebSocketOpen{id, msg->openInfo.uri, msg->openInfo.protocol,
std::move(application_name), std::move(ixws)});
} }
else if ( msg->type == ix::WebSocketMessageType::Message ) { else if ( msg->type == ix::WebSocketMessageType::Message ) {
dispatcher->QueueForProcessing(WebSocketMessage{id, msg->str}); dispatcher->QueueForProcessing(WebSocketMessage{id, msg->str});

View file

@ -4,6 +4,8 @@
#include "zeek/cluster/websocket/WebSocket.h" #include "zeek/cluster/websocket/WebSocket.h"
#include <algorithm>
#include <cctype>
#include <memory> #include <memory>
#include <string_view> #include <string_view>
#include <variant> #include <variant>
@ -14,6 +16,7 @@
#include "zeek/cluster/Manager.h" #include "zeek/cluster/Manager.h"
#include "zeek/cluster/OnLoop.h" #include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/cluster/Telemetry.h"
#include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/cluster/serializer/broker/Serializer.h"
#include "zeek/cluster/websocket/Plugin.h" #include "zeek/cluster/websocket/Plugin.h"
#include "zeek/cluster/websocket/events.bif.h" #include "zeek/cluster/websocket/events.bif.h"
@ -291,6 +294,24 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) {
return; return;
} }
std::string application_name = open.application_name.value_or("unknown");
// A bit ad-hoc
bool good_application_name = std::all_of(application_name.begin(), application_name.end(), [](auto c) {
return std::isalnum(c) || c == '/' || c == '_' || c == '-' || c == '.' || c == '=' || c == ':' || c == '*' ||
c == '@';
});
if ( ! good_application_name ) {
QueueReply(WebSocketCloseReply{wsc, 1001, "Internal error"});
open.wsc->SendError("invalid_application_name", "Invalid X-Application-Name");
open.wsc->Close(1008, "Invalid X-Application-Name");
// Still create an entry as we might see messages and close events coming in.
clients[id] = WebSocketClientEntry{id, wsc, nullptr};
return;
}
// Generate an ID for this client. // Generate an ID for this client.
auto ws_id = cluster::backend->NodeId() + "-websocket-" + id; auto ws_id = cluster::backend->NodeId() + "-websocket-" + id;
@ -321,6 +342,8 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) {
return; return;
} }
cluster::detail::configure_backend_telemetry(*backend, "websocket", {{"app", application_name}});
WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(), WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(),
wsc->getRemotePort(), ws_id.c_str(), backend.get()); wsc->getRemotePort(), ws_id.c_str(), backend.get());

View file

@ -125,6 +125,7 @@ struct WebSocketOpen {
std::string id; std::string id;
std::string uri; std::string uri;
std::string protocol; std::string protocol;
std::optional<std::string> application_name;
std::shared_ptr<WebSocketClient> wsc; std::shared_ptr<WebSocketClient> wsc;
}; };

View file

@ -54,6 +54,7 @@
#include "zeek/broker/Manager.h" #include "zeek/broker/Manager.h"
#include "zeek/cluster/Backend.h" #include "zeek/cluster/Backend.h"
#include "zeek/cluster/Manager.h" #include "zeek/cluster/Manager.h"
#include "zeek/cluster/Telemetry.h"
#include "zeek/conn_key/Manager.h" #include "zeek/conn_key/Manager.h"
#include "zeek/file_analysis/Manager.h" #include "zeek/file_analysis/Manager.h"
#include "zeek/input.h" #include "zeek/input.h"
@ -891,6 +892,8 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
cluster::backend = backend.release(); cluster::backend = backend.release();
} }
cluster::detail::configure_backend_telemetry(*cluster::backend, "core");
broker_mgr->InitPostScript(); broker_mgr->InitPostScript();
if ( cluster::backend != broker_mgr ) if ( cluster::backend != broker_mgr )
cluster::backend->InitPostScript(); cluster::backend->InitPostScript();

View file

@ -0,0 +1,9 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
### NOTE: This file has been sorted with diff-sort.
zeek, zeek_cluster_core_incoming_events_total, [node], [manager], 2.0
zeek, zeek_cluster_core_outgoing_events_total, [node], [manager], 3.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::hello, manager, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [finish, manager, zeek.cluster.nodeid.__normalized__], 1.0

View file

@ -0,0 +1,9 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
### NOTE: This file has been sorted with diff-sort.
zeek, zeek_cluster_core_incoming_events_total, [node], [worker-1], 3.0
zeek, zeek_cluster_core_outgoing_events_total, [node], [worker-1], 2.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [Cluster::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [finish, worker-1, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::Backend::ZeroMQ::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [Cluster::hello, worker-1, zeek.cluster.nodeid.__normalized__], 1.0

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
### NOTE: This file has been sorted with diff-sort.
received termination signal

View file

@ -0,0 +1,33 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
### NOTE: This file has been sorted with diff-sort.
Cluster::websocket_client_added, [/test/pings, /zeek/wstest/ws1/]
zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/1], [0.0, 0.0, 1.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_core_debug_incoming_event_sizes, [handler, node, topic], [ping, manager, /test/pings/3], [0.0, 0.0, 0.0, 3.0, 4.0, 18.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_core_debug_outgoing_event_sizes, [handler, node, script_location, topic], [ping, manager, ./manager.zeek:32, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_core_debug_outgoing_event_sizes, [handler, node, script_location, topic], [ping, manager, ./manager.zeek:32, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_core_incoming_events_total, [node], [manager], 100.0
zeek, zeek_cluster_core_outgoing_events_total, [node], [manager], 50.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/0], 25.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/1], 25.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/2], 25.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/3], 25.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [ping, manager, /test/pings/0], 25.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [ping, manager, /test/pings/2], 25.0
zeek, zeek_cluster_websocket_debug_incoming_event_sizes, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_websocket_debug_incoming_event_sizes, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/0], [0.0, 1.0, 0.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/1], [0.0, 0.0, 1.0, 3.0, 4.0, 17.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/2], [0.0, 0.0, 0.0, 4.0, 3.0, 18.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_websocket_debug_outgoing_event_sizes, [app, handler, node, script_location, topic], [btest-python-client, ping, manager, none, /test/pings/3], [0.0, 0.0, 0.0, 3.0, 4.0, 18.0, 0.0, 0.0, 0.0]
zeek, zeek_cluster_websocket_incoming_events_total, [app, node], [btest-python-client, manager], 50.0
zeek, zeek_cluster_websocket_outgoing_events_total, [app, node], [btest-python-client, manager], 100.0
zeek, zeek_cluster_websocket_verbose_incoming_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/0], 25.0
zeek, zeek_cluster_websocket_verbose_incoming_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/2], 25.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/0], 25.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/1], 25.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/2], 25.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [btest-python-client, ping, manager, /test/pings/3], 25.0
zeek_cluster_* histogram metrics, 12
zeek_cluster_* metrics, 16

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
### NOTE: This file has been sorted with diff-sort.
received termination signal

View file

@ -0,0 +1,18 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
### NOTE: This file has been sorted with diff-sort.
Cluster::websocket_client_added, [/test/pings, /zeek/wstest/ws1/]
zeek, zeek_cluster_core_incoming_events_total, [node], [manager], 100.0
zeek, zeek_cluster_core_outgoing_events_total, [node], [manager], 50.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/0], 25.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/1], 25.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/2], 25.0
zeek, zeek_cluster_core_verbose_incoming_events_total, [handler, node, topic], [ping, manager, /test/pings/3], 25.0
zeek, zeek_cluster_core_verbose_outgoing_events_total, [handler, node, topic], [ping, manager, /test/pings], 50.0
zeek, zeek_cluster_websocket_incoming_events_total, [app, node], [unknown, manager], 50.0
zeek, zeek_cluster_websocket_outgoing_events_total, [app, node], [unknown, manager], 100.0
zeek, zeek_cluster_websocket_verbose_incoming_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings], 50.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/0], 25.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/1], 25.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/2], 25.0
zeek, zeek_cluster_websocket_verbose_outgoing_events_total, [app, handler, node, topic], [unknown, ping, manager, /test/pings/3], 25.0
zeek_cluster_* metrics, 14

View file

@ -140,6 +140,7 @@ scripts/base/init-frameworks-and-bifs.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
scripts/base/frameworks/cluster/pools.zeek scripts/base/frameworks/cluster/pools.zeek
scripts/base/utils/hash_hrw.zeek scripts/base/utils/hash_hrw.zeek
scripts/base/frameworks/cluster/telemetry.zeek
scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/__load__.zeek
scripts/base/frameworks/config/main.zeek scripts/base/frameworks/config/main.zeek
scripts/base/frameworks/config/input.zeek scripts/base/frameworks/config/input.zeek

View file

@ -140,6 +140,7 @@ scripts/base/init-frameworks-and-bifs.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
scripts/base/frameworks/cluster/pools.zeek scripts/base/frameworks/cluster/pools.zeek
scripts/base/utils/hash_hrw.zeek scripts/base/utils/hash_hrw.zeek
scripts/base/frameworks/cluster/telemetry.zeek
scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/__load__.zeek
scripts/base/frameworks/config/main.zeek scripts/base/frameworks/config/main.zeek
scripts/base/frameworks/config/input.zeek scripts/base/frameworks/config/input.zeek

View file

@ -491,6 +491,7 @@
0.000000 MetaHookPost LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./telemetry, <...>/telemetry.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> -1
@ -805,6 +806,7 @@
0.000000 MetaHookPost LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry, <...>/telemetry.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) -> (-1, <no content>)
@ -1430,6 +1432,7 @@
0.000000 MetaHookPre LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./store.bif.zeek, <...>/store.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./strings.bif.zeek, <...>/strings.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./telemetry, <...>/telemetry.zeek)
0.000000 MetaHookPre LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek)
@ -1744,6 +1747,7 @@
0.000000 MetaHookPre LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./store.bif.zeek, <...>/store.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./strings.bif.zeek, <...>/strings.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./supervisor.bif.zeek, <...>/supervisor.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry, <...>/telemetry.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_consts.bif.zeek, <...>/telemetry_consts.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_functions.bif.zeek, <...>/telemetry_functions.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./telemetry_types.bif.zeek, <...>/telemetry_types.bif.zeek)
@ -2380,6 +2384,7 @@
0.000000 | HookLoadFile ./store.bif.zeek <...>/store.bif.zeek 0.000000 | HookLoadFile ./store.bif.zeek <...>/store.bif.zeek
0.000000 | HookLoadFile ./strings.bif.zeek <...>/strings.bif.zeek 0.000000 | HookLoadFile ./strings.bif.zeek <...>/strings.bif.zeek
0.000000 | HookLoadFile ./supervisor.bif.zeek <...>/supervisor.bif.zeek 0.000000 | HookLoadFile ./supervisor.bif.zeek <...>/supervisor.bif.zeek
0.000000 | HookLoadFile ./telemetry <...>/telemetry.zeek
0.000000 | HookLoadFile ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek 0.000000 | HookLoadFile ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek
0.000000 | HookLoadFile ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek 0.000000 | HookLoadFile ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek
0.000000 | HookLoadFile ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek 0.000000 | HookLoadFile ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek
@ -2694,6 +2699,7 @@
0.000000 | HookLoadFileExtended ./store.bif.zeek <...>/store.bif.zeek 0.000000 | HookLoadFileExtended ./store.bif.zeek <...>/store.bif.zeek
0.000000 | HookLoadFileExtended ./strings.bif.zeek <...>/strings.bif.zeek 0.000000 | HookLoadFileExtended ./strings.bif.zeek <...>/strings.bif.zeek
0.000000 | HookLoadFileExtended ./supervisor.bif.zeek <...>/supervisor.bif.zeek 0.000000 | HookLoadFileExtended ./supervisor.bif.zeek <...>/supervisor.bif.zeek
0.000000 | HookLoadFileExtended ./telemetry <...>/telemetry.zeek
0.000000 | HookLoadFileExtended ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek 0.000000 | HookLoadFileExtended ./telemetry_consts.bif.zeek <...>/telemetry_consts.bif.zeek
0.000000 | HookLoadFileExtended ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek 0.000000 | HookLoadFileExtended ./telemetry_functions.bif.zeek <...>/telemetry_functions.bif.zeek
0.000000 | HookLoadFileExtended ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek 0.000000 | HookLoadFileExtended ./telemetry_types.bif.zeek <...>/telemetry_types.bif.zeek

View file

@ -90,14 +90,18 @@ class TestClient:
return self.__name return self.__name
def connect(name: str, url: Optional[str] = None) -> TestClient: def connect(
name: str,
url: Optional[str] = None,
additional_headers: Optional[dict[str, str]] = None,
) -> TestClient:
""" """
Connect to a WebSocket server and return a TestClient instance. Connect to a WebSocket server and return a TestClient instance.
""" """
if url is None: if url is None:
url = WS4_URL_V1 url = WS4_URL_V1
cc = websockets.sync.client.connect(url) cc = websockets.sync.client.connect(url, additional_headers=additional_headers)
return TestClient(name, cc) return TestClient(name, cc)

View file

@ -0,0 +1,68 @@
# @TEST-DOC: All parties log their cluster metrics at zeek_done() time.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker/out
# @TEST-START-FILE common.zeek
@load base/frameworks/telemetry
@load ./zeromq-test-bootstrap
redef Cluster::Telemetry::core_metrics += {
Cluster::Telemetry::VERBOSE,
};
redef Cluster::Telemetry::websocket_metrics += {
Cluster::Telemetry::VERBOSE,
};
global finish: event(name: string);
event zeek_done()
{
local ms = Telemetry::collect_metrics("zeek", "cluster_core_*");
ms += Telemetry::collect_metrics("zeek", "cluster_websocket_*");
for ( _, m in ms )
print m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value;
}
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
# If a node comes up that isn't us, send it a finish event.
event Cluster::node_up(name: string, id: string) {
Cluster::publish(Cluster::nodeid_topic(id), finish, Cluster::node);
}
# If the worker vanishes, finish the test.
event Cluster::node_down(name: string, id: string) {
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
event Cluster::node_up(name: string, id: string) {
}
event finish(name: string) &is_used {
terminate();
}
# @TEST-END-FILE

View file

@ -0,0 +1,105 @@
# @TEST-DOC: Output cluster telemetry after working with a WebSocket client. The WebSocket client sends an X-Application-Name header. Also include debug metrics as histograms in the output.
#
# @TEST-REQUIRES: have-zeromq
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
# @TEST-EXEC: cp $FILES/ws/wstest.py .
#
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
# @TEST-EXEC: python3 client.py
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stdout
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stderr
# @TEST-START-FILE manager.zeek
@load base/frameworks/telemetry
@load ./zeromq-test-bootstrap
redef Cluster::Telemetry::core_metrics += {
Cluster::Telemetry::VERBOSE,
Cluster::Telemetry::DEBUG,
};
redef Cluster::Telemetry::websocket_metrics += {
Cluster::Telemetry::VERBOSE,
Cluster::Telemetry::DEBUG,
};
redef exit_only_after_terminate = T;
global expected_ping_count = 100;
global ping_count = 0;
global ping: event(msg: string, c: count) &is_used;
event zeek_init()
{
Cluster::subscribe("/test/pings");
Cluster::listen_websocket([$listen_addr=127.0.0.1, $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
}
event ping(msg: string, n: count) &is_used
{
if ( ping_count % 2 == 0) # Reply every other ping.
{
Cluster::publish(fmt("/test/pings/%s", ping_count % 4), ping, msg, n);
}
++ping_count;
if ( ping_count == expected_ping_count )
terminate();
}
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
{
print "Cluster::websocket_client_added", subscriptions;
}
event zeek_done()
{
local ms = Telemetry::collect_metrics("zeek", "cluster_core_*");
ms += Telemetry::collect_metrics("zeek", "cluster_websocket_*");
print "zeek_cluster_* metrics", |ms|;
for ( _, m in ms )
print m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value;
local hms = Telemetry::collect_histogram_metrics("zeek", "cluster_core_*");
hms += Telemetry::collect_histogram_metrics("zeek", "cluster_websocket_*");
print "zeek_cluster_* histogram metrics", |hms|;
for ( _, hm in hms )
print hm$opts$prefix, hm$opts$name, hm$label_names, hm$label_values, hm$values;
}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import wstest
def run(ws_url):
with wstest.connect("ws1", ws_url, additional_headers={"X-Application-Name": "btest-python-client"}) as tc:
tc.hello_v1(["/test/pings"])
for i in range(0, 100):
msg = f"ping {i}" + (i * 32 * "A")
tc.send_json(wstest.build_event_v1(f"/test/pings/{i % 4}", "ping", [msg, i]))
if i % 2 == 0: # Wait for a reply for every other ping
tc.recv_json()
if __name__ == "__main__":
wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE

View file

@ -0,0 +1,93 @@
# @TEST-DOC: Output cluster telemetry after working with a WebSocket client.
#
# @TEST-REQUIRES: have-zeromq
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
# @TEST-EXEC: cp $FILES/ws/wstest.py .
#
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
# @TEST-EXEC: python3 client.py
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stdout
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/.stderr
# @TEST-START-FILE manager.zeek
@load base/frameworks/telemetry
@load ./zeromq-test-bootstrap
redef Cluster::Telemetry::core_metrics += {
Cluster::Telemetry::VERBOSE,
};
redef Cluster::Telemetry::websocket_metrics += {
Cluster::Telemetry::VERBOSE,
};
redef exit_only_after_terminate = T;
global expected_ping_count = 100;
global ping_count = 0;
global ping: event(msg: string, c: count) &is_used;
event zeek_init()
{
Cluster::subscribe("/test/pings");
Cluster::listen_websocket([$listen_addr=127.0.0.1, $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
}
event ping(msg: string, n: count) &is_used
{
if ( ping_count % 2 == 0) # Reply every other ping.
Cluster::publish("/test/pings", ping, msg, n);
++ping_count;
if ( ping_count == expected_ping_count )
terminate();
}
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
{
print "Cluster::websocket_client_added", subscriptions;
}
event zeek_done()
{
local ms = Telemetry::collect_metrics("zeek", "cluster_core_*");
ms += Telemetry::collect_metrics("zeek", "cluster_websocket_*");
print "zeek_cluster_* metrics", |ms|;
for ( _, m in ms )
print m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value;
}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import wstest
def run(ws_url):
with wstest.connect("ws1", ws_url) as tc:
tc.hello_v1(["/test/pings"])
for i in range(0, 100):
tc.send_json(wstest.build_event_v1(f"/test/pings/{i % 4}", "ping", [f"ping {i}", i]))
if i % 2 == 0: # Wait for a reply for every other ping
tc.recv_json()
if __name__ == "__main__":
wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE