Merge remote-tracking branch 'origin/topic/jsiwek/bit-1967'

* origin/topic/jsiwek/bit-1967:
  Fix a routing loop in control framework
  Add Broker::forward() function
  Enable implicit Broker message forwarding by default
  Remove Cluster::broadcast_topic
  Remove Intel Broker topics, re-use existing Cluster topics
  Remove "relay" family of Broker functions
This commit is contained in:
Robin Sommer 2018-08-29 23:45:29 +00:00
commit 6de436f3f6
33 changed files with 411 additions and 841 deletions

35
CHANGES
View file

@ -1,4 +1,39 @@
2.5-931 | 2018-08-29 23:45:29 +0000
* Add Broker::forward() function. This enables explicit forwarding
of events matching a given topic prefix. Even if a receiving node
has an event handler, it will not be raised if the event was sent
along a topic that matches a previous call to Broker::forward().
(Jon Siwek, Corelight)
* Enable implicit Broker message forwarding by default. (Jon Siwek,
Corelight)
* Remove Cluster::broadcast_topic. As enabling Broker forwarding
would cause routing loops with messages sent to such a topic (one
subscribed to on all nodes). (Jon Siwek, Corelight)
* Remove Intel Broker topics, re-use existing Cluster topics. (Jon
Siwek, Corelight)
* Update broker docs to reflect best-practice/convention for
declaring new topics.
* Remove "relay" family of Broker functions. (Jon Siwek, Corelight)
Namely these are now removed:
- Broker::relay
- Broker::publish_and_relay
- Cluster::relay_rr
- Cluster::relay_hrw
The idea being that Broker may eventually implement the necessary
routing (plus load balancing) functionality. For now, code that
used these should "manually" handle and re-publish events as
needed.
2.5-924 | 2018-08-29 18:21:37 -0500 2.5-924 | 2018-08-29 18:21:37 -0500
* Allow event/function headers to be wrapped in directives. (Johanna Amann) * Allow event/function headers to be wrapped in directives. (Johanna Amann)

View file

@ -1 +1 @@
2.5-924 2.5-931

@ -1 +1 @@
Subproject commit 3ebf910b6befde6352e3af0b25589cfc2545cb5a Subproject commit ff8c94964fccbf60abae401d03c9fb35a8894c16

@ -1 +1 @@
Subproject commit 486bbb9d9ee7c66b55003e58f986d18e951902ec Subproject commit a88cb3434e05dbb117687f6152acc8892ed969ca

@ -1 +1 @@
Subproject commit dede39c13390bcd57dff311dab9648db7dfdaa89 Subproject commit a3e188680cd2889edbb9cf09c01fb1f031a90975

View file

@ -237,7 +237,7 @@ follows certain conventions in choosing these topics to help avoid
conflicts and generally make them easier to remember. conflicts and generally make them easier to remember.
As a reminder of how topic subscriptions work, subscribers advertise As a reminder of how topic subscriptions work, subscribers advertise
interest in a topic **prefix** and then receive any messages publish by a interest in a topic **prefix** and then receive any messages published by a
peer to a topic name that starts with that prefix. E.g. Alice peer to a topic name that starts with that prefix. E.g. Alice
subscribes to the "alice/dogs" prefix, then would receive the following subscribes to the "alice/dogs" prefix, then would receive the following
message topics published by Bob: message topics published by Bob:
@ -263,12 +263,17 @@ scripts use will be along the lines of "bro/<namespace>/<specifics>"
with "<namespace>" being the script's module name (in all-undercase). with "<namespace>" being the script's module name (in all-undercase).
For example, you might expect an imaginary "Pretend" framework to For example, you might expect an imaginary "Pretend" framework to
publish/subscribe using topic names like "bro/pretend/my_cool_event". publish/subscribe using topic names like "bro/pretend/my_cool_event".
For scripts that use Broker as a means of cluster-aware analysis,
it's usually sufficient for them to make use of the topics declared
by the cluster framework. For scripts that are meant to establish
communication flows unrelated to Bro cluster, new topics are declared
(examples being the NetControl and Control frameworks).
For cluster operation, see :doc:`/scripts/base/frameworks/cluster/main.bro` For cluster operation, see :doc:`/scripts/base/frameworks/cluster/main.bro`
for a list of topics that are useful for steering published events to for a list of topics that are useful for steering published events to
the various node classes. E.g. you have the ability to broadcast to all the various node classes. E.g. you have the ability to broadcast
directly-connected nodes, only those of a given class (e.g. just workers), to all nodes of a given class (e.g. just workers) or just send to a
or to a specific node within a class. specific node within a class.
The topic names that logs get published under are a bit nuanced. In the The topic names that logs get published under are a bit nuanced. In the
default cluster configuration, they are round-robin published to default cluster configuration, they are round-robin published to
@ -279,7 +284,12 @@ processes, logs get published to the topic indicated by
For those writing their own scripts which need new topic names, a For those writing their own scripts which need new topic names, a
suggestion would be to avoid prefixing any new topics/prefixes with suggestion would be to avoid prefixing any new topics/prefixes with
"bro/" as any changes in scripts shipping with Bro will use that prefix "bro/" as any changes in scripts shipping with Bro will use that prefix
and it's better to not risk unintended conflicts. and it's better to not risk unintended conflicts. Again, it's
often less confusing to just re-use existing topic names instead
of introducing new topic names. The typical use case is writing
a cluster-enabled script, which usually just needs to route events
based upon node classes, and that already has usable topics in the
cluster framework.
Connecting to Peers Connecting to Peers
------------------- -------------------
@ -518,24 +528,28 @@ Worker Sending Events To All Workers
Since workers are not directly connected to each other in the cluster Since workers are not directly connected to each other in the cluster
topology, this type of communication is a bit different than what we topology, this type of communication is a bit different than what we
did before. Instead of using :bro:see:`Broker::publish` we use different did before since we have to manually relay the event via some node that *is*
"relay" calls to hop the message from a different node that *is* connected. connected to all workers. The manager or a proxy satisfies that requirement:
.. code:: bro .. code:: bro
event worker_to_workers(worker_name: string) event worker_to_workers(worker_name: string)
{ {
print "got event from worker", worker_name; @if ( Cluster::local_node_type() == Cluster::MANAGER ||
Cluster::local_node_type() == Cluster::PROXY )
Broker::publish(Cluster::worker_topic, worker_to_workers,
worker_name)
@else
print "got event from worker", worker_name;
@endif
} }
event some_event_handled_on_worker() event some_event_handled_on_worker()
{ {
# We know the manager is connected to all workers, so we could # We know the manager is connected to all workers, so we could
# choose to relay the event across it. Note that sending the event # choose to relay the event across it.
# this way will not allow the manager to handle it, even if it Broker::publish(Cluster::manager_topic, worker_to_workers,
# does have an event handler. Cluster::node + " (via manager)");
Broker::relay(Cluster::manager_topic, Cluster::worker_topic,
worker_to_workers, Cluster::node + " (via manager)");
# We also know that any given proxy is connected to all workers, # We also know that any given proxy is connected to all workers,
# though now we have a choice of which proxy to use. If we # though now we have a choice of which proxy to use. If we
@ -543,9 +557,9 @@ did before. Instead of using :bro:see:`Broker::publish` we use different
# we can use a round-robin strategy. The key used here is simply # we can use a round-robin strategy. The key used here is simply
# used by the cluster framework internally to keep track of # used by the cluster framework internally to keep track of
# which node is up next in the round-robin. # which node is up next in the round-robin.
Cluster::relay_rr(Cluster::proxy_pool, "example_key", local pt = Cluster::rr_topic(Cluster::proxy_pool, "example_key");
Cluster::worker_topic, worker_to_workers, Broker::publish(pt, worker_to_workers,
Cluster::node + " (via a proxy)"); Cluster::node + " (via a proxy)");
} }
Worker Distributing Events Uniformly Across Proxies Worker Distributing Events Uniformly Across Proxies

View file

@ -74,7 +74,7 @@ export {
const max_sleep = 0 &redef; const max_sleep = 0 &redef;
## Forward all received messages to subscribing peers. ## Forward all received messages to subscribing peers.
const forward_messages = F &redef; const forward_messages = T &redef;
## The default topic prefix where logs will be published. The log's stream ## The default topic prefix where logs will be published. The log's stream
## id is appended when writing to a particular stream. ## id is appended when writing to a particular stream.
@ -285,11 +285,25 @@ export {
## (except during :bro:see:`bro_init`). ## (except during :bro:see:`bro_init`).
## ##
## topic_prefix: a prefix previously supplied to a successful call to ## topic_prefix: a prefix previously supplied to a successful call to
## :bro:see:`Broker::subscribe`. ## :bro:see:`Broker::subscribe` or :bro:see:`Broker::forward`.
## ##
## Returns: true if interest in the topic prefix is no longer advertised. ## Returns: true if interest in the topic prefix is no longer advertised.
global unsubscribe: function(topic_prefix: string): bool; global unsubscribe: function(topic_prefix: string): bool;
## Register a topic prefix subscription for events that should only be
## forwarded to any subscribing peers and not raise any event handlers
## on the receiving/forwarding node. i.e. it's the same as
## :bro:see:`Broker::subscribe` except matching events are not raised
## on the receiver, just forwarded. Use :bro:see:`Broker::unsubscribe`
## with the same argument to undo this operation.
##
## topic_prefix: a prefix to match against remote message topics.
## e.g. an empty prefix matches everything and "a" matches
## "alice" and "amy" but not "bob".
##
## Returns: true if a new event forwarding/subscription is now registered.
global forward: function(topic_prefix: string): bool;
## Automatically send an event to any interested peers whenever it is ## Automatically send an event to any interested peers whenever it is
## locally dispatched. (For example, using "event my_event(...);" in a ## locally dispatched. (For example, using "event my_event(...);" in a
## script.) ## script.)
@ -377,6 +391,11 @@ function subscribe(topic_prefix: string): bool
return __subscribe(topic_prefix); return __subscribe(topic_prefix);
} }
function forward(topic_prefix: string): bool
{
return __forward(topic_prefix);
}
function unsubscribe(topic_prefix: string): bool function unsubscribe(topic_prefix: string): bool
{ {
return __unsubscribe(topic_prefix); return __unsubscribe(topic_prefix);

View file

@ -15,10 +15,6 @@ export {
## Whether to distribute log messages among available logging nodes. ## Whether to distribute log messages among available logging nodes.
const enable_round_robin_logging = T &redef; const enable_round_robin_logging = T &redef;
## The topic name used for exchanging general messages that are relevant to
## any node in a cluster. Used with broker-enabled cluster communication.
const broadcast_topic = "bro/cluster/broadcast" &redef;
## The topic name used for exchanging messages that are relevant to ## The topic name used for exchanging messages that are relevant to
## logger nodes in a cluster. Used with broker-enabled cluster communication. ## logger nodes in a cluster. Used with broker-enabled cluster communication.
const logger_topic = "bro/cluster/logger" &redef; const logger_topic = "bro/cluster/logger" &redef;
@ -43,6 +39,10 @@ export {
## a named node in a cluster. Used with broker-enabled cluster communication. ## a named node in a cluster. Used with broker-enabled cluster communication.
const node_topic_prefix = "bro/cluster/node/" &redef; const node_topic_prefix = "bro/cluster/node/" &redef;
## The topic prefix used for exchanging messages that are relevant to
## a unique node in a cluster. Used with broker-enabled cluster communication.
const nodeid_topic_prefix = "bro/cluster/nodeid/" &redef;
## Name of the node on which master data stores will be created if no other ## Name of the node on which master data stores will be created if no other
## has already been specified by the user in :bro:see:`Cluster::stores`. ## has already been specified by the user in :bro:see:`Cluster::stores`.
## An empty value means "use whatever name corresponds to the manager ## An empty value means "use whatever name corresponds to the manager
@ -238,6 +238,15 @@ export {
## Returns: a topic string that may used to send a message exclusively to ## Returns: a topic string that may used to send a message exclusively to
## a given cluster node. ## a given cluster node.
global node_topic: function(name: string): string; global node_topic: function(name: string): string;
## Retrieve the topic associated with a specific node in the cluster.
##
## id: the id of the cluster node (from :bro:see:`Broker::EndpointInfo`
## or :bro:see:`Broker::node_id`.
##
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global nodeid_topic: function(id: string): string;
} }
global active_worker_ids: set[string] = set(); global active_worker_ids: set[string] = set();
@ -286,6 +295,11 @@ function node_topic(name: string): string
return node_topic_prefix + name; return node_topic_prefix + name;
} }
function nodeid_topic(id: string): string
{
return node_topic_prefix + id;
}
event Cluster::hello(name: string, id: string) &priority=10 event Cluster::hello(name: string, id: string) &priority=10
{ {
if ( name !in nodes ) if ( name !in nodes )
@ -321,7 +335,7 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=
return; return;
local e = Broker::make_event(Cluster::hello, node, Broker::node_id()); local e = Broker::make_event(Cluster::hello, node, Broker::node_id());
Broker::publish(Cluster::broadcast_topic, e); Broker::publish(nodeid_topic(endpoint$id), e);
} }
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10

View file

@ -87,7 +87,7 @@ event bro_init() &priority=-10
return; return;
} }
Broker::subscribe(Cluster::broadcast_topic); Broker::subscribe(nodeid_topic(Broker::node_id()));
Broker::subscribe(node_topic(node)); Broker::subscribe(node_topic(node));
Broker::listen(Broker::default_listen_address, Broker::listen(Broker::default_listen_address,

View file

@ -28,11 +28,6 @@ export {
## record as it is sent on to the logging framework. ## record as it is sent on to the logging framework.
global log_config: event(rec: Info); global log_config: event(rec: Info);
## Broker topic for announcing new configuration values. Sending new_value,
## peers can send configuration changes that will be distributed across
## the entire cluster.
const change_topic = "bro/config/change";
## This function is the config framework layer around the lower-level ## This function is the config framework layer around the lower-level
## :bro:see:`Option::set` call. Config::set_value will set the configuration ## :bro:see:`Option::set` call. Config::set_value will set the configuration
## value for all nodes in the cluster, no matter where it was called. Note ## value for all nodes in the cluster, no matter where it was called. Note
@ -57,48 +52,58 @@ type OptionCacheValue: record {
global option_cache: table[string] of OptionCacheValue; global option_cache: table[string] of OptionCacheValue;
event bro_init() global Config::cluster_set_option: event(ID: string, val: any, location: string);
function broadcast_option(ID: string, val: any, location: string)
{ {
Broker::subscribe(change_topic); # There's not currently a common topic to broadcast to as then enabling
# implicit Broker forwarding would cause a routing loop.
Broker::publish(Cluster::worker_topic, Config::cluster_set_option,
ID, val, location);
Broker::publish(Cluster::proxy_topic, Config::cluster_set_option,
ID, val, location);
Broker::publish(Cluster::logger_topic, Config::cluster_set_option,
ID, val, location);
} }
event Config::cluster_set_option(ID: string, val: any, location: string) event Config::cluster_set_option(ID: string, val: any, location: string)
{ {
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
option_cache[ID] = OptionCacheValue($val=val, $location=location); option_cache[ID] = OptionCacheValue($val=val, $location=location);
broadcast_option(ID, val, location);
@endif @endif
Option::set(ID, val, location); Option::set(ID, val, location);
} }
function set_value(ID: string, val: any, location: string &default = "" &optional): bool function set_value(ID: string, val: any, location: string &default = "" &optional): bool
{ {
local cache_val: any; # Always copy the value to break references -- if caller mutates their
# First cache value in case setting it succeeds and we have to store it. # value afterwards, we still guarantee the option has not changed. If
if ( Cluster::local_node_type() == Cluster::MANAGER ) # one wants it to change, they need to explicitly call Option::set_value
cache_val = copy(val); # or Option::set with the intended value at the time of the call.
val = copy(val);
# First try setting it locally - abort if not possible. # First try setting it locally - abort if not possible.
if ( ! Option::set(ID, val, location) ) if ( ! Option::set(ID, val, location) )
return F; return F;
# If setting worked, copy the new value into the cache on the manager
if ( Cluster::local_node_type() == Cluster::MANAGER )
option_cache[ID] = OptionCacheValue($val=cache_val, $location=location);
# If it turns out that it is possible - send it to everyone else to apply. @if ( Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(change_topic, Config::cluster_set_option, ID, val, location); option_cache[ID] = OptionCacheValue($val=val, $location=location);
broadcast_option(ID, val, location);
@else
Broker::publish(Cluster::manager_topic, Config::cluster_set_option,
ID, val, location);
@endif
if ( Cluster::local_node_type() != Cluster::MANAGER )
{
Broker::relay(change_topic, change_topic, Config::cluster_set_option, ID, val, location);
}
return T; return T;
} }
@else @else # Standalone implementation
# Standalone implementation
function set_value(ID: string, val: any, location: string &default = "" &optional): bool function set_value(ID: string, val: any, location: string &default = "" &optional): bool
{ {
return Option::set(ID, val, location); return Option::set(ID, val, location);
} }
@endif @endif # Cluster::is_enabled
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
# Handling of new worker nodes. # Handling of new worker nodes.

View file

@ -6,21 +6,6 @@
module Intel; module Intel;
export {
## Broker topic for management of intel items. Sending insert_item and
## remove_item events, peers can manage intelligence data.
const item_topic = "bro/intel/items" &redef;
## Broker topic for management of intel indicators as stored on workers
## for matching. Sending insert_indicator and remove_indicator events,
## the back-end manages indicators.
const indicator_topic = "bro/intel/indicators" &redef;
## Broker topic for matching events, generated by workers and sent to
## the back-end for metadata enrichment and logging.
const match_topic = "bro/intel/match" &redef;
}
# Internal events for cluster data distribution. # Internal events for cluster data distribution.
global insert_item: event(item: Item); global insert_item: event(item: Item);
global insert_indicator: event(item: Item); global insert_indicator: event(item: Item);
@ -33,10 +18,7 @@ redef have_full_data = F;
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
event bro_init() event bro_init()
{ {
Broker::subscribe(item_topic); Broker::auto_publish(Cluster::worker_topic, remove_indicator);
Broker::subscribe(match_topic);
Broker::auto_publish(indicator_topic, remove_indicator);
} }
# Handling of new worker nodes. # Handling of new worker nodes.
@ -54,11 +36,14 @@ event Cluster::node_up(name: string, id: string)
# has to be distributed. # has to be distributed.
event Intel::new_item(item: Item) &priority=5 event Intel::new_item(item: Item) &priority=5
{ {
if ( Cluster::proxy_pool$alive_count == 0 ) local pt = Cluster::rr_topic(Cluster::proxy_pool, "intel_insert_rr_key");
Broker::publish(indicator_topic, Intel::insert_indicator, item);
else if ( pt == "" )
Cluster::relay_rr(Cluster::proxy_pool, "Intel::new_item_relay_rr", # No proxies alive, publish to all workers ourself instead of
indicator_topic, Intel::insert_indicator, item); # relaying via a proxy.
pt = Cluster::worker_topic;
Broker::publish(pt, Intel::insert_indicator, item);
} }
# Handling of item insertion triggered by remote node. # Handling of item insertion triggered by remote node.
@ -84,17 +69,15 @@ event Intel::match_remote(s: Seen) &priority=5
@if ( Cluster::local_node_type() == Cluster::WORKER ) @if ( Cluster::local_node_type() == Cluster::WORKER )
event bro_init() event bro_init()
{ {
Broker::subscribe(indicator_topic); Broker::auto_publish(Cluster::manager_topic, match_remote);
Broker::auto_publish(Cluster::manager_topic, remove_item);
Broker::auto_publish(match_topic, match_remote);
Broker::auto_publish(item_topic, remove_item);
} }
# On a worker, the new_item event requires to trigger the insertion # On a worker, the new_item event requires to trigger the insertion
# on the manager to update the back-end data store. # on the manager to update the back-end data store.
event Intel::new_item(item: Intel::Item) &priority=5 event Intel::new_item(item: Intel::Item) &priority=5
{ {
Broker::publish(item_topic, Intel::insert_item, item); Broker::publish(Cluster::manager_topic, Intel::insert_item, item);
} }
# Handling of new indicators published by the manager. # Handling of new indicators published by the manager.
@ -103,3 +86,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5
Intel::_insert(item, F); Intel::_insert(item, F);
} }
@endif @endif
@if ( Cluster::local_node_type() == Cluster::PROXY )
event Intel::insert_indicator(item: Intel::Item) &priority=5
{
# Just forwarding from manager to workers.
Broker::publish(Cluster::worker_topic, Intel::insert_indicator, item);
}
@endif

View file

@ -24,32 +24,41 @@ export {
## Sniffed mime type of the file. ## Sniffed mime type of the file.
dcc_mime_type: string &log &optional; dcc_mime_type: string &log &optional;
}; };
## The broker topic name to which expected DCC transfer updates are
## relayed.
const dcc_transfer_update_topic = "bro/irc/dcc_transfer_update" &redef;
} }
global dcc_expected_transfers: table[addr, port] of Info &read_expire=5mins; global dcc_expected_transfers: table[addr, port] of Info &read_expire=5mins;
function dcc_relay_topic(): string
{
local rval = Cluster::rr_topic(Cluster::proxy_pool, "dcc_transfer_rr_key");
if ( rval == "" )
# No proxy is alive, so relay via manager instead.
return Cluster::manager_topic;
return rval;
}
event dcc_transfer_add(host: addr, p: port, info: Info) event dcc_transfer_add(host: addr, p: port, info: Info)
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, dcc_transfer_add, host, p, info);
@else
dcc_expected_transfers[host, p] = info; dcc_expected_transfers[host, p] = info;
Analyzer::schedule_analyzer(0.0.0.0, host, p, Analyzer::schedule_analyzer(0.0.0.0, host, p,
Analyzer::ANALYZER_IRC_DATA, 5 min); Analyzer::ANALYZER_IRC_DATA, 5 min);
@endif
} }
event dcc_transfer_remove(host: addr, p: port) event dcc_transfer_remove(host: addr, p: port)
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, dcc_transfer_remove, host, p);
@else
delete dcc_expected_transfers[host, p]; delete dcc_expected_transfers[host, p];
} @endif
event bro_init()
{
local lnt = Cluster::local_node_type();
if ( lnt == Cluster::WORKER )
Broker::subscribe(dcc_transfer_update_topic);
} }
function log_dcc(f: fa_file) function log_dcc(f: fa_file)
@ -76,9 +85,11 @@ function log_dcc(f: fa_file)
delete irc$dcc_mime_type; delete irc$dcc_mime_type;
delete dcc_expected_transfers[cid$resp_h, cid$resp_p]; delete dcc_expected_transfers[cid$resp_h, cid$resp_p];
Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic,
dcc_transfer_update_topic, dcc_transfer_remove, @if ( Cluster::is_enabled() )
cid$resp_h, cid$resp_p); Broker::publish(dcc_relay_topic(), dcc_transfer_remove,
cid$resp_h, cid$resp_p);
@endif
return; return;
} }
} }
@ -102,9 +113,10 @@ event irc_dcc_message(c: connection, is_orig: bool,
local p = count_to_port(dest_port, tcp); local p = count_to_port(dest_port, tcp);
Analyzer::schedule_analyzer(0.0.0.0, address, p, Analyzer::ANALYZER_IRC_DATA, 5 min); Analyzer::schedule_analyzer(0.0.0.0, address, p, Analyzer::ANALYZER_IRC_DATA, 5 min);
dcc_expected_transfers[address, p] = c$irc; dcc_expected_transfers[address, p] = c$irc;
Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic,
dcc_transfer_update_topic, dcc_transfer_add, @if ( Cluster::is_enabled() )
address, p, c$irc); Broker::publish(dcc_relay_topic(), dcc_transfer_add, address, p, c$irc);
@endif
} }
event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10 event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10
@ -119,8 +131,10 @@ event connection_state_remove(c: connection) &priority=-5
if ( [c$id$resp_h, c$id$resp_p] in dcc_expected_transfers ) if ( [c$id$resp_h, c$id$resp_p] in dcc_expected_transfers )
{ {
delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p]; delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p];
Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic,
dcc_transfer_update_topic, dcc_transfer_remove, @if ( Cluster::is_enabled() )
c$id$resp_h, c$id$resp_p); Broker::publish(dcc_relay_topic(), dcc_transfer_remove,
c$id$resp_h, c$id$resp_p);
@endif
} }
} }

View file

@ -14,7 +14,7 @@ module Control;
event bro_init() &priority=-10 event bro_init() &priority=-10
{ {
Broker::subscribe(Control::topic_prefix); Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id());
Broker::auto_publish(Control::topic_prefix + "/id_value_response", Broker::auto_publish(Control::topic_prefix + "/id_value_response",
Control::id_value_response); Control::id_value_response);
Broker::auto_publish(Control::topic_prefix + "/peer_status_response", Broker::auto_publish(Control::topic_prefix + "/peer_status_response",

View file

@ -23,16 +23,6 @@ event bro_init() &priority=5
terminate(); terminate();
} }
Broker::auto_publish(Control::topic_prefix + "/id_value_request",
Control::id_value_request);
Broker::auto_publish(Control::topic_prefix + "/peer_status_request",
Control::peer_status_request);
Broker::auto_publish(Control::topic_prefix + "/net_stats_request",
Control::net_stats_request);
Broker::auto_publish(Control::topic_prefix + "/configuration_update_request",
Control::configuration_update_request);
Broker::auto_publish(Control::topic_prefix + "/shutdown_request",
Control::shutdown_request);
Broker::subscribe(Control::topic_prefix); Broker::subscribe(Control::topic_prefix);
Broker::peer(cat(host), host_port); Broker::peer(cat(host), host_port);
} }
@ -88,30 +78,30 @@ function configurable_ids(): id_table
return rval; return rval;
} }
function send_control_request() function send_control_request(topic: string)
{ {
switch ( cmd ) { switch ( cmd ) {
case "id_value": case "id_value":
if ( arg == "" ) if ( arg == "" )
Reporter::fatal("The Control::id_value command requires that Control::arg also has some value."); Reporter::fatal("The Control::id_value command requires that Control::arg also has some value.");
event Control::id_value_request(arg); Broker::publish(topic, Control::id_value_request, arg);
break; break;
case "peer_status": case "peer_status":
event Control::peer_status_request(); Broker::publish(topic, Control::peer_status_request);
break; break;
case "net_stats": case "net_stats":
event Control::net_stats_request(); Broker::publish(topic, Control::net_stats_request);
break; break;
case "shutdown": case "shutdown":
event Control::shutdown_request(); Broker::publish(topic, Control::shutdown_request);
break; break;
case "configuration_update": case "configuration_update":
event Control::configuration_update_request(); Broker::publish(topic, Control::configuration_update_request);
break; break;
default: default:
@ -122,6 +112,8 @@ function send_control_request()
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=-10 event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=-10
{ {
local topic = Control::topic_prefix + "/" + endpoint$id;
if ( cmd == "configuration_update" ) if ( cmd == "configuration_update" )
{ {
# Send all &redef'able consts to the peer. # Send all &redef'able consts to the peer.
@ -130,8 +122,6 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=
for ( id in ids ) for ( id in ids )
{ {
local topic = fmt("%s/id/%s", Control::topic_prefix, id);
if ( Broker::publish_id(topic, id) ) if ( Broker::publish_id(topic, id) )
++publish_count; ++publish_count;
} }
@ -139,5 +129,5 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=
Reporter::info(fmt("Control framework sent %d IDs", publish_count)); Reporter::info(fmt("Control framework sent %d IDs", publish_count));
} }
send_control_request(); send_control_request(topic);
} }

@ -1 +1 @@
Subproject commit d7af30b30c811077d3838f4142400d8cc1f2cdbb Subproject commit 7c95b51de202ac534b27dd721da5778b773dd614

View file

@ -2,6 +2,7 @@
#include <broker/broker.hh> #include <broker/broker.hh>
#include <broker/bro.hh> #include <broker/bro.hh>
#include <cstdio> #include <cstdio>
#include <cstring>
#include <unistd.h> #include <unistd.h>
#include "Manager.h" #include "Manager.h"
@ -403,76 +404,6 @@ bool Manager::PublishEvent(string topic, RecordVal* args)
return PublishEvent(topic, event_name, std::move(xs)); return PublishEvent(topic, event_name, std::move(xs));
} }
bool Manager::RelayEvent(std::string first_topic,
broker::set relay_topics,
std::string name,
broker::vector args,
bool handle_on_relayer)
{
if ( bstate->endpoint.is_shutdown() )
return true;
if ( peer_count == 0 )
return true;
DBG_LOG(DBG_BROKER, "Publishing %s-relay event: %s",
handle_on_relayer ? "handle" : "",
RenderEvent(first_topic, name, args).c_str());
if ( handle_on_relayer )
{
broker::bro::HandleAndRelayEvent msg(std::move(relay_topics),
std::move(name),
std::move(args));
bstate->endpoint.publish(std::move(first_topic), std::move(msg));
}
else
{
broker::bro::RelayEvent msg(std::move(relay_topics),
std::move(name),
std::move(args));
bstate->endpoint.publish(std::move(first_topic), std::move(msg));
}
++statistics.num_events_outgoing;
return true;
}
bool Manager::RelayEvent(std::string first_topic,
std::set<std::string> relay_topics,
RecordVal* args,
bool handle_on_relayer)
{
if ( bstate->endpoint.is_shutdown() )
return true;
if ( peer_count == 0 )
return true;
if ( ! args->Lookup(0) )
return false;
auto event_name = args->Lookup(0)->AsString()->CheckString();
auto vv = args->Lookup(1)->AsVectorVal();
broker::vector xs;
xs.reserve(vv->Size());
for ( auto i = 0u; i < vv->Size(); ++i )
{
auto val = vv->Lookup(i)->AsRecordVal()->Lookup(0);
auto data_val = static_cast<DataVal*>(val);
xs.emplace_back(data_val->data);
}
broker::set topic_set;
for ( auto& t : relay_topics )
topic_set.emplace(std::move(t));
return RelayEvent(first_topic, std::move(topic_set), event_name,
std::move(xs), handle_on_relayer);
}
bool Manager::PublishIdentifier(std::string topic, std::string id) bool Manager::PublishIdentifier(std::string topic, std::string id)
{ {
if ( bstate->endpoint.is_shutdown() ) if ( bstate->endpoint.is_shutdown() )
@ -857,8 +788,28 @@ bool Manager::Subscribe(const string& topic_prefix)
return true; return true;
} }
bool Manager::Forward(string topic_prefix)
{
for ( auto i = 0u; i < forwarded_prefixes.size(); ++i )
if ( forwarded_prefixes[i] == topic_prefix )
return false;
DBG_LOG(DBG_BROKER, "Forwarding topic prefix %s", topic_prefix.c_str());
Subscribe(topic_prefix);
forwarded_prefixes.emplace_back(std::move(topic_prefix));
return true;
}
bool Manager::Unsubscribe(const string& topic_prefix) bool Manager::Unsubscribe(const string& topic_prefix)
{ {
for ( auto i = 0u; i < forwarded_prefixes.size(); ++i )
if ( forwarded_prefixes[i] == topic_prefix )
{
DBG_LOG(DBG_BROKER, "Unforwading topic prefix %s", topic_prefix.c_str());
forwarded_prefixes.erase(forwarded_prefixes.begin() + i);
break;
}
DBG_LOG(DBG_BROKER, "Unsubscribing from topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Unsubscribing from topic prefix %s", topic_prefix.c_str());
bstate->subscriber.remove_topic(topic_prefix, ! after_bro_init); bstate->subscriber.remove_topic(topic_prefix, ! after_bro_init);
return true; return true;
@ -898,19 +849,11 @@ double Manager::NextTimestamp(double* local_network_time)
return -1; return -1;
} }
void Manager::DispatchMessage(broker::data msg) void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
{ {
switch ( broker::bro::Message::type(msg) ) { switch ( broker::bro::Message::type(msg) ) {
case broker::bro::Message::Type::Event: case broker::bro::Message::Type::Event:
ProcessEvent(std::move(msg)); ProcessEvent(topic, std::move(msg));
break;
case broker::bro::Message::Type::RelayEvent:
ProcessRelayEvent(std::move(msg));
break;
case broker::bro::Message::Type::HandleAndRelayEvent:
ProcessHandleAndRelayEvent(std::move(msg));
break; break;
case broker::bro::Message::Type::LogCreate: case broker::bro::Message::Type::LogCreate:
@ -930,7 +873,7 @@ void Manager::DispatchMessage(broker::data msg)
broker::bro::Batch batch(std::move(msg)); broker::bro::Batch batch(std::move(msg));
for ( auto& i : batch.batch() ) for ( auto& i : batch.batch() )
DispatchMessage(std::move(i)); DispatchMessage(topic, std::move(i));
break; break;
} }
@ -978,7 +921,7 @@ void Manager::Process()
try try
{ {
DispatchMessage(std::move(msg)); DispatchMessage(topic, std::move(msg));
} }
catch ( std::runtime_error& e ) catch ( std::runtime_error& e )
{ {
@ -1001,8 +944,11 @@ void Manager::Process()
} }
void Manager::ProcessEvent(std::string name, broker::vector args) void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
{ {
auto name = std::move(ev.name());
auto args = std::move(ev.args());
DBG_LOG(DBG_BROKER, "Process event: %s %s", DBG_LOG(DBG_BROKER, "Process event: %s %s",
name.data(), RenderMessage(args).data()); name.data(), RenderMessage(args).data());
++statistics.num_events_incoming; ++statistics.num_events_incoming;
@ -1011,6 +957,23 @@ void Manager::ProcessEvent(std::string name, broker::vector args)
if ( ! handler ) if ( ! handler )
return; return;
auto& topic_string = topic.string();
for ( auto i = 0u; i < forwarded_prefixes.size(); ++i )
{
auto& p = forwarded_prefixes[i];
if ( p.size() > topic_string.size() )
continue;
if ( strncmp(p.data(), topic_string.data(), p.size()) != 0 )
continue;
DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s",
name.data(), RenderMessage(args).data());
return;
}
auto arg_types = handler->FType(false)->ArgTypes()->Types(); auto arg_types = handler->FType(false)->ArgTypes()->Types();
if ( static_cast<size_t>(arg_types->length()) != args.size() ) if ( static_cast<size_t>(arg_types->length()) != args.size() )
@ -1047,34 +1010,6 @@ void Manager::ProcessEvent(std::string name, broker::vector args)
delete_vals(vl); delete_vals(vl);
} }
void Manager::ProcessEvent(broker::bro::Event ev)
{
ProcessEvent(std::move(ev.name()), std::move(ev.args()));
}
void Manager::ProcessRelayEvent(broker::bro::RelayEvent ev)
{
DBG_LOG(DBG_BROKER, "Received relay event: %s", RenderMessage(ev).c_str());
++statistics.num_events_incoming;
for ( auto& t : ev.topics() )
PublishEvent(std::move(caf::get<std::string>(t)),
std::move(ev.name()),
std::move(ev.args()));
}
void Manager::ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev)
{
DBG_LOG(DBG_BROKER, "Received handle-relay event: %s",
RenderMessage(ev).c_str());
ProcessEvent(ev.name(), ev.args());
for ( auto& t : ev.topics() )
PublishEvent(std::move(caf::get<std::string>(t)),
std::move(ev.name()),
std::move(ev.args()));
}
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc) bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
{ {
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());

View file

@ -153,43 +153,6 @@ public:
*/ */
bool PublishEvent(std::string topic, RecordVal* ev); bool PublishEvent(std::string topic, RecordVal* ev);
/**
* Sends an event to any interested peers, who, upon receipt,
* republishes the event to a new set of topics and optionally
* calls event handlers.
* @param first_topic the first topic to use when publishing the event
* @param relay_topics the set of topics the receivers will use to
* republish the event. The event is relayed at most a single hop.
* @param name the name of the event
* @param args the event's arguments
* @param handle_on_relayer whether they relaying-node should call event
* handlers.
* @return true if the message is sent successfully.
*/
bool RelayEvent(std::string first_topic,
broker::set relay_topics,
std::string name,
broker::vector args,
bool handle_on_relayer);
/**
* Sends an event to any interested peers, who, upon receipt,
* republishes the event to a new set of topics and optionally
* calls event handlers.
* @param first_topic the first topic to use when publishing the event
* @param relay_topics the set of topics the receivers will use to
* republish the event. The event is relayed at most a single hop.
* @param ev the event and its arguments to send to peers, in the form of
* a Broker::Event record type.
* @param handle_on_relayer whether they relaying-node should call event
* handlers.
* @return true if the message is sent successfully.
*/
bool RelayEvent(std::string first_topic,
std::set<std::string> relay_topics,
RecordVal* ev,
bool handle_on_relayer);
/** /**
* Send a message to create a log stream to any interested peers. * Send a message to create a log stream to any interested peers.
* The log stream may or may not already exist on the receiving side. * The log stream may or may not already exist on the receiving side.
@ -261,10 +224,21 @@ public:
*/ */
bool Subscribe(const std::string& topic_prefix); bool Subscribe(const std::string& topic_prefix);
/**
* Register interest in peer event messages that use a certain topic prefix,
* but that should not be raised locally, just forwarded to any subscribing
* peers.
* @param topic_prefix a prefix to match against remote message topics.
* e.g. an empty prefix will match everything and "a" will match "alice"
* and "amy" but not "bob".
* @return true if it's a new event forward/subscription and it is now registered.
*/
bool Forward(std::string topic_prefix);
/** /**
* Unregister interest in peer event messages. * Unregister interest in peer event messages.
* @param topic_prefix a prefix previously supplied to a successful call * @param topic_prefix a prefix previously supplied to a successful call
* to bro_broker::Manager::Subscribe(). * to bro_broker::Manager::Subscribe() or bro_broker::Manager::Forward().
* @return true if interest in topic prefix is no longer advertised. * @return true if interest in topic prefix is no longer advertised.
*/ */
bool Unsubscribe(const std::string& topic_prefix); bool Unsubscribe(const std::string& topic_prefix);
@ -348,11 +322,8 @@ public:
private: private:
void DispatchMessage(broker::data msg); void DispatchMessage(const broker::topic& topic, broker::data msg);
void ProcessEvent(std::string name, broker::vector args); void ProcessEvent(const broker::topic& topic, broker::bro::Event ev);
void ProcessEvent(broker::bro::Event ev);
void ProcessRelayEvent(broker::bro::RelayEvent re);
void ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev);
bool ProcessLogCreate(broker::bro::LogCreate lc); bool ProcessLogCreate(broker::bro::LogCreate lc);
bool ProcessLogWrite(broker::bro::LogWrite lw); bool ProcessLogWrite(broker::bro::LogWrite lw);
bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu); bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu);
@ -403,6 +374,7 @@ private:
std::unordered_map<std::string, StoreHandleVal*> data_stores; std::unordered_map<std::string, StoreHandleVal*> data_stores;
std::unordered_map<query_id, StoreQueryCallback*, std::unordered_map<query_id, StoreQueryCallback*,
query_id_hasher> pending_queries; query_id_hasher> pending_queries;
std::vector<std::string> forwarded_prefixes;
Stats statistics; Stats statistics;

View file

@ -67,28 +67,6 @@ static bool publish_event_args(val_list& args, const BroString* topic,
return rval; return rval;
} }
static bool relay_event_args(val_list& args, const BroString* topic,
std::set<std::string> topic_set, Frame* frame)
{
bro_broker::Manager::ScriptScopeGuard ssg;
auto rval = false;
if ( args[0]->Type()->Tag() == TYPE_RECORD )
rval = broker_mgr->RelayEvent(topic->CheckString(),
std::move(topic_set),
args[0]->AsRecordVal(), false);
else
{
auto ev = broker_mgr->MakeEvent(&args, frame);
rval = broker_mgr->RelayEvent(topic->CheckString(),
std::move(topic_set),
ev, false);
Unref(ev);
}
return rval;
}
%%} %%}
module Broker; module Broker;
@ -131,130 +109,6 @@ function Broker::publish%(topic: string, ...%): bool
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
## Publishes an event at a given topic, with any receivers automatically
## forwarding it to its peers with a different topic. The event is relayed
## at most a single hop and the relayer does not call any local event handlers.
##
## first_topic: the initial topic to use for publishing the event.
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the first
## set of receivers will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Broker::relay%(first_topic: string, ...%): bool
%{
bro_broker::Manager::ScriptScopeGuard ssg;
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 3 )
{
builtin_error("Broker::relay requires at least 3 arguments");
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[1];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Broker::relay requires a string or string_set as 2nd argument");
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
return new Val(false, TYPE_BOOL);
val_list args(bif_args->length() - 2);
for ( auto i = 2; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = false;
if ( args[0]->Type()->Tag() == TYPE_RECORD )
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set),
args[0]->AsRecordVal(), false);
else
{
auto ev = broker_mgr->MakeEvent(&args, frame);
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set), ev, false);
Unref(ev);
}
return new Val(rval, TYPE_BOOL);
%}
## Publishes an event at a given topic, with any receivers automatically
## forwarding it to its peers with a different topic. The event is relayed
## at most a single hop and the relayer does call local event handlers.
##
## first_topic: the initial topic to use for publishing the event.
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the first
## set of receivers will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Broker::publish_and_relay%(first_topic: string, ...%): bool
%{
bro_broker::Manager::ScriptScopeGuard ssg;
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 3 )
{
builtin_error("Broker::publish_and_relay requires at least 3 arguments");
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[1];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Broker::publish_and_relay requires a string or string_set as 2nd argument");
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
return new Val(false, TYPE_BOOL);
val_list args(bif_args->length() - 2);
for ( auto i = 2; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = false;
if ( args[0]->Type()->Tag() == TYPE_RECORD )
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set),
args[0]->AsRecordVal(), true);
else
{
auto ev = broker_mgr->MakeEvent(&args, frame);
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set), ev, true);
Unref(ev);
}
return new Val(rval, TYPE_BOOL);
%}
function Broker::__flush_logs%(%): count function Broker::__flush_logs%(%): count
%{ %{
auto rval = broker_mgr->FlushLogBuffers(); auto rval = broker_mgr->FlushLogBuffers();
@ -290,6 +144,13 @@ function Broker::__subscribe%(topic_prefix: string%): bool
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
function Broker::__forward%(topic_prefix: string%): bool
%{
bro_broker::Manager::ScriptScopeGuard ssg;
auto rval = broker_mgr->Forward(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Broker::__unsubscribe%(topic_prefix: string%): bool function Broker::__unsubscribe%(topic_prefix: string%): bool
%{ %{
bro_broker::Manager::ScriptScopeGuard ssg; bro_broker::Manager::ScriptScopeGuard ssg;
@ -345,81 +206,6 @@ function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool
%} %}
## Publishes an event at a given topic, with a receiver node chosen
## from a pool according to Round-Robin distribution strategy. The receiving
## node, then automatically forwards it to its peers with a different topic.
## The event is relayed at most a single hop.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: an arbitrary string to identify the purpose for which you're
## distributing the event. e.g. consider using namespacing of your
## script like "Intel::cluster_rr_key".
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the receiver
## will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Cluster::relay_rr%(pool: Pool, key: any, ...%): bool
%{
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 4 )
{
builtin_error("Cluster::relay_rr requires at least 4 arguments");
return new Val(false, TYPE_BOOL);
}
static Func* topic_func = 0;
if ( ! topic_func )
topic_func = global_scope()->Lookup("Cluster::rr_topic")->ID_Val()->AsFunc();
val_list vl(2);
vl.append(pool->Ref());
vl.append(key->Ref());
auto topic = topic_func->Call(&vl);
if ( ! topic->AsString()->Len() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[2];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Cluster::relay_rr requires a string or string_set as 3rd argument");
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
val_list args(bif_args->length() - 3);
for ( auto i = 3; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set),
frame);
Unref(topic);
return new Val(rval, TYPE_BOOL);
%}
## Publishes an event to a node within a pool according to Rendezvous ## Publishes an event to a node within a pool according to Rendezvous
## (Highest Random Weight) hashing strategy. ## (Highest Random Weight) hashing strategy.
## ##
@ -461,77 +247,3 @@ function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool
Unref(topic); Unref(topic);
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
## Publishes an event at a given topic, with a receiver node chosen
## from a pool according to Rendezvous (Highest Random Weight) distribution
## strategy. The receiving nodes then automatically forwards it to its peers
## with a different topic. The event is relayed at most a single hop.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: data used for input to the hashing function that will uniformly
## distribute keys among available nodes.
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the receiver
## will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Cluster::relay_hrw%(pool: Pool, key: any, ...%): bool
%{
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 4 )
{
builtin_error("Cluster::relay_hrw requires at least 4 arguments");
return new Val(false, TYPE_BOOL);
}
static Func* topic_func = 0;
if ( ! topic_func )
topic_func = global_scope()->Lookup("Cluster::hrw_topic")->ID_Val()->AsFunc();
val_list vl(2);
vl.append(pool->Ref());
vl.append(key->Ref());
auto topic = topic_func->Call(&vl);
if ( ! topic->AsString()->Len() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[2];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Cluster::relay_hrw requires a string or string_set as 3rd argument");
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
val_list args(bif_args->length() - 3);
for ( auto i = 3; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set),
frame);
Unref(topic);
return new Val(rval, TYPE_BOOL);
%}

View file

@ -1,3 +0,0 @@
sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core
got ready event
sender lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -1,2 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
got my_event, hello world

View file

@ -1,5 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
sending ready event
got my_event, hello world
receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -1,3 +0,0 @@
sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core
got ready event
sender lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -1,2 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
got my_event, hello world

View file

@ -1,4 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
sending ready event
receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -157,7 +157,7 @@
0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) -> <no result> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <frame>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <null>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <null>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::local_node_type, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::local_node_type, <null>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F])) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F])) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F])) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F])) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F])) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F])) -> <no result>
@ -274,7 +274,7 @@
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Broker::LOG)) -> <no result> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Broker::LOG)) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) -> <no result> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Config::LOG)) -> <no result> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Config::LOG)) -> <no result>
@ -459,7 +459,7 @@
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(NetControl::check_plugins, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(NetControl::check_plugins, <frame>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(NetControl::init, <null>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(NetControl::init, <null>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Notice::want_pp, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Notice::want_pp, <frame>, ()) -> <no result>
@ -1036,7 +1036,7 @@
0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp}))
0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <frame>, ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <frame>, ())
0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <null>, ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <null>, ())
0.000000 MetaHookPre CallFunction(Cluster::local_node_type, <frame>, ()) 0.000000 MetaHookPre CallFunction(Cluster::local_node_type, <null>, ())
0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F]))
0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F]))
0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F]))
@ -1153,7 +1153,7 @@
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
0.000000 MetaHookPre CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Broker::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Broker::LOG))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Config::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Config::LOG))
@ -1338,7 +1338,7 @@
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) 0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
0.000000 MetaHookPre CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(NetControl::check_plugins, <frame>, ()) 0.000000 MetaHookPre CallFunction(NetControl::check_plugins, <frame>, ())
0.000000 MetaHookPre CallFunction(NetControl::init, <null>, ()) 0.000000 MetaHookPre CallFunction(NetControl::init, <null>, ())
0.000000 MetaHookPre CallFunction(Notice::want_pp, <frame>, ()) 0.000000 MetaHookPre CallFunction(Notice::want_pp, <frame>, ())
@ -2031,7 +2031,7 @@
0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]) 0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction Log::add_default_filter(Broker::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Broker::LOG)
0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG)
0.000000 | HookCallFunction Log::add_default_filter(Config::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Config::LOG)
@ -2216,7 +2216,7 @@
0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]) 0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction NetControl::check_plugins() 0.000000 | HookCallFunction NetControl::check_plugins()
0.000000 | HookCallFunction NetControl::init() 0.000000 | HookCallFunction NetControl::init()
0.000000 | HookCallFunction Notice::want_pp() 0.000000 | HookCallFunction Notice::want_pp()
@ -2630,7 +2630,7 @@
0.000000 | HookLoadFile base<...>/x509 0.000000 | HookLoadFile base<...>/x509
0.000000 | HookLoadFile base<...>/xmpp 0.000000 | HookLoadFile base<...>/xmpp
0.000000 | HookLogInit packet_filter 1/1 {ts (time), node (string), filter (string), init (bool), success (bool)} 0.000000 | HookLogInit packet_filter 1/1 {ts (time), node (string), filter (string), init (bool), success (bool)}
0.000000 | HookLogWrite packet_filter [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T] 0.000000 | HookLogWrite packet_filter [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T]
0.000000 | HookQueueEvent NetControl::init() 0.000000 | HookQueueEvent NetControl::init()
0.000000 | HookQueueEvent bro_init() 0.000000 | HookQueueEvent bro_init()
0.000000 | HookQueueEvent filter_change_tracking() 0.000000 | HookQueueEvent filter_change_tracking()

View file

@ -0,0 +1,8 @@
Connected to a peer
Connected to a peer
Connected to a peer
Got fully_connected event
Got fully_connected event
Connected to a peer
Got fully_connected event
Got fully_connected event

View file

@ -0,0 +1,3 @@
Connected to a peer
Connected to a peer
Connected to a peer

View file

@ -0,0 +1,3 @@
Connected to a peer
Connected to a peer
Connected to a peer

View file

@ -0,0 +1,3 @@
Connected to a peer
Connected to a peer
Connected to a peer

View file

@ -0,0 +1,4 @@
Connected to a peer
Connected to a peer
Connected to a peer
got forwarded event

View file

@ -1,119 +0,0 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out"
# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out"
# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out"
#
# @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: btest-diff one/one.out
# @TEST-EXEC: btest-diff two/two.out
# @TEST-EXEC: btest-diff three/three.out
@TEST-START-FILE one.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
}
event ready_event()
{
print "got ready event";
Broker::publish_and_relay("bro/event/pre-relay", "bro/event/post-relay",
my_event, "hello world");
}
event bro_init()
{
Broker::subscribe("bro/event/ready");
Broker::peer("127.0.0.1", 10000/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE two.bro
redef exit_only_after_terminate = T;
global peers_added = 0;
event my_event(s: string)
{
print "got my_event", s;
}
event ready_event()
{
}
event bro_init()
{
Broker::subscribe("bro/event/pre-relay");
Broker::listen("127.0.0.1", 10000/tcp);
Broker::peer("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
++peers_added;
if ( peers_added == 2 )
{
print "sending ready event";
Broker::publish("bro/event/ready", ready_event);
}
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE three.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
terminate();
}
event bro_init()
{
Broker::subscribe("bro/event/post-relay");
Broker::listen("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
@TEST-END-FILE

View file

@ -1,120 +0,0 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out"
# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out"
# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out"
#
# @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: btest-diff one/one.out
# @TEST-EXEC: btest-diff two/two.out
# @TEST-EXEC: btest-diff three/three.out
@TEST-START-FILE one.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
}
event ready_event()
{
print "got ready event";
Broker::relay("bro/event/pre-relay", "bro/event/post-relay", my_event,
"hello world");
}
event bro_init()
{
Broker::subscribe("bro/event/ready");
Broker::peer("127.0.0.1", 10000/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE two.bro
redef exit_only_after_terminate = T;
global peers_added = 0;
event my_event(s: string)
{
print "got my_event", s;
terminate();
}
event ready_event()
{
}
event bro_init()
{
Broker::subscribe("bro/event/pre-relay");
Broker::listen("127.0.0.1", 10000/tcp);
Broker::peer("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
++peers_added;
if ( peers_added == 2 )
{
print "sending ready event";
Broker::publish("bro/event/ready", ready_event);
}
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE three.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
terminate();
}
event bro_init()
{
Broker::subscribe("bro/event/post-relay");
Broker::listen("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
@TEST-END-FILE

View file

@ -0,0 +1,105 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT
# @TEST-EXEC: btest-bg-run proxy-2 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-2 bro %INPUT
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff manager-1/.stdout
# @TEST-EXEC: btest-diff proxy-1/.stdout
# @TEST-EXEC: btest-diff proxy-2/.stdout
# @TEST-EXEC: btest-diff worker-1/.stdout
# @TEST-EXEC: btest-diff worker-2/.stdout
@TEST-START-FILE cluster-layout.bro
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp],
["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1"],
["proxy-2"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37759/tcp, $manager="manager-1"],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"],
};
@TEST-END-FILE
global fully_connected: event();
global peer_count = 0;
global peers_lost = 0;
global fully_connected_nodes = 0;
event forwarded_event()
{
print "got forwarded event";
if ( Cluster::node == "manager-1" )
print "manager should NOT have raised the forwarded event";
terminate();
}
event ready()
{
# note that the publishing node, worker-1, will not receive the forwarded
# event as Broker's forwarding prevents the message going back to the
# immediate sender.
Broker::publish("test_topic", forwarded_event);
}
event fully_connected()
{
if ( ! is_remote_event() )
return;
print "Got fully_connected event";
fully_connected_nodes = fully_connected_nodes + 1;
if ( Cluster::node == "manager-1" )
{
if ( peer_count == 4 && fully_connected_nodes == 4 )
Broker::publish(Cluster::node_topic("worker-1"), ready);
}
}
event bro_init()
{
Broker::auto_publish(Cluster::manager_topic, fully_connected);
if ( Cluster::node == "manager-1" )
Broker::forward("test_topic");
if ( Cluster::node == "worker-1" )
Broker::subscribe("test_topic");
if ( Cluster::node == "worker-2" )
Broker::subscribe("test_topic");
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "Connected to a peer";
peer_count = peer_count + 1;
if ( Cluster::node == "manager-1" )
{
if ( peer_count == 4 && fully_connected_nodes == 4 )
Broker::publish(Cluster::node_topic("worker-1"), ready);
}
else
{
if ( peer_count == 3 )
event fully_connected();
}
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
++peers_lost;
if ( Cluster::node == "manager-1" )
{
if ( peers_lost == 2 )
# Both workers terminated
terminate();
}
else
terminate();
}