From ef04a199c8dd4b6440fb6320a64f9b3671ca53d2 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 16:22:15 +0100 Subject: [PATCH] cluster: Add Cluster scoped bifs ... and a broker based test using Cluster::publish() and Cluster::subscribe(). --- scripts/base/frameworks/cluster/main.zeek | 42 ++++++ src/cluster/BifSupport.cc | 137 ++++++++++++++++++ src/cluster/BifSupport.h | 49 +++++++ src/cluster/CMakeLists.txt | 5 +- src/cluster/cluster.bif | 71 +++++++++ .../recv.recv.out | 12 ++ .../send.send.out | 11 ++ .../Baseline/cluster.generic.errors/.stderr | 15 ++ .../Baseline/cluster.generic.errors/.stdout | 10 ++ .../cluster.generic.make_event/.stderr | 7 + .../Baseline/cluster.generic.make_event/out | 3 + .../canonified_loaded_scripts.log | 1 + .../canonified_loaded_scripts.log | 1 + testing/btest/Baseline/plugins.hooks/output | 12 ++ .../btest/cluster/broker/cluster-publish.zeek | 106 ++++++++++++++ testing/btest/cluster/generic/errors.zeek | 52 +++++++ testing/btest/cluster/generic/make_event.zeek | 58 ++++++++ 17 files changed, 591 insertions(+), 1 deletion(-) create mode 100644 src/cluster/BifSupport.cc create mode 100644 src/cluster/BifSupport.h create mode 100644 src/cluster/cluster.bif create mode 100644 testing/btest/Baseline/cluster.broker.cluster-publish/recv.recv.out create mode 100644 testing/btest/Baseline/cluster.broker.cluster-publish/send.send.out create mode 100644 testing/btest/Baseline/cluster.generic.errors/.stderr create mode 100644 testing/btest/Baseline/cluster.generic.errors/.stdout create mode 100644 testing/btest/Baseline/cluster.generic.make_event/.stderr create mode 100644 testing/btest/Baseline/cluster.generic.make_event/out create mode 100644 testing/btest/cluster/broker/cluster-publish.zeek create mode 100644 testing/btest/cluster/generic/errors.zeek create mode 100644 testing/btest/cluster/generic/make_event.zeek diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index d27a9160a9..caf2e6a11d 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -281,7 +281,30 @@ export { ## a given cluster node. global nodeid_topic: function(id: string): string; + ## Initialize the cluster backend. + ## + ## Cluster backends usually invoke this from a :zeek:see:`zeek_init` handler. + ## + ## Returns: T on success, else F. + global init: function(): bool; + + ## Subscribe to the given topic. + ## + ## topic: The topic to subscribe to. + ## + ## Returns: T on success, else F. + global subscribe: function(topic: string): bool; + + ## Unsubscribe from the given topic. + ## + ## topic: The topic to unsubscribe from. + ## + ## Returns: T on success, else F. + global unsubscribe: function(topic: string): bool; + ## An event instance for cluster pub/sub. + ## + ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. type Event: record { ## The event handler to be invoked on the remote node. ev: any; @@ -290,6 +313,10 @@ export { }; } +# Needs declaration of Cluster::Event type. +@load base/bif/cluster.bif + + # Track active nodes per type. global active_node_ids: table[NodeType] of set[string]; @@ -528,3 +555,18 @@ function log(msg: string) { Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]); } + +function init(): bool + { + return Cluster::Backend::__init(); + } + +function subscribe(topic: string): bool + { + return Cluster::__subscribe(topic); + } + +function unsubscribe(topic: string): bool + { + return Cluster::__unsubscribe(topic); + } diff --git a/src/cluster/BifSupport.cc b/src/cluster/BifSupport.cc new file mode 100644 index 0000000000..1ed4b49760 --- /dev/null +++ b/src/cluster/BifSupport.cc @@ -0,0 +1,137 @@ +#include "zeek/cluster/BifSupport.h" + +#include "zeek/Desc.h" +#include "zeek/Event.h" +#include "zeek/EventRegistry.h" +#include "zeek/Frame.h" +#include "zeek/Func.h" +#include "zeek/IntrusivePtr.h" +#include "zeek/Reporter.h" +#include "zeek/Type.h" +#include "zeek/Val.h" +#include "zeek/broker/Manager.h" // For publishing to broker_mgr directly. +#include "zeek/cluster/Backend.h" + +namespace { + +// Convert a script-level Cluster::Event to a cluster::detail::Event. +std::optional to_cluster_event(const zeek::RecordValPtr& rec) { + const auto& func = rec->GetField(0); + const auto& vargs = rec->GetField(1); + + const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName()); + if ( ! eh ) { + zeek::emit_builtin_error( + zeek::util::fmt("event registry lookup of '%s' failed", zeek::obj_desc_short(func.get()).c_str())); + return std::nullopt; + } + + // Need to copy from VectorVal to zeek::Args + zeek::Args args(vargs->Size()); + for ( size_t i = 0; i < vargs->Size(); i++ ) + args[i] = vargs->ValAt(i); + + // TODO: Support configurable timestamps or custom metadata on the record. + auto timestamp = zeek::event_mgr.CurrentEventTime(); + + return zeek::cluster::detail::Event(eh, std::move(args), timestamp); +} +} // namespace + + +namespace zeek::cluster::detail::bif { + +zeek::RecordValPtr make_event(zeek::ArgsSpan args) { + static const auto& any_vec_type = zeek::id::find_type("any_vec"); + static const auto& event_record_type = zeek::id::find_type("Cluster::Event"); + auto rec = zeek::make_intrusive(event_record_type); + + if ( args.empty() ) { + zeek::emit_builtin_error("not enough arguments"); + return rec; + } + + const auto& maybe_func_val = args[0]; + + if ( maybe_func_val->GetType()->Tag() != zeek::TYPE_FUNC ) { + zeek::emit_builtin_error( + zeek::util::fmt("got non-event type '%s'", zeek::obj_desc_short(maybe_func_val->GetType().get()).c_str())); + return rec; + } + + const auto func = zeek::FuncValPtr{zeek::NewRef{}, maybe_func_val->AsFuncVal()}; + auto checked_args = cluster::detail::check_args(func, args.subspan(1)); + if ( ! checked_args ) + return rec; + + // Making a copy from zeek::Args to a VectorVal and then back again on publish. + auto vec = zeek::make_intrusive(any_vec_type); + vec->Reserve(checked_args->size()); + rec->Assign(0, maybe_func_val); + for ( const auto& arg : *checked_args ) + vec->Append(arg); + + rec->Assign(1, vec); // Args + + return rec; +} + +zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { + static const auto& cluster_event_type = zeek::id::find_type("Cluster::Event"); + static const auto& broker_event_type = zeek::id::find_type("Broker::Event"); + + if ( args.empty() ) { + zeek::emit_builtin_error("no event arguments given"); + return zeek::val_mgr->False(); + } + + if ( topic->GetType()->Tag() != zeek::TYPE_STRING ) { + zeek::emit_builtin_error("topic is not a string"); + return zeek::val_mgr->False(); + } + + const auto topic_str = topic->AsStringVal()->ToStdString(); + + auto timestamp = zeek::event_mgr.CurrentEventTime(); + + if ( args[0]->GetType()->Tag() == zeek::TYPE_FUNC ) { + auto event = zeek::cluster::backend->MakeClusterEvent({zeek::NewRef{}, args[0]->AsFuncVal()}, args.subspan(1), + timestamp); + if ( event ) + return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *event)); + + return zeek::val_mgr->False(); + } + else if ( args[0]->GetType()->Tag() == zeek::TYPE_RECORD ) { + if ( args[0]->GetType() == cluster_event_type ) { // Handling Cluster::Event record type + auto ev = to_cluster_event(cast_intrusive(args[0])); + if ( ! ev ) + return zeek::val_mgr->False(); + + return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *ev)); + } + else if ( args[0]->GetType() == broker_event_type ) { + // Handling Broker::Event record type created by Broker::make_event() + // only works if the backend is broker_mgr! + if ( zeek::cluster::backend != zeek::broker_mgr ) { + zeek::emit_builtin_error( + zeek::util::fmt("Publish of Broker::Event record instance with type '%s' to a non-Broker backend", + zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + + return zeek::val_mgr->False(); + } + + return zeek::val_mgr->Bool(zeek::broker_mgr->PublishEvent(topic_str, args[0]->AsRecordVal())); + } + else { + zeek::emit_builtin_error(zeek::util::fmt("Publish of unknown record type '%s'", + zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + return zeek::val_mgr->False(); + } + } + + zeek::emit_builtin_error(zeek::util::fmt("expected function or record as first argument, got %s", + zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + return zeek::val_mgr->False(); +} +} // namespace zeek::cluster::detail::bif diff --git a/src/cluster/BifSupport.h b/src/cluster/BifSupport.h new file mode 100644 index 0000000000..2434482796 --- /dev/null +++ b/src/cluster/BifSupport.h @@ -0,0 +1,49 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/IntrusivePtr.h" +#include "zeek/Span.h" + +// Helpers for cluster.bif + +namespace zeek { + +namespace detail { +class Frame; +} + +class RecordVal; +using RecordValPtr = IntrusivePtr; + +class Val; +using ValPtr = IntrusivePtr; +using ArgsSpan = Span; + +namespace cluster::detail::bif { + +/** + * Cluster::make_event() implementation. + * + * @param topic The topic to publish to. Should be a StringVal. + * @param args The arguments to the BiF function. May either be a prepared event from make_event(), + * or a FuncValPtr and it's arguments + * + * @return A RecordValPtr representing a Cluster::Event record instance. + */ +zeek::RecordValPtr make_event(zeek::ArgsSpan args); + +/** + * Publish helper. + * + * @param topic The topic to publish to. Should be a StringVal. + * @param args The arguments to the BiF function. May either be a prepared event from make_event(), + * or a FuncValPtr and it's arguments + * + * @return A BoolValPtr that's true if the event was published, else false. + */ +zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args); + +} // namespace cluster::detail::bif + +} // namespace zeek diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index 0395d28786..a4da087ff1 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -6,4 +6,7 @@ zeek_add_subdir_library( SOURCES Component.cc Backend.cc - Manager.cc) + BifSupport.cc + Manager.cc + BIFS + cluster.bif) diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif new file mode 100644 index 0000000000..cdbe5edf9d --- /dev/null +++ b/src/cluster/cluster.bif @@ -0,0 +1,71 @@ +%%{ +#include + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/BifSupport.h" + +using namespace zeek::cluster::detail::bif; + +%%} + +module Cluster; + +type Cluster::Event: record; + +## Publishes an event to a given topic. +## +## topic: a topic associated with the event message. +## +## args: Either the event arguments as already made by +## :zeek:see:`Cluster::make_event` or the argument list to pass along +## to it. +## +## Returns: true if the message is sent. +function Cluster::publish%(topic: string, ...%): bool + %{ + ScriptLocationScope scope{frame}; + + auto args = zeek::ArgsSpan{*@ARGS@}.subspan(1); + return publish_event({zeek::NewRef{}, topic}, args); + %} + +## Create a data structure that may be used to send a remote event via +## :zeek:see:`Broker::publish`. +## +## args: an event, followed by a list of argument values that may be used +## to call it. +## +## Returns: A :zeek:type:`Cluster::Event` instance that can be published via +## :zeek:see:`Cluster::publish`, :zeek:see:`Cluster::publish_rr` +## or :zeek:see:`Cluster::publish_hrw`. +function Cluster::make_event%(...%): Cluster::Event + %{ + ScriptLocationScope scope{frame}; + + return make_event(zeek::ArgsSpan{*@ARGS@}); + %} + +function Cluster::__subscribe%(topic_prefix: string%): bool + %{ + ScriptLocationScope scope{frame}; + + auto rval = zeek::cluster::backend->Subscribe(topic_prefix->CheckString()); + return zeek::val_mgr->Bool(rval); + %} + +function Cluster::__unsubscribe%(topic_prefix: string%): bool + %{ + ScriptLocationScope scope{frame}; + + auto rval = zeek::cluster::backend->Unsubscribe(topic_prefix->CheckString()); + return zeek::val_mgr->Bool(rval); + %} + +## Initialize the global cluster backend. +## +## Returns: true on success. +function Cluster::Backend::__init%(%): bool + %{ + auto rval = zeek::cluster::backend->Init(); + return zeek::val_mgr->Bool(rval); + %} diff --git a/testing/btest/Baseline/cluster.broker.cluster-publish/recv.recv.out b/testing/btest/Baseline/cluster.broker.cluster-publish/recv.recv.out new file mode 100644 index 0000000000..fcd1b41ad2 --- /dev/null +++ b/testing/btest/Baseline/cluster.broker.cluster-publish/recv.recv.out @@ -0,0 +1,12 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +is_remote should be T, and is, T +receiver got ping: my-message, 1 +is_remote should be T, and is, T +receiver got ping: my-message, 2 +is_remote should be T, and is, T +receiver got ping: my-message, 3 +is_remote should be T, and is, T +receiver got ping: my-message, 4 +is_remote should be T, and is, T +receiver got ping: my-message, 5 diff --git a/testing/btest/Baseline/cluster.broker.cluster-publish/send.send.out b/testing/btest/Baseline/cluster.broker.cluster-publish/send.send.out new file mode 100644 index 0000000000..de80e44f07 --- /dev/null +++ b/testing/btest/Baseline/cluster.broker.cluster-publish/send.send.out @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +is_remote should be T, and is, T +sender got pong: my-message, 1 +is_remote should be T, and is, T +sender got pong: my-message, 2 +is_remote should be T, and is, T +sender got pong: my-message, 3 +is_remote should be T, and is, T +sender got pong: my-message, 4 +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/Baseline/cluster.generic.errors/.stderr b/testing/btest/Baseline/cluster.generic.errors/.stderr new file mode 100644 index 0000000000..f26dbca5a0 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.errors/.stderr @@ -0,0 +1,15 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/errors.zeek, line 20: no event arguments given (Cluster::publish(topic)) +error in <...>/errors.zeek, line 21: not enough arguments (Cluster::make_event()) +error in <...>/errors.zeek, line 24: bad number of arguments for ping1: got 0, expect 2 +error in <...>/errors.zeek, line 25: bad number of arguments for ping1: got 0, expect 2 +error in <...>/errors.zeek, line 28: bad number of arguments for ping1: got 1, expect 2 +error in <...>/errors.zeek, line 29: bad number of arguments for ping1: got 1, expect 2 +error in <...>/errors.zeek, line 32: bad number of arguments for ping1: got 3, expect 2 +error in <...>/errors.zeek, line 33: bad number of arguments for ping1: got 3, expect 2 +error in <...>/errors.zeek, line 41: event parameter #2 type mismatch, got count, expecting string +error in <...>/errors.zeek, line 42: event parameter #2 type mismatch, got count, expecting string +error in <...>/errors.zeek, line 45: unexpected function type for hook1: hook +error in <...>/errors.zeek, line 46: unexpected function type for hook1: hook +error in <...>/errors.zeek, line 49: expected function or record as first argument, got count (Cluster::publish(topic, 1)) +error in <...>/errors.zeek, line 50: got non-event type 'count' (Cluster::make_event(1)) diff --git a/testing/btest/Baseline/cluster.generic.errors/.stdout b/testing/btest/Baseline/cluster.generic.errors/.stdout new file mode 100644 index 0000000000..ee431b2959 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.errors/.stdout @@ -0,0 +1,10 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +wrong number of args +r1, [ev=, args=[]] +r2, [ev=, args=[]] +r3, [ev=, args=[]] +r4, [ev=, args=[]] +wrong types +r1, [ev=, args=[]] +r2, [ev=, args=[]] +r3, [ev=, args=[]] diff --git a/testing/btest/Baseline/cluster.generic.make_event/.stderr b/testing/btest/Baseline/cluster.generic.make_event/.stderr new file mode 100644 index 0000000000..e5dfcdb9f7 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.make_event/.stderr @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/make_event.zeek, line 32: not enough arguments (Cluster::make_event()) +error in <...>/make_event.zeek, line 37: got non-event type 'string' (Cluster::make_event(a)) +error in <...>/make_event.zeek, line 42: unexpected function type for test_fun: function +error in <...>/make_event.zeek, line 47: unexpected function type for test_hook: hook +error in <...>/make_event.zeek, line 52: bad number of arguments for test_event2: got 0, expect 1 +error in <...>/make_event.zeek, line 57: bad number of arguments for test_event2: got 2, expect 1 diff --git a/testing/btest/Baseline/cluster.generic.make_event/out b/testing/btest/Baseline/cluster.generic.make_event/out new file mode 100644 index 0000000000..1f379ec328 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.make_event/out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +event(), [] +event(s:string), [abc] diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 38dbbc40f4..b64871a39f 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -135,6 +135,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek scripts/base/frameworks/config/__load__.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 829f16e0e9..99cb5d53a3 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -135,6 +135,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek scripts/base/frameworks/config/__load__.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 884e2bf4ca..9c4ee0795b 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -462,6 +462,7 @@ 0.000000 MetaHookPost LoadFile(0, ./api, <...>/api.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) -> -1 @@ -540,6 +541,7 @@ 0.000000 MetaHookPost LoadFile(0, base<...>/ayiya, <...>/ayiya) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/broker, <...>/broker) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/cluster, <...>/cluster) -> -1 +0.000000 MetaHookPost LoadFile(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/config, <...>/config) -> -1 @@ -763,6 +765,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./api, <...>/api.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) -> (-1, ) @@ -841,6 +844,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ayiya, <...>/ayiya) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/broker, <...>/broker) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/cluster, <...>/cluster) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/config, <...>/config) -> (-1, ) @@ -1396,6 +1400,7 @@ 0.000000 MetaHookPre LoadFile(0, ./api, <...>/api.zeek) 0.000000 MetaHookPre LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) +0.000000 MetaHookPre LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) @@ -1474,6 +1479,7 @@ 0.000000 MetaHookPre LoadFile(0, base<...>/ayiya, <...>/ayiya) 0.000000 MetaHookPre LoadFile(0, base<...>/broker, <...>/broker) 0.000000 MetaHookPre LoadFile(0, base<...>/cluster, <...>/cluster) +0.000000 MetaHookPre LoadFile(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/config, <...>/config) @@ -1697,6 +1703,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./api, <...>/api.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) +0.000000 MetaHookPre LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) @@ -1775,6 +1782,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ayiya, <...>/ayiya) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/broker, <...>/broker) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/cluster, <...>/cluster) +0.000000 MetaHookPre LoadFileExtended(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/config, <...>/config) @@ -2331,6 +2339,7 @@ 0.000000 | HookLoadFile ./audio <...>/audio.sig 0.000000 | HookLoadFile ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFile ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek +0.000000 | HookLoadFile ./cluster.bif.zeek <...>/cluster.bif.zeek 0.000000 | HookLoadFile ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFile ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFile ./const.bif.zeek <...>/const.bif.zeek @@ -2419,6 +2428,7 @@ 0.000000 | HookLoadFile base<...>/ayiya <...>/ayiya 0.000000 | HookLoadFile base<...>/broker <...>/broker 0.000000 | HookLoadFile base<...>/cluster <...>/cluster +0.000000 | HookLoadFile base<...>/cluster.bif <...>/cluster.bif.zeek 0.000000 | HookLoadFile base<...>/comm.bif <...>/comm.bif.zeek 0.000000 | HookLoadFile base<...>/communityid.bif <...>/communityid.bif.zeek 0.000000 | HookLoadFile base<...>/config <...>/config @@ -2632,6 +2642,7 @@ 0.000000 | HookLoadFileExtended ./audio <...>/audio.sig 0.000000 | HookLoadFileExtended ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFileExtended ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek +0.000000 | HookLoadFileExtended ./cluster.bif.zeek <...>/cluster.bif.zeek 0.000000 | HookLoadFileExtended ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFileExtended ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFileExtended ./const.bif.zeek <...>/const.bif.zeek @@ -2720,6 +2731,7 @@ 0.000000 | HookLoadFileExtended base<...>/ayiya <...>/ayiya 0.000000 | HookLoadFileExtended base<...>/broker <...>/broker 0.000000 | HookLoadFileExtended base<...>/cluster <...>/cluster +0.000000 | HookLoadFileExtended base<...>/cluster.bif <...>/cluster.bif.zeek 0.000000 | HookLoadFileExtended base<...>/comm.bif <...>/comm.bif.zeek 0.000000 | HookLoadFileExtended base<...>/communityid.bif <...>/communityid.bif.zeek 0.000000 | HookLoadFileExtended base<...>/config <...>/config diff --git a/testing/btest/cluster/broker/cluster-publish.zeek b/testing/btest/cluster/broker/cluster-publish.zeek new file mode 100644 index 0000000000..b96c984e6a --- /dev/null +++ b/testing/btest/cluster/broker/cluster-publish.zeek @@ -0,0 +1,106 @@ +# @TEST-DOC: Use Cluster::subscribe() and Cluster::publish() with Broker +# @TEST-GROUP: cluster +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +@TEST-START-FILE send.zeek + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER; +redef exit_only_after_terminate = T; + +global event_count = 0; + +global ping: event(msg: string, c: count); + +event zeek_init() + { + Cluster::init(); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + Cluster::subscribe("zeek/event/my_topic"); + } + +function send_event() + { + ++event_count; + local e = Cluster::make_event(ping, "my-message", event_count); + Cluster::publish("zeek/event/my_topic", e); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + send_event(); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + Cluster::unsubscribe("zeek/event/my_topic"); + terminate(); + } + +event pong(msg: string, n: count) + { + print "is_remote should be T, and is", is_remote_event(); + print fmt("sender got pong: %s, %s", msg, n); + send_event(); + } + +@TEST-END-FILE + + +@TEST-START-FILE recv.zeek + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER; +redef exit_only_after_terminate = T; + +const events_to_recv = 5; + +global handler: event(msg: string, c: count); +global auto_handler: event(msg: string, c: count); + +global pong: event(msg: string, c: count); + +event zeek_init() + { + Cluster::init(); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + Cluster::subscribe("zeek/event/my_topic"); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, n: count) + { + print "is_remote should be T, and is", is_remote_event(); + print fmt("receiver got ping: %s, %s", msg, n); + + if ( n == events_to_recv ) + { + Cluster::unsubscribe("zeek/event/my_topic"); + terminate(); + return; + } + + Cluster::publish("zeek/event/my_topic", pong, msg, n); + } +@TEST-END-FILE diff --git a/testing/btest/cluster/generic/errors.zeek b/testing/btest/cluster/generic/errors.zeek new file mode 100644 index 0000000000..b428af3e1d --- /dev/null +++ b/testing/btest/cluster/generic/errors.zeek @@ -0,0 +1,52 @@ +# @TEST-DOC: Test some validation errors of cluster bifs +# +# @TEST-EXEC: zeek --parse-only -b %INPUT +# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout + +event ping1(c: count, how: string) &is_used + { + } + +hook hook1(c: count, how: string) &is_used + { + } + +event zeek_init() &priority=-1 + { + print "wrong number of args"; + + Cluster::publish("topic"); + local r1 = Cluster::make_event(); + print "r1", r1; + + Cluster::publish("topic", ping1); + local r2 = Cluster::make_event(ping1); + print "r2", r2; + + Cluster::publish("topic", ping1, 1); + local r3 = Cluster::make_event(ping1, 1); + print "r3", r3; + + Cluster::publish("topic", ping1, 1, "args", 1.2.3.4); + local r4 = Cluster::make_event(ping1, 1, "event", 1.2.3.4); + print "r4", r4; + } + +event zeek_init() &priority=-2 + { + print "wrong types"; + + Cluster::publish("topic", ping1, 1, 2); + local r1 = Cluster::make_event(ping1, 1, 2); + print "r1", r1; + + Cluster::publish("topic", hook1, 1, "hook"); + local r2 = Cluster::make_event(hook1, 1, "hook"); + print "r2", r2; + + Cluster::publish("topic", 1); + local r3 = Cluster::make_event(1); + print "r3", r2; + } diff --git a/testing/btest/cluster/generic/make_event.zeek b/testing/btest/cluster/generic/make_event.zeek new file mode 100644 index 0000000000..fd551b1263 --- /dev/null +++ b/testing/btest/cluster/generic/make_event.zeek @@ -0,0 +1,58 @@ +# @TEST-DOC: Test make_event behavior. +# +# @TEST-EXEC: zeek -b %INPUT >out +# +# @TEST-EXEC: btest-diff out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr + +function test_fun() { } +hook test_hook() { } +event test_event() { } +event test_event2(s: string) { } + +function as_cluster_event(e: any): Cluster::Event + { + assert e is Cluster::Event; + return e as Cluster::Event; + } + +event zeek_init() &priority=10 + { + local e1 = Cluster::make_event(test_event); + local ce1 = as_cluster_event(e1); + print type_name(ce1$ev), ce1$args; + + local e2 = Cluster::make_event(test_event2, "abc"); + local ce2 = as_cluster_event(e2); + print type_name(ce2$ev), ce2$args; + } + +event zeek_init() &priority=-10 + { + local e = Cluster::make_event(); + } + +event zeek_init() &priority=-11 + { + local e = Cluster::make_event("a"); + } + +event zeek_init() &priority=-12 + { + local e = Cluster::make_event(test_fun); + } + +event zeek_init() &priority=-13 + { + local e = Cluster::make_event(test_hook); + } + +event zeek_init() &priority=-14 + { + local e = Cluster::make_event(test_event2); + } + +event zeek_init() &priority=-15 + { + local e = Cluster::make_event(test_event2, "a", "b"); + }