Raw input reader command execution "fixes".

- Primarily working around an issue that occurs when threads
  concurrently create pipes and fork a child process.  See comment in
  code...

- Other minor cleanup of the code:  making sure the child process calls
  _exit() rather than exit(), limits itself to a few select system calls
  before the exec(), and closes more unused file descriptors.
This commit is contained in:
Jon Siwek 2013-08-14 11:37:30 -05:00
parent 35dfdf7288
commit d3dad31bdc
5 changed files with 138 additions and 91 deletions

View file

@ -19,7 +19,12 @@ using threading::Value;
using threading::Field; using threading::Field;
const int Raw::block_size = 4096; // how big do we expect our chunks of data to be. const int Raw::block_size = 4096; // how big do we expect our chunks of data to be.
pthread_mutex_t Raw::fork_mutex;
// Initializes the class-wide fork_mutex (no attribute object is passed).
// Returns true when pthread_mutex_init() reports success, false otherwise.
bool Raw::ClassInit()
	{
	const int init_rval = pthread_mutex_init(&fork_mutex, 0);
	return init_rval == 0;
	}
Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
@ -77,8 +82,30 @@ void Raw::DoClose()
} }
} }
// Closes the descriptor stored in pipes[i] and marks the slot as unused (-1).
// A slot that already holds -1 is left untouched, so calling this more than
// once for the same index is harmless.
void Raw::ClosePipeEnd(int i)
	{
	if ( pipes[i] != -1 )
		{
		safe_close(pipes[i]);
		pipes[i] = -1;
		}
	}
bool Raw::Execute() bool Raw::Execute()
{ {
// TODO: AFAICT, pipe/fork/exec should be thread-safe, but actually having
// multiple threads set up pipes and fork concurrently sometimes results
// in problems w/ a stdin pipe not ever getting an EOF even though both
// ends of it are closed. But if the same threads allocate pipes and fork
// individually or sequentially, that issue never crops up... ("never"
// meaning I haven't seen it in hundreds of tests using 50+ threads
// where before I'd see the issue w/ just 2 threads ~33% of the time).
int lock_rval = pthread_mutex_lock(&fork_mutex);
if ( lock_rval != 0 )
{
Error(Fmt("cannot lock fork mutex: %d", lock_rval));
return false;
}
if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) )
{ {
Error(Fmt("Could not open pipe: %d", errno)); Error(Fmt("Could not open pipe: %d", errno));
@ -95,65 +122,75 @@ bool Raw::Execute()
else if ( childpid == 0 ) else if ( childpid == 0 )
{ {
// we are the child. // we are the child.
safe_close(pipes[stdout_in]); close(pipes[stdout_in]);
if ( dup2(pipes[stdout_out], stdout_fileno) == -1 ) if ( dup2(pipes[stdout_out], stdout_fileno) == -1 )
Error(Fmt("Error on dup2 stdout_out: %d", errno)); _exit(252);
close(pipes[stdout_out]);
if ( stdin_towrite ) close(pipes[stdin_out]);
{ if ( stdin_towrite && dup2(pipes[stdin_in], stdin_fileno) == -1 )
safe_close(pipes[stdin_out]); _exit(253);
if ( dup2(pipes[stdin_in], stdin_fileno) == -1 ) close(pipes[stdin_in]);
Error(Fmt("Error on dup2 stdin_in: %d", errno));
}
if ( use_stderr ) close(pipes[stderr_in]);
{ if ( use_stderr && dup2(pipes[stderr_out], stderr_fileno) == -1 )
safe_close(pipes[stderr_in]); _exit(254);
if ( dup2(pipes[stderr_out], stderr_fileno) == -1 ) close(pipes[stderr_out]);
Error(Fmt("Error on dup2 stderr_out: %d", errno));
}
execl("/bin/sh", "sh", "-c", fname.c_str(), (char*) NULL); execl("/bin/sh", "sh", "-c", fname.c_str(), (char*) NULL);
fprintf(stderr, "Exec failed :(......\n"); fprintf(stderr, "Exec failed :(......\n");
exit(255); _exit(255);
} }
else else
{ {
// we are the parent // we are the parent
safe_close(pipes[stdout_out]); lock_rval = pthread_mutex_unlock(&fork_mutex);
pipes[stdout_out] = -1; if ( lock_rval != 0 )
{
Error(Fmt("cannot unlock fork mutex: %d", lock_rval));
return false;
}
ClosePipeEnd(stdout_out);
if ( Info().mode == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK); fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK);
ClosePipeEnd(stdin_in);
if ( stdin_towrite ) if ( stdin_towrite )
{
safe_close(pipes[stdin_in]);
pipes[stdin_in] = -1;
fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data. fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data.
// note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having // note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having
// a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied. // a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied.
} else
ClosePipeEnd(stdin_out);
ClosePipeEnd(stderr_out);
if ( use_stderr ) if ( use_stderr )
{
safe_close(pipes[stderr_out]);
pipes[stderr_out] = -1;
fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too. fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too.
} else
ClosePipeEnd(stderr_in);
file = fdopen(pipes[stdout_in], "r"); file = fdopen(pipes[stdout_in], "r");
if ( ! file )
{
Error("Could not convert stdout_in fileno to file");
return false;
}
pipes[stdout_in] = -1; // will be closed by fclose pipes[stdout_in] = -1; // will be closed by fclose
if ( use_stderr ) if ( use_stderr )
{
stderrfile = fdopen(pipes[stderr_in], "r"); stderrfile = fdopen(pipes[stderr_in], "r");
pipes[stderr_in] = -1; // will be closed by fclose
if ( file == 0 || (stderrfile == 0 && use_stderr) ) if ( ! stderrfile )
{ {
Error("Could not convert fileno to file"); Error("Could not convert stderr_in fileno to file");
return false; return false;
} }
pipes[stderr_in] = -1; // will be closed by fclose
}
return true; return true;
} }
@ -194,15 +231,9 @@ bool Raw::CloseInput()
if ( use_stderr ) if ( use_stderr )
fclose(stderrfile); fclose(stderrfile);
if ( execute ) // we do not care if any of those fails. They should all be defined. if ( execute )
{
for ( int i = 0; i < 6; i ++ ) for ( int i = 0; i < 6; i ++ )
if ( pipes[i] != -1 ) ClosePipeEnd(i);
{
safe_close(pipes[i]);
pipes[i] = -1;
}
}
file = 0; file = 0;
stderrfile = 0; stderrfile = 0;
@ -371,7 +402,7 @@ int64_t Raw::GetLine(FILE* arg_file)
} }
if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) if ( errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR )
return -2; return -2;
else else
@ -402,10 +433,7 @@ void Raw::WriteToStdin()
} }
if ( stdin_towrite == 0 ) // send EOF when we are done. if ( stdin_towrite == 0 ) // send EOF when we are done.
{ ClosePipeEnd(stdin_out);
safe_close(pipes[stdin_out]);
pipes[stdin_out] = -1;
}
if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 ) if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 )
{ {

View file

@ -4,6 +4,7 @@
#define INPUT_READERS_RAW_H #define INPUT_READERS_RAW_H
#include <vector> #include <vector>
#include <pthread.h>
#include "../ReaderBackend.h" #include "../ReaderBackend.h"
@ -20,6 +21,8 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
static bool ClassInit();
protected: protected:
virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
@ -27,6 +30,9 @@ protected:
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);
private: private:
void ClosePipeEnd(int i);
bool OpenInput(); bool OpenInput();
bool CloseInput(); bool CloseInput();
int64_t GetLine(FILE* file); int64_t GetLine(FILE* file);
@ -45,6 +51,7 @@ private:
unsigned int sep_length; // length of the separator unsigned int sep_length; // length of the separator
static const int block_size; static const int block_size;
static pthread_mutex_t fork_mutex;
int bufpos; int bufpos;
char* buf; char* buf;
char* outbuf; char* outbuf;

View file

@ -57,6 +57,7 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void);
#include "input/Manager.h" #include "input/Manager.h"
#include "logging/Manager.h" #include "logging/Manager.h"
#include "logging/writers/Ascii.h" #include "logging/writers/Ascii.h"
#include "input/readers/Raw.h"
#include "analyzer/Manager.h" #include "analyzer/Manager.h"
#include "analyzer/Tag.h" #include "analyzer/Tag.h"
#include "plugin/Manager.h" #include "plugin/Manager.h"
@ -842,6 +843,8 @@ int main(int argc, char** argv)
init_event_handlers(); init_event_handlers();
input::reader::Raw::ClassInit();
// The leak-checker tends to produce some false // The leak-checker tends to produce some false
// positives (memory which had already been // positives (memory which had already been
// allocated before we start the checking is // allocated before we start the checking is

View file

@ -1,36 +1,20 @@
[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=<no value description>, want_record=F, ev=line Input::EVENT_NEW, cat |, input0
{
print outfile, A::description;
print outfile, A::tpe;
print outfile, A::s;
try = try + 1;
if (2 == try)
{
Input::remove(input2);
close(outfile);
terminate();
}
}, config={
[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay
}]
Input::EVENT_NEW
hello hello
[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=<no value description>, want_record=F, ev=line Input::EVENT_NEW, cat |, input0
{ there^A^B^C^D^E^A^B^Cyay0
print outfile, A::description; Input::EVENT_NEW, cat |, input1
print outfile, A::tpe; hello
print outfile, A::s; Input::EVENT_NEW, cat |, input1
try = try + 1; there^A^B^C^D^E^A^B^Cyay01
if (2 == try) Input::EVENT_NEW, cat |, input2
{ hello
Input::remove(input2); Input::EVENT_NEW, cat |, input2
close(outfile); there^A^B^C^D^E^A^B^Cyay012
terminate(); Input::EVENT_NEW, cat |, input3
} hello
Input::EVENT_NEW, cat |, input3
}, config={ there^A^B^C^D^E^A^B^Cyay0123
[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay Input::EVENT_NEW, cat |, input4
}] hello
Input::EVENT_NEW Input::EVENT_NEW, cat |, input4
there^A^B^C^D^E^A^B^Cyay there^A^B^C^D^E^A^B^Cyay01234

View file

@ -1,5 +1,5 @@
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT # @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: btest-bg-wait -k 5 # @TEST-EXEC: btest-bg-wait 15
# @TEST-EXEC: btest-diff test.txt # @TEST-EXEC: btest-diff test.txt
# @TEST-EXEC: btest-diff out # @TEST-EXEC: btest-diff out
@ -7,7 +7,13 @@ redef exit_only_after_terminate = T;
@load base/frameworks/communication # let network-time run. otherwise there are no heartbeats... @load base/frameworks/communication # let network-time run. otherwise there are no heartbeats...
global outfile: file; global outfile: file;
global try: count; global processes_finished: count = 0;
global n: count = 0;
global total_processes: count = 0;
global config_strings: table[string] of string = {
["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay"
};
module A; module A;
@ -17,27 +23,46 @@ type Val: record {
event line(description: Input::EventDescription, tpe: Input::Event, s: string) event line(description: Input::EventDescription, tpe: Input::Event, s: string)
{ {
print outfile, description; print outfile, tpe, description$source, description$name;
print outfile, tpe;
print outfile, s; print outfile, s;
try = try + 1; }
if ( try == 2 )
event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool)
{
print "process_finished", name, source;
Input::remove(name);
++processes_finished;
if ( processes_finished == total_processes )
{ {
Input::remove("input2");
close(outfile); close(outfile);
terminate(); terminate();
} }
} }
# Adds one more "cat |" raw-reader event stream.  The stream is named
# name_prefix followed by the current value of the global counter n, and
# that same number is appended to the shared "stdin" config string before
# the stream is created.  Both n and total_processes are incremented.
function more_input(name_prefix: string)
	{
	local stream_name = fmt("%s%d", name_prefix, n);
	config_strings["stdin"] = config_strings["stdin"] + fmt("%d", n);

	# The counters are independent of each other; bump both before
	# registering the stream.
	++total_processes;
	++n;

	Input::add_event([$source="cat |", $reader=Input::READER_RAW,
	                  $mode=Input::STREAM, $name=stream_name,
	                  $fields=Val, $ev=line, $want_record=F,
	                  $config=config_strings]);
	}
event bro_init() event bro_init()
{ {
local config_strings: table[string] of string = {
["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay"
#["stdin"] = "yay"
};
try = 0;
outfile = open("../out"); outfile = open("../out");
Input::add_event([$source="cat > ../test.txt |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); ++total_processes;
Input::add_event([$source="cat |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input2", $fields=Val, $ev=line, $want_record=F, $config=config_strings]);
Input::add_event([$source="cat > ../test.txt |",
$reader=Input::READER_RAW, $mode=Input::STREAM,
$name="input", $fields=Val, $ev=line, $want_record=F,
$config=config_strings]);
more_input("input");
more_input("input");
more_input("input");
more_input("input");
more_input("input");
} }