// See the file "COPYING" in the main distribution directory for copyright. #include "ZeroMQ.h" #include #include #include #include #include #include #include #include #include #include #include #include "zeek/DebugLogger.h" #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" #include "zeek/ID.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/Val.h" #include "zeek/cluster/Backend.h" #include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" #include "zeek/cluster/backend/zeromq/Plugin.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" #include "zeek/telemetry/Manager.h" #include "zeek/util-types.h" #include "zeek/util.h" extern int signal_val; namespace zeek { namespace plugin::Zeek_Cluster_Backend_ZeroMQ { extern zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::Plugin plugin; } namespace cluster::zeromq { enum class DebugFlag : uint8_t { NONE = 0, POLL = 1, THREAD = 2, }; enum class InprocTag : uint8_t { XsubUpdate, Terminate, }; constexpr DebugFlag operator&(uint8_t x, DebugFlag y) { return static_cast(x & static_cast(y)); } // NOLINTBEGIN(cppcoreguidelines-macro-usage) #define ZEROMQ_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::plugin, __VA_ARGS__) #define ZEROMQ_THREAD_PRINTF(...) \ do { \ std::fprintf(stderr, "[zeromq] " __VA_ARGS__); \ } while ( 0 ) #define ZEROMQ_DEBUG_THREAD_PRINTF(flag, ...) \ do { \ if ( (debug_flags & flag) == flag ) { \ ZEROMQ_THREAD_PRINTF(__VA_ARGS__); \ } \ } while ( 0 ) // NOLINTEND(cppcoreguidelines-macro-usage) std::unique_ptr ZeroMQBackend::Instantiate(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) { auto onloop_queue_hwm = zeek::id::find_val("Cluster::Backend::ZeroMQ::onloop_queue_hwm")->AsCount(); return std::make_unique(std::move(es), std::move(ls), std::move(ehs), onloop_queue_hwm); } ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs, zeek_uint_t onloop_queue_hwm) : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs), new zeek::detail::OnLoopProcess(this, "ZeroMQ", onloop_queue_hwm)), main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)), child_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)), // Counters for block and drop metrics. total_xpub_drops( zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {}, "Number of published events dropped due to XPUB socket HWM.")), total_onloop_drops( zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_onloop_drops", {}, "Number of received events dropped due to OnLoop queue full.")), total_msg_errors( zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {}, "Number of events with the wrong number of message parts.")) { // Establish the socket connection between main thread and child thread // already in the constructor. This allows Subscribe() and Unsubscribe() // calls to be delayed until DoInit() was called. main_inproc.bind("inproc://inproc-bridge"); child_inproc.connect("inproc://inproc-bridge"); } ZeroMQBackend::~ZeroMQBackend() { try { // DoTerminate is idempotent. DoTerminate(); } catch ( ... ) { // This should never happen. abort(); } } void ZeroMQBackend::DoInitPostScript() { listen_xpub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString(); listen_xsub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xsub_endpoint")->ToStdString(); ipv6 = zeek::id::find_val("Cluster::Backend::ZeroMQ::ipv6")->AsBool() ? 1 : 0; listen_xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_nodrop")->AsBool() ? 1 : 0; connect_xpub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_xpub_endpoint")->ToStdString(); connect_xsub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_xsub_endpoint")->ToStdString(); connect_xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_xpub_nodrop")->AsBool() ? 1 : 0; listen_log_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_log_endpoint")->ToStdString(); linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); poll_max_messages = zeek::id::find_val("Cluster::Backend::ZeroMQ::poll_max_messages")->Get(); debug_flags = zeek::id::find_val("Cluster::Backend::ZeroMQ::debug_flags")->Get(); internal_topic_prefix = zeek::id::find_const("Cluster::Backend::ZeroMQ::internal_topic_prefix")->ToStdString(); proxy_io_threads = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get()); event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); // xpub/xsub hwm configuration xpub_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt()); xpub_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt()); xsub_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt()); xsub_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt()); // log push/pull socket configuration log_immediate = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_immediate")->AsBool()); log_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt()); log_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt()); log_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt()); log_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt()); } void ZeroMQBackend::DoTerminate() { // If self_thread is running, notify it to shutdown via the inproc // socket, then wait for it to terminate. if ( self_thread.joinable() && ! self_thread_shutdown_requested ) { ZEROMQ_DEBUG("Sending terminate request via inproc socket"); auto tag = InprocTag::Terminate; main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore); main_inproc.send(zmq::const_buffer("", 0)); self_thread_shutdown_requested = true; ZEROMQ_DEBUG("Joining self_thread"); if ( self_thread.joinable() ) self_thread.join(); ZEROMQ_DEBUG("Joined self_thread"); } ZEROMQ_DEBUG("Shutting down ctx"); ctx.shutdown(); // Close the sockets that are used from the main thread, // the remaining sockets except for the child_inproc one // were closed by self_thread during shutdown already. log_push.close(); main_inproc.close(); child_inproc.close(); ZEROMQ_DEBUG("Closing ctx"); ctx.close(); // If running the proxy thread, terminate it, too. if ( proxy_thread ) { ZEROMQ_DEBUG("Shutting down proxy thread"); proxy_thread->Shutdown(); proxy_thread.reset(); } // ThreadedBackend::DoTerminate() cleans up the onloop instance. ThreadedBackend::DoTerminate(); ZEROMQ_DEBUG("Terminated"); } bool ZeroMQBackend::DoInit() { // Enable IPv6 support for all subsequently created sockets, if configured. ctx.set(zmq::ctxopt::ipv6, ipv6); xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); log_push = zmq::socket_t(ctx, zmq::socket_type::push); log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); xpub.set(zmq::sockopt::linger, linger_ms); // Enable XPUB_VERBOSE unconditional to enforce nodes receiving // notifications about any new subscriptions, even if they have // seen them before. This is needed to for the subscribe callback // functionality to work reliably. xpub.set(zmq::sockopt::xpub_nodrop, connect_xpub_nodrop); xpub.set(zmq::sockopt::xpub_verbose, 1); xpub.set(zmq::sockopt::sndhwm, xpub_sndhwm); xpub.set(zmq::sockopt::sndbuf, xpub_sndbuf); xsub.set(zmq::sockopt::rcvhwm, xsub_rcvhwm); xsub.set(zmq::sockopt::rcvbuf, xsub_rcvbuf); try { xsub.connect(connect_xsub_endpoint); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to connect xsub socket %s: %s", connect_xsub_endpoint.c_str(), err.what()); return false; } try { xpub.connect(connect_xpub_endpoint); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to connect xpub socket %s: %s", connect_xpub_endpoint.c_str(), err.what()); return false; } ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf, log_rcvhwm, log_rcvbuf, linger_ms); log_push.set(zmq::sockopt::sndhwm, log_sndhwm); log_push.set(zmq::sockopt::sndbuf, log_sndbuf); log_push.set(zmq::sockopt::linger, linger_ms); log_push.set(zmq::sockopt::immediate, log_immediate); log_pull.set(zmq::sockopt::rcvhwm, log_rcvhwm); log_pull.set(zmq::sockopt::rcvbuf, log_rcvbuf); if ( ! listen_log_endpoint.empty() ) { ZEROMQ_DEBUG("Listening on log pull socket: %s", listen_log_endpoint.c_str()); try { log_pull.bind(listen_log_endpoint); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to bind pull socket %s: %s", listen_log_endpoint.c_str(), err.what()); return false; } } // The connect_log_endpoints variable may be modified by zeek_init(), so // need to look it up here rather than during DoInitPostScript(). // // We should've probably introduced a configuration record similar to the // storage framework, too. Hmm. Maybe in the future. const auto& log_endpoints = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_log_endpoints"); for ( unsigned int i = 0; i < log_endpoints->Size(); i++ ) connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString()); for ( const auto& endp : connect_log_endpoints ) { ZEROMQ_DEBUG("Connecting log_push socket with %s", endp.c_str()); try { log_push.connect(endp); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to connect push socket %s: %s", endp.c_str(), err.what()); return false; } } // At this point we've connected xpub/xsub and any logging endpoints. // However, we cannot tell if we're connected to anything as ZeroMQ does // not trivially expose this information. // // There is the zmq_socket_monitor() API that we could use to get some // more low-level events in the future for logging and possibly script // layer eventing: http://api.zeromq.org/4-2:zmq-socket-monitor // As of now, message processing happens in a separate thread that is // started below. If we wanted to integrate ZeroMQ as a selectable IO // source rather than going through ThreadedBackend and its flare, the // following post might be useful: // // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/ // Thread is joined in backend->DoTerminate(), backend outlives it. self_thread = std::thread([](auto* backend) { backend->Run(); }, this); // After connecting, call ThreadedBackend::DoInit() to register // the IO source with the loop. return ThreadedBackend::DoInit(); } bool ZeroMQBackend::SpawnZmqProxyThread() { proxy_thread = std::make_unique(listen_xpub_endpoint, listen_xsub_endpoint, ipv6, listen_xpub_nodrop, proxy_io_threads); return proxy_thread->Start(); } bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& format, const byte_buffer& buf) { // Publishing an event happens as a multipart message with 4 parts: // // * The topic to publish to - this is required by XPUB/XSUB // * The node's identifier - see Cluster::node_id(). // * The format used to serialize the event. // * The serialized event itself. std::array parts = { zmq::const_buffer(topic.data(), topic.size()), zmq::const_buffer(NodeId().data(), NodeId().size()), zmq::const_buffer(format.data(), format.size()), zmq::const_buffer(buf.data(), buf.size()), }; ZEROMQ_DEBUG("Publishing %zu bytes to %s", buf.size(), topic.c_str()); for ( size_t i = 0; i < parts.size(); i++ ) { zmq::send_flags flags = zmq::send_flags::none; if ( i < parts.size() - 1 ) flags = flags | zmq::send_flags::sndmore; // This never returns EAGAIN. A pair socket blocks whenever the hwm // is reached, regardless of passing any dontwait flag. // // This can result in blocking on Cluster::publish() if the inner // thread does not consume from child_inproc. try { main_inproc.send(parts[i], flags); } catch ( const zmq::error_t& err ) { // If send() was interrupted and Zeek caught an interrupt or term signal, // fail the publish as we're about to shutdown. There's nothing the user // can do, but it indicates an overload situation as send() was blocking. if ( err.num() == EINTR && (signal_val == SIGINT || signal_val == SIGTERM) ) { zeek::reporter->Error("Failed publish() using ZeroMQ backend at shutdown: %s (signal_val=%d)", err.what(), signal_val); return false; } zeek::reporter->Error("Unexpected ZeroMQ::DoPublishEvent() error: %s", err.what()); return false; } } return true; } bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) { ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str()); try { // Prepend 0x01 byte to indicate subscription to XSUB socket // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = "\x01" + topic_prefix; // Send two message parts. The first part is a single byte tagging the // message as a XSUB update. The second part the payload for the XSUB socket. auto tag = InprocTag::XsubUpdate; main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore); main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( const zmq::error_t& err ) { zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what()); if ( cb ) cb(topic_prefix, {CallbackStatus::Error, err.what()}); return false; } // Store the callback for later. if ( cb ) subscription_callbacks.insert({topic_prefix, cb}); return true; } bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { ZEROMQ_DEBUG("Unsubscribing %s", topic_prefix.c_str()); try { // Prepend 0x00 byte to indicate unsubscription to XSUB socket. // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = '\0' + topic_prefix; // Send two message parts. The first part is a single byte tagging the // message as a XSUB update. The second part the payload for the XSUB socket. auto tag = InprocTag::XsubUpdate; main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore); main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( const zmq::error_t& err ) { zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); return false; } return true; } bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, byte_buffer& buf) { static std::string message_type = "log-write"; // Publishing a log write is done using 4 parts // // * A constant "log-write" string // * The node's identifier - see Cluster::node_id(). // * The format used to serialize the log write. // * The serialized log write itself. std::array parts = { zmq::const_buffer{message_type.data(), message_type.size()}, zmq::const_buffer(NodeId().data(), NodeId().size()), zmq::const_buffer{format.data(), format.size()}, zmq::const_buffer{buf.data(), buf.size()}, }; // If the log_push socket isn't yet initialized or has been closed, just return. if ( ! log_push ) { ZEROMQ_DEBUG("Skipping log write - log_push socket not open"); return false; } ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str()); for ( size_t i = 0; i < parts.size(); i++ ) { zmq::send_flags flags = zmq::send_flags::dontwait; if ( i < parts.size() - 1 ) flags = flags | zmq::send_flags::sndmore; zmq::send_result_t result; try { result = log_push.send(parts[i], flags); } catch ( const zmq::error_t& err ) { zeek::reporter->Error("Failed to send log write part %zu: %s", i, err.what()); return false; } if ( ! result ) { // XXX: Not exactly clear what we should do if we reach HWM. // we could block and hope a logger comes along that empties // our internal queue, or discard messages and log very loudly // and have metrics about it. However, this may happen regularly // at shutdown. // // Maybe that should be configurable? // If no logging endpoints were configured, that almost seems on // purpose (and there's a warning elsewhere about this), so skip // logging an error when sending fails. if ( connect_log_endpoints.empty() ) return true; reporter->Error("Failed to send log write. HWM reached?"); return false; } } return true; } // Forward messages from the inprocess bridge. // // Either it's 2 parts (tag and payload) for controlling subscriptions // or terminating the thread, or it is 4 parts in which case all the parts // are forwarded to the XPUB socket directly for publishing. void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { for ( auto& msg : msgs ) { if ( msg.size() == 2 ) { InprocTag tag = msg[0].data()[0]; switch ( tag ) { case InprocTag::XsubUpdate: { xsub.send(msg[1], zmq::send_flags::none); break; } case InprocTag::Terminate: { if ( self_thread_stop ) ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message"); self_thread_stop = true; } } } else if ( msg.size() == 4 ) { for ( auto& part : msg ) { zmq::send_flags flags = zmq::send_flags::dontwait; if ( part.more() ) flags = flags | zmq::send_flags::sndmore; zmq::send_result_t result; try { result = xpub.send(part, flags); } catch ( zmq::error_t& err ) { if ( err.num() == ETERM ) return; // XXX: What other error can happen here? How should we react? ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); break; } // Empty result means xpub.send() returned EAGAIN. The socket reached // its high-water-mark and we drop this message. if ( ! result ) { total_xpub_drops->Inc(); // Warn once about a dropped message. // // TODO: warn every n seconds? if ( xpub_drop_last_warn_at == 0.0 ) { ZEROMQ_THREAD_PRINTF("xpub: warn: dropped a message due to hwm\n"); xpub_drop_last_warn_at = util::current_time(true); } break; // Skip the whole message. } } } else { ZEROMQ_THREAD_PRINTF("inproc: error: expected 2 or 4 parts, have %zu!\n", msg.size()); total_msg_errors->Inc(); } } } void ZeroMQBackend::HandleLogMessages(const std::vector& msgs) { for ( const auto& msg : msgs ) { // sender, format, type, payload if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); total_msg_errors->Inc(); continue; } byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; // Always enqueue log messages for processing, they are important. // // Hmm, we could also consider bypassing Zeek's event loop completely and // just go to the log_mgr directly in the future. OnLoop()->QueueForProcessing(std::move(lm), zeek::detail::QueueFlag::Force); } } void ZeroMQBackend::HandleXPubMessages(const std::vector& msgs) { for ( const auto& msg : msgs ) { if ( msg.size() != 1 ) { ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); total_msg_errors->Inc(); continue; } // Check if the messages starts with \x00 or \x01 to understand if it's // a subscription or unsubscription message. auto first = *reinterpret_cast(msg[0].data()); if ( first == 0 || first == 1 ) { QueueMessage qm; auto* start = msg[0].data() + 1; auto* end = msg[0].data() + msg[0].size(); byte_buffer topic(start, end); if ( first == 1 ) { qm = BackendMessage{1, std::move(topic)}; } else if ( first == 0 ) { qm = BackendMessage{0, std::move(topic)}; } else { ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first); continue; } // Always enqueue subscription messages from other nodes as events. // // There shouldn't be all that many, unless some script calls Cluster::subscribe() and // Cluster::unsubscribe() a lot, so assume we can afford the extra memory rather than // missing these low-frequency events. OnLoop()->QueueForProcessing(std::move(qm), zeek::detail::QueueFlag::Force); } } } void ZeroMQBackend::HandleXSubMessages(const std::vector& msgs) { for ( const auto& msg : msgs ) { if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); total_msg_errors->Inc(); continue; } // Filter out messages that are coming from this node. std::string sender(msg[1].data(), msg[1].size()); if ( sender == NodeId() ) continue; byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; EventMessage em{.topic = std::string(msg[0].data(), msg[0].size()), .format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; // If queueing the event message for Zeek's main loop doesn't work due to reaching the onloop hwm, // drop the message. // // This is sort of a suicidal snail pattern but without exiting the node. if ( ! OnLoop()->QueueForProcessing(std::move(em), zeek::detail::QueueFlag::DontBlock) ) { total_onloop_drops->Inc(); // Warn once about a dropped message. if ( onloop_drop_last_warn_at == 0.0 ) { ZEROMQ_THREAD_PRINTF("warn: dropped a message due to onloop queue full\n"); onloop_drop_last_warn_at = util::current_time(true); } } } } void ZeroMQBackend::Run() { char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> snprintf(name, sizeof(name), "zmq-%p", this); util::detail::set_thread_name(name); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); struct SocketInfo { zmq::socket_ref socket; std::string name; std::function&)> handler; }; std::vector sockets = { {.socket = child_inproc, .name = "inproc", .handler = [this](auto& msgs) { HandleInprocMessages(msgs); }}, {.socket = xpub, .name = "xpub", .handler = [this](const auto& msgs) { HandleXPubMessages(msgs); }}, {.socket = xsub, .name = "xsub", .handler = [this](const auto& msgs) { HandleXSubMessages(msgs); }}, {.socket = log_pull, .name = "log_pull", .handler = [this](const auto& msgs) { HandleLogMessages(msgs); }}, }; // Called when Run() terminates. auto deferred_close = util::Deferred([this]() { xpub.close(); xsub.close(); log_pull.close(); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread sockets closed (%p)\n", this); }); std::vector poll_items(sockets.size()); while ( ! self_thread_stop ) { for ( size_t i = 0; i < sockets.size(); i++ ) poll_items[i] = {.socket = sockets[i].socket.handle(), .fd = 0, .events = ZMQ_POLLIN | ZMQ_POLLERR}; // Awkward. std::vector> rcv_messages(sockets.size()); try { try { int r = zmq::poll(poll_items, std::chrono::seconds(-1)); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r); } catch ( const zmq::error_t& err ) { ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll exception: what=%s num=%d", err.what(), err.num()); // Retry interrupted zmq::poll() calls. if ( err.num() == EINTR ) continue; throw; } for ( size_t i = 0; i < poll_items.size(); i++ ) { const auto& item = poll_items[i]; ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: items[%lu]=%s %s %s\n", i, sockets[i].name.c_str(), item.revents & ZMQ_POLLIN ? "pollin " : "", item.revents & ZMQ_POLLERR ? "err" : ""); if ( item.revents & ZMQ_POLLERR ) { // What should we be doing? Re-open sockets? Terminate? ZEROMQ_THREAD_PRINTF("poll: error: POLLERR on socket %zu %s %p revents=%x\n", i, sockets[i].name.c_str(), item.socket, item.revents); } // Nothing to do? if ( (item.revents & ZMQ_POLLIN) == 0 ) continue; bool consumed_one = false; // Read messages from the socket. do { zmq::message_t msg; rcv_messages[i].emplace_back(); // make room for a multipart message auto& into = rcv_messages[i].back(); // Only receive up to poll_max_messages from an individual // socket. Move on to the next when exceeded. The last pushed // message (empty) is popped at the end of the loop. if ( poll_max_messages > 0 && rcv_messages[i].size() > poll_max_messages ) { ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: %s rcv_messages[%zu] full!\n", sockets[i].name.c_str(), i); break; } consumed_one = false; bool more = false; // Read a multi-part message. do { zmq::recv_result_t recv_result; try { recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait); } catch ( const zmq::error_t& err ) { // Retry interrupted recv() calls. if ( err.num() == EINTR ) continue; throw; } if ( recv_result ) { consumed_one = true; more = msg.more(); into.emplace_back(std::move(msg)); } else { // EAGAIN and more flag set? Try again! if ( more ) continue; } } while ( more ); } while ( consumed_one ); assert(rcv_messages[i].back().size() == 0); rcv_messages[i].pop_back(); } } catch ( const zmq::error_t& err ) { if ( err.num() != ETERM ) throw; // Shutdown. ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread terminating (%p)\n", this); break; } // At this point, we've received anything that was readable from the sockets. // Now interpret and enqueue it into messages. for ( size_t i = 0; i < sockets.size(); i++ ) { if ( rcv_messages[i].empty() ) continue; sockets[i].handler(rcv_messages[i]); } } } bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) { if ( tag == 0 || tag == 1 ) { std::string topic{reinterpret_cast(payload.data()), payload.size()}; zeek::EventHandlerPtr eh; if ( tag == 1 ) { // If this is the first time the subscription was observed, raise // the ZeroMQ internal event. if ( ! xpub_subscriptions.contains(topic) ) { eh = event_subscription; xpub_subscriptions.insert(topic); } if ( const auto& cbit = subscription_callbacks.find(topic); cbit != subscription_callbacks.end() ) { const auto& cb = cbit->second; if ( cb ) cb(topic, {CallbackStatus::Success, "success"}); subscription_callbacks.erase(cbit); } } else if ( tag == 0 ) { eh = event_unsubscription; xpub_subscriptions.erase(topic); } ZEROMQ_DEBUG("BackendMessage: %s for %s", eh != nullptr ? eh->Name() : "", topic.c_str()); if ( eh ) EnqueueEvent(eh, zeek::Args{zeek::make_intrusive(topic)}); return true; } else { zeek::reporter->Error("Ignoring bad BackendMessage tag=%d", tag); return false; } } void ZeroMQBackend::DoReadyToPublishCallback(ReadyCallback cb) { // Setup an ephemeral subscription for a topic produced with the internal // topic prefix, this backend's node identifier and an incrementing counter. // When the SubscribeCallback for the subscription is invoked, meaning it // has become visible on the XPUB socket, call the provided ready callback // and cancel the subscription by unsubscribing from the topic again. // // The heuristic here is that seeing a subscription created by the node itself // also leads to the XPUB/XSUB proxy having sent all subscriptions from other // nodes in the cluster. // // Without this heuristic, short-lived WebSocket clients may fail to publish // messages as ZeroMQ implements sender-side subscription filtering and simply // discards messages to topics for which it hasn't seen any subscriptions yet. static int ready_topic_counter = 0; ++ready_topic_counter; auto scb = [this, cb = std::move(cb)](const std::string& topic_prefix, const SubscriptionCallbackInfo& sinfo) { Backend::ReadyCallbackInfo info{sinfo.status, sinfo.message}; cb(info); // Unsubscribe again, we're not actually interested in this topic. Unsubscribe(topic_prefix); }; std::string topic = util::fmt("%s%s.%d.", internal_topic_prefix.c_str(), NodeId().c_str(), ready_topic_counter); Subscribe(topic, std::move(scb)); } } // namespace cluster::zeromq } // namespace zeek