Merge remote-tracking branch 'origin/topic/robin/thread-cleanup' into topic/seth/exec-module

This commit is contained in:
Seth Hall 2013-04-02 15:12:38 -04:00
commit f2ac938603
11 changed files with 147 additions and 46 deletions

View file

@ -189,6 +189,15 @@ export {
## .. bro:see:: Log::add_default_filter Log::remove_default_filter ## .. bro:see:: Log::add_default_filter Log::remove_default_filter
global create_stream: function(id: ID, stream: Stream) : bool; global create_stream: function(id: ID, stream: Stream) : bool;
## Removes a logging stream completely, stopping all the threads.
##
## id: The ID enum to be associated with the new logging stream.
##
## Returns: True if a new stream was successfully removed.
##
## .. bro:see:: Log:create_stream
global remove_stream: function(id: ID) : bool;
## Enables a previously disabled logging stream. Disabled streams ## Enables a previously disabled logging stream. Disabled streams
## will not be written to until they are enabled again. New streams ## will not be written to until they are enabled again. New streams
## are enabled by default. ## are enabled by default.
@ -442,6 +451,13 @@ function create_stream(id: ID, stream: Stream) : bool
return add_default_filter(id); return add_default_filter(id);
} }
function remove_stream(id: ID) : bool
{
delete active_streams[id];
return __remove_stream(id);
}
function disable_stream(id: ID) : bool function disable_stream(id: ID) : bool
{ {
delete active_streams[id]; delete active_streams[id];

View file

@ -18,6 +18,12 @@ function Log::__create_stream%(id: Log::ID, stream: Log::Stream%) : bool
return new Val(result, TYPE_BOOL); return new Val(result, TYPE_BOOL);
%} %}
function Log::__remove_stream%(id: Log::ID%) : bool
%{
bool result = log_mgr->RemoveStream(id->AsEnumVal());
return new Val(result, TYPE_BOOL);
%}
function Log::__enable_stream%(id: Log::ID%) : bool function Log::__enable_stream%(id: Log::ID%) : bool
%{ %{
bool result = log_mgr->EnableStream(id->AsEnumVal()); bool result = log_mgr->EnableStream(id->AsEnumVal());

View file

@ -374,6 +374,38 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval)
return true; return true;
} }
bool Manager::RemoveStream(EnumVal* id)
{
unsigned int idx = id->AsEnum();
if ( idx >= streams.size() || ! streams[idx] )
return false;
Stream* stream = streams[idx];
if ( ! stream )
return false;
for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ )
{
WriterInfo* winfo = i->second;
DBG_LOG(DBG_LOGGING, "Removed writer '%s' from stream '%s'",
winfo->writer->Name(), stream->name.c_str());
winfo->writer->Stop();
delete winfo->writer;
delete winfo;
}
stream->writers.clear();
delete stream;
streams[idx] = 0;
DBG_LOG(DBG_LOGGING, "Removed logging stream '%s'", stream->name.c_str());
return true;
}
bool Manager::EnableStream(EnumVal* id) bool Manager::EnableStream(EnumVal* id)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);

View file

@ -47,6 +47,16 @@ public:
*/ */
bool CreateStream(EnumVal* id, RecordVal* stream); bool CreateStream(EnumVal* id, RecordVal* stream);
/**
* Remove a log stream, stopping all threads.
*
* @param id The enum value corresponding the log stream.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool RemoveStream(EnumVal* id);
/** /**
* Enables a log log stream. * Enables a log log stream.
* *

View file

@ -138,6 +138,12 @@ void WriterFrontend::Stop()
{ {
FlushWriteBuffer(); FlushWriteBuffer();
SetDisable(); SetDisable();
if ( backend )
{
backend->SignalStop();
backend = 0; // Thread manager will clean it up once it finishes.
}
} }
void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields)

View file

@ -54,7 +54,8 @@ public:
/** /**
* Stops all output to this writer. Calling this methods disables all * Stops all output to this writer. Calling this methods disables all
* message forwarding to the backend. * message forwarding to the backend and will eventually remove the
* backend thread.
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */

View file

@ -117,20 +117,7 @@ void BasicThread::Start()
OnStart(); OnStart();
} }
void BasicThread::PrepareStop() void BasicThread::SignalStop()
{
if ( ! started )
return;
if ( terminating )
return;
DBG_LOG(DBG_THREADING, "Preparing thread %s to terminate ...", name);
OnPrepareStop();
}
void BasicThread::Stop()
{ {
if ( ! started ) if ( ! started )
return; return;
@ -140,7 +127,20 @@ void BasicThread::Stop()
DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name); DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name);
OnStop(); OnSignalStop();
}
void BasicThread::WaitForStop()
{
if ( ! started )
return;
if ( terminating )
return;
DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate ...", name);
OnWaitForStop();
terminating = true; terminating = true;
} }
@ -150,11 +150,12 @@ void BasicThread::Join()
if ( ! started ) if ( ! started )
return; return;
if ( ! pthread )
return;
assert(terminating); assert(terminating);
DBG_LOG(DBG_THREADING, "Joining thread %s ...", name); if ( pthread_join(pthread, 0) != 0 )
if ( pthread && pthread_join(pthread, 0) != 0 )
reporter->FatalError("Failure joining thread %s", name); reporter->FatalError("Failure joining thread %s", name);
DBG_LOG(DBG_THREADING, "Joined with thread %s", name); DBG_LOG(DBG_THREADING, "Joined with thread %s", name);

View file

@ -71,32 +71,33 @@ public:
void Start(); void Start();
/** /**
* Signals the thread to prepare for stopping. This must be called * Signals the thread to prepare for stopping, but doesn't block to
* before Stop() and allows the thread to trigger shutting down * wait for that to happen. Use WaitForStop() for that.
* without yet blocking for doing so.
* *
* The method lets Terminating() now return true, it does however not
* force the thread to terminate. It's up to the Run() method to to
* query Terminating() and exit eventually.
*
* Calling this method has no effect if Start() hasn't been executed * Calling this method has no effect if Start() hasn't been executed
* yet. * yet.
* *
* Only Bro's main thread must call this method. * Only Bro's main thread must call this method.
*/ */
void PrepareStop(); void SignalStop();
/** /**
* Signals the thread to stop. The method lets Terminating() now * Waits until a thread has stopped after receiving SignalStop().
* return true. It does however not force the thread to terminate.
* It's up to the Run() method to to query Terminating() and exit
* eventually.
* *
* Calling this method has no effect if Start() hasn't been executed * Calling this method has no effect if Start() hasn't been executed
* yet. * yet. If this is executed without calling SignalStop() first,
* results are undefined.
* *
* Only Bro's main thread must call this method. * Only Bro's main thread must call this method.
*/ */
void Stop(); void WaitForStop();
/** /**
* Returns true if Stop() has been called. * Returns true if WaitForStop() has been called and finished.
* *
* This method is safe to call from any thread. * This method is safe to call from any thread.
*/ */
@ -145,18 +146,19 @@ protected:
virtual void OnStart() {} virtual void OnStart() {}
/** /**
* Executed with PrepareStop() (and before OnStop()). This is a hook * Executed with SignalStop(). This is a hook into preparing the
* into preparing the thread for stopping. It will be called from * thread for stopping. It will be called from Bro's main thread
* Bro's main thread before the thread has been signaled to stop. * before the thread has been signaled to stop.
*/ */
virtual void OnPrepareStop() {} virtual void OnSignalStop() {}
/** /**
* Executed with Stop() (and after OnPrepareStop()). This is a hook * Executed with WaitForStop(). This is a hook into waiting for the
* into stopping the thread. It will be called from Bro's main thread * thread to stop. It must be overridden by derived classes and only
* after the thread has been signaled to stop. * return once the thread has indeed finished processing. The method
* will be called from Bro's main thread.
*/ */
virtual void OnStop() {} virtual void OnWaitForStop() = 0;
/** /**
* Executed with Kill(). This is a hook into killing the thread. * Executed with Kill(). This is a hook into killing the thread.

View file

@ -32,10 +32,10 @@ void Manager::Terminate()
// Signal all to stop. // Signal all to stop.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->PrepareStop(); (*i)->SignalStop();
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->Stop(); (*i)->WaitForStop();
// Then join them all. // Then join them all.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
@ -141,13 +141,40 @@ void Manager::Process()
else else
{ {
reporter->Error("%s failed, terminating thread", msg->Name()); reporter->Error("%s failed, terminating thread", msg->Name());
t->Stop(); t->SignalStop();
} }
delete msg; delete msg;
} }
} }
all_thread_list to_delete;
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
{
BasicThread* t = *i;
if ( ! t->Killed() )
continue;
to_delete.push_back(t);
}
for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ )
{
BasicThread* t = *i;
all_threads.remove(t);
MsgThread* mt = dynamic_cast<MsgThread *>(t);
if ( mt )
msg_threads.remove(mt);
t->Join();
delete t;
}
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat); // fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat);
} }

View file

@ -161,16 +161,16 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this)
// Set by Bro's main signal handler. // Set by Bro's main signal handler.
extern int signal_val; extern int signal_val;
void MsgThread::OnPrepareStop() void MsgThread::OnSignalStop()
{ {
if ( finished || Killed() ) if ( finished || Killed() )
return; return;
// Signal thread to terminate and wait until it has acknowledged. // Signal thread to terminate.
SendIn(new FinishMessage(this, network_time), true); SendIn(new FinishMessage(this, network_time), true);
} }
void MsgThread::OnStop() void MsgThread::OnWaitForStop()
{ {
int signal_count = 0; int signal_count = 0;
int old_signal_val = signal_val; int old_signal_val = signal_val;

View file

@ -228,8 +228,8 @@ protected:
* *
*/ */
virtual void Run(); virtual void Run();
virtual void OnStop(); virtual void OnWaitForStop();
virtual void OnPrepareStop(); virtual void OnSignalStop();
virtual void OnKill(); virtual void OnKill();
private: private: