Merge remote-tracking branch 'origin/topic/awelzel/zeromq-fix-block-subscribe-before-init'
Some checks are pending
pre-commit / pre-commit (push) Waiting to run

* origin/topic/awelzel/zeromq-fix-block-subscribe-before-init:
  cluster/zeromq: Fix Cluster::subscribe() block if not initialized
This commit is contained in:
Arne Welzel 2025-09-29 13:08:15 +02:00
commit 3abc1116a1
5 changed files with 81 additions and 10 deletions

12
CHANGES
View file

@ -1,3 +1,15 @@
8.1.0-dev.609 | 2025-09-29 13:08:15 +0200
* cluster/zeromq: Fix Cluster::subscribe() block if not initialized (Arne Welzel, Corelight)
If Cluster::init() hasn't been invoked yet, Cluster::subscribe() with the
ZeroMQ backend would block because the main_inproc socket didn't
yet have a connection from the child thread. Prevent this by connecting
the main and child socket pair at construction time.
This will queue the subscriptions and start processing them once the
child thread has started.
8.1.0-dev.607 | 2025-09-26 14:19:40 -0700
* Fixes for -O gen-standalone-C++ for tracking BiFs, lambdas, attribute types, and independent globals (Vern Paxson, Corelight)

View file

@ -1 +1 @@
8.1.0-dev.607
8.1.0-dev.609

View file

@ -85,6 +85,7 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_pt
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs),
new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ZeroMQ", onloop_queue_hwm)),
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
child_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
// Counters for block and drop metrics.
total_xpub_drops(
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {},
@ -94,7 +95,13 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_pt
"Number of received events dropped due to OnLoop queue full.")),
total_msg_errors(
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {},
"Number of events with the wrong number of message parts.")) {}
"Number of events with the wrong number of message parts.")) {
// Establish the socket connection between the main thread and the child
// thread already in the constructor. This allows Subscribe() and
// Unsubscribe() calls made before DoInit() to be queued until the child
// thread has started.
main_inproc.bind("inproc://inproc-bridge");
child_inproc.connect("inproc://inproc-bridge");
}
ZeroMQBackend::~ZeroMQBackend() {
try {
@ -169,10 +176,11 @@ void ZeroMQBackend::DoTerminate() {
ctx.shutdown();
// Close the sockets that are used from the main thread,
// the remaining sockets were closed by self_thread during
// shutdown already.
// the remaining sockets except for the child_inproc one
// were closed by self_thread during shutdown already.
log_push.close();
main_inproc.close();
child_inproc.close();
ZEROMQ_DEBUG("Closing ctx");
ctx.close();
@ -197,7 +205,6 @@ bool ZeroMQBackend::DoInit() {
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
log_push = zmq::socket_t(ctx, zmq::socket_type::push);
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
xpub.set(zmq::sockopt::linger, linger_ms);
@ -286,10 +293,6 @@ bool ZeroMQBackend::DoInit() {
//
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
// Setup connectivity between main and child thread.
main_inproc.bind("inproc://inproc-bridge");
child_inproc.connect("inproc://inproc-bridge");
// Thread is joined in backend->DoTerminate(), backend outlives it.
self_thread = std::thread([](auto* backend) { backend->Run(); }, this);
@ -634,7 +637,6 @@ void ZeroMQBackend::Run() {
// Called when Run() terminates.
auto deferred_close = util::Deferred([this]() {
child_inproc.close();
xpub.close();
xsub.close();
log_pull.close();

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
test(), 42

View file

@ -0,0 +1,55 @@
# @TEST-DOC: Regression test for Cluster::subscribe() blocking when called in a high-priority zeek_init() handler
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/out
# @TEST-START-FILE common.zeek
# Shared by manager.zeek and worker.zeek: pull in the ZeroMQ test bootstrap
# script and declare the test event that the worker publishes and the
# manager handles.
@load ./zeromq-test-bootstrap
global test: event(c: count) &is_used;
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
# Subscribe from a very high-priority zeek_init() handler. Per the @TEST-DOC
# line this is the regression scenario: Cluster::subscribe() used to block here.
# NOTE(review): presumably this handler runs before the bootstrap script
# initializes the cluster backend -- confirm against test-bootstrap.zeek.
event zeek_init() &priority=1000000
{
Cluster::subscribe("test.early");
}
# Handler for the test event: prints the received count to stdout, which the
# btest command line redirects to manager/out for comparison by btest-diff.
event test(c: count)
{
print "test()", c;
}
# Shut the manager down as soon as any node is reported down.
# NOTE(review): presumably triggered by the worker exiting after it has
# published the test event -- confirm.
event Cluster::node_down(name: string, id: string)
{
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
# When a node is reported up, publish test(42) to the "test.early" topic that
# the manager subscribes to in its zeek_init() handler, then terminate the
# worker.
# NOTE(review): presumably the node coming up is the manager -- confirm.
event Cluster::node_up(name: string, id: string)
{
Cluster::publish("test.early", test, 42);
terminate();
}
# @TEST-END-FILE