From 1a87ebab7210223212fff912a259a62c7992cbd9 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 30 Jul 2025 11:01:12 +0200 Subject: [PATCH] cluster: Add on_subscribe() and on_unsubscribe() hooks Closes #4176 --- NEWS | 4 ++++ scripts/base/frameworks/cluster/main.zeek | 14 +++++++++++ src/cluster/Backend.cc | 21 ++++++++++++++++ src/cluster/Backend.h | 6 ++--- .../.stderr | 6 ++--- .../cluster.websocket.tls-usage-error/.stderr | 4 ++-- .../.stderr | 1 + .../.stdout | 6 +++++ .../generic/on-subscribe-unsubscribe.zeek | 24 +++++++++++++++++++ 9 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr create mode 100644 testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout create mode 100644 testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek diff --git a/NEWS b/NEWS index a5666e10c9..a60ae00632 100644 --- a/NEWS +++ b/NEWS @@ -279,6 +279,10 @@ New Functionality ``get_net_stats()``, it's possible to determine the number of packets that have been received and accepted by Zeek, but eventually discarded without processing. +- Two new hooks, ``Cluster::on_subscribe()`` and ``Cluster::on_unsubscribe()`` have + been added to allow observing ``Subscribe()`` and ``Unsubscribe()`` calls on + backends by Zeek scripts. + Changed Functionality --------------------- diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 3060ee08af..86c27eff4a 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -401,6 +401,20 @@ export { ## The value of the X-Application-Name HTTP header, if any. application_name: string &optional; }; + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_subscribe: hook(topic: string); + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_unsubscribe: hook(topic: string); } # Needs declaration of Cluster::Event type. diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index e7aed60faa..171c57290c 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -10,6 +10,7 @@ #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" #include "zeek/Func.h" +#include "zeek/ID.h" #include "zeek/Reporter.h" #include "zeek/Type.h" #include "zeek/Val.h" @@ -138,6 +139,26 @@ std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args return Event{eh, std::move(*checked_args), std::move(meta)}; } +bool Backend::Subscribe(const std::string& topic_prefix, SubscribeCallback cb) { + static const auto on_subscribe = zeek::id::find_func("Cluster::on_subscribe"); + assert(on_subscribe && on_subscribe->Flavor() == FUNC_FLAVOR_HOOK); + + if ( on_subscribe && on_subscribe->HasEnabledBodies() ) + on_subscribe->Invoke(zeek::make_intrusive(topic_prefix)); + + return DoSubscribe(topic_prefix, std::move(cb)); +} + +bool Backend::Unsubscribe(const std::string& topic_prefix) { + static const auto on_unsubscribe = zeek::id::find_func("Cluster::on_unsubscribe"); + assert(on_unsubscribe && on_unsubscribe->Flavor() == FUNC_FLAVOR_HOOK); + + if ( on_unsubscribe->HasEnabledBodies() ) + on_unsubscribe->Invoke(zeek::make_intrusive(topic_prefix)); + + return DoUnsubscribe(topic_prefix); +} + void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) { Backend::ReadyCallbackInfo info{Backend::CallbackStatus::Success}; cb(info); diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index cc475d2ec1..5ce80f23b6 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -210,9 +210,7 @@ public: * @param cb callback invoked when the subscription was processed. * @return true if it's a new event subscription and it is now registered. */ - bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()) { - return DoSubscribe(topic_prefix, std::move(cb)); - } + bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()); /** * Unregister interest in messages on a certain topic. @@ -220,7 +218,7 @@ public: * @param topic_prefix a prefix previously supplied to Subscribe() * @return true if interest in topic prefix is no longer advertised. */ - bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); } + bool Unsubscribe(const std::string& topic_prefix); /** * Information passed to a ready callback. diff --git a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr index f739a1dd5b..dce9b20598 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr @@ -1,5 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 677: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) -error in <...>/main.zeek, line 677: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) -error in <...>/main.zeek, line 677: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_qs)) +error in <...>/main.zeek, line 691: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) +error in <...>/main.zeek, line 691: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) +error in <...>/main.zeek, line 691: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_qs)) received termination signal diff --git a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr index 171791a278..3a54b399a3 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr @@ -1,3 +1,3 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 677: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) -error in <...>/main.zeek, line 677: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) +error in <...>/main.zeek, line 691: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) +error in <...>/main.zeek, line 691: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) diff --git a/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout new file mode 100644 index 0000000000..791b39dc64 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +on_subscribe, zeek/supervisor +on_subscribe, /my_topic +on_unsubscribe, /my_topic +on_unsubscribe, /my_topic +on_subscribe, /my_topic2 diff --git a/testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek b/testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek new file mode 100644 index 0000000000..998b0d23d6 --- /dev/null +++ b/testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek @@ -0,0 +1,24 @@ +# @TEST-DOC: Cluster::on_subscribe and Cluster::on_unsubscribe hooks +# +# @TEST-EXEC: zeek --parse-only -b %INPUT +# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout + +hook Cluster::on_subscribe(topic: string) + { + print "on_subscribe", topic; + } + +hook Cluster::on_unsubscribe(topic: string) + { + print "on_unsubscribe", topic; + } + +event zeek_init() + { + Cluster::subscribe("/my_topic"); + Cluster::unsubscribe("/my_topic"); + Cluster::unsubscribe("/my_topic"); + Cluster::subscribe("/my_topic2"); + }