introduce reader-info struct analogous to writer-info.

All tests still pass.
This commit is contained in:
Bernhard Amann 2012-07-02 10:03:28 -07:00
parent f820ee9f5c
commit 3559a39d59
11 changed files with 162 additions and 70 deletions

View file

@ -71,7 +71,7 @@ declare(PDict, InputHash);
class Manager::Stream { class Manager::Stream {
public: public:
string name; string name;
string source; ReaderBackend::ReaderInfo info;
bool removed; bool removed;
ReaderMode mode; ReaderMode mode;
@ -81,7 +81,6 @@ public:
EnumVal* type; EnumVal* type;
ReaderFrontend* reader; ReaderFrontend* reader;
TableVal* config; TableVal* config;
std::map<string, string> configmap;
RecordVal* description; RecordVal* description;
@ -330,8 +329,11 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
info->reader = reader_obj; info->reader = reader_obj;
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
info->name = name; info->name = name;
info->source = source;
info->config = config->AsTableVal(); // ref'd by LookupWithDefault info->config = config->AsTableVal(); // ref'd by LookupWithDefault
ReaderBackend::ReaderInfo readerinfo;
readerinfo.source = source;
Ref(description); Ref(description);
info->description = description; info->description = description;
@ -345,13 +347,15 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
ListVal* index = info->config->RecoverIndex(k); ListVal* index = info->config->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString(); string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString(); string value = v->Value()->AsString()->CheckString();
info->configmap.insert(std::make_pair(key, value)); info->info.config.insert(std::make_pair(key, value));
Unref(index); Unref(index);
delete k; delete k;
} }
} }
info->info = readerinfo;
DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", DBG_LOG(DBG_INPUT, "Successfully created new input stream %s",
name.c_str()); name.c_str());
@ -477,7 +481,7 @@ bool Manager::CreateEventStream(RecordVal* fval)
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf, stream->configmap ); stream->reader->Init(stream->info, stream->mode, stream->num_fields, logf );
readers[stream->reader] = stream; readers[stream->reader] = stream;
@ -654,7 +658,7 @@ bool Manager::CreateTableStream(RecordVal* fval)
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields, stream->configmap ); stream->reader->Init(stream->info, stream->mode, fieldsV.size(), fields );
readers[stream->reader] = stream; readers[stream->reader] = stream;
@ -1234,7 +1238,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
#endif #endif
// Send event that the current update is indeed finished. // Send event that the current update is indeed finished.
SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->source.c_str())); SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info.source.c_str()));
} }
void Manager::Put(ReaderFrontend* reader, Value* *vals) void Manager::Put(ReaderFrontend* reader, Value* *vals)

View file

@ -140,6 +140,49 @@ public:
} }
}; };
using namespace logging;
bool ReaderBackend::ReaderInfo::Read(SerializationFormat* fmt)
{
int size;
if ( ! (fmt->Read(&source, "source") &&
fmt->Read(&size, "config_size")) )
return false;
config.clear();
while ( size )
{
string value;
string key;
if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) )
return false;
config.insert(std::make_pair(value, key));
}
return true;
}
bool ReaderBackend::ReaderInfo::Write(SerializationFormat* fmt) const
{
int size = config.size();
if ( ! (fmt->Write(source, "source") &&
fmt->Write(size, "config_size")) )
return false;
for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i )
{
if ( ! (fmt->Write(i->first, "config-value") && fmt->Write(i->second, "config-key")) )
return false;
}
return true;
}
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{ {
@ -183,18 +226,18 @@ void ReaderBackend::SendEntry(Value* *vals)
SendOut(new SendEntryMessage(frontend, vals)); SendOut(new SendEntryMessage(frontend, vals));
} }
bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_num_fields, bool ReaderBackend::Init(const ReaderInfo& arg_info, ReaderMode arg_mode, const int arg_num_fields,
const threading::Field* const* arg_fields, const std::map<string, string> config) const threading::Field* const* arg_fields)
{ {
source = arg_source; info = arg_info;
mode = arg_mode; mode = arg_mode;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
SetName("InputReader/"+source); SetName("InputReader/"+info.source);
// disable if DoInit returns error. // disable if DoInit returns error.
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields, config); int success = DoInit(arg_info, mode, arg_num_fields, arg_fields);
if ( ! success ) if ( ! success )
{ {

View file

@ -7,6 +7,8 @@
#include "threading/SerialTypes.h" #include "threading/SerialTypes.h"
#include "threading/MsgThread.h" #include "threading/MsgThread.h"
class RemoteSerializer;
namespace input { namespace input {
@ -65,6 +67,35 @@ public:
*/ */
virtual ~ReaderBackend(); virtual ~ReaderBackend();
/**
* A struct passing information to the reader at initialization time.
*/
struct ReaderInfo
{
typedef std::map<string, string> config_map;
/**
* A string left to the interpretation of the reader
* implementation; it corresponds to the value configured on
* the script-level for the logging filter.
*/
string source;
/**
* A map of key/value pairs corresponding to the relevant
* filter's "config" table.
*/
config_map config;
private:
friend class ::RemoteSerializer;
// Note, these need to be adapted when changing the struct's
// fields. They serialize/deserialize the struct.
bool Read(SerializationFormat* fmt);
bool Write(SerializationFormat* fmt) const;
};
/** /**
* One-time initialization of the reader to define the input source. * One-time initialization of the reader to define the input source.
* *
@ -84,7 +115,7 @@ public:
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields, std::map<string, string> config); bool Init(const ReaderInfo& info, ReaderMode mode, int num_fields, const threading::Field* const* fields);
/** /**
* Finishes reading from this input stream in a regular fashion. Must * Finishes reading from this input stream in a regular fashion. Must
@ -112,6 +143,22 @@ public:
*/ */
void DisableFrontend(); void DisableFrontend();
/**
* Returns the log fields as passed into the constructor.
*/
const threading::Field* const * Fields() const { return fields; }
/**
* Returns the additional reader information into the constructor.
*/
const ReaderInfo& Info() const { return info; }
/**
* Returns the number of log fields as passed into the constructor.
*/
int NumFields() const { return num_fields; }
protected: protected:
// Methods that have to be overwritten by the individual readers // Methods that have to be overwritten by the individual readers
@ -133,7 +180,7 @@ protected:
* provides accessor methods to get them later, and they are passed * provides accessor methods to get them later, and they are passed
* in here only for convinience. * in here only for convinience.
*/ */
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config) = 0; virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0;
/** /**
* Reader-specific method implementing input finalization at * Reader-specific method implementing input finalization at
@ -162,26 +209,11 @@ protected:
*/ */
virtual bool DoUpdate() = 0; virtual bool DoUpdate() = 0;
/**
* Returns the input source as passed into Init()/.
*/
const string Source() const { return source; }
/** /**
* Returns the reader mode as passed into Init(). * Returns the reader mode as passed into Init().
*/ */
const ReaderMode Mode() const { return mode; } const ReaderMode Mode() const { return mode; }
/**
* Returns the number of log fields as passed into Init().
*/
unsigned int NumFields() const { return num_fields; }
/**
* Returns the log fields as passed into Init().
*/
const threading::Field* const * Fields() const { return fields; }
/** /**
* Method allowing a reader to send a specified Bro event. Vals must * Method allowing a reader to send a specified Bro event. Vals must
* match the values expected by the bro event. * match the values expected by the bro event.
@ -282,7 +314,7 @@ private:
// from this class, it's running in a different thread! // from this class, it's running in a different thread!
ReaderFrontend* frontend; ReaderFrontend* frontend;
string source; ReaderInfo info;
ReaderMode mode; ReaderMode mode;
unsigned int num_fields; unsigned int num_fields;
const threading::Field* const * fields; // raw mapping const threading::Field* const * fields; // raw mapping

View file

@ -11,22 +11,21 @@ namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend> class InitMessage : public threading::InputMessage<ReaderBackend>
{ {
public: public:
InitMessage(ReaderBackend* backend, const string source, ReaderMode mode, InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, ReaderMode mode,
const int num_fields, const threading::Field* const* fields, const std::map<string, string> config) const int num_fields, const threading::Field* const* fields)
: threading::InputMessage<ReaderBackend>("Init", backend), : threading::InputMessage<ReaderBackend>("Init", backend),
source(source), mode(mode), num_fields(num_fields), fields(fields), config(config) { } info(info), mode(mode), num_fields(num_fields), fields(fields) { }
virtual bool Process() virtual bool Process()
{ {
return Object()->Init(source, mode, num_fields, fields, config); return Object()->Init(info, mode, num_fields, fields);
} }
private: private:
const string source; const ReaderBackend::ReaderInfo info;
const ReaderMode mode; const ReaderMode mode;
const int num_fields; const int num_fields;
const threading::Field* const* fields; const threading::Field* const* fields;
const std::map<string, string> config;
}; };
class UpdateMessage : public threading::InputMessage<ReaderBackend> class UpdateMessage : public threading::InputMessage<ReaderBackend>
@ -64,8 +63,8 @@ ReaderFrontend::~ReaderFrontend()
{ {
} }
void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fields, void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode mode, const int arg_num_fields,
const threading::Field* const* fields, const std::map<string, string> config) const threading::Field* const* arg_fields)
{ {
if ( disabled ) if ( disabled )
return; return;
@ -73,10 +72,12 @@ void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fiel
if ( initialized ) if ( initialized )
reporter->InternalError("reader initialize twice"); reporter->InternalError("reader initialize twice");
source = arg_source; info = arg_info;
num_fields = arg_num_fields;
fields = arg_fields;
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields, config)); backend->SendIn(new InitMessage(backend, info, mode, num_fields, fields));
} }
void ReaderFrontend::Update() void ReaderFrontend::Update()
@ -110,10 +111,10 @@ void ReaderFrontend::Close()
string ReaderFrontend::Name() const string ReaderFrontend::Name() const
{ {
if ( source.size() ) if ( info.source.size() )
return ty_name; return ty_name;
return ty_name + "/" + source; return ty_name + "/" + info.source;
} }
} }

View file

@ -52,7 +52,7 @@ public:
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config); void Init(const ReaderBackend::ReaderInfo& info, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields);
/** /**
* Force an update of the current input source. Actual action depends * Force an update of the current input source. Actual action depends
@ -102,13 +102,23 @@ public:
*/ */
string Name() const; string Name() const;
protected: /**
friend class Manager; * Returns the additional reader information into the constructor.
*/
const ReaderBackend::ReaderInfo& Info() const { return info; }
/** /**
* Returns the source as passed into the constructor. * Returns the number of log fields as passed into the constructor.
*/ */
const string& Source() const { return source; }; int NumFields() const { return num_fields; }
/**
* Returns the log fields as passed into the constructor.
*/
const threading::Field* const * Fields() const { return fields; }
protected:
friend class Manager;
/** /**
* Returns the name of the backend's type. * Returns the name of the backend's type.
@ -117,7 +127,9 @@ protected:
private: private:
ReaderBackend* backend; // The backend we have instanatiated. ReaderBackend* backend; // The backend we have instanatiated.
string source; ReaderBackend::ReaderInfo info; // Meta information as passed to Init().
const threading::Field* const* fields; // The log fields.
int num_fields; // Information as passed to init();
string ty_name; // Backend type, set by manager. string ty_name; // Backend type, set by manager.
bool disabled; // True if disabled. bool disabled; // True if disabled.
bool initialized; // True if initialized. bool initialized; // True if initialized.

View file

@ -83,14 +83,14 @@ void Ascii::DoClose()
} }
} }
bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map<string, string> config) bool Ascii::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields)
{ {
mtime = 0; mtime = 0;
file = new ifstream(path.c_str()); file = new ifstream(info.source.c_str());
if ( ! file->is_open() ) if ( ! file->is_open() )
{ {
Error(Fmt("Init: cannot open %s", path.c_str())); Error(Fmt("Init: cannot open %s", info.source.c_str()));
delete(file); delete(file);
file = 0; file = 0;
return false; return false;
@ -98,7 +98,7 @@ bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* co
if ( ReadHeader(false) == false ) if ( ReadHeader(false) == false )
{ {
Error(Fmt("Init: cannot open %s; headers are incorrect", path.c_str())); Error(Fmt("Init: cannot open %s; headers are incorrect", info.source.c_str()));
file->close(); file->close();
delete(file); delete(file);
file = 0; file = 0;
@ -147,7 +147,7 @@ bool Ascii::ReadHeader(bool useCached)
//printf("Updating fields from description %s\n", line.c_str()); //printf("Updating fields from description %s\n", line.c_str());
columnMap.clear(); columnMap.clear();
for ( unsigned int i = 0; i < NumFields(); i++ ) for ( int i = 0; i < NumFields(); i++ )
{ {
const Field* field = Fields()[i]; const Field* field = Fields()[i];
@ -164,7 +164,7 @@ bool Ascii::ReadHeader(bool useCached)
} }
Error(Fmt("Did not find requested field %s in input data file %s.", Error(Fmt("Did not find requested field %s in input data file %s.",
field->name.c_str(), Source().c_str())); field->name.c_str(), Info().source.c_str()));
return false; return false;
} }
@ -367,9 +367,9 @@ bool Ascii::DoUpdate()
{ {
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(Source().c_str(), &sb) == -1 ) if ( stat(Info().source.c_str(), &sb) == -1 )
{ {
Error(Fmt("Could not get stat for %s", Source().c_str())); Error(Fmt("Could not get stat for %s", Info().source.c_str()));
return false; return false;
} }
@ -403,10 +403,10 @@ bool Ascii::DoUpdate()
file = 0; file = 0;
} }
file = new ifstream(Source().c_str()); file = new ifstream(Info().source.c_str());
if ( ! file->is_open() ) if ( ! file->is_open() )
{ {
Error(Fmt("cannot open %s", Source().c_str())); Error(Fmt("cannot open %s", Info().source.c_str()));
return false; return false;
} }
@ -490,7 +490,7 @@ bool Ascii::DoUpdate()
} }
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
assert ( (unsigned int) fpos == NumFields() ); assert ( fpos == NumFields() );
if ( Mode() == MODE_STREAM ) if ( Mode() == MODE_STREAM )
Put(fields); Put(fields);

View file

@ -38,7 +38,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); }
protected: protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config); virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -36,9 +36,9 @@ void Benchmark::DoClose()
{ {
} }
bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map<string, string> config) bool Benchmark::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields)
{ {
num_lines = atoi(path.c_str()); num_lines = atoi(info.source.c_str());
if ( autospread != 0.0 ) if ( autospread != 0.0 )
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
@ -80,7 +80,7 @@ bool Benchmark::DoUpdate()
for ( int i = 0; i < linestosend; i++ ) for ( int i = 0; i < linestosend; i++ )
{ {
Value** field = new Value*[NumFields()]; Value** field = new Value*[NumFields()];
for (unsigned int j = 0; j < NumFields(); j++ ) for (int j = 0; j < NumFields(); j++ )
field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype); field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype);
if ( Mode() == MODE_STREAM ) if ( Mode() == MODE_STREAM )

View file

@ -18,7 +18,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); }
protected: protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config); virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -100,15 +100,15 @@ bool Raw::CloseInput()
return true; return true;
} }
bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map<string, string> config) bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields)
{ {
fname = path; fname = info.source;
mtime = 0; mtime = 0;
execute = false; execute = false;
firstrun = true; firstrun = true;
bool result; bool result;
if ( path.length() == 0 ) if ( info.source.length() == 0 )
{ {
Error("No source path provided"); Error("No source path provided");
return false; return false;
@ -129,11 +129,11 @@ bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* cons
} }
// do Initialization // do Initialization
char last = path[path.length()-1]; char last = info.source[info.source.length()-1];
if ( last == '|' ) if ( last == '|' )
{ {
execute = true; execute = true;
fname = path.substr(0, fname.length() - 1); fname = info.source.substr(0, fname.length() - 1);
if ( (mode != MODE_MANUAL) ) if ( (mode != MODE_MANUAL) )
{ {

View file

@ -22,7 +22,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected: protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config); virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);