diff --git a/CHANGES b/CHANGES index ebf38a74a0..f1039ada53 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,19 @@ +2.1-780 | 2013-07-03 16:46:26 -0700 + + * Rewrite of the RAW input reader for improved robustness and new + features. (Bernhard Amann) This includes: + + - Send "end_of_data" event for all kinds of streams. + - Send "process_finished" event with exit code of child + process at process termination. + - Expose name of input stream to readers. + - Better error handling. + - New "force_kill" option which SIGKILLs processes on reader termination. + - Supports reading from stdout and stderr simultaneously. + - Support sending data to stdin of child process. + - Streaming reads from external commands work without blocking. + 2.1-762 | 2013-07-03 16:33:22 -0700 * Fix to correct support for TLS 1.2. Addresses #1020. (Seth Hall, diff --git a/NEWS b/NEWS index f0d302156b..be0ddffaa2 100644 --- a/NEWS +++ b/NEWS @@ -73,10 +73,12 @@ New Functionality script file name without path, respectively. (Jon Siwek) - The new file analysis framework moves most of the processing of file - content from script-land into the core, where it belongs. Much of - this is an internal change, the framework comes with the following - user-visibible functionality (some of that was already available - before, but done differently): + content from script-land into the core, where it belongs. See + doc/file-analysis.rst for more information. + + Much of this is an internal change, but the framework also comes + with the following user-visible functionality (some of that was + already available before, but done differently): [TODO: This will probably change with further script updates.] 
diff --git a/VERSION b/VERSION index 726b2d9d3d..ee5173cb0f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.1-762 +2.1-780 diff --git a/scripts/base/frameworks/input/readers/raw.bro b/scripts/base/frameworks/input/readers/raw.bro index 45deed3eda..9fa7d1a2fb 100644 --- a/scripts/base/frameworks/input/readers/raw.bro +++ b/scripts/base/frameworks/input/readers/raw.bro @@ -6,4 +6,12 @@ export { ## Separator between input records. ## Please note that the separator has to be exactly one character long const record_separator = "\n" &redef; + + ## Event that is called when a process created by the raw reader exits. + ## + ## name: name of the input stream + ## source: source of the input stream + ## exit_code: exit code of the program, or number of the signal that forced the program to exit + ## signal_exit: false when program exitted normally, true when program was forced to exit by a signal + global process_finished: event(name: string, source:string, exit_code:count, signal_exit:bool); } diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 2e0473f961..d467e32005 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -320,6 +320,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo(); rinfo->source = copy_string(source.c_str()); + rinfo->name = copy_string(name.c_str()); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); switch ( mode->InternalInt() ) @@ -1241,6 +1242,9 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) if ( i->stream_type != TABLE_STREAM ) { +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "%s is event, sending end of data", i->name.c_str()); +#endif // just signal the end of the data source SendEndOfData(i); return; @@ -1345,8 +1349,13 @@ void Manager::SendEndOfData(ReaderFrontend* reader) SendEndOfData(i); } + void Manager::SendEndOfData(const Stream *i) { +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEndOfData for stream %s", 
+ i->name.c_str()); +#endif SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); if ( i->stream_type == ANALYSIS_STREAM ) @@ -1362,6 +1371,11 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) return; } +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "Put for stream %s", + i->name.c_str()); +#endif + int readFields = 0; if ( i->stream_type == TABLE_STREAM ) @@ -1700,6 +1714,11 @@ bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) return false; } +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEvent for event %s with %d vals", + name.c_str(), num_vals); +#endif + RecordType *type = handler->FType()->Args(); int num_event_vals = type->NumFields(); if ( num_vals != num_event_vals ) @@ -1712,7 +1731,7 @@ bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) for ( int i = 0; i < num_vals; i++) vl->append(ValueToVal(vals[i], type->FieldType(i))); - mgr.Dispatch(new Event(handler, vl)); + mgr.QueueEvent(handler, vl, SOURCE_LOCAL); for ( int i = 0; i < num_vals; i++ ) delete vals[i]; @@ -1726,6 +1745,11 @@ void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...) { val_list* vl = new val_list; +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEvent with %d vals", + numvals); +#endif + va_list lP; va_start(lP, numvals); for ( int i = 0; i < numvals; i++ ) @@ -1740,6 +1764,11 @@ void Manager::SendEvent(EventHandlerPtr ev, list events) { val_list* vl = new val_list; +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "SendEvent with %d vals (list)", + static_cast<int>(events.size())); // size_t must not be passed to %d +#endif + for ( list::iterator i = events.begin(); i != events.end(); i++ ) { vl->append( *i ); @@ -2244,3 +2273,18 @@ Manager::Stream* Manager::FindStream(ReaderFrontend* reader) return 0; } + +// Function is called on Bro shutdown. +// Signal all frontends that they will cease operation. 
+void Manager::Terminate() + { + for ( map::iterator i = readers.begin(); i != readers.end(); ++i ) + { + if ( i->second->removed ) + continue; + + i->second->removed = true; + i->second->reader->Stop(); + } + + } diff --git a/src/input/Manager.h b/src/input/Manager.h index a1fbb94313..8156ed5248 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -91,6 +91,11 @@ public: */ bool RemoveStream(const string &id); + /** + * Signals the manager to shutdown at Bro's termination. + */ + void Terminate(); + protected: friend class ReaderFrontend; friend class PutMessage; diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 73e5475db6..5419879e13 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -85,6 +85,11 @@ public: */ const char* source; + /** + * The name of the input stream. + */ + const char* name; + /** * A map of key/value pairs corresponding to the relevant * filter's "config" table. @@ -99,12 +104,14 @@ public: ReaderInfo() { source = 0; + name = 0; mode = MODE_NONE; } ReaderInfo(const ReaderInfo& other) { source = other.source ? copy_string(other.source) : 0; + name = other.name ? copy_string(other.name) : 0; mode = other.mode; for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ ) diff --git a/src/input/fdstream.h b/src/input/fdstream.h deleted file mode 100644 index cda767dd52..0000000000 --- a/src/input/fdstream.h +++ /dev/null @@ -1,189 +0,0 @@ -/* The following code declares classes to read from and write to - * file descriptore or file handles. - * - * See - * http://www.josuttis.com/cppcode - * for details and the latest version. - * - * - open: - * - integrating BUFSIZ on some systems? - * - optimized reading of multiple characters - * - stream for reading AND writing - * - i18n - * - * (C) Copyright Nicolai M. Josuttis 2001. - * Permission to copy, use, modify, sell and distribute this software - * is granted provided this copyright notice appears in all copies. 
- * This software is provided "as is" without express or implied - * warranty, and with no claim as to its suitability for any purpose. - * - * Version: Jul 28, 2002 - * History: - * Jul 28, 2002: bugfix memcpy() => memmove() - * fdinbuf::underflow(): cast for return statements - * Aug 05, 2001: first public version - */ -#ifndef BOOST_FDSTREAM_HPP -#define BOOST_FDSTREAM_HPP - -#include -#include -#include -// for EOF: -#include -// for memmove(): -#include - - - -// low-level read and write functions -#ifdef _MSC_VER -# include -#else -# include -# include -//extern "C" { -// int write (int fd, const char* buf, int num); -// int read (int fd, char* buf, int num); -//} -#endif - - -// BEGIN namespace BOOST -namespace boost { - - -/************************************************************ - * fdostream - * - a stream that writes on a file descriptor - ************************************************************/ - - -class fdoutbuf : public std::streambuf { - protected: - int fd; // file descriptor - public: - // constructor - fdoutbuf (int _fd) : fd(_fd) { - } - protected: - // write one character - virtual int_type overflow (int_type c) { - if (c != EOF) { - char z = c; - if (write (fd, &z, 1) != 1) { - return EOF; - } - } - return c; - } - // write multiple characters - virtual - std::streamsize xsputn (const char* s, - std::streamsize num) { - return write(fd,s,num); - } -}; - -class fdostream : public std::ostream { - protected: - fdoutbuf buf; - public: - fdostream (int fd) : std::ostream(0), buf(fd) { - rdbuf(&buf); - } -}; - - -/************************************************************ - * fdistream - * - a stream that reads on a file descriptor - ************************************************************/ - -class fdinbuf : public std::streambuf { - protected: - int fd; // file descriptor - protected: - /* data buffer: - * - at most, pbSize characters in putback area plus - * - at most, bufSize characters in ordinary read buffer - */ - static const 
int pbSize = 4; // size of putback area - static const int bufSize = 1024; // size of the data buffer - char buffer[bufSize+pbSize]; // data buffer - - public: - /* constructor - * - initialize file descriptor - * - initialize empty data buffer - * - no putback area - * => force underflow() - */ - fdinbuf (int _fd) : fd(_fd) { - setg (buffer+pbSize, // beginning of putback area - buffer+pbSize, // read position - buffer+pbSize); // end position - } - - protected: - // insert new characters into the buffer - virtual int_type underflow () { -#ifndef _MSC_VER - using std::memmove; -#endif - - // is read position before end of buffer? - if (gptr() < egptr()) { - return traits_type::to_int_type(*gptr()); - } - - /* process size of putback area - * - use number of characters read - * - but at most size of putback area - */ - int numPutback; - numPutback = gptr() - eback(); - if (numPutback > pbSize) { - numPutback = pbSize; - } - - /* copy up to pbSize characters previously read into - * the putback area - */ - memmove (buffer+(pbSize-numPutback), gptr()-numPutback, - numPutback); - - // read at most bufSize new characters - int num; - num = read (fd, buffer+pbSize, bufSize); - if ( num == EAGAIN ) { - return 0; - } - if (num <= 0) { - // ERROR or EOF - return EOF; - } - - // reset buffer pointers - setg (buffer+(pbSize-numPutback), // beginning of putback area - buffer+pbSize, // read position - buffer+pbSize+num); // end of buffer - - // return next character - return traits_type::to_int_type(*gptr()); - } -}; - -class fdistream : public std::istream { - protected: - fdinbuf buf; - public: - fdistream (int fd) : std::istream(0), buf(fd) { - rdbuf(&buf); - } -}; - - -} // END namespace boost - -#endif /*BOOST_FDSTREAM_HPP*/ diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index ac96e5c0f5..46cb3656a3 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -3,32 +3,46 @@ #include "Raw.h" #include "NetVar.h" -#include -#include - #include 
"../../threading/SerialTypes.h" -#include "../fdstream.h" #include +#include #include #include #include #include +#include +#include using namespace input::reader; using threading::Value; using threading::Field; +const int Raw::block_size = 4096; // how big do we expect our chunks of data to be. + + Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; - in = 0; - + stderrfile = 0; + forcekill = false; separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); - if ( separator.size() != 1 ) - Error("separator length has to be 1. Separator will be truncated."); + sep_length = BifConst::InputRaw::record_separator->Len(); + + buf = 0; + outbuf = 0; + bufpos = 0; + + stdin_fileno = fileno(stdin); + stdout_fileno = fileno(stdout); + stderr_fileno = fileno(stderr); + + childpid = -1; + + stdin_towrite = 0; // by default do not open stdin + use_stderr = false; } Raw::~Raw() @@ -40,41 +54,123 @@ void Raw::DoClose() { if ( file != 0 ) CloseInput(); + + if ( execute && childpid > 0 && kill(childpid, 0) == 0 ) + { + // kill child process + kill(childpid, 15); // sigterm + + if ( forcekill ) + { + usleep(200); // 200 msecs should be enough for anyone ;) + + if ( kill(childpid, 0) == 0 ) // perhaps it is already gone + kill(childpid, 9); // TERMINATE + } + } + } + +bool Raw::Execute() + { + if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) + { + Error(Fmt("Could not open pipe: %d", errno)); + return false; + } + + childpid = fork(); + if ( childpid < 0 ) + { + Error(Fmt("Could not create child process: %d", errno)); + return false; + } + + else if ( childpid == 0 ) + { + // we are the child. 
+ close(pipes[stdout_in]); + dup2(pipes[stdout_out], stdout_fileno); + + if ( stdin_towrite ) + { + close(pipes[stdin_out]); + dup2(pipes[stdin_in], stdin_fileno); + } + + if ( use_stderr ) + { + close(pipes[stderr_in]); + dup2(pipes[stderr_out], stderr_fileno); + } + + execl("/bin/sh", "sh", "-c", fname.c_str(), NULL); + fprintf(stderr, "Exec failed :(......\n"); + _exit(255); // forked child: do not run the parent's atexit handlers or flush its stdio buffers + } + else + { + // we are the parent + close(pipes[stdout_out]); + pipes[stdout_out] = -1; + + if ( Info().mode == MODE_STREAM ) + fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK); + + if ( stdin_towrite ) + { + close(pipes[stdin_in]); + pipes[stdin_in] = -1; + fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data. + // note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having + // a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied. + } + + if ( use_stderr ) + { + close(pipes[stderr_out]); + pipes[stderr_out] = -1; + fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too. + } + + file = fdopen(pipes[stdout_in], "r"); + pipes[stdout_in] = -1; // will be closed by fclose + + if ( use_stderr ) + stderrfile = fdopen(pipes[stderr_in], "r"); + if ( use_stderr ) pipes[stderr_in] = -1; // will be closed by fclose; otherwise CloseInput() still has to close the raw fd + if ( file == 0 || (stderrfile == 0 && use_stderr) ) + { + Error("Could not convert fileno to file"); + return false; + } + + + return true; + } } bool Raw::OpenInput() { if ( execute ) - { - file = popen(fname.c_str(), "r"); - if ( file == NULL ) - { - Error(Fmt("Could not execute command %s", fname.c_str())); - return false; - } - } + return Execute(); + else { file = fopen(fname.c_str(), "r"); - if ( file == NULL ) + if ( file ) fcntl(fileno(file), F_SETFD, FD_CLOEXEC); // fopen may have failed; the check below reports it + if ( ! 
file ) { Error(Fmt("Init: cannot open %s", fname.c_str())); return false; } } - // This is defined in input/fdstream.h - in = new boost::fdistream(fileno(file)); - - if ( execute && Info().mode == MODE_STREAM ) - fcntl(fileno(file), F_SETFL, O_NONBLOCK); - return true; } bool Raw::CloseInput() { - if ( file == NULL ) + if ( file == 0 ) { InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str())); return false; @@ -83,15 +179,20 @@ bool Raw::CloseInput() Debug(DBG_INPUT, "Raw reader starting close"); #endif - delete in; + fclose(file); - if ( execute ) - pclose(file); - else - fclose(file); + if ( use_stderr ) + fclose(stderrfile); - in = NULL; - file = NULL; + if ( execute ) // we do not care if any of those fails. They should all be defined. + { + for ( int i = 0; i < 6; i ++ ) + if ( pipes[i] != -1 ) + close(pipes[i]); + } + + file = 0; + stderrfile = 0; #ifdef DEBUG Debug(DBG_INPUT, "Raw reader finished close"); @@ -106,28 +207,9 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie mtime = 0; execute = false; firstrun = true; + int want_fields = 1; bool result; - if ( ! info.source || strlen(info.source) == 0 ) - { - Error("No source path provided"); - return false; - } - - if ( num_fields != 1 ) - { - Error("Filter for raw reader contains more than one field. " - "Filters for the raw reader may only contain exactly one string field. 
" - "Filter ignored."); - return false; - } - - if ( fields[0]->type != TYPE_STRING ) - { - Error("Filter for raw reader contains a field that is not of type string."); - return false; - } - // do Initialization string source = string(info.source); char last = info.source[source.length() - 1]; @@ -135,23 +217,63 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie { execute = true; fname = source.substr(0, fname.length() - 1); - - if ( (info.mode != MODE_MANUAL) ) - { - Error(Fmt("Unsupported read mode %d for source %s in execution mode", - info.mode, fname.c_str())); - return false; - } - - result = OpenInput(); - } - else + + if ( ! info.source || strlen(info.source) == 0 ) { - execute = false; - result = OpenInput(); + Error("No source path provided"); + return false; } + map::const_iterator it = info.config.find("stdin"); // data that is sent to the child process + if ( it != info.config.end() ) + { + stdin_string = it->second; + stdin_towrite = stdin_string.length(); + } + + it = info.config.find("read_stderr"); // we want to read stderr + if ( it != info.config.end() && execute ) + { + use_stderr = true; + want_fields = 2; + } + + it = info.config.find("force_kill"); // we want to be sure that our child is dead when we exit + if ( it != info.config.end() && execute ) + { + forcekill = true; + } + + if ( num_fields != want_fields ) + { + Error(Fmt("Filter for raw reader contains wrong number of fields -- got %d, expected %d. " + "Filters for the raw reader contain one string field when used in normal mode and one string and one bool fields when using execute mode with stderr capuring. 
" + "Filter ignored.", num_fields, want_fields)); + return false; + } + + if ( fields[0]->type != TYPE_STRING ) + { + Error("First field for raw reader always has to be of type string."); + return false; + } + if ( use_stderr && fields[1]->type != TYPE_BOOL ) + { + Error("Second field for raw reader always has to be of type bool."); + return false; + } + + if ( execute && Info().mode == MODE_REREAD ) + { + // for execs this makes no sense - would have to execute each heartbeat? + Error("Rereading only supported for files, not for executables."); + return false; + } + + + result = OpenInput(); + if ( result == false ) return result; @@ -168,18 +290,115 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie return true; } - -bool Raw::GetLine(string& str) +int64_t Raw::GetLine(FILE* arg_file) { - if ( in->peek() == std::iostream::traits_type::eof() ) - return false; + errno = 0; + int pos = 0; // strstr_n only works on ints - so no use to use something different here + int offset = 0; - if ( in->eofbit == true || in->failbit == true ) - return false; + if ( buf == 0 ) + buf = new char[block_size]; - return getline(*in, str, separator[0]); + int repeats = 1; + + for (;;) + { + size_t readbytes = fread(buf+bufpos+offset, 1, block_size-bufpos, arg_file); + pos += bufpos + readbytes; + //printf("Pos: %d\n", pos); + bufpos = offset = 0; // read full block size in next read... + + if ( pos == 0 && errno != 0 ) + break; + + // researching everything each time is a bit... cpu-intensive. But otherwhise we have + // to deal with situations where the separator is multi-character and split over multiple + // reads... + int found = strstr_n(pos, (unsigned char*) buf, separator.size(), (unsigned char*) separator.c_str()); + + if ( found == -1 ) + { + // we did not find it and have to search again in the next try. resize buffer.... + // but first check if we encountered the file end - because if we did this was it. 
+ if ( feof(arg_file) != 0 ) + { + outbuf = buf; + buf = 0; + if ( pos == 0 ) + return -1; // signal EOF - and that we had no more data. + else + return pos; + } + + repeats++; + // bah, we cannot use realloc because we would have to change the delete in the manager to a free. + char * newbuf = new char[block_size*repeats]; + memcpy(newbuf, buf, block_size*(repeats-1)); + delete [] buf; + buf = newbuf; + offset = block_size*(repeats-1); + } + else + { + outbuf = buf; + buf = 0; + buf = new char[block_size]; + + + if ( found < pos ) + { + // we have leftovers. copy them into the buffer for the next line + // (buf was freshly allocated above; allocating it again here would leak that block) + memcpy(buf, outbuf + found + sep_length, pos - found - sep_length); + bufpos = pos - found - sep_length; + } + + return found; + } + + } + + if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) + return -2; + + else + { + // an error code we did no expect. This probably is bad. + Error(Fmt("Reader encountered unexpected error code %d", errno)); + return -3; + } + + InternalError("Internal control flow execution"); + assert(false); } +// write to the stdin of the child process +void Raw::WriteToStdin() + { + assert(stdin_towrite <= stdin_string.length()); + uint64_t pos = stdin_string.length() - stdin_towrite; + + errno = 0; + ssize_t written = write(pipes[stdin_out], stdin_string.c_str() + pos, stdin_towrite); + if ( written > 0 ) stdin_towrite -= written; // write() returns -1 on error/EAGAIN; subtracting that would grow the unsigned counter + + if ( errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK ) + { + Error(Fmt("Writing to child process stdin failed: %d. Stopping writing at position %llu", errno, static_cast<unsigned long long>(pos))); + stdin_towrite = 0; + // the pipe is closed exactly once by the stdin_towrite == 0 check below + } + + if ( stdin_towrite == 0 ) // send EOF when we are done. + { close(pipes[stdin_out]); pipes[stdin_out] = -1; } // mark as closed so CloseInput() does not close the fd a second time + + if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 ) + { + Error(Fmt("Could not write whole string to stdin of child process in one go. 
Please use STREAM mode to pass more data to child.")); + } + } + + // read the entire file and send appropriate thingies back to InputMgr bool Raw::DoUpdate() { @@ -191,6 +410,7 @@ bool Raw::DoUpdate() switch ( Info().mode ) { case MODE_REREAD: { + assert(childpid == -1); // mode may not be used to execute child programs // check if the file has changed struct stat sb; if ( stat(fname.c_str(), &sb) == -1 ) @@ -211,10 +431,9 @@ bool Raw::DoUpdate() case MODE_MANUAL: case MODE_STREAM: - if ( Info().mode == MODE_STREAM && file != NULL && in != NULL ) + if ( Info().mode == MODE_STREAM && file != 0 ) { - //fpurge(file); - in->clear(); // remove end of file evil bits + clearerr(file); // remove end of file evil bits break; } @@ -230,21 +449,118 @@ bool Raw::DoUpdate() } string line; - while ( GetLine(line) ) + assert ( (NumFields() == 1 && !use_stderr) || (NumFields() == 2 && use_stderr)); + for ( ;; ) { - assert (NumFields() == 1); + if ( stdin_towrite > 0 ) + WriteToStdin(); - Value** fields = new Value*[1]; + int64_t length = GetLine(file); + //printf("Read %lld bytes\n", length); + + if ( length == -3 ) + return false; + + else if ( length == -2 || length == -1 ) + // no data ready or eof + break; + + Value** fields = new Value*[2]; // just always reserve 2. This means that our [] is too long by a count of 1 if not using stderr. But who cares... // filter has exactly one text field. convert to it. 
Value* val = new Value(TYPE_STRING, true); - val->val.string_val.data = copy_string(line.c_str()); - val->val.string_val.length = line.size(); + val->val.string_val.data = outbuf; + val->val.string_val.length = length; fields[0] = val; + if ( use_stderr ) + { + Value* bval = new Value(TYPE_BOOL, true); + bval->val.int_val = 0; + fields[1] = bval; + } + Put(fields); + + outbuf = 0; } + if ( use_stderr ) + { + for ( ;; ) + { + int64_t length = GetLine(stderrfile); + //printf("Read stderr %lld bytes\n", length); + if ( length == -3 ) + return false; + + else if ( length == -2 || length == -1 ) + break; + + Value** fields = new Value*[2]; + Value* val = new Value(TYPE_STRING, true); + val->val.string_val.data = outbuf; + val->val.string_val.length = length; + fields[0] = val; + Value* bval = new Value(TYPE_BOOL, true); + bval->val.int_val = 1; // yes, we are stderr + fields[1] = bval; + + Put(fields); + + outbuf = 0; + } + } + + if ( ( Info().mode == MODE_MANUAL ) || ( Info().mode == MODE_REREAD ) ) + // done with the current data source + EndCurrentSend(); + + // and let's check if the child process is still alive + int return_code; + if ( childpid != -1 && waitpid(childpid, &return_code, WNOHANG) > 0 ) // > 0: a child was reaped; -1 is an error and leaves return_code unset + { + // child died + bool signal = false; + int code = 0; + if ( WIFEXITED(return_code) ) + { + code = WEXITSTATUS(return_code); + if ( code != 0 ) + Error(Fmt("Child process exited with non-zero return code %d", code)); + } + + else if ( WIFSIGNALED(return_code) ) + { + signal = true; // reported to scripts as signal_exit; code carries the signal number + code = WTERMSIG(return_code); + Error(Fmt("Child process exited due to signal %d", code)); + } + + else + assert(false); + + Value** vals = new Value*[4]; + vals[0] = new Value(TYPE_STRING, true); + vals[0]->val.string_val.data = copy_string(Info().name); + vals[0]->val.string_val.length = strlen(Info().name); + vals[1] = new Value(TYPE_STRING, true); + vals[1]->val.string_val.data = copy_string(Info().source); + vals[1]->val.string_val.length = strlen(Info().source); + vals[2] = 
new Value(TYPE_COUNT, true); + vals[2]->val.int_val = code; + vals[3] = new Value(TYPE_BOOL, true); + vals[3]->val.int_val = signal; + + // and in this case we can signal end_of_data even for the streaming reader + if ( Info().mode == MODE_STREAM ) + EndCurrentSend(); + + SendEvent("InputRaw::process_finished", 4, vals); + } + + + #ifdef DEBUG Debug(DBG_INPUT, "DoUpdate finished successfully"); #endif diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 48912b70a7..6dbae21002 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -3,7 +3,6 @@ #ifndef INPUT_READERS_RAW_H #define INPUT_READERS_RAW_H -#include #include #include "../ReaderBackend.h" @@ -30,17 +29,49 @@ protected: private: bool OpenInput(); bool CloseInput(); - bool GetLine(string& str); + int64_t GetLine(FILE* file); + bool Execute(); + void WriteToStdin(); string fname; // Source with a potential "|" removed. - istream* in; FILE* file; + FILE* stderrfile; bool execute; bool firstrun; time_t mtime; // options set from the script-level. 
string separator; + unsigned int sep_length; // length of the separator + + static const int block_size; + int bufpos; + char* buf; + char* outbuf; + + int stdin_fileno; + int stdout_fileno; + int stderr_fileno; + + string stdin_string; + uint64_t stdin_towrite; + + bool use_stderr; + + bool forcekill; + + int pipes[6]; + pid_t childpid; + + enum IoChannels { + stdout_in = 0, + stdout_out = 1, + stdin_in = 2, + stdin_out = 3, + stderr_in = 4, + stderr_out = 5 + }; + }; } diff --git a/src/main.cc b/src/main.cc index 5b39c7d45b..acd7296b27 100644 --- a/src/main.cc +++ b/src/main.cc @@ -357,6 +357,7 @@ void terminate_bro() file_mgr->Terminate(); log_mgr->Terminate(); + input_mgr->Terminate(); thread_mgr->Terminate(); mgr.Drain(); diff --git a/testing/btest/Baseline/core.check-unused-event-handlers/.stderr b/testing/btest/Baseline/core.check-unused-event-handlers/.stderr index 8d8bf1a85b..1a32ad442c 100644 --- a/testing/btest/Baseline/core.check-unused-event-handlers/.stderr +++ b/testing/btest/Baseline/core.check-unused-event-handlers/.stderr @@ -1 +1,2 @@ warning in , line 1: event handler never invoked: this_is_never_used +warning in , line 1: event handler never invoked: InputRaw::process_finished diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.basic/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.raw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.basic/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.execute/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.execute/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out 
b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out new file mode 100644 index 0000000000..c49aee85b3 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/out @@ -0,0 +1,36 @@ +[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line +{ +print outfile, A::description; +print outfile, A::tpe; +print outfile, A::s; +try = try + 1; +if (2 == try) +{ +Input::remove(input2); +close(outfile); +terminate(); +} + +}, config={ +[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay +}] +Input::EVENT_NEW +hello +[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=, want_record=F, ev=line +{ +print outfile, A::description; +print outfile, A::tpe; +print outfile, A::s; +try = try + 1; +if (2 == try) +{ +Input::remove(input2); +close(outfile); +terminate(); +} + +}, config={ +[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay +}] +Input::EVENT_NEW +there^A^B^C^D^E^A^B^Cyay diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/test.txt b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/test.txt new file mode 100644 index 0000000000..0205cd7c3a --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestdin/test.txt @@ -0,0 +1,2 @@ +hello +thereyay \ No newline at end of file diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream/out new file mode 100644 index 0000000000..59a5f2c116 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream/out @@ -0,0 +1,153 @@ +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; 
+close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +q3r3057fdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdfs\d +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW + +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print 
A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +dfsdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +sdf +[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=, want_record=F, ev=line +{ +print A::outfile, A::description; +print A::outfile, A::tpe; +print A::outfile, A::s; +A::try = A::try + 1; +if (8 == A::try) +{ +print A::outfile, done; +close(A::outfile); +Input::remove(input); +terminate(); +} + +}, config={ + +}] +Input::EVENT_NEW +3rw43wRRERLlL#RWERERERE. +done diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.long/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.long/out new file mode 100644 index 0000000000..fac8e79c0b --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.long/out @@ -0,0 +1,2 @@ +Input::EVENT_NEW +8193 diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.rereadraw/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.rereadraw/out diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out new file mode 100644 index 0000000000..b7f857339d --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.stderr/out @@ -0,0 +1,27 @@ +Input::EVENT_NEW +..: +F +Input::EVENT_NEW +bro +F +Input::EVENT_NEW +out +F +Input::EVENT_NEW +stderr.bro +F 
+Input::EVENT_NEW +stderr output contained nonexistant +T +Input::EVENT_NEW +stderr output contained nonexistant +T +Input::EVENT_NEW +stderr output contained nonexistant +T +done +End of Data event +input +Process finished event +input +Exit code != 0 diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw.streamraw/out similarity index 100% rename from testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out rename to testing/btest/Baseline/scripts.base.frameworks.input.raw.streamraw/out diff --git a/testing/btest/scripts/base/frameworks/input/raw.bro b/testing/btest/scripts/base/frameworks/input/raw/basic.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/raw.bro rename to testing/btest/scripts/base/frameworks/input/raw/basic.bro diff --git a/testing/btest/scripts/base/frameworks/input/executeraw.bro b/testing/btest/scripts/base/frameworks/input/raw/execute.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/executeraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/execute.bro diff --git a/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro b/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro new file mode 100644 index 0000000000..729844e4b4 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/executestdin.bro @@ -0,0 +1,44 @@ +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff test.txt +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; +@load base/frameworks/communication # let network-time run. otherwise there are no heartbeats... 
+ +global outfile: file; +global try: count; + +module A; + +type Val: record { + s: string; +}; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, description; + print outfile, tpe; + print outfile, s; + try = try + 1; + if ( try == 2 ) + { + Input::remove("input2"); + close(outfile); + terminate(); + } + } + +event bro_init() + { + local config_strings: table[string] of string = { + ["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay" + #["stdin"] = "yay" + }; + + try = 0; + outfile = open("../out"); + Input::add_event([$source="cat > ../test.txt |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + Input::remove("input"); + Input::add_event([$source="cat |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input2", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + } diff --git a/testing/btest/scripts/base/frameworks/input/raw/executestream.bro b/testing/btest/scripts/base/frameworks/input/raw/executestream.bro new file mode 100644 index 0000000000..ead33018dc --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/executestream.bro @@ -0,0 +1,61 @@ +# @TEST-EXEC: cp input1.log input.log +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: sleep 3 +# @TEST-EXEC: cat input2.log >> input.log +# @TEST-EXEC: sleep 3 +# @TEST-EXEC: cat input3.log >> input.log +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; + +@TEST-START-FILE input1.log +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +@TEST-END-FILE + +@TEST-START-FILE input2.log +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +q3r3057fdf +@TEST-END-FILE + +@TEST-START-FILE input3.log +sdfs\d + +dfsdf +sdf +3rw43wRRERLlL#RWERERERE. 
+@TEST-END-FILE + +@load base/frameworks/communication # let network-time run + +module A; + +type Val: record { + s: string; +}; + +global try: count; +global outfile: file; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, description; + print outfile, tpe; + print outfile, s; + + try = try + 1; + if ( try == 8 ) + { + print outfile, "done"; + close(outfile); + Input::remove("input"); + terminate(); + } + } + +event bro_init() + { + outfile = open("../out"); + try = 0; + Input::add_event([$source="tail -f ../input.log |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]); + } diff --git a/testing/btest/scripts/base/frameworks/input/raw/long.bro b/testing/btest/scripts/base/frameworks/input/raw/long.bro new file mode 100644 index 0000000000..ac07639f77 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/long.bro @@ -0,0 +1,37 @@ +# @TEST-EXEC: dd if=/dev/zero of=input.log bs=8193 count=1 +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff out +# +# this test should be longer than one block-size. to test behavior of input-reader if it has to re-allocate stuff. 
+ +redef exit_only_after_terminate = T; + +global outfile: file; +global try: count; + +module A; + +type Val: record { + s: string; +}; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print outfile, tpe; + print outfile, |s|; + try = try + 1; + if ( try == 1 ) + { + close(outfile); + terminate(); + } + } + +event bro_init() + { + try = 0; + outfile = open("../out"); + Input::add_event([$source="../input.log", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]); + Input::remove("input"); + } diff --git a/testing/btest/scripts/base/frameworks/input/rereadraw.bro b/testing/btest/scripts/base/frameworks/input/raw/rereadraw.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/rereadraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/rereadraw.bro diff --git a/testing/btest/scripts/base/frameworks/input/raw/stderr.bro b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro new file mode 100644 index 0000000000..e84ed048cd --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/stderr.bro @@ -0,0 +1,66 @@ +# @TEST-EXEC: btest-bg-run bro bro -b %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff out + +redef exit_only_after_terminate = T; + +type Val: record { + s: string; + is_stderr: bool; +}; + +global try: count; +global outfile: file; + +event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool) + { + print outfile, tpe; + if ( is_stderr ) + { + # work around localized error messages. and if some localization does not include the filename... well... 
that would be bad :) + if ( strstr(s, "nonexistant") > 0 ) + { + print outfile, "stderr output contained nonexistant"; + } + } + else + { + print outfile, s; + } + print outfile, is_stderr; + + try = try + 1; + if ( try == 7 ) + { + print outfile, "done"; + Input::remove("input"); + } + } + +event Input::end_of_data(name: string, source:string) + { + print outfile, "End of Data event"; + print outfile, name; + terminate(); # due to the current design, end_of_data will be called after process_finshed and all line events. + # this could potentially change + } + +event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool) + { + print outfile, "Process finished event"; + print outfile, name; + if ( exit_code != 0 ) + print outfile, "Exit code != 0"; + } + +event bro_init() + { + + local config_strings: table[string] of string = { + ["read_stderr"] = "1" + }; + + outfile = open("../out"); + try = 0; + Input::add_event([$source="ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |", $reader=Input::READER_RAW, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]); + } diff --git a/testing/btest/scripts/base/frameworks/input/streamraw.bro b/testing/btest/scripts/base/frameworks/input/raw/streamraw.bro similarity index 100% rename from testing/btest/scripts/base/frameworks/input/streamraw.bro rename to testing/btest/scripts/base/frameworks/input/raw/streamraw.bro