Introduce support for a table of key/value pairs with further configuration options,

with the same userinterface as in the logging interface.

Not really tested, but tests still work.
This commit is contained in:
Bernhard Amann 2012-06-28 16:16:48 -07:00
parent 227159fd04
commit f820ee9f5c
19 changed files with 173 additions and 26 deletions

View file

@ -53,6 +53,10 @@ export {
## really be executed. Parameters are the same as for the event. If true is
## returned, the update is performed. If false is returned, it is skipped.
pred: function(typ: Input::Event, left: any, right: any): bool &optional;
## A key/value table that will be passed on the reader.
## Interpretation of the values is left to the reader.
config: table[string] of string &default=table();
};
## EventFilter description type used for the `event` method.
@ -85,6 +89,9 @@ export {
## The event will receive an Input::Event enum as the first element, and the fields as the following arguments.
ev: any;
## A key/value table that will be passed on the reader.
## Interpretation of the values is left to the reader.
config: table[string] of string &default=table();
};
## Create a new table input from a given source. Returns true on success.

View file

@ -80,6 +80,8 @@ public:
EnumVal* type;
ReaderFrontend* reader;
TableVal* config;
std::map<string, string> configmap;
RecordVal* description;
@ -103,6 +105,9 @@ Manager::Stream::~Stream()
if ( description )
Unref(description);
if ( config )
Unref(config);
if ( reader )
delete(reader);
}
@ -300,6 +305,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
Unref(sourceval);
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
Val* config = description->LookupWithDefault(rtype->FieldOffset("config"));
switch ( mode->InternalInt() )
{
@ -325,8 +331,27 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
info->name = name;
info->source = source;
info->config = config->AsTableVal(); // ref'd by LookupWithDefault
Ref(description);
info->description = description;
info->description = description;
{
HashKey* k;
IterCookie* c = info->config->AsTable()->InitForIteration();
TableEntryVal* v;
while ( (v = info->config->AsTable()->NextEntry(k, c)) )
{
ListVal* index = info->config->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString();
info->configmap.insert(std::make_pair(key, value));
Unref(index);
delete k;
}
}
DBG_LOG(DBG_INPUT, "Successfully created new input stream %s",
name.c_str());
@ -451,7 +476,8 @@ bool Manager::CreateEventStream(RecordVal* fval)
Unref(want_record); // ref'd by lookupwithdefault
assert(stream->reader);
stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf );
stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf, stream->configmap );
readers[stream->reader] = stream;
@ -628,7 +654,7 @@ bool Manager::CreateTableStream(RecordVal* fval)
assert(stream->reader);
stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields );
stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields, stream->configmap );
readers[stream->reader] = stream;

View file

@ -184,7 +184,7 @@ void ReaderBackend::SendEntry(Value* *vals)
}
bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_num_fields,
const threading::Field* const* arg_fields)
const threading::Field* const* arg_fields, const std::map<string, string> config)
{
source = arg_source;
mode = arg_mode;
@ -194,7 +194,7 @@ bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_n
SetName("InputReader/"+source);
// disable if DoInit returns error.
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields);
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields, config);
if ( ! success )
{

View file

@ -79,9 +79,12 @@ public:
* @param fields The types and names of the fields to be retrieved
* from the input source.
*
* @param config A string map containing additional configuration options
* for the reader.
*
* @return False if an error occured.
*/
bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields);
bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields, std::map<string, string> config);
/**
* Finishes reading from this input stream in a regular fashion. Must
@ -130,7 +133,7 @@ protected:
* provides accessor methods to get them later, and they are passed
* in here only for convinience.
*/
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0;
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config) = 0;
/**
* Reader-specific method implementing input finalization at

View file

@ -12,13 +12,13 @@ class InitMessage : public threading::InputMessage<ReaderBackend>
{
public:
InitMessage(ReaderBackend* backend, const string source, ReaderMode mode,
const int num_fields, const threading::Field* const* fields)
const int num_fields, const threading::Field* const* fields, const std::map<string, string> config)
: threading::InputMessage<ReaderBackend>("Init", backend),
source(source), mode(mode), num_fields(num_fields), fields(fields) { }
source(source), mode(mode), num_fields(num_fields), fields(fields), config(config) { }
virtual bool Process()
{
return Object()->Init(source, mode, num_fields, fields);
return Object()->Init(source, mode, num_fields, fields, config);
}
private:
@ -26,6 +26,7 @@ private:
const ReaderMode mode;
const int num_fields;
const threading::Field* const* fields;
const std::map<string, string> config;
};
class UpdateMessage : public threading::InputMessage<ReaderBackend>
@ -64,7 +65,7 @@ ReaderFrontend::~ReaderFrontend()
}
void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fields,
const threading::Field* const* fields)
const threading::Field* const* fields, const std::map<string, string> config)
{
if ( disabled )
return;
@ -75,7 +76,7 @@ void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fiel
source = arg_source;
initialized = true;
backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields));
backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields, config));
}
void ReaderFrontend::Update()

View file

@ -52,7 +52,7 @@ public:
*
* This method must only be called from the main thread.
*/
void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields);
void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config);
/**
* Force an update of the current input source. Actual action depends

View file

@ -83,7 +83,7 @@ void Ascii::DoClose()
}
}
bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields)
bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map<string, string> config)
{
mtime = 0;

View file

@ -38,7 +38,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); }
protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config);
virtual void DoClose();
virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -36,7 +36,7 @@ void Benchmark::DoClose()
{
}
bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields)
bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map<string, string> config)
{
num_lines = atoi(path.c_str());

View file

@ -18,7 +18,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); }
protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config);
virtual void DoClose();
virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -100,7 +100,7 @@ bool Raw::CloseInput()
return true;
}
bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields)
bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map<string, string> config)
{
fname = path;
mtime = 0;

View file

@ -22,7 +22,7 @@ public:
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected:
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields);
virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map<string, string> config);
virtual void DoClose();
virtual bool DoUpdate();
virtual bool DoHeartbeat(double network_time, double current_time);

View file

@ -4,6 +4,8 @@ print A::description;
print A::tpe;
print A::i;
print A::b;
}, config={
}]
Input::EVENT_NEW
1
@ -14,6 +16,8 @@ print A::description;
print A::tpe;
print A::i;
print A::b;
}, config={
}]
Input::EVENT_NEW
2
@ -24,6 +28,8 @@ print A::description;
print A::tpe;
print A::i;
print A::b;
}, config={
}]
Input::EVENT_NEW
3
@ -34,6 +40,8 @@ print A::description;
print A::tpe;
print A::i;
print A::b;
}, config={
}]
Input::EVENT_NEW
4
@ -44,6 +52,8 @@ print A::description;
print A::tpe;
print A::i;
print A::b;
}, config={
}]
Input::EVENT_NEW
5
@ -54,6 +64,8 @@ print A::description;
print A::tpe;
print A::i;
print A::b;
}, config={
}]
Input::EVENT_NEW
6
@ -64,6 +76,8 @@ print A::description;
print A::tpe;
print A::i;
print A::b;
}, config={
}]
Input::EVENT_NEW
7

View file

@ -4,6 +4,8 @@ print outfile, description;
print outfile, tpe;
print outfile, s;
close(outfile);
}, config={
}]
Input::EVENT_NEW
8 ../input.log

View file

@ -3,6 +3,8 @@
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@ -11,6 +13,8 @@ sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
@ -19,6 +23,8 @@ DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
q3r3057fdf
@ -27,6 +33,8 @@ q3r3057fdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdfs\d
@ -35,6 +43,8 @@ sdfs\d
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
@ -43,6 +53,8 @@ Input::EVENT_NEW
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
dfsdf
@ -51,6 +63,8 @@ dfsdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdf
@ -59,6 +73,8 @@ sdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.

View file

@ -46,6 +46,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_NEW
@ -139,6 +141,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_NEW
@ -244,6 +248,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_CHANGED
@ -469,6 +475,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_NEW
@ -592,6 +600,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_NEW
@ -715,6 +725,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_NEW
@ -838,6 +850,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_NEW
@ -961,6 +975,8 @@ print A::outfile, A::typ;
print A::outfile, A::left;
print A::outfile, A::right;
return (T);
}, config={
}]
Type
Input::EVENT_NEW

View file

@ -3,6 +3,8 @@
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@ -11,6 +13,8 @@ sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
@ -19,6 +23,8 @@ DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
q3r3057fdf
@ -27,6 +33,8 @@ q3r3057fdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdfs\d
@ -35,6 +43,8 @@ sdfs\d
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
@ -43,6 +53,8 @@ Input::EVENT_NEW
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
dfsdf
@ -51,6 +63,8 @@ dfsdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdf
@ -59,6 +73,8 @@ sdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.
@ -67,6 +83,8 @@ Input::EVENT_NEW
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@ -75,6 +93,8 @@ sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
@ -83,6 +103,8 @@ DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
q3r3057fdf
@ -91,6 +113,8 @@ q3r3057fdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdfs\d
@ -99,6 +123,8 @@ sdfs\d
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
@ -107,6 +133,8 @@ Input::EVENT_NEW
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
dfsdf
@ -115,6 +143,8 @@ dfsdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
sdf
@ -123,6 +153,8 @@ sdf
print A::description;
print A::tpe;
print A::s;
}, config={
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.

View file

@ -10,6 +10,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@ -25,6 +27,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
@ -40,6 +44,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
q3r3057fdf
@ -55,6 +61,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
sdfs\d
@ -70,6 +78,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
@ -85,6 +95,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
dfsdf
@ -100,6 +112,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
sdf
@ -115,6 +129,8 @@ close(A::outfile);
Input::remove(input);
}
}, config={
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.

View file

@ -12,7 +12,9 @@ print description;
print tpe;
print left;
print right;
}, pred=<uninitialized>]
}, pred=<uninitialized>, config={
}]
Input::EVENT_NEW
[i=1]
T
@ -30,7 +32,9 @@ print description;
print tpe;
print left;
print right;
}, pred=<uninitialized>]
}, pred=<uninitialized>, config={
}]
Input::EVENT_NEW
[i=2]
T
@ -48,7 +52,9 @@ print description;
print tpe;
print left;
print right;
}, pred=<uninitialized>]
}, pred=<uninitialized>, config={
}]
Input::EVENT_NEW
[i=3]
F
@ -66,7 +72,9 @@ print description;
print tpe;
print left;
print right;
}, pred=<uninitialized>]
}, pred=<uninitialized>, config={
}]
Input::EVENT_NEW
[i=4]
F
@ -84,7 +92,9 @@ print description;
print tpe;
print left;
print right;
}, pred=<uninitialized>]
}, pred=<uninitialized>, config={
}]
Input::EVENT_NEW
[i=5]
F
@ -102,7 +112,9 @@ print description;
print tpe;
print left;
print right;
}, pred=<uninitialized>]
}, pred=<uninitialized>, config={
}]
Input::EVENT_NEW
[i=6]
F
@ -120,7 +132,9 @@ print description;
print tpe;
print left;
print right;
}, pred=<uninitialized>]
}, pred=<uninitialized>, config={
}]
Input::EVENT_NEW
[i=7]
T