Bugfixes.

- Data queued at termination wasn't written out completely.

    - Fixed some race conditions.

    - Fixing IOSource integration.

    - Fixing setting thread names on Linux.

    - Fixing minor leaks.

All tests now pass for me on Linux in debug and non-debug compiles.

Remaining TODOs:

        - Needs leak check.

        - Test on MacOS and FreeBSD.

        - More testing:
            - High volume traffic.
            - Different platforms.
This commit is contained in:
Robin Sommer 2012-02-12 09:41:43 -08:00
parent abb506ec63
commit b8ec653ebf
12 changed files with 100 additions and 30 deletions

View file

@ -1470,6 +1470,15 @@ bool Manager::Flush(EnumVal* id)
return true; return true;
} }
void Manager::Terminate()
{
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
if ( *s )
Flush((*s)->id);
}
}
void Manager::Error(WriterFrontend* writer, const char* msg) void Manager::Error(WriterFrontend* writer, const char* msg)
{ {
reporter->Error("error with writer for %s: %s", reporter->Error("error with writer for %s: %s",

View file

@ -251,6 +251,12 @@ public:
*/ */
bool Flush(EnumVal* id); bool Flush(EnumVal* id);
/**
* Prepares the log manager to terminate. This will flush all log
* stream.
*/
void Terminate();
protected: protected:
friend class WriterFrontend; friend class WriterFrontend;
friend class RotationFinishedMessage; friend class RotationFinishedMessage;
@ -258,7 +264,7 @@ protected:
friend class ::RotationTimer; friend class ::RotationTimer;
// Instantiates a new WriterBackend of the given type (note that // Instantiates a new WriterBackend of the given type (note that
// doing so creates a new thread!). // doing so creates a new thread!).
WriterBackend* CreateBackend(WriterFrontend* frontend, bro_int_t type); WriterBackend* CreateBackend(WriterFrontend* frontend, bro_int_t type);
//// Function also used by the RemoteSerializer. //// Function also used by the RemoteSerializer.

View file

@ -57,7 +57,7 @@ using namespace logging;
WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread() WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
{ {
path = "<not set>"; path = "<path not yet set>";
num_fields = 0; num_fields = 0;
fields = 0; fields = 0;
buffering = true; buffering = true;
@ -109,7 +109,9 @@ bool WriterBackend::Init(string arg_path, int arg_num_fields, const Field* const
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
SetName(frontend->Name()); string name = Fmt("%s/%s", path.c_str(), frontend->Name().c_str());
SetName(name);
if ( ! DoInit(arg_path, arg_num_fields, arg_fields) ) if ( ! DoInit(arg_path, arg_num_fields, arg_fields) )
{ {
@ -229,6 +231,8 @@ bool WriterBackend::Finish()
bool WriterBackend::DoHeartbeat(double network_time, double current_time) bool WriterBackend::DoHeartbeat(double network_time, double current_time)
{ {
MsgThread::DoHeartbeat(network_time, current_time);
SendOut(new FlushWriteBufferMessage(frontend)); SendOut(new FlushWriteBufferMessage(frontend));
return true; return true;

View file

@ -1,4 +1,6 @@
#include "Net.h"
#include "WriterFrontend.h" #include "WriterFrontend.h"
#include "WriterBackend.h" #include "WriterBackend.h"
@ -155,8 +157,8 @@ void WriterFrontend::Write(int num_fields, Value** vals)
write_buffer[write_buffer_pos++] = vals; write_buffer[write_buffer_pos++] = vals;
if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf ) if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf || terminating )
// Buffer full (or no bufferin desired). // Buffer full (or no bufferin desired or termiating).
FlushWriteBuffer(); FlushWriteBuffer();
} }

View file

@ -30,9 +30,6 @@ public:
* frontend will internally instantiate a WriterBackend of the * frontend will internally instantiate a WriterBackend of the
* corresponding type. * corresponding type.
* *
* name: A descriptive name for the backend wroter type (e.g., \c
* Ascii).
*
* Frontends must only be instantiated by the main thread. * Frontends must only be instantiated by the main thread.
*/ */
WriterFrontend(bro_int_t type); WriterFrontend(bro_int_t type);

View file

@ -171,14 +171,34 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
break; break;
case TYPE_SUBNET: case TYPE_SUBNET:
desc->Add(dotted_addr(val->val.subnet_val.net)); {
// FIXME: This will be replaced with string(addr) once the
// IPV6 branch is merged in.
uint32_t addr = ntohl(val->val.subnet_val.net);
char buf[32];
snprintf(buf, sizeof(buf), "%d.%d.%d.%d",
addr >> 24, (addr >> 16) & 0xff,
(addr >> 8) & 0xff, addr & 0xff);
desc->Add(buf);
desc->Add("/"); desc->Add("/");
desc->Add(val->val.subnet_val.width); desc->Add(val->val.subnet_val.width);
break; break;
}
case TYPE_ADDR: case TYPE_ADDR:
desc->Add(dotted_addr(val->val.addr_val)); {
// FIXME: This will be replaced with string(addr) once the
// IPV6 branch is merged in.
uint32_t addr = ntohl(*val->val.addr_val);
char buf[32];
snprintf(buf, sizeof(buf), "%d.%d.%d.%d",
addr >> 24, (addr >> 16) & 0xff,
(addr >> 8) & 0xff, addr & 0xff);
desc->Add(buf);
break; break;
}
case TYPE_TIME: case TYPE_TIME:
case TYPE_INTERVAL: case TYPE_INTERVAL:

View file

@ -290,6 +290,7 @@ void terminate_bro()
if ( remote_serializer ) if ( remote_serializer )
remote_serializer->LogStats(); remote_serializer->LogStats();
log_mgr->Terminate();
thread_mgr->Terminate(); thread_mgr->Terminate();
delete timer_mgr; delete timer_mgr;

View file

@ -2,9 +2,14 @@
#include <sys/signal.h> #include <sys/signal.h>
#include <signal.h> #include <signal.h>
#include "config.h"
#include "BasicThread.h" #include "BasicThread.h"
#include "Manager.h" #include "Manager.h"
#ifdef HAVE_LINUX
#include <sys/prctl.h>
#endif
using namespace threading; using namespace threading;
uint64_t BasicThread::thread_counter = 0; uint64_t BasicThread::thread_counter = 0;
@ -25,6 +30,8 @@ BasicThread::BasicThread()
BasicThread::~BasicThread() BasicThread::~BasicThread()
{ {
if ( buf )
free(buf);
} }
void BasicThread::SetName(const string& arg_name) void BasicThread::SetName(const string& arg_name)
@ -35,8 +42,8 @@ void BasicThread::SetName(const string& arg_name)
void BasicThread::SetOSName(const string& name) void BasicThread::SetOSName(const string& name)
{ {
#ifdef LINUX #ifdef HAVE_LINUX
pthread_setname_np(pthread_self(), name.c_str()); prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
#endif #endif
#ifdef __APPLE__ #ifdef __APPLE__

View file

@ -82,7 +82,7 @@ public:
void Stop(); void Stop();
/** /**
* Returns true if Terminate() has been called. * Returns true if Stop() has been called.
* *
* This method is safe to call from any thread. * This method is safe to call from any thread.
*/ */

View file

@ -7,9 +7,10 @@ Manager::Manager()
{ {
DBG_LOG(DBG_THREADING, "Creating thread manager ..."); DBG_LOG(DBG_THREADING, "Creating thread manager ...");
did_process = false; did_process = true;
next_beat = 0; next_beat = 0;
terminating = false; terminating = false;
idle = false;
} }
Manager::~Manager() Manager::~Manager()
@ -41,6 +42,7 @@ void Manager::Terminate()
all_threads.clear(); all_threads.clear();
msg_threads.clear(); msg_threads.clear();
idle = true;
terminating = false; terminating = false;
} }
@ -70,18 +72,22 @@ void Manager::GetFds(int* read, int* write, int* except)
double Manager::NextTimestamp(double* network_time) double Manager::NextTimestamp(double* network_time)
{ {
if ( did_process || ! next_beat == 0 ) if ( ::network_time && ! next_beat )
// If we had something to process last time (or haven't had a next_beat = ::network_time + HEART_BEAT_INTERVAL;
// chance to check yet), we want to check for more asap.
// fprintf(stderr, "N %.6f %.6f did_process=%d next_next=%.6f\n", ::network_time, timer_mgr->Time(), (int)did_process, next_beat);
if ( did_process || ::network_time > next_beat )
// If we had something to process last time (or out heartbeat
// is due), we want to check for more asap.
return timer_mgr->Time(); return timer_mgr->Time();
// Else we assume we don't have much to do at all and wait for the next heart beat. return -1.0;
return next_beat;
} }
void Manager::Process() void Manager::Process()
{ {
bool do_beat = (next_beat == 0 || network_time >= next_beat); bool do_beat = (next_beat && network_time > next_beat);
did_process = false; did_process = false;
@ -90,14 +96,17 @@ void Manager::Process()
MsgThread* t = *i; MsgThread* t = *i;
if ( do_beat ) if ( do_beat )
{
t->Heartbeat(); t->Heartbeat();
next_beat = 0;
}
if ( ! t->HasOut() ) if ( ! t->HasOut() )
continue; continue;
Message* msg = t->RetrieveOut(); Message* msg = t->RetrieveOut();
if ( msg->Process() ) if ( msg->Process() && network_time )
did_process = true; did_process = true;
else else
@ -110,15 +119,14 @@ void Manager::Process()
delete msg; delete msg;
} }
if ( do_beat ) // fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat);
next_beat = network_time + HEART_BEAT_INTERVAL;
} }
const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats() const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats()
{ {
stats.clear(); stats.clear();
for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ ) for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ )
{ {
MsgThread* t = *i; MsgThread* t = *i;

View file

@ -142,12 +142,19 @@ void MsgThread::OnStop()
void MsgThread::Heartbeat() void MsgThread::Heartbeat()
{ {
SendIn(new HeartbeatMessage(this, network_time, current_time())); SendIn(new HeartbeatMessage(this, network_time, current_time()));
}
string name = Fmt("%s (%d/%d)", name.c_str(), bool MsgThread::DoHeartbeat(double network_time, double current_time)
cnt_sent_in - queue_in.Size(), {
cnt_sent_out - queue_out.Size()); string n = Name();
SetOSName(name.c_str()); n = Fmt("bro: %s (%" PRIu64 "/%" PRIu64 ")", n.c_str(),
cnt_sent_in - queue_in.Size(),
cnt_sent_out - queue_out.Size());
SetOSName(n.c_str());
return true;
} }
void MsgThread::Info(const char* msg) void MsgThread::Info(const char* msg)
@ -197,7 +204,10 @@ void MsgThread::Debug(DebugStream stream, const char* msg)
void MsgThread::SendIn(BasicInputMessage* msg, bool force) void MsgThread::SendIn(BasicInputMessage* msg, bool force)
{ {
if ( Terminating() && ! force ) if ( Terminating() && ! force )
{
delete msg;
return; return;
}
DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str()); DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str());
@ -209,7 +219,10 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force)
void MsgThread::SendOut(BasicOutputMessage* msg, bool force) void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
{ {
if ( Terminating() && ! force ) if ( Terminating() && ! force )
{
delete msg;
return; return;
}
queue_out.Put(msg); queue_out.Put(msg);

View file

@ -184,7 +184,10 @@ protected:
* This is method is called regularly by the threading::Manager. * This is method is called regularly by the threading::Manager.
* *
* Can be overriden in derived classed to hook into the heart beat, * Can be overriden in derived classed to hook into the heart beat,
* but must call the parent implementation. * but must call the parent implementation. Note that this method is
* always called by the main thread and must not access data of the
* child thread directly. See DoHeartbeat() if you want to do
* something on the child-side.
*/ */
virtual void Heartbeat(); virtual void Heartbeat();
@ -206,7 +209,7 @@ protected:
* current_time: Wall clock when the heartbeat was trigger by the * current_time: Wall clock when the heartbeat was trigger by the
* main thread. * main thread.
*/ */
virtual bool DoHeartbeat(double network_time, double current_time) { return true; } virtual bool DoHeartbeat(double network_time, double current_time);
private: private:
/** /**