raw input reader for seth, which can simply read a file into string-events given a line separator.

This commit is contained in:
Bernhard Amann 2012-02-22 09:44:45 -08:00
parent 531189b5fd
commit 7e5f733826
11 changed files with 363 additions and 2 deletions

View file

@ -1,3 +1,4 @@
@load ./main @load ./main
@load ./readers/ascii @load ./readers/ascii
@load ./readers/raw

View file

@ -0,0 +1,9 @@
##! Interface for the raw input reader.
module InputRaw;
export {
## Separator between input records.
## Please note that the separator has to be exactly one character long
const record_separator = "\n" &redef;
}

View file

@ -424,6 +424,7 @@ set(bro_SRCS
input/ReaderBackend.cc input/ReaderBackend.cc
input/ReaderFrontend.cc input/ReaderFrontend.cc
input/readers/Ascii.cc input/readers/Ascii.cc
input/readers/Raw.cc
${dns_SRCS} ${dns_SRCS}

View file

@ -62,3 +62,5 @@ const set_separator: string;
const empty_field: string; const empty_field: string;
const unset_field: string; const unset_field: string;
module InputRaw;
const record_separator: string;

View file

@ -6,6 +6,7 @@
#include "ReaderFrontend.h" #include "ReaderFrontend.h"
#include "ReaderBackend.h" #include "ReaderBackend.h"
#include "readers/Ascii.h" #include "readers/Ascii.h"
#include "readers/Raw.h"
#include "Event.h" #include "Event.h"
#include "EventHandler.h" #include "EventHandler.h"
@ -143,6 +144,7 @@ struct ReaderDefinition {
ReaderDefinition input_readers[] = { ReaderDefinition input_readers[] = {
{ BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate }, { BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate },
{ BifEnum::Input::READER_RAW, "Raw", 0, reader::Raw::Instantiate },
// End marker // End marker
{ BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 } { BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 }

View file

@ -133,6 +133,7 @@ bool Ascii::DoStartReading() {
bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) { bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) {
if ( HasFilter(id) ) { if ( HasFilter(id) ) {
Error("Filter was added twice, ignoring.");
return false; // no, we don't want to add this a second time return false; // no, we don't want to add this a second time
} }
@ -147,6 +148,7 @@ bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields
bool Ascii::DoRemoveFilter ( int id ) { bool Ascii::DoRemoveFilter ( int id ) {
if (!HasFilter(id) ) { if (!HasFilter(id) ) {
Error("Filter removal of nonexisting filter requested.");
return false; return false;
} }
@ -263,11 +265,11 @@ TransportProto Ascii::StringToProto(const string &proto) {
Value* Ascii::EntryToVal(string s, FieldMapping field) { Value* Ascii::EntryToVal(string s, FieldMapping field) {
Value* val = new Value(field.type, true);
if ( s.compare(unset_field) == 0 ) { // field is not set... if ( s.compare(unset_field) == 0 ) { // field is not set...
return new Value(field.type, false); return new Value(field.type, false);
} }
Value* val = new Value(field.type, true);
switch ( field.type ) { switch ( field.type ) {
case TYPE_ENUM: case TYPE_ENUM:

230
src/input/readers/Raw.cc Normal file
View file

@ -0,0 +1,230 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "Raw.h"
#include "NetVar.h"
#include <fstream>
#include <sstream>
#include "../../threading/SerializationTypes.h"
#define MANUAL 0
#define REREAD 1
#define STREAM 2
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
using namespace input::reader;
using threading::Value;
using threading::Field;
Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
{
file = 0;
//keyMap = new map<string, string>();
separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len());
if ( separator.size() != 1 ) {
Error("separator length has to be 1. Separator will be truncated.");
}
}
Raw::~Raw()
{
DoFinish();
}
void Raw::DoFinish()
{
filters.empty();
if ( file != 0 ) {
file->close();
delete(file);
file = 0;
}
}
bool Raw::DoInit(string path, int arg_mode)
{
started = false;
fname = path;
mode = arg_mode;
mtime = 0;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false;
}
file = new ifstream(path.c_str());
if ( !file->is_open() ) {
Error(Fmt("Init: cannot open %s", fname.c_str()));
return false;
}
return true;
}
bool Raw::DoStartReading() {
if ( started == true ) {
Error("Started twice");
return false;
}
started = true;
switch ( mode ) {
case MANUAL:
case REREAD:
case STREAM:
DoUpdate();
break;
default:
assert(false);
}
return true;
}
bool Raw::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) {
if ( arg_num_fields != 1 ) {
Error("Filter for raw reader contains more than one field. Filters for the raw reader may only contain exactly one string field. Filter ignored.");
return false;
}
if ( fields[0]->type != TYPE_STRING ) {
Error("Filter for raw reader contains a field that is not of type string.");
return false;
}
if ( HasFilter(id) ) {
Error("Filter was added twice, ignoring");
return false; // no, we don't want to add this a second time
}
Filter f;
f.num_fields = arg_num_fields;
f.fields = fields;
filters[id] = f;
return true;
}
bool Raw::DoRemoveFilter ( int id ) {
if (!HasFilter(id) ) {
Error("Filter removal of nonexisting filter requested.");
return false;
}
assert ( filters.erase(id) == 1 );
return true;
}
bool Raw::HasFilter(int id) {
map<int, Filter>::iterator it = filters.find(id);
if ( it == filters.end() ) {
return false;
}
return true;
}
bool Raw::GetLine(string& str) {
while ( getline(*file, str, separator[0]) ) {
return true;
}
return false;
}
// read the entire file and send appropriate thingies back to InputMgr
bool Raw::DoUpdate() {
switch ( mode ) {
case REREAD:
// check if the file has changed
struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) {
Error(Fmt("Could not get stat for %s", fname.c_str()));
return false;
}
if ( sb.st_mtime <= mtime ) {
// no change
return true;
}
mtime = sb.st_mtime;
// file changed. reread.
// fallthrough
case MANUAL:
case STREAM:
if ( file && file->is_open() ) {
if ( mode == STREAM ) {
file->clear(); // remove end of file evil bits
break;
}
file->close();
}
file = new ifstream(fname.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
return false;
}
break;
default:
assert(false);
}
string line;
while ( GetLine(line) ) {
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
assert ((*it).second.num_fields == 1);
Value** fields = new Value*[1];
// filter has exactly one text field. convert to it.
Value* val = new Value(TYPE_STRING, true);
val->val.string_val = new string(line);
fields[0] = val;
Put((*it).first, fields);
}
}
return true;
}
bool Raw::DoHeartbeat(double network_time, double current_time)
{
ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( mode ) {
case MANUAL:
// yay, we do nothing :)
break;
case REREAD:
case STREAM:
Update(); // call update and not DoUpdate, because update actually checks disabled.
break;
default:
assert(false);
}
return true;
}

70
src/input/readers/Raw.h Normal file
View file

@ -0,0 +1,70 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef INPUT_READERS_RAW_H
#define INPUT_READERS_RAW_H
#include <iostream>
#include <vector>
#include "../ReaderBackend.h"
namespace input { namespace reader {
class Raw : public ReaderBackend {
public:
Raw(ReaderFrontend* frontend);
~Raw();
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected:
virtual bool DoInit(string path, int mode);
virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
virtual bool DoRemoveFilter ( int id );
virtual void DoFinish();
virtual bool DoUpdate();
virtual bool DoStartReading();
private:
virtual bool DoHeartbeat(double network_time, double current_time);
struct Filter {
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
};
bool HasFilter(int id);
bool GetLine(string& str);
ifstream* file;
string fname;
map<int, Filter> filters;
// Options set from the script-level.
string separator;
// keep a copy of the headerline to determine field locations when filters change
string headerline;
int mode;
bool started;
time_t mtime;
};
}
}
#endif /* INPUT_READERS_RAW_H */

View file

@ -173,6 +173,7 @@ module Input;
enum Reader %{ enum Reader %{
READER_DEFAULT, READER_DEFAULT,
READER_ASCII, READER_ASCII,
READER_RAW,
%} %}
enum Event %{ enum Event %{

View file

@ -0,0 +1,8 @@
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
q3r3057fdf
sdfs\d
dfsdf
sdf
3rw43wRRERLlL#RWERERERE.

View file

@ -0,0 +1,35 @@
#
# @TEST-EXEC: bro %INPUT >out
# @TEST-EXEC: btest-diff out
@TEST-START-FILE input.log
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
q3r3057fdf
sdfs\d
dfsdf
sdf
3rw43wRRERLlL#RWERERERE.
@TEST-END-FILE
module A;
export {
redef enum Input::ID += { INPUT };
}
type Val: record {
s: string;
};
event line(tpe: Input::Event, s: string) {
print s;
}
event bro_init()
{
Input::create_stream(A::INPUT, [$source="input.log", $reader=Input::READER_RAW, $mode=Input::STREAM]);
Input::add_eventfilter(A::INPUT, [$name="input", $fields=Val, $ev=line]);
}