Supervisor: Handle EAGAIN error on stem pipe
util::safe_write() calls abort() when a write fails with EAGAIN. This is easily observed when starting clusters with 32 workers or more.

Add a custom write_msg() function that handles EAGAIN by retrying after a small sleep. It is not clear that a more complicated poll()-based approach would be much better: the pipe might be reported as ready for writing, yet the message might still not fit, resulting in another EAGAIN error. And even poll() would introduce blocking/sleeping code.

Take some precautions against the stem and the supervisor dead-locking when both pipes are full by draining the other end of the pipe on EAGAIN errors.

Closes #3043
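The core of the change is a write loop that treats EAGAIN as "retry shortly" rather than a fatal condition, first draining the opposite direction of the pipe pair so a peer that is itself blocked writing to us cannot wedge both processes. Below is a minimal, self-contained sketch of that pattern; it is an illustration, not the patched Zeek code. The helper names (write_all_with_drain, drain_into_buffer), the buffer sizes, and the 10-microsecond sleep are chosen for this example only; in the actual patch the logic lives in write_msg() and operates on a detail::PipePair.

    #include <cerrno>
    #include <cstdio>
    #include <fcntl.h>
    #include <string>
    #include <unistd.h>

    // Append whatever is currently readable on in_fd to *pending.
    // Returns bytes read, 0 on EOF, or -1 on error (including EAGAIN).
    static ssize_t drain_into_buffer(int in_fd, std::string* pending)
        {
        char buf[256];
        ssize_t n = read(in_fd, buf, sizeof(buf));
        if ( n > 0 )
            pending->append(buf, n);
        return n;
        }

    // Write all of msg, including its terminating '\0', to the non-blocking out_fd.
    // On EAGAIN/EWOULDBLOCK, drain in_fd into *pending (the peer may itself be
    // blocked writing to us), sleep briefly, then retry. Returns false on hard errors.
    static bool write_all_with_drain(int out_fd, int in_fd, const std::string& msg,
                                     std::string* pending)
        {
        const char* data = msg.data();
        size_t len = msg.size() + 1;

        while ( len > 0 )
            {
            ssize_t n = write(out_fd, data, len);

            if ( n < 0 )
                {
                if ( errno == EINTR )
                    continue; // interrupted by a signal, just retry

                if ( errno == EAGAIN || errno == EWOULDBLOCK )
                    {
                    drain_into_buffer(in_fd, pending); // keep the peer from blocking on a full pipe
                    usleep(10);                        // relax briefly, then retry the write
                    continue;
                    }

                perror("write");
                return false;
                }

            data += n;
            len -= n;
            }

        return true;
        }

    int main()
        {
        // Two pipes standing in for a pipe pair: we write on out_pipe[1],
        // and anything the peer sends us arrives on in_pipe[0].
        int out_pipe[2], in_pipe[2];
        if ( pipe(out_pipe) != 0 || pipe(in_pipe) != 0 )
            return 1;

        // Non-blocking writes are what make EAGAIN possible in the first place.
        fcntl(out_pipe[1], F_SETFL, fcntl(out_pipe[1], F_GETFL) | O_NONBLOCK);
        fcntl(in_pipe[0], F_SETFL, fcntl(in_pipe[0], F_GETFL) | O_NONBLOCK);

        std::string pending; // bytes drained from the peer while our pipe was full
        bool ok = write_all_with_drain(out_pipe[1], in_pipe[0], "create node-1", &pending);
        std::printf("write ok=%d, drained %zu bytes\n", ok, pending.size());
        return ok ? 0 : 1;
        }

The draining step matters because both ends write to each other over the same pipe pair: if each side blocks on a full outbound pipe while its inbound pipe is also full, neither can make progress. Draining on EAGAIN breaks that cycle at the cost of buffering the peer's messages until they can be processed.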
parent ddcf75e934
commit 92565d4739

4 changed files with 193 additions and 24 deletions
@@ -119,7 +119,8 @@ struct Stem
std::unique_ptr<detail::Flare> signal_flare;
std::unique_ptr<detail::PipePair> pipe;
std::map<std::string, SupervisorNode> nodes;
std::string msg_buffer;
// May be modified during Log() and ReportStatus() via write_msg().
mutable std::string msg_buffer;
bool shutting_down = false;
};
}
@@ -147,6 +148,11 @@ static RETSIGTYPE supervisor_signal_handler(int signo)
return RETSIGVAL;
}

static bool have_msgs(std::string* buffer, char delim)
{
return buffer->find(delim) != std::string::npos;
}

static std::vector<std::string> extract_msgs(std::string* buffer, char delim)
{
std::vector<std::string> rval;
@@ -167,7 +173,8 @@ static std::vector<std::string> extract_msgs(std::string* buffer, char delim)
return rval;
}

static std::pair<int, std::vector<std::string>> read_msgs(int fd, std::string* buffer, char delim)
// Read some bytes from the given fd and append them to buffer.
static int read_bytes_into_buffer(int fd, std::string* buffer)
{
constexpr auto buf_size = 256;
char buf[buf_size];
@@ -175,12 +182,74 @@ static std::pair<int, std::vector<std::string>> read_msgs(int fd, std::string* b
int bytes_read = read(fd, buf, buf_size);

if ( bytes_read <= 0 )
return {bytes_read, {}};
return bytes_read;

buffer->append(buf, bytes_read);

return bytes_read;
}

static std::pair<int, std::vector<std::string>> read_msgs(int fd, std::string* buffer, char delim)
{
int bytes_read = read_bytes_into_buffer(fd, buffer);
if ( bytes_read < 0 && errno == EAGAIN ) // Treat EAGAIN as no bytes read.
bytes_read = 0;

if ( bytes_read < 0 )
return {bytes_read, {}};

return {bytes_read, extract_msgs(buffer, delim)};
}

// Write a message to the OutFD() of the given pipe pair.
//
// Should the write() fail with EAGAIN, usleep() for a tiny amount
// and retry after draining data pending on InFD() and appending
// it to the given buffer. Draining is done in case the other side of the
// pipe is also blocking on a write() that we're blocking on.
static bool write_msg(const std::unique_ptr<detail::PipePair>& pipe, const std::string& msg,
std::string* buffer)
{
// Send the string including its '\0'
int len = msg.size() + 1;
const char* data = msg.data();

while ( len > 0 )
{
int n = write(pipe->OutFD(), data, len);

if ( n < 0 )
{
if ( errno == EINTR )
continue;

// If there's no room in the pipe and we would've blocked, relax
// a bit, but otherwise just retry again. No fancy polling.
//
// We drain the stem's queue as a best effort to prevent it
// from running into a full pipe as well.
if ( errno == EAGAIN || errno == EWOULDBLOCK )
{
read_bytes_into_buffer(pipe->InFD(), buffer);
usleep(10);
continue;
}

char buf[128];
util::zeek_strerror_r(errno, buf, sizeof(buf));
fprintf(stderr, "write_msg error: %d (%s)\n", errno, buf);
abort();

return false;
}

data += n;
len -= n;
}

return true;
}

static std::string make_create_message(const Supervisor::NodeConfig& node)
{
auto json_str = node.ToJSON();
@@ -479,7 +548,7 @@ void Supervisor::HandleChildSignal()
{
const auto& node = n.second;
auto msg = make_create_message(node.config);
util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
write_msg(stem_pipe, msg, &msg_buffer);
}
}
@@ -507,6 +576,11 @@ void Supervisor::InitPostScript()

double Supervisor::GetNextTimeout()
{
// If there are any messages to be processed in msg_buffer,
// let Zeek's IO loop know we are ready.
if ( have_msgs(&msg_buffer, '\0') )
return 0.0;

return -1;
}
@@ -929,7 +1003,7 @@ void Stem::Shutdown(int exit_code)
void Stem::ReportStatus(const SupervisorNode& node) const
{
std::string msg = util::fmt("status %s %d", node.Name().data(), node.pid);
util::safe_write(pipe->OutFD(), msg.data(), msg.size() + 1);
write_msg(pipe, msg, &msg_buffer);
}

void Stem::Log(std::string_view type, const char* format, va_list args) const
@@ -946,7 +1020,7 @@ void Stem::Log(std::string_view type, const char* format, va_list args) const
std::string msg{type.data(), type.size()};
msg += " ";
msg += raw_msg;
util::safe_write(pipe->OutFD(), msg.data(), msg.size() + 1);
write_msg(pipe, msg, &msg_buffer);
}

void Stem::LogDebug(const char* format, ...) const
@@ -1009,7 +1083,7 @@ std::optional<SupervisedNode> Stem::Poll()

// Note: the poll timeout here is for periodically checking if the parent
// process died (see below).
constexpr auto poll_timeout_ms = 1000;
int poll_timeout_ms = have_msgs(&msg_buffer, '\0') ? 0 : 1000;
auto res = poll(pfds.get(), total_fd_count, poll_timeout_ms);

if ( res < 0 )
@@ -1072,23 +1146,35 @@ std::optional<SupervisedNode> Stem::Poll()
node.stderr_pipe.Process();
}

if ( ! pfds[0].revents )
// No messages from supervisor to process, so return early.
return {};

auto [bytes_read, msgs] = read_msgs(pipe->InFD(), &msg_buffer, '\0');

if ( bytes_read == 0 )
// Process messages from Supervisor.
//
// Either the supervisor sent some new data and/or we already
// have some in msg_buffer populated during write_msg().
std::vector<std::string> msgs;
if ( pfds[0].revents )
{
// EOF, supervisor must have exited
DBG_STEM("Stem EOF");
Shutdown(14);
// New data from Supervisor.
auto [bytes_read, msgs_] = read_msgs(pipe->InFD(), &msg_buffer, '\0');

if ( bytes_read == 0 )
{
// EOF, supervisor must have exited
DBG_STEM("Stem EOF");
Shutdown(14);
}

if ( bytes_read < 0 )
{
LogError("Stem read() failed: %s", strerror(errno));
return {};
}

msgs = std::move(msgs_);
}

if ( bytes_read < 0 )
else if ( have_msgs(&msg_buffer, '\0') )
{
LogError("Stem read() failed: %s", strerror(errno));
return {};
// Have some pending messages.
msgs = extract_msgs(&msg_buffer, '\0');
}

for ( auto& msg : msgs )
@@ -1719,7 +1805,7 @@ std::string Supervisor::Create(const Supervisor::NodeConfig& node)
}

auto msg = make_create_message(node);
util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
write_msg(stem_pipe, msg, &msg_buffer);
nodes.emplace(node.name, node);
return "";
}
@@ -1731,7 +1817,7 @@ bool Supervisor::Destroy(std::string_view node_name)
std::stringstream ss;
ss << "destroy " << name;
std::string msg = ss.str();
util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
write_msg(stem_pipe, msg, &msg_buffer);
};

if ( node_name.empty() )
@@ -1760,7 +1846,7 @@ bool Supervisor::Restart(std::string_view node_name)
std::stringstream ss;
ss << "restart " << name;
std::string msg = ss.str();
util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
write_msg(stem_pipe, msg, &msg_buffer);
};

if ( node_name.empty() )
@@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
supervised node zeek_init()
1024, cluster_nodes!
[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>]
supervised node zeek_done()
@@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
supervised node zeek_init()
1024, cluster_nodes!
[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>]
supervised node zeek_done()
testing/btest/supervisor/large-cluster.zeek (new file, 73 lines)
@@ -0,0 +1,73 @@
# Run a cluster with 64 Zeek processes and an insanely large cluster
# layout (which is sent over the supervisor <-> stem pipe for every
# Supervisor::create() call). This previously triggered an instant-abort()
# due to write() returning with EAGAIN when the pipe was filled.

# @TEST-PORT: BROKER_PORT
# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1"
# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT
# @TEST-EXEC: btest-bg-wait 60
# @TEST-EXEC: btest-diff zeek/bare-1/node.out
# @TEST-EXEC: btest-diff zeek/bare-32/node.out

# So the supervised node doesn't terminate right away.
redef exit_only_after_terminate=T;

global node_output_file: file;
global topic = "test-topic";

event do_destroy(name: string)
{
Supervisor::destroy(name);

# When no nodes are left, exit.
local status = Supervisor::status();
if ( |status$nodes| == 0 )
terminate();
}

event zeek_init()
{
if ( Supervisor::is_supervisor() )
{
Broker::subscribe(topic);
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));

local i = 0;
local name: string;
local cluster: table[string] of Supervisor::ClusterEndpoint;
while ( ++i <= 1024 )
{
name = fmt("bare-%d", i);
cluster[name] = [$host=127.0.0.1, $p=0/tcp, $role=Supervisor::WORKER];
}

i = 0;
while ( ++i <= 32 )
{
name = fmt("bare-%d", i);
local sn = Supervisor::NodeConfig($name=name, $directory=name, $bare_mode=T, $cluster=cluster);
Supervisor::create(sn);
}
}
else
{
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
node_output_file = open("node.out");
print node_output_file, "supervised node zeek_init()";
print node_output_file, |Cluster::nodes|, "cluster_nodes!";
print node_output_file, Cluster::nodes[Cluster::node];
}
}

event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
if ( Supervisor::is_supervised() )
Broker::publish(topic, do_destroy, Supervisor::node()$name);
}

event zeek_done()
{
if ( Supervisor::is_supervised() )
print node_output_file, "supervised node zeek_done()";
}