diff --git a/scripts/base/frameworks/input/__load__.bro b/scripts/base/frameworks/input/__load__.bro index a3315186d5..b41fe5e95f 100644 --- a/scripts/base/frameworks/input/__load__.bro +++ b/scripts/base/frameworks/input/__load__.bro @@ -1,3 +1,4 @@ @load ./main @load ./readers/ascii +@load ./readers/raw diff --git a/scripts/base/frameworks/input/readers/raw.bro b/scripts/base/frameworks/input/readers/raw.bro new file mode 100644 index 0000000000..45deed3eda --- /dev/null +++ b/scripts/base/frameworks/input/readers/raw.bro @@ -0,0 +1,9 @@ +##! Interface for the raw input reader. + +module InputRaw; + +export { + ## Separator between input records. + ## Please note that the separator has to be exactly one character long + const record_separator = "\n" &redef; +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6a84053bce..dd294ace7c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -424,6 +424,7 @@ set(bro_SRCS input/ReaderBackend.cc input/ReaderFrontend.cc input/readers/Ascii.cc + input/readers/Raw.cc ${dns_SRCS} diff --git a/src/input.bif b/src/input.bif index 2e9324ec56..5418b7bbd4 100644 --- a/src/input.bif +++ b/src/input.bif @@ -62,3 +62,5 @@ const set_separator: string; const empty_field: string; const unset_field: string; +module InputRaw; +const record_separator: string; diff --git a/src/input/Manager.cc b/src/input/Manager.cc index a7afdc3a78..d3009aa619 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -6,6 +6,7 @@ #include "ReaderFrontend.h" #include "ReaderBackend.h" #include "readers/Ascii.h" +#include "readers/Raw.h" #include "Event.h" #include "EventHandler.h" @@ -143,6 +144,7 @@ struct ReaderDefinition { ReaderDefinition input_readers[] = { { BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate }, + { BifEnum::Input::READER_RAW, "Raw", 0, reader::Raw::Instantiate }, // End marker { BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 } diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 73b8500d5e..733cca6352 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -133,6 +133,7 @@ bool Ascii::DoStartReading() { bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) { if ( HasFilter(id) ) { + Error("Filter was added twice, ignoring."); return false; // no, we don't want to add this a second time } @@ -147,6 +148,7 @@ bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields bool Ascii::DoRemoveFilter ( int id ) { if (!HasFilter(id) ) { + Error("Filter removal of nonexisting filter requested."); return false; } @@ -263,11 +265,11 @@ TransportProto Ascii::StringToProto(const string &proto) { Value* Ascii::EntryToVal(string s, FieldMapping field) { - Value* val = new Value(field.type, true); - if ( s.compare(unset_field) == 0 ) { // field is not set... return new Value(field.type, false); } + + Value* val = new Value(field.type, true); switch ( field.type ) { case TYPE_ENUM: diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc new file mode 100644 index 0000000000..c435624865 --- /dev/null +++ b/src/input/readers/Raw.cc @@ -0,0 +1,230 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "Raw.h" +#include "NetVar.h" + +#include +#include + +#include "../../threading/SerializationTypes.h" + +#define MANUAL 0 +#define REREAD 1 +#define STREAM 2 + +#include +#include +#include + +using namespace input::reader; +using threading::Value; +using threading::Field; + +Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) +{ + file = 0; + + //keyMap = new map(); + + separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); + if ( separator.size() != 1 ) { + Error("separator length has to be 1. Separator will be truncated."); + } + +} + +Raw::~Raw() +{ + DoFinish(); +} + +void Raw::DoFinish() +{ + filters.empty(); + if ( file != 0 ) { + file->close(); + delete(file); + file = 0; + } +} + +bool Raw::DoInit(string path, int arg_mode) +{ + started = false; + fname = path; + mode = arg_mode; + mtime = 0; + + if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { + Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); + return false; + } + + file = new ifstream(path.c_str()); + if ( !file->is_open() ) { + Error(Fmt("Init: cannot open %s", fname.c_str())); + return false; + } + + return true; +} + +bool Raw::DoStartReading() { + if ( started == true ) { + Error("Started twice"); + return false; + } + + started = true; + switch ( mode ) { + case MANUAL: + case REREAD: + case STREAM: + DoUpdate(); + break; + default: + assert(false); + } + + return true; +} + +bool Raw::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) { + + if ( arg_num_fields != 1 ) { + Error("Filter for raw reader contains more than one field. Filters for the raw reader may only contain exactly one string field. Filter ignored."); + return false; + } + + if ( fields[0]->type != TYPE_STRING ) { + Error("Filter for raw reader contains a field that is not of type string."); + return false; + } + + if ( HasFilter(id) ) { + Error("Filter was added twice, ignoring"); + return false; // no, we don't want to add this a second time + } + + Filter f; + f.num_fields = arg_num_fields; + f.fields = fields; + + filters[id] = f; + + return true; +} + +bool Raw::DoRemoveFilter ( int id ) { + if (!HasFilter(id) ) { + Error("Filter removal of nonexisting filter requested."); + return false; + } + + assert ( filters.erase(id) == 1 ); + + return true; +} + + +bool Raw::HasFilter(int id) { + map::iterator it = filters.find(id); + if ( it == filters.end() ) { + return false; + } + return true; +} + +bool Raw::GetLine(string& str) { + while ( getline(*file, str, separator[0]) ) { + return true; + } + + return false; +} + + +// read the entire file and send appropriate thingies back to InputMgr +bool Raw::DoUpdate() { + switch ( mode ) { + case REREAD: + // check if the file has changed + struct stat sb; + if ( stat(fname.c_str(), &sb) == -1 ) { + Error(Fmt("Could not get stat for %s", fname.c_str())); + return false; + } + + if ( sb.st_mtime <= mtime ) { + // no change + return true; + } + + mtime = sb.st_mtime; + // file changed. reread. + + // fallthrough + case MANUAL: + case STREAM: + + if ( file && file->is_open() ) { + if ( mode == STREAM ) { + file->clear(); // remove end of file evil bits + break; + } + file->close(); + } + file = new ifstream(fname.c_str()); + if ( !file->is_open() ) { + Error(Fmt("cannot open %s", fname.c_str())); + return false; + } + + break; + default: + assert(false); + + } + + string line; + while ( GetLine(line) ) { + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + + assert ((*it).second.num_fields == 1); + + Value** fields = new Value*[1]; + + // filter has exactly one text field. convert to it. + Value* val = new Value(TYPE_STRING, true); + val->val.string_val = new string(line); + fields[0] = val; + + Put((*it).first, fields); + + } + + } + + return true; +} + + +bool Raw::DoHeartbeat(double network_time, double current_time) +{ + ReaderBackend::DoHeartbeat(network_time, current_time); + + switch ( mode ) { + case MANUAL: + // yay, we do nothing :) + break; + case REREAD: + case STREAM: + Update(); // call update and not DoUpdate, because update actually checks disabled. + break; + default: + assert(false); + } + + return true; +} + diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h new file mode 100644 index 0000000000..e046cb2ff7 --- /dev/null +++ b/src/input/readers/Raw.h @@ -0,0 +1,70 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef INPUT_READERS_RAW_H +#define INPUT_READERS_RAW_H + +#include +#include + +#include "../ReaderBackend.h" + +namespace input { namespace reader { + +class Raw : public ReaderBackend { +public: + Raw(ReaderFrontend* frontend); + ~Raw(); + + static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } + +protected: + + virtual bool DoInit(string path, int mode); + + virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields ); + + virtual bool DoRemoveFilter ( int id ); + + virtual void DoFinish(); + + virtual bool DoUpdate(); + + virtual bool DoStartReading(); + +private: + + virtual bool DoHeartbeat(double network_time, double current_time); + + struct Filter { + unsigned int num_fields; + + const threading::Field* const * fields; // raw mapping + }; + + bool HasFilter(int id); + + bool GetLine(string& str); + + ifstream* file; + string fname; + + map filters; + + // Options set from the script-level. + string separator; + + // keep a copy of the headerline to determine field locations when filters change + string headerline; + + int mode; + + bool started; + time_t mtime; + +}; + + +} +} + +#endif /* INPUT_READERS_RAW_H */ diff --git a/src/types.bif b/src/types.bif index e2a47a7ece..a9c6ecb3a8 100644 --- a/src/types.bif +++ b/src/types.bif @@ -173,6 +173,7 @@ module Input; enum Reader %{ READER_DEFAULT, READER_ASCII, + READER_RAW, %} enum Event %{ diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw/out new file mode 100644 index 0000000000..2059013c5d --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw/out @@ -0,0 +1,8 @@ +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +q3r3057fdf +sdfs\d + +dfsdf +sdf +3rw43wRRERLlL#RWERERERE. diff --git a/testing/btest/scripts/base/frameworks/input/raw.bro b/testing/btest/scripts/base/frameworks/input/raw.bro new file mode 100644 index 0000000000..5f196648b6 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw.bro @@ -0,0 +1,35 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF +DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF +q3r3057fdf +sdfs\d + +dfsdf +sdf +3rw43wRRERLlL#RWERERERE. +@TEST-END-FILE + + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Val: record { + s: string; +}; + +event line(tpe: Input::Event, s: string) { + print s; +} + +event bro_init() +{ + Input::create_stream(A::INPUT, [$source="input.log", $reader=Input::READER_RAW, $mode=Input::STREAM]); + Input::add_eventfilter(A::INPUT, [$name="input", $fields=Val, $ev=line]); +}