Commit graph

10 commits

Author SHA1 Message Date
Arne Welzel
827eccb732 cluster/zeromq: Adapt for OnLoopProcess changes 2025-03-10 17:07:30 +01:00
Arne Welzel
6008e67008 cluster/zeromq: Call DoTerminate() in destructor
Normal life-cycle is that Terminate() / DoTerminate() is called
by zeek-setup code. If that doesn't happen, shutdown and join
threads during destructor.

try { } catch (...) suggested by Benjamin.
2025-02-05 16:39:44 +01:00
Arne Welzel
2c6d934ef4 cluster/zeromq: Use lambda for thread trampoline 2025-02-05 16:38:24 +01:00
Arne Welzel
16c745cee4 cluster/zeromq: Do not call util::fmt() from thread
...util::fmt() uses a static buffer, so this is problematic.

I've dabbled a bit replacing std::thread with using threading::BasicThread
which would offer Fmt(), but this makes things more complicated. Primarily
as BasicThread is registered with the thread manager and the shutdown
interactions become entangled. The thread might be terminated before the
backend, or vice-versa. Seems nicer for the thread to be owned by the backend.
2025-02-05 16:38:24 +01:00
Arne Welzel
9c5c0f40e1 cluster/zeromq: Fix Unsubscribe() bug caused by \x00 prefix 2025-02-05 10:39:56 +01:00
Arne Welzel
e8f87019c6 cluster: Add SubscribeCallback support
This allows callers of Subscribe() to pass in a callback that will be invoked
once the subscription is established or failed to establish. It is the
backend's responsibility to execute the callback on the main thread either
synchronously, or preferably asynchronously at a later point, by
scheduling a task on the IO main loop.

This turns on ZMQ_XPUB_VERBOSE for ZeroMQ so that notifications about
subscriptions are raised even if the subscriptions has previously been
observed.
2025-02-05 10:39:56 +01:00
Arne Welzel
fa22f91ca4 cluster/zeromq: Fix XSUB threading issues
It is not safe to use the same socket from different threads, but the
current code used the xsub socket directly from the main thread (to setup
subscriptions) and from the internal thread for polling and reading.

Leverage the PAIR socket already in use for forwarding publish operations
to the internal thread also for subscribe and unsubscribe.

The failure mode is/was a bit annoying. Essentially, closing of the
context would hang indefinitely in zmq_ctx_term().
2025-02-05 10:39:56 +01:00
Arne Welzel
df78a94c76 cluster/zeromq: Use NodeId(), drop my_node_id 2025-02-05 10:39:56 +01:00
Arne Welzel
0b7a660a34 cluster/Backend: Make backend event processing customizable
This allows configurability at the code level to decide what to do with
a received remote events and events produced by a backend. For now, only
enqueue events into the process's script layer, but for the WebSocket
interface, the action would be to send out the event on a WebSocket
connection instead.
2025-02-05 10:39:56 +01:00
Arne Welzel
35c79ab2e3 cluster/backend/zeromq: Add ZeroMQ based cluster backend
This is a cluster backend implementation using a central XPUB/XSUB proxy
that by default runs on the manager node. Logging is implemented leveraging
PUSH/PULL sockets between logger and other nodes, rather than going
through XPUB/XSUB.

The test-all-policy-cluster baseline changed: Previously, Broker::peer()
would be called from setup-connections.zeek, causing the IO loop to be
alive. With the ZeroMQ backend, the IO loop is only alive when
Cluster::init() is called, but that doesn't happen anymore.
2024-12-10 20:33:02 +01:00