diff --git a/src/Net.cc b/src/Net.cc index adac9c02fd..3acd4bce9d 100644 --- a/src/Net.cc +++ b/src/Net.cc @@ -34,6 +34,10 @@ #include "iosource/PktDumper.h" #include "plugin/Manager.h" +#ifdef ENABLE_BROKER +#include "comm/Manager.h" +#endif + extern "C" { #include "setsignal.h" }; @@ -315,6 +319,11 @@ void net_run() } #endif current_iosrc = src; + bool communication_enabled = using_communication; + +#ifdef ENABLE_BROKER + communication_enabled |= comm_mgr->Enabled(); +#endif if ( src ) src->Process(); // which will call net_packet_dispatch() @@ -332,7 +341,7 @@ void net_run() } } - else if ( (have_pending_timers || using_communication) && + else if ( (have_pending_timers || communication_enabled) && ! pseudo_realtime ) { // Take advantage of the lull to get up to @@ -347,7 +356,7 @@ void net_run() // us a lot of idle time, but doesn't delay near-term // timers too much. (Delaying them somewhat is okay, // since Bro timers are not high-precision anyway.) - if ( ! using_communication ) + if ( ! communication_enabled ) usleep(100000); else usleep(1000); diff --git a/src/comm/CMakeLists.txt b/src/comm/CMakeLists.txt index 6453e006bf..ef41c605c7 100644 --- a/src/comm/CMakeLists.txt +++ b/src/comm/CMakeLists.txt @@ -19,6 +19,7 @@ set(comm_SRCS Store.cc ) +bif_target(comm.bif) bif_target(data.bif) bif_target(messaging.bif) bif_target(store.bif) diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 443c5f90da..7db80ebb40 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -13,6 +13,7 @@ #include "comm/store.bif.h" #include "logging/Manager.h" #include "DebugLogger.h" +#include "iosource/Manager.h" using namespace std; @@ -28,11 +29,6 @@ comm::Manager::~Manager() CloseStore(s.first.first, s.first.second); } -bool comm::Manager::InitPreScript() - { - return true; - } - static int require_field(RecordType* rt, const char* name) { auto rval = rt->FieldOffset(name); @@ -44,8 +40,11 @@ static int require_field(RecordType* rt, const char* name) return rval; } -bool comm::Manager::InitPostScript() +bool comm::Manager::Enable() { + if ( endpoint != nullptr ) + return true; + auto send_flags_type = internal_type("Comm::SendFlags")->AsRecordType(); send_flags_self_idx = require_field(send_flags_type, "self"); send_flags_peers_idx = require_field(send_flags_type, "peers"); @@ -94,11 +93,15 @@ bool comm::Manager::InitPostScript() } endpoint = unique_ptr(new broker::endpoint(name)); + iosource_mgr->Register(this, true); return true; } bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr) { + if ( ! Enabled() ) + return false; + auto rval = endpoint->listen(port, addr, reuse_addr); if ( ! rval ) @@ -114,6 +117,9 @@ bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr) bool comm::Manager::Connect(string addr, uint16_t port, chrono::duration retry_interval) { + if ( ! Enabled() ) + return false; + auto& peer = peers[make_pair(addr, port)]; if ( peer ) @@ -125,6 +131,9 @@ bool comm::Manager::Connect(string addr, uint16_t port, bool comm::Manager::Disconnect(const string& addr, uint16_t port) { + if ( ! Enabled() ) + return false; + auto it = peers.find(make_pair(addr, port)); if ( it == peers.end() ) @@ -137,18 +146,27 @@ bool comm::Manager::Disconnect(const string& addr, uint16_t port) bool comm::Manager::Print(string topic, string msg, Val* flags) { + if ( ! Enabled() ) + return false; + endpoint->send(move(topic), broker::message{move(msg)}, GetFlags(flags)); return true; } bool comm::Manager::Event(std::string topic, broker::message msg, int flags) { + if ( ! Enabled() ) + return false; + endpoint->send(move(topic), move(msg), flags); return true; } bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags) { + if ( ! Enabled() ) + return false; + auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum()); if ( ! stream_name ) @@ -176,6 +194,9 @@ bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags) bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags) { + if ( ! Enabled() ) + return false; + if ( ! args->Lookup(0) ) return false; @@ -198,6 +219,9 @@ bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags) bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags) { + if ( ! Enabled() ) + return false; + if ( event->Type()->Tag() != TYPE_FUNC ) { reporter->Error("Comm::auto_event must operate on an event"); @@ -227,6 +251,9 @@ bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags) bool comm::Manager::AutoEventStop(const string& topic, Val* event) { + if ( ! Enabled() ) + return false; + if ( event->Type()->Tag() != TYPE_FUNC ) { reporter->Error("Comm::auto_event_stop must operate on an event"); @@ -257,6 +284,9 @@ bool comm::Manager::AutoEventStop(const string& topic, Val* event) RecordVal* comm::Manager::MakeEventArgs(val_list* args) { + if ( ! Enabled() ) + return nullptr; + auto rval = new RecordVal(BifType::Record::Comm::EventArgs); auto arg_vec = new VectorVal(vector_of_data_type); rval->Assign(1, arg_vec); @@ -324,6 +354,9 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args) bool comm::Manager::SubscribeToPrints(string topic_prefix) { + if ( ! Enabled() ) + return false; + auto& q = print_subscriptions[topic_prefix]; if ( q ) @@ -335,11 +368,17 @@ bool comm::Manager::SubscribeToPrints(string topic_prefix) bool comm::Manager::UnsubscribeToPrints(const string& topic_prefix) { + if ( ! Enabled() ) + return false; + return print_subscriptions.erase(topic_prefix); } bool comm::Manager::SubscribeToEvents(string topic_prefix) { + if ( ! Enabled() ) + return false; + auto& q = event_subscriptions[topic_prefix]; if ( q ) @@ -351,11 +390,17 @@ bool comm::Manager::SubscribeToEvents(string topic_prefix) bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix) { + if ( ! Enabled() ) + return false; + return event_subscriptions.erase(topic_prefix); } bool comm::Manager::SubscribeToLogs(string topic_prefix) { + if ( ! Enabled() ) + return false; + auto& q = log_subscriptions[topic_prefix]; if ( q ) @@ -367,6 +412,9 @@ bool comm::Manager::SubscribeToLogs(string topic_prefix) bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix) { + if ( ! Enabled() ) + return false; + return log_subscriptions.erase(topic_prefix); } @@ -797,6 +845,9 @@ void comm::Manager::Process() bool comm::Manager::AddStore(StoreHandleVal* handle) { + if ( ! Enabled() ) + return false; + if ( ! handle->store ) return false; @@ -814,6 +865,9 @@ comm::StoreHandleVal* comm::Manager::LookupStore(const broker::store::identifier& id, comm::StoreType type) { + if ( ! Enabled() ) + return nullptr; + auto key = make_pair(id, type); auto it = data_stores.find(key); @@ -826,6 +880,9 @@ comm::Manager::LookupStore(const broker::store::identifier& id, bool comm::Manager::CloseStore(const broker::store::identifier& id, StoreType type) { + if ( ! Enabled() ) + return false; + auto key = make_pair(id, type); auto it = data_stores.find(key); @@ -854,5 +911,8 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id, bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb) { + if ( ! Enabled() ) + return false; + return pending_queries.insert(cb).second; } diff --git a/src/comm/Manager.h b/src/comm/Manager.h index 31fdfa56c1..2317ecea2c 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -24,9 +24,10 @@ public: ~Manager(); - bool InitPreScript(); + bool Enable(); - bool InitPostScript(); + bool Enabled() + { return endpoint != nullptr; } bool Listen(uint16_t port, const char* addr = nullptr, bool reuse_addr = true); diff --git a/src/comm/comm.bif b/src/comm/comm.bif new file mode 100644 index 0000000000..7f8d85b720 --- /dev/null +++ b/src/comm/comm.bif @@ -0,0 +1,13 @@ + +##! General functions regarding Bro's broker communication mechanisms. + +%%{ +#include "comm/Manager.h" +%%} + +module Comm; + +function Comm::enable%(%): bool + %{ + return new Val(comm_mgr->Enable(), TYPE_BOOL); + %} diff --git a/src/main.cc b/src/main.cc index 5385ca7993..3d80833009 100644 --- a/src/main.cc +++ b/src/main.cc @@ -860,12 +860,6 @@ int main(int argc, char** argv) #ifdef ENABLE_BROKER comm_mgr = new comm::Manager(); - - if ( ! comm_mgr->InitPreScript() ) - { - fprintf(stderr, "Failed to initialize communication manager."); - exit(1); - } #endif plugin_mgr->InitPreScript(); @@ -942,11 +936,6 @@ int main(int argc, char** argv) exit(rc); } -#ifdef ENABLE_BROKER - comm_mgr->InitPostScript(); - iosource_mgr->Register(comm_mgr, true); -#endif - #ifdef USE_PERFTOOLS_DEBUG } #endif diff --git a/testing/btest/comm/clone_store.bro b/testing/btest/comm/clone_store.bro index 03e0fe172f..3ea0347024 100644 --- a/testing/btest/comm/clone_store.bro +++ b/testing/btest/comm/clone_store.bro @@ -53,6 +53,7 @@ event ready() event bro_init() { + Comm::enable(); Comm::listen(9999/tcp, "127.0.0.1"); Comm::subscribe_to_events("bro/event/ready"); Comm::auto_event("bro/event/done", done); @@ -106,6 +107,7 @@ event Comm::remote_connection_established(peer_address: string, event bro_init() { + Comm::enable(); Comm::connect("127.0.0.1", 9999/tcp, 1secs); Comm::auto_event("bro/event/ready", ready); Comm::subscribe_to_events("bro/event/done"); diff --git a/testing/btest/comm/data.bro b/testing/btest/comm/data.bro index 3fb9dcd86e..dfbb8fc1d7 100644 --- a/testing/btest/comm/data.bro +++ b/testing/btest/comm/data.bro @@ -100,6 +100,7 @@ function comm_vector_to_bro_vector(d: Comm::Data): bro_vector event bro_init() { +Comm::enable(); print Comm::data_type(Comm::data(T)); print Comm::data_type(Comm::data(+1)); print Comm::data_type(Comm::data(1)); diff --git a/testing/btest/comm/master_store.bro b/testing/btest/comm/master_store.bro index 84b4ee07a1..a1cc6a8c95 100644 --- a/testing/btest/comm/master_store.bro +++ b/testing/btest/comm/master_store.bro @@ -120,6 +120,7 @@ function dv(d: Comm::Data): Comm::DataVector event bro_init() { + Comm::enable(); local myset: set[string] = {"a", "b", "c"}; local myvec: vector of string = {"alpha", "beta", "gamma"}; h = Store::create_master("master"); diff --git a/testing/btest/comm/remote_event.test b/testing/btest/comm/remote_event.test index 9ab9a6b224..f44ed0df10 100644 --- a/testing/btest/comm/remote_event.test +++ b/testing/btest/comm/remote_event.test @@ -17,6 +17,7 @@ global auto_event_handler: event(msg: string, c: count); event bro_init() { + Comm::enable(); Comm::listen(9999/tcp, "127.0.0.1"); Comm::subscribe_to_events("bro/event/"); Comm::auto_event("bro/event/my_topic", auto_event_handler); @@ -47,6 +48,7 @@ global auto_event_handler: event(msg: string, c: count); event bro_init() { + Comm::enable(); Comm::subscribe_to_events("bro/event/my_topic"); Comm::connect("127.0.0.1", 9999/tcp, 1secs); } diff --git a/testing/btest/comm/remote_log.test b/testing/btest/comm/remote_log.test index aea88cdc25..7cdc2ab97d 100644 --- a/testing/btest/comm/remote_log.test +++ b/testing/btest/comm/remote_log.test @@ -23,13 +23,14 @@ export { }; global log_test: event(rec: Test::Info); - - event bro_init() &priority=5 - { - Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]); - } } +event bro_init() &priority=5 + { + Comm::enable(); + Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]); + } + @TEST-END-FILE @TEST-START-FILE recv.bro diff --git a/testing/btest/comm/remote_print.test b/testing/btest/comm/remote_print.test index 48dfd98bed..03e7517f20 100644 --- a/testing/btest/comm/remote_print.test +++ b/testing/btest/comm/remote_print.test @@ -14,6 +14,7 @@ redef exit_only_after_terminate = T; event bro_init() { + Comm::enable(); Comm::listen(9999/tcp, "127.0.0.1"); Comm::subscribe_to_prints("bro/print/"); } @@ -38,6 +39,7 @@ redef exit_only_after_terminate = T; event bro_init() { + Comm::enable(); Comm::subscribe_to_prints("bro/print/my_topic"); Comm::connect("127.0.0.1", 9999/tcp, 1secs); }