// See the file "COPYING" in the main distribution directory for copyright. #include "ZeroMQ.h" #include #include #include #include #include #include #include #include #include #include #include "zeek/DebugLogger.h" #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/Val.h" #include "zeek/cluster/Backend.h" #include "zeek/cluster/Serializer.h" #include "zeek/cluster/backend/zeromq/Plugin.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" #include "zeek/util.h" extern int signal_val; namespace zeek { namespace plugin::Zeek_Cluster_Backend_ZeroMQ { extern zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::Plugin plugin; } namespace cluster::zeromq { enum class DebugFlag : zeek_uint_t { NONE = 0, POLL = 1, THREAD = 2, }; constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { return static_cast(x & static_cast(y)); } #define ZEROMQ_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::plugin, __VA_ARGS__) #define ZEROMQ_THREAD_PRINTF(...) \ do { \ std::fprintf(stderr, "[zeromq] " __VA_ARGS__); \ } while ( 0 ) #define ZEROMQ_DEBUG_THREAD_PRINTF(flag, ...) \ do { \ if ( (debug_flags & flag) == flag ) { \ ZEROMQ_THREAD_PRINTF(__VA_ARGS__); \ } \ } while ( 0 ) ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { log_push = zmq::socket_t(ctx, zmq::socket_type::push); main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); } ZeroMQBackend::~ZeroMQBackend() { try { // DoTerminate is idempotent. DoTerminate(); } catch ( ... ) { // This should never happen. abort(); } } void ZeroMQBackend::DoInitPostScript() { listen_xpub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString(); listen_xsub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xsub_endpoint")->ToStdString(); listen_xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_nodrop")->AsBool() ? 1 : 0; connect_xpub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_xpub_endpoint")->ToStdString(); connect_xsub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_xsub_endpoint")->ToStdString(); listen_log_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_log_endpoint")->ToStdString(); poll_max_messages = zeek::id::find_val("Cluster::Backend::ZeroMQ::poll_max_messages")->Get(); debug_flags = zeek::id::find_val("Cluster::Backend::ZeroMQ::debug_flags")->Get(); event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); } void ZeroMQBackend::DoTerminate() { ThreadedBackend::DoTerminate(); ZEROMQ_DEBUG("Shutting down ctx"); ctx.shutdown(); ZEROMQ_DEBUG("Joining self_thread"); if ( self_thread.joinable() ) self_thread.join(); ZEROMQ_DEBUG("Joined self_thread"); // Close the sockets that are used from the main thread, // the remaining sockets are closed by self_thread. log_push.close(); main_inproc.close(); ZEROMQ_DEBUG("Closing ctx"); ctx.close(); // If running the proxy thread, terminate it, too. if ( proxy_thread ) { ZEROMQ_DEBUG("Shutting down proxy thread"); proxy_thread->Shutdown(); proxy_thread.reset(); } ZEROMQ_DEBUG("Terminated"); } bool ZeroMQBackend::DoInit() { xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); auto linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); int xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0; xpub.set(zmq::sockopt::linger, linger_ms); xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); // Enable XPUB_VERBOSE unconditional to enforce nodes receiving // notifications about any new subscriptions, even if they have // seen them before. This is needed to for the subscribe callback // functionality to work reliably. xpub.set(zmq::sockopt::xpub_verbose, 1); try { xsub.connect(connect_xsub_endpoint); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to connect to XSUB %s: %s", connect_xsub_endpoint.c_str(), err.what()); return false; } try { xpub.connect(connect_xpub_endpoint); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to connect to XPUB %s: %s", connect_xpub_endpoint.c_str(), err.what()); return false; } auto log_immediate = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_immediate")->AsBool()); auto log_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt()); auto log_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt()); auto log_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt()); auto log_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt()); ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf, log_rcvhwm, log_rcvbuf, linger_ms); log_push.set(zmq::sockopt::sndhwm, log_sndhwm); log_push.set(zmq::sockopt::sndbuf, log_sndbuf); log_push.set(zmq::sockopt::linger, linger_ms); log_push.set(zmq::sockopt::immediate, log_immediate); log_pull.set(zmq::sockopt::rcvhwm, log_rcvhwm); log_pull.set(zmq::sockopt::rcvbuf, log_rcvbuf); if ( ! listen_log_endpoint.empty() ) { ZEROMQ_DEBUG("Listening on log pull socket: %s", listen_log_endpoint.c_str()); try { log_pull.bind(listen_log_endpoint); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to bind to PULL socket %s: %s", listen_log_endpoint.c_str(), err.what()); return false; } } const auto& log_endpoints = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_log_endpoints"); for ( unsigned int i = 0; i < log_endpoints->Size(); i++ ) connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString()); for ( const auto& endp : connect_log_endpoints ) { ZEROMQ_DEBUG("Connecting log_push socket with %s", endp.c_str()); try { log_push.connect(endp); } catch ( zmq::error_t& err ) { zeek::reporter->Error("ZeroMQ: Failed to connect to PUSH socket %s: %s", endp.c_str(), err.what()); return false; } } // At this point we've connected xpub/xsub and any logging endpoints. // However, we cannot tell if we're connected to anything as ZeroMQ does // not trivially expose this information. // // There is the zmq_socket_monitor() API that we could use to get some // more low-level events in the future for logging and possibly script // layer eventing: http://api.zeromq.org/4-2:zmq-socket-monitor // As of now, message processing happens in a separate thread that is // started below. If we wanted to integrate ZeroMQ as a selectable IO // source rather than going through ThreadedBackend and its flare, the // following post might be useful: // // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/ // Setup connectivity between main and child thread. main_inproc.bind("inproc://inproc-bridge"); child_inproc.connect("inproc://inproc-bridge"); // Thread is joined in backend->DoTerminate(), backend outlives it. self_thread = std::thread([](auto* backend) { backend->Run(); }, this); // After connecting, call ThreadedBackend::DoInit() to register // the IO source with the loop. return ThreadedBackend::DoInit(); } bool ZeroMQBackend::SpawnZmqProxyThread() { proxy_thread = std::make_unique(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop); return proxy_thread->Start(); } bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& format, const cluster::detail::byte_buffer& buf) { // Publishing an event happens as a multipart message with 4 parts: // // * The topic to publish to - this is required by XPUB/XSUB // * The node's identifier - see Cluster::node_id(). // * The format used to serialize the event. // * The serialized event itself. std::array parts = { zmq::const_buffer(topic.data(), topic.size()), zmq::const_buffer(NodeId().data(), NodeId().size()), zmq::const_buffer(format.data(), format.size()), zmq::const_buffer(buf.data(), buf.size()), }; ZEROMQ_DEBUG("Publishing %zu bytes to %s", buf.size(), topic.c_str()); for ( size_t i = 0; i < parts.size(); i++ ) { zmq::send_flags flags = zmq::send_flags::none; if ( i < parts.size() - 1 ) flags = flags | zmq::send_flags::sndmore; // This should never fail, it will instead block // when HWM is reached. I guess we need to see if // and how this can happen :-/ try { main_inproc.send(parts[i], flags); } catch ( zmq::error_t& err ) { // If send() was interrupted and Zeek caught an interrupt or term signal, // fail the publish as we're about to shutdown. There's nothing the user // can do, but it indicates an overload situation as send() was blocking. if ( err.num() == EINTR && (signal_val == SIGINT || signal_val == SIGTERM) ) { zeek::reporter->Error("Failed publish() using ZeroMQ backend at shutdown: %s (signal_val=%d)", err.what(), signal_val); return false; } zeek::reporter->Error("Unexpected ZeroMQ::DoPublishEvent() error: %s", err.what()); return false; } } return true; } bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) { ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str()); try { // Prepend 0x01 byte to indicate subscription to XSUB socket // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = "\x01" + topic_prefix; main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( zmq::error_t& err ) { zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what()); if ( cb ) cb(topic_prefix, {CallbackStatus::Error, err.what()}); return false; } // Store the callback for later. if ( cb ) subscription_callbacks.insert({topic_prefix, cb}); return true; } bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { ZEROMQ_DEBUG("Unsubscribing %s", topic_prefix.c_str()); try { // Prepend 0x00 byte to indicate subscription to XSUB socket. // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = '\0' + topic_prefix; main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( zmq::error_t& err ) { zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); return false; } return true; } bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, cluster::detail::byte_buffer& buf) { ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str()); static std::string message_type = "log-write"; // Publishing a log write is done using 4 parts // // * A constant "log-write" string // * The node's identifier - see Cluster::node_id(). // * The format used to serialize the log write. // * The serialized log write itself. std::array parts = { zmq::const_buffer{message_type.data(), message_type.size()}, zmq::const_buffer(NodeId().data(), NodeId().size()), zmq::const_buffer{format.data(), format.size()}, zmq::const_buffer{buf.data(), buf.size()}, }; zmq::send_result_t result; for ( size_t i = 0; i < parts.size(); i++ ) { zmq::send_flags flags = zmq::send_flags::dontwait; if ( i < parts.size() - 1 ) flags = flags | zmq::send_flags::sndmore; result = log_push.send(parts[i], flags); if ( ! result ) { // XXX: Not exactly clear what we should do if we reach HWM. // we could block and hope a logger comes along that empties // our internal queue, or discard messages and log very loudly // and have metrics about it. However, this may happen regularly // at shutdown. // // Maybe that should be configurable? // If no logging endpoints were configured, that almost seems on // purpose (and there's a warning elsewhere about this), so skip // logging an error when sending fails. if ( connect_log_endpoints.empty() ) return true; reporter->Error("Failed to send log write. HWM reached?"); return false; } } return true; } void ZeroMQBackend::Run() { char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> snprintf(name, sizeof(name), "zmq-%p", this); util::detail::set_thread_name(name); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); using MultipartMessage = std::vector; auto HandleLogMessages = [this](const std::vector& msgs) { for ( const auto& msg : msgs ) { // sender, format, type, payload if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); continue; } detail::byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; QueueForProcessing(std::move(lm)); } }; auto HandleInprocMessages = [this](std::vector& msgs) { // Forward messages from the inprocess bridge to XSUB for subscription // subscription handling (1 part) or XPUB for publishing (4 parts). for ( auto& msg : msgs ) { assert(msg.size() == 1 || msg.size() == 4); if ( msg.size() == 1 ) { xsub.send(msg[0], zmq::send_flags::none); } else { for ( auto& part : msg ) { zmq::send_flags flags = zmq::send_flags::dontwait; if ( part.more() ) flags = flags | zmq::send_flags::sndmore; zmq::send_result_t result; int tries = 0; do { try { result = xpub.send(part, flags); } catch ( zmq::error_t& err ) { if ( err.num() == ETERM ) return; // XXX: What other error can happen here? How should we react? ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); break; } // Empty result means xpub.send() returned EAGAIN. The // socket reached its high water mark and we should // relax / backoff a bit. Otherwise we'll be spinning // unproductively very fast here. Note that this is going // to build up backpressure and eventually inproc.send() // will block from the main thread. if ( ! result ) { ++tries; auto sleep_for = std::min(tries * 10, 500); ZEROMQ_THREAD_PRINTF( "xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries, sleep_for); std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for)); } } while ( ! result ); } } } }; auto HandleXPubMessages = [this](const std::vector& msgs) { for ( const auto& msg : msgs ) { if ( msg.size() != 1 ) { ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); continue; } // Check if the messages starts with \x00 or \x01 to understand if it's // a subscription or unsubscription message. auto first = *reinterpret_cast(msg[0].data()); if ( first == 0 || first == 1 ) { QueueMessage qm; auto* start = msg[0].data() + 1; auto* end = msg[0].data() + msg[0].size(); detail::byte_buffer topic(start, end); if ( first == 1 ) { qm = BackendMessage{1, std::move(topic)}; } else if ( first == 0 ) { qm = BackendMessage{0, std::move(topic)}; } else { ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first); continue; } QueueForProcessing(std::move(qm)); } } }; auto HandleXSubMessages = [this](const std::vector& msgs) { for ( const auto& msg : msgs ) { if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); continue; } // Filter out messages that are coming from this node. std::string sender(msg[1].data(), msg[1].size()); if ( sender == NodeId() ) continue; detail::byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; EventMessage em{.topic = std::string(msg[0].data(), msg[0].size()), .format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; QueueForProcessing(std::move(em)); } }; // Helper class running at destruction. class Deferred { public: Deferred(std::function deferred) : closer(std::move(deferred)) {} ~Deferred() { closer(); } private: std::function closer; }; struct SocketInfo { zmq::socket_ref socket; std::string name; std::function&)> handler; }; std::vector sockets = { {.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages}, {.socket = xpub, .name = "xpub", .handler = HandleXPubMessages}, {.socket = xsub, .name = "xsub", .handler = HandleXSubMessages}, {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages}, }; // Called when Run() terminates. auto deferred_close = Deferred([this]() { child_inproc.close(); xpub.close(); xsub.close(); log_pull.close(); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread sockets closed (%p)\n", this); }); std::vector poll_items(sockets.size()); while ( true ) { for ( size_t i = 0; i < sockets.size(); i++ ) poll_items[i] = {.socket = sockets[i].socket.handle(), .fd = 0, .events = ZMQ_POLLIN | ZMQ_POLLERR}; // Awkward. std::vector> rcv_messages(sockets.size()); try { int r = zmq::poll(poll_items, std::chrono::seconds(-1)); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r); for ( size_t i = 0; i < poll_items.size(); i++ ) { const auto& item = poll_items[i]; ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: items[%lu]=%s %s %s\n", i, sockets[i].name.c_str(), item.revents & ZMQ_POLLIN ? "pollin " : "", item.revents & ZMQ_POLLERR ? "err" : ""); if ( item.revents & ZMQ_POLLERR ) { // What should we be doing? Re-open sockets? Terminate? ZEROMQ_THREAD_PRINTF("poll: error: POLLERR on socket %zu %s %p revents=%x\n", i, sockets[i].name.c_str(), item.socket, item.revents); } // Nothing to do? if ( (item.revents & ZMQ_POLLIN) == 0 ) continue; bool consumed_one = false; // Read messages from the socket. do { zmq::message_t msg; rcv_messages[i].emplace_back(); // make room for a multipart message auto& into = rcv_messages[i].back(); // Only receive up to poll_max_messages from an individual // socket. Move on to the next when exceeded. The last pushed // message (empty) is popped at the end of the loop. if ( poll_max_messages > 0 && rcv_messages[i].size() > poll_max_messages ) { ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: %s rcv_messages[%zu] full!\n", sockets[i].name.c_str(), i); break; } consumed_one = false; bool more = false; // Read a multi-part message. do { auto recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait); if ( recv_result ) { consumed_one = true; more = msg.more(); into.emplace_back(std::move(msg)); } else { // EAGAIN and more flag set? Try again! if ( more ) continue; } } while ( more ); } while ( consumed_one ); assert(rcv_messages[i].back().size() == 0); rcv_messages[i].pop_back(); } } catch ( zmq::error_t& err ) { if ( err.num() != ETERM ) throw; // Shutdown. ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread terminating (%p)\n", this); break; } // At this point, we've received anything that was readable from the sockets. // Now interpret and enqueue it into messages. for ( size_t i = 0; i < sockets.size(); i++ ) { if ( rcv_messages[i].empty() ) continue; sockets[i].handler(rcv_messages[i]); } } } bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { if ( tag == 0 || tag == 1 ) { std::string topic{reinterpret_cast(payload.data()), payload.size()}; zeek::EventHandlerPtr eh; if ( tag == 1 ) { // If this is the first time the subscription was observed, raise // the ZeroMQ internal event. if ( xpub_subscriptions.count(topic) == 0 ) { eh = event_subscription; xpub_subscriptions.insert(topic); } if ( const auto& cbit = subscription_callbacks.find(topic); cbit != subscription_callbacks.end() ) { const auto& cb = cbit->second; if ( cb ) cb(topic, {CallbackStatus::Success, "success"}); subscription_callbacks.erase(cbit); } } else if ( tag == 0 ) { eh = event_unsubscription; xpub_subscriptions.erase(topic); } ZEROMQ_DEBUG("BackendMessage: %s for %s", eh != nullptr ? eh->Name() : "", topic.c_str()); if ( eh ) EnqueueEvent(eh, zeek::Args{zeek::make_intrusive(topic)}); return true; } else { zeek::reporter->Error("Ignoring bad BackendMessage tag=%d", tag); return false; } } } // namespace cluster::zeromq } // namespace zeek