diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek
index fbc7cf1009..2b6f0ed86a 100644
--- a/scripts/base/frameworks/supervisor/main.zeek
+++ b/scripts/base/frameworks/supervisor/main.zeek
@@ -1,6 +1,8 @@
##! Implements Zeek process supervision API and default behavior for its
##! associated (remote) control events.
+@load base/frameworks/cluster/pubsub
+
@load ./api
@load ./control
@@ -49,7 +51,7 @@ event zeek_init() &priority=10
Broker::listen();
}
- Broker::subscribe(SupervisorControl::topic_prefix);
+ Cluster::subscribe(SupervisorControl::topic_prefix);
}
event SupervisorControl::stop_request()
@@ -67,7 +69,7 @@ event SupervisorControl::status_request(reqid: string, node: string)
local res = Supervisor::status(node);
local topic = SupervisorControl::topic_prefix + fmt("/status_response/%s", reqid);
- Broker::publish(topic, SupervisorControl::status_response, reqid, res);
+ Cluster::publish(topic, SupervisorControl::status_response, reqid, res);
}
event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeConfig)
@@ -77,7 +79,7 @@ event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeCon
local res = Supervisor::create(node);
local topic = SupervisorControl::topic_prefix + fmt("/create_response/%s", reqid);
- Broker::publish(topic, SupervisorControl::create_response, reqid, res);
+ Cluster::publish(topic, SupervisorControl::create_response, reqid, res);
}
event SupervisorControl::destroy_request(reqid: string, node: string)
@@ -87,7 +89,7 @@ event SupervisorControl::destroy_request(reqid: string, node: string)
local res = Supervisor::destroy(node);
local topic = SupervisorControl::topic_prefix + fmt("/destroy_response/%s", reqid);
- Broker::publish(topic, SupervisorControl::destroy_response, reqid, res);
+ Cluster::publish(topic, SupervisorControl::destroy_response, reqid, res);
}
event SupervisorControl::restart_request(reqid: string, node: string)
@@ -97,7 +99,7 @@ event SupervisorControl::restart_request(reqid: string, node: string)
local res = Supervisor::restart(node);
local topic = SupervisorControl::topic_prefix + fmt("/restart_response/%s", reqid);
- Broker::publish(topic, SupervisorControl::restart_response, reqid, res);
+ Cluster::publish(topic, SupervisorControl::restart_response, reqid, res);
}
event Supervisor::node_status(node: string, pid: count)
@@ -106,5 +108,5 @@ event Supervisor::node_status(node: string, pid: count)
return;
local topic = SupervisorControl::topic_prefix + "/node_status";
- Broker::publish(topic, SupervisorControl::node_status, node, pid);
+ Cluster::publish(topic, SupervisorControl::node_status, node, pid);
}
diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log
index 57c9152075..bb8a7653d4 100644
--- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log
+++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log
@@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/supervisor/__load__.zeek
scripts/base/frameworks/supervisor/control.zeek
scripts/base/frameworks/supervisor/main.zeek
+ scripts/base/frameworks/cluster/pubsub.zeek
+ scripts/base/frameworks/cluster/types.zeek
+ build/scripts/base/bif/cluster.bif.zeek
scripts/base/frameworks/input/__load__.zeek
scripts/base/frameworks/input/main.zeek
build/scripts/base/bif/input.bif.zeek
@@ -136,12 +139,9 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/cluster/main.zeek
scripts/base/frameworks/control/__load__.zeek
scripts/base/frameworks/control/main.zeek
- scripts/base/frameworks/cluster/types.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
- build/scripts/base/bif/cluster.bif.zeek
scripts/base/frameworks/cluster/pools.zeek
scripts/base/utils/hash_hrw.zeek
- scripts/base/frameworks/cluster/pubsub.zeek
scripts/base/frameworks/cluster/telemetry.zeek
scripts/base/frameworks/config/__load__.zeek
scripts/base/frameworks/config/main.zeek
diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
index 2476ec6e1b..bdc4807cdf 100644
--- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
+++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
@@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/supervisor/__load__.zeek
scripts/base/frameworks/supervisor/control.zeek
scripts/base/frameworks/supervisor/main.zeek
+ scripts/base/frameworks/cluster/pubsub.zeek
+ scripts/base/frameworks/cluster/types.zeek
+ build/scripts/base/bif/cluster.bif.zeek
scripts/base/frameworks/input/__load__.zeek
scripts/base/frameworks/input/main.zeek
build/scripts/base/bif/input.bif.zeek
@@ -136,12 +139,9 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/cluster/main.zeek
scripts/base/frameworks/control/__load__.zeek
scripts/base/frameworks/control/main.zeek
- scripts/base/frameworks/cluster/types.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
- build/scripts/base/bif/cluster.bif.zeek
scripts/base/frameworks/cluster/pools.zeek
scripts/base/utils/hash_hrw.zeek
- scripts/base/frameworks/cluster/pubsub.zeek
scripts/base/frameworks/cluster/telemetry.zeek
scripts/base/frameworks/config/__load__.zeek
scripts/base/frameworks/config/main.zeek
diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output
index 6ad9831781..560b182567 100644
--- a/testing/btest/Baseline/plugins.hooks/output
+++ b/testing/btest/Baseline/plugins.hooks/output
@@ -18,13 +18,13 @@
0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp)) ->
0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp)) ->
0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp})) ->
-0.000000 MetaHookPost CallFunction(Broker::__subscribe, , (zeek/supervisor)) ->
-0.000000 MetaHookPost CallFunction(Broker::subscribe, , (zeek/supervisor)) ->
+0.000000 MetaHookPost CallFunction(Cluster::__subscribe, , (zeek/supervisor)) ->
0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) ->
0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) ->
0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) ->
0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) ->
0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) ->
+0.000000 MetaHookPost CallFunction(Cluster::subscribe, , (zeek/supervisor)) ->
0.000000 MetaHookPost CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )) ->
0.000000 MetaHookPost CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])) ->
0.000000 MetaHookPost CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])) ->
@@ -583,6 +583,7 @@
0.000000 MetaHookPost LoadFile(0, base<...>/ppp, <...>/ppp) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/pppoe, <...>/pppoe) -> -1
+0.000000 MetaHookPost LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, base<...>/root, <...>/root) -> -1
@@ -900,6 +901,7 @@
0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp, <...>/ppp) -> (-1, )
0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial) -> (-1, )
0.000000 MetaHookPost LoadFileExtended(0, base<...>/pppoe, <...>/pppoe) -> (-1, )
+0.000000 MetaHookPost LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek) -> (-1, )
0.000000 MetaHookPost LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> (-1, )
0.000000 MetaHookPost LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> (-1, )
0.000000 MetaHookPost LoadFileExtended(0, base<...>/root, <...>/root) -> (-1, )
@@ -962,13 +964,13 @@
0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp))
0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp))
0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp}))
-0.000000 MetaHookPre CallFunction(Broker::__subscribe, , (zeek/supervisor))
-0.000000 MetaHookPre CallFunction(Broker::subscribe, , (zeek/supervisor))
+0.000000 MetaHookPre CallFunction(Cluster::__subscribe, , (zeek/supervisor))
0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ())
0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ())
0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F]))
0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F]))
0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F]))
+0.000000 MetaHookPre CallFunction(Cluster::subscribe, , (zeek/supervisor))
0.000000 MetaHookPre CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, ))
0.000000 MetaHookPre CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}]))
0.000000 MetaHookPre CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=]))
@@ -1527,6 +1529,7 @@
0.000000 MetaHookPre LoadFile(0, base<...>/ppp, <...>/ppp)
0.000000 MetaHookPre LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial)
0.000000 MetaHookPre LoadFile(0, base<...>/pppoe, <...>/pppoe)
+0.000000 MetaHookPre LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek)
0.000000 MetaHookPre LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek)
0.000000 MetaHookPre LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek)
0.000000 MetaHookPre LoadFile(0, base<...>/root, <...>/root)
@@ -1844,6 +1847,7 @@
0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp, <...>/ppp)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/pppoe, <...>/pppoe)
+0.000000 MetaHookPre LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, base<...>/root, <...>/root)
@@ -1906,12 +1910,12 @@
0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 81/tcp)
0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 8888/tcp)
0.000000 | HookCallFunction Analyzer::register_for_ports(Analyzer::ANALYZER_HTTP, {80<...>/tcp})
-0.000000 | HookCallFunction Broker::__subscribe(zeek/supervisor)
-0.000000 | HookCallFunction Broker::subscribe(zeek/supervisor)
+0.000000 | HookCallFunction Cluster::__subscribe(zeek/supervisor)
0.000000 | HookCallFunction Cluster::is_enabled()
0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])
0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])
0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])
+0.000000 | HookCallFunction Cluster::subscribe(zeek/supervisor)
0.000000 | HookCallFunction Config::config_option_changed(Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )
0.000000 | HookCallFunction Files::register_protocol(Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])
0.000000 | HookCallFunction Log::__add_filter(Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])
@@ -2483,6 +2487,7 @@
0.000000 | HookLoadFile base<...>/ppp <...>/ppp
0.000000 | HookLoadFile base<...>/ppp_serial <...>/ppp_serial
0.000000 | HookLoadFile base<...>/pppoe <...>/pppoe
+0.000000 | HookLoadFile base<...>/pubsub <...>/pubsub.zeek
0.000000 | HookLoadFile base<...>/removal-hooks <...>/removal-hooks.zeek
0.000000 | HookLoadFile base<...>/reporter.bif <...>/reporter.bif.zeek
0.000000 | HookLoadFile base<...>/root <...>/root
@@ -2800,6 +2805,7 @@
0.000000 | HookLoadFileExtended base<...>/ppp <...>/ppp
0.000000 | HookLoadFileExtended base<...>/ppp_serial <...>/ppp_serial
0.000000 | HookLoadFileExtended base<...>/pppoe <...>/pppoe
+0.000000 | HookLoadFileExtended base<...>/pubsub <...>/pubsub.zeek
0.000000 | HookLoadFileExtended base<...>/removal-hooks <...>/removal-hooks.zeek
0.000000 | HookLoadFileExtended base<...>/reporter.bif <...>/reporter.bif.zeek
0.000000 | HookLoadFileExtended base<...>/root <...>/root
diff --git a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek
index 26f049a6e6..c441f0a9e5 100644
--- a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek
+++ b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek
@@ -11,6 +11,7 @@
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff zeek/nodes/controller/stdout
+@load policy/frameworks/cluster/backend/broker
@load policy/frameworks/management/agent
@load policy/frameworks/management/controller