mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
cluster/zeromq: Improve EINTR handling
When using ZeroMQ also within the Supervisor process, zmq::poll() and recv() were observed to return EINTR, handle these.
This commit is contained in:
parent
69b7bcc323
commit
a318463c1c
4 changed files with 146 additions and 12 deletions
|
@ -20,15 +20,23 @@ namespace {
|
|||
void thread_fun(ProxyThread::Args* args) {
|
||||
zeek::util::detail::set_thread_name("zmq-proxy-thread");
|
||||
|
||||
try {
|
||||
zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
args->xsub.close();
|
||||
args->xpub.close();
|
||||
bool done = false;
|
||||
|
||||
if ( err.num() != ETERM ) {
|
||||
std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)", err.what(), err.num());
|
||||
throw;
|
||||
while ( ! done ) {
|
||||
try {
|
||||
zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
if ( err.num() == EINTR )
|
||||
continue;
|
||||
|
||||
done = true;
|
||||
args->xsub.close();
|
||||
args->xpub.close();
|
||||
|
||||
if ( err.num() != ETERM ) {
|
||||
std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)", err.what(), err.num());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -650,8 +650,17 @@ void ZeroMQBackend::Run() {
|
|||
// Awkward.
|
||||
std::vector<std::vector<MultipartMessage>> rcv_messages(sockets.size());
|
||||
try {
|
||||
int r = zmq::poll(poll_items, std::chrono::seconds(-1));
|
||||
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r);
|
||||
try {
|
||||
int r = zmq::poll(poll_items, std::chrono::seconds(-1));
|
||||
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r);
|
||||
} catch ( const zmq::error_t& err ) {
|
||||
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll exception: what=%s num=%d", err.what(), err.num());
|
||||
// Retry interrupted zmq::poll() calls.
|
||||
if ( err.num() == EINTR )
|
||||
continue;
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
for ( size_t i = 0; i < poll_items.size(); i++ ) {
|
||||
const auto& item = poll_items[i];
|
||||
|
@ -691,7 +700,17 @@ void ZeroMQBackend::Run() {
|
|||
|
||||
// Read a multi-part message.
|
||||
do {
|
||||
auto recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait);
|
||||
zmq::recv_result_t recv_result;
|
||||
try {
|
||||
recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait);
|
||||
} catch ( const zmq::error_t& err ) {
|
||||
// Retry interrupted recv() calls.
|
||||
if ( err.num() == EINTR )
|
||||
continue;
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
if ( recv_result ) {
|
||||
consumed_one = true;
|
||||
more = msg.more();
|
||||
|
@ -708,7 +727,7 @@ void ZeroMQBackend::Run() {
|
|||
assert(rcv_messages[i].back().size() == 0);
|
||||
rcv_messages[i].pop_back();
|
||||
}
|
||||
} catch ( zmq::error_t& err ) {
|
||||
} catch ( const zmq::error_t& err ) {
|
||||
if ( err.num() != ETERM )
|
||||
throw;
|
||||
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
logger got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
logger got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
logger got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
logger got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
manager got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
proxy got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-1 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-2 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
|
||||
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
|
86
testing/btest/cluster/zeromq/supervisor-runs-zmq-proxy.zeek
Normal file
86
testing/btest/cluster/zeromq/supervisor-runs-zmq-proxy.zeek
Normal file
|
@ -0,0 +1,86 @@
|
|||
# @TEST-DOC: Test a Zeek cluster where the ZeroMQ proxy thread is spawned by the supervisor instead of the manager.
|
||||
#
|
||||
# @TEST-REQUIRES: have-zeromq
|
||||
#
|
||||
# @TEST-GROUP: cluster-zeromq
|
||||
#
|
||||
# @TEST-PORT: XPUB_PORT
|
||||
# @TEST-PORT: XSUB_PORT
|
||||
# @TEST-PORT: LOG_PULL_PORT
|
||||
|
||||
# @TEST-EXEC: chmod +x ./check-cluster-log.sh
|
||||
#
|
||||
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run supervisor "ZEEKPATH=$ZEEKPATH:.. && zeek -j ../supervisor-runs-zmq-proxy.zeek >out"
|
||||
# @TEST-EXEC: btest-bg-wait 30
|
||||
# @TEST-EXEC: btest-diff supervisor/cluster.log
|
||||
|
||||
redef Log::default_rotation_interval = 0secs;
|
||||
redef Log::flush_interval = 0.01sec;
|
||||
|
||||
@load ./zeromq-test-bootstrap
|
||||
|
||||
redef Cluster::Backend::ZeroMQ::run_proxy_thread = F;
|
||||
|
||||
# The supervisor peeks into logger/cluster.log to initate a shutdown when
|
||||
# all nodes have said hello to each other. See the check-cluster.log.sh
|
||||
# script below.
|
||||
event check_cluster_log() {
|
||||
system_env("../check-cluster-log.sh", table(["SUPERVISOR_PID"] = cat(getpid())));
|
||||
|
||||
schedule 0.1sec { check_cluster_log() };
|
||||
}
|
||||
|
||||
event zeek_init()
|
||||
{
|
||||
if ( ! Supervisor::is_supervisor() )
|
||||
return;
|
||||
|
||||
if ( ! Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread() )
|
||||
Reporter::fatal("Failed to spawn proxy thread");
|
||||
|
||||
local cluster: table[string] of Supervisor::ClusterEndpoint;
|
||||
cluster["manager"] = [$role=Supervisor::MANAGER, $host=127.0.0.1, $p=0/unknown];
|
||||
cluster["logger"] = [$role=Supervisor::LOGGER, $host=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))];
|
||||
cluster["proxy"] = [$role=Supervisor::PROXY, $host=127.0.0.1, $p=0/unknown];
|
||||
cluster["worker-1"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown];
|
||||
cluster["worker-2"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown];
|
||||
|
||||
for ( n, ep in cluster )
|
||||
{
|
||||
local sn = Supervisor::NodeConfig($name=n, $bare_mode=T, $cluster=cluster, $directory=n);
|
||||
local res = Supervisor::create(sn);
|
||||
|
||||
if ( res != "" )
|
||||
print fmt("supervisor failed to create node '%s': %s", n, res);
|
||||
}
|
||||
|
||||
# Start polling the cluster.log
|
||||
event check_cluster_log();
|
||||
}
|
||||
|
||||
# @TEST-START-FILE check-cluster-log.sh
|
||||
#!/bin/sh
|
||||
#
|
||||
# This script checks logger/cluster.log until the expected number
|
||||
# of log entries have been observed and puts a normalized version
|
||||
# into the current directory. This runs from the supervisor.
|
||||
if [ ! -f logger/cluster.log ]; then
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
if [ -f DONE ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Remove hostname and pid from node id in message.
|
||||
zeek-cut node message < logger/cluster.log | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log
|
||||
|
||||
if [ $(wc -l < cluster.log) = 20 ]; then
|
||||
echo "DONE!" >&2
|
||||
# Trigger shutdown through supervisor.
|
||||
kill ${ZEEK_ARG_SUPERVISOR_PID};
|
||||
echo "DONE" > DONE
|
||||
fi
|
||||
# @TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue