Initial structure for supervisor-mode

The full process hierarchy isn't set up yet, but these changes
help prepare by doing two things:

- Add a -j option to enable supervisor-mode.  Currently, just a single
  "stem" process gets forked early on to be used as the basis for
  further forking into real cluster nodes.

- Separates the parsing of command-line options from their consumption.
  i.e. need to parse whether we're in -j supervisor-mode before
  modifying any global state since that would taint the "stem" process.
  The new intermediate structure containing the parsed options may
  also serve as a way to pass configuration info from "stem" to its
  descendent cluster node processes.
This commit is contained in:
Jon Siwek 2019-09-27 18:53:07 -07:00
parent d97d625bc3
commit 4959d438fa
18 changed files with 751 additions and 366 deletions

View file

@ -286,6 +286,7 @@ set(MAIN_SRCS
Notifier.cc
Stats.cc
Stmt.cc
Supervisor.cc
Tag.cc
Timer.cc
Traverse.cc

View file

@ -19,7 +19,8 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = {
{ "threading", 0, false }, { "file_analysis", 0, false },
{ "plugins", 0, false }, { "zeekygen", 0, false },
{ "pktio", 0, false }, { "broker", 0, false },
{ "scripts", 0, false}
{ "scripts", 0, false},
{ "supervisor", 0, false}
};
DebugLogger::DebugLogger()

View file

@ -29,6 +29,7 @@ enum DebugStream {
DBG_PKTIO, // Packet sources and dumpers.
DBG_BROKER, // Broker communication
DBG_SCRIPTS, // Script initialization
DBG_SUPERVISOR, // Process supervisor
NUM_DBGS // Has to be last
};

View file

@ -17,7 +17,14 @@ static void bad_pipe_op(const char* which)
{
char buf[256];
bro_strerror_r(errno, buf, sizeof(buf));
if ( reporter )
reporter->FatalErrorWithCore("unexpected pipe %s failure: %s", which, buf);
else
{
fprintf(stderr, "unexpected pipe %s failure: %s", which, buf);
abort();
}
}
void Flare::Fire()
@ -49,8 +56,9 @@ void Flare::Fire()
}
}
void Flare::Extinguish()
int Flare::Extinguish()
{
int rval = 0;
char tmp[256];
for ( ; ; )
@ -58,8 +66,11 @@ void Flare::Extinguish()
int n = read(pipe.ReadFD(), &tmp, sizeof(tmp));
if ( n >= 0 )
{
rval += n;
// Pipe may not be empty yet: try again.
continue;
}
if ( errno == EAGAIN )
// Success: pipe is now empty.
@ -71,4 +82,6 @@ void Flare::Extinguish()
bad_pipe_op("read");
}
return rval;
}

View file

@ -31,8 +31,10 @@ public:
/**
* Take the object out of the "ready" state.
* @return number of bytes read from the pipe, corresponds to the number
* of times Fire() was called.
*/
void Extinguish();
int Extinguish();
private:
Pipe pipe;

View file

@ -145,40 +145,39 @@ void net_update_time(double new_network_time)
PLUGIN_HOOK_VOID(HOOK_UPDATE_NETWORK_TIME, HookUpdateNetworkTime(new_network_time));
}
void net_init(name_list& interfaces, name_list& readfiles,
const char* writefile, int do_watchdog)
void net_init(const std::vector<std::string>& interfaces,
const std::vector<std::string>& pcap_input_files,
const std::string& pcap_output_file, bool do_watchdog)
{
if ( readfiles.length() > 0 )
if ( ! pcap_input_files.empty() )
{
reading_live = pseudo_realtime > 0.0;
reading_traces = 1;
for ( int i = 0; i < readfiles.length(); ++i )
for ( const auto& pif : pcap_input_files )
{
iosource::PktSrc* ps = iosource_mgr->OpenPktSrc(readfiles[i], false);
iosource::PktSrc* ps = iosource_mgr->OpenPktSrc(pif, false);
assert(ps);
if ( ! ps->IsOpen() )
reporter->FatalError("problem with trace file %s (%s)",
readfiles[i],
ps->ErrorMsg());
pif.data(), ps->ErrorMsg());
}
}
else if ( interfaces.length() > 0 )
else if ( ! interfaces.empty() )
{
reading_live = 1;
reading_traces = 0;
for ( int i = 0; i < interfaces.length(); ++i )
for ( const auto& iface : interfaces )
{
iosource::PktSrc* ps = iosource_mgr->OpenPktSrc(interfaces[i], true);
iosource::PktSrc* ps = iosource_mgr->OpenPktSrc(iface, true);
assert(ps);
if ( ! ps->IsOpen() )
reporter->FatalError("problem with interface %s (%s)",
interfaces[i],
ps->ErrorMsg());
iface.data(), ps->ErrorMsg());
}
}
@ -189,8 +188,9 @@ void net_init(name_list& interfaces, name_list& readfiles,
// a timer.
reading_traces = reading_live = 0;
if ( writefile )
if ( ! pcap_output_file.empty() )
{
const char* writefile = pcap_output_file.data();
pkt_dumper = iosource_mgr->OpenPktDumper(writefile, false);
assert(pkt_dumper);

View file

@ -2,6 +2,9 @@
#pragma once
#include <vector>
#include <string>
#include "net_util.h"
#include "util.h"
#include "List.h"
@ -10,8 +13,9 @@
#include "iosource/PktSrc.h"
#include "iosource/PktDumper.h"
extern void net_init(name_list& interfaces, name_list& readfiles,
const char* writefile, int do_watchdog);
extern void net_init(const std::vector<std::string>& interfaces,
const std::vector<std::string>& pcap_input_files,
const std::string& pcap_output_file, bool do_watchdog);
extern void net_run();
extern void net_get_final_stats();
extern void net_finish(int drain_events);
@ -76,8 +80,6 @@ extern iosource::IOSource* current_iosrc;
extern iosource::PktDumper* pkt_dumper; // where to save packets
extern char* writefile;
// Script file we have already scanned (or are in the process of scanning).
// They are identified by inode number.
struct ScannedFile {

View file

@ -13,7 +13,11 @@ static void pipe_fail(int eno)
{
char tmp[256];
bro_strerror_r(eno, tmp, sizeof(tmp));
if ( reporter )
reporter->FatalError("Pipe failure: %s", tmp);
else
fprintf(stderr, "Pipe failure: %s", tmp);
}
static void set_flags(int fd, int flags)

View file

@ -231,7 +231,7 @@ void RuleMatcher::Delete(RuleHdrTest* node)
delete node;
}
bool RuleMatcher::ReadFiles(const name_list& files)
bool RuleMatcher::ReadFiles(const std::vector<std::string>& files)
{
#ifdef USE_PERFTOOLS_DEBUG
HeapLeakChecker::Disabler disabler;
@ -239,18 +239,18 @@ bool RuleMatcher::ReadFiles(const name_list& files)
parse_error = false;
for ( int i = 0; i < files.length(); ++i )
for ( const auto& f : files )
{
rules_in = open_file(find_file(files[i], bro_path(), ".sig"));
rules_in = open_file(find_file(f, bro_path(), ".sig"));
if ( ! rules_in )
{
reporter->Error("Can't open signature file %s", files[i]);
reporter->Error("Can't open signature file %s", f.data());
return false;
}
rules_line_number = 0;
current_rule_file = files[i];
current_rule_file = f.data();
rules_parse();
fclose(rules_in);
}

View file

@ -221,7 +221,7 @@ public:
~RuleMatcher();
// Parse the given files and built up data structures.
bool ReadFiles(const name_list& files);
bool ReadFiles(const std::vector<std::string>& files);
/**
* Inititialize a state object for matching file magic signatures.

135
src/Supervisor.cc Normal file
View file

@ -0,0 +1,135 @@
#include <sys/wait.h>
#include <signal.h>
#include "Supervisor.h"
#include "Reporter.h"
#include "DebugLogger.h"
#include "zeek-config.h"
#include "util.h"
extern "C" {
#include "setsignal.h"
}
static RETSIGTYPE supervisor_sig_handler(int signo)
{
DBG_LOG(DBG_SUPERVISOR, "received SIGCHLD signal: %d", signo);
zeek::supervisor->ObserveChildSignal();
return RETSIGVAL;
}
zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg,
std::unique_ptr<bro::Pipe> pipe,
pid_t arg_stem_pid)
: config(std::move(cfg)), stem_pid(arg_stem_pid), stem_pipe(std::move(pipe))
{
DBG_LOG(DBG_SUPERVISOR, "forked stem process %d", stem_pid);
DBG_LOG(DBG_SUPERVISOR, "using %d workers", config.num_workers);
setsignal(SIGCHLD, supervisor_sig_handler);
SetIdle(true);
}
void zeek::Supervisor::ObserveChildSignal()
{
signal_flare.Fire();
}
zeek::Supervisor::~Supervisor()
{
if ( ! stem_pid )
{
DBG_LOG(DBG_SUPERVISOR, "shutdown, stem process already exited");
return;
}
DBG_LOG(DBG_SUPERVISOR, "shutdown, killing stem process %d", stem_pid);
// TODO: is signal the best way to trigger shutdown of decendent processes?
auto kill_res = kill(stem_pid, SIGTERM);
if ( kill_res == -1 )
{
char tmp[256];
bro_strerror_r(errno, tmp, sizeof(tmp));
reporter->Error("Failed to send SIGTERM to stem process: %s", tmp);
}
else
{
int status;
auto wait_res = waitpid(stem_pid, &status, 0);
if ( wait_res == -1 )
{
char tmp[256];
bro_strerror_r(errno, tmp, sizeof(tmp));
reporter->Error("Failed to wait for stem process to exit: %s", tmp);
}
}
}
void zeek::Supervisor::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
read->Insert(signal_flare.FD());
read->Insert(stem_pipe->ReadFD());
}
double zeek::Supervisor::NextTimestamp(double* local_network_time)
{
// We're only asked for a timestamp if either (1) a FD was ready
// or (2) we're not idle (and we go idle if when Process is no-op),
// so there's no case where returning -1 to signify a skip will help.
return timer_mgr->Time();
}
void zeek::Supervisor::Process()
{
auto child_signals = signal_flare.Extinguish();
DBG_LOG(DBG_SUPERVISOR, "process: child_signals %d, stem_pid %d",
child_signals, stem_pid);
if ( child_signals && stem_pid )
{
DBG_LOG(DBG_SUPERVISOR, "handle child signal, wait for %d", stem_pid);
int status;
auto res = waitpid(stem_pid, &status, WNOHANG);
if ( res == 0 )
{
DBG_LOG(DBG_SUPERVISOR, "false alarm, stem process still lives");
}
else if ( res == -1 )
{
char tmp[256];
bro_strerror_r(errno, tmp, sizeof(tmp));
reporter->Error("Supervisor failed to get exit status"
" of stem process: %s", tmp);
}
else if ( WIFEXITED(status) )
{
DBG_LOG(DBG_SUPERVISOR, "stem process exited with status %d",
WEXITSTATUS(status));
stem_pid = 0;
}
else if ( WIFSIGNALED(status) )
{
DBG_LOG(DBG_SUPERVISOR, "stem process terminated by signal %d",
WTERMSIG(status));
stem_pid = 0;
}
else
{
reporter->Error("Supervisor failed to get exit status"
" of stem process for unknown reason");
}
// TODO: add proper handling of stem process exiting
// In wait cases is it ok for the stem process to terminate and
// in what cases do we need to automatically re-recreate it ?
// And how do we re-create it? It would be too late to fork() again
// because we've potentially already changed so much global state by the
// time we get there, so guess we exec() and start over completely ?.
}
}

54
src/Supervisor.h Normal file
View file

@ -0,0 +1,54 @@
#pragma once
#include <sys/types.h>
#include <cstdint>
#include <string>
#include <vector>
#include <utility>
#include <memory>
#include "iosource/IOSource.h"
#include "Pipe.h"
#include "Flare.h"
namespace zeek {
class Supervisor : public iosource::IOSource {
public:
struct Config {
int num_workers = 1;
std::vector<std::string> pcaps;
};
Supervisor(Config cfg, std::unique_ptr<bro::Pipe> stem_pipe, pid_t stem_pid);
~Supervisor();
pid_t StemPID() const
{ return stem_pid; }
void ObserveChildSignal();
private:
// IOSource interface overrides:
void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except) override;
double NextTimestamp(double* local_network_time) override;
void Process() override;
const char* Tag() override
{ return "zeek::Supervisor"; }
Config config;
pid_t stem_pid;
std::unique_ptr<bro::Pipe> stem_pipe;
bro::Flare signal_flare;
};
extern Supervisor* supervisor;
} // namespace zeek

View file

@ -38,8 +38,8 @@ extern int bro_argc;
extern char** bro_argv;
extern const char* prog;
extern name_list prefixes; // -p flag
extern char* command_line_policy; // -e flag
extern std::vector<std::string> zeek_script_prefixes; // -p flag
extern const char* command_line_policy; // -e flag
extern vector<string> params;
class Stmt;

File diff suppressed because it is too large Load diff

View file

@ -434,10 +434,9 @@ when return TOK_WHEN;
pref = skip_whitespace(pref + 1); // Skip over '='.
if ( ! append )
while ( prefixes.length() > 1 ) // don't delete "" prefix
delete prefixes.remove_nth(1);
zeek_script_prefixes = { "" }; // don't delete the "" prefix
add_to_name_list(pref, ':', prefixes);
tokenize_string(pref, ":", &zeek_script_prefixes);
}
@if return TOK_ATIF;
@ -942,14 +941,14 @@ int yywrap()
it->prefixes_checked = true;
// Prefixes are pushed onto a stack, so iterate backwards.
for ( int i = prefixes.length() - 1; i >= 0; --i )
for ( int i = zeek_script_prefixes.size() - 1; i >= 0; --i )
{
// Don't look at empty prefixes.
if ( ! prefixes[i][0] )
if ( ! zeek_script_prefixes[i][0] )
continue;
string canon = without_bropath_component(it->name);
string flat = flatten_script_name(canon, prefixes[i]);
string flat = flatten_script_name(canon, zeek_script_prefixes[i]);
string path = find_relative_script_file(flat);
if ( ! path.empty() )

View file

@ -1,18 +1,11 @@
#include <signal.h>
#include <pthread.h>
#include "zeek-config.h"
#include "BasicThread.h"
#include "Manager.h"
#include "pthread.h"
#ifdef HAVE_LINUX
#include <sys/prctl.h>
#endif
#ifdef __FreeBSD__
#include <pthread_np.h>
#endif
#include "util.h"
using namespace threading;
@ -54,18 +47,7 @@ void BasicThread::SetName(const char* arg_name)
void BasicThread::SetOSName(const char* arg_name)
{
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value, "libstdc++ doesn't use pthread_t");
#ifdef HAVE_LINUX
prctl(PR_SET_NAME, arg_name, 0, 0, 0);
#endif
#ifdef __APPLE__
pthread_setname_np(arg_name);
#endif
#ifdef __FreeBSD__
pthread_set_name_np(thread.native_handle(), arg_name);
#endif
zeek::set_thread_name(arg_name, thread.native_handle());
}
const char* BasicThread::Fmt(const char* format, ...)

View file

@ -1003,7 +1003,7 @@ string bro_prefixes()
{
string rval;
for ( const auto& prefix : prefixes )
for ( const auto& prefix : zeek_script_prefixes )
{
if ( ! rval.empty() )
rval.append(":");
@ -1936,3 +1936,18 @@ string json_escape_utf8(const string& val)
return result;
}
void zeek::set_thread_name(const char* name, pthread_t tid)
{
#ifdef HAVE_LINUX
prctl(PR_SET_NAME, name, 0, 0, 0);
#endif
#ifdef __APPLE__
pthread_setname_np(name);
#endif
#ifdef __FreeBSD__
pthread_set_name_np(tid, name);
#endif
}

View file

@ -60,6 +60,15 @@ extern HeapLeakChecker* heap_checker;
#endif
#include <stdint.h>
#include <pthread.h>
#ifdef HAVE_LINUX
#include <sys/prctl.h>
#endif
#ifdef __FreeBSD__
#include <pthread_np.h>
#endif
ZEEK_DEPRECATED("Remove in v4.1. Use uint64_t instead.")
typedef uint64_t uint64;
@ -579,3 +588,14 @@ std::unique_ptr<T> build_unique (Args&&... args) {
* @return the escaped string
*/
std::string json_escape_utf8(const std::string& val);
namespace zeek {
/**
* Set the process/thread name. May not be supported on all OSs.
* @param name new name for the process/thread. OS limitations typically
* truncate the name to 15 bytes maximum.
* @param tid handle of thread whose name shall change
*/
void set_thread_name(const char* name, pthread_t tid = pthread_self());
} // namespace zeek