zeek/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc
Arne Welzel 35c79ab2e3 cluster/backend/zeromq: Add ZeroMQ based cluster backend
This is a cluster backend implementation using a central XPUB/XSUB proxy
that by default runs on the manager node. Logging is implemented leveraging
PUSH/PULL sockets between logger and other nodes, rather than going
through XPUB/XSUB.

The test-all-policy-cluster baseline changed: Previously, Broker::peer()
would be called from setup-connections.zeek, causing the IO loop to be
alive. With the ZeroMQ backend, the IO loop is only alive when
Cluster::init() is called, but that doesn't happen anymore.
2024-12-10 20:33:02 +01:00

72 lines
1.8 KiB
C++

// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
#include <zmq.hpp>
#include "zeek/Reporter.h"
#include "zeek/util.h"
using namespace zeek::cluster::zeromq;
namespace {
/**
 * Thread entry point that runs zmq::proxy() between the XSUB and XPUB
 * sockets, providing a central XPUB/XSUB broker for other Zeek nodes to
 * connect to and exchange subscription information.
 *
 * When zmq::proxy() throws a zmq::error_t, both sockets are closed. An
 * ETERM error is treated as a regular shutdown and the function returns
 * quietly. Any other error is printed to stderr and rethrown.
 *
 * @param args Holds the xsub and xpub sockets to proxy between. The pointee
 *             is used for as long as this function runs.
 */
void thread_fun(ProxyThread::Args* args) {
    zeek::util::detail::set_thread_name("zmq-proxy-thread");

    try {
        zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/);
    } catch ( const zmq::error_t& err ) {
        // Release the sockets regardless of why the proxy stopped.
        args->xsub.close();
        args->xpub.close();

        if ( err.num() != ETERM ) {
            // Terminate the line so that whatever is printed next (for
            // example by the runtime once the rethrown exception goes
            // unhandled) does not run into this message.
            std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)\n", err.what(), err.num());
            throw;
        }
    }
}
} // namespace
bool ProxyThread::Start() {
zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
try {
xpub.bind(xpub_endpoint);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num());
return false;
}
try {
xsub.bind(xsub_endpoint);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to bind xsub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num());
return false;
}
args = {.xpub = std::move(xpub), .xsub = std::move(xsub)};
thread = std::thread(thread_fun, &args);
return true;
}
/**
 * Stops the proxy: shuts down the ZeroMQ context, waits for the proxy
 * thread (if one is running) and finally closes the context.
 *
 * The order of the three steps is deliberate and must be preserved.
 */
void ProxyThread::Shutdown() {
    // Shut the context down first. thread_fun() treats the resulting ETERM
    // error from zmq::proxy() as the signal for a regular exit.
    ctx.shutdown();

    // Only join if a thread was actually started.
    const bool proxy_thread_active = thread.joinable();
    if ( proxy_thread_active ) {
        thread.join();
    }

    // The context is closed only after the thread has been joined.
    ctx.close();
}