Input framework merge in progress.

This commit is contained in:
Robin Sommer 2012-05-25 15:14:25 -07:00
parent 658b188dff
commit b37f9e38f6
16 changed files with 1063 additions and 1050 deletions

View file

@ -4,7 +4,7 @@
module Input; module Input;
export { export {
## The default input reader used. Defaults to `READER_ASCII`. ## The default input reader used. Defaults to `READER_ASCII`.
const default_reader = READER_ASCII &redef; const default_reader = READER_ASCII &redef;
@ -13,52 +13,56 @@ export {
## TableFilter description type used for the `table` method. ## TableFilter description type used for the `table` method.
type TableDescription: record { type TableDescription: record {
## Common definitions for tables and events ## Common definitions for tables and events
## String that allows the reader to find the source. ## String that allows the reader to find the source.
## For `READER_ASCII`, this is the filename. ## For `READER_ASCII`, this is the filename.
source: string; source: string;
## Reader to use for this steam ## Reader to use for this stream
reader: Reader &default=default_reader; reader: Reader &default=default_reader;
## Read mode to use for this stream ## Read mode to use for this stream
mode: Mode &default=default_mode; mode: Mode &default=default_mode;
## Descriptive name. Used to remove a stream at a later time ## Descriptive name. Used to remove a stream at a later time
name: string; name: string;
## Special definitions for tables # Special definitions for tables
## Table which will contain the data read by the input framework ## Table which will receive the data read by the input framework
destination: any; destination: any;
## Record that defines the values used as the index of the table ## Record that defines the values used as the index of the table
idx: any; idx: any;
## Record that defines the values used as the values of the table
## Record that defines the values used as the elements of the table
## If val is undefined, destination has to be a set. ## If val is undefined, destination has to be a set.
val: any &optional; val: any &optional;
## Defines if the value of the table is a record (default), or a single value.
## Val can only contain one element when this is set to false. ## Defines if the value of the table is a record (default), or a single value. Val
## can only contain one element when this is set to false.
want_record: bool &default=T; want_record: bool &default=T;
## The event that is raised each time a value is added to, changed in or removed from the table. ## The event that is raised each time a value is added to, changed in or removed
## The event will receive an Input::Event enum as the first argument, the idx record as the second argument ## from the table. The event will receive an Input::Event enum as the first
## and the value (record) as the third argument. ## argument, the idx record as the second argument and the value (record) as the
ev: any &optional; # event containing idx, val as values. ## third argument.
ev: any &optional; # event containing idx, val as values.
## Predicate function, that can decide if an insertion, update or removal should really be executed. ## Predicate function that can decide if an insertion, update or removal should
## Parameters are the same as for the event. If true is returned, the update is performed. If false ## really be executed. Parameters are the same as for the event. If true is
## is returned, it is skipped ## returned, the update is performed. If false is returned, it is skipped.
pred: function(typ: Input::Event, left: any, right: any): bool &optional; pred: function(typ: Input::Event, left: any, right: any): bool &optional;
}; };
## EventFilter description type used for the `event` method. ## EventFilter description type used for the `event` method.
type EventDescription: record { type EventDescription: record {
## Common definitions for tables and events ## Common definitions for tables and events
## String that allows the reader to find the source. ## String that allows the reader to find the source.
## For `READER_ASCII`, this is the filename. ## For `READER_ASCII`, this is the filename.
source: string; source: string;
## Reader to use for this steam ## Reader to use for this steam
reader: Reader &default=default_reader; reader: Reader &default=default_reader;
@ -66,19 +70,20 @@ export {
mode: Mode &default=default_mode; mode: Mode &default=default_mode;
## Descriptive name. Used to remove a stream at a later time ## Descriptive name. Used to remove a stream at a later time
name: string; name: string;
# Special definitions for events
## Special definitions for events
## Record describing the fields to be retrieved from the source input. ## Record describing the fields to be retrieved from the source input.
fields: any; fields: any;
## If want_record if false (default), the event receives each value in fields as a seperate argument. ## If want_record if false (default), the event receives each value in fields as a seperate argument.
## If it is set to true, the event receives all fields in a signle record value. ## If it is set to true, the event receives all fields in a signle record value.
want_record: bool &default=F; want_record: bool &default=F;
## The event that is rised each time a new line is received from the reader. ## The event that is rised each time a new line is received from the reader.
## The event will receive an Input::Event enum as the first element, and the fields as the following arguments. ## The event will receive an Input::Event enum as the first element, and the fields as the following arguments.
ev: any; ev: any;
}; };
@ -86,7 +91,7 @@ export {
## ##
## description: `TableDescription` record describing the source. ## description: `TableDescription` record describing the source.
global add_table: function(description: Input::TableDescription) : bool; global add_table: function(description: Input::TableDescription) : bool;
## Create a new event input from a given source. Returns true on success. ## Create a new event input from a given source. Returns true on success.
## ##
## description: `TableDescription` record describing the source. ## description: `TableDescription` record describing the source.

View file

@ -1,4 +1,6 @@
##! Interface for the ascii input reader. ##! Interface for the ascii input reader.
##!
##! The defaults are set to match Bro's ASCII output.
module InputAscii; module InputAscii;

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
// See the file "COPYING" in the main distribution directory for copyright. // See the file "COPYING" in the main distribution directory for copyright.
// //
// Class for managing input streams // Class for managing input streams.
#ifndef INPUT_MANAGER_H #ifndef INPUT_MANAGER_H
#define INPUT_MANAGER_H #define INPUT_MANAGER_H
@ -16,7 +16,7 @@
namespace input { namespace input {
class ReaderFrontend; class ReaderFrontend;
class ReaderBackend; class ReaderBackend;
/** /**
* Singleton class for managing input streams. * Singleton class for managing input streams.
@ -25,58 +25,60 @@ class Manager {
public: public:
/** /**
* Constructor. * Constructor.
*/ */
Manager(); Manager();
/** /**
* Destructor. * Destructor.
*/ */
~Manager(); ~Manager();
/** /**
* Creates a new input stream which will write the data from the data source into * Creates a new input stream which will write the data from the data
* source into a table.
* *
* @param description A record of script type \c Input:StreamDescription. * @param description A record of script type \c
* Input:StreamDescription.
* *
* This method corresponds directly to the internal BiF defined in * This method corresponds directly to the internal BiF defined in
* input.bif, which just forwards here. * input.bif, which just forwards here.
*/ */
bool CreateTableStream(RecordVal* description); bool CreateTableStream(RecordVal* description);
/** /**
* Creates a new input stream which sends events for read input data. * Creates a new input stream which sends events for read input data.
* *
* @param description A record of script type \c Input:StreamDescription. * @param description A record of script type \c
* Input:StreamDescription.
* *
* This method corresponds directly to the internal BiF defined in * This method corresponds directly to the internal BiF defined in
* input.bif, which just forwards here. * input.bif, which just forwards here.
*/ */
bool CreateEventStream(RecordVal* description); bool CreateEventStream(RecordVal* description);
/** /**
* Force update on a input stream. * Force update on a input stream. Forces a re-read of the whole
* Forces a re-read of the whole input source. * input source. Usually used when an input stream is opened in
* Usually used, when an input stream is opened in managed mode. * managed mode. Otherwise, this can be used to trigger a input
* Otherwise, this can be used to trigger a input source check before a heartbeat message arrives. * source check before a heartbeat message arrives. May be ignored by
* May be ignored by the reader. * the reader.
* *
* @param id The enum value corresponding the input stream. * @param id The enum value corresponding the input stream.
* *
* This method corresponds directly to the internal BiF defined in * This method corresponds directly to the internal BiF defined in
* input.bif, which just forwards here. * input.bif, which just forwards here.
*/ */
bool ForceUpdate(const string &id); bool ForceUpdate(const string &id);
/** /**
* Deletes an existing input stream * Deletes an existing input stream.
* *
* @param id The enum value corresponding the input stream. * @param id The enum value corresponding the input stream.
* *
* This method corresponds directly to the internal BiF defined in * This method corresponds directly to the internal BiF defined in
* input.bif, which just forwards here. * input.bif, which just forwards here.
*/ */
bool RemoveStream(const string &id); bool RemoveStream(const string &id);
protected: protected:
friend class ReaderFrontend; friend class ReaderFrontend;
@ -88,90 +90,100 @@ protected:
friend class EndCurrentSendMessage; friend class EndCurrentSendMessage;
friend class ReaderClosedMessage; friend class ReaderClosedMessage;
// For readers to write to input stream in direct mode (reporting new/deleted values directly) // For readers to write to input stream in direct mode (reporting
// Functions take ownership of threading::Value fields // new/deleted values directly). Functions take ownership of
// threading::Value fields.
void Put(ReaderFrontend* reader, threading::Value* *vals); void Put(ReaderFrontend* reader, threading::Value* *vals);
void Clear(ReaderFrontend* reader); void Clear(ReaderFrontend* reader);
bool Delete(ReaderFrontend* reader, threading::Value* *vals); bool Delete(ReaderFrontend* reader, threading::Value* *vals);
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values) // For readers to write to input stream in indirect mode (manager is
// Functions take ownership of threading::Value fields // monitoring new/deleted values) Functions take ownership of
// threading::Value fields.
void SendEntry(ReaderFrontend* reader, threading::Value* *vals); void SendEntry(ReaderFrontend* reader, threading::Value* *vals);
void EndCurrentSend(ReaderFrontend* reader); void EndCurrentSend(ReaderFrontend* reader);
// Allows readers to directly send Bro events. // Allows readers to directly send Bro events. The num_vals and vals
// The num_vals and vals must be the same the named event expects. // must be the same the named event expects. Takes ownership of
// Takes ownership of threading::Value fields // threading::Value fields.
bool SendEvent(const string& name, const int num_vals, threading::Value* *vals); bool SendEvent(const string& name, const int num_vals, threading::Value* *vals);
// Instantiates a new ReaderBackend of the given type (note that // Instantiates a new ReaderBackend of the given type (note that
// doing so creates a new thread!). // doing so creates a new thread!).
ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type);
// Functions are called from the ReaderBackend to notify the manager, that a stream has been removed // Function called from the ReaderBackend to notify the manager that
// or a stream has been closed. // a stream has been removed or a stream has been closed. Used to
// Used to prevent race conditions where data for a specific stream is still in the queue when the // prevent race conditions where data for a specific stream is still
// RemoveStream directive is executed by the main thread. // in the queue when the RemoveStream directive is executed by the
// This makes sure all data that has ben queued for a stream is still received. // main thread. This makes sure all data that has ben queued for a
// stream is still received.
bool RemoveStreamContinuation(ReaderFrontend* reader); bool RemoveStreamContinuation(ReaderFrontend* reader);
private: private:
class Stream; class Stream;
class TableStream; class TableStream;
class EventStream; class EventStream;
bool CreateStream(Stream*, RecordVal* description);
// SendEntry implementation for Table stream bool CreateStream(Stream*, RecordVal* description);
int SendEntryTable(Stream* i, const threading::Value* const *vals);
// Put implementation for Table stream // SendEntry implementation for Table stream.
int PutTable(Stream* i, const threading::Value* const *vals); int SendEntryTable(Stream* i, const threading::Value* const *vals);
// SendEntry and Put implementation for Event stream // Put implementation for Table stream.
int PutTable(Stream* i, const threading::Value* const *vals);
// SendEntry and Put implementation for Event stream.
int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals); int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals);
// Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types // Checks that a Bro type can be used for data reading. The
// from the log framework // equivalend in threading cannot be used, because we have support
// different types from the log framework
bool IsCompatibleType(BroType* t, bool atomic_only=false); bool IsCompatibleType(BroType* t, bool atomic_only=false);
// Check if a record is made up of compatible types and return a list of all fields that are in the record in order. // Check if a record is made up of compatible types and return a list
// Recursively unrolls records // of all fields that are in the record in order. Recursively unrolls
// records
bool UnrollRecordType(vector<threading::Field*> *fields, const RecordType *rec, const string& nameprepend); bool UnrollRecordType(vector<threading::Field*> *fields, const RecordType *rec, const string& nameprepend);
// Send events // Send events
void SendEvent(EventHandlerPtr ev, const int numvals, ...); void SendEvent(EventHandlerPtr ev, const int numvals, ...);
void SendEvent(EventHandlerPtr ev, list<Val*> events); void SendEvent(EventHandlerPtr ev, list<Val*> events);
// Call predicate function and return result // Call predicate function and return result.
bool CallPred(Func* pred_func, const int numvals, ...); bool CallPred(Func* pred_func, const int numvals, ...);
// get a hashkey for a set of threading::Values // Get a hashkey for a set of threading::Values.
HashKey* HashValues(const int num_elements, const threading::Value* const *vals); HashKey* HashValues(const int num_elements, const threading::Value* const *vals);
// Get the memory used by a specific value // Get the memory used by a specific value.
int GetValueLength(const threading::Value* val); int GetValueLength(const threading::Value* val);
// Copies the raw data in a specific threading::Value to position sta
// Copies the raw data in a specific threading::Value to position
// startpos.
int CopyValue(char *data, const int startpos, const threading::Value* val); int CopyValue(char *data, const int startpos, const threading::Value* val);
// Convert Threading::Value to an internal Bro Type (works also with Records) // Convert Threading::Value to an internal Bro Type (works also with
// Records).
Val* ValueToVal(const threading::Value* val, BroType* request_type); Val* ValueToVal(const threading::Value* val, BroType* request_type);
// Convert Threading::Value to an internal Bro List type // Convert Threading::Value to an internal Bro List type.
Val* ValueToIndexVal(int num_fields, const RecordType* type, const threading::Value* const *vals); Val* ValueToIndexVal(int num_fields, const RecordType* type, const threading::Value* const *vals);
// Converts a threading::value to a record type. mostly used by ValueToVal // Converts a threading::value to a record type. Mostly used by
RecordVal* ValueToRecordVal(const threading::Value* const *vals, RecordType *request_type, int* position); // ValueToVal.
RecordVal* ValueToRecordVal(const threading::Value* const *vals, RecordType *request_type, int* position);
Val* RecordValToIndexVal(RecordVal *r); Val* RecordValToIndexVal(RecordVal *r);
// Converts a Bro ListVal to a RecordVal given the record type // Converts a Bro ListVal to a RecordVal given the record type.
RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position); RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position);
Stream* FindStream(const string &name); Stream* FindStream(const string &name);
Stream* FindStream(ReaderFrontend* reader); Stream* FindStream(ReaderFrontend* reader);
enum StreamType { TABLE_FILTER, EVENT_FILTER }; enum StreamType { TABLE_STREAM, EVENT_STREAM };
map<ReaderFrontend*, Stream*> readers; map<ReaderFrontend*, Stream*> readers;
}; };

View file

@ -15,7 +15,7 @@ public:
: threading::OutputMessage<ReaderFrontend>("Put", reader), : threading::OutputMessage<ReaderFrontend>("Put", reader),
val(val) {} val(val) {}
virtual bool Process() virtual bool Process()
{ {
input_mgr->Put(Object(), val); input_mgr->Put(Object(), val);
return true; return true;
@ -31,7 +31,7 @@ public:
: threading::OutputMessage<ReaderFrontend>("Delete", reader), : threading::OutputMessage<ReaderFrontend>("Delete", reader),
val(val) {} val(val) {}
virtual bool Process() virtual bool Process()
{ {
return input_mgr->Delete(Object(), val); return input_mgr->Delete(Object(), val);
} }
@ -45,7 +45,7 @@ public:
ClearMessage(ReaderFrontend* reader) ClearMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("Clear", reader) {} : threading::OutputMessage<ReaderFrontend>("Clear", reader) {}
virtual bool Process() virtual bool Process()
{ {
input_mgr->Clear(Object()); input_mgr->Clear(Object());
return true; return true;
@ -60,14 +60,14 @@ public:
: threading::OutputMessage<ReaderFrontend>("SendEvent", reader), : threading::OutputMessage<ReaderFrontend>("SendEvent", reader),
name(name), num_vals(num_vals), val(val) {} name(name), num_vals(num_vals), val(val) {}
virtual bool Process() virtual bool Process()
{ {
bool success = input_mgr->SendEvent(name, num_vals, val); bool success = input_mgr->SendEvent(name, num_vals, val);
if ( !success ) if ( ! success )
reporter->Error("SendEvent for event %s failed", name.c_str()); reporter->Error("SendEvent for event %s failed", name.c_str());
return true; // we do not want to die if sendEvent fails because the event did not return. return true; // We do not want to die if sendEvent fails because the event did not return.
} }
private: private:
@ -82,7 +82,7 @@ public:
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader), : threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
val(val) { } val(val) { }
virtual bool Process() virtual bool Process()
{ {
input_mgr->SendEntry(Object(), val); input_mgr->SendEntry(Object(), val);
return true; return true;
@ -97,7 +97,7 @@ public:
EndCurrentSendMessage(ReaderFrontend* reader) EndCurrentSendMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader) {} : threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader) {}
virtual bool Process() virtual bool Process()
{ {
input_mgr->EndCurrentSend(Object()); input_mgr->EndCurrentSend(Object());
return true; return true;
@ -111,7 +111,7 @@ public:
ReaderClosedMessage(ReaderFrontend* reader) ReaderClosedMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("ReaderClosed", reader) {} : threading::OutputMessage<ReaderFrontend>("ReaderClosed", reader) {}
virtual bool Process() virtual bool Process()
{ {
return input_mgr->RemoveStreamContinuation(Object()); return input_mgr->RemoveStreamContinuation(Object());
} }
@ -127,49 +127,46 @@ public:
: threading::OutputMessage<ReaderFrontend>("Disable", writer) {} : threading::OutputMessage<ReaderFrontend>("Disable", writer) {}
virtual bool Process() virtual bool Process()
{ {
Object()->SetDisable(); Object()->SetDisable();
return true; return true;
} }
}; };
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{ {
buf = 0;
buf_len = 1024;
disabled = true; // disabled will be set correcty in init. disabled = true; // disabled will be set correcty in init.
frontend = arg_frontend; frontend = arg_frontend;
SetName(frontend->Name()); SetName(frontend->Name());
} }
ReaderBackend::~ReaderBackend() ReaderBackend::~ReaderBackend()
{ {
} }
void ReaderBackend::Put(Value* *val) void ReaderBackend::Put(Value* *val)
{ {
SendOut(new PutMessage(frontend, val)); SendOut(new PutMessage(frontend, val));
} }
void ReaderBackend::Delete(Value* *val) void ReaderBackend::Delete(Value* *val)
{ {
SendOut(new DeleteMessage(frontend, val)); SendOut(new DeleteMessage(frontend, val));
} }
void ReaderBackend::Clear() void ReaderBackend::Clear()
{ {
SendOut(new ClearMessage(frontend)); SendOut(new ClearMessage(frontend));
} }
void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals)
{ {
SendOut(new SendEventMessage(frontend, name, num_vals, vals)); SendOut(new SendEventMessage(frontend, name, num_vals, vals));
} }
void ReaderBackend::EndCurrentSend() void ReaderBackend::EndCurrentSend()
{ {
SendOut(new EndCurrentSendMessage(frontend)); SendOut(new EndCurrentSendMessage(frontend));
} }
@ -179,19 +176,19 @@ void ReaderBackend::SendEntry(Value* *vals)
SendOut(new SendEntryMessage(frontend, vals)); SendOut(new SendEntryMessage(frontend, vals));
} }
bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields,
const threading::Field* const* arg_fields) const threading::Field* const* arg_fields)
{ {
source = arg_source; source = arg_source;
SetName("InputReader/"+source); SetName("InputReader/"+source);
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
// disable if DoInit returns error. // disable if DoInit returns error.
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields); int success = DoInit(arg_source, mode, arg_num_fields, arg_fields);
if ( !success ) if ( ! success )
{ {
Error("Init failed"); Error("Init failed");
DisableFrontend(); DisableFrontend();
@ -202,30 +199,30 @@ bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields,
return success; return success;
} }
void ReaderBackend::Close() void ReaderBackend::Close()
{ {
DoClose(); DoClose();
disabled = true; disabled = true;
DisableFrontend(); DisableFrontend();
SendOut(new ReaderClosedMessage(frontend)); SendOut(new ReaderClosedMessage(frontend));
if ( fields != 0 ) if ( fields != 0 )
{ {
for ( unsigned int i = 0; i < num_fields; i++ ) for ( unsigned int i = 0; i < num_fields; i++ )
delete(fields[i]); delete(fields[i]);
delete[] (fields); delete [] (fields);
fields = 0; fields = 0;
} }
} }
bool ReaderBackend::Update() bool ReaderBackend::Update()
{ {
if ( disabled ) if ( disabled )
return false; return false;
bool success = DoUpdate(); bool success = DoUpdate();
if ( !success ) if ( ! success )
DisableFrontend(); DisableFrontend();
return success; return success;
@ -233,8 +230,9 @@ bool ReaderBackend::Update()
void ReaderBackend::DisableFrontend() void ReaderBackend::DisableFrontend()
{ {
disabled = true; // We also set disabled here, because there still may be other
// we also set disabled here, because there still may be other messages queued and we will dutifully ignore these from now // messages queued and we will dutifully ignore these from now.
disabled = true;
SendOut(new DisableMessage(frontend)); SendOut(new DisableMessage(frontend));
} }
@ -244,9 +242,9 @@ bool ReaderBackend::DoHeartbeat(double network_time, double current_time)
return true; return true;
} }
TransportProto ReaderBackend::StringToProto(const string &proto) TransportProto ReaderBackend::StringToProto(const string &proto)
{ {
if ( proto == "unknown" ) if ( proto == "unknown" )
return TRANSPORT_UNKNOWN; return TRANSPORT_UNKNOWN;
else if ( proto == "tcp" ) else if ( proto == "tcp" )
return TRANSPORT_TCP; return TRANSPORT_TCP;
@ -261,8 +259,8 @@ TransportProto ReaderBackend::StringToProto(const string &proto)
} }
// more or less verbose copy from IPAddr.cc -- which uses reporter // More or less verbose copy from IPAddr.cc -- which uses reporter.
Value::addr_t ReaderBackend::StringToAddr(const string &s) Value::addr_t ReaderBackend::StringToAddr(const string &s)
{ {
Value::addr_t val; Value::addr_t val;
@ -270,9 +268,9 @@ Value::addr_t ReaderBackend::StringToAddr(const string &s)
{ {
val.family = IPv4; val.family = IPv4;
if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 ) if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 )
{ {
Error(Fmt("Bad addres: %s", s.c_str())); Error(Fmt("Bad address: %s", s.c_str()));
memset(&val.in.in4.s_addr, 0, sizeof(val.in.in4.s_addr)); memset(&val.in.in4.s_addr, 0, sizeof(val.in.in4.s_addr));
} }
@ -283,7 +281,7 @@ Value::addr_t ReaderBackend::StringToAddr(const string &s)
val.family = IPv6; val.family = IPv6;
if ( inet_pton(AF_INET6, s.c_str(), val.in.in6.s6_addr) <=0 ) if ( inet_pton(AF_INET6, s.c_str(), val.in.in6.s6_addr) <=0 )
{ {
Error(Fmt("Bad IP address: %s", s.c_str())); Error(Fmt("Bad address: %s", s.c_str()));
memset(val.in.in6.s6_addr, 0, sizeof(val.in.in6.s6_addr)); memset(val.in.in6.s6_addr, 0, sizeof(val.in.in6.s6_addr));
} }
} }

View file

@ -12,13 +12,13 @@ namespace input {
class ReaderFrontend; class ReaderFrontend;
/** /**
* Base class for reader implementation. When the input:Manager creates a * Base class for reader implementation. When the input:Manager creates a new
* new input stream, it instantiates a ReaderFrontend. That then in turn * input stream, it instantiates a ReaderFrontend. That then in turn creates
* creates a ReaderBackend of the right type. The frontend then forwards * a ReaderBackend of the right type. The frontend then forwards messages
* message over the backend as its methods are called. * over the backend as its methods are called.
* *
* All of this methods must be called only from the corresponding child * All methods must be called only from the corresponding child thread (the
* thread (the constructor is the one exception.) * constructor is the one exception.)
*/ */
class ReaderBackend : public threading::MsgThread { class ReaderBackend : public threading::MsgThread {
public: public:
@ -27,54 +27,51 @@ public:
* *
* @param frontend The frontend reader that created this backend. The * @param frontend The frontend reader that created this backend. The
* *only* purpose of this value is to be passed back via messages as * *only* purpose of this value is to be passed back via messages as
* a argument to callbacks. One must not otherwise access the * an argument to callbacks. One must not otherwise access the
* frontend, it's running in a different thread. * frontend, it's running in a different thread.
* */
* @param frontend pointer to the reader frontend
*/
ReaderBackend(ReaderFrontend* frontend); ReaderBackend(ReaderFrontend* frontend);
/** /**
* Destructor. * Destructor.
*/ */
virtual ~ReaderBackend(); virtual ~ReaderBackend();
/** /**
* One-time initialization of the reader to define the input source. * One-time initialization of the reader to define the input source.
* *
* @param arg_source A string left to the interpretation of the reader * @param arg_source A string left to the interpretation of the
* implementation; it corresponds to the value configured on the * reader implementation; it corresponds to the value configured on
* script-level for the input stream. * the script-level for the input stream.
* *
* @param num_fields The number of log fields for the stream. * @param fields An array of size \a num_fields with the input
* fields. The method takes ownership of the array.
* *
* @param fields An array of size \a num_fields with the log fields. * @param mode The opening mode for the input source as one of the
* The methods takes ownership of the array. * Input::Mode script constants.
*
* @param mode the opening mode for the input source
* *
* @param arg_num_fields number of fields contained in \a fields * @param arg_num_fields Number of fields contained in \a fields.
* *
* @param fields the types and names of the fields to be retrieved * @param fields The types and names of the fields to be retrieved
* from the input source * from the input source.
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Init(string arg_source, int mode, int arg_num_fields, const threading::Field* const* fields); bool Init(string arg_source, int mode, int arg_num_fields, const threading::Field* const* fields);
/** /**
* Finishes reading from this input stream in a regular fashion. Must not be * Finishes reading from this input stream in a regular fashion. Must
* called if an error has been indicated earlier. After calling this, * not be called if an error has been indicated earlier. After
* no further reading from the stream can be performed * calling this, no further reading from the stream can be performed.
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
void Close(); void Close();
/** /**
* Force trigger an update of the input stream. * Force trigger an update of the input stream. The action that will
* The action that will be taken depends on the current read mode and the * be taken depends on the current read mode and the individual input
* individual input backend * backend.
* *
* An backend can choose to ignore this. * An backend can choose to ignore this.
* *
@ -84,16 +81,17 @@ public:
/** /**
* Disables the frontend that has instantiated this backend. Once * Disables the frontend that has instantiated this backend. Once
* disabled,the frontend will not send any further message over. * disabled, the frontend will not send any further message over.
*/ */
void DisableFrontend(); void DisableFrontend();
protected: protected:
// Methods that have to be overwritten by the individual readers // Methods that have to be overwritten by the individual readers
/** /**
* Reader-specific intialization method. Note that data may only be * Reader-specific intialization method. Note that data may only be
* read from the input source after the Start function has been called. * read from the input source after the Init() function has been
* called.
* *
* A reader implementation must override this method. If it returns * A reader implementation must override this method. If it returns
* false, it will be assumed that a fatal error has occured that * false, it will be assumed that a fatal error has occured that
@ -105,39 +103,39 @@ protected:
/** /**
* Reader-specific method implementing input finalization at * Reader-specific method implementing input finalization at
* termination. * termination.
* *
* A reader implementation must override this method but it can just * A reader implementation must override this method but it can just
* ignore calls if an input source must not be closed. * ignore calls if an input source can't actually be closed.
* *
* After the method is called, the writer will be deleted. If an error occurs * After the method is called, the writer will be deleted. If an
* during shutdown, an implementation should also call Error() to indicate what * error occurs during shutdown, an implementation should also call
* happened. * Error() to indicate what happened.
*/ */
virtual void DoClose() = 0; virtual void DoClose() = 0;
/** /**
* Reader-specific method implementing the forced update trigger * Reader-specific method implementing the forced update trigger.
* *
* A reader implementation must override this method but it can just ignore * A reader implementation must override this method but it can just
* calls, if a forced update does not fit the input source or the current input * ignore calls if a forced update does not fit the input source or
* reading mode. * the current input reading mode.
* *
* If it returns false, it will be assumed that a fatal error has occured * If it returns false, it will be assumed that a fatal error has
* that prevents the reader from further operation; it will then be * occured that prevents the reader from further operation; it will
* disabled and eventually deleted. When returning false, an implementation * then be disabled and eventually deleted. When returning false, an
* should also call Error to indicate what happened. * implementation should also call Error to indicate what happened.
*/ */
virtual bool DoUpdate() = 0; virtual bool DoUpdate() = 0;
/** /**
* Returns the input source as passed into the constructor. * Returns the input source as passed into the constructor.
*/ */
const string Source() const { return source; } const string Source() const { return source; }
/** /**
* Method allowing a reader to send a specified bro event. * Method allowing a reader to send a specified Bro event. Vals must
* Vals must match the values expected by the bro event. * match the values expected by the bro event.
* *
* @param name name of the bro event to send * @param name name of the bro event to send
* *
@ -147,30 +145,33 @@ protected:
*/ */
void SendEvent(const string& name, const int num_vals, threading::Value* *vals); void SendEvent(const string& name, const int num_vals, threading::Value* *vals);
// Content-sending-functions (simple mode). Including table-specific stuff that // Content-sending-functions (simple mode). Including table-specific
// simply is not used if we have no table // stuff that simply is not used if we have no table.
/** /**
* Method allowing a reader to send a list of values read for a specific stream * Method allowing a reader to send a list of values read from a
* back to the manager. * specific stream back to the manager in simple mode.
* *
* If the stream is a table stream, the values are inserted into the table; * If the stream is a table stream, the values are inserted into the
* if it is an event stream, the event is raised. * table; if it is an event stream, the event is raised.
* *
* @param val list of threading::Values expected by the stream * @param val list of threading::Values expected by the stream
*/ */
void Put(threading::Value* *val); void Put(threading::Value* *val);
/** /**
* Method allowing a reader to delete a specific value from a bro table. * Method allowing a reader to delete a specific value from a Bro
* table.
* *
* If the receiving stream is an event stream, only a removed event is raised * If the receiving stream is an event stream, only a removed event
* is raised.
* *
* @param val list of threading::Values expected by the stream * @param val list of threading::Values expected by the stream
*/ */
void Delete(threading::Value* *val); void Delete(threading::Value* *val);
/** /**
* Method allowing a reader to clear a value from a bro table. * Method allowing a reader to clear a Bro table.
* *
* If the receiving stream is an event stream, this is ignored. * If the receiving stream is an event stream, this is ignored.
* *
@ -178,26 +179,25 @@ protected:
void Clear(); void Clear();
// Content-sending-functions (tracking mode): Only changed lines are propagated. // Content-sending-functions (tracking mode): Only changed lines are propagated.
/** /**
* Method allowing a reader to send a list of values read for a specific stream * Method allowing a reader to send a list of values read from
* back to the manager. * specific stream back to the manager in tracking mode.
* *
* If the stream is a table stream, the values are inserted into the table; * If the stream is a table stream, the values are inserted into the
* if it is an event stream, the event is raised. * table; if it is an event stream, the event is raised.
* *
* @param val list of threading::Values expected by the stream * @param val list of threading::Values expected by the stream
*/ */
void SendEntry(threading::Value* *vals); void SendEntry(threading::Value* *vals);
/** /**
* Method telling the manager, that the current list of entries sent by SendEntry * Method telling the manager, that the current list of entries sent
* is finished. * by SendEntry is finished.
*
* For table streams, all entries that were not updated since the last EndCurrentSend
* will be deleted, because they are no longer present in the input source
* *
* For table streams, all entries that were not updated since the
* last EndCurrentSend will be deleted, because they are no longer
* present in the input source
*/ */
void EndCurrentSend(); void EndCurrentSend();
@ -207,14 +207,14 @@ protected:
* This method can be overridden but once must call * This method can be overridden but once must call
* ReaderBackend::DoHeartbeat(). * ReaderBackend::DoHeartbeat().
*/ */
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);
/** /**
* Utility function for Readers - convert a string into a TransportProto * Utility function for Readers - convert a string into a TransportProto
* *
* @param proto the transport protocol * @param proto the transport protocol
*/ */
TransportProto StringToProto(const string &proto); TransportProto StringToProto(const string &proto);
/** /**
* Utility function for Readers - convert a string into a Value::addr_t * Utility function for Readers - convert a string into a Value::addr_t
@ -224,20 +224,16 @@ protected:
threading::Value::addr_t StringToAddr(const string &addr); threading::Value::addr_t StringToAddr(const string &addr);
private: private:
// Frontend that instantiated us. This object must not be access from // Frontend that instantiated us. This object must not be accessed
// this class, it's running in a different thread! // from this class, it's running in a different thread!
ReaderFrontend* frontend; ReaderFrontend* frontend;
string source; string source;
bool disabled; bool disabled;
// For implementing Fmt().
char* buf;
unsigned int buf_len;
unsigned int num_fields; unsigned int num_fields;
const threading::Field* const * fields; // raw mapping const threading::Field* const * fields; // raw mapping
}; };
} }

View file

@ -12,11 +12,15 @@ namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend> class InitMessage : public threading::InputMessage<ReaderBackend>
{ {
public: public:
InitMessage(ReaderBackend* backend, const string source, const int mode, const int num_fields, const threading::Field* const* fields) InitMessage(ReaderBackend* backend, const string source, const int mode,
const int num_fields, const threading::Field* const* fields)
: threading::InputMessage<ReaderBackend>("Init", backend), : threading::InputMessage<ReaderBackend>("Init", backend),
source(source), mode(mode), num_fields(num_fields), fields(fields) { } source(source), mode(mode), num_fields(num_fields), fields(fields) { }
virtual bool Process() { return Object()->Init(source, mode, num_fields, fields); } virtual bool Process()
{
return Object()->Init(source, mode, num_fields, fields);
}
private: private:
const string source; const string source;
@ -46,7 +50,7 @@ public:
}; };
ReaderFrontend::ReaderFrontend(bro_int_t type) ReaderFrontend::ReaderFrontend(bro_int_t type)
{ {
disabled = initialized = false; disabled = initialized = false;
ty_name = "<not set>"; ty_name = "<not set>";
@ -56,12 +60,12 @@ ReaderFrontend::ReaderFrontend(bro_int_t type)
backend->Start(); backend->Start();
} }
ReaderFrontend::~ReaderFrontend() ReaderFrontend::~ReaderFrontend()
{ {
} }
void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, void ReaderFrontend::Init(string arg_source, int mode, const int num_fields,
const threading::Field* const* fields) const threading::Field* const* fields)
{ {
if ( disabled ) if ( disabled )
return; return;
@ -73,14 +77,14 @@ void ReaderFrontend::Init(string arg_source, int mode, const int num_fields,
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields)); backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields));
} }
void ReaderFrontend::Update() void ReaderFrontend::Update()
{ {
if ( disabled ) if ( disabled )
return; return;
if ( !initialized ) if ( ! initialized )
{ {
reporter->Error("Tried to call update on uninitialized reader"); reporter->Error("Tried to call update on uninitialized reader");
return; return;
@ -89,12 +93,12 @@ void ReaderFrontend::Update()
backend->SendIn(new UpdateMessage(backend)); backend->SendIn(new UpdateMessage(backend));
} }
void ReaderFrontend::Close() void ReaderFrontend::Close()
{ {
if ( disabled ) if ( disabled )
return; return;
if ( !initialized ) if ( ! initialized )
{ {
reporter->Error("Tried to call finish on uninitialized reader"); reporter->Error("Tried to call finish on uninitialized reader");
return; return;

View file

@ -29,14 +29,14 @@ public:
* corresponding type. * corresponding type.
* *
* Frontends must only be instantiated by the main thread. * Frontends must only be instantiated by the main thread.
*/ */
ReaderFrontend(bro_int_t type); ReaderFrontend(bro_int_t type);
/** /**
* Destructor. * Destructor.
* *
* Frontends must only be destroyed by the main thread. * Frontends must only be destroyed by the main thread.
*/ */
virtual ~ReaderFrontend(); virtual ~ReaderFrontend();
/** /**
@ -47,37 +47,39 @@ public:
* sends a message back that will asynchronously call Disable(). * sends a message back that will asynchronously call Disable().
* *
* See ReaderBackend::Init() for arguments. * See ReaderBackend::Init() for arguments.
*
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* fields); void Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* fields);
/** /**
* Force an update of the current input source. Actual action depends on * Force an update of the current input source. Actual action depends
* the opening mode and on the input source. * on the opening mode and on the input source.
* *
* This method generates a message to the backend reader and triggers * This method generates a message to the backend reader and triggers
* the corresponding message there. * the corresponding message there.
*
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Update(); void Update();
/** /**
* Finalizes writing to this tream. * Finalizes reading from this stream.
* *
* This method generates a message to the backend reader and triggers * This method generates a message to the backend reader and triggers
* the corresponding message there. * the corresponding message there. This method must only be called
* This method must only be called from the main thread. * from the main thread.
*/ */
void Close(); void Close();
/** /**
* Disables the reader frontend. From now on, all method calls that * Disables the reader frontend. From now on, all method calls that
* would normally send message over to the backend, turn into no-ops. * would normally send message over to the backend, turn into no-ops.
* Note though that it does not stop the backend itself, use Finsh() * Note though that it does not stop the backend itself, use Finish()
* to do that as well (this method is primarily for use as callback * to do that as well (this method is primarily for use as callback
* when the backend wants to disable the frontend). * when the backend wants to disable the frontend).
* *
* Disabled frontend will eventually be discarded by the * Disabled frontends will eventually be discarded by the
* input::Manager. * input::Manager.
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
@ -85,9 +87,10 @@ public:
void SetDisable() { disabled = true; } void SetDisable() { disabled = true; }
/** /**
* Returns true if the reader frontend has been disabled with SetDisable(). * Returns true if the reader frontend has been disabled with
* SetDisable().
*/ */
bool Disabled() { return disabled; } bool Disabled() { return disabled; }
/** /**
* Returns a descriptive name for the reader, including the type of * Returns a descriptive name for the reader, including the type of
@ -101,18 +104,21 @@ protected:
friend class Manager; friend class Manager;
/** /**
* Returns the source as passed into the constructor * Returns the source as passed into the constructor.
*/ */
const string Source() const { return source; }; const string& Source() const { return source; };
string ty_name; // Name of the backend type. Set by the manager. /**
* Returns the name of the backend's type.
*/
const string& TypeName() const { return ty_name; }
private: private:
ReaderBackend* backend; // The backend we have instanatiated. ReaderBackend* backend; // The backend we have instanatiated.
string source; string source;
string ty_name; // Backend type, set by manager.
bool disabled; // True if disabled. bool disabled; // True if disabled.
bool initialized; // True if initialized. bool initialized; // True if initialized.
}; };
} }

View file

@ -20,8 +20,7 @@ using namespace input::reader;
using threading::Value; using threading::Value;
using threading::Field; using threading::Field;
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position)
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position)
: name(arg_name), type(arg_type) : name(arg_name), type(arg_type)
{ {
position = arg_position; position = arg_position;
@ -29,8 +28,8 @@ FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int
present = true; present = true;
} }
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type,
const TypeTag& arg_subtype, int arg_position) const TypeTag& arg_subtype, int arg_position)
: name(arg_name), type(arg_type), subtype(arg_subtype) : name(arg_name), type(arg_type), subtype(arg_subtype)
{ {
position = arg_position; position = arg_position;
@ -38,14 +37,14 @@ FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type,
present = true; present = true;
} }
FieldMapping::FieldMapping(const FieldMapping& arg) FieldMapping::FieldMapping(const FieldMapping& arg)
: name(arg.name), type(arg.type), subtype(arg.subtype), present(arg.present) : name(arg.name), type(arg.type), subtype(arg.subtype), present(arg.present)
{ {
position = arg.position; position = arg.position;
secondary_position = arg.secondary_position; secondary_position = arg.secondary_position;
} }
FieldMapping FieldMapping::subType() FieldMapping FieldMapping::subType()
{ {
return FieldMapping(name, subtype, position); return FieldMapping(name, subtype, position);
} }
@ -54,23 +53,23 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
file = 0; file = 0;
separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(),
separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(),
BifConst::InputAscii::separator->Len()); BifConst::InputAscii::separator->Len());
if ( separator.size() != 1 )
if ( separator.size() != 1 )
Error("separator length has to be 1. Separator will be truncated."); Error("separator length has to be 1. Separator will be truncated.");
set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(), set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(),
BifConst::InputAscii::set_separator->Len()); BifConst::InputAscii::set_separator->Len());
if ( set_separator.size() != 1 )
if ( set_separator.size() != 1 )
Error("set_separator length has to be 1. Separator will be truncated."); Error("set_separator length has to be 1. Separator will be truncated.");
empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(),
BifConst::InputAscii::empty_field->Len()); BifConst::InputAscii::empty_field->Len());
unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(),
BifConst::InputAscii::unset_field->Len());
unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(),
BifConst::InputAscii::unset_field->Len());
} }
Ascii::~Ascii() Ascii::~Ascii()
@ -80,7 +79,7 @@ Ascii::~Ascii()
void Ascii::DoClose() void Ascii::DoClose()
{ {
if ( file != 0 ) if ( file != 0 )
{ {
file->close(); file->close();
delete(file); delete(file);
@ -93,26 +92,26 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c
fname = path; fname = path;
mode = arg_mode; mode = arg_mode;
mtime = 0; mtime = 0;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) )
{ {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false; return false;
} }
file = new ifstream(path.c_str()); file = new ifstream(path.c_str());
if ( !file->is_open() ) if ( ! file->is_open() )
{ {
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
delete(file); delete(file);
file = 0; file = 0;
return false; return false;
} }
if ( ReadHeader(false) == false ) if ( ReadHeader(false) == false )
{ {
Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str())); Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str()));
file->close(); file->close();
@ -120,22 +119,22 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c
file = 0; file = 0;
return false; return false;
} }
DoUpdate(); DoUpdate();
return true; return true;
} }
bool Ascii::ReadHeader(bool useCached) bool Ascii::ReadHeader(bool useCached)
{ {
// try to read the header line... // try to read the header line...
string line; string line;
map<string, uint32_t> ifields; map<string, uint32_t> ifields;
if ( !useCached ) if ( ! useCached )
{ {
if ( !GetLine(line) ) if ( ! GetLine(line) )
{ {
Error("could not read first line"); Error("could not read first line");
return false; return false;
@ -143,16 +142,17 @@ bool Ascii::ReadHeader(bool useCached)
headerline = line; headerline = line;
} }
else
else
line = headerline; line = headerline;
// construct list of field names. // construct list of field names.
istringstream splitstream(line); istringstream splitstream(line);
int pos=0; int pos=0;
while ( splitstream ) while ( splitstream )
{ {
string s; string s;
if ( !getline(splitstream, s, separator[0])) if ( ! getline(splitstream, s, separator[0]))
break; break;
ifields[s] = pos; ifields[s] = pos;
@ -161,15 +161,15 @@ bool Ascii::ReadHeader(bool useCached)
//printf("Updating fields from description %s\n", line.c_str()); //printf("Updating fields from description %s\n", line.c_str());
columnMap.clear(); columnMap.clear();
for ( unsigned int i = 0; i < num_fields; i++ ) for ( unsigned int i = 0; i < num_fields; i++ )
{ {
const Field* field = fields[i]; const Field* field = fields[i];
map<string, uint32_t>::iterator fit = ifields.find(field->name); map<string, uint32_t>::iterator fit = ifields.find(field->name);
if ( fit == ifields.end() ) if ( fit == ifields.end() )
{ {
if ( field->optional ) if ( field->optional )
{ {
// we do not really need this field. mark it as not present and always send an undef back. // we do not really need this field. mark it as not present and always send an undef back.
FieldMapping f(field->name, field->type, field->subtype, -1); FieldMapping f(field->name, field->type, field->subtype, -1);
@ -178,38 +178,43 @@ bool Ascii::ReadHeader(bool useCached)
continue; continue;
} }
Error(Fmt("Did not find requested field %s in input data file %s.", field->name.c_str(), fname.c_str())); Error(Fmt("Did not find requested field %s in input data file %s.",
field->name.c_str(), fname.c_str()));
return false; return false;
} }
FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]); FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]);
if ( field->secondary_name != "" )
if ( field->secondary_name != "" )
{ {
map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name); map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name);
if ( fit2 == ifields.end() ) if ( fit2 == ifields.end() )
{ {
Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str())); Error(Fmt("Could not find requested port type field %s in input data file.",
field->secondary_name.c_str()));
return false; return false;
} }
f.secondary_position = ifields[field->secondary_name]; f.secondary_position = ifields[field->secondary_name];
} }
columnMap.push_back(f); columnMap.push_back(f);
} }
// well, that seems to have worked... // well, that seems to have worked...
return true; return true;
} }
bool Ascii::GetLine(string& str) bool Ascii::GetLine(string& str)
{ {
while ( getline(*file, str) ) while ( getline(*file, str) )
{ {
if ( str[0] != '#' ) if ( str[0] != '#' )
return true; return true;
if ( str.compare(0,8, "#fields\t") == 0 ) if ( str.compare(0,8, "#fields\t") == 0 )
{ {
str = str.substr(8); str = str.substr(8);
return true; return true;
@ -220,14 +225,13 @@ bool Ascii::GetLine(string& str)
} }
Value* Ascii::EntryToVal(string s, FieldMapping field) Value* Ascii::EntryToVal(string s, FieldMapping field)
{ {
if ( s.compare(unset_field) == 0 ) // field is not set... if ( s.compare(unset_field) == 0 ) // field is not set...
return new Value(field.type, false); return new Value(field.type, false);
Value* val = new Value(field.type, true); Value* val = new Value(field.type, true);
switch ( field.type ) { switch ( field.type ) {
case TYPE_ENUM: case TYPE_ENUM:
case TYPE_STRING: case TYPE_STRING:
@ -235,13 +239,14 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
break; break;
case TYPE_BOOL: case TYPE_BOOL:
if ( s == "T" ) if ( s == "T" )
val->val.int_val = 1; val->val.int_val = 1;
else if ( s == "F" ) else if ( s == "F" )
val->val.int_val = 0; val->val.int_val = 0;
else else
{ {
Error(Fmt("Field: %s Invalid value for boolean: %s", field.name.c_str(), s.c_str())); Error(Fmt("Field: %s Invalid value for boolean: %s",
field.name.c_str(), s.c_str()));
return false; return false;
} }
break; break;
@ -266,13 +271,15 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
val->val.port_val.proto = TRANSPORT_UNKNOWN; val->val.port_val.proto = TRANSPORT_UNKNOWN;
break; break;
case TYPE_SUBNET: case TYPE_SUBNET:
{ {
size_t pos = s.find("/"); size_t pos = s.find("/");
if ( pos == s.npos ) { if ( pos == s.npos )
{
Error(Fmt("Invalid value for subnet: %s", s.c_str())); Error(Fmt("Invalid value for subnet: %s", s.c_str()));
return false; return false;
} }
int width = atoi(s.substr(pos+1).c_str()); int width = atoi(s.substr(pos+1).c_str());
string addr = s.substr(0, pos); string addr = s.substr(0, pos);
@ -281,7 +288,7 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
break; break;
} }
case TYPE_ADDR: case TYPE_ADDR:
val->val.addr_val = StringToAddr(s); val->val.addr_val = StringToAddr(s);
break; break;
@ -295,42 +302,42 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
// how many entries do we have... // how many entries do we have...
unsigned int length = 1; unsigned int length = 1;
for ( unsigned int i = 0; i < s.size(); i++ ) for ( unsigned int i = 0; i < s.size(); i++ )
if ( s[i] == ',') length++; if ( s[i] == ',' ) length++;
unsigned int pos = 0; unsigned int pos = 0;
if ( s.compare(empty_field) == 0 ) if ( s.compare(empty_field) == 0 )
length = 0; length = 0;
Value** lvals = new Value* [length]; Value** lvals = new Value* [length];
if ( field.type == TYPE_TABLE ) if ( field.type == TYPE_TABLE )
{ {
val->val.set_val.vals = lvals; val->val.set_val.vals = lvals;
val->val.set_val.size = length; val->val.set_val.size = length;
} }
else if ( field.type == TYPE_VECTOR )
else if ( field.type == TYPE_VECTOR )
{ {
val->val.vector_val.vals = lvals; val->val.vector_val.vals = lvals;
val->val.vector_val.size = length; val->val.vector_val.size = length;
}
else
{
assert(false);
} }
else
assert(false);
if ( length == 0 ) if ( length == 0 )
break; //empty break; //empty
istringstream splitstream(s); istringstream splitstream(s);
while ( splitstream ) while ( splitstream )
{ {
string element; string element;
if ( !getline(splitstream, element, set_separator[0]) ) if ( ! getline(splitstream, element, set_separator[0]) )
break; break;
if ( pos >= length ) if ( pos >= length )
{ {
Error(Fmt("Internal error while parsing set. pos %d >= length %d." Error(Fmt("Internal error while parsing set. pos %d >= length %d."
" Element: %s", pos, length, element.c_str())); " Element: %s", pos, length, element.c_str()));
@ -338,18 +345,18 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
} }
Value* newval = EntryToVal(element, field.subType()); Value* newval = EntryToVal(element, field.subType());
if ( newval == 0 ) if ( newval == 0 )
{ {
Error("Error while reading set"); Error("Error while reading set");
return 0; return 0;
} }
lvals[pos] = newval; lvals[pos] = newval;
pos++; pos++;
} }
if ( pos != length )
if ( pos != length )
{ {
Error("Internal error while parsing set: did not find all elements"); Error("Internal error while parsing set: did not find all elements");
return 0; return 0;
@ -358,24 +365,24 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
break; break;
} }
default: default:
Error(Fmt("unsupported field format %d for %s", field.type, Error(Fmt("unsupported field format %d for %s", field.type,
field.name.c_str())); field.name.c_str()));
return 0; return 0;
} }
return val; return val;
} }
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() bool Ascii::DoUpdate()
{ {
switch ( mode ) { switch ( mode ) {
case REREAD: case REREAD:
{
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) if ( stat(fname.c_str(), &sb) == -1 )
{ {
Error(Fmt("Could not get stat for %s", fname.c_str())); Error(Fmt("Could not get stat for %s", fname.c_str()));
return false; return false;
@ -388,54 +395,58 @@ bool Ascii::DoUpdate()
// file changed. reread. // file changed. reread.
// fallthrough // fallthrough
}
case MANUAL: case MANUAL:
case STREAM: case STREAM:
{
// dirty, fix me. (well, apparently after trying seeking, etc // dirty, fix me. (well, apparently after trying seeking, etc
// - this is not that bad) // - this is not that bad)
if ( file && file->is_open() ) if ( file && file->is_open() )
{ {
if ( mode == STREAM ) if ( mode == STREAM )
{ {
file->clear(); // remove end of file evil bits file->clear(); // remove end of file evil bits
if ( !ReadHeader(true) ) if ( !ReadHeader(true) )
return false; // header reading failed return false; // header reading failed
break; break;
} }
file->close(); file->close();
} }
file = new ifstream(fname.c_str()); file = new ifstream(fname.c_str());
if ( !file->is_open() ) if ( !file->is_open() )
{ {
Error(Fmt("cannot open %s", fname.c_str())); Error(Fmt("cannot open %s", fname.c_str()));
return false; return false;
} }
if ( ReadHeader(false) == false )
if ( ReadHeader(false) == false )
{ {
return false; return false;
} }
break; break;
}
default: default:
assert(false); assert(false);
} }
string line; string line;
while ( GetLine(line ) ) while ( GetLine(line ) )
{ {
// split on tabs // split on tabs
istringstream splitstream(line); istringstream splitstream(line);
map<int, string> stringfields; map<int, string> stringfields;
int pos = 0; int pos = 0;
while ( splitstream ) while ( splitstream )
{ {
string s; string s;
if ( !getline(splitstream, s, separator[0]) ) if ( ! getline(splitstream, s, separator[0]) )
break; break;
stringfields[pos] = s; stringfields[pos] = s;
@ -444,7 +455,6 @@ bool Ascii::DoUpdate()
pos--; // for easy comparisons of max element. pos--; // for easy comparisons of max element.
Value** fields = new Value*[num_fields]; Value** fields = new Value*[num_fields];
int fpos = 0; int fpos = 0;
@ -453,33 +463,34 @@ bool Ascii::DoUpdate()
fit++ ) fit++ )
{ {
if ( ! fit->present ) if ( ! fit->present )
{ {
// add non-present field // add non-present field
fields[fpos] = new Value((*fit).type, false); fields[fpos] = new Value((*fit).type, false);
fpos++; fpos++;
continue; continue;
} }
assert(fit->position >= 0 ); assert(fit->position >= 0 );
if ( (*fit).position > pos || (*fit).secondary_position > pos ) if ( (*fit).position > pos || (*fit).secondary_position > pos )
{ {
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position)); Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d",
line.c_str(), pos, (*fit).position, (*fit).secondary_position));
return false; return false;
} }
Value* val = EntryToVal(stringfields[(*fit).position], *fit); Value* val = EntryToVal(stringfields[(*fit).position], *fit);
if ( val == 0 ) if ( val == 0 )
{ {
Error("Could not convert String value to Val"); Error("Could not convert String value to Val");
return false; return false;
} }
if ( (*fit).secondary_position != -1 ) if ( (*fit).secondary_position != -1 )
{ {
// we have a port definition :) // we have a port definition :)
assert(val->type == TYPE_PORT ); assert(val->type == TYPE_PORT );
// Error(Fmt("Got type %d != PORT with secondary position!", val->type)); // Error(Fmt("Got type %d != PORT with secondary position!", val->type));
val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]); val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]);
@ -493,31 +504,33 @@ bool Ascii::DoUpdate()
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
assert ( (unsigned int) fpos == num_fields ); assert ( (unsigned int) fpos == num_fields );
if ( mode == STREAM ) if ( mode == STREAM )
Put(fields); Put(fields);
else else
SendEntry(fields); SendEntry(fields);
} }
if ( mode != STREAM ) if ( mode != STREAM )
EndCurrentSend(); EndCurrentSend();
return true; return true;
} }
bool Ascii::DoHeartbeat(double network_time, double current_time) bool Ascii::DoHeartbeat(double network_time, double current_time)
{ {
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( mode ) { switch ( mode ) {
case MANUAL: case MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;
case REREAD: case REREAD:
case STREAM: case STREAM:
Update(); // call update and not DoUpdate, because update Update(); // call update and not DoUpdate, because update
// checks disabled. // checks disabled.
break; break;
default: default:
assert(false); assert(false);
} }

View file

@ -14,73 +14,57 @@ namespace input { namespace reader {
struct FieldMapping { struct FieldMapping {
string name; string name;
TypeTag type; TypeTag type;
// internal type for sets and vectors TypeTag subtype; // internal type for sets and vectors
TypeTag subtype;
int position; int position;
// for ports: pos of the second field int secondary_position; // for ports: pos of the second field
int secondary_position;
bool present; bool present;
FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position); FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position);
FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position); FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position);
FieldMapping(const FieldMapping& arg); FieldMapping(const FieldMapping& arg);
FieldMapping() { position = -1; secondary_position = -1; } FieldMapping() { position = -1; secondary_position = -1; }
FieldMapping subType(); FieldMapping subType();
//bool IsEmpty() { return position == -1; }
}; };
class Ascii : public ReaderBackend { class Ascii : public ReaderBackend {
public: public:
Ascii(ReaderFrontend* frontend); Ascii(ReaderFrontend* frontend);
~Ascii(); ~Ascii();
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); }
protected: protected:
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
private: private:
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
// map columns in the file to columns to send back to the manager
vector<FieldMapping> columnMap;
bool ReadHeader(bool useCached); bool ReadHeader(bool useCached);
bool GetLine(string& str);
threading::Value* EntryToVal(string s, FieldMapping type); threading::Value* EntryToVal(string s, FieldMapping type);
bool GetLine(string& str); unsigned int num_fields;
const threading::Field* const *fields; // raw mapping
ifstream* file; ifstream* file;
string fname; string fname;
int mode;
time_t mtime;
// Options set from the script-level. // map columns in the file to columns to send back to the manager
string separator; vector<FieldMapping> columnMap;
string set_separator;
string empty_field;
string unset_field;
// keep a copy of the headerline to determine field locations when stream descriptions change // keep a copy of the headerline to determine field locations when stream descriptions change
string headerline; string headerline;
int mode; // Options set from the script-level.
string separator;
time_t mtime; string set_separator;
string empty_field;
string unset_field;
}; };

View file

@ -23,15 +23,14 @@ using threading::Field;
Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend) Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
multiplication_factor = double(BifConst::InputBenchmark::factor); multiplication_factor = double(BifConst::InputBenchmark::factor);
autospread = double(BifConst::InputBenchmark::autospread); autospread = double(BifConst::InputBenchmark::autospread);
spread = int(BifConst::InputBenchmark::spread); spread = int(BifConst::InputBenchmark::spread);
add = int(BifConst::InputBenchmark::addfactor); add = int(BifConst::InputBenchmark::addfactor);
autospread_time = 0; autospread_time = 0;
stopspreadat = int(BifConst::InputBenchmark::stopspreadat); stopspreadat = int(BifConst::InputBenchmark::stopspreadat);
timedspread = double(BifConst::InputBenchmark::timedspread); timedspread = double(BifConst::InputBenchmark::timedspread);
heart_beat_interval = double(BifConst::Threading::heart_beat_interval); heart_beat_interval = double(BifConst::Threading::heart_beat_interval);
} }
Benchmark::~Benchmark() Benchmark::~Benchmark()
@ -46,15 +45,15 @@ void Benchmark::DoClose()
bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
{ {
mode = arg_mode; mode = arg_mode;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
num_lines = atoi(path.c_str()); num_lines = atoi(path.c_str());
if ( autospread != 0.0 ) if ( autospread != 0.0 )
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) )
{ {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false; return false;
@ -66,7 +65,7 @@ bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Fiel
return true; return true;
} }
string Benchmark::RandomString(const int len) string Benchmark::RandomString(const int len)
{ {
string s(len, ' '); string s(len, ' ');
@ -75,13 +74,13 @@ string Benchmark::RandomString(const int len)
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"; "abcdefghijklmnopqrstuvwxyz";
for (int i = 0; i < len; ++i) for (int i = 0; i < len; ++i)
s[i] = values[rand() / (RAND_MAX / sizeof(values))]; s[i] = values[rand() / (RAND_MAX / sizeof(values))];
return s; return s;
} }
double Benchmark::CurrTime() double Benchmark::CurrTime()
{ {
struct timeval tv; struct timeval tv;
assert ( gettimeofday(&tv, 0) >= 0 ); assert ( gettimeofday(&tv, 0) >= 0 );
@ -91,56 +90,57 @@ double Benchmark::CurrTime()
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Benchmark::DoUpdate() bool Benchmark::DoUpdate()
{ {
int linestosend = num_lines * heart_beat_interval; int linestosend = num_lines * heart_beat_interval;
for ( int i = 0; i < linestosend; i++ ) for ( int i = 0; i < linestosend; i++ )
{ {
Value** field = new Value*[num_fields]; Value** field = new Value*[num_fields];
for (unsigned int j = 0; j < num_fields; j++ ) for (unsigned int j = 0; j < num_fields; j++ )
field[j] = EntryToVal(fields[j]->type, fields[j]->subtype); field[j] = EntryToVal(fields[j]->type, fields[j]->subtype);
if ( mode == STREAM ) if ( mode == STREAM )
// do not do tracking, spread out elements over the second that we have... // do not do tracking, spread out elements over the second that we have...
Put(field); Put(field);
else else
SendEntry(field); SendEntry(field);
if ( stopspreadat == 0 || num_lines < stopspreadat ) if ( stopspreadat == 0 || num_lines < stopspreadat )
{ {
if ( spread != 0 ) if ( spread != 0 )
usleep(spread); usleep(spread);
if ( autospread_time != 0 ) if ( autospread_time != 0 )
usleep( autospread_time ); usleep( autospread_time );
} }
if ( timedspread != 0.0 ) if ( timedspread != 0.0 )
{ {
double diff; double diff;
do do
diff = CurrTime() - heartbeatstarttime; diff = CurrTime() - heartbeatstarttime;
while ( diff/heart_beat_interval < i/(linestosend while ( diff/heart_beat_interval < i/(linestosend
+ (linestosend * timedspread) ) ); + (linestosend * timedspread) ) );
} }
} }
if ( mode != STREAM ) if ( mode != STREAM )
EndCurrentSend(); EndCurrentSend();
return true; return true;
} }
threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype)
{ {
Value* val = new Value(type, true); Value* val = new Value(type, true);
// basically construct something random from the fields that we want. // basically construct something random from the fields that we want.
switch ( type ) { switch ( type ) {
case TYPE_ENUM: case TYPE_ENUM:
assert(false); // no enums, please. assert(false); // no enums, please.
case TYPE_STRING: case TYPE_STRING:
val->val.string_val = new string(RandomString(10)); val->val.string_val = new string(RandomString(10));
break; break;
@ -172,14 +172,14 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype)
val->val.port_val.proto = TRANSPORT_UNKNOWN; val->val.port_val.proto = TRANSPORT_UNKNOWN;
break; break;
case TYPE_SUBNET: case TYPE_SUBNET:
{ {
val->val.subnet_val.prefix = StringToAddr("192.168.17.1"); val->val.subnet_val.prefix = StringToAddr("192.168.17.1");
val->val.subnet_val.length = 16; val->val.subnet_val.length = 16;
} }
break; break;
case TYPE_ADDR: case TYPE_ADDR:
val->val.addr_val = StringToAddr("192.168.17.1"); val->val.addr_val = StringToAddr("192.168.17.1");
break; break;
@ -195,26 +195,26 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype)
Value** lvals = new Value* [length]; Value** lvals = new Value* [length];
if ( type == TYPE_TABLE ) if ( type == TYPE_TABLE )
{ {
val->val.set_val.vals = lvals; val->val.set_val.vals = lvals;
val->val.set_val.size = length; val->val.set_val.size = length;
} }
else if ( type == TYPE_VECTOR ) else if ( type == TYPE_VECTOR )
{ {
val->val.vector_val.vals = lvals; val->val.vector_val.vals = lvals;
val->val.vector_val.size = length; val->val.vector_val.size = length;
} }
else else
assert(false); assert(false);
if ( length == 0 ) if ( length == 0 )
break; //empty break; //empty
for ( unsigned int pos = 0; pos < length; pos++ ) for ( unsigned int pos = 0; pos < length; pos++ )
{ {
Value* newval = EntryToVal(subtype, TYPE_ENUM); Value* newval = EntryToVal(subtype, TYPE_ENUM);
if ( newval == 0 ) if ( newval == 0 )
{ {
Error("Error while reading set"); Error("Error while reading set");
return 0; return 0;
@ -229,7 +229,7 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype)
default: default:
Error(Fmt("unsupported field format %d", type)); Error(Fmt("unsupported field format %d", type));
return 0; return 0;
} }
return val; return val;
@ -247,9 +247,10 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
case MANUAL: case MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;
case REREAD: case REREAD:
case STREAM: case STREAM:
if ( multiplication_factor != 1 || add != 0 ) if ( multiplication_factor != 1 || add != 0 )
{ {
// we have to document at what time we changed the factor to what value. // we have to document at what time we changed the factor to what value.
Value** v = new Value*[2]; Value** v = new Value*[2];
@ -261,10 +262,10 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
SendEvent("lines_changed", 2, v); SendEvent("lines_changed", 2, v);
} }
if ( autospread != 0.0 ) if ( autospread != 0.0 )
// because executing this in every loop is apparently too expensive. // because executing this in every loop is apparently too expensive.
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
Update(); // call update and not DoUpdate, because update actually checks disabled. Update(); // call update and not DoUpdate, because update actually checks disabled.
SendEvent("HeartbeatDone", 0, 0); SendEvent("HeartbeatDone", 0, 0);
@ -275,4 +276,3 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
return true; return true;
} }

View file

@ -3,41 +3,37 @@
#ifndef INPUT_READERS_BENCHMARK_H #ifndef INPUT_READERS_BENCHMARK_H
#define INPUT_READERS_BENCHMARK_H #define INPUT_READERS_BENCHMARK_H
#include "../ReaderBackend.h" #include "../ReaderBackend.h"
namespace input { namespace reader { namespace input { namespace reader {
/**
* A benchmark reader to measure performance of the input framework.
*/
class Benchmark : public ReaderBackend { class Benchmark : public ReaderBackend {
public: public:
Benchmark(ReaderFrontend* frontend); Benchmark(ReaderFrontend* frontend);
~Benchmark(); ~Benchmark();
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); }
protected: protected:
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
private: private:
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);
unsigned int num_fields;
double CurrTime(); double CurrTime();
string RandomString(const int len);
const threading::Field* const * fields; // raw mapping
threading::Value* EntryToVal(TypeTag Type, TypeTag subtype); threading::Value* EntryToVal(TypeTag Type, TypeTag subtype);
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
int mode; int mode;
int num_lines; int num_lines;
double multiplication_factor; double multiplication_factor;
int spread; int spread;
double autospread; double autospread;
@ -47,9 +43,6 @@ private:
double heartbeatstarttime; double heartbeatstarttime;
double timedspread; double timedspread;
double heart_beat_interval; double heart_beat_interval;
string RandomString(const int len);
}; };

View file

@ -28,8 +28,10 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
file = 0; file = 0;
in = 0; in = 0;
separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(),
if ( separator.size() != 1 ) BifConst::InputRaw::record_separator->Len());
if ( separator.size() != 1 )
Error("separator length has to be 1. Separator will be truncated."); Error("separator length has to be 1. Separator will be truncated.");
} }
@ -40,57 +42,56 @@ Raw::~Raw()
void Raw::DoClose() void Raw::DoClose()
{ {
if ( file != 0 ) if ( file != 0 )
{ {
Close(); Close();
} }
} }
bool Raw::Open() bool Raw::Open()
{ {
if ( execute ) if ( execute )
{ {
file = popen(fname.c_str(), "r"); file = popen(fname.c_str(), "r");
if ( file == NULL ) if ( file == NULL )
{ {
Error(Fmt("Could not execute command %s", fname.c_str())); Error(Fmt("Could not execute command %s", fname.c_str()));
return false; return false;
} }
} }
else else
{ {
file = fopen(fname.c_str(), "r"); file = fopen(fname.c_str(), "r");
if ( file == NULL ) if ( file == NULL )
{ {
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
return false; return false;
} }
} }
// This is defined in input/fdstream.h
in = new boost::fdistream(fileno(file)); in = new boost::fdistream(fileno(file));
if ( execute && mode == STREAM ) if ( execute && mode == STREAM )
{
fcntl(fileno(file), F_SETFL, O_NONBLOCK); fcntl(fileno(file), F_SETFL, O_NONBLOCK);
}
return true; return true;
} }
bool Raw::Close() bool Raw::Close()
{ {
if ( file == NULL ) if ( file == NULL )
{ {
InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str())); InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str()));
return false; return false;
} }
if ( execute ) if ( execute )
{ {
delete(in); delete(in);
pclose(file); pclose(file);
} }
else else
{ {
delete(in); delete(in);
fclose(file); fclose(file);
@ -114,13 +115,13 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
if ( path.length() == 0 ) if ( path.length() == 0 )
{ {
Error("No source path provided"); Error("No source path provided");
return false; return false;
} }
if ( arg_num_fields != 1 ) if ( arg_num_fields != 1 )
{ {
Error("Filter for raw reader contains more than one field. " Error("Filter for raw reader contains more than one field. "
"Filters for the raw reader may only contain exactly one string field. " "Filters for the raw reader may only contain exactly one string field. "
@ -128,7 +129,7 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
return false; return false;
} }
if ( fields[0]->type != TYPE_STRING ) if ( fields[0]->type != TYPE_STRING )
{ {
Error("Filter for raw reader contains a field that is not of type string."); Error("Filter for raw reader contains a field that is not of type string.");
return false; return false;
@ -136,30 +137,32 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
// do Initialization // do Initialization
char last = path[path.length()-1]; char last = path[path.length()-1];
if ( last == '|' ) if ( last == '|' )
{ {
execute = true; execute = true;
fname = path.substr(0, fname.length() - 1); fname = path.substr(0, fname.length() - 1);
if ( ( mode != MANUAL ) && ( mode != STREAM ) ) { if ( (mode != MANUAL) && (mode != STREAM) ) {
Error(Fmt("Unsupported read mode %d for source %s in execution mode", mode, fname.c_str())); Error(Fmt("Unsupported read mode %d for source %s in execution mode",
mode, fname.c_str()));
return false; return false;
} }
result = Open(); result = Open();
} else { } else {
execute = false; execute = false;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) )
{ {
Error(Fmt("Unsupported read mode %d for source %s", mode, fname.c_str())); Error(Fmt("Unsupported read mode %d for source %s",
mode, fname.c_str()));
return false; return false;
} }
result = Open(); result = Open();
} }
if ( result == false ) if ( result == false )
return result; return result;
#ifdef DEBUG #ifdef DEBUG
@ -176,80 +179,78 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
} }
bool Raw::GetLine(string& str) bool Raw::GetLine(string& str)
{ {
if ( in->peek() == std::iostream::traits_type::eof() ) if ( in->peek() == std::iostream::traits_type::eof() )
return false; return false;
if ( in->eofbit == true || in->failbit == true ) if ( in->eofbit == true || in->failbit == true )
return false; return false;
while ( getline(*in, str, separator[0]) ) return getline(*in, str, separator[0]);
return true;
return false;
} }
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Raw::DoUpdate() bool Raw::DoUpdate()
{ {
if ( firstrun ) if ( firstrun )
firstrun = false; firstrun = false;
else else
{ {
switch ( mode ) { switch ( mode ) {
case REREAD: case REREAD:
{
// check if the file has changed
struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 )
{ {
// check if the file has changed Error(Fmt("Could not get stat for %s", fname.c_str()));
struct stat sb; return false;
if ( stat(fname.c_str(), &sb) == -1 )
{
Error(Fmt("Could not get stat for %s", fname.c_str()));
return false;
}
if ( sb.st_mtime <= mtime )
// no change
return true;
mtime = sb.st_mtime;
// file changed. reread.
// fallthrough
} }
case MANUAL:
case STREAM:
if ( mode == STREAM && file != NULL && in != NULL )
{
//fpurge(file);
in->clear(); // remove end of file evil bits
break;
}
Close(); if ( sb.st_mtime <= mtime )
if ( !Open() ) // no change
return false; return true;
mtime = sb.st_mtime;
// file changed. reread.
//
// fallthrough
}
case MANUAL:
case STREAM:
if ( mode == STREAM && file != NULL && in != NULL )
{
//fpurge(file);
in->clear(); // remove end of file evil bits
break; break;
default: }
assert(false);
Close();
if ( ! Open() )
return false;
break;
default:
assert(false);
} }
} }
string line; string line;
while ( GetLine(line) ) while ( GetLine(line) )
{ {
assert (num_fields == 1); assert (num_fields == 1);
Value** fields = new Value*[1]; Value** fields = new Value*[1];
// filter has exactly one text field. convert to it. // filter has exactly one text field. convert to it.
Value* val = new Value(TYPE_STRING, true); Value* val = new Value(TYPE_STRING, true);
val->val.string_val = new string(line); val->val.string_val = new string(line);
fields[0] = val; fields[0] = val;
Put(fields); Put(fields);
} }
@ -260,7 +261,6 @@ bool Raw::DoUpdate()
return true; return true;
} }
bool Raw::DoHeartbeat(double network_time, double current_time) bool Raw::DoHeartbeat(double network_time, double current_time)
{ {
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
@ -269,10 +269,11 @@ bool Raw::DoHeartbeat(double network_time, double current_time)
case MANUAL: case MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;
case REREAD: case REREAD:
case STREAM: case STREAM:
Update(); // call update and not DoUpdate, because update Update(); // call update and not DoUpdate, because update
// checks disabled. // checks disabled.
break; break;
default: default:
assert(false); assert(false);

View file

@ -10,51 +10,44 @@
namespace input { namespace reader { namespace input { namespace reader {
/**
* A reader that returns a file (or the output of a command) as a single
* blob.
*/
class Raw : public ReaderBackend { class Raw : public ReaderBackend {
public: public:
Raw(ReaderFrontend* frontend); Raw(ReaderFrontend* frontend);
~Raw(); ~Raw();
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected: protected:
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
private: private:
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);
bool Open(); bool Open();
bool Close(); bool Close();
bool GetLine(string& str); bool GetLine(string& str);
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
istream* in; istream* in;
FILE* file; FILE* file;
string fname; string fname;
// Options set from the script-level.
string separator;
int mode; int mode;
bool execute; bool execute;
bool firstrun; bool firstrun;
time_t mtime; time_t mtime;
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
// Options set from the script-level.
string separator;
}; };
} }
} }

View file

@ -6,4 +6,4 @@ print outfile, s;
close(outfile); close(outfile);
}] }]
Input::EVENT_NEW Input::EVENT_NEW
8 ../input.log 8 ../input.log

View file

@ -1,6 +1,7 @@
# #
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT # @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: btest-bg-wait -k 1 # @TEST-EXEC: btest-bg-wait -k 1
# @TEST-EXEC: cat out.tmp | sed 's/^ *//g' >out
# @TEST-EXEC: btest-diff out # @TEST-EXEC: btest-diff out
@TEST-START-FILE input.log @TEST-START-FILE input.log
@ -31,7 +32,7 @@ event line(description: Input::EventDescription, tpe: Input::Event, s: string) {
event bro_init() event bro_init()
{ {
outfile = open ("../out"); outfile = open ("../out.tmp");
Input::add_event([$source="wc -l ../input.log |", $reader=Input::READER_RAW, $name="input", $fields=Val, $ev=line]); Input::add_event([$source="wc -l ../input.log |", $reader=Input::READER_RAW, $name="input", $fields=Val, $ev=line]);
Input::remove("input"); Input::remove("input");
} }