diff --git a/CHANGES b/CHANGES index 8db9af5621..46502f8781 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,39 @@ +2.5-931 | 2018-08-29 23:45:29 +0000 + + * Add Broker::forward() function. This enables explicit forwarding + of events matching a given topic prefix. Even if a receiving node + has an event handler, it will not be raised if the event was sent + along a topic that matches a previous call to Broker::forward(). + (Jon Siwek, Corelight) + + * Enable implicit Broker message forwarding by default. (Jon Siwek, + Corelight) + + * Remove Cluster::broadcast_topic. As enabling Broker forwarding + would cause routing loops with messages sent to such a topic (one + subscribed to on all nodes). (Jon Siwek, Corelight) + + * Remove Intel Broker topics, re-use existing Cluster topics. (Jon + Siwek, Corelight) + + * Update broker docs to reflect best-practice/convention for + declaring new topics. + + * Remove "relay" family of Broker functions. (Jon Siwek, Corelight) + + Namely these are now removed: + + - Broker::relay + - Broker::publish_and_relay + - Cluster::relay_rr + - Cluster::relay_hrw + + The idea being that Broker may eventually implement the necessary + routing (plus load balancing) functionality. For now, code that + used these should "manually" handle and re-publish events as + needed. + 2.5-924 | 2018-08-29 18:21:37 -0500 * Allow event/function headers to be wrapped in directives. (Johanna Amann) @@ -970,7 +1005,7 @@ 2.5-405 | 2018-02-05 13:29:39 -0600 * Add MOUNT3 protocol parser. - + It's not activated by default. New events available: mount_proc_null, mount_proc_mnt, mount_proc_umnt, mount_proc_umnt_all, mount_proc_not_implemented, mount_reply_status. (Devin Trejo) @@ -1184,7 +1219,7 @@ * BIT-1845 - Make "in" keyword work with binary data. (Johanna Amann) - * Add TLS 1.3 fix and testcase due to Google Chrome's use of TLS 1.3. + * Add TLS 1.3 fix and testcase due to Google Chrome's use of TLS 1.3. It turns out that Chrome supports an experimental mode to support TLS 1.3, which uses a non-standard way to negotiate TLS 1.3 with a server. @@ -1381,7 +1416,7 @@ 2.5-140 | 2017-05-12 15:31:32 -0400 - * Lessen cluster load due to notice suppression. + * Lessen cluster load due to notice suppression. (Johanna Amann, Justin Azoff) 2.5-137 | 2017-05-04 11:37:48 -0500 diff --git a/VERSION b/VERSION index 919bb76990..796adf46ab 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.5-924 +2.5-931 diff --git a/aux/binpac b/aux/binpac index 3ebf910b6b..ff8c94964f 160000 --- a/aux/binpac +++ b/aux/binpac @@ -1 +1 @@ -Subproject commit 3ebf910b6befde6352e3af0b25589cfc2545cb5a +Subproject commit ff8c94964fccbf60abae401d03c9fb35a8894c16 diff --git a/aux/broctl b/aux/broctl index 486bbb9d9e..a88cb3434e 160000 --- a/aux/broctl +++ b/aux/broctl @@ -1 +1 @@ -Subproject commit 486bbb9d9ee7c66b55003e58f986d18e951902ec +Subproject commit a88cb3434e05dbb117687f6152acc8892ed969ca diff --git a/aux/broker b/aux/broker index dede39c133..a3e188680c 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit dede39c13390bcd57dff311dab9648db7dfdaa89 +Subproject commit a3e188680cd2889edbb9cf09c01fb1f031a90975 diff --git a/doc/frameworks/broker.rst b/doc/frameworks/broker.rst index e050ec6479..191b8178cc 100644 --- a/doc/frameworks/broker.rst +++ b/doc/frameworks/broker.rst @@ -237,7 +237,7 @@ follows certain conventions in choosing these topics to help avoid conflicts and generally make them easier to remember. As a reminder of how topic subscriptions work, subscribers advertise -interest in a topic **prefix** and then receive any messages publish by a +interest in a topic **prefix** and then receive any messages published by a peer to a topic name that starts with that prefix. E.g. Alice subscribes to the "alice/dogs" prefix, then would receive the following message topics published by Bob: @@ -263,12 +263,17 @@ scripts use will be along the lines of "bro//" with "" being the script's module name (in all-undercase). For example, you might expect an imaginary "Pretend" framework to publish/subscribe using topic names like "bro/pretend/my_cool_event". +For scripts that use Broker as a means of cluster-aware analysis, +it's usually sufficient for them to make use of the topics declared +by the cluster framework. For scripts that are meant to establish +communication flows unrelated to Bro cluster, new topics are declared +(examples being the NetControl and Control frameworks). For cluster operation, see :doc:`/scripts/base/frameworks/cluster/main.bro` for a list of topics that are useful for steering published events to -the various node classes. E.g. you have the ability to broadcast to all -directly-connected nodes, only those of a given class (e.g. just workers), -or to a specific node within a class. +the various node classes. E.g. you have the ability to broadcast +to all nodes of a given class (e.g. just workers) or just send to a +specific node within a class. The topic names that logs get published under are a bit nuanced. In the default cluster configuration, they are round-robin published to @@ -279,7 +284,12 @@ processes, logs get published to the topic indicated by For those writing their own scripts which need new topic names, a suggestion would be to avoid prefixing any new topics/prefixes with "bro/" as any changes in scripts shipping with Bro will use that prefix -and it's better to not risk unintended conflicts. +and it's better to not risk unintended conflicts. Again, it's +often less confusing to just re-use existing topic names instead +of introducing new topic names. The typical use case is writing +a cluster-enabled script, which usually just needs to route events +based upon node classes, and that already has usable topics in the +cluster framework. Connecting to Peers ------------------- @@ -518,24 +528,28 @@ Worker Sending Events To All Workers Since workers are not directly connected to each other in the cluster topology, this type of communication is a bit different than what we -did before. Instead of using :bro:see:`Broker::publish` we use different -"relay" calls to hop the message from a different node that *is* connected. +did before since we have to manually relay the event via some node that *is* +connected to all workers. The manager or a proxy satisfies that requirement: .. code:: bro event worker_to_workers(worker_name: string) { - print "got event from worker", worker_name; + @if ( Cluster::local_node_type() == Cluster::MANAGER || + Cluster::local_node_type() == Cluster::PROXY ) + Broker::publish(Cluster::worker_topic, worker_to_workers, + worker_name) + @else + print "got event from worker", worker_name; + @endif } event some_event_handled_on_worker() { # We know the manager is connected to all workers, so we could - # choose to relay the event across it. Note that sending the event - # this way will not allow the manager to handle it, even if it - # does have an event handler. - Broker::relay(Cluster::manager_topic, Cluster::worker_topic, - worker_to_workers, Cluster::node + " (via manager)"); + # choose to relay the event across it. + Broker::publish(Cluster::manager_topic, worker_to_workers, + Cluster::node + " (via manager)"); # We also know that any given proxy is connected to all workers, # though now we have a choice of which proxy to use. If we @@ -543,9 +557,9 @@ did before. Instead of using :bro:see:`Broker::publish` we use different # we can use a round-robin strategy. The key used here is simply # used by the cluster framework internally to keep track of # which node is up next in the round-robin. - Cluster::relay_rr(Cluster::proxy_pool, "example_key", - Cluster::worker_topic, worker_to_workers, - Cluster::node + " (via a proxy)"); + local pt = Cluster::rr_topic(Cluster::proxy_pool, "example_key"); + Broker::publish(pt, worker_to_workers, + Cluster::node + " (via a proxy)"); } Worker Distributing Events Uniformly Across Proxies diff --git a/scripts/base/frameworks/broker/main.bro b/scripts/base/frameworks/broker/main.bro index 0db239fb88..613ebed14e 100644 --- a/scripts/base/frameworks/broker/main.bro +++ b/scripts/base/frameworks/broker/main.bro @@ -74,7 +74,7 @@ export { const max_sleep = 0 &redef; ## Forward all received messages to subscribing peers. - const forward_messages = F &redef; + const forward_messages = T &redef; ## The default topic prefix where logs will be published. The log's stream ## id is appended when writing to a particular stream. @@ -285,11 +285,25 @@ export { ## (except during :bro:see:`bro_init`). ## ## topic_prefix: a prefix previously supplied to a successful call to - ## :bro:see:`Broker::subscribe`. + ## :bro:see:`Broker::subscribe` or :bro:see:`Broker::forward`. ## ## Returns: true if interest in the topic prefix is no longer advertised. global unsubscribe: function(topic_prefix: string): bool; + ## Register a topic prefix subscription for events that should only be + ## forwarded to any subscribing peers and not raise any event handlers + ## on the receiving/forwarding node. i.e. it's the same as + ## :bro:see:`Broker::subscribe` except matching events are not raised + ## on the receiver, just forwarded. Use :bro:see:`Broker::unsubscribe` + ## with the same argument to undo this operation. + ## + ## topic_prefix: a prefix to match against remote message topics. + ## e.g. an empty prefix matches everything and "a" matches + ## "alice" and "amy" but not "bob". + ## + ## Returns: true if a new event forwarding/subscription is now registered. + global forward: function(topic_prefix: string): bool; + ## Automatically send an event to any interested peers whenever it is ## locally dispatched. (For example, using "event my_event(...);" in a ## script.) @@ -377,6 +391,11 @@ function subscribe(topic_prefix: string): bool return __subscribe(topic_prefix); } +function forward(topic_prefix: string): bool + { + return __forward(topic_prefix); + } + function unsubscribe(topic_prefix: string): bool { return __unsubscribe(topic_prefix); diff --git a/scripts/base/frameworks/cluster/main.bro b/scripts/base/frameworks/cluster/main.bro index 25c0f4f63e..4c7f806131 100644 --- a/scripts/base/frameworks/cluster/main.bro +++ b/scripts/base/frameworks/cluster/main.bro @@ -15,10 +15,6 @@ export { ## Whether to distribute log messages among available logging nodes. const enable_round_robin_logging = T &redef; - ## The topic name used for exchanging general messages that are relevant to - ## any node in a cluster. Used with broker-enabled cluster communication. - const broadcast_topic = "bro/cluster/broadcast" &redef; - ## The topic name used for exchanging messages that are relevant to ## logger nodes in a cluster. Used with broker-enabled cluster communication. const logger_topic = "bro/cluster/logger" &redef; @@ -43,6 +39,10 @@ export { ## a named node in a cluster. Used with broker-enabled cluster communication. const node_topic_prefix = "bro/cluster/node/" &redef; + ## The topic prefix used for exchanging messages that are relevant to + ## a unique node in a cluster. Used with broker-enabled cluster communication. + const nodeid_topic_prefix = "bro/cluster/nodeid/" &redef; + ## Name of the node on which master data stores will be created if no other ## has already been specified by the user in :bro:see:`Cluster::stores`. ## An empty value means "use whatever name corresponds to the manager @@ -238,6 +238,15 @@ export { ## Returns: a topic string that may used to send a message exclusively to ## a given cluster node. global node_topic: function(name: string): string; + + ## Retrieve the topic associated with a specific node in the cluster. + ## + ## id: the id of the cluster node (from :bro:see:`Broker::EndpointInfo` + ## or :bro:see:`Broker::node_id`. + ## + ## Returns: a topic string that may used to send a message exclusively to + ## a given cluster node. + global nodeid_topic: function(id: string): string; } global active_worker_ids: set[string] = set(); @@ -286,6 +295,11 @@ function node_topic(name: string): string return node_topic_prefix + name; } +function nodeid_topic(id: string): string + { + return node_topic_prefix + id; + } + event Cluster::hello(name: string, id: string) &priority=10 { if ( name !in nodes ) @@ -321,7 +335,7 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority= return; local e = Broker::make_event(Cluster::hello, node, Broker::node_id()); - Broker::publish(Cluster::broadcast_topic, e); + Broker::publish(nodeid_topic(endpoint$id), e); } event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 diff --git a/scripts/base/frameworks/cluster/setup-connections.bro b/scripts/base/frameworks/cluster/setup-connections.bro index 63ddbdd8b0..a90081c639 100644 --- a/scripts/base/frameworks/cluster/setup-connections.bro +++ b/scripts/base/frameworks/cluster/setup-connections.bro @@ -87,7 +87,7 @@ event bro_init() &priority=-10 return; } - Broker::subscribe(Cluster::broadcast_topic); + Broker::subscribe(nodeid_topic(Broker::node_id())); Broker::subscribe(node_topic(node)); Broker::listen(Broker::default_listen_address, diff --git a/scripts/base/frameworks/config/main.bro b/scripts/base/frameworks/config/main.bro index 30ddeaf3b9..dc7e71ecdf 100644 --- a/scripts/base/frameworks/config/main.bro +++ b/scripts/base/frameworks/config/main.bro @@ -28,11 +28,6 @@ export { ## record as it is sent on to the logging framework. global log_config: event(rec: Info); - ## Broker topic for announcing new configuration values. Sending new_value, - ## peers can send configuration changes that will be distributed across - ## the entire cluster. - const change_topic = "bro/config/change"; - ## This function is the config framework layer around the lower-level ## :bro:see:`Option::set` call. Config::set_value will set the configuration ## value for all nodes in the cluster, no matter where it was called. Note @@ -57,48 +52,58 @@ type OptionCacheValue: record { global option_cache: table[string] of OptionCacheValue; -event bro_init() +global Config::cluster_set_option: event(ID: string, val: any, location: string); + +function broadcast_option(ID: string, val: any, location: string) { - Broker::subscribe(change_topic); + # There's not currently a common topic to broadcast to as then enabling + # implicit Broker forwarding would cause a routing loop. + Broker::publish(Cluster::worker_topic, Config::cluster_set_option, + ID, val, location); + Broker::publish(Cluster::proxy_topic, Config::cluster_set_option, + ID, val, location); + Broker::publish(Cluster::logger_topic, Config::cluster_set_option, + ID, val, location); } event Config::cluster_set_option(ID: string, val: any, location: string) { @if ( Cluster::local_node_type() == Cluster::MANAGER ) option_cache[ID] = OptionCacheValue($val=val, $location=location); + broadcast_option(ID, val, location); @endif + Option::set(ID, val, location); } function set_value(ID: string, val: any, location: string &default = "" &optional): bool { - local cache_val: any; - # First cache value in case setting it succeeds and we have to store it. - if ( Cluster::local_node_type() == Cluster::MANAGER ) - cache_val = copy(val); + # Always copy the value to break references -- if caller mutates their + # value afterwards, we still guarantee the option has not changed. If + # one wants it to change, they need to explicitly call Option::set_value + # or Option::set with the intended value at the time of the call. + val = copy(val); + # First try setting it locally - abort if not possible. if ( ! Option::set(ID, val, location) ) return F; - # If setting worked, copy the new value into the cache on the manager - if ( Cluster::local_node_type() == Cluster::MANAGER ) - option_cache[ID] = OptionCacheValue($val=cache_val, $location=location); - # If it turns out that it is possible - send it to everyone else to apply. - Broker::publish(change_topic, Config::cluster_set_option, ID, val, location); +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + option_cache[ID] = OptionCacheValue($val=val, $location=location); + broadcast_option(ID, val, location); +@else + Broker::publish(Cluster::manager_topic, Config::cluster_set_option, + ID, val, location); +@endif - if ( Cluster::local_node_type() != Cluster::MANAGER ) - { - Broker::relay(change_topic, change_topic, Config::cluster_set_option, ID, val, location); - } return T; } -@else -# Standalone implementation +@else # Standalone implementation function set_value(ID: string, val: any, location: string &default = "" &optional): bool { return Option::set(ID, val, location); } -@endif +@endif # Cluster::is_enabled @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) # Handling of new worker nodes. diff --git a/scripts/base/frameworks/intel/cluster.bro b/scripts/base/frameworks/intel/cluster.bro index 99b920e00d..b71e8c47ea 100644 --- a/scripts/base/frameworks/intel/cluster.bro +++ b/scripts/base/frameworks/intel/cluster.bro @@ -6,21 +6,6 @@ module Intel; -export { - ## Broker topic for management of intel items. Sending insert_item and - ## remove_item events, peers can manage intelligence data. - const item_topic = "bro/intel/items" &redef; - - ## Broker topic for management of intel indicators as stored on workers - ## for matching. Sending insert_indicator and remove_indicator events, - ## the back-end manages indicators. - const indicator_topic = "bro/intel/indicators" &redef; - - ## Broker topic for matching events, generated by workers and sent to - ## the back-end for metadata enrichment and logging. - const match_topic = "bro/intel/match" &redef; -} - # Internal events for cluster data distribution. global insert_item: event(item: Item); global insert_indicator: event(item: Item); @@ -33,10 +18,7 @@ redef have_full_data = F; @if ( Cluster::local_node_type() == Cluster::MANAGER ) event bro_init() { - Broker::subscribe(item_topic); - Broker::subscribe(match_topic); - - Broker::auto_publish(indicator_topic, remove_indicator); + Broker::auto_publish(Cluster::worker_topic, remove_indicator); } # Handling of new worker nodes. @@ -54,11 +36,14 @@ event Cluster::node_up(name: string, id: string) # has to be distributed. event Intel::new_item(item: Item) &priority=5 { - if ( Cluster::proxy_pool$alive_count == 0 ) - Broker::publish(indicator_topic, Intel::insert_indicator, item); - else - Cluster::relay_rr(Cluster::proxy_pool, "Intel::new_item_relay_rr", - indicator_topic, Intel::insert_indicator, item); + local pt = Cluster::rr_topic(Cluster::proxy_pool, "intel_insert_rr_key"); + + if ( pt == "" ) + # No proxies alive, publish to all workers ourself instead of + # relaying via a proxy. + pt = Cluster::worker_topic; + + Broker::publish(pt, Intel::insert_indicator, item); } # Handling of item insertion triggered by remote node. @@ -84,17 +69,15 @@ event Intel::match_remote(s: Seen) &priority=5 @if ( Cluster::local_node_type() == Cluster::WORKER ) event bro_init() { - Broker::subscribe(indicator_topic); - - Broker::auto_publish(match_topic, match_remote); - Broker::auto_publish(item_topic, remove_item); + Broker::auto_publish(Cluster::manager_topic, match_remote); + Broker::auto_publish(Cluster::manager_topic, remove_item); } # On a worker, the new_item event requires to trigger the insertion # on the manager to update the back-end data store. event Intel::new_item(item: Intel::Item) &priority=5 { - Broker::publish(item_topic, Intel::insert_item, item); + Broker::publish(Cluster::manager_topic, Intel::insert_item, item); } # Handling of new indicators published by the manager. @@ -103,3 +86,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5 Intel::_insert(item, F); } @endif + +@if ( Cluster::local_node_type() == Cluster::PROXY ) +event Intel::insert_indicator(item: Intel::Item) &priority=5 + { + # Just forwarding from manager to workers. + Broker::publish(Cluster::worker_topic, Intel::insert_indicator, item); + } +@endif + diff --git a/scripts/base/protocols/irc/dcc-send.bro b/scripts/base/protocols/irc/dcc-send.bro index 9454fef7b0..44d939209e 100644 --- a/scripts/base/protocols/irc/dcc-send.bro +++ b/scripts/base/protocols/irc/dcc-send.bro @@ -24,32 +24,41 @@ export { ## Sniffed mime type of the file. dcc_mime_type: string &log &optional; }; - - ## The broker topic name to which expected DCC transfer updates are - ## relayed. - const dcc_transfer_update_topic = "bro/irc/dcc_transfer_update" &redef; } global dcc_expected_transfers: table[addr, port] of Info &read_expire=5mins; +function dcc_relay_topic(): string + { + local rval = Cluster::rr_topic(Cluster::proxy_pool, "dcc_transfer_rr_key"); + + if ( rval == "" ) + # No proxy is alive, so relay via manager instead. + return Cluster::manager_topic; + + return rval; + } + event dcc_transfer_add(host: addr, p: port, info: Info) { +@if ( Cluster::local_node_type() == Cluster::PROXY || + Cluster::local_node_type() == Cluster::MANAGER ) + Broker::publish(Cluster::worker_topic, dcc_transfer_add, host, p, info); +@else dcc_expected_transfers[host, p] = info; Analyzer::schedule_analyzer(0.0.0.0, host, p, Analyzer::ANALYZER_IRC_DATA, 5 min); +@endif } event dcc_transfer_remove(host: addr, p: port) { +@if ( Cluster::local_node_type() == Cluster::PROXY || + Cluster::local_node_type() == Cluster::MANAGER ) + Broker::publish(Cluster::worker_topic, dcc_transfer_remove, host, p); +@else delete dcc_expected_transfers[host, p]; - } - -event bro_init() - { - local lnt = Cluster::local_node_type(); - - if ( lnt == Cluster::WORKER ) - Broker::subscribe(dcc_transfer_update_topic); +@endif } function log_dcc(f: fa_file) @@ -76,9 +85,11 @@ function log_dcc(f: fa_file) delete irc$dcc_mime_type; delete dcc_expected_transfers[cid$resp_h, cid$resp_p]; - Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic, - dcc_transfer_update_topic, dcc_transfer_remove, - cid$resp_h, cid$resp_p); + +@if ( Cluster::is_enabled() ) + Broker::publish(dcc_relay_topic(), dcc_transfer_remove, + cid$resp_h, cid$resp_p); +@endif return; } } @@ -102,9 +113,10 @@ event irc_dcc_message(c: connection, is_orig: bool, local p = count_to_port(dest_port, tcp); Analyzer::schedule_analyzer(0.0.0.0, address, p, Analyzer::ANALYZER_IRC_DATA, 5 min); dcc_expected_transfers[address, p] = c$irc; - Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic, - dcc_transfer_update_topic, dcc_transfer_add, - address, p, c$irc); + +@if ( Cluster::is_enabled() ) + Broker::publish(dcc_relay_topic(), dcc_transfer_add, address, p, c$irc); +@endif } event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10 @@ -119,8 +131,10 @@ event connection_state_remove(c: connection) &priority=-5 if ( [c$id$resp_h, c$id$resp_p] in dcc_expected_transfers ) { delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p]; - Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic, - dcc_transfer_update_topic, dcc_transfer_remove, - c$id$resp_h, c$id$resp_p); + +@if ( Cluster::is_enabled() ) + Broker::publish(dcc_relay_topic(), dcc_transfer_remove, + c$id$resp_h, c$id$resp_p); +@endif } } diff --git a/scripts/policy/frameworks/control/controllee.bro b/scripts/policy/frameworks/control/controllee.bro index 01937f27db..c3f08cda2b 100644 --- a/scripts/policy/frameworks/control/controllee.bro +++ b/scripts/policy/frameworks/control/controllee.bro @@ -14,7 +14,7 @@ module Control; event bro_init() &priority=-10 { - Broker::subscribe(Control::topic_prefix); + Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id()); Broker::auto_publish(Control::topic_prefix + "/id_value_response", Control::id_value_response); Broker::auto_publish(Control::topic_prefix + "/peer_status_response", diff --git a/scripts/policy/frameworks/control/controller.bro b/scripts/policy/frameworks/control/controller.bro index 4897005dfb..b81ce4b2d6 100644 --- a/scripts/policy/frameworks/control/controller.bro +++ b/scripts/policy/frameworks/control/controller.bro @@ -23,16 +23,6 @@ event bro_init() &priority=5 terminate(); } - Broker::auto_publish(Control::topic_prefix + "/id_value_request", - Control::id_value_request); - Broker::auto_publish(Control::topic_prefix + "/peer_status_request", - Control::peer_status_request); - Broker::auto_publish(Control::topic_prefix + "/net_stats_request", - Control::net_stats_request); - Broker::auto_publish(Control::topic_prefix + "/configuration_update_request", - Control::configuration_update_request); - Broker::auto_publish(Control::topic_prefix + "/shutdown_request", - Control::shutdown_request); Broker::subscribe(Control::topic_prefix); Broker::peer(cat(host), host_port); } @@ -88,30 +78,30 @@ function configurable_ids(): id_table return rval; } -function send_control_request() +function send_control_request(topic: string) { switch ( cmd ) { case "id_value": if ( arg == "" ) Reporter::fatal("The Control::id_value command requires that Control::arg also has some value."); - event Control::id_value_request(arg); + Broker::publish(topic, Control::id_value_request, arg); break; case "peer_status": - event Control::peer_status_request(); + Broker::publish(topic, Control::peer_status_request); break; case "net_stats": - event Control::net_stats_request(); + Broker::publish(topic, Control::net_stats_request); break; case "shutdown": - event Control::shutdown_request(); + Broker::publish(topic, Control::shutdown_request); break; case "configuration_update": - event Control::configuration_update_request(); + Broker::publish(topic, Control::configuration_update_request); break; default: @@ -122,6 +112,8 @@ function send_control_request() event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=-10 { + local topic = Control::topic_prefix + "/" + endpoint$id; + if ( cmd == "configuration_update" ) { # Send all &redef'able consts to the peer. @@ -130,8 +122,6 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority= for ( id in ids ) { - local topic = fmt("%s/id/%s", Control::topic_prefix, id); - if ( Broker::publish_id(topic, id) ) ++publish_count; } @@ -139,5 +129,5 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority= Reporter::info(fmt("Control framework sent %d IDs", publish_count)); } - send_control_request(); + send_control_request(topic); } diff --git a/src/3rdparty b/src/3rdparty index d7af30b30c..7c95b51de2 160000 --- a/src/3rdparty +++ b/src/3rdparty @@ -1 +1 @@ -Subproject commit d7af30b30c811077d3838f4142400d8cc1f2cdbb +Subproject commit 7c95b51de202ac534b27dd721da5778b773dd614 diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 6aef1e06cf..7861a5d420 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -2,6 +2,7 @@ #include #include #include +#include #include #include "Manager.h" @@ -403,76 +404,6 @@ bool Manager::PublishEvent(string topic, RecordVal* args) return PublishEvent(topic, event_name, std::move(xs)); } -bool Manager::RelayEvent(std::string first_topic, - broker::set relay_topics, - std::string name, - broker::vector args, - bool handle_on_relayer) - { - if ( bstate->endpoint.is_shutdown() ) - return true; - - if ( peer_count == 0 ) - return true; - - DBG_LOG(DBG_BROKER, "Publishing %s-relay event: %s", - handle_on_relayer ? "handle" : "", - RenderEvent(first_topic, name, args).c_str()); - - if ( handle_on_relayer ) - { - broker::bro::HandleAndRelayEvent msg(std::move(relay_topics), - std::move(name), - std::move(args)); - bstate->endpoint.publish(std::move(first_topic), std::move(msg)); - } - else - { - broker::bro::RelayEvent msg(std::move(relay_topics), - std::move(name), - std::move(args)); - bstate->endpoint.publish(std::move(first_topic), std::move(msg)); - } - - ++statistics.num_events_outgoing; - return true; - } - -bool Manager::RelayEvent(std::string first_topic, - std::set relay_topics, - RecordVal* args, - bool handle_on_relayer) - { - if ( bstate->endpoint.is_shutdown() ) - return true; - - if ( peer_count == 0 ) - return true; - - if ( ! args->Lookup(0) ) - return false; - - auto event_name = args->Lookup(0)->AsString()->CheckString(); - auto vv = args->Lookup(1)->AsVectorVal(); - broker::vector xs; - xs.reserve(vv->Size()); - - for ( auto i = 0u; i < vv->Size(); ++i ) - { - auto val = vv->Lookup(i)->AsRecordVal()->Lookup(0); - auto data_val = static_cast(val); - xs.emplace_back(data_val->data); - } - - broker::set topic_set; - - for ( auto& t : relay_topics ) - topic_set.emplace(std::move(t)); - - return RelayEvent(first_topic, std::move(topic_set), event_name, - std::move(xs), handle_on_relayer); - } - bool Manager::PublishIdentifier(std::string topic, std::string id) { if ( bstate->endpoint.is_shutdown() ) @@ -857,8 +788,28 @@ bool Manager::Subscribe(const string& topic_prefix) return true; } +bool Manager::Forward(string topic_prefix) + { + for ( auto i = 0u; i < forwarded_prefixes.size(); ++i ) + if ( forwarded_prefixes[i] == topic_prefix ) + return false; + + DBG_LOG(DBG_BROKER, "Forwarding topic prefix %s", topic_prefix.c_str()); + Subscribe(topic_prefix); + forwarded_prefixes.emplace_back(std::move(topic_prefix)); + return true; + } + bool Manager::Unsubscribe(const string& topic_prefix) { + for ( auto i = 0u; i < forwarded_prefixes.size(); ++i ) + if ( forwarded_prefixes[i] == topic_prefix ) + { + DBG_LOG(DBG_BROKER, "Unforwading topic prefix %s", topic_prefix.c_str()); + forwarded_prefixes.erase(forwarded_prefixes.begin() + i); + break; + } + DBG_LOG(DBG_BROKER, "Unsubscribing from topic prefix %s", topic_prefix.c_str()); bstate->subscriber.remove_topic(topic_prefix, ! after_bro_init); return true; @@ -898,19 +849,11 @@ double Manager::NextTimestamp(double* local_network_time) return -1; } -void Manager::DispatchMessage(broker::data msg) +void Manager::DispatchMessage(const broker::topic& topic, broker::data msg) { switch ( broker::bro::Message::type(msg) ) { case broker::bro::Message::Type::Event: - ProcessEvent(std::move(msg)); - break; - - case broker::bro::Message::Type::RelayEvent: - ProcessRelayEvent(std::move(msg)); - break; - - case broker::bro::Message::Type::HandleAndRelayEvent: - ProcessHandleAndRelayEvent(std::move(msg)); + ProcessEvent(topic, std::move(msg)); break; case broker::bro::Message::Type::LogCreate: @@ -930,7 +873,7 @@ void Manager::DispatchMessage(broker::data msg) broker::bro::Batch batch(std::move(msg)); for ( auto& i : batch.batch() ) - DispatchMessage(std::move(i)); + DispatchMessage(topic, std::move(i)); break; } @@ -978,7 +921,7 @@ void Manager::Process() try { - DispatchMessage(std::move(msg)); + DispatchMessage(topic, std::move(msg)); } catch ( std::runtime_error& e ) { @@ -1001,8 +944,11 @@ void Manager::Process() } -void Manager::ProcessEvent(std::string name, broker::vector args) +void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev) { + auto name = std::move(ev.name()); + auto args = std::move(ev.args()); + DBG_LOG(DBG_BROKER, "Process event: %s %s", name.data(), RenderMessage(args).data()); ++statistics.num_events_incoming; @@ -1011,6 +957,23 @@ void Manager::ProcessEvent(std::string name, broker::vector args) if ( ! handler ) return; + auto& topic_string = topic.string(); + + for ( auto i = 0u; i < forwarded_prefixes.size(); ++i ) + { + auto& p = forwarded_prefixes[i]; + + if ( p.size() > topic_string.size() ) + continue; + + if ( strncmp(p.data(), topic_string.data(), p.size()) != 0 ) + continue; + + DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", + name.data(), RenderMessage(args).data()); + return; + } + auto arg_types = handler->FType(false)->ArgTypes()->Types(); if ( static_cast(arg_types->length()) != args.size() ) @@ -1047,34 +1010,6 @@ void Manager::ProcessEvent(std::string name, broker::vector args) delete_vals(vl); } -void Manager::ProcessEvent(broker::bro::Event ev) - { - ProcessEvent(std::move(ev.name()), std::move(ev.args())); - } - -void Manager::ProcessRelayEvent(broker::bro::RelayEvent ev) - { - DBG_LOG(DBG_BROKER, "Received relay event: %s", RenderMessage(ev).c_str()); - ++statistics.num_events_incoming; - - for ( auto& t : ev.topics() ) - PublishEvent(std::move(caf::get(t)), - std::move(ev.name()), - std::move(ev.args())); - } - -void Manager::ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev) - { - DBG_LOG(DBG_BROKER, "Received handle-relay event: %s", - RenderMessage(ev).c_str()); - ProcessEvent(ev.name(), ev.args()); - - for ( auto& t : ev.topics() ) - PublishEvent(std::move(caf::get(t)), - std::move(ev.name()), - std::move(ev.args())); - } - bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc) { DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index a42cb495e6..87aba80058 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -153,43 +153,6 @@ public: */ bool PublishEvent(std::string topic, RecordVal* ev); - /** - * Sends an event to any interested peers, who, upon receipt, - * republishes the event to a new set of topics and optionally - * calls event handlers. - * @param first_topic the first topic to use when publishing the event - * @param relay_topics the set of topics the receivers will use to - * republish the event. The event is relayed at most a single hop. - * @param name the name of the event - * @param args the event's arguments - * @param handle_on_relayer whether they relaying-node should call event - * handlers. - * @return true if the message is sent successfully. - */ - bool RelayEvent(std::string first_topic, - broker::set relay_topics, - std::string name, - broker::vector args, - bool handle_on_relayer); - - /** - * Sends an event to any interested peers, who, upon receipt, - * republishes the event to a new set of topics and optionally - * calls event handlers. - * @param first_topic the first topic to use when publishing the event - * @param relay_topics the set of topics the receivers will use to - * republish the event. The event is relayed at most a single hop. - * @param ev the event and its arguments to send to peers, in the form of - * a Broker::Event record type. - * @param handle_on_relayer whether they relaying-node should call event - * handlers. - * @return true if the message is sent successfully. - */ - bool RelayEvent(std::string first_topic, - std::set relay_topics, - RecordVal* ev, - bool handle_on_relayer); - /** * Send a message to create a log stream to any interested peers. * The log stream may or may not already exist on the receiving side. @@ -261,10 +224,21 @@ public: */ bool Subscribe(const std::string& topic_prefix); + /** + * Register interest in peer event messages that use a certain topic prefix, + * but that should not be raised locally, just forwarded to any subscribing + * peers. + * @param topic_prefix a prefix to match against remote message topics. + * e.g. an empty prefix will match everything and "a" will match "alice" + * and "amy" but not "bob". + * @return true if it's a new event forward/subscription and it is now registered. + */ + bool Forward(std::string topic_prefix); + /** * Unregister interest in peer event messages. * @param topic_prefix a prefix previously supplied to a successful call - * to bro_broker::Manager::Subscribe(). + * to bro_broker::Manager::Subscribe() or bro_broker::Manager::Forward(). * @return true if interest in topic prefix is no longer advertised. */ bool Unsubscribe(const std::string& topic_prefix); @@ -348,11 +322,8 @@ public: private: - void DispatchMessage(broker::data msg); - void ProcessEvent(std::string name, broker::vector args); - void ProcessEvent(broker::bro::Event ev); - void ProcessRelayEvent(broker::bro::RelayEvent re); - void ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev); + void DispatchMessage(const broker::topic& topic, broker::data msg); + void ProcessEvent(const broker::topic& topic, broker::bro::Event ev); bool ProcessLogCreate(broker::bro::LogCreate lc); bool ProcessLogWrite(broker::bro::LogWrite lw); bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu); @@ -403,6 +374,7 @@ private: std::unordered_map data_stores; std::unordered_map pending_queries; + std::vector forwarded_prefixes; Stats statistics; diff --git a/src/broker/messaging.bif b/src/broker/messaging.bif index bc0d03a629..50966fa342 100644 --- a/src/broker/messaging.bif +++ b/src/broker/messaging.bif @@ -67,28 +67,6 @@ static bool publish_event_args(val_list& args, const BroString* topic, return rval; } -static bool relay_event_args(val_list& args, const BroString* topic, - std::set topic_set, Frame* frame) - { - bro_broker::Manager::ScriptScopeGuard ssg; - auto rval = false; - - if ( args[0]->Type()->Tag() == TYPE_RECORD ) - rval = broker_mgr->RelayEvent(topic->CheckString(), - std::move(topic_set), - args[0]->AsRecordVal(), false); - else - { - auto ev = broker_mgr->MakeEvent(&args, frame); - rval = broker_mgr->RelayEvent(topic->CheckString(), - std::move(topic_set), - ev, false); - Unref(ev); - } - - return rval; - } - %%} module Broker; @@ -131,130 +109,6 @@ function Broker::publish%(topic: string, ...%): bool return new Val(rval, TYPE_BOOL); %} -## Publishes an event at a given topic, with any receivers automatically -## forwarding it to its peers with a different topic. The event is relayed -## at most a single hop and the relayer does not call any local event handlers. -## -## first_topic: the initial topic to use for publishing the event. -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the first -## set of receivers will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Broker::relay%(first_topic: string, ...%): bool - %{ - bro_broker::Manager::ScriptScopeGuard ssg; - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 3 ) - { - builtin_error("Broker::relay requires at least 3 arguments"); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[1]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Broker::relay requires a string or string_set as 2nd argument"); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - return new Val(false, TYPE_BOOL); - - val_list args(bif_args->length() - 2); - - for ( auto i = 2; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = false; - - if ( args[0]->Type()->Tag() == TYPE_RECORD ) - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), - args[0]->AsRecordVal(), false); - else - { - auto ev = broker_mgr->MakeEvent(&args, frame); - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), ev, false); - Unref(ev); - } - - return new Val(rval, TYPE_BOOL); - %} - -## Publishes an event at a given topic, with any receivers automatically -## forwarding it to its peers with a different topic. The event is relayed -## at most a single hop and the relayer does call local event handlers. -## -## first_topic: the initial topic to use for publishing the event. -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the first -## set of receivers will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Broker::publish_and_relay%(first_topic: string, ...%): bool - %{ - bro_broker::Manager::ScriptScopeGuard ssg; - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 3 ) - { - builtin_error("Broker::publish_and_relay requires at least 3 arguments"); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[1]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Broker::publish_and_relay requires a string or string_set as 2nd argument"); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - return new Val(false, TYPE_BOOL); - - val_list args(bif_args->length() - 2); - - for ( auto i = 2; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = false; - - if ( args[0]->Type()->Tag() == TYPE_RECORD ) - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), - args[0]->AsRecordVal(), true); - else - { - auto ev = broker_mgr->MakeEvent(&args, frame); - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), ev, true); - Unref(ev); - } - - return new Val(rval, TYPE_BOOL); - %} - function Broker::__flush_logs%(%): count %{ auto rval = broker_mgr->FlushLogBuffers(); @@ -290,6 +144,13 @@ function Broker::__subscribe%(topic_prefix: string%): bool return new Val(rval, TYPE_BOOL); %} +function Broker::__forward%(topic_prefix: string%): bool + %{ + bro_broker::Manager::ScriptScopeGuard ssg; + auto rval = broker_mgr->Forward(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + function Broker::__unsubscribe%(topic_prefix: string%): bool %{ bro_broker::Manager::ScriptScopeGuard ssg; @@ -345,81 +206,6 @@ function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool %} -## Publishes an event at a given topic, with a receiver node chosen -## from a pool according to Round-Robin distribution strategy. The receiving -## node, then automatically forwards it to its peers with a different topic. -## The event is relayed at most a single hop. -## -## pool: the pool of nodes that are eligible to receive the event. -## -## key: an arbitrary string to identify the purpose for which you're -## distributing the event. e.g. consider using namespacing of your -## script like "Intel::cluster_rr_key". -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the receiver -## will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Cluster::relay_rr%(pool: Pool, key: any, ...%): bool - %{ - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 4 ) - { - builtin_error("Cluster::relay_rr requires at least 4 arguments"); - return new Val(false, TYPE_BOOL); - } - - static Func* topic_func = 0; - - if ( ! topic_func ) - topic_func = global_scope()->Lookup("Cluster::rr_topic")->ID_Val()->AsFunc(); - - val_list vl(2); - vl.append(pool->Ref()); - vl.append(key->Ref()); - auto topic = topic_func->Call(&vl); - - if ( ! topic->AsString()->Len() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[2]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Cluster::relay_rr requires a string or string_set as 3rd argument"); - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - val_list args(bif_args->length() - 3); - - for ( auto i = 3; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set), - frame); - Unref(topic); - return new Val(rval, TYPE_BOOL); - %} - ## Publishes an event to a node within a pool according to Rendezvous ## (Highest Random Weight) hashing strategy. ## @@ -461,77 +247,3 @@ function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool Unref(topic); return new Val(rval, TYPE_BOOL); %} - -## Publishes an event at a given topic, with a receiver node chosen -## from a pool according to Rendezvous (Highest Random Weight) distribution -## strategy. The receiving nodes then automatically forwards it to its peers -## with a different topic. The event is relayed at most a single hop. -## -## pool: the pool of nodes that are eligible to receive the event. -## -## key: data used for input to the hashing function that will uniformly -## distribute keys among available nodes. -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the receiver -## will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Cluster::relay_hrw%(pool: Pool, key: any, ...%): bool - %{ - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 4 ) - { - builtin_error("Cluster::relay_hrw requires at least 4 arguments"); - return new Val(false, TYPE_BOOL); - } - - static Func* topic_func = 0; - - if ( ! topic_func ) - topic_func = global_scope()->Lookup("Cluster::hrw_topic")->ID_Val()->AsFunc(); - - val_list vl(2); - vl.append(pool->Ref()); - vl.append(key->Ref()); - auto topic = topic_func->Call(&vl); - - if ( ! topic->AsString()->Len() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[2]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Cluster::relay_hrw requires a string or string_set as 3rd argument"); - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - val_list args(bif_args->length() - 3); - - for ( auto i = 3; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set), - frame); - Unref(topic); - return new Val(rval, TYPE_BOOL); - %} diff --git a/testing/btest/Baseline/broker.remote_publish_and_relay_event/one.one.out b/testing/btest/Baseline/broker.remote_publish_and_relay_event/one.one.out deleted file mode 100644 index 45c18d28be..0000000000 --- a/testing/btest/Baseline/broker.remote_publish_and_relay_event/one.one.out +++ /dev/null @@ -1,3 +0,0 @@ -sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core -got ready event -sender lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/broker.remote_publish_and_relay_event/three.three.out b/testing/btest/Baseline/broker.remote_publish_and_relay_event/three.three.out deleted file mode 100644 index 8193829fd4..0000000000 --- a/testing/btest/Baseline/broker.remote_publish_and_relay_event/three.three.out +++ /dev/null @@ -1,2 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -got my_event, hello world diff --git a/testing/btest/Baseline/broker.remote_publish_and_relay_event/two.two.out b/testing/btest/Baseline/broker.remote_publish_and_relay_event/two.two.out deleted file mode 100644 index 7bedece7d2..0000000000 --- a/testing/btest/Baseline/broker.remote_publish_and_relay_event/two.two.out +++ /dev/null @@ -1,5 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -sending ready event -got my_event, hello world -receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/broker.remote_relay_event/one.one.out b/testing/btest/Baseline/broker.remote_relay_event/one.one.out deleted file mode 100644 index 45c18d28be..0000000000 --- a/testing/btest/Baseline/broker.remote_relay_event/one.one.out +++ /dev/null @@ -1,3 +0,0 @@ -sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core -got ready event -sender lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/broker.remote_relay_event/three.three.out b/testing/btest/Baseline/broker.remote_relay_event/three.three.out deleted file mode 100644 index 8193829fd4..0000000000 --- a/testing/btest/Baseline/broker.remote_relay_event/three.three.out +++ /dev/null @@ -1,2 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -got my_event, hello world diff --git a/testing/btest/Baseline/broker.remote_relay_event/two.two.out b/testing/btest/Baseline/broker.remote_relay_event/two.two.out deleted file mode 100644 index 21378cdd92..0000000000 --- a/testing/btest/Baseline/broker.remote_relay_event/two.two.out +++ /dev/null @@ -1,4 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -sending ready event -receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 677b978e8e..8371f5ee03 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -157,7 +157,7 @@ 0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> -0.000000 MetaHookPost CallFunction(Cluster::local_node_type, , ()) -> +0.000000 MetaHookPost CallFunction(Cluster::local_node_type, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) -> @@ -274,7 +274,7 @@ 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) -> 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) -> 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -> -0.000000 MetaHookPost CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) -> +0.000000 MetaHookPost CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Broker::LOG)) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Cluster::LOG)) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Config::LOG)) -> @@ -459,7 +459,7 @@ 0.000000 MetaHookPost CallFunction(Log::create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) -> 0.000000 MetaHookPost CallFunction(Log::create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) -> 0.000000 MetaHookPost CallFunction(Log::create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -> -0.000000 MetaHookPost CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) -> +0.000000 MetaHookPost CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])) -> 0.000000 MetaHookPost CallFunction(NetControl::check_plugins, , ()) -> 0.000000 MetaHookPost CallFunction(NetControl::init, , ()) -> 0.000000 MetaHookPost CallFunction(Notice::want_pp, , ()) -> @@ -1036,7 +1036,7 @@ 0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) -0.000000 MetaHookPre CallFunction(Cluster::local_node_type, , ()) +0.000000 MetaHookPre CallFunction(Cluster::local_node_type, , ()) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) @@ -1153,7 +1153,7 @@ 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -0.000000 MetaHookPre CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) +0.000000 MetaHookPre CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Broker::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Cluster::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Config::LOG)) @@ -1338,7 +1338,7 @@ 0.000000 MetaHookPre CallFunction(Log::create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -0.000000 MetaHookPre CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T])) +0.000000 MetaHookPre CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(NetControl::check_plugins, , ()) 0.000000 MetaHookPre CallFunction(NetControl::init, , ()) 0.000000 MetaHookPre CallFunction(Notice::want_pp, , ()) @@ -2031,7 +2031,7 @@ 0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql]) -0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T]) +0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction Log::add_default_filter(Broker::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Config::LOG) @@ -2216,7 +2216,7 @@ 0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql]) -0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T]) +0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction NetControl::check_plugins() 0.000000 | HookCallFunction NetControl::init() 0.000000 | HookCallFunction Notice::want_pp() @@ -2630,7 +2630,7 @@ 0.000000 | HookLoadFile base<...>/x509 0.000000 | HookLoadFile base<...>/xmpp 0.000000 | HookLogInit packet_filter 1/1 {ts (time), node (string), filter (string), init (bool), success (bool)} -0.000000 | HookLogWrite packet_filter [ts=1535492133.703661, node=bro, filter=ip or not ip, init=T, success=T] +0.000000 | HookLogWrite packet_filter [ts=1535587658.201126, node=bro, filter=ip or not ip, init=T, success=T] 0.000000 | HookQueueEvent NetControl::init() 0.000000 | HookQueueEvent bro_init() 0.000000 | HookQueueEvent filter_change_tracking() diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/manager-1..stdout new file mode 100644 index 0000000000..5b10602c67 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/manager-1..stdout @@ -0,0 +1,8 @@ +Connected to a peer +Connected to a peer +Connected to a peer +Got fully_connected event +Got fully_connected event +Connected to a peer +Got fully_connected event +Got fully_connected event diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-1..stdout new file mode 100644 index 0000000000..c3a1950daf --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-1..stdout @@ -0,0 +1,3 @@ +Connected to a peer +Connected to a peer +Connected to a peer diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-2..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-2..stdout new file mode 100644 index 0000000000..c3a1950daf --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-2..stdout @@ -0,0 +1,3 @@ +Connected to a peer +Connected to a peer +Connected to a peer diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-1..stdout new file mode 100644 index 0000000000..c3a1950daf --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-1..stdout @@ -0,0 +1,3 @@ +Connected to a peer +Connected to a peer +Connected to a peer diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-2..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-2..stdout new file mode 100644 index 0000000000..5bce99d801 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-2..stdout @@ -0,0 +1,4 @@ +Connected to a peer +Connected to a peer +Connected to a peer +got forwarded event diff --git a/testing/btest/broker/remote_publish_and_relay_event.bro b/testing/btest/broker/remote_publish_and_relay_event.bro deleted file mode 100644 index 444b454f80..0000000000 --- a/testing/btest/broker/remote_publish_and_relay_event.bro +++ /dev/null @@ -1,119 +0,0 @@ -# @TEST-SERIALIZE: comm -# -# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out" -# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out" -# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out" -# -# @TEST-EXEC: btest-bg-wait 20 -# @TEST-EXEC: btest-diff one/one.out -# @TEST-EXEC: btest-diff two/two.out -# @TEST-EXEC: btest-diff three/three.out - -@TEST-START-FILE one.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - } - -event ready_event() - { - print "got ready event"; - - Broker::publish_and_relay("bro/event/pre-relay", "bro/event/post-relay", - my_event, "hello world"); - } - -event bro_init() - { - Broker::subscribe("bro/event/ready"); - Broker::peer("127.0.0.1", 10000/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender added peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender lost peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - - -@TEST-START-FILE two.bro - -redef exit_only_after_terminate = T; - -global peers_added = 0; - -event my_event(s: string) - { - print "got my_event", s; - } - -event ready_event() - { - } - -event bro_init() - { - Broker::subscribe("bro/event/pre-relay"); - Broker::listen("127.0.0.1", 10000/tcp); - Broker::peer("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - ++peers_added; - - if ( peers_added == 2 ) - { - print "sending ready event"; - Broker::publish("bro/event/ready", ready_event); - } - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - -@TEST-START-FILE three.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - terminate(); - } - -event bro_init() - { - Broker::subscribe("bro/event/post-relay"); - Broker::listen("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -@TEST-END-FILE diff --git a/testing/btest/broker/remote_relay_event.bro b/testing/btest/broker/remote_relay_event.bro deleted file mode 100644 index c65265bdb5..0000000000 --- a/testing/btest/broker/remote_relay_event.bro +++ /dev/null @@ -1,120 +0,0 @@ -# @TEST-SERIALIZE: comm -# -# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out" -# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out" -# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out" -# -# @TEST-EXEC: btest-bg-wait 20 -# @TEST-EXEC: btest-diff one/one.out -# @TEST-EXEC: btest-diff two/two.out -# @TEST-EXEC: btest-diff three/three.out - -@TEST-START-FILE one.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - } - -event ready_event() - { - print "got ready event"; - - Broker::relay("bro/event/pre-relay", "bro/event/post-relay", my_event, - "hello world"); - } - -event bro_init() - { - Broker::subscribe("bro/event/ready"); - Broker::peer("127.0.0.1", 10000/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender added peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender lost peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - - -@TEST-START-FILE two.bro - -redef exit_only_after_terminate = T; - -global peers_added = 0; - -event my_event(s: string) - { - print "got my_event", s; - terminate(); - } - -event ready_event() - { - } - -event bro_init() - { - Broker::subscribe("bro/event/pre-relay"); - Broker::listen("127.0.0.1", 10000/tcp); - Broker::peer("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - ++peers_added; - - if ( peers_added == 2 ) - { - print "sending ready event"; - Broker::publish("bro/event/ready", ready_event); - } - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - -@TEST-START-FILE three.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - terminate(); - } - -event bro_init() - { - Broker::subscribe("bro/event/post-relay"); - Broker::listen("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -@TEST-END-FILE diff --git a/testing/btest/scripts/base/frameworks/cluster/forwarding.bro b/testing/btest/scripts/base/frameworks/cluster/forwarding.bro new file mode 100644 index 0000000000..e6e743ec0f --- /dev/null +++ b/testing/btest/scripts/base/frameworks/cluster/forwarding.bro @@ -0,0 +1,105 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT +# @TEST-EXEC: btest-bg-run proxy-2 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-2 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff proxy-1/.stdout +# @TEST-EXEC: btest-diff proxy-2/.stdout +# @TEST-EXEC: btest-diff worker-1/.stdout +# @TEST-EXEC: btest-diff worker-2/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp], + ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1"], + ["proxy-2"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37759/tcp, $manager="manager-1"], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +global fully_connected: event(); + +global peer_count = 0; +global peers_lost = 0; +global fully_connected_nodes = 0; + +event forwarded_event() + { + print "got forwarded event"; + + if ( Cluster::node == "manager-1" ) + print "manager should NOT have raised the forwarded event"; + + terminate(); + } + +event ready() + { + # note that the publishing node, worker-1, will not receive the forwarded + # event as Broker's forwarding prevents the message going back to the + # immediate sender. + Broker::publish("test_topic", forwarded_event); + } + +event fully_connected() + { + if ( ! is_remote_event() ) + return; + + print "Got fully_connected event"; + fully_connected_nodes = fully_connected_nodes + 1; + + if ( Cluster::node == "manager-1" ) + { + if ( peer_count == 4 && fully_connected_nodes == 4 ) + Broker::publish(Cluster::node_topic("worker-1"), ready); + } + } + +event bro_init() + { + Broker::auto_publish(Cluster::manager_topic, fully_connected); + + if ( Cluster::node == "manager-1" ) + Broker::forward("test_topic"); + if ( Cluster::node == "worker-1" ) + Broker::subscribe("test_topic"); + if ( Cluster::node == "worker-2" ) + Broker::subscribe("test_topic"); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Connected to a peer"; + peer_count = peer_count + 1; + + if ( Cluster::node == "manager-1" ) + { + if ( peer_count == 4 && fully_connected_nodes == 4 ) + Broker::publish(Cluster::node_topic("worker-1"), ready); + } + else + { + if ( peer_count == 3 ) + event fully_connected(); + } + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + ++peers_lost; + + if ( Cluster::node == "manager-1" ) + { + if ( peers_lost == 2 ) + # Both workers terminated + terminate(); + } + else + terminate(); + }