automatically delete disabled input streams

This commit is contained in:
Bernhard Amann 2012-06-07 14:12:42 -07:00
parent 18f07d3a46
commit 852de4700c
4 changed files with 39 additions and 11 deletions

View file

@ -689,16 +689,14 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only)
} }
bool Manager::RemoveStream(const string &name) bool Manager::RemoveStream(Stream *i)
{ {
Stream *i = FindStream(name);
if ( i == 0 ) if ( i == 0 )
return false; // not found return false; // not found
if ( i->removed ) if ( i->removed )
{ {
reporter->Error("Stream %s is already queued for removal. Ignoring remove.", name.c_str()); reporter->Error("Stream %s is already queued for removal. Ignoring remove.", i->name.c_str());
return false; return false;
} }
@ -708,12 +706,24 @@ bool Manager::RemoveStream(const string &name)
#ifdef DEBUG #ifdef DEBUG
DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
name.c_str()); i->name.c_str());
#endif #endif
return true; return true;
} }
bool Manager::RemoveStream(ReaderFrontend* frontend)
{
return RemoveStream(FindStream(frontend));
}
bool Manager::RemoveStream(const string &name)
{
return RemoveStream(FindStream(name));
}
bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) bool Manager::RemoveStreamContinuation(ReaderFrontend* reader)
{ {
Stream *i = FindStream(reader); Stream *i = FindStream(reader);

View file

@ -72,7 +72,7 @@ public:
/** /**
* Deletes an existing input stream. * Deletes an existing input stream.
* *
* @param id The enum value corresponding the input stream. * @param id The name of the input stream to be removed.
* *
* This method corresponds directly to the internal BiF defined in * This method corresponds directly to the internal BiF defined in
* input.bif, which just forwards here. * input.bif, which just forwards here.
@ -88,6 +88,7 @@ protected:
friend class SendEntryMessage; friend class SendEntryMessage;
friend class EndCurrentSendMessage; friend class EndCurrentSendMessage;
friend class ReaderClosedMessage; friend class ReaderClosedMessage;
friend class DisableMessage;
// For readers to write to input stream in direct mode (reporting // For readers to write to input stream in direct mode (reporting
// new/deleted values directly). Functions take ownership of // new/deleted values directly). Functions take ownership of
@ -119,11 +120,25 @@ protected:
// stream is still received. // stream is still received.
bool RemoveStreamContinuation(ReaderFrontend* reader); bool RemoveStreamContinuation(ReaderFrontend* reader);
/**
* Deletes an existing input stream.
*
* @param frontend pointer to the frontend of the input stream to be removed.
*
* This method is used by the reader backends to remove a reader when it fails
* for some reason.
*/
bool RemoveStream(ReaderFrontend* frontend);
private: private:
class Stream; class Stream;
class TableStream; class TableStream;
class EventStream; class EventStream;
// actual RemoveStream implementation -- the function public
// and protected function definitions are wrappers around this function.
bool RemoveStream(Stream* i);
bool CreateStream(Stream*, RecordVal* description); bool CreateStream(Stream*, RecordVal* description);
// SendEntry implementation for Table stream. // SendEntry implementation for Table stream.

View file

@ -113,6 +113,7 @@ public:
virtual bool Process() virtual bool Process()
{ {
Object()->SetDisable();
return input_mgr->RemoveStreamContinuation(Object()); return input_mgr->RemoveStreamContinuation(Object());
} }
@ -129,6 +130,11 @@ public:
virtual bool Process() virtual bool Process()
{ {
Object()->SetDisable(); Object()->SetDisable();
// and - because we do not need disabled objects any more - there is no way to re-enable them
// simply delete them.
// This avoids the problem of having to periodically check if there are any disabled readers
// out there. As soon as a reader disables itself, it deletes itself.
input_mgr->RemoveStream(Object());
return true; return true;
} }
}; };
@ -203,8 +209,7 @@ bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_n
void ReaderBackend::Close() void ReaderBackend::Close()
{ {
DoClose(); DoClose();
disabled = true; disabled = true; // frontend disables itself when it gets the Close-message.
DisableFrontend();
SendOut(new ReaderClosedMessage(frontend)); SendOut(new ReaderClosedMessage(frontend));
if ( fields != 0 ) if ( fields != 0 )

View file

@ -6,9 +6,6 @@
#include "threading/MsgThread.h" #include "threading/MsgThread.h"
// FIXME: cleanup of disabled inputreaders is missing. we need this, because
// stuff can e.g. fail in init and might never be removed afterwards.
namespace input { namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend> class InitMessage : public threading::InputMessage<ReaderBackend>
@ -106,6 +103,7 @@ void ReaderFrontend::Close()
return; return;
} }
disabled = true;
backend->SendIn(new CloseMessage(backend)); backend->SendIn(new CloseMessage(backend));
} }