From 8db042a8c26962a899fc04dbb06a09a78eab9ba3 Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Tue, 28 Aug 2018 16:40:48 -0500 Subject: [PATCH] Remove Cluster::broadcast_topic As enabling Broker forwarding would cause routing loops with messages sent to such a topic (one subscribed to on all nodes). --- doc/frameworks/broker.rst | 6 ++--- scripts/base/frameworks/cluster/main.bro | 24 +++++++++++++++---- .../frameworks/cluster/setup-connections.bro | 2 +- scripts/base/frameworks/config/main.bro | 20 ++++++++++++---- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/doc/frameworks/broker.rst b/doc/frameworks/broker.rst index 057096f86e..191b8178cc 100644 --- a/doc/frameworks/broker.rst +++ b/doc/frameworks/broker.rst @@ -271,9 +271,9 @@ communication flows unrelated to Bro cluster, new topics are declared For cluster operation, see :doc:`/scripts/base/frameworks/cluster/main.bro` for a list of topics that are useful for steering published events to -the various node classes. E.g. you have the ability to broadcast to all -directly-connected nodes, only those of a given class (e.g. just workers), -or to a specific node within a class. +the various node classes. E.g. you have the ability to broadcast +to all nodes of a given class (e.g. just workers) or just send to a +specific node within a class. The topic names that logs get published under are a bit nuanced. In the default cluster configuration, they are round-robin published to diff --git a/scripts/base/frameworks/cluster/main.bro b/scripts/base/frameworks/cluster/main.bro index 25c0f4f63e..4c7f806131 100644 --- a/scripts/base/frameworks/cluster/main.bro +++ b/scripts/base/frameworks/cluster/main.bro @@ -15,10 +15,6 @@ export { ## Whether to distribute log messages among available logging nodes. const enable_round_robin_logging = T &redef; - ## The topic name used for exchanging general messages that are relevant to - ## any node in a cluster. Used with broker-enabled cluster communication. - const broadcast_topic = "bro/cluster/broadcast" &redef; - ## The topic name used for exchanging messages that are relevant to ## logger nodes in a cluster. Used with broker-enabled cluster communication. const logger_topic = "bro/cluster/logger" &redef; @@ -43,6 +39,10 @@ export { ## a named node in a cluster. Used with broker-enabled cluster communication. const node_topic_prefix = "bro/cluster/node/" &redef; + ## The topic prefix used for exchanging messages that are relevant to + ## a unique node in a cluster. Used with broker-enabled cluster communication. + const nodeid_topic_prefix = "bro/cluster/nodeid/" &redef; + ## Name of the node on which master data stores will be created if no other ## has already been specified by the user in :bro:see:`Cluster::stores`. ## An empty value means "use whatever name corresponds to the manager @@ -238,6 +238,15 @@ export { ## Returns: a topic string that may used to send a message exclusively to ## a given cluster node. global node_topic: function(name: string): string; + + ## Retrieve the topic associated with a specific node in the cluster. + ## + ## id: the id of the cluster node (from :bro:see:`Broker::EndpointInfo` + ## or :bro:see:`Broker::node_id`. + ## + ## Returns: a topic string that may used to send a message exclusively to + ## a given cluster node. + global nodeid_topic: function(id: string): string; } global active_worker_ids: set[string] = set(); @@ -286,6 +295,11 @@ function node_topic(name: string): string return node_topic_prefix + name; } +function nodeid_topic(id: string): string + { + return node_topic_prefix + id; + } + event Cluster::hello(name: string, id: string) &priority=10 { if ( name !in nodes ) @@ -321,7 +335,7 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority= return; local e = Broker::make_event(Cluster::hello, node, Broker::node_id()); - Broker::publish(Cluster::broadcast_topic, e); + Broker::publish(nodeid_topic(endpoint$id), e); } event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 diff --git a/scripts/base/frameworks/cluster/setup-connections.bro b/scripts/base/frameworks/cluster/setup-connections.bro index 63ddbdd8b0..a90081c639 100644 --- a/scripts/base/frameworks/cluster/setup-connections.bro +++ b/scripts/base/frameworks/cluster/setup-connections.bro @@ -87,7 +87,7 @@ event bro_init() &priority=-10 return; } - Broker::subscribe(Cluster::broadcast_topic); + Broker::subscribe(nodeid_topic(Broker::node_id())); Broker::subscribe(node_topic(node)); Broker::listen(Broker::default_listen_address, diff --git a/scripts/base/frameworks/config/main.bro b/scripts/base/frameworks/config/main.bro index 7dce8284b4..dc7e71ecdf 100644 --- a/scripts/base/frameworks/config/main.bro +++ b/scripts/base/frameworks/config/main.bro @@ -52,12 +52,25 @@ type OptionCacheValue: record { global option_cache: table[string] of OptionCacheValue; +global Config::cluster_set_option: event(ID: string, val: any, location: string); + +function broadcast_option(ID: string, val: any, location: string) + { + # There's not currently a common topic to broadcast to as then enabling + # implicit Broker forwarding would cause a routing loop. + Broker::publish(Cluster::worker_topic, Config::cluster_set_option, + ID, val, location); + Broker::publish(Cluster::proxy_topic, Config::cluster_set_option, + ID, val, location); + Broker::publish(Cluster::logger_topic, Config::cluster_set_option, + ID, val, location); + } + event Config::cluster_set_option(ID: string, val: any, location: string) { @if ( Cluster::local_node_type() == Cluster::MANAGER ) option_cache[ID] = OptionCacheValue($val=val, $location=location); - Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option, - ID, val, location); + broadcast_option(ID, val, location); @endif Option::set(ID, val, location); @@ -77,8 +90,7 @@ function set_value(ID: string, val: any, location: string &default = "" &optiona @if ( Cluster::local_node_type() == Cluster::MANAGER ) option_cache[ID] = OptionCacheValue($val=val, $location=location); - Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option, - ID, val, location); + broadcast_option(ID, val, location); @else Broker::publish(Cluster::manager_topic, Config::cluster_set_option, ID, val, location);