From 5dd07e023dfbc01e9fe8d7026b59f1a5bfbfba4a Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Thu, 14 Mar 2013 16:14:13 -0700 Subject: [PATCH 01/14] change raw reader to use basic c io instead of fdstream encapsulation class. preparation for future changes. --- src/input/fdstream.h | 189 --------------------------------------- src/input/readers/Raw.cc | 128 +++++++++++++++++++------- src/input/readers/Raw.h | 10 ++- 3 files changed, 105 insertions(+), 222 deletions(-) delete mode 100644 src/input/fdstream.h diff --git a/src/input/fdstream.h b/src/input/fdstream.h deleted file mode 100644 index cda767dd52..0000000000 --- a/src/input/fdstream.h +++ /dev/null @@ -1,189 +0,0 @@ -/* The following code declares classes to read from and write to - * file descriptore or file handles. - * - * See - * http://www.josuttis.com/cppcode - * for details and the latest version. - * - * - open: - * - integrating BUFSIZ on some systems? - * - optimized reading of multiple characters - * - stream for reading AND writing - * - i18n - * - * (C) Copyright Nicolai M. Josuttis 2001. - * Permission to copy, use, modify, sell and distribute this software - * is granted provided this copyright notice appears in all copies. - * This software is provided "as is" without express or implied - * warranty, and with no claim as to its suitability for any purpose. - * - * Version: Jul 28, 2002 - * History: - * Jul 28, 2002: bugfix memcpy() => memmove() - * fdinbuf::underflow(): cast for return statements - * Aug 05, 2001: first public version - */ -#ifndef BOOST_FDSTREAM_HPP -#define BOOST_FDSTREAM_HPP - -#include -#include -#include -// for EOF: -#include -// for memmove(): -#include - - - -// low-level read and write functions -#ifdef _MSC_VER -# include -#else -# include -# include -//extern "C" { -// int write (int fd, const char* buf, int num); -// int read (int fd, char* buf, int num); -//} -#endif - - -// BEGIN namespace BOOST -namespace boost { - - -/************************************************************ - * fdostream - * - a stream that writes on a file descriptor - ************************************************************/ - - -class fdoutbuf : public std::streambuf { - protected: - int fd; // file descriptor - public: - // constructor - fdoutbuf (int _fd) : fd(_fd) { - } - protected: - // write one character - virtual int_type overflow (int_type c) { - if (c != EOF) { - char z = c; - if (write (fd, &z, 1) != 1) { - return EOF; - } - } - return c; - } - // write multiple characters - virtual - std::streamsize xsputn (const char* s, - std::streamsize num) { - return write(fd,s,num); - } -}; - -class fdostream : public std::ostream { - protected: - fdoutbuf buf; - public: - fdostream (int fd) : std::ostream(0), buf(fd) { - rdbuf(&buf); - } -}; - - -/************************************************************ - * fdistream - * - a stream that reads on a file descriptor - ************************************************************/ - -class fdinbuf : public std::streambuf { - protected: - int fd; // file descriptor - protected: - /* data buffer: - * - at most, pbSize characters in putback area plus - * - at most, bufSize characters in ordinary read buffer - */ - static const int pbSize = 4; // size of putback area - static const int bufSize = 1024; // size of the data buffer - char buffer[bufSize+pbSize]; // data buffer - - public: - /* constructor - * - initialize file descriptor - * - initialize empty data buffer - * - no putback area - * => force underflow() - */ - fdinbuf (int _fd) : fd(_fd) { - setg (buffer+pbSize, // beginning of putback area - buffer+pbSize, // read position - buffer+pbSize); // end position - } - - protected: - // insert new characters into the buffer - virtual int_type underflow () { -#ifndef _MSC_VER - using std::memmove; -#endif - - // is read position before end of buffer? - if (gptr() < egptr()) { - return traits_type::to_int_type(*gptr()); - } - - /* process size of putback area - * - use number of characters read - * - but at most size of putback area - */ - int numPutback; - numPutback = gptr() - eback(); - if (numPutback > pbSize) { - numPutback = pbSize; - } - - /* copy up to pbSize characters previously read into - * the putback area - */ - memmove (buffer+(pbSize-numPutback), gptr()-numPutback, - numPutback); - - // read at most bufSize new characters - int num; - num = read (fd, buffer+pbSize, bufSize); - if ( num == EAGAIN ) { - return 0; - } - if (num <= 0) { - // ERROR or EOF - return EOF; - } - - // reset buffer pointers - setg (buffer+(pbSize-numPutback), // beginning of putback area - buffer+pbSize, // read position - buffer+pbSize+num); // end of buffer - - // return next character - return traits_type::to_int_type(*gptr()); - } -}; - -class fdistream : public std::istream { - protected: - fdinbuf buf; - public: - fdistream (int fd) : std::istream(0), buf(fd) { - rdbuf(&buf); - } -}; - - -} // END namespace boost - -#endif /*BOOST_FDSTREAM_HPP*/ diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index ac96e5c0f5..7a31be8716 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -3,32 +3,32 @@ #include "Raw.h" #include "NetVar.h" -#include -#include - #include "../../threading/SerialTypes.h" -#include "../fdstream.h" #include #include #include #include #include +#include using namespace input::reader; using threading::Value; using threading::Field; +const int Raw::block_size = 512; // how big do we expect our chunks of data to be... + Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; - in = 0; - separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); - if ( separator.size() != 1 ) - Error("separator length has to be 1. Separator will be truncated."); + sep_length = BifConst::InputRaw::record_separator->Len(); + + buf = 0; + outbuf = 0; + bufpos = 0; } Raw::~Raw() @@ -47,7 +47,7 @@ bool Raw::OpenInput() if ( execute ) { file = popen(fname.c_str(), "r"); - if ( file == NULL ) + if ( !file ) { Error(Fmt("Could not execute command %s", fname.c_str())); return false; @@ -56,16 +56,13 @@ bool Raw::OpenInput() else { file = fopen(fname.c_str(), "r"); - if ( file == NULL ) + if ( !file ) { Error(Fmt("Init: cannot open %s", fname.c_str())); return false; } } - // This is defined in input/fdstream.h - in = new boost::fdistream(fileno(file)); - if ( execute && Info().mode == MODE_STREAM ) fcntl(fileno(file), F_SETFL, O_NONBLOCK); @@ -74,7 +71,7 @@ bool Raw::OpenInput() bool Raw::CloseInput() { - if ( file == NULL ) + if ( file == 0 ) { InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str())); return false; @@ -83,15 +80,12 @@ bool Raw::CloseInput() Debug(DBG_INPUT, "Raw reader starting close"); #endif - delete in; - if ( execute ) pclose(file); else fclose(file); - in = NULL; - file = NULL; + file = 0; #ifdef DEBUG Debug(DBG_INPUT, "Raw reader finished close"); @@ -169,15 +163,81 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie } -bool Raw::GetLine(string& str) +int64_t Raw::GetLine() { - if ( in->peek() == std::iostream::traits_type::eof() ) - return false; - if ( in->eofbit == true || in->failbit == true ) - return false; + errno = 0; + uint64_t pos = 0; + + if ( buf == 0 ) + buf = new char[block_size]; + + int repeats = 1; + + for (;;) + { + size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, file); + pos += bufpos + readbytes; + bufpos = 0; // read full block size in next read... + + if ( errno != 0 ) + break; + + char* token = strnstr(buf, separator.c_str(), block_size*repeats-pos); + + if ( token == 0 ) + { + // we did not find it and have to search again in the next try. resize buffer.... + // but first check if we encountered the file end - because if we did this was it. + if ( feof(file) != 0 ) + { + outbuf = buf; + buf = 0; + if ( pos == 0 ) + return -1; // signal EOF - and that we had no more data. + else + return pos; + } + + repeats++; + // bah, we cannot use realloc because we would have to change the delete in the manager to a delete :( + //char* newbuf = realloc(buf,block_size*repeats); + char * newbuf = new char[block_size*repeats]; + memcpy(newbuf, buf, block_size*(repeats-1)); + delete buf; + buf = newbuf; + } + else + { + outbuf = buf; + buf = 0; + buf = new char[block_size]; + + + if ( token - outbuf < pos ) + { + // we have leftovers. copy them into the buffer for the next line + buf = new char[block_size]; + memcpy(buf, token + sep_length, -(token - outbuf + sep_length) +pos); + bufpos = -(token - outbuf + sep_length) +pos; + } + + pos = token-outbuf; + return pos; + } + + } + + if ( errno == 0 ) { + assert(false); + } else if ( errno == EAGAIN || errno == EAGAIN || errno == EINTR ) { + return -2; + } else { + // an error code we did no expect. This probably is bad. + Error(Fmt("Reader encountered unexpected error code %d", errno)); + return -3; + } - return getline(*in, str, separator[0]); } // read the entire file and send appropriate thingies back to InputMgr @@ -211,10 +271,10 @@ bool Raw::DoUpdate() case MODE_MANUAL: case MODE_STREAM: - if ( Info().mode == MODE_STREAM && file != NULL && in != NULL ) + if ( Info().mode == MODE_STREAM && file != 0 ) { //fpurge(file); - in->clear(); // remove end of file evil bits + clearerr(file); // remove end of file evil bits break; } @@ -230,19 +290,27 @@ bool Raw::DoUpdate() } string line; - while ( GetLine(line) ) + assert (NumFields() == 1); + for ( ;; ) { - assert (NumFields() == 1); + int64_t length = GetLine(); + if ( length == -3 ) + return false; + else if ( length == -2 || length == -1 ) + // no data ready or eof + break; Value** fields = new Value*[1]; // filter has exactly one text field. convert to it. Value* val = new Value(TYPE_STRING, true); - val->val.string_val.data = copy_string(line.c_str()); - val->val.string_val.length = line.size(); + val->val.string_val.data = outbuf; + val->val.string_val.length = length; fields[0] = val; Put(fields); + + outbuf = 0; } #ifdef DEBUG diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 48912b70a7..07acf1b2c0 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -3,7 +3,6 @@ #ifndef INPUT_READERS_RAW_H #define INPUT_READERS_RAW_H -#include #include #include "../ReaderBackend.h" @@ -30,10 +29,9 @@ protected: private: bool OpenInput(); bool CloseInput(); - bool GetLine(string& str); + int64_t GetLine(); string fname; // Source with a potential "|" removed. - istream* in; FILE* file; bool execute; bool firstrun; @@ -41,6 +39,12 @@ private: // options set from the script-level. string separator; + unsigned int sep_length; // length of the separator + + static const int block_size; + uint32_t bufpos; + char* buf; + char* outbuf; }; } From f2d67b5829f1886e94c4aee99a5b2f92b7b98892 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Fri, 15 Mar 2013 13:32:28 -0700 Subject: [PATCH 02/14] replace popen with fork and exec. Note for future: eof only gets sent when the parent closes its in-pipe after forking. --- src/input/readers/Raw.cc | 65 +++++++++++++++++++++++++++++++++++----- src/input/readers/Raw.h | 5 ++++ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 7a31be8716..4c0fd988d3 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -29,6 +29,15 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) buf = 0; outbuf = 0; bufpos = 0; + + stdin_fileno = fileno(stdin); + stdout_fileno = fileno(stdout); + stderr_fileno = fileno(stderr); + + // and because we later assume this... + assert(stdin_fileno == 0); + assert(stdout_fileno == 1); + assert(stderr_fileno == 2); } Raw::~Raw() @@ -42,16 +51,53 @@ void Raw::DoClose() CloseInput(); } +bool Raw::Execute() + { + int stdout_pipe[2]; + pid_t pid; + + if (pipe(stdout_pipe) != 0) + { + Error(Fmt("Could not open pipe: %d", errno)); + return false; + } + + pid = fork(); + if ( pid < 0 ) + { + Error(Fmt("Could not create child process: %d", errno)); + return false; + } + else if ( pid == 0 ) + { + // we are the child. + close(stdout_pipe[stdin_fileno]); + dup2(stdout_pipe[stdout_fileno], stdout_fileno); + //execv("/usr/bin/uname",test); + execl("/bin/sh", "sh", "-c", fname.c_str(), NULL); + fprintf(stderr, "Exec failed :(......\n"); + exit(255); + } + else + { + // we are the parent + close(stdout_pipe[stdout_fileno]); + file = fdopen(stdout_pipe[stdin_fileno], "r"); + if ( file == 0 ) + { + Error("Could not convert fileno to file"); + return false; + } + return true; + } + } + bool Raw::OpenInput() { if ( execute ) { - file = popen(fname.c_str(), "r"); - if ( !file ) - { - Error(Fmt("Could not execute command %s", fname.c_str())); + if ( ! Execute() ) return false; - } } else { @@ -63,9 +109,10 @@ bool Raw::OpenInput() } } - if ( execute && Info().mode == MODE_STREAM ) - fcntl(fileno(file), F_SETFL, O_NONBLOCK); + //if ( execute && Info().mode == MODE_STREAM ) + // fcntl(fileno(file), F_SETFL, O_NONBLOCK); + //fcntl(fileno(file), F_SETFD, FD_CLOEXEC); return true; } @@ -130,12 +177,14 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie execute = true; fname = source.substr(0, fname.length() - 1); + /* if ( (info.mode != MODE_MANUAL) ) { Error(Fmt("Unsupported read mode %d for source %s in execution mode", info.mode, fname.c_str())); return false; } + */ result = OpenInput(); @@ -299,7 +348,7 @@ bool Raw::DoUpdate() else if ( length == -2 || length == -1 ) // no data ready or eof break; - + Value** fields = new Value*[1]; // filter has exactly one text field. convert to it. diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 07acf1b2c0..bd87648493 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -40,11 +40,16 @@ private: // options set from the script-level. string separator; unsigned int sep_length; // length of the separator + bool Execute(); static const int block_size; uint32_t bufpos; char* buf; char* outbuf; + + int stdin_fileno; + int stdout_fileno; + int stderr_fileno; }; } From fc42c71dfa497325613a004e442391753ea9277a Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Fri, 15 Mar 2013 13:58:41 -0700 Subject: [PATCH 03/14] Streaming reads from external commands work without blocking anything. --- src/input/readers/Raw.cc | 34 ++-- src/input/readers/Raw.h | 2 + .../out | 153 ++++++++++++++++++ .../frameworks/input/executestreamraw.bro | 61 +++++++ 4 files changed, 235 insertions(+), 15 deletions(-) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out create mode 100644 testing/btest/scripts/base/frameworks/input/executestreamraw.bro diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 4c0fd988d3..40215dabee 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -11,6 +11,7 @@ #include #include #include +#include using namespace input::reader; using threading::Value; @@ -38,6 +39,8 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) assert(stdin_fileno == 0); assert(stdout_fileno == 1); assert(stderr_fileno == 2); + + childpid = -1; } Raw::~Raw() @@ -49,12 +52,15 @@ void Raw::DoClose() { if ( file != 0 ) CloseInput(); + + if ( execute && childpid > 0 ) + // kill child process + kill(childpid, 9); // TERMINATOR } bool Raw::Execute() { int stdout_pipe[2]; - pid_t pid; if (pipe(stdout_pipe) != 0) { @@ -62,13 +68,13 @@ bool Raw::Execute() return false; } - pid = fork(); - if ( pid < 0 ) + childpid = fork(); + if ( childpid < 0 ) { Error(Fmt("Could not create child process: %d", errno)); return false; } - else if ( pid == 0 ) + else if ( childpid == 0 ) { // we are the child. close(stdout_pipe[stdin_fileno]); @@ -82,6 +88,10 @@ bool Raw::Execute() { // we are the parent close(stdout_pipe[stdout_fileno]); + + if ( Info().mode == MODE_STREAM ) + fcntl(stdout_pipe[stdin_fileno], F_SETFL, O_NONBLOCK); + file = fdopen(stdout_pipe[stdin_fileno], "r"); if ( file == 0 ) { @@ -102,6 +112,7 @@ bool Raw::OpenInput() else { file = fopen(fname.c_str(), "r"); + fcntl(fileno(file), F_SETFD, FD_CLOEXEC); if ( !file ) { Error(Fmt("Init: cannot open %s", fname.c_str())); @@ -177,15 +188,6 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie execute = true; fname = source.substr(0, fname.length() - 1); - /* - if ( (info.mode != MODE_MANUAL) ) - { - Error(Fmt("Unsupported read mode %d for source %s in execution mode", - info.mode, fname.c_str())); - return false; - } - */ - result = OpenInput(); } @@ -229,7 +231,7 @@ int64_t Raw::GetLine() pos += bufpos + readbytes; bufpos = 0; // read full block size in next read... - if ( errno != 0 ) + if ( pos == 0 && errno != 0 ) break; char* token = strnstr(buf, separator.c_str(), block_size*repeats-pos); @@ -279,7 +281,7 @@ int64_t Raw::GetLine() if ( errno == 0 ) { assert(false); - } else if ( errno == EAGAIN || errno == EAGAIN || errno == EINTR ) { + } else if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) { return -2; } else { // an error code we did no expect. This probably is bad. @@ -343,6 +345,8 @@ bool Raw::DoUpdate() for ( ;; ) { int64_t length = GetLine(); + //printf("Read %lld bytes", length); + if ( length == -3 ) return false; else if ( length == -2 || length == -1 ) diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index bd87648493..d550716c48 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -50,6 +50,8 @@ private: int stdin_fileno; int stdout_fileno; int stderr_fileno; + + pid_t childpid; }; } diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out new file mode 100644 index 0000000000..59a5f2c116 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out @@ -0,0 +1,153 @@ +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +q3r3057fdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdfs\d +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW + +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +dfsdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +3rw43wRRERLlL#RWERERERE. +done diff --git a/testing/btest/scripts/base/frameworks/input/executestreamraw.bro b/testing/btest/scripts/base/frameworks/input/executestreamraw.bro new file mode 100644 index 0000000000..ead33018dc --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/executestreamraw.bro @@ -0,0 +1,61 @@ +# @TEST-EXEC: cp input1.log input.log +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: sleep 3 +# @TEST-EXEC: cat input2.log >> input.log +# @TEST-EXEC: sleep 3 +# @TEST-EXEC: cat input3.log >> input.log +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; + +@TEST-START-FILE input1.log +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +@TEST-END-FILE + +@TEST-START-FILE input2.log +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +q3r3057fdf +@TEST-END-FILE + +@TEST-START-FILE input3.log +sdfs\d + +dfsdf +sdf +3rw43wRRERLlL#RWERERERE. +@TEST-END-FILE + +@load base/frameworks/communication # let network-time run + +module A; + +type Val: record { + s: string; +}; + +global try: count; +global outfile: file; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, description; + print outfile, tpe; + print outfile, s; + + try = try + 1; + if ( try == 8 ) + { + print outfile, "done"; + close(outfile); + Input::remove("input"); + terminate(); + } + } + +event bro_init() + { + outfile = open("../out"); + try = 0; + Input::add_event([$source="tail -f ../input.log |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]); + } From 3aeec7ec14323acc62f785c35df6d710aeeec0f1 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Fri, 15 Mar 2013 15:47:20 -0700 Subject: [PATCH 04/14] allow sending data to stdin of child process --- src/input/readers/Raw.cc | 87 ++++++++++++++----- src/input/readers/Raw.h | 21 ++++- .../out | 36 ++++++++ .../test.txt | 2 + .../base/frameworks/input/execrawstdin.bro | 44 ++++++++++ 5 files changed, 166 insertions(+), 24 deletions(-) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt create mode 100644 testing/btest/scripts/base/frameworks/input/execrawstdin.bro diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 40215dabee..c4dfe55b93 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -19,6 +19,7 @@ using threading::Field; const int Raw::block_size = 512; // how big do we expect our chunks of data to be... + Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; @@ -41,6 +42,8 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) assert(stderr_fileno == 2); childpid = -1; + + stdin_towrite = 0; // by default do not open stdin } Raw::~Raw() @@ -53,6 +56,7 @@ void Raw::DoClose() if ( file != 0 ) CloseInput(); + if ( execute && childpid > 0 ) // kill child process kill(childpid, 9); // TERMINATOR @@ -60,9 +64,8 @@ void Raw::DoClose() bool Raw::Execute() { - int stdout_pipe[2]; - if (pipe(stdout_pipe) != 0) + if (pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) { Error(Fmt("Could not open pipe: %d", errno)); return false; @@ -77,8 +80,15 @@ bool Raw::Execute() else if ( childpid == 0 ) { // we are the child. - close(stdout_pipe[stdin_fileno]); - dup2(stdout_pipe[stdout_fileno], stdout_fileno); + close(pipes[stdout_in]); + dup2(pipes[stdout_out], stdout_fileno); + + if ( stdin_towrite ) + { + close(pipes[stdin_out]); + dup2(pipes[stdin_in], stdin_fileno); + } + //execv("/usr/bin/uname",test); execl("/bin/sh", "sh", "-c", fname.c_str(), NULL); fprintf(stderr, "Exec failed :(......\n"); @@ -87,12 +97,18 @@ bool Raw::Execute() else { // we are the parent - close(stdout_pipe[stdout_fileno]); + close(pipes[stdout_out]); if ( Info().mode == MODE_STREAM ) - fcntl(stdout_pipe[stdin_fileno], F_SETFL, O_NONBLOCK); + fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK); + + if ( stdin_towrite ) + { + close(pipes[stdin_in]); + fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); + } - file = fdopen(stdout_pipe[stdin_fileno], "r"); + file = fdopen(pipes[stdout_in], "r"); if ( file == 0 ) { Error("Could not convert fileno to file"); @@ -106,8 +122,7 @@ bool Raw::OpenInput() { if ( execute ) { - if ( ! Execute() ) - return false; + return Execute(); } else { @@ -120,10 +135,6 @@ bool Raw::OpenInput() } } - //if ( execute && Info().mode == MODE_STREAM ) - // fcntl(fileno(file), F_SETFL, O_NONBLOCK); - - //fcntl(fileno(file), F_SETFD, FD_CLOEXEC); return true; } @@ -138,8 +149,11 @@ bool Raw::CloseInput() Debug(DBG_INPUT, "Raw reader starting close"); #endif - if ( execute ) - pclose(file); + if ( execute ) // we do not care if any of those fails. They should all be defined. + { + for ( int i = 0; i < 6; i ++ ) + close(pipes[i]); + } else fclose(file); @@ -166,6 +180,13 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie return false; } + map::const_iterator it = info.config.find("stdin"); // data that is sent to the child process + if ( it != info.config.end() ) + { + stdin_string = it->second; + stdin_towrite = stdin_string.length(); + } + if ( num_fields != 1 ) { Error("Filter for raw reader contains more than one field. " @@ -214,9 +235,8 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie } -int64_t Raw::GetLine() +int64_t Raw::GetLine(FILE* arg_file) { - errno = 0; uint64_t pos = 0; @@ -227,7 +247,7 @@ int64_t Raw::GetLine() for (;;) { - size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, file); + size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, arg_file); pos += bufpos + readbytes; bufpos = 0; // read full block size in next read... @@ -240,7 +260,7 @@ int64_t Raw::GetLine() { // we did not find it and have to search again in the next try. resize buffer.... // but first check if we encountered the file end - because if we did this was it. - if ( feof(file) != 0 ) + if ( feof(arg_file) != 0 ) { outbuf = buf; buf = 0; @@ -291,6 +311,28 @@ int64_t Raw::GetLine() } +// write to the stdin of the child process +void Raw::WriteToStdin() + { + assert(stdin_towrite <= stdin_string.length()); + uint64_t pos = stdin_string.length() - stdin_towrite; + + errno = 0; + ssize_t written = write(pipes[stdin_out], stdin_string.c_str() + pos, stdin_towrite); + stdin_towrite -= written; + + if ( errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK ) + { + Error(Fmt("Writing to child process stdin failed: %d. Stopping writing at position %d", errno, pos)); + stdin_towrite = 0; + close(pipes[stdin_out]); + } + + if ( stdin_towrite == 0 ) // send EOF when we are done. + printf("Closing %d\n", pipes[stdin_out]); + close(pipes[stdin_out]); + } + // read the entire file and send appropriate thingies back to InputMgr bool Raw::DoUpdate() { @@ -344,8 +386,11 @@ bool Raw::DoUpdate() assert (NumFields() == 1); for ( ;; ) { - int64_t length = GetLine(); - //printf("Read %lld bytes", length); + if ( stdin_towrite > 0 ) + WriteToStdin(); + + int64_t length = GetLine(file); + //printf("Read %lld bytes\n", length); if ( length == -3 ) return false; diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index d550716c48..cf29609331 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -29,7 +29,9 @@ protected: private: bool OpenInput(); bool CloseInput(); - int64_t GetLine(); + int64_t GetLine(FILE* file); + bool Execute(); + void WriteToStdin(); string fname; // Source with a potential "|" removed. FILE* file; @@ -40,10 +42,9 @@ private: // options set from the script-level. string separator; unsigned int sep_length; // length of the separator - bool Execute(); static const int block_size; - uint32_t bufpos; + uint64_t bufpos; char* buf; char* outbuf; @@ -51,7 +52,21 @@ private: int stdout_fileno; int stderr_fileno; + string stdin_string; + uint64_t stdin_towrite; + + int pipes[6]; pid_t childpid; + + enum IoChannels { + stdout_in = 0, + stdout_out = 1, + stdin_in = 2, + stdin_out = 3, + stderr_in = 4, + stderr_out = 5 + }; + }; } diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out new file mode 100644 index 0000000000..c49aee85b3 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out @@ -0,0 +1,36 @@ +[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line +{ +print outfile, A::description; +print outfile, A::tpe; +print outfile, A::s; +try = try + 1; +if (2 == try) +{ +Input::remove(input2); +close(outfile); +terminate(); +} + +}, config={ +[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay +}] +Input::EVENT_NEW +hello +[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line +{ +print outfile, A::description; +print outfile, A::tpe; +print outfile, A::s; +try = try + 1; +if (2 == try) +{ +Input::remove(input2); +close(outfile); +terminate(); +} + +}, config={ +[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay +}] +Input::EVENT_NEW +there^A^B^C^D^E^A^B^Cyay diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt new file mode 100644 index 0000000000..0205cd7c3a --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt @@ -0,0 +1,2 @@ +hello +thereyay \ No newline at end of file diff --git a/testing/btest/scripts/base/frameworks/input/execrawstdin.bro b/testing/btest/scripts/base/frameworks/input/execrawstdin.bro new file mode 100644 index 0000000000..729844e4b4 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/execrawstdin.bro @@ -0,0 +1,44 @@ +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff test.txt +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; +@load base/frameworks/communication # let network-time run. otherwise there are no heartbeats... + +global outfile: file; +global try: count; + +module A; + +type Val: record { + s: string; +}; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, description; + print outfile, tpe; + print outfile, s; + try = try + 1; + if ( try == 2 ) + { + Input::remove("input2"); + close(outfile); + terminate(); + } + } + +event bro_init() + { + local config_strings: table[string] of string = { + ["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay" + #["stdin"] = "yay" + }; + + try = 0; + outfile = open("../out"); + Input::add_event([$source="cat > ../test.txt |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + Input::remove("input"); + Input::add_event([$source="cat |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input2", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + } From 6fef99ee0337964eb8f89828d59eb7cee0d63f22 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sat, 16 Mar 2013 09:54:48 -0700 Subject: [PATCH 05/14] make reading from stdout and stderr simultaneously work. Needs a few test-cases - but seems ok... --- src/input/readers/Raw.cc | 105 ++++++++++--- src/input/readers/Raw.h | 3 + .../out | 148 ++++++++++++++++++ .../input/executestreamrawstderr.bro | 44 ++++++ 4 files changed, 274 insertions(+), 26 deletions(-) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out create mode 100644 testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index c4dfe55b93..850dda3a39 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -44,6 +44,7 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) childpid = -1; stdin_towrite = 0; // by default do not open stdin + use_stderr = false; } Raw::~Raw() @@ -89,6 +90,12 @@ bool Raw::Execute() dup2(pipes[stdin_in], stdin_fileno); } + if ( use_stderr ) + { + close(pipes[stderr_in]); + dup2(pipes[stderr_out], stderr_fileno); + } + //execv("/usr/bin/uname",test); execl("/bin/sh", "sh", "-c", fname.c_str(), NULL); fprintf(stderr, "Exec failed :(......\n"); @@ -107,13 +114,22 @@ bool Raw::Execute() close(pipes[stdin_in]); fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); } + + if ( use_stderr ) + { + close(pipes[stderr_out]); + fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); + } file = fdopen(pipes[stdout_in], "r"); - if ( file == 0 ) + stderrfile = fdopen(pipes[stderr_in], "r"); + if ( file == 0 || (stderrfile == 0 && use_stderr) ) { Error("Could not convert fileno to file"); return false; } + + return true; } } @@ -172,7 +188,17 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie mtime = 0; execute = false; firstrun = true; + int want_fields = 1; bool result; + + // do Initialization + string source = string(info.source); + char last = info.source[source.length() - 1]; + if ( last == '|' ) + { + execute = true; + fname = source.substr(0, fname.length() - 1); + } if ( ! info.source || strlen(info.source) == 0 ) { @@ -186,38 +212,35 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie stdin_string = it->second; stdin_towrite = stdin_string.length(); } - - if ( num_fields != 1 ) + + it = info.config.find("read_stderr"); // we want to read stderr + if ( it != info.config.end() && execute ) { - Error("Filter for raw reader contains more than one field. " - "Filters for the raw reader may only contain exactly one string field. " - "Filter ignored."); + use_stderr = true; + want_fields = 2; + } + + if ( num_fields != want_fields ) + { + Error(Fmt("Filter for raw reader contains wrong number of fields -- got %d, expected %d. " + "Filters for the raw reader contain one field when used in normal mode and 2 fields when using execute mode with stderr capuring. " + "Filter ignored.", num_fields, want_fields)); return false; } if ( fields[0]->type != TYPE_STRING ) { - Error("Filter for raw reader contains a field that is not of type string."); + Error("First field for raw reader always has to be of type string."); return false; } - - // do Initialization - string source = string(info.source); - char last = info.source[source.length() - 1]; - if ( last == '|' ) + if ( use_stderr && fields[1]->type != TYPE_BOOL ) { - execute = true; - fname = source.substr(0, fname.length() - 1); - - result = OpenInput(); - - } - else - { - execute = false; - result = OpenInput(); + Error("Second field for raw reader always has to be of type bool."); } + + result = OpenInput(); + if ( result == false ) return result; @@ -329,7 +352,6 @@ void Raw::WriteToStdin() } if ( stdin_towrite == 0 ) // send EOF when we are done. - printf("Closing %d\n", pipes[stdin_out]); close(pipes[stdin_out]); } @@ -383,14 +405,14 @@ bool Raw::DoUpdate() } string line; - assert (NumFields() == 1); + assert ( (NumFields() == 1 && !use_stderr) || (NumFields() == 2 && use_stderr)); for ( ;; ) { if ( stdin_towrite > 0 ) WriteToStdin(); int64_t length = GetLine(file); - //printf("Read %lld bytes\n", length); + printf("Read %lld bytes\n", length); if ( length == -3 ) return false; @@ -398,7 +420,7 @@ bool Raw::DoUpdate() // no data ready or eof break; - Value** fields = new Value*[1]; + Value** fields = new Value*[2]; // just always reserve 2. This means that our [] is too long by a count of 1 if not using stderr. But who cares... // filter has exactly one text field. convert to it. Value* val = new Value(TYPE_STRING, true); @@ -406,11 +428,42 @@ bool Raw::DoUpdate() val->val.string_val.length = length; fields[0] = val; + if ( use_stderr ) + { + Value* bval = new Value(TYPE_BOOL, true); + bval->val.int_val = 0; + fields[1] = bval; + } + Put(fields); outbuf = 0; } + if ( use_stderr ) + for ( ;; ) + { + int64_t length = GetLine(stderrfile); + printf("Read stderr %lld bytes\n", length); + if ( length == -3 ) + return false; + else if ( length == -2 || length == -1 ) + break; + + Value** fields = new Value*[2]; + Value* val = new Value(TYPE_STRING, true); + val->val.string_val.data = outbuf; + val->val.string_val.length = length; + fields[0] = val; + Value* bval = new Value(TYPE_BOOL, true); + bval->val.int_val = 1; // yes, we are stderr + fields[1] = bval; + + Put(fields); + + outbuf = 0; + } + #ifdef DEBUG Debug(DBG_INPUT, "DoUpdate finished successfully"); #endif diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index cf29609331..d79a80c31e 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -35,6 +35,7 @@ private: string fname; // Source with a potential "|" removed. FILE* file; + FILE* stderrfile; bool execute; bool firstrun; time_t mtime; @@ -55,6 +56,8 @@ private: string stdin_string; uint64_t stdin_towrite; + bool use_stderr; + int pipes[6]; pid_t childpid; diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out new file mode 100644 index 0000000000..55c4167ef8 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out @@ -0,0 +1,148 @@ +[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +print A::outfile, A::is_stderr; +A::try = A::try + 1; +if (7 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ +[read_stderr] = 1 +}] +Input::EVENT_NEW +..: +F +[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +print A::outfile, A::is_stderr; +A::try = A::try + 1; +if (7 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ +[read_stderr] = 1 +}] +Input::EVENT_NEW +bro +F +[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +print A::outfile, A::is_stderr; +A::try = A::try + 1; +if (7 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ +[read_stderr] = 1 +}] +Input::EVENT_NEW +executestreamrawstderr.bro +F +[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +print A::outfile, A::is_stderr; +A::try = A::try + 1; +if (7 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ +[read_stderr] = 1 +}] +Input::EVENT_NEW +out +F +[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +print A::outfile, A::is_stderr; +A::try = A::try + 1; +if (7 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ +[read_stderr] = 1 +}] +Input::EVENT_NEW +ls: ../nonexistant: No such file or directory +T +[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +print A::outfile, A::is_stderr; +A::try = A::try + 1; +if (7 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ +[read_stderr] = 1 +}] +Input::EVENT_NEW +ls: ../nonexistant2: No such file or directory +T +[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +print A::outfile, A::is_stderr; +A::try = A::try + 1; +if (7 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ +[read_stderr] = 1 +}] +Input::EVENT_NEW +ls: ../nonexistant3: No such file or directory +T +done diff --git a/testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro b/testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro new file mode 100644 index 0000000000..7e7c640112 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro @@ -0,0 +1,44 @@ +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; + +module A; + +type Val: record { + s: string; + is_stderr: bool; +}; + +global try: count; +global outfile: file; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool) + { + print outfile, description; + print outfile, tpe; + print outfile, s; + print outfile, is_stderr; + + try = try + 1; + if ( try == 7 ) + { + print outfile, "done"; + close(outfile); + Input::remove("input"); + terminate(); + } + } + +event bro_init() + { + + local config_strings: table[string] of string = { + ["read_stderr"] = "1" + }; + + outfile = open("../out"); + try = 0; + Input::add_event([$source="ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |", $reader=Input::READER_RAW, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + } From 887595375174eb0ac8405b775b17e3da8c9dc58f Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 18 Mar 2013 21:49:16 -0700 Subject: [PATCH 06/14] A bunch of more changes for the raw reader * send end_of_data event for all kind of streams * send process_finished event containing exit code of child process for executed programs * move raw-tests to separate directory * expose name of input stream to readers * better handling of some error cases in raw reader * new force_kill option for raw reader which SIGKILLs progesses on exit The ordering of events how they arrive in the main loop is a bit peculiar at the moment. The process_finished event arrives in scriptland before all of the other events, even though it should be sent last. I have not yet fully figured that out. --- scripts/base/frameworks/input/readers/raw.bro | 8 + src/input/Manager.cc | 8 + src/input/ReaderBackend.h | 7 + src/input/readers/Raw.cc | 120 +++++++++++--- src/input/readers/Raw.h | 2 + .../out | 148 ------------------ .../out | 0 .../out | 0 .../out | 0 .../test.txt | 0 .../out | 0 .../out | 0 .../out | 27 ++++ .../out | 0 .../input/{raw.bro => raw/basic.bro} | 0 .../input/{executeraw.bro => raw/execute.bro} | 0 .../executestdin.bro} | 0 .../executestream.bro} | 0 .../frameworks/input/{ => raw}/rereadraw.bro | 0 .../stderr.bro} | 20 ++- .../frameworks/input/{ => raw}/streamraw.bro | 0 21 files changed, 166 insertions(+), 174 deletions(-) delete mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out rename testing/btest/Baseline/{scripts.base.frameworks.input.raw => scripts.base.frameworks.input.raw.basic}/out (100%) rename testing/btest/Baseline/{scripts.base.frameworks.input.executeraw => scripts.base.frameworks.input.raw.execute}/out (100%) rename testing/btest/Baseline/{scripts.base.frameworks.input.execrawstdin => scripts.base.frameworks.input.raw.executestdin}/out (100%) rename testing/btest/Baseline/{scripts.base.frameworks.input.execrawstdin => scripts.base.frameworks.input.raw.executestdin}/test.txt (100%) rename testing/btest/Baseline/{scripts.base.frameworks.input.executestreamraw => scripts.base.frameworks.input.raw.executestream}/out (100%) rename testing/btest/Baseline/{scripts.base.frameworks.input.rereadraw => scripts.base.frameworks.input.raw.rereadraw}/out (100%) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out rename testing/btest/Baseline/{scripts.base.frameworks.input.streamraw => scripts.base.frameworks.input.raw.streamraw}/out (100%) rename testing/btest/scripts/base/frameworks/input/{raw.bro => raw/basic.bro} (100%) rename testing/btest/scripts/base/frameworks/input/{executeraw.bro => raw/execute.bro} (100%) rename testing/btest/scripts/base/frameworks/input/{execrawstdin.bro => raw/executestdin.bro} (100%) rename testing/btest/scripts/base/frameworks/input/{executestreamraw.bro => raw/executestream.bro} (100%) rename testing/btest/scripts/base/frameworks/input/{ => raw}/rereadraw.bro (100%) rename testing/btest/scripts/base/frameworks/input/{executestreamrawstderr.bro => raw/stderr.bro} (64%) rename testing/btest/scripts/base/frameworks/input/{ => raw}/streamraw.bro (100%) diff --git a/scripts/base/frameworks/input/readers/raw.bro b/scripts/base/frameworks/input/readers/raw.bro index 45deed3eda..ff49032b35 100644 --- a/scripts/base/frameworks/input/readers/raw.bro +++ b/scripts/base/frameworks/input/readers/raw.bro @@ -6,4 +6,12 @@ export { ## Separator between input records. ## Please note that the separator has to be exactly one character long const record_separator = "\n" &redef; + + ## Event that is called, when a process created by the raw reader exits. + ## + ## name: name of the input stream + ## source: source of the input stream + ## exit_code: exit code of the program, or number of the signal that forced the program to exit + ## signal_exit: false when program exitted normally, true when program was forced to exit by a signal + global process_finished: event(name: string, source:string, exit_code:count, signal_exit:bool); } diff --git a/src/input/Manager.cc b/src/input/Manager.cc index f5d0e2693c..33abd7d136 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -299,6 +299,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo(); rinfo->source = copy_string(source.c_str()); + rinfo->name = copy_string(name.c_str()); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); switch ( mode->InternalInt() ) @@ -1175,6 +1176,9 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) if ( i->stream_type == EVENT_STREAM ) { +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "%s is event, sending end of data", i->name.c_str()); +#endif // just signal the end of the data source SendEndOfData(i); return; @@ -1281,6 +1285,10 @@ void Manager::SendEndOfData(ReaderFrontend* reader) void Manager::SendEndOfData(const Stream *i) { +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEndOfData for stream %s", + i->name.c_str()); +#endif SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); } diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 73e5475db6..5419879e13 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -85,6 +85,11 @@ public: */ const char* source; + /** + * The name of the input stream. + */ + const char* name; + /** * A map of key/value pairs corresponding to the relevant * filter's "config" table. @@ -99,12 +104,14 @@ public: ReaderInfo() { source = 0; + name = 0; mode = MODE_NONE; } ReaderInfo(const ReaderInfo& other) { source = other.source ? copy_string(other.source) : 0; + name = other.name ? copy_string(other.name) : 0; mode = other.mode; for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ ) diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 850dda3a39..39d25912f8 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -6,6 +6,7 @@ #include "../../threading/SerialTypes.h" #include +#include #include #include #include @@ -23,6 +24,8 @@ const int Raw::block_size = 512; // how big do we expect our chunks of data to b Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; + stderrfile = 0; + forcekill = false; separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); @@ -36,11 +39,6 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) stdout_fileno = fileno(stdout); stderr_fileno = fileno(stderr); - // and because we later assume this... - assert(stdin_fileno == 0); - assert(stdout_fileno == 1); - assert(stderr_fileno == 2); - childpid = -1; stdin_towrite = 0; // by default do not open stdin @@ -58,9 +56,17 @@ void Raw::DoClose() CloseInput(); - if ( execute && childpid > 0 ) + if ( execute && childpid > 0 && kill(childpid, 0) == 0 ) + { // kill child process - kill(childpid, 9); // TERMINATOR + kill(childpid, 15); // sigterm + if ( forcekill ) + { + usleep(200); // 200 msecs should be enough for anyone ;) + if ( kill(childpid, 0) == 0 ) // perhaps it is already gone + kill(childpid, 9); // TERMINATE + } + } } bool Raw::Execute() @@ -112,22 +118,26 @@ bool Raw::Execute() if ( stdin_towrite ) { close(pipes[stdin_in]); - fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); + fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data. + // note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having + // a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied. } if ( use_stderr ) { close(pipes[stderr_out]); - fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); + fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too. } file = fdopen(pipes[stdout_in], "r"); - stderrfile = fdopen(pipes[stderr_in], "r"); - if ( file == 0 || (stderrfile == 0 && use_stderr) ) - { - Error("Could not convert fileno to file"); - return false; - } + + if ( use_stderr ) + stderrfile = fdopen(pipes[stderr_in], "r"); + if ( file == 0 || (stderrfile == 0 && use_stderr) ) + { + Error("Could not convert fileno to file"); + return false; + } return true; @@ -165,15 +175,19 @@ bool Raw::CloseInput() Debug(DBG_INPUT, "Raw reader starting close"); #endif + fclose(file); + + if ( use_stderr ) + fclose(stderrfile); + if ( execute ) // we do not care if any of those fails. They should all be defined. { for ( int i = 0; i < 6; i ++ ) close(pipes[i]); } - else - fclose(file); file = 0; + stderrfile = 0; #ifdef DEBUG Debug(DBG_INPUT, "Raw reader finished close"); @@ -219,11 +233,17 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie use_stderr = true; want_fields = 2; } + + it = info.config.find("force_kill"); // we want to be sure that our child is dead when we exit + if ( it != info.config.end() && execute ) + { + forcekill = true; + } if ( num_fields != want_fields ) { Error(Fmt("Filter for raw reader contains wrong number of fields -- got %d, expected %d. " - "Filters for the raw reader contain one field when used in normal mode and 2 fields when using execute mode with stderr capuring. " + "Filters for the raw reader contain one string field when used in normal mode and one string and one bool fields when using execute mode with stderr capuring. " "Filter ignored.", num_fields, want_fields)); return false; } @@ -236,6 +256,14 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie if ( use_stderr && fields[1]->type != TYPE_BOOL ) { Error("Second field for raw reader always has to be of type bool."); + return false; + } + + if ( execute && Info().mode == MODE_REREAD ) + { + // for execs this makes no sense - would have to execute each heartbeat? + Error("Rereading only supported for files, not for executables."); + return false; } @@ -353,8 +381,14 @@ void Raw::WriteToStdin() if ( stdin_towrite == 0 ) // send EOF when we are done. close(pipes[stdin_out]); + + if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 ) + { + Error(Fmt("Could not write whole string to stdin of child process in one go. Please use STREAM mode to pass more data to child.")); + } } + // read the entire file and send appropriate thingies back to InputMgr bool Raw::DoUpdate() { @@ -366,6 +400,7 @@ bool Raw::DoUpdate() switch ( Info().mode ) { case MODE_REREAD: { + assert(childpid == -1); // mode may not be used to execute child programs // check if the file has changed struct stat sb; if ( stat(fname.c_str(), &sb) == -1 ) @@ -388,7 +423,6 @@ bool Raw::DoUpdate() case MODE_STREAM: if ( Info().mode == MODE_STREAM && file != 0 ) { - //fpurge(file); clearerr(file); // remove end of file evil bits break; } @@ -412,7 +446,7 @@ bool Raw::DoUpdate() WriteToStdin(); int64_t length = GetLine(file); - printf("Read %lld bytes\n", length); + //printf("Read %lld bytes\n", length); if ( length == -3 ) return false; @@ -444,7 +478,7 @@ bool Raw::DoUpdate() for ( ;; ) { int64_t length = GetLine(stderrfile); - printf("Read stderr %lld bytes\n", length); + //printf("Read stderr %lld bytes\n", length); if ( length == -3 ) return false; else if ( length == -2 || length == -1 ) @@ -464,6 +498,50 @@ bool Raw::DoUpdate() outbuf = 0; } + if ( ( Info().mode == MODE_MANUAL ) || ( Info().mode == MODE_REREAD ) ) + // done with the current data source :) + EndCurrentSend(); + + // and let's check if the child process is still alive + int return_code; + if ( waitpid(childpid, &return_code, WNOHANG) != 0 ) { + // child died :( + bool signal = false; + int code = 0; + if ( WIFEXITED(return_code) ) { + code = WEXITSTATUS(return_code); + if ( code != 0 ) + Error(Fmt("Child process exited with non-zero return code %d", code)); + } else if ( WIFSIGNALED(return_code) ) { + signal = false; + code = WTERMSIG(return_code); + Error(Fmt("Child process exited due to signal %d", code)); + } else { + assert(false); + } + + Value** vals = new Value*[4]; + vals[0] = new Value(TYPE_STRING, true); + vals[0]->val.string_val.data = copy_string(Info().name); + vals[0]->val.string_val.length = strlen(Info().name); + vals[1] = new Value(TYPE_STRING, true); + vals[1]->val.string_val.data = copy_string(Info().source); + vals[1]->val.string_val.length = strlen(Info().source); + vals[2] = new Value(TYPE_COUNT, true); + vals[2]->val.int_val = code; + vals[3] = new Value(TYPE_BOOL, true); + vals[3]->val.int_val = signal; + + // and in this case we can signal end_of_data even for the streaming reader + if ( Info().mode == MODE_STREAM ) + EndCurrentSend(); + + SendEvent("InputRaw::process_finished", 4, vals); + + } + + + #ifdef DEBUG Debug(DBG_INPUT, "DoUpdate finished successfully"); #endif diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index d79a80c31e..8ea03a70b4 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -58,6 +58,8 @@ private: bool use_stderr; + bool forcekill; + int pipes[6]; pid_t childpid; diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out deleted file mode 100644 index 55c4167ef8..0000000000 --- a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamrawstderr/out +++ /dev/null @@ -1,148 +0,0 @@ -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -..: -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -bro -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -executestreamrawstderr.bro -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -out -F -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -ls: ../nonexistant: No such file or directory -T -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -ls: ../nonexistant2: No such file or directory -T -[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=, want_record=F, ev=line -{ -print A::outfile, A::description; -print A::outfile, A::tpe; -print A::outfile, A::s; -print A::outfile, A::is_stderr; -A::try = A::try + 1; -if (7 == A::try) -{ -print A::outfile, done; -close(A::outfile); -Input::remove(input); -terminate(); -} - -}, config={ -[read_stderr] = 1 -}] -Input::EVENT_NEW -ls: ../nonexistant3: No such file or directory -T -done diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.basic/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.raw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.basic/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.execute/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.execute/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/test.txt similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.execrawstdin/test.txt rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/test.txt diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.executestreamraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.rereadraw/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.rereadraw/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out new file mode 100644 index 0000000000..4900bc8ff8 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out @@ -0,0 +1,27 @@ +Process finished event +input +1 +Input::EVENT_NEW +..: +F +Input::EVENT_NEW +bro +F +Input::EVENT_NEW +out +F +Input::EVENT_NEW +stderr.bro +F +Input::EVENT_NEW +ls: ../nonexistant: No such file or directory +T +Input::EVENT_NEW +ls: ../nonexistant2: No such file or directory +T +Input::EVENT_NEW +ls: ../nonexistant3: No such file or directory +T +done +End of Data event +input diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.streamraw/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.streamraw/out diff --git a/testing/btest/scripts/base/frameworks/input/raw.bro b/testing/btest/scripts/base/frameworks/input/raw/basic.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/raw.bro rename to testing/btest/scripts/base/frameworks/input/raw/basic.bro diff --git a/testing/btest/scripts/base/frameworks/input/executeraw.bro b/testing/btest/scripts/base/frameworks/input/raw/execute.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/executeraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/execute.bro diff --git a/testing/btest/scripts/base/frameworks/input/execrawstdin.bro b/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/execrawstdin.bro rename to testing/btest/scripts/base/frameworks/input/raw/executestdin.bro diff --git a/testing/btest/scripts/base/frameworks/input/executestreamraw.bro b/testing/btest/scripts/base/frameworks/input/raw/executestream.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/executestreamraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/executestream.bro diff --git a/testing/btest/scripts/base/frameworks/input/rereadraw.bro b/testing/btest/scripts/base/frameworks/input/raw/rereadraw.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/rereadraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/rereadraw.bro diff --git a/testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro similarity index 64% rename from testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro rename to testing/btest/scripts/base/frameworks/input/raw/stderr.bro index 7e7c640112..c85ee8b0ef 100644 --- a/testing/btest/scripts/base/frameworks/input/executestreamrawstderr.bro +++ b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro @@ -4,8 +4,6 @@ redef exit_only_after_terminate = T; -module A; - type Val: record { s: string; is_stderr: bool; @@ -16,7 +14,6 @@ global outfile: file; event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool) { - print outfile, description; print outfile, tpe; print outfile, s; print outfile, is_stderr; @@ -25,12 +22,25 @@ event line(description: Input::EventDescription, tpe: Input::Event, s: string, i if ( try == 7 ) { print outfile, "done"; - close(outfile); Input::remove("input"); - terminate(); } } +event Input::end_of_data(name: string, source:string) + { + print outfile, "End of Data event"; + print outfile, name; + terminate(); # due to the current design, end_of_data will be called after process_finshed and all line events. + # this could potentially change + } + +event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool) + { + print outfile, "Process finished event"; + print outfile, name; + print outfile, exit_code; + } + event bro_init() { diff --git a/testing/btest/scripts/base/frameworks/input/streamraw.bro b/testing/btest/scripts/base/frameworks/input/raw/streamraw.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/streamraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/streamraw.bro From fed914252c0d4e426069a57fd55cc4ba78113069 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 18 Mar 2013 22:11:13 -0700 Subject: [PATCH 07/14] and close only fds that are currently open (the logging framework really did not like that :) ) --- src/input/readers/Raw.cc | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 39d25912f8..12f66a9b39 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -111,6 +111,7 @@ bool Raw::Execute() { // we are the parent close(pipes[stdout_out]); + pipes[stdout_out] = -1; if ( Info().mode == MODE_STREAM ) fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK); @@ -118,6 +119,7 @@ bool Raw::Execute() if ( stdin_towrite ) { close(pipes[stdin_in]); + pipes[stdin_in] = -1; fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data. // note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having // a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied. @@ -126,13 +128,16 @@ bool Raw::Execute() if ( use_stderr ) { close(pipes[stderr_out]); + pipes[stderr_out] = -1; fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too. } file = fdopen(pipes[stdout_in], "r"); + pipes[stdout_in] = -1; // will be closed by fclose if ( use_stderr ) stderrfile = fdopen(pipes[stderr_in], "r"); + pipes[stderr_in] = -1; // will be closed by fclose if ( file == 0 || (stderrfile == 0 && use_stderr) ) { Error("Could not convert fileno to file"); @@ -183,8 +188,9 @@ bool Raw::CloseInput() if ( execute ) // we do not care if any of those fails. They should all be defined. { for ( int i = 0; i < 6; i ++ ) - close(pipes[i]); - } + if ( pipes[i] != -1 ) + close(pipes[i]); + } file = 0; stderrfile = 0; From d61973a92d112bed7c39baf826d591a791107684 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 27 May 2013 21:14:07 -0700 Subject: [PATCH 08/14] linux does not have strnstr --- src/input/readers/Raw.cc | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 12f66a9b39..3b79ca4bf2 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -291,11 +291,10 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie return true; } - int64_t Raw::GetLine(FILE* arg_file) { errno = 0; - uint64_t pos = 0; + int pos = 0; // strstr_n only works on ints - so no use to use something bigger here if ( buf == 0 ) buf = new char[block_size]; @@ -311,9 +310,12 @@ int64_t Raw::GetLine(FILE* arg_file) if ( pos == 0 && errno != 0 ) break; - char* token = strnstr(buf, separator.c_str(), block_size*repeats-pos); - - if ( token == 0 ) + // researching everything each time is a bit... cpu-intensive. But otherwhise we have + // to deal with situations where the separator is multi-character and split over multiple + // reads... + int found = strstr_n(pos, (unsigned char*) buf, separator.size(), (unsigned char*) separator.c_str()); + + if ( found == -1 ) { // we did not find it and have to search again in the next try. resize buffer.... // but first check if we encountered the file end - because if we did this was it. @@ -342,16 +344,15 @@ int64_t Raw::GetLine(FILE* arg_file) buf = new char[block_size]; - if ( token - outbuf < pos ) + if ( found < pos ) { // we have leftovers. copy them into the buffer for the next line buf = new char[block_size]; - memcpy(buf, token + sep_length, -(token - outbuf + sep_length) +pos); - bufpos = -(token - outbuf + sep_length) +pos; + memcpy(buf, buf + found + sep_length, pos - found - sep_length); + bufpos = pos - found - sep_length; } - pos = token-outbuf; - return pos; + return found; } } From 08656c976b7f0f5194c0bcbf6abba2eba2dbb6f2 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 27 May 2013 22:59:27 -0700 Subject: [PATCH 09/14] small fixes. --- src/input/readers/Raw.cc | 18 +++++---- src/input/readers/Raw.h | 2 +- .../out | 2 + .../base/frameworks/input/raw/long.bro | 37 +++++++++++++++++++ 4 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.raw.long/out create mode 100644 testing/btest/scripts/base/frameworks/input/raw/long.bro diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 3b79ca4bf2..435876ece1 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -18,7 +18,7 @@ using namespace input::reader; using threading::Value; using threading::Field; -const int Raw::block_size = 512; // how big do we expect our chunks of data to be... +const int Raw::block_size = 4096; // how big do we expect our chunks of data to be... Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) @@ -102,7 +102,6 @@ bool Raw::Execute() dup2(pipes[stderr_out], stderr_fileno); } - //execv("/usr/bin/uname",test); execl("/bin/sh", "sh", "-c", fname.c_str(), NULL); fprintf(stderr, "Exec failed :(......\n"); exit(255); @@ -294,7 +293,8 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie int64_t Raw::GetLine(FILE* arg_file) { errno = 0; - int pos = 0; // strstr_n only works on ints - so no use to use something bigger here + int pos = 0; // strstr_n only works on ints - so no use to use something different here + int offset = 0; if ( buf == 0 ) buf = new char[block_size]; @@ -303,9 +303,10 @@ int64_t Raw::GetLine(FILE* arg_file) for (;;) { - size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, arg_file); + size_t readbytes = fread(buf+bufpos+offset, 1, block_size-bufpos, arg_file); pos += bufpos + readbytes; - bufpos = 0; // read full block size in next read... + //printf("Pos: %d\n", pos); + bufpos = offset = 0; // read full block size in next read... if ( pos == 0 && errno != 0 ) break; @@ -336,6 +337,7 @@ int64_t Raw::GetLine(FILE* arg_file) memcpy(newbuf, buf, block_size*(repeats-1)); delete buf; buf = newbuf; + offset = block_size*(repeats-1); } else { @@ -348,11 +350,11 @@ int64_t Raw::GetLine(FILE* arg_file) { // we have leftovers. copy them into the buffer for the next line buf = new char[block_size]; - memcpy(buf, buf + found + sep_length, pos - found - sep_length); + memcpy(buf, outbuf + found + sep_length, pos - found - sep_length); bufpos = pos - found - sep_length; } - return found; + return found; } } @@ -511,7 +513,7 @@ bool Raw::DoUpdate() // and let's check if the child process is still alive int return_code; - if ( waitpid(childpid, &return_code, WNOHANG) != 0 ) { + if ( childpid != -1 && waitpid(childpid, &return_code, WNOHANG) != 0 ) { // child died :( bool signal = false; int code = 0; diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 8ea03a70b4..6dbae21002 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -45,7 +45,7 @@ private: unsigned int sep_length; // length of the separator static const int block_size; - uint64_t bufpos; + int bufpos; char* buf; char* outbuf; diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.long/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.long/out new file mode 100644 index 0000000000..fac8e79c0b --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.long/out @@ -0,0 +1,2 @@ +Input::EVENT_NEW +8193 diff --git a/testing/btest/scripts/base/frameworks/input/raw/long.bro b/testing/btest/scripts/base/frameworks/input/raw/long.bro new file mode 100644 index 0000000000..ac07639f77 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/long.bro @@ -0,0 +1,37 @@ +# @TEST-EXEC: dd if=/dev/zero of=input.log bs=8193 count=1 +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff out +# +# this test should be longer than one block-size. to test behavior of input-reader if it has to re-allocate stuff. + +redef exit_only_after_terminate = T; + +global outfile: file; +global try: count; + +module A; + +type Val: record { + s: string; +}; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, tpe; + print outfile, |s|; + try = try + 1; + if ( try == 1 ) + { + close(outfile); + terminate(); + } + } + +event bro_init() + { + try = 0; + outfile = open("../out"); + Input::add_event([$source="../input.log", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]); + Input::remove("input"); + } From f1745ff488df3b9e6c7b446576d1aff446d3f4d3 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 27 May 2013 23:07:37 -0700 Subject: [PATCH 10/14] fix stderr test. ls behaves differently on errors on linux... --- .../scripts.base.frameworks.input.raw.stderr/out | 8 ++++---- .../scripts/base/frameworks/input/raw/stderr.bro | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out index 4900bc8ff8..e7ff580dfd 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out @@ -1,6 +1,6 @@ Process finished event input -1 +Exit code != 0 Input::EVENT_NEW ..: F @@ -14,13 +14,13 @@ Input::EVENT_NEW stderr.bro F Input::EVENT_NEW -ls: ../nonexistant: No such file or directory +stderr output contained nonexistant T Input::EVENT_NEW -ls: ../nonexistant2: No such file or directory +stderr output contained nonexistant T Input::EVENT_NEW -ls: ../nonexistant3: No such file or directory +stderr output contained nonexistant T done End of Data event diff --git a/testing/btest/scripts/base/frameworks/input/raw/stderr.bro b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro index c85ee8b0ef..e84ed048cd 100644 --- a/testing/btest/scripts/base/frameworks/input/raw/stderr.bro +++ b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro @@ -15,7 +15,18 @@ global outfile: file; event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool) { print outfile, tpe; - print outfile, s; + if ( is_stderr ) + { + # work around localized error messages. and if some localization does not include the filename... well... that would be bad :) + if ( strstr(s, "nonexistant") > 0 ) + { + print outfile, "stderr output contained nonexistant"; + } + } + else + { + print outfile, s; + } print outfile, is_stderr; try = try + 1; @@ -38,7 +49,8 @@ event InputRaw::process_finished(name: string, source:string, exit_code:count, s { print outfile, "Process finished event"; print outfile, name; - print outfile, exit_code; + if ( exit_code != 0 ) + print outfile, "Exit code != 0"; } event bro_init() From a32bb59770e9d6355aaa83f224e4f1c21c518515 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sat, 8 Jun 2013 05:57:56 -0700 Subject: [PATCH 11/14] fix warning. Update baseline of stderr test to what it should be. There still is a message ordering issue there (which is the last issue in the new Raw reader I know of). One message that sidesteps a bit of the usual processing does not always arrive at the correct time (meaning it pops up from the event queue too early). Even though it sidesteps a bit of the usual processing that should not happen in my opinion (which clearly does not matter). And I have not yet fully grasped how this can happen. --- src/input/readers/Raw.cc | 8 ++++---- .../Baseline/scripts.base.frameworks.input.raw.stderr/out | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 435876ece1..ab2cb8bd44 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -359,9 +359,7 @@ int64_t Raw::GetLine(FILE* arg_file) } - if ( errno == 0 ) { - assert(false); - } else if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) { + if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) { return -2; } else { // an error code we did no expect. This probably is bad. @@ -369,6 +367,9 @@ int64_t Raw::GetLine(FILE* arg_file) return -3; } + InternalError("Internal control flow execution"); + assert(false); + } // write to the stdin of the child process @@ -546,7 +547,6 @@ bool Raw::DoUpdate() EndCurrentSend(); SendEvent("InputRaw::process_finished", 4, vals); - } diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out index e7ff580dfd..b7f857339d 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out @@ -1,6 +1,3 @@ -Process finished event -input -Exit code != 0 Input::EVENT_NEW ..: F @@ -25,3 +22,6 @@ T done End of Data event input +Process finished event +input +Exit code != 0 From 3517c0ba992bb7333551489ec2790126120f525d Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sun, 9 Jun 2013 08:27:08 -0400 Subject: [PATCH 12/14] add Terminate to input framework to prevent potential shutdown race-conditions. --- src/input/Manager.cc | 16 ++++++++++++++++ src/input/Manager.h | 5 +++++ src/main.cc | 1 + 3 files changed, 22 insertions(+) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 495df682fa..ac6946318d 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -1289,6 +1289,7 @@ void Manager::SendEndOfData(ReaderFrontend* reader) SendEndOfData(i); } + void Manager::SendEndOfData(const Stream *i) { #ifdef DEBUG @@ -2174,3 +2175,18 @@ Manager::Stream* Manager::FindStream(ReaderFrontend* reader) return 0; } + +// function is called on Bro shutdown. +// sinal all frontends that they will cease operation. +void Manager::Terminate() + { + for (map::iterator i = readers.begin(); i != readers.end(); ++i ) + { + if ( i->second->removed ) + continue; + + i->second->removed = true; + i->second->reader->Stop(); + } + + } diff --git a/src/input/Manager.h b/src/input/Manager.h index 633b20f8ed..0b633cca95 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -79,6 +79,11 @@ public: */ bool RemoveStream(const string &id); + /** + * Signals the manager to shutdown at Bro's termination. + */ + void Terminate(); + protected: friend class ReaderFrontend; friend class PutMessage; diff --git a/src/main.cc b/src/main.cc index 491f8a732d..90dca2d48c 100644 --- a/src/main.cc +++ b/src/main.cc @@ -357,6 +357,7 @@ void terminate_bro() file_mgr->Terminate(); log_mgr->Terminate(); + input_mgr->Terminate(); thread_mgr->Terminate(); mgr.Drain(); From 655187a4f4a24b7e27c2bc4db6bbd9658ab6bdc1 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sun, 9 Jun 2013 08:43:17 -0400 Subject: [PATCH 13/14] ...and fix the event ordering issue. Dispatch != QueueEvent --- src/input/Manager.cc | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index ac6946318d..8dd578d40f 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -1308,6 +1308,11 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) return; } +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "Put for stream %s", + i->name.c_str()); +#endif + int readFields = 0; if ( i->stream_type == TABLE_STREAM ) @@ -1631,6 +1636,11 @@ bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) return false; } +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEvent for event %s with num_vals vals", + name.c_str(), num_vals); +#endif + RecordType *type = handler->FType()->Args(); int num_event_vals = type->NumFields(); if ( num_vals != num_event_vals ) @@ -1643,7 +1653,8 @@ bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) for ( int i = 0; i < num_vals; i++) vl->append(ValueToVal(vals[i], type->FieldType(i))); - mgr.Dispatch(new Event(handler, vl)); + //mgr.Dispatch(new Event(handler, vl)); + mgr.QueueEvent(handler, vl, SOURCE_LOCAL); for ( int i = 0; i < num_vals; i++ ) delete vals[i]; @@ -1657,6 +1668,11 @@ void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...) { val_list* vl = new val_list; +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEvent with %d vals", + numvals); +#endif + va_list lP; va_start(lP, numvals); for ( int i = 0; i < numvals; i++ ) @@ -1671,6 +1687,11 @@ void Manager::SendEvent(EventHandlerPtr ev, list events) { val_list* vl = new val_list; +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEvent with %d vals (list)", + events.size()); +#endif + for ( list::iterator i = events.begin(); i != events.end(); i++ ) { vl->append( *i ); From ebb7af1483891ccb2dda66919d365682b3675727 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sun, 9 Jun 2013 16:18:17 -0400 Subject: [PATCH 14/14] this event handler fails the unused-event-handlers test because it is a bit of a special case. It is only called via the SendEvent function from a reader. The reader does (at least with the current interface) however not provide the function pointer, but looks up the name of the event dynamically. Hence, internal_handler is never called for the event. Even if resolving the event in the reader, e.g. in an initialization function, this would not solve the issue - the initialization function is only called when the first Raw reader is initialized - and in the base configuration the raw reader will never be used (hence, internal_handler also won't be called). Calling it once in the manager seems like a really dirty hack. So - now it is the second exception in the testcase, unless anyone has a better idea :) --- testing/btest/Baseline/core.check-unused-event-handlers/.stderr | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/btest/Baseline/core.check-unused-event-handlers/.stderr b/testing/btest/Baseline/core.check-unused-event-handlers/.stderr index 8d8bf1a85b..1a32ad442c 100644 --- a/testing/btest/Baseline/core.check-unused-event-handlers/.stderr +++ b/testing/btest/Baseline/core.check-unused-event-handlers/.stderr @@ -1 +1,2 @@ warning in , line 1: event handler never invoked: this_is_never_used +warning in , line 1: event handler never invoked: InputRaw::process_finished