Add Broker::forward() function

This enables explicit forwarding of events matching a given topic
prefix.  Even if a receiving node has an event handler, it will not
be raised if the event was sent along a topic that matches a previous
call to Broker::forward().
This commit is contained in:
Jon Siwek 2018-08-28 19:42:22 -05:00
parent 850030822d
commit 1dcead93bf
10 changed files with 214 additions and 15 deletions

View file

@ -2,6 +2,7 @@
#include <broker/broker.hh>
#include <broker/bro.hh>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include "Manager.h"
@ -787,8 +788,28 @@ bool Manager::Subscribe(const string& topic_prefix)
return true;
}
bool Manager::Forward(string topic_prefix)
{
for ( auto i = 0u; i < forwarded_prefixes.size(); ++i )
if ( forwarded_prefixes[i] == topic_prefix )
return false;
DBG_LOG(DBG_BROKER, "Forwarding topic prefix %s", topic_prefix.c_str());
Subscribe(topic_prefix);
forwarded_prefixes.emplace_back(std::move(topic_prefix));
return true;
}
bool Manager::Unsubscribe(const string& topic_prefix)
{
for ( auto i = 0u; i < forwarded_prefixes.size(); ++i )
if ( forwarded_prefixes[i] == topic_prefix )
{
DBG_LOG(DBG_BROKER, "Unforwading topic prefix %s", topic_prefix.c_str());
forwarded_prefixes.erase(forwarded_prefixes.begin() + i);
break;
}
DBG_LOG(DBG_BROKER, "Unsubscribing from topic prefix %s", topic_prefix.c_str());
bstate->subscriber.remove_topic(topic_prefix, ! after_bro_init);
return true;
@ -828,11 +849,11 @@ double Manager::NextTimestamp(double* local_network_time)
return -1;
}
void Manager::DispatchMessage(broker::data msg)
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
{
switch ( broker::bro::Message::type(msg) ) {
case broker::bro::Message::Type::Event:
ProcessEvent(std::move(msg));
ProcessEvent(topic, std::move(msg));
break;
case broker::bro::Message::Type::LogCreate:
@ -852,7 +873,7 @@ void Manager::DispatchMessage(broker::data msg)
broker::bro::Batch batch(std::move(msg));
for ( auto& i : batch.batch() )
DispatchMessage(std::move(i));
DispatchMessage(topic, std::move(i));
break;
}
@ -900,7 +921,7 @@ void Manager::Process()
try
{
DispatchMessage(std::move(msg));
DispatchMessage(topic, std::move(msg));
}
catch ( std::runtime_error& e )
{
@ -923,8 +944,11 @@ void Manager::Process()
}
void Manager::ProcessEvent(std::string name, broker::vector args)
void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
{
auto name = std::move(ev.name());
auto args = std::move(ev.args());
DBG_LOG(DBG_BROKER, "Process event: %s %s",
name.data(), RenderMessage(args).data());
++statistics.num_events_incoming;
@ -933,6 +957,23 @@ void Manager::ProcessEvent(std::string name, broker::vector args)
if ( ! handler )
return;
auto& topic_string = topic.string();
for ( auto i = 0u; i < forwarded_prefixes.size(); ++i )
{
auto& p = forwarded_prefixes[i];
if ( p.size() > topic_string.size() )
continue;
if ( strncmp(p.data(), topic_string.data(), p.size()) != 0 )
continue;
DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s",
name.data(), RenderMessage(args).data());
return;
}
auto arg_types = handler->FType(false)->ArgTypes()->Types();
if ( static_cast<size_t>(arg_types->length()) != args.size() )
@ -969,11 +1010,6 @@ void Manager::ProcessEvent(std::string name, broker::vector args)
delete_vals(vl);
}
void Manager::ProcessEvent(broker::bro::Event ev)
{
ProcessEvent(std::move(ev.name()), std::move(ev.args()));
}
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
{
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());