add mode to readerinfo - no need to have it separately everywhere anymore.

Disable remoteserialization of readerinfo - in contrast to the logging framework this is not needed here (I think).
This commit is contained in:
Bernhard Amann 2012-07-02 10:41:02 -07:00
parent 3559a39d59
commit 7f83f157fc
11 changed files with 52 additions and 53 deletions

View file

@ -74,8 +74,6 @@ public:
ReaderBackend::ReaderInfo info; ReaderBackend::ReaderInfo info;
bool removed; bool removed;
ReaderMode mode;
StreamType stream_type; // to distinguish between event and table streams StreamType stream_type; // to distinguish between event and table streams
EnumVal* type; EnumVal* type;
@ -305,19 +303,21 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); Val* config = description->LookupWithDefault(rtype->FieldOffset("config"));
ReaderBackend::ReaderInfo readerinfo;
switch ( mode->InternalInt() ) switch ( mode->InternalInt() )
{ {
case 0: case 0:
info->mode = MODE_MANUAL; readerinfo.mode = MODE_MANUAL;
break; break;
case 1: case 1:
info->mode = MODE_REREAD; readerinfo.mode = MODE_REREAD;
break; break;
case 2: case 2:
info->mode = MODE_STREAM; readerinfo.mode = MODE_STREAM;
break; break;
default: default:
@ -331,7 +331,6 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
info->name = name; info->name = name;
info->config = config->AsTableVal(); // ref'd by LookupWithDefault info->config = config->AsTableVal(); // ref'd by LookupWithDefault
ReaderBackend::ReaderInfo readerinfo;
readerinfo.source = source; readerinfo.source = source;
Ref(description); Ref(description);
@ -481,7 +480,7 @@ bool Manager::CreateEventStream(RecordVal* fval)
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->info, stream->mode, stream->num_fields, logf ); stream->reader->Init(stream->info, stream->num_fields, logf );
readers[stream->reader] = stream; readers[stream->reader] = stream;
@ -658,7 +657,7 @@ bool Manager::CreateTableStream(RecordVal* fval)
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->info, stream->mode, fieldsV.size(), fields ); stream->reader->Init(stream->info, fieldsV.size(), fields );
readers[stream->reader] = stream; readers[stream->reader] = stream;

View file

@ -142,6 +142,10 @@ public:
using namespace logging; using namespace logging;
/*
* I don't think the input framework needs remote serialization. If it doesn't, kill this. If it does add ReaderMode.
bool ReaderBackend::ReaderInfo::Read(SerializationFormat* fmt) bool ReaderBackend::ReaderInfo::Read(SerializationFormat* fmt)
{ {
int size; int size;
@ -184,6 +188,8 @@ bool ReaderBackend::ReaderInfo::Write(SerializationFormat* fmt) const
return true; return true;
} }
*/
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{ {
disabled = true; // disabled will be set correcty in init. disabled = true; // disabled will be set correcty in init.
@ -226,18 +232,17 @@ void ReaderBackend::SendEntry(Value* *vals)
SendOut(new SendEntryMessage(frontend, vals)); SendOut(new SendEntryMessage(frontend, vals));
} }
bool ReaderBackend::Init(const ReaderInfo& arg_info, ReaderMode arg_mode, const int arg_num_fields, bool ReaderBackend::Init(const ReaderInfo& arg_info, const int arg_num_fields,
const threading::Field* const* arg_fields) const threading::Field* const* arg_fields)
{ {
info = arg_info; info = arg_info;
mode = arg_mode;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
SetName("InputReader/"+info.source); SetName("InputReader/"+info.source);
// disable if DoInit returns error. // disable if DoInit returns error.
int success = DoInit(arg_info, mode, arg_num_fields, arg_fields); int success = DoInit(arg_info, arg_num_fields, arg_fields);
if ( ! success ) if ( ! success )
{ {

View file

@ -7,8 +7,6 @@
#include "threading/SerialTypes.h" #include "threading/SerialTypes.h"
#include "threading/MsgThread.h" #include "threading/MsgThread.h"
class RemoteSerializer;
namespace input { namespace input {
@ -87,6 +85,12 @@ public:
*/ */
config_map config; config_map config;
/**
* The opening mode for the input source.
*/
ReaderMode mode;
/*
* I don't think the input framework needs remote serialization. If it doesn't, kill this. If it does add ReaderMode.
private: private:
friend class ::RemoteSerializer; friend class ::RemoteSerializer;
@ -94,16 +98,14 @@ public:
// fields. They serialize/deserialize the struct. // fields. They serialize/deserialize the struct.
bool Read(SerializationFormat* fmt); bool Read(SerializationFormat* fmt);
bool Write(SerializationFormat* fmt) const; bool Write(SerializationFormat* fmt) const;
*/
}; };
/** /**
* One-time initialization of the reader to define the input source. * One-time initialization of the reader to define the input source.
* *
* @param source A string left to the interpretation of the * @param @param info Meta information for the writer.
* reader implementation; it corresponds to the value configured on
* the script-level for the input stream.
*
* @param mode The opening mode for the input source.
* *
* @param num_fields Number of fields contained in \a fields. * @param num_fields Number of fields contained in \a fields.
* *
@ -115,7 +117,7 @@ public:
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Init(const ReaderInfo& info, ReaderMode mode, int num_fields, const threading::Field* const* fields); bool Init(const ReaderInfo& info, int num_fields, const threading::Field* const* fields);
/** /**
* Finishes reading from this input stream in a regular fashion. Must * Finishes reading from this input stream in a regular fashion. Must
@ -180,7 +182,7 @@ protected:
* provides accessor methods to get them later, and they are passed * provides accessor methods to get them later, and they are passed
* in here only for convinience. * in here only for convinience.
*/ */
virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0; virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields) = 0;
/** /**
* Reader-specific method implementing input finalization at * Reader-specific method implementing input finalization at
@ -209,11 +211,6 @@ protected:
*/ */
virtual bool DoUpdate() = 0; virtual bool DoUpdate() = 0;
/**
* Returns the reader mode as passed into Init().
*/
const ReaderMode Mode() const { return mode; }
/** /**
* Method allowing a reader to send a specified Bro event. Vals must * Method allowing a reader to send a specified Bro event. Vals must
* match the values expected by the bro event. * match the values expected by the bro event.
@ -315,7 +312,6 @@ private:
ReaderFrontend* frontend; ReaderFrontend* frontend;
ReaderInfo info; ReaderInfo info;
ReaderMode mode;
unsigned int num_fields; unsigned int num_fields;
const threading::Field* const * fields; // raw mapping const threading::Field* const * fields; // raw mapping

View file

@ -11,19 +11,18 @@ namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend> class InitMessage : public threading::InputMessage<ReaderBackend>
{ {
public: public:
InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, ReaderMode mode, InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info,
const int num_fields, const threading::Field* const* fields) const int num_fields, const threading::Field* const* fields)
: threading::InputMessage<ReaderBackend>("Init", backend), : threading::InputMessage<ReaderBackend>("Init", backend),
info(info), mode(mode), num_fields(num_fields), fields(fields) { } info(info), num_fields(num_fields), fields(fields) { }
virtual bool Process() virtual bool Process()
{ {
return Object()->Init(info, mode, num_fields, fields); return Object()->Init(info, num_fields, fields);
} }
private: private:
const ReaderBackend::ReaderInfo info; const ReaderBackend::ReaderInfo info;
const ReaderMode mode;
const int num_fields; const int num_fields;
const threading::Field* const* fields; const threading::Field* const* fields;
}; };
@ -63,7 +62,7 @@ ReaderFrontend::~ReaderFrontend()
{ {
} }
void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode mode, const int arg_num_fields, void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int arg_num_fields,
const threading::Field* const* arg_fields) const threading::Field* const* arg_fields)
{ {
if ( disabled ) if ( disabled )
@ -77,7 +76,7 @@ void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode
fields = arg_fields; fields = arg_fields;
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, info, mode, num_fields, fields)); backend->SendIn(new InitMessage(backend, info, num_fields, fields));
} }
void ReaderFrontend::Update() void ReaderFrontend::Update()

View file

@ -52,7 +52,7 @@ public:
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Init(const ReaderBackend::ReaderInfo& info, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields); void Init(const ReaderBackend::ReaderInfo& info, const int arg_num_fields, const threading::Field* const* fields);
/** /**
* Force an update of the current input source. Actual action depends * Force an update of the current input source. Actual action depends

View file

@ -83,7 +83,7 @@ void Ascii::DoClose()
} }
} }
bool Ascii::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
mtime = 0; mtime = 0;
@ -362,7 +362,7 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() bool Ascii::DoUpdate()
{ {
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
// check if the file has changed // check if the file has changed
@ -389,7 +389,7 @@ bool Ascii::DoUpdate()
// - this is not that bad) // - this is not that bad)
if ( file && file->is_open() ) if ( file && file->is_open() )
{ {
if ( Mode() == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
{ {
file->clear(); // remove end of file evil bits file->clear(); // remove end of file evil bits
if ( !ReadHeader(true) ) if ( !ReadHeader(true) )
@ -492,13 +492,13 @@ bool Ascii::DoUpdate()
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
assert ( fpos == NumFields() ); assert ( fpos == NumFields() );
if ( Mode() == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
Put(fields); Put(fields);
else else
SendEntry(fields); SendEntry(fields);
} }
if ( Mode () != MODE_STREAM ) if ( Info().mode != MODE_STREAM )
EndCurrentSend(); EndCurrentSend();
return true; return true;
@ -508,7 +508,7 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
{ {
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_MANUAL: case MODE_MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;

View file

@ -38,7 +38,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); }
protected: protected:
virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -36,7 +36,7 @@ void Benchmark::DoClose()
{ {
} }
bool Benchmark::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) bool Benchmark::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
num_lines = atoi(info.source.c_str()); num_lines = atoi(info.source.c_str());
@ -83,7 +83,7 @@ bool Benchmark::DoUpdate()
for (int j = 0; j < NumFields(); j++ ) for (int j = 0; j < NumFields(); j++ )
field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype); field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype);
if ( Mode() == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
// do not do tracking, spread out elements over the second that we have... // do not do tracking, spread out elements over the second that we have...
Put(field); Put(field);
else else
@ -109,7 +109,7 @@ bool Benchmark::DoUpdate()
} }
if ( Mode() != MODE_STREAM ) if ( Info().mode != MODE_STREAM )
EndCurrentSend(); EndCurrentSend();
return true; return true;
@ -227,7 +227,7 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
num_lines += add; num_lines += add;
heartbeatstarttime = CurrTime(); heartbeatstarttime = CurrTime();
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_MANUAL: case MODE_MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;

View file

@ -18,7 +18,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); }
protected: protected:
virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -66,7 +66,7 @@ bool Raw::OpenInput()
// This is defined in input/fdstream.h // This is defined in input/fdstream.h
in = new boost::fdistream(fileno(file)); in = new boost::fdistream(fileno(file));
if ( execute && Mode() == MODE_STREAM ) if ( execute && Info().mode == MODE_STREAM )
fcntl(fileno(file), F_SETFL, O_NONBLOCK); fcntl(fileno(file), F_SETFL, O_NONBLOCK);
return true; return true;
@ -100,7 +100,7 @@ bool Raw::CloseInput()
return true; return true;
} }
bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
fname = info.source; fname = info.source;
mtime = 0; mtime = 0;
@ -135,10 +135,10 @@ bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const
execute = true; execute = true;
fname = info.source.substr(0, fname.length() - 1); fname = info.source.substr(0, fname.length() - 1);
if ( (mode != MODE_MANUAL) ) if ( (info.mode != MODE_MANUAL) )
{ {
Error(Fmt("Unsupported read mode %d for source %s in execution mode", Error(Fmt("Unsupported read mode %d for source %s in execution mode",
mode, fname.c_str())); info.mode, fname.c_str()));
return false; return false;
} }
@ -187,7 +187,7 @@ bool Raw::DoUpdate()
else else
{ {
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
// check if the file has changed // check if the file has changed
@ -210,7 +210,7 @@ bool Raw::DoUpdate()
case MODE_MANUAL: case MODE_MANUAL:
case MODE_STREAM: case MODE_STREAM:
if ( Mode() == MODE_STREAM && file != NULL && in != NULL ) if ( Info().mode == MODE_STREAM && file != NULL && in != NULL )
{ {
//fpurge(file); //fpurge(file);
in->clear(); // remove end of file evil bits in->clear(); // remove end of file evil bits
@ -254,7 +254,7 @@ bool Raw::DoHeartbeat(double network_time, double current_time)
{ {
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_MANUAL: case MODE_MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;

View file

@ -22,7 +22,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected: protected:
virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);