make filter removal and stream closure asynchronous.

This commit is contained in:
Bernhard Amann 2012-02-15 15:14:04 -08:00
parent 88233efb2c
commit a850cc5992
6 changed files with 200 additions and 78 deletions

View file

@ -534,7 +534,6 @@ bool Manager::RemoveStream(const EnumVal* id) {
if ( (*s)->id == id ) if ( (*s)->id == id )
{ {
i = (*s); i = (*s);
readers.erase(s); // remove from vector
break; break;
} }
} }
@ -545,11 +544,29 @@ bool Manager::RemoveStream(const EnumVal* id) {
i->reader->Finish(); i->reader->Finish();
delete(i);
return true; return true;
} }
bool Manager::RemoveStreamContinuation(const ReaderFrontend* reader) {
ReaderInfo *i = 0;
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
{
if ( (*s)->reader && (*s)->reader == reader )
{
i = *s;
delete(i);
readers.erase(s);
return true;
}
}
reporter->Error("Stream not found in RemoveStreamContinuation");
return false;
}
bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, const string& nameprepend) { bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, const string& nameprepend) {
for ( int i = 0; i < rec->NumFields(); i++ ) for ( int i = 0; i < rec->NumFields(); i++ )
{ {
@ -615,20 +632,51 @@ bool Manager::RemoveTableFilter(EnumVal* id, const string &name) {
return false; return false;
} }
map<int, Manager::Filter*>::iterator it = i->filters.find(id->InternalInt()); bool found = false;
if ( it == i->filters.end() ) { int filterId;
for ( map<int, Manager::Filter*>::iterator it = i->filters.begin(); it != i->filters.end(); ++it ) {
if ( (*it).second->name == name ) {
found = true;
filterId = (*it).first;
if ( (*it).second->filter_type != TABLE_FILTER ) {
reporter->Error("Trying to remove filter %s of wrong type", name.c_str());
return false;
}
break;
}
}
if ( !found ) {
reporter->Error("Trying to remove nonexisting filter %s", name.c_str());
return false; return false;
} }
if ( i->filters[id->InternalInt()]->filter_type != TABLE_FILTER ) { i->reader->RemoveFilter(filterId);
// wrong type;
return true;
}
bool Manager::RemoveFilterContinuation(const ReaderFrontend* reader, const int filterId) {
ReaderInfo *i = FindReader(reader);
if ( i == 0 ) {
reporter->Error("Reader not found");
return false;
}
map<int, Manager::Filter*>::iterator it = i->filters.find(filterId);
if ( it == i->filters.end() ) {
reporter->Error("Got RemoveFilterContinuation where filter nonexistant for %d", filterId);
return false; return false;
} }
delete (*it).second; delete (*it).second;
i->filters.erase(it); i->filters.erase(it);
return true; return true;
} }
bool Manager::RemoveEventFilter(EnumVal* id, const string &name) { bool Manager::RemoveEventFilter(EnumVal* id, const string &name) {
ReaderInfo *i = FindReader(id); ReaderInfo *i = FindReader(id);

View file

@ -32,13 +32,14 @@ public:
protected: protected:
friend class ReaderFrontend; friend class ReaderFrontend;
friend class ErrorMessage;
friend class PutMessage; friend class PutMessage;
friend class DeleteMessage; friend class DeleteMessage;
friend class ClearMessage; friend class ClearMessage;
friend class SendEventMessage; friend class SendEventMessage;
friend class SendEntryMessage; friend class SendEntryMessage;
friend class EndCurrentSendMessage; friend class EndCurrentSendMessage;
friend class FilterRemovedMessage;
friend class ReaderFinishedMessage;
// Reports an error for the given reader. // Reports an error for the given reader.
void Error(ReaderFrontend* reader, const char* msg); void Error(ReaderFrontend* reader, const char* msg);
@ -56,6 +57,8 @@ protected:
ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type);
bool RemoveFilterContinuation(const ReaderFrontend* reader, const int filterId);
bool RemoveStreamContinuation(const ReaderFrontend* reader);
private: private:
struct ReaderInfo; struct ReaderInfo;

View file

@ -9,21 +9,6 @@ using threading::Field;
namespace input { namespace input {
class ErrorMessage : public threading::OutputMessage<ReaderFrontend> {
public:
ErrorMessage(ReaderFrontend* reader, string message)
: threading::OutputMessage<ReaderFrontend>("Error", reader),
message(message) {}
virtual bool Process() {
input_mgr->Error(Object(), message.c_str());
return true;
}
private:
string message;
};
class PutMessage : public threading::OutputMessage<ReaderFrontend> { class PutMessage : public threading::OutputMessage<ReaderFrontend> {
public: public:
PutMessage(ReaderFrontend* reader, int id, Value* *val) PutMessage(ReaderFrontend* reader, int id, Value* *val)
@ -104,7 +89,7 @@ private:
class EndCurrentSendMessage : public threading::OutputMessage<ReaderFrontend> { class EndCurrentSendMessage : public threading::OutputMessage<ReaderFrontend> {
public: public:
EndCurrentSendMessage(ReaderFrontend* reader, int id) EndCurrentSendMessage(ReaderFrontend* reader, const int id)
: threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader), : threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader),
id(id) {} id(id) {}
@ -114,9 +99,46 @@ public:
} }
private: private:
int id; const int id;
}; };
class FilterRemovedMessage : public threading::OutputMessage<ReaderFrontend> {
public:
FilterRemovedMessage(ReaderFrontend* reader, const int id)
: threading::OutputMessage<ReaderFrontend>("FilterRemoved", reader),
id(id) {}
virtual bool Process() {
return input_mgr->RemoveFilterContinuation(Object(), id);
}
private:
const int id;
};
class ReaderFinishedMessage : public threading::OutputMessage<ReaderFrontend> {
public:
ReaderFinishedMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("ReaderFinished", reader) {}
virtual bool Process() {
return input_mgr->RemoveStreamContinuation(Object());
}
private:
};
class DisableMessage : public threading::OutputMessage<ReaderFrontend>
{
public:
DisableMessage(ReaderFrontend* writer)
: threading::OutputMessage<ReaderFrontend>("Disable", writer) {}
virtual bool Process() { Object()->SetDisable(); return true; }
};
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{ {
buf = 0; buf = 0;
@ -133,18 +155,6 @@ ReaderBackend::~ReaderBackend()
} }
void ReaderBackend::Error(const string &msg)
{
SendOut(new ErrorMessage(frontend, msg));
}
/*
void ReaderBackend::Error(const char *msg)
{
SendOut(new ErrorMessage(frontend, string(msg)));
} */
void ReaderBackend::Put(int id, Value* *val) void ReaderBackend::Put(int id, Value* *val)
{ {
SendOut(new PutMessage(frontend, id, val)); SendOut(new PutMessage(frontend, id, val));
@ -181,6 +191,11 @@ bool ReaderBackend::Init(string arg_source)
// disable if DoInit returns error. // disable if DoInit returns error.
disabled = !DoInit(arg_source); disabled = !DoInit(arg_source);
if ( disabled ) {
DisableFrontend();
}
return !disabled; return !disabled;
} }
@ -192,13 +207,17 @@ bool ReaderBackend::AddFilter(int id, int arg_num_fields,
bool ReaderBackend::RemoveFilter(int id) bool ReaderBackend::RemoveFilter(int id)
{ {
return DoRemoveFilter(id); bool success = DoRemoveFilter(id);
SendOut(new FilterRemovedMessage(frontend, id));
return success; // yes, I know, noone reads this.
} }
void ReaderBackend::Finish() void ReaderBackend::Finish()
{ {
DoFinish(); DoFinish();
disabled = true; disabled = true;
DisableFrontend();
SendOut(new ReaderFinishedMessage(frontend));
} }
bool ReaderBackend::Update() bool ReaderBackend::Update()
@ -206,32 +225,9 @@ bool ReaderBackend::Update()
return DoUpdate(); return DoUpdate();
} }
void ReaderBackend::DisableFrontend()
// stolen from logwriter {
const char* ReaderBackend::Fmt(const char* format, ...) SendOut(new DisableMessage(frontend));
{ }
if ( ! buf )
buf = (char*) malloc(buf_len);
va_list al;
va_start(al, format);
int n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
if ( (unsigned int) n >= buf_len )
{ // Not enough room, grow the buffer.
buf_len = n + 32;
buf = (char*) realloc(buf, buf_len);
// Is it portable to restart?
va_start(al, format);
n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
}
return buf;
}
} }

View file

@ -26,6 +26,12 @@ public:
void Finish(); void Finish();
bool Update(); bool Update();
/**
* Disables the frontend that has instantiated this backend. Once
* disabled,the frontend will not send any further message over.
*/
void DisableFrontend();
protected: protected:
// Methods that have to be overwritten by the individual readers // Methods that have to be overwritten by the individual readers
@ -40,16 +46,9 @@ protected:
// update file contents to logmgr // update file contents to logmgr
virtual bool DoUpdate() = 0; virtual bool DoUpdate() = 0;
// Reports an error to the user.
void Error(const string &msg);
//void Error(const char *msg);
// The following methods return the information as passed to Init(). // The following methods return the information as passed to Init().
const string Source() const { return source; } const string Source() const { return source; }
// A thread-safe version of fmt(). (stolen from logwriter)
const char* Fmt(const char* format, ...);
void SendEvent(const string& name, const int num_vals, threading::Value* *vals); void SendEvent(const string& name, const int num_vals, threading::Value* *vals);
// Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table // Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table

View file

@ -5,8 +5,9 @@
#include "ReaderBackend.h" #include "ReaderBackend.h"
#include "threading/MsgThread.h" #include "threading/MsgThread.h"
namespace input { // FIXME: cleanup of disabled inputreaders is missing. we need this, because stuff can e.g. fail in init and might never be removed afterwards.
namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend> class InitMessage : public threading::InputMessage<ReaderBackend>
{ {
@ -56,6 +57,19 @@ private:
const threading::Field* const* fields; const threading::Field* const* fields;
}; };
class RemoveFilterMessage : public threading::InputMessage<ReaderBackend>
{
public:
RemoveFilterMessage(ReaderBackend* backend, const int id)
: threading::InputMessage<ReaderBackend>("RemoveFilter", backend),
id(id) { }
virtual bool Process() { return Object()->RemoveFilter(id); }
private:
const int id;
};
ReaderFrontend::ReaderFrontend(bro_int_t type) { ReaderFrontend::ReaderFrontend(bro_int_t type) {
disabled = initialized = false; disabled = initialized = false;
@ -103,15 +117,20 @@ void ReaderFrontend::AddFilter(const int id, const int arg_num_fields, const thr
backend->SendIn(new AddFilterMessage(backend, id, arg_num_fields, fields)); backend->SendIn(new AddFilterMessage(backend, id, arg_num_fields, fields));
} }
void ReaderFrontend::RemoveFilter(const int id) {
if ( disabled )
return;
backend->SendIn(new RemoveFilterMessage(backend, id));
}
string ReaderFrontend::Name() const string ReaderFrontend::Name() const
{ {
if ( source.size() ) if ( source.size() )
return ty_name; return ty_name;
return ty_name + "/" + source; return ty_name + "/" + source;
} }
} }

View file

@ -10,20 +10,77 @@ namespace input {
class Manager; class Manager;
/**
* Bridge class between the input::Manager and backend input threads. The
* Manager instantiates one \a ReaderFrontend for each open input stream.
* Each frontend in turns instantiates a ReaderBackend-derived class
* internally that's specific to the particular input format. That backend
* spawns a new thread, and it receives messages from the frontend that
* correspond to method called by the manager.
*/
class ReaderFrontend { class ReaderFrontend {
public: public:
/**
* Constructor.
*
* type: The backend writer type, with the value corresponding to the
* script-level \c Input::Reader enum (e.g., \a READER_ASCII). The
* frontend will internally instantiate a ReaderBackend of the
* corresponding type.
*
* Frontends must only be instantiated by the main thread.
*/
ReaderFrontend(bro_int_t type); ReaderFrontend(bro_int_t type);
/**
* Destructor.
*
* Frontends must only be destroyed by the main thread.
*/
virtual ~ReaderFrontend(); virtual ~ReaderFrontend();
/**
* Initializes the reader.
*
* This method generates a message to the backend reader and triggers
* the corresponding message there. If the backend method fails, it
* sends a message back that will asynchronously call Disable().
*
* See ReaderBackend::Init() for arguments.
* This method must only be called from the main thread.
*/
void Init(string arg_source); void Init(string arg_source);
void Update(); void Update();
/* * The method takes
* ownership of \a fields. */
void AddFilter( const int id, const int arg_num_fields, const threading::Field* const* fields ); void AddFilter( const int id, const int arg_num_fields, const threading::Field* const* fields );
void RemoveFilter ( const int id );
void Finish(); void Finish();
/**
* Disables the reader frontend. From now on, all method calls that
* would normally send message over to the backend, turn into no-ops.
* Note though that it does not stop the backend itself, use Finsh()
* to do that as well (this method is primarily for use as callback
* when the backend wants to disable the frontend).
*
* Disabled frontend will eventually be discarded by the
* input::Manager.
*
* This method must only be called from the main thread.
*/
void SetDisable() { disabled = true; }
/**
* Returns true if the reader frontend has been disabled with SetDisable().
*/
bool Disabled() { return disabled; }
/** /**
* Returns a descriptive name for the reader, including the type of * Returns a descriptive name for the reader, including the type of
* the backend and the source used. * the backend and the source used.