Change thread shutdown again to also work with input framework.

Seems to work, tests pass, but not really verified.

Major change 1:
finished flag in MsgThread was replaced by 2 flags:
child_finished and main_finished.

child_finished is set by child_thread and means that the processing
loop is stopped immediately (no longer needed, no new input messages
will be processed, if loop continues running there is an ugly delay
on shutdown). (This took me a while to realize...)

main_finished is set by a message that is sent back by the child
to the main thread when Finished() is called (and child_finished
is set). when main_finished is set, processing of output messages
stops. But all messages that the child thread pushed in the queue
before calling Finish() are still processed.

Change 2:
Logging terminate call was replaced by a smaller call that just
flushes out the cache held by the main thread. This call
has to be done before thread shutdown is called - otherwhise
the threads will be shut down before all messages are pushed
on them. (This also took me a while to realize...).

Change 3:
Input framework actually calls it stop methods correctly (everything
was prepared, function call was missing)
This commit is contained in:
Bernhard Amann 2013-05-14 23:45:55 -07:00
parent bb1e2f57b9
commit 39f1b9e01f
12 changed files with 67 additions and 39 deletions

View file

@ -742,6 +742,8 @@ bool Manager::RemoveStream(Stream *i)
DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
i->name.c_str());
//i->reader->Stop();
return true;
}

View file

@ -49,6 +49,15 @@ ReaderFrontend::ReaderFrontend(const ReaderBackend::ReaderInfo& arg_info, EnumVa
backend->Start();
}
void ReaderFrontend::Stop()
{
if ( backend )
{
backend->SignalStop();
backend = 0; // Thread manager will clean it up once it finishes.
}
}
ReaderFrontend::~ReaderFrontend()
{
delete [] name;

View file

@ -67,7 +67,7 @@ public:
* This method must only be called from the main thread.
*/
void Update();
/**
* Finalizes reading from this stream.
*
@ -75,7 +75,7 @@ public:
* the corresponding message there. This method must only be called
* from the main thread.
*/
void Close();
void Stop();
/**
* Disables the reader frontend. From now on, all method calls that

View file

@ -73,8 +73,10 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend)
Ascii::~Ascii()
{
DoClose();
delete ascii;
/* printf("Destructor called\n");
if ( file )
DoClose();
delete ascii; */
}
void Ascii::DoClose()

View file

@ -1268,25 +1268,18 @@ bool Manager::Flush(EnumVal* id)
return true;
}
void Manager::Terminate()
void Manager::FlushBuffers()
{
// Make sure we process all the pending rotations.
while ( rotations_pending > 0 )
{
thread_mgr->ForceProcessing(); // A blatant layering violation ...
usleep(1000);
}
if ( rotations_pending < 0 )
reporter->InternalError("Negative pending log rotations: %d", rotations_pending);
// Flush out cached entries in Frontend
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
if ( ! *s )
continue;
Flush((*s)->id);
for ( Stream::WriterMap::iterator i = (*s)->writers.begin();
i != (*s)->writers.end(); i++ )
i->second->writer->FlushWriteBuffer();
}
}

View file

@ -150,10 +150,10 @@ public:
bool Flush(EnumVal* id);
/**
* Prepares the log manager to terminate. This will flush all log
* stream.
* Flushes all buffers that are currently held by writer frontends
* out to the threads. Does not call the thread flush operation.
*/
void Terminate();
void FlushBuffers();
/**
* Returns a list of supported output formats.

View file

@ -208,7 +208,7 @@ protected:
EnumVal* stream;
EnumVal* writer;
WriterBackend* backend; // The backend we have instanatiated.
WriterBackend* backend; // The backend we have instantiated.
bool disabled; // True if disabled.
bool initialized; // True if initialized.
bool buf; // True if buffering is enabled (default).

View file

@ -192,6 +192,8 @@ bool Ascii::DoFinish(double network_time)
abort();
}
DoFlush(network_time);
ascii_done = true;
CloseFile(network_time);

View file

@ -319,7 +319,7 @@ void terminate_bro()
mgr.Drain();
log_mgr->Terminate();
log_mgr->FlushBuffers();
thread_mgr->Terminate();
mgr.Drain();

View file

@ -68,12 +68,6 @@ public:
*/
int NumThreads() const { return all_threads.size(); }
/** Manually triggers processing of any thread input. This can be useful
* if the main thread is waiting for a specific message from a child.
* Usually, though, one should avoid using it.
*/
void ForceProcessing() { Process(); }
/**
* Signals a specific threads to terminate immediately.
*/

View file

@ -30,6 +30,21 @@ private:
double network_time;
};
// Signals main thread that operations shut down
class FinishedMessage : public OutputMessage<MsgThread>
{
public:
FinishedMessage(MsgThread* thread)
: OutputMessage<MsgThread>("FinishedMessage", thread)
{ }
virtual bool Process() {
Object()->main_finished = true;
return true;
}
};
/// Sends a heartbeat to the child thread.
class HeartbeatMessage : public InputMessage<MsgThread>
{
@ -153,7 +168,8 @@ bool ReporterMessage::Process()
MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this)
{
cnt_sent_in = cnt_sent_out = 0;
finished = false;
main_finished = false;
child_finished = false;
failed = false;
thread_mgr->AddMsgThread(this);
}
@ -163,7 +179,7 @@ extern int signal_val;
void MsgThread::OnSignalStop()
{
if ( finished || Killed() )
if ( main_finished || Killed() )
return;
// Signal thread to terminate.
@ -180,7 +196,7 @@ void MsgThread::OnWaitForStop()
uint64_t last_size = 0;
uint64_t cur_size = 0;
while ( ! (finished || Killed() ) )
while ( ! (main_finished || Killed() ) )
{
// Terminate if we get another kill signal.
if ( signal_val == SIGTERM || signal_val == SIGINT )
@ -206,6 +222,15 @@ void MsgThread::OnWaitForStop()
}
queue_in.WakeUp();
while ( HasOut() && !Killed() )
{
Message* msg = RetrieveOut();
assert ( msg );
if ( !msg->Process() )
reporter->Error("%s failed during thread termination", msg->Name());
delete msg;
}
usleep(1000);
}
@ -237,9 +262,8 @@ void MsgThread::HeartbeatInChild()
void MsgThread::Finished()
{
// This is thread-safe "enough", we're the only one ever writing
// there.
finished = true;
child_finished = true;
SendOut(new FinishedMessage(this));
}
void MsgThread::Info(const char* msg)
@ -344,7 +368,7 @@ BasicInputMessage* MsgThread::RetrieveIn()
void MsgThread::Run()
{
while ( ! (finished || Killed() ) )
while ( ! (child_finished || Killed() ) )
{
BasicInputMessage* msg = RetrieveIn();
@ -368,10 +392,10 @@ void MsgThread::Run()
}
}
// In case we haven't send the finish method yet, do it now. Reading
// In case we haven't sent the finish method yet, do it now. Reading
// global network_time here should be fine, it isn't changing
// anymore.
if ( ! finished && ! Killed() )
if ( ! child_finished && ! Killed() )
{
OnFinish(network_time);
Finished();

View file

@ -289,7 +289,8 @@ private:
*/
bool MightHaveOut() { return queue_out.MaybeReady(); }
/** Flags that the child process has finished processing. Called from child.
/** Sends a message to the main thread signaling that the child process
* has finished processing. Called from child.
*/
void Finished();
@ -299,7 +300,8 @@ private:
uint64_t cnt_sent_in; // Counts message sent to child.
uint64_t cnt_sent_out; // Counts message sent by child.
bool finished; // Set to true by Finished message.
bool main_finished; // main thread is finished. Means child_finished propagated back through message queue.
bool child_finished; // child thread is finished
bool failed; // Set to true when a command failed.
};