diff --git a/src/supervisor/Supervisor.cc b/src/supervisor/Supervisor.cc index e913eb343a..2a2049923c 100644 --- a/src/supervisor/Supervisor.cc +++ b/src/supervisor/Supervisor.cc @@ -119,7 +119,8 @@ struct Stem std::unique_ptr signal_flare; std::unique_ptr pipe; std::map nodes; - std::string msg_buffer; + // May be modified during Log() and ReportStatus() via write_msg(). + mutable std::string msg_buffer; bool shutting_down = false; }; } @@ -147,6 +148,11 @@ static RETSIGTYPE supervisor_signal_handler(int signo) return RETSIGVAL; } +static bool have_msgs(std::string* buffer, char delim) + { + return buffer->find(delim) != std::string::npos; + } + static std::vector extract_msgs(std::string* buffer, char delim) { std::vector rval; @@ -167,7 +173,8 @@ static std::vector extract_msgs(std::string* buffer, char delim) return rval; } -static std::pair> read_msgs(int fd, std::string* buffer, char delim) +// Read some bytes from the given fd and append them to buffer. +static int read_bytes_into_buffer(int fd, std::string* buffer) { constexpr auto buf_size = 256; char buf[buf_size]; @@ -175,12 +182,74 @@ static std::pair> read_msgs(int fd, std::string* b int bytes_read = read(fd, buf, buf_size); if ( bytes_read <= 0 ) - return {bytes_read, {}}; + return bytes_read; buffer->append(buf, bytes_read); + + return bytes_read; + } + +static std::pair> read_msgs(int fd, std::string* buffer, char delim) + { + int bytes_read = read_bytes_into_buffer(fd, buffer); + if ( bytes_read < 0 && errno == EAGAIN ) // Treat EAGAIN as no bytes read. + bytes_read = 0; + + if ( bytes_read < 0 ) + return {bytes_read, {}}; + return {bytes_read, extract_msgs(buffer, delim)}; } +// Write a message to the OutFD() of the given pipe pair. +// +// Should the write() fail with EAGAIN, usleep() for a tiny amount +// and retry after draining data pending on InFD() and appending +// it to given buffer. Draining is done in case the other side of the +// pipe is also blocking on a write() that we're blocking. 
+static bool write_msg(const std::unique_ptr& pipe, const std::string& msg, + std::string* buffer) + { + // Send the string including its '\0' + int len = msg.size() + 1; + const char* data = msg.data(); + + while ( len > 0 ) + { + int n = write(pipe->OutFD(), data, len); + + if ( n < 0 ) + { + if ( errno == EINTR ) + continue; + + // If there's no room in the pipe and we would've blocked, relax + // a bit, but otherwise just retry again. No fancy polling. + // + // We drain the stem's queue for a best effort preventing it + // running into a full pipe as well. + if ( errno == EAGAIN || errno == EWOULDBLOCK ) + { + read_bytes_into_buffer(pipe->InFD(), buffer); + usleep(10); + continue; + } + + char buf[128]; + util::zeek_strerror_r(errno, buf, sizeof(buf)); + fprintf(stderr, "write_msg error: %d (%s)\n", errno, buf); + abort(); + + return false; + } + + data += n; + len -= n; + } + + return true; + } + static std::string make_create_message(const Supervisor::NodeConfig& node) { auto json_str = node.ToJSON(); @@ -479,7 +548,7 @@ void Supervisor::HandleChildSignal() { const auto& node = n.second; auto msg = make_create_message(node.config); - util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); + write_msg(stem_pipe, msg, &msg_buffer); } } @@ -507,6 +576,11 @@ void Supervisor::InitPostScript() double Supervisor::GetNextTimeout() { + // If there are any messages to be processed in the msg_buffer, + // let Zeek's IO loop know we are ready. 
+ if ( have_msgs(&msg_buffer, '\0') ) + return 0.0; + return -1; } @@ -929,7 +1003,7 @@ void Stem::Shutdown(int exit_code) void Stem::ReportStatus(const SupervisorNode& node) const { std::string msg = util::fmt("status %s %d", node.Name().data(), node.pid); - util::safe_write(pipe->OutFD(), msg.data(), msg.size() + 1); + write_msg(pipe, msg, &msg_buffer); } void Stem::Log(std::string_view type, const char* format, va_list args) const @@ -946,7 +1020,7 @@ void Stem::Log(std::string_view type, const char* format, va_list args) const std::string msg{type.data(), type.size()}; msg += " "; msg += raw_msg; - util::safe_write(pipe->OutFD(), msg.data(), msg.size() + 1); + write_msg(pipe, msg, &msg_buffer); } void Stem::LogDebug(const char* format, ...) const @@ -1009,7 +1083,7 @@ std::optional Stem::Poll() // Note: the poll timeout here is for periodically checking if the parent // process died (see below). - constexpr auto poll_timeout_ms = 1000; + int poll_timeout_ms = have_msgs(&msg_buffer, '\0') ? 0 : 1000; auto res = poll(pfds.get(), total_fd_count, poll_timeout_ms); if ( res < 0 ) @@ -1072,23 +1146,35 @@ std::optional Stem::Poll() node.stderr_pipe.Process(); } - if ( ! pfds[0].revents ) - // No messages from supervisor to process, so return early. - return {}; - - auto [bytes_read, msgs] = read_msgs(pipe->InFD(), &msg_buffer, '\0'); - - if ( bytes_read == 0 ) + // Process messages from Supervisor. + // + // Either the supervisor sent some new data and/or we already + // have some in msg_buffer populated during write_msg(). + std::vector msgs; + if ( pfds[0].revents ) { - // EOF, supervisor must have exited - DBG_STEM("Stem EOF"); - Shutdown(14); + // New data from Supervisor. 
+ auto [bytes_read, msgs_] = read_msgs(pipe->InFD(), &msg_buffer, '\0'); + + if ( bytes_read == 0 ) + { + // EOF, supervisor must have exited + DBG_STEM("Stem EOF"); + Shutdown(14); + } + + if ( bytes_read < 0 ) + { + LogError("Stem read() failed: %s", strerror(errno)); + return {}; + } + + msgs = std::move(msgs_); } - - if ( bytes_read < 0 ) + else if ( have_msgs(&msg_buffer, '\0') ) { - LogError("Stem read() failed: %s", strerror(errno)); - return {}; + // Have some pending messages. + msgs = extract_msgs(&msg_buffer, '\0'); } for ( auto& msg : msgs ) @@ -1719,7 +1805,7 @@ std::string Supervisor::Create(const Supervisor::NodeConfig& node) } auto msg = make_create_message(node); - util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); + write_msg(stem_pipe, msg, &msg_buffer); nodes.emplace(node.name, node); return ""; } @@ -1731,7 +1817,7 @@ bool Supervisor::Destroy(std::string_view node_name) std::stringstream ss; ss << "destroy " << name; std::string msg = ss.str(); - util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); + write_msg(stem_pipe, msg, &msg_buffer); }; if ( node_name.empty() ) @@ -1760,7 +1846,7 @@ bool Supervisor::Restart(std::string_view node_name) std::stringstream ss; ss << "restart " << name; std::string msg = ss.str(); - util::safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); + write_msg(stem_pipe, msg, &msg_buffer); }; if ( node_name.empty() ) diff --git a/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-1.node.out b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-1.node.out new file mode 100644 index 0000000000..4c79e06437 --- /dev/null +++ b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-1.node.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +1024, cluster_nodes! 
+[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=, manager=, time_machine=, id=] +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-32.node.out b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-32.node.out new file mode 100644 index 0000000000..4c79e06437 --- /dev/null +++ b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-32.node.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +1024, cluster_nodes! +[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=, manager=, time_machine=, id=] +supervised node zeek_done() diff --git a/testing/btest/supervisor/large-cluster.zeek b/testing/btest/supervisor/large-cluster.zeek new file mode 100644 index 0000000000..08771af3f2 --- /dev/null +++ b/testing/btest/supervisor/large-cluster.zeek @@ -0,0 +1,73 @@ +# Run a cluster with 32 supervised Zeek nodes and an insanely large cluster +# layout (which is sent over the supervisor <-> stem pipe for every +# Supervisor::create() call). This previously triggered an instant-abort() +# due to write() returning with EAGAIN when the pipe was filled. + +# @TEST-PORT: BROKER_PORT +# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" +# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT +# @TEST-EXEC: btest-bg-wait 60 +# @TEST-EXEC: btest-diff zeek/bare-1/node.out +# @TEST-EXEC: btest-diff zeek/bare-32/node.out + +# So the supervised node doesn't terminate right away. +redef exit_only_after_terminate=T; + +global node_output_file: file; +global topic = "test-topic"; + +event do_destroy(name: string) + { + Supervisor::destroy(name); + + # When no nodes are left, exit. 
+ local status = Supervisor::status(); + if ( |status$nodes| == 0) + terminate(); + } + +event zeek_init() + { + if ( Supervisor::is_supervisor() ) + { + Broker::subscribe(topic); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + local i = 0; + local name: string; + local cluster: table[string] of Supervisor::ClusterEndpoint; + while ( ++i <= 1024 ) + { + name = fmt("bare-%d", i); + cluster[name] = [$host=127.0.0.1, $p=0/tcp, $role=Supervisor::WORKER]; + } + + i = 0; + while ( ++i <= 32 ) + { + name = fmt("bare-%d", i); + local sn = Supervisor::NodeConfig($name=name, $directory=name, $bare_mode=T, $cluster=cluster); + Supervisor::create(sn); + } + } + else + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + node_output_file = open("node.out"); + print node_output_file, "supervised node zeek_init()"; + print node_output_file, |Cluster::nodes|, "cluster_nodes!"; + print node_output_file, Cluster::nodes[Cluster::node]; + } + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + if ( Supervisor::is_supervised() ) + Broker::publish(topic, do_destroy, Supervisor::node()$name); + } + +event zeek_done() + { + if ( Supervisor::is_supervised() ) + print node_output_file, "supervised node zeek_done()"; + }