Merge remote-tracking branch 'origin/topic/awelzel/4136-cluster-backend-pre-work'

* origin/topic/awelzel/4136-cluster-backend-pre-work:
  cluster/zeromq: Fix Unsubscribe() bug caused by \x00 prefix
  cluster: Add SubscribeCallback support
  cluster/zeromq: Fix XSUB threading issues
  cluster/zeromq: Use NodeId(), drop my_node_id
  cluster/Backend: Pass node_id via Init()
  cluster/Backend: Make backend event processing customizable
  cluster/broker/Serializer: Fix adaptor to adapter
  cluster/Backend: Do not use const std::string_view&
  cluster/serializer/broker: Fix handler lookup
  broker/Manager: Move name in PublishEvent()
  btest/zeromq/test-bootstrap: Fix port parsing
  EventHandler: Support operator!=
This commit is contained in:
Arne Welzel 2025-02-05 11:10:21 +01:00
commit da673d6577
28 changed files with 583 additions and 98 deletions

58
CHANGES
View file

@ -1,3 +1,61 @@
7.2.0-dev.169 | 2025-02-05 11:10:21 +0100
* cluster/zeromq: Fix Unsubscribe() bug caused by \x00 prefix (Arne Welzel, Corelight)
* cluster: Add SubscribeCallback support (Arne Welzel, Corelight)
This allows callers of Subscribe() to pass in a callback that will be invoked
once the subscription is established or failed to establish. It is the
backend's responsibility to execute the callback on the main thread either
synchronously, or preferably asynchronously at a later point, by
scheduling a task on the IO main loop.
This turns on ZMQ_XPUB_VERBOSE for ZeroMQ so that notifications about
subscriptions are raised even if the subscriptions has previously been
observed.
* cluster/zeromq: Fix XSUB threading issues (Arne Welzel, Corelight)
It is not safe to use the same socket from different threads, but the
current code used the xsub socket directly from the main thread (to setup
subscriptions) and from the internal thread for polling and reading.
Leverage the PAIR socket already in use for forwarding publish operations
to the internal thread also for subscribe and unsubscribe.
The failure mode is/was a bit annoying. Essentially, closing of the
context would hang indefinitely in zmq_ctx_term().
* cluster/zeromq: Use NodeId(), drop my_node_id (Arne Welzel, Corelight)
* cluster/Backend: Pass node_id via Init() (Arne Welzel, Corelight)
* cluster/Backend: Make backend event processing customizable (Arne Welzel, Corelight)
This allows configurability at the code level to decide what to do with
a received remote events and events produced by a backend. For now, only
enqueue events into the process's script layer, but for the WebSocket
interface, the action would be to send out the event on a WebSocket
connection instead.
* cluster/broker/Serializer: Fix adaptor to adapter (Arne Welzel, Corelight)
* cluster/Backend: Do not use const std::string_view& (Arne Welzel, Corelight)
* cluster/serializer/broker: Fix handler lookup (Arne Welzel, Corelight)
Handler overwrites operator bool, so need to explicitly test for nullptr
rather than not having any handlers defined.
* broker/Manager: Move name in PublishEvent() (Arne Welzel, Corelight)
* btest/zeromq/test-bootstrap: Fix port parsing (Arne Welzel, Corelight)
to_port() will produce an error on empty strings which extract_count()
does not.
* EventHandler: Support operator!= (Arne Welzel, Corelight)
7.2.0-dev.156 | 2025-02-05 09:05:00 +0000 7.2.0-dev.156 | 2025-02-05 09:05:00 +0000
* IPv6 support for detect-external-names and testcase (Johanna Amann, Corelight) * IPv6 support for detect-external-names and testcase (Johanna Amann, Corelight)

View file

@ -1 +1 @@
7.2.0-dev.156 7.2.0-dev.169

View file

@ -585,7 +585,7 @@ function log(msg: string)
function init(): bool function init(): bool
{ {
return Cluster::Backend::__init(); return Cluster::Backend::__init(Cluster::node_id());
} }
function subscribe(topic: string): bool function subscribe(topic: string): bool

View file

@ -162,7 +162,8 @@ export {
## Bitmask to enable low-level stderr based debug printing. ## Bitmask to enable low-level stderr based debug printing.
## ##
## poll debugging: 1 (produce verbose zmq::poll() output) ## poll: 1 (produce verbose zmq::poll() output)
## thread: 2 (produce thread related output)
## ##
## Or values from the above list together and set debug_flags ## Or values from the above list together and set debug_flags
## to the result. E.g. use 7 to select 4, 2 and 1. Only use this ## to the result. E.g. use 7 to select 4, 2 and 1. Only use this

View file

@ -101,6 +101,8 @@ public:
bool operator==(const EventHandlerPtr& h) const { return handler == h.handler; } bool operator==(const EventHandlerPtr& h) const { return handler == h.handler; }
bool operator!=(const EventHandlerPtr& h) const { return ! (*this == h); }
EventHandler* Ptr() { return handler; } EventHandler* Ptr() { return handler; }
explicit operator bool() const { return handler && *handler; } explicit operator bool() const { return handler && *handler; }

View file

@ -213,7 +213,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const
} // namespace } // namespace
#endif #endif
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr) { Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr) {
bound_port = 0; bound_port = 0;
use_real_time = arg_use_real_time; use_real_time = arg_use_real_time;
peer_count = 0; peer_count = 0;
@ -558,7 +558,7 @@ bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Ev
} }
std::string name(event.HandlerName()); std::string name(event.HandlerName());
return PublishEvent(topic, name, std::move(xs), event.timestamp); return PublishEvent(topic, std::move(name), std::move(xs), event.timestamp);
} }
bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) {
@ -946,10 +946,13 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args, zeek::detail::Frame* frame)
return rval; return rval;
} }
bool Manager::DoSubscribe(const string& topic_prefix) { bool Manager::DoSubscribe(const string& topic_prefix, SubscribeCallback cb) {
DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str());
bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done); bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done);
if ( cb )
cb(topic_prefix, {CallbackStatus::NotImplemented});
return true; return true;
} }

View file

@ -380,7 +380,7 @@ public:
private: private:
// Register interest in peer event messages that use a certain topic prefix. // Register interest in peer event messages that use a certain topic prefix.
bool DoSubscribe(const std::string& topic_prefix) override; bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) override;
// Unregister interest in peer event messages. // Unregister interest in peer event messages.
bool DoUnsubscribe(const std::string& topic_prefix) override; bool DoUnsubscribe(const std::string& topic_prefix) override;

View file

@ -15,8 +15,9 @@ zeek::plugin::Configuration Plugin::Configure() {
// instantiated in zeek-setup.cc. Don't even allow to instantiate // instantiated in zeek-setup.cc. Don't even allow to instantiate
// a second one via the plugin mechanism. In the future, this could // a second one via the plugin mechanism. In the future, this could
// be changed so that broker is instantiated on demand only. // be changed so that broker is instantiated on demand only.
auto fail_instantiate = [](std::unique_ptr<cluster::EventSerializer>, auto fail_instantiate =
std::unique_ptr<cluster::LogSerializer>) -> std::unique_ptr<cluster::Backend> { [](std::unique_ptr<cluster::EventSerializer>, std::unique_ptr<cluster::LogSerializer>,
std::unique_ptr<cluster::detail::EventHandlingStrategy>) -> std::unique_ptr<cluster::Backend> {
zeek::reporter->FatalError("do not instantiate broker explicitly"); zeek::reporter->FatalError("do not instantiate broker explicitly");
return nullptr; return nullptr;
}; };

View file

@ -13,9 +13,20 @@
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/util.h"
using namespace zeek::cluster; using namespace zeek::cluster;
bool detail::LocalEventHandlingStrategy::DoHandleRemoteEvent(std::string_view topic, detail::Event e) {
zeek::event_mgr.Enqueue(e.Handler(), std::move(e.args), util::detail::SOURCE_BROKER, 0, nullptr, e.timestamp);
return true;
}
void detail::LocalEventHandlingStrategy::DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) {
zeek::event_mgr.Enqueue(h, std::move(args));
}
std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) { std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) {
const auto& func_type = handler->GetType<zeek::FuncType>(); const auto& func_type = handler->GetType<zeek::FuncType>();
@ -58,6 +69,10 @@ std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, ze
return result; return result;
} }
Backend::Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs)
: event_serializer(std::move(es)), log_serializer(std::move(ls)), event_handling_strategy(std::move(ehs)) {}
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const { std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const {
auto checked_args = detail::check_args(handler, args); auto checked_args = detail::check_args(handler, args);
if ( ! checked_args ) if ( ! checked_args )
@ -96,7 +111,11 @@ bool Backend::DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& he
return DoPublishLogWrites(header, log_serializer->Name(), buf); return DoPublishLogWrites(header, log_serializer->Name(), buf);
} }
bool Backend::ProcessEventMessage(const std::string_view& topic, const std::string_view& format, void Backend::EnqueueEvent(EventHandlerPtr h, zeek::Args args) {
event_handling_strategy->EnqueueLocalEvent(h, std::move(args));
}
bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format,
const detail::byte_buffer_span payload) { const detail::byte_buffer_span payload) {
if ( format != event_serializer->Name() ) { if ( format != event_serializer->Name() ) {
zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(), zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(),
@ -113,14 +132,10 @@ bool Backend::ProcessEventMessage(const std::string_view& topic, const std::stri
return false; return false;
} }
auto& event = *r; return event_handling_strategy->HandleRemoteEvent(topic, std::move(*r));
zeek::event_mgr.Enqueue(event.Handler(), std::move(event.args), util::detail::SOURCE_BROKER, 0, nullptr,
event.timestamp);
return true;
} }
bool Backend::ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload) { bool Backend::ProcessLogMessage(std::string_view format, detail::byte_buffer_span payload) {
// We could also dynamically lookup the right de-serializer, but // We could also dynamically lookup the right de-serializer, but
// for now assume we just receive what is configured. // for now assume we just receive what is configured.
if ( format != log_serializer->Name() ) { if ( format != log_serializer->Name() ) {

View file

@ -53,6 +53,75 @@ public:
const EventHandlerPtr& Handler() const { return handler; } const EventHandlerPtr& Handler() const { return handler; }
}; };
/**
* Interface for processing cluster::Event instances received
* on a given topic.
*
* An instances is injected into Backend instances to allow
* modifying the behavior for received events. For instance,
* for backends instantiated for WebSocket clients, events
* should not be raised as Zeek events locally and instead
* transmitted to the WebSocket client.
*/
class EventHandlingStrategy {
public:
virtual ~EventHandlingStrategy() = default;
/**
* Method for processing a remote event received on the given topic.
*
* When handling the remote event fails, this method should return false.
*
* @param topic The topic on which the event was received.
* @param ev The parsed event that was received.
*
* @return true if the remote event was handled successfully, else false.
*/
bool HandleRemoteEvent(std::string_view topic, Event e) { return DoHandleRemoteEvent(topic, std::move(e)); }
/**
* Method for enquing backend specific events.
*
* Some backend's may raise events destined for the local
* scripting layer. That's usually wanted, but not always.
* When the backend is instantiated for a WebSocket client,
* local scripting layer should not raise events for the
* WebSocket client.
* @param h The event handler to use.
* @param args The event arguments.
*/
void EnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) { DoEnqueueLocalEvent(h, std::move(args)); }
private:
/**
* Hook method for implementing HandleRemoteEvent().
*
* @param topic The topic on which the event was received.
* @param ev The parsed event that was received.
*
* @return true if the remote event was handled successfully, else false.
*/
virtual bool DoHandleRemoteEvent(std::string_view topic, Event e) = 0;
/**
* Hook method for implementing EnqueueLocalEvent().
*
* @param h The event handler to use.
* @param args The event arguments.
*/
virtual void DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) = 0;
};
/**
* Strategy enqueueing events into this process's Zeek event loop.
*/
class LocalEventHandlingStrategy : public EventHandlingStrategy {
private:
bool DoHandleRemoteEvent(std::string_view topic, Event e) override;
void DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) override;
};
/** /**
* Validate that the provided args are suitable for handler. * Validate that the provided args are suitable for handler.
* *
@ -80,8 +149,14 @@ public:
/** /**
* Method invoked from the Cluster::Backend::__init() bif. * Method invoked from the Cluster::Backend::__init() bif.
*
* @param nid The node identifier to use.
*/ */
bool Init() { return DoInit(); } bool Init(std::string nid) {
node_id = std::move(nid);
return DoInit();
}
/** /**
* Hook invoked when Zeek is about to terminate. * Hook invoked when Zeek is about to terminate.
@ -110,13 +185,40 @@ public:
return DoPublishEvent(topic, event); return DoPublishEvent(topic, event);
} }
/**
* Status codes for callbacks.
*/
enum class CallbackStatus {
Success,
Error,
NotImplemented,
};
/**
* Information for subscription callbacks.
*/
struct SubscriptionCallbackInfo {
CallbackStatus status; // The status of the operation.
std::optional<std::string> message; // Optional message.
};
using SubscribeCallback =
std::function<void(const std::string& topic_prefix, const SubscriptionCallbackInfo& info)>;
/** /**
* Register interest in messages that use a certain topic prefix. * Register interest in messages that use a certain topic prefix.
* *
* Invoking cb may happen while Subscribe() executes, for example if the
* call to Subscribe() is synchronous, or an error is discovered before
* submitting any work.
*
* @param topic_prefix a prefix to match against remote message topics. * @param topic_prefix a prefix to match against remote message topics.
* @param cb callback invoked when the subscription was processed.
* @return true if it's a new event subscription and it is now registered. * @return true if it's a new event subscription and it is now registered.
*/ */
bool Subscribe(const std::string& topic_prefix) { return DoSubscribe(topic_prefix); } bool Subscribe(const std::string& topic_prefix, SubscribeCallback cb = SubscribeCallback()) {
return DoSubscribe(topic_prefix, std::move(cb));
}
/** /**
* Unregister interest in messages on a certain topic. * Unregister interest in messages on a certain topic.
@ -140,23 +242,42 @@ public:
return DoPublishLogWrites(header, records); return DoPublishLogWrites(header, records);
} }
/**
* @return This backend's node identifier.
*/
const std::string& NodeId() const { return node_id; }
protected: protected:
/** /**
* Constructor. * Constructor.
*
* @param es The event serializer to use.
* @param ls The log batch serializer to use.
* @param ehs The event handling strategy to use for this backend.
*/ */
Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls) Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
: event_serializer(std::move(es)), log_serializer(std::move(ls)) {} std::unique_ptr<detail::EventHandlingStrategy> ehs);
/**
* Enqueue an event to be raised to this process Zeek scripting layer.
*
* When a backend is used for a WebSocket client connection, events
* raised through this method are blackholed.
*
* @param h The event handler.
* @param args The event arguments.
*/
void EnqueueEvent(EventHandlerPtr h, zeek::Args args);
/** /**
* Process an incoming event message. * Process an incoming event message.
*/ */
bool ProcessEventMessage(const std::string_view& topic, const std::string_view& format, bool ProcessEventMessage(std::string_view topic, std::string_view format, detail::byte_buffer_span payload);
detail::byte_buffer_span payload);
/** /**
* Process an incoming log message. * Process an incoming log message.
*/ */
bool ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload); bool ProcessLogMessage(std::string_view format, detail::byte_buffer_span payload);
private: private:
/** /**
@ -220,13 +341,19 @@ private:
* Register interest in messages that use a certain topic prefix. * Register interest in messages that use a certain topic prefix.
* *
* If the backend hasn't yet established a connection, any subscriptions * If the backend hasn't yet established a connection, any subscriptions
* should be queued until they can be processed. * should be queued until they can be processed. If a callback is given,
* it should be called once the subscription can be determined to be
* active. The callback has to be invoked from Zeek's main thread. If
* the backend does not implement callbacks, it should invoke the callback
* with CallbackStatus::NotImplemented, which will act as success, but
* provides a way to distinguish behavior.
* *
* @param topic_prefix a prefix to match against remote message topics. * @param topic_prefix a prefix to match against remote message topics.
* @param cb callback to invoke when the subscription is active
* *
* @return true if it's a new event subscription and now registered. * @return true if it's a new event subscription and now registered.
*/ */
virtual bool DoSubscribe(const std::string& topic_prefix) = 0; virtual bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) = 0;
/** /**
* Unregister interest in messages on a certain topic. * Unregister interest in messages on a certain topic.
@ -278,6 +405,12 @@ private:
std::unique_ptr<EventSerializer> event_serializer; std::unique_ptr<EventSerializer> event_serializer;
std::unique_ptr<LogSerializer> log_serializer; std::unique_ptr<LogSerializer> log_serializer;
std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy;
/**
* The backend's instance cluster node identifier.
*/
std::string node_id;
}; };
/** /**

View file

@ -13,7 +13,8 @@ namespace zeek::cluster {
class BackendComponent : public plugin::Component { class BackendComponent : public plugin::Component {
public: public:
using factory_callback = std::unique_ptr<Backend> (*)(std::unique_ptr<EventSerializer>, using factory_callback = std::unique_ptr<Backend> (*)(std::unique_ptr<EventSerializer>,
std::unique_ptr<LogSerializer>); std::unique_ptr<LogSerializer>,
std::unique_ptr<detail::EventHandlingStrategy>);
/** /**
* Constructor. * Constructor.

View file

@ -11,11 +11,14 @@ Manager::Manager()
event_serializers(plugin::ComponentManager<EventSerializerComponent>("Cluster", "EventSerializerTag")), event_serializers(plugin::ComponentManager<EventSerializerComponent>("Cluster", "EventSerializerTag")),
log_serializers(plugin::ComponentManager<LogSerializerComponent>("Cluster", "LogSerializerTag")) {} log_serializers(plugin::ComponentManager<LogSerializerComponent>("Cluster", "LogSerializerTag")) {}
std::unique_ptr<Backend> Manager::InstantiateBackend(const zeek::EnumValPtr& tag, std::unique_ptr<Backend> Manager::InstantiateBackend(
std::unique_ptr<EventSerializer> event_serializer, const zeek::EnumValPtr& tag, std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer) { std::unique_ptr<LogSerializer> log_serializer,
const BackendComponent* c = Backends().Lookup(tag); std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy) {
return c ? c->Factory()(std::move(event_serializer), std::move(log_serializer)) : nullptr; if ( const auto* c = Backends().Lookup(tag) )
return c->Factory()(std::move(event_serializer), std::move(log_serializer), std::move(event_handling_strategy));
return nullptr;
} }
std::unique_ptr<EventSerializer> Manager::InstantiateEventSerializer(const zeek::EnumValPtr& tag) { std::unique_ptr<EventSerializer> Manager::InstantiateEventSerializer(const zeek::EnumValPtr& tag) {

View file

@ -27,12 +27,14 @@ public:
* @param tag The enum value identifying the backend. * @param tag The enum value identifying the backend.
* @param event_serializer The event serializer to inject. * @param event_serializer The event serializer to inject.
* @param log_serializer The log serializer to inject. * @param log_serializer The log serializer to inject.
* @param event_handling_strategy The event handling strategy to inject.
* *
* @return New ClusterBackend instance, or null if there's no such component. * @return New ClusterBackend instance, or null if there's no such component.
*/ */
std::unique_ptr<Backend> InstantiateBackend(const EnumValPtr& tag, std::unique_ptr<Backend> InstantiateBackend(const EnumValPtr& tag,
std::unique_ptr<EventSerializer> event_serializer, std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer); std::unique_ptr<LogSerializer> log_serializer,
std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy);
/** /**
* Instantiate a event serializer with the given enum value. * Instantiate a event serializer with the given enum value.

View file

@ -39,6 +39,12 @@ bool ProxyThread::Start() {
zmq::socket_t xpub(ctx, zmq::socket_type::xpub); zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
zmq::socket_t xsub(ctx, zmq::socket_type::xsub); zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
// Enable XPUB_VERBOSE unconditional to enforce nodes receiving
// notifications about any new subscriptions, even if they have
// seen them before. This is needed to for the subscribe callback
// functionality to work reliably.
xpub.set(zmq::sockopt::xpub_verbose, 1);
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
try { try {

View file

@ -14,7 +14,7 @@
#include <zmq.hpp> #include <zmq.hpp>
#include "zeek/DebugLogger.h" #include "zeek/DebugLogger.h"
#include "zeek/Event.h" #include "zeek/EventHandler.h"
#include "zeek/EventRegistry.h" #include "zeek/EventRegistry.h"
#include "zeek/IntrusivePtr.h" #include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h" #include "zeek/Reporter.h"
@ -23,6 +23,7 @@
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/cluster/backend/zeromq/Plugin.h" #include "zeek/cluster/backend/zeromq/Plugin.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
#include "zeek/util.h"
namespace zeek { namespace zeek {
@ -37,6 +38,7 @@ namespace cluster::zeromq {
enum class DebugFlag : zeek_uint_t { enum class DebugFlag : zeek_uint_t {
NONE = 0, NONE = 0,
POLL = 1, POLL = 1,
THREAD = 2,
}; };
constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) {
@ -66,22 +68,16 @@ void self_thread_fun(void* arg) {
} // namespace } // namespace
// Constructor. ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls) std::unique_ptr<detail::EventHandlingStrategy> ehs)
: ThreadedBackend(std::move(es), std::move(ls)) { : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) {
xsub = zmq::socket_t(ctx, zmq::socket_type::xsub);
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
log_push = zmq::socket_t(ctx, zmq::socket_type::push); log_push = zmq::socket_t(ctx, zmq::socket_type::push);
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
} }
void ZeroMQBackend::DoInitPostScript() { void ZeroMQBackend::DoInitPostScript() {
ThreadedBackend::DoInitPostScript(); ThreadedBackend::DoInitPostScript();
my_node_id = zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::my_node_id")->ToStdString();
listen_xpub_endpoint = listen_xpub_endpoint =
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString(); zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString();
listen_xsub_endpoint = listen_xsub_endpoint =
@ -99,9 +95,6 @@ void ZeroMQBackend::DoInitPostScript() {
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
main_inproc.bind("inproc://publish-bridge");
child_inproc.connect("inproc://publish-bridge");
} }
@ -111,13 +104,12 @@ void ZeroMQBackend::DoTerminate() {
ZEROMQ_DEBUG("Joining self_thread"); ZEROMQ_DEBUG("Joining self_thread");
if ( self_thread.joinable() ) if ( self_thread.joinable() )
self_thread.join(); self_thread.join();
ZEROMQ_DEBUG("Joined self_thread");
// Close the sockets that are used from the main thread,
// the remaining sockets are closed by self_thread.
log_push.close(); log_push.close();
log_pull.close();
xsub.close();
xpub.close();
main_inproc.close(); main_inproc.close();
child_inproc.close();
ZEROMQ_DEBUG("Closing ctx"); ZEROMQ_DEBUG("Closing ctx");
ctx.close(); ctx.close();
@ -132,12 +124,23 @@ void ZeroMQBackend::DoTerminate() {
} }
bool ZeroMQBackend::DoInit() { bool ZeroMQBackend::DoInit() {
xsub = zmq::socket_t(ctx, zmq::socket_type::xsub);
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
auto linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); auto linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt());
int xpub_nodrop = zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0; int xpub_nodrop = zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0;
xpub.set(zmq::sockopt::linger, linger_ms); xpub.set(zmq::sockopt::linger, linger_ms);
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
// Enable XPUB_VERBOSE unconditional to enforce nodes receiving
// notifications about any new subscriptions, even if they have
// seen them before. This is needed to for the subscribe callback
// functionality to work reliably.
xpub.set(zmq::sockopt::xpub_verbose, 1);
try { try {
xsub.connect(connect_xsub_endpoint); xsub.connect(connect_xsub_endpoint);
} catch ( zmq::error_t& err ) { } catch ( zmq::error_t& err ) {
@ -220,6 +223,10 @@ bool ZeroMQBackend::DoInit() {
// following post might be useful: // following post might be useful:
// //
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/ // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
// Setup connectivity between main and child thread.
main_inproc.bind("inproc://inproc-bridge");
child_inproc.connect("inproc://inproc-bridge");
self_thread = std::thread(self_thread_fun, this); self_thread = std::thread(self_thread_fun, this);
// After connecting, call ThreadedBackend::DoInit() to register // After connecting, call ThreadedBackend::DoInit() to register
@ -242,7 +249,7 @@ bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string&
// * The serialized event itself. // * The serialized event itself.
std::array<zmq::const_buffer, 4> parts = { std::array<zmq::const_buffer, 4> parts = {
zmq::const_buffer(topic.data(), topic.size()), zmq::const_buffer(topic.data(), topic.size()),
zmq::const_buffer(my_node_id.data(), my_node_id.size()), zmq::const_buffer(NodeId().data(), NodeId().size()),
zmq::const_buffer(format.data(), format.size()), zmq::const_buffer(format.data(), format.size()),
zmq::const_buffer(buf.data(), buf.size()), zmq::const_buffer(buf.data(), buf.size()),
}; };
@ -263,18 +270,25 @@ bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string&
return true; return true;
} }
bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) { bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) {
ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str()); ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str());
try { try {
// Prepend 0x01 byte to indicate subscription to XSUB socket // Prepend 0x01 byte to indicate subscription to XSUB socket
// This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
std::string msg = "\x01" + topic_prefix; std::string msg = "\x01" + topic_prefix;
xsub.send(zmq::const_buffer(msg.data(), msg.size())); main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
} catch ( zmq::error_t& err ) { } catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what()); zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what());
if ( cb )
cb(topic_prefix, {CallbackStatus::Error, err.what()});
return false; return false;
} }
// Store the callback for later.
if ( cb )
subscription_callbacks.insert({topic_prefix, cb});
return true; return true;
} }
@ -283,8 +297,8 @@ bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) {
try { try {
// Prepend 0x00 byte to indicate subscription to XSUB socket. // Prepend 0x00 byte to indicate subscription to XSUB socket.
// This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
std::string msg = "\x00" + topic_prefix; std::string msg = '\0' + topic_prefix;
xsub.send(zmq::const_buffer(msg.data(), msg.size())); main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
} catch ( zmq::error_t& err ) { } catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what());
return false; return false;
@ -306,7 +320,7 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he
// * The serialized log write itself. // * The serialized log write itself.
std::array<zmq::const_buffer, 4> parts = { std::array<zmq::const_buffer, 4> parts = {
zmq::const_buffer{message_type.data(), message_type.size()}, zmq::const_buffer{message_type.data(), message_type.size()},
zmq::const_buffer(my_node_id.data(), my_node_id.size()), zmq::const_buffer(NodeId().data(), NodeId().size()),
zmq::const_buffer{format.data(), format.size()}, zmq::const_buffer{format.data(), format.size()},
zmq::const_buffer{buf.data(), buf.size()}, zmq::const_buffer{buf.data(), buf.size()},
}; };
@ -342,6 +356,9 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he
} }
void ZeroMQBackend::Run() { void ZeroMQBackend::Run() {
util::detail::set_thread_name(zeek::util::fmt("zmq-%p", this));
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this);
using MultipartMessage = std::vector<zmq::message_t>; using MultipartMessage = std::vector<zmq::message_t>;
auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) { auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) {
@ -364,31 +381,52 @@ void ZeroMQBackend::Run() {
}; };
auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) { auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
// Forward messages from the inprocess bridge to xpub. // Forward messages from the inprocess bridge to XSUB for subscription
// subscription handling (1 part) or XPUB for publishing (4 parts).
for ( auto& msg : msgs ) { for ( auto& msg : msgs ) {
assert(msg.size() == 4); assert(msg.size() == 1 || msg.size() == 4);
if ( msg.size() == 1 ) {
xsub.send(msg[0], zmq::send_flags::none);
}
else {
for ( auto& part : msg ) { for ( auto& part : msg ) {
zmq::send_flags flags = zmq::send_flags::dontwait; zmq::send_flags flags = zmq::send_flags::dontwait;
if ( part.more() ) if ( part.more() )
flags = flags | zmq::send_flags::sndmore; flags = flags | zmq::send_flags::sndmore;
zmq::send_result_t result; zmq::send_result_t result;
int tries = 0;
do { do {
try { try {
result = xpub.send(part, flags); result = xpub.send(part, flags);
} catch ( zmq::error_t& err ) { } catch ( zmq::error_t& err ) {
// XXX: Not sure if the return false is so great here. if ( err.num() == ETERM )
// return;
// Also, if we fail to publish, should we block rather
// than discard? // XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish: %s (%d)", err.what(), err.num()); ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
break; break;
} }
// EAGAIN returns empty result, means try again!
// Empty result means xpub.send() returned EAGAIN. The
// socket reached its high water mark and we should
// relax / backoff a bit. Otherwise we'll be spinning
// unproductively very fast here. Note that this is going
// to build up backpressure and eventually inproc.send()
// will block from the main thread.
if ( ! result ) {
++tries;
auto sleep_for = std::min(tries * 10, 500);
ZEROMQ_THREAD_PRINTF(
"xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries,
sleep_for);
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for));
}
} while ( ! result ); } while ( ! result );
} }
} }
}
}; };
auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) { auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) {
@ -439,7 +477,7 @@ void ZeroMQBackend::Run() {
// Filter out messages that are coming from this node. // Filter out messages that are coming from this node.
std::string sender(msg[1].data<const char>(), msg[1].size()); std::string sender(msg[1].data<const char>(), msg[1].size());
if ( sender == my_node_id ) if ( sender == NodeId() )
continue; continue;
detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()}; detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
@ -451,6 +489,16 @@ void ZeroMQBackend::Run() {
QueueForProcessing(std::move(qmsgs)); QueueForProcessing(std::move(qmsgs));
}; };
// Helper class running at destruction.
class Deferred {
public:
Deferred(std::function<void()> deferred) : closer(std::move(deferred)) {}
~Deferred() { closer(); }
private:
std::function<void()> closer;
};
struct SocketInfo { struct SocketInfo {
zmq::socket_ref socket; zmq::socket_ref socket;
std::string name; std::string name;
@ -464,6 +512,15 @@ void ZeroMQBackend::Run() {
{.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages}, {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages},
}; };
// Called when Run() terminates.
auto deferred_close = Deferred([this]() {
child_inproc.close();
xpub.close();
xsub.close();
log_pull.close();
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread sockets closed (%p)\n", this);
});
std::vector<zmq::pollitem_t> poll_items(sockets.size()); std::vector<zmq::pollitem_t> poll_items(sockets.size());
while ( true ) { while ( true ) {
@ -532,10 +589,12 @@ void ZeroMQBackend::Run() {
rcv_messages[i].pop_back(); rcv_messages[i].pop_back();
} }
} catch ( zmq::error_t& err ) { } catch ( zmq::error_t& err ) {
if ( err.num() == ETERM ) if ( err.num() != ETERM )
return;
throw; throw;
// Shutdown.
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread terminating (%p)\n", this);
break;
} }
// At this point, we've received anything that was readable from the sockets. // At this point, we've received anything that was readable from the sockets.
@ -552,10 +611,33 @@ void ZeroMQBackend::Run() {
bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) {
if ( tag == 0 || tag == 1 ) { if ( tag == 0 || tag == 1 ) {
std::string topic{reinterpret_cast<const char*>(payload.data()), payload.size()}; std::string topic{reinterpret_cast<const char*>(payload.data()), payload.size()};
zeek::EventHandlerPtr eh = tag == 1 ? event_subscription : event_unsubscription; zeek::EventHandlerPtr eh;
if ( tag == 1 ) {
// If this is the first time the subscription was observed, raise
// the ZeroMQ internal event.
if ( xpub_subscriptions.count(topic) == 0 ) {
eh = event_subscription;
xpub_subscriptions.insert(topic);
}
if ( const auto& cbit = subscription_callbacks.find(topic); cbit != subscription_callbacks.end() ) {
const auto& cb = cbit->second;
if ( cb )
cb(topic, {CallbackStatus::Success, "success"});
subscription_callbacks.erase(cbit);
}
}
else if ( tag == 0 ) {
eh = event_unsubscription;
xpub_subscriptions.erase(topic);
}
ZEROMQ_DEBUG("BackendMessage: %s for %s", eh != nullptr ? eh->Name() : "<raising no event>", topic.c_str());
if ( eh )
EnqueueEvent(eh, zeek::Args{zeek::make_intrusive<zeek::StringVal>(topic)});
ZEROMQ_DEBUG("BackendMessage: %s for %s", eh->Name(), topic.c_str());
zeek::event_mgr.Enqueue(eh, zeek::make_intrusive<zeek::StringVal>(topic));
return true; return true;
} }
else { else {

View file

@ -17,7 +17,8 @@ public:
/** /**
* Constructor. * Constructor.
*/ */
ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls); ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs);
/** /**
* Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen * Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen
@ -34,8 +35,9 @@ public:
* Component factory. * Component factory.
*/ */
static std::unique_ptr<Backend> Instantiate(std::unique_ptr<EventSerializer> event_serializer, static std::unique_ptr<Backend> Instantiate(std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer) { std::unique_ptr<LogSerializer> log_serializer,
return std::make_unique<ZeroMQBackend>(std::move(event_serializer), std::move(log_serializer)); std::unique_ptr<detail::EventHandlingStrategy> ehs) {
return std::make_unique<ZeroMQBackend>(std::move(event_serializer), std::move(log_serializer), std::move(ehs));
} }
private: private:
@ -48,7 +50,7 @@ private:
bool DoPublishEvent(const std::string& topic, const std::string& format, bool DoPublishEvent(const std::string& topic, const std::string& format,
const cluster::detail::byte_buffer& buf) override; const cluster::detail::byte_buffer& buf) override;
bool DoSubscribe(const std::string& topic_prefix) override; bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) override;
bool DoUnsubscribe(const std::string& topic_prefix) override; bool DoUnsubscribe(const std::string& topic_prefix) override;
@ -60,7 +62,6 @@ private:
bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) override; bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) override;
// Script level variables. // Script level variables.
std::string my_node_id;
std::string connect_xsub_endpoint; std::string connect_xsub_endpoint;
std::string connect_xpub_endpoint; std::string connect_xpub_endpoint;
std::string listen_xsub_endpoint; std::string listen_xsub_endpoint;
@ -94,6 +95,10 @@ private:
std::thread self_thread; std::thread self_thread;
std::unique_ptr<ProxyThread> proxy_thread; std::unique_ptr<ProxyThread> proxy_thread;
// Tracking the subscriptions on the local XPUB socket.
std::map<std::string, SubscribeCallback> subscription_callbacks;
std::set<std::string> xpub_subscriptions;
}; };
} // namespace zeek::cluster::zeromq } // namespace zeek::cluster::zeromq

View file

@ -64,9 +64,9 @@ function Cluster::__unsubscribe%(topic_prefix: string%): bool
## Initialize the global cluster backend. ## Initialize the global cluster backend.
## ##
## Returns: true on success. ## Returns: true on success.
function Cluster::Backend::__init%(%): bool function Cluster::Backend::__init%(nid: string%): bool
%{ %{
auto rval = zeek::cluster::backend->Init(); auto rval = zeek::cluster::backend->Init(nid->ToStdString());
return zeek::val_mgr->Bool(rval); return zeek::val_mgr->Bool(rval);
%} %}

View file

@ -76,7 +76,7 @@ std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev) {
zeek::Args vl; zeek::Args vl;
zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name); zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name);
if ( ! handler ) { if ( handler == nullptr ) {
zeek::reporter->Error("Failed to lookup handler for '%s'", std::string(name).c_str()); zeek::reporter->Error("Failed to lookup handler for '%s'", std::string(name).c_str());
return std::nullopt; return std::nullopt;
} }
@ -165,8 +165,8 @@ bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const det
if ( ! ev ) if ( ! ev )
return false; return false;
auto push_back_adaptor = PushBackAdapter(buf); auto push_back_adapter = PushBackAdapter(buf);
broker::format::json::v1::encode(ev->move_data(), std::back_inserter(push_back_adaptor)); broker::format::json::v1::encode(ev->move_data(), std::back_inserter(push_back_adapter));
return true; return true;
} }

View file

@ -849,8 +849,12 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
exit(1); exit(1);
} }
auto backend = cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer), auto event_handling_strategy = std::make_unique<cluster::detail::LocalEventHandlingStrategy>();
std::move(log_serializer));
auto backend =
cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer),
std::move(log_serializer), std::move(event_handling_strategy));
if ( ! backend ) { if ( ! backend ) {
reporter->Error("Failed to instantiate cluster backend: %s", reporter->Error("Failed to instantiate cluster backend: %s",
zeek::obj_desc_short(cluster_backend_val.get()).c_str()); zeek::obj_desc_short(cluster_backend_val.get()).c_str());

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node_up, worker-1
node_down, worker-1

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
got finish

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node_up, worker-1
subscription, /test/worker/topic
unsubscription, /test/worker/topic
node_down, worker-1

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
start_test
subscription, /test/manager/topic
unsubscription, /test/manager/topic
done

View file

@ -5,7 +5,7 @@
@load frameworks/cluster/backend/zeromq @load frameworks/cluster/backend/zeromq
@load frameworks/cluster/backend/zeromq/connect @load frameworks/cluster/backend/zeromq/connect
redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT"))); redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT"))));
redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT"))); redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT"))));
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT"))); redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT"))));
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT"))); redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT"))));

View file

@ -0,0 +1,66 @@
# @TEST-DOC: A worker receiving an event without a handler implemented would produce a reporter error
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: zeek --parse-only manager.zeek worker.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/out
# @TEST-EXEC: btest-diff ./manager/.stderr
# @TEST-EXEC: btest-diff ./worker/out
# @TEST-EXEC: btest-diff ./worker/.stderr
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap
global hello: event() &is_used;
global finish: event() &is_used;
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
event send_finish(id: string)
{
Cluster::publish(Cluster::nodeid_topic(id), finish);
}
# If a node comes up that isn't us, send it a hello and
# schedule sending a my_finish
event Cluster::node_up(name: string, id: string)
{
print "node_up", name;
Cluster::publish(Cluster::nodeid_topic(id), hello);
schedule 20msec { send_finish(id) };
}
# If the worker vanishes, finish the test.
event Cluster::node_down(name: string, id: string)
{
print "node_down", name;
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
# The worker does not implement hello!
event finish()
{
print "got finish";
terminate();
}
# @TEST-END-FILE

View file

@ -0,0 +1,84 @@
# @TEST-DOC: Regression test for unsubscriptions not actually unsubscribing because of "\x00" usage.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/out
# @TEST-EXEC: btest-diff ./worker/out
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap
global start_test: event() &is_used;
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
event Cluster::Backend::ZeroMQ::subscription(topic: string) {
if ( topic == "/test/worker/topic" ) {
print "subscription", topic;
Cluster::subscribe("/test/manager/topic");
}
}
event Cluster::Backend::ZeroMQ::unsubscription(topic: string) {
if ( topic == "/test/worker/topic" ) {
print "unsubscription", topic;
Cluster::unsubscribe("/test/manager/topic");
}
}
event Cluster::node_up(name: string, id: string) {
print "node_up", name;
Cluster::publish(Cluster::nodeid_topic(id), start_test);
}
event Cluster::node_down(name: string, id: string) {
print "node_down", name;
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
event start_test() {
print "start_test";
Cluster::subscribe("/test/worker/topic");
}
event Cluster::Backend::ZeroMQ::subscription(topic: string) {
if ( topic == "/test/manager/topic" ) {
print "subscription", topic;
Cluster::unsubscribe("/test/worker/topic");
}
}
event Cluster::Backend::ZeroMQ::unsubscription(topic: string) {
if ( topic == "/test/manager/topic" ) {
print "unsubscription", topic;
terminate();
}
}
event zeek_done() {
print "done";
}
# @TEST-END-FILE