diff --git a/scripts/base/frameworks/broker/main.bro b/scripts/base/frameworks/broker/main.bro index a9922be14f..613ebed14e 100644 --- a/scripts/base/frameworks/broker/main.bro +++ b/scripts/base/frameworks/broker/main.bro @@ -285,11 +285,25 @@ export { ## (except during :bro:see:`bro_init`). ## ## topic_prefix: a prefix previously supplied to a successful call to - ## :bro:see:`Broker::subscribe`. + ## :bro:see:`Broker::subscribe` or :bro:see:`Broker::forward`. ## ## Returns: true if interest in the topic prefix is no longer advertised. global unsubscribe: function(topic_prefix: string): bool; + ## Register a topic prefix subscription for events that should only be + ## forwarded to any subscribing peers and not raise any event handlers + ## on the receiving/forwarding node. i.e. it's the same as + ## :bro:see:`Broker::subscribe` except matching events are not raised + ## on the receiver, just forwarded. Use :bro:see:`Broker::unsubscribe` + ## with the same argument to undo this operation. + ## + ## topic_prefix: a prefix to match against remote message topics. + ## e.g. an empty prefix matches everything and "a" matches + ## "alice" and "amy" but not "bob". + ## + ## Returns: true if a new event forwarding/subscription is now registered. + global forward: function(topic_prefix: string): bool; + ## Automatically send an event to any interested peers whenever it is ## locally dispatched. (For example, using "event my_event(...);" in a ## script.) @@ -377,6 +391,11 @@ function subscribe(topic_prefix: string): bool return __subscribe(topic_prefix); } +function forward(topic_prefix: string): bool + { + return __forward(topic_prefix); + } + function unsubscribe(topic_prefix: string): bool { return __unsubscribe(topic_prefix); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index c0af314e10..7861a5d420 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -2,6 +2,7 @@ #include #include #include +#include #include #include "Manager.h" @@ -787,8 +788,28 @@ bool Manager::Subscribe(const string& topic_prefix) return true; } +bool Manager::Forward(string topic_prefix) + { + for ( auto i = 0u; i < forwarded_prefixes.size(); ++i ) + if ( forwarded_prefixes[i] == topic_prefix ) + return false; + + DBG_LOG(DBG_BROKER, "Forwarding topic prefix %s", topic_prefix.c_str()); + Subscribe(topic_prefix); + forwarded_prefixes.emplace_back(std::move(topic_prefix)); + return true; + } + bool Manager::Unsubscribe(const string& topic_prefix) { + for ( auto i = 0u; i < forwarded_prefixes.size(); ++i ) + if ( forwarded_prefixes[i] == topic_prefix ) + { + DBG_LOG(DBG_BROKER, "Unforwading topic prefix %s", topic_prefix.c_str()); + forwarded_prefixes.erase(forwarded_prefixes.begin() + i); + break; + } + DBG_LOG(DBG_BROKER, "Unsubscribing from topic prefix %s", topic_prefix.c_str()); bstate->subscriber.remove_topic(topic_prefix, ! after_bro_init); return true; @@ -828,11 +849,11 @@ double Manager::NextTimestamp(double* local_network_time) return -1; } -void Manager::DispatchMessage(broker::data msg) +void Manager::DispatchMessage(const broker::topic& topic, broker::data msg) { switch ( broker::bro::Message::type(msg) ) { case broker::bro::Message::Type::Event: - ProcessEvent(std::move(msg)); + ProcessEvent(topic, std::move(msg)); break; case broker::bro::Message::Type::LogCreate: @@ -852,7 +873,7 @@ void Manager::DispatchMessage(broker::data msg) broker::bro::Batch batch(std::move(msg)); for ( auto& i : batch.batch() ) - DispatchMessage(std::move(i)); + DispatchMessage(topic, std::move(i)); break; } @@ -900,7 +921,7 @@ void Manager::Process() try { - DispatchMessage(std::move(msg)); + DispatchMessage(topic, std::move(msg)); } catch ( std::runtime_error& e ) { @@ -923,8 +944,11 @@ void Manager::Process() } -void Manager::ProcessEvent(std::string name, broker::vector args) +void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev) { + auto name = std::move(ev.name()); + auto args = std::move(ev.args()); + DBG_LOG(DBG_BROKER, "Process event: %s %s", name.data(), RenderMessage(args).data()); ++statistics.num_events_incoming; @@ -933,6 +957,23 @@ void Manager::ProcessEvent(std::string name, broker::vector args) if ( ! handler ) return; + auto& topic_string = topic.string(); + + for ( auto i = 0u; i < forwarded_prefixes.size(); ++i ) + { + auto& p = forwarded_prefixes[i]; + + if ( p.size() > topic_string.size() ) + continue; + + if ( strncmp(p.data(), topic_string.data(), p.size()) != 0 ) + continue; + + DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", + name.data(), RenderMessage(args).data()); + return; + } + auto arg_types = handler->FType(false)->ArgTypes()->Types(); if ( static_cast(arg_types->length()) != args.size() ) @@ -969,11 +1010,6 @@ void Manager::ProcessEvent(std::string name, broker::vector args) delete_vals(vl); } -void Manager::ProcessEvent(broker::bro::Event ev) - { - ProcessEvent(std::move(ev.name()), std::move(ev.args())); - } - bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc) { DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 9302801d29..87aba80058 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -224,10 +224,21 @@ public: */ bool Subscribe(const std::string& topic_prefix); + /** + * Register interest in peer event messages that use a certain topic prefix, + * but that should not be raised locally, just forwarded to any subscribing + * peers. + * @param topic_prefix a prefix to match against remote message topics. + * e.g. an empty prefix will match everything and "a" will match "alice" + * and "amy" but not "bob". + * @return true if it's a new event forward/subscription and it is now registered. + */ + bool Forward(std::string topic_prefix); + /** * Unregister interest in peer event messages. * @param topic_prefix a prefix previously supplied to a successful call - * to bro_broker::Manager::Subscribe(). + * to bro_broker::Manager::Subscribe() or bro_broker::Manager::Forward(). * @return true if interest in topic prefix is no longer advertised. */ bool Unsubscribe(const std::string& topic_prefix); @@ -311,9 +322,8 @@ public: private: - void DispatchMessage(broker::data msg); - void ProcessEvent(std::string name, broker::vector args); - void ProcessEvent(broker::bro::Event ev); + void DispatchMessage(const broker::topic& topic, broker::data msg); + void ProcessEvent(const broker::topic& topic, broker::bro::Event ev); bool ProcessLogCreate(broker::bro::LogCreate lc); bool ProcessLogWrite(broker::bro::LogWrite lw); bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu); @@ -364,6 +374,7 @@ private: std::unordered_map data_stores; std::unordered_map pending_queries; + std::vector forwarded_prefixes; Stats statistics; diff --git a/src/broker/messaging.bif b/src/broker/messaging.bif index 0a2203e8a9..50966fa342 100644 --- a/src/broker/messaging.bif +++ b/src/broker/messaging.bif @@ -144,6 +144,13 @@ function Broker::__subscribe%(topic_prefix: string%): bool return new Val(rval, TYPE_BOOL); %} +function Broker::__forward%(topic_prefix: string%): bool + %{ + bro_broker::Manager::ScriptScopeGuard ssg; + auto rval = broker_mgr->Forward(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + function Broker::__unsubscribe%(topic_prefix: string%): bool %{ bro_broker::Manager::ScriptScopeGuard ssg; diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/manager-1..stdout new file mode 100644 index 0000000000..5b10602c67 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/manager-1..stdout @@ -0,0 +1,8 @@ +Connected to a peer +Connected to a peer +Connected to a peer +Got fully_connected event +Got fully_connected event +Connected to a peer +Got fully_connected event +Got fully_connected event diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-1..stdout new file mode 100644 index 0000000000..c3a1950daf --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-1..stdout @@ -0,0 +1,3 @@ +Connected to a peer +Connected to a peer +Connected to a peer diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-2..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-2..stdout new file mode 100644 index 0000000000..c3a1950daf --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/proxy-2..stdout @@ -0,0 +1,3 @@ +Connected to a peer +Connected to a peer +Connected to a peer diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-1..stdout new file mode 100644 index 0000000000..c3a1950daf --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-1..stdout @@ -0,0 +1,3 @@ +Connected to a peer +Connected to a peer +Connected to a peer diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-2..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-2..stdout new file mode 100644 index 0000000000..5bce99d801 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.forwarding/worker-2..stdout @@ -0,0 +1,4 @@ +Connected to a peer +Connected to a peer +Connected to a peer +got forwarded event diff --git a/testing/btest/scripts/base/frameworks/cluster/forwarding.bro b/testing/btest/scripts/base/frameworks/cluster/forwarding.bro new file mode 100644 index 0000000000..e6e743ec0f --- /dev/null +++ b/testing/btest/scripts/base/frameworks/cluster/forwarding.bro @@ -0,0 +1,105 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT +# @TEST-EXEC: btest-bg-run proxy-2 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-2 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff proxy-1/.stdout +# @TEST-EXEC: btest-diff proxy-2/.stdout +# @TEST-EXEC: btest-diff worker-1/.stdout +# @TEST-EXEC: btest-diff worker-2/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp], + ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1"], + ["proxy-2"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37759/tcp, $manager="manager-1"], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +global fully_connected: event(); + +global peer_count = 0; +global peers_lost = 0; +global fully_connected_nodes = 0; + +event forwarded_event() + { + print "got forwarded event"; + + if ( Cluster::node == "manager-1" ) + print "manager should NOT have raised the forwarded event"; + + terminate(); + } + +event ready() + { + # note that the publishing node, worker-1, will not receive the forwarded + # event as Broker's forwarding prevents the message going back to the + # immediate sender. + Broker::publish("test_topic", forwarded_event); + } + +event fully_connected() + { + if ( ! is_remote_event() ) + return; + + print "Got fully_connected event"; + fully_connected_nodes = fully_connected_nodes + 1; + + if ( Cluster::node == "manager-1" ) + { + if ( peer_count == 4 && fully_connected_nodes == 4 ) + Broker::publish(Cluster::node_topic("worker-1"), ready); + } + } + +event bro_init() + { + Broker::auto_publish(Cluster::manager_topic, fully_connected); + + if ( Cluster::node == "manager-1" ) + Broker::forward("test_topic"); + if ( Cluster::node == "worker-1" ) + Broker::subscribe("test_topic"); + if ( Cluster::node == "worker-2" ) + Broker::subscribe("test_topic"); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Connected to a peer"; + peer_count = peer_count + 1; + + if ( Cluster::node == "manager-1" ) + { + if ( peer_count == 4 && fully_connected_nodes == 4 ) + Broker::publish(Cluster::node_topic("worker-1"), ready); + } + else + { + if ( peer_count == 3 ) + event fully_connected(); + } + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + ++peers_lost; + + if ( Cluster::node == "manager-1" ) + { + if ( peers_lost == 2 ) + # Both workers terminated + terminate(); + } + else + terminate(); + }