mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
cluster: Add SubscribeCallback support
This allows callers of Subscribe() to pass in a callback that will be invoked once the subscription is established or failed to establish. It is the backend's responsibility to execute the callback on the main thread either synchronously, or preferably asynchronously at a later point, by scheduling a task on the IO main loop. This turns on ZMQ_XPUB_VERBOSE for ZeroMQ so that notifications about subscriptions are raised even if the subscriptions has previously been observed.
This commit is contained in:
parent
fa22f91ca4
commit
e8f87019c6
6 changed files with 93 additions and 10 deletions
|
@ -946,10 +946,13 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args, zeek::detail::Frame* frame)
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::DoSubscribe(const string& topic_prefix) {
|
bool Manager::DoSubscribe(const string& topic_prefix, SubscribeCallback cb) {
|
||||||
DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str());
|
DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str());
|
||||||
bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done);
|
bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done);
|
||||||
|
|
||||||
|
if ( cb )
|
||||||
|
cb(topic_prefix, {CallbackStatus::NotImplemented});
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -380,7 +380,7 @@ public:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Register interest in peer event messages that use a certain topic prefix.
|
// Register interest in peer event messages that use a certain topic prefix.
|
||||||
bool DoSubscribe(const std::string& topic_prefix) override;
|
bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) override;
|
||||||
|
|
||||||
// Unregister interest in peer event messages.
|
// Unregister interest in peer event messages.
|
||||||
bool DoUnsubscribe(const std::string& topic_prefix) override;
|
bool DoUnsubscribe(const std::string& topic_prefix) override;
|
||||||
|
|
|
@ -185,13 +185,40 @@ public:
|
||||||
return DoPublishEvent(topic, event);
|
return DoPublishEvent(topic, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Status codes for callbacks.
|
||||||
|
*/
|
||||||
|
enum class CallbackStatus {
|
||||||
|
Success,
|
||||||
|
Error,
|
||||||
|
NotImplemented,
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Information for subscription callbacks.
|
||||||
|
*/
|
||||||
|
struct SubscriptionCallbackInfo {
|
||||||
|
CallbackStatus status; // The status of the operation.
|
||||||
|
std::optional<std::string> message; // Optional message.
|
||||||
|
};
|
||||||
|
|
||||||
|
using SubscribeCallback =
|
||||||
|
std::function<void(const std::string& topic_prefix, const SubscriptionCallbackInfo& info)>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register interest in messages that use a certain topic prefix.
|
* Register interest in messages that use a certain topic prefix.
|
||||||
*
|
*
|
||||||
|
* Invoking cb may happen while Subscribe() executes, for example if the
|
||||||
|
* call to Subscribe() is synchronous, or an error is discovered before
|
||||||
|
* submitting any work.
|
||||||
|
*
|
||||||
* @param topic_prefix a prefix to match against remote message topics.
|
* @param topic_prefix a prefix to match against remote message topics.
|
||||||
|
* @param cb callback invoked when the subscription was processed.
|
||||||
* @return true if it's a new event subscription and it is now registered.
|
* @return true if it's a new event subscription and it is now registered.
|
||||||
*/
|
*/
|
||||||
bool Subscribe(const std::string& topic_prefix) { return DoSubscribe(topic_prefix); }
|
bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()) {
|
||||||
|
return DoSubscribe(topic_prefix, std::move(cb));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unregister interest in messages on a certain topic.
|
* Unregister interest in messages on a certain topic.
|
||||||
|
@ -314,13 +341,19 @@ private:
|
||||||
* Register interest in messages that use a certain topic prefix.
|
* Register interest in messages that use a certain topic prefix.
|
||||||
*
|
*
|
||||||
* If the backend hasn't yet established a connection, any subscriptions
|
* If the backend hasn't yet established a connection, any subscriptions
|
||||||
* should be queued until they can be processed.
|
* should be queued until they can be processed. If a callback is given,
|
||||||
|
* it should be called once the subscription can be determined to be
|
||||||
|
* active. The callback has to be invoked from Zeek's main thread. If
|
||||||
|
* the backend does not implement callbacks, it should invoke the callback
|
||||||
|
* with CallbackStatus::NotImplemented, which will act as success, but
|
||||||
|
* provides a way to distinguish behavior.
|
||||||
*
|
*
|
||||||
* @param topic_prefix a prefix to match against remote message topics.
|
* @param topic_prefix a prefix to match against remote message topics.
|
||||||
|
* @param cb callback to invoke when the subscription is active
|
||||||
*
|
*
|
||||||
* @return true if it's a new event subscription and now registered.
|
* @return true if it's a new event subscription and now registered.
|
||||||
*/
|
*/
|
||||||
virtual bool DoSubscribe(const std::string& topic_prefix) = 0;
|
virtual bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unregister interest in messages on a certain topic.
|
* Unregister interest in messages on a certain topic.
|
||||||
|
|
|
@ -39,6 +39,12 @@ bool ProxyThread::Start() {
|
||||||
zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
|
zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
|
||||||
zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
|
zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
|
||||||
|
|
||||||
|
// Enable XPUB_VERBOSE unconditional to enforce nodes receiving
|
||||||
|
// notifications about any new subscriptions, even if they have
|
||||||
|
// seen them before. This is needed to for the subscribe callback
|
||||||
|
// functionality to work reliably.
|
||||||
|
xpub.set(zmq::sockopt::xpub_verbose, 1);
|
||||||
|
|
||||||
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
|
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include <zmq.hpp>
|
#include <zmq.hpp>
|
||||||
|
|
||||||
#include "zeek/DebugLogger.h"
|
#include "zeek/DebugLogger.h"
|
||||||
|
#include "zeek/EventHandler.h"
|
||||||
#include "zeek/EventRegistry.h"
|
#include "zeek/EventRegistry.h"
|
||||||
#include "zeek/IntrusivePtr.h"
|
#include "zeek/IntrusivePtr.h"
|
||||||
#include "zeek/Reporter.h"
|
#include "zeek/Reporter.h"
|
||||||
|
@ -134,6 +135,12 @@ bool ZeroMQBackend::DoInit() {
|
||||||
xpub.set(zmq::sockopt::linger, linger_ms);
|
xpub.set(zmq::sockopt::linger, linger_ms);
|
||||||
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
|
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
|
||||||
|
|
||||||
|
// Enable XPUB_VERBOSE unconditional to enforce nodes receiving
|
||||||
|
// notifications about any new subscriptions, even if they have
|
||||||
|
// seen them before. This is needed to for the subscribe callback
|
||||||
|
// functionality to work reliably.
|
||||||
|
xpub.set(zmq::sockopt::xpub_verbose, 1);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
xsub.connect(connect_xsub_endpoint);
|
xsub.connect(connect_xsub_endpoint);
|
||||||
} catch ( zmq::error_t& err ) {
|
} catch ( zmq::error_t& err ) {
|
||||||
|
@ -263,7 +270,7 @@ bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string&
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) {
|
bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) {
|
||||||
ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str());
|
ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str());
|
||||||
try {
|
try {
|
||||||
// Prepend 0x01 byte to indicate subscription to XSUB socket
|
// Prepend 0x01 byte to indicate subscription to XSUB socket
|
||||||
|
@ -272,9 +279,16 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) {
|
||||||
main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
|
main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
|
||||||
} catch ( zmq::error_t& err ) {
|
} catch ( zmq::error_t& err ) {
|
||||||
zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what());
|
zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what());
|
||||||
|
if ( cb )
|
||||||
|
cb(topic_prefix, {CallbackStatus::Error, err.what()});
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Store the callback for later.
|
||||||
|
if ( cb )
|
||||||
|
subscription_callbacks.insert({topic_prefix, cb});
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -597,10 +611,33 @@ void ZeroMQBackend::Run() {
|
||||||
bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) {
|
bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) {
|
||||||
if ( tag == 0 || tag == 1 ) {
|
if ( tag == 0 || tag == 1 ) {
|
||||||
std::string topic{reinterpret_cast<const char*>(payload.data()), payload.size()};
|
std::string topic{reinterpret_cast<const char*>(payload.data()), payload.size()};
|
||||||
zeek::EventHandlerPtr eh = tag == 1 ? event_subscription : event_unsubscription;
|
zeek::EventHandlerPtr eh;
|
||||||
|
|
||||||
ZEROMQ_DEBUG("BackendMessage: %s for %s", eh->Name(), topic.c_str());
|
if ( tag == 1 ) {
|
||||||
|
// If this is the first time the subscription was observed, raise
|
||||||
|
// the ZeroMQ internal event.
|
||||||
|
if ( xpub_subscriptions.count(topic) == 0 ) {
|
||||||
|
eh = event_subscription;
|
||||||
|
xpub_subscriptions.insert(topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( const auto& cbit = subscription_callbacks.find(topic); cbit != subscription_callbacks.end() ) {
|
||||||
|
const auto& cb = cbit->second;
|
||||||
|
if ( cb )
|
||||||
|
cb(topic, {CallbackStatus::Success, "success"});
|
||||||
|
|
||||||
|
subscription_callbacks.erase(cbit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if ( tag == 0 ) {
|
||||||
|
eh = event_unsubscription;
|
||||||
|
xpub_subscriptions.erase(topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
ZEROMQ_DEBUG("BackendMessage: %s for %s", eh != nullptr ? eh->Name() : "<raising no event>", topic.c_str());
|
||||||
|
if ( eh )
|
||||||
EnqueueEvent(eh, zeek::Args{zeek::make_intrusive<zeek::StringVal>(topic)});
|
EnqueueEvent(eh, zeek::Args{zeek::make_intrusive<zeek::StringVal>(topic)});
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -50,7 +50,7 @@ private:
|
||||||
bool DoPublishEvent(const std::string& topic, const std::string& format,
|
bool DoPublishEvent(const std::string& topic, const std::string& format,
|
||||||
const cluster::detail::byte_buffer& buf) override;
|
const cluster::detail::byte_buffer& buf) override;
|
||||||
|
|
||||||
bool DoSubscribe(const std::string& topic_prefix) override;
|
bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) override;
|
||||||
|
|
||||||
bool DoUnsubscribe(const std::string& topic_prefix) override;
|
bool DoUnsubscribe(const std::string& topic_prefix) override;
|
||||||
|
|
||||||
|
@ -95,6 +95,10 @@ private:
|
||||||
std::thread self_thread;
|
std::thread self_thread;
|
||||||
|
|
||||||
std::unique_ptr<ProxyThread> proxy_thread;
|
std::unique_ptr<ProxyThread> proxy_thread;
|
||||||
|
|
||||||
|
// Tracking the subscriptions on the local XPUB socket.
|
||||||
|
std::map<std::string, SubscribeCallback> subscription_callbacks;
|
||||||
|
std::set<std::string> xpub_subscriptions;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace zeek::cluster::zeromq
|
} // namespace zeek::cluster::zeromq
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue