Add Broker::publish_and_relay BIF

Like Broker::relay, except the relaying-node also calls event handlers.
This commit is contained in:
Jon Siwek 2018-05-31 15:26:22 -05:00
parent 08c64112f0
commit 224ee790e2
8 changed files with 270 additions and 26 deletions

View file

@ -341,7 +341,8 @@ bool Manager::PublishEvent(string topic, RecordVal* args)
bool Manager::RelayEvent(std::string first_topic,
broker::set relay_topics,
std::string name,
broker::vector args)
broker::vector args,
bool handle_on_relayer)
{
if ( bstate->endpoint.is_shutdown() )
return true;
@ -349,18 +350,33 @@ bool Manager::RelayEvent(std::string first_topic,
if ( ! bstate->endpoint.peers().size() )
return true;
DBG_LOG(DBG_BROKER, "Publishing relay event: %s",
DBG_LOG(DBG_BROKER, "Publishing %s-relay event: %s",
handle_on_relayer ? "handle" : "",
RenderEvent(first_topic, name, args).c_str());
broker::bro::RelayEvent msg(std::move(relay_topics), std::move(name),
std::move(args));
bstate->endpoint.publish(std::move(first_topic), std::move(msg));
if ( handle_on_relayer )
{
broker::bro::HandleAndRelayEvent msg(std::move(relay_topics),
std::move(name),
std::move(args));
bstate->endpoint.publish(std::move(first_topic), std::move(msg));
}
else
{
broker::bro::RelayEvent msg(std::move(relay_topics),
std::move(name),
std::move(args));
bstate->endpoint.publish(std::move(first_topic), std::move(msg));
}
++statistics.num_events_outgoing;
return true;
}
bool Manager::RelayEvent(std::string first_topic,
std::set<std::string> relay_topics,
RecordVal* args)
RecordVal* args,
bool handle_on_relayer)
{
if ( bstate->endpoint.is_shutdown() )
return true;
@ -389,7 +405,7 @@ bool Manager::RelayEvent(std::string first_topic,
topic_set.emplace(std::move(t));
return RelayEvent(first_topic, std::move(topic_set), event_name,
std::move(xs));
std::move(xs), handle_on_relayer);
}
bool Manager::PublishIdentifier(std::string topic, std::string id)
@ -820,6 +836,10 @@ void Manager::DispatchMessage(broker::data msg)
ProcessRelayEvent(std::move(msg));
break;
case broker::bro::Message::Type::HandleAndRelayEvent:
ProcessHandleAndRelayEvent(std::move(msg));
break;
case broker::bro::Message::Type::LogCreate:
ProcessLogCreate(std::move(msg));
break;
@ -907,23 +927,23 @@ void Manager::Process()
SetIdle(! had_input);
}
void Manager::ProcessEvent(broker::bro::Event ev)
void Manager::ProcessEvent(std::string name, broker::vector args)
{
DBG_LOG(DBG_BROKER, "Received event: %s", RenderMessage(ev).c_str());
DBG_LOG(DBG_BROKER, "Process event: %s %s",
name.data(), RenderMessage(args).data());
++statistics.num_events_incoming;
auto handler = event_registry->Lookup(name.data());
auto handler = event_registry->Lookup(ev.name().c_str());
if ( ! handler )
return;
auto& args = ev.args();
auto arg_types = handler->FType(false)->ArgTypes()->Types();
if ( static_cast<size_t>(arg_types->length()) != args.size() )
{
reporter->Warning("got event message '%s' with invalid # of args,"
" got %zd, expected %d", ev.name().data(), args.size(),
" got %zd, expected %d", name.data(), args.size(),
arg_types->length());
return;
}
@ -942,7 +962,7 @@ void Manager::ProcessEvent(broker::bro::Event ev)
{
reporter->Warning("failed to convert remote event '%s' arg #%d,"
" got %s, expected %s",
ev.name().data(), i, got_type,
name.data(), i, got_type,
type_name(expected_type->Tag()));
break;
}
@ -954,6 +974,11 @@ void Manager::ProcessEvent(broker::bro::Event ev)
delete_vals(vl);
}
void Manager::ProcessEvent(broker::bro::Event ev)
{
ProcessEvent(std::move(ev.name()), std::move(ev.args()));
}
void Manager::ProcessRelayEvent(broker::bro::RelayEvent ev)
{
DBG_LOG(DBG_BROKER, "Received relay event: %s", RenderMessage(ev).c_str());
@ -965,6 +990,18 @@ void Manager::ProcessRelayEvent(broker::bro::RelayEvent ev)
std::move(ev.args()));
}
void Manager::ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev)
{
DBG_LOG(DBG_BROKER, "Received handle-relay event: %s",
RenderMessage(ev).c_str());
ProcessEvent(ev.name(), ev.args());
for ( auto& t : ev.topics() )
PublishEvent(std::move(broker::get<std::string>(t)),
std::move(ev.name()),
std::move(ev.args()));
}
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
{
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());