update to execute raw.

support reading from commands by adppending | to the filename.

support streaming reads from command.

Fix something to make rearead work better. (magically happened)
This commit is contained in:
Bernhard Amann 2012-03-20 12:07:37 -07:00
parent 88e0cea598
commit 08e1771682
12 changed files with 1364 additions and 360 deletions

View file

@ -35,10 +35,12 @@
#include <cstring>
// low-level read and write functions
#ifdef _MSC_VER
# include <io.h>
#else
# include <sys/errno.h>
# include <unistd.h>
//extern "C" {
// int write (int fd, const char* buf, int num);
@ -154,6 +156,9 @@ class fdinbuf : public std::streambuf {
// read at most bufSize new characters
int num;
num = read (fd, buffer+pbSize, bufSize);
if ( num == EAGAIN ) {
return 0;
}
if (num <= 0) {
// ERROR or EOF
return EOF;

View file

@ -12,11 +12,11 @@
#define MANUAL 0
#define REREAD 1
#define STREAM 2
#define EXECUTE 3
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
using namespace input::reader;
using threading::Value;
@ -44,52 +44,73 @@ Raw::~Raw()
void Raw::DoFinish()
{
if ( file != 0 ) {
if ( mode != EXECUTE ) {
file->close();
delete(file);
} else { // mode == EXECUTE
delete(in);
pclose(pfile);
}
file = 0;
in = 0;
Close();
}
}
bool Raw::Open()
{
if ( execute ) {
file = popen(fname.c_str(), "r");
if ( file == NULL ) {
Error(Fmt("Could not execute command %s", fname.c_str()));
return false;
}
} else {
file = fopen(fname.c_str(), "r");
if ( file == NULL ) {
Error(Fmt("Init: cannot open %s", fname.c_str()));
return false;
}
}
in = new boost::fdistream(fileno(file));
if ( execute && mode == STREAM ) {
fcntl(fileno(file), F_SETFL, O_NONBLOCK);
}
return true;
}
bool Raw::Close()
{
if ( file == NULL ) {
InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str()));
return false;
}
if ( execute ) {
delete(in);
pclose(file);
} else {
delete(in);
fclose(file);
}
in = NULL;
file = NULL;
return true;
}
bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
{
fname = path;
mode = arg_mode;
mtime = 0;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) && ( mode != EXECUTE ) ) {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false;
}
execute = false;
firstrun = true;
bool result;
if ( mode != EXECUTE ) {
file = new ifstream(path.c_str());
if ( !file->is_open() ) {
Error(Fmt("Init: cannot open %s", fname.c_str()));
return false;
}
in = file;
} else { // mode == EXECUTE
pfile = popen(path.c_str(), "r");
if ( pfile == NULL ) {
Error(Fmt("Could not execute command %s", path.c_str()));
return false;
}
in = new boost::fdistream(fileno(pfile));
}
num_fields = arg_num_fields;
fields = arg_fields;
if ( path.length() == 0 ) {
Error("No source path provided");
return false;
}
if ( arg_num_fields != 1 ) {
Error("Filter for raw reader contains more than one field. Filters for the raw reader may only contain exactly one string field. Filter ignored.");
return false;
@ -100,12 +121,45 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
return false;
}
// do Initialization
char last = path[path.length()-1];
if ( last == '|' ) {
execute = true;
fname = path.substr(0, fname.length() - 1);
if ( ( mode != MANUAL ) && ( mode != STREAM ) ) {
Error(Fmt("Unsupported read mode %d for source %s in execution mode", mode, fname.c_str()));
return false;
}
result = Open();
} else {
execute = false;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) {
Error(Fmt("Unsupported read mode %d for source %s", mode, fname.c_str()));
return false;
}
result = Open();
}
if ( result == false ) {
return result;
}
#ifdef DEBUG
Debug(DBG_INPUT, "Raw reader created, will perform first update");
#endif
// after initialization - do update
DoUpdate();
#ifdef DEBUG
Debug(DBG_INPUT, "First update went through");
#endif
return true;
}
@ -121,56 +175,45 @@ bool Raw::GetLine(string& str) {
// read the entire file and send appropriate thingies back to InputMgr
bool Raw::DoUpdate() {
switch ( mode ) {
case REREAD:
// check if the file has changed
struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) {
Error(Fmt("Could not get stat for %s", fname.c_str()));
return false;
}
if ( firstrun ) {
firstrun = false;
} else {
switch ( mode ) {
case REREAD:
// check if the file has changed
struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) {
Error(Fmt("Could not get stat for %s", fname.c_str()));
return false;
}
if ( sb.st_mtime <= mtime ) {
// no change
return true;
}
if ( sb.st_mtime <= mtime ) {
// no change
return true;
}
mtime = sb.st_mtime;
// file changed. reread.
mtime = sb.st_mtime;
// file changed. reread.
// fallthrough
case MANUAL:
case STREAM:
if ( file && file->is_open() ) {
if ( mode == STREAM ) {
file->clear(); // remove end of file evil bits
// fallthrough
case MANUAL:
case STREAM:
Debug(DBG_INPUT, "Updating");
if ( mode == STREAM && file != NULL && in != NULL ) {
fpurge(file);
in->clear(); // remove end of file evil bits
break;
}
file->close();
}
file = new ifstream(fname.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
return false;
}
break;
case EXECUTE:
// re-execute it...
pclose(pfile);
pfile = popen(fname.c_str(), "r");
if ( pfile == NULL ) {
Error(Fmt("Could not execute command %s", fname.c_str()));
return false;
}
in = new boost::fdistream(fileno(pfile));
break;
default:
assert(false);
Close();
if ( !Open() ) {
return false;
}
break;
default:
assert(false);
}
}
string line;
@ -195,9 +238,10 @@ bool Raw::DoHeartbeat(double network_time, double current_time)
{
ReaderBackend::DoHeartbeat(network_time, current_time);
Debug(DBG_INPUT, "Heartbeat");
switch ( mode ) {
case MANUAL:
case EXECUTE:
// yay, we do nothing :)
break;
case REREAD:

View file

@ -28,13 +28,14 @@ protected:
private:
virtual bool DoHeartbeat(double network_time, double current_time);
bool Open();
bool Close();
bool GetLine(string& str);
istream* in;
ifstream* file;
FILE* pfile;
FILE* file;
string fname;
@ -45,6 +46,8 @@ private:
string headerline;
int mode;
bool execute;
bool firstrun;
time_t mtime;

View file

@ -186,7 +186,6 @@ enum Mode %{
MANUAL = 0,
REREAD = 1,
STREAM = 2,
EXECUTE = 3,
%}
module GLOBAL;

View file

@ -0,0 +1,145 @@
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
q3r3057fdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
sdfs\d
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
dfsdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
sdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (9 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
done

View file

@ -0,0 +1,128 @@
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
q3r3057fdf
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
sdfs\d
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
dfsdf
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
sdf
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
q3r3057fdf
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
sdfs\d
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
dfsdf
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
sdf
[source=input.log, reader=Input::READER_RAW, mode=Input::REREAD, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::description;
print A::tpe;
print A::s;
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.

View file

@ -0,0 +1,120 @@
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
q3r3057fdf
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
sdfs\d
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
dfsdf
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
sdf
[source=../input.log, reader=Input::READER_RAW, mode=Input::STREAM, autostart=T, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
if (3 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
}
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.

View file

@ -28,6 +28,6 @@ event line(description: Input::EventDescription, tpe: Input::Event, s: string) {
event bro_init()
{
Input::add_event([$source="wc input.log", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $mode=Input::EXECUTE]);
Input::add_event([$source="wc input.log |", $reader=Input::READER_RAW, $name="input", $fields=Val, $ev=line]);
Input::remove("input");
}

View file

@ -0,0 +1,58 @@
#
# @TEST-EXEC: cp input1.log input.log
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input2.log >> input.log
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input3.log >> input.log
# @TEST-EXEC: btest-bg-wait -k 3
# @TEST-EXEC: btest-diff out
@TEST-START-FILE input1.log
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@TEST-END-FILE
@TEST-START-FILE input2.log
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
q3r3057fdf
@TEST-END-FILE
@TEST-START-FILE input3.log
sdfs\d
dfsdf
sdf
3rw43wRRERLlL#RWERERERE.
@TEST-END-FILE
@load frameworks/communication/listen
module A;
type Val: record {
s: string;
};
global try: count;
global outfile: file;
event line(description: Input::EventDescription, tpe: Input::Event, s: string) {
print outfile, description;
print outfile, tpe;
print outfile, s;
try = try + 1;
if ( try == 9 ) {
print outfile, "done";
close(outfile);
Input::remove("input");
}
}
event bro_init()
{
outfile = open ("../out");
try = 0;
Input::add_event([$source="tail -f ../input.log |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line]);
}

View file

@ -0,0 +1,34 @@
#
# @TEST-EXEC: bro -b %INPUT >out
# @TEST-EXEC: btest-diff out
@TEST-START-FILE input.log
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
q3r3057fdf
sdfs\d
dfsdf
sdf
3rw43wRRERLlL#RWERERERE.
@TEST-END-FILE
module A;
type Val: record {
s: string;
};
event line(description: Input::EventDescription, tpe: Input::Event, s: string) {
print description;
print tpe;
print s;
}
event bro_init()
{
Input::add_event([$source="input.log", $reader=Input::READER_RAW, $mode=Input::REREAD, $name="input", $fields=Val, $ev=line]);
Input::force_update("input");
Input::remove("input");
}

View file

@ -0,0 +1,56 @@
#
# @TEST-EXEC: cp input1.log input.log
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input2.log >> input.log
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input3.log >> input.log
# @TEST-EXEC: btest-bg-wait -k 3
# @TEST-EXEC: btest-diff out
@TEST-START-FILE input1.log
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@TEST-END-FILE
@TEST-START-FILE input2.log
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
q3r3057fdf
@TEST-END-FILE
@TEST-START-FILE input3.log
sdfs\d
dfsdf
sdf
3rw43wRRERLlL#RWERERERE.
@TEST-END-FILE
@load frameworks/communication/listen
module A;
type Val: record {
s: string;
};
global try: count;
global outfile: file;
event line(description: Input::EventDescription, tpe: Input::Event, s: string) {
print outfile, description;
print outfile, tpe;
print outfile, s;
if ( try == 3 ) {
print outfile, "done";
close(outfile);
Input::remove("input");
}
}
event bro_init()
{
outfile = open ("../out");
try = 0;
Input::add_event([$source="../input.log", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line]);
}