diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 0f4c4ca7d1..fecf9a7ddc 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -19,7 +19,12 @@ using threading::Value; using threading::Field; const int Raw::block_size = 4096; // how big do we expect our chunks of data to be. +pthread_mutex_t Raw::fork_mutex; +bool Raw::ClassInit() + { + return pthread_mutex_init(&fork_mutex, 0) == 0; + } Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) { @@ -77,8 +82,30 @@ void Raw::DoClose() } } +void Raw::ClosePipeEnd(int i) + { + if ( pipes[i] == -1 ) + return; + safe_close(pipes[i]); + pipes[i] = -1; + } + bool Raw::Execute() { + // TODO: AFAICT, pipe/fork/exec should be thread-safe, but actually having + // multiple threads set up pipes and fork concurrently sometimes results + // in problems w/ a stdin pipe not ever getting an EOF even though both + // ends of it are closed. But if the same threads allocate pipes and fork + // individually or sequentially, that issue never crops up... ("never" + // meaning I haven't seen in it in hundreds of tests using 50+ threads + // where before I'd see the issue w/ just 2 threads ~33% of the time). + int lock_rval = pthread_mutex_lock(&fork_mutex); + if ( lock_rval != 0 ) + { + Error(Fmt("cannot lock fork mutex: %d", lock_rval)); + return false; + } + if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) { Error(Fmt("Could not open pipe: %d", errno)); @@ -95,65 +122,75 @@ bool Raw::Execute() else if ( childpid == 0 ) { // we are the child. - safe_close(pipes[stdout_in]); + close(pipes[stdout_in]); if ( dup2(pipes[stdout_out], stdout_fileno) == -1 ) - Error(Fmt("Error on dup2 stdout_out: %d", errno)); + _exit(252); + close(pipes[stdout_out]); - if ( stdin_towrite ) - { - safe_close(pipes[stdin_out]); - if ( dup2(pipes[stdin_in], stdin_fileno) == -1 ) - Error(Fmt("Error on dup2 stdin_in: %d", errno)); - } + close(pipes[stdin_out]); + if ( stdin_towrite && dup2(pipes[stdin_in], stdin_fileno) == -1 ) + _exit(253); + close(pipes[stdin_in]); - if ( use_stderr ) - { - safe_close(pipes[stderr_in]); - if ( dup2(pipes[stderr_out], stderr_fileno) == -1 ) - Error(Fmt("Error on dup2 stderr_out: %d", errno)); - } + close(pipes[stderr_in]); + if ( use_stderr && dup2(pipes[stderr_out], stderr_fileno) == -1 ) + _exit(254); + close(pipes[stderr_out]); execl("/bin/sh", "sh", "-c", fname.c_str(), (char*) NULL); fprintf(stderr, "Exec failed :(......\n"); - exit(255); + _exit(255); } else { // we are the parent - safe_close(pipes[stdout_out]); - pipes[stdout_out] = -1; + lock_rval = pthread_mutex_unlock(&fork_mutex); + if ( lock_rval != 0 ) + { + Error(Fmt("cannot unlock fork mutex: %d", lock_rval)); + return false; + } + ClosePipeEnd(stdout_out); if ( Info().mode == MODE_STREAM ) fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK); + ClosePipeEnd(stdin_in); if ( stdin_towrite ) - { - safe_close(pipes[stdin_in]); - pipes[stdin_in] = -1; fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data. // note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having // a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied. - } + else + ClosePipeEnd(stdin_out); + ClosePipeEnd(stderr_out); if ( use_stderr ) - { - safe_close(pipes[stderr_out]); - pipes[stderr_out] = -1; fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too. - } + else + ClosePipeEnd(stderr_in); file = fdopen(pipes[stdout_in], "r"); + + if ( ! file ) + { + Error("Could not convert stdout_in fileno to file"); + return false; + } + pipes[stdout_in] = -1; // will be closed by fclose if ( use_stderr ) + { stderrfile = fdopen(pipes[stderr_in], "r"); - pipes[stderr_in] = -1; // will be closed by fclose - if ( file == 0 || (stderrfile == 0 && use_stderr) ) + + if ( ! stderrfile ) { - Error("Could not convert fileno to file"); + Error("Could not convert stderr_in fileno to file"); return false; } + pipes[stderr_in] = -1; // will be closed by fclose + } return true; } @@ -194,15 +231,9 @@ bool Raw::CloseInput() if ( use_stderr ) fclose(stderrfile); - if ( execute ) // we do not care if any of those fails. They should all be defined. - { + if ( execute ) for ( int i = 0; i < 6; i ++ ) - if ( pipes[i] != -1 ) - { - safe_close(pipes[i]); - pipes[i] = -1; - } - } + ClosePipeEnd(i); file = 0; stderrfile = 0; @@ -371,7 +402,7 @@ int64_t Raw::GetLine(FILE* arg_file) } - if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) + if ( errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) return -2; else @@ -402,10 +433,7 @@ void Raw::WriteToStdin() } if ( stdin_towrite == 0 ) // send EOF when we are done. - { - safe_close(pipes[stdin_out]); - pipes[stdin_out] = -1; - } + ClosePipeEnd(stdin_out); if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 ) { diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 6dbae21002..8c05b54576 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -4,6 +4,7 @@ #define INPUT_READERS_RAW_H #include +#include #include "../ReaderBackend.h" @@ -20,6 +21,8 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } + static bool ClassInit(); + protected: virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); @@ -27,6 +30,9 @@ protected: virtual bool DoHeartbeat(double network_time, double current_time); private: + + void ClosePipeEnd(int i); + bool OpenInput(); bool CloseInput(); int64_t GetLine(FILE* file); @@ -45,6 +51,7 @@ private: unsigned int sep_length; // length of the separator static const int block_size; + static pthread_mutex_t fork_mutex; int bufpos; char* buf; char* outbuf; diff --git a/src/main.cc b/src/main.cc index 6a58832964..fef3d94063 100644 --- a/src/main.cc +++ b/src/main.cc @@ -57,6 +57,7 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void); #include "input/Manager.h" #include "logging/Manager.h" #include "logging/writers/Ascii.h" +#include "input/readers/Raw.h" #include "analyzer/Manager.h" #include "analyzer/Tag.h" #include "plugin/Manager.h" @@ -842,6 +843,8 @@ int main(int argc, char** argv) init_event_handlers(); + input::reader::Raw::ClassInit(); + // The leak-checker tends to produce some false // positives (memory which had already been // allocated before we start the checking is diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out index c49aee85b3..d36930d752 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out @@ -1,36 +1,20 @@ -[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line -{ -print outfile, A::description; -print outfile, A::tpe; -print outfile, A::s; -try = try + 1; -if (2 == try) -{ -Input::remove(input2); -close(outfile); -terminate(); -} - -}, config={ -[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay -}] -Input::EVENT_NEW +Input::EVENT_NEW, cat |, input0 hello -[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line -{ -print outfile, A::description; -print outfile, A::tpe; -print outfile, A::s; -try = try + 1; -if (2 == try) -{ -Input::remove(input2); -close(outfile); -terminate(); -} - -}, config={ -[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay -}] -Input::EVENT_NEW -there^A^B^C^D^E^A^B^Cyay +Input::EVENT_NEW, cat |, input0 +there^A^B^C^D^E^A^B^Cyay0 +Input::EVENT_NEW, cat |, input1 +hello +Input::EVENT_NEW, cat |, input1 +there^A^B^C^D^E^A^B^Cyay01 +Input::EVENT_NEW, cat |, input2 +hello +Input::EVENT_NEW, cat |, input2 +there^A^B^C^D^E^A^B^Cyay012 +Input::EVENT_NEW, cat |, input3 +hello +Input::EVENT_NEW, cat |, input3 +there^A^B^C^D^E^A^B^Cyay0123 +Input::EVENT_NEW, cat |, input4 +hello +Input::EVENT_NEW, cat |, input4 +there^A^B^C^D^E^A^B^Cyay01234 diff --git a/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro b/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro index f6513dc6aa..f80f2cc613 100644 --- a/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro +++ b/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro @@ -1,5 +1,5 @@ # @TEST-EXEC: btest-bg-run bro bro -b %INPUT -# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-bg-wait 15 # @TEST-EXEC: btest-diff test.txt # @TEST-EXEC: btest-diff out @@ -7,7 +7,13 @@ redef exit_only_after_terminate = T; @load base/frameworks/communication # let network-time run. otherwise there are no heartbeats... global outfile: file; -global try: count; +global processes_finished: count = 0; +global n: count = 0; +global total_processes: count = 0; + +global config_strings: table[string] of string = { + ["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay" +}; module A; @@ -17,27 +23,46 @@ type Val: record { event line(description: Input::EventDescription, tpe: Input::Event, s: string) { - print outfile, description; - print outfile, tpe; + print outfile, tpe, description$source, description$name; print outfile, s; - try = try + 1; - if ( try == 2 ) + } + +event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool) + { + print "process_finished", name, source; + Input::remove(name); + ++processes_finished; + if ( processes_finished == total_processes ) { - Input::remove("input2"); close(outfile); terminate(); } } +function more_input(name_prefix: string) + { + local name = fmt("%s%d", name_prefix, n); + config_strings["stdin"] += fmt("%d", n); + ++n; + ++total_processes; + Input::add_event([$source="cat |", + $reader=Input::READER_RAW, $mode=Input::STREAM, + $name=name, $fields=Val, $ev=line, $want_record=F, + $config=config_strings]); + } + event bro_init() { - local config_strings: table[string] of string = { - ["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay" - #["stdin"] = "yay" - }; - - try = 0; outfile = open("../out"); - Input::add_event([$source="cat > ../test.txt |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); - Input::add_event([$source="cat |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input2", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + ++total_processes; + + Input::add_event([$source="cat > ../test.txt |", + $reader=Input::READER_RAW, $mode=Input::STREAM, + $name="input", $fields=Val, $ev=line, $want_record=F, + $config=config_strings]); + more_input("input"); + more_input("input"); + more_input("input"); + more_input("input"); + more_input("input"); }