mirror of
https://github.com/zeek/zeek.git
synced 2025-10-03 23:28:20 +00:00
Support for cleaning up threads that have terminated.
Once a BasicThread leaves its run() method, a thread is now marked for cleaning up, and the ThreadMgr will soon join it to release the OS resources. Also, adding a function Log::remove_stream() that remove a logging stream, stopping all writer threads that are associated with it. Note, however, that removing a *filter* from a stream still doesn't clean up any threads. The problem is that because of the output paths potentially being created dynamically it's unclear if the writer thread will still be needed in the future. We could add clean writers up with timeouts, but that doesn't sound great either. So for now, the only way to sure clean up logging threads is to remove the entire stream. Also note that cleanup doesn't work with input threads yet, which don't seem to terminate (at least in the case I tried).
This commit is contained in:
parent
b4824f4207
commit
38e1dc9ca4
8 changed files with 104 additions and 4 deletions
|
@ -189,6 +189,15 @@ export {
|
||||||
## .. bro:see:: Log::add_default_filter Log::remove_default_filter
|
## .. bro:see:: Log::add_default_filter Log::remove_default_filter
|
||||||
global create_stream: function(id: ID, stream: Stream) : bool;
|
global create_stream: function(id: ID, stream: Stream) : bool;
|
||||||
|
|
||||||
|
## Removes a logging stream completely, stopping all the threads.
|
||||||
|
##
|
||||||
|
## id: The ID enum to be associated with the new logging stream.
|
||||||
|
##
|
||||||
|
## Returns: True if a new stream was successfully removed.
|
||||||
|
##
|
||||||
|
## .. bro:see:: Log:create_stream
|
||||||
|
global remove_stream: function(id: ID) : bool;
|
||||||
|
|
||||||
## Enables a previously disabled logging stream. Disabled streams
|
## Enables a previously disabled logging stream. Disabled streams
|
||||||
## will not be written to until they are enabled again. New streams
|
## will not be written to until they are enabled again. New streams
|
||||||
## are enabled by default.
|
## are enabled by default.
|
||||||
|
@ -442,6 +451,13 @@ function create_stream(id: ID, stream: Stream) : bool
|
||||||
return add_default_filter(id);
|
return add_default_filter(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function remove_stream(id: ID) : bool
|
||||||
|
{
|
||||||
|
delete active_streams[id];
|
||||||
|
|
||||||
|
return __remove_stream(id);
|
||||||
|
}
|
||||||
|
|
||||||
function disable_stream(id: ID) : bool
|
function disable_stream(id: ID) : bool
|
||||||
{
|
{
|
||||||
delete active_streams[id];
|
delete active_streams[id];
|
||||||
|
|
|
@ -18,6 +18,12 @@ function Log::__create_stream%(id: Log::ID, stream: Log::Stream%) : bool
|
||||||
return new Val(result, TYPE_BOOL);
|
return new Val(result, TYPE_BOOL);
|
||||||
%}
|
%}
|
||||||
|
|
||||||
|
function Log::__remove_stream%(id: Log::ID%) : bool
|
||||||
|
%{
|
||||||
|
bool result = log_mgr->RemoveStream(id->AsEnumVal());
|
||||||
|
return new Val(result, TYPE_BOOL);
|
||||||
|
%}
|
||||||
|
|
||||||
function Log::__enable_stream%(id: Log::ID%) : bool
|
function Log::__enable_stream%(id: Log::ID%) : bool
|
||||||
%{
|
%{
|
||||||
bool result = log_mgr->EnableStream(id->AsEnumVal());
|
bool result = log_mgr->EnableStream(id->AsEnumVal());
|
||||||
|
|
|
@ -374,6 +374,38 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Manager::RemoveStream(EnumVal* id)
|
||||||
|
{
|
||||||
|
unsigned int idx = id->AsEnum();
|
||||||
|
|
||||||
|
if ( idx >= streams.size() || ! streams[idx] )
|
||||||
|
return false;
|
||||||
|
|
||||||
|
Stream* stream = streams[idx];
|
||||||
|
|
||||||
|
if ( ! stream )
|
||||||
|
return false;
|
||||||
|
|
||||||
|
for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ )
|
||||||
|
{
|
||||||
|
WriterInfo* winfo = i->second;
|
||||||
|
|
||||||
|
DBG_LOG(DBG_LOGGING, "Removed writer '%s' from stream '%s'",
|
||||||
|
winfo->writer->Name(), stream->name.c_str());
|
||||||
|
|
||||||
|
winfo->writer->Stop();
|
||||||
|
delete winfo->writer;
|
||||||
|
delete winfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream->writers.clear();
|
||||||
|
delete stream;
|
||||||
|
streams[idx] = 0;
|
||||||
|
|
||||||
|
DBG_LOG(DBG_LOGGING, "Removed logging stream '%s'", stream->name.c_str());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
bool Manager::EnableStream(EnumVal* id)
|
bool Manager::EnableStream(EnumVal* id)
|
||||||
{
|
{
|
||||||
Stream* stream = FindStream(id);
|
Stream* stream = FindStream(id);
|
||||||
|
|
|
@ -47,6 +47,16 @@ public:
|
||||||
*/
|
*/
|
||||||
bool CreateStream(EnumVal* id, RecordVal* stream);
|
bool CreateStream(EnumVal* id, RecordVal* stream);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a log stream, stopping all threads.
|
||||||
|
*
|
||||||
|
* @param id The enum value corresponding the log stream.
|
||||||
|
*
|
||||||
|
* This methods corresponds directly to the internal BiF defined in
|
||||||
|
* logging.bif, which just forwards here.
|
||||||
|
*/
|
||||||
|
bool RemoveStream(EnumVal* id);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enables a log log stream.
|
* Enables a log log stream.
|
||||||
*
|
*
|
||||||
|
|
|
@ -138,6 +138,13 @@ void WriterFrontend::Stop()
|
||||||
{
|
{
|
||||||
FlushWriteBuffer();
|
FlushWriteBuffer();
|
||||||
SetDisable();
|
SetDisable();
|
||||||
|
|
||||||
|
if ( backend )
|
||||||
|
{
|
||||||
|
backend->PrepareStop();
|
||||||
|
backend->Stop();
|
||||||
|
backend = 0; // Thread manager will clean it up once it finishes.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields)
|
void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields)
|
||||||
|
|
|
@ -54,7 +54,8 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stops all output to this writer. Calling this methods disables all
|
* Stops all output to this writer. Calling this methods disables all
|
||||||
* message forwarding to the backend.
|
* message forwarding to the backend and will eventually remove the
|
||||||
|
* backend thread.
|
||||||
*
|
*
|
||||||
* This method must only be called from the main thread.
|
* This method must only be called from the main thread.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -150,11 +150,12 @@ void BasicThread::Join()
|
||||||
if ( ! started )
|
if ( ! started )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if ( ! pthread )
|
||||||
|
return;
|
||||||
|
|
||||||
assert(terminating);
|
assert(terminating);
|
||||||
|
|
||||||
DBG_LOG(DBG_THREADING, "Joining thread %s ...", name);
|
if ( pthread_join(pthread, 0) != 0 )
|
||||||
|
|
||||||
if ( pthread && pthread_join(pthread, 0) != 0 )
|
|
||||||
reporter->FatalError("Failure joining thread %s", name);
|
reporter->FatalError("Failure joining thread %s", name);
|
||||||
|
|
||||||
DBG_LOG(DBG_THREADING, "Joined with thread %s", name);
|
DBG_LOG(DBG_THREADING, "Joined with thread %s", name);
|
||||||
|
|
|
@ -148,6 +148,33 @@ void Manager::Process()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
all_thread_list to_delete;
|
||||||
|
|
||||||
|
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
|
||||||
|
{
|
||||||
|
BasicThread* t = *i;
|
||||||
|
|
||||||
|
if ( ! t->Killed() )
|
||||||
|
continue;
|
||||||
|
|
||||||
|
to_delete.push_back(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ )
|
||||||
|
{
|
||||||
|
BasicThread* t = *i;
|
||||||
|
|
||||||
|
all_threads.remove(t);
|
||||||
|
|
||||||
|
MsgThread* mt = dynamic_cast<MsgThread *>(t);
|
||||||
|
|
||||||
|
if ( mt )
|
||||||
|
msg_threads.remove(mt);
|
||||||
|
|
||||||
|
t->Join();
|
||||||
|
delete t;
|
||||||
|
}
|
||||||
|
|
||||||
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat);
|
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue