Remove Cluster::broadcast_topic

As enabling Broker forwarding would cause routing loops with messages
sent to such a topic (one subscribed to on all nodes).
This commit is contained in:
Jon Siwek 2018-08-28 16:40:48 -05:00
parent 2f1e81059b
commit 8db042a8c2
4 changed files with 39 additions and 13 deletions

View file

@ -271,9 +271,9 @@ communication flows unrelated to Bro cluster, new topics are declared
For cluster operation, see :doc:`/scripts/base/frameworks/cluster/main.bro` For cluster operation, see :doc:`/scripts/base/frameworks/cluster/main.bro`
for a list of topics that are useful for steering published events to for a list of topics that are useful for steering published events to
the various node classes. E.g. you have the ability to broadcast to all the various node classes. E.g. you have the ability to broadcast
directly-connected nodes, only those of a given class (e.g. just workers), to all nodes of a given class (e.g. just workers) or just send to a
or to a specific node within a class. specific node within a class.
The topic names that logs get published under are a bit nuanced. In the The topic names that logs get published under are a bit nuanced. In the
default cluster configuration, they are round-robin published to default cluster configuration, they are round-robin published to

View file

@ -15,10 +15,6 @@ export {
## Whether to distribute log messages among available logging nodes. ## Whether to distribute log messages among available logging nodes.
const enable_round_robin_logging = T &redef; const enable_round_robin_logging = T &redef;
## The topic name used for exchanging general messages that are relevant to
## any node in a cluster. Used with broker-enabled cluster communication.
const broadcast_topic = "bro/cluster/broadcast" &redef;
## The topic name used for exchanging messages that are relevant to ## The topic name used for exchanging messages that are relevant to
## logger nodes in a cluster. Used with broker-enabled cluster communication. ## logger nodes in a cluster. Used with broker-enabled cluster communication.
const logger_topic = "bro/cluster/logger" &redef; const logger_topic = "bro/cluster/logger" &redef;
@ -43,6 +39,10 @@ export {
## a named node in a cluster. Used with broker-enabled cluster communication. ## a named node in a cluster. Used with broker-enabled cluster communication.
const node_topic_prefix = "bro/cluster/node/" &redef; const node_topic_prefix = "bro/cluster/node/" &redef;
## The topic prefix used for exchanging messages that are relevant to
## a unique node in a cluster. Used with broker-enabled cluster communication.
const nodeid_topic_prefix = "bro/cluster/nodeid/" &redef;
## Name of the node on which master data stores will be created if no other ## Name of the node on which master data stores will be created if no other
## has already been specified by the user in :bro:see:`Cluster::stores`. ## has already been specified by the user in :bro:see:`Cluster::stores`.
## An empty value means "use whatever name corresponds to the manager ## An empty value means "use whatever name corresponds to the manager
@ -238,6 +238,15 @@ export {
## Returns: a topic string that may used to send a message exclusively to ## Returns: a topic string that may used to send a message exclusively to
## a given cluster node. ## a given cluster node.
global node_topic: function(name: string): string; global node_topic: function(name: string): string;
## Retrieve the topic associated with a specific node in the cluster.
##
## id: the id of the cluster node (from :bro:see:`Broker::EndpointInfo`
## or :bro:see:`Broker::node_id`.
##
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global nodeid_topic: function(id: string): string;
} }
global active_worker_ids: set[string] = set(); global active_worker_ids: set[string] = set();
@ -286,6 +295,11 @@ function node_topic(name: string): string
return node_topic_prefix + name; return node_topic_prefix + name;
} }
function nodeid_topic(id: string): string
{
return node_topic_prefix + id;
}
event Cluster::hello(name: string, id: string) &priority=10 event Cluster::hello(name: string, id: string) &priority=10
{ {
if ( name !in nodes ) if ( name !in nodes )
@ -321,7 +335,7 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=
return; return;
local e = Broker::make_event(Cluster::hello, node, Broker::node_id()); local e = Broker::make_event(Cluster::hello, node, Broker::node_id());
Broker::publish(Cluster::broadcast_topic, e); Broker::publish(nodeid_topic(endpoint$id), e);
} }
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10

View file

@ -87,7 +87,7 @@ event bro_init() &priority=-10
return; return;
} }
Broker::subscribe(Cluster::broadcast_topic); Broker::subscribe(nodeid_topic(Broker::node_id()));
Broker::subscribe(node_topic(node)); Broker::subscribe(node_topic(node));
Broker::listen(Broker::default_listen_address, Broker::listen(Broker::default_listen_address,

View file

@ -52,12 +52,25 @@ type OptionCacheValue: record {
global option_cache: table[string] of OptionCacheValue; global option_cache: table[string] of OptionCacheValue;
global Config::cluster_set_option: event(ID: string, val: any, location: string);
function broadcast_option(ID: string, val: any, location: string)
{
# There's not currently a common topic to broadcast to as then enabling
# implicit Broker forwarding would cause a routing loop.
Broker::publish(Cluster::worker_topic, Config::cluster_set_option,
ID, val, location);
Broker::publish(Cluster::proxy_topic, Config::cluster_set_option,
ID, val, location);
Broker::publish(Cluster::logger_topic, Config::cluster_set_option,
ID, val, location);
}
event Config::cluster_set_option(ID: string, val: any, location: string) event Config::cluster_set_option(ID: string, val: any, location: string)
{ {
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
option_cache[ID] = OptionCacheValue($val=val, $location=location); option_cache[ID] = OptionCacheValue($val=val, $location=location);
Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option, broadcast_option(ID, val, location);
ID, val, location);
@endif @endif
Option::set(ID, val, location); Option::set(ID, val, location);
@ -77,8 +90,7 @@ function set_value(ID: string, val: any, location: string &default = "" &optiona
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
option_cache[ID] = OptionCacheValue($val=val, $location=location); option_cache[ID] = OptionCacheValue($val=val, $location=location);
Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option, broadcast_option(ID, val, location);
ID, val, location);
@else @else
Broker::publish(Cluster::manager_topic, Config::cluster_set_option, Broker::publish(Cluster::manager_topic, Config::cluster_set_option,
ID, val, location); ID, val, location);