mirror of
https://github.com/zeek/zeek.git
synced 2025-10-06 08:38:20 +00:00
spoke to soon (forgot to comment in line again).
Now it should work. However - this commit changes a basic assumption of the threading queue. This basic assumption is, that nothing can be read out of the out-queue of a dead thread. I think that reading out of the queue of a dead thread makes perfect sense (when the thread shuts down, pushes the rest of its work on the queue and says bye, and wants the main thread to pick it up afterwards) - however, I guess one can be of a differing opinion here. In any case, it makes stuff a bit easier to understand - in my opinion. It took me a while to find out why the messages disappear in thin air and never arrive in the main thread ;)
This commit is contained in:
parent
39f1b9e01f
commit
b947394990
5 changed files with 9 additions and 17 deletions
|
@ -743,7 +743,7 @@ bool Manager::RemoveStream(Stream *i)
|
|||
DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
|
||||
i->name.c_str());
|
||||
|
||||
//i->reader->Stop();
|
||||
i->reader->Stop();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -73,10 +73,8 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend)
|
|||
|
||||
Ascii::~Ascii()
|
||||
{
|
||||
/* printf("Destructor called\n");
|
||||
if ( file )
|
||||
DoClose();
|
||||
delete ascii; */
|
||||
delete ascii;
|
||||
}
|
||||
|
||||
void Ascii::DoClose()
|
||||
|
|
|
@ -122,15 +122,10 @@ void Manager::Process()
|
|||
if ( do_beat )
|
||||
t->Heartbeat();
|
||||
|
||||
while ( t->HasOut() && ! t->Killed() )
|
||||
while ( t->HasOut() )
|
||||
{
|
||||
Message* msg = t->RetrieveOut();
|
||||
|
||||
if ( ! msg )
|
||||
{
|
||||
assert(t->Killed());
|
||||
break;
|
||||
}
|
||||
assert(msg);
|
||||
|
||||
if ( msg->Process() )
|
||||
{
|
||||
|
|
|
@ -155,14 +155,11 @@ inline Queue<T>::~Queue()
|
|||
template<typename T>
|
||||
inline T Queue<T>::Get()
|
||||
{
|
||||
if ( (reader && reader->Killed()) || (writer && writer->Killed()) )
|
||||
return 0;
|
||||
|
||||
safe_lock(&mutex[read_ptr]);
|
||||
|
||||
int old_read_ptr = read_ptr;
|
||||
|
||||
if ( messages[read_ptr].empty() )
|
||||
if ( messages[read_ptr].empty() && !( (reader && reader->Killed()) || (writer && writer->Killed())) )
|
||||
{
|
||||
struct timespec ts;
|
||||
ts.tv_sec = time(0) + 5;
|
||||
|
@ -172,6 +169,8 @@ inline T Queue<T>::Get()
|
|||
safe_unlock(&mutex[read_ptr]);
|
||||
return 0;
|
||||
}
|
||||
else if ( messages[read_ptr].empty() )
|
||||
return 0;
|
||||
|
||||
T data = messages[read_ptr].front();
|
||||
messages[read_ptr].pop();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue