diff --git a/scripts/base/frameworks/input/readers/raw.bro b/scripts/base/frameworks/input/readers/raw.bro index 45deed3eda..ff49032b35 100644 --- a/scripts/base/frameworks/input/readers/raw.bro +++ b/scripts/base/frameworks/input/readers/raw.bro @@ -6,4 +6,12 @@ export { ## Separator between input records. ## Please note that the separator has to be exactly one character long const record_separator = "\n" &redef; + + ## Event that is called, when a process created by the raw reader exits. + ## + ## name: name of the input stream + ## source: source of the input stream + ## exit_code: exit code of the program, or number of the signal that forced the program to exit + ## signal_exit: false when program exitted normally, true when program was forced to exit by a signal + global process_finished: event(name: string, source:string, exit_code:count, signal_exit:bool); } diff --git a/src/input/Manager.cc b/src/input/Manager.cc index f5d0e2693c..33abd7d136 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -299,6 +299,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo(); rinfo->source = copy_string(source.c_str()); + rinfo->name = copy_string(name.c_str()); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); switch ( mode->InternalInt() ) @@ -1175,6 +1176,9 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) if ( i->stream_type == EVENT_STREAM ) { +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "%s is event, sending end of data", i->name.c_str()); +#endif // just signal the end of the data source SendEndOfData(i); return; @@ -1281,6 +1285,10 @@ void Manager::SendEndOfData(ReaderFrontend* reader) void Manager::SendEndOfData(const Stream *i) { +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEndOfData for stream %s", + i->name.c_str()); +#endif SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); } diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 73e5475db6..5419879e13 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -85,6 +85,11 @@ public: */ const char* source; + /** + * The name of the input stream. + */ + const char* name; + /** * A map of key/value pairs corresponding to the relevant * filter's "config" table. @@ -99,12 +104,14 @@ public: ReaderInfo() { source = 0; + name = 0; mode = MODE_NONE; } ReaderInfo(const ReaderInfo& other) { source = other.source ? copy_string(other.source) : 0; + name = other.name ? copy_string(other.name) : 0; mode = other.mode; for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ ) diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 850dda3a39..39d25912f8 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -6,6 +6,7 @@ #include "../../threading/SerialTypes.h" #include +#include #include #include #include @@ -23,6 +24,8 @@ const int Raw::block_size = 512; // how big do we expect our chunks of data to b Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; + stderrfile = 0; + forcekill = false; separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); @@ -36,11 +39,6 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) stdout_fileno = fileno(stdout); stderr_fileno = fileno(stderr); - // and because we later assume this... - assert(stdin_fileno == 0); - assert(stdout_fileno == 1); - assert(stderr_fileno == 2); - childpid = -1; stdin_towrite = 0; // by default do not open stdin @@ -58,9 +56,17 @@ void Raw::DoClose() CloseInput(); - if ( execute && childpid > 0 ) + if ( execute && childpid > 0 && kill(childpid, 0) == 0 ) + { // kill child process - kill(childpid, 9); // TERMINATOR + kill(childpid, 15); // sigterm + if ( forcekill ) + { + usleep(200); // 200 msecs should be enough for anyone ;) + if ( kill(childpid, 0) == 0 ) // perhaps it is already gone + kill(childpid, 9); // TERMINATE + } + } } bool Raw::Execute() @@ -112,22 +118,26 @@ bool Raw::Execute() if ( stdin_towrite ) { close(pipes[stdin_in]); - fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); + fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data. + // note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having + // a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied. } if ( use_stderr ) { close(pipes[stderr_out]); - fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); + fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too. } file = fdopen(pipes[stdout_in], "r"); - stderrfile = fdopen(pipes[stderr_in], "r"); - if ( file == 0 || (stderrfile == 0 && use_stderr) ) - { - Error("Could not convert fileno to file"); - return false; - } + + if ( use_stderr ) + stderrfile = fdopen(pipes[stderr_in], "r"); + if ( file == 0 || (stderrfile == 0 && use_stderr) ) + { + Error("Could not convert fileno to file"); + return false; + } return true; @@ -165,15 +175,19 @@ bool Raw::CloseInput() Debug(DBG_INPUT, "Raw reader starting close"); #endif + fclose(file); + + if ( use_stderr ) + fclose(stderrfile); + if ( execute ) // we do not care if any of those fails. They should all be defined. { for ( int i = 0; i < 6; i ++ ) close(pipes[i]); } - else - fclose(file); file = 0; + stderrfile = 0; #ifdef DEBUG Debug(DBG_INPUT, "Raw reader finished close"); @@ -219,11 +233,17 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie use_stderr = true; want_fields = 2; } + + it = info.config.find("force_kill"); // we want to be sure that our child is dead when we exit + if ( it != info.config.end() && execute ) + { + forcekill = true; + } if ( num_fields != want_fields ) { Error(Fmt("Filter for raw reader contains wrong number of fields -- got %d, expected %d. " - "Filters for the raw reader contain one field when used in normal mode and 2 fields when using execute mode with stderr capuring. " + "Filters for the raw reader contain one string field when used in normal mode and one string and one bool fields when using execute mode with stderr capuring. " "Filter ignored.", num_fields, want_fields)); return false; } @@ -236,6 +256,14 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie if ( use_stderr && fields[1]->type != TYPE_BOOL ) { Error("Second field for raw reader always has to be of type bool."); + return false; + } + + if ( execute && Info().mode == MODE_REREAD ) + { + // for execs this makes no sense - would have to execute each heartbeat? + Error("Rereading only supported for files, not for executables."); + return false; } @@ -353,8 +381,14 @@ void Raw::WriteToStdin() if ( stdin_towrite == 0 ) // send EOF when we are done. close(pipes[stdin_out]); + + if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 ) + { + Error(Fmt("Could not write whole string to stdin of child process in one go. Please use STREAM mode to pass more data to child.")); + } } + // read the entire file and send appropriate thingies back to InputMgr bool Raw::DoUpdate() { @@ -366,6 +400,7 @@ bool Raw::DoUpdate() switch ( Info().mode ) { case MODE_REREAD: { + assert(childpid == -1); // mode may not be used to execute child programs // check if the file has changed struct stat sb; if ( stat(fname.c_str(), &sb) == -1 ) @@ -388,7 +423,6 @@ bool Raw::DoUpdate() case MODE_STREAM: if ( Info().mode == MODE_STREAM && file != 0 ) { - //fpurge(file); clearerr(file); // remove end of file evil bits break; } @@ -412,7 +446,7 @@ bool Raw::DoUpdate() WriteToStdin(); int64_t length = GetLine(file); - printf("Read %lld bytes\n", length); + //printf("Read %lld bytes\n", length); if ( length == -3 ) return false; @@ -444,7 +478,7 @@ bool Raw::DoUpdate() for ( ;; ) { int64_t length = GetLine(stderrfile); - printf("Read stderr %lld bytes\n", length); + //printf("Read stderr %lld bytes\n", length); if ( length == -3 ) return false; else if ( length == -2 || length == -1 ) @@ -464,6 +498,50 @@ bool Raw::DoUpdate() outbuf = 0; } + if ( ( Info().mode == MODE_MANUAL ) || ( Info().mode == MODE_REREAD ) ) + // done with the current data source :) + EndCurrentSend(); + + // and let's check if the child process is still alive + int return_code; + if ( waitpid(childpid, &return_code, WNOHANG) != 0 ) { + // child died :( + bool signal = false; + int code = 0; + if ( WIFEXITED(return_code) ) { + code = WEXITSTATUS(return_code); + if ( code != 0 ) + Error(Fmt("Child process exited with non-zero return code %d", code)); + } else if ( WIFSIGNALED(return_code) ) { + signal = false; + code = WTERMSIG(return_code); + Error(Fmt("Child process exited due to signal %d", code)); + } else { + assert(false); + } + + Value** vals = new Value*[4]; + vals[0] = new Value(TYPE_STRING, true); + vals[0]->val.string_val.data = copy_string(Info().name); + vals[0]->val.string_val.length = strlen(Info().name); + vals[1] = new Value(TYPE_STRING, true); + vals[1]->val.string_val.data = copy_string(Info().source); + vals[1]->val.string_val.length = strlen(Info().source); + vals[2] = new Value(TYPE_COUNT, true); + vals[2]->val.int_val = code; + vals[3] = new Value(TYPE_BOOL, true); + vals[3]->val.int_val = signal; + + // and in this case we can signal end_of_data even for the streaming reader + if ( Info().mode == MODE_STREAM ) + EndCurrentSend(); + + SendEvent("InputRaw::process_finished", 4, vals); + + } + + + #ifdef DEBUG Debug(DBG_INPUT, "DoUpdate finished successfully"); #endif diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index d79a80c31e..8ea03a70b4 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -58,6 +58,8 @@ private: bool use_stderr; + bool forcekill; + int pipes[6]; pid_t childpid; diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out deleted file mode 100644 index 55c4167ef8..0000000000 --- a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out +++ /dev/null @@ -1,148 +0,0 @@ -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -..: -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -bro -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -executestreamrawstderr.bro -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -out -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -ls: ../nonexistant: No such file or directory -T -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -ls: ../nonexistant2: No such file or directory -T -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -ls: ../nonexistant3: No such file or directory -T -done diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.basic/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.raw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.basic/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.execute/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.execute/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/test.txt similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/test.txt diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.rereadraw/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.rereadraw/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out new file mode 100644 index 0000000000..4900bc8ff8 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out @@ -0,0 +1,27 @@ +Process finished event +input +1 +Input::EVENT_NEW +..: +F +Input::EVENT_NEW +bro +F +Input::EVENT_NEW +out +F +Input::EVENT_NEW +stderr.bro +F +Input::EVENT_NEW +ls: ../nonexistant: No such file or directory +T +Input::EVENT_NEW +ls: ../nonexistant2: No such file or directory +T +Input::EVENT_NEW +ls: ../nonexistant3: No such file or directory +T +done +End of Data event +input diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.streamraw/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.streamraw/out diff --git a/testing/btest/scripts/base/frameworks/input/raw.bro b/testing/btest/scripts/base/frameworks/input/raw/basic.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/raw.bro rename to testing/btest/scripts/base/frameworks/input/raw/basic.bro diff --git a/testing/btest/scripts/base/frameworks/input/executeraw.bro b/testing/btest/scripts/base/frameworks/input/raw/execute.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/executeraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/execute.bro diff --git a/testing/btest/scripts/base/frameworks/input/execrawstdin.bro b/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/execrawstdin.bro rename to testing/btest/scripts/base/frameworks/input/raw/executestdin.bro diff --git a/testing/btest/scripts/base/frameworks/input/executestreamraw.bro b/testing/btest/scripts/base/frameworks/input/raw/executestream.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/executestreamraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/executestream.bro diff --git a/testing/btest/scripts/base/frameworks/input/rereadraw.bro b/testing/btest/scripts/base/frameworks/input/raw/rereadraw.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/rereadraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/rereadraw.bro diff --git a/testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro similarity index 64% rename from testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro rename to testing/btest/scripts/base/frameworks/input/raw/stderr.bro index 7e7c640112..c85ee8b0ef 100644 --- a/testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro +++ b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro @@ -4,8 +4,6 @@ redef exit_only_after_terminate = T; -module A; - type Val: record { s: string; is_stderr: bool; @@ -16,7 +14,6 @@ global outfile: file; event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool) { - print outfile, description; print outfile, tpe; print outfile, s; print outfile, is_stderr; @@ -25,12 +22,25 @@ event line(description: Input::EventDescription, tpe: Input::Event, s: string, i if ( try == 7 ) { print outfile, "done"; - close(outfile); Input::remove("input"); - terminate(); } } +event Input::end_of_data(name: string, source:string) + { + print outfile, "End of Data event"; + print outfile, name; + terminate(); # due to the current design, end_of_data will be called after process_finshed and all line events. + # this could potentially change + } + +event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool) + { + print outfile, "Process finished event"; + print outfile, name; + print outfile, exit_code; + } + event bro_init() { diff --git a/testing/btest/scripts/base/frameworks/input/streamraw.bro b/testing/btest/scripts/base/frameworks/input/raw/streamraw.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/streamraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/streamraw.bro