From 189565d1319586c54f3f2e4eb037f718c5e95193 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 12:25:19 +0100 Subject: [PATCH 01/16] types: Fix table() resulting in table_type->IsSet() == true typename(table()) apparently always resulted in a set[] type being rendered. Make the yield type of an unspecified table ANY so that type->IsSet() ends up false. While at it, also render unspecified types as table(), set() and vector() rather than vector of void, set[] or table[] of any which IMO should help to figure out what's going. --- src/Expr.cc | 2 +- src/Type.cc | 17 +++++++++++++++-- .../btest/Baseline/language.unspecified/output | 4 ++++ testing/btest/language/unspecified.zeek | 7 +++++++ 4 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 testing/btest/Baseline/language.unspecified/output create mode 100644 testing/btest/language/unspecified.zeek diff --git a/src/Expr.cc b/src/Expr.cc index d03df0568f..bdac67c9be 100644 --- a/src/Expr.cc +++ b/src/Expr.cc @@ -3181,7 +3181,7 @@ TableConstructorExpr::TableConstructorExpr(ListExprPtr constructor_list, } else { if ( op->AsListExpr()->Exprs().empty() ) - SetType(make_intrusive(make_intrusive(base_type(TYPE_ANY)), nullptr)); + SetType(make_intrusive(make_intrusive(base_type(TYPE_ANY)), base_type(TYPE_ANY))); else { SetType(init_type(op)); diff --git a/src/Type.cc b/src/Type.cc index fe58ffbbeb..4b6d78a066 100644 --- a/src/Type.cc +++ b/src/Type.cc @@ -320,11 +320,19 @@ int IndexType::MatchesIndex(detail::ListExpr* const index) const { void IndexType::DoDescribe(ODesc* d) const { Type::DoDescribe(d); - if ( ! d->IsBinary() ) - d->Add("["); const auto& its = GetIndexTypes(); + // Deal with unspecified table/set. + if ( its.empty() ) { + if ( ! d->IsBinary() ) + d->Add("()"); + return; + } + + if ( ! d->IsBinary() ) + d->Add("["); + for ( auto i = 0u; i < its.size(); ++i ) { if ( ! d->IsBinary() && i > 0 ) d->Add(","); @@ -1763,6 +1771,11 @@ int VectorType::MatchesIndex(detail::ListExpr* const index) const { bool VectorType::IsUnspecifiedVector() const { return yield_type->Tag() == TYPE_VOID; } void VectorType::DoDescribe(ODesc* d) const { + if ( IsUnspecifiedVector() && d->IsReadable() ) { + d->Add("vector()"); + return; + } + if ( d->IsReadable() ) d->AddSP("vector of"); else diff --git a/testing/btest/Baseline/language.unspecified/output b/testing/btest/Baseline/language.unspecified/output new file mode 100644 index 0000000000..3879fcb759 --- /dev/null +++ b/testing/btest/Baseline/language.unspecified/output @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +set() +table() +vector() diff --git a/testing/btest/language/unspecified.zeek b/testing/btest/language/unspecified.zeek new file mode 100644 index 0000000000..55659398e1 --- /dev/null +++ b/testing/btest/language/unspecified.zeek @@ -0,0 +1,7 @@ +# @TEST-DOC: Test representation of unspecified table, set and vector +# @TEST-EXEC: zeek -b %INPUT >output +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff output + +print type_name(set()); +print type_name(table()); +print type_name(vector()); From 91c03cd98884bc16343576b319e1b4cdc7790729 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 12:10:41 +0100 Subject: [PATCH 02/16] broker: Support publish() of unspecified set() / table() Calling Broker::make_event(..., table()) or Broker::publish(..., table()) caused runtime type errors as same_type() would result in false. --- src/broker/Manager.cc | 12 +- .../recv.recv.out | 12 ++ .../send.send.out | 13 ++ .../broker/publish_unspecified_table.zeek | 121 ++++++++++++++++++ 4 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 testing/btest/Baseline/broker.publish_unspecified_table/recv.recv.out create mode 100644 testing/btest/Baseline/broker.publish_unspecified_table/send.send.out create mode 100644 testing/btest/broker/publish_unspecified_table.zeek diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index b5f80d2e9f..44c6c407a0 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -17,6 +17,7 @@ #include "zeek/Reporter.h" #include "zeek/RunState.h" #include "zeek/SerializationFormat.h" +#include "zeek/Type.h" #include "zeek/Var.h" #include "zeek/broker/Data.h" #include "zeek/broker/Store.h" @@ -890,13 +891,18 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) { continue; } - const auto& got_type = arg_val->GetType(); + auto got_type = arg_val->GetType(); const auto& expected_type = func->GetType()->ParamList()->GetTypes()[index - 1]; + // If called with an unspecified table or set, adopt the expected type. + if ( got_type->Tag() == TYPE_TABLE && got_type->AsTableType()->IsUnspecifiedTable() ) + if ( expected_type->Tag() == TYPE_TABLE && got_type->IsSet() == expected_type->IsSet() ) + got_type = expected_type; + if ( ! same_type(got_type, expected_type) ) { rval->Remove(0); - Error("event parameter #%zu type mismatch, got %s, expect %s", index, type_name(got_type->Tag()), - type_name(expected_type->Tag())); + Error("event parameter #%zu type mismatch, got %s, expect %s", index, + obj_desc_short(got_type.get()).c_str(), obj_desc_short(expected_type.get()).c_str()); return rval; } diff --git a/testing/btest/Baseline/broker.publish_unspecified_table/recv.recv.out b/testing/btest/Baseline/broker.publish_unspecified_table/recv.recv.out new file mode 100644 index 0000000000..cc70056375 --- /dev/null +++ b/testing/btest/Baseline/broker.publish_unspecified_table/recv.recv.out @@ -0,0 +1,12 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +ping, my-message-make-event, 0 +ping, my-message-args, 0 +ping_set, my-message-make-event, 0 +ping_set, my-message-args, 0 +ping, my-message-make-event, 0 +ping, my-message-args, 0 +ping_set, my-message-make-event, 0 +ping_set, my-message-args, 0 +ping, my-message-make-event, 0 +ping, my-message-args, 0 diff --git a/testing/btest/Baseline/broker.publish_unspecified_table/send.send.out b/testing/btest/Baseline/broker.publish_unspecified_table/send.send.out new file mode 100644 index 0000000000..3aaeb60436 --- /dev/null +++ b/testing/btest/Baseline/broker.publish_unspecified_table/send.send.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +pong, my-message-make-event, 0 +pong, my-message-args, 0 +pong_set, my-message-make-event, 0 +pong_set, my-message-args, 0 +pong, my-message-make-event, 0 +pong, my-message-args, 0 +pong_set, my-message-make-event, 0 +pong_set, my-message-args, 0 +pong, my-message-make-event, 0 +pong, my-message-args, 0 +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/broker/publish_unspecified_table.zeek b/testing/btest/broker/publish_unspecified_table.zeek new file mode 100644 index 0000000000..81aaf21742 --- /dev/null +++ b/testing/btest/broker/publish_unspecified_table.zeek @@ -0,0 +1,121 @@ +# @TEST-DOC: Using table() or set() for a Broker::publish() or Broker::make_event() should do the right thing. +# +# @TEST-GROUP: broker +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +@TEST-START-FILE common.zeek +type ResultTable: table[string] of count; +type ResultSet : set[count]; + +global ping: event(msg: string, t: ResultTable) &is_used; +global pong: event(msg: string, t: ResultTable) &is_used; + +global ping_set: event(msg: string, s: ResultSet) &is_used; +global pong_set: event(msg: string, s: ResultSet) &is_used; +@TEST-END-FILE + +@TEST-START-FILE send.zeek +@load ./common.zeek + +redef exit_only_after_terminate = T; + +global event_count = 0; + + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +function send_events() + { + local e = Broker::make_event(ping, "my-message-make-event", table()); + Broker::publish("zeek/event/my_topic", e); + Broker::publish("zeek/event/my_topic", ping, "my-message-args", table()); + + local es = Broker::make_event(ping_set, "my-message-make-event", set()); + Broker::publish("zeek/event/my_topic", es); + Broker::publish("zeek/event/my_topic", ping_set, "my-message-args", set()); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + send_events(); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + terminate(); + } + +event pong(msg: string, t: ResultTable) + { + ++event_count; + print "pong", msg, |t|; + } + +event pong_set(msg: string, s: ResultSet) + { + ++event_count; + print "pong_set", msg, |s|; + if ( event_count % 4 == 0 ) + send_events(); + } +@TEST-END-FILE + + +@TEST-START-FILE recv.zeek +@load ./common.zeek + +redef exit_only_after_terminate = T; + +const events_to_recv = 8; +global events_recv = 0; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, t: ResultTable) + { + ++events_recv; + print "ping", msg, |t|; + Broker::publish("zeek/event/my_topic", pong, msg, t); + } + +event ping_set(msg: string, s: ResultSet) + { + ++events_recv; + if ( events_recv > events_to_recv ) + { + terminate(); + return; + } + + print "ping_set", msg, |s|; + Broker::publish("zeek/event/my_topic", pong_set, msg, s); + } +@TEST-END-FILE From 93478a246e7645df4b0751d40f2a332893f442b8 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 5 Nov 2024 16:26:01 +0100 Subject: [PATCH 03/16] intel: Switch to Cluster::publish() This isn't quite making things a lot nicer, but more explicit. --- scripts/base/frameworks/intel/cluster.zeek | 30 +++++++++++++++++----- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/scripts/base/frameworks/intel/cluster.zeek b/scripts/base/frameworks/intel/cluster.zeek index d890e455f9..016b49f35c 100644 --- a/scripts/base/frameworks/intel/cluster.zeek +++ b/scripts/base/frameworks/intel/cluster.zeek @@ -11,6 +11,9 @@ module Intel; global insert_item: event(item: Item) &is_used; global insert_indicator: event(item: Item) &is_used; +# Event to transfer the min_data_store to connecting nodes. +global new_min_data_store: event(store: MinDataStore) &is_used; + # By default the manager sends its current min_data_store to connecting workers. # During testing it's handy to suppress this, since receipt of the store # introduces nondeterminism when mixed with explicit data insertions. @@ -22,9 +25,10 @@ redef have_full_data = F; @endif @if ( Cluster::local_node_type() == Cluster::MANAGER ) -event zeek_init() +# The manager propagates remove_indicator() to workers. +event remove_indicator(item: Item) { - Broker::auto_publish(Cluster::worker_topic, remove_indicator); + Broker::publish(Cluster::worker_topic, remove_indicator, item); } # Handling of new worker nodes. @@ -35,7 +39,7 @@ event Cluster::node_up(name: string, id: string) # this by the insert_indicator event. if ( send_store_on_node_up && name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER ) { - Broker::publish_id(Cluster::node_topic(name), "Intel::min_data_store"); + Broker::publish(Cluster::node_topic(name), new_min_data_store, min_data_store); } } @@ -43,6 +47,9 @@ event Cluster::node_up(name: string, id: string) # has to be distributed. event Intel::new_item(item: Item) &priority=5 { + # This shouldn't be required, pushing directly from + # the manager is more efficient and has less round + # trips for non-broker backends. local pt = Cluster::rr_topic(Cluster::proxy_pool, "intel_insert_rr_key"); if ( pt == "" ) @@ -73,11 +80,16 @@ event Intel::match_remote(s: Seen) &priority=5 } @endif + @if ( Cluster::local_node_type() == Cluster::WORKER ) -event zeek_init() +event match_remote(s: Seen) { - Broker::auto_publish(Cluster::manager_topic, match_remote); - Broker::auto_publish(Cluster::manager_topic, remove_item); + Broker::publish(Cluster::manager_topic, match_remote, s); + } + +event remove_item(item: Item, purge_indicator: bool) + { + Broker::publish(Cluster::manager_topic, remove_item, item, purge_indicator); } # On a worker, the new_item event requires to trigger the insertion @@ -92,6 +104,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5 { Intel::_insert(item, F); } + +# Handling of a complete MinDataStore snapshot +event new_min_data_store(store: MinDataStore) + { + min_data_store = store; + } @endif @if ( Cluster::local_node_type() == Cluster::PROXY ) From 219d6212347282b524a3577b08cbcc6cc3edf42a Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 5 Nov 2024 17:15:35 +0100 Subject: [PATCH 04/16] netcontrol: Replace Broker::auto_publish() I'd think we could drop the cluster.zeek and non-cluster.zeek and just unconditionally do the publish(), but keeping it for now. --- .../base/frameworks/netcontrol/cluster.zeek | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/scripts/base/frameworks/netcontrol/cluster.zeek b/scripts/base/frameworks/netcontrol/cluster.zeek index a79e03cd95..5cebbb0e1b 100644 --- a/scripts/base/frameworks/netcontrol/cluster.zeek +++ b/scripts/base/frameworks/netcontrol/cluster.zeek @@ -16,26 +16,6 @@ export { global cluster_netcontrol_delete_rule: event(id: string, reason: string); } -@if ( Cluster::local_node_type() == Cluster::MANAGER ) -event zeek_init() - { - Broker::auto_publish(Cluster::worker_topic, NetControl::rule_added); - Broker::auto_publish(Cluster::worker_topic, NetControl::rule_removed); - Broker::auto_publish(Cluster::worker_topic, NetControl::rule_timeout); - Broker::auto_publish(Cluster::worker_topic, NetControl::rule_error); - Broker::auto_publish(Cluster::worker_topic, NetControl::rule_exists); - Broker::auto_publish(Cluster::worker_topic, NetControl::rule_new); - Broker::auto_publish(Cluster::worker_topic, NetControl::rule_destroyed); - } -@else -event zeek_init() - { - Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_add_rule); - Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_remove_rule); - Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_delete_rule); - } -@endif - function activate(p: PluginState, priority: int) { # We only run the activate function on the manager. @@ -66,7 +46,7 @@ function add_rule(r: Rule) : string if ( r$id == "" ) r$id = cat(Cluster::node, ":", ++local_rule_count); - event NetControl::cluster_netcontrol_add_rule(r); + Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_add_rule, r); return r$id; } } @@ -77,7 +57,7 @@ function delete_rule(id: string, reason: string &default="") : bool return delete_rule_impl(id, reason); else { - event NetControl::cluster_netcontrol_delete_rule(id, reason); + Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_delete_rule, id, reason); return T; # well, we can't know here. So - just hope... } } @@ -88,7 +68,7 @@ function remove_rule(id: string, reason: string &default="") : bool return remove_rule_impl(id, reason); else { - event NetControl::cluster_netcontrol_remove_rule(id, reason); + Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_remove_rule, id, reason); return T; # well, we can't know here. So - just hope... } } @@ -120,6 +100,8 @@ event rule_exists(r: Rule, p: PluginState, msg: string) &priority=5 if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire ) schedule r$expire { rule_expire(r, p) }; + + Broker::publish(Cluster::worker_topic, rule_exists, r, p, msg); } event rule_added(r: Rule, p: PluginState, msg: string) &priority=5 @@ -128,21 +110,39 @@ event rule_added(r: Rule, p: PluginState, msg: string) &priority=5 if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire ) schedule r$expire { rule_expire(r, p) }; + + Broker::publish(Cluster::worker_topic, rule_added, r, p, msg); } event rule_removed(r: Rule, p: PluginState, msg: string) &priority=-5 { rule_removed_impl(r, p, msg); + + Broker::publish(Cluster::worker_topic, rule_removed, r, p, msg); } event rule_timeout(r: Rule, i: FlowInfo, p: PluginState) &priority=-5 { rule_timeout_impl(r, i, p); + + Broker::publish(Cluster::worker_topic, rule_timeout, r, i, p); } event rule_error(r: Rule, p: PluginState, msg: string) &priority=-5 { rule_error_impl(r, p, msg); + + Broker::publish(Cluster::worker_topic, rule_error, r, msg); + } + +event rule_new(r: Rule) + { + Broker::publish(Cluster::worker_topic, rule_new, r); + } + +event rule_destroyed(r: Rule) + { + Broker::publish(Cluster::worker_topic, rule_destroyed, r); } @endif From 08f2198d3e3810b43bb0c5d28a413c87bd6a7a3f Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 5 Nov 2024 19:26:49 +0100 Subject: [PATCH 05/16] frameworks/notice: Remove Broker::auto_publish() --- scripts/base/frameworks/notice/main.zeek | 55 ++++++++---------------- 1 file changed, 19 insertions(+), 36 deletions(-) diff --git a/scripts/base/frameworks/notice/main.zeek b/scripts/base/frameworks/notice/main.zeek index 991376dc46..2a827fcbaf 100644 --- a/scripts/base/frameworks/notice/main.zeek +++ b/scripts/base/frameworks/notice/main.zeek @@ -281,18 +281,6 @@ export { ## identifier: The identifier string of the notice that should be suppressed. global begin_suppression: event(ts: time, suppress_for: interval, note: Type, identifier: string); - ## This is an internal event that is used to broadcast the begin_suppression - ## event over a cluster. - ## - ## ts: time indicating then when the notice to be suppressed occurred. - ## - ## suppress_for: length of time that this notice should be suppressed. - ## - ## note: The :zeek:type:`Notice::Type` of the notice. - ## - ## identifier: The identifier string of the notice that should be suppressed. - global manager_begin_suppression: event(ts: time, suppress_for: interval, note: Type, identifier: string); - ## A function to determine if an event is supposed to be suppressed. ## ## n: The record containing the notice in question. @@ -536,39 +524,34 @@ hook Notice::notice(n: Notice::Info) &priority=-5 event Notice::begin_suppression(n$ts, n$suppress_for, n$note, n$identifier); suppressing[n$note, n$identifier] = n$ts + n$suppress_for; @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) - event Notice::manager_begin_suppression(n$ts, n$suppress_for, n$note, n$identifier); + # Notify the manager about the new suppression, it'll broadcast + # to the other nodes in the cluster. + # Once we have global pub/sub, we could also unconditionally + # send to a notice specific topic for communicating + # suppressions directly to all nodes. + Broker::publish(Cluster::manager_topic, Notice::begin_suppression, + n$ts, n$suppress_for, n$note, n$identifier); @endif } } -event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, - identifier: string) +# The manager currently re-publishes Notice::begin_suppression to worker +# and proxy nodes. +@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) +event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, identifier: string) + { + local e = Broker::make_event(Notice::begin_suppression, ts, suppress_for, note, identifier); + Broker::publish(Cluster::worker_topic, e); + Broker::publish(Cluster::proxy_topic, e); + } +@endif + +event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, identifier: string) { local suppress_until = ts + suppress_for; suppressing[note, identifier] = suppress_until; } -@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) -event zeek_init() - { - Broker::auto_publish(Cluster::worker_topic, Notice::begin_suppression); - Broker::auto_publish(Cluster::proxy_topic, Notice::begin_suppression); - } - -event Notice::manager_begin_suppression(ts: time, suppress_for: interval, note: Type, - identifier: string) - { - event Notice::begin_suppression(ts, suppress_for, note, identifier); - } -@endif - -@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) -event zeek_init() - { - Broker::auto_publish(Cluster::manager_topic, Notice::manager_begin_suppression); - } -@endif - function is_being_suppressed(n: Notice::Info): bool { if ( n?$identifier && [n$note, n$identifier] in suppressing ) From cb10852f999e6d7f7dccc94c2cf073cfe7c0bb1a Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 5 Nov 2024 22:19:09 +0100 Subject: [PATCH 06/16] dhcp: Remove Broker::auto_publish() This isn't prettier, but neither worse IMO. A test would be good. --- scripts/base/protocols/dhcp/main.zeek | 13 ++-- .../manager-1..stdout | 2 + .../manager-1.dhcp.log | 11 +++ .../worker-1..stdout | 4 ++ .../scripts/base/protocols/dhcp/cluster.zeek | 71 +++++++++++++++++++ 5 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1..stdout create mode 100644 testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1.dhcp.log create mode 100644 testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/worker-1..stdout create mode 100644 testing/btest/scripts/base/protocols/dhcp/cluster.zeek diff --git a/scripts/base/protocols/dhcp/main.zeek b/scripts/base/protocols/dhcp/main.zeek index 1d53cbfd63..7b60bf410d 100644 --- a/scripts/base/protocols/dhcp/main.zeek +++ b/scripts/base/protocols/dhcp/main.zeek @@ -134,13 +134,6 @@ event zeek_init() &priority=5 Analyzer::register_for_ports(Analyzer::ANALYZER_DHCP, ports); } -@if ( Cluster::is_enabled() ) -event zeek_init() - { - Broker::auto_publish(Cluster::manager_topic, DHCP::aggregate_msgs); - } -@endif - function join_data_expiration(t: table[count] of Info, idx: count): interval { local info = t[idx]; @@ -307,7 +300,11 @@ event DHCP::aggregate_msgs(ts: time, id: conn_id, uid: string, is_orig: bool, ms # Aggregate DHCP messages to the manager. event dhcp_message(c: connection, is_orig: bool, msg: DHCP::Msg, options: DHCP::Options) &priority=-5 { - event DHCP::aggregate_msgs(network_time(), c$id, c$uid, is_orig, msg, options); + if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) + Broker::publish(Cluster::manager_topic, DHCP::aggregate_msgs, + network_time(), c$id, c$uid, is_orig, msg, options); + else + event DHCP::aggregate_msgs(network_time(), c$id, c$uid, is_orig, msg, options); } event zeek_done() &priority=-5 diff --git a/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1..stdout new file mode 100644 index 0000000000..ed1bc7d49d --- /dev/null +++ b/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1..stdout @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +DHCP::aggregate_msgs, XXXXXXXXXX.XXXXXX, CHhAvVGS1DHFjwGM9 diff --git a/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1.dhcp.log b/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1.dhcp.log new file mode 100644 index 0000000000..6da1b3dc58 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/manager-1.dhcp.log @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path dhcp +#open XXXX-XX-XX-XX-XX-XX +#fields ts uids client_addr server_addr mac host_name client_fqdn domain requested_addr assigned_addr lease_time client_message server_message msg_types duration +#types time set[string] addr addr string string string string addr addr interval string string vector[string] interval +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 192.168.0.10 10.10.0.1 00:0a:28:00:fa:42 - - - - 192.168.0.10 3600.000000 - - ACK 0.000000 +#close XXXX-XX-XX-XX-XX-XX diff --git a/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/worker-1..stdout b/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/worker-1..stdout new file mode 100644 index 0000000000..0520896388 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.protocols.dhcp.cluster/worker-1..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node_up, manager-1 +dhcp_message, CHhAvVGS1DHFjwGM9 +file_done diff --git a/testing/btest/scripts/base/protocols/dhcp/cluster.zeek b/testing/btest/scripts/base/protocols/dhcp/cluster.zeek new file mode 100644 index 0000000000..57ce58db5a --- /dev/null +++ b/testing/btest/scripts/base/protocols/dhcp/cluster.zeek @@ -0,0 +1,71 @@ +# Test in cluster mode, the manager produces the cluster.log +# +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# +# @TEST-EXEC: zeek -b --parse-only %INPUT +# @TEST-EXEC: btest-bg-run manager-1 "cp ../cluster-layout.zeek . && CLUSTER_NODE=manager-1 zeek -b %INPUT" +# @TEST-EXEC: btest-bg-run worker-1 "cp ../cluster-layout.zeek . && CLUSTER_NODE=worker-1 zeek -b --pseudo-realtime -C -r $TRACES/dhcp/dhcp_ack_subscriber_id_and_agent_remote_id.trace %INPUT" + +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff worker-1/.stdout +# @TEST-EXEC: btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff manager-1/dhcp.log + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1"], +}; +@TEST-END-FILE + +@load base/protocols/dhcp +@load base/frameworks/broker +@load base/frameworks/cluster +@load base/frameworks/logging + +redef Log::default_rotation_interval = 0secs; +redef Log::default_rotation_postprocessor_cmd = "echo"; +redef exit_only_after_terminate = T; + +redef Broker::disable_ssl = T; +redef Cluster::manager_is_logger = T; + +event terminate_me() { + terminate(); +} + +@if ( Cluster::local_node_type() == Cluster::WORKER ) +event zeek_init() + { + suspend_processing(); + } + +event Cluster::node_up(name: string, id: string) + { + print "node_up", name; + continue_processing(); + } + +event dhcp_message(c: connection, is_orig: bool, msg: DHCP::Msg, options: DHCP::Options) + { + print "dhcp_message", c$uid; + } + +event Pcap::file_done(path: string) + { + print "file_done"; + terminate(); + } +@else + +event DHCP::aggregate_msgs(ts: time, id: conn_id, uid: string, is_orig: bool, msg: DHCP::Msg, options: DHCP::Options) &priority=5 + { + print "DHCP::aggregate_msgs", ts, uid; + } + +event Cluster::node_down(name: string, id: string) + { + terminate(); + } +@endif From b32153037a6198cebdd5e5d29992541babf97fc9 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 5 Nov 2024 22:48:00 +0100 Subject: [PATCH 07/16] openflow: Remove Broker::auto_publish() --- scripts/base/frameworks/openflow/cluster.zeek | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/scripts/base/frameworks/openflow/cluster.zeek b/scripts/base/frameworks/openflow/cluster.zeek index a16539c281..a73f7389e8 100644 --- a/scripts/base/frameworks/openflow/cluster.zeek +++ b/scripts/base/frameworks/openflow/cluster.zeek @@ -13,15 +13,6 @@ export { global cluster_flow_clear: event(name: string); } -@if ( Cluster::local_node_type() != Cluster::MANAGER ) -# Workers need ability to forward commands to manager. -event zeek_init() - { - Broker::auto_publish(Cluster::manager_topic, OpenFlow::cluster_flow_mod); - Broker::auto_publish(Cluster::manager_topic, OpenFlow::cluster_flow_clear); - } -@endif - # the flow_mod function wrapper function flow_mod(controller: Controller, match: ofp_match, flow_mod: ofp_flow_mod): bool { @@ -31,7 +22,7 @@ function flow_mod(controller: Controller, match: ofp_match, flow_mod: ofp_flow_m if ( Cluster::local_node_type() == Cluster::MANAGER ) return controller$flow_mod(controller$state, match, flow_mod); else - event OpenFlow::cluster_flow_mod(controller$state$_name, match, flow_mod); + Broker::publish(Cluster::manager_topic, OpenFlow::cluster_flow_mod, controller$state$_name, match, flow_mod); return T; } @@ -44,7 +35,7 @@ function flow_clear(controller: Controller): bool if ( Cluster::local_node_type() == Cluster::MANAGER ) return controller$flow_clear(controller$state); else - event OpenFlow::cluster_flow_clear(controller$state$_name); + Broker::publish(Cluster::manager_topic, OpenFlow::cluster_flow_clear, controller$state$_name); return T; } From 416887157c4683ae9c8e2555877d3d4ff83aafb9 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 5 Nov 2024 22:58:52 +0100 Subject: [PATCH 08/16] cluster_started: No Broker::auto_publish() use --- scripts/policy/frameworks/cluster/experimental.zeek | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/scripts/policy/frameworks/cluster/experimental.zeek b/scripts/policy/frameworks/cluster/experimental.zeek index 65375009d2..83e6febdca 100644 --- a/scripts/policy/frameworks/cluster/experimental.zeek +++ b/scripts/policy/frameworks/cluster/experimental.zeek @@ -48,11 +48,6 @@ global is_cluster_started = F; @load ./nodes-experimental/manager @endif -event zeek_init() &priority=4 - { - Broker::auto_publish(Cluster::manager_topic, Cluster::Experimental::node_fully_connected); - } - hook Cluster::connect_node_hook(connectee: Cluster::NamedNode) { add connectees_pending[connectee$name]; @@ -71,8 +66,11 @@ event Cluster::node_up(name: string, id: string) &priority=-10 # pending connectee is left. delete connectees_pending[name]; if ( |connectees_pending| == 0 ) - event Cluster::Experimental::node_fully_connected(Cluster::node, Broker::node_id(), - is_cluster_started); + { + event node_fully_connected(Cluster::node, Broker::node_id(), is_cluster_started); + Broker::publish(Cluster::manager_topic, node_fully_connected, + Cluster::node, Broker::node_id(), is_cluster_started); + } } event Cluster::Experimental::node_fully_connected(name: string, id: string, resending: bool) From 883ae3694cf5f4b131a3ef7afd63ae77d3dc1583 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 11:01:27 +0100 Subject: [PATCH 09/16] sumstats: Remove Broker::auto_publish() --- scripts/base/frameworks/sumstats/cluster.zeek | 69 ++++++++++--------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/scripts/base/frameworks/sumstats/cluster.zeek b/scripts/base/frameworks/sumstats/cluster.zeek index 5a975c5ee9..c5d45e7a7d 100644 --- a/scripts/base/frameworks/sumstats/cluster.zeek +++ b/scripts/base/frameworks/sumstats/cluster.zeek @@ -61,14 +61,6 @@ global recent_global_view_keys: set[string, Key] &create_expire=1min; @if ( Cluster::local_node_type() != Cluster::MANAGER ) -event zeek_init() &priority=100 - { - Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_send_result); - Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response); - Broker::auto_publish(Cluster::manager_topic, SumStats::send_a_key); - Broker::auto_publish(Cluster::manager_topic, SumStats::send_no_key); - } - # Result tables indexed on a uid that are currently being sent to the # manager. global sending_results: table[string] of ResultTable = table() &read_expire=1min; @@ -87,7 +79,8 @@ function data_added(ss: SumStat, key: Key, result: Result) if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) ) { # kick off intermediate update - event SumStats::cluster_key_intermediate_response(ss$name, key); + Broker::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response, + ss$name, key); add recent_global_view_keys[ss$name, key]; } } @@ -98,13 +91,15 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) { if ( |sending_results[uid]| == 0 ) { - event SumStats::send_no_key(uid, ss_name); + Broker::publish(Cluster::manager_topic, SumStats::send_no_key, + uid, ss_name); } else { for ( key in sending_results[uid] ) { - event SumStats::send_a_key(uid, ss_name, key); + Broker::publish(Cluster::manager_topic, SumStats::send_a_key, + uid, ss_name, key); # break to only send one. break; } @@ -120,7 +115,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) { for ( key in result_store[ss_name] ) { - event SumStats::send_a_key(uid, ss_name, key); + Broker::publish(Cluster::manager_topic, SumStats::send_a_key, + uid, ss_name, key); # break to only send one. break; } @@ -128,7 +124,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) } else { - event SumStats::send_no_key(uid, ss_name); + Broker::publish(Cluster::manager_topic, SumStats::send_no_key, + uid, ss_name); } } @@ -154,16 +151,20 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean { if ( uid in sending_results && key in sending_results[uid] ) { + # XXX: Is that comment stale? + # # Note: copy is needed to compensate serialization caching issue. This should be # changed to something else later. - event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup); + Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, copy(sending_results[uid][key]), cleanup); delete sending_results[uid][key]; } else { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup); + Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, table(), cleanup); } } else @@ -172,13 +173,15 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean { # Note: copy is needed to compensate serialization caching issue. This should be # changed to something else later. - event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); + Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); } else { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup); + Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, table(), cleanup); } } } @@ -209,14 +212,6 @@ function request_key(ss_name: string, key: Key): Result @if ( Cluster::local_node_type() == Cluster::MANAGER ) -event zeek_init() &priority=100 - { - Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_ss_request); - Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_get_result); - Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed); - Broker::auto_publish(Cluster::worker_topic, SumStats::get_a_key); - } - # This variable is maintained by manager nodes as they collect and aggregate # results. # Index on a uid. @@ -263,12 +258,14 @@ event SumStats::finish_epoch(ss: SumStat) stats_keys[uid] = set(); # Request data from peers. - event SumStats::cluster_ss_request(uid, ss$name, T); + Broker::publish(Cluster::worker_topic, SumStats::cluster_ss_request, + uid, ss$name, T); done_with[uid] = 0; #print fmt("get_key by uid: %s", uid); - event SumStats::get_a_key(uid, ss$name, T); + Broker::publish(Cluster::worker_topic, SumStats::get_a_key, + uid, ss$name, T); } # Schedule the next finish_epoch event. @@ -283,7 +280,8 @@ function data_added(ss: SumStat, key: Key, result: Result) if ( check_thresholds(ss, key, result, 1.0) ) { threshold_crossed(ss, key, result); - event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); + Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, + ss$name, key, threshold_tracker[ss$name][key]); } } @@ -300,7 +298,8 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key, if ( check_thresholds(ss, key, ir, 1.0) ) { threshold_crossed(ss, key, ir); - event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]); + Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, + ss_name, key, threshold_tracker[ss_name][key]); } if ( cleanup ) @@ -336,7 +335,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool) } done_with[uid] = 0; - event SumStats::cluster_get_result(uid, ss_name, key, cleanup); + Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, + uid, ss_name, key, cleanup); delete stats_keys[uid][key]; } else @@ -344,7 +344,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool) # Get more keys! And this breaks us out of the evented loop. done_with[uid] = 0; #print fmt("get_key by uid: %s", uid); - event SumStats::get_a_key(uid, ss_name, cleanup); + Broker::publish(Cluster::worker_topic, SumStats::get_a_key, + uid, ss_name, cleanup); } } @@ -469,7 +470,8 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key) add outstanding_global_views[ss_name][uid]; done_with[uid] = 0; #print fmt("requesting results for: %s", uid); - event SumStats::cluster_get_result(uid, ss_name, key, F); + Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, + uid, ss_name, key, F); } function request_key(ss_name: string, key: Key): Result @@ -479,7 +481,8 @@ function request_key(ss_name: string, key: Key): Result key_requests[uid] = table(); add dynamic_requests[uid]; - event SumStats::cluster_get_result(uid, ss_name, key, F); + Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, + uid, ss_name, key, F); return when [uid, ss_name, key] ( uid in done_with && Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] ) { From 44c4a91cc8e80e1e56fc9e04c12ce2cd174d6d6e Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 13:34:46 +0100 Subject: [PATCH 10/16] ssl/validate-certs: Remove Broker::auto_publish() --- scripts/policy/protocols/ssl/validate-certs.zeek | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/scripts/policy/protocols/ssl/validate-certs.zeek b/scripts/policy/protocols/ssl/validate-certs.zeek index 4d23c8e02d..9c1c54b185 100644 --- a/scripts/policy/protocols/ssl/validate-certs.zeek +++ b/scripts/policy/protocols/ssl/validate-certs.zeek @@ -61,39 +61,27 @@ export { global intermediate_cache: table[string] of vector of opaque of x509; -@if ( Cluster::is_enabled() ) -event zeek_init() - { - Broker::auto_publish(Cluster::worker_topic, SSL::intermediate_add); - Broker::auto_publish(Cluster::manager_topic, SSL::new_intermediate); - } -@endif - function add_to_cache(key: string, value: vector of opaque of x509) { intermediate_cache[key] = value; @if ( Cluster::is_enabled() ) - event SSL::new_intermediate(key, value); + Broker::publish(Cluster::manager_topic, SSL::new_intermediate, key, value); @endif } -@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) event SSL::intermediate_add(key: string, value: vector of opaque of x509) { intermediate_cache[key] = value; } -@endif -@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) event SSL::new_intermediate(key: string, value: vector of opaque of x509) { if ( key in intermediate_cache ) return; intermediate_cache[key] = value; - event SSL::intermediate_add(key, value); + Broker::publish(Cluster::worker_topic, SSL::intermediate_add, key, value); } -@endif function cache_validate(chain: vector of opaque of x509): X509::Result { From 6aca4d1dc75a932b28cdbc1753e3580a6cfd0311 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 15:07:14 +0100 Subject: [PATCH 11/16] catch-and-release: Remove Broker::auto_publish() --- .../netcontrol/catch-and-release.zeek | 49 +++++++------------ 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/scripts/policy/frameworks/netcontrol/catch-and-release.zeek b/scripts/policy/frameworks/netcontrol/catch-and-release.zeek index 18a5f1eef7..1996188d49 100644 --- a/scripts/policy/frameworks/netcontrol/catch-and-release.zeek +++ b/scripts/policy/frameworks/netcontrol/catch-and-release.zeek @@ -226,26 +226,6 @@ global blocks: table[addr] of BlockInfo = {} &create_expire=0secs &expire_func=per_block_interval; - -@if ( Cluster::is_enabled() ) - -@if ( Cluster::local_node_type() == Cluster::MANAGER ) -event zeek_init() - { - Broker::auto_publish(Cluster::worker_topic, NetControl::catch_release_block_new); - Broker::auto_publish(Cluster::worker_topic, NetControl::catch_release_block_delete); - } -@else -event zeek_init() - { - Broker::auto_publish(Cluster::manager_topic, NetControl::catch_release_add); - Broker::auto_publish(Cluster::manager_topic, NetControl::catch_release_delete); - Broker::auto_publish(Cluster::manager_topic, NetControl::catch_release_encountered); - } -@endif - -@endif - function cr_check_rule(r: Rule): bool &is_used { if ( r$ty == DROP && r$entity$ty == ADDRESS ) @@ -397,14 +377,18 @@ function drop_address_catch_release(a: addr, location: string &default=""): Bloc log$message = "Address already blocked outside of catch-and-release. Catch and release will monitor and only actively block if it appears in network traffic."; Log::write(CATCH_RELEASE, log); blocks[a] = bi; - event NetControl::catch_release_block_new(a, bi); +@if ( Cluster::is_enabled() ) + Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi); @endif +@endif + @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) - event NetControl::catch_release_add(a, location); + Broker::publish(Cluster::manager_topic, NetControl::catch_release_add, a, location); @endif return bi; } + # No entry in blocks. local block_interval = catch_release_intervals[0]; @if ( ! Cluster::is_enabled() || ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) ) @@ -416,8 +400,9 @@ function drop_address_catch_release(a: addr, location: string &default=""): Bloc if ( location != "" ) bi$location = location; blocks[a] = bi; - event NetControl::catch_release_block_new(a, bi); - blocks[a] = bi; +@if ( Cluster::is_enabled() ) + Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi); +@endif log = populate_log_record(a, bi, DROP_REQUESTED); Log::write(CATCH_RELEASE, log); return bi; @@ -428,7 +413,7 @@ function drop_address_catch_release(a: addr, location: string &default=""): Bloc @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) bi = BlockInfo($watch_until=network_time()+catch_release_intervals[1], $block_until=network_time()+block_interval, $current_interval=0, $current_block_id=""); - event NetControl::catch_release_add(a, location); + Broker::publish(Cluster::manager_topic, NetControl::catch_release_add, a, location); return bi; @endif @@ -450,10 +435,10 @@ function unblock_address_catch_release(a: addr, reason: string &default=""): boo remove_rule(bi$current_block_id, reason); @endif @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) - event NetControl::catch_release_block_delete(a); + Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_delete, a); @endif @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) - event NetControl::catch_release_delete(a, reason); + Broker::publish(Cluster::manager_topic, NetControl::catch_release_delete, a, reason); @endif return T; @@ -509,14 +494,14 @@ function catch_release_seen(a: addr) Log::write(CATCH_RELEASE, log); @endif @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) - event NetControl::catch_release_block_new(a, bi); + Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi); @endif @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) - if ( a in catch_release_recently_notified ) - return; + if ( a in catch_release_recently_notified ) + return; - event NetControl::catch_release_encountered(a); - add catch_release_recently_notified[a]; + Broker::publish(Cluster::manager_topic, NetControl::catch_release_encountered, a); + add catch_release_recently_notified[a]; @endif return; From 927e9366533f13a67ed12087b8688fd05cdb1ceb Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 15:27:10 +0100 Subject: [PATCH 12/16] frameworks/control: Remove Broker::auto_publish() --- .../policy/frameworks/control/controllee.zeek | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/scripts/policy/frameworks/control/controllee.zeek b/scripts/policy/frameworks/control/controllee.zeek index 13e6caeb92..0fa7a4495f 100644 --- a/scripts/policy/frameworks/control/controllee.zeek +++ b/scripts/policy/frameworks/control/controllee.zeek @@ -15,16 +15,6 @@ module Control; event zeek_init() &priority=-10 { Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id()); - Broker::auto_publish(Control::topic_prefix + "/id_value_response", - Control::id_value_response); - Broker::auto_publish(Control::topic_prefix + "/peer_status_response", - Control::peer_status_response); - Broker::auto_publish(Control::topic_prefix + "/net_stats_response", - Control::net_stats_response); - Broker::auto_publish(Control::topic_prefix + "/configuration_update_response", - Control::configuration_update_response); - Broker::auto_publish(Control::topic_prefix + "/shutdown_response", - Control::shutdown_response); if ( Control::controllee_listen ) Broker::listen(); @@ -33,7 +23,8 @@ event zeek_init() &priority=-10 event Control::id_value_request(id: string) { local val = lookup_ID(id); - event Control::id_value_response(id, fmt("%s", val)); + local reply_topic = Control::topic_prefix + "/id_value_response"; + Broker::publish(reply_topic, Control::id_value_response, id, fmt("%s", val)); } event Control::peer_status_request() @@ -53,7 +44,8 @@ event Control::peer_status_request() bpeer$status); } - event Control::peer_status_response(status); + local topic = Control::topic_prefix + "/peer_status_response"; + Broker::publish(topic, Control::peer_status_response, status); } event Control::net_stats_request() @@ -61,7 +53,8 @@ event Control::net_stats_request() local ns = get_net_stats(); local reply = fmt("%.6f recvd=%d dropped=%d link=%d\n", network_time(), ns$pkts_recvd, ns$pkts_dropped, ns$pkts_link); - event Control::net_stats_response(reply); + local topic = Control::topic_prefix + "/net_stats_response"; + Broker::publish(topic, Control::net_stats_response, reply); } event Control::configuration_update_request() @@ -73,13 +66,15 @@ event Control::configuration_update_request() # the configuration is going to be updated. This event could be handled # by other scripts if they need to do some ancillary processing if # redef-able consts are modified at runtime. - event Control::configuration_update_response(); + local topic = Control::topic_prefix + "/configuration_update_response"; + Broker::publish(topic, Control::configuration_update_response); } event Control::shutdown_request() { # Send the acknowledgement event. - event Control::shutdown_response(); + local topic = Control::topic_prefix + "/shutdown_response"; + Broker::publish(topic, Control::shutdown_response); # Schedule the shutdown to let the current event queue flush itself first. schedule 1sec { terminate_event() }; } From 455e05bc2e8bcf3efc632cd3831fa3ff0098ff3e Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 16:49:00 +0100 Subject: [PATCH 13/16] btest: Remove Broker::auto_publish() usages The ones that seemed to test Broker::auto_publish() were annotated for removal. --- .../worker-1..stdout | 1 + testing/btest/bifs/hll_cluster.zeek | 7 +----- testing/btest/broker/connect-on-retry.zeek | 12 ++++------ .../btest/broker/event-group-interaction.zeek | 2 ++ testing/btest/broker/remote_event_auto.zeek | 2 ++ .../btest/broker/remote_event_auto_ts.zeek | 2 ++ testing/btest/broker/store/clone.zeek | 9 +++++--- testing/btest/broker/unpeer.zeek | 2 +- .../cluster/start-it-up-logger.zeek | 23 +++++-------------- .../base/frameworks/cluster/start-it-up.zeek | 9 +------- .../base/frameworks/config/basic_cluster.zeek | 7 ------ .../frameworks/config/cluster_resend.zeek | 11 ++++----- .../sumstats/manual-epoch-cluster.zeek | 15 +++--------- .../management/controller/agent-checkin.zeek | 5 +--- 14 files changed, 35 insertions(+), 72 deletions(-) diff --git a/testing/btest/Baseline/scripts.base.frameworks.config.cluster_resend/worker-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.config.cluster_resend/worker-1..stdout index 478da86d89..cf5036302f 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.config.cluster_resend/worker-1..stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.config.cluster_resend/worker-1..stdout @@ -1,4 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +option changed, testcount, 1, option changed, testport, 44/tcp, option changed, teststring, b, comment option changed, testcount, 1, diff --git a/testing/btest/bifs/hll_cluster.zeek b/testing/btest/bifs/hll_cluster.zeek index dae968ad66..d026d6850d 100644 --- a/testing/btest/bifs/hll_cluster.zeek +++ b/testing/btest/bifs/hll_cluster.zeek @@ -28,11 +28,6 @@ global hll_data: event(data: opaque of cardinality); @if ( Cluster::local_node_type() == Cluster::WORKER ) -event zeek_init() - { - Broker::auto_publish(Cluster::manager_topic, hll_data); - } - global runnumber: count &redef; # differentiate runs event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) @@ -84,7 +79,7 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) print hll_cardinality_estimate(c); } - event hll_data(c); + Broker::publish(Cluster::manager_topic, hll_data, c); } @endif diff --git a/testing/btest/broker/connect-on-retry.zeek b/testing/btest/broker/connect-on-retry.zeek index 7fe700a2cb..2db0bd5237 100644 --- a/testing/btest/broker/connect-on-retry.zeek +++ b/testing/btest/broker/connect-on-retry.zeek @@ -21,13 +21,12 @@ global ping: event(msg: string, c: count); event zeek_init() { Broker::subscribe("zeek/event/my_topic"); - Broker::auto_publish("zeek/event/my_topic", ping); Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); } function send_event() { - event ping("my-message", ++event_count); + Broker::publish("zeek/event/my_topic", ping, "my-message", ++event_count); } event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) @@ -63,15 +62,14 @@ global auto_handler: event(msg: string, c: count); global pong: event(msg: string, c: count); event delayed_listen() - { + { Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); - } + } event zeek_init() { Broker::subscribe("zeek/event/my_topic"); - Broker::auto_publish("zeek/event/my_topic", pong); - schedule 5secs { delayed_listen() }; + schedule 5secs { delayed_listen() }; } event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) @@ -96,7 +94,7 @@ event ping(msg: string, n: count) return; } - event pong(msg, n); + Broker::publish("zeek/event/my_topic", pong, msg, n); } @TEST-END-FILE diff --git a/testing/btest/broker/event-group-interaction.zeek b/testing/btest/broker/event-group-interaction.zeek index e62ca9f1fa..04ec380e80 100644 --- a/testing/btest/broker/event-group-interaction.zeek +++ b/testing/btest/broker/event-group-interaction.zeek @@ -1,5 +1,7 @@ # @TEST-DOC: Disabling an unrelated event group caused auto-publish to break because the remote event had no bodies and got disabled. This is a regression test it's not being done again. # +# Remove in v8.1 when auto_publish() is removed. +# # @TEST-GROUP: broker # # @TEST-PORT: BROKER_PORT diff --git a/testing/btest/broker/remote_event_auto.zeek b/testing/btest/broker/remote_event_auto.zeek index 264f131708..429a11c07a 100644 --- a/testing/btest/broker/remote_event_auto.zeek +++ b/testing/btest/broker/remote_event_auto.zeek @@ -1,3 +1,5 @@ +# Remove in v8.1 when auto_publish() is gone. +# # @TEST-GROUP: broker # # @TEST-PORT: BROKER_PORT diff --git a/testing/btest/broker/remote_event_auto_ts.zeek b/testing/btest/broker/remote_event_auto_ts.zeek index fe2b24f870..49b26d846b 100644 --- a/testing/btest/broker/remote_event_auto_ts.zeek +++ b/testing/btest/broker/remote_event_auto_ts.zeek @@ -1,3 +1,5 @@ +# Remove in v8.1 when auto_publish() is gone. +# # Not compatible with -O C++ testing since includes two distinct scripts. # @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" # diff --git a/testing/btest/broker/store/clone.zeek b/testing/btest/broker/store/clone.zeek index d2e8dbad25..b53b17e303 100644 --- a/testing/btest/broker/store/clone.zeek +++ b/testing/btest/broker/store/clone.zeek @@ -30,6 +30,7 @@ function print_index(k: any) event done() { + Broker::publish("zeek/events", done); terminate(); } @@ -50,7 +51,6 @@ event inserted() event zeek_init() { - Broker::auto_publish("zeek/events", done); Broker::subscribe("zeek/"); h = Broker::create_master("test"); @@ -84,7 +84,11 @@ global query_timeout = 1sec; global h: opaque of Broker::Store; -global inserted: event(); +event inserted() + { + # Propagate inserted() + Broker::publish("zeek/events", inserted); + } function print_index(k: any) { @@ -131,7 +135,6 @@ event lookup(stage: count) event zeek_init() { - Broker::auto_publish("zeek/events", inserted); Broker::subscribe("zeek/"); Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); } diff --git a/testing/btest/broker/unpeer.zeek b/testing/btest/broker/unpeer.zeek index 6cb3cc9a8c..eee693d9d9 100644 --- a/testing/btest/broker/unpeer.zeek +++ b/testing/btest/broker/unpeer.zeek @@ -26,6 +26,7 @@ event do_terminate() event print_something(i: int) { print "Something sender", i; + Broker::publish("zeek/event/my_topic", print_something, i); } event unpeer(endpoint: Broker::EndpointInfo) @@ -39,7 +40,6 @@ event unpeer(endpoint: Broker::EndpointInfo) event zeek_init() { Broker::subscribe("zeek/event/my_topic"); - Broker::auto_publish("zeek/event/my_topic", print_something); Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); } diff --git a/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.zeek b/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.zeek index 7a23b37ede..43c8ecbf07 100644 --- a/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.zeek +++ b/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.zeek @@ -51,15 +51,6 @@ event fully_connected(n: string) terminate(); } } - else - { - print "sent fully_connected event"; - } - } - -event zeek_init() - { - Broker::auto_publish(Cluster::logger_topic, fully_connected); } event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) @@ -74,16 +65,14 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) print "termination condition met: shutting down"; terminate(); } + return; } - else if ( Cluster::node == "manager-1" ) + + local expected_nodes = Cluster::node == "manager-1" ? 5 : 4; + if ( peer_count == expected_nodes ) { - if ( peer_count == 5 ) - event fully_connected(Cluster::node); - } - else - { - if ( peer_count == 4 ) - event fully_connected(Cluster::node); + Broker::publish(Cluster::logger_topic, fully_connected, Cluster::node); + print "sent fully_connected event"; } } diff --git a/testing/btest/scripts/base/frameworks/cluster/start-it-up.zeek b/testing/btest/scripts/base/frameworks/cluster/start-it-up.zeek index aa7b624db9..91657d1f4c 100644 --- a/testing/btest/scripts/base/frameworks/cluster/start-it-up.zeek +++ b/testing/btest/scripts/base/frameworks/cluster/start-it-up.zeek @@ -28,8 +28,6 @@ redef Cluster::nodes = { }; @TEST-END-FILE -global fully_connected: event(); - global peer_count = 0; global fully_connected_nodes = 0; @@ -49,11 +47,6 @@ event fully_connected() } } -event zeek_init() - { - Broker::auto_publish(Cluster::manager_topic, fully_connected); - } - event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) { print "Connected to a peer"; @@ -67,7 +60,7 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) else { if ( peer_count == 3 ) - event fully_connected(); + Broker::publish(Cluster::manager_topic, fully_connected); } } diff --git a/testing/btest/scripts/base/frameworks/config/basic_cluster.zeek b/testing/btest/scripts/base/frameworks/config/basic_cluster.zeek index 2f30081c71..000efdb594 100644 --- a/testing/btest/scripts/base/frameworks/config/basic_cluster.zeek +++ b/testing/btest/scripts/base/frameworks/config/basic_cluster.zeek @@ -36,13 +36,6 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) terminate(); } -global ready_for_data: event(); - -event zeek_init() - { - Broker::auto_publish(Cluster::worker_topic, ready_for_data); - } - @if ( Cluster::node == "worker-1" ) event Cluster::Experimental::cluster_started() { diff --git a/testing/btest/scripts/base/frameworks/config/cluster_resend.zeek b/testing/btest/scripts/base/frameworks/config/cluster_resend.zeek index 5e8f48409c..c397e5bdeb 100644 --- a/testing/btest/scripts/base/frameworks/config/cluster_resend.zeek +++ b/testing/btest/scripts/base/frameworks/config/cluster_resend.zeek @@ -38,10 +38,6 @@ global n = 0; event ready_for_data() { -@if ( Cluster::node == "manager-1" ) - Config::set_value("testcount", 1); -@endif - @if ( Cluster::node == "worker-1" ) Config::set_value("testport", 44/tcp); Config::set_value("teststring", "b", "comment"); @@ -66,7 +62,6 @@ function option_changed(ID: string, new_value: any, location: string): any event zeek_init() &priority=5 { - Broker::auto_publish(Cluster::worker_topic, ready_for_data); Option::set_change_handler("testport", option_changed, -100); Option::set_change_handler("teststring", option_changed, -100); Option::set_change_handler("testcount", option_changed, -100); @@ -79,9 +74,11 @@ event Cluster::node_up(name: string, id: string) &priority=-5 { ++peer_count; if ( peer_count == 1 ) - event ready_for_data(); + { + Config::set_value("testcount", 1); + Broker::publish(Cluster::worker_topic, ready_for_data); + } } - @endif event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) diff --git a/testing/btest/scripts/base/frameworks/sumstats/manual-epoch-cluster.zeek b/testing/btest/scripts/base/frameworks/sumstats/manual-epoch-cluster.zeek index 86e1e4ae39..0ec2efb312 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/manual-epoch-cluster.zeek +++ b/testing/btest/scripts/base/frameworks/sumstats/manual-epoch-cluster.zeek @@ -50,9 +50,7 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) terminate(); } -global ready_for_data: event(); - -event ready_for_data() +event ready_for_data() &is_used { if ( Cluster::node == "worker-1" ) { @@ -74,8 +72,6 @@ event ready_for_data() SumStats::observe("test", [$host=7.2.1.5], [$num=91]); SumStats::observe("test", [$host=10.10.10.10], [$num=5]); } - - did_data = T; } @if ( Cluster::local_node_type() == Cluster::MANAGER ) @@ -91,7 +87,8 @@ event second_test() event send_ready_for_data() { print "Sending ready for data"; - event ready_for_data(); + Broker::publish(Cluster::worker_topic, ready_for_data); + did_data = T; } @@ -104,10 +101,4 @@ event Cluster::Experimental::cluster_started() schedule 5secs { send_ready_for_data() }; schedule 10secs { second_test() }; } - -event zeek_init() &priority=100 - { - Broker::auto_publish(Cluster::worker_topic, ready_for_data); - } - @endif diff --git a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek index be0881c6ad..26f049a6e6 100644 --- a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek +++ b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek @@ -34,10 +34,7 @@ event zeek_init() # notify_agent_hello event has arrived. The controller doesn't normally # talk to the supervisor, so connect to it. if ( Management::role == Management::CONTROLLER ) - { Broker::peer(getenv("ZEEK_DEFAULT_LISTEN_ADDRESS"), Broker::default_port, Broker::default_listen_retry); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); - } } event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count) @@ -50,7 +47,7 @@ event Management::Agent::API::notify_agent_hello(instance: string, id: string, c logged = T; # This takes down the whole process tree. - event SupervisorControl::stop_request(); + Broker::publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); } } From 6abb9d7edab1b86b37d99bd5cec2f3bcd59f2fbf Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 6 Nov 2024 15:15:04 +0100 Subject: [PATCH 14/16] broker/Eventhandler: Deprecate Broker::auto_publish() for v8.1 Relates to #3637 --- NEWS | 4 ++++ scripts/base/frameworks/broker/main.zeek | 4 ++-- src/EventHandler.h | 10 ++++++++-- src/broker/Manager.cc | 6 ++++++ src/broker/Manager.h | 4 +++- src/broker/messaging.bif | 6 ++++++ 6 files changed, 29 insertions(+), 5 deletions(-) diff --git a/NEWS b/NEWS index 0098c11e95..c4b4e7dc00 100644 --- a/NEWS +++ b/NEWS @@ -109,6 +109,10 @@ Removed Functionality Deprecated Functionality ------------------------ +* The ``Broker::auto_publish()`` function has been deprecated and should + be replaced with explicit ``Broker::publish()`` invocations that are + potentially guarded with appropriate ``@if`` or ``@ifdef`` directives. + Zeek 7.0.0 ========== diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 0dd61f9139..d41f64ab2e 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -395,7 +395,7 @@ export { ## ev: a Zeek event value. ## ## Returns: true if automatic event sending is now enabled. - global auto_publish: function(topic: string, ev: any): bool; + global auto_publish: function(topic: string, ev: any): bool &deprecated="Remove in v8.1. Switch to explicit Broker::publish() calls. Auto-publish won't work with all cluster backends."; ## Stop automatically sending an event to peers upon local dispatch. ## @@ -405,7 +405,7 @@ export { ## ## Returns: true if automatic events will not occur for the topic/event ## pair. - global auto_unpublish: function(topic: string, ev: any): bool; + global auto_unpublish: function(topic: string, ev: any): bool &deprecated="Remove in v8.1. See Broker::auto_publish()"; } @load base/bif/comm.bif diff --git a/src/EventHandler.h b/src/EventHandler.h index 0fbd5c1282..9291bda809 100644 --- a/src/EventHandler.h +++ b/src/EventHandler.h @@ -35,9 +35,15 @@ public: void SetFunc(FuncPtr f); - void AutoPublish(std::string topic) { auto_publish.insert(std::move(topic)); } + [[deprecated("Remove in v8.1, use explicit Publish().")]] + void AutoPublish(std::string topic) { + auto_publish.insert(std::move(topic)); + } - void AutoUnpublish(const std::string& topic) { auto_publish.erase(topic); } + [[deprecated("Remove in v8.1.")]] + void AutoUnpublish(const std::string& topic) { + auto_publish.erase(topic); + } void Call(zeek::Args* vl, bool no_remote = false, double ts = run_state::network_time); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 44c6c407a0..270f57c4c1 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -811,7 +811,10 @@ bool Manager::AutoPublishEvent(string topic, Val* event) { } DBG_LOG(DBG_BROKER, "Enabling auto-publishing of event %s to topic %s", handler->Name(), topic.c_str()); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" handler->AutoPublish(std::move(topic)); +#pragma GCC diagnostic pop return true; } @@ -837,7 +840,10 @@ bool Manager::AutoUnpublishEvent(const string& topic, Val* event) { } DBG_LOG(DBG_BROKER, "Disabling auto-publishing of event %s to topic %s", handler->Name(), topic.c_str()); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" handler->AutoUnpublish(topic); +#pragma GCC diagnostic pop return true; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 412fa764ae..297d7a5f05 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -243,6 +243,7 @@ public: * @param event a Zeek event value. * @return true if automatic event sending is now enabled. */ + [[deprecated("Remove in v8.1, use explicit Publish().")]] bool AutoPublishEvent(std::string topic, Val* event); /** @@ -251,6 +252,7 @@ public: * @param event an event originally given to zeek::Broker::Manager::AutoPublish(). * @return true if automatic events will no occur for the topic/event pair. */ + [[deprecated("Remove in v8.1.")]] bool AutoUnpublishEvent(const std::string& topic, Val* event); /** @@ -483,7 +485,7 @@ private: telemetry::CounterPtr num_logs_outgoing_metric; telemetry::CounterPtr num_ids_incoming_metric; telemetry::CounterPtr num_ids_outgoing_metric; -}; +}; // namespace zeek } // namespace Broker diff --git a/src/broker/messaging.bif b/src/broker/messaging.bif index 92bb625dd8..632cf06ce5 100644 --- a/src/broker/messaging.bif +++ b/src/broker/messaging.bif @@ -136,14 +136,20 @@ function Broker::__publish_id%(topic: string, id: string%): bool function Broker::__auto_publish%(topic: string, ev: any%): bool %{ zeek::Broker::Manager::ScriptScopeGuard ssg; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" auto rval = zeek::broker_mgr->AutoPublishEvent(topic->CheckString(), ev); +#pragma GCC diagnostic pop return zeek::val_mgr->Bool(rval); %} function Broker::__auto_unpublish%(topic: string, ev: any%): bool %{ zeek::Broker::Manager::ScriptScopeGuard ssg; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" auto rval = zeek::broker_mgr->AutoUnpublishEvent(topic->CheckString(), ev); +#pragma GCC diagnostic pop return zeek::val_mgr->Bool(rval); %} From 831614f907b094b242b02d1ed370ea80818adcac Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 12 Nov 2024 13:26:38 +0100 Subject: [PATCH 15/16] broker/Publish: Use event time instead of network time Discussed with @J-Gras, calling Broker::publish() within a scheduled should use the "intended timestamp" implicitly. This is subtle, but supposedly more expected when running a pcap replay cluster. --- NEWS | 5 + src/broker/Manager.cc | 8 +- .../recv.recv.out | 11 +++ .../send.send.out | 43 +++++++++ .../broker/remote_event_schedule_ts.zeek | 92 +++++++++++++++++++ .../broker/web-socket-events-metadata.zeek | 10 +- 6 files changed, 163 insertions(+), 6 deletions(-) create mode 100644 testing/btest/Baseline/broker.remote_event_schedule_ts/recv.recv.out create mode 100644 testing/btest/Baseline/broker.remote_event_schedule_ts/send.send.out create mode 100644 testing/btest/broker/remote_event_schedule_ts.zeek diff --git a/NEWS b/NEWS index c4b4e7dc00..f985ea09d3 100644 --- a/NEWS +++ b/NEWS @@ -103,6 +103,11 @@ Changed Functionality of a node are scraped via the Prometheus HTTP endpoint, or one of the collect methods is invoked from Zeek script. +* Calling ``Broker::publish()`` now uses the event time of the currently + executing event as network time metadata attached to the remote event. + Previously, ``network_time()`` was used. This matters if ``Broker::publish()`` + is called within scheduled events or called within remote events. + Removed Functionality --------------------- diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 270f57c4c1..7b6178187b 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -12,6 +12,7 @@ #include "zeek/DebugLogger.h" #include "zeek/Desc.h" +#include "zeek/Event.h" #include "zeek/Func.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" @@ -580,9 +581,10 @@ bool Manager::PublishEvent(string topic, RecordVal* args) { } // At this point we come from script-land. This means that publishing of the event was - // explicitly triggered. Hence, the timestamp is set to the current network time. This also - // means that timestamping cannot be manipulated from script-land for now. - return PublishEvent(std::move(topic), event_name, std::move(xs), run_state::network_time); + // explicitly triggered. Hence, the timestamp is set to the current event's time. This + // also means that timestamping cannot be manipulated from script-land for now. + auto ts = event_mgr.CurrentEventTime(); + return PublishEvent(std::move(topic), event_name, std::move(xs), ts); } bool Manager::PublishIdentifier(std::string topic, std::string id) { diff --git a/testing/btest/Baseline/broker.remote_event_schedule_ts/recv.recv.out b/testing/btest/Baseline/broker.remote_event_schedule_ts/recv.recv.out new file mode 100644 index 0000000000..60cebba91f --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_schedule_ts/recv.recv.out @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +receiver got ping: my-message-a intended for 1989-12-12-22:00:00 published at 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-b intended for 1989-12-12-22:15:00 published at 1989-12-12-23:00:00 stamped to 1989-12-12-22:15:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-c intended for 1989-12-12-22:30:00 published at 1989-12-12-23:00:00 stamped to 1989-12-12-22:30:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-a intended for 1989-12-12-23:00:00 published at 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-b intended for 1989-12-12-23:15:00 published at 1989-12-13-00:00:00 stamped to 1989-12-12-23:15:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-c intended for 1989-12-12-23:30:00 published at 1989-12-13-00:00:00 stamped to 1989-12-12-23:30:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-a intended for 1989-12-13-00:00:00 published at 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-b intended for 1989-12-13-00:15:00 published at 1989-12-13-01:00:00 stamped to 1989-12-13-00:15:00 (is_remote = T, intended_equals_stamped=T) +receiver got ping: my-message-c intended for 1989-12-13-00:30:00 published at 1989-12-13-01:00:00 stamped to 1989-12-13-00:30:00 (is_remote = T, intended_equals_stamped=T) diff --git a/testing/btest/Baseline/broker.remote_event_schedule_ts/send.send.out b/testing/btest/Baseline/broker.remote_event_schedule_ts/send.send.out new file mode 100644 index 0000000000..68fdce6f5b --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_schedule_ts/send.send.out @@ -0,0 +1,43 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +>> Run 1 (1989-12-12-22:00:00) +>>> Publish my-message-a intended for 1989-12-12-22:00:00 (current_event_time=1989-12-12-22:00:00, network_time=1989-12-12-22:00:00) +>>> Publish my-message-b intended for 1989-12-12-22:15:00 (current_event_time=1989-12-12-22:15:00, network_time=1989-12-12-23:00:00) +>>> Publish my-message-c intended for 1989-12-12-22:30:00 (current_event_time=1989-12-12-22:30:00, network_time=1989-12-12-23:00:00) +>> Run 2 (1989-12-12-23:00:00) +>>> Publish my-message-a intended for 1989-12-12-23:00:00 (current_event_time=1989-12-12-23:00:00, network_time=1989-12-12-23:00:00) +>>> Publish my-message-b intended for 1989-12-12-23:15:00 (current_event_time=1989-12-12-23:15:00, network_time=1989-12-13-00:00:00) +>>> Publish my-message-c intended for 1989-12-12-23:30:00 (current_event_time=1989-12-12-23:30:00, network_time=1989-12-13-00:00:00) +>> Run 3 (1989-12-13-00:00:00) +>>> Publish my-message-a intended for 1989-12-13-00:00:00 (current_event_time=1989-12-13-00:00:00, network_time=1989-12-13-00:00:00) +>>> Publish my-message-b intended for 1989-12-13-00:15:00 (current_event_time=1989-12-13-00:15:00, network_time=1989-12-13-01:00:00) +>>> Publish my-message-c intended for 1989-12-13-00:30:00 (current_event_time=1989-12-13-00:30:00, network_time=1989-12-13-01:00:00) +>> Run 4 (1989-12-13-01:00:00) +>>> Publish my-message-a intended for 1989-12-13-01:00:00 (current_event_time=1989-12-13-01:00:00, network_time=1989-12-13-01:00:00) +>>> Publish my-message-b intended for 1989-12-13-01:15:00 (current_event_time=1989-12-13-01:15:00, network_time=1989-12-13-02:00:00) +>>> Publish my-message-c intended for 1989-12-13-01:30:00 (current_event_time=1989-12-13-01:30:00, network_time=1989-12-13-02:00:00) +>> Run 5 (1989-12-13-02:00:00) +>>> Publish my-message-a intended for 1989-12-13-02:00:00 (current_event_time=1989-12-13-02:00:00, network_time=1989-12-13-02:00:00) +>>> Publish my-message-b intended for 1989-12-13-02:15:00 (current_event_time=1989-12-13-02:15:00, network_time=1989-12-13-03:00:00) +>>> Publish my-message-c intended for 1989-12-13-02:30:00 (current_event_time=1989-12-13-02:30:00, network_time=1989-12-13-03:00:00) +>> Run 6 (1989-12-13-03:00:00) +>>> Publish my-message-a intended for 1989-12-13-03:00:00 (current_event_time=1989-12-13-03:00:00, network_time=1989-12-13-03:00:00) +>>> Publish my-message-b intended for 1989-12-13-03:15:00 (current_event_time=1989-12-13-03:15:00, network_time=1989-12-13-04:00:00) +>>> Publish my-message-c intended for 1989-12-13-03:30:00 (current_event_time=1989-12-13-03:30:00, network_time=1989-12-13-04:00:00) +>> Run 7 (1989-12-13-04:00:00) +>>> Publish my-message-a intended for 1989-12-13-04:00:00 (current_event_time=1989-12-13-04:00:00, network_time=1989-12-13-04:00:00) +>>> Publish my-message-b intended for 1989-12-13-04:15:00 (current_event_time=1989-12-13-04:15:00, network_time=1989-12-13-05:00:00) +>>> Publish my-message-c intended for 1989-12-13-04:30:00 (current_event_time=1989-12-13-04:30:00, network_time=1989-12-13-05:00:00) +>> Run 8 (1989-12-13-05:00:00) +>>> Publish my-message-a intended for 1989-12-13-05:00:00 (current_event_time=1989-12-13-05:00:00, network_time=1989-12-13-05:00:00) +>>> Publish my-message-b intended for 1989-12-13-05:15:00 (current_event_time=1989-12-13-05:15:00, network_time=1989-12-13-06:00:00) +>>> Publish my-message-c intended for 1989-12-13-05:30:00 (current_event_time=1989-12-13-05:30:00, network_time=1989-12-13-06:00:00) +>> Run 9 (1989-12-13-06:00:00) +>>> Publish my-message-a intended for 1989-12-13-06:00:00 (current_event_time=1989-12-13-06:00:00, network_time=1989-12-13-06:00:00) +>>> Publish my-message-b intended for 1989-12-13-06:15:00 (current_event_time=1989-12-13-06:15:00, network_time=1989-12-13-07:00:00) +>>> Publish my-message-c intended for 1989-12-13-06:30:00 (current_event_time=1989-12-13-06:30:00, network_time=1989-12-13-07:00:00) +>> Run 10 (1989-12-13-07:00:00) +>>> Publish my-message-a intended for 1989-12-13-07:00:00 (current_event_time=1989-12-13-07:00:00, network_time=1989-12-13-07:00:00) +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer +>>> Publish my-message-b intended for 1989-12-13-07:15:00 (current_event_time=1989-12-13-07:15:00, network_time=1989-12-13-07:00:00) +>>> Publish my-message-c intended for 1989-12-13-07:30:00 (current_event_time=1989-12-13-07:30:00, network_time=1989-12-13-07:00:00) diff --git a/testing/btest/broker/remote_event_schedule_ts.zeek b/testing/btest/broker/remote_event_schedule_ts.zeek new file mode 100644 index 0000000000..fe90f2b0cb --- /dev/null +++ b/testing/btest/broker/remote_event_schedule_ts.zeek @@ -0,0 +1,92 @@ +# @TEST-DOC: Broker::publish() within a schedule event uses the intended timestamp, not the current network_time() +# +# Not compatible with -O C++ testing since includes two distinct scripts. +# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" +# +# @TEST-GROUP: broker +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 5 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +# @TEST-START-FILE send.zeek + +redef exit_only_after_terminate = T; + +global runs = 0; +global ping: event(msg: string, intended_ts: time, publish_ts: time); + +event zeek_init() + { + suspend_processing(); + Broker::subscribe("zeek/event/my_topic"); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event send_ping(msg: string, intended_ts: time) + { + print fmt(">>> Publish %s intended for %D (current_event_time=%D, network_time=%D)", + msg, intended_ts, current_event_time(), network_time()); + Broker::publish("zeek/event/my_topic", ping, msg, intended_ts, network_time()); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + continue_processing(); + } + +event new_connection(c: connection) + { + print fmt(">> Run %s (%D)", ++runs, network_time()); + + event send_ping("my-message-a", network_time()); + schedule 30 mins { send_ping("my-message-c", network_time() + 30 mins) }; + schedule 15 mins { send_ping("my-message-b", network_time() + 15 mins) }; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + terminate(); + } + +# @TEST-END-FILE + + +# @TEST-START-FILE recv.zeek + +redef exit_only_after_terminate = T; + +global msg_count = 0; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, intended_ts: time, publish_ts: time) &is_used + { + if ( ++msg_count >= 10 ) + { + terminate(); + return; + } + + print fmt("receiver got ping: %s intended for %D published at %D stamped to %D (is_remote = %s, intended_equals_stamped=%s)", + msg, intended_ts, publish_ts, current_event_time(), is_remote_event(), intended_ts == current_event_time()); + } +# @TEST-END-FILE diff --git a/testing/btest/broker/web-socket-events-metadata.zeek b/testing/btest/broker/web-socket-events-metadata.zeek index 52b1c8ab3b..33477b4e7e 100644 --- a/testing/btest/broker/web-socket-events-metadata.zeek +++ b/testing/btest/broker/web-socket-events-metadata.zeek @@ -29,7 +29,7 @@ event zeek_init() Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT"))); } -function send_event() +event send_event() { ++event_count; local e = Broker::make_event(ping, "my-message", event_count); @@ -39,7 +39,7 @@ function send_event() event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) { print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - send_event(); + event send_event(); } event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) @@ -53,7 +53,11 @@ event pong(msg: string, n: count) &is_used print fmt("sender got pong: %s, %s network_time=%s current_event_time=%s", msg, n, network_time(), current_event_time()); set_network_time(network_time() + 10sec); - send_event(); + + # pong is a remote event and a Broker::publish() would take + # current_event_time() as the network time for Broker::publish(), + # prevent this by queuing a new send_event(). + event send_event(); } From aabc4a4114b73e3bfaccc0c22c0f3394d0d44215 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 14 Nov 2024 12:08:35 +0100 Subject: [PATCH 16/16] sumstats: Remove copy() for Broker::publish() calls Serialization happens immediately at Broker::publish() time, there should be no caching issues. --- scripts/base/frameworks/sumstats/cluster.zeek | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/scripts/base/frameworks/sumstats/cluster.zeek b/scripts/base/frameworks/sumstats/cluster.zeek index c5d45e7a7d..3016b5bb42 100644 --- a/scripts/base/frameworks/sumstats/cluster.zeek +++ b/scripts/base/frameworks/sumstats/cluster.zeek @@ -151,12 +151,8 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean { if ( uid in sending_results && key in sending_results[uid] ) { - # XXX: Is that comment stale? - # - # Note: copy is needed to compensate serialization caching issue. This should be - # changed to something else later. Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, - uid, ss_name, key, copy(sending_results[uid][key]), cleanup); + uid, ss_name, key, sending_results[uid][key], cleanup); delete sending_results[uid][key]; } else @@ -171,10 +167,8 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean { if ( ss_name in result_store && key in result_store[ss_name] ) { - # Note: copy is needed to compensate serialization caching issue. This should be - # changed to something else later. Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, - uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); + uid, ss_name, key, result_store[ss_name][key], cleanup); } else {