From 101060e194844c8c7f066aa568025520f17f2e51 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 19:01:49 +0200 Subject: [PATCH] supervisor: Switch from Broker:: to Cluster:: --- scripts/base/frameworks/supervisor/main.zeek | 14 ++++++++------ .../canonified_loaded_scripts.log | 6 +++--- .../canonified_loaded_scripts.log | 6 +++--- testing/btest/Baseline/plugins.hooks/output | 18 ++++++++++++------ .../management/controller/agent-checkin.zeek | 1 + 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek index fbc7cf1009..2b6f0ed86a 100644 --- a/scripts/base/frameworks/supervisor/main.zeek +++ b/scripts/base/frameworks/supervisor/main.zeek @@ -1,6 +1,8 @@ ##! Implements Zeek process supervision API and default behavior for its ##! associated (remote) control events. +@load base/frameworks/cluster/pubsub + @load ./api @load ./control @@ -49,7 +51,7 @@ event zeek_init() &priority=10 Broker::listen(); } - Broker::subscribe(SupervisorControl::topic_prefix); + Cluster::subscribe(SupervisorControl::topic_prefix); } event SupervisorControl::stop_request() @@ -67,7 +69,7 @@ event SupervisorControl::status_request(reqid: string, node: string) local res = Supervisor::status(node); local topic = SupervisorControl::topic_prefix + fmt("/status_response/%s", reqid); - Broker::publish(topic, SupervisorControl::status_response, reqid, res); + Cluster::publish(topic, SupervisorControl::status_response, reqid, res); } event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeConfig) @@ -77,7 +79,7 @@ event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeCon local res = Supervisor::create(node); local topic = SupervisorControl::topic_prefix + fmt("/create_response/%s", reqid); - Broker::publish(topic, SupervisorControl::create_response, reqid, res); + Cluster::publish(topic, SupervisorControl::create_response, reqid, res); } event SupervisorControl::destroy_request(reqid: string, node: string) @@ -87,7 +89,7 @@ event SupervisorControl::destroy_request(reqid: string, node: string) local res = Supervisor::destroy(node); local topic = SupervisorControl::topic_prefix + fmt("/destroy_response/%s", reqid); - Broker::publish(topic, SupervisorControl::destroy_response, reqid, res); + Cluster::publish(topic, SupervisorControl::destroy_response, reqid, res); } event SupervisorControl::restart_request(reqid: string, node: string) @@ -97,7 +99,7 @@ event SupervisorControl::restart_request(reqid: string, node: string) local res = Supervisor::restart(node); local topic = SupervisorControl::topic_prefix + fmt("/restart_response/%s", reqid); - Broker::publish(topic, SupervisorControl::restart_response, reqid, res); + Cluster::publish(topic, SupervisorControl::restart_response, reqid, res); } event Supervisor::node_status(node: string, pid: count) @@ -106,5 +108,5 @@ event Supervisor::node_status(node: string, pid: count) return; local topic = SupervisorControl::topic_prefix + "/node_status"; - Broker::publish(topic, SupervisorControl::node_status, node, pid); + Cluster::publish(topic, SupervisorControl::node_status, node, pid); } diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 57c9152075..bb8a7653d4 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek + scripts/base/frameworks/cluster/pubsub.zeek + scripts/base/frameworks/cluster/types.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/input/__load__.zeek scripts/base/frameworks/input/main.zeek build/scripts/base/bif/input.bif.zeek @@ -136,12 +139,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek - build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek - scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 2476ec6e1b..bdc4807cdf 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek + scripts/base/frameworks/cluster/pubsub.zeek + scripts/base/frameworks/cluster/types.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/input/__load__.zeek scripts/base/frameworks/input/main.zeek build/scripts/base/bif/input.bif.zeek @@ -136,12 +139,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek - build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek - scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 6ad9831781..560b182567 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -18,13 +18,13 @@ 0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp)) -> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp)) -> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp})) -> -0.000000 MetaHookPost CallFunction(Broker::__subscribe, , (zeek/supervisor)) -> -0.000000 MetaHookPost CallFunction(Broker::subscribe, , (zeek/supervisor)) -> +0.000000 MetaHookPost CallFunction(Cluster::__subscribe, , (zeek/supervisor)) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) -> +0.000000 MetaHookPost CallFunction(Cluster::subscribe, , (zeek/supervisor)) -> 0.000000 MetaHookPost CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )) -> 0.000000 MetaHookPost CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])) -> 0.000000 MetaHookPost CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])) -> @@ -583,6 +583,7 @@ 0.000000 MetaHookPost LoadFile(0, base<...>/ppp, <...>/ppp) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/pppoe, <...>/pppoe) -> -1 +0.000000 MetaHookPost LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/root, <...>/root) -> -1 @@ -900,6 +901,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp, <...>/ppp) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/pppoe, <...>/pppoe) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/root, <...>/root) -> (-1, ) @@ -962,13 +964,13 @@ 0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp)) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp)) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp})) -0.000000 MetaHookPre CallFunction(Broker::__subscribe, , (zeek/supervisor)) -0.000000 MetaHookPre CallFunction(Broker::subscribe, , (zeek/supervisor)) +0.000000 MetaHookPre CallFunction(Cluster::__subscribe, , (zeek/supervisor)) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) +0.000000 MetaHookPre CallFunction(Cluster::subscribe, , (zeek/supervisor)) 0.000000 MetaHookPre CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )) 0.000000 MetaHookPre CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])) 0.000000 MetaHookPre CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])) @@ -1527,6 +1529,7 @@ 0.000000 MetaHookPre LoadFile(0, base<...>/ppp, <...>/ppp) 0.000000 MetaHookPre LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial) 0.000000 MetaHookPre LoadFile(0, base<...>/pppoe, <...>/pppoe) +0.000000 MetaHookPre LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/root, <...>/root) @@ -1844,6 +1847,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp, <...>/ppp) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/pppoe, <...>/pppoe) +0.000000 MetaHookPre LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/root, <...>/root) @@ -1906,12 +1910,12 @@ 0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 81/tcp) 0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 8888/tcp) 0.000000 | HookCallFunction Analyzer::register_for_ports(Analyzer::ANALYZER_HTTP, {80<...>/tcp}) -0.000000 | HookCallFunction Broker::__subscribe(zeek/supervisor) -0.000000 | HookCallFunction Broker::subscribe(zeek/supervisor) +0.000000 | HookCallFunction Cluster::__subscribe(zeek/supervisor) 0.000000 | HookCallFunction Cluster::is_enabled() 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F]) 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F]) 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F]) +0.000000 | HookCallFunction Cluster::subscribe(zeek/supervisor) 0.000000 | HookCallFunction Config::config_option_changed(Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, ) 0.000000 | HookCallFunction Files::register_protocol(Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}]) 0.000000 | HookCallFunction Log::__add_filter(Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=]) @@ -2483,6 +2487,7 @@ 0.000000 | HookLoadFile base<...>/ppp <...>/ppp 0.000000 | HookLoadFile base<...>/ppp_serial <...>/ppp_serial 0.000000 | HookLoadFile base<...>/pppoe <...>/pppoe +0.000000 | HookLoadFile base<...>/pubsub <...>/pubsub.zeek 0.000000 | HookLoadFile base<...>/removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFile base<...>/reporter.bif <...>/reporter.bif.zeek 0.000000 | HookLoadFile base<...>/root <...>/root @@ -2800,6 +2805,7 @@ 0.000000 | HookLoadFileExtended base<...>/ppp <...>/ppp 0.000000 | HookLoadFileExtended base<...>/ppp_serial <...>/ppp_serial 0.000000 | HookLoadFileExtended base<...>/pppoe <...>/pppoe +0.000000 | HookLoadFileExtended base<...>/pubsub <...>/pubsub.zeek 0.000000 | HookLoadFileExtended base<...>/removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFileExtended base<...>/reporter.bif <...>/reporter.bif.zeek 0.000000 | HookLoadFileExtended base<...>/root <...>/root diff --git a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek index 26f049a6e6..c441f0a9e5 100644 --- a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek +++ b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek @@ -11,6 +11,7 @@ # @TEST-EXEC: btest-bg-wait 10 # @TEST-EXEC: btest-diff zeek/nodes/controller/stdout +@load policy/frameworks/cluster/backend/broker @load policy/frameworks/management/agent @load policy/frameworks/management/controller