zeek/src/input/ReaderFrontend.cc
Bernhard Amann 57ffe1be77 completely change interface again.
compiles, not really tested.

basic test works 70% of the time and coredumps in the other 30% - but this was not easy to debug at first glance (most interestingly, the crash happens in the logging framework - I wonder how that works).
Other tests are not adjusted to the new interface yet.
2012-03-15 18:41:51 -07:00

107 lines
2.5 KiB
C++

// See the file "COPYING" in the main distribution directory for copyright.
#include "Manager.h"
#include "ReaderFrontend.h"
#include "ReaderBackend.h"
#include "threading/MsgThread.h"
// FIXME: Cleanup of disabled input readers is missing. We need it because a reader can, e.g., fail in Init and then might never be removed afterwards.
namespace input {
// Input message that asks a ReaderBackend to initialize itself. It stores the
// arguments given at construction and hands them to ReaderBackend::Init()
// when the message is processed.
class InitMessage : public threading::InputMessage<ReaderBackend>
{
public:
	// backend:        the backend this message is addressed to.
	// arg_source:     source string, copied into the message.
	// arg_mode:       mode value passed through unchanged to Init().
	// arg_num_fields: number of entries in arg_fields.
	// arg_fields:     field descriptions; only the pointer is stored here,
	//                 the array itself is not copied by this class.
	InitMessage(ReaderBackend* backend, const string arg_source, const int arg_mode,
	            const int arg_num_fields, const threading::Field* const* arg_fields)
		: threading::InputMessage<ReaderBackend>("Init", backend),
		  source(arg_source),
		  mode(arg_mode),
		  num_fields(arg_num_fields),
		  fields(arg_fields)
		{
		}

	// Forwards the stored arguments to the backend and reports its result.
	virtual bool Process()
		{
		const bool success = Object()->Init(source, mode, num_fields, fields);
		return success;
		}

private:
	const string source;
	const int mode;
	const int num_fields;
	const threading::Field* const* fields;
};
// Input message that asks a ReaderBackend to run Update(). Carries no
// payload besides the target backend.
class UpdateMessage : public threading::InputMessage<ReaderBackend>
{
public:
	UpdateMessage(ReaderBackend* backend)
		: threading::InputMessage<ReaderBackend>("Update", backend)
		{
		}

	// Returns whatever the backend's Update() reports.
	virtual bool Process()
		{
		const bool success = Object()->Update();
		return success;
		}
};
// Input message that asks a ReaderBackend to run Finish(). Carries no
// payload besides the target backend.
class FinishMessage : public threading::InputMessage<ReaderBackend>
{
public:
	FinishMessage(ReaderBackend* backend)
		: threading::InputMessage<ReaderBackend>("Finish", backend)
		{
		}

	// Any result of Finish() is not used; processing always reports success.
	virtual bool Process()
		{
		Object()->Finish();
		return true;
		}
};
// Sets up the frontend state, asks the input manager to create the backend
// for the given reader type, and starts that backend.
ReaderFrontend::ReaderFrontend(bro_int_t type)
	{
	// State is set before CreateBackend() runs, since that call receives
	// `this`.
	initialized = false;
	disabled = false;
	ty_name = "<not set>";

	backend = input_mgr->CreateBackend(this, type);
	assert(backend);

	backend->Start();
	}
// Nothing is released here; the destructor body is intentionally empty.
ReaderFrontend::~ReaderFrontend()
	{
	}
// Records the source and queues an InitMessage for the backend.
//
// Does nothing if the frontend is disabled. Calling it a second time on an
// already initialized frontend is reported as an internal error.
void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, const threading::Field* const* fields)
	{
	if ( ! disabled )
		{
		if ( initialized )
			reporter->InternalError("reader initialize twice");

		source = arg_source;
		initialized = true;

		InitMessage* init_msg = new InitMessage(backend, arg_source, mode, num_fields, fields);
		backend->SendIn(init_msg);
		}
	}
// Queues an UpdateMessage for the backend.
//
// Does nothing if the frontend is disabled; reports an error (without
// sending anything) if Init() has not been called yet.
void ReaderFrontend::Update()
	{
	if ( disabled )
		return;

	if ( initialized )
		{
		backend->SendIn(new UpdateMessage(backend));
		return;
		}

	reporter->Error("Tried to call update on uninitialized reader");
	}
// Queues a FinishMessage for the backend.
//
// Does nothing if the frontend is disabled; reports an error (without
// sending anything) if Init() has not been called yet.
void ReaderFrontend::Finish()
	{
	if ( disabled )
		return;

	if ( initialized )
		{
		backend->SendIn(new FinishMessage(backend));
		return;
		}

	reporter->Error("Tried to call finish on uninitialized reader");
	}
// Returns a descriptive name for this reader.
//
// While no source has been recorded (i.e., `source` is empty), the name is
// just the reader type name. Once a source is set, the name is
// "<type name>/<source>".
string ReaderFrontend::Name() const
	{
	// The original test was inverted: it returned the bare type name when a
	// source *was* set and produced "<type>/" with a dangling slash when it
	// was not. Only fall back to the bare type name if there is no source.
	if ( source.empty() )
		return ty_name;

	return ty_name + "/" + source;
	}
}