Merge remote-tracking branch 'origin/topic/bernhard/thread-cleanup'

* origin/topic/bernhard/thread-cleanup:
  and just to be really sure - always make threads go through OnWaitForStop
  hopefully finally fix last interesting race-condition
  it is apparently getting a bit late for changes at important code...
  spoke to soon (forgot to comment in line again).
  Change thread shutdown again to also work with input framework.
  Changing semantics of thread stop methods.
  Support for cleaning up threads that have terminated.
This commit is contained in:
Robin Sommer 2013-05-15 17:16:41 -07:00
commit 639a6410c6
18 changed files with 218 additions and 95 deletions

View file

@ -189,6 +189,15 @@ export {
## .. bro:see:: Log::add_default_filter Log::remove_default_filter ## .. bro:see:: Log::add_default_filter Log::remove_default_filter
global create_stream: function(id: ID, stream: Stream) : bool; global create_stream: function(id: ID, stream: Stream) : bool;
## Removes a logging stream completely, stopping all the threads.
##
## id: The ID enum to be associated with the new logging stream.
##
## Returns: True if a new stream was successfully removed.
##
## .. bro:see:: Log:create_stream
global remove_stream: function(id: ID) : bool;
## Enables a previously disabled logging stream. Disabled streams ## Enables a previously disabled logging stream. Disabled streams
## will not be written to until they are enabled again. New streams ## will not be written to until they are enabled again. New streams
## are enabled by default. ## are enabled by default.
@ -442,6 +451,12 @@ function create_stream(id: ID, stream: Stream) : bool
return add_default_filter(id); return add_default_filter(id);
} }
function remove_stream(id: ID) : bool
{
delete active_streams[id];
return __remove_stream(id);
}
function disable_stream(id: ID) : bool function disable_stream(id: ID) : bool
{ {
delete active_streams[id]; delete active_streams[id];

View file

@ -747,6 +747,8 @@ bool Manager::RemoveStream(Stream *i)
DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
i->name.c_str()); i->name.c_str());
i->reader->Stop();
return true; return true;
} }

View file

@ -49,6 +49,15 @@ ReaderFrontend::ReaderFrontend(const ReaderBackend::ReaderInfo& arg_info, EnumVa
backend->Start(); backend->Start();
} }
void ReaderFrontend::Stop()
{
if ( backend )
{
backend->SignalStop();
backend = 0; // Thread manager will clean it up once it finishes.
}
}
ReaderFrontend::~ReaderFrontend() ReaderFrontend::~ReaderFrontend()
{ {
delete [] name; delete [] name;

View file

@ -75,7 +75,7 @@ public:
* the corresponding message there. This method must only be called * the corresponding message there. This method must only be called
* from the main thread. * from the main thread.
*/ */
void Close(); void Stop();
/** /**
* Disables the reader frontend. From now on, all method calls that * Disables the reader frontend. From now on, all method calls that

View file

@ -18,6 +18,12 @@ function Log::__create_stream%(id: Log::ID, stream: Log::Stream%) : bool
return new Val(result, TYPE_BOOL); return new Val(result, TYPE_BOOL);
%} %}
function Log::__remove_stream%(id: Log::ID%) : bool
%{
bool result = log_mgr->RemoveStream(id->AsEnumVal());
return new Val(result, TYPE_BOOL);
%}
function Log::__enable_stream%(id: Log::ID%) : bool function Log::__enable_stream%(id: Log::ID%) : bool
%{ %{
bool result = log_mgr->EnableStream(id->AsEnumVal()); bool result = log_mgr->EnableStream(id->AsEnumVal());

View file

@ -377,6 +377,38 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval)
return true; return true;
} }
bool Manager::RemoveStream(EnumVal* id)
{
unsigned int idx = id->AsEnum();
if ( idx >= streams.size() || ! streams[idx] )
return false;
Stream* stream = streams[idx];
if ( ! stream )
return false;
for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ )
{
WriterInfo* winfo = i->second;
DBG_LOG(DBG_LOGGING, "Removed writer '%s' from stream '%s'",
winfo->writer->Name(), stream->name.c_str());
winfo->writer->Stop();
delete winfo->writer;
delete winfo;
}
stream->writers.clear();
delete stream;
streams[idx] = 0;
DBG_LOG(DBG_LOGGING, "Removed logging stream '%s'", stream->name.c_str());
return true;
}
bool Manager::EnableStream(EnumVal* id) bool Manager::EnableStream(EnumVal* id)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
@ -1239,25 +1271,17 @@ bool Manager::Flush(EnumVal* id)
return true; return true;
} }
void Manager::Terminate() void Manager::FlushBuffers()
{ {
// Make sure we process all the pending rotations. // Flush out cached entries in Frontend
while ( rotations_pending > 0 )
{
thread_mgr->ForceProcessing(); // A blatant layering violation ...
usleep(1000);
}
if ( rotations_pending < 0 )
reporter->InternalError("Negative pending log rotations: %d", rotations_pending);
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s ) for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{ {
if ( ! *s ) if ( ! *s )
continue; continue;
Flush((*s)->id); for ( Stream::WriterMap::iterator i = (*s)->writers.begin();
i != (*s)->writers.end(); i++ )
i->second->writer->FlushWriteBuffer();
} }
} }

View file

@ -47,6 +47,16 @@ public:
*/ */
bool CreateStream(EnumVal* id, RecordVal* stream); bool CreateStream(EnumVal* id, RecordVal* stream);
/**
* Remove a log stream, stopping all threads.
*
* @param id The enum value corresponding the log stream.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool RemoveStream(EnumVal* id);
/** /**
* Enables a log log stream. * Enables a log log stream.
* *
@ -140,10 +150,10 @@ public:
bool Flush(EnumVal* id); bool Flush(EnumVal* id);
/** /**
* Prepares the log manager to terminate. This will flush all log * Flushes all buffers that are currently held by writer frontends
* stream. * out to the threads. Does not call the thread flush operation.
*/ */
void Terminate(); void FlushBuffers();
/** /**
* Returns a list of supported output formats. * Returns a list of supported output formats.

View file

@ -138,6 +138,12 @@ void WriterFrontend::Stop()
{ {
FlushWriteBuffer(); FlushWriteBuffer();
SetDisable(); SetDisable();
if ( backend )
{
backend->SignalStop();
backend = 0; // Thread manager will clean it up once it finishes.
}
} }
void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields)

View file

@ -54,7 +54,8 @@ public:
/** /**
* Stops all output to this writer. Calling this methods disables all * Stops all output to this writer. Calling this methods disables all
* message forwarding to the backend. * message forwarding to the backend and will eventually remove the
* backend thread.
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
@ -207,7 +208,7 @@ protected:
EnumVal* stream; EnumVal* stream;
EnumVal* writer; EnumVal* writer;
WriterBackend* backend; // The backend we have instanatiated. WriterBackend* backend; // The backend we have instantiated.
bool disabled; // True if disabled. bool disabled; // True if disabled.
bool initialized; // True if initialized. bool initialized; // True if initialized.
bool buf; // True if buffering is enabled (default). bool buf; // True if buffering is enabled (default).

View file

@ -192,6 +192,8 @@ bool Ascii::DoFinish(double network_time)
abort(); abort();
} }
DoFlush(network_time);
ascii_done = true; ascii_done = true;
CloseFile(network_time); CloseFile(network_time);

View file

@ -330,7 +330,7 @@ void terminate_bro()
mgr.Drain(); mgr.Drain();
file_mgr->Terminate(); file_mgr->Terminate();
log_mgr->Terminate(); log_mgr->FlushBuffers();
thread_mgr->Terminate(); thread_mgr->Terminate();
mgr.Drain(); mgr.Drain();

View file

@ -117,20 +117,7 @@ void BasicThread::Start()
OnStart(); OnStart();
} }
void BasicThread::PrepareStop() void BasicThread::SignalStop()
{
if ( ! started )
return;
if ( terminating )
return;
DBG_LOG(DBG_THREADING, "Preparing thread %s to terminate ...", name);
OnPrepareStop();
}
void BasicThread::Stop()
{ {
if ( ! started ) if ( ! started )
return; return;
@ -140,7 +127,17 @@ void BasicThread::Stop()
DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name); DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name);
OnStop(); OnSignalStop();
}
void BasicThread::WaitForStop()
{
if ( ! started )
return;
DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate and process last queue items...", name);
OnWaitForStop();
terminating = true; terminating = true;
} }
@ -150,11 +147,12 @@ void BasicThread::Join()
if ( ! started ) if ( ! started )
return; return;
if ( ! pthread )
return;
assert(terminating); assert(terminating);
DBG_LOG(DBG_THREADING, "Joining thread %s ...", name); if ( pthread_join(pthread, 0) != 0 )
if ( pthread && pthread_join(pthread, 0) != 0 )
reporter->FatalError("Failure joining thread %s", name); reporter->FatalError("Failure joining thread %s", name);
DBG_LOG(DBG_THREADING, "Joined with thread %s", name); DBG_LOG(DBG_THREADING, "Joined with thread %s", name);

View file

@ -71,32 +71,33 @@ public:
void Start(); void Start();
/** /**
* Signals the thread to prepare for stopping. This must be called * Signals the thread to prepare for stopping, but doesn't block to
* before Stop() and allows the thread to trigger shutting down * wait for that to happen. Use WaitForStop() for that.
* without yet blocking for doing so. *
* The method lets Terminating() now return true, it does however not
* force the thread to terminate. It's up to the Run() method to to
* query Terminating() and exit eventually.
* *
* Calling this method has no effect if Start() hasn't been executed * Calling this method has no effect if Start() hasn't been executed
* yet. * yet.
* *
* Only Bro's main thread must call this method. * Only Bro's main thread must call this method.
*/ */
void PrepareStop(); void SignalStop();
/** /**
* Signals the thread to stop. The method lets Terminating() now * Waits until a thread has stopped after receiving SignalStop().
* return true. It does however not force the thread to terminate.
* It's up to the Run() method to to query Terminating() and exit
* eventually.
* *
* Calling this method has no effect if Start() hasn't been executed * Calling this method has no effect if Start() hasn't been executed
* yet. * yet. If this is executed without calling SignalStop() first,
* results are undefined.
* *
* Only Bro's main thread must call this method. * Only Bro's main thread must call this method.
*/ */
void Stop(); void WaitForStop();
/** /**
* Returns true if Stop() has been called. * Returns true if WaitForStop() has been called and finished.
* *
* This method is safe to call from any thread. * This method is safe to call from any thread.
*/ */
@ -145,18 +146,19 @@ protected:
virtual void OnStart() {} virtual void OnStart() {}
/** /**
* Executed with PrepareStop() (and before OnStop()). This is a hook * Executed with SignalStop(). This is a hook into preparing the
* into preparing the thread for stopping. It will be called from * thread for stopping. It will be called from Bro's main thread
* Bro's main thread before the thread has been signaled to stop. * before the thread has been signaled to stop.
*/ */
virtual void OnPrepareStop() {} virtual void OnSignalStop() {}
/** /**
* Executed with Stop() (and after OnPrepareStop()). This is a hook * Executed with WaitForStop(). This is a hook into waiting for the
* into stopping the thread. It will be called from Bro's main thread * thread to stop. It must be overridden by derived classes and only
* after the thread has been signaled to stop. * return once the thread has indeed finished processing. The method
* will be called from Bro's main thread.
*/ */
virtual void OnStop() {} virtual void OnWaitForStop() = 0;
/** /**
* Executed with Kill(). This is a hook into killing the thread. * Executed with Kill(). This is a hook into killing the thread.

View file

@ -32,10 +32,10 @@ void Manager::Terminate()
// Signal all to stop. // Signal all to stop.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->PrepareStop(); (*i)->SignalStop();
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->Stop(); (*i)->WaitForStop();
// Then join them all. // Then join them all.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
@ -122,15 +122,10 @@ void Manager::Process()
if ( do_beat ) if ( do_beat )
t->Heartbeat(); t->Heartbeat();
while ( t->HasOut() && ! t->Killed() ) while ( t->HasOut() )
{ {
Message* msg = t->RetrieveOut(); Message* msg = t->RetrieveOut();
assert(msg);
if ( ! msg )
{
assert(t->Killed());
break;
}
if ( msg->Process() ) if ( msg->Process() )
{ {
@ -141,13 +136,40 @@ void Manager::Process()
else else
{ {
reporter->Error("%s failed, terminating thread", msg->Name()); reporter->Error("%s failed, terminating thread", msg->Name());
t->Stop(); t->SignalStop();
} }
delete msg; delete msg;
} }
} }
all_thread_list to_delete;
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
{
BasicThread* t = *i;
if ( ! t->Killed() )
continue;
to_delete.push_back(t);
}
for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ )
{
BasicThread* t = *i;
all_threads.remove(t);
MsgThread* mt = dynamic_cast<MsgThread *>(t);
if ( mt )
msg_threads.remove(mt);
t->Join();
delete t;
}
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat); // fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat);
} }

View file

@ -68,12 +68,6 @@ public:
*/ */
int NumThreads() const { return all_threads.size(); } int NumThreads() const { return all_threads.size(); }
/** Manually triggers processing of any thread input. This can be useful
* if the main thread is waiting for a specific message from a child.
* Usually, though, one should avoid using it.
*/
void ForceProcessing() { Process(); }
/** /**
* Signals a specific threads to terminate immediately. * Signals a specific threads to terminate immediately.
*/ */

View file

@ -30,6 +30,20 @@ private:
double network_time; double network_time;
}; };
// Signals main thread that operations shut down.
class FinishedMessage : public OutputMessage<MsgThread>
{
public:
FinishedMessage(MsgThread* thread)
: OutputMessage<MsgThread>("FinishedMessage", thread)
{ }
virtual bool Process() {
Object()->main_finished = true;
return true;
}
};
/// Sends a heartbeat to the child thread. /// Sends a heartbeat to the child thread.
class HeartbeatMessage : public InputMessage<MsgThread> class HeartbeatMessage : public InputMessage<MsgThread>
{ {
@ -153,7 +167,8 @@ bool ReporterMessage::Process()
MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this)
{ {
cnt_sent_in = cnt_sent_out = 0; cnt_sent_in = cnt_sent_out = 0;
finished = false; main_finished = false;
child_finished = false;
failed = false; failed = false;
thread_mgr->AddMsgThread(this); thread_mgr->AddMsgThread(this);
} }
@ -161,16 +176,16 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this)
// Set by Bro's main signal handler. // Set by Bro's main signal handler.
extern int signal_val; extern int signal_val;
void MsgThread::OnPrepareStop() void MsgThread::OnSignalStop()
{ {
if ( finished || Killed() ) if ( main_finished || Killed() )
return; return;
// Signal thread to terminate and wait until it has acknowledged. // Signal thread to terminate.
SendIn(new FinishMessage(this, network_time), true); SendIn(new FinishMessage(this, network_time), true);
} }
void MsgThread::OnStop() void MsgThread::OnWaitForStop()
{ {
int signal_count = 0; int signal_count = 0;
int old_signal_val = signal_val; int old_signal_val = signal_val;
@ -180,7 +195,7 @@ void MsgThread::OnStop()
uint64_t last_size = 0; uint64_t last_size = 0;
uint64_t cur_size = 0; uint64_t cur_size = 0;
while ( ! (finished || Killed() ) ) while ( ! main_finished )
{ {
// Terminate if we get another kill signal. // Terminate if we get another kill signal.
if ( signal_val == SIGTERM || signal_val == SIGINT ) if ( signal_val == SIGTERM || signal_val == SIGINT )
@ -205,9 +220,22 @@ void MsgThread::OnStop()
signal_val = 0; signal_val = 0;
} }
queue_in.WakeUp(); if ( ! Killed() )
queue_in.WakeUp();
usleep(1000); while ( HasOut() )
{
Message* msg = RetrieveOut();
assert ( msg );
if ( ! msg->Process() )
reporter->Error("%s failed during thread termination", msg->Name());
delete msg;
}
if ( ! Killed() )
usleep(1000);
} }
signal_val = old_signal_val; signal_val = old_signal_val;
@ -237,9 +265,8 @@ void MsgThread::HeartbeatInChild()
void MsgThread::Finished() void MsgThread::Finished()
{ {
// This is thread-safe "enough", we're the only one ever writing child_finished = true;
// there. SendOut(new FinishedMessage(this));
finished = true;
} }
void MsgThread::Info(const char* msg) void MsgThread::Info(const char* msg)
@ -344,7 +371,7 @@ BasicInputMessage* MsgThread::RetrieveIn()
void MsgThread::Run() void MsgThread::Run()
{ {
while ( ! (finished || Killed() ) ) while ( ! (child_finished || Killed() ) )
{ {
BasicInputMessage* msg = RetrieveIn(); BasicInputMessage* msg = RetrieveIn();
@ -368,10 +395,10 @@ void MsgThread::Run()
} }
} }
// In case we haven't send the finish method yet, do it now. Reading // In case we haven't sent the finish method yet, do it now. Reading
// global network_time here should be fine, it isn't changing // global network_time here should be fine, it isn't changing
// anymore. // anymore.
if ( ! finished && ! Killed() ) if ( ! child_finished && ! Killed() )
{ {
OnFinish(network_time); OnFinish(network_time);
Finished(); Finished();

View file

@ -228,8 +228,8 @@ protected:
* *
*/ */
virtual void Run(); virtual void Run();
virtual void OnStop(); virtual void OnWaitForStop();
virtual void OnPrepareStop(); virtual void OnSignalStop();
virtual void OnKill(); virtual void OnKill();
private: private:
@ -289,7 +289,8 @@ private:
*/ */
bool MightHaveOut() { return queue_out.MaybeReady(); } bool MightHaveOut() { return queue_out.MaybeReady(); }
/** Flags that the child process has finished processing. Called from child. /** Sends a message to the main thread signaling that the child process
* has finished processing. Called from child.
*/ */
void Finished(); void Finished();
@ -299,7 +300,8 @@ private:
uint64_t cnt_sent_in; // Counts message sent to child. uint64_t cnt_sent_in; // Counts message sent to child.
uint64_t cnt_sent_out; // Counts message sent by child. uint64_t cnt_sent_out; // Counts message sent by child.
bool finished; // Set to true by Finished message. bool main_finished; // Main thread is finished, meaning child_finished propagated back through message queue.
bool child_finished; // Child thread is finished.
bool failed; // Set to true when a command failed. bool failed; // Set to true when a command failed.
}; };

View file

@ -155,14 +155,11 @@ inline Queue<T>::~Queue()
template<typename T> template<typename T>
inline T Queue<T>::Get() inline T Queue<T>::Get()
{ {
if ( (reader && reader->Killed()) || (writer && writer->Killed()) )
return 0;
safe_lock(&mutex[read_ptr]); safe_lock(&mutex[read_ptr]);
int old_read_ptr = read_ptr; int old_read_ptr = read_ptr;
if ( messages[read_ptr].empty() ) if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) )
{ {
struct timespec ts; struct timespec ts;
ts.tv_sec = time(0) + 5; ts.tv_sec = time(0) + 5;
@ -173,6 +170,12 @@ inline T Queue<T>::Get()
return 0; return 0;
} }
else if ( messages[read_ptr].empty() )
{
safe_unlock(&mutex[read_ptr]);
return 0;
}
T data = messages[read_ptr].front(); T data = messages[read_ptr].front();
messages[read_ptr].pop(); messages[read_ptr].pop();