Implement minimal supervised cluster configuration

More aspects of the cluster configuration will be fleshed out later,
but a basic cluster, like one would use for a live deployment,
can now be instantiated and run under supervision.  The new
clusterized-pcap-processing supervisor mode is also not done yet.
This commit is contained in:
Jon Siwek 2019-10-23 17:37:53 -07:00
parent 25a8ba99fa
commit 29f386e388
12 changed files with 390 additions and 91 deletions

View file

@ -18,7 +18,12 @@ redef Broker::log_topic = Cluster::rr_log_topic;
# Loading the cluster framework requires that a script by this name exists # Loading the cluster framework requires that a script by this name exists
# somewhere in the ZEEKPATH. The only thing in the file should be the # somewhere in the ZEEKPATH. The only thing in the file should be the
# cluster definition in the :zeek:id:`Cluster::nodes` variable. # cluster definition in the :zeek:id:`Cluster::nodes` variable.
@if ( ! Supervisor::__init_cluster() )
# When running a supervised cluster, Cluster::nodes is instead populated
# from the internal C++-layer directly via the above BIF.
@load cluster-layout @load cluster-layout
@endif
@if ( Cluster::node in Cluster::nodes ) @if ( Cluster::node in Cluster::nodes )

View file

@ -287,7 +287,13 @@ function is_enabled(): bool
function local_node_type(): NodeType function local_node_type(): NodeType
{ {
return is_enabled() ? nodes[node]$node_type : NONE; if ( ! is_enabled() )
return NONE;
if ( node !in nodes )
return NONE;
return nodes[node]$node_type;
} }
function node_topic(name: string): string function node_topic(name: string): string

View file

@ -4,14 +4,32 @@
module Supervisor; module Supervisor;
export { export {
## The role a supervised node plays within a cluster.  NONE is what an
## unrecognized role name is mapped to when a node configuration is parsed.
type ClusterRole: enum {
NONE,
LOGGER,
MANAGER,
PROXY,
WORKER,
};
## Describes one member of the cluster that a supervised node belongs to.
type ClusterEndpoint: record {
## The role the endpoint plays within the cluster.
role: ClusterRole;
## The address of the endpoint.
host: addr;
## The port of the endpoint.
## NOTE(review): the C++ layer keeps only the port number and rebuilds it
## as a TCP port, so a non-TCP port would not round-trip -- confirm intended.
p: port;
## Optional network interface associated with the endpoint.
interface: string &optional;
};
type Node: record { type Node: record {
# TODO: add proper config fields
name: string; name: string;
interface: string &optional;
cluster: table[string] of ClusterEndpoint &default=table();
# TODO: separate node config fields from status fields ?
# TODO: add more status fields ?
pid: count &optional; pid: count &optional;
}; };
type Status: record { type Status: record {
# TODO: add more status fields ?
nodes: table[string] of Node; nodes: table[string] of Node;
}; };

View file

@ -54,17 +54,17 @@ function Supervisor::status(nodes: string): Status
return Supervisor::__status(nodes); return Supervisor::__status(nodes);
} }
function create(node: Node): string function Supervisor::create(node: Node): string
{ {
return Supervisor::__create(node); return Supervisor::__create(node);
} }
function destroy(nodes: string): bool function Supervisor::destroy(nodes: string): bool
{ {
return Supervisor::__destroy(nodes); return Supervisor::__destroy(nodes);
} }
function restart(nodes: string): bool function Supervisor::restart(nodes: string): bool
{ {
return Supervisor::__restart(nodes); return Supervisor::__restart(nodes);
} }

View file

@ -33,6 +33,7 @@
#include "iosource/PktDumper.h" #include "iosource/PktDumper.h"
#include "plugin/Manager.h" #include "plugin/Manager.h"
#include "broker/Manager.h" #include "broker/Manager.h"
#include "Supervisor.h"
extern "C" { extern "C" {
#include "setsignal.h" #include "setsignal.h"
@ -288,6 +289,9 @@ void net_run()
while ( iosource_mgr->Size() || while ( iosource_mgr->Size() ||
(BifConst::exit_only_after_terminate && ! terminating) ) (BifConst::exit_only_after_terminate && ! terminating) )
{ {
if ( zeek::supervised_node && getppid() == 1 )
zeek_terminate_loop("supervised cluster node was orphaned");
double ts; double ts;
iosource::IOSource* src = iosource_mgr->FindSoonest(&ts); iosource::IOSource* src = iosource_mgr->FindSoonest(&ts);
@ -361,13 +365,11 @@ void net_run()
current_dispatched = 0; current_dispatched = 0;
current_iosrc = 0; current_iosrc = 0;
// Should we put the signal handling into an IOSource?
extern void termination_signal();
if ( signal_val == SIGTERM || signal_val == SIGINT ) if ( signal_val == SIGTERM || signal_val == SIGINT )
// We received a signal while processing the // We received a signal while processing the
// current packet and its related events. // current packet and its related events.
termination_signal(); // Should we put the signal handling into an IOSource?
zeek_terminate_loop("received termination signal");
if ( ! reading_traces ) if ( ! reading_traces )
// Check whether we have timers scheduled for // Check whether we have timers scheduled for

View file

@ -24,7 +24,7 @@ extern void net_update_time(double new_network_time);
extern void net_packet_dispatch(double t, const Packet* pkt, extern void net_packet_dispatch(double t, const Packet* pkt,
iosource::PktSrc* src_ps); iosource::PktSrc* src_ps);
extern void expire_timers(iosource::PktSrc* src_ps = 0); extern void expire_timers(iosource::PktSrc* src_ps = 0);
extern void termination_signal(); extern void zeek_terminate_loop(const char* reason);
// Functions to temporarily suspend processing of live input (network packets // Functions to temporarily suspend processing of live input (network packets
// and remote events/state). Turning this is on is sure to lead to data loss! // and remote events/state). Turning this is on is sure to lead to data loss!

View file

@ -14,6 +14,8 @@
#include "zeek-config.h" #include "zeek-config.h"
#include "util.h" #include "util.h"
#include "3rdparty/json.hpp"
extern "C" { extern "C" {
#include "setsignal.h" #include "setsignal.h"
} }
@ -24,14 +26,14 @@ struct Stem {
~Stem(); ~Stem();
std::string Run(); zeek::Supervisor::Node* Run();
std::string Poll(); zeek::Supervisor::Node* Poll();
zeek::Supervisor::Node* Revive();
void Reap(); void Reap();
std::string Revive();
bool Spawn(zeek::Supervisor::Node* node); bool Spawn(zeek::Supervisor::Node* node);
int AliveNodeCount() const; int AliveNodeCount() const;
@ -60,6 +62,7 @@ static Stem* stem = nullptr;
static RETSIGTYPE stem_sig_handler(int signo) static RETSIGTYPE stem_sig_handler(int signo)
{ {
// TODO: signal safety
printf("Stem received signal: %d\n", signo); printf("Stem received signal: %d\n", signo);
if ( stem->shutting_down ) if ( stem->shutting_down )
@ -75,6 +78,7 @@ static RETSIGTYPE stem_sig_handler(int signo)
static RETSIGTYPE supervisor_sig_handler(int signo) static RETSIGTYPE supervisor_sig_handler(int signo)
{ {
// TODO: signal safety
DBG_LOG(DBG_SUPERVISOR, "received signal: %d", signo); DBG_LOG(DBG_SUPERVISOR, "received signal: %d", signo);
zeek::supervisor->ObserveChildSignal(); zeek::supervisor->ObserveChildSignal();
return RETSIGVAL; return RETSIGVAL;
@ -191,8 +195,15 @@ void zeek::Supervisor::ReapStem()
void zeek::Supervisor::HandleChildSignal() void zeek::Supervisor::HandleChildSignal()
{ {
signal_flare.Extinguish(); bool had_child_signal = signal_flare.Extinguish();
ReapStem();
if ( had_child_signal )
{
ReapStem();
DBG_LOG(DBG_SUPERVISOR, "processed SIGCHLD %s",
stem_pid ? "(spurious)" : "");
}
if ( stem_pid ) if ( stem_pid )
return; return;
@ -233,9 +244,10 @@ void zeek::Supervisor::HandleChildSignal()
exit(1); exit(1);
} }
DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid);
// Parent supervisor process resends node configurations to recreate // Parent supervisor process resends node configurations to recreate
// the desired process hierarchy // the desired process hierarchy
DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid);
// TODO: probably a preferred order in which to create nodes // TODO: probably a preferred order in which to create nodes
// e.g. logger, manager, proxy, worker // e.g. logger, manager, proxy, worker
@ -402,7 +414,7 @@ void Stem::Destroy(zeek::Supervisor::Node* node) const
} }
} }
std::string Stem::Revive() zeek::Supervisor::Node* Stem::Revive()
{ {
constexpr auto attempts_before_delay_increase = 3; constexpr auto attempts_before_delay_increase = 3;
constexpr auto delay_increase_factor = 2; constexpr auto delay_increase_factor = 2;
@ -437,12 +449,12 @@ std::string Stem::Revive()
node.revival_delay *= delay_increase_factor; node.revival_delay *= delay_increase_factor;
if ( Spawn(&node) ) if ( Spawn(&node) )
return node.name; return new zeek::Supervisor::Node(node);
ReportStatus(node); ReportStatus(node);
} }
return ""; return {};
} }
bool Stem::Spawn(zeek::Supervisor::Node* node) bool Stem::Spawn(zeek::Supervisor::Node* node)
@ -494,10 +506,15 @@ void Stem::Shutdown(int exit_code)
for ( ; ; ) for ( ; ; )
{ {
auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL; auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL;
printf("Stem killed nodes with signal %d\n", sig);
KillNodes(sig); if ( ! nodes.empty() )
usleep(10); {
Reap(); KillNodes(sig);
printf("Stem killed nodes with signal %d\n", sig);
usleep(10);
Reap();
}
auto nodes_alive = AliveNodeCount(); auto nodes_alive = AliveNodeCount();
if ( nodes_alive == 0 ) if ( nodes_alive == 0 )
@ -531,20 +548,20 @@ void Stem::ReportStatus(const zeek::Supervisor::Node& node) const
safe_write(pipe->OutFD(), msg.data(), msg.size() + 1); safe_write(pipe->OutFD(), msg.data(), msg.size() + 1);
} }
std::string Stem::Run() zeek::Supervisor::Node* Stem::Run()
{ {
for ( ; ; ) for ( ; ; )
{ {
auto new_node_name = Poll(); auto new_node = Poll();
if ( ! new_node_name.empty() ) if ( new_node )
return new_node_name; return new_node;
} }
return ""; return {};
} }
std::string Stem::Poll() zeek::Supervisor::Node* Stem::Poll()
{ {
pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 },
{ signal_flare->FD(), POLLIN, 0} }; { signal_flare->FD(), POLLIN, 0} };
@ -569,10 +586,10 @@ std::string Stem::Poll()
Shutdown(13); Shutdown(13);
} }
auto new_node_name = Revive(); auto new_node = Revive();
if ( ! new_node_name.empty() ) if ( new_node )
return new_node_name; return new_node;
if ( res == 0 ) if ( res == 0 )
return {}; return {};
@ -583,10 +600,10 @@ std::string Stem::Poll()
Shutdown(0); Shutdown(0);
Reap(); Reap();
auto new_node_name = Revive(); auto new_node = Revive();
if ( ! new_node_name.empty() ) if ( new_node )
return new_node_name; return new_node;
} }
if ( ! fds[0].revents ) if ( ! fds[0].revents )
@ -615,20 +632,18 @@ std::string Stem::Poll()
{ {
// TODO: improve message format ... // TODO: improve message format ...
std::vector<std::string> msg_tokens; std::vector<std::string> msg_tokens;
tokenize_string(std::move(msg), " ", &msg_tokens); tokenize_string(std::move(msg), " ", &msg_tokens, 2);
const auto& cmd = msg_tokens[0]; const auto& cmd = msg_tokens[0];
const auto& node_name = msg_tokens[1]; const auto& node_name = msg_tokens[1];
if ( cmd == "create" ) if ( cmd == "create" )
{ {
const auto& node_json = msg_tokens[2];
assert(nodes.find(node_name) == nodes.end()); assert(nodes.find(node_name) == nodes.end());
zeek::Supervisor::Node node; auto node = zeek::Supervisor::Node::FromJSON(node_json);
node.name = node_name;
if ( Spawn(&node) ) if ( Spawn(&node) )
// TODO: probably want to return the full configuration the return new zeek::Supervisor::Node(node);
// new node ought to use
return node.name;
// TODO: get stem printfs going through standard Zeek debug.log // TODO: get stem printfs going through standard Zeek debug.log
printf("Stem created node: %s (%d)\n", node.name.data(), node.pid); printf("Stem created node: %s (%d)\n", node.name.data(), node.pid);
@ -653,7 +668,7 @@ std::string Stem::Poll()
Destroy(&node); Destroy(&node);
if ( Spawn(&node) ) if ( Spawn(&node) )
return node.name; return new zeek::Supervisor::Node(node);
ReportStatus(node); ReportStatus(node);
} }
@ -664,7 +679,7 @@ std::string Stem::Poll()
return {}; return {};
} }
std::string zeek::Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe) zeek::Supervisor::Node* zeek::Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe)
{ {
Stem s(std::move(pipe)); Stem s(std::move(pipe));
return s.Run(); return s.Run();
@ -674,20 +689,192 @@ static zeek::Supervisor::Node node_val_to_struct(const RecordVal* node)
{ {
zeek::Supervisor::Node rval; zeek::Supervisor::Node rval;
rval.name = node->Lookup("name")->AsString()->CheckString(); rval.name = node->Lookup("name")->AsString()->CheckString();
auto iface_val = node->Lookup("interface");
if ( iface_val )
rval.interface = iface_val->AsString()->CheckString();
auto cluster_table_val = node->Lookup("cluster")->AsTableVal();
auto cluster_table = cluster_table_val->AsTable();
auto c = cluster_table->InitForIteration();
HashKey* k;
TableEntryVal* v;
while ( (v = cluster_table->NextEntry(k, c)) )
{
auto key = cluster_table_val->RecoverIndex(k);
auto name = key->Index(0)->AsStringVal()->ToStdString();
Unref(key);
auto rv = v->Value()->AsRecordVal();
zeek::Supervisor::ClusterEndpoint ep;
ep.role = static_cast<BifEnum::Supervisor::ClusterRole>(rv->Lookup("role")->AsEnum());
ep.host = rv->Lookup("host")->AsAddr().AsString();
ep.port = rv->Lookup("p")->AsPortVal()->Port();
auto iface = rv->Lookup("interface");
if ( iface )
ep.interface = iface->AsStringVal()->ToStdString();
rval.cluster.emplace(name, std::move(ep));
}
return rval; return rval;
} }
static RecordVal* node_struct_to_val(const zeek::Supervisor::Node& node) static RecordVal* node_struct_to_val(const zeek::Supervisor::Node& node)
{ {
auto rval = new RecordVal(BifType::Record::Supervisor::Node); auto rt = BifType::Record::Supervisor::Node;
rval->Assign(0, new StringVal(node.name)); auto rval = new RecordVal(rt);
rval->Assign(rt->FieldOffset("name"), new StringVal(node.name));
if ( ! node.interface.empty() )
rval->Assign(rt->FieldOffset("interface"),
new StringVal(node.interface));
auto tt = BifType::Record::Supervisor::Node->FieldType("cluster");
auto cluster_val = new TableVal(tt->AsTableType());
rval->Assign(rt->FieldOffset("cluster"), cluster_val);
for ( const auto& e : node.cluster )
{
auto& name = e.first;
auto& ep = e.second;
auto key = new StringVal(name);
auto ept = BifType::Record::Supervisor::ClusterEndpoint;
auto val = new RecordVal(ept);
val->Assign(ept->FieldOffset("role"), BifType::Enum::Supervisor::ClusterRole->GetVal(ep.role));
val->Assign(ept->FieldOffset("host"), new AddrVal(ep.host));
val->Assign(ept->FieldOffset("p"), val_mgr->GetPort(ep.port, TRANSPORT_TCP));
if ( ! ep.interface.empty() )
val->Assign(ept->FieldOffset("interface"), new StringVal(ep.interface));
cluster_val->Assign(key, val);
Unref(key);
}
if ( node.pid ) if ( node.pid )
rval->Assign(1, val_mgr->GetCount(node.pid)); rval->Assign(rt->FieldOffset("pid"), val_mgr->GetCount(node.pid));
return rval; return rval;
} }
// Translates the script-layer name of a Supervisor::ClusterRole value (as it
// appears in the "role" field of a JSON-serialized node configuration) into
// the corresponding C++ enum value.
//
// Returns BifEnum::Supervisor::NONE for any name that is not recognized.
static BifEnum::Supervisor::ClusterRole role_str_to_enum(const std::string& r)
	{
	if ( r == "Supervisor::LOGGER" )
		return BifEnum::Supervisor::LOGGER;

	if ( r == "Supervisor::MANAGER" )
		return BifEnum::Supervisor::MANAGER;

	if ( r == "Supervisor::PROXY" )
		return BifEnum::Supervisor::PROXY;

	// Workers must map to WORKER; mapping them to LOGGER would make every
	// worker in the cluster layout look like a logger.
	if ( r == "Supervisor::WORKER" )
		return BifEnum::Supervisor::WORKER;

	return BifEnum::Supervisor::NONE;
	}
// Reconstructs a Node from the JSON text of a node configuration.
//
// Reads the "name" field, the optional "interface" field, and every entry of
// the "cluster" object.  Each cluster entry contributes one ClusterEndpoint,
// keyed by the entry's name, built from its "role", "host", "p"."port" and
// optional "interface" fields.
zeek::Supervisor::Node zeek::Supervisor::Node::FromJSON(const std::string& json)
	{
	zeek::Supervisor::Node node;
	auto doc = nlohmann::json::parse(json);
	node.name = doc["name"];

	auto node_iface = doc.find("interface");

	if ( node_iface != doc.end() )
		node.interface = *node_iface;

	auto endpoints = doc["cluster"];

	for ( const auto& item : endpoints.items() )
		{
		auto& ep_name = item.key();
		auto& ep_json = item.value();

		Supervisor::ClusterEndpoint endpoint;

		// The role arrives as the enum value's script-layer name.
		auto role_name = ep_json["role"];
		endpoint.role = role_str_to_enum(role_name);
		endpoint.host = ep_json["host"];
		endpoint.port = ep_json["p"]["port"];

		auto ep_iface = ep_json.find("interface");

		if ( ep_iface != ep_json.end() )
			endpoint.interface = *ep_iface;

		node.cluster.emplace(ep_name, std::move(endpoint));
		}

	return node;
	}
// Converts a Supervisor::ClusterRole into the value of the script-layer
// Cluster::NodeType enum that carries the same name.  Any role without a
// dedicated case maps to Cluster::NONE.
static Val* supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRole role)
	{
	// The enum type is looked up once and reused by later calls.
	static auto node_type = global_scope()->Lookup("Cluster::NodeType")->AsType()->AsEnumType();

	const char* type_name = "NONE";

	switch ( role ) {
	case BifEnum::Supervisor::LOGGER:
		type_name = "LOGGER";
		break;
	case BifEnum::Supervisor::MANAGER:
		type_name = "MANAGER";
		break;
	case BifEnum::Supervisor::PROXY:
		type_name = "PROXY";
		break;
	case BifEnum::Supervisor::WORKER:
		type_name = "WORKER";
		break;
	default:
		break;
	}

	return node_type->GetVal(node_type->Lookup("Cluster", type_name));
	}
// Fills the script-layer Cluster::nodes table from the cluster layout held by
// the global supervised_node, and sets Cluster::manager_is_logger to true
// exactly when that layout contains no logger.
void zeek::Supervisor::Node::InitCluster()
	{
	auto node_rec_type = global_scope()->Lookup("Cluster::Node")->AsType()->AsRecordType();
	auto nodes_id = global_scope()->Lookup("Cluster::nodes");
	auto manager_is_logger_id = global_scope()->Lookup("Cluster::manager_is_logger");
	auto nodes_table = nodes_id->ID_Val()->AsTableVal();

	// First pass: find out who the manager is and whether a logger exists,
	// since every record built in the second pass depends on that.
	bool logger_seen = false;
	std::string manager;

	for ( const auto& entry : supervised_node->cluster )
		{
		switch ( entry.second.role ) {
		case BifEnum::Supervisor::MANAGER:
			manager = entry.first;
			break;
		case BifEnum::Supervisor::LOGGER:
			logger_seen = true;
			break;
		default:
			break;
		}
		}

	// Second pass: create one Cluster::Node record per endpoint.
	for ( const auto& entry : supervised_node->cluster )
		{
		const auto& endpoint_name = entry.first;
		const auto& endpoint = entry.second;

		auto index = new StringVal(endpoint_name);
		auto rec = new RecordVal(node_rec_type);

		auto type_val = supervisor_role_to_cluster_node_type(endpoint.role);
		rec->Assign(node_rec_type->FieldOffset("node_type"), type_val);
		rec->Assign(node_rec_type->FieldOffset("ip"), new AddrVal(endpoint.host));
		rec->Assign(node_rec_type->FieldOffset("p"), val_mgr->GetPort(endpoint.port, TRANSPORT_TCP));

		if ( ! endpoint.interface.empty() )
			rec->Assign(node_rec_type->FieldOffset("interface"),
			            new StringVal(endpoint.interface));

		// Every node other than the manager itself records the manager's name.
		const bool is_manager = endpoint.role == BifEnum::Supervisor::MANAGER;

		if ( ! is_manager && ! manager.empty() )
			rec->Assign(node_rec_type->FieldOffset("manager"),
			            new StringVal(manager));

		nodes_table->Assign(index, rec);
		Unref(index);
		}

	manager_is_logger_id->SetVal(val_mgr->GetBool(! logger_seen));
	}
RecordVal* zeek::Supervisor::Status(const std::string& node_name) RecordVal* zeek::Supervisor::Status(const std::string& node_name)
{ {
// TODO: handle node classes // TODO: handle node classes
@ -708,7 +895,7 @@ RecordVal* zeek::Supervisor::Status(const std::string& node_name)
return rval; return rval;
} }
std::string zeek::Supervisor::Create(const RecordVal* node_val) std::string zeek::Supervisor::Create(RecordVal* node_val)
{ {
auto node = node_val_to_struct(node_val); auto node = node_val_to_struct(node_val);
@ -719,7 +906,13 @@ std::string zeek::Supervisor::Create(const RecordVal* node_val)
if ( nodes.find(node.name) != nodes.end() ) if ( nodes.find(node.name) != nodes.end() )
return fmt("node with name '%s' already exists", node.name.data()); return fmt("node with name '%s' already exists", node.name.data());
std::string msg = fmt("create %s", node.name.data()); auto re = new RE_Matcher("^_");
auto json_val = node_val->ToJSON(false, re);
auto json_str = json_val->ToStdString();
delete re;
Unref(json_val);
std::string msg = fmt("create %s %s", node.name.data(), json_str.data());
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
nodes.emplace(node.name, node); nodes.emplace(node.name, node);
return ""; return "";

View file

@ -12,22 +12,35 @@
#include "iosource/IOSource.h" #include "iosource/IOSource.h"
#include "Pipe.h" #include "Pipe.h"
#include "Flare.h" #include "Flare.h"
#include "NetVar.h"
namespace zeek { namespace zeek {
class Supervisor : public iosource::IOSource { class Supervisor : public iosource::IOSource {
public: public:
static std::string RunStem(std::unique_ptr<bro::PipePair> pipe);
struct Config { struct Config {
int num_workers = 1; int num_workers = 1;
std::vector<std::string> pcaps; std::vector<std::string> pcaps;
std::string zeek_exe_path; std::string zeek_exe_path;
}; };
// C++ counterpart of the script-layer Supervisor::ClusterEndpoint record:
// one member of the cluster layout that a supervised node is part of.
struct ClusterEndpoint {
// Role of the endpoint within the cluster.
BifEnum::Supervisor::ClusterRole role;
// Endpoint address, kept in its string form.
std::string host;
// Port number only; the transport protocol is not stored here.
int port;
// Interface name; left empty when the endpoint has none configured.
std::string interface;
};
struct Node { struct Node {
static Node FromJSON(const std::string& json);
static void InitCluster();
std::string name; std::string name;
std::string interface;
std::map<std::string, ClusterEndpoint> cluster;
pid_t pid = 0; pid_t pid = 0;
int exit_status = 0; int exit_status = 0;
int signal_number = 0; int signal_number = 0;
@ -36,6 +49,8 @@ public:
std::chrono::time_point<std::chrono::steady_clock> spawn_time; std::chrono::time_point<std::chrono::steady_clock> spawn_time;
}; };
static Node* RunStem(std::unique_ptr<bro::PipePair> pipe);
Supervisor(Config cfg, std::unique_ptr<bro::PipePair> stem_pipe, pid_t stem_pid); Supervisor(Config cfg, std::unique_ptr<bro::PipePair> stem_pipe, pid_t stem_pid);
~Supervisor(); ~Supervisor();
@ -46,7 +61,7 @@ public:
void ObserveChildSignal(); void ObserveChildSignal();
RecordVal* Status(const std::string& node_name); RecordVal* Status(const std::string& node_name);
std::string Create(const RecordVal* node); std::string Create(RecordVal* node);
bool Destroy(const std::string& node_name); bool Destroy(const std::string& node_name);
bool Restart(const std::string& node_name); bool Restart(const std::string& node_name);
@ -76,5 +91,6 @@ private:
}; };
extern Supervisor* supervisor; extern Supervisor* supervisor;
extern Supervisor::Node* supervised_node;
} // namespace zeek } // namespace zeek

View file

@ -94,6 +94,7 @@ zeekygen::Manager* zeekygen_mgr = 0;
iosource::Manager* iosource_mgr = 0; iosource::Manager* iosource_mgr = 0;
bro_broker::Manager* broker_mgr = 0; bro_broker::Manager* broker_mgr = 0;
zeek::Supervisor* zeek::supervisor = 0; zeek::Supervisor* zeek::supervisor = 0;
zeek::Supervisor::Node* zeek::supervised_node = 0;
std::vector<std::string> zeek_script_prefixes; std::vector<std::string> zeek_script_prefixes;
Stmt* stmts; Stmt* stmts;
@ -613,6 +614,7 @@ void terminate_bro()
delete file_mgr; delete file_mgr;
// broker_mgr is deleted via iosource_mgr // broker_mgr is deleted via iosource_mgr
// supervisor is deleted via iosource_mgr // supervisor is deleted via iosource_mgr
delete zeek::supervised_node;
delete iosource_mgr; delete iosource_mgr;
delete log_mgr; delete log_mgr;
delete reporter; delete reporter;
@ -623,11 +625,11 @@ void terminate_bro()
reporter = 0; reporter = 0;
} }
void termination_signal() void zeek_terminate_loop(const char* reason)
{ {
set_processing_status("TERMINATING", "termination_signal"); set_processing_status("TERMINATING", reason);
reporter->Info("%s", reason);
reporter->Info("received termination signal");
net_get_final_stats(); net_get_final_stats();
done_with_network(); done_with_network();
net_delete(); net_delete();
@ -738,13 +740,13 @@ int main(int argc, char** argv)
exit(0); exit(0);
} }
bool use_supervisor = options.supervised_workers > 0; auto use_supervisor = [&]() -> bool { return options.supervised_workers > 0; };
pid_t stem_pid = 0; pid_t stem_pid = 0;
std::unique_ptr<bro::PipePair> supervisor_pipe; std::unique_ptr<bro::PipePair> supervisor_pipe;
std::string stem_spawn = "";
if ( use_supervisor ) if ( use_supervisor() )
{ {
// TODO: the SIGCHLD handler should be set before fork()
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK}); supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK});
stem_pid = fork(); stem_pid = fork();
@ -756,7 +758,7 @@ int main(int argc, char** argv)
} }
if ( stem_pid == 0 ) if ( stem_pid == 0 )
stem_spawn = zeek::Supervisor::RunStem(std::move(supervisor_pipe)); zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe));
} }
auto zeek_stem_env = getenv("ZEEK_STEM"); auto zeek_stem_env = getenv("ZEEK_STEM");
@ -779,22 +781,27 @@ int main(int argc, char** argv)
fds[i] = std::stoi(fd_strings[i]); fds[i] = std::stoi(fd_strings[i]);
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK, fds}); supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK, fds});
stem_spawn = zeek::Supervisor::RunStem(std::move(supervisor_pipe)); zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe));
} }
if ( ! stem_spawn.empty() ) if ( zeek::supervised_node )
{ {
for ( ; ; ) // TODO: possibly can inherit some command-line options?
{ // In case stem gets revived via exec(), would need to pass along
// TODO: this no-op loop is here just to test the process hierarchy // original arguments to it.
printf("node wakeup: %s\n", stem_spawn.data()); options = {};
sleep(2); const auto& node_name = zeek::supervised_node->name;
// TODO: this re-parenting check needs to go somewhere proper if ( ! zeek::supervised_node->interface.empty() )
if ( getppid() == 1 ) options.interfaces.emplace_back(zeek::supervised_node->interface);
if ( ! zeek::supervised_node->cluster.empty() )
{
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 )
{ {
printf("node suicide: %s\n", stem_spawn.data()); fprintf(stderr, "cluster node %s failed to setenv: %s\n",
exit(13); node_name.data(), strerror(errno));
exit(1);
} }
} }
} }
@ -862,7 +869,7 @@ int main(int argc, char** argv)
if ( ! getenv("ZEEK_DEBUG_LOG_STDERR") ) if ( ! getenv("ZEEK_DEBUG_LOG_STDERR") )
{ {
if ( use_supervisor ) if ( use_supervisor() )
debug_log_name = "debug-supervisor"; debug_log_name = "debug-supervisor";
else else
debug_log_name = "debug"; debug_log_name = "debug";
@ -872,7 +879,7 @@ int main(int argc, char** argv)
} }
#endif #endif
if ( use_supervisor ) if ( use_supervisor() )
{ {
zeek::Supervisor::Config cfg = {}; zeek::Supervisor::Config cfg = {};
cfg.pcaps = options.pcap_files; cfg.pcaps = options.pcap_files;
@ -939,7 +946,7 @@ int main(int argc, char** argv)
options.interfaces.size() == 0 && options.interfaces.size() == 0 &&
options.identifier_to_print.empty() && options.identifier_to_print.empty() &&
! command_line_policy && ! options.print_plugins && ! command_line_policy && ! options.print_plugins &&
! use_supervisor ) ! use_supervisor() && ! zeek::supervised_node )
add_input_file("-"); add_input_file("-");
for ( const auto& script_option : options.script_options_to_set ) for ( const auto& script_option : options.script_options_to_set )

View file

@ -6,30 +6,72 @@
module Supervisor; module Supervisor;
enum ClusterRole %{
NONE,
LOGGER,
MANAGER,
PROXY,
WORKER,
%}
type Supervisor::ClusterEndpoint: record;
type Supervisor::Status: record; type Supervisor::Status: record;
type Supervisor::Node: record; type Supervisor::Node: record;
function Supervisor::__status%(nodes: string%): Supervisor::Status function Supervisor::__status%(nodes: string%): Supervisor::Status
%{ %{
if ( ! zeek::supervisor )
{
builtin_error("supervisor mode not enabled");
return new RecordVal(BifType::Record::Supervisor::Status);
}
return zeek::supervisor->Status(nodes->CheckString()); return zeek::supervisor->Status(nodes->CheckString());
%} %}
function Supervisor::__create%(node: Supervisor::Node%): string function Supervisor::__create%(node: Supervisor::Node%): string
%{ %{
if ( ! zeek::supervisor )
{
builtin_error("supervisor mode not enabled");
return new StringVal("supervisor mode not enabled");
}
auto rval = zeek::supervisor->Create(node->AsRecordVal()); auto rval = zeek::supervisor->Create(node->AsRecordVal());
return new StringVal(rval); return new StringVal(rval);
%} %}
function Supervisor::__destroy%(nodes: string%): bool function Supervisor::__destroy%(nodes: string%): bool
%{ %{
if ( ! zeek::supervisor )
{
builtin_error("supervisor mode not enabled");
return val_mgr->GetBool(false);
}
auto rval = zeek::supervisor->Destroy(nodes->CheckString()); auto rval = zeek::supervisor->Destroy(nodes->CheckString());
return val_mgr->GetBool(rval); return val_mgr->GetBool(rval);
%} %}
function Supervisor::__restart%(nodes: string%): bool function Supervisor::__restart%(nodes: string%): bool
%{ %{
if ( ! zeek::supervisor )
{
builtin_error("supervisor mode not enabled");
return val_mgr->GetBool(false);
}
auto rval = zeek::supervisor->Restart(nodes->CheckString()); auto rval = zeek::supervisor->Restart(nodes->CheckString());
return val_mgr->GetBool(rval); return val_mgr->GetBool(rval);
%} %}
# TODO: BIFs for "restart", "add", "remove" operations function Supervisor::__init_cluster%(%): bool
%{
if ( zeek::supervised_node && ! zeek::supervised_node->cluster.empty() )
{
zeek::supervised_node->InitCluster();
return val_mgr->GetBool(true);
}
return val_mgr->GetBool(false);
%}

View file

@ -1165,17 +1165,22 @@ string flatten_script_name(const string& name, const string& prefix)
} }
vector<string>* tokenize_string(string input, const string& delim, vector<string>* tokenize_string(string input, const string& delim,
vector<string>* rval) vector<string>* rval, int limit)
{ {
if ( ! rval ) if ( ! rval )
rval = new vector<string>(); rval = new vector<string>();
size_t n; size_t n;
auto found = 0;
while ( (n = input.find(delim)) != string::npos ) while ( (n = input.find(delim)) != string::npos )
{ {
++found;
rval->push_back(input.substr(0, n)); rval->push_back(input.substr(0, n));
input.erase(0, n + 1); input.erase(0, n + 1);
if ( limit && found == limit )
break;
} }
rval->push_back(input); rval->push_back(input);
@ -1456,7 +1461,7 @@ void terminate_processing()
} }
extern const char* proc_status_file; extern const char* proc_status_file;
void _set_processing_status(const char* status) void set_processing_status(const char* status, const char* reason)
{ {
if ( ! proc_status_file ) if ( ! proc_status_file )
return; return;
@ -1483,20 +1488,27 @@ void _set_processing_status(const char* status)
return; return;
} }
int len = strlen(status); auto write_str = [](int fd, const char* s)
while ( len )
{ {
int n = write(fd, status, len); int len = strlen(s);
while ( len )
{
int n = write(fd, s, len);
if ( n < 0 && errno != EINTR && errno != EAGAIN ) if ( n < 0 && errno != EINTR && errno != EAGAIN )
// Ignore errors, as they're too difficult to // Ignore errors, as they're too difficult to
// safely report here. // safely report here.
break; break;
status += n; s += n;
len -= n; len -= n;
} }
};
write_str(fd, status);
write_str(fd, " [");
write_str(fd, reason);
write_str(fd, "]\n");
safe_close(fd); safe_close(fd);
errno = old_errno; errno = old_errno;

View file

@ -147,7 +147,7 @@ inline std::string get_escaped_string(const std::string& str, bool escape_all)
std::vector<std::string>* tokenize_string(std::string input, std::vector<std::string>* tokenize_string(std::string input,
const std::string& delim, const std::string& delim,
std::vector<std::string>* rval = 0); std::vector<std::string>* rval = 0, int limit = 0);
extern char* copy_string(const char* s); extern char* copy_string(const char* s);
extern int streq(const char* s1, const char* s2); extern int streq(const char* s1, const char* s2);
@ -411,9 +411,7 @@ void terminate_processing();
// Sets the current status of the Bro process to the given string. // Sets the current status of the Bro process to the given string.
// If the option --status-file has been set, this is written into // If the option --status-file has been set, this is written into
// the the corresponding file. Otherwise, the function is a no-op. // the the corresponding file. Otherwise, the function is a no-op.
#define set_processing_status(status, location) \ void set_processing_status(const char* status, const char* reason);
_set_processing_status(status " [" location "]\n");
void _set_processing_status(const char* status);
// Current timestamp, from a networking perspective, not a wall-clock // Current timestamp, from a networking perspective, not a wall-clock
// perspective. In particular, if we're reading from a savefile this // perspective. In particular, if we're reading from a savefile this