mirror of
https://github.com/zeek/zeek.git
synced 2025-10-03 15:18:20 +00:00
Fixing a rotation race condition at termination.
Noticed with DS, but could just as well happen with ASCII.
This commit is contained in:
parent
99db264775
commit
5dae925f67
3 changed files with 20 additions and 0 deletions
|
@ -7,6 +7,7 @@
|
||||||
#include "../NetVar.h"
|
#include "../NetVar.h"
|
||||||
#include "../Net.h"
|
#include "../Net.h"
|
||||||
|
|
||||||
|
#include "threading/Manager.h"
|
||||||
#include "threading/SerialTypes.h"
|
#include "threading/SerialTypes.h"
|
||||||
|
|
||||||
#include "Manager.h"
|
#include "Manager.h"
|
||||||
|
@ -124,6 +125,7 @@ Manager::Stream::~Stream()
|
||||||
|
|
||||||
Manager::Manager()
|
Manager::Manager()
|
||||||
{
|
{
|
||||||
|
rotations_pending = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Manager::~Manager()
|
Manager::~Manager()
|
||||||
|
@ -1127,6 +1129,13 @@ bool Manager::Flush(EnumVal* id)
|
||||||
|
|
||||||
void Manager::Terminate()
|
void Manager::Terminate()
|
||||||
{
|
{
|
||||||
|
// Make sure we process all the pending rotations.
|
||||||
|
while ( rotations_pending )
|
||||||
|
{
|
||||||
|
thread_mgr->ForceProcessing(); // A blatant layering violation ...
|
||||||
|
usleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
|
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
|
||||||
{
|
{
|
||||||
if ( ! *s )
|
if ( ! *s )
|
||||||
|
@ -1235,6 +1244,8 @@ void Manager::Rotate(WriterInfo* winfo)
|
||||||
|
|
||||||
// Trigger the rotation.
|
// Trigger the rotation.
|
||||||
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
|
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
|
||||||
|
|
||||||
|
++rotations_pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
|
bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
|
||||||
|
@ -1243,6 +1254,8 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
|
||||||
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
|
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
|
||||||
writer->Path().c_str(), network_time, new_name.c_str());
|
writer->Path().c_str(), network_time, new_name.c_str());
|
||||||
|
|
||||||
|
--rotations_pending;
|
||||||
|
|
||||||
WriterInfo* winfo = FindWriter(writer);
|
WriterInfo* winfo = FindWriter(writer);
|
||||||
if ( ! winfo )
|
if ( ! winfo )
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -200,6 +200,7 @@ private:
|
||||||
WriterInfo* FindWriter(WriterFrontend* writer);
|
WriterInfo* FindWriter(WriterFrontend* writer);
|
||||||
|
|
||||||
vector<Stream *> streams; // Indexed by stream enum.
|
vector<Stream *> streams; // Indexed by stream enum.
|
||||||
|
int rotations_pending; // Number of rotations not yet finished.
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,12 @@ public:
|
||||||
*/
|
*/
|
||||||
int NumThreads() const { return all_threads.size(); }
|
int NumThreads() const { return all_threads.size(); }
|
||||||
|
|
||||||
|
/** Manually triggers processing of any thread input. This can be useful
|
||||||
|
* if the main thread is waiting for a specific message from a child.
|
||||||
|
* Usually, though, one should avoid using it.
|
||||||
|
*/
|
||||||
|
void ForceProcessing() { Process(); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
friend class BasicThread;
|
friend class BasicThread;
|
||||||
friend class MsgThread;
|
friend class MsgThread;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue