Merge remote-tracking branch 'origin/topic/bernhard/reader-info'

* origin/topic/bernhard/reader-info:
  fix small bug - now configuration actually is passed.
  add mode to readerinfo - no need to have it separately everywhere anymore.
  introduce reader-info struct analogous to writer-info.
  Introduce support for a table of key/value pairs with further configuration options, with the same userinterface as in the logging interface.
  make writer-info work when debugging is enabled

Conflicts:
	testing/btest/Baseline/scripts.base.frameworks.input.event/out
	testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out
	testing/btest/Baseline/scripts.base.frameworks.input.raw/out
	testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out
	testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out

Closes #841.
This commit is contained in:
Robin Sommer 2012-07-02 15:31:24 -07:00
commit 06d2fd52bd
18 changed files with 213 additions and 106 deletions

View file

@ -1,4 +1,13 @@
2.0-733 | 2012-07-02 15:31:24 -0700
* Extending the input reader DoInit() API. (Bernhard Amann). It now
provides a Info struct similar to what we introduced for log
writers, including a corresponding "config" key/value table.
* Fix to make writer-info work when debugging is enabled. (Bernhard
Amann)
2.0-726 | 2012-07-02 15:19:15 -0700 2.0-726 | 2012-07-02 15:19:15 -0700
* Extending the log writer DoInit() API. (Robin Sommer) * Extending the log writer DoInit() API. (Robin Sommer)

View file

@ -1 +1 @@
2.0-726 2.0-733

View file

@ -53,6 +53,11 @@ export {
## really be executed. Parameters are the same as for the event. If true is ## really be executed. Parameters are the same as for the event. If true is
## returned, the update is performed. If false is returned, it is skipped. ## returned, the update is performed. If false is returned, it is skipped.
pred: function(typ: Input::Event, left: any, right: any): bool &optional; pred: function(typ: Input::Event, left: any, right: any): bool &optional;
## A key/value table that will be passed on the reader.
## Interpretation of the values is left to the writer, but
## usually they will be used for configuration purposes.
config: table[string] of string &default=table();
}; };
## EventFilter description type used for the `event` method. ## EventFilter description type used for the `event` method.
@ -85,6 +90,10 @@ export {
## The event will receive an Input::Event enum as the first element, and the fields as the following arguments. ## The event will receive an Input::Event enum as the first element, and the fields as the following arguments.
ev: any; ev: any;
## A key/value table that will be passed on the reader.
## Interpretation of the values is left to the writer, but
## usually they will be used for configuration purposes.
config: table[string] of string &default=table();
}; };
## Create a new table input from a given source. Returns true on success. ## Create a new table input from a given source. Returns true on success.

View file

@ -140,9 +140,9 @@ export {
postprocessor: function(info: RotationInfo) : bool &optional; postprocessor: function(info: RotationInfo) : bool &optional;
## A key/value table that will be passed on to the writer. ## A key/value table that will be passed on to the writer.
## Interpretation of the values is left to the writer, but ## Interpretation of the values is left to the writer, but
## usually they will be used for configuration purposes. ## usually they will be used for configuration purposes.
config: table[string] of string &default=table(); config: table[string] of string &default=table();
}; };
## Sentinel value for indicating that a filter was not found when looked up. ## Sentinel value for indicating that a filter was not found when looked up.

View file

@ -71,15 +71,14 @@ declare(PDict, InputHash);
class Manager::Stream { class Manager::Stream {
public: public:
string name; string name;
string source; ReaderBackend::ReaderInfo info;
bool removed; bool removed;
ReaderMode mode;
StreamType stream_type; // to distinguish between event and table streams StreamType stream_type; // to distinguish between event and table streams
EnumVal* type; EnumVal* type;
ReaderFrontend* reader; ReaderFrontend* reader;
TableVal* config;
RecordVal* description; RecordVal* description;
@ -103,6 +102,9 @@ Manager::Stream::~Stream()
if ( description ) if ( description )
Unref(description); Unref(description);
if ( config )
Unref(config);
if ( reader ) if ( reader )
delete(reader); delete(reader);
} }
@ -300,19 +302,20 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
Unref(sourceval); Unref(sourceval);
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
Val* config = description->LookupWithDefault(rtype->FieldOffset("config"));
switch ( mode->InternalInt() ) switch ( mode->InternalInt() )
{ {
case 0: case 0:
info->mode = MODE_MANUAL; info->info.mode = MODE_MANUAL;
break; break;
case 1: case 1:
info->mode = MODE_REREAD; info->info.mode = MODE_REREAD;
break; break;
case 2: case 2:
info->mode = MODE_STREAM; info->info.mode = MODE_STREAM;
break; break;
default: default:
@ -324,10 +327,30 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
info->reader = reader_obj; info->reader = reader_obj;
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
info->name = name; info->name = name;
info->source = source; info->config = config->AsTableVal(); // ref'd by LookupWithDefault
info->info.source = source;
Ref(description); Ref(description);
info->description = description; info->description = description;
{
HashKey* k;
IterCookie* c = info->config->AsTable()->InitForIteration();
TableEntryVal* v;
while ( (v = info->config->AsTable()->NextEntry(k, c)) )
{
ListVal* index = info->config->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString();
info->info.config.insert(std::make_pair(key, value));
Unref(index);
delete k;
}
}
DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", DBG_LOG(DBG_INPUT, "Successfully created new input stream %s",
name.c_str()); name.c_str());
@ -451,7 +474,8 @@ bool Manager::CreateEventStream(RecordVal* fval)
Unref(want_record); // ref'd by lookupwithdefault Unref(want_record); // ref'd by lookupwithdefault
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf );
stream->reader->Init(stream->info, stream->num_fields, logf );
readers[stream->reader] = stream; readers[stream->reader] = stream;
@ -628,7 +652,7 @@ bool Manager::CreateTableStream(RecordVal* fval)
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields ); stream->reader->Init(stream->info, fieldsV.size(), fields );
readers[stream->reader] = stream; readers[stream->reader] = stream;
@ -710,7 +734,7 @@ bool Manager::RemoveStream(Stream *i)
return true; return true;
} }
bool Manager::RemoveStream(ReaderFrontend* frontend) bool Manager::RemoveStream(ReaderFrontend* frontend)
{ {
return RemoveStream(FindStream(frontend)); return RemoveStream(FindStream(frontend));
} }
@ -1208,7 +1232,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
#endif #endif
// Send event that the current update is indeed finished. // Send event that the current update is indeed finished.
SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->source.c_str())); SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info.source.c_str()));
} }
void Manager::Put(ReaderFrontend* reader, Value* *vals) void Manager::Put(ReaderFrontend* reader, Value* *vals)

View file

@ -140,6 +140,7 @@ public:
} }
}; };
using namespace logging;
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{ {
@ -183,18 +184,17 @@ void ReaderBackend::SendEntry(Value* *vals)
SendOut(new SendEntryMessage(frontend, vals)); SendOut(new SendEntryMessage(frontend, vals));
} }
bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_num_fields, bool ReaderBackend::Init(const ReaderInfo& arg_info, const int arg_num_fields,
const threading::Field* const* arg_fields) const threading::Field* const* arg_fields)
{ {
source = arg_source; info = arg_info;
mode = arg_mode;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
SetName("InputReader/"+source); SetName("InputReader/"+info.source);
// disable if DoInit returns error. // disable if DoInit returns error.
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields); int success = DoInit(arg_info, arg_num_fields, arg_fields);
if ( ! success ) if ( ! success )
{ {

View file

@ -65,23 +65,48 @@ public:
*/ */
virtual ~ReaderBackend(); virtual ~ReaderBackend();
/**
* A struct passing information to the reader at initialization time.
*/
struct ReaderInfo
{
typedef std::map<string, string> config_map;
/**
* A string left to the interpretation of the reader
* implementation; it corresponds to the value configured on
* the script-level for the logging filter.
*/
string source;
/**
* A map of key/value pairs corresponding to the relevant
* filter's "config" table.
*/
config_map config;
/**
* The opening mode for the input source.
*/
ReaderMode mode;
};
/** /**
* One-time initialization of the reader to define the input source. * One-time initialization of the reader to define the input source.
* *
* @param source A string left to the interpretation of the * @param @param info Meta information for the writer.
* reader implementation; it corresponds to the value configured on
* the script-level for the input stream.
*
* @param mode The opening mode for the input source.
* *
* @param num_fields Number of fields contained in \a fields. * @param num_fields Number of fields contained in \a fields.
* *
* @param fields The types and names of the fields to be retrieved * @param fields The types and names of the fields to be retrieved
* from the input source. * from the input source.
* *
* @param config A string map containing additional configuration options
* for the reader.
*
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields); bool Init(const ReaderInfo& info, int num_fields, const threading::Field* const* fields);
/** /**
* Finishes reading from this input stream in a regular fashion. Must * Finishes reading from this input stream in a regular fashion. Must
@ -109,6 +134,22 @@ public:
*/ */
void DisableFrontend(); void DisableFrontend();
/**
* Returns the log fields as passed into the constructor.
*/
const threading::Field* const * Fields() const { return fields; }
/**
* Returns the additional reader information into the constructor.
*/
const ReaderInfo& Info() const { return info; }
/**
* Returns the number of log fields as passed into the constructor.
*/
int NumFields() const { return num_fields; }
protected: protected:
// Methods that have to be overwritten by the individual readers // Methods that have to be overwritten by the individual readers
@ -130,7 +171,7 @@ protected:
* provides accessor methods to get them later, and they are passed * provides accessor methods to get them later, and they are passed
* in here only for convinience. * in here only for convinience.
*/ */
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0; virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields) = 0;
/** /**
* Reader-specific method implementing input finalization at * Reader-specific method implementing input finalization at
@ -159,26 +200,6 @@ protected:
*/ */
virtual bool DoUpdate() = 0; virtual bool DoUpdate() = 0;
/**
* Returns the input source as passed into Init()/.
*/
const string Source() const { return source; }
/**
* Returns the reader mode as passed into Init().
*/
const ReaderMode Mode() const { return mode; }
/**
* Returns the number of log fields as passed into Init().
*/
unsigned int NumFields() const { return num_fields; }
/**
* Returns the log fields as passed into Init().
*/
const threading::Field* const * Fields() const { return fields; }
/** /**
* Method allowing a reader to send a specified Bro event. Vals must * Method allowing a reader to send a specified Bro event. Vals must
* match the values expected by the bro event. * match the values expected by the bro event.
@ -279,8 +300,7 @@ private:
// from this class, it's running in a different thread! // from this class, it's running in a different thread!
ReaderFrontend* frontend; ReaderFrontend* frontend;
string source; ReaderInfo info;
ReaderMode mode;
unsigned int num_fields; unsigned int num_fields;
const threading::Field* const * fields; // raw mapping const threading::Field* const * fields; // raw mapping

View file

@ -11,19 +11,18 @@ namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend> class InitMessage : public threading::InputMessage<ReaderBackend>
{ {
public: public:
InitMessage(ReaderBackend* backend, const string source, ReaderMode mode, InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info,
const int num_fields, const threading::Field* const* fields) const int num_fields, const threading::Field* const* fields)
: threading::InputMessage<ReaderBackend>("Init", backend), : threading::InputMessage<ReaderBackend>("Init", backend),
source(source), mode(mode), num_fields(num_fields), fields(fields) { } info(info), num_fields(num_fields), fields(fields) { }
virtual bool Process() virtual bool Process()
{ {
return Object()->Init(source, mode, num_fields, fields); return Object()->Init(info, num_fields, fields);
} }
private: private:
const string source; const ReaderBackend::ReaderInfo info;
const ReaderMode mode;
const int num_fields; const int num_fields;
const threading::Field* const* fields; const threading::Field* const* fields;
}; };
@ -63,8 +62,8 @@ ReaderFrontend::~ReaderFrontend()
{ {
} }
void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fields, void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int arg_num_fields,
const threading::Field* const* fields) const threading::Field* const* arg_fields)
{ {
if ( disabled ) if ( disabled )
return; return;
@ -72,10 +71,12 @@ void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fiel
if ( initialized ) if ( initialized )
reporter->InternalError("reader initialize twice"); reporter->InternalError("reader initialize twice");
source = arg_source; info = arg_info;
num_fields = arg_num_fields;
fields = arg_fields;
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields)); backend->SendIn(new InitMessage(backend, info, num_fields, fields));
} }
void ReaderFrontend::Update() void ReaderFrontend::Update()
@ -109,10 +110,10 @@ void ReaderFrontend::Close()
string ReaderFrontend::Name() const string ReaderFrontend::Name() const
{ {
if ( source.size() ) if ( ! info.source.size() )
return ty_name; return ty_name;
return ty_name + "/" + source; return ty_name + "/" + info.source;
} }
} }

View file

@ -52,7 +52,7 @@ public:
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields); void Init(const ReaderBackend::ReaderInfo& info, const int arg_num_fields, const threading::Field* const* fields);
/** /**
* Force an update of the current input source. Actual action depends * Force an update of the current input source. Actual action depends
@ -102,13 +102,23 @@ public:
*/ */
string Name() const; string Name() const;
protected: /**
friend class Manager; * Returns the additional reader information passed into the constructor.
*/
const ReaderBackend::ReaderInfo& Info() const { return info; }
/** /**
* Returns the source as passed into the constructor. * Returns the number of log fields as passed into the constructor.
*/ */
const string& Source() const { return source; }; int NumFields() const { return num_fields; }
/**
* Returns the log fields as passed into the constructor.
*/
const threading::Field* const * Fields() const { return fields; }
protected:
friend class Manager;
/** /**
* Returns the name of the backend's type. * Returns the name of the backend's type.
@ -122,7 +132,9 @@ protected:
private: private:
ReaderBackend* backend; // The backend we have instanatiated. ReaderBackend* backend; // The backend we have instanatiated.
string source; ReaderBackend::ReaderInfo info; // Meta information as passed to Init().
const threading::Field* const* fields; // The input fields.
int num_fields; // Information as passed to Init().
string ty_name; // Backend type, set by manager. string ty_name; // Backend type, set by manager.
bool disabled; // True if disabled. bool disabled; // True if disabled.
bool initialized; // True if initialized. bool initialized; // True if initialized.

View file

@ -83,14 +83,14 @@ void Ascii::DoClose()
} }
} }
bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields) bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
mtime = 0; mtime = 0;
file = new ifstream(path.c_str()); file = new ifstream(info.source.c_str());
if ( ! file->is_open() ) if ( ! file->is_open() )
{ {
Error(Fmt("Init: cannot open %s", path.c_str())); Error(Fmt("Init: cannot open %s", info.source.c_str()));
delete(file); delete(file);
file = 0; file = 0;
return false; return false;
@ -98,7 +98,7 @@ bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* co
if ( ReadHeader(false) == false ) if ( ReadHeader(false) == false )
{ {
Error(Fmt("Init: cannot open %s; headers are incorrect", path.c_str())); Error(Fmt("Init: cannot open %s; headers are incorrect", info.source.c_str()));
file->close(); file->close();
delete(file); delete(file);
file = 0; file = 0;
@ -147,7 +147,7 @@ bool Ascii::ReadHeader(bool useCached)
//printf("Updating fields from description %s\n", line.c_str()); //printf("Updating fields from description %s\n", line.c_str());
columnMap.clear(); columnMap.clear();
for ( unsigned int i = 0; i < NumFields(); i++ ) for ( int i = 0; i < NumFields(); i++ )
{ {
const Field* field = Fields()[i]; const Field* field = Fields()[i];
@ -164,7 +164,7 @@ bool Ascii::ReadHeader(bool useCached)
} }
Error(Fmt("Did not find requested field %s in input data file %s.", Error(Fmt("Did not find requested field %s in input data file %s.",
field->name.c_str(), Source().c_str())); field->name.c_str(), Info().source.c_str()));
return false; return false;
} }
@ -362,14 +362,14 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() bool Ascii::DoUpdate()
{ {
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(Source().c_str(), &sb) == -1 ) if ( stat(Info().source.c_str(), &sb) == -1 )
{ {
Error(Fmt("Could not get stat for %s", Source().c_str())); Error(Fmt("Could not get stat for %s", Info().source.c_str()));
return false; return false;
} }
@ -389,7 +389,7 @@ bool Ascii::DoUpdate()
// - this is not that bad) // - this is not that bad)
if ( file && file->is_open() ) if ( file && file->is_open() )
{ {
if ( Mode() == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
{ {
file->clear(); // remove end of file evil bits file->clear(); // remove end of file evil bits
if ( !ReadHeader(true) ) if ( !ReadHeader(true) )
@ -403,10 +403,10 @@ bool Ascii::DoUpdate()
file = 0; file = 0;
} }
file = new ifstream(Source().c_str()); file = new ifstream(Info().source.c_str());
if ( ! file->is_open() ) if ( ! file->is_open() )
{ {
Error(Fmt("cannot open %s", Source().c_str())); Error(Fmt("cannot open %s", Info().source.c_str()));
return false; return false;
} }
@ -490,15 +490,15 @@ bool Ascii::DoUpdate()
} }
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
assert ( (unsigned int) fpos == NumFields() ); assert ( fpos == NumFields() );
if ( Mode() == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
Put(fields); Put(fields);
else else
SendEntry(fields); SendEntry(fields);
} }
if ( Mode () != MODE_STREAM ) if ( Info().mode != MODE_STREAM )
EndCurrentSend(); EndCurrentSend();
return true; return true;
@ -508,7 +508,7 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
{ {
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_MANUAL: case MODE_MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;

View file

@ -38,7 +38,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); }
protected: protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -36,9 +36,9 @@ void Benchmark::DoClose()
{ {
} }
bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields) bool Benchmark::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
num_lines = atoi(path.c_str()); num_lines = atoi(info.source.c_str());
if ( autospread != 0.0 ) if ( autospread != 0.0 )
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
@ -80,10 +80,10 @@ bool Benchmark::DoUpdate()
for ( int i = 0; i < linestosend; i++ ) for ( int i = 0; i < linestosend; i++ )
{ {
Value** field = new Value*[NumFields()]; Value** field = new Value*[NumFields()];
for (unsigned int j = 0; j < NumFields(); j++ ) for (int j = 0; j < NumFields(); j++ )
field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype); field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype);
if ( Mode() == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
// do not do tracking, spread out elements over the second that we have... // do not do tracking, spread out elements over the second that we have...
Put(field); Put(field);
else else
@ -109,7 +109,7 @@ bool Benchmark::DoUpdate()
} }
if ( Mode() != MODE_STREAM ) if ( Info().mode != MODE_STREAM )
EndCurrentSend(); EndCurrentSend();
return true; return true;
@ -227,7 +227,7 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
num_lines += add; num_lines += add;
heartbeatstarttime = CurrTime(); heartbeatstarttime = CurrTime();
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_MANUAL: case MODE_MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;

View file

@ -18,7 +18,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); }
protected: protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -66,7 +66,7 @@ bool Raw::OpenInput()
// This is defined in input/fdstream.h // This is defined in input/fdstream.h
in = new boost::fdistream(fileno(file)); in = new boost::fdistream(fileno(file));
if ( execute && Mode() == MODE_STREAM ) if ( execute && Info().mode == MODE_STREAM )
fcntl(fileno(file), F_SETFL, O_NONBLOCK); fcntl(fileno(file), F_SETFL, O_NONBLOCK);
return true; return true;
@ -100,15 +100,15 @@ bool Raw::CloseInput()
return true; return true;
} }
bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields) bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
fname = path; fname = info.source;
mtime = 0; mtime = 0;
execute = false; execute = false;
firstrun = true; firstrun = true;
bool result; bool result;
if ( path.length() == 0 ) if ( info.source.length() == 0 )
{ {
Error("No source path provided"); Error("No source path provided");
return false; return false;
@ -129,16 +129,16 @@ bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* cons
} }
// do Initialization // do Initialization
char last = path[path.length()-1]; char last = info.source[info.source.length()-1];
if ( last == '|' ) if ( last == '|' )
{ {
execute = true; execute = true;
fname = path.substr(0, fname.length() - 1); fname = info.source.substr(0, fname.length() - 1);
if ( (mode != MODE_MANUAL) ) if ( (info.mode != MODE_MANUAL) )
{ {
Error(Fmt("Unsupported read mode %d for source %s in execution mode", Error(Fmt("Unsupported read mode %d for source %s in execution mode",
mode, fname.c_str())); info.mode, fname.c_str()));
return false; return false;
} }
@ -187,7 +187,7 @@ bool Raw::DoUpdate()
else else
{ {
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
// check if the file has changed // check if the file has changed
@ -210,7 +210,7 @@ bool Raw::DoUpdate()
case MODE_MANUAL: case MODE_MANUAL:
case MODE_STREAM: case MODE_STREAM:
if ( Mode() == MODE_STREAM && file != NULL && in != NULL ) if ( Info().mode == MODE_STREAM && file != NULL && in != NULL )
{ {
//fpurge(file); //fpurge(file);
in->clear(); // remove end of file evil bits in->clear(); // remove end of file evil bits
@ -254,7 +254,7 @@ bool Raw::DoHeartbeat(double network_time, double current_time)
{ {
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( Mode() ) { switch ( Info().mode ) {
case MODE_MANUAL: case MODE_MANUAL:
// yay, we do nothing :) // yay, we do nothing :)
break; break;

View file

@ -22,7 +22,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected: protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields);
virtual void DoClose(); virtual void DoClose();
virtual bool DoUpdate(); virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -1261,14 +1261,14 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
timer_mgr->Add(winfo->rotation_timer); timer_mgr->Add(winfo->rotation_timer);
DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f", DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f",
winfo->writer->Path().c_str(), winfo->rotation_timer->Time()); winfo->writer->Name().c_str(), winfo->rotation_timer->Time());
} }
} }
void Manager::Rotate(WriterInfo* winfo) void Manager::Rotate(WriterInfo* winfo)
{ {
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
winfo->writer->Path().c_str(), network_time); winfo->writer->Name().c_str(), network_time);
// Build a temporary path for the writer to move the file to. // Build a temporary path for the writer to move the file to.
struct tm tm; struct tm tm;
@ -1297,7 +1297,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
return true; return true;
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s", DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
writer->Path().c_str(), network_time, new_name.c_str()); writer->Name().c_str(), network_time, new_name.c_str());
WriterInfo* winfo = FindWriter(writer); WriterInfo* winfo = FindWriter(writer);
if ( ! winfo ) if ( ! winfo )

View file

@ -46,6 +46,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_NEW Input::EVENT_NEW
@ -139,6 +141,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_NEW Input::EVENT_NEW
@ -244,6 +248,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_CHANGED Input::EVENT_CHANGED
@ -469,6 +475,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_NEW Input::EVENT_NEW
@ -592,6 +600,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_NEW Input::EVENT_NEW
@ -715,6 +725,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_NEW Input::EVENT_NEW
@ -838,6 +850,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_NEW Input::EVENT_NEW
@ -961,6 +975,8 @@ print A::outfile, A::typ;
print A::outfile, A::left; print A::outfile, A::left;
print A::outfile, A::right; print A::outfile, A::right;
return (T); return (T);
}, config={
}] }]
Type Type
Input::EVENT_NEW Input::EVENT_NEW

View file

@ -12,6 +12,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@ -29,6 +31,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
@ -46,6 +50,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
q3r3057fdf q3r3057fdf
@ -63,6 +69,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
sdfs\d sdfs\d
@ -80,6 +88,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
@ -97,6 +107,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
dfsdf dfsdf
@ -114,6 +126,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
sdf sdf
@ -131,6 +145,8 @@ Input::remove(input);
terminate(); terminate();
} }
}, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE. 3rw43wRRERLlL#RWERERERE.