diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 4c0fd988d3..40215dabee 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -11,6 +11,7 @@ #include #include #include +#include using namespace input::reader; using threading::Value; @@ -38,6 +39,8 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) assert(stdin_fileno == 0); assert(stdout_fileno == 1); assert(stderr_fileno == 2); + + childpid = -1; } Raw::~Raw() @@ -49,12 +52,15 @@ void Raw::DoClose() { if ( file != 0 ) CloseInput(); + + if ( execute && childpid > 0 ) + // kill child process + kill(childpid, 9); // TERMINATOR } bool Raw::Execute() { int stdout_pipe[2]; - pid_t pid; if (pipe(stdout_pipe) != 0) { @@ -62,13 +68,13 @@ bool Raw::Execute() return false; } - pid = fork(); - if ( pid < 0 ) + childpid = fork(); + if ( childpid < 0 ) { Error(Fmt("Could not create child process: %d", errno)); return false; } - else if ( pid == 0 ) + else if ( childpid == 0 ) { // we are the child. close(stdout_pipe[stdin_fileno]); @@ -82,6 +88,10 @@ bool Raw::Execute() { // we are the parent close(stdout_pipe[stdout_fileno]); + + if ( Info().mode == MODE_STREAM ) + fcntl(stdout_pipe[stdin_fileno], F_SETFL, O_NONBLOCK); + file = fdopen(stdout_pipe[stdin_fileno], "r"); if ( file == 0 ) { @@ -102,6 +112,7 @@ bool Raw::OpenInput() else { file = fopen(fname.c_str(), "r"); + fcntl(fileno(file), F_SETFD, FD_CLOEXEC); if ( !file ) { Error(Fmt("Init: cannot open %s", fname.c_str())); @@ -177,15 +188,6 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie execute = true; fname = source.substr(0, fname.length() - 1); - /* - if ( (info.mode != MODE_MANUAL) ) - { - Error(Fmt("Unsupported read mode %d for source %s in execution mode", - info.mode, fname.c_str())); - return false; - } - */ - result = OpenInput(); } @@ -229,7 +231,7 @@ int64_t Raw::GetLine() pos += bufpos + readbytes; bufpos = 0; // read full block size in next read... - if ( errno != 0 ) + if ( pos == 0 && errno != 0 ) break; char* token = strnstr(buf, separator.c_str(), block_size*repeats-pos); @@ -279,7 +281,7 @@ int64_t Raw::GetLine() if ( errno == 0 ) { assert(false); - } else if ( errno == EAGAIN || errno == EAGAIN || errno == EINTR ) { + } else if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) { return -2; } else { // an error code we did no expect. This probably is bad. @@ -343,6 +345,8 @@ bool Raw::DoUpdate() for ( ;; ) { int64_t length = GetLine(); + //printf("Read %lld bytes", length); + if ( length == -3 ) return false; else if ( length == -2 || length == -1 ) diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index bd87648493..d550716c48 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -50,6 +50,8 @@ private: int stdin_fileno; int stdout_fileno; int stderr_fileno; + + pid_t childpid; }; } diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out new file mode 100644 index 0000000000..59a5f2c116 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out @@ -0,0 +1,153 @@ +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +q3r3057fdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdfs\d +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW + +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +dfsdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +3rw43wRRERLlL#RWERERERE. +done diff --git a/testing/btest/scripts/base/frameworks/input/executestreamraw.bro b/testing/btest/scripts/base/frameworks/input/executestreamraw.bro new file mode 100644 index 0000000000..ead33018dc --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/executestreamraw.bro @@ -0,0 +1,61 @@ +# @TEST-EXEC: cp input1.log input.log +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: sleep 3 +# @TEST-EXEC: cat input2.log >> input.log +# @TEST-EXEC: sleep 3 +# @TEST-EXEC: cat input3.log >> input.log +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; + +@TEST-START-FILE input1.log +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +@TEST-END-FILE + +@TEST-START-FILE input2.log +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +q3r3057fdf +@TEST-END-FILE + +@TEST-START-FILE input3.log +sdfs\d + +dfsdf +sdf +3rw43wRRERLlL#RWERERERE. +@TEST-END-FILE + +@load base/frameworks/communication # let network-time run + +module A; + +type Val: record { + s: string; +}; + +global try: count; +global outfile: file; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, description; + print outfile, tpe; + print outfile, s; + + try = try + 1; + if ( try == 8 ) + { + print outfile, "done"; + close(outfile); + Input::remove("input"); + terminate(); + } + } + +event bro_init() + { + outfile = open("../out"); + try = 0; + Input::add_event([$source="tail -f ../input.log |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]); + }