diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index a695a24f7b..ef117749ac 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -297,7 +297,7 @@ bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { try { // Prepend 0x00 byte to indicate subscription to XSUB socket. // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). - std::string msg = "\x00" + topic_prefix; + std::string msg = '\0' + topic_prefix; main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( zmq::error_t& err ) { zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe/..manager.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe/..manager.out new file mode 100644 index 0000000000..31e6e4aeaf --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe/..manager.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node_up, worker-1 +subscription, /test/worker/topic +unsubscription, /test/worker/topic +node_down, worker-1 diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe/..worker.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe/..worker.out new file mode 100644 index 0000000000..e265cc3e2a --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe/..worker.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +start_test +subscription, /test/manager/topic +unsubscription, /test/manager/topic +done diff --git a/testing/btest/cluster/zeromq/unsubscribe.zeek b/testing/btest/cluster/zeromq/unsubscribe.zeek new file mode 100644 index 0000000000..1067465bfd --- /dev/null +++ b/testing/btest/cluster/zeromq/unsubscribe.zeek @@ -0,0 +1,84 @@ +# @TEST-DOC: Regression test for unsubscriptions not actually unsubscribing because of "\x00" usage. +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./worker/out + + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global start_test: event() &is_used; + +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +event Cluster::Backend::ZeroMQ::subscription(topic: string) { + if ( topic == "/test/worker/topic" ) { + print "subscription", topic; + Cluster::subscribe("/test/manager/topic"); + } +} + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) { + if ( topic == "/test/worker/topic" ) { + print "unsubscription", topic; + Cluster::unsubscribe("/test/manager/topic"); + } +} + +event Cluster::node_up(name: string, id: string) { + print "node_up", name; + Cluster::publish(Cluster::nodeid_topic(id), start_test); +} + +event Cluster::node_down(name: string, id: string) { + print "node_down", name; + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event start_test() { + print "start_test"; + Cluster::subscribe("/test/worker/topic"); +} + +event Cluster::Backend::ZeroMQ::subscription(topic: string) { + if ( topic == "/test/manager/topic" ) { + print "subscription", topic; + Cluster::unsubscribe("/test/worker/topic"); + } +} + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) { + if ( topic == "/test/manager/topic" ) { + print "unsubscription", topic; + terminate(); + } +} + +event zeek_done() { + print "done"; +} +# @TEST-END-FILE