diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 4b8f527f2b..f29e907790 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -234,7 +234,7 @@ static const int PRINT_BUFFER_SIZE = 10 * 1024; static const int SOCKBUF_SIZE = 1024 * 1024; // Buffer size for remote-log data. -static const int LOG_BUFFER_SIZE = 50 * 1024; +static const int LOG_BUFFER_SIZE = 512; struct ping_args { uint32 seq; @@ -532,6 +532,7 @@ RemoteSerializer::RemoteSerializer() terminating = false; in_sync = 0; last_flush = 0; + received_logs = 0; } RemoteSerializer::~RemoteSerializer() @@ -1353,6 +1354,14 @@ double RemoteSerializer::NextTimestamp(double* local_network_time) { Poll(false); + if ( received_logs > 0 ) + { + // If we processed logs last time, assume there's more. + idle = false; + received_logs = 0; + return timer_mgr->Time(); + } + double et = events.length() ? events[0]->time : -1; double pt = packets.length() ? packets[0]->time : -1; @@ -2552,7 +2561,9 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st if ( ! peer->logs_requested ) return false; - assert(peer->log_buffer); + if ( ! peer->log_buffer ) + // Peer shutting down. + return false; // Serialize the log record entry. @@ -2587,7 +2598,10 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) || (network_time - last_flush > 1.0) ) { if ( ! FlushLogBuffer(peer) ) + { + delete [] data; return false; + } } // If the data is actually larger than our complete buffer, just send it out. @@ -2631,6 +2645,12 @@ bool RemoteSerializer::ProcessLogCreateWriter() if ( current_peer->state == Peer::CLOSING ) return false; +#ifdef USE_PERFTOOLS + // Don't track allocations here, they'll be released only after the + // main loop exists. And it's just a tiny amount anyway. + HeapLeakChecker::Disabler disabler; +#endif + assert(current_args); EnumVal* id_val = 0; @@ -2666,7 +2686,7 @@ bool RemoteSerializer::ProcessLogCreateWriter() id_val = new EnumVal(id, BifType::Enum::Log::ID); writer_val = new EnumVal(writer, BifType::Enum::Log::Writer); - if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields) ) + if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields, true, false) ) goto error; Unref(id_val); @@ -2735,6 +2755,8 @@ bool RemoteSerializer::ProcessLogWrite() fmt.EndRead(); + ++received_logs; + return true; error: @@ -3376,6 +3398,9 @@ void SocketComm::Run() small_timeout.tv_usec = io->CanWrite() || io->CanRead() ? 1 : 10; + if ( ! io->CanWrite() ) + usleep(10); + int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, &small_timeout); diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h index eabcb18a38..05d25ca525 100644 --- a/src/RemoteSerializer.h +++ b/src/RemoteSerializer.h @@ -338,6 +338,7 @@ private: int propagate_accesses; bool ignore_accesses; bool terminating; + int received_logs; Peer* source_peer; PeerID id_counter; // Keeps track of assigned IDs. uint32 current_sync_point; diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 0753296cb4..74220ecde4 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -105,9 +105,6 @@ Manager::Stream::~Stream() { WriterInfo* winfo = i->second; - if ( ! winfo ) - continue; - if ( winfo->rotation_timer ) timer_mgr->Cancel(winfo->rotation_timer); @@ -207,7 +204,7 @@ Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer) { WriterInfo* winfo = i->second; - if ( winfo && winfo->writer == writer ) + if ( winfo->writer == writer ) return winfo; } } @@ -221,7 +218,7 @@ void Manager::RemoveDisabledWriters(Stream* stream) for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ ) { - if ( j->second && j->second->writer->Disabled() ) + if ( j->second->writer->Disabled() ) { j->second->writer->Stop(); delete j->second; @@ -680,11 +677,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) Val* path_arg; if ( filter->path_val ) - path_arg = filter->path_val; + path_arg = filter->path_val->Ref(); else path_arg = new StringVal(""); - vl.append(path_arg->Ref()); + vl.append(path_arg); Val* rec_arg; BroType* rt = filter->path_func->FType()->Args()->FieldType("rec"); @@ -718,7 +715,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) if ( ! filter->path_val ) { - Unref(path_arg); filter->path = v->AsString()->CheckString(); filter->path_val = v->Ref(); } @@ -740,7 +736,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) if ( w != stream->writers.end() ) // We know this writer already. - writer = w->second ? w->second->writer : 0; + writer = w->second->writer; else { @@ -753,64 +749,25 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) for ( int j = 0; j < filter->num_fields; ++j ) arg_fields[j] = new Field(*filter->fields[j]); - if ( filter->remote ) - remote_serializer->SendLogCreateWriter(stream->id, - filter->writer, - path, - filter->num_fields, - arg_fields); + writer = CreateWriter(stream->id, filter->writer, + path, filter->num_fields, + arg_fields, filter->local, filter->remote); - if ( filter->local ) + if ( ! writer ) { - writer = CreateWriter(stream->id, filter->writer, - path, filter->num_fields, - arg_fields); - - if ( ! writer ) - { - Unref(columns); - return false; - } + Unref(columns); + return false; } - else - { - // Insert a null pointer into the map to make - // sure we don't try creating it again. - stream->writers.insert(Stream::WriterMap::value_type( - Stream::WriterPathPair(filter->writer->AsEnum(), path), 0)); - for( int i = 0; i < filter->num_fields; ++i) - delete arg_fields[i]; - - delete [] arg_fields; - } } // Alright, can do the write now. - if ( filter->local || filter->remote ) - { - threading::Value** vals = RecordToFilterVals(stream, filter, columns); - - if ( filter->remote ) - remote_serializer->SendLogWrite(stream->id, - filter->writer, - path, - filter->num_fields, - vals); - - if ( filter->local ) - { - // Write takes ownership of vals. - assert(writer); - writer->Write(filter->num_fields, vals); - } - - else - DeleteVals(filter->num_fields, vals); - - } + threading::Value** vals = RecordToFilterVals(stream, filter, columns); + // Write takes ownership of vals. + assert(writer); + writer->Write(filter->num_fields, vals); #ifdef DEBUG DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'", @@ -976,7 +933,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, } WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path, - int num_fields, const Field* const* fields) + int num_fields, const Field* const* fields, bool local, bool remote) { Stream* stream = FindStream(id); @@ -987,12 +944,12 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path, Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path)); - if ( w != stream->writers.end() && w->second ) + if ( w != stream->writers.end() ) // If we already have a writer for this. That's fine, we just // return it. return w->second->writer; - WriterFrontend* writer_obj = new WriterFrontend(writer->AsEnum()); + WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote); assert(writer_obj); writer_obj->Init(path, num_fields, fields); @@ -1089,8 +1046,7 @@ bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, return false; } - if ( w->second ) - w->second->writer->Write(num_fields, vals); + w->second->writer->Write(num_fields, vals); DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to path '%s' on stream '%s'", @@ -1111,9 +1067,6 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer) for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) { - if ( ! i->second ) - continue; - WriterFrontend* writer = i->second->writer; EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer); @@ -1134,10 +1087,7 @@ bool Manager::SetBuf(EnumVal* id, bool enabled) for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) - { - if ( i->second ) - i->second->writer->SetBuf(enabled); - } + i->second->writer->SetBuf(enabled); RemoveDisabledWriters(stream); @@ -1155,10 +1105,7 @@ bool Manager::Flush(EnumVal* id) for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) - { - if ( i->second ) - i->second->writer->Flush(); - } + i->second->writer->Flush(); RemoveDisabledWriters(stream); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index d931bfaef8..bf097c5e1a 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -159,7 +159,8 @@ protected: // Takes ownership of fields. WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path, - int num_fields, const threading::Field* const* fields); + int num_fields, const threading::Field* const* fields, + bool local, bool remote); // Takes ownership of values.. bool Write(EnumVal* id, EnumVal* writer, string path, diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 02f1a188d8..26e8eaf22e 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -99,21 +99,36 @@ public: using namespace logging; -WriterFrontend::WriterFrontend(bro_int_t type) +WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) { + stream = arg_stream; + writer = arg_writer; + Ref(stream); + Ref(writer); + disabled = initialized = false; buf = true; + local = arg_local; + remote = arg_remote; write_buffer = 0; write_buffer_pos = 0; ty_name = ""; - backend = log_mgr->CreateBackend(this, type); - assert(backend); - backend->Start(); + if ( local ) + { + backend = log_mgr->CreateBackend(this, writer->AsEnum()); + assert(backend); + backend->Start(); + } + + else + backend = 0; } WriterFrontend::~WriterFrontend() { + Unref(stream); + Unref(writer); } string WriterFrontend::Name() const @@ -128,7 +143,9 @@ void WriterFrontend::Stop() { FlushWriteBuffer(); SetDisable(); - backend->Stop(); + + if ( backend ) + backend->Stop(); } void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields) @@ -144,7 +161,17 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields)); + + if ( backend ) + backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields)); + + if ( remote ) + remote_serializer->SendLogCreateWriter(stream, + writer, + arg_path, + arg_num_fields, + arg_fields); + } void WriterFrontend::Write(int num_fields, Value** vals) @@ -152,6 +179,19 @@ void WriterFrontend::Write(int num_fields, Value** vals) if ( disabled ) return; + if ( remote ) + remote_serializer->SendLogWrite(stream, + writer, + path, + num_fields, + vals); + + if ( ! backend ) + { + DeleteVals(vals); + return; + } + if ( ! write_buffer ) { // Need new buffer. @@ -173,7 +213,8 @@ void WriterFrontend::FlushWriteBuffer() // Nothing to do. return; - backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); + if ( backend ) + backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); // Clear buffer (no delete, we pass ownership to child thread.) write_buffer = 0; @@ -187,7 +228,8 @@ void WriterFrontend::SetBuf(bool enabled) buf = enabled; - backend->SendIn(new SetBufMessage(backend, enabled)); + if ( backend ) + backend->SendIn(new SetBufMessage(backend, enabled)); if ( ! buf ) // Make sure no longer buffer any still queued data. @@ -200,7 +242,9 @@ void WriterFrontend::Flush() return; FlushWriteBuffer(); - backend->SendIn(new FlushMessage(backend)); + + if ( backend ) + backend->SendIn(new FlushMessage(backend)); } void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating) @@ -209,7 +253,9 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool return; FlushWriteBuffer(); - backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating)); + + if ( backend ) + backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating)); } void WriterFrontend::Finish() @@ -218,7 +264,18 @@ void WriterFrontend::Finish() return; FlushWriteBuffer(); - backend->SendIn(new FinishMessage(backend)); + + if ( backend ) + backend->SendIn(new FinishMessage(backend)); + } + +void WriterFrontend::DeleteVals(Value** vals) + { + // Note this code is duplicated in Manager::DeleteVals(). + for ( int i = 0; i < num_fields; i++ ) + delete vals[i]; + + delete [] vals; } diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 4386a15f64..3e05d17c9e 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -25,14 +25,21 @@ public: /** * Constructor. * - * type: The backend writer type, with the value corresponding to the + * stream: The logging stream. + * + * writer: The backend writer type, with the value corresponding to the * script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The * frontend will internally instantiate a WriterBackend of the * corresponding type. + * + * local: If true, the writer will instantiate a local backend. + * + * remote: If true, the writer will forward all data to remote + * clients. * * Frontends must only be instantiated by the main thread. */ - WriterFrontend(bro_int_t type); + WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote); /** * Destructor. @@ -187,10 +194,17 @@ public: protected: friend class Manager; + void DeleteVals(threading::Value** vals); + + EnumVal* stream; + EnumVal* writer; + WriterBackend* backend; // The backend we have instanatiated. bool disabled; // True if disabled. bool initialized; // True if initialized. bool buf; // True if buffering is enabled (default). + bool local; // True if logging locally. + bool remote; // True if loggin remotely. string ty_name; // Name of the backend type. Set by the manager. string path; // The log path. diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 51c4f7a3bc..e590b13434 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -20,8 +20,8 @@ BasicThread::BasicThread() terminating = false; pthread = 0; - buf = 0; - buf_len = 1024; + buf_len = 2048; + buf = (char*) malloc(buf_len); name = Fmt("thread-%d", ++thread_counter); @@ -57,9 +57,6 @@ void BasicThread::SetOSName(const string& name) const char* BasicThread::Fmt(const char* format, ...) { - if ( ! buf ) - buf = (char*) malloc(buf_len); - va_list al; va_start(al, format); int n = safe_vsnprintf(buf, buf_len, format, al); @@ -67,13 +64,15 @@ const char* BasicThread::Fmt(const char* format, ...) if ( (unsigned int) n >= buf_len ) { // Not enough room, grow the buffer. - buf_len = n + 32; - buf = (char*) realloc(buf, buf_len); + int tmp_len = n + 32; + char* tmp = (char*) malloc(tmp_len); // Is it portable to restart? va_start(al, format); - n = safe_vsnprintf(buf, buf_len, format, al); + n = safe_vsnprintf(tmp, tmp_len, format, al); va_end(al); + + free(tmp); } return buf; diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 3f5145d7a9..db86caa26f 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -102,25 +102,25 @@ void Manager::Process() next_beat = 0; } - if ( ! t->HasOut() ) - continue; - - Message* msg = t->RetrieveOut(); - - if ( msg->Process() ) + while ( t->HasOut() ) { - //if ( network_time ) //&& network_time ) // FIXME: ask robin again if he needs this. makes input interface not work in bro_init. + Message* msg = t->RetrieveOut(); - did_process = true; - } - else - { - string s = msg->Name() + " failed, terminating thread " + t->Name() + " (in ThreadManager)"; - reporter->Error("%s", s.c_str()); - t->Stop(); + if ( msg->Process() ) + { + //if ( network_time ) // FIXME: ask robin again if he needs this. makes input interface not work in bro_init. + did_process = true; + } + + else + { + string s = msg->Name() + " failed, terminating thread"; + reporter->Error("%s", s.c_str()); + t->Stop(); } - delete msg; + delete msg; + } } // fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat); diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 5f77a1c9f8..490c89c057 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -281,7 +281,7 @@ void MsgThread::GetStats(Stats* stats) { stats->sent_in = cnt_sent_in; stats->sent_out = cnt_sent_out; - stats->pending_in = cnt_sent_in - queue_in.Size(); - stats->pending_out = cnt_sent_out - queue_out.Size(); + stats->pending_in = queue_in.Size(); + stats->pending_out = queue_out.Size(); } diff --git a/testing/btest/Baseline/core.leaks.basic-cluster/manager-1.metrics.log b/testing/btest/Baseline/core.leaks.basic-cluster/manager-1.metrics.log new file mode 100644 index 0000000000..42fcd6a526 --- /dev/null +++ b/testing/btest/Baseline/core.leaks.basic-cluster/manager-1.metrics.log @@ -0,0 +1,10 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path metrics +#fields ts metric_id filter_name index.host index.str index.network value +#types time enum string addr string subnet count +1331256494.591966 TEST_METRIC foo-bar 6.5.4.3 - - 4 +1331256494.591966 TEST_METRIC foo-bar 7.2.1.5 - - 2 +1331256494.591966 TEST_METRIC foo-bar 1.2.3.4 - - 6 diff --git a/testing/btest/Baseline/core.leaks.remote/sender.test.failure.log b/testing/btest/Baseline/core.leaks.remote/sender.test.failure.log new file mode 100644 index 0000000000..5a26f322f4 --- /dev/null +++ b/testing/btest/Baseline/core.leaks.remote/sender.test.failure.log @@ -0,0 +1,10 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test.failure +#fields t id.orig_h id.orig_p id.resp_h id.resp_p status country +#types time addr port addr port string string +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure US +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure UK +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure MX diff --git a/testing/btest/Baseline/core.leaks.remote/sender.test.log b/testing/btest/Baseline/core.leaks.remote/sender.test.log new file mode 100644 index 0000000000..9d2ba26f48 --- /dev/null +++ b/testing/btest/Baseline/core.leaks.remote/sender.test.log @@ -0,0 +1,12 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#fields t id.orig_h id.orig_p id.resp_h id.resp_p status country +#types time addr port addr port string string +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success unknown +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure US +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure UK +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success BR +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure MX diff --git a/testing/btest/Baseline/core.leaks.remote/sender.test.success.log b/testing/btest/Baseline/core.leaks.remote/sender.test.success.log new file mode 100644 index 0000000000..1b2ed452a0 --- /dev/null +++ b/testing/btest/Baseline/core.leaks.remote/sender.test.success.log @@ -0,0 +1,9 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test.success +#fields t id.orig_h id.orig_p id.resp_h id.resp_p status country +#types time addr port addr port string string +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success unknown +1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success BR diff --git a/testing/btest/core/leaks/basic-cluster.bro b/testing/btest/core/leaks/basic-cluster.bro new file mode 100644 index 0000000000..a82f52c8b2 --- /dev/null +++ b/testing/btest/core/leaks/basic-cluster.bro @@ -0,0 +1,39 @@ +# Needs perftools support. +# +# @TEST-REQUIRES: bro --help 2>&1 | grep -q mem-leaks +# @TEST-EXEC: btest-bg-run manager-1 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro -m %INPUT +# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro -m -r $TRACES/web.trace --pseudo-realtime %INPUT +# @TEST-EXEC: btest-bg-run worker-2 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro -m -r $TRACES/web.trace --pseudo-realtime %INPUT +# @TEST-EXEC: btest-bg-wait -k 30 +# @TEST-EXEC: btest-diff manager-1/metrics.log + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1")], + ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1", $workers=set("worker-1")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +redef enum Metrics::ID += { + TEST_METRIC, +}; + +event bro_init() &priority=5 + { + Metrics::add_filter(TEST_METRIC, + [$name="foo-bar", + $break_interval=3secs]); + + if ( Cluster::local_node_type() == Cluster::WORKER ) + { + Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], 3); + Metrics::add_data(TEST_METRIC, [$host=6.5.4.3], 2); + Metrics::add_data(TEST_METRIC, [$host=7.2.1.5], 1); + } + } diff --git a/testing/btest/core/leaks/remote.bro b/testing/btest/core/leaks/remote.bro new file mode 100644 index 0000000000..fa72ce6024 --- /dev/null +++ b/testing/btest/core/leaks/remote.bro @@ -0,0 +1,79 @@ +# Needs perftools support. +# +# @TEST-REQUIRES: bro --help 2>&1 | grep -q mem-leaks +# +# @TEST-EXEC: btest-bg-run sender HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local bro -m --pseudo-realtime %INPUT ../sender.bro +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run receiver HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local bro -m --pseudo-realtime %INPUT ../receiver.bro +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-wait -k 10 +# @TEST-EXEC: btest-diff sender/test.log +# @TEST-EXEC: btest-diff sender/test.failure.log +# @TEST-EXEC: btest-diff sender/test.success.log +# @TEST-EXEC: cmp receiver/test.log sender/test.log +# @TEST-EXEC: cmp receiver/test.failure.log sender/test.failure.log +# @TEST-EXEC: cmp receiver/test.success.log sender/test.success.log + +# This is the common part loaded by both sender and receiver. +module Test; + +export { + # Create a new ID for our log stream + redef enum Log::ID += { LOG }; + + # Define a record with all the columns the log file can have. + # (I'm using a subset of fields from ssh-ext for demonstration.) + type Log: record { + t: time; + id: conn_id; # Will be rolled out into individual columns. + status: string &optional; + country: string &default="unknown"; + } &log; +} + +event bro_init() +{ + Log::create_stream(Test::LOG, [$columns=Log]); + Log::add_filter(Test::LOG, [$name="f1", $path="test.success", $pred=function(rec: Log): bool { return rec$status == "success"; }]); +} + +##### + +@TEST-START-FILE sender.bro + +module Test; + +@load frameworks/communication/listen + +function fail(rec: Log): bool + { + return rec$status != "success"; + } + +event remote_connection_handshake_done(p: event_peer) + { + Log::add_filter(Test::LOG, [$name="f2", $path="test.failure", $pred=fail]); + + local cid = [$orig_h=1.2.3.4, $orig_p=1234/tcp, $resp_h=2.3.4.5, $resp_p=80/tcp]; + + local r: Log = [$t=network_time(), $id=cid, $status="success"]; + + # Log something. + Log::write(Test::LOG, r); + Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="US"]); + Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="UK"]); + Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="success", $country="BR"]); + Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="MX"]); + disconnect(p); + } +@TEST-END-FILE + +@TEST-START-FILE receiver.bro + +##### + +redef Communication::nodes += { + ["foo"] = [$host = 127.0.0.1, $connect=T, $request_logs=T] +}; + +@TEST-END-FILE diff --git a/testing/external/scripts/perftools-adapt-paths b/testing/external/scripts/perftools-adapt-paths index 2eda2477c7..cfecd39993 100755 --- a/testing/external/scripts/perftools-adapt-paths +++ b/testing/external/scripts/perftools-adapt-paths @@ -7,4 +7,4 @@ cat $1 | sed "s#bro *\"\./#../../../build/src/bro \".tmp/$TEST_NAME/#g" | sed 's/ *--gv//g' >$1.tmp && mv $1.tmp $1 -grep -q "No leaks found" $1 +grep -qv "detected leaks of" $1