Merge remote-tracking branch 'origin/topic/bernhard/input-update'

Closes #1021.

* origin/topic/bernhard/input-update:
  this event handler fails the unused-event-handlers test because it is a bit of a special case.
  ...and fix the event ordering issue. Dispatch != QueueEvent
  add Terminate to input framework to prevent potential shutdown race-conditions.
  fix warning.
  fix stderr test. ls behaves differently on errors on linux...
  small fixes.
  linux does not have strnstr
  and close only fds that are currently open (the logging framework really did not like that :) )
  A bunch of more changes for the raw reader
  make reading from stdout and stderr simultaneously work.
  allow sending data to stdin of child process
  Streaming reads from external commands work without blocking anything.
  replace popen with fork and exec.
  change raw reader to use basic c io instead of fdstream encapsulation class.
This commit is contained in:
Robin Sommer 2013-07-03 16:46:26 -07:00
commit 96fe05633a
29 changed files with 946 additions and 277 deletions

15
CHANGES
View file

@ -1,4 +1,19 @@
2.1-780 | 2013-07-03 16:46:26 -0700
* Rewrite of the RAW input reader for improved robustness and new
features. (Bernhard Amann) This includes:
- Send "end_of_data" event for all kind of streams.
- Send "process_finished" event with exit code of child
process at process termination.
- Expose name of input stream to readers.
- Better error handling.
- New "force_kill" option which SIGKILLs processes on reader termination.
- Supports reading from stdout and stderr simultaneously.
- Support sending data to stdin of child process.
- Streaming reads from external commands work without blocking.
2.1-762 | 2013-07-03 16:33:22 -0700 2.1-762 | 2013-07-03 16:33:22 -0700
* Fix to correct support for TLS 1.2. Addresses #1020. (Seth Hall, * Fix to correct support for TLS 1.2. Addresses #1020. (Seth Hall,

10
NEWS
View file

@ -73,10 +73,12 @@ New Functionality
script file name without path, respectively. (Jon Siwek) script file name without path, respectively. (Jon Siwek)
- The new file analysis framework moves most of the processing of file - The new file analysis framework moves most of the processing of file
content from script-land into the core, where it belongs. Much of content from script-land into the core, where it belongs. See
this is an internal change, the framework comes with the following doc/file-analysis.rst for more information.
user-visibible functionality (some of that was already available
before, but done differently): Much of this is an internal change, but the framework also comes
with the following user-visibible functionality (some of that was
already available before, but done differently):
[TODO: This will probably change with further script updates.] [TODO: This will probably change with further script updates.]

View file

@ -1 +1 @@
2.1-762 2.1-780

View file

@ -6,4 +6,12 @@ export {
## Separator between input records. ## Separator between input records.
## Please note that the separator has to be exactly one character long ## Please note that the separator has to be exactly one character long
const record_separator = "\n" &redef; const record_separator = "\n" &redef;
## Event that is called when a process created by the raw reader exits.
##
## name: name of the input stream
## source: source of the input stream
## exit_code: exit code of the program, or number of the signal that forced the program to exit
## signal_exit: false when program exitted normally, true when program was forced to exit by a signal
global process_finished: event(name: string, source:string, exit_code:count, signal_exit:bool);
} }

View file

@ -320,6 +320,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo(); ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo();
rinfo->source = copy_string(source.c_str()); rinfo->source = copy_string(source.c_str());
rinfo->name = copy_string(name.c_str());
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
switch ( mode->InternalInt() ) switch ( mode->InternalInt() )
@ -1241,6 +1242,9 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
if ( i->stream_type != TABLE_STREAM ) if ( i->stream_type != TABLE_STREAM )
{ {
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "%s is event, sending end of data", i->name.c_str());
#endif
// just signal the end of the data source // just signal the end of the data source
SendEndOfData(i); SendEndOfData(i);
return; return;
@ -1345,8 +1349,13 @@ void Manager::SendEndOfData(ReaderFrontend* reader)
SendEndOfData(i); SendEndOfData(i);
} }
void Manager::SendEndOfData(const Stream *i) void Manager::SendEndOfData(const Stream *i)
{ {
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "SendEndOfData for stream %s",
i->name.c_str());
#endif
SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source));
if ( i->stream_type == ANALYSIS_STREAM ) if ( i->stream_type == ANALYSIS_STREAM )
@ -1362,6 +1371,11 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals)
return; return;
} }
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "Put for stream %s",
i->name.c_str());
#endif
int readFields = 0; int readFields = 0;
if ( i->stream_type == TABLE_STREAM ) if ( i->stream_type == TABLE_STREAM )
@ -1700,6 +1714,11 @@ bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals)
return false; return false;
} }
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "SendEvent for event %s with num_vals vals",
name.c_str(), num_vals);
#endif
RecordType *type = handler->FType()->Args(); RecordType *type = handler->FType()->Args();
int num_event_vals = type->NumFields(); int num_event_vals = type->NumFields();
if ( num_vals != num_event_vals ) if ( num_vals != num_event_vals )
@ -1712,7 +1731,7 @@ bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals)
for ( int i = 0; i < num_vals; i++) for ( int i = 0; i < num_vals; i++)
vl->append(ValueToVal(vals[i], type->FieldType(i))); vl->append(ValueToVal(vals[i], type->FieldType(i)));
mgr.Dispatch(new Event(handler, vl)); mgr.QueueEvent(handler, vl, SOURCE_LOCAL);
for ( int i = 0; i < num_vals; i++ ) for ( int i = 0; i < num_vals; i++ )
delete vals[i]; delete vals[i];
@ -1726,6 +1745,11 @@ void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...)
{ {
val_list* vl = new val_list; val_list* vl = new val_list;
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "SendEvent with %d vals",
numvals);
#endif
va_list lP; va_list lP;
va_start(lP, numvals); va_start(lP, numvals);
for ( int i = 0; i < numvals; i++ ) for ( int i = 0; i < numvals; i++ )
@ -1740,6 +1764,11 @@ void Manager::SendEvent(EventHandlerPtr ev, list<Val*> events)
{ {
val_list* vl = new val_list; val_list* vl = new val_list;
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "SendEvent with %d vals (list)",
events.size());
#endif
for ( list<Val*>::iterator i = events.begin(); i != events.end(); i++ ) for ( list<Val*>::iterator i = events.begin(); i != events.end(); i++ )
{ {
vl->append( *i ); vl->append( *i );
@ -2244,3 +2273,18 @@ Manager::Stream* Manager::FindStream(ReaderFrontend* reader)
return 0; return 0;
} }
// Function is called on Bro shutdown.
// Signal all frontends that they will cease operation.
void Manager::Terminate()
{
for ( map<ReaderFrontend*, Stream*>::iterator i = readers.begin(); i != readers.end(); ++i )
{
if ( i->second->removed )
continue;
i->second->removed = true;
i->second->reader->Stop();
}
}

View file

@ -91,6 +91,11 @@ public:
*/ */
bool RemoveStream(const string &id); bool RemoveStream(const string &id);
/**
* Signals the manager to shutdown at Bro's termination.
*/
void Terminate();
protected: protected:
friend class ReaderFrontend; friend class ReaderFrontend;
friend class PutMessage; friend class PutMessage;

View file

@ -85,6 +85,11 @@ public:
*/ */
const char* source; const char* source;
/**
* The name of the input stream.
*/
const char* name;
/** /**
* A map of key/value pairs corresponding to the relevant * A map of key/value pairs corresponding to the relevant
* filter's "config" table. * filter's "config" table.
@ -99,12 +104,14 @@ public:
ReaderInfo() ReaderInfo()
{ {
source = 0; source = 0;
name = 0;
mode = MODE_NONE; mode = MODE_NONE;
} }
ReaderInfo(const ReaderInfo& other) ReaderInfo(const ReaderInfo& other)
{ {
source = other.source ? copy_string(other.source) : 0; source = other.source ? copy_string(other.source) : 0;
name = other.name ? copy_string(other.name) : 0;
mode = other.mode; mode = other.mode;
for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ ) for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ )

View file

@ -1,189 +0,0 @@
/* The following code declares classes to read from and write to
* file descriptore or file handles.
*
* See
* http://www.josuttis.com/cppcode
* for details and the latest version.
*
* - open:
* - integrating BUFSIZ on some systems?
* - optimized reading of multiple characters
* - stream for reading AND writing
* - i18n
*
* (C) Copyright Nicolai M. Josuttis 2001.
* Permission to copy, use, modify, sell and distribute this software
* is granted provided this copyright notice appears in all copies.
* This software is provided "as is" without express or implied
* warranty, and with no claim as to its suitability for any purpose.
*
* Version: Jul 28, 2002
* History:
* Jul 28, 2002: bugfix memcpy() => memmove()
* fdinbuf::underflow(): cast for return statements
* Aug 05, 2001: first public version
*/
#ifndef BOOST_FDSTREAM_HPP
#define BOOST_FDSTREAM_HPP
#include <istream>
#include <ostream>
#include <streambuf>
// for EOF:
#include <cstdio>
// for memmove():
#include <cstring>
// low-level read and write functions
#ifdef _MSC_VER
# include <io.h>
#else
# include <sys/errno.h>
# include <unistd.h>
//extern "C" {
// int write (int fd, const char* buf, int num);
// int read (int fd, char* buf, int num);
//}
#endif
// BEGIN namespace BOOST
namespace boost {
/************************************************************
* fdostream
* - a stream that writes on a file descriptor
************************************************************/
class fdoutbuf : public std::streambuf {
protected:
int fd; // file descriptor
public:
// constructor
fdoutbuf (int _fd) : fd(_fd) {
}
protected:
// write one character
virtual int_type overflow (int_type c) {
if (c != EOF) {
char z = c;
if (write (fd, &z, 1) != 1) {
return EOF;
}
}
return c;
}
// write multiple characters
virtual
std::streamsize xsputn (const char* s,
std::streamsize num) {
return write(fd,s,num);
}
};
class fdostream : public std::ostream {
protected:
fdoutbuf buf;
public:
fdostream (int fd) : std::ostream(0), buf(fd) {
rdbuf(&buf);
}
};
/************************************************************
* fdistream
* - a stream that reads on a file descriptor
************************************************************/
class fdinbuf : public std::streambuf {
protected:
int fd; // file descriptor
protected:
/* data buffer:
* - at most, pbSize characters in putback area plus
* - at most, bufSize characters in ordinary read buffer
*/
static const int pbSize = 4; // size of putback area
static const int bufSize = 1024; // size of the data buffer
char buffer[bufSize+pbSize]; // data buffer
public:
/* constructor
* - initialize file descriptor
* - initialize empty data buffer
* - no putback area
* => force underflow()
*/
fdinbuf (int _fd) : fd(_fd) {
setg (buffer+pbSize, // beginning of putback area
buffer+pbSize, // read position
buffer+pbSize); // end position
}
protected:
// insert new characters into the buffer
virtual int_type underflow () {
#ifndef _MSC_VER
using std::memmove;
#endif
// is read position before end of buffer?
if (gptr() < egptr()) {
return traits_type::to_int_type(*gptr());
}
/* process size of putback area
* - use number of characters read
* - but at most size of putback area
*/
int numPutback;
numPutback = gptr() - eback();
if (numPutback > pbSize) {
numPutback = pbSize;
}
/* copy up to pbSize characters previously read into
* the putback area
*/
memmove (buffer+(pbSize-numPutback), gptr()-numPutback,
numPutback);
// read at most bufSize new characters
int num;
num = read (fd, buffer+pbSize, bufSize);
if ( num == EAGAIN ) {
return 0;
}
if (num <= 0) {
// ERROR or EOF
return EOF;
}
// reset buffer pointers
setg (buffer+(pbSize-numPutback), // beginning of putback area
buffer+pbSize, // read position
buffer+pbSize+num); // end of buffer
// return next character
return traits_type::to_int_type(*gptr());
}
};
class fdistream : public std::istream {
protected:
fdinbuf buf;
public:
fdistream (int fd) : std::istream(0), buf(fd) {
rdbuf(&buf);
}
};
} // END namespace boost
#endif /*BOOST_FDSTREAM_HPP*/

View file

@ -3,32 +3,46 @@
#include "Raw.h" #include "Raw.h"
#include "NetVar.h" #include "NetVar.h"
#include <fstream>
#include <sstream>
#include "../../threading/SerialTypes.h" #include "../../threading/SerialTypes.h"
#include "../fdstream.h"
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <errno.h>
#include <signal.h>
using namespace input::reader; using namespace input::reader;
using threading::Value; using threading::Value;
using threading::Field; using threading::Field;
const int Raw::block_size = 4096; // how big do we expect our chunks of data to be.
Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
file = 0; file = 0;
in = 0; stderrfile = 0;
forcekill = false;
separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(),
BifConst::InputRaw::record_separator->Len()); BifConst::InputRaw::record_separator->Len());
if ( separator.size() != 1 ) sep_length = BifConst::InputRaw::record_separator->Len();
Error("separator length has to be 1. Separator will be truncated.");
buf = 0;
outbuf = 0;
bufpos = 0;
stdin_fileno = fileno(stdin);
stdout_fileno = fileno(stdout);
stderr_fileno = fileno(stderr);
childpid = -1;
stdin_towrite = 0; // by default do not open stdin
use_stderr = false;
} }
Raw::~Raw() Raw::~Raw()
@ -40,41 +54,123 @@ void Raw::DoClose()
{ {
if ( file != 0 ) if ( file != 0 )
CloseInput(); CloseInput();
if ( execute && childpid > 0 && kill(childpid, 0) == 0 )
{
// kill child process
kill(childpid, 15); // sigterm
if ( forcekill )
{
usleep(200); // 200 msecs should be enough for anyone ;)
if ( kill(childpid, 0) == 0 ) // perhaps it is already gone
kill(childpid, 9); // TERMINATE
}
}
}
bool Raw::Execute()
{
if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) )
{
Error(Fmt("Could not open pipe: %d", errno));
return false;
}
childpid = fork();
if ( childpid < 0 )
{
Error(Fmt("Could not create child process: %d", errno));
return false;
}
else if ( childpid == 0 )
{
// we are the child.
close(pipes[stdout_in]);
dup2(pipes[stdout_out], stdout_fileno);
if ( stdin_towrite )
{
close(pipes[stdin_out]);
dup2(pipes[stdin_in], stdin_fileno);
}
if ( use_stderr )
{
close(pipes[stderr_in]);
dup2(pipes[stderr_out], stderr_fileno);
}
execl("/bin/sh", "sh", "-c", fname.c_str(), NULL);
fprintf(stderr, "Exec failed :(......\n");
exit(255);
}
else
{
// we are the parent
close(pipes[stdout_out]);
pipes[stdout_out] = -1;
if ( Info().mode == MODE_STREAM )
fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK);
if ( stdin_towrite )
{
close(pipes[stdin_in]);
pipes[stdin_in] = -1;
fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data.
// note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having
// a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied.
}
if ( use_stderr )
{
close(pipes[stderr_out]);
pipes[stderr_out] = -1;
fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too.
}
file = fdopen(pipes[stdout_in], "r");
pipes[stdout_in] = -1; // will be closed by fclose
if ( use_stderr )
stderrfile = fdopen(pipes[stderr_in], "r");
pipes[stderr_in] = -1; // will be closed by fclose
if ( file == 0 || (stderrfile == 0 && use_stderr) )
{
Error("Could not convert fileno to file");
return false;
}
return true;
}
} }
bool Raw::OpenInput() bool Raw::OpenInput()
{ {
if ( execute ) if ( execute )
{ return Execute();
file = popen(fname.c_str(), "r");
if ( file == NULL )
{
Error(Fmt("Could not execute command %s", fname.c_str()));
return false;
}
}
else else
{ {
file = fopen(fname.c_str(), "r"); file = fopen(fname.c_str(), "r");
if ( file == NULL ) fcntl(fileno(file), F_SETFD, FD_CLOEXEC);
if ( ! file )
{ {
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
return false; return false;
} }
} }
// This is defined in input/fdstream.h
in = new boost::fdistream(fileno(file));
if ( execute && Info().mode == MODE_STREAM )
fcntl(fileno(file), F_SETFL, O_NONBLOCK);
return true; return true;
} }
bool Raw::CloseInput() bool Raw::CloseInput()
{ {
if ( file == NULL ) if ( file == 0 )
{ {
InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str())); InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str()));
return false; return false;
@ -83,15 +179,20 @@ bool Raw::CloseInput()
Debug(DBG_INPUT, "Raw reader starting close"); Debug(DBG_INPUT, "Raw reader starting close");
#endif #endif
delete in; fclose(file);
if ( execute ) if ( use_stderr )
pclose(file); fclose(stderrfile);
else
fclose(file);
in = NULL; if ( execute ) // we do not care if any of those fails. They should all be defined.
file = NULL; {
for ( int i = 0; i < 6; i ++ )
if ( pipes[i] != -1 )
close(pipes[i]);
}
file = 0;
stderrfile = 0;
#ifdef DEBUG #ifdef DEBUG
Debug(DBG_INPUT, "Raw reader finished close"); Debug(DBG_INPUT, "Raw reader finished close");
@ -106,28 +207,9 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
mtime = 0; mtime = 0;
execute = false; execute = false;
firstrun = true; firstrun = true;
int want_fields = 1;
bool result; bool result;
if ( ! info.source || strlen(info.source) == 0 )
{
Error("No source path provided");
return false;
}
if ( num_fields != 1 )
{
Error("Filter for raw reader contains more than one field. "
"Filters for the raw reader may only contain exactly one string field. "
"Filter ignored.");
return false;
}
if ( fields[0]->type != TYPE_STRING )
{
Error("Filter for raw reader contains a field that is not of type string.");
return false;
}
// do Initialization // do Initialization
string source = string(info.source); string source = string(info.source);
char last = info.source[source.length() - 1]; char last = info.source[source.length() - 1];
@ -135,23 +217,63 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
{ {
execute = true; execute = true;
fname = source.substr(0, fname.length() - 1); fname = source.substr(0, fname.length() - 1);
if ( (info.mode != MODE_MANUAL) )
{
Error(Fmt("Unsupported read mode %d for source %s in execution mode",
info.mode, fname.c_str()));
return false;
}
result = OpenInput();
} }
else
if ( ! info.source || strlen(info.source) == 0 )
{ {
execute = false; Error("No source path provided");
result = OpenInput(); return false;
} }
map<const char*, const char*>::const_iterator it = info.config.find("stdin"); // data that is sent to the child process
if ( it != info.config.end() )
{
stdin_string = it->second;
stdin_towrite = stdin_string.length();
}
it = info.config.find("read_stderr"); // we want to read stderr
if ( it != info.config.end() && execute )
{
use_stderr = true;
want_fields = 2;
}
it = info.config.find("force_kill"); // we want to be sure that our child is dead when we exit
if ( it != info.config.end() && execute )
{
forcekill = true;
}
if ( num_fields != want_fields )
{
Error(Fmt("Filter for raw reader contains wrong number of fields -- got %d, expected %d. "
"Filters for the raw reader contain one string field when used in normal mode and one string and one bool fields when using execute mode with stderr capuring. "
"Filter ignored.", num_fields, want_fields));
return false;
}
if ( fields[0]->type != TYPE_STRING )
{
Error("First field for raw reader always has to be of type string.");
return false;
}
if ( use_stderr && fields[1]->type != TYPE_BOOL )
{
Error("Second field for raw reader always has to be of type bool.");
return false;
}
if ( execute && Info().mode == MODE_REREAD )
{
// for execs this makes no sense - would have to execute each heartbeat?
Error("Rereading only supported for files, not for executables.");
return false;
}
result = OpenInput();
if ( result == false ) if ( result == false )
return result; return result;
@ -168,18 +290,115 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
return true; return true;
} }
int64_t Raw::GetLine(FILE* arg_file)
bool Raw::GetLine(string& str)
{ {
if ( in->peek() == std::iostream::traits_type::eof() ) errno = 0;
return false; int pos = 0; // strstr_n only works on ints - so no use to use something different here
int offset = 0;
if ( in->eofbit == true || in->failbit == true ) if ( buf == 0 )
return false; buf = new char[block_size];
return getline(*in, str, separator[0]); int repeats = 1;
for (;;)
{
size_t readbytes = fread(buf+bufpos+offset, 1, block_size-bufpos, arg_file);
pos += bufpos + readbytes;
//printf("Pos: %d\n", pos);
bufpos = offset = 0; // read full block size in next read...
if ( pos == 0 && errno != 0 )
break;
// researching everything each time is a bit... cpu-intensive. But otherwhise we have
// to deal with situations where the separator is multi-character and split over multiple
// reads...
int found = strstr_n(pos, (unsigned char*) buf, separator.size(), (unsigned char*) separator.c_str());
if ( found == -1 )
{
// we did not find it and have to search again in the next try. resize buffer....
// but first check if we encountered the file end - because if we did this was it.
if ( feof(arg_file) != 0 )
{
outbuf = buf;
buf = 0;
if ( pos == 0 )
return -1; // signal EOF - and that we had no more data.
else
return pos;
}
repeats++;
// bah, we cannot use realloc because we would have to change the delete in the manager to a free.
char * newbuf = new char[block_size*repeats];
memcpy(newbuf, buf, block_size*(repeats-1));
delete buf;
buf = newbuf;
offset = block_size*(repeats-1);
}
else
{
outbuf = buf;
buf = 0;
buf = new char[block_size];
if ( found < pos )
{
// we have leftovers. copy them into the buffer for the next line
buf = new char[block_size];
memcpy(buf, outbuf + found + sep_length, pos - found - sep_length);
bufpos = pos - found - sep_length;
}
return found;
}
}
if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR )
return -2;
else
{
// an error code we did no expect. This probably is bad.
Error(Fmt("Reader encountered unexpected error code %d", errno));
return -3;
}
InternalError("Internal control flow execution");
assert(false);
} }
// write to the stdin of the child process
void Raw::WriteToStdin()
{
assert(stdin_towrite <= stdin_string.length());
uint64_t pos = stdin_string.length() - stdin_towrite;
errno = 0;
ssize_t written = write(pipes[stdin_out], stdin_string.c_str() + pos, stdin_towrite);
stdin_towrite -= written;
if ( errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK )
{
Error(Fmt("Writing to child process stdin failed: %d. Stopping writing at position %d", errno, pos));
stdin_towrite = 0;
close(pipes[stdin_out]);
}
if ( stdin_towrite == 0 ) // send EOF when we are done.
close(pipes[stdin_out]);
if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 )
{
Error(Fmt("Could not write whole string to stdin of child process in one go. Please use STREAM mode to pass more data to child."));
}
}
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Raw::DoUpdate() bool Raw::DoUpdate()
{ {
@ -191,6 +410,7 @@ bool Raw::DoUpdate()
switch ( Info().mode ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
assert(childpid == -1); // mode may not be used to execute child programs
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) if ( stat(fname.c_str(), &sb) == -1 )
@ -211,10 +431,9 @@ bool Raw::DoUpdate()
case MODE_MANUAL: case MODE_MANUAL:
case MODE_STREAM: case MODE_STREAM:
if ( Info().mode == MODE_STREAM && file != NULL && in != NULL ) if ( Info().mode == MODE_STREAM && file != 0 )
{ {
//fpurge(file); clearerr(file); // remove end of file evil bits
in->clear(); // remove end of file evil bits
break; break;
} }
@ -230,21 +449,118 @@ bool Raw::DoUpdate()
} }
string line; string line;
while ( GetLine(line) ) assert ( (NumFields() == 1 && !use_stderr) || (NumFields() == 2 && use_stderr));
for ( ;; )
{ {
assert (NumFields() == 1); if ( stdin_towrite > 0 )
WriteToStdin();
Value** fields = new Value*[1]; int64_t length = GetLine(file);
//printf("Read %lld bytes\n", length);
if ( length == -3 )
return false;
else if ( length == -2 || length == -1 )
// no data ready or eof
break;
Value** fields = new Value*[2]; // just always reserve 2. This means that our [] is too long by a count of 1 if not using stderr. But who cares...
// filter has exactly one text field. convert to it. // filter has exactly one text field. convert to it.
Value* val = new Value(TYPE_STRING, true); Value* val = new Value(TYPE_STRING, true);
val->val.string_val.data = copy_string(line.c_str()); val->val.string_val.data = outbuf;
val->val.string_val.length = line.size(); val->val.string_val.length = length;
fields[0] = val; fields[0] = val;
if ( use_stderr )
{
Value* bval = new Value(TYPE_BOOL, true);
bval->val.int_val = 0;
fields[1] = bval;
}
Put(fields); Put(fields);
outbuf = 0;
} }
if ( use_stderr )
{
for ( ;; )
{
int64_t length = GetLine(stderrfile);
//printf("Read stderr %lld bytes\n", length);
if ( length == -3 )
return false;
else if ( length == -2 || length == -1 )
break;
Value** fields = new Value*[2];
Value* val = new Value(TYPE_STRING, true);
val->val.string_val.data = outbuf;
val->val.string_val.length = length;
fields[0] = val;
Value* bval = new Value(TYPE_BOOL, true);
bval->val.int_val = 1; // yes, we are stderr
fields[1] = bval;
Put(fields);
outbuf = 0;
}
}
if ( ( Info().mode == MODE_MANUAL ) || ( Info().mode == MODE_REREAD ) )
// done with the current data source
EndCurrentSend();
// and let's check if the child process is still alive
int return_code;
if ( childpid != -1 && waitpid(childpid, &return_code, WNOHANG) != 0 )
{
// child died
bool signal = false;
int code = 0;
if ( WIFEXITED(return_code) )
{
code = WEXITSTATUS(return_code);
if ( code != 0 )
Error(Fmt("Child process exited with non-zero return code %d", code));
}
else if ( WIFSIGNALED(return_code) )
{
signal = false;
code = WTERMSIG(return_code);
Error(Fmt("Child process exited due to signal %d", code));
}
else
assert(false);
Value** vals = new Value*[4];
vals[0] = new Value(TYPE_STRING, true);
vals[0]->val.string_val.data = copy_string(Info().name);
vals[0]->val.string_val.length = strlen(Info().name);
vals[1] = new Value(TYPE_STRING, true);
vals[1]->val.string_val.data = copy_string(Info().source);
vals[1]->val.string_val.length = strlen(Info().source);
vals[2] = new Value(TYPE_COUNT, true);
vals[2]->val.int_val = code;
vals[3] = new Value(TYPE_BOOL, true);
vals[3]->val.int_val = signal;
// and in this case we can signal end_of_data even for the streaming reader
if ( Info().mode == MODE_STREAM )
EndCurrentSend();
SendEvent("InputRaw::process_finished", 4, vals);
}
#ifdef DEBUG #ifdef DEBUG
Debug(DBG_INPUT, "DoUpdate finished successfully"); Debug(DBG_INPUT, "DoUpdate finished successfully");
#endif #endif

View file

@ -3,7 +3,6 @@
#ifndef INPUT_READERS_RAW_H #ifndef INPUT_READERS_RAW_H
#define INPUT_READERS_RAW_H #define INPUT_READERS_RAW_H
#include <iostream>
#include <vector> #include <vector>
#include "../ReaderBackend.h" #include "../ReaderBackend.h"
@ -30,17 +29,49 @@ protected:
private: private:
bool OpenInput(); bool OpenInput();
bool CloseInput(); bool CloseInput();
bool GetLine(string& str); int64_t GetLine(FILE* file);
bool Execute();
void WriteToStdin();
string fname; // Source with a potential "|" removed. string fname; // Source with a potential "|" removed.
istream* in;
FILE* file; FILE* file;
FILE* stderrfile;
bool execute; bool execute;
bool firstrun; bool firstrun;
time_t mtime; time_t mtime;
// options set from the script-level. // options set from the script-level.
string separator; string separator;
unsigned int sep_length; // length of the separator
static const int block_size;
int bufpos;
char* buf;
char* outbuf;
int stdin_fileno;
int stdout_fileno;
int stderr_fileno;
string stdin_string;
uint64_t stdin_towrite;
bool use_stderr;
bool forcekill;
int pipes[6];
pid_t childpid;
enum IoChannels {
stdout_in = 0,
stdout_out = 1,
stdin_in = 2,
stdin_out = 3,
stderr_in = 4,
stderr_out = 5
};
}; };
} }

View file

@ -357,6 +357,7 @@ void terminate_bro()
file_mgr->Terminate(); file_mgr->Terminate();
log_mgr->Terminate(); log_mgr->Terminate();
input_mgr->Terminate();
thread_mgr->Terminate(); thread_mgr->Terminate();
mgr.Drain(); mgr.Drain();

View file

@ -1 +1,2 @@
warning in <params>, line 1: event handler never invoked: this_is_never_used warning in <params>, line 1: event handler never invoked: this_is_never_used
warning in <params>, line 1: event handler never invoked: InputRaw::process_finished

View file

@ -0,0 +1,36 @@
[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=<no value description>, want_record=F, ev=line
{
print outfile, A::description;
print outfile, A::tpe;
print outfile, A::s;
try = try + 1;
if (2 == try)
{
Input::remove(input2);
close(outfile);
terminate();
}
}, config={
[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay
}]
Input::EVENT_NEW
hello
[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=<no value description>, want_record=F, ev=line
{
print outfile, A::description;
print outfile, A::tpe;
print outfile, A::s;
try = try + 1;
if (2 == try)
{
Input::remove(input2);
close(outfile);
terminate();
}
}, config={
[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay
}]
Input::EVENT_NEW
there^A^B^C^D^E^A^B^Cyay

View file

@ -0,0 +1,2 @@
hello
thereyay

View file

@ -0,0 +1,153 @@
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
q3r3057fdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
sdfs\d
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
dfsdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
sdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.
done

View file

@ -0,0 +1,2 @@
Input::EVENT_NEW
8193

View file

@ -0,0 +1,27 @@
Input::EVENT_NEW
..:
F
Input::EVENT_NEW
bro
F
Input::EVENT_NEW
out
F
Input::EVENT_NEW
stderr.bro
F
Input::EVENT_NEW
stderr output contained nonexistant
T
Input::EVENT_NEW
stderr output contained nonexistant
T
Input::EVENT_NEW
stderr output contained nonexistant
T
done
End of Data event
input
Process finished event
input
Exit code != 0

View file

@ -0,0 +1,44 @@
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: btest-bg-wait -k 5
# @TEST-EXEC: btest-diff test.txt
# @TEST-EXEC: btest-diff out
redef exit_only_after_terminate = T;
@load base/frameworks/communication # let network-time run. otherwise there are no heartbeats...
global outfile: file;
global try: count;
module A;
type Val: record {
s: string;
};
event line(description: Input::EventDescription, tpe: Input::Event, s: string)
{
print outfile, description;
print outfile, tpe;
print outfile, s;
try = try + 1;
if ( try == 2 )
{
Input::remove("input2");
close(outfile);
terminate();
}
}
event bro_init()
{
local config_strings: table[string] of string = {
["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay"
#["stdin"] = "yay"
};
try = 0;
outfile = open("../out");
Input::add_event([$source="cat > ../test.txt |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]);
Input::remove("input");
Input::add_event([$source="cat |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input2", $fields=Val, $ev=line, $want_record=F, $config=config_strings]);
}

View file

@ -0,0 +1,61 @@
# @TEST-EXEC: cp input1.log input.log
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input2.log >> input.log
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input3.log >> input.log
# @TEST-EXEC: btest-bg-wait -k 5
# @TEST-EXEC: btest-diff out
redef exit_only_after_terminate = T;
@TEST-START-FILE input1.log
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@TEST-END-FILE
@TEST-START-FILE input2.log
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
q3r3057fdf
@TEST-END-FILE
@TEST-START-FILE input3.log
sdfs\d
dfsdf
sdf
3rw43wRRERLlL#RWERERERE.
@TEST-END-FILE
@load base/frameworks/communication # let network-time run
module A;
type Val: record {
s: string;
};
global try: count;
global outfile: file;
event line(description: Input::EventDescription, tpe: Input::Event, s: string)
{
print outfile, description;
print outfile, tpe;
print outfile, s;
try = try + 1;
if ( try == 8 )
{
print outfile, "done";
close(outfile);
Input::remove("input");
terminate();
}
}
event bro_init()
{
outfile = open("../out");
try = 0;
Input::add_event([$source="tail -f ../input.log |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]);
}

View file

@ -0,0 +1,37 @@
# @TEST-EXEC: dd if=/dev/zero of=input.log bs=8193 count=1
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: btest-bg-wait -k 5
# @TEST-EXEC: btest-diff out
#
# this test should be longer than one block-size. to test behavior of input-reader if it has to re-allocate stuff.
redef exit_only_after_terminate = T;
global outfile: file;
global try: count;
module A;
type Val: record {
s: string;
};
event line(description: Input::EventDescription, tpe: Input::Event, s: string)
{
print outfile, tpe;
print outfile, |s|;
try = try + 1;
if ( try == 1 )
{
close(outfile);
terminate();
}
}
event bro_init()
{
try = 0;
outfile = open("../out");
Input::add_event([$source="../input.log", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]);
Input::remove("input");
}

View file

@ -0,0 +1,66 @@
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: btest-bg-wait -k 5
# @TEST-EXEC: btest-diff out
redef exit_only_after_terminate = T;
type Val: record {
s: string;
is_stderr: bool;
};
global try: count;
global outfile: file;
event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool)
{
print outfile, tpe;
if ( is_stderr )
{
# work around localized error messages. and if some localization does not include the filename... well... that would be bad :)
if ( strstr(s, "nonexistant") > 0 )
{
print outfile, "stderr output contained nonexistant";
}
}
else
{
print outfile, s;
}
print outfile, is_stderr;
try = try + 1;
if ( try == 7 )
{
print outfile, "done";
Input::remove("input");
}
}
event Input::end_of_data(name: string, source:string)
{
print outfile, "End of Data event";
print outfile, name;
terminate(); # due to the current design, end_of_data will be called after process_finshed and all line events.
# this could potentially change
}
event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool)
{
print outfile, "Process finished event";
print outfile, name;
if ( exit_code != 0 )
print outfile, "Exit code != 0";
}
event bro_init()
{
local config_strings: table[string] of string = {
["read_stderr"] = "1"
};
outfile = open("../out");
try = 0;
Input::add_event([$source="ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |", $reader=Input::READER_RAW, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]);
}