Merge remote-tracking branch 'origin/topic/awelzel/4176-cluster-on-sub-unsub-hooks'

* origin/topic/awelzel/4176-cluster-on-sub-unsub-hooks:
  cluster: Add on_subscribe() and on_unsubscribe() hooks

(cherry picked from commit 13f613eb1d)
This commit is contained in:
Arne Welzel 2025-08-08 14:23:51 +02:00 committed by Tim Wojtulewicz
parent a76b2148c6
commit 1511ca00df
11 changed files with 84 additions and 10 deletions

View file

@ -1,3 +1,9 @@
8.0.0-rc1.3 | 2025-08-11 11:35:42 -0700
* GH-4176: cluster: Add on_subscribe() and on_unsubscribe() hooks (Arne Welzel, Corelight)
(cherry picked from commit 13f613eb1d29895924ae516ad51ca7090acd231f)
8.0.0-rc1.2 | 2025-08-11 11:33:46 -0700 8.0.0-rc1.2 | 2025-08-11 11:33:46 -0700
* Add proto to analyzer.log (Johanna Amann, Corelight) * Add proto to analyzer.log (Johanna Amann, Corelight)

4
NEWS
View file

@ -296,6 +296,10 @@ New Functionality
``get_net_stats()``, it's possible to determine the number of packets that have ``get_net_stats()``, it's possible to determine the number of packets that have
been received and accepted by Zeek, but eventually discarded without processing. been received and accepted by Zeek, but eventually discarded without processing.
- Two new hooks, ``Cluster::on_subscribe()`` and ``Cluster::on_unsubscribe()`` have
been added to allow observing ``Subscribe()`` and ``Unsubscribe()`` calls on
backends by Zeek scripts.
Changed Functionality Changed Functionality
--------------------- ---------------------

View file

@ -1 +1 @@
8.0.0-rc1.2 8.0.0-rc1.3

View file

@ -401,6 +401,20 @@ export {
## The value of the X-Application-Name HTTP header, if any. ## The value of the X-Application-Name HTTP header, if any.
application_name: string &optional; application_name: string &optional;
}; };
## A hook invoked for every :zeek:see:`Cluster::subscribe` call.
##
## Breaking from this hook has no effect.
##
## topic: The topic string as given to :zeek:see:`Cluster::subscribe`.
global on_subscribe: hook(topic: string);
## A hook invoked for every :zeek:see:`Cluster::subscribe` call.
##
## Breaking from this hook has no effect.
##
## topic: The topic string as given to :zeek:see:`Cluster::subscribe`.
global on_unsubscribe: hook(topic: string);
} }
# Needs declaration of Cluster::Event type. # Needs declaration of Cluster::Event type.

View file

@ -11,6 +11,7 @@
#include "zeek/EventHandler.h" #include "zeek/EventHandler.h"
#include "zeek/EventRegistry.h" #include "zeek/EventRegistry.h"
#include "zeek/Func.h" #include "zeek/Func.h"
#include "zeek/ID.h"
#include "zeek/Reporter.h" #include "zeek/Reporter.h"
#include "zeek/Type.h" #include "zeek/Type.h"
#include "zeek/Val.h" #include "zeek/Val.h"
@ -139,6 +140,26 @@ std::optional<Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args
return Event{eh, std::move(*checked_args), std::move(meta)}; return Event{eh, std::move(*checked_args), std::move(meta)};
} }
bool Backend::Subscribe(const std::string& topic_prefix, SubscribeCallback cb) {
static const auto on_subscribe = zeek::id::find_func("Cluster::on_subscribe");
assert(on_subscribe && on_subscribe->Flavor() == FUNC_FLAVOR_HOOK);
if ( on_subscribe && on_subscribe->HasEnabledBodies() )
on_subscribe->Invoke(zeek::make_intrusive<zeek::StringVal>(topic_prefix));
return DoSubscribe(topic_prefix, std::move(cb));
}
bool Backend::Unsubscribe(const std::string& topic_prefix) {
static const auto on_unsubscribe = zeek::id::find_func("Cluster::on_unsubscribe");
assert(on_unsubscribe && on_unsubscribe->Flavor() == FUNC_FLAVOR_HOOK);
if ( on_unsubscribe->HasEnabledBodies() )
on_unsubscribe->Invoke(zeek::make_intrusive<zeek::StringVal>(topic_prefix));
return DoUnsubscribe(topic_prefix);
}
void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) { void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
Backend::ReadyCallbackInfo info{Backend::CallbackStatus::Success}; Backend::ReadyCallbackInfo info{Backend::CallbackStatus::Success};
cb(info); cb(info);

View file

@ -210,9 +210,7 @@ public:
* @param cb callback invoked when the subscription was processed. * @param cb callback invoked when the subscription was processed.
* @return true if it's a new event subscription and it is now registered. * @return true if it's a new event subscription and it is now registered.
*/ */
bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()) { bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback());
return DoSubscribe(topic_prefix, std::move(cb));
}
/** /**
* Unregister interest in messages on a certain topic. * Unregister interest in messages on a certain topic.
@ -220,7 +218,7 @@ public:
* @param topic_prefix a prefix previously supplied to Subscribe() * @param topic_prefix a prefix previously supplied to Subscribe()
* @return true if interest in topic prefix is no longer advertised. * @return true if interest in topic prefix is no longer advertised.
*/ */
bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); } bool Unsubscribe(const std::string& topic_prefix);
/** /**
* Information passed to a ready callback. * Information passed to a ready callback.

View file

@ -1,5 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x)) error in <...>/main.zeek, line 691: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port)) error in <...>/main.zeek, line 691: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs)) error in <...>/main.zeek, line 691: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
received termination signal received termination signal

View file

@ -1,3 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/main.zeek, line 677: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) error in <...>/main.zeek, line 691: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
error in <...>/main.zeek, line 677: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) error in <...>/main.zeek, line 691: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
on_subscribe, zeek/supervisor
on_subscribe, /my_topic
on_unsubscribe, /my_topic
on_unsubscribe, /my_topic
on_subscribe, /my_topic2

View file

@ -0,0 +1,24 @@
# @TEST-DOC: Cluster::on_subscribe and Cluster::on_unsubscribe hooks
#
# @TEST-EXEC: zeek --parse-only -b %INPUT
# @TEST-EXEC: zeek -b %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout
hook Cluster::on_subscribe(topic: string)
{
print "on_subscribe", topic;
}
hook Cluster::on_unsubscribe(topic: string)
{
print "on_unsubscribe", topic;
}
event zeek_init()
{
Cluster::subscribe("/my_topic");
Cluster::unsubscribe("/my_topic");
Cluster::unsubscribe("/my_topic");
Cluster::subscribe("/my_topic2");
}