A bunch of more changes for the raw reader

* send end_of_data event for all kind of streams
* send process_finished event containing exit code of child process for executed programs
* move raw-tests to separate directory
* expose name of input stream to readers
* better handling of some error cases in raw reader
* new force_kill option for raw reader which SIGKILLs progesses on exit

The ordering of events how they arrive in the main loop is a bit peculiar at the moment.
The process_finished event arrives in scriptland before all of the other events, even though
it should be sent last. I have not yet fully figured that out.
This commit is contained in:
Bernhard Amann 2013-03-18 21:49:16 -07:00
parent f1c91f02ce
commit 8875953751
21 changed files with 166 additions and 174 deletions

View file

@ -6,4 +6,12 @@ export {
## Separator between input records. ## Separator between input records.
## Please note that the separator has to be exactly one character long ## Please note that the separator has to be exactly one character long
const record_separator = "\n" &redef; const record_separator = "\n" &redef;
## Event that is called, when a process created by the raw reader exits.
##
## name: name of the input stream
## source: source of the input stream
## exit_code: exit code of the program, or number of the signal that forced the program to exit
## signal_exit: false when program exitted normally, true when program was forced to exit by a signal
global process_finished: event(name: string, source:string, exit_code:count, signal_exit:bool);
} }

View file

@ -299,6 +299,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo(); ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo();
rinfo->source = copy_string(source.c_str()); rinfo->source = copy_string(source.c_str());
rinfo->name = copy_string(name.c_str());
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
switch ( mode->InternalInt() ) switch ( mode->InternalInt() )
@ -1175,6 +1176,9 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
if ( i->stream_type == EVENT_STREAM ) if ( i->stream_type == EVENT_STREAM )
{ {
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "%s is event, sending end of data", i->name.c_str());
#endif
// just signal the end of the data source // just signal the end of the data source
SendEndOfData(i); SendEndOfData(i);
return; return;
@ -1281,6 +1285,10 @@ void Manager::SendEndOfData(ReaderFrontend* reader)
void Manager::SendEndOfData(const Stream *i) void Manager::SendEndOfData(const Stream *i)
{ {
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "SendEndOfData for stream %s",
i->name.c_str());
#endif
SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source));
} }

View file

@ -85,6 +85,11 @@ public:
*/ */
const char* source; const char* source;
/**
* The name of the input stream.
*/
const char* name;
/** /**
* A map of key/value pairs corresponding to the relevant * A map of key/value pairs corresponding to the relevant
* filter's "config" table. * filter's "config" table.
@ -99,12 +104,14 @@ public:
ReaderInfo() ReaderInfo()
{ {
source = 0; source = 0;
name = 0;
mode = MODE_NONE; mode = MODE_NONE;
} }
ReaderInfo(const ReaderInfo& other) ReaderInfo(const ReaderInfo& other)
{ {
source = other.source ? copy_string(other.source) : 0; source = other.source ? copy_string(other.source) : 0;
name = other.name ? copy_string(other.name) : 0;
mode = other.mode; mode = other.mode;
for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ ) for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ )

View file

@ -6,6 +6,7 @@
#include "../../threading/SerialTypes.h" #include "../../threading/SerialTypes.h"
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <stdio.h> #include <stdio.h>
@ -23,6 +24,8 @@ const int Raw::block_size = 512; // how big do we expect our chunks of data to b
Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
file = 0; file = 0;
stderrfile = 0;
forcekill = false;
separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(),
BifConst::InputRaw::record_separator->Len()); BifConst::InputRaw::record_separator->Len());
@ -36,11 +39,6 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
stdout_fileno = fileno(stdout); stdout_fileno = fileno(stdout);
stderr_fileno = fileno(stderr); stderr_fileno = fileno(stderr);
// and because we later assume this...
assert(stdin_fileno == 0);
assert(stdout_fileno == 1);
assert(stderr_fileno == 2);
childpid = -1; childpid = -1;
stdin_towrite = 0; // by default do not open stdin stdin_towrite = 0; // by default do not open stdin
@ -58,9 +56,17 @@ void Raw::DoClose()
CloseInput(); CloseInput();
if ( execute && childpid > 0 ) if ( execute && childpid > 0 && kill(childpid, 0) == 0 )
{
// kill child process // kill child process
kill(childpid, 9); // TERMINATOR kill(childpid, 15); // sigterm
if ( forcekill )
{
usleep(200); // 200 msecs should be enough for anyone ;)
if ( kill(childpid, 0) == 0 ) // perhaps it is already gone
kill(childpid, 9); // TERMINATE
}
}
} }
bool Raw::Execute() bool Raw::Execute()
@ -112,22 +118,26 @@ bool Raw::Execute()
if ( stdin_towrite ) if ( stdin_towrite )
{ {
close(pipes[stdin_in]); close(pipes[stdin_in]);
fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK); // ya, just always set this to nonblocking. we do not want to block on a program receiving data.
// note that there is a small gotcha with it. More data is queued when more data is read from the program output. Hence, when having
// a program in mode_manual where the first write cannot write everything, the rest will be stuck in a queue that is never emptied.
} }
if ( use_stderr ) if ( use_stderr )
{ {
close(pipes[stderr_out]); close(pipes[stderr_out]);
fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); fcntl(pipes[stderr_in], F_SETFL, O_NONBLOCK); // true for this too.
} }
file = fdopen(pipes[stdout_in], "r"); file = fdopen(pipes[stdout_in], "r");
stderrfile = fdopen(pipes[stderr_in], "r");
if ( file == 0 || (stderrfile == 0 && use_stderr) ) if ( use_stderr )
{ stderrfile = fdopen(pipes[stderr_in], "r");
Error("Could not convert fileno to file"); if ( file == 0 || (stderrfile == 0 && use_stderr) )
return false; {
} Error("Could not convert fileno to file");
return false;
}
return true; return true;
@ -165,15 +175,19 @@ bool Raw::CloseInput()
Debug(DBG_INPUT, "Raw reader starting close"); Debug(DBG_INPUT, "Raw reader starting close");
#endif #endif
fclose(file);
if ( use_stderr )
fclose(stderrfile);
if ( execute ) // we do not care if any of those fails. They should all be defined. if ( execute ) // we do not care if any of those fails. They should all be defined.
{ {
for ( int i = 0; i < 6; i ++ ) for ( int i = 0; i < 6; i ++ )
close(pipes[i]); close(pipes[i]);
} }
else
fclose(file);
file = 0; file = 0;
stderrfile = 0;
#ifdef DEBUG #ifdef DEBUG
Debug(DBG_INPUT, "Raw reader finished close"); Debug(DBG_INPUT, "Raw reader finished close");
@ -220,10 +234,16 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
want_fields = 2; want_fields = 2;
} }
it = info.config.find("force_kill"); // we want to be sure that our child is dead when we exit
if ( it != info.config.end() && execute )
{
forcekill = true;
}
if ( num_fields != want_fields ) if ( num_fields != want_fields )
{ {
Error(Fmt("Filter for raw reader contains wrong number of fields -- got %d, expected %d. " Error(Fmt("Filter for raw reader contains wrong number of fields -- got %d, expected %d. "
"Filters for the raw reader contain one field when used in normal mode and 2 fields when using execute mode with stderr capuring. " "Filters for the raw reader contain one string field when used in normal mode and one string and one bool fields when using execute mode with stderr capuring. "
"Filter ignored.", num_fields, want_fields)); "Filter ignored.", num_fields, want_fields));
return false; return false;
} }
@ -236,6 +256,14 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
if ( use_stderr && fields[1]->type != TYPE_BOOL ) if ( use_stderr && fields[1]->type != TYPE_BOOL )
{ {
Error("Second field for raw reader always has to be of type bool."); Error("Second field for raw reader always has to be of type bool.");
return false;
}
if ( execute && Info().mode == MODE_REREAD )
{
// for execs this makes no sense - would have to execute each heartbeat?
Error("Rereading only supported for files, not for executables.");
return false;
} }
@ -353,8 +381,14 @@ void Raw::WriteToStdin()
if ( stdin_towrite == 0 ) // send EOF when we are done. if ( stdin_towrite == 0 ) // send EOF when we are done.
close(pipes[stdin_out]); close(pipes[stdin_out]);
if ( Info().mode == MODE_MANUAL && stdin_towrite != 0 )
{
Error(Fmt("Could not write whole string to stdin of child process in one go. Please use STREAM mode to pass more data to child."));
}
} }
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Raw::DoUpdate() bool Raw::DoUpdate()
{ {
@ -366,6 +400,7 @@ bool Raw::DoUpdate()
switch ( Info().mode ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
assert(childpid == -1); // mode may not be used to execute child programs
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) if ( stat(fname.c_str(), &sb) == -1 )
@ -388,7 +423,6 @@ bool Raw::DoUpdate()
case MODE_STREAM: case MODE_STREAM:
if ( Info().mode == MODE_STREAM && file != 0 ) if ( Info().mode == MODE_STREAM && file != 0 )
{ {
//fpurge(file);
clearerr(file); // remove end of file evil bits clearerr(file); // remove end of file evil bits
break; break;
} }
@ -412,7 +446,7 @@ bool Raw::DoUpdate()
WriteToStdin(); WriteToStdin();
int64_t length = GetLine(file); int64_t length = GetLine(file);
printf("Read %lld bytes\n", length); //printf("Read %lld bytes\n", length);
if ( length == -3 ) if ( length == -3 )
return false; return false;
@ -444,7 +478,7 @@ bool Raw::DoUpdate()
for ( ;; ) for ( ;; )
{ {
int64_t length = GetLine(stderrfile); int64_t length = GetLine(stderrfile);
printf("Read stderr %lld bytes\n", length); //printf("Read stderr %lld bytes\n", length);
if ( length == -3 ) if ( length == -3 )
return false; return false;
else if ( length == -2 || length == -1 ) else if ( length == -2 || length == -1 )
@ -464,6 +498,50 @@ bool Raw::DoUpdate()
outbuf = 0; outbuf = 0;
} }
if ( ( Info().mode == MODE_MANUAL ) || ( Info().mode == MODE_REREAD ) )
// done with the current data source :)
EndCurrentSend();
// and let's check if the child process is still alive
int return_code;
if ( waitpid(childpid, &return_code, WNOHANG) != 0 ) {
// child died :(
bool signal = false;
int code = 0;
if ( WIFEXITED(return_code) ) {
code = WEXITSTATUS(return_code);
if ( code != 0 )
Error(Fmt("Child process exited with non-zero return code %d", code));
} else if ( WIFSIGNALED(return_code) ) {
signal = false;
code = WTERMSIG(return_code);
Error(Fmt("Child process exited due to signal %d", code));
} else {
assert(false);
}
Value** vals = new Value*[4];
vals[0] = new Value(TYPE_STRING, true);
vals[0]->val.string_val.data = copy_string(Info().name);
vals[0]->val.string_val.length = strlen(Info().name);
vals[1] = new Value(TYPE_STRING, true);
vals[1]->val.string_val.data = copy_string(Info().source);
vals[1]->val.string_val.length = strlen(Info().source);
vals[2] = new Value(TYPE_COUNT, true);
vals[2]->val.int_val = code;
vals[3] = new Value(TYPE_BOOL, true);
vals[3]->val.int_val = signal;
// and in this case we can signal end_of_data even for the streaming reader
if ( Info().mode == MODE_STREAM )
EndCurrentSend();
SendEvent("InputRaw::process_finished", 4, vals);
}
#ifdef DEBUG #ifdef DEBUG
Debug(DBG_INPUT, "DoUpdate finished successfully"); Debug(DBG_INPUT, "DoUpdate finished successfully");
#endif #endif

View file

@ -58,6 +58,8 @@ private:
bool use_stderr; bool use_stderr;
bool forcekill;
int pipes[6]; int pipes[6];
pid_t childpid; pid_t childpid;

View file

@ -1,148 +0,0 @@
[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
print A::outfile, A::is_stderr;
A::try = A::try + 1;
if (7 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
[read_stderr] = 1
}]
Input::EVENT_NEW
..:
F
[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
print A::outfile, A::is_stderr;
A::try = A::try + 1;
if (7 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
[read_stderr] = 1
}]
Input::EVENT_NEW
bro
F
[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
print A::outfile, A::is_stderr;
A::try = A::try + 1;
if (7 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
[read_stderr] = 1
}]
Input::EVENT_NEW
executestreamrawstderr.bro
F
[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
print A::outfile, A::is_stderr;
A::try = A::try + 1;
if (7 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
[read_stderr] = 1
}]
Input::EVENT_NEW
out
F
[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
print A::outfile, A::is_stderr;
A::try = A::try + 1;
if (7 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
[read_stderr] = 1
}]
Input::EVENT_NEW
ls: ../nonexistant: No such file or directory
T
[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
print A::outfile, A::is_stderr;
A::try = A::try + 1;
if (7 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
[read_stderr] = 1
}]
Input::EVENT_NEW
ls: ../nonexistant2: No such file or directory
T
[source=ls .. ../nonexistant ../nonexistant2 ../nonexistant3 |, reader=Input::READER_RAW, mode=Input::MANUAL, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
print A::outfile, A::is_stderr;
A::try = A::try + 1;
if (7 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
[read_stderr] = 1
}]
Input::EVENT_NEW
ls: ../nonexistant3: No such file or directory
T
done

View file

@ -0,0 +1,27 @@
Process finished event
input
1
Input::EVENT_NEW
..:
F
Input::EVENT_NEW
bro
F
Input::EVENT_NEW
out
F
Input::EVENT_NEW
stderr.bro
F
Input::EVENT_NEW
ls: ../nonexistant: No such file or directory
T
Input::EVENT_NEW
ls: ../nonexistant2: No such file or directory
T
Input::EVENT_NEW
ls: ../nonexistant3: No such file or directory
T
done
End of Data event
input

View file

@ -4,8 +4,6 @@
redef exit_only_after_terminate = T; redef exit_only_after_terminate = T;
module A;
type Val: record { type Val: record {
s: string; s: string;
is_stderr: bool; is_stderr: bool;
@ -16,7 +14,6 @@ global outfile: file;
event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool) event line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool)
{ {
print outfile, description;
print outfile, tpe; print outfile, tpe;
print outfile, s; print outfile, s;
print outfile, is_stderr; print outfile, is_stderr;
@ -25,12 +22,25 @@ event line(description: Input::EventDescription, tpe: Input::Event, s: string, i
if ( try == 7 ) if ( try == 7 )
{ {
print outfile, "done"; print outfile, "done";
close(outfile);
Input::remove("input"); Input::remove("input");
terminate();
} }
} }
event Input::end_of_data(name: string, source:string)
{
print outfile, "End of Data event";
print outfile, name;
terminate(); # due to the current design, end_of_data will be called after process_finshed and all line events.
# this could potentially change
}
event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool)
{
print outfile, "Process finished event";
print outfile, name;
print outfile, exit_code;
}
event bro_init() event bro_init()
{ {