From 1511ca00dfd9a9086c5ca6e0ac7e6e34a8f7089b Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Fri, 8 Aug 2025 14:23:51 +0200 Subject: [PATCH] Merge remote-tracking branch 'origin/topic/awelzel/4176-cluster-on-sub-unsub-hooks' * origin/topic/awelzel/4176-cluster-on-sub-unsub-hooks: cluster: Add on_subscribe() and on_unsubscribe() hooks (cherry picked from commit 13f613eb1d29895924ae516ad51ca7090acd231f) --- CHANGES | 6 +++++ NEWS | 4 ++++ VERSION | 2 +- scripts/base/frameworks/cluster/main.zeek | 14 +++++++++++ src/cluster/Backend.cc | 21 ++++++++++++++++ src/cluster/Backend.h | 6 ++--- .../.stderr | 6 ++--- .../cluster.websocket.tls-usage-error/.stderr | 4 ++-- .../.stderr | 1 + .../.stdout | 6 +++++ .../generic/on-subscribe-unsubscribe.zeek | 24 +++++++++++++++++++ 11 files changed, 84 insertions(+), 10 deletions(-) create mode 100644 testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr create mode 100644 testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout create mode 100644 testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek diff --git a/CHANGES b/CHANGES index 98b1aa42a4..f81ece3c1f 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,9 @@ +8.0.0-rc1.3 | 2025-08-11 11:35:42 -0700 + + * GH-4176: cluster: Add on_subscribe() and on_unsubscribe() hooks (Arne Welzel, Corelight) + + (cherry picked from commit 13f613eb1d29895924ae516ad51ca7090acd231f) + 8.0.0-rc1.2 | 2025-08-11 11:33:46 -0700 * Add proto to analyzer.log (Johanna Amann, Corelight) diff --git a/NEWS b/NEWS index dc798eeb8c..b4e562e5cf 100644 --- a/NEWS +++ b/NEWS @@ -296,6 +296,10 @@ New Functionality ``get_net_stats()``, it's possible to determine the number of packets that have been received and accepted by Zeek, but eventually discarded without processing. +- Two new hooks, ``Cluster::on_subscribe()`` and ``Cluster::on_unsubscribe()`` have + been added to allow observing ``Subscribe()`` and ``Unsubscribe()`` calls on + backends by Zeek scripts. + Changed Functionality --------------------- diff --git a/VERSION b/VERSION index e19097d063..f09f2c681a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -8.0.0-rc1.2 +8.0.0-rc1.3 diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 3060ee08af..86c27eff4a 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -401,6 +401,20 @@ export { ## The value of the X-Application-Name HTTP header, if any. application_name: string &optional; }; + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_subscribe: hook(topic: string); + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_unsubscribe: hook(topic: string); } # Needs declaration of Cluster::Event type. diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 3a0214b0eb..e0aa6f6c16 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -11,6 +11,7 @@ #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" #include "zeek/Func.h" +#include "zeek/ID.h" #include "zeek/Reporter.h" #include "zeek/Type.h" #include "zeek/Val.h" @@ -139,6 +140,26 @@ std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args return Event{eh, std::move(*checked_args), std::move(meta)}; } +bool Backend::Subscribe(const std::string& topic_prefix, SubscribeCallback cb) { + static const auto on_subscribe = zeek::id::find_func("Cluster::on_subscribe"); + assert(on_subscribe && on_subscribe->Flavor() == FUNC_FLAVOR_HOOK); + + if ( on_subscribe && on_subscribe->HasEnabledBodies() ) + on_subscribe->Invoke(zeek::make_intrusive(topic_prefix)); + + return DoSubscribe(topic_prefix, std::move(cb)); +} + +bool Backend::Unsubscribe(const std::string& topic_prefix) { + static const auto on_unsubscribe = zeek::id::find_func("Cluster::on_unsubscribe"); + assert(on_unsubscribe && on_unsubscribe->Flavor() == FUNC_FLAVOR_HOOK); + + if ( on_unsubscribe->HasEnabledBodies() ) + on_unsubscribe->Invoke(zeek::make_intrusive(topic_prefix)); + + return DoUnsubscribe(topic_prefix); +} + void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) { Backend::ReadyCallbackInfo info{Backend::CallbackStatus::Success}; cb(info); diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index f152ce4e14..9775d598f0 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -210,9 +210,7 @@ public: * @param cb callback invoked when the subscription was processed. * @return true if it's a new event subscription and it is now registered. */ - bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()) { - return DoSubscribe(topic_prefix, std::move(cb)); - } + bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()); /** * Unregister interest in messages on a certain topic. @@ -220,7 +218,7 @@ public: * @param topic_prefix a prefix previously supplied to Subscribe() * @return true if interest in topic prefix is no longer advertised. */ - bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); } + bool Unsubscribe(const std::string& topic_prefix); /** * Information passed to a ready callback. diff --git a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr index f739a1dd5b..dce9b20598 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr @@ -1,5 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 677: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) -error in <...>/main.zeek, line 677: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) -error in <...>/main.zeek, line 677: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_qs)) +error in <...>/main.zeek, line 691: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) +error in <...>/main.zeek, line 691: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) +error in <...>/main.zeek, line 691: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_qs)) received termination signal diff --git a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr index 171791a278..3a54b399a3 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr @@ -1,3 +1,3 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 677: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) -error in <...>/main.zeek, line 677: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) +error in <...>/main.zeek, line 691: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) +error in <...>/main.zeek, line 691: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) diff --git a/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout new file mode 100644 index 0000000000..791b39dc64 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.on-subscribe-unsubscribe/.stdout @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +on_subscribe, zeek/supervisor +on_subscribe, /my_topic +on_unsubscribe, /my_topic +on_unsubscribe, /my_topic +on_subscribe, /my_topic2 diff --git a/testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek b/testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek new file mode 100644 index 0000000000..998b0d23d6 --- /dev/null +++ b/testing/btest/cluster/generic/on-subscribe-unsubscribe.zeek @@ -0,0 +1,24 @@ +# @TEST-DOC: Cluster::on_subscribe and Cluster::on_unsubscribe hooks +# +# @TEST-EXEC: zeek --parse-only -b %INPUT +# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout + +hook Cluster::on_subscribe(topic: string) + { + print "on_subscribe", topic; + } + +hook Cluster::on_unsubscribe(topic: string) + { + print "on_unsubscribe", topic; + } + +event zeek_init() + { + Cluster::subscribe("/my_topic"); + Cluster::unsubscribe("/my_topic"); + Cluster::unsubscribe("/my_topic"); + Cluster::subscribe("/my_topic2"); + }