diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 7af2fb41ac..acbc430018 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -124,6 +124,14 @@ void ZeroMQBackend::DoInitPostScript() { xpub_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt()); xsub_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt()); xsub_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt()); + + // log push/pull socket configuration + log_immediate = + static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_immediate")->AsBool()); + log_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt()); + log_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt()); + log_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt()); + log_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt()); } void ZeroMQBackend::DoTerminate() { @@ -206,22 +214,6 @@ bool ZeroMQBackend::DoInit() { return false; } - - auto log_immediate = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_immediate")->AsBool()); - - auto log_sndhwm = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt()); - - auto log_sndbuf = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt()); - - auto log_rcvhwm = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt()); - - auto log_rcvbuf = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt()); - ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf, log_rcvhwm, log_rcvbuf, linger_ms); @@ -244,6 +236,11 @@ bool ZeroMQBackend::DoInit() { } } + // The connect_log_endpoints variable may be modified by zeek_init(), so + // need to look it up here rather than during DoInitPostScript(). + // + // We should've probably introduced a configuration record similar to the + // storage framework, too. Hmm. Maybe in the future. const auto& log_endpoints = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_log_endpoints"); for ( unsigned int i = 0; i < log_endpoints->Size(); i++ ) connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString()); @@ -313,9 +310,11 @@ bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& if ( i < parts.size() - 1 ) flags = flags | zmq::send_flags::sndmore; - // This should never fail, it will instead block - // when HWM is reached. I guess we need to see if - // and how this can happen :-/ + // This never returns EAGAIN. A pair socket blocks whenever the hwm + // is reached, regardless of passing any dontwait flag. + // + // This can result in blocking on Cluster::publish() if the inner + // thread does not consume from child_inproc. try { main_inproc.send(parts[i], flags); } catch ( const zmq::error_t& err ) { @@ -512,7 +511,7 @@ void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { } } else { - ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size()); + ZEROMQ_THREAD_PRINTF("inproc: error: expected 2 or 4 parts, have %zu!\n", msg.size()); } } } diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 2932f0161a..cb41a22d6a 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -105,6 +105,13 @@ private: int xsub_rcvhwm = 1000; // libzmq default int xsub_rcvbuf = -1; // OS defaults + // log socket configuration + int log_immediate = false; // libzmq default + int log_sndhwm = 1000; // libzmq default + int log_sndbuf = -1; // OS defaults + int log_rcvhwm = 1000; // libzmq defaults + int log_rcvbuf = -1; // OS defaults + zmq::context_t ctx; zmq::socket_t xsub; zmq::socket_t xpub;