A set of input framework refactoring, cleanup, and polishing.

This commit is contained in:
Robin Sommer 2012-05-30 16:38:08 -07:00
parent 1416d5404d
commit fc907c0090
14 changed files with 162 additions and 168 deletions

View file

@ -117,9 +117,6 @@ export {
module Input;
#global streams: table[string] of Filter;
# ^ change to set containing the names
function add_table(description: Input::TableDescription) : bool
{
return __create_table_stream(description);

View file

@ -74,7 +74,7 @@ public:
string source;
bool removed;
int mode;
ReaderMode mode;
StreamType stream_type; // to distinguish between event and table streams
@ -299,7 +299,25 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
Unref(sourceval);
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
info->mode = mode->InternalInt();
switch ( mode->InternalInt() )
{
case 0:
info->mode = MODE_MANUAL;
break;
case 1:
info->mode = MODE_REREAD;
break;
case 2:
info->mode = MODE_STREAM;
break;
default:
reporter->InternalError("unknown reader mode");
}
Unref(mode);
info->reader = reader_obj;

View file

@ -176,15 +176,16 @@ void ReaderBackend::SendEntry(Value* *vals)
SendOut(new SendEntryMessage(frontend, vals));
}
bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields,
bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_num_fields,
const threading::Field* const* arg_fields)
{
source = arg_source;
SetName("InputReader/"+source);
mode = arg_mode;
num_fields = arg_num_fields;
fields = arg_fields;
SetName("InputReader/"+source);
// disable if DoInit returns error.
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields);

View file

@ -4,11 +4,32 @@
#define INPUT_READERBACKEND_H
#include "BroString.h"
#include "../threading/SerialTypes.h"
#include "threading/SerialTypes.h"
#include "threading/MsgThread.h"
namespace input {
/**
* The modes a reader can be in.
*/
enum ReaderMode {
/**
* TODO Bernhard.
*/
MODE_MANUAL,
/**
* TODO Bernhard.
*/
MODE_REREAD,
/**
* TODO Bernhard.
*/
MODE_STREAM
};
class ReaderFrontend;
/**
@ -40,24 +61,20 @@ public:
/**
* One-time initialization of the reader to define the input source.
*
* @param arg_source A string left to the interpretation of the
* @param source A string left to the interpretation of the
* reader implementation; it corresponds to the value configured on
* the script-level for the input stream.
*
* @param fields An array of size \a num_fields with the input
* fields. The method takes ownership of the array.
* @param mode The opening mode for the input source.
*
* @param mode The opening mode for the input source as one of the
* Input::Mode script constants.
*
* @param arg_num_fields Number of fields contained in \a fields.
* @param num_fields Number of fields contained in \a fields.
*
* @param fields The types and names of the fields to be retrieved
* from the input source.
*
* @return False if an error occured.
*/
bool Init(string arg_source, int mode, int arg_num_fields, const threading::Field* const* fields);
bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields);
/**
* Finishes reading from this input stream in a regular fashion. Must
@ -98,8 +115,15 @@ protected:
* prevents the reader from further operation; it will then be
* disabled and eventually deleted. When returning false, an
* implementation should also call Error() to indicate what happened.
*
* Arguments are the same as Init().
*
* Note that derived classes don't need to store the values passed in
* here if other methods need them to; the \a ReaderBackend class
* provides accessor methods to get them later, and they are passed
* in here only for convinience.
*/
virtual bool DoInit(string arg_sources, int mode, int arg_num_fields, const threading::Field* const* fields) = 0;
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0;
/**
* Reader-specific method implementing input finalization at
@ -129,10 +153,25 @@ protected:
virtual bool DoUpdate() = 0;
/**
* Returns the input source as passed into the constructor.
* Returns the input source as passed into Init()/.
*/
const string Source() const { return source; }
/**
* Returns the reader mode as passed into Init().
*/
const ReaderMode Mode() const { return mode; }
/**
* Returns the number of log fields as passed into Init().
*/
unsigned int NumFields() const { return num_fields; }
/**
* Returns the log fields as passed into Init().
*/
const threading::Field* const * Fields() const { return fields; }
/**
* Method allowing a reader to send a specified Bro event. Vals must
* match the values expected by the bro event.
@ -145,8 +184,8 @@ protected:
*/
void SendEvent(const string& name, const int num_vals, threading::Value* *vals);
// Content-sending-functions (simple mode). Including table-specific
// stuff that simply is not used if we have no table.
// Content-sending-functions (simple mode). Include table-specific
// functionality that simply is not used if we have no table.
/**
* Method allowing a reader to send a list of values read from a
@ -155,9 +194,10 @@ protected:
* If the stream is a table stream, the values are inserted into the
* table; if it is an event stream, the event is raised.
*
* @param val list of threading::Values expected by the stream
* @param val Array of threading::Values expected by the stream. The
* array must have exactly NumEntries() elements.
*/
void Put(threading::Value* *val);
void Put(threading::Value** val);
/**
* Method allowing a reader to delete a specific value from a Bro
@ -166,9 +206,10 @@ protected:
* If the receiving stream is an event stream, only a removed event
* is raised.
*
* @param val list of threading::Values expected by the stream
* @param val Array of threading::Values expected by the stream. The
* array must have exactly NumEntries() elements.
*/
void Delete(threading::Value* *val);
void Delete(threading::Value** val);
/**
* Method allowing a reader to clear a Bro table.
@ -187,9 +228,10 @@ protected:
* If the stream is a table stream, the values are inserted into the
* table; if it is an event stream, the event is raised.
*
* @param val list of threading::Values expected by the stream
* @param val Array of threading::Values expected by the stream. The
* array must have exactly NumEntries() elements.
*/
void SendEntry(threading::Value* *vals);
void SendEntry(threading::Value** vals);
/**
* Method telling the manager, that the current list of entries sent
@ -210,14 +252,16 @@ protected:
virtual bool DoHeartbeat(double network_time, double current_time);
/**
* Utility function for Readers - convert a string into a TransportProto
* Convert a string into a TransportProto. This is just a utility
* function for Readers.
*
* @param proto the transport protocol
*/
TransportProto StringToProto(const string &proto);
/**
* Utility function for Readers - convert a string into a Value::addr_t
* Convert a string into a Value::addr_t. This is just a utility
* function for Readers.
*
* @param addr containing an ipv4 or ipv6 address
*/
@ -229,11 +273,11 @@ private:
ReaderFrontend* frontend;
string source;
bool disabled;
ReaderMode mode;
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
bool disabled;
};
}

View file

@ -12,7 +12,7 @@ namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend>
{
public:
InitMessage(ReaderBackend* backend, const string source, const int mode,
InitMessage(ReaderBackend* backend, const string source, ReaderMode mode,
const int num_fields, const threading::Field* const* fields)
: threading::InputMessage<ReaderBackend>("Init", backend),
source(source), mode(mode), num_fields(num_fields), fields(fields) { }
@ -24,7 +24,7 @@ public:
private:
const string source;
const int mode;
const ReaderMode mode;
const int num_fields;
const threading::Field* const* fields;
};
@ -64,7 +64,7 @@ ReaderFrontend::~ReaderFrontend()
{
}
void ReaderFrontend::Init(string arg_source, int mode, const int num_fields,
void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fields,
const threading::Field* const* fields)
{
if ( disabled )

View file

@ -6,6 +6,8 @@
#include "../threading/MsgThread.h"
#include "../threading/SerialTypes.h"
#include "ReaderBackend.h"
namespace input {
class Manager;
@ -50,7 +52,7 @@ public:
*
* This method must only be called from the main thread.
*/
void Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* fields);
void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields);
/**
* Force an update of the current input source. Actual action depends

View file

@ -8,10 +8,6 @@
#include "../../threading/SerialTypes.h"
#define MANUAL 0
#define REREAD 1
#define STREAM 2
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
@ -87,25 +83,14 @@ void Ascii::DoClose()
}
}
bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields)
{
fname = path;
mode = arg_mode;
mtime = 0;
num_fields = arg_num_fields;
fields = arg_fields;
if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) )
{
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false;
}
file = new ifstream(path.c_str());
if ( ! file->is_open() )
{
Error(Fmt("Init: cannot open %s", fname.c_str()));
Error(Fmt("Init: cannot open %s", path.c_str()));
delete(file);
file = 0;
return false;
@ -113,7 +98,7 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c
if ( ReadHeader(false) == false )
{
Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str()));
Error(Fmt("Init: cannot open %s; headers are incorrect", path.c_str()));
file->close();
delete(file);
file = 0;
@ -162,9 +147,9 @@ bool Ascii::ReadHeader(bool useCached)
//printf("Updating fields from description %s\n", line.c_str());
columnMap.clear();
for ( unsigned int i = 0; i < num_fields; i++ )
for ( unsigned int i = 0; i < NumFields(); i++ )
{
const Field* field = fields[i];
const Field* field = Fields()[i];
map<string, uint32_t>::iterator fit = ifields.find(field->name);
if ( fit == ifields.end() )
@ -179,7 +164,7 @@ bool Ascii::ReadHeader(bool useCached)
}
Error(Fmt("Did not find requested field %s in input data file %s.",
field->name.c_str(), fname.c_str()));
field->name.c_str(), Source().c_str()));
return false;
}
@ -377,14 +362,14 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
// read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate()
{
switch ( mode ) {
case REREAD:
switch ( Mode() ) {
case MODE_REREAD:
{
// check if the file has changed
struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 )
if ( stat(Source().c_str(), &sb) == -1 )
{
Error(Fmt("Could not get stat for %s", fname.c_str()));
Error(Fmt("Could not get stat for %s", Source().c_str()));
return false;
}
@ -397,14 +382,14 @@ bool Ascii::DoUpdate()
// fallthrough
}
case MANUAL:
case STREAM:
case MODE_MANUAL:
case MODE_STREAM:
{
// dirty, fix me. (well, apparently after trying seeking, etc
// - this is not that bad)
if ( file && file->is_open() )
{
if ( mode == STREAM )
if ( Mode() == MODE_STREAM )
{
file->clear(); // remove end of file evil bits
if ( !ReadHeader(true) )
@ -415,10 +400,10 @@ bool Ascii::DoUpdate()
file->close();
}
file = new ifstream(fname.c_str());
file = new ifstream(Source().c_str());
if ( !file->is_open() )
{
Error(Fmt("cannot open %s", fname.c_str()));
Error(Fmt("cannot open %s", Source().c_str()));
return false;
}
@ -455,7 +440,7 @@ bool Ascii::DoUpdate()
pos--; // for easy comparisons of max element.
Value** fields = new Value*[num_fields];
Value** fields = new Value*[NumFields()];
int fpos = 0;
for ( vector<FieldMapping>::iterator fit = columnMap.begin();
@ -502,15 +487,15 @@ bool Ascii::DoUpdate()
}
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
assert ( (unsigned int) fpos == num_fields );
assert ( (unsigned int) fpos == NumFields() );
if ( mode == STREAM )
if ( Mode() == MODE_STREAM )
Put(fields);
else
SendEntry(fields);
}
if ( mode != STREAM )
if ( Mode () != MODE_STREAM )
EndCurrentSend();
return true;
@ -520,13 +505,13 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
{
ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( mode ) {
case MANUAL:
switch ( Mode() ) {
case MODE_MANUAL:
// yay, we do nothing :)
break;
case REREAD:
case STREAM:
case MODE_REREAD:
case MODE_STREAM:
Update(); // call update and not DoUpdate, because update
// checks disabled.
break;

View file

@ -10,7 +10,7 @@
namespace input { namespace reader {
// Description for input field mapping
// Description for input field mapping.
struct FieldMapping {
string name;
TypeTag type;
@ -27,6 +27,9 @@ struct FieldMapping {
FieldMapping subType();
};
/**
* Reader for structured ASCII files.
*/
class Ascii : public ReaderBackend {
public:
Ascii(ReaderFrontend* frontend);
@ -35,23 +38,18 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); }
protected:
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose();
virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time);
private:
virtual bool DoHeartbeat(double network_time, double current_time);
bool ReadHeader(bool useCached);
bool GetLine(string& str);
threading::Value* EntryToVal(string s, FieldMapping type);
unsigned int num_fields;
const threading::Field* const *fields; // raw mapping
ifstream* file;
string fname;
int mode;
time_t mtime;
// map columns in the file to columns to send back to the manager

View file

@ -5,10 +5,6 @@
#include "../../threading/SerialTypes.h"
#define MANUAL 0
#define REREAD 1
#define STREAM 2
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
@ -19,8 +15,6 @@ using namespace input::reader;
using threading::Value;
using threading::Field;
Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend)
{
multiplication_factor = double(BifConst::InputBenchmark::factor);
@ -42,23 +36,13 @@ void Benchmark::DoClose()
{
}
bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields)
{
mode = arg_mode;
num_fields = arg_num_fields;
fields = arg_fields;
num_lines = atoi(path.c_str());
if ( autospread != 0.0 )
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) )
{
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false;
}
heartbeatstarttime = CurrTime();
DoUpdate();
@ -95,11 +79,11 @@ bool Benchmark::DoUpdate()
int linestosend = num_lines * heart_beat_interval;
for ( int i = 0; i < linestosend; i++ )
{
Value** field = new Value*[num_fields];
for (unsigned int j = 0; j < num_fields; j++ )
field[j] = EntryToVal(fields[j]->type, fields[j]->subtype);
Value** field = new Value*[NumFields()];
for (unsigned int j = 0; j < NumFields(); j++ )
field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype);
if ( mode == STREAM )
if ( Mode() == MODE_STREAM )
// do not do tracking, spread out elements over the second that we have...
Put(field);
else
@ -125,7 +109,7 @@ bool Benchmark::DoUpdate()
}
if ( mode != STREAM )
if ( Mode() != MODE_STREAM )
EndCurrentSend();
return true;
@ -243,13 +227,13 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
num_lines += add;
heartbeatstarttime = CurrTime();
switch ( mode ) {
case MANUAL:
switch ( Mode() ) {
case MODE_MANUAL:
// yay, we do nothing :)
break;
case REREAD:
case STREAM:
case MODE_REREAD:
case MODE_STREAM:
if ( multiplication_factor != 1 || add != 0 )
{
// we have to document at what time we changed the factor to what value.
@ -270,6 +254,7 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
SendEvent("HeartbeatDone", 0, 0);
break;
default:
assert(false);
}

View file

@ -18,21 +18,16 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); }
protected:
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose();
virtual bool DoUpdate();
private:
virtual bool DoHeartbeat(double network_time, double current_time);
private:
double CurrTime();
string RandomString(const int len);
threading::Value* EntryToVal(TypeTag Type, TypeTag subtype);
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
int mode;
int num_lines;
double multiplication_factor;
int spread;

View file

@ -9,10 +9,6 @@
#include "../../threading/SerialTypes.h"
#include "../fdstream.h"
#define MANUAL 0
#define REREAD 1
#define STREAM 2
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
@ -48,7 +44,7 @@ void Raw::DoClose()
}
}
bool Raw::Open()
bool Raw::OpenInput()
{
if ( execute )
{
@ -72,13 +68,13 @@ bool Raw::Open()
// This is defined in input/fdstream.h
in = new boost::fdistream(fileno(file));
if ( execute && mode == STREAM )
if ( execute && Mode() == MODE_STREAM )
fcntl(fileno(file), F_SETFL, O_NONBLOCK);
return true;
}
bool Raw::Close()
bool Raw::CloseInput()
{
if ( file == NULL )
{
@ -103,25 +99,21 @@ bool Raw::Close()
return true;
}
bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields)
{
fname = path;
mode = arg_mode;
mtime = 0;
execute = false;
firstrun = true;
bool result;
num_fields = arg_num_fields;
fields = arg_fields;
if ( path.length() == 0 )
{
Error("No source path provided");
return false;
}
if ( arg_num_fields != 1 )
if ( num_fields != 1 )
{
Error("Filter for raw reader contains more than one field. "
"Filters for the raw reader may only contain exactly one string field. "
@ -142,7 +134,7 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
execute = true;
fname = path.substr(0, fname.length() - 1);
if ( (mode != MANUAL) && (mode != STREAM) ) {
if ( (mode != MODE_MANUAL) && (mode != MODE_STREAM) ) {
Error(Fmt("Unsupported read mode %d for source %s in execution mode",
mode, fname.c_str()));
return false;
@ -152,13 +144,6 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
} else {
execute = false;
if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) )
{
Error(Fmt("Unsupported read mode %d for source %s",
mode, fname.c_str()));
return false;
}
result = Open();
}
@ -198,8 +183,8 @@ bool Raw::DoUpdate()
else
{
switch ( mode ) {
case REREAD:
switch ( Mode() ) {
case MODE_REREAD:
{
// check if the file has changed
struct stat sb;
@ -219,9 +204,9 @@ bool Raw::DoUpdate()
// fallthrough
}
case MANUAL:
case STREAM:
if ( mode == STREAM && file != NULL && in != NULL )
case MODE_MANUAL:
case MODE_STREAM:
if ( Mode() == MODE_STREAM && file != NULL && in != NULL )
{
//fpurge(file);
in->clear(); // remove end of file evil bits
@ -242,7 +227,7 @@ bool Raw::DoUpdate()
string line;
while ( GetLine(line) )
{
assert (num_fields == 1);
assert (NumFields() == 1);
Value** fields = new Value*[1];
@ -265,13 +250,13 @@ bool Raw::DoHeartbeat(double network_time, double current_time)
{
ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( mode ) {
case MANUAL:
switch ( Mode() ) {
case MODE_MANUAL:
// yay, we do nothing :)
break;
case REREAD:
case STREAM:
case MODE_REREAD:
case MODE_STREAM:
Update(); // call update and not DoUpdate, because update
// checks disabled.
break;

View file

@ -22,24 +22,19 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected:
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose();
virtual bool DoUpdate();
private:
virtual bool DoHeartbeat(double network_time, double current_time);
bool Open();
bool Close();
private:
bool OpenInput();
bool CloseInput();
bool GetLine(string& str);
unsigned int num_fields;
const threading::Field* const * fields; // raw mapping
string fname; // Sources with a potential " |" removed.
istream* in;
FILE* file;
string fname;
int mode;
bool execute;
bool firstrun;
time_t mtime;

View file

@ -12,9 +12,6 @@ Manager::Manager()
next_beat = 0;
terminating = false;
idle = true;
heart_beat_interval = double(BifConst::Threading::heart_beat_interval);
DBG_LOG(DBG_THREADING, "Heart beat interval set to %f", heart_beat_interval);
}
Manager::~Manager()
@ -61,12 +58,6 @@ void Manager::KillThreads()
void Manager::AddThread(BasicThread* thread)
{
if ( heart_beat_interval == 0 ) {
// Sometimes initialization does not seem to work from constructor.
heart_beat_interval = double(BifConst::Threading::heart_beat_interval);
DBG_LOG(DBG_THREADING, "Heart beat interval set to %f", heart_beat_interval);
}
DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name().c_str());
all_threads.push_back(thread);
idle = false;
@ -107,7 +98,7 @@ void Manager::Process()
if ( network_time && (network_time > next_beat || ! next_beat) )
{
do_beat = true;
next_beat = ::network_time + heart_beat_interval;
next_beat = ::network_time + BifConst::Threading::heart_beat_interval;
}
did_process = false;

View file

@ -126,8 +126,6 @@ protected:
virtual const char* Tag() { return "threading::Manager"; }
private:
int heart_beat_interval;
typedef std::list<BasicThread*> all_thread_list;
all_thread_list all_threads;