diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 9cf1a4481d..dbe598311b 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -539,13 +539,22 @@ std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id( bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) { broker::vector xs; xs.reserve(event.args.size()); + for ( const auto& a : event.args ) { - auto r = detail::val_to_data(a.get()); - if ( ! r ) { + if ( a->GetType() == zeek::BifType::Record::Broker::Data ) { + // When encountering a Broker::Data instance within args, pick out + // the broker::data directly to avoid double encoding of the record. + const auto& val = a->AsRecordVal()->GetField(0); + auto* data_val = static_cast(val.get()); + xs.emplace_back(data_val->data); + } + else if ( auto r = detail::val_to_data(a.get()) ) { + xs.emplace_back(std::move(r.value())); + } + else { Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str()); return false; } - xs.emplace_back(std::move(r.value())); } std::string name(event.HandlerName()); diff --git a/src/cluster/BifSupport.cc b/src/cluster/BifSupport.cc index 871a2dfc2d..9d3b0ac5c4 100644 --- a/src/cluster/BifSupport.cc +++ b/src/cluster/BifSupport.cc @@ -21,6 +21,9 @@ std::optional to_cluster_event(const zeek::RecordV const auto& func = rec->GetField(0); const auto& vargs = rec->GetField(1); + if ( ! func ) + return std::nullopt; + const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName()); if ( ! eh ) { zeek::emit_builtin_error( diff --git a/src/cluster/serializer/broker/Serializer.cc b/src/cluster/serializer/broker/Serializer.cc index 6b1b036f22..fba37ff9e9 100644 --- a/src/cluster/serializer/broker/Serializer.cc +++ b/src/cluster/serializer/broker/Serializer.cc @@ -10,6 +10,7 @@ #include "zeek/broker/Data.h" #include "zeek/cluster/Backend.h" +#include "broker/data.bif.h" #include "broker/data_envelope.hh" #include "broker/error.hh" #include "broker/format/json.hh" @@ -30,8 +31,16 @@ namespace { std::optional to_broker_event(const detail::Event& ev) { broker::vector xs; xs.reserve(ev.args.size()); + for ( const auto& a : ev.args ) { - if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) { + if ( a->GetType() == zeek::BifType::Record::Broker::Data ) { + // When encountering a Broker::Data instance within args, pick out + // the broker::data directly to avoid double encoding, Broker::Data. + const auto& val = a->AsRecordVal()->GetField(0); + auto* data_val = static_cast(val.get()); + xs.emplace_back(data_val->data); + } + else if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) { xs.emplace_back(std::move(res.value())); } else { @@ -85,6 +94,11 @@ std::optional to_zeek_event(const broker::zeek::Event& ev) { for ( size_t i = 0; i < args.size(); ++i ) { const auto& expected_type = arg_types[i]; auto arg = args[i].to_data(); + // XXX: data_to_val() uses Broker::Data for `any` type parameters, exposing + // Broker::Data to the script-layer even if Broker isn't used. + // + // This might be part of the API, but seems we could also use the concrete + // Val type if the serializer encodes that information in the message. auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get()); if ( val ) vl.emplace_back(std::move(val)); diff --git a/testing/btest/Baseline/cluster.broker.publish-any/..manager..stdout b/testing/btest/Baseline/cluster.broker.publish-any/..manager..stdout new file mode 100644 index 0000000000..5aea408737 --- /dev/null +++ b/testing/btest/Baseline/cluster.broker.publish-any/..manager..stdout @@ -0,0 +1,67 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node_up, worker-1 +sending pings, 0, count, 1 +got pong, 1, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}] +got pong, 2, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}] +got pong, 3, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}] +got pong, 4, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}] +got pong, 5, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}] +got pong, 6, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}] +got pong, 7, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}] +got pong, 8, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}] +got pong, 9, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}] +got pong, 10, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}] +got pong, 11, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}] +got pong, 12, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}] +sending pings, 1, string, a string +got pong, 13, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}] +got pong, 14, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}] +got pong, 15, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}] +got pong, 16, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}] +got pong, 17, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}] +got pong, 18, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}] +got pong, 19, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}] +got pong, 20, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}] +got pong, 21, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}] +got pong, 22, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}] +got pong, 23, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}] +got pong, 24, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}] +sending pings, 2, port, 42/tcp +got pong, 25, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}] +got pong, 26, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 27, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 28, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 29, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}] +got pong, 30, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 31, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 32, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 33, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}] +got pong, 34, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 35, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 36, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}] +sending pings, 3, vector of count, [1, 2, 3] +got pong, 37, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 38, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 39, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 40, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 41, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 42, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 43, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 44, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 45, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 46, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 47, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 48, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +sending pings, 4, time, 42.0 +got pong, 49, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 50, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 51, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 52, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 53, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 54, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 55, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 56, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 57, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 58, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 59, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 60, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}] diff --git a/testing/btest/Baseline/cluster.broker.publish-any/..worker-1..stdout b/testing/btest/Baseline/cluster.broker.publish-any/..worker-1..stdout new file mode 100644 index 0000000000..805fd81613 --- /dev/null +++ b/testing/btest/Baseline/cluster.broker.publish-any/..worker-1..stdout @@ -0,0 +1,17 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got ping, 0, count, Broker::Data, [data=broker::data{1}] +got ping, 0, count, Broker::Data, [data=broker::data{1}] +got ping, 0, count, Broker::Data, [data=broker::data{1}] +got ping, 1, string, Broker::Data, [data=broker::data{a string}] +got ping, 1, string, Broker::Data, [data=broker::data{a string}] +got ping, 1, string, Broker::Data, [data=broker::data{a string}] +got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}] +got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}] +got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}] +got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}] +got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}] +got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}] +got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] +got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] +got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] +got finish! diff --git a/testing/btest/Baseline/cluster.generic.publish-any/..manager..stdout b/testing/btest/Baseline/cluster.generic.publish-any/..manager..stdout new file mode 100644 index 0000000000..1bb58756f0 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.publish-any/..manager..stdout @@ -0,0 +1,37 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node_up, worker-1 +sending pings, 0, count, 1 +got pong, 1, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}] +got pong, 2, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}] +got pong, 3, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}] +got pong, 4, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}] +got pong, 5, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}] +got pong, 6, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}] +sending pings, 1, string, a string +got pong, 7, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}] +got pong, 8, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}] +got pong, 9, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}] +got pong, 10, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}] +got pong, 11, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}] +got pong, 12, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}] +sending pings, 2, port, 42/tcp +got pong, 13, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}] +got pong, 14, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 15, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}] +got pong, 16, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}] +got pong, 17, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}] +got pong, 18, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}] +sending pings, 3, vector of count, [1, 2, 3] +got pong, 19, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 20, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 21, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 22, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 23, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}] +got pong, 24, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}] +sending pings, 4, time, 42.0 +got pong, 25, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 26, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 27, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 28, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 29, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] +got pong, 30, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] diff --git a/testing/btest/Baseline/cluster.generic.publish-any/..worker-1..stdout b/testing/btest/Baseline/cluster.generic.publish-any/..worker-1..stdout new file mode 100644 index 0000000000..805fd81613 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.publish-any/..worker-1..stdout @@ -0,0 +1,17 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got ping, 0, count, Broker::Data, [data=broker::data{1}] +got ping, 0, count, Broker::Data, [data=broker::data{1}] +got ping, 0, count, Broker::Data, [data=broker::data{1}] +got ping, 1, string, Broker::Data, [data=broker::data{a string}] +got ping, 1, string, Broker::Data, [data=broker::data{a string}] +got ping, 1, string, Broker::Data, [data=broker::data{a string}] +got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}] +got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}] +got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}] +got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}] +got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}] +got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}] +got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] +got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] +got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] +got finish! diff --git a/testing/btest/cluster/broker/publish-any.zeek b/testing/btest/cluster/broker/publish-any.zeek new file mode 100644 index 0000000000..09497c7bae --- /dev/null +++ b/testing/btest/cluster/broker/publish-any.zeek @@ -0,0 +1,107 @@ +# @TEST-DOC: Send any values and observe behavior using broker. +# +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# +# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek" +# +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./worker-1/.stdout + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager"], +}; +@TEST-END-FILE + +# @TEST-START-FILE common.zeek +redef Log::default_rotation_interval = 0sec; + +global finish: event() &is_used; +global ping: event(c: count, what: string, val: any) &is_used; +global pong: event(c: count, what: string, val: any) &is_used; +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global i = 0; +global pongs = 0; + +event send_any() + { + if ( i > 4 ) + return; + + local val: any; + if ( i == 0 ) + val = 1; + else if ( i == 1 ) + val = "a string"; + else if ( i == 2 ) + val = 42/tcp; + else if ( i == 3 ) + val = vector(1, 2, 3); + else + val = double_to_time(42.0); + + print "sending pings", i, type_name(val), val; + Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val); + Broker::publish(Cluster::worker_topic, ping, i, type_name(val), val); + local e = Cluster::make_event(ping, i, type_name(val), val); + Cluster::publish_hrw(Cluster::worker_pool, cat(i), e); + ++i; + + schedule 0.05sec { send_any() }; + } + +event pong(c: count, what: string, val: any) + { + ++pongs; + print "got pong", pongs, "for ping", c, what, type_name(val), val; + + # We send 5 pings in 3 different variations and + # get 4 one pong for each. + if ( pongs == 60 ) + Cluster::publish(Cluster::worker_topic, finish); + } + +event Cluster::node_up(name: string, id: string) + { + print "node_up", name; + schedule 0.1sec { send_any() }; + } + +event Cluster::node_down(name: string, id: string) + { + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event ping(c: count, what: string, val: any) + { + print "got ping", c, what, type_name(val), cat(val); + Cluster::publish(Cluster::manager_topic, pong, c, what + " (cluster publish)", val); + local e = Cluster::make_event(pong, c, what + " (cluster event )", val); + Cluster::publish(Cluster::manager_topic, e); + + Broker::publish(Cluster::manager_topic, pong, c, what + " (broker publish )", val); + local be = Broker::make_event(pong, c, what + " (broker event )", val); + Broker::publish(Cluster::manager_topic, be); + } + +event finish() + { + print "got finish!"; + terminate(); + } +# @TEST-END-FILE diff --git a/testing/btest/cluster/generic/publish-any.zeek b/testing/btest/cluster/generic/publish-any.zeek new file mode 100644 index 0000000000..9b68f6abab --- /dev/null +++ b/testing/btest/cluster/generic/publish-any.zeek @@ -0,0 +1,104 @@ +# @TEST-DOC: Send any values and observe behavior using zeromq. +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek" +# +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./worker-1/.stdout + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap.zeek + +redef Log::default_rotation_interval = 0sec; + +global finish: event() &is_used; +global ping: event(c: count, what: string, val: any) &is_used; +global pong: event(c: count, what: string, val: any) &is_used; +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global i = 0; +global pongs = 0; + +event send_any() + { + if ( i > 4 ) + return; + + local val: any; + if ( i == 0 ) + val = 1; + else if ( i == 1 ) + val = "a string"; + else if ( i == 2 ) + val = 42/tcp; + else if ( i == 3 ) + val = vector(1, 2, 3); + else + val = double_to_time(42.0); + + print "sending pings", i, type_name(val), val; + Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val); + Cluster::publish(Cluster::worker_topic, ping, i, type_name(val), val); + local e = Cluster::make_event(ping, i, type_name(val), val); + Cluster::publish_hrw(Cluster::worker_pool, cat(i), e); + ++i; + + schedule 0.05sec { send_any() }; + } + +event pong(c: count, what: string, val: any) + { + ++pongs; + print "got pong", pongs, "with", c, what, type_name(val), val; + + # We send 5 pings in 3 different variations and + # get two pongs for each. + if ( pongs == 30 ) + Cluster::publish(Cluster::worker_topic, finish); + } + +event Cluster::node_up(name: string, id: string) + { + print "node_up", name; + schedule 0.1sec { send_any() }; + } + +event Cluster::node_down(name: string, id: string) + { + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event ping(c: count, what: string, val: any) + { + print "got ping", c, what, type_name(val), cat(val); + Cluster::publish(Cluster::manager_topic, pong, c, what + " (cluster publish)", val); + local e = Cluster::make_event(pong, c, what + " (cluster event )", val); + Cluster::publish(Cluster::manager_topic, e); + } + +event finish() + { + print "got finish!"; + terminate(); + } +# @TEST-END-FILE