diff --git a/src/input/Manager.cc b/src/input/Manager.cc index b905958b51..e32f758620 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -743,7 +743,7 @@ bool Manager::RemoveStream(Stream *i) DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", i->name.c_str()); - //i->reader->Stop(); + i->reader->Stop(); return true; } diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index dacebe3eb5..a2f36497ab 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -73,10 +73,8 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) Ascii::~Ascii() { - /* printf("Destructor called\n"); - if ( file ) - DoClose(); - delete ascii; */ + DoClose(); + delete ascii; } void Ascii::DoClose() diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 39a6bdce7d..1b6cb551e2 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -122,15 +122,10 @@ void Manager::Process() if ( do_beat ) t->Heartbeat(); - while ( t->HasOut() && ! t->Killed() ) + while ( t->HasOut() ) { Message* msg = t->RetrieveOut(); - - if ( ! msg ) - { - assert(t->Killed()); - break; - } + assert(msg); if ( msg->Process() ) { diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index b1d78485e0..3a68c4195f 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -333,7 +333,7 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { delete msg; return; - } + } queue_out.Put(msg); diff --git a/src/threading/Queue.h b/src/threading/Queue.h index 0ddcda29f7..4ca8f9cd92 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -155,14 +155,11 @@ inline Queue::~Queue() template inline T Queue::Get() { - if ( (reader && reader->Killed()) || (writer && writer->Killed()) ) - return 0; - safe_lock(&mutex[read_ptr]); int old_read_ptr = read_ptr; - if ( messages[read_ptr].empty() ) + if ( messages[read_ptr].empty() && !( (reader && reader->Killed()) || (writer && writer->Killed())) ) { struct timespec ts; ts.tv_sec = time(0) + 5; @@ -172,6 +169,8 @@ inline T Queue::Get() safe_unlock(&mutex[read_ptr]); return 0; } + else if ( messages[read_ptr].empty() ) + return 0; T data = messages[read_ptr].front(); messages[read_ptr].pop();