diff --git a/CHANGES b/CHANGES
index 111c557b49..d7014fa711 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,4 +1,20 @@
+2.3-526 | 2015-03-06 12:48:49 -0600
+
+ * Fix build warnings, clarify broker requirements, update submodule.
+ (Jon Siwek)
+
+ * Rename comm/ directories to broker/ (Jon Siwek)
+
+ * Rename broker-related namespaces. (Jon Siwek)
+
+ * Improve remote logging via broker by only sending fields w/ &log.
+ (Jon Siwek)
+
+ * Disable a stream's remote logging via broker if it fails. (Jon Siwek)
+
+ * Improve some broker communication unit tests. (Jon Siwek)
+
2.3-518 | 2015-03-04 13:13:50 -0800
* Add bytes_recvd to stats.log recording the number of bytes
diff --git a/VERSION b/VERSION
index 5a9ae5afca..747dc6d532 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3-518
+2.3-526
diff --git a/aux/broker b/aux/broker
index 8cc208192b..694af9d9ed 160000
--- a/aux/broker
+++ b/aux/broker
@@ -1 +1 @@
-Subproject commit 8cc208192b4b692a082e22c8dd89c44f69e824d7
+Subproject commit 694af9d9edd188a461cc762bfdb7b61688b93ada
diff --git a/doc/frameworks/comm.rst b/doc/frameworks/broker.rst
similarity index 66%
rename from doc/frameworks/comm.rst
rename to doc/frameworks/broker.rst
index 38f1f5b644..3cd8dab6e3 100644
--- a/doc/frameworks/comm.rst
+++ b/doc/frameworks/broker.rst
@@ -1,5 +1,5 @@
-.. _comm-framework:
+.. _brokercomm-framework:
======================================
Broker-Enabled Communication Framework
@@ -11,8 +11,8 @@ Broker-Enabled Communication Framework
<../components/broker/README.html>`_ to exchange information with
other Bro processes. To enable it run Bro's ``configure`` script
with the ``--enable-broker`` option. Note that a C++11 compatible
- compiler is required as well as the `C++ Actor Framework
- `_.
+ compiler (e.g. GCC 4.8+ or Clang 3.3+) is required as well as the
+ `C++ Actor Framework `_.
.. contents::
@@ -20,37 +20,37 @@ Connecting to Peers
===================
Communication via Broker must first be turned on via
-:bro:see:`Comm::enable`.
+:bro:see:`BrokerComm::enable`.
-Bro can accept incoming connections by calling :bro:see:`Comm::listen`
+Bro can accept incoming connections by calling :bro:see:`BrokerComm::listen`
and then monitor connection status updates via
-:bro:see:`Comm::incoming_connection_established` and
-:bro:see:`Comm::incoming_connection_broken`.
+:bro:see:`BrokerComm::incoming_connection_established` and
+:bro:see:`BrokerComm::incoming_connection_broken`.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/connecting-listener.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/connecting-listener.bro
-Bro can initiate outgoing connections by calling :bro:see:`Comm::connect`
+Bro can initiate outgoing connections by calling :bro:see:`BrokerComm::connect`
and then monitor connection status updates via
-:bro:see:`Comm::outgoing_connection_established`,
-:bro:see:`Comm::outgoing_connection_broken`, and
-:bro:see:`Comm::outgoing_connection_incompatible`.
+:bro:see:`BrokerComm::outgoing_connection_established`,
+:bro:see:`BrokerComm::outgoing_connection_broken`, and
+:bro:see:`BrokerComm::outgoing_connection_incompatible`.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/connecting-connector.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/connecting-connector.bro
Remote Printing
===============
To receive remote print messages, first use
-:bro:see:`Comm::subscribe_to_prints` to advertise to peers a topic
+:bro:see:`BrokerComm::subscribe_to_prints` to advertise to peers a topic
prefix of interest and then create an event handler for
-:bro:see:`Comm::print_handler` to handle any print messages that are
+:bro:see:`BrokerComm::print_handler` to handle any print messages that are
received.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/printing-listener.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/printing-listener.bro
-To send remote print messages, just call :bro:see:`Comm::print`.
+To send remote print messages, just call :bro:see:`BrokerComm::print`.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/printing-connector.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/printing-connector.bro
Notice that the subscriber only used the prefix "bro/print/", but is
able to receive messages with full topics of "bro/print/hi",
@@ -72,18 +72,18 @@ Remote Events
=============
Receiving remote events is similar to remote prints. Just use
-:bro:see:`Comm::subscribe_to_events` and possibly define any new events
+:bro:see:`BrokerComm::subscribe_to_events` and possibly define any new events
along with handlers that peers may want to send.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/events-listener.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/events-listener.bro
To send events, there are two choices. The first is to use call
-:bro:see:`Comm::event` directly. The second option is to use
-:bro:see:`Comm::auto_event` to make it so a particular event is
+:bro:see:`BrokerComm::event` directly. The second option is to use
+:bro:see:`BrokerComm::auto_event` to make it so a particular event is
automatically sent to peers whenever it is called locally via the normal
event invocation syntax.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/events-connector.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/events-connector.bro
Again, the subscription model is prefix-based.
@@ -105,20 +105,20 @@ parameter of the message.
Remote Logging
==============
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/testlog.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/testlog.bro
-Use :bro:see:`Comm::subscribe_to_logs` to advertise interest in logs
+Use :bro:see:`BrokerComm::subscribe_to_logs` to advertise interest in logs
written by peers. The topic names that Bro uses are implicitly of the
form "bro/log/".
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/logs-listener.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/logs-listener.bro
To send remote logs either use :bro:see:`Log::enable_remote_logging` or
-:bro:see:`Comm::enable_remote_logs`. The former allows any log stream
+:bro:see:`BrokerComm::enable_remote_logs`. The former allows any log stream
to be sent to peers while the later toggles remote logging for
particular streams.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/logs-connector.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/logs-connector.bro
Message Format
--------------
@@ -140,23 +140,23 @@ Tuning Access Control
By default, endpoints do not restrict the message topics that it sends
to peers and do not restrict what message topics and data store
identifiers get advertised to peers. These are the default
-:bro:see:`Comm::EndpointFlags` supplied to :bro:see:`Comm::enable`.
+:bro:see:`BrokerComm::EndpointFlags` supplied to :bro:see:`BrokerComm::enable`.
If not using the ``auto_publish`` flag, one can use the
-:bro:see:`Comm::publish_topic` and :bro:see:`Comm::unpublish_topic`
+:bro:see:`BrokerComm::publish_topic` and :bro:see:`BrokerComm::unpublish_topic`
functions to manipulate the set of message topics (must match exactly)
that are allowed to be sent to peer endpoints. These settings take
precedence over the per-message ``peers`` flag supplied to functions
-that take a :bro:see:`Comm::SendFlags` such as :bro:see:`Comm::print`,
-:bro:see:`Comm::event`, :bro:see:`Comm::auto_event` or
-:bro:see:`Comm::enable_remote_logs`.
+that take a :bro:see:`BrokerComm::SendFlags` such as :bro:see:`BrokerComm::print`,
+:bro:see:`BrokerComm::event`, :bro:see:`BrokerComm::auto_event` or
+:bro:see:`BrokerComm::enable_remote_logs`.
If not using the ``auto_advertise`` flag, one can use the
-:bro:see:`Comm::advertise_topic` and :bro:see:`Comm::unadvertise_topic`
+:bro:see:`BrokerComm::advertise_topic` and :bro:see:`BrokerComm::unadvertise_topic`
to manupulate the set of topic prefixes that are allowed to be
advertised to peers. If an endpoint does not advertise a topic prefix,
the only way a peers can send messages to it is via the ``unsolicited``
-flag of :bro:see:`Comm::SendFlags` and choosing a topic with a matching
+flag of :bro:see:`BrokerComm::SendFlags` and choosing a topic with a matching
prefix (i.e. full topic may be longer than receivers prefix, just the
prefix needs to match).
@@ -189,13 +189,13 @@ Data stores also support expiration on a per-key basis either using an
absolute point in time or a relative amount of time since the entry's
last modification time.
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/stores-listener.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/stores-listener.bro
-.. btest-include:: ${DOC_ROOT}/frameworks/comm/stores-connector.bro
+.. btest-include:: ${DOC_ROOT}/frameworks/broker/stores-connector.bro
In the above example, if a local copy of the store contents isn't
-needed, just replace the :bro:see:`Store::create_clone` call with
-:bro:see:`Store::create_frontend`. Queries will then be made against
+needed, just replace the :bro:see:`BrokerStore::create_clone` call with
+:bro:see:`BrokerStore::create_frontend`. Queries will then be made against
the remote master store instead of the local clone.
Note that all queries are made within Bro's asynchrounous ``when``
diff --git a/doc/frameworks/comm/connecting-connector.bro b/doc/frameworks/broker/connecting-connector.bro
similarity index 52%
rename from doc/frameworks/comm/connecting-connector.bro
rename to doc/frameworks/broker/connecting-connector.bro
index d5b191ad38..017f88f214 100644
--- a/doc/frameworks/comm/connecting-connector.bro
+++ b/doc/frameworks/broker/connecting-connector.bro
@@ -1,19 +1,19 @@
const broker_port: port &redef;
redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "connector";
+redef BrokerComm::endpoint_name = "connector";
event bro_init()
{
- Comm::enable();
- Comm::connect("127.0.0.1", broker_port, 1sec);
+ BrokerComm::enable();
+ BrokerComm::connect("127.0.0.1", broker_port, 1sec);
}
-event Comm::outgoing_connection_established(peer_address: string,
+event BrokerComm::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
- print "Comm::outgoing_connection_established",
+ print "BrokerComm::outgoing_connection_established",
peer_address, peer_port, peer_name;
terminate();
}
diff --git a/doc/frameworks/broker/connecting-listener.bro b/doc/frameworks/broker/connecting-listener.bro
new file mode 100644
index 0000000000..2732bed760
--- /dev/null
+++ b/doc/frameworks/broker/connecting-listener.bro
@@ -0,0 +1,21 @@
+
+const broker_port: port &redef;
+redef exit_only_after_terminate = T;
+redef BrokerComm::endpoint_name = "listener";
+
+event bro_init()
+ {
+ BrokerComm::enable();
+ BrokerComm::listen(broker_port, "127.0.0.1");
+ }
+
+event BrokerComm::incoming_connection_established(peer_name: string)
+ {
+ print "BrokerComm::incoming_connection_established", peer_name;
+ }
+
+event BrokerComm::incoming_connection_broken(peer_name: string)
+ {
+ print "BrokerComm::incoming_connection_broken", peer_name;
+ terminate();
+ }
diff --git a/doc/frameworks/broker/events-connector.bro b/doc/frameworks/broker/events-connector.bro
new file mode 100644
index 0000000000..fc0e48769b
--- /dev/null
+++ b/doc/frameworks/broker/events-connector.bro
@@ -0,0 +1,31 @@
+const broker_port: port &redef;
+redef exit_only_after_terminate = T;
+redef BrokerComm::endpoint_name = "connector";
+global my_event: event(msg: string, c: count);
+global my_auto_event: event(msg: string, c: count);
+
+event bro_init()
+ {
+ BrokerComm::enable();
+ BrokerComm::connect("127.0.0.1", broker_port, 1sec);
+ BrokerComm::auto_event("bro/event/my_auto_event", my_auto_event);
+ }
+
+event BrokerComm::outgoing_connection_established(peer_address: string,
+ peer_port: port,
+ peer_name: string)
+ {
+ print "BrokerComm::outgoing_connection_established",
+ peer_address, peer_port, peer_name;
+ BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "hi", 0));
+ event my_auto_event("stuff", 88);
+ BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "...", 1));
+ event my_auto_event("more stuff", 51);
+ BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "bye", 2));
+ }
+
+event BrokerComm::outgoing_connection_broken(peer_address: string,
+ peer_port: port)
+ {
+ terminate();
+ }
diff --git a/doc/frameworks/comm/events-listener.bro b/doc/frameworks/broker/events-listener.bro
similarity index 62%
rename from doc/frameworks/comm/events-listener.bro
rename to doc/frameworks/broker/events-listener.bro
index 4d8985d09a..09a99b7c97 100644
--- a/doc/frameworks/comm/events-listener.bro
+++ b/doc/frameworks/broker/events-listener.bro
@@ -1,21 +1,21 @@
const broker_port: port &redef;
redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "listener";
+redef BrokerComm::endpoint_name = "listener";
global msg_count = 0;
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);
event bro_init()
{
- Comm::enable();
- Comm::subscribe_to_events("bro/event/");
- Comm::listen(broker_port, "127.0.0.1");
+ BrokerComm::enable();
+ BrokerComm::subscribe_to_events("bro/event/");
+ BrokerComm::listen(broker_port, "127.0.0.1");
}
-event Comm::incoming_connection_established(peer_name: string)
+event BrokerComm::incoming_connection_established(peer_name: string)
{
- print "Comm::incoming_connection_established", peer_name;
+ print "BrokerComm::incoming_connection_established", peer_name;
}
event my_event(msg: string, c: count)
diff --git a/doc/frameworks/comm/logs-connector.bro b/doc/frameworks/broker/logs-connector.bro
similarity index 62%
rename from doc/frameworks/comm/logs-connector.bro
rename to doc/frameworks/broker/logs-connector.bro
index 2c02acb188..cdd50b7140 100644
--- a/doc/frameworks/comm/logs-connector.bro
+++ b/doc/frameworks/broker/logs-connector.bro
@@ -2,16 +2,16 @@
const broker_port: port &redef;
redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "connector";
+redef BrokerComm::endpoint_name = "connector";
redef Log::enable_local_logging = F;
redef Log::enable_remote_logging = F;
global n = 0;
event bro_init()
{
- Comm::enable();
- Comm::enable_remote_logs(Test::LOG);
- Comm::connect("127.0.0.1", broker_port, 1sec);
+ BrokerComm::enable();
+ BrokerComm::enable_remote_logs(Test::LOG);
+ BrokerComm::connect("127.0.0.1", broker_port, 1sec);
}
event do_write()
@@ -24,16 +24,16 @@ event do_write()
event do_write();
}
-event Comm::outgoing_connection_established(peer_address: string,
+event BrokerComm::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
- print "Comm::outgoing_connection_established",
+ print "BrokerComm::outgoing_connection_established",
peer_address, peer_port, peer_name;
event do_write();
}
-event Comm::outgoing_connection_broken(peer_address: string,
+event BrokerComm::outgoing_connection_broken(peer_address: string,
peer_port: port)
{
terminate();
diff --git a/doc/frameworks/broker/logs-listener.bro b/doc/frameworks/broker/logs-listener.bro
new file mode 100644
index 0000000000..788d459cc8
--- /dev/null
+++ b/doc/frameworks/broker/logs-listener.bro
@@ -0,0 +1,25 @@
+@load ./testlog
+
+const broker_port: port &redef;
+redef exit_only_after_terminate = T;
+redef BrokerComm::endpoint_name = "listener";
+
+event bro_init()
+ {
+ BrokerComm::enable();
+ BrokerComm::subscribe_to_logs("bro/log/Test::LOG");
+ BrokerComm::listen(broker_port, "127.0.0.1");
+ }
+
+event BrokerComm::incoming_connection_established(peer_name: string)
+ {
+ print "BrokerComm::incoming_connection_established", peer_name;
+ }
+
+event Test::log_test(rec: Test::Info)
+ {
+ print "wrote log", rec;
+
+ if ( rec$num == 5 )
+ terminate();
+ }
diff --git a/doc/frameworks/broker/printing-connector.bro b/doc/frameworks/broker/printing-connector.bro
new file mode 100644
index 0000000000..9135f8f4c4
--- /dev/null
+++ b/doc/frameworks/broker/printing-connector.bro
@@ -0,0 +1,26 @@
+const broker_port: port &redef;
+redef exit_only_after_terminate = T;
+redef BrokerComm::endpoint_name = "connector";
+
+event bro_init()
+ {
+ BrokerComm::enable();
+ BrokerComm::connect("127.0.0.1", broker_port, 1sec);
+ }
+
+event BrokerComm::outgoing_connection_established(peer_address: string,
+ peer_port: port,
+ peer_name: string)
+ {
+ print "BrokerComm::outgoing_connection_established",
+ peer_address, peer_port, peer_name;
+ BrokerComm::print("bro/print/hi", "hello");
+ BrokerComm::print("bro/print/stuff", "...");
+ BrokerComm::print("bro/print/bye", "goodbye");
+ }
+
+event BrokerComm::outgoing_connection_broken(peer_address: string,
+ peer_port: port)
+ {
+ terminate();
+ }
diff --git a/doc/frameworks/broker/printing-listener.bro b/doc/frameworks/broker/printing-listener.bro
new file mode 100644
index 0000000000..12abf9131e
--- /dev/null
+++ b/doc/frameworks/broker/printing-listener.bro
@@ -0,0 +1,26 @@
+
+const broker_port: port &redef;
+redef exit_only_after_terminate = T;
+redef BrokerComm::endpoint_name = "listener";
+global msg_count = 0;
+
+event bro_init()
+ {
+ BrokerComm::enable();
+ BrokerComm::subscribe_to_prints("bro/print/");
+ BrokerComm::listen(broker_port, "127.0.0.1");
+ }
+
+event BrokerComm::incoming_connection_established(peer_name: string)
+ {
+ print "BrokerComm::incoming_connection_established", peer_name;
+ }
+
+event BrokerComm::print_handler(msg: string)
+ {
+ ++msg_count;
+ print "got print message", msg;
+
+ if ( msg_count == 3 )
+ terminate();
+ }
diff --git a/doc/frameworks/broker/stores-connector.bro b/doc/frameworks/broker/stores-connector.bro
new file mode 100644
index 0000000000..82fb54dcdb
--- /dev/null
+++ b/doc/frameworks/broker/stores-connector.bro
@@ -0,0 +1,53 @@
+const broker_port: port &redef;
+redef exit_only_after_terminate = T;
+
+global h: opaque of BrokerStore::Handle;
+
+function dv(d: BrokerComm::Data): BrokerComm::DataVector
+ {
+ local rval: BrokerComm::DataVector;
+ rval[0] = d;
+ return rval;
+ }
+
+global ready: event();
+
+event BrokerComm::outgoing_connection_broken(peer_address: string,
+ peer_port: port)
+ {
+ terminate();
+ }
+
+event BrokerComm::outgoing_connection_established(peer_address: string,
+ peer_port: port,
+ peer_name: string)
+ {
+ local myset: set[string] = {"a", "b", "c"};
+ local myvec: vector of string = {"alpha", "beta", "gamma"};
+ h = BrokerStore::create_master("mystore");
+ BrokerStore::insert(h, BrokerComm::data("one"), BrokerComm::data(110));
+ BrokerStore::insert(h, BrokerComm::data("two"), BrokerComm::data(223));
+ BrokerStore::insert(h, BrokerComm::data("myset"), BrokerComm::data(myset));
+ BrokerStore::insert(h, BrokerComm::data("myvec"), BrokerComm::data(myvec));
+ BrokerStore::increment(h, BrokerComm::data("one"));
+ BrokerStore::decrement(h, BrokerComm::data("two"));
+ BrokerStore::add_to_set(h, BrokerComm::data("myset"), BrokerComm::data("d"));
+ BrokerStore::remove_from_set(h, BrokerComm::data("myset"), BrokerComm::data("b"));
+ BrokerStore::push_left(h, BrokerComm::data("myvec"), dv(BrokerComm::data("delta")));
+ BrokerStore::push_right(h, BrokerComm::data("myvec"), dv(BrokerComm::data("omega")));
+
+ when ( local res = BrokerStore::size(h) )
+ {
+ print "master size", res;
+ event ready();
+ }
+ timeout 10sec
+ { print "timeout"; }
+ }
+
+event bro_init()
+ {
+ BrokerComm::enable();
+ BrokerComm::connect("127.0.0.1", broker_port, 1secs);
+ BrokerComm::auto_event("bro/event/ready", ready);
+ }
diff --git a/doc/frameworks/broker/stores-listener.bro b/doc/frameworks/broker/stores-listener.bro
new file mode 100644
index 0000000000..0a8dc85e13
--- /dev/null
+++ b/doc/frameworks/broker/stores-listener.bro
@@ -0,0 +1,43 @@
+const broker_port: port &redef;
+redef exit_only_after_terminate = T;
+
+global h: opaque of BrokerStore::Handle;
+global expected_key_count = 4;
+global key_count = 0;
+
+function do_lookup(key: string)
+ {
+ when ( local res = BrokerStore::lookup(h, BrokerComm::data(key)) )
+ {
+ ++key_count;
+ print "lookup", key, res;
+
+ if ( key_count == expected_key_count )
+ terminate();
+ }
+ timeout 10sec
+ { print "timeout", key; }
+ }
+
+event ready()
+ {
+ h = BrokerStore::create_clone("mystore");
+
+ when ( local res = BrokerStore::keys(h) )
+ {
+ print "clone keys", res;
+ do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 0)));
+ do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 1)));
+ do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 2)));
+ do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 3)));
+ }
+ timeout 10sec
+ { print "timeout"; }
+ }
+
+event bro_init()
+ {
+ BrokerComm::enable();
+ BrokerComm::subscribe_to_events("bro/event/ready");
+ BrokerComm::listen(broker_port, "127.0.0.1");
+ }
diff --git a/doc/frameworks/comm/testlog.bro b/doc/frameworks/broker/testlog.bro
similarity index 92%
rename from doc/frameworks/comm/testlog.bro
rename to doc/frameworks/broker/testlog.bro
index b5f449ae3d..9c04218e85 100644
--- a/doc/frameworks/comm/testlog.bro
+++ b/doc/frameworks/broker/testlog.bro
@@ -14,6 +14,6 @@ export {
event bro_init() &priority=5
{
- Comm::enable();
+ BrokerComm::enable();
Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]);
}
diff --git a/doc/frameworks/comm/connecting-listener.bro b/doc/frameworks/comm/connecting-listener.bro
deleted file mode 100644
index 4e5c3ad86f..0000000000
--- a/doc/frameworks/comm/connecting-listener.bro
+++ /dev/null
@@ -1,21 +0,0 @@
-
-const broker_port: port &redef;
-redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "listener";
-
-event bro_init()
- {
- Comm::enable();
- Comm::listen(broker_port, "127.0.0.1");
- }
-
-event Comm::incoming_connection_established(peer_name: string)
- {
- print "Comm::incoming_connection_established", peer_name;
- }
-
-event Comm::incoming_connection_broken(peer_name: string)
- {
- print "Comm::incoming_connection_broken", peer_name;
- terminate();
- }
diff --git a/doc/frameworks/comm/events-connector.bro b/doc/frameworks/comm/events-connector.bro
deleted file mode 100644
index 28a94f356e..0000000000
--- a/doc/frameworks/comm/events-connector.bro
+++ /dev/null
@@ -1,31 +0,0 @@
-const broker_port: port &redef;
-redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "connector";
-global my_event: event(msg: string, c: count);
-global my_auto_event: event(msg: string, c: count);
-
-event bro_init()
- {
- Comm::enable();
- Comm::connect("127.0.0.1", broker_port, 1sec);
- Comm::auto_event("bro/event/my_auto_event", my_auto_event);
- }
-
-event Comm::outgoing_connection_established(peer_address: string,
- peer_port: port,
- peer_name: string)
- {
- print "Comm::outgoing_connection_established",
- peer_address, peer_port, peer_name;
- Comm::event("bro/event/my_event", Comm::event_args(my_event, "hi", 0));
- event my_auto_event("stuff", 88);
- Comm::event("bro/event/my_event", Comm::event_args(my_event, "...", 1));
- event my_auto_event("more stuff", 51);
- Comm::event("bro/event/my_event", Comm::event_args(my_event, "bye", 2));
- }
-
-event Comm::outgoing_connection_broken(peer_address: string,
- peer_port: port)
- {
- terminate();
- }
diff --git a/doc/frameworks/comm/logs-listener.bro b/doc/frameworks/comm/logs-listener.bro
deleted file mode 100644
index 5f763be7ee..0000000000
--- a/doc/frameworks/comm/logs-listener.bro
+++ /dev/null
@@ -1,25 +0,0 @@
-@load ./testlog
-
-const broker_port: port &redef;
-redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "listener";
-
-event bro_init()
- {
- Comm::enable();
- Comm::subscribe_to_logs("bro/log/Test::LOG");
- Comm::listen(broker_port, "127.0.0.1");
- }
-
-event Comm::incoming_connection_established(peer_name: string)
- {
- print "Comm::incoming_connection_established", peer_name;
- }
-
-event Test::log_test(rec: Test::Info)
- {
- print "wrote log", rec;
-
- if ( rec$num == 5 )
- terminate();
- }
diff --git a/doc/frameworks/comm/printing-connector.bro b/doc/frameworks/comm/printing-connector.bro
deleted file mode 100644
index 76567dbb97..0000000000
--- a/doc/frameworks/comm/printing-connector.bro
+++ /dev/null
@@ -1,26 +0,0 @@
-const broker_port: port &redef;
-redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "connector";
-
-event bro_init()
- {
- Comm::enable();
- Comm::connect("127.0.0.1", broker_port, 1sec);
- }
-
-event Comm::outgoing_connection_established(peer_address: string,
- peer_port: port,
- peer_name: string)
- {
- print "Comm::outgoing_connection_established",
- peer_address, peer_port, peer_name;
- Comm::print("bro/print/hi", "hello");
- Comm::print("bro/print/stuff", "...");
- Comm::print("bro/print/bye", "goodbye");
- }
-
-event Comm::outgoing_connection_broken(peer_address: string,
- peer_port: port)
- {
- terminate();
- }
diff --git a/doc/frameworks/comm/printing-listener.bro b/doc/frameworks/comm/printing-listener.bro
deleted file mode 100644
index 9bd3844502..0000000000
--- a/doc/frameworks/comm/printing-listener.bro
+++ /dev/null
@@ -1,26 +0,0 @@
-
-const broker_port: port &redef;
-redef exit_only_after_terminate = T;
-redef Comm::endpoint_name = "listener";
-global msg_count = 0;
-
-event bro_init()
- {
- Comm::enable();
- Comm::subscribe_to_prints("bro/print/");
- Comm::listen(broker_port, "127.0.0.1");
- }
-
-event Comm::incoming_connection_established(peer_name: string)
- {
- print "Comm::incoming_connection_established", peer_name;
- }
-
-event Comm::print_handler(msg: string)
- {
- ++msg_count;
- print "got print message", msg;
-
- if ( msg_count == 3 )
- terminate();
- }
diff --git a/doc/frameworks/comm/stores-connector.bro b/doc/frameworks/comm/stores-connector.bro
deleted file mode 100644
index 61e863e835..0000000000
--- a/doc/frameworks/comm/stores-connector.bro
+++ /dev/null
@@ -1,53 +0,0 @@
-const broker_port: port &redef;
-redef exit_only_after_terminate = T;
-
-global h: opaque of Store::Handle;
-
-function dv(d: Comm::Data): Comm::DataVector
- {
- local rval: Comm::DataVector;
- rval[0] = d;
- return rval;
- }
-
-global ready: event();
-
-event Comm::outgoing_connection_broken(peer_address: string,
- peer_port: port)
- {
- terminate();
- }
-
-event Comm::outgoing_connection_established(peer_address: string,
- peer_port: port,
- peer_name: string)
- {
- local myset: set[string] = {"a", "b", "c"};
- local myvec: vector of string = {"alpha", "beta", "gamma"};
- h = Store::create_master("mystore");
- Store::insert(h, Comm::data("one"), Comm::data(110));
- Store::insert(h, Comm::data("two"), Comm::data(223));
- Store::insert(h, Comm::data("myset"), Comm::data(myset));
- Store::insert(h, Comm::data("myvec"), Comm::data(myvec));
- Store::increment(h, Comm::data("one"));
- Store::decrement(h, Comm::data("two"));
- Store::add_to_set(h, Comm::data("myset"), Comm::data("d"));
- Store::remove_from_set(h, Comm::data("myset"), Comm::data("b"));
- Store::push_left(h, Comm::data("myvec"), dv(Comm::data("delta")));
- Store::push_right(h, Comm::data("myvec"), dv(Comm::data("omega")));
-
- when ( local res = Store::size(h) )
- {
- print "master size", res;
- event ready();
- }
- timeout 10sec
- { print "timeout"; }
- }
-
-event bro_init()
- {
- Comm::enable();
- Comm::connect("127.0.0.1", broker_port, 1secs);
- Comm::auto_event("bro/event/ready", ready);
- }
diff --git a/doc/frameworks/comm/stores-listener.bro b/doc/frameworks/comm/stores-listener.bro
deleted file mode 100644
index 89384c2e9d..0000000000
--- a/doc/frameworks/comm/stores-listener.bro
+++ /dev/null
@@ -1,43 +0,0 @@
-const broker_port: port &redef;
-redef exit_only_after_terminate = T;
-
-global h: opaque of Store::Handle;
-global expected_key_count = 4;
-global key_count = 0;
-
-function do_lookup(key: string)
- {
- when ( local res = Store::lookup(h, Comm::data(key)) )
- {
- ++key_count;
- print "lookup", key, res;
-
- if ( key_count == expected_key_count )
- terminate();
- }
- timeout 10sec
- { print "timeout", key; }
- }
-
-event ready()
- {
- h = Store::create_clone("mystore");
-
- when ( local res = Store::keys(h) )
- {
- print "clone keys", res;
- do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 0)));
- do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 1)));
- do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 2)));
- do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 3)));
- }
- timeout 10sec
- { print "timeout"; }
- }
-
-event bro_init()
- {
- Comm::enable();
- Comm::subscribe_to_events("bro/event/ready");
- Comm::listen(broker_port, "127.0.0.1");
- }
diff --git a/doc/frameworks/index.rst b/doc/frameworks/index.rst
index 9819b803f0..028f95af21 100644
--- a/doc/frameworks/index.rst
+++ b/doc/frameworks/index.rst
@@ -14,4 +14,4 @@ Frameworks
notice
signatures
sumstats
- comm
+ broker
diff --git a/scripts/base/frameworks/comm/__load__.bro b/scripts/base/frameworks/broker/__load__.bro
similarity index 100%
rename from scripts/base/frameworks/comm/__load__.bro
rename to scripts/base/frameworks/broker/__load__.bro
diff --git a/scripts/base/frameworks/comm/main.bro b/scripts/base/frameworks/broker/main.bro
similarity index 89%
rename from scripts/base/frameworks/comm/main.bro
rename to scripts/base/frameworks/broker/main.bro
index d0f58f585a..e8b57d57d9 100644
--- a/scripts/base/frameworks/comm/main.bro
+++ b/scripts/base/frameworks/broker/main.bro
@@ -1,11 +1,11 @@
##! Various data structure definitions for use with Bro's communication system.
-module Comm;
+module BrokerComm;
export {
## A name used to identify this endpoint to peers.
- ## .. bro:see:: Comm::connect Comm::listen
+ ## .. bro:see:: BrokerComm::connect BrokerComm::listen
const endpoint_name = "" &redef;
## Change communication behavior.
@@ -32,11 +32,11 @@ export {
## Opaque communication data.
type Data: record {
- d: opaque of Comm::Data &optional;
+ d: opaque of BrokerComm::Data &optional;
};
## Opaque communication data.
- type DataVector: vector of Comm::Data;
+ type DataVector: vector of BrokerComm::Data;
## Opaque event communication data.
type EventArgs: record {
@@ -48,13 +48,13 @@ export {
## Opaque communication data used as a convenient way to wrap key-value
## pairs that comprise table entries.
- type Comm::TableItem : record {
- key: Comm::Data;
- val: Comm::Data;
+ type TableItem : record {
+ key: BrokerComm::Data;
+ val: BrokerComm::Data;
};
}
-module Store;
+module BrokerStore;
export {
@@ -76,11 +76,11 @@ export {
## The result of a data store query.
type QueryResult: record {
## Whether the query completed or not.
- status: Store::QueryStatus;
+ status: BrokerStore::QueryStatus;
## The result of the query. Certain queries may use a particular
## data type (e.g. querying store size always returns a count, but
## a lookup may return various data types).
- result: Comm::Data;
+ result: BrokerComm::Data;
};
## Options to tune the SQLite storage backend.
diff --git a/scripts/base/init-bare.bro b/scripts/base/init-bare.bro
index 5dc3345b09..c62549f8b3 100644
--- a/scripts/base/init-bare.bro
+++ b/scripts/base/init-bare.bro
@@ -3360,7 +3360,7 @@ const bits_per_uid: count = 96 &redef;
# Load these frameworks here because they use fairly deep integration with
# BiFs and script-land defined types.
-@load base/frameworks/comm
+@load base/frameworks/broker
@load base/frameworks/logging
@load base/frameworks/input
@load base/frameworks/analyzer
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 974c23c3a3..e73324c4d1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -162,11 +162,11 @@ add_subdirectory(logging)
add_subdirectory(probabilistic)
if ( ENABLE_BROKER )
- add_subdirectory(comm)
+ add_subdirectory(broker)
else ()
# Just to satisfy coverage unit tests until new Broker-based
# communication is enabled by default.
- add_subdirectory(comm-dummy)
+ add_subdirectory(broker-dummy)
endif ()
set(bro_SUBDIRS
diff --git a/src/EventHandler.cc b/src/EventHandler.cc
index f063026d9a..3f1fd71ddf 100644
--- a/src/EventHandler.cc
+++ b/src/EventHandler.cc
@@ -6,8 +6,8 @@
#include "NetVar.h"
#ifdef ENABLE_BROKER
-#include "comm/Manager.h"
-#include "comm/Data.h"
+#include "broker/Manager.h"
+#include "broker/Data.h"
#endif
EventHandler::EventHandler(const char* arg_name)
@@ -96,7 +96,7 @@ void EventHandler::Call(val_list* vl, bool no_remote)
for ( auto i = 0; i < vl->length(); ++i )
{
- auto opt_data = comm::val_to_data((*vl)[i]);
+ auto opt_data = bro_broker::val_to_data((*vl)[i]);
if ( opt_data )
msg.emplace_back(move(*opt_data));
@@ -116,9 +116,9 @@ void EventHandler::Call(val_list* vl, bool no_remote)
it != auto_remote_send.end(); ++it )
{
if ( std::next(it) == auto_remote_send.end() )
- comm_mgr->Event(it->first, move(msg), it->second);
+ broker_mgr->Event(it->first, move(msg), it->second);
else
- comm_mgr->Event(it->first, msg, it->second);
+ broker_mgr->Event(it->first, msg, it->second);
}
}
}
diff --git a/src/Net.cc b/src/Net.cc
index 3acd4bce9d..af542cb1a6 100644
--- a/src/Net.cc
+++ b/src/Net.cc
@@ -35,7 +35,7 @@
#include "plugin/Manager.h"
#ifdef ENABLE_BROKER
-#include "comm/Manager.h"
+#include "broker/Manager.h"
#endif
extern "C" {
@@ -322,7 +322,7 @@ void net_run()
bool communication_enabled = using_communication;
#ifdef ENABLE_BROKER
- communication_enabled |= comm_mgr->Enabled();
+ communication_enabled |= broker_mgr->Enabled();
#endif
if ( src )
diff --git a/src/Stats.cc b/src/Stats.cc
index 111af52598..00f603cba7 100644
--- a/src/Stats.cc
+++ b/src/Stats.cc
@@ -11,7 +11,7 @@
#include "threading/Manager.h"
#ifdef ENABLE_BROKER
-#include "comm/Manager.h"
+#include "broker/Manager.h"
#endif
int killed_by_inactivity = 0;
@@ -227,7 +227,7 @@ void ProfileLogger::Log()
}
#ifdef ENABLE_BROKER
- auto cs = comm_mgr->ConsumeStatistics();
+ auto cs = broker_mgr->ConsumeStatistics();
file->Write(fmt("%0.6f Comm: peers=%zu stores=%zu "
"store_queries=%zu store_responses=%zu "
diff --git a/src/comm-dummy/CMakeLists.txt b/src/broker-dummy/CMakeLists.txt
similarity index 73%
rename from src/comm-dummy/CMakeLists.txt
rename to src/broker-dummy/CMakeLists.txt
index cddea1342d..08c5f3214c 100644
--- a/src/comm-dummy/CMakeLists.txt
+++ b/src/broker-dummy/CMakeLists.txt
@@ -9,5 +9,5 @@ bif_target(data.bif)
bif_target(messaging.bif)
bif_target(store.bif)
-bro_add_subdir_library(comm_dummy ${BIF_OUTPUT_CC})
-add_dependencies(bro_comm_dummy generate_outputs)
+bro_add_subdir_library(broker_dummy ${BIF_OUTPUT_CC})
+add_dependencies(bro_broker_dummy generate_outputs)
diff --git a/src/comm-dummy/comm.bif b/src/broker-dummy/comm.bif
similarity index 100%
rename from src/comm-dummy/comm.bif
rename to src/broker-dummy/comm.bif
diff --git a/src/comm-dummy/data.bif b/src/broker-dummy/data.bif
similarity index 100%
rename from src/comm-dummy/data.bif
rename to src/broker-dummy/data.bif
diff --git a/src/comm-dummy/messaging.bif b/src/broker-dummy/messaging.bif
similarity index 100%
rename from src/comm-dummy/messaging.bif
rename to src/broker-dummy/messaging.bif
diff --git a/src/comm-dummy/store.bif b/src/broker-dummy/store.bif
similarity index 100%
rename from src/comm-dummy/store.bif
rename to src/broker-dummy/store.bif
diff --git a/src/comm/CMakeLists.txt b/src/broker/CMakeLists.txt
similarity index 82%
rename from src/comm/CMakeLists.txt
rename to src/broker/CMakeLists.txt
index ef41c605c7..7329bfd46e 100644
--- a/src/comm/CMakeLists.txt
+++ b/src/broker/CMakeLists.txt
@@ -24,5 +24,5 @@ bif_target(data.bif)
bif_target(messaging.bif)
bif_target(store.bif)
-bro_add_subdir_library(comm ${comm_SRCS} ${BIF_OUTPUT_CC})
-add_dependencies(bro_comm generate_outputs)
+bro_add_subdir_library(brokercomm ${comm_SRCS} ${BIF_OUTPUT_CC})
+add_dependencies(bro_brokercomm generate_outputs)
diff --git a/src/comm/Data.cc b/src/broker/Data.cc
similarity index 75%
rename from src/comm/Data.cc
rename to src/broker/Data.cc
index 46fc8bc8eb..8f66427bb5 100644
--- a/src/comm/Data.cc
+++ b/src/broker/Data.cc
@@ -1,15 +1,15 @@
#include "Data.h"
-#include "comm/data.bif.h"
+#include "broker/data.bif.h"
#include
#include
using namespace std;
-OpaqueType* comm::opaque_of_data_type;
-OpaqueType* comm::opaque_of_set_iterator;
-OpaqueType* comm::opaque_of_table_iterator;
-OpaqueType* comm::opaque_of_vector_iterator;
-OpaqueType* comm::opaque_of_record_iterator;
+OpaqueType* bro_broker::opaque_of_data_type;
+OpaqueType* bro_broker::opaque_of_set_iterator;
+OpaqueType* bro_broker::opaque_of_table_iterator;
+OpaqueType* bro_broker::opaque_of_vector_iterator;
+OpaqueType* bro_broker::opaque_of_record_iterator;
static broker::port::protocol to_broker_port_proto(TransportProto tp)
{
@@ -26,7 +26,7 @@ static broker::port::protocol to_broker_port_proto(TransportProto tp)
}
}
-TransportProto comm::to_bro_port_proto(broker::port::protocol tp)
+TransportProto bro_broker::to_bro_port_proto(broker::port::protocol tp)
{
switch ( tp ) {
case broker::port::protocol::tcp:
@@ -45,6 +45,7 @@ struct val_converter {
using result_type = Val*;
BroType* type;
+ bool require_log_attr;
result_type operator()(bool a)
{
@@ -134,7 +135,7 @@ struct val_converter {
result_type operator()(broker::port& a)
{
if ( type->Tag() == TYPE_PORT )
- return new PortVal(a.number(), comm::to_bro_port_proto(a.type()));
+ return new PortVal(a.number(), bro_broker::to_bro_port_proto(a.type()));
return nullptr;
}
@@ -203,8 +204,8 @@ struct val_converter {
for ( auto i = 0u; i < indices->size(); ++i )
{
- auto index_val = comm::data_to_val(move((*indices)[i]),
- (*expected_index_types)[i]);
+ auto index_val = bro_broker::data_to_val(move((*indices)[i]),
+ (*expected_index_types)[i]);
if ( ! index_val )
{
@@ -256,8 +257,8 @@ struct val_converter {
for ( auto i = 0u; i < indices->size(); ++i )
{
- auto index_val = comm::data_to_val(move((*indices)[i]),
- (*expected_index_types)[i]);
+ auto index_val = bro_broker::data_to_val(move((*indices)[i]),
+ (*expected_index_types)[i]);
if ( ! index_val )
{
@@ -269,8 +270,8 @@ struct val_converter {
list_val->Append(index_val);
}
- auto value_val = comm::data_to_val(move(item.second),
- tt->YieldType());
+ auto value_val = bro_broker::data_to_val(move(item.second),
+ tt->YieldType());
if ( ! value_val )
{
@@ -296,7 +297,7 @@ struct val_converter {
for ( auto& item : a )
{
- auto item_val = comm::data_to_val(move(item), vt->YieldType());
+ auto item_val = bro_broker::data_to_val(move(item), vt->YieldType());
if ( ! item_val )
{
@@ -316,22 +317,27 @@ struct val_converter {
return nullptr;
auto rt = type->AsRecordType();
-
- if ( a.fields.size() != static_cast(rt->NumFields()) )
- return nullptr;
-
auto rval = new RecordVal(rt);
- for ( auto i = 0u; i < a.fields.size(); ++i )
+ for ( auto i = 0u; i < static_cast(rt->NumFields()); ++i )
{
+ if ( require_log_attr && ! rt->FieldDecl(i)->FindAttr(ATTR_LOG) )
+ continue;
+
+ if ( i >= a.fields.size() )
+ {
+ Unref(rval);
+ return nullptr;
+ }
+
if ( ! a.fields[i] )
{
rval->Assign(i, nullptr);
continue;
}
- auto item_val = comm::data_to_val(move(*a.fields[i]),
- rt->FieldType(i));
+ auto item_val = bro_broker::data_to_val(move(*a.fields[i]),
+ rt->FieldType(i));
if ( ! item_val )
{
@@ -346,12 +352,12 @@ struct val_converter {
}
};
-Val* comm::data_to_val(broker::data d, BroType* type)
+Val* bro_broker::data_to_val(broker::data d, BroType* type, bool require_log_attr)
{
- return broker::visit(val_converter{type}, d);
+ return broker::visit(val_converter{type, require_log_attr}, d);
}
-broker::util::optional comm::val_to_data(Val* v)
+broker::util::optional bro_broker::val_to_data(Val* v)
{
switch ( v->Type()->Tag() ) {
case TYPE_BOOL:
@@ -533,7 +539,7 @@ broker::util::optional comm::val_to_data(Val* v)
return {rval};
}
default:
- reporter->Error("unsupported Comm::Data type: %s",
+ reporter->Error("unsupported BrokerComm::Data type: %s",
type_name(v->Type()->Tag()));
break;
}
@@ -541,9 +547,9 @@ broker::util::optional comm::val_to_data(Val* v)
return {};
}
-RecordVal* comm::make_data_val(Val* v)
+RecordVal* bro_broker::make_data_val(Val* v)
{
- auto rval = new RecordVal(BifType::Record::Comm::Data);
+ auto rval = new RecordVal(BifType::Record::BrokerComm::Data);
auto data = val_to_data(v);
if ( data )
@@ -552,9 +558,9 @@ RecordVal* comm::make_data_val(Val* v)
return rval;
}
-RecordVal* comm::make_data_val(broker::data d)
+RecordVal* bro_broker::make_data_val(broker::data d)
{
- auto rval = new RecordVal(BifType::Record::Comm::Data);
+ auto rval = new RecordVal(BifType::Record::BrokerComm::Data);
rval->Assign(0, new DataVal(move(d)));
return rval;
}
@@ -564,114 +570,114 @@ struct data_type_getter {
result_type operator()(bool a)
{
- return new EnumVal(BifEnum::Comm::BOOL,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::BOOL,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(uint64_t a)
{
- return new EnumVal(BifEnum::Comm::COUNT,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::COUNT,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(int64_t a)
{
- return new EnumVal(BifEnum::Comm::INT,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::INT,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(double a)
{
- return new EnumVal(BifEnum::Comm::DOUBLE,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::DOUBLE,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const std::string& a)
{
- return new EnumVal(BifEnum::Comm::STRING,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::STRING,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::address& a)
{
- return new EnumVal(BifEnum::Comm::ADDR,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::ADDR,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::subnet& a)
{
- return new EnumVal(BifEnum::Comm::SUBNET,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::SUBNET,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::port& a)
{
- return new EnumVal(BifEnum::Comm::PORT,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::PORT,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::time_point& a)
{
- return new EnumVal(BifEnum::Comm::TIME,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::TIME,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::time_duration& a)
{
- return new EnumVal(BifEnum::Comm::INTERVAL,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::INTERVAL,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::enum_value& a)
{
- return new EnumVal(BifEnum::Comm::ENUM,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::ENUM,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::set& a)
{
- return new EnumVal(BifEnum::Comm::SET,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::SET,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::table& a)
{
- return new EnumVal(BifEnum::Comm::TABLE,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::TABLE,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::vector& a)
{
- return new EnumVal(BifEnum::Comm::VECTOR,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::VECTOR,
+ BifType::Enum::BrokerComm::DataType);
}
result_type operator()(const broker::record& a)
{
- return new EnumVal(BifEnum::Comm::RECORD,
- BifType::Enum::Comm::DataType);
+ return new EnumVal(BifEnum::BrokerComm::RECORD,
+ BifType::Enum::BrokerComm::DataType);
}
};
-EnumVal* comm::get_data_type(RecordVal* v, Frame* frame)
+EnumVal* bro_broker::get_data_type(RecordVal* v, Frame* frame)
{
return broker::visit(data_type_getter{}, opaque_field_to_data(v, frame));
}
-broker::data& comm::opaque_field_to_data(RecordVal* v, Frame* f)
+broker::data& bro_broker::opaque_field_to_data(RecordVal* v, Frame* f)
{
Val* d = v->Lookup(0);
if ( ! d )
reporter->RuntimeError(f->GetCall()->GetLocationInfo(),
- "Comm::Data's opaque field is not set");
+ "BrokerComm::Data's opaque field is not set");
return static_cast(d)->data;
}
-IMPLEMENT_SERIAL(comm::DataVal, SER_COMM_DATA_VAL);
+IMPLEMENT_SERIAL(bro_broker::DataVal, SER_COMM_DATA_VAL);
-bool comm::DataVal::DoSerialize(SerialInfo* info) const
+bool bro_broker::DataVal::DoSerialize(SerialInfo* info) const
{
DO_SERIALIZE(SER_COMM_DATA_VAL, OpaqueVal);
@@ -685,7 +691,7 @@ bool comm::DataVal::DoSerialize(SerialInfo* info) const
return true;
}
-bool comm::DataVal::DoUnserialize(UnserialInfo* info)
+bool bro_broker::DataVal::DoUnserialize(UnserialInfo* info)
{
DO_UNSERIALIZE(OpaqueVal);
diff --git a/src/comm/Data.h b/src/broker/Data.h
similarity index 83%
rename from src/comm/Data.h
rename to src/broker/Data.h
index ef7b15110d..84495056be 100644
--- a/src/comm/Data.h
+++ b/src/broker/Data.h
@@ -7,7 +7,7 @@
#include "Frame.h"
#include "Expr.h"
-namespace comm {
+namespace bro_broker {
extern OpaqueType* opaque_of_data_type;
extern OpaqueType* opaque_of_set_iterator;
@@ -21,25 +21,25 @@ extern OpaqueType* opaque_of_record_iterator;
TransportProto to_bro_port_proto(broker::port::protocol tp);
/**
- * Create a Comm::Data value from a Bro value.
+ * Create a BrokerComm::Data value from a Bro value.
* @param v the Bro value to convert to a Broker data value.
- * @return a Comm::Data value, where the optional field is set if the conversion
+ * @return a BrokerComm::Data value, where the optional field is set if the conversion
* was possible, else it is unset.
*/
RecordVal* make_data_val(Val* v);
/**
- * Create a Comm::Data value from a Broker data value.
+ * Create a BrokerComm::Data value from a Broker data value.
* @param d the Broker value to wrap in an opaque type.
- * @return a Comm::Data value that wraps the Broker value.
+ * @return a BrokerComm::Data value that wraps the Broker value.
*/
RecordVal* make_data_val(broker::data d);
/**
- * Get the type of Broker data that Comm::Data wraps.
- * @param v a Comm::Data value.
+ * Get the type of Broker data that BrokerComm::Data wraps.
+ * @param v a BrokerComm::Data value.
* @param frame used to get location info upon error.
- * @return a Comm::DataType value.
+ * @return a BrokerComm::DataType value.
*/
EnumVal* get_data_type(RecordVal* v, Frame* frame);
@@ -54,10 +54,12 @@ broker::util::optional val_to_data(Val* v);
* Convert a Broker data value to a Bro value.
* @param d a Broker data value.
* @param type the expected type of the value to return.
+ * @param require_log_attr if true, skip over record fields that don't have the
+ * &log attribute.
* @return a pointer to a new Bro value or a nullptr if the conversion was not
* possible.
*/
-Val* data_to_val(broker::data d, BroType* type);
+Val* data_to_val(broker::data d, BroType* type, bool require_log_attr = false);
/**
* A Bro value which wraps a Broker data value.
@@ -66,7 +68,7 @@ class DataVal : public OpaqueVal {
public:
DataVal(broker::data arg_data)
- : OpaqueVal(comm::opaque_of_data_type), data(std::move(arg_data))
+ : OpaqueVal(bro_broker::opaque_of_data_type), data(std::move(arg_data))
{}
void ValDescribe(ODesc* d) const override
@@ -139,8 +141,8 @@ struct type_name_getter {
};
/**
- * Retrieve Broker data value associated with a Comm::Data Bro value.
- * @param v a Comm::Data value.
+ * Retrieve Broker data value associated with a BrokerComm::Data Bro value.
+ * @param v a BrokerComm::Data value.
* @param f used to get location information on error.
* @return a reference to the wrapped Broker data value. A runtime interpreter
* exception is thrown if the the optional opaque value of \a v is not set.
@@ -181,9 +183,9 @@ inline T& require_data_type(RecordVal* v, TypeTag tag, Frame* f)
}
/**
- * Convert a Comm::Data Bro value to a Bro value of a given type.
+ * Convert a BrokerComm::Data Bro value to a Bro value of a given type.
* @tparam a type that a Broker data variant may contain.
- * @param v a Comm::Data value.
+ * @param v a BrokerComm::Data value.
* @param tag a Bro type to convert to.
* @param f used to get location information on error.
* A runtime interpret exception is thrown if trying to access a type which
@@ -201,7 +203,7 @@ class SetIterator : public OpaqueVal {
public:
SetIterator(RecordVal* v, TypeTag tag, Frame* f)
- : OpaqueVal(comm::opaque_of_set_iterator),
+ : OpaqueVal(bro_broker::opaque_of_set_iterator),
dat(require_data_type(v, TYPE_TABLE, f)),
it(dat.begin())
{}
@@ -214,7 +216,7 @@ class TableIterator : public OpaqueVal {
public:
TableIterator(RecordVal* v, TypeTag tag, Frame* f)
- : OpaqueVal(comm::opaque_of_table_iterator),
+ : OpaqueVal(bro_broker::opaque_of_table_iterator),
dat(require_data_type(v, TYPE_TABLE, f)),
it(dat.begin())
{}
@@ -227,7 +229,7 @@ class VectorIterator : public OpaqueVal {
public:
VectorIterator(RecordVal* v, TypeTag tag, Frame* f)
- : OpaqueVal(comm::opaque_of_vector_iterator),
+ : OpaqueVal(bro_broker::opaque_of_vector_iterator),
dat(require_data_type(v, TYPE_VECTOR, f)),
it(dat.begin())
{}
@@ -240,7 +242,7 @@ class RecordIterator : public OpaqueVal {
public:
RecordIterator(RecordVal* v, TypeTag tag, Frame* f)
- : OpaqueVal(comm::opaque_of_record_iterator),
+ : OpaqueVal(bro_broker::opaque_of_record_iterator),
dat(require_data_type(v, TYPE_VECTOR, f)),
it(dat.fields.begin())
{}
@@ -249,6 +251,6 @@ public:
decltype(broker::record::fields)::iterator it;
};
-} // namespace comm
+} // namespace bro_broker
#endif // BRO_COMM_DATA_H
diff --git a/src/comm/Manager.cc b/src/broker/Manager.cc
similarity index 76%
rename from src/comm/Manager.cc
rename to src/broker/Manager.cc
index 420f67f711..eadadea137 100644
--- a/src/comm/Manager.cc
+++ b/src/broker/Manager.cc
@@ -8,23 +8,23 @@
#include "util.h"
#include "Var.h"
#include "Reporter.h"
-#include "comm/comm.bif.h"
-#include "comm/data.bif.h"
-#include "comm/messaging.bif.h"
-#include "comm/store.bif.h"
+#include "broker/comm.bif.h"
+#include "broker/data.bif.h"
+#include "broker/messaging.bif.h"
+#include "broker/store.bif.h"
#include "logging/Manager.h"
#include "DebugLogger.h"
#include "iosource/Manager.h"
using namespace std;
-VectorType* comm::Manager::vector_of_data_type;
-EnumType* comm::Manager::log_id_type;
-int comm::Manager::send_flags_self_idx;
-int comm::Manager::send_flags_peers_idx;
-int comm::Manager::send_flags_unsolicited_idx;
+VectorType* bro_broker::Manager::vector_of_data_type;
+EnumType* bro_broker::Manager::log_id_type;
+int bro_broker::Manager::send_flags_self_idx;
+int bro_broker::Manager::send_flags_peers_idx;
+int bro_broker::Manager::send_flags_unsolicited_idx;
-comm::Manager::~Manager()
+bro_broker::Manager::~Manager()
{
for ( auto& s : data_stores )
CloseStore(s.first.first, s.first.second);
@@ -59,25 +59,25 @@ static int endpoint_flags_to_int(Val* broker_endpoint_flags)
return rval;
}
-bool comm::Manager::Enable(Val* broker_endpoint_flags)
+bool bro_broker::Manager::Enable(Val* broker_endpoint_flags)
{
if ( endpoint != nullptr )
return true;
- auto send_flags_type = internal_type("Comm::SendFlags")->AsRecordType();
+ auto send_flags_type = internal_type("BrokerComm::SendFlags")->AsRecordType();
send_flags_self_idx = require_field(send_flags_type, "self");
send_flags_peers_idx = require_field(send_flags_type, "peers");
send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited");
log_id_type = internal_type("Log::ID")->AsEnumType();
- comm::opaque_of_data_type = new OpaqueType("Comm::Data");
- comm::opaque_of_set_iterator = new OpaqueType("Comm::SetIterator");
- comm::opaque_of_table_iterator = new OpaqueType("Comm::TableIterator");
- comm::opaque_of_vector_iterator = new OpaqueType("Comm::VectorIterator");
- comm::opaque_of_record_iterator = new OpaqueType("Comm::RecordIterator");
- comm::opaque_of_store_handle = new OpaqueType("Store::Handle");
- vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref());
+ bro_broker::opaque_of_data_type = new OpaqueType("BrokerComm::Data");
+ bro_broker::opaque_of_set_iterator = new OpaqueType("BrokerComm::SetIterator");
+ bro_broker::opaque_of_table_iterator = new OpaqueType("BrokerComm::TableIterator");
+ bro_broker::opaque_of_vector_iterator = new OpaqueType("BrokerComm::VectorIterator");
+ bro_broker::opaque_of_record_iterator = new OpaqueType("BrokerComm::RecordIterator");
+ bro_broker::opaque_of_store_handle = new OpaqueType("BrokerStore::Handle");
+ vector_of_data_type = new VectorType(internal_type("BrokerComm::Data")->Ref());
auto res = broker::init();
@@ -97,7 +97,7 @@ bool comm::Manager::Enable(Val* broker_endpoint_flags)
}
const char* name;
- auto name_from_script = internal_val("Comm::endpoint_name")->AsString();
+ auto name_from_script = internal_val("BrokerComm::endpoint_name")->AsString();
if ( name_from_script->Len() )
name = name_from_script->CheckString();
@@ -117,7 +117,7 @@ bool comm::Manager::Enable(Val* broker_endpoint_flags)
return true;
}
-bool comm::Manager::SetEndpointFlags(Val* broker_endpoint_flags)
+bool bro_broker::Manager::SetEndpointFlags(Val* broker_endpoint_flags)
{
if ( ! Enabled() )
return false;
@@ -127,7 +127,7 @@ bool comm::Manager::SetEndpointFlags(Val* broker_endpoint_flags)
return true;
}
-bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr)
+bool bro_broker::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr)
{
if ( ! Enabled() )
return false;
@@ -144,7 +144,7 @@ bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr)
return rval;
}
-bool comm::Manager::Connect(string addr, uint16_t port,
+bool bro_broker::Manager::Connect(string addr, uint16_t port,
chrono::duration retry_interval)
{
if ( ! Enabled() )
@@ -159,7 +159,7 @@ bool comm::Manager::Connect(string addr, uint16_t port,
return true;
}
-bool comm::Manager::Disconnect(const string& addr, uint16_t port)
+bool bro_broker::Manager::Disconnect(const string& addr, uint16_t port)
{
if ( ! Enabled() )
return false;
@@ -174,7 +174,7 @@ bool comm::Manager::Disconnect(const string& addr, uint16_t port)
return rval;
}
-bool comm::Manager::Print(string topic, string msg, Val* flags)
+bool bro_broker::Manager::Print(string topic, string msg, Val* flags)
{
if ( ! Enabled() )
return false;
@@ -184,7 +184,7 @@ bool comm::Manager::Print(string topic, string msg, Val* flags)
return true;
}
-bool comm::Manager::Event(std::string topic, broker::message msg, int flags)
+bool bro_broker::Manager::Event(std::string topic, broker::message msg, int flags)
{
if ( ! Enabled() )
return false;
@@ -193,7 +193,8 @@ bool comm::Manager::Event(std::string topic, broker::message msg, int flags)
return true;
}
-bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags)
+bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* info,
+ int flags)
{
if ( ! Enabled() )
return false;
@@ -207,23 +208,44 @@ bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags)
return false;
}
- auto opt_column_data = val_to_data(columns);
+ broker::record column_data;
- if ( ! opt_column_data )
+ for ( auto i = 0u; i < static_cast(info->NumFields()); ++i )
{
- reporter->Error("Failed to remotely log stream %s: unsupported types",
- stream_name);
- return false;
+ if ( ! info->FieldDecl(i)->FindAttr(ATTR_LOG) )
+ continue;
+
+ auto field_val = columns->LookupWithDefault(i);
+
+ if ( ! field_val )
+ {
+ column_data.fields.emplace_back(broker::record::field{});
+ continue;
+ }
+
+ auto opt_field_data = val_to_data(field_val);
+ Unref(field_val);
+
+ if ( ! opt_field_data )
+ {
+ reporter->Error("Failed to remotely log stream %s: "
+ "unsupported type '%s'",
+ stream_name,
+ type_name(info->FieldDecl(i)->type->Tag()));
+ return false;
+ }
+
+ column_data.fields.emplace_back(
+ broker::record::field{move(*opt_field_data)});
}
- broker::message msg{broker::enum_value{stream_name},
- move(*opt_column_data)};
+ broker::message msg{broker::enum_value{stream_name}, move(column_data)};
std::string topic = std::string("bro/log/") + stream_name;
endpoint->send(move(topic), move(msg), flags);
return true;
}
-bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags)
+bool bro_broker::Manager::Event(std::string topic, RecordVal* args, Val* flags)
{
if ( ! Enabled() )
return false;
@@ -248,14 +270,14 @@ bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags)
return true;
}
-bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags)
+bool bro_broker::Manager::AutoEvent(string topic, Val* event, Val* flags)
{
if ( ! Enabled() )
return false;
if ( event->Type()->Tag() != TYPE_FUNC )
{
- reporter->Error("Comm::auto_event must operate on an event");
+ reporter->Error("BrokerComm::auto_event must operate on an event");
return false;
}
@@ -263,7 +285,7 @@ bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags)
if ( event_val->Flavor() != FUNC_FLAVOR_EVENT )
{
- reporter->Error("Comm::auto_event must operate on an event");
+ reporter->Error("BrokerComm::auto_event must operate on an event");
return false;
}
@@ -271,7 +293,7 @@ bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags)
if ( ! handler )
{
- reporter->Error("Comm::auto_event failed to lookup event '%s'",
+ reporter->Error("BrokerComm::auto_event failed to lookup event '%s'",
event_val->Name());
return false;
}
@@ -280,14 +302,14 @@ bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags)
return true;
}
-bool comm::Manager::AutoEventStop(const string& topic, Val* event)
+bool bro_broker::Manager::AutoEventStop(const string& topic, Val* event)
{
if ( ! Enabled() )
return false;
if ( event->Type()->Tag() != TYPE_FUNC )
{
- reporter->Error("Comm::auto_event_stop must operate on an event");
+ reporter->Error("BrokerComm::auto_event_stop must operate on an event");
return false;
}
@@ -295,7 +317,7 @@ bool comm::Manager::AutoEventStop(const string& topic, Val* event)
if ( event_val->Flavor() != FUNC_FLAVOR_EVENT )
{
- reporter->Error("Comm::auto_event_stop must operate on an event");
+ reporter->Error("BrokerComm::auto_event_stop must operate on an event");
return false;
}
@@ -303,7 +325,7 @@ bool comm::Manager::AutoEventStop(const string& topic, Val* event)
if ( ! handler )
{
- reporter->Error("Comm::auto_event_stop failed to lookup event '%s'",
+ reporter->Error("BrokerComm::auto_event_stop failed to lookup event '%s'",
event_val->Name());
return false;
}
@@ -313,12 +335,12 @@ bool comm::Manager::AutoEventStop(const string& topic, Val* event)
return true;
}
-RecordVal* comm::Manager::MakeEventArgs(val_list* args)
+RecordVal* bro_broker::Manager::MakeEventArgs(val_list* args)
{
if ( ! Enabled() )
return nullptr;
- auto rval = new RecordVal(BifType::Record::Comm::EventArgs);
+ auto rval = new RecordVal(BifType::Record::BrokerComm::EventArgs);
auto arg_vec = new VectorVal(vector_of_data_type);
rval->Assign(1, arg_vec);
Func* func = 0;
@@ -333,7 +355,7 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args)
if ( arg_val->Type()->Tag() != TYPE_FUNC )
{
- reporter->Error("1st param of Comm::event_args must be event");
+ reporter->Error("1st param of BrokerComm::event_args must be event");
return rval;
}
@@ -341,7 +363,7 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args)
if ( func->Flavor() != FUNC_FLAVOR_EVENT )
{
- reporter->Error("1st param of Comm::event_args must be event");
+ reporter->Error("1st param of BrokerComm::event_args must be event");
return rval;
}
@@ -349,7 +371,7 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args)
if ( num_args != args->length() - 1 )
{
- reporter->Error("bad # of Comm::event_args: got %d, expect %d",
+ reporter->Error("bad # of BrokerComm::event_args: got %d, expect %d",
args->length(), num_args + 1);
return rval;
}
@@ -363,7 +385,7 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args)
if ( ! same_type((*args)[i]->Type(), expected_type) )
{
rval->Assign(0, 0);
- reporter->Error("Comm::event_args param %d type mismatch", i);
+ reporter->Error("BrokerComm::event_args param %d type mismatch", i);
return rval;
}
@@ -373,7 +395,7 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args)
{
Unref(data_val);
rval->Assign(0, 0);
- reporter->Error("Comm::event_args unsupported event/params");
+ reporter->Error("BrokerComm::event_args unsupported event/params");
return rval;
}
@@ -383,7 +405,7 @@ RecordVal* comm::Manager::MakeEventArgs(val_list* args)
return rval;
}
-bool comm::Manager::SubscribeToPrints(string topic_prefix)
+bool bro_broker::Manager::SubscribeToPrints(string topic_prefix)
{
if ( ! Enabled() )
return false;
@@ -397,7 +419,7 @@ bool comm::Manager::SubscribeToPrints(string topic_prefix)
return true;
}
-bool comm::Manager::UnsubscribeToPrints(const string& topic_prefix)
+bool bro_broker::Manager::UnsubscribeToPrints(const string& topic_prefix)
{
if ( ! Enabled() )
return false;
@@ -405,7 +427,7 @@ bool comm::Manager::UnsubscribeToPrints(const string& topic_prefix)
return print_subscriptions.erase(topic_prefix);
}
-bool comm::Manager::SubscribeToEvents(string topic_prefix)
+bool bro_broker::Manager::SubscribeToEvents(string topic_prefix)
{
if ( ! Enabled() )
return false;
@@ -419,7 +441,7 @@ bool comm::Manager::SubscribeToEvents(string topic_prefix)
return true;
}
-bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix)
+bool bro_broker::Manager::UnsubscribeToEvents(const string& topic_prefix)
{
if ( ! Enabled() )
return false;
@@ -427,7 +449,7 @@ bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix)
return event_subscriptions.erase(topic_prefix);
}
-bool comm::Manager::SubscribeToLogs(string topic_prefix)
+bool bro_broker::Manager::SubscribeToLogs(string topic_prefix)
{
if ( ! Enabled() )
return false;
@@ -441,7 +463,7 @@ bool comm::Manager::SubscribeToLogs(string topic_prefix)
return true;
}
-bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix)
+bool bro_broker::Manager::UnsubscribeToLogs(const string& topic_prefix)
{
if ( ! Enabled() )
return false;
@@ -449,7 +471,7 @@ bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix)
return log_subscriptions.erase(topic_prefix);
}
-bool comm::Manager::PublishTopic(broker::topic t)
+bool bro_broker::Manager::PublishTopic(broker::topic t)
{
if ( ! Enabled() )
return false;
@@ -458,7 +480,7 @@ bool comm::Manager::PublishTopic(broker::topic t)
return true;
}
-bool comm::Manager::UnpublishTopic(broker::topic t)
+bool bro_broker::Manager::UnpublishTopic(broker::topic t)
{
if ( ! Enabled() )
return false;
@@ -467,7 +489,7 @@ bool comm::Manager::UnpublishTopic(broker::topic t)
return true;
}
-bool comm::Manager::AdvertiseTopic(broker::topic t)
+bool bro_broker::Manager::AdvertiseTopic(broker::topic t)
{
if ( ! Enabled() )
return false;
@@ -476,7 +498,7 @@ bool comm::Manager::AdvertiseTopic(broker::topic t)
return true;
}
-bool comm::Manager::UnadvertiseTopic(broker::topic t)
+bool bro_broker::Manager::UnadvertiseTopic(broker::topic t)
{
if ( ! Enabled() )
return false;
@@ -485,7 +507,7 @@ bool comm::Manager::UnadvertiseTopic(broker::topic t)
return true;
}
-int comm::Manager::send_flags_to_int(Val* flags)
+int bro_broker::Manager::send_flags_to_int(Val* flags)
{
auto r = flags->AsRecordVal();
int rval = 0;
@@ -508,7 +530,7 @@ int comm::Manager::send_flags_to_int(Val* flags)
return rval;
}
-void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
+void bro_broker::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
read->Insert(endpoint->outgoing_connection_status().fd());
@@ -529,7 +551,7 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
read->Insert(broker::report::default_queue->fd());
}
-double comm::Manager::NextTimestamp(double* local_network_time)
+double bro_broker::Manager::NextTimestamp(double* local_network_time)
{
// TODO: do something better?
return timer_mgr->Time();
@@ -547,25 +569,25 @@ struct response_converter {
case broker::store::query::tag::lookup:
// A boolean result means the key doesn't exist (if it did, then
// the result would contain the broker::data value, not a bool).
- return new RecordVal(BifType::Record::Comm::Data);
+ return new RecordVal(BifType::Record::BrokerComm::Data);
default:
- return comm::make_data_val(broker::data{d});
+ return bro_broker::make_data_val(broker::data{d});
}
}
result_type operator()(uint64_t d)
{
- return comm::make_data_val(broker::data{d});
+ return bro_broker::make_data_val(broker::data{d});
}
result_type operator()(broker::data& d)
{
- return comm::make_data_val(move(d));
+ return bro_broker::make_data_val(move(d));
}
result_type operator()(std::vector& d)
{
- return comm::make_data_val(broker::data{move(d)});
+ return bro_broker::make_data_val(broker::data{move(d)});
}
result_type operator()(broker::store::snapshot& d)
@@ -579,7 +601,7 @@ struct response_converter {
table[move(key)] = move(val);
}
- return comm::make_data_val(broker::data{move(table)});
+ return bro_broker::make_data_val(broker::data{move(table)});
}
};
@@ -588,7 +610,7 @@ static RecordVal* response_to_val(broker::store::response r)
return broker::visit(response_converter{r.request.type}, r.reply.value);
}
-void comm::Manager::Process()
+void bro_broker::Manager::Process()
{
bool idle = true;
auto outgoing_connection_updates =
@@ -605,36 +627,36 @@ void comm::Manager::Process()
switch ( u.status ) {
case broker::outgoing_connection_status::tag::established:
- if ( Comm::outgoing_connection_established )
+ if ( BrokerComm::outgoing_connection_established )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.relation.remote_tuple().first));
vl->append(new PortVal(u.relation.remote_tuple().second,
TRANSPORT_TCP));
vl->append(new StringVal(u.peer_name));
- mgr.QueueEvent(Comm::outgoing_connection_established, vl);
+ mgr.QueueEvent(BrokerComm::outgoing_connection_established, vl);
}
break;
case broker::outgoing_connection_status::tag::disconnected:
- if ( Comm::outgoing_connection_broken )
+ if ( BrokerComm::outgoing_connection_broken )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.relation.remote_tuple().first));
vl->append(new PortVal(u.relation.remote_tuple().second,
TRANSPORT_TCP));
- mgr.QueueEvent(Comm::outgoing_connection_broken, vl);
+ mgr.QueueEvent(BrokerComm::outgoing_connection_broken, vl);
}
break;
case broker::outgoing_connection_status::tag::incompatible:
- if ( Comm::outgoing_connection_incompatible )
+ if ( BrokerComm::outgoing_connection_incompatible )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.relation.remote_tuple().first));
vl->append(new PortVal(u.relation.remote_tuple().second,
TRANSPORT_TCP));
- mgr.QueueEvent(Comm::outgoing_connection_incompatible, vl);
+ mgr.QueueEvent(BrokerComm::outgoing_connection_incompatible, vl);
}
break;
@@ -652,20 +674,20 @@ void comm::Manager::Process()
switch ( u.status ) {
case broker::incoming_connection_status::tag::established:
- if ( Comm::incoming_connection_established )
+ if ( BrokerComm::incoming_connection_established )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.peer_name));
- mgr.QueueEvent(Comm::incoming_connection_established, vl);
+ mgr.QueueEvent(BrokerComm::incoming_connection_established, vl);
}
break;
case broker::incoming_connection_status::tag::disconnected:
- if ( Comm::incoming_connection_broken )
+ if ( BrokerComm::incoming_connection_broken )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.peer_name));
- mgr.QueueEvent(Comm::incoming_connection_broken, vl);
+ mgr.QueueEvent(BrokerComm::incoming_connection_broken, vl);
}
break;
@@ -687,7 +709,7 @@ void comm::Manager::Process()
ps.second.received += print_messages.size();
idle = false;
- if ( ! Comm::print_handler )
+ if ( ! BrokerComm::print_handler )
continue;
for ( auto& pm : print_messages )
@@ -710,7 +732,7 @@ void comm::Manager::Process()
val_list* vl = new val_list;
vl->append(new StringVal(move(*msg)));
- mgr.QueueEvent(Comm::print_handler, vl);
+ mgr.QueueEvent(BrokerComm::print_handler, vl);
}
}
@@ -837,7 +859,7 @@ void comm::Manager::Process()
continue;
}
- auto columns = data_to_val(move(lm[1]), columns_type);
+ auto columns = data_to_val(move(lm[1]), columns_type, true);
if ( ! columns )
{
@@ -953,7 +975,7 @@ void comm::Manager::Process()
SetIdle(idle);
}
-bool comm::Manager::AddStore(StoreHandleVal* handle)
+bool bro_broker::Manager::AddStore(StoreHandleVal* handle)
{
if ( ! Enabled() )
return false;
@@ -971,9 +993,9 @@ bool comm::Manager::AddStore(StoreHandleVal* handle)
return true;
}
-comm::StoreHandleVal*
-comm::Manager::LookupStore(const broker::store::identifier& id,
- comm::StoreType type)
+bro_broker::StoreHandleVal*
+bro_broker::Manager::LookupStore(const broker::store::identifier& id,
+ bro_broker::StoreType type)
{
if ( ! Enabled() )
return nullptr;
@@ -987,7 +1009,7 @@ comm::Manager::LookupStore(const broker::store::identifier& id,
return it->second;
}
-bool comm::Manager::CloseStore(const broker::store::identifier& id,
+bool bro_broker::Manager::CloseStore(const broker::store::identifier& id,
StoreType type)
{
if ( ! Enabled() )
@@ -1019,13 +1041,13 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id,
return true;
}
-bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb)
+bool bro_broker::Manager::TrackStoreQuery(StoreQueryCallback* cb)
{
assert(Enabled());
return pending_queries.insert(cb).second;
}
-comm::Stats comm::Manager::ConsumeStatistics()
+bro_broker::Stats bro_broker::Manager::ConsumeStatistics()
{
statistics.outgoing_peer_count = peers.size();
statistics.data_store_count = data_stores.size();
diff --git a/src/comm/Manager.h b/src/broker/Manager.h
similarity index 86%
rename from src/comm/Manager.h
rename to src/broker/Manager.h
index 0093c0bd90..63fbba074a 100644
--- a/src/comm/Manager.h
+++ b/src/broker/Manager.h
@@ -7,16 +7,16 @@
#include
#include