mirror of
https://github.com/zeek/zeek.git
synced 2025-10-01 22:28:20 +00:00
Compare commits
2 commits
0700427bac
...
3abc1116a1
Author | SHA1 | Date | |
---|---|---|---|
![]() |
3abc1116a1 | ||
![]() |
01666df3d7 |
5 changed files with 81 additions and 10 deletions
12
CHANGES
12
CHANGES
|
@ -1,3 +1,15 @@
|
|||
8.1.0-dev.609 | 2025-09-29 13:08:15 +0200
|
||||
|
||||
* cluster/zeromq: Fix Cluster::subscribe() block if not initialized (Arne Welzel, Corelight)
|
||||
|
||||
If Cluster::init() hasn't been invoked yet, Cluster::subscribe() with the
|
||||
ZeroMQ backend would block because the main_inproc socket didn't
|
||||
yet have a connection from the child thread. Prevent this by connecting
|
||||
the main and child socket pair at construction time.
|
||||
|
||||
This will queue the subscriptions and start processing them once the
|
||||
child thread has started.
|
||||
|
||||
8.1.0-dev.607 | 2025-09-26 14:19:40 -0700
|
||||
|
||||
* Fixes for -O gen-standalone-C++ for tracking BiFs, lambdas, attribute types, and independent globals (Vern Paxson, Corelight)
|
||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
|||
8.1.0-dev.607
|
||||
8.1.0-dev.609
|
||||
|
|
|
@ -85,6 +85,7 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_pt
|
|||
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs),
|
||||
new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ZeroMQ", onloop_queue_hwm)),
|
||||
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
|
||||
child_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
|
||||
// Counters for block and drop metrics.
|
||||
total_xpub_drops(
|
||||
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {},
|
||||
|
@ -94,7 +95,13 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_pt
|
|||
"Number of received events dropped due to OnLoop queue full.")),
|
||||
total_msg_errors(
|
||||
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {},
|
||||
"Number of events with the wrong number of message parts.")) {}
|
||||
"Number of events with the wrong number of message parts.")) {
|
||||
// Establish the socket connection between main thread and child thread
|
||||
// already in the constructor. This allows Subscribe() and Unsubscribe()
|
||||
// calls to be delayed until DoInit() was called.
|
||||
main_inproc.bind("inproc://inproc-bridge");
|
||||
child_inproc.connect("inproc://inproc-bridge");
|
||||
}
|
||||
|
||||
ZeroMQBackend::~ZeroMQBackend() {
|
||||
try {
|
||||
|
@ -169,10 +176,11 @@ void ZeroMQBackend::DoTerminate() {
|
|||
ctx.shutdown();
|
||||
|
||||
// Close the sockets that are used from the main thread,
|
||||
// the remaining sockets were closed by self_thread during
|
||||
// shutdown already.
|
||||
// the remaining sockets except for the child_inproc one
|
||||
// were closed by self_thread during shutdown already.
|
||||
log_push.close();
|
||||
main_inproc.close();
|
||||
child_inproc.close();
|
||||
|
||||
ZEROMQ_DEBUG("Closing ctx");
|
||||
ctx.close();
|
||||
|
@ -197,7 +205,6 @@ bool ZeroMQBackend::DoInit() {
|
|||
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
|
||||
log_push = zmq::socket_t(ctx, zmq::socket_type::push);
|
||||
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
|
||||
child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
|
||||
|
||||
xpub.set(zmq::sockopt::linger, linger_ms);
|
||||
|
||||
|
@ -286,10 +293,6 @@ bool ZeroMQBackend::DoInit() {
|
|||
//
|
||||
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
|
||||
|
||||
// Setup connectivity between main and child thread.
|
||||
main_inproc.bind("inproc://inproc-bridge");
|
||||
child_inproc.connect("inproc://inproc-bridge");
|
||||
|
||||
// Thread is joined in backend->DoTerminate(), backend outlives it.
|
||||
self_thread = std::thread([](auto* backend) { backend->Run(); }, this);
|
||||
|
||||
|
@ -634,7 +637,6 @@ void ZeroMQBackend::Run() {
|
|||
|
||||
// Called when Run() terminates.
|
||||
auto deferred_close = util::Deferred([this]() {
|
||||
child_inproc.close();
|
||||
xpub.close();
|
||||
xsub.close();
|
||||
log_pull.close();
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
test(), 42
|
55
testing/btest/cluster/zeromq/subscribe-before-init.zeek
Normal file
55
testing/btest/cluster/zeromq/subscribe-before-init.zeek
Normal file
|
@ -0,0 +1,55 @@
|
|||
# @TEST-DOC: Regression test Cluster::subscribe() blocking if called in a high-priority zeek_init() handler
|
||||
#
|
||||
# @TEST-REQUIRES: have-zeromq
|
||||
#
|
||||
# @TEST-GROUP: cluster-zeromq
|
||||
#
|
||||
# @TEST-PORT: XPUB_PORT
|
||||
# @TEST-PORT: XSUB_PORT
|
||||
# @TEST-PORT: LOG_PULL_PORT
|
||||
#
|
||||
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
|
||||
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||
#
|
||||
# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
|
||||
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-wait 30
|
||||
# @TEST-EXEC: btest-diff ./manager/out
|
||||
|
||||
# @TEST-START-FILE common.zeek
|
||||
@load ./zeromq-test-bootstrap
|
||||
|
||||
global test: event(c: count) &is_used;
|
||||
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE manager.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
event zeek_init() &priority=1000000
|
||||
{
|
||||
Cluster::subscribe("test.early");
|
||||
}
|
||||
|
||||
event test(c: count)
|
||||
{
|
||||
print "test()", c;
|
||||
}
|
||||
|
||||
event Cluster::node_down(name: string, id: string)
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE worker.zeek
|
||||
@load ./common.zeek
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
Cluster::publish("test.early", test, 42);
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue