diff --git a/aux/broker b/aux/broker index 18d56b7055..955a8e4369 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 18d56b70558db61f424dc90e5d010f57b75de459 +Subproject commit 955a8e4369d7bce78ea5e23a763b594c85b910b9 diff --git a/doc/frameworks/broker.rst b/doc/frameworks/broker.rst index e050ec6479..755b4d8b78 100644 --- a/doc/frameworks/broker.rst +++ b/doc/frameworks/broker.rst @@ -518,24 +518,28 @@ Worker Sending Events To All Workers Since workers are not directly connected to each other in the cluster topology, this type of communication is a bit different than what we -did before. Instead of using :bro:see:`Broker::publish` we use different -"relay" calls to hop the message from a different node that *is* connected. +did before since we have to manually relay the event via some node that *is* +connected to all workers. The manager or a proxy satisfies that requirement: .. code:: bro event worker_to_workers(worker_name: string) { - print "got event from worker", worker_name; + @if ( Cluster::local_node_type() == Cluster::MANAGER || + Cluster::local_node_type() == Cluster::PROXY ) + Broker::publish(Cluster::worker_topic, worker_to_workers, + worker_name) + @else + print "got event from worker", worker_name; + @endif } event some_event_handled_on_worker() { # We know the manager is connected to all workers, so we could - # choose to relay the event across it. Note that sending the event - # this way will not allow the manager to handle it, even if it - # does have an event handler. - Broker::relay(Cluster::manager_topic, Cluster::worker_topic, - worker_to_workers, Cluster::node + " (via manager)"); + # choose to relay the event across it. + Broker::publish(Cluster::manager_topic, worker_to_workers, + Cluster::node + " (via manager)"); # We also know that any given proxy is connected to all workers, # though now we have a choice of which proxy to use. If we @@ -543,9 +547,9 @@ did before. Instead of using :bro:see:`Broker::publish` we use different # we can use a round-robin strategy. The key used here is simply # used by the cluster framework internally to keep track of # which node is up next in the round-robin. - Cluster::relay_rr(Cluster::proxy_pool, "example_key", - Cluster::worker_topic, worker_to_workers, - Cluster::node + " (via a proxy)"); + local pt = Cluster::rr_topic(Cluster::proxy_pool, "example_key"); + Broker::publish(pt, worker_to_workers, + Cluster::node + " (via a proxy)"); } Worker Distributing Events Uniformly Across Proxies diff --git a/scripts/base/frameworks/config/main.bro b/scripts/base/frameworks/config/main.bro index 30ddeaf3b9..7dce8284b4 100644 --- a/scripts/base/frameworks/config/main.bro +++ b/scripts/base/frameworks/config/main.bro @@ -28,11 +28,6 @@ export { ## record as it is sent on to the logging framework. global log_config: event(rec: Info); - ## Broker topic for announcing new configuration values. Sending new_value, - ## peers can send configuration changes that will be distributed across - ## the entire cluster. - const change_topic = "bro/config/change"; - ## This function is the config framework layer around the lower-level ## :bro:see:`Option::set` call. Config::set_value will set the configuration ## value for all nodes in the cluster, no matter where it was called. Note @@ -57,48 +52,46 @@ type OptionCacheValue: record { global option_cache: table[string] of OptionCacheValue; -event bro_init() - { - Broker::subscribe(change_topic); - } - event Config::cluster_set_option(ID: string, val: any, location: string) { @if ( Cluster::local_node_type() == Cluster::MANAGER ) option_cache[ID] = OptionCacheValue($val=val, $location=location); + Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option, + ID, val, location); @endif + Option::set(ID, val, location); } function set_value(ID: string, val: any, location: string &default = "" &optional): bool { - local cache_val: any; - # First cache value in case setting it succeeds and we have to store it. - if ( Cluster::local_node_type() == Cluster::MANAGER ) - cache_val = copy(val); + # Always copy the value to break references -- if caller mutates their + # value afterwards, we still guarantee the option has not changed. If + # one wants it to change, they need to explicitly call Option::set_value + # or Option::set with the intended value at the time of the call. + val = copy(val); + # First try setting it locally - abort if not possible. if ( ! Option::set(ID, val, location) ) return F; - # If setting worked, copy the new value into the cache on the manager - if ( Cluster::local_node_type() == Cluster::MANAGER ) - option_cache[ID] = OptionCacheValue($val=cache_val, $location=location); - # If it turns out that it is possible - send it to everyone else to apply. - Broker::publish(change_topic, Config::cluster_set_option, ID, val, location); +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + option_cache[ID] = OptionCacheValue($val=val, $location=location); + Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option, + ID, val, location); +@else + Broker::publish(Cluster::manager_topic, Config::cluster_set_option, + ID, val, location); +@endif - if ( Cluster::local_node_type() != Cluster::MANAGER ) - { - Broker::relay(change_topic, change_topic, Config::cluster_set_option, ID, val, location); - } return T; } -@else -# Standalone implementation +@else # Standalone implementation function set_value(ID: string, val: any, location: string &default = "" &optional): bool { return Option::set(ID, val, location); } -@endif +@endif # Cluster::is_enabled @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) # Handling of new worker nodes. diff --git a/scripts/base/frameworks/intel/cluster.bro b/scripts/base/frameworks/intel/cluster.bro index 99b920e00d..4c95e3ae05 100644 --- a/scripts/base/frameworks/intel/cluster.bro +++ b/scripts/base/frameworks/intel/cluster.bro @@ -54,11 +54,14 @@ event Cluster::node_up(name: string, id: string) # has to be distributed. event Intel::new_item(item: Item) &priority=5 { - if ( Cluster::proxy_pool$alive_count == 0 ) - Broker::publish(indicator_topic, Intel::insert_indicator, item); - else - Cluster::relay_rr(Cluster::proxy_pool, "Intel::new_item_relay_rr", - indicator_topic, Intel::insert_indicator, item); + local pt = Cluster::rr_topic(Cluster::proxy_pool, indicator_topic); + + if ( pt == "" ) + # No proxies alive, publish to all workers ourself instead of + # relaying via a proxy. + pt = indicator_topic; + + Broker::publish(pt, Intel::insert_indicator, item); } # Handling of item insertion triggered by remote node. @@ -103,3 +106,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5 Intel::_insert(item, F); } @endif + +@if ( Cluster::local_node_type() == Cluster::PROXY ) +event Intel::insert_indicator(item: Intel::Item) &priority=5 + { + # Just forwarding from manager to workers. + Broker::publish(indicator_topic, Intel::insert_indicator, item); + } +@endif + diff --git a/scripts/base/protocols/irc/dcc-send.bro b/scripts/base/protocols/irc/dcc-send.bro index 9454fef7b0..44d939209e 100644 --- a/scripts/base/protocols/irc/dcc-send.bro +++ b/scripts/base/protocols/irc/dcc-send.bro @@ -24,32 +24,41 @@ export { ## Sniffed mime type of the file. dcc_mime_type: string &log &optional; }; - - ## The broker topic name to which expected DCC transfer updates are - ## relayed. - const dcc_transfer_update_topic = "bro/irc/dcc_transfer_update" &redef; } global dcc_expected_transfers: table[addr, port] of Info &read_expire=5mins; +function dcc_relay_topic(): string + { + local rval = Cluster::rr_topic(Cluster::proxy_pool, "dcc_transfer_rr_key"); + + if ( rval == "" ) + # No proxy is alive, so relay via manager instead. + return Cluster::manager_topic; + + return rval; + } + event dcc_transfer_add(host: addr, p: port, info: Info) { +@if ( Cluster::local_node_type() == Cluster::PROXY || + Cluster::local_node_type() == Cluster::MANAGER ) + Broker::publish(Cluster::worker_topic, dcc_transfer_add, host, p, info); +@else dcc_expected_transfers[host, p] = info; Analyzer::schedule_analyzer(0.0.0.0, host, p, Analyzer::ANALYZER_IRC_DATA, 5 min); +@endif } event dcc_transfer_remove(host: addr, p: port) { +@if ( Cluster::local_node_type() == Cluster::PROXY || + Cluster::local_node_type() == Cluster::MANAGER ) + Broker::publish(Cluster::worker_topic, dcc_transfer_remove, host, p); +@else delete dcc_expected_transfers[host, p]; - } - -event bro_init() - { - local lnt = Cluster::local_node_type(); - - if ( lnt == Cluster::WORKER ) - Broker::subscribe(dcc_transfer_update_topic); +@endif } function log_dcc(f: fa_file) @@ -76,9 +85,11 @@ function log_dcc(f: fa_file) delete irc$dcc_mime_type; delete dcc_expected_transfers[cid$resp_h, cid$resp_p]; - Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic, - dcc_transfer_update_topic, dcc_transfer_remove, - cid$resp_h, cid$resp_p); + +@if ( Cluster::is_enabled() ) + Broker::publish(dcc_relay_topic(), dcc_transfer_remove, + cid$resp_h, cid$resp_p); +@endif return; } } @@ -102,9 +113,10 @@ event irc_dcc_message(c: connection, is_orig: bool, local p = count_to_port(dest_port, tcp); Analyzer::schedule_analyzer(0.0.0.0, address, p, Analyzer::ANALYZER_IRC_DATA, 5 min); dcc_expected_transfers[address, p] = c$irc; - Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic, - dcc_transfer_update_topic, dcc_transfer_add, - address, p, c$irc); + +@if ( Cluster::is_enabled() ) + Broker::publish(dcc_relay_topic(), dcc_transfer_add, address, p, c$irc); +@endif } event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10 @@ -119,8 +131,10 @@ event connection_state_remove(c: connection) &priority=-5 if ( [c$id$resp_h, c$id$resp_p] in dcc_expected_transfers ) { delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p]; - Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic, - dcc_transfer_update_topic, dcc_transfer_remove, - c$id$resp_h, c$id$resp_p); + +@if ( Cluster::is_enabled() ) + Broker::publish(dcc_relay_topic(), dcc_transfer_remove, + c$id$resp_h, c$id$resp_p); +@endif } } diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 6aef1e06cf..c0af314e10 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -403,76 +403,6 @@ bool Manager::PublishEvent(string topic, RecordVal* args) return PublishEvent(topic, event_name, std::move(xs)); } -bool Manager::RelayEvent(std::string first_topic, - broker::set relay_topics, - std::string name, - broker::vector args, - bool handle_on_relayer) - { - if ( bstate->endpoint.is_shutdown() ) - return true; - - if ( peer_count == 0 ) - return true; - - DBG_LOG(DBG_BROKER, "Publishing %s-relay event: %s", - handle_on_relayer ? "handle" : "", - RenderEvent(first_topic, name, args).c_str()); - - if ( handle_on_relayer ) - { - broker::bro::HandleAndRelayEvent msg(std::move(relay_topics), - std::move(name), - std::move(args)); - bstate->endpoint.publish(std::move(first_topic), std::move(msg)); - } - else - { - broker::bro::RelayEvent msg(std::move(relay_topics), - std::move(name), - std::move(args)); - bstate->endpoint.publish(std::move(first_topic), std::move(msg)); - } - - ++statistics.num_events_outgoing; - return true; - } - -bool Manager::RelayEvent(std::string first_topic, - std::set relay_topics, - RecordVal* args, - bool handle_on_relayer) - { - if ( bstate->endpoint.is_shutdown() ) - return true; - - if ( peer_count == 0 ) - return true; - - if ( ! args->Lookup(0) ) - return false; - - auto event_name = args->Lookup(0)->AsString()->CheckString(); - auto vv = args->Lookup(1)->AsVectorVal(); - broker::vector xs; - xs.reserve(vv->Size()); - - for ( auto i = 0u; i < vv->Size(); ++i ) - { - auto val = vv->Lookup(i)->AsRecordVal()->Lookup(0); - auto data_val = static_cast(val); - xs.emplace_back(data_val->data); - } - - broker::set topic_set; - - for ( auto& t : relay_topics ) - topic_set.emplace(std::move(t)); - - return RelayEvent(first_topic, std::move(topic_set), event_name, - std::move(xs), handle_on_relayer); - } - bool Manager::PublishIdentifier(std::string topic, std::string id) { if ( bstate->endpoint.is_shutdown() ) @@ -905,14 +835,6 @@ void Manager::DispatchMessage(broker::data msg) ProcessEvent(std::move(msg)); break; - case broker::bro::Message::Type::RelayEvent: - ProcessRelayEvent(std::move(msg)); - break; - - case broker::bro::Message::Type::HandleAndRelayEvent: - ProcessHandleAndRelayEvent(std::move(msg)); - break; - case broker::bro::Message::Type::LogCreate: ProcessLogCreate(std::move(msg)); break; @@ -1052,29 +974,6 @@ void Manager::ProcessEvent(broker::bro::Event ev) ProcessEvent(std::move(ev.name()), std::move(ev.args())); } -void Manager::ProcessRelayEvent(broker::bro::RelayEvent ev) - { - DBG_LOG(DBG_BROKER, "Received relay event: %s", RenderMessage(ev).c_str()); - ++statistics.num_events_incoming; - - for ( auto& t : ev.topics() ) - PublishEvent(std::move(caf::get(t)), - std::move(ev.name()), - std::move(ev.args())); - } - -void Manager::ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev) - { - DBG_LOG(DBG_BROKER, "Received handle-relay event: %s", - RenderMessage(ev).c_str()); - ProcessEvent(ev.name(), ev.args()); - - for ( auto& t : ev.topics() ) - PublishEvent(std::move(caf::get(t)), - std::move(ev.name()), - std::move(ev.args())); - } - bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc) { DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index a42cb495e6..9302801d29 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -153,43 +153,6 @@ public: */ bool PublishEvent(std::string topic, RecordVal* ev); - /** - * Sends an event to any interested peers, who, upon receipt, - * republishes the event to a new set of topics and optionally - * calls event handlers. - * @param first_topic the first topic to use when publishing the event - * @param relay_topics the set of topics the receivers will use to - * republish the event. The event is relayed at most a single hop. - * @param name the name of the event - * @param args the event's arguments - * @param handle_on_relayer whether they relaying-node should call event - * handlers. - * @return true if the message is sent successfully. - */ - bool RelayEvent(std::string first_topic, - broker::set relay_topics, - std::string name, - broker::vector args, - bool handle_on_relayer); - - /** - * Sends an event to any interested peers, who, upon receipt, - * republishes the event to a new set of topics and optionally - * calls event handlers. - * @param first_topic the first topic to use when publishing the event - * @param relay_topics the set of topics the receivers will use to - * republish the event. The event is relayed at most a single hop. - * @param ev the event and its arguments to send to peers, in the form of - * a Broker::Event record type. - * @param handle_on_relayer whether they relaying-node should call event - * handlers. - * @return true if the message is sent successfully. - */ - bool RelayEvent(std::string first_topic, - std::set relay_topics, - RecordVal* ev, - bool handle_on_relayer); - /** * Send a message to create a log stream to any interested peers. * The log stream may or may not already exist on the receiving side. @@ -351,8 +314,6 @@ private: void DispatchMessage(broker::data msg); void ProcessEvent(std::string name, broker::vector args); void ProcessEvent(broker::bro::Event ev); - void ProcessRelayEvent(broker::bro::RelayEvent re); - void ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev); bool ProcessLogCreate(broker::bro::LogCreate lc); bool ProcessLogWrite(broker::bro::LogWrite lw); bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu); diff --git a/src/broker/messaging.bif b/src/broker/messaging.bif index bc0d03a629..0a2203e8a9 100644 --- a/src/broker/messaging.bif +++ b/src/broker/messaging.bif @@ -67,28 +67,6 @@ static bool publish_event_args(val_list& args, const BroString* topic, return rval; } -static bool relay_event_args(val_list& args, const BroString* topic, - std::set topic_set, Frame* frame) - { - bro_broker::Manager::ScriptScopeGuard ssg; - auto rval = false; - - if ( args[0]->Type()->Tag() == TYPE_RECORD ) - rval = broker_mgr->RelayEvent(topic->CheckString(), - std::move(topic_set), - args[0]->AsRecordVal(), false); - else - { - auto ev = broker_mgr->MakeEvent(&args, frame); - rval = broker_mgr->RelayEvent(topic->CheckString(), - std::move(topic_set), - ev, false); - Unref(ev); - } - - return rval; - } - %%} module Broker; @@ -131,130 +109,6 @@ function Broker::publish%(topic: string, ...%): bool return new Val(rval, TYPE_BOOL); %} -## Publishes an event at a given topic, with any receivers automatically -## forwarding it to its peers with a different topic. The event is relayed -## at most a single hop and the relayer does not call any local event handlers. -## -## first_topic: the initial topic to use for publishing the event. -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the first -## set of receivers will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Broker::relay%(first_topic: string, ...%): bool - %{ - bro_broker::Manager::ScriptScopeGuard ssg; - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 3 ) - { - builtin_error("Broker::relay requires at least 3 arguments"); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[1]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Broker::relay requires a string or string_set as 2nd argument"); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - return new Val(false, TYPE_BOOL); - - val_list args(bif_args->length() - 2); - - for ( auto i = 2; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = false; - - if ( args[0]->Type()->Tag() == TYPE_RECORD ) - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), - args[0]->AsRecordVal(), false); - else - { - auto ev = broker_mgr->MakeEvent(&args, frame); - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), ev, false); - Unref(ev); - } - - return new Val(rval, TYPE_BOOL); - %} - -## Publishes an event at a given topic, with any receivers automatically -## forwarding it to its peers with a different topic. The event is relayed -## at most a single hop and the relayer does call local event handlers. -## -## first_topic: the initial topic to use for publishing the event. -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the first -## set of receivers will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Broker::publish_and_relay%(first_topic: string, ...%): bool - %{ - bro_broker::Manager::ScriptScopeGuard ssg; - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 3 ) - { - builtin_error("Broker::publish_and_relay requires at least 3 arguments"); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[1]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Broker::publish_and_relay requires a string or string_set as 2nd argument"); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - return new Val(false, TYPE_BOOL); - - val_list args(bif_args->length() - 2); - - for ( auto i = 2; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = false; - - if ( args[0]->Type()->Tag() == TYPE_RECORD ) - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), - args[0]->AsRecordVal(), true); - else - { - auto ev = broker_mgr->MakeEvent(&args, frame); - rval = broker_mgr->RelayEvent(first_topic->CheckString(), - std::move(topic_set), ev, true); - Unref(ev); - } - - return new Val(rval, TYPE_BOOL); - %} - function Broker::__flush_logs%(%): count %{ auto rval = broker_mgr->FlushLogBuffers(); @@ -345,81 +199,6 @@ function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool %} -## Publishes an event at a given topic, with a receiver node chosen -## from a pool according to Round-Robin distribution strategy. The receiving -## node, then automatically forwards it to its peers with a different topic. -## The event is relayed at most a single hop. -## -## pool: the pool of nodes that are eligible to receive the event. -## -## key: an arbitrary string to identify the purpose for which you're -## distributing the event. e.g. consider using namespacing of your -## script like "Intel::cluster_rr_key". -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the receiver -## will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Cluster::relay_rr%(pool: Pool, key: any, ...%): bool - %{ - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 4 ) - { - builtin_error("Cluster::relay_rr requires at least 4 arguments"); - return new Val(false, TYPE_BOOL); - } - - static Func* topic_func = 0; - - if ( ! topic_func ) - topic_func = global_scope()->Lookup("Cluster::rr_topic")->ID_Val()->AsFunc(); - - val_list vl(2); - vl.append(pool->Ref()); - vl.append(key->Ref()); - auto topic = topic_func->Call(&vl); - - if ( ! topic->AsString()->Len() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[2]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Cluster::relay_rr requires a string or string_set as 3rd argument"); - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - val_list args(bif_args->length() - 3); - - for ( auto i = 3; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set), - frame); - Unref(topic); - return new Val(rval, TYPE_BOOL); - %} - ## Publishes an event to a node within a pool according to Rendezvous ## (Highest Random Weight) hashing strategy. ## @@ -461,77 +240,3 @@ function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool Unref(topic); return new Val(rval, TYPE_BOOL); %} - -## Publishes an event at a given topic, with a receiver node chosen -## from a pool according to Rendezvous (Highest Random Weight) distribution -## strategy. The receiving nodes then automatically forwards it to its peers -## with a different topic. The event is relayed at most a single hop. -## -## pool: the pool of nodes that are eligible to receive the event. -## -## key: data used for input to the hashing function that will uniformly -## distribute keys among available nodes. -## -## args: the first member of the argument list may be either a string or -## a set of strings indicating the secondary topic that the receiver -## will use to re-publish the event. The remaining -## members of the argument list are either the return value of a -## previously-made call to :bro:see:`Broker::make_event` or the -## argument list that should be passed along to it, so that it can -## be called as part of executing this function. -## -## Returns: true if the message is sent. -function Cluster::relay_hrw%(pool: Pool, key: any, ...%): bool - %{ - val_list* bif_args = @ARGS@; - - if ( bif_args->length() < 4 ) - { - builtin_error("Cluster::relay_hrw requires at least 4 arguments"); - return new Val(false, TYPE_BOOL); - } - - static Func* topic_func = 0; - - if ( ! topic_func ) - topic_func = global_scope()->Lookup("Cluster::hrw_topic")->ID_Val()->AsFunc(); - - val_list vl(2); - vl.append(pool->Ref()); - vl.append(key->Ref()); - auto topic = topic_func->Call(&vl); - - if ( ! topic->AsString()->Len() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto second_topic = (*bif_args)[2]; - - if ( second_topic->Type()->Tag() != TYPE_STRING && - ! is_string_set(second_topic->Type()) ) - { - builtin_error("Cluster::relay_hrw requires a string or string_set as 3rd argument"); - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - auto topic_set = val_to_topic_set(second_topic); - - if ( topic_set.empty() ) - { - Unref(topic); - return new Val(false, TYPE_BOOL); - } - - val_list args(bif_args->length() - 3); - - for ( auto i = 3; i < bif_args->length(); ++i ) - args.append((*bif_args)[i]); - - auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set), - frame); - Unref(topic); - return new Val(rval, TYPE_BOOL); - %} diff --git a/testing/btest/Baseline/broker.remote_publish_and_relay_event/one.one.out b/testing/btest/Baseline/broker.remote_publish_and_relay_event/one.one.out deleted file mode 100644 index 45c18d28be..0000000000 --- a/testing/btest/Baseline/broker.remote_publish_and_relay_event/one.one.out +++ /dev/null @@ -1,3 +0,0 @@ -sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core -got ready event -sender lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/broker.remote_publish_and_relay_event/three.three.out b/testing/btest/Baseline/broker.remote_publish_and_relay_event/three.three.out deleted file mode 100644 index 8193829fd4..0000000000 --- a/testing/btest/Baseline/broker.remote_publish_and_relay_event/three.three.out +++ /dev/null @@ -1,2 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -got my_event, hello world diff --git a/testing/btest/Baseline/broker.remote_publish_and_relay_event/two.two.out b/testing/btest/Baseline/broker.remote_publish_and_relay_event/two.two.out deleted file mode 100644 index 7bedece7d2..0000000000 --- a/testing/btest/Baseline/broker.remote_publish_and_relay_event/two.two.out +++ /dev/null @@ -1,5 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -sending ready event -got my_event, hello world -receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/broker.remote_relay_event/one.one.out b/testing/btest/Baseline/broker.remote_relay_event/one.one.out deleted file mode 100644 index 45c18d28be..0000000000 --- a/testing/btest/Baseline/broker.remote_relay_event/one.one.out +++ /dev/null @@ -1,3 +0,0 @@ -sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core -got ready event -sender lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/broker.remote_relay_event/three.three.out b/testing/btest/Baseline/broker.remote_relay_event/three.three.out deleted file mode 100644 index 8193829fd4..0000000000 --- a/testing/btest/Baseline/broker.remote_relay_event/three.three.out +++ /dev/null @@ -1,2 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -got my_event, hello world diff --git a/testing/btest/Baseline/broker.remote_relay_event/two.two.out b/testing/btest/Baseline/broker.remote_relay_event/two.two.out deleted file mode 100644 index 21378cdd92..0000000000 --- a/testing/btest/Baseline/broker.remote_relay_event/two.two.out +++ /dev/null @@ -1,4 +0,0 @@ -receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core -receiver added peer: endpoint=127.0.0.1 msg=handshake successful -sending ready event -receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index b58e835a11..1f80c388bc 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -157,7 +157,7 @@ 0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> -0.000000 MetaHookPost CallFunction(Cluster::local_node_type, , ()) -> +0.000000 MetaHookPost CallFunction(Cluster::local_node_type, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) -> @@ -274,7 +274,7 @@ 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) -> 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) -> 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -> -0.000000 MetaHookPost CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) -> +0.000000 MetaHookPost CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Broker::LOG)) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Cluster::LOG)) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Config::LOG)) -> @@ -459,7 +459,7 @@ 0.000000 MetaHookPost CallFunction(Log::create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) -> 0.000000 MetaHookPost CallFunction(Log::create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) -> 0.000000 MetaHookPost CallFunction(Log::create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -> -0.000000 MetaHookPost CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) -> +0.000000 MetaHookPost CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])) -> 0.000000 MetaHookPost CallFunction(NetControl::check_plugins, , ()) -> 0.000000 MetaHookPost CallFunction(NetControl::init, , ()) -> 0.000000 MetaHookPost CallFunction(Notice::want_pp, , ()) -> @@ -1022,7 +1022,7 @@ 0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) -0.000000 MetaHookPre CallFunction(Cluster::local_node_type, , ()) +0.000000 MetaHookPre CallFunction(Cluster::local_node_type, , ()) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) @@ -1139,7 +1139,7 @@ 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -0.000000 MetaHookPre CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) +0.000000 MetaHookPre CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Broker::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Cluster::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Config::LOG)) @@ -1324,7 +1324,7 @@ 0.000000 MetaHookPre CallFunction(Log::create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -0.000000 MetaHookPre CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) +0.000000 MetaHookPre CallFunction(Log::write, , (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(NetControl::check_plugins, , ()) 0.000000 MetaHookPre CallFunction(NetControl::init, , ()) 0.000000 MetaHookPre CallFunction(Notice::want_pp, , ()) @@ -2003,7 +2003,7 @@ 0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql]) -0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T]) +0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction Log::add_default_filter(Broker::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Config::LOG) @@ -2188,7 +2188,7 @@ 0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql]) -0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T]) +0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction NetControl::check_plugins() 0.000000 | HookCallFunction NetControl::init() 0.000000 | HookCallFunction Notice::want_pp() @@ -2588,7 +2588,7 @@ 0.000000 | HookLoadFile base<...>/x509 0.000000 | HookLoadFile base<...>/xmpp 0.000000 | HookLogInit packet_filter 1/1 {ts (time), node (string), filter (string), init (bool), success (bool)} -0.000000 | HookLogWrite packet_filter [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T] +0.000000 | HookLogWrite packet_filter [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T] 0.000000 | HookQueueEvent NetControl::init() 0.000000 | HookQueueEvent bro_init() 0.000000 | HookQueueEvent filter_change_tracking() diff --git a/testing/btest/broker/remote_publish_and_relay_event.bro b/testing/btest/broker/remote_publish_and_relay_event.bro deleted file mode 100644 index 444b454f80..0000000000 --- a/testing/btest/broker/remote_publish_and_relay_event.bro +++ /dev/null @@ -1,119 +0,0 @@ -# @TEST-SERIALIZE: comm -# -# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out" -# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out" -# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out" -# -# @TEST-EXEC: btest-bg-wait 20 -# @TEST-EXEC: btest-diff one/one.out -# @TEST-EXEC: btest-diff two/two.out -# @TEST-EXEC: btest-diff three/three.out - -@TEST-START-FILE one.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - } - -event ready_event() - { - print "got ready event"; - - Broker::publish_and_relay("bro/event/pre-relay", "bro/event/post-relay", - my_event, "hello world"); - } - -event bro_init() - { - Broker::subscribe("bro/event/ready"); - Broker::peer("127.0.0.1", 10000/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender added peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender lost peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - - -@TEST-START-FILE two.bro - -redef exit_only_after_terminate = T; - -global peers_added = 0; - -event my_event(s: string) - { - print "got my_event", s; - } - -event ready_event() - { - } - -event bro_init() - { - Broker::subscribe("bro/event/pre-relay"); - Broker::listen("127.0.0.1", 10000/tcp); - Broker::peer("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - ++peers_added; - - if ( peers_added == 2 ) - { - print "sending ready event"; - Broker::publish("bro/event/ready", ready_event); - } - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - -@TEST-START-FILE three.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - terminate(); - } - -event bro_init() - { - Broker::subscribe("bro/event/post-relay"); - Broker::listen("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -@TEST-END-FILE diff --git a/testing/btest/broker/remote_relay_event.bro b/testing/btest/broker/remote_relay_event.bro deleted file mode 100644 index c65265bdb5..0000000000 --- a/testing/btest/broker/remote_relay_event.bro +++ /dev/null @@ -1,120 +0,0 @@ -# @TEST-SERIALIZE: comm -# -# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out" -# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out" -# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out" -# -# @TEST-EXEC: btest-bg-wait 20 -# @TEST-EXEC: btest-diff one/one.out -# @TEST-EXEC: btest-diff two/two.out -# @TEST-EXEC: btest-diff three/three.out - -@TEST-START-FILE one.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - } - -event ready_event() - { - print "got ready event"; - - Broker::relay("bro/event/pre-relay", "bro/event/post-relay", my_event, - "hello world"); - } - -event bro_init() - { - Broker::subscribe("bro/event/ready"); - Broker::peer("127.0.0.1", 10000/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender added peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("sender lost peer: endpoint=%s msg=%s", - endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - - -@TEST-START-FILE two.bro - -redef exit_only_after_terminate = T; - -global peers_added = 0; - -event my_event(s: string) - { - print "got my_event", s; - terminate(); - } - -event ready_event() - { - } - -event bro_init() - { - Broker::subscribe("bro/event/pre-relay"); - Broker::listen("127.0.0.1", 10000/tcp); - Broker::peer("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - ++peers_added; - - if ( peers_added == 2 ) - { - print "sending ready event"; - Broker::publish("bro/event/ready", ready_event); - } - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - terminate(); - } - -@TEST-END-FILE - -@TEST-START-FILE three.bro - -redef exit_only_after_terminate = T; - -event my_event(s: string) - { - print "got my_event", s; - terminate(); - } - -event bro_init() - { - Broker::subscribe("bro/event/post-relay"); - Broker::listen("127.0.0.1", 9999/tcp); - } - -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) - { - print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); - } - -@TEST-END-FILE