diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 6d53ea363f..593766e52a 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1470,6 +1470,15 @@ bool Manager::Flush(EnumVal* id) return true; } +void Manager::Terminate() + { + for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) + { + if ( *s ) + Flush((*s)->id); + } + } + void Manager::Error(WriterFrontend* writer, const char* msg) { reporter->Error("error with writer for %s: %s", diff --git a/src/logging/Manager.h b/src/logging/Manager.h index f6829b3554..d12fc7e8fe 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -251,6 +251,12 @@ public: */ bool Flush(EnumVal* id); + /** + * Prepares the log manager to terminate. This will flush all log + * stream. + */ + void Terminate(); + protected: friend class WriterFrontend; friend class RotationFinishedMessage; @@ -258,7 +264,7 @@ protected: friend class ::RotationTimer; // Instantiates a new WriterBackend of the given type (note that - // doing so creates a new thread!). + // doing so creates a new thread!). WriterBackend* CreateBackend(WriterFrontend* frontend, bro_int_t type); //// Function also used by the RemoteSerializer. diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 4d2e497b14..fa86fce324 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -57,7 +57,7 @@ using namespace logging; WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread() { - path = ""; + path = ""; num_fields = 0; fields = 0; buffering = true; @@ -109,7 +109,9 @@ bool WriterBackend::Init(string arg_path, int arg_num_fields, const Field* const num_fields = arg_num_fields; fields = arg_fields; - SetName(frontend->Name()); + string name = Fmt("%s/%s", path.c_str(), frontend->Name().c_str()); + + SetName(name); if ( ! DoInit(arg_path, arg_num_fields, arg_fields) ) { @@ -229,6 +231,8 @@ bool WriterBackend::Finish() bool WriterBackend::DoHeartbeat(double network_time, double current_time) { + MsgThread::DoHeartbeat(network_time, current_time); + SendOut(new FlushWriteBufferMessage(frontend)); return true; diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 79278870f9..0a8ff4b09d 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -1,4 +1,6 @@ +#include "Net.h" + #include "WriterFrontend.h" #include "WriterBackend.h" @@ -155,8 +157,8 @@ void WriterFrontend::Write(int num_fields, Value** vals) write_buffer[write_buffer_pos++] = vals; - if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf ) - // Buffer full (or no bufferin desired). + if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf || terminating ) + // Buffer full (or no bufferin desired or termiating). FlushWriteBuffer(); } diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index e0bc590dfc..a1a1e2b86a 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -30,9 +30,6 @@ public: * frontend will internally instantiate a WriterBackend of the * corresponding type. * - * name: A descriptive name for the backend wroter type (e.g., \c - * Ascii). - * * Frontends must only be instantiated by the main thread. */ WriterFrontend(bro_int_t type); diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 7cc8459e68..5429bf0b97 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -171,14 +171,34 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field) break; case TYPE_SUBNET: - desc->Add(dotted_addr(val->val.subnet_val.net)); + { + // FIXME: This will be replaced with string(addr) once the + // IPV6 branch is merged in. + uint32_t addr = ntohl(val->val.subnet_val.net); + char buf[32]; + snprintf(buf, sizeof(buf), "%d.%d.%d.%d", + addr >> 24, (addr >> 16) & 0xff, + (addr >> 8) & 0xff, addr & 0xff); + + desc->Add(buf); desc->Add("/"); desc->Add(val->val.subnet_val.width); break; + } case TYPE_ADDR: - desc->Add(dotted_addr(val->val.addr_val)); + { + // FIXME: This will be replaced with string(addr) once the + // IPV6 branch is merged in. + uint32_t addr = ntohl(*val->val.addr_val); + char buf[32]; + snprintf(buf, sizeof(buf), "%d.%d.%d.%d", + addr >> 24, (addr >> 16) & 0xff, + (addr >> 8) & 0xff, addr & 0xff); + + desc->Add(buf); break; + } case TYPE_TIME: case TYPE_INTERVAL: diff --git a/src/main.cc b/src/main.cc index e224910db4..c101e54e74 100644 --- a/src/main.cc +++ b/src/main.cc @@ -290,6 +290,7 @@ void terminate_bro() if ( remote_serializer ) remote_serializer->LogStats(); + log_mgr->Terminate(); thread_mgr->Terminate(); delete timer_mgr; diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 4d51c3c4e4..51c4f7a3bc 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -2,9 +2,14 @@ #include #include +#include "config.h" #include "BasicThread.h" #include "Manager.h" +#ifdef HAVE_LINUX +#include +#endif + using namespace threading; uint64_t BasicThread::thread_counter = 0; @@ -25,6 +30,8 @@ BasicThread::BasicThread() BasicThread::~BasicThread() { + if ( buf ) + free(buf); } void BasicThread::SetName(const string& arg_name) @@ -35,8 +42,8 @@ void BasicThread::SetName(const string& arg_name) void BasicThread::SetOSName(const string& name) { -#ifdef LINUX - pthread_setname_np(pthread_self(), name.c_str()); +#ifdef HAVE_LINUX + prctl(PR_SET_NAME, name.c_str(), 0, 0, 0); #endif #ifdef __APPLE__ diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index 6d2f739620..cc87ae03bc 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -82,7 +82,7 @@ public: void Stop(); /** - * Returns true if Terminate() has been called. + * Returns true if Stop() has been called. * * This method is safe to call from any thread. */ diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index d07311bbe8..d008d2e5e8 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -7,9 +7,10 @@ Manager::Manager() { DBG_LOG(DBG_THREADING, "Creating thread manager ..."); - did_process = false; + did_process = true; next_beat = 0; terminating = false; + idle = false; } Manager::~Manager() @@ -41,6 +42,7 @@ void Manager::Terminate() all_threads.clear(); msg_threads.clear(); + idle = true; terminating = false; } @@ -70,18 +72,22 @@ void Manager::GetFds(int* read, int* write, int* except) double Manager::NextTimestamp(double* network_time) { - if ( did_process || ! next_beat == 0 ) - // If we had something to process last time (or haven't had a - // chance to check yet), we want to check for more asap. + if ( ::network_time && ! next_beat ) + next_beat = ::network_time + HEART_BEAT_INTERVAL; + +// fprintf(stderr, "N %.6f %.6f did_process=%d next_next=%.6f\n", ::network_time, timer_mgr->Time(), (int)did_process, next_beat); + + if ( did_process || ::network_time > next_beat ) + // If we had something to process last time (or out heartbeat + // is due), we want to check for more asap. return timer_mgr->Time(); - // Else we assume we don't have much to do at all and wait for the next heart beat. - return next_beat; + return -1.0; } void Manager::Process() { - bool do_beat = (next_beat == 0 || network_time >= next_beat); + bool do_beat = (next_beat && network_time > next_beat); did_process = false; @@ -90,14 +96,17 @@ void Manager::Process() MsgThread* t = *i; if ( do_beat ) + { t->Heartbeat(); + next_beat = 0; + } if ( ! t->HasOut() ) continue; Message* msg = t->RetrieveOut(); - if ( msg->Process() ) + if ( msg->Process() && network_time ) did_process = true; else @@ -110,15 +119,14 @@ void Manager::Process() delete msg; } - if ( do_beat ) - next_beat = network_time + HEART_BEAT_INTERVAL; +// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat); } const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats() { stats.clear(); - for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ ) + for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ ) { MsgThread* t = *i; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index f41b20ddf9..b7782b9a05 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -142,12 +142,19 @@ void MsgThread::OnStop() void MsgThread::Heartbeat() { SendIn(new HeartbeatMessage(this, network_time, current_time())); + } - string name = Fmt("%s (%d/%d)", name.c_str(), - cnt_sent_in - queue_in.Size(), - cnt_sent_out - queue_out.Size()); +bool MsgThread::DoHeartbeat(double network_time, double current_time) + { + string n = Name(); - SetOSName(name.c_str()); + n = Fmt("bro: %s (%" PRIu64 "/%" PRIu64 ")", n.c_str(), + cnt_sent_in - queue_in.Size(), + cnt_sent_out - queue_out.Size()); + + SetOSName(n.c_str()); + + return true; } void MsgThread::Info(const char* msg) @@ -197,7 +204,10 @@ void MsgThread::Debug(DebugStream stream, const char* msg) void MsgThread::SendIn(BasicInputMessage* msg, bool force) { if ( Terminating() && ! force ) + { + delete msg; return; + } DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str()); @@ -209,7 +219,10 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { if ( Terminating() && ! force ) + { + delete msg; return; + } queue_out.Put(msg); diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 459ac6c603..28c7690dfa 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -184,7 +184,10 @@ protected: * This is method is called regularly by the threading::Manager. * * Can be overriden in derived classed to hook into the heart beat, - * but must call the parent implementation. + * but must call the parent implementation. Note that this method is + * always called by the main thread and must not access data of the + * child thread directly. See DoHeartbeat() if you want to do + * something on the child-side. */ virtual void Heartbeat(); @@ -206,7 +209,7 @@ protected: * current_time: Wall clock when the heartbeat was trigger by the * main thread. */ - virtual bool DoHeartbeat(double network_time, double current_time) { return true; } + virtual bool DoHeartbeat(double network_time, double current_time); private: /**