diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index 52e8bff74b..466d778840 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -162,7 +162,8 @@ export { ## Bitmask to enable low-level stderr based debug printing. ## - ## poll debugging: 1 (produce verbose zmq::poll() output) + ## poll: 1 (produce verbose zmq::poll() output) + ## thread: 2 (produce thread related output) ## ## Or values from the above list together and set debug_flags ## to the result. E.g. use 7 to select 4, 2 and 1. Only use this diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index f07785e2a2..d79eb87db1 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -22,6 +22,7 @@ #include "zeek/cluster/Serializer.h" #include "zeek/cluster/backend/zeromq/Plugin.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" +#include "zeek/util.h" namespace zeek { @@ -36,6 +37,7 @@ namespace cluster::zeromq { enum class DebugFlag : zeek_uint_t { NONE = 0, POLL = 1, + THREAD = 2, }; constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { @@ -68,13 +70,8 @@ void self_thread_fun(void* arg) { ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { - xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); - xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); log_push = zmq::socket_t(ctx, zmq::socket_type::push); - log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); - main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); - child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); } void ZeroMQBackend::DoInitPostScript() { @@ -97,9 +94,6 @@ void ZeroMQBackend::DoInitPostScript() { event_unsubscription = 
zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); - - main_inproc.bind("inproc://publish-bridge"); - child_inproc.connect("inproc://publish-bridge"); } @@ -109,13 +103,12 @@ void ZeroMQBackend::DoTerminate() { ZEROMQ_DEBUG("Joining self_thread"); if ( self_thread.joinable() ) self_thread.join(); + ZEROMQ_DEBUG("Joined self_thread"); + // Close the sockets that are used from the main thread, + // the remaining sockets are closed by self_thread. log_push.close(); - log_pull.close(); - xsub.close(); - xpub.close(); main_inproc.close(); - child_inproc.close(); ZEROMQ_DEBUG("Closing ctx"); ctx.close(); @@ -130,6 +123,11 @@ void ZeroMQBackend::DoTerminate() { } bool ZeroMQBackend::DoInit() { + xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); + xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); + log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); + child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); + auto linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); int xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0; @@ -218,6 +216,10 @@ bool ZeroMQBackend::DoInit() { // following post might be useful: // // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/ + + // Setup connectivity between main and child thread. + main_inproc.bind("inproc://inproc-bridge"); + child_inproc.connect("inproc://inproc-bridge"); self_thread = std::thread(self_thread_fun, this); // After connecting, call ThreadedBackend::DoInit() to register @@ -267,7 +269,7 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) { // Prepend 0x01 byte to indicate subscription to XSUB socket // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). 
std::string msg = "\x01" + topic_prefix; - xsub.send(zmq::const_buffer(msg.data(), msg.size())); + main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( zmq::error_t& err ) { zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what()); return false; @@ -282,7 +284,7 @@ bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { // Prepend 0x00 byte to indicate subscription to XSUB socket. // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = "\x00" + topic_prefix; - xsub.send(zmq::const_buffer(msg.data(), msg.size())); + main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( zmq::error_t& err ) { zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); return false; @@ -340,6 +342,9 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he } void ZeroMQBackend::Run() { + util::detail::set_thread_name(zeek::util::fmt("zmq-%p", this)); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); + using MultipartMessage = std::vector; auto HandleLogMessages = [this](const std::vector& msgs) { @@ -362,29 +367,50 @@ void ZeroMQBackend::Run() { }; auto HandleInprocMessages = [this](std::vector& msgs) { - // Forward messages from the inprocess bridge to xpub. + // Forward messages from the inprocess bridge to XSUB for subscription + // handling (1 part) or XPUB for publishing (4 parts). 
for ( auto& msg : msgs ) { - assert(msg.size() == 4); + assert(msg.size() == 1 || msg.size() == 4); + if ( msg.size() == 1 ) { + xsub.send(msg[0], zmq::send_flags::none); + } + else { + for ( auto& part : msg ) { + zmq::send_flags flags = zmq::send_flags::dontwait; + if ( part.more() ) + flags = flags | zmq::send_flags::sndmore; - for ( auto& part : msg ) { - zmq::send_flags flags = zmq::send_flags::dontwait; - if ( part.more() ) - flags = flags | zmq::send_flags::sndmore; + zmq::send_result_t result; + int tries = 0; + do { + try { + result = xpub.send(part, flags); + } catch ( zmq::error_t& err ) { + if ( err.num() == ETERM ) + return; - zmq::send_result_t result; - do { - try { - result = xpub.send(part, flags); - } catch ( zmq::error_t& err ) { - // XXX: Not sure if the return false is so great here. - // - // Also, if we fail to publish, should we block rather - // than discard? - ZEROMQ_THREAD_PRINTF("xpub: Failed to publish: %s (%d)", err.what(), err.num()); - break; - } - // EAGAIN returns empty result, means try again! - } while ( ! result ); + // XXX: What other error can happen here? How should we react? + ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); + break; + } + + // Empty result means xpub.send() returned EAGAIN. The + // socket reached its high water mark and we should + // relax / backoff a bit. Otherwise we'll be spinning + // unproductively very fast here. Note that this is going + // to build up backpressure and eventually inproc.send() + // will block from the main thread. + if ( ! result ) { + ++tries; + auto sleep_for = std::min(tries * 10, 500); + ZEROMQ_THREAD_PRINTF( + "xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries, + sleep_for); + + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for)); + } + } while ( ! 
result ); + } } } }; @@ -449,6 +475,16 @@ void ZeroMQBackend::Run() { QueueForProcessing(std::move(qmsgs)); }; + // Helper class running at destruction. + class Deferred { + public: + Deferred(std::function deferred) : closer(std::move(deferred)) {} + ~Deferred() { closer(); } + + private: + std::function closer; + }; + struct SocketInfo { zmq::socket_ref socket; std::string name; @@ -462,6 +498,15 @@ void ZeroMQBackend::Run() { {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages}, }; + // Called when Run() terminates. + auto deferred_close = Deferred([this]() { + child_inproc.close(); + xpub.close(); + xsub.close(); + log_pull.close(); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread sockets closed (%p)\n", this); + }); + std::vector poll_items(sockets.size()); while ( true ) { @@ -530,10 +575,12 @@ void ZeroMQBackend::Run() { rcv_messages[i].pop_back(); } } catch ( zmq::error_t& err ) { - if ( err.num() == ETERM ) - return; + if ( err.num() != ETERM ) + throw; - throw; + // Shutdown. + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread terminating (%p)\n", this); + break; } // At this point, we've received anything that was readable from the sockets.