diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 40215dabee..c4dfe55b93 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -19,6 +19,7 @@ using threading::Field; const int Raw::block_size = 512; // how big do we expect our chunks of data to be... + Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; @@ -41,6 +42,8 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) assert(stderr_fileno == 2); childpid = -1; + + stdin_towrite = 0; // by default do not open stdin } Raw::~Raw() @@ -53,6 +56,7 @@ void Raw::DoClose() if ( file != 0 ) CloseInput(); + if ( execute && childpid > 0 ) // kill child process kill(childpid, 9); // TERMINATOR @@ -60,9 +64,8 @@ void Raw::DoClose() bool Raw::Execute() { - int stdout_pipe[2]; - if (pipe(stdout_pipe) != 0) + if (pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) { Error(Fmt("Could not open pipe: %d", errno)); return false; @@ -77,8 +80,15 @@ bool Raw::Execute() else if ( childpid == 0 ) { // we are the child. - close(stdout_pipe[stdin_fileno]); - dup2(stdout_pipe[stdout_fileno], stdout_fileno); + close(pipes[stdout_in]); + dup2(pipes[stdout_out], stdout_fileno); + + if ( stdin_towrite ) + { + close(pipes[stdin_out]); + dup2(pipes[stdin_in], stdin_fileno); + } + //execv("/usr/bin/uname",test); execl("/bin/sh", "sh", "-c", fname.c_str(), NULL); fprintf(stderr, "Exec failed :(......\n"); @@ -87,12 +97,18 @@ bool Raw::Execute() else { // we are the parent - close(stdout_pipe[stdout_fileno]); + close(pipes[stdout_out]); if ( Info().mode == MODE_STREAM ) - fcntl(stdout_pipe[stdin_fileno], F_SETFL, O_NONBLOCK); + fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK); + + if ( stdin_towrite ) + { + close(pipes[stdin_in]); + fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); + } - file = fdopen(stdout_pipe[stdin_fileno], "r"); + file = fdopen(pipes[stdout_in], "r"); if ( file == 0 ) { Error("Could not convert fileno to file"); @@ -106,8 +122,7 @@ bool Raw::OpenInput() { if ( execute ) { - if ( ! Execute() ) - return false; + return Execute(); } else { @@ -120,10 +135,6 @@ bool Raw::OpenInput() } } - //if ( execute && Info().mode == MODE_STREAM ) - // fcntl(fileno(file), F_SETFL, O_NONBLOCK); - - //fcntl(fileno(file), F_SETFD, FD_CLOEXEC); return true; } @@ -138,8 +149,11 @@ bool Raw::CloseInput() Debug(DBG_INPUT, "Raw reader starting close"); #endif - if ( execute ) - pclose(file); + if ( execute ) // we do not care if any of those fails. They should all be defined. + { + for ( int i = 0; i < 6; i ++ ) + close(pipes[i]); + } else fclose(file); @@ -166,6 +180,13 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie return false; } + map::const_iterator it = info.config.find("stdin"); // data that is sent to the child process + if ( it != info.config.end() ) + { + stdin_string = it->second; + stdin_towrite = stdin_string.length(); + } + if ( num_fields != 1 ) { Error("Filter for raw reader contains more than one field. " @@ -214,9 +235,8 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie } -int64_t Raw::GetLine() +int64_t Raw::GetLine(FILE* arg_file) { - errno = 0; uint64_t pos = 0; @@ -227,7 +247,7 @@ int64_t Raw::GetLine() for (;;) { - size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, file); + size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, arg_file); pos += bufpos + readbytes; bufpos = 0; // read full block size in next read... @@ -240,7 +260,7 @@ int64_t Raw::GetLine() { // we did not find it and have to search again in the next try. resize buffer.... // but first check if we encountered the file end - because if we did this was it. - if ( feof(file) != 0 ) + if ( feof(arg_file) != 0 ) { outbuf = buf; buf = 0; @@ -291,6 +311,28 @@ int64_t Raw::GetLine() } +// write to the stdin of the child process +void Raw::WriteToStdin() + { + assert(stdin_towrite <= stdin_string.length()); + uint64_t pos = stdin_string.length() - stdin_towrite; + + errno = 0; + ssize_t written = write(pipes[stdin_out], stdin_string.c_str() + pos, stdin_towrite); + stdin_towrite -= written; + + if ( errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK ) + { + Error(Fmt("Writing to child process stdin failed: %d. Stopping writing at position %d", errno, pos)); + stdin_towrite = 0; + close(pipes[stdin_out]); + } + + if ( stdin_towrite == 0 ) // send EOF when we are done. + printf("Closing %d\n", pipes[stdin_out]); + close(pipes[stdin_out]); + } + // read the entire file and send appropriate thingies back to InputMgr bool Raw::DoUpdate() { @@ -344,8 +386,11 @@ bool Raw::DoUpdate() assert (NumFields() == 1); for ( ;; ) { - int64_t length = GetLine(); - //printf("Read %lld bytes", length); + if ( stdin_towrite > 0 ) + WriteToStdin(); + + int64_t length = GetLine(file); + //printf("Read %lld bytes\n", length); if ( length == -3 ) return false; diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index d550716c48..cf29609331 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -29,7 +29,9 @@ protected: private: bool OpenInput(); bool CloseInput(); - int64_t GetLine(); + int64_t GetLine(FILE* file); + bool Execute(); + void WriteToStdin(); string fname; // Source with a potential "|" removed. FILE* file; @@ -40,10 +42,9 @@ private: // options set from the script-level. string separator; unsigned int sep_length; // length of the separator - bool Execute(); static const int block_size; - uint32_t bufpos; + uint64_t bufpos; char* buf; char* outbuf; @@ -51,7 +52,21 @@ private: int stdout_fileno; int stderr_fileno; + string stdin_string; + uint64_t stdin_towrite; + + int pipes[6]; pid_t childpid; + + enum IoChannels { + stdout_in = 0, + stdout_out = 1, + stdin_in = 2, + stdin_out = 3, + stderr_in = 4, + stderr_out = 5 + }; + }; } diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out new file mode 100644 index 0000000000..c49aee85b3 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out @@ -0,0 +1,36 @@ +[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line +{ +print outfile, A::description; +print outfile, A::tpe; +print outfile, A::s; +try = try + 1; +if (2 == try) +{ +Input::remove(input2); +close(outfile); +terminate(); +} + +}, config={ +[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay +}] +Input::EVENT_NEW +hello +[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line +{ +print outfile, A::description; +print outfile, A::tpe; +print outfile, A::s; +try = try + 1; +if (2 == try) +{ +Input::remove(input2); +close(outfile); +terminate(); +} + +}, config={ +[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay +}] +Input::EVENT_NEW +there^A^B^C^D^E^A^B^Cyay diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt new file mode 100644 index 0000000000..0205cd7c3a --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt @@ -0,0 +1,2 @@ +hello +thereyay \ No newline at end of file diff --git a/testing/btest/scripts/base/frameworks/input/execrawstdin.bro b/testing/btest/scripts/base/frameworks/input/execrawstdin.bro new file mode 100644 index 0000000000..729844e4b4 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/execrawstdin.bro @@ -0,0 +1,44 @@ +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff test.txt +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; +@load base/frameworks/communication # let network-time run. otherwise there are no heartbeats... + +global outfile: file; +global try: count; + +module A; + +type Val: record { + s: string; +}; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, description; + print outfile, tpe; + print outfile, s; + try = try + 1; + if ( try == 2 ) + { + Input::remove("input2"); + close(outfile); + terminate(); + } + } + +event bro_init() + { + local config_strings: table[string] of string = { + ["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay" + #["stdin"] = "yay" + }; + + try = 0; + outfile = open("../out"); + Input::add_event([$source="cat > ../test.txt |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + Input::remove("input"); + Input::add_event([$source="cat |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input2", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + }