cluster: Add on_subscribe() and on_unsubscribe() hooks

Closes #4176
This commit is contained in:
Arne Welzel 2025-07-30 11:01:12 +02:00
parent aabb36abf7
commit 1a87ebab72
9 changed files with 77 additions and 9 deletions

4
NEWS
View file

@ -279,6 +279,10 @@ New Functionality
``get_net_stats()``, it's possible to determine the number of packets that have
been received and accepted by Zeek, but eventually discarded without processing.
- Two new hooks, ``Cluster::on_subscribe()`` and ``Cluster::on_unsubscribe()`` have
been added to allow observing ``Subscribe()`` and ``Unsubscribe()`` calls on
backends by Zeek scripts.
Changed Functionality
---------------------

View file

@ -401,6 +401,20 @@ export {
## The value of the X-Application-Name HTTP header, if any.
application_name: string &optional;
};
## A hook invoked for every :zeek:see:`Cluster::subscribe` call.
##
## Breaking from this hook has no effect.
##
## topic: The topic string as given to :zeek:see:`Cluster::subscribe`.
global on_subscribe: hook(topic: string);
## A hook invoked for every :zeek:see:`Cluster::subscribe` call.
##
## Breaking from this hook has no effect.
##
## topic: The topic string as given to :zeek:see:`Cluster::subscribe`.
global on_unsubscribe: hook(topic: string);
}
# Needs declaration of Cluster::Event type.

View file

@ -10,6 +10,7 @@
#include "zeek/EventHandler.h"
#include "zeek/EventRegistry.h"
#include "zeek/Func.h"
#include "zeek/ID.h"
#include "zeek/Reporter.h"
#include "zeek/Type.h"
#include "zeek/Val.h"
@ -138,6 +139,26 @@ std::optional<Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args
return Event{eh, std::move(*checked_args), std::move(meta)};
}
bool Backend::Subscribe(const std::string& topic_prefix, SubscribeCallback cb) {
static const auto on_subscribe = zeek::id::find_func("Cluster::on_subscribe");
assert(on_subscribe && on_subscribe->Flavor() == FUNC_FLAVOR_HOOK);
if ( on_subscribe && on_subscribe->HasEnabledBodies() )
on_subscribe->Invoke(zeek::make_intrusive<zeek::StringVal>(topic_prefix));
return DoSubscribe(topic_prefix, std::move(cb));
}
bool Backend::Unsubscribe(const std::string& topic_prefix) {
static const auto on_unsubscribe = zeek::id::find_func("Cluster::on_unsubscribe");
assert(on_unsubscribe && on_unsubscribe->Flavor() == FUNC_FLAVOR_HOOK);
if ( on_unsubscribe->HasEnabledBodies() )
on_unsubscribe->Invoke(zeek::make_intrusive<zeek::StringVal>(topic_prefix));
return DoUnsubscribe(topic_prefix);
}
void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
Backend::ReadyCallbackInfo info{Backend::CallbackStatus::Success};
cb(info);

View file

@ -210,9 +210,7 @@ public:
* @param cb callback invoked when the subscription was processed.
* @return true if it's a new event subscription and it is now registered.
*/
bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()) {
return DoSubscribe(topic_prefix, std::move(cb));
}
bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback());
/**
* Unregister interest in messages on a certain topic.
@ -220,7 +218,7 @@ public:
* @param topic_prefix a prefix previously supplied to Subscribe()
* @return true if interest in topic prefix is no longer advertised.
*/
bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); }
bool Unsubscribe(const std::string& topic_prefix);
/**
* Information passed to a ready callback.

View file

@ -1,5 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
error in <...>/main.zeek, line 691: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
error in <...>/main.zeek, line 691: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
error in <...>/main.zeek, line 691: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
received termination signal

View file

@ -1,3 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/main.zeek, line 677: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
error in <...>/main.zeek, line 677: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))
error in <...>/main.zeek, line 691: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
error in <...>/main.zeek, line 691: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
on_subscribe, zeek/supervisor
on_subscribe, /my_topic
on_unsubscribe, /my_topic
on_unsubscribe, /my_topic
on_subscribe, /my_topic2

View file

@ -0,0 +1,24 @@
# @TEST-DOC: Cluster::on_subscribe and Cluster::on_unsubscribe hooks
#
# @TEST-EXEC: zeek --parse-only -b %INPUT
# @TEST-EXEC: zeek -b %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout
hook Cluster::on_subscribe(topic: string)
{
print "on_subscribe", topic;
}
hook Cluster::on_unsubscribe(topic: string)
{
print "on_unsubscribe", topic;
}
event zeek_init()
{
Cluster::subscribe("/my_topic");
Cluster::unsubscribe("/my_topic");
Cluster::unsubscribe("/my_topic");
Cluster::subscribe("/my_topic2");
}