Remove "relay" family of Broker functions

Namely these are now removed:

    - Broker::relay
    - Broker::publish_and_relay
    - Cluster::relay_rr
    - Cluster::relay_hrw

The idea being that Broker may eventually implement the necessary
routing (plus load balancing) functionality.  For now, code that used
these should "manually" handle and re-publish events as needed.
This commit is contained in:
Jon Siwek 2018-08-27 16:49:35 -05:00
parent e055f9b36b
commit 1a75ef2abd
17 changed files with 96 additions and 766 deletions

@ -1 +1 @@
Subproject commit 18d56b70558db61f424dc90e5d010f57b75de459 Subproject commit 955a8e4369d7bce78ea5e23a763b594c85b910b9

View file

@ -518,24 +518,28 @@ Worker Sending Events To All Workers
Since workers are not directly connected to each other in the cluster Since workers are not directly connected to each other in the cluster
topology, this type of communication is a bit different than what we topology, this type of communication is a bit different than what we
did before. Instead of using :bro:see:`Broker::publish` we use different did before since we have to manually relay the event via some node that *is*
"relay" calls to hop the message from a different node that *is* connected. connected to all workers. The manager or a proxy satisfies that requirement:
.. code:: bro .. code:: bro
event worker_to_workers(worker_name: string) event worker_to_workers(worker_name: string)
{ {
print "got event from worker", worker_name; @if ( Cluster::local_node_type() == Cluster::MANAGER ||
Cluster::local_node_type() == Cluster::PROXY )
Broker::publish(Cluster::worker_topic, worker_to_workers,
worker_name)
@else
print "got event from worker", worker_name;
@endif
} }
event some_event_handled_on_worker() event some_event_handled_on_worker()
{ {
# We know the manager is connected to all workers, so we could # We know the manager is connected to all workers, so we could
# choose to relay the event across it. Note that sending the event # choose to relay the event across it.
# this way will not allow the manager to handle it, even if it Broker::publish(Cluster::manager_topic, worker_to_workers,
# does have an event handler. Cluster::node + " (via manager)");
Broker::relay(Cluster::manager_topic, Cluster::worker_topic,
worker_to_workers, Cluster::node + " (via manager)");
# We also know that any given proxy is connected to all workers, # We also know that any given proxy is connected to all workers,
# though now we have a choice of which proxy to use. If we # though now we have a choice of which proxy to use. If we
@ -543,9 +547,9 @@ did before. Instead of using :bro:see:`Broker::publish` we use different
# we can use a round-robin strategy. The key used here is simply # we can use a round-robin strategy. The key used here is simply
# used by the cluster framework internally to keep track of # used by the cluster framework internally to keep track of
# which node is up next in the round-robin. # which node is up next in the round-robin.
Cluster::relay_rr(Cluster::proxy_pool, "example_key", local pt = Cluster::rr_topic(Cluster::proxy_pool, "example_key");
Cluster::worker_topic, worker_to_workers, Broker::publish(pt, worker_to_workers,
Cluster::node + " (via a proxy)"); Cluster::node + " (via a proxy)");
} }
Worker Distributing Events Uniformly Across Proxies Worker Distributing Events Uniformly Across Proxies

View file

@ -28,11 +28,6 @@ export {
## record as it is sent on to the logging framework. ## record as it is sent on to the logging framework.
global log_config: event(rec: Info); global log_config: event(rec: Info);
## Broker topic for announcing new configuration values. Sending new_value,
## peers can send configuration changes that will be distributed across
## the entire cluster.
const change_topic = "bro/config/change";
## This function is the config framework layer around the lower-level ## This function is the config framework layer around the lower-level
## :bro:see:`Option::set` call. Config::set_value will set the configuration ## :bro:see:`Option::set` call. Config::set_value will set the configuration
## value for all nodes in the cluster, no matter where it was called. Note ## value for all nodes in the cluster, no matter where it was called. Note
@ -57,48 +52,46 @@ type OptionCacheValue: record {
global option_cache: table[string] of OptionCacheValue; global option_cache: table[string] of OptionCacheValue;
event bro_init()
{
Broker::subscribe(change_topic);
}
event Config::cluster_set_option(ID: string, val: any, location: string) event Config::cluster_set_option(ID: string, val: any, location: string)
{ {
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
option_cache[ID] = OptionCacheValue($val=val, $location=location); option_cache[ID] = OptionCacheValue($val=val, $location=location);
Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option,
ID, val, location);
@endif @endif
Option::set(ID, val, location); Option::set(ID, val, location);
} }
function set_value(ID: string, val: any, location: string &default = "" &optional): bool function set_value(ID: string, val: any, location: string &default = "" &optional): bool
{ {
local cache_val: any; # Always copy the value to break references -- if caller mutates their
# First cache value in case setting it succeeds and we have to store it. # value afterwards, we still guarantee the option has not changed. If
if ( Cluster::local_node_type() == Cluster::MANAGER ) # one wants it to change, they need to explicitly call Option::set_value
cache_val = copy(val); # or Option::set with the intended value at the time of the call.
val = copy(val);
# First try setting it locally - abort if not possible. # First try setting it locally - abort if not possible.
if ( ! Option::set(ID, val, location) ) if ( ! Option::set(ID, val, location) )
return F; return F;
# If setting worked, copy the new value into the cache on the manager
if ( Cluster::local_node_type() == Cluster::MANAGER )
option_cache[ID] = OptionCacheValue($val=cache_val, $location=location);
# If it turns out that it is possible - send it to everyone else to apply. @if ( Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(change_topic, Config::cluster_set_option, ID, val, location); option_cache[ID] = OptionCacheValue($val=val, $location=location);
Broker::publish(Cluster::broadcast_topic, Config::cluster_set_option,
ID, val, location);
@else
Broker::publish(Cluster::manager_topic, Config::cluster_set_option,
ID, val, location);
@endif
if ( Cluster::local_node_type() != Cluster::MANAGER )
{
Broker::relay(change_topic, change_topic, Config::cluster_set_option, ID, val, location);
}
return T; return T;
} }
@else @else # Standalone implementation
# Standalone implementation
function set_value(ID: string, val: any, location: string &default = "" &optional): bool function set_value(ID: string, val: any, location: string &default = "" &optional): bool
{ {
return Option::set(ID, val, location); return Option::set(ID, val, location);
} }
@endif @endif # Cluster::is_enabled
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
# Handling of new worker nodes. # Handling of new worker nodes.

View file

@ -54,11 +54,14 @@ event Cluster::node_up(name: string, id: string)
# has to be distributed. # has to be distributed.
event Intel::new_item(item: Item) &priority=5 event Intel::new_item(item: Item) &priority=5
{ {
if ( Cluster::proxy_pool$alive_count == 0 ) local pt = Cluster::rr_topic(Cluster::proxy_pool, indicator_topic);
Broker::publish(indicator_topic, Intel::insert_indicator, item);
else if ( pt == "" )
Cluster::relay_rr(Cluster::proxy_pool, "Intel::new_item_relay_rr", # No proxies alive, publish to all workers ourself instead of
indicator_topic, Intel::insert_indicator, item); # relaying via a proxy.
pt = indicator_topic;
Broker::publish(pt, Intel::insert_indicator, item);
} }
# Handling of item insertion triggered by remote node. # Handling of item insertion triggered by remote node.
@ -103,3 +106,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5
Intel::_insert(item, F); Intel::_insert(item, F);
} }
@endif @endif
@if ( Cluster::local_node_type() == Cluster::PROXY )
event Intel::insert_indicator(item: Intel::Item) &priority=5
{
# Just forwarding from manager to workers.
Broker::publish(indicator_topic, Intel::insert_indicator, item);
}
@endif

View file

@ -24,32 +24,41 @@ export {
## Sniffed mime type of the file. ## Sniffed mime type of the file.
dcc_mime_type: string &log &optional; dcc_mime_type: string &log &optional;
}; };
## The broker topic name to which expected DCC transfer updates are
## relayed.
const dcc_transfer_update_topic = "bro/irc/dcc_transfer_update" &redef;
} }
global dcc_expected_transfers: table[addr, port] of Info &read_expire=5mins; global dcc_expected_transfers: table[addr, port] of Info &read_expire=5mins;
function dcc_relay_topic(): string
{
local rval = Cluster::rr_topic(Cluster::proxy_pool, "dcc_transfer_rr_key");
if ( rval == "" )
# No proxy is alive, so relay via manager instead.
return Cluster::manager_topic;
return rval;
}
event dcc_transfer_add(host: addr, p: port, info: Info) event dcc_transfer_add(host: addr, p: port, info: Info)
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, dcc_transfer_add, host, p, info);
@else
dcc_expected_transfers[host, p] = info; dcc_expected_transfers[host, p] = info;
Analyzer::schedule_analyzer(0.0.0.0, host, p, Analyzer::schedule_analyzer(0.0.0.0, host, p,
Analyzer::ANALYZER_IRC_DATA, 5 min); Analyzer::ANALYZER_IRC_DATA, 5 min);
@endif
} }
event dcc_transfer_remove(host: addr, p: port) event dcc_transfer_remove(host: addr, p: port)
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, dcc_transfer_remove, host, p);
@else
delete dcc_expected_transfers[host, p]; delete dcc_expected_transfers[host, p];
} @endif
event bro_init()
{
local lnt = Cluster::local_node_type();
if ( lnt == Cluster::WORKER )
Broker::subscribe(dcc_transfer_update_topic);
} }
function log_dcc(f: fa_file) function log_dcc(f: fa_file)
@ -76,9 +85,11 @@ function log_dcc(f: fa_file)
delete irc$dcc_mime_type; delete irc$dcc_mime_type;
delete dcc_expected_transfers[cid$resp_h, cid$resp_p]; delete dcc_expected_transfers[cid$resp_h, cid$resp_p];
Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic,
dcc_transfer_update_topic, dcc_transfer_remove, @if ( Cluster::is_enabled() )
cid$resp_h, cid$resp_p); Broker::publish(dcc_relay_topic(), dcc_transfer_remove,
cid$resp_h, cid$resp_p);
@endif
return; return;
} }
} }
@ -102,9 +113,10 @@ event irc_dcc_message(c: connection, is_orig: bool,
local p = count_to_port(dest_port, tcp); local p = count_to_port(dest_port, tcp);
Analyzer::schedule_analyzer(0.0.0.0, address, p, Analyzer::ANALYZER_IRC_DATA, 5 min); Analyzer::schedule_analyzer(0.0.0.0, address, p, Analyzer::ANALYZER_IRC_DATA, 5 min);
dcc_expected_transfers[address, p] = c$irc; dcc_expected_transfers[address, p] = c$irc;
Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic,
dcc_transfer_update_topic, dcc_transfer_add, @if ( Cluster::is_enabled() )
address, p, c$irc); Broker::publish(dcc_relay_topic(), dcc_transfer_add, address, p, c$irc);
@endif
} }
event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10 event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10
@ -119,8 +131,10 @@ event connection_state_remove(c: connection) &priority=-5
if ( [c$id$resp_h, c$id$resp_p] in dcc_expected_transfers ) if ( [c$id$resp_h, c$id$resp_p] in dcc_expected_transfers )
{ {
delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p]; delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p];
Cluster::relay_rr(Cluster::proxy_pool, dcc_transfer_update_topic,
dcc_transfer_update_topic, dcc_transfer_remove, @if ( Cluster::is_enabled() )
c$id$resp_h, c$id$resp_p); Broker::publish(dcc_relay_topic(), dcc_transfer_remove,
c$id$resp_h, c$id$resp_p);
@endif
} }
} }

View file

@ -403,76 +403,6 @@ bool Manager::PublishEvent(string topic, RecordVal* args)
return PublishEvent(topic, event_name, std::move(xs)); return PublishEvent(topic, event_name, std::move(xs));
} }
bool Manager::RelayEvent(std::string first_topic,
broker::set relay_topics,
std::string name,
broker::vector args,
bool handle_on_relayer)
{
if ( bstate->endpoint.is_shutdown() )
return true;
if ( peer_count == 0 )
return true;
DBG_LOG(DBG_BROKER, "Publishing %s-relay event: %s",
handle_on_relayer ? "handle" : "",
RenderEvent(first_topic, name, args).c_str());
if ( handle_on_relayer )
{
broker::bro::HandleAndRelayEvent msg(std::move(relay_topics),
std::move(name),
std::move(args));
bstate->endpoint.publish(std::move(first_topic), std::move(msg));
}
else
{
broker::bro::RelayEvent msg(std::move(relay_topics),
std::move(name),
std::move(args));
bstate->endpoint.publish(std::move(first_topic), std::move(msg));
}
++statistics.num_events_outgoing;
return true;
}
bool Manager::RelayEvent(std::string first_topic,
std::set<std::string> relay_topics,
RecordVal* args,
bool handle_on_relayer)
{
if ( bstate->endpoint.is_shutdown() )
return true;
if ( peer_count == 0 )
return true;
if ( ! args->Lookup(0) )
return false;
auto event_name = args->Lookup(0)->AsString()->CheckString();
auto vv = args->Lookup(1)->AsVectorVal();
broker::vector xs;
xs.reserve(vv->Size());
for ( auto i = 0u; i < vv->Size(); ++i )
{
auto val = vv->Lookup(i)->AsRecordVal()->Lookup(0);
auto data_val = static_cast<DataVal*>(val);
xs.emplace_back(data_val->data);
}
broker::set topic_set;
for ( auto& t : relay_topics )
topic_set.emplace(std::move(t));
return RelayEvent(first_topic, std::move(topic_set), event_name,
std::move(xs), handle_on_relayer);
}
bool Manager::PublishIdentifier(std::string topic, std::string id) bool Manager::PublishIdentifier(std::string topic, std::string id)
{ {
if ( bstate->endpoint.is_shutdown() ) if ( bstate->endpoint.is_shutdown() )
@ -905,14 +835,6 @@ void Manager::DispatchMessage(broker::data msg)
ProcessEvent(std::move(msg)); ProcessEvent(std::move(msg));
break; break;
case broker::bro::Message::Type::RelayEvent:
ProcessRelayEvent(std::move(msg));
break;
case broker::bro::Message::Type::HandleAndRelayEvent:
ProcessHandleAndRelayEvent(std::move(msg));
break;
case broker::bro::Message::Type::LogCreate: case broker::bro::Message::Type::LogCreate:
ProcessLogCreate(std::move(msg)); ProcessLogCreate(std::move(msg));
break; break;
@ -1052,29 +974,6 @@ void Manager::ProcessEvent(broker::bro::Event ev)
ProcessEvent(std::move(ev.name()), std::move(ev.args())); ProcessEvent(std::move(ev.name()), std::move(ev.args()));
} }
void Manager::ProcessRelayEvent(broker::bro::RelayEvent ev)
{
DBG_LOG(DBG_BROKER, "Received relay event: %s", RenderMessage(ev).c_str());
++statistics.num_events_incoming;
for ( auto& t : ev.topics() )
PublishEvent(std::move(caf::get<std::string>(t)),
std::move(ev.name()),
std::move(ev.args()));
}
void Manager::ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev)
{
DBG_LOG(DBG_BROKER, "Received handle-relay event: %s",
RenderMessage(ev).c_str());
ProcessEvent(ev.name(), ev.args());
for ( auto& t : ev.topics() )
PublishEvent(std::move(caf::get<std::string>(t)),
std::move(ev.name()),
std::move(ev.args()));
}
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc) bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
{ {
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());

View file

@ -153,43 +153,6 @@ public:
*/ */
bool PublishEvent(std::string topic, RecordVal* ev); bool PublishEvent(std::string topic, RecordVal* ev);
/**
* Sends an event to any interested peers, who, upon receipt,
* republishes the event to a new set of topics and optionally
* calls event handlers.
* @param first_topic the first topic to use when publishing the event
* @param relay_topics the set of topics the receivers will use to
* republish the event. The event is relayed at most a single hop.
* @param name the name of the event
* @param args the event's arguments
* @param handle_on_relayer whether they relaying-node should call event
* handlers.
* @return true if the message is sent successfully.
*/
bool RelayEvent(std::string first_topic,
broker::set relay_topics,
std::string name,
broker::vector args,
bool handle_on_relayer);
/**
* Sends an event to any interested peers, who, upon receipt,
* republishes the event to a new set of topics and optionally
* calls event handlers.
* @param first_topic the first topic to use when publishing the event
* @param relay_topics the set of topics the receivers will use to
* republish the event. The event is relayed at most a single hop.
* @param ev the event and its arguments to send to peers, in the form of
* a Broker::Event record type.
* @param handle_on_relayer whether they relaying-node should call event
* handlers.
* @return true if the message is sent successfully.
*/
bool RelayEvent(std::string first_topic,
std::set<std::string> relay_topics,
RecordVal* ev,
bool handle_on_relayer);
/** /**
* Send a message to create a log stream to any interested peers. * Send a message to create a log stream to any interested peers.
* The log stream may or may not already exist on the receiving side. * The log stream may or may not already exist on the receiving side.
@ -351,8 +314,6 @@ private:
void DispatchMessage(broker::data msg); void DispatchMessage(broker::data msg);
void ProcessEvent(std::string name, broker::vector args); void ProcessEvent(std::string name, broker::vector args);
void ProcessEvent(broker::bro::Event ev); void ProcessEvent(broker::bro::Event ev);
void ProcessRelayEvent(broker::bro::RelayEvent re);
void ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev);
bool ProcessLogCreate(broker::bro::LogCreate lc); bool ProcessLogCreate(broker::bro::LogCreate lc);
bool ProcessLogWrite(broker::bro::LogWrite lw); bool ProcessLogWrite(broker::bro::LogWrite lw);
bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu); bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu);

View file

@ -67,28 +67,6 @@ static bool publish_event_args(val_list& args, const BroString* topic,
return rval; return rval;
} }
static bool relay_event_args(val_list& args, const BroString* topic,
std::set<std::string> topic_set, Frame* frame)
{
bro_broker::Manager::ScriptScopeGuard ssg;
auto rval = false;
if ( args[0]->Type()->Tag() == TYPE_RECORD )
rval = broker_mgr->RelayEvent(topic->CheckString(),
std::move(topic_set),
args[0]->AsRecordVal(), false);
else
{
auto ev = broker_mgr->MakeEvent(&args, frame);
rval = broker_mgr->RelayEvent(topic->CheckString(),
std::move(topic_set),
ev, false);
Unref(ev);
}
return rval;
}
%%} %%}
module Broker; module Broker;
@ -131,130 +109,6 @@ function Broker::publish%(topic: string, ...%): bool
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
## Publishes an event at a given topic, with any receivers automatically
## forwarding it to its peers with a different topic. The event is relayed
## at most a single hop and the relayer does not call any local event handlers.
##
## first_topic: the initial topic to use for publishing the event.
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the first
## set of receivers will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Broker::relay%(first_topic: string, ...%): bool
%{
bro_broker::Manager::ScriptScopeGuard ssg;
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 3 )
{
builtin_error("Broker::relay requires at least 3 arguments");
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[1];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Broker::relay requires a string or string_set as 2nd argument");
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
return new Val(false, TYPE_BOOL);
val_list args(bif_args->length() - 2);
for ( auto i = 2; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = false;
if ( args[0]->Type()->Tag() == TYPE_RECORD )
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set),
args[0]->AsRecordVal(), false);
else
{
auto ev = broker_mgr->MakeEvent(&args, frame);
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set), ev, false);
Unref(ev);
}
return new Val(rval, TYPE_BOOL);
%}
## Publishes an event at a given topic, with any receivers automatically
## forwarding it to its peers with a different topic. The event is relayed
## at most a single hop and the relayer does call local event handlers.
##
## first_topic: the initial topic to use for publishing the event.
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the first
## set of receivers will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Broker::publish_and_relay%(first_topic: string, ...%): bool
%{
bro_broker::Manager::ScriptScopeGuard ssg;
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 3 )
{
builtin_error("Broker::publish_and_relay requires at least 3 arguments");
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[1];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Broker::publish_and_relay requires a string or string_set as 2nd argument");
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
return new Val(false, TYPE_BOOL);
val_list args(bif_args->length() - 2);
for ( auto i = 2; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = false;
if ( args[0]->Type()->Tag() == TYPE_RECORD )
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set),
args[0]->AsRecordVal(), true);
else
{
auto ev = broker_mgr->MakeEvent(&args, frame);
rval = broker_mgr->RelayEvent(first_topic->CheckString(),
std::move(topic_set), ev, true);
Unref(ev);
}
return new Val(rval, TYPE_BOOL);
%}
function Broker::__flush_logs%(%): count function Broker::__flush_logs%(%): count
%{ %{
auto rval = broker_mgr->FlushLogBuffers(); auto rval = broker_mgr->FlushLogBuffers();
@ -345,81 +199,6 @@ function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool
%} %}
## Publishes an event at a given topic, with a receiver node chosen
## from a pool according to Round-Robin distribution strategy. The receiving
## node, then automatically forwards it to its peers with a different topic.
## The event is relayed at most a single hop.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: an arbitrary string to identify the purpose for which you're
## distributing the event. e.g. consider using namespacing of your
## script like "Intel::cluster_rr_key".
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the receiver
## will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Cluster::relay_rr%(pool: Pool, key: any, ...%): bool
%{
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 4 )
{
builtin_error("Cluster::relay_rr requires at least 4 arguments");
return new Val(false, TYPE_BOOL);
}
static Func* topic_func = 0;
if ( ! topic_func )
topic_func = global_scope()->Lookup("Cluster::rr_topic")->ID_Val()->AsFunc();
val_list vl(2);
vl.append(pool->Ref());
vl.append(key->Ref());
auto topic = topic_func->Call(&vl);
if ( ! topic->AsString()->Len() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[2];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Cluster::relay_rr requires a string or string_set as 3rd argument");
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
val_list args(bif_args->length() - 3);
for ( auto i = 3; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set),
frame);
Unref(topic);
return new Val(rval, TYPE_BOOL);
%}
## Publishes an event to a node within a pool according to Rendezvous ## Publishes an event to a node within a pool according to Rendezvous
## (Highest Random Weight) hashing strategy. ## (Highest Random Weight) hashing strategy.
## ##
@ -461,77 +240,3 @@ function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool
Unref(topic); Unref(topic);
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
## Publishes an event at a given topic, with a receiver node chosen
## from a pool according to Rendezvous (Highest Random Weight) distribution
## strategy. The receiving nodes then automatically forwards it to its peers
## with a different topic. The event is relayed at most a single hop.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: data used for input to the hashing function that will uniformly
## distribute keys among available nodes.
##
## args: the first member of the argument list may be either a string or
## a set of strings indicating the secondary topic that the receiver
## will use to re-publish the event. The remaining
## members of the argument list are either the return value of a
## previously-made call to :bro:see:`Broker::make_event` or the
## argument list that should be passed along to it, so that it can
## be called as part of executing this function.
##
## Returns: true if the message is sent.
function Cluster::relay_hrw%(pool: Pool, key: any, ...%): bool
%{
val_list* bif_args = @ARGS@;
if ( bif_args->length() < 4 )
{
builtin_error("Cluster::relay_hrw requires at least 4 arguments");
return new Val(false, TYPE_BOOL);
}
static Func* topic_func = 0;
if ( ! topic_func )
topic_func = global_scope()->Lookup("Cluster::hrw_topic")->ID_Val()->AsFunc();
val_list vl(2);
vl.append(pool->Ref());
vl.append(key->Ref());
auto topic = topic_func->Call(&vl);
if ( ! topic->AsString()->Len() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto second_topic = (*bif_args)[2];
if ( second_topic->Type()->Tag() != TYPE_STRING &&
! is_string_set(second_topic->Type()) )
{
builtin_error("Cluster::relay_hrw requires a string or string_set as 3rd argument");
Unref(topic);
return new Val(false, TYPE_BOOL);
}
auto topic_set = val_to_topic_set(second_topic);
if ( topic_set.empty() )
{
Unref(topic);
return new Val(false, TYPE_BOOL);
}
val_list args(bif_args->length() - 3);
for ( auto i = 3; i < bif_args->length(); ++i )
args.append((*bif_args)[i]);
auto rval = relay_event_args(args, topic->AsString(), std::move(topic_set),
frame);
Unref(topic);
return new Val(rval, TYPE_BOOL);
%}

View file

@ -1,3 +0,0 @@
sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core
got ready event
sender lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -1,2 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
got my_event, hello world

View file

@ -1,5 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
sending ready event
got my_event, hello world
receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -1,3 +0,0 @@
sender added peer: endpoint=127.0.0.1 msg=received handshake from remote core
got ready event
sender lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -1,2 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
got my_event, hello world

View file

@ -1,4 +0,0 @@
receiver added peer: endpoint=127.0.0.1 msg=received handshake from remote core
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
sending ready event
receiver lost peer: endpoint=127.0.0.1 msg=lost remote peer

View file

@ -157,7 +157,7 @@
0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) -> <no result> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <frame>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <null>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, <null>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::local_node_type, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::local_node_type, <null>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F])) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F])) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F])) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F])) -> <no result>
0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F])) -> <no result> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F])) -> <no result>
@ -274,7 +274,7 @@
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Broker::LOG)) -> <no result> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Broker::LOG)) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) -> <no result> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Config::LOG)) -> <no result> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Config::LOG)) -> <no result>
@ -459,7 +459,7 @@
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result> 0.000000 MetaHookPost CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(NetControl::check_plugins, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(NetControl::check_plugins, <frame>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(NetControl::init, <null>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(NetControl::init, <null>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Notice::want_pp, <frame>, ()) -> <no result> 0.000000 MetaHookPost CallFunction(Notice::want_pp, <frame>, ()) -> <no result>
@ -1022,7 +1022,7 @@
0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp}))
0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <frame>, ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <frame>, ())
0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <null>, ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, <null>, ())
0.000000 MetaHookPre CallFunction(Cluster::local_node_type, <frame>, ()) 0.000000 MetaHookPre CallFunction(Cluster::local_node_type, <null>, ())
0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/logger, node_type=Cluster::LOGGER, max_nodes=<uninitialized>, exclusive=F]))
0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/proxy, node_type=Cluster::PROXY, max_nodes=<uninitialized>, exclusive=F]))
0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, <frame>, ([topic=bro<...>/worker, node_type=Cluster::WORKER, max_nodes=<uninitialized>, exclusive=F]))
@ -1139,7 +1139,7 @@
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
0.000000 MetaHookPre CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Broker::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Broker::LOG))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Config::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Config::LOG))
@ -1324,7 +1324,7 @@
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) 0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
0.000000 MetaHookPre CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(NetControl::check_plugins, <frame>, ()) 0.000000 MetaHookPre CallFunction(NetControl::check_plugins, <frame>, ())
0.000000 MetaHookPre CallFunction(NetControl::init, <null>, ()) 0.000000 MetaHookPre CallFunction(NetControl::init, <null>, ())
0.000000 MetaHookPre CallFunction(Notice::want_pp, <frame>, ()) 0.000000 MetaHookPre CallFunction(Notice::want_pp, <frame>, ())
@ -2003,7 +2003,7 @@
0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]) 0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction Log::add_default_filter(Broker::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Broker::LOG)
0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG)
0.000000 | HookCallFunction Log::add_default_filter(Config::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Config::LOG)
@ -2188,7 +2188,7 @@
0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]) 0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction NetControl::check_plugins() 0.000000 | HookCallFunction NetControl::check_plugins()
0.000000 | HookCallFunction NetControl::init() 0.000000 | HookCallFunction NetControl::init()
0.000000 | HookCallFunction Notice::want_pp() 0.000000 | HookCallFunction Notice::want_pp()
@ -2588,7 +2588,7 @@
0.000000 | HookLoadFile base<...>/x509 0.000000 | HookLoadFile base<...>/x509
0.000000 | HookLoadFile base<...>/xmpp 0.000000 | HookLoadFile base<...>/xmpp
0.000000 | HookLogInit packet_filter 1/1 {ts (time), node (string), filter (string), init (bool), success (bool)} 0.000000 | HookLogInit packet_filter 1/1 {ts (time), node (string), filter (string), init (bool), success (bool)}
0.000000 | HookLogWrite packet_filter [ts=1535159114.845209, node=bro, filter=ip or not ip, init=T, success=T] 0.000000 | HookLogWrite packet_filter [ts=1535402742.587686, node=bro, filter=ip or not ip, init=T, success=T]
0.000000 | HookQueueEvent NetControl::init() 0.000000 | HookQueueEvent NetControl::init()
0.000000 | HookQueueEvent bro_init() 0.000000 | HookQueueEvent bro_init()
0.000000 | HookQueueEvent filter_change_tracking() 0.000000 | HookQueueEvent filter_change_tracking()

View file

@ -1,119 +0,0 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out"
# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out"
# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out"
#
# @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: btest-diff one/one.out
# @TEST-EXEC: btest-diff two/two.out
# @TEST-EXEC: btest-diff three/three.out
@TEST-START-FILE one.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
}
event ready_event()
{
print "got ready event";
Broker::publish_and_relay("bro/event/pre-relay", "bro/event/post-relay",
my_event, "hello world");
}
event bro_init()
{
Broker::subscribe("bro/event/ready");
Broker::peer("127.0.0.1", 10000/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE two.bro
redef exit_only_after_terminate = T;
global peers_added = 0;
event my_event(s: string)
{
print "got my_event", s;
}
event ready_event()
{
}
event bro_init()
{
Broker::subscribe("bro/event/pre-relay");
Broker::listen("127.0.0.1", 10000/tcp);
Broker::peer("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
++peers_added;
if ( peers_added == 2 )
{
print "sending ready event";
Broker::publish("bro/event/ready", ready_event);
}
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE three.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
terminate();
}
event bro_init()
{
Broker::subscribe("bro/event/post-relay");
Broker::listen("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
@TEST-END-FILE

View file

@ -1,120 +0,0 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run three "bro -B broker -b ../three.bro >three.out"
# @TEST-EXEC: btest-bg-run two "bro -B broker -b ../two.bro >two.out"
# @TEST-EXEC: btest-bg-run one "bro -B broker -b ../one.bro >one.out"
#
# @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: btest-diff one/one.out
# @TEST-EXEC: btest-diff two/two.out
# @TEST-EXEC: btest-diff three/three.out
@TEST-START-FILE one.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
}
event ready_event()
{
print "got ready event";
Broker::relay("bro/event/pre-relay", "bro/event/post-relay", my_event,
"hello world");
}
event bro_init()
{
Broker::subscribe("bro/event/ready");
Broker::peer("127.0.0.1", 10000/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE two.bro
redef exit_only_after_terminate = T;
global peers_added = 0;
event my_event(s: string)
{
print "got my_event", s;
terminate();
}
event ready_event()
{
}
event bro_init()
{
Broker::subscribe("bro/event/pre-relay");
Broker::listen("127.0.0.1", 10000/tcp);
Broker::peer("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
++peers_added;
if ( peers_added == 2 )
{
print "sending ready event";
Broker::publish("bro/event/ready", ready_event);
}
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
@TEST-END-FILE
@TEST-START-FILE three.bro
redef exit_only_after_terminate = T;
event my_event(s: string)
{
print "got my_event", s;
terminate();
}
event bro_init()
{
Broker::subscribe("bro/event/post-relay");
Broker::listen("127.0.0.1", 9999/tcp);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
@TEST-END-FILE