broker integration: add Comm::enable function

Works like old enable_communication(), but for new broker communication
mechanism.  Scripts have to explicitly call this if they want to use the
broker communication functionality.  Saves a decent chunk of Bros'
initialization time when one doesn't need communication features.
This commit is contained in:
Jon Siwek 2015-02-03 16:38:56 -06:00
parent 0cf982f1d1
commit 4dfec04135
12 changed files with 108 additions and 26 deletions

View file

@ -34,6 +34,10 @@
#include "iosource/PktDumper.h" #include "iosource/PktDumper.h"
#include "plugin/Manager.h" #include "plugin/Manager.h"
#ifdef ENABLE_BROKER
#include "comm/Manager.h"
#endif
extern "C" { extern "C" {
#include "setsignal.h" #include "setsignal.h"
}; };
@ -315,6 +319,11 @@ void net_run()
} }
#endif #endif
current_iosrc = src; current_iosrc = src;
bool communication_enabled = using_communication;
#ifdef ENABLE_BROKER
communication_enabled |= comm_mgr->Enabled();
#endif
if ( src ) if ( src )
src->Process(); // which will call net_packet_dispatch() src->Process(); // which will call net_packet_dispatch()
@ -332,7 +341,7 @@ void net_run()
} }
} }
else if ( (have_pending_timers || using_communication) && else if ( (have_pending_timers || communication_enabled) &&
! pseudo_realtime ) ! pseudo_realtime )
{ {
// Take advantage of the lull to get up to // Take advantage of the lull to get up to
@ -347,7 +356,7 @@ void net_run()
// us a lot of idle time, but doesn't delay near-term // us a lot of idle time, but doesn't delay near-term
// timers too much. (Delaying them somewhat is okay, // timers too much. (Delaying them somewhat is okay,
// since Bro timers are not high-precision anyway.) // since Bro timers are not high-precision anyway.)
if ( ! using_communication ) if ( ! communication_enabled )
usleep(100000); usleep(100000);
else else
usleep(1000); usleep(1000);

View file

@ -19,6 +19,7 @@ set(comm_SRCS
Store.cc Store.cc
) )
bif_target(comm.bif)
bif_target(data.bif) bif_target(data.bif)
bif_target(messaging.bif) bif_target(messaging.bif)
bif_target(store.bif) bif_target(store.bif)

View file

@ -13,6 +13,7 @@
#include "comm/store.bif.h" #include "comm/store.bif.h"
#include "logging/Manager.h" #include "logging/Manager.h"
#include "DebugLogger.h" #include "DebugLogger.h"
#include "iosource/Manager.h"
using namespace std; using namespace std;
@ -28,11 +29,6 @@ comm::Manager::~Manager()
CloseStore(s.first.first, s.first.second); CloseStore(s.first.first, s.first.second);
} }
bool comm::Manager::InitPreScript()
{
return true;
}
static int require_field(RecordType* rt, const char* name) static int require_field(RecordType* rt, const char* name)
{ {
auto rval = rt->FieldOffset(name); auto rval = rt->FieldOffset(name);
@ -44,8 +40,11 @@ static int require_field(RecordType* rt, const char* name)
return rval; return rval;
} }
bool comm::Manager::InitPostScript() bool comm::Manager::Enable()
{ {
if ( endpoint != nullptr )
return true;
auto send_flags_type = internal_type("Comm::SendFlags")->AsRecordType(); auto send_flags_type = internal_type("Comm::SendFlags")->AsRecordType();
send_flags_self_idx = require_field(send_flags_type, "self"); send_flags_self_idx = require_field(send_flags_type, "self");
send_flags_peers_idx = require_field(send_flags_type, "peers"); send_flags_peers_idx = require_field(send_flags_type, "peers");
@ -94,11 +93,15 @@ bool comm::Manager::InitPostScript()
} }
endpoint = unique_ptr<broker::endpoint>(new broker::endpoint(name)); endpoint = unique_ptr<broker::endpoint>(new broker::endpoint(name));
iosource_mgr->Register(this, true);
return true; return true;
} }
bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr) bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr)
{ {
if ( ! Enabled() )
return false;
auto rval = endpoint->listen(port, addr, reuse_addr); auto rval = endpoint->listen(port, addr, reuse_addr);
if ( ! rval ) if ( ! rval )
@ -114,6 +117,9 @@ bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr)
bool comm::Manager::Connect(string addr, uint16_t port, bool comm::Manager::Connect(string addr, uint16_t port,
chrono::duration<double> retry_interval) chrono::duration<double> retry_interval)
{ {
if ( ! Enabled() )
return false;
auto& peer = peers[make_pair(addr, port)]; auto& peer = peers[make_pair(addr, port)];
if ( peer ) if ( peer )
@ -125,6 +131,9 @@ bool comm::Manager::Connect(string addr, uint16_t port,
bool comm::Manager::Disconnect(const string& addr, uint16_t port) bool comm::Manager::Disconnect(const string& addr, uint16_t port)
{ {
if ( ! Enabled() )
return false;
auto it = peers.find(make_pair(addr, port)); auto it = peers.find(make_pair(addr, port));
if ( it == peers.end() ) if ( it == peers.end() )
@ -137,18 +146,27 @@ bool comm::Manager::Disconnect(const string& addr, uint16_t port)
bool comm::Manager::Print(string topic, string msg, Val* flags) bool comm::Manager::Print(string topic, string msg, Val* flags)
{ {
if ( ! Enabled() )
return false;
endpoint->send(move(topic), broker::message{move(msg)}, GetFlags(flags)); endpoint->send(move(topic), broker::message{move(msg)}, GetFlags(flags));
return true; return true;
} }
bool comm::Manager::Event(std::string topic, broker::message msg, int flags) bool comm::Manager::Event(std::string topic, broker::message msg, int flags)
{ {
if ( ! Enabled() )
return false;
endpoint->send(move(topic), move(msg), flags); endpoint->send(move(topic), move(msg), flags);
return true; return true;
} }
bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags) bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags)
{ {
if ( ! Enabled() )
return false;
auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum()); auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum());
if ( ! stream_name ) if ( ! stream_name )
@ -176,6 +194,9 @@ bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags)
bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags) bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags)
{ {
if ( ! Enabled() )
return false;
if ( ! args->Lookup(0) ) if ( ! args->Lookup(0) )
return false; return false;
@ -198,6 +219,9 @@ bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags)
bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags) bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags)
{ {
if ( ! Enabled() )
return false;
if ( event->Type()->Tag() != TYPE_FUNC ) if ( event->Type()->Tag() != TYPE_FUNC )
{ {
reporter->Error("Comm::auto_event must operate on an event"); reporter->Error("Comm::auto_event must operate on an event");
@ -227,6 +251,9 @@ bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags)
bool comm::Manager::AutoEventStop(const string& topic, Val* event) bool comm::Manager::AutoEventStop(const string& topic, Val* event)
{ {
if ( ! Enabled() )
return false;
if ( event->Type()->Tag() != TYPE_FUNC ) if ( event->Type()->Tag() != TYPE_FUNC )
{ {
reporter->Error("Comm::auto_event_stop must operate on an event"); reporter->Error("Comm::auto_event_stop must operate on an event");
@ -257,6 +284,9 @@ bool comm::Manager::AutoEventStop(const string& topic, Val* event)
RecordVal* comm::Manager::MakeEventArgs(val_list* args) RecordVal* comm::Manager::MakeEventArgs(val_list* args)
{ {
if ( ! Enabled() )
return nullptr;
auto rval = new RecordVal(BifType::Record::Comm::EventArgs); auto rval = new RecordVal(BifType::Record::Comm::EventArgs);
auto arg_vec = new VectorVal(vector_of_data_type); auto arg_vec = new VectorVal(vector_of_data_type);
rval->Assign(1, arg_vec); rval->Assign(1, arg_vec);
@ -324,6 +354,9 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args)
bool comm::Manager::SubscribeToPrints(string topic_prefix) bool comm::Manager::SubscribeToPrints(string topic_prefix)
{ {
if ( ! Enabled() )
return false;
auto& q = print_subscriptions[topic_prefix]; auto& q = print_subscriptions[topic_prefix];
if ( q ) if ( q )
@ -335,11 +368,17 @@ bool comm::Manager::SubscribeToPrints(string topic_prefix)
bool comm::Manager::UnsubscribeToPrints(const string& topic_prefix) bool comm::Manager::UnsubscribeToPrints(const string& topic_prefix)
{ {
if ( ! Enabled() )
return false;
return print_subscriptions.erase(topic_prefix); return print_subscriptions.erase(topic_prefix);
} }
bool comm::Manager::SubscribeToEvents(string topic_prefix) bool comm::Manager::SubscribeToEvents(string topic_prefix)
{ {
if ( ! Enabled() )
return false;
auto& q = event_subscriptions[topic_prefix]; auto& q = event_subscriptions[topic_prefix];
if ( q ) if ( q )
@ -351,11 +390,17 @@ bool comm::Manager::SubscribeToEvents(string topic_prefix)
bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix) bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix)
{ {
if ( ! Enabled() )
return false;
return event_subscriptions.erase(topic_prefix); return event_subscriptions.erase(topic_prefix);
} }
bool comm::Manager::SubscribeToLogs(string topic_prefix) bool comm::Manager::SubscribeToLogs(string topic_prefix)
{ {
if ( ! Enabled() )
return false;
auto& q = log_subscriptions[topic_prefix]; auto& q = log_subscriptions[topic_prefix];
if ( q ) if ( q )
@ -367,6 +412,9 @@ bool comm::Manager::SubscribeToLogs(string topic_prefix)
bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix) bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix)
{ {
if ( ! Enabled() )
return false;
return log_subscriptions.erase(topic_prefix); return log_subscriptions.erase(topic_prefix);
} }
@ -797,6 +845,9 @@ void comm::Manager::Process()
bool comm::Manager::AddStore(StoreHandleVal* handle) bool comm::Manager::AddStore(StoreHandleVal* handle)
{ {
if ( ! Enabled() )
return false;
if ( ! handle->store ) if ( ! handle->store )
return false; return false;
@ -814,6 +865,9 @@ comm::StoreHandleVal*
comm::Manager::LookupStore(const broker::store::identifier& id, comm::Manager::LookupStore(const broker::store::identifier& id,
comm::StoreType type) comm::StoreType type)
{ {
if ( ! Enabled() )
return nullptr;
auto key = make_pair(id, type); auto key = make_pair(id, type);
auto it = data_stores.find(key); auto it = data_stores.find(key);
@ -826,6 +880,9 @@ comm::Manager::LookupStore(const broker::store::identifier& id,
bool comm::Manager::CloseStore(const broker::store::identifier& id, bool comm::Manager::CloseStore(const broker::store::identifier& id,
StoreType type) StoreType type)
{ {
if ( ! Enabled() )
return false;
auto key = make_pair(id, type); auto key = make_pair(id, type);
auto it = data_stores.find(key); auto it = data_stores.find(key);
@ -854,5 +911,8 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id,
bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb) bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb)
{ {
if ( ! Enabled() )
return false;
return pending_queries.insert(cb).second; return pending_queries.insert(cb).second;
} }

View file

@ -24,9 +24,10 @@ public:
~Manager(); ~Manager();
bool InitPreScript(); bool Enable();
bool InitPostScript(); bool Enabled()
{ return endpoint != nullptr; }
bool Listen(uint16_t port, const char* addr = nullptr, bool Listen(uint16_t port, const char* addr = nullptr,
bool reuse_addr = true); bool reuse_addr = true);

13
src/comm/comm.bif Normal file
View file

@ -0,0 +1,13 @@
##! General functions regarding Bro's broker communication mechanisms.
%%{
#include "comm/Manager.h"
%%}
module Comm;
function Comm::enable%(%): bool
%{
return new Val(comm_mgr->Enable(), TYPE_BOOL);
%}

View file

@ -860,12 +860,6 @@ int main(int argc, char** argv)
#ifdef ENABLE_BROKER #ifdef ENABLE_BROKER
comm_mgr = new comm::Manager(); comm_mgr = new comm::Manager();
if ( ! comm_mgr->InitPreScript() )
{
fprintf(stderr, "Failed to initialize communication manager.");
exit(1);
}
#endif #endif
plugin_mgr->InitPreScript(); plugin_mgr->InitPreScript();
@ -942,11 +936,6 @@ int main(int argc, char** argv)
exit(rc); exit(rc);
} }
#ifdef ENABLE_BROKER
comm_mgr->InitPostScript();
iosource_mgr->Register(comm_mgr, true);
#endif
#ifdef USE_PERFTOOLS_DEBUG #ifdef USE_PERFTOOLS_DEBUG
} }
#endif #endif

View file

@ -53,6 +53,7 @@ event ready()
event bro_init() event bro_init()
{ {
Comm::enable();
Comm::listen(9999/tcp, "127.0.0.1"); Comm::listen(9999/tcp, "127.0.0.1");
Comm::subscribe_to_events("bro/event/ready"); Comm::subscribe_to_events("bro/event/ready");
Comm::auto_event("bro/event/done", done); Comm::auto_event("bro/event/done", done);
@ -106,6 +107,7 @@ event Comm::remote_connection_established(peer_address: string,
event bro_init() event bro_init()
{ {
Comm::enable();
Comm::connect("127.0.0.1", 9999/tcp, 1secs); Comm::connect("127.0.0.1", 9999/tcp, 1secs);
Comm::auto_event("bro/event/ready", ready); Comm::auto_event("bro/event/ready", ready);
Comm::subscribe_to_events("bro/event/done"); Comm::subscribe_to_events("bro/event/done");

View file

@ -100,6 +100,7 @@ function comm_vector_to_bro_vector(d: Comm::Data): bro_vector
event bro_init() event bro_init()
{ {
Comm::enable();
print Comm::data_type(Comm::data(T)); print Comm::data_type(Comm::data(T));
print Comm::data_type(Comm::data(+1)); print Comm::data_type(Comm::data(+1));
print Comm::data_type(Comm::data(1)); print Comm::data_type(Comm::data(1));

View file

@ -120,6 +120,7 @@ function dv(d: Comm::Data): Comm::DataVector
event bro_init() event bro_init()
{ {
Comm::enable();
local myset: set[string] = {"a", "b", "c"}; local myset: set[string] = {"a", "b", "c"};
local myvec: vector of string = {"alpha", "beta", "gamma"}; local myvec: vector of string = {"alpha", "beta", "gamma"};
h = Store::create_master("master"); h = Store::create_master("master");

View file

@ -17,6 +17,7 @@ global auto_event_handler: event(msg: string, c: count);
event bro_init() event bro_init()
{ {
Comm::enable();
Comm::listen(9999/tcp, "127.0.0.1"); Comm::listen(9999/tcp, "127.0.0.1");
Comm::subscribe_to_events("bro/event/"); Comm::subscribe_to_events("bro/event/");
Comm::auto_event("bro/event/my_topic", auto_event_handler); Comm::auto_event("bro/event/my_topic", auto_event_handler);
@ -47,6 +48,7 @@ global auto_event_handler: event(msg: string, c: count);
event bro_init() event bro_init()
{ {
Comm::enable();
Comm::subscribe_to_events("bro/event/my_topic"); Comm::subscribe_to_events("bro/event/my_topic");
Comm::connect("127.0.0.1", 9999/tcp, 1secs); Comm::connect("127.0.0.1", 9999/tcp, 1secs);
} }

View file

@ -23,13 +23,14 @@ export {
}; };
global log_test: event(rec: Test::Info); global log_test: event(rec: Test::Info);
event bro_init() &priority=5
{
Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]);
}
} }
event bro_init() &priority=5
{
Comm::enable();
Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]);
}
@TEST-END-FILE @TEST-END-FILE
@TEST-START-FILE recv.bro @TEST-START-FILE recv.bro

View file

@ -14,6 +14,7 @@ redef exit_only_after_terminate = T;
event bro_init() event bro_init()
{ {
Comm::enable();
Comm::listen(9999/tcp, "127.0.0.1"); Comm::listen(9999/tcp, "127.0.0.1");
Comm::subscribe_to_prints("bro/print/"); Comm::subscribe_to_prints("bro/print/");
} }
@ -38,6 +39,7 @@ redef exit_only_after_terminate = T;
event bro_init() event bro_init()
{ {
Comm::enable();
Comm::subscribe_to_prints("bro/print/my_topic"); Comm::subscribe_to_prints("bro/print/my_topic");
Comm::connect("127.0.0.1", 9999/tcp, 1secs); Comm::connect("127.0.0.1", 9999/tcp, 1secs);
} }