diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 3e587dd963..d69d4ba7e4 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -44,6 +44,11 @@ enum class DebugFlag : zeek_uint_t { THREAD = 2, }; +enum class InprocTag : uint8_t { + XsubUpdate, + Terminate, +}; + constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { return static_cast(x & static_cast(y)); } @@ -112,15 +117,28 @@ void ZeroMQBackend::DoInitPostScript() { void ZeroMQBackend::DoTerminate() { ThreadedBackend::DoTerminate(); + + // If self_thread is running, notify it to shutdown via the inproc + // socket, then wait for it to terminate. + if ( self_thread.joinable() && ! self_thread_shutdown_requested ) { + ZEROMQ_DEBUG("Sending terminate request via inproc socket"); + auto tag = InprocTag::Terminate; + main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore); + main_inproc.send(zmq::const_buffer("", 0)); + self_thread_shutdown_requested = true; + + ZEROMQ_DEBUG("Joining self_thread"); + if ( self_thread.joinable() ) + self_thread.join(); + ZEROMQ_DEBUG("Joined self_thread"); + } + ZEROMQ_DEBUG("Shutting down ctx"); ctx.shutdown(); - ZEROMQ_DEBUG("Joining self_thread"); - if ( self_thread.joinable() ) - self_thread.join(); - ZEROMQ_DEBUG("Joined self_thread"); // Close the sockets that are used from the main thread, - // the remaining sockets are closed by self_thread. + // the remaining sockets were closed by self_thread during + // shutdown already. log_push.close(); main_inproc.close(); @@ -303,6 +321,11 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallba // Prepend 0x01 byte to indicate subscription to XSUB socket // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = "\x01" + topic_prefix; + + // Send two message parts. The first part is a single byte tagging the + // message as a XSUB update. The second part the payload for the XSUB socket. + auto tag = InprocTag::XsubUpdate; + main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore); main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( const zmq::error_t& err ) { zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what()); @@ -322,9 +345,14 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallba bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { ZEROMQ_DEBUG("Unsubscribing %s", topic_prefix.c_str()); try { - // Prepend 0x00 byte to indicate subscription to XSUB socket. + // Prepend 0x00 byte to indicate unsubscription to XSUB socket. // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = '\0' + topic_prefix; + + // Send two message parts. The first part is a single byte tagging the + // message as a XSUB update. The second part the payload for the XSUB socket. + auto tag = InprocTag::XsubUpdate; + main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore); main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( const zmq::error_t& err ) { zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); @@ -413,11 +441,25 @@ void ZeroMQBackend::Run() { }; auto HandleInprocMessages = [this](std::vector& msgs) { - // Forward messages from the inprocess bridge to XSUB for subscription - // subscription handling (1 part) or XPUB for publishing (4 parts). + // Forward messages from the inprocess bridge. + // + // Either it's 2 parts (tag and payload) for controlling subscriptions + // or terminating the thread, or it is 4 parts in which case all the parts + // are forwarded to the XPUB socket directly for publishing. for ( auto& msg : msgs ) { - if ( msg.size() == 1 ) { - xsub.send(msg[0], zmq::send_flags::none); + if ( msg.size() == 2 ) { + InprocTag tag = msg[0].data()[0]; + switch ( tag ) { + case InprocTag::XsubUpdate: { + xsub.send(msg[1], zmq::send_flags::none); + break; + } + case InprocTag::Terminate: { + if ( self_thread_stop ) + ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message"); + self_thread_stop = true; + } + } } else if ( msg.size() == 4 ) { for ( auto& part : msg ) { @@ -558,7 +600,7 @@ void ZeroMQBackend::Run() { std::vector poll_items(sockets.size()); - while ( true ) { + while ( ! self_thread_stop ) { for ( size_t i = 0; i < sockets.size(); i++ ) poll_items[i] = {.socket = sockets[i].socket.handle(), .fd = 0, .events = ZMQ_POLLIN | ZMQ_POLLERR}; diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 088f922f34..f09bfa09db 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -105,6 +105,8 @@ private: zmq::socket_t log_pull; std::thread self_thread; + bool self_thread_shutdown_requested = false; + bool self_thread_stop = false; int proxy_io_threads = 2; std::unique_ptr proxy_thread; diff --git a/testing/btest/Baseline/cluster.websocket.broker.oneshot/..client..stderr b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.broker.oneshot/..client..stdout b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..client..stdout new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..client..stdout @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.broker.oneshot/..manager..stderr b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.broker.oneshot/..manager..stdout b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..manager..stdout new file mode 100644 index 0000000000..8a464311fa --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.oneshot/..manager..stdout @@ -0,0 +1,102 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [/zeek/wstest/ws1/] +0, ping 0 +1, ping 1 +2, ping 2 +3, ping 3 +4, ping 4 +5, ping 5 +6, ping 6 +7, ping 7 +8, ping 8 +9, ping 9 +10, ping 10 +11, ping 11 +12, ping 12 +13, ping 13 +14, ping 14 +15, ping 15 +16, ping 16 +17, ping 17 +18, ping 18 +19, ping 19 +20, ping 20 +21, ping 21 +22, ping 22 +23, ping 23 +24, ping 24 +25, ping 25 +26, ping 26 +27, ping 27 +28, ping 28 +29, ping 29 +30, ping 30 +31, ping 31 +32, ping 32 +33, ping 33 +34, ping 34 +35, ping 35 +36, ping 36 +37, ping 37 +38, ping 38 +39, ping 39 +40, ping 40 +41, ping 41 +42, ping 42 +43, ping 43 +44, ping 44 +45, ping 45 +46, ping 46 +47, ping 47 +48, ping 48 +49, ping 49 +50, ping 50 +51, ping 51 +52, ping 52 +53, ping 53 +54, ping 54 +55, ping 55 +56, ping 56 +57, ping 57 +58, ping 58 +59, ping 59 +60, ping 60 +61, ping 61 +62, ping 62 +63, ping 63 +64, ping 64 +65, ping 65 +66, ping 66 +67, ping 67 +68, ping 68 +69, ping 69 +70, ping 70 +71, ping 71 +72, ping 72 +73, ping 73 +74, ping 74 +75, ping 75 +76, ping 76 +77, ping 77 +78, ping 78 +79, ping 79 +80, ping 80 +81, ping 81 +82, ping 82 +83, ping 83 +84, ping 84 +85, ping 85 +86, ping 86 +87, ping 87 +88, ping 88 +89, ping 89 +90, ping 90 +91, ping 91 +92, ping 92 +93, ping 93 +94, ping 94 +95, ping 95 +96, ping 96 +97, ping 97 +98, ping 98 +99, ping 99 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..client..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..client..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..client..stdout new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..client..stdout @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..manager..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..manager..stdout new file mode 100644 index 0000000000..8a464311fa --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.oneshot/..manager..stdout @@ -0,0 +1,102 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [/zeek/wstest/ws1/] +0, ping 0 +1, ping 1 +2, ping 2 +3, ping 3 +4, ping 4 +5, ping 5 +6, ping 6 +7, ping 7 +8, ping 8 +9, ping 9 +10, ping 10 +11, ping 11 +12, ping 12 +13, ping 13 +14, ping 14 +15, ping 15 +16, ping 16 +17, ping 17 +18, ping 18 +19, ping 19 +20, ping 20 +21, ping 21 +22, ping 22 +23, ping 23 +24, ping 24 +25, ping 25 +26, ping 26 +27, ping 27 +28, ping 28 +29, ping 29 +30, ping 30 +31, ping 31 +32, ping 32 +33, ping 33 +34, ping 34 +35, ping 35 +36, ping 36 +37, ping 37 +38, ping 38 +39, ping 39 +40, ping 40 +41, ping 41 +42, ping 42 +43, ping 43 +44, ping 44 +45, ping 45 +46, ping 46 +47, ping 47 +48, ping 48 +49, ping 49 +50, ping 50 +51, ping 51 +52, ping 52 +53, ping 53 +54, ping 54 +55, ping 55 +56, ping 56 +57, ping 57 +58, ping 58 +59, ping 59 +60, ping 60 +61, ping 61 +62, ping 62 +63, ping 63 +64, ping 64 +65, ping 65 +66, ping 66 +67, ping 67 +68, ping 68 +69, ping 69 +70, ping 70 +71, ping 71 +72, ping 72 +73, ping 73 +74, ping 74 +75, ping 75 +76, ping 76 +77, ping 77 +78, ping 78 +79, ping 79 +80, ping 80 +81, ping 81 +82, ping 82 +83, ping 83 +84, ping 84 +85, ping 85 +86, ping 86 +87, ping 87 +88, ping 88 +89, ping 89 +90, ping 90 +91, ping 91 +92, ping 92 +93, ping 93 +94, ping 94 +95, ping 95 +96, ping 96 +97, ping 97 +98, ping 98 +99, ping 99 diff --git a/testing/btest/cluster/websocket/broker/oneshot.zeek b/testing/btest/cluster/websocket/broker/oneshot.zeek new file mode 100644 index 0000000000..c1fcdf3e0c --- /dev/null +++ b/testing/btest/cluster/websocket/broker/oneshot.zeek @@ -0,0 +1,72 @@ +# @TEST-DOC: Connect with a WebSocket client, it sends 100 events and disconnects immediately. Ensure the manager sees all of them. Regression test for ZeroMQ discarding queued messages. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], +}; +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; +redef Broker::disable_ssl = T; + +global expected_ping_count = 100; +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print n, msg; + + if ( ping_count == expected_ping_count ) + terminate(); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } +# @TEST-END-FILE +# +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + tc.hello_v1([]) + for i in range(0, 100): + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [f"ping {i}", i])) + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE diff --git a/testing/btest/cluster/websocket/zeromq/oneshot.zeek b/testing/btest/cluster/websocket/zeromq/oneshot.zeek new file mode 100644 index 0000000000..19f9f12720 --- /dev/null +++ b/testing/btest/cluster/websocket/zeromq/oneshot.zeek @@ -0,0 +1,71 @@ +# @TEST-DOC: Connect with a WebSocket client, it sends 100 events and disconnects immediately. Ensure the manager sees all of them. Regression test for ZeroMQ discarding queued messages. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global expected_ping_count = 100; +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print n, msg; + + if ( ping_count == expected_ping_count ) + terminate(); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + tc.hello_v1([]) + for i in range(0, 100): + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [f"ping {i}", i])) + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE