mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
cluster/zeromq: Fix unsubscription visibility
When two workers connect to zeek.cluster.worker, the central ZeroMQ proxy would not propagate unsubscription information to other nodes once they both left. Set ZMQ_XPUB_VERBOSER on the proxies XPUB socket for visibility.
This commit is contained in:
parent
6045c8ee64
commit
3885871e7d
5 changed files with 173 additions and 5 deletions
|
@ -41,11 +41,12 @@ bool ProxyThread::Start() {
|
|||
zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
|
||||
zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
|
||||
|
||||
// Enable XPUB_VERBOSE unconditional to enforce nodes receiving
|
||||
// notifications about any new subscriptions, even if they have
|
||||
// seen them before. This is needed to for the subscribe callback
|
||||
// functionality to work reliably.
|
||||
xpub.set(zmq::sockopt::xpub_verbose, 1);
|
||||
// Enable XPUB_VERBOSER unconditional to enforce nodes receiving
|
||||
// notifications about any new and removed subscriptions, even if
|
||||
// they have seen them before. This is needed for the subscribe
|
||||
// callback and shared subscription removal notification to work
|
||||
// reliably.
|
||||
xpub.set(zmq::sockopt::xpub_verboser, 1);
|
||||
|
||||
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
|
||||
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
### NOTE: This file has been sorted with diff-sort.
|
||||
A node_up, worker-1
|
||||
A node_up, worker-2
|
||||
B subscription, /test/worker-1
|
||||
B subscription, /test/worker-2
|
||||
B subscription, /test/worker-common
|
||||
C subscribing to /test/manager-common
|
||||
D unsubscription, /test/worker-1
|
||||
D unsubscription, /test/worker-2
|
||||
D unsubscription, /test/worker-common
|
||||
E unsubscribing from /test/manager-common
|
||||
Z node_down, worker-1
|
||||
Z node_down, worker-2
|
|
@ -0,0 +1,7 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
### NOTE: This file has been sorted with diff-sort.
|
||||
A start_test
|
||||
B subscription, /test/manager-common
|
||||
C unsubscribe from /test/worker-common
|
||||
C unsubscription, /test/manager-common
|
||||
D /test/manager-common unsubscribed, terminate()
|
|
@ -0,0 +1,7 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
### NOTE: This file has been sorted with diff-sort.
|
||||
A start_test
|
||||
B subscription, /test/manager-common
|
||||
C unsubscribe from /test/worker-common
|
||||
C unsubscription, /test/manager-common
|
||||
D /test/manager-common unsubscribed, terminate()
|
139
testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek
Normal file
139
testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek
Normal file
|
@ -0,0 +1,139 @@
|
|||
# @TEST-DOC: Regression test for shared unsubscriptions not happening.
|
||||
#
|
||||
# Scenario:
|
||||
# * manager waits for two workers and sends start_test() event
|
||||
# * workers create subscriptions for /test/worker-1, /test/worker-2 and /test/worker-common
|
||||
# * manager: Seeing all these subscriptions, subscribe to /test/manager-common
|
||||
# * workers: Seeing /test/manager-common subscription, unsubscribe /test/worker-common
|
||||
# * manager: Observes unsubscription for /test/worker-common, unsubscribes from /test/manager-common
|
||||
# * workers: terminate() when seeing the unsubscription for /test/manager-common
|
||||
#
|
||||
# @TEST-REQUIRES: have-zeromq
|
||||
#
|
||||
# @TEST-GROUP: cluster-zeromq
|
||||
#
|
||||
# @TEST-PORT: XPUB_PORT
|
||||
# @TEST-PORT: XSUB_PORT
|
||||
# @TEST-PORT: LOG_PULL_PORT
|
||||
#
|
||||
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
|
||||
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||
#
|
||||
# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
|
||||
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
|
||||
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out"
|
||||
# @TEST-EXEC: btest-bg-wait 30
|
||||
#
|
||||
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out
|
||||
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-1/out
|
||||
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-2/out
|
||||
|
||||
|
||||
# @TEST-START-FILE common.zeek
|
||||
@load ./zeromq-test-bootstrap
|
||||
|
||||
global start_test: event() &is_used;
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE manager.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
global worker_subs_seen = 0;
|
||||
global worker_common_seen = F;
|
||||
global subscribe_done = F;
|
||||
global nodes_up = 0;
|
||||
global nodes_down = 0;
|
||||
|
||||
event Cluster::Backend::ZeroMQ::subscription(topic: string)
|
||||
{
|
||||
if ( ! starts_with(topic, "/test/worker") )
|
||||
return;
|
||||
|
||||
print "B subscription", topic;
|
||||
|
||||
if ( topic == "/test/worker-1" || topic == "/test/worker-2" )
|
||||
++worker_subs_seen;
|
||||
|
||||
if ( topic == "/test/worker-common" )
|
||||
worker_common_seen = T;
|
||||
|
||||
if ( ! subscribe_done && worker_common_seen && worker_subs_seen == 2 )
|
||||
{
|
||||
print "C subscribing to /test/manager-common";
|
||||
Cluster::subscribe("/test/manager-common");
|
||||
subscribe_done = T;
|
||||
}
|
||||
}
|
||||
|
||||
event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
|
||||
{
|
||||
if ( ! starts_with(topic, "/test/worker") )
|
||||
return;
|
||||
|
||||
print "D unsubscription", topic;
|
||||
|
||||
if ( topic == "/test/worker-common" )
|
||||
{
|
||||
print "E unsubscribing from /test/manager-common";
|
||||
Cluster::unsubscribe("/test/manager-common");
|
||||
}
|
||||
}
|
||||
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
print "A node_up", name;
|
||||
++nodes_up;
|
||||
|
||||
if ( nodes_up == 2 )
|
||||
Cluster::publish(Cluster::worker_topic, start_test);
|
||||
}
|
||||
|
||||
event Cluster::node_down(name: string, id: string)
|
||||
{
|
||||
print "Z node_down", name;
|
||||
++nodes_down;
|
||||
if ( nodes_down == 2 )
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE worker.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
event start_test()
|
||||
{
|
||||
print "A start_test";
|
||||
Cluster::subscribe("/test/worker-common");
|
||||
Cluster::subscribe("/test/" + Cluster::node);
|
||||
}
|
||||
|
||||
event Cluster::Backend::ZeroMQ::subscription(topic: string)
|
||||
{
|
||||
if ( ! starts_with(topic, "/test/manager") )
|
||||
return;
|
||||
|
||||
print "B subscription", topic;
|
||||
|
||||
if ( topic == "/test/manager-common" )
|
||||
{
|
||||
print "C unsubscribe from /test/worker-common";
|
||||
Cluster::unsubscribe("/test/worker-common");
|
||||
}
|
||||
}
|
||||
|
||||
event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
|
||||
{
|
||||
if ( ! starts_with(topic, "/test/manager") )
|
||||
return;
|
||||
|
||||
print "C unsubscription", topic;
|
||||
|
||||
if ( topic == "/test/manager-common" )
|
||||
{
|
||||
print "D /test/manager-common unsubscribed, terminate()";
|
||||
terminate();
|
||||
}
|
||||
}
|
||||
# @TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue