Merge remote-tracking branch 'origin/topic/awelzel/pluggable-cluster-backends-part2'

* origin/topic/awelzel/pluggable-cluster-backends-part2:
  ci/test.sh: Run doctest with TZ=UTC
  cluster/setup-connections: Switch to Cluster::subscribe(), short-circuit broker
  cluster/serializer: Add Broker based event serializers
  cluster: Add Cluster scoped bifs
  Reporter: Add ScriptLocationScope helper
  init-bare/zeek-setup: Add Cluster::backend const &redef
  broker: Implement cluster::Backend interface
  Broker: Fix some error messages
  broker: Remove MakeEvent(ArgsSpan)
This commit is contained in:
Arne Welzel 2024-11-26 17:45:08 +01:00
commit 3ce41f1eff
36 changed files with 1107 additions and 64 deletions

41
CHANGES
View file

@ -1,3 +1,44 @@
7.1.0-dev.629 | 2024-11-26 17:45:08 +0100
* ci/test.sh: Run doctest with TZ=UTC (Arne Welzel, Corelight)
Broker's JSON serialization is TZ dependent (which seems a bug). For now
do the same as we do in btest.cfg and run doctests with TZ set to UTC.
Reported in zeek/broker#434.
* cluster/setup-connections: Switch to Cluster::subscribe(), short-circuit broker (Arne Welzel, Corelight)
For the time being, this is easiest, otherwise we'd need to
conditionally load a broker-specific policy script based on
Cluster::backend being set.
* cluster/serializer: Add Broker based event serializers (Arne Welzel, Corelight)
This adds the first event serializers that use
broker functionality. Binary and JSON formats.
* cluster: Add Cluster scoped bifs (Arne Welzel, Corelight)
... and a broker based test using Cluster::publish() and
Cluster::subscribe().
* Reporter: Add ScriptLocationScope helper (Arne Welzel, Corelight)
* init-bare/zeek-setup: Add Cluster::backend const &redef (Arne Welzel, Corelight)
* broker: Implement cluster::Backend interface (Arne Welzel, Corelight)
* Broker: Fix some error messages (Arne Welzel, Corelight)
* broker: Remove MakeEvent(ArgsSpan) (Arne Welzel, Corelight)
This was added previously in the 7.1 cycle. Now that MakeEvent() was
removed from cluster::Backend, there's no need for Broker to provide
this version.
* Update 3dparty submodule (Johanna Amann, Corelight)
7.1.0-dev.618 | 2024-11-26 17:17:00 +0100 7.1.0-dev.618 | 2024-11-26 17:17:00 +0100
* GH-4052: More robust memory management for ZAM execution - fixes #4052 (Vern Paxson, Corelight) * GH-4052: More robust memory management for ZAM execution - fixes #4052 (Vern Paxson, Corelight)

View file

@ -1 +1 @@
7.1.0-dev.618 7.1.0-dev.629

View file

@ -48,7 +48,7 @@ function run_unit_tests {
banner "Running unit tests" banner "Running unit tests"
pushd build pushd build
(. ./zeek-path-dev.sh && zeek --test --no-skip) || result=1 (. ./zeek-path-dev.sh && TZ=UTC zeek --test --no-skip) || result=1
popd popd
return 0 return 0
} }

View file

@ -281,7 +281,30 @@ export {
## a given cluster node. ## a given cluster node.
global nodeid_topic: function(id: string): string; global nodeid_topic: function(id: string): string;
## Initialize the cluster backend.
##
## Cluster backends usually invoke this from a :zeek:see:`zeek_init` handler.
##
## Returns: T on success, else F.
global init: function(): bool;
## Subscribe to the given topic.
##
## topic: The topic to subscribe to.
##
## Returns: T on success, else F.
global subscribe: function(topic: string): bool;
## Unsubscribe from the given topic.
##
## topic: The topic to unsubscribe from.
##
## Returns: T on success, else F.
global unsubscribe: function(topic: string): bool;
## An event instance for cluster pub/sub. ## An event instance for cluster pub/sub.
##
## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`.
type Event: record { type Event: record {
## The event handler to be invoked on the remote node. ## The event handler to be invoked on the remote node.
ev: any; ev: any;
@ -290,6 +313,10 @@ export {
}; };
} }
# Needs declaration of Cluster::Event type.
@load base/bif/cluster.bif
# Track active nodes per type. # Track active nodes per type.
global active_node_ids: table[NodeType] of set[string]; global active_node_ids: table[NodeType] of set[string];
@ -528,3 +555,18 @@ function log(msg: string)
{ {
Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]); Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]);
} }
function init(): bool
{
return Cluster::Backend::__init();
}
function subscribe(topic: string): bool
{
return Cluster::__subscribe(topic);
}
function unsubscribe(topic: string): bool
{
return Cluster::__unsubscribe(topic);
}

View file

@ -69,7 +69,7 @@ event zeek_init() &priority=-10
local pool = registered_pools[i]; local pool = registered_pools[i];
if ( node in pool$nodes ) if ( node in pool$nodes )
Broker::subscribe(pool$spec$topic); Cluster::subscribe(pool$spec$topic);
} }
switch ( self$node_type ) { switch ( self$node_type ) {
@ -78,29 +78,47 @@ event zeek_init() &priority=-10
case CONTROL: case CONTROL:
break; break;
case LOGGER: case LOGGER:
Broker::subscribe(Cluster::logger_topic); Cluster::subscribe(Cluster::logger_topic);
Broker::subscribe(Broker::default_log_topic_prefix);
break; break;
case MANAGER: case MANAGER:
Broker::subscribe(Cluster::manager_topic); Cluster::subscribe(Cluster::manager_topic);
if ( Cluster::manager_is_logger )
Broker::subscribe(Broker::default_log_topic_prefix);
break; break;
case PROXY: case PROXY:
Broker::subscribe(Cluster::proxy_topic); Cluster::subscribe(Cluster::proxy_topic);
break; break;
case WORKER: case WORKER:
Broker::subscribe(Cluster::worker_topic); Cluster::subscribe(Cluster::worker_topic);
break; break;
default: default:
Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type)); Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type));
return; return;
} }
Broker::subscribe(nodeid_topic(Broker::node_id())); Cluster::subscribe(nodeid_topic(Broker::node_id()));
Broker::subscribe(node_topic(node)); Cluster::subscribe(node_topic(node));
# Listening and connecting to other peers is broker specific,
# short circuit if Zeek is configured with a different
# cluster backend.
#
# In the future, this could move into a policy script, but
# for the time being it's easier for backwards compatibility
# to keep this here.
if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER )
return;
# Logging setup: Anything handling logging additionally subscribes
# to Broker::default_log_topic_prefix.
switch ( self$node_type ) {
case LOGGER:
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
case MANAGER:
if ( Cluster::manager_is_logger )
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
}
if ( self$p != 0/unknown ) if ( self$p != 0/unknown )
{ {

View file

@ -5753,6 +5753,9 @@ export {
module Cluster; module Cluster;
export { export {
type Cluster::Pool: record {}; type Cluster::Pool: record {};
## Cluster backend to use. Default is the broker backend.
const backend = Cluster::CLUSTER_BACKEND_BROKER &redef;
} }
module Weird; module Weird;

View file

@ -14,6 +14,7 @@
#include "zeek/Event.h" #include "zeek/Event.h"
#include "zeek/EventHandler.h" #include "zeek/EventHandler.h"
#include "zeek/Expr.h" #include "zeek/Expr.h"
#include "zeek/Frame.h"
#include "zeek/ID.h" #include "zeek/ID.h"
#include "zeek/NetVar.h" #include "zeek/NetVar.h"
#include "zeek/RunState.h" #include "zeek/RunState.h"
@ -649,4 +650,10 @@ void Reporter::DoLog(const char* prefix, EventHandlerPtr event, FILE* out, Conne
bool Reporter::EmitToStderr(bool flag) { return flag || ! run_state::detail::zeek_init_done; } bool Reporter::EmitToStderr(bool flag) { return flag || ! run_state::detail::zeek_init_done; }
ScriptLocationScope::ScriptLocationScope(const zeek::detail::Frame* frame) {
zeek::reporter->PushLocation(frame->GetCallLocation());
}
ScriptLocationScope::~ScriptLocationScope() { zeek::reporter->PopLocation(); }
} // namespace zeek } // namespace zeek

View file

@ -31,8 +31,9 @@ using StringValPtr = IntrusivePtr<StringVal>;
namespace detail { namespace detail {
class AssertStmt; class AssertStmt;
class Location;
class Expr; class Expr;
class Frame;
class Location;
} // namespace detail } // namespace detail
@ -334,6 +335,20 @@ private:
bool ignore_deprecations; bool ignore_deprecations;
}; };
/**
* Helper class pushing the frame's call location onto
* the reporter's location stack, associating the caller's
* location with subsequent output.
*
* The frame's location must remain valid until the object
* is destroyed.
*/
class ScriptLocationScope {
public:
ScriptLocationScope(const zeek::detail::Frame* frame);
~ScriptLocationScope();
};
extern Reporter* reporter; extern Reporter* reporter;
} // namespace zeek } // namespace zeek

View file

@ -28,6 +28,7 @@
#include "zeek/broker/store.bif.h" #include "zeek/broker/store.bif.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/logging/Types.h"
#include "zeek/telemetry/Manager.h" #include "zeek/telemetry/Manager.h"
#include "zeek/util.h" #include "zeek/util.h"
@ -252,7 +253,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const
} // namespace } // namespace
#endif #endif
Manager::Manager(bool arg_use_real_time) { Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr) {
bound_port = 0; bound_port = 0;
use_real_time = arg_use_real_time; use_real_time = arg_use_real_time;
peer_count = 0; peer_count = 0;
@ -262,7 +263,7 @@ Manager::Manager(bool arg_use_real_time) {
writer_id_type = nullptr; writer_id_type = nullptr;
} }
void Manager::InitPostScript() { void Manager::DoInitPostScript() {
DBG_LOG(DBG_BROKER, "Initializing"); DBG_LOG(DBG_BROKER, "Initializing");
log_batch_size = get_option("Broker::log_batch_size")->AsCount(); log_batch_size = get_option("Broker::log_batch_size")->AsCount();
@ -404,7 +405,7 @@ void Manager::InitializeBrokerStoreForwarding() {
} }
} }
void Manager::Terminate() { void Manager::DoTerminate() {
FlushLogBuffers(); FlushLogBuffers();
iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this);
@ -545,6 +546,22 @@ std::vector<broker::peer_info> Manager::Peers() const {
std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); } std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); }
bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) {
broker::vector xs;
xs.reserve(event.args.size());
for ( const auto& a : event.args ) {
auto r = detail::val_to_data(a.get());
if ( ! r ) {
Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str());
return false;
}
xs.emplace_back(std::move(r.value()));
}
std::string name(event.HandlerName());
return PublishEvent(topic, name, std::move(xs), event.timestamp);
}
bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) {
if ( bstate->endpoint.is_shutdown() ) if ( bstate->endpoint.is_shutdown() )
return true; return true;
@ -862,10 +879,6 @@ RecordVal* Manager::MakeEvent(ValPList* args, zeek::detail::Frame* frame) {
zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args, zeek::detail::Frame* frame) { zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args, zeek::detail::Frame* frame) {
scoped_reporter_location srl{frame}; scoped_reporter_location srl{frame};
return MakeEvent(args);
}
zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) {
auto rval = zeek::make_intrusive<RecordVal>(BifType::Record::Broker::Event); auto rval = zeek::make_intrusive<RecordVal>(BifType::Record::Broker::Event);
auto arg_vec = make_intrusive<VectorVal>(vector_of_data_type); auto arg_vec = make_intrusive<VectorVal>(vector_of_data_type);
rval->Assign(1, arg_vec); rval->Assign(1, arg_vec);
@ -877,7 +890,8 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) {
// Event val must come first. // Event val must come first.
if ( arg_val->GetType()->Tag() != TYPE_FUNC ) { if ( arg_val->GetType()->Tag() != TYPE_FUNC ) {
Error("attempt to convert non-event into an event type"); Error("attempt to convert non-event into an event type (%s)",
zeek::obj_desc_short(arg_val.get()).c_str());
return rval; return rval;
} }
@ -891,7 +905,7 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) {
auto num_args = static_cast<size_t>(func->GetType()->Params()->NumFields()); auto num_args = static_cast<size_t>(func->GetType()->Params()->NumFields());
if ( num_args != args.size() - 1 ) { if ( num_args != args.size() - 1 ) {
Error("bad # of arguments: got %zu, expect %zu", args.size() - 1, num_args + 1); Error("bad # of arguments: got %zu, expect %zu", args.size() - 1, num_args);
return rval; return rval;
} }
@ -933,7 +947,7 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) {
return rval; return rval;
} }
bool Manager::Subscribe(const string& topic_prefix) { bool Manager::DoSubscribe(const string& topic_prefix) {
DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str());
bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done); bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done);
@ -951,7 +965,7 @@ bool Manager::Forward(string topic_prefix) {
return true; return true;
} }
bool Manager::Unsubscribe(const string& topic_prefix) { bool Manager::DoUnsubscribe(const string& topic_prefix) {
for ( size_t i = 0; i < forwarded_prefixes.size(); ++i ) for ( size_t i = 0; i < forwarded_prefixes.size(); ++i )
if ( forwarded_prefixes[i] == topic_prefix ) { if ( forwarded_prefixes[i] == topic_prefix ) {
DBG_LOG(DBG_BROKER, "Unforwarding topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Unforwarding topic prefix %s", topic_prefix.c_str());

View file

@ -10,13 +10,16 @@
#include <broker/peer_info.hh> #include <broker/peer_info.hh>
#include <broker/zeek.hh> #include <broker/zeek.hh>
#include <memory> #include <memory>
#include <stdexcept>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include "zeek/IntrusivePtr.h" #include "zeek/IntrusivePtr.h"
#include "zeek/Span.h" #include "zeek/Span.h"
#include "zeek/broker/Data.h" #include "zeek/broker/Data.h"
#include "zeek/cluster/Backend.h"
#include "zeek/iosource/IOSource.h" #include "zeek/iosource/IOSource.h"
#include "zeek/logging/Types.h"
#include "zeek/logging/WriterBackend.h" #include "zeek/logging/WriterBackend.h"
namespace zeek { namespace zeek {
@ -75,7 +78,7 @@ struct Stats {
* Manages various forms of communication between peer Zeek processes * Manages various forms of communication between peer Zeek processes
* or other external applications via use of the Broker messaging library. * or other external applications via use of the Broker messaging library.
*/ */
class Manager : public iosource::IOSource { class Manager : public zeek::cluster::Backend, public iosource::IOSource {
public: public:
/** Broker protocol to expect on a listening port. */ /** Broker protocol to expect on a listening port. */
enum class BrokerProtocol { enum class BrokerProtocol {
@ -95,17 +98,6 @@ public:
*/ */
~Manager() override = default; ~Manager() override = default;
/**
* Initialization of the manager. This is called late during Zeek's
* initialization after any scripts are processed.
*/
void InitPostScript();
/**
* Shuts Broker down at termination.
*/
void Terminate();
/** /**
* Returns true if any Broker communication is currently active. * Returns true if any Broker communication is currently active.
*/ */
@ -193,6 +185,8 @@ public:
return PublishEvent(std::move(topic), std::move(name), std::move(broker::get<broker::vector>(args.value_)), ts); return PublishEvent(std::move(topic), std::move(name), std::move(broker::get<broker::vector>(args.value_)), ts);
} }
using cluster::Backend::PublishEvent;
/** /**
* Send an event to any interested peers. * Send an event to any interested peers.
* @param topic a topic string associated with the message. * @param topic a topic string associated with the message.
@ -268,14 +262,6 @@ public:
using ArgsSpan = Span<const ValPtr>; using ArgsSpan = Span<const ValPtr>;
/**
* Create an `Event` record value from an event and its arguments.
* @param args A span pointing at the event arguments.
* @return an `Event` record value. If an invalid event or arguments
* were supplied the optional "name" field will not be set.
*/
zeek::RecordValPtr MakeEvent(ArgsSpan args);
/** /**
* Create an `Event` record value from an event and its arguments. * Create an `Event` record value from an event and its arguments.
* @param args A span pointing at the event arguments. * @param args A span pointing at the event arguments.
@ -285,15 +271,6 @@ public:
*/ */
zeek::RecordValPtr MakeEvent(ArgsSpan args, zeek::detail::Frame* frame); zeek::RecordValPtr MakeEvent(ArgsSpan args, zeek::detail::Frame* frame);
/**
* Register interest in peer event messages that use a certain topic prefix.
* @param topic_prefix a prefix to match against remote message topics.
* e.g. an empty prefix will match everything and "a" will match "alice"
* and "amy" but not "bob".
* @return true if it's a new event subscription and it is now registered.
*/
bool Subscribe(const std::string& topic_prefix);
/** /**
* Register interest in peer event messages that use a certain topic prefix, * Register interest in peer event messages that use a certain topic prefix,
* but that should not be raised locally, just forwarded to any subscribing * but that should not be raised locally, just forwarded to any subscribing
@ -305,14 +282,6 @@ public:
*/ */
bool Forward(std::string topic_prefix); bool Forward(std::string topic_prefix);
/**
* Unregister interest in peer event messages.
* @param topic_prefix a prefix previously supplied to a successful call
* to zeek::Broker::Manager::Subscribe() or zeek::Broker::Manager::Forward().
* @return true if interest in topic prefix is no longer advertised.
*/
bool Unsubscribe(const std::string& topic_prefix);
/** /**
* Create a new *master* data store. * Create a new *master* data store.
* @param name The name of the store. * @param name The name of the store.
@ -402,6 +371,48 @@ public:
}; };
private: private:
// Register interest in peer event messages that use a certain topic prefix.
bool DoSubscribe(const std::string& topic_prefix) override;
// Unregister interest in peer event messages.
bool DoUnsubscribe(const std::string& topic_prefix) override;
// Initialization of the manager. This is called late during Zeek's
// initialization after any scripts are processed.
void DoInitPostScript() override;
// Broker doesn't do anything during Broker::Backend::init().
bool DoInit() override { return true; }
// Shuts Broker down at termination.
void DoTerminate() override;
// Broker overrides this to do its own serialization.
bool DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) override;
// This should never be reached, broker itself doesn't call this and overrides
// the generic DoPublishEvent() method that would call this.
bool DoPublishEvent(const std::string& topic, const std::string& format,
const cluster::detail::byte_buffer& buf) override {
throw std::logic_error("not implemented");
}
// WriterFrontend instances are broker-aware and never call this
// method and instead call the existing PublishLogWrite() method.
//
// TODO: Move log buffering out of broker and implement.
bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header,
zeek::Span<logging::detail::LogRecord> records) override {
// Not implemented by broker.
throw std::logic_error("not implemented");
}
bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format,
cluster::detail::byte_buffer& buf) override {
// Not implemented by broker.
throw std::logic_error("not implemented");
}
// Process events used for Broker store backed zeek tables // Process events used for Broker store backed zeek tables
void ProcessStoreEvent(broker::data msg); void ProcessStoreEvent(broker::data msg);
// Common functionality for processing insert and update events. // Common functionality for processing insert and update events.

137
src/cluster/BifSupport.cc Normal file
View file

@ -0,0 +1,137 @@
#include "zeek/cluster/BifSupport.h"
#include "zeek/Desc.h"
#include "zeek/Event.h"
#include "zeek/EventRegistry.h"
#include "zeek/Frame.h"
#include "zeek/Func.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
#include "zeek/Type.h"
#include "zeek/Val.h"
#include "zeek/broker/Manager.h" // For publishing to broker_mgr directly.
#include "zeek/cluster/Backend.h"
namespace {
// Convert a script-level Cluster::Event to a cluster::detail::Event.
std::optional<zeek::cluster::detail::Event> to_cluster_event(const zeek::RecordValPtr& rec) {
const auto& func = rec->GetField<zeek::FuncVal>(0);
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName());
if ( ! eh ) {
zeek::emit_builtin_error(
zeek::util::fmt("event registry lookup of '%s' failed", zeek::obj_desc_short(func.get()).c_str()));
return std::nullopt;
}
// Need to copy from VectorVal to zeek::Args
zeek::Args args(vargs->Size());
for ( size_t i = 0; i < vargs->Size(); i++ )
args[i] = vargs->ValAt(i);
// TODO: Support configurable timestamps or custom metadata on the record.
auto timestamp = zeek::event_mgr.CurrentEventTime();
return zeek::cluster::detail::Event(eh, std::move(args), timestamp);
}
} // namespace
namespace zeek::cluster::detail::bif {
zeek::RecordValPtr make_event(zeek::ArgsSpan args) {
static const auto& any_vec_type = zeek::id::find_type<zeek::VectorType>("any_vec");
static const auto& event_record_type = zeek::id::find_type<zeek::RecordType>("Cluster::Event");
auto rec = zeek::make_intrusive<zeek::RecordVal>(event_record_type);
if ( args.empty() ) {
zeek::emit_builtin_error("not enough arguments");
return rec;
}
const auto& maybe_func_val = args[0];
if ( maybe_func_val->GetType()->Tag() != zeek::TYPE_FUNC ) {
zeek::emit_builtin_error(
zeek::util::fmt("got non-event type '%s'", zeek::obj_desc_short(maybe_func_val->GetType().get()).c_str()));
return rec;
}
const auto func = zeek::FuncValPtr{zeek::NewRef{}, maybe_func_val->AsFuncVal()};
auto checked_args = cluster::detail::check_args(func, args.subspan(1));
if ( ! checked_args )
return rec;
// Making a copy from zeek::Args to a VectorVal and then back again on publish.
auto vec = zeek::make_intrusive<zeek::VectorVal>(any_vec_type);
vec->Reserve(checked_args->size());
rec->Assign(0, maybe_func_val);
for ( const auto& arg : *checked_args )
vec->Append(arg);
rec->Assign(1, vec); // Args
return rec;
}
zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) {
static const auto& cluster_event_type = zeek::id::find_type<zeek::RecordType>("Cluster::Event");
static const auto& broker_event_type = zeek::id::find_type<zeek::RecordType>("Broker::Event");
if ( args.empty() ) {
zeek::emit_builtin_error("no event arguments given");
return zeek::val_mgr->False();
}
if ( topic->GetType()->Tag() != zeek::TYPE_STRING ) {
zeek::emit_builtin_error("topic is not a string");
return zeek::val_mgr->False();
}
const auto topic_str = topic->AsStringVal()->ToStdString();
auto timestamp = zeek::event_mgr.CurrentEventTime();
if ( args[0]->GetType()->Tag() == zeek::TYPE_FUNC ) {
auto event = zeek::cluster::backend->MakeClusterEvent({zeek::NewRef{}, args[0]->AsFuncVal()}, args.subspan(1),
timestamp);
if ( event )
return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *event));
return zeek::val_mgr->False();
}
else if ( args[0]->GetType()->Tag() == zeek::TYPE_RECORD ) {
if ( args[0]->GetType() == cluster_event_type ) { // Handling Cluster::Event record type
auto ev = to_cluster_event(cast_intrusive<zeek::RecordVal>(args[0]));
if ( ! ev )
return zeek::val_mgr->False();
return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *ev));
}
else if ( args[0]->GetType() == broker_event_type ) {
// Handling Broker::Event record type created by Broker::make_event()
// only works if the backend is broker_mgr!
if ( zeek::cluster::backend != zeek::broker_mgr ) {
zeek::emit_builtin_error(
zeek::util::fmt("Publish of Broker::Event record instance with type '%s' to a non-Broker backend",
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
return zeek::val_mgr->False();
}
return zeek::val_mgr->Bool(zeek::broker_mgr->PublishEvent(topic_str, args[0]->AsRecordVal()));
}
else {
zeek::emit_builtin_error(zeek::util::fmt("Publish of unknown record type '%s'",
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
return zeek::val_mgr->False();
}
}
zeek::emit_builtin_error(zeek::util::fmt("expected function or record as first argument, got %s",
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
return zeek::val_mgr->False();
}
} // namespace zeek::cluster::detail::bif

49
src/cluster/BifSupport.h Normal file
View file

@ -0,0 +1,49 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include "zeek/IntrusivePtr.h"
#include "zeek/Span.h"
// Helpers for cluster.bif
namespace zeek {
namespace detail {
class Frame;
}
class RecordVal;
using RecordValPtr = IntrusivePtr<RecordVal>;
class Val;
using ValPtr = IntrusivePtr<Val>;
using ArgsSpan = Span<const ValPtr>;
namespace cluster::detail::bif {
/**
* Cluster::make_event() implementation.
*
* @param topic The topic to publish to. Should be a StringVal.
* @param args The arguments to the BiF function. May either be a prepared event from make_event(),
* or a FuncValPtr and it's arguments
*
* @return A RecordValPtr representing a Cluster::Event record instance.
*/
zeek::RecordValPtr make_event(zeek::ArgsSpan args);
/**
* Publish helper.
*
* @param topic The topic to publish to. Should be a StringVal.
* @param args The arguments to the BiF function. May either be a prepared event from make_event(),
* or a FuncValPtr and it's arguments
*
* @return A BoolValPtr that's true if the event was published, else false.
*/
zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args);
} // namespace cluster::detail::bif
} // namespace zeek

View file

@ -6,4 +6,9 @@ zeek_add_subdir_library(
SOURCES SOURCES
Component.cc Component.cc
Backend.cc Backend.cc
Manager.cc) BifSupport.cc
Manager.cc
BIFS
cluster.bif)
add_subdirectory(serializer)

71
src/cluster/cluster.bif Normal file
View file

@ -0,0 +1,71 @@
%%{
#include <string>
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/BifSupport.h"
using namespace zeek::cluster::detail::bif;
%%}
module Cluster;
type Cluster::Event: record;
## Publishes an event to a given topic.
##
## topic: a topic associated with the event message.
##
## args: Either the event arguments as already made by
## :zeek:see:`Cluster::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish%(topic: string, ...%): bool
%{
ScriptLocationScope scope{frame};
auto args = zeek::ArgsSpan{*@ARGS@}.subspan(1);
return publish_event({zeek::NewRef{}, topic}, args);
%}
## Create a data structure that may be used to send a remote event via
## :zeek:see:`Broker::publish`.
##
## args: an event, followed by a list of argument values that may be used
## to call it.
##
## Returns: A :zeek:type:`Cluster::Event` instance that can be published via
## :zeek:see:`Cluster::publish`, :zeek:see:`Cluster::publish_rr`
## or :zeek:see:`Cluster::publish_hrw`.
function Cluster::make_event%(...%): Cluster::Event
%{
ScriptLocationScope scope{frame};
return make_event(zeek::ArgsSpan{*@ARGS@});
%}
function Cluster::__subscribe%(topic_prefix: string%): bool
%{
ScriptLocationScope scope{frame};
auto rval = zeek::cluster::backend->Subscribe(topic_prefix->CheckString());
return zeek::val_mgr->Bool(rval);
%}
function Cluster::__unsubscribe%(topic_prefix: string%): bool
%{
ScriptLocationScope scope{frame};
auto rval = zeek::cluster::backend->Unsubscribe(topic_prefix->CheckString());
return zeek::val_mgr->Bool(rval);
%}
## Initialize the global cluster backend.
##
## Returns: true on success.
function Cluster::Backend::__init%(%): bool
%{
auto rval = zeek::cluster::backend->Init();
return zeek::val_mgr->Bool(rval);
%}

View file

@ -0,0 +1 @@
add_subdirectory(broker)

View file

@ -0,0 +1,9 @@
zeek_add_plugin(
Zeek
Cluster_Serializer_Binary_Serialization_Format
INCLUDE_DIRS
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
SOURCES
Plugin.cc
Serializer.cc)

View file

@ -0,0 +1,24 @@
#include "Plugin.h"
#include <memory>
#include "zeek/cluster/Component.h"
#include "Serializer.h"
using namespace zeek::cluster;
using namespace zeek::plugin::Broker_Serializer;
zeek::plugin::Configuration Plugin::Configure() {
AddComponent(new EventSerializerComponent("BROKER_BIN_V1", []() -> std::unique_ptr<EventSerializer> {
return std::make_unique<cluster::detail::BrokerBinV1_Serializer>();
}));
AddComponent(new EventSerializerComponent("BROKER_JSON_V1", []() -> std::unique_ptr<EventSerializer> {
return std::make_unique<cluster::detail::BrokerJsonV1_Serializer>();
}));
zeek::plugin::Configuration config;
config.name = "Zeek::Broker_Serializer";
config.description = "Event serialization using Broker event formats (binary and json)";
return config;
}

View file

@ -0,0 +1,14 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include "zeek/plugin/Plugin.h"
namespace zeek::plugin::Broker_Serializer {
class Plugin : public zeek::plugin::Plugin {
public:
zeek::plugin::Configuration Configure() override;
} plugin;
} // namespace zeek::plugin::Broker_Serializer

View file

@ -0,0 +1 @@
Contains event serializers using Broker's BIN and JSON formats.

View file

@ -0,0 +1,219 @@
#include "zeek/cluster/serializer/broker/Serializer.h"
#include <optional>
#include "zeek/Desc.h"
#include "zeek/Func.h"
#include "zeek/Reporter.h"
#include "zeek/broker/Data.h"
#include "zeek/cluster/Backend.h"
#include "broker/data_envelope.hh"
#include "broker/error.hh"
#include "broker/format/json.hh"
#include "broker/zeek.hh"
#include "zeek/3rdparty/doctest.h"
using namespace zeek::cluster;
namespace {
/**
* Convert a cluster::detail::Event to a broker::zeek::Event.
*
* @param ev The cluster::detail::Event
* @return A broker::zeek::Event to be serialized, or nullopt in case of errors.
*/
std::optional<broker::zeek::Event> to_broker_event(const detail::Event& ev) {
broker::vector xs;
xs.reserve(ev.args.size());
for ( const auto& a : ev.args ) {
if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) {
xs.emplace_back(std::move(res.value()));
}
else {
return std::nullopt;
}
}
return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.timestamp));
}
/**
* Convert a broker::zeek::Event to cluster::detail::Event by looking
* it up in Zeek's event handler registry and converting event arguments
* to the appropriate Val instances.
*
* @param broker_ev The broker side event.
* @returns A zeek::cluster::detail::Event instance, or std::nullopt if the conversion failed.
*/
std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev) {
auto&& name = ev.name();
auto&& args = ev.args();
// Meh, technically need to convert ev.metadata() and
// expose it to script land as `table[count] of any`
// where consumers then know what to do with it.
//
// For now, handle the timestamp explicitly.
double ts;
if ( auto ev_ts = ev.ts() )
broker::convert(*ev_ts, ts);
else
ts = zeek::run_state::network_time;
zeek::Args vl;
zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name);
if ( ! handler ) {
zeek::reporter->Error("Failed to lookup handler for '%s'", std::string(name).c_str());
return std::nullopt;
}
const auto& arg_types = handler->GetFunc()->GetType()->ParamList()->GetTypes();
if ( arg_types.size() != args.size() ) {
std::string event_name(name);
zeek::reporter->Error("Unserialize error '%s' arg_types.size()=%zu and args.size()=%zu", event_name.c_str(),
arg_types.size(), args.size());
return std::nullopt;
}
for ( size_t i = 0; i < args.size(); ++i ) {
const auto& expected_type = arg_types[i];
auto arg = args[i].to_data();
auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get());
if ( val )
vl.emplace_back(std::move(val));
else {
std::string event_name(name);
auto got_type = args[i].get_type_name();
std::string argstr = broker::to_string(arg);
zeek::reporter
->Error("Unserialize error for event '%s': broker value '%s' type '%s' to Zeek type '%s' failed",
event_name.c_str(), argstr.c_str(), got_type, obj_desc(expected_type.get()).c_str());
return std::nullopt;
}
}
return detail::Event{handler, std::move(vl), ts};
}
} // namespace
bool detail::BrokerBinV1_Serializer::SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) {
auto ev = to_broker_event(event);
if ( ! ev )
return false;
// The produced broker::zeek::Event is already in bin::v1 format after
// constructing it, so we can take the raw bytes directly rather than
// going through encode() again.
//
// broker::format::bin::v1::encode(ev->move_data(), std::back_inserter(buf));
assert(ev->raw()->shared_envelope() != nullptr);
auto [raw, size] = ev->raw().shared_envelope()->raw_bytes();
buf.insert(buf.begin(), raw, raw + size);
return true;
}
std::optional<detail::Event> detail::BrokerBinV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) {
auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "",
buf.data(), buf.size());
if ( ! r )
return std::nullopt;
broker::zeek::Event ev(*r);
return to_zeek_event(ev);
}
// Convert char to std::byte during push_back() so that
// we don't need to copy from std::vector<char> to a
// std::vector<std::byte> when rendering JSON.
template<typename T>
struct PushBackAdapter {
explicit PushBackAdapter(T& c) : container(&c) {}
using value_type = char;
void push_back(char c) { container->push_back(static_cast<std::byte>(c)); }
T* container;
};
bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
auto ev = to_broker_event(event);
if ( ! ev )
return false;
auto push_back_adaptor = PushBackAdapter(buf);
broker::format::json::v1::encode(ev->move_data(), std::back_inserter(push_back_adaptor));
return true;
}
std::optional<detail::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) {
broker::variant res;
auto err =
broker::format::json::v1::decode(std::string_view{reinterpret_cast<const char*>(buf.data()), buf.size()}, res);
if ( err ) {
zeek::reporter->Error("Decode error for JSON payload: '%s'",
err.message() ? err.message()->c_str() : "unknown");
return std::nullopt;
}
broker::zeek::Event ev(std::move(res));
return to_zeek_event(ev);
}
TEST_SUITE_BEGIN("cluster serializer broker");
#include "zeek/EventRegistry.h"
TEST_CASE("roundtrip") {
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)}};
detail::byte_buffer buf;
SUBCASE("json") {
detail::BrokerJsonV1_Serializer serializer;
std::string expected =
R"({"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"count","data":1},{"@data-type":"vector","data":[{"@data-type":"string","data":"Supervisor::node_status"},{"@data-type":"vector","data":[{"@data-type":"string","data":"TEST"},{"@data-type":"count","data":42}]},{"@data-type":"vector","data":[{"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"timestamp","data":"1970-01-01T00:00:00.000"}]}]}]}]})";
serializer.SerializeEvent(buf, e);
CHECK_EQ(expected, std::string{reinterpret_cast<char*>(buf.data()), buf.size()});
auto result = serializer.UnserializeEvent(buf);
REQUIRE(result);
CHECK_EQ(result->Handler(), handler);
CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
CHECK_EQ(result->args.size(), 2);
}
SUBCASE("binary") {
detail::BrokerBinV1_Serializer serializer;
unsigned char expected_bytes[] = {0x0e, 0x03, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0e, 0x03, 0x05, 0x17, 0x53, 0x75,
0x70, 0x65, 0x72, 0x76, 0x69, 0x73, 0x6f, 0x72, 0x3a, 0x3a, 0x6e, 0x6f, 0x64,
0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x0e, 0x02, 0x05, 0x04, 0x54,
0x45, 0x53, 0x54, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2a, 0x0e,
0x01, 0x0e, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x09,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
std::byte* p = reinterpret_cast<std::byte*>(&expected_bytes[0]);
detail::byte_buffer expected{p, p + sizeof(expected_bytes)};
serializer.SerializeEvent(buf, e);
CHECK_EQ(expected, buf);
auto result = serializer.UnserializeEvent(buf);
REQUIRE(result);
CHECK_EQ(result->Handler(), handler);
CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
CHECK_EQ(result->args.size(), 2);
}
}
TEST_SUITE_END();

View file

@ -0,0 +1,31 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include "zeek/cluster/Serializer.h"
namespace zeek::cluster::detail {
// Implementation of the EventSerializer using the existing broker::detail::val_to_data()
// and broker::format::bin::v1::encode().
class BrokerBinV1_Serializer : public EventSerializer {
public:
BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {}
bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) override;
std::optional<detail::Event> UnserializeEvent(detail::byte_buffer_span buf) override;
};
// Implementation of the EventSerializer that uses the existing broker::detail::val_to_data()
// and broker::format::json::v1::encode()
class BrokerJsonV1_Serializer : public EventSerializer {
public:
BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {}
bool SerializeEvent(zeek::cluster::detail::byte_buffer& buf, const detail::Event& event) override;
std::optional<detail::Event> UnserializeEvent(detail::byte_buffer_span buf) override;
};
} // namespace zeek::cluster::detail

View file

@ -50,6 +50,7 @@
#include "zeek/analyzer/Manager.h" #include "zeek/analyzer/Manager.h"
#include "zeek/binpac_zeek.h" #include "zeek/binpac_zeek.h"
#include "zeek/broker/Manager.h" #include "zeek/broker/Manager.h"
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/Manager.h" #include "zeek/cluster/Manager.h"
#include "zeek/file_analysis/Manager.h" #include "zeek/file_analysis/Manager.h"
#include "zeek/input.h" #include "zeek/input.h"
@ -179,6 +180,7 @@ zeek::spicy::Manager* zeek::spicy_mgr = nullptr;
#endif #endif
zeek::cluster::Manager* zeek::cluster::manager = nullptr; zeek::cluster::Manager* zeek::cluster::manager = nullptr;
zeek::cluster::Backend* zeek::cluster::backend = nullptr;
std::vector<std::string> zeek::detail::zeek_script_prefixes; std::vector<std::string> zeek::detail::zeek_script_prefixes;
zeek::detail::Stmt* zeek::detail::stmts = nullptr; zeek::detail::Stmt* zeek::detail::stmts = nullptr;
@ -376,7 +378,11 @@ static void terminate_zeek() {
log_mgr->Terminate(); log_mgr->Terminate();
input_mgr->Terminate(); input_mgr->Terminate();
thread_mgr->Terminate(); thread_mgr->Terminate();
broker_mgr->Terminate(); broker_mgr->Terminate();
if ( cluster::backend != broker_mgr )
cluster::backend->Terminate();
telemetry_mgr->Terminate(); telemetry_mgr->Terminate();
event_mgr.Drain(); event_mgr.Drain();
@ -814,7 +820,26 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
log_mgr->InitPostScript(); log_mgr->InitPostScript();
plugin_mgr->InitPostScript(); plugin_mgr->InitPostScript();
zeekygen_mgr->InitPostScript(); zeekygen_mgr->InitPostScript();
// If Cluster::backend is set to broker, just set zeek::cluster::backend
// to broker_mgr like it has always been. If it's an alternative
// implementation, instantiate it.
const auto& cluster_backend_val = id::find_val<zeek::EnumVal>("Cluster::backend");
const auto& cluster_backend_type = zeek::id::find_type<EnumType>("Cluster::BackendTag");
zeek_int_t broker_enum = cluster_backend_type->Lookup("Cluster::CLUSTER_BACKEND_BROKER");
if ( broker_enum == cluster_backend_val->AsEnum() ) {
cluster::backend = broker_mgr;
}
else {
reporter->Error("Unsupported cluster backend configured: %s",
zeek::obj_desc_short(cluster_backend_val.get()).c_str());
exit(1);
}
broker_mgr->InitPostScript(); broker_mgr->InitPostScript();
if ( cluster::backend != broker_mgr )
cluster::backend->InitPostScript();
timer_mgr->InitPostScript(); timer_mgr->InitPostScript();
event_mgr.InitPostScript(); event_mgr.InitPostScript();

View file

@ -0,0 +1,12 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
is_remote should be T, and is, T
receiver got ping: my-message, 1
is_remote should be T, and is, T
receiver got ping: my-message, 2
is_remote should be T, and is, T
receiver got ping: my-message, 3
is_remote should be T, and is, T
receiver got ping: my-message, 4
is_remote should be T, and is, T
receiver got ping: my-message, 5

View file

@ -0,0 +1,11 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sender added peer: endpoint=127.0.0.1 msg=handshake successful
is_remote should be T, and is, T
sender got pong: my-message, 1
is_remote should be T, and is, T
sender got pong: my-message, 2
is_remote should be T, and is, T
sender got pong: my-message, 3
is_remote should be T, and is, T
sender got pong: my-message, 4
sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer

View file

@ -0,0 +1,15 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/errors.zeek, line 20: no event arguments given (Cluster::publish(topic))
error in <...>/errors.zeek, line 21: not enough arguments (Cluster::make_event())
error in <...>/errors.zeek, line 24: bad number of arguments for ping1: got 0, expect 2
error in <...>/errors.zeek, line 25: bad number of arguments for ping1: got 0, expect 2
error in <...>/errors.zeek, line 28: bad number of arguments for ping1: got 1, expect 2
error in <...>/errors.zeek, line 29: bad number of arguments for ping1: got 1, expect 2
error in <...>/errors.zeek, line 32: bad number of arguments for ping1: got 3, expect 2
error in <...>/errors.zeek, line 33: bad number of arguments for ping1: got 3, expect 2
error in <...>/errors.zeek, line 41: event parameter #2 type mismatch, got count, expecting string
error in <...>/errors.zeek, line 42: event parameter #2 type mismatch, got count, expecting string
error in <...>/errors.zeek, line 45: unexpected function type for hook1: hook
error in <...>/errors.zeek, line 46: unexpected function type for hook1: hook
error in <...>/errors.zeek, line 49: expected function or record as first argument, got count (Cluster::publish(topic, 1))
error in <...>/errors.zeek, line 50: got non-event type 'count' (Cluster::make_event(1))

View file

@ -0,0 +1,10 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
wrong number of args
r1, [ev=<uninitialized>, args=[]]
r2, [ev=<uninitialized>, args=[]]
r3, [ev=<uninitialized>, args=[]]
r4, [ev=<uninitialized>, args=[]]
wrong types
r1, [ev=<uninitialized>, args=[]]
r2, [ev=<uninitialized>, args=[]]
r3, [ev=<uninitialized>, args=[]]

View file

@ -0,0 +1,7 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/make_event.zeek, line 32: not enough arguments (Cluster::make_event())
error in <...>/make_event.zeek, line 37: got non-event type 'string' (Cluster::make_event(a))
error in <...>/make_event.zeek, line 42: unexpected function type for test_fun: function
error in <...>/make_event.zeek, line 47: unexpected function type for test_hook: hook
error in <...>/make_event.zeek, line 52: bad number of arguments for test_event2: got 0, expect 1
error in <...>/make_event.zeek, line 57: bad number of arguments for test_event2: got 2, expect 1

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
event(), []
event(s:string), [abc]

View file

@ -0,0 +1,7 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Zeek::Broker_Serializer - Event serialization using Broker event formats (binary and json) (built-in)
[Event Serializer] BROKER_BIN_V1 (Cluster::EVENT_SERIALIZER_BROKER_BIN_V1)
[Event Serializer] BROKER_JSON_V1 (Cluster::EVENT_SERIALIZER_BROKER_JSON_V1)
Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, Cluster::EventSerializerTag
Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, Cluster::EventSerializerTag

View file

@ -135,6 +135,7 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/cluster/main.zeek
scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/__load__.zeek
scripts/base/frameworks/control/main.zeek scripts/base/frameworks/control/main.zeek
build/scripts/base/bif/cluster.bif.zeek
scripts/base/frameworks/cluster/pools.zeek scripts/base/frameworks/cluster/pools.zeek
scripts/base/utils/hash_hrw.zeek scripts/base/utils/hash_hrw.zeek
scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/__load__.zeek

View file

@ -135,6 +135,7 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/cluster/main.zeek
scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/__load__.zeek
scripts/base/frameworks/control/main.zeek scripts/base/frameworks/control/main.zeek
build/scripts/base/bif/cluster.bif.zeek
scripts/base/frameworks/cluster/pools.zeek scripts/base/frameworks/cluster/pools.zeek
scripts/base/utils/hash_hrw.zeek scripts/base/utils/hash_hrw.zeek
scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/__load__.zeek

View file

@ -462,6 +462,7 @@
0.000000 MetaHookPost LoadFile(0, ./api, <...>/api.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./api, <...>/api.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) -> -1
@ -540,6 +541,7 @@
0.000000 MetaHookPost LoadFile(0, base<...>/ayiya, <...>/ayiya) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/ayiya, <...>/ayiya) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/broker, <...>/broker) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/broker, <...>/broker) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/cluster, <...>/cluster) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/cluster, <...>/cluster) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/config, <...>/config) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/config, <...>/config) -> -1
@ -763,6 +765,7 @@
0.000000 MetaHookPost LoadFileExtended(0, ./api, <...>/api.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./api, <...>/api.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) -> (-1, <no content>)
@ -841,6 +844,7 @@
0.000000 MetaHookPost LoadFileExtended(0, base<...>/ayiya, <...>/ayiya) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ayiya, <...>/ayiya) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, base<...>/broker, <...>/broker) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/broker, <...>/broker) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, base<...>/cluster, <...>/cluster) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/cluster, <...>/cluster) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, base<...>/config, <...>/config) -> (-1, <no content>) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/config, <...>/config) -> (-1, <no content>)
@ -1396,6 +1400,7 @@
0.000000 MetaHookPre LoadFile(0, ./api, <...>/api.zeek) 0.000000 MetaHookPre LoadFile(0, ./api, <...>/api.zeek)
0.000000 MetaHookPre LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek)
@ -1474,6 +1479,7 @@
0.000000 MetaHookPre LoadFile(0, base<...>/ayiya, <...>/ayiya) 0.000000 MetaHookPre LoadFile(0, base<...>/ayiya, <...>/ayiya)
0.000000 MetaHookPre LoadFile(0, base<...>/broker, <...>/broker) 0.000000 MetaHookPre LoadFile(0, base<...>/broker, <...>/broker)
0.000000 MetaHookPre LoadFile(0, base<...>/cluster, <...>/cluster) 0.000000 MetaHookPre LoadFile(0, base<...>/cluster, <...>/cluster)
0.000000 MetaHookPre LoadFile(0, base<...>/cluster.bif, <...>/cluster.bif.zeek)
0.000000 MetaHookPre LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek)
0.000000 MetaHookPre LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek)
0.000000 MetaHookPre LoadFile(0, base<...>/config, <...>/config) 0.000000 MetaHookPre LoadFile(0, base<...>/config, <...>/config)
@ -1697,6 +1703,7 @@
0.000000 MetaHookPre LoadFileExtended(0, ./api, <...>/api.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./api, <...>/api.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek)
@ -1775,6 +1782,7 @@
0.000000 MetaHookPre LoadFileExtended(0, base<...>/ayiya, <...>/ayiya) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ayiya, <...>/ayiya)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/broker, <...>/broker) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/broker, <...>/broker)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/cluster, <...>/cluster) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/cluster, <...>/cluster)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/cluster.bif, <...>/cluster.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/config, <...>/config) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/config, <...>/config)
@ -2331,6 +2339,7 @@
0.000000 | HookLoadFile ./audio <...>/audio.sig 0.000000 | HookLoadFile ./audio <...>/audio.sig
0.000000 | HookLoadFile ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFile ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek
0.000000 | HookLoadFile ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek 0.000000 | HookLoadFile ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek
0.000000 | HookLoadFile ./cluster.bif.zeek <...>/cluster.bif.zeek
0.000000 | HookLoadFile ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFile ./comm.bif.zeek <...>/comm.bif.zeek
0.000000 | HookLoadFile ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFile ./communityid.bif.zeek <...>/communityid.bif.zeek
0.000000 | HookLoadFile ./const.bif.zeek <...>/const.bif.zeek 0.000000 | HookLoadFile ./const.bif.zeek <...>/const.bif.zeek
@ -2419,6 +2428,7 @@
0.000000 | HookLoadFile base<...>/ayiya <...>/ayiya 0.000000 | HookLoadFile base<...>/ayiya <...>/ayiya
0.000000 | HookLoadFile base<...>/broker <...>/broker 0.000000 | HookLoadFile base<...>/broker <...>/broker
0.000000 | HookLoadFile base<...>/cluster <...>/cluster 0.000000 | HookLoadFile base<...>/cluster <...>/cluster
0.000000 | HookLoadFile base<...>/cluster.bif <...>/cluster.bif.zeek
0.000000 | HookLoadFile base<...>/comm.bif <...>/comm.bif.zeek 0.000000 | HookLoadFile base<...>/comm.bif <...>/comm.bif.zeek
0.000000 | HookLoadFile base<...>/communityid.bif <...>/communityid.bif.zeek 0.000000 | HookLoadFile base<...>/communityid.bif <...>/communityid.bif.zeek
0.000000 | HookLoadFile base<...>/config <...>/config 0.000000 | HookLoadFile base<...>/config <...>/config
@ -2632,6 +2642,7 @@
0.000000 | HookLoadFileExtended ./audio <...>/audio.sig 0.000000 | HookLoadFileExtended ./audio <...>/audio.sig
0.000000 | HookLoadFileExtended ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFileExtended ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek
0.000000 | HookLoadFileExtended ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek 0.000000 | HookLoadFileExtended ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek
0.000000 | HookLoadFileExtended ./cluster.bif.zeek <...>/cluster.bif.zeek
0.000000 | HookLoadFileExtended ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFileExtended ./comm.bif.zeek <...>/comm.bif.zeek
0.000000 | HookLoadFileExtended ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFileExtended ./communityid.bif.zeek <...>/communityid.bif.zeek
0.000000 | HookLoadFileExtended ./const.bif.zeek <...>/const.bif.zeek 0.000000 | HookLoadFileExtended ./const.bif.zeek <...>/const.bif.zeek
@ -2720,6 +2731,7 @@
0.000000 | HookLoadFileExtended base<...>/ayiya <...>/ayiya 0.000000 | HookLoadFileExtended base<...>/ayiya <...>/ayiya
0.000000 | HookLoadFileExtended base<...>/broker <...>/broker 0.000000 | HookLoadFileExtended base<...>/broker <...>/broker
0.000000 | HookLoadFileExtended base<...>/cluster <...>/cluster 0.000000 | HookLoadFileExtended base<...>/cluster <...>/cluster
0.000000 | HookLoadFileExtended base<...>/cluster.bif <...>/cluster.bif.zeek
0.000000 | HookLoadFileExtended base<...>/comm.bif <...>/comm.bif.zeek 0.000000 | HookLoadFileExtended base<...>/comm.bif <...>/comm.bif.zeek
0.000000 | HookLoadFileExtended base<...>/communityid.bif <...>/communityid.bif.zeek 0.000000 | HookLoadFileExtended base<...>/communityid.bif <...>/communityid.bif.zeek
0.000000 | HookLoadFileExtended base<...>/config <...>/config 0.000000 | HookLoadFileExtended base<...>/config <...>/config

View file

@ -0,0 +1,106 @@
# @TEST-DOC: Use Cluster::subscribe() and Cluster::publish() with Broker
# @TEST-GROUP: cluster
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"
# @TEST-EXEC: btest-bg-run send "zeek -b ../send.zeek >send.out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff recv/recv.out
# @TEST-EXEC: btest-diff send/send.out
@TEST-START-FILE send.zeek
redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER;
redef exit_only_after_terminate = T;
global event_count = 0;
global ping: event(msg: string, c: count);
event zeek_init()
{
Cluster::init();
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
Cluster::subscribe("zeek/event/my_topic");
}
function send_event()
{
++event_count;
local e = Cluster::make_event(ping, "my-message", event_count);
Cluster::publish("zeek/event/my_topic", e);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
send_event();
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
Cluster::unsubscribe("zeek/event/my_topic");
terminate();
}
event pong(msg: string, n: count)
{
print "is_remote should be T, and is", is_remote_event();
print fmt("sender got pong: %s, %s", msg, n);
send_event();
}
@TEST-END-FILE
@TEST-START-FILE recv.zeek
redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER;
redef exit_only_after_terminate = T;
const events_to_recv = 5;
global handler: event(msg: string, c: count);
global auto_handler: event(msg: string, c: count);
global pong: event(msg: string, c: count);
event zeek_init()
{
Cluster::init();
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
Cluster::subscribe("zeek/event/my_topic");
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event ping(msg: string, n: count)
{
print "is_remote should be T, and is", is_remote_event();
print fmt("receiver got ping: %s, %s", msg, n);
if ( n == events_to_recv )
{
Cluster::unsubscribe("zeek/event/my_topic");
terminate();
return;
}
Cluster::publish("zeek/event/my_topic", pong, msg, n);
}
@TEST-END-FILE

View file

@ -0,0 +1,52 @@
# @TEST-DOC: Test some validation errors of cluster bifs
#
# @TEST-EXEC: zeek --parse-only -b %INPUT
# @TEST-EXEC: zeek -b %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout
event ping1(c: count, how: string) &is_used
{
}
hook hook1(c: count, how: string) &is_used
{
}
event zeek_init() &priority=-1
{
print "wrong number of args";
Cluster::publish("topic");
local r1 = Cluster::make_event();
print "r1", r1;
Cluster::publish("topic", ping1);
local r2 = Cluster::make_event(ping1);
print "r2", r2;
Cluster::publish("topic", ping1, 1);
local r3 = Cluster::make_event(ping1, 1);
print "r3", r3;
Cluster::publish("topic", ping1, 1, "args", 1.2.3.4);
local r4 = Cluster::make_event(ping1, 1, "event", 1.2.3.4);
print "r4", r4;
}
event zeek_init() &priority=-2
{
print "wrong types";
Cluster::publish("topic", ping1, 1, 2);
local r1 = Cluster::make_event(ping1, 1, 2);
print "r1", r1;
Cluster::publish("topic", hook1, 1, "hook");
local r2 = Cluster::make_event(hook1, 1, "hook");
print "r2", r2;
Cluster::publish("topic", 1);
local r3 = Cluster::make_event(1);
print "r3", r2;
}

View file

@ -0,0 +1,58 @@
# @TEST-DOC: Test make_event behavior.
#
# @TEST-EXEC: zeek -b %INPUT >out
#
# @TEST-EXEC: btest-diff out
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
function test_fun() { }
hook test_hook() { }
event test_event() { }
event test_event2(s: string) { }
function as_cluster_event(e: any): Cluster::Event
{
assert e is Cluster::Event;
return e as Cluster::Event;
}
event zeek_init() &priority=10
{
local e1 = Cluster::make_event(test_event);
local ce1 = as_cluster_event(e1);
print type_name(ce1$ev), ce1$args;
local e2 = Cluster::make_event(test_event2, "abc");
local ce2 = as_cluster_event(e2);
print type_name(ce2$ev), ce2$args;
}
event zeek_init() &priority=-10
{
local e = Cluster::make_event();
}
event zeek_init() &priority=-11
{
local e = Cluster::make_event("a");
}
event zeek_init() &priority=-12
{
local e = Cluster::make_event(test_fun);
}
event zeek_init() &priority=-13
{
local e = Cluster::make_event(test_hook);
}
event zeek_init() &priority=-14
{
local e = Cluster::make_event(test_event2);
}
event zeek_init() &priority=-15
{
local e = Cluster::make_event(test_event2, "a", "b");
}

View file

@ -0,0 +1,11 @@
# @TEST-DOC: Test cluster backend enum
#
# @TEST-EXEC: zeek -NN Zeek::Broker_Serializer >>out
# @TEST-EXEC: zeek -b %INPUT >>out
# @TEST-EXEC: btest-diff out
event zeek_init()
{
print Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_BIN_V1);
print Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_JSON_V1);
}