* rework script interface, add autostart stream flag that starts up a stream automatically when first filter has been added ( probably the most common use case )

* change internal reader interface again
* remove some quite embarassing bugs that must have been in the interface for rather long
* add different read methods to script & internal interface (like normal, streaming, etc). Not implemented in ascii reader yet.
This commit is contained in:
Bernhard Amann 2012-02-16 15:03:20 -08:00
parent de149ff55e
commit 91943c2655
18 changed files with 191 additions and 43 deletions

View file

@ -23,6 +23,9 @@ export {
## Read mode to use for this stream
mode: Mode &default=default_mode;
## Automatically start the input stream after the first filter has been added
autostart: bool &default=T;
};
## TableFilter description type used for the `add_tablefilter` method.

View file

@ -214,6 +214,10 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
}
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
Val *autostart = description->LookupWithDefault(rtype->FieldOffset("autostart"));
bool do_autostart = ( autostart->InternalInt() == 1 );
Unref(autostart); // Ref'd by LookupWithDefault
ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt());
assert(reader_obj);
@ -229,16 +233,7 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
readers.push_back(info);
reader_obj->Init(source);
/* if ( success == false ) {
assert( RemoveStream(id) );
return 0;
} */
reader_obj->Update();
/* if ( success == false ) {
assert ( RemoveStream(id) );
return 0;
} */
reader_obj->Init(source, mode->InternalInt(), do_autostart);
return reader_obj;
@ -785,7 +780,7 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
//reporter->Error("Hashing %d val fields", i->num_val_fields);
HashKey* valhash = 0;
if ( filter->num_val_fields > 0 )
HashValues(filter->num_val_fields, vals+filter->num_idx_fields);
valhash = HashValues(filter->num_val_fields, vals+filter->num_idx_fields);
//reporter->Error("Result: %d", (uint64_t) valhash->Hash());
@ -794,6 +789,13 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
InputHash *h = filter->lastDict->Lookup(idxhash);
if ( h != 0 ) {
// seen before
valhash->Hash();
h->valhash->Hash();
if ( filter->num_val_fields == 0 || h->valhash->Hash() == valhash->Hash() ) {
// ok, exact duplicate
filter->lastDict->Remove(idxhash);

View file

@ -185,24 +185,42 @@ void ReaderBackend::SendEntry(int id, Value* *vals)
SendOut(new SendEntryMessage(frontend, id, vals));
}
bool ReaderBackend::Init(string arg_source)
bool ReaderBackend::Init(string arg_source, int mode, bool arg_autostart)
{
source = arg_source;
autostart = arg_autostart;
SetName("InputReader/"+source);
// disable if DoInit returns error.
disabled = !DoInit(arg_source);
disabled = !DoInit(arg_source, mode);
if ( disabled ) {
Error("Init failed");
DisableFrontend();
}
return !disabled;
}
bool ReaderBackend::StartReading() {
int success = DoStartReading();
if ( success == false ) {
DisableFrontend();
}
return success;
}
bool ReaderBackend::AddFilter(int id, int arg_num_fields,
const Field* const * arg_fields)
{
return DoAddFilter(id, arg_num_fields, arg_fields);
bool success = DoAddFilter(id, arg_num_fields, arg_fields);
if ( success && autostart) {
autostart = false;
return StartReading();
}
return success;
}
bool ReaderBackend::RemoveFilter(int id)
@ -230,4 +248,12 @@ void ReaderBackend::DisableFrontend()
SendOut(new DisableMessage(frontend));
}
bool ReaderBackend::DoHeartbeat(double network_time, double current_time)
{
MsgThread::DoHeartbeat(network_time, current_time);
return true;
}
}

View file

@ -51,9 +51,24 @@ public:
* @param fields An array of size \a num_fields with the log fields.
* The methods takes ownership of the array.
*
* @param mode the opening mode for the input source
*
* @param autostart automatically start the input source after the first filter has been added
*
* @return False if an error occured.
*/
bool Init(string arg_source);
bool Init(string arg_source, int mode, bool autostart);
/**
* One-time start method of the reader.
*
* This method is called from the scripting layer, after all filters have been added.
* No data should be read before this method is called.
*
* If autostart in Init is set to true, this method is called automatically by the backend after
* the first filter has been added.
*/
bool StartReading();
/**
* Add an input filter to the input stream
@ -107,7 +122,8 @@ protected:
// Methods that have to be overwritten by the individual readers
/**
* Reader-specific intialization method.
* Reader-specific intialization method. Note that data may only be read from the input source
* after the Start function has been called.
*
* A reader implementation must override this method. If it returns
* false, it will be assumed that a fatal error has occured that
@ -115,7 +131,19 @@ protected:
* disabled and eventually deleted. When returning false, an
* implementation should also call Error() to indicate what happened.
*/
virtual bool DoInit(string arg_sources) = 0;
virtual bool DoInit(string arg_sources, int mode) = 0;
/**
* Reader-specific start method. After this function has been called, data may be read from
* the input source and be sent to the specified filters
*
* A reader implementation must override this method.
* If it returns false, it will be assumed that a fatal error has occured
* that prevents the reader from further operation; it will then be
* disabled and eventually deleted. When returning false, an implementation
* should also call Error to indicate what happened.
*/
virtual bool DoStartReading() = 0;
/**
* Reader-specific method to add a filter.
@ -226,6 +254,13 @@ protected:
*/
void EndCurrentSend(int id);
/**
* Triggered by regular heartbeat messages from the main thread.
*
* This method can be overridden but once must call
* ReaderBackend::DoHeartbeat().
*/
virtual bool DoHeartbeat(double network_time, double current_time);
private:
// Frontend that instantiated us. This object must not be access from
@ -239,6 +274,7 @@ private:
// For implementing Fmt().
char* buf;
unsigned int buf_len;
bool autostart;
};
}

View file

@ -12,14 +12,16 @@ namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend>
{
public:
InitMessage(ReaderBackend* backend, const string source)
InitMessage(ReaderBackend* backend, const string source, const int mode, const bool autostart)
: threading::InputMessage<ReaderBackend>("Init", backend),
source(source) { }
source(source), mode(mode), autostart(autostart) { }
virtual bool Process() { return Object()->Init(source); }
virtual bool Process() { return Object()->Init(source, mode, autostart); }
private:
const string source;
const int mode;
const bool autostart;
};
class UpdateMessage : public threading::InputMessage<ReaderBackend>
@ -42,6 +44,16 @@ public:
virtual bool Process() { Object()->Finish(); return true; }
};
class StartReadingMessage : public threading::InputMessage<ReaderBackend>
{
public:
StartReadingMessage(ReaderBackend* backend)
: threading::InputMessage<ReaderBackend>("StartReading", backend)
{ }
virtual bool Process() { Object()->StartReading(); return true; }
};
class AddFilterMessage : public threading::InputMessage<ReaderBackend>
{
public:
@ -83,17 +95,17 @@ ReaderFrontend::ReaderFrontend(bro_int_t type) {
ReaderFrontend::~ReaderFrontend() {
}
void ReaderFrontend::Init(string arg_source) {
void ReaderFrontend::Init(string arg_source, int mode, bool autostart) {
if ( disabled )
return;
if ( initialized )
reporter->InternalError("writer initialize twice");
reporter->InternalError("reader initialize twice");
source = arg_source;
initialized = true;
backend->SendIn(new InitMessage(backend, arg_source));
backend->SendIn(new InitMessage(backend, arg_source, mode, autostart));
}
void ReaderFrontend::Update() {
@ -132,6 +144,13 @@ string ReaderFrontend::Name() const
return ty_name + "/" + source;
}
void ReaderFrontend::StartReading() {
if ( disabled )
return;
backend->SendIn(new StartReadingMessage(backend));
}
}

View file

@ -49,7 +49,18 @@ public:
* See ReaderBackend::Init() for arguments.
* This method must only be called from the main thread.
*/
void Init(string arg_source);
void Init(string arg_source, int mode, bool autostart);
/**
* Start the reader.
*
* This methods starts the reader, after all necessary filters have been added.
* It is not necessary to call this function, if autostart has been set.
* If autostart has been set, the reader will be initialized automatically after the first filter has been added
*
* This method must only be called from the main thread.
*/
void StartReading();
/**
* Force an update of the current input source. Actual action depends on

View file

@ -8,10 +8,14 @@
#include "../../threading/SerializationTypes.h"
#define MANUAL 0
#define REREAD 1
using namespace input::reader;
using threading::Value;
using threading::Field;
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position)
: name(arg_name), type(arg_type)
{
@ -75,16 +79,41 @@ void Ascii::DoFinish()
}
}
bool Ascii::DoInit(string path)
bool Ascii::DoInit(string path, int arg_mode)
{
started = false;
fname = path;
mode = arg_mode;
file = new ifstream(path.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
Error(Fmt("Init: cannot open %s", fname.c_str()));
return false;
}
if ( ( mode != MANUAL ) && (mode != REREAD) ) {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false;
}
return true;
}
bool Ascii::DoStartReading() {
if ( started == true ) {
Error("Started twice");
return false;
}
started = true;
switch ( mode ) {
case MANUAL:
DoUpdate();
break;
default:
assert(false);
}
return true;
}
@ -132,7 +161,7 @@ bool Ascii::ReadHeader() {
map<string, uint32_t> fields;
// construcr list of field names.
// construct list of field names.
istringstream splitstream(line);
int pos=0;
while ( splitstream ) {
@ -146,6 +175,7 @@ bool Ascii::ReadHeader() {
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
(*it).second.columnMap.clear();
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
const Field* field = (*it).second.fields[i];
@ -372,7 +402,6 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
// read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() {
// dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad)
if ( file && file->is_open() ) {
file->close();
@ -418,6 +447,7 @@ bool Ascii::DoUpdate() {
fit != (*it).second.columnMap.end();
fit++ ){
if ( (*fit).position > pos || (*fit).secondary_position > pos ) {
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position));
return false;
@ -455,6 +485,7 @@ bool Ascii::DoUpdate() {
}
//file->clear(); // remove end of file evil bits
//file->seekg(0, ios::beg); // and seek to start.
@ -463,3 +494,22 @@ bool Ascii::DoUpdate() {
}
return true;
}
bool Ascii::DoHeartbeat(double network_time, double current_time)
{
ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( mode ) {
case MANUAL:
// yay, we do nothing :)
break;
case REREAD:
default:
assert(false);
}
return true;
}

View file

@ -39,7 +39,7 @@ public:
protected:
virtual bool DoInit(string path);
virtual bool DoInit(string path, int mode);
virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
@ -49,8 +49,12 @@ protected:
virtual bool DoUpdate();
virtual bool DoStartReading();
private:
virtual bool DoHeartbeat(double network_time, double current_time);
struct Filter {
unsigned int num_fields;
@ -84,6 +88,10 @@ private:
string unset_field;
int mode;
bool started;
};

View file

@ -106,7 +106,7 @@ void Manager::Process()
Message* msg = t->RetrieveOut();
if ( msg->Process() && network_time )
if ( msg->Process() ) //&& network_time ) // FIXME: ask robin again if he needs this. makes input interface not work in bro_init.
did_process = true;
else

View file

@ -186,9 +186,9 @@ enum ID %{
%}
enum Mode %{
MANUAL,
REREAD,
STREAM,
MANUAL = 0,
REREAD = 1,
STREAM = 2,
%}
module GLOBAL;

View file

@ -47,7 +47,6 @@ event bro_init()
# first read in the old stuff into the table...
Input::create_stream(A::INPUT, [$source="input.log"]);
Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]);
Input::force_update(A::INPUT);
Input::remove_tablefilter(A::INPUT, "ssh");
Input::remove_stream(A::INPUT);
}

View file

@ -38,5 +38,4 @@ event bro_init()
{
Input::create_stream(A::INPUT, [$source="input.log"]);
Input::add_eventfilter(A::INPUT, [$name="input", $fields=Val, $ev=line]);
Input::force_update(A::INPUT);
}

View file

@ -33,7 +33,6 @@ event bro_init()
# first read in the old stuff into the table...
Input::create_stream(A::INPUT, [$source="input.log"]);
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F]);
Input::force_update(A::INPUT);
}
event Input::update_finished(id: Input::ID) {

View file

@ -33,7 +33,6 @@ event bro_init()
# first read in the old stuff into the table...
Input::create_stream(A::INPUT, [$source="input.log"]);
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]);
Input::force_update(A::INPUT);
}
event Input::update_finished(id: Input::ID) {

View file

@ -32,7 +32,6 @@ event bro_init()
# first read in the old stuff into the table...
Input::create_stream(A::INPUT, [$source="input.log"]);
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]);
Input::force_update(A::INPUT);
print servers[1.2.3.4];
print servers[1.2.3.5];
print servers[1.2.3.6];

View file

@ -41,7 +41,6 @@ event bro_init()
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F,
$pred(typ: Input::Event, left: Idx, right: bool) = { return right; }
]);
Input::force_update(A::INPUT);
}
event Input::update_finished(id: Input::ID) {

View file

@ -44,5 +44,4 @@ event bro_init()
{
Input::create_stream(A::LOG, [$source="input.log"]);
Input::add_tablefilter(A::LOG, [$name="input", $idx=Idx, $val=Val, $destination=destination, $want_record=F,$ev=line]);
Input::force_update(A::LOG);
}

View file

@ -40,7 +40,7 @@ global done: bool = F;
event bro_init()
{
# first read in the old stuff into the table...
Input::create_stream(A::INPUT, [$source="input.log"]);
Input::create_stream(A::INPUT, [$source="input.log", $autostart=F]);
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=destination1, $want_record=F,
$pred(typ: Input::Event, left: Idx, right: bool) = { return right; }
]);