mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
cluster/zeromq: Fix Cluster::subscribe() block if not initialized
If Cluster::init() hasn't been invoked yet, Cluster::subscribe() with the ZeroMQ backend would block because the main_inproc socket didn't yet have a connection from the child thread. Prevent this by connecting the main and child socket pair at construction time. This will queue the subscriptions and start processing them once the child thread has started.
This commit is contained in:
parent
d2cda5a68c
commit
01666df3d7
3 changed files with 68 additions and 9 deletions
|
@ -85,6 +85,7 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_pt
|
||||||
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs),
|
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs),
|
||||||
new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ZeroMQ", onloop_queue_hwm)),
|
new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ZeroMQ", onloop_queue_hwm)),
|
||||||
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
|
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
|
||||||
|
child_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
|
||||||
// Counters for block and drop metrics.
|
// Counters for block and drop metrics.
|
||||||
total_xpub_drops(
|
total_xpub_drops(
|
||||||
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {},
|
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {},
|
||||||
|
@ -94,7 +95,13 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_pt
|
||||||
"Number of received events dropped due to OnLoop queue full.")),
|
"Number of received events dropped due to OnLoop queue full.")),
|
||||||
total_msg_errors(
|
total_msg_errors(
|
||||||
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {},
|
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {},
|
||||||
"Number of events with the wrong number of message parts.")) {}
|
"Number of events with the wrong number of message parts.")) {
|
||||||
|
// Establish the socket connection between main thread and child thread
|
||||||
|
// already in the constructor. This allows Subscribe() and Unsubscribe()
|
||||||
|
// calls to be delayed until DoInit() was called.
|
||||||
|
main_inproc.bind("inproc://inproc-bridge");
|
||||||
|
child_inproc.connect("inproc://inproc-bridge");
|
||||||
|
}
|
||||||
|
|
||||||
ZeroMQBackend::~ZeroMQBackend() {
|
ZeroMQBackend::~ZeroMQBackend() {
|
||||||
try {
|
try {
|
||||||
|
@ -169,10 +176,11 @@ void ZeroMQBackend::DoTerminate() {
|
||||||
ctx.shutdown();
|
ctx.shutdown();
|
||||||
|
|
||||||
// Close the sockets that are used from the main thread,
|
// Close the sockets that are used from the main thread,
|
||||||
// the remaining sockets were closed by self_thread during
|
// the remaining sockets except for the child_inproc one
|
||||||
// shutdown already.
|
// were closed by self_thread during shutdown already.
|
||||||
log_push.close();
|
log_push.close();
|
||||||
main_inproc.close();
|
main_inproc.close();
|
||||||
|
child_inproc.close();
|
||||||
|
|
||||||
ZEROMQ_DEBUG("Closing ctx");
|
ZEROMQ_DEBUG("Closing ctx");
|
||||||
ctx.close();
|
ctx.close();
|
||||||
|
@ -197,7 +205,6 @@ bool ZeroMQBackend::DoInit() {
|
||||||
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
|
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
|
||||||
log_push = zmq::socket_t(ctx, zmq::socket_type::push);
|
log_push = zmq::socket_t(ctx, zmq::socket_type::push);
|
||||||
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
|
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
|
||||||
child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
|
|
||||||
|
|
||||||
xpub.set(zmq::sockopt::linger, linger_ms);
|
xpub.set(zmq::sockopt::linger, linger_ms);
|
||||||
|
|
||||||
|
@ -286,10 +293,6 @@ bool ZeroMQBackend::DoInit() {
|
||||||
//
|
//
|
||||||
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
|
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
|
||||||
|
|
||||||
// Setup connectivity between main and child thread.
|
|
||||||
main_inproc.bind("inproc://inproc-bridge");
|
|
||||||
child_inproc.connect("inproc://inproc-bridge");
|
|
||||||
|
|
||||||
// Thread is joined in backend->DoTerminate(), backend outlives it.
|
// Thread is joined in backend->DoTerminate(), backend outlives it.
|
||||||
self_thread = std::thread([](auto* backend) { backend->Run(); }, this);
|
self_thread = std::thread([](auto* backend) { backend->Run(); }, this);
|
||||||
|
|
||||||
|
@ -634,7 +637,6 @@ void ZeroMQBackend::Run() {
|
||||||
|
|
||||||
// Called when Run() terminates.
|
// Called when Run() terminates.
|
||||||
auto deferred_close = util::Deferred([this]() {
|
auto deferred_close = util::Deferred([this]() {
|
||||||
child_inproc.close();
|
|
||||||
xpub.close();
|
xpub.close();
|
||||||
xsub.close();
|
xsub.close();
|
||||||
log_pull.close();
|
log_pull.close();
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
test(), 42
|
55
testing/btest/cluster/zeromq/subscribe-before-init.zeek
Normal file
55
testing/btest/cluster/zeromq/subscribe-before-init.zeek
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
# @TEST-DOC: Regression test Cluster::subscribe() blocking if called in a high-priority zeek_init() handler
|
||||||
|
#
|
||||||
|
# @TEST-REQUIRES: have-zeromq
|
||||||
|
#
|
||||||
|
# @TEST-GROUP: cluster-zeromq
|
||||||
|
#
|
||||||
|
# @TEST-PORT: XPUB_PORT
|
||||||
|
# @TEST-PORT: XSUB_PORT
|
||||||
|
# @TEST-PORT: LOG_PULL_PORT
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
|
||||||
|
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
|
||||||
|
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-wait 30
|
||||||
|
# @TEST-EXEC: btest-diff ./manager/out
|
||||||
|
|
||||||
|
# @TEST-START-FILE common.zeek
|
||||||
|
@load ./zeromq-test-bootstrap
|
||||||
|
|
||||||
|
global test: event(c: count) &is_used;
|
||||||
|
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
# @TEST-START-FILE manager.zeek
|
||||||
|
@load ./common.zeek
|
||||||
|
|
||||||
|
event zeek_init() &priority=1000000
|
||||||
|
{
|
||||||
|
Cluster::subscribe("test.early");
|
||||||
|
}
|
||||||
|
|
||||||
|
event test(c: count)
|
||||||
|
{
|
||||||
|
print "test()", c;
|
||||||
|
}
|
||||||
|
|
||||||
|
event Cluster::node_down(name: string, id: string)
|
||||||
|
{
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
# @TEST-START-FILE worker.zeek
|
||||||
|
@load ./common.zeek
|
||||||
|
event Cluster::node_up(name: string, id: string)
|
||||||
|
{
|
||||||
|
Cluster::publish("test.early", test, 42);
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue