broker integration: add auto sending remote events

i.e. ability to toggle whether all local dispatches of an event also
generate a remote event message to peers.
This commit is contained in:
Jon Siwek 2015-01-22 11:29:53 -06:00
parent 7e563b7275
commit 5df71ddc91
5 changed files with 150 additions and 2 deletions

View file

@ -5,6 +5,11 @@
#include "RemoteSerializer.h"
#include "NetVar.h"
#ifdef ENABLE_BROKER
#include "comm/Manager.h"
#include "comm/Data.h"
#endif
EventHandler::EventHandler(const char* arg_name)
{
name = copy_string(arg_name);
@ -26,7 +31,12 @@ EventHandler::operator bool() const
{
return enabled && ((local && local->HasBodies())
|| receivers.length()
|| generate_always);
|| generate_always
#ifdef ENABLE_BROKER
|| ! auto_remote_send.empty()
// TODO: and require a subscriber interested in a topic or unsolicited flags?
#endif
);
}
FuncType* EventHandler::FType()
@ -73,6 +83,44 @@ void EventHandler::Call(val_list* vl, bool no_remote)
SerialInfo info(remote_serializer);
remote_serializer->SendCall(&info, receivers[i], name, vl);
}
#ifdef ENABLE_BROKER
if ( ! auto_remote_send.empty() )
{
// TODO: also short-circuit based on interested subscribers/flags?
broker::message msg;
msg.reserve(vl->length() + 1);
msg.emplace_back(Name());
bool valid_args = true;
for ( auto i = 0u; i < vl->length(); ++i )
{
auto opt_data = comm::val_to_data((*vl)[i]);
if ( opt_data )
msg.emplace_back(move(*opt_data));
else
{
valid_args = false;
auto_remote_send.clear();
reporter->Error("failed auto-remote event '%s', disabled",
Name());
break;
}
}
if ( valid_args )
for ( auto it = auto_remote_send.begin();
it != auto_remote_send.end(); ++it )
{
if ( std::next(it) == auto_remote_send.end() )
comm_mgr->Event(it->first, move(msg), it->second);
else
comm_mgr->Event(it->first, msg, it->second);
}
}
#endif
}
if ( local )

View file

@ -4,7 +4,8 @@
#define EVENTHANDLER
#include <assert.h>
#include <map>
#include <string>
#include "List.h"
#include "BroList.h"
@ -28,6 +29,18 @@ public:
void AddRemoteHandler(SourceID peer);
void RemoveRemoteHandler(SourceID peer);
#ifdef ENABLE_BROKER
void AutoRemote(std::string topic, int flags)
{
auto_remote_send[std::move(topic)] = flags;
}
void AutoRemoteStop(const std::string& topic)
{
auto_remote_send.erase(topic);
}
#endif
void Call(val_list* vl, bool no_remote = false);
// Returns true if there is at least one local or remote handler.
@ -67,6 +80,10 @@ private:
declare(List, SourceID);
typedef List(SourceID) receiver_list;
receiver_list receivers;
#ifdef ENABLE_BROKER
std::map<std::string, int> auto_remote_send; // topic -> flags
#endif
};
// Encapsulates a ptr to an event handler to overload the boolean operator.

View file

@ -107,6 +107,12 @@ bool comm::Manager::Print(string topic, string msg, const Val* flags)
return true;
}
bool comm::Manager::Event(std::string topic, broker::message msg, int flags)
{
endpoint->send(move(topic), move(msg), flags);
return true;
}
bool comm::Manager::Event(std::string topic, const RecordVal* args,
const Val* flags)
{
@ -130,6 +136,65 @@ bool comm::Manager::Event(std::string topic, const RecordVal* args,
return true;
}
bool comm::Manager::AutoEvent(string topic, const Val* event, const Val* flags)
{
if ( event->Type()->Tag() != TYPE_FUNC )
{
reporter->Error("Comm::auto_event must operate on an event");
return false;
}
auto event_val = event->AsFunc();
if ( event_val->Flavor() != FUNC_FLAVOR_EVENT )
{
reporter->Error("Comm::auto_event must operate on an event");
return false;
}
auto handler = event_registry->Lookup(event_val->Name());
if ( ! handler )
{
reporter->Error("Comm::auto_event failed to lookup event '%s'",
event_val->Name());
return false;
}
handler->AutoRemote(move(topic), get_flags(flags));
return true;
}
bool comm::Manager::AutoEventStop(const string& topic, const Val* event)
{
if ( event->Type()->Tag() != TYPE_FUNC )
{
reporter->Error("Comm::auto_event_stop must operate on an event");
return false;
}
auto event_val = event->AsFunc();
if ( event_val->Flavor() != FUNC_FLAVOR_EVENT )
{
reporter->Error("Comm::auto_event_stop must operate on an event");
return false;
}
auto handler = event_registry->Lookup(event_val->Name());
if ( ! handler )
{
reporter->Error("Comm::auto_event_stop failed to lookup event '%s'",
event_val->Name());
return false;
}
handler->AutoRemoteStop(topic);
return true;
}
RecordVal* comm::Manager::MakeEventArgs(const val_list* args)
{
auto rval = new RecordVal(BifType::Record::Comm::EventArgs);

View file

@ -31,8 +31,13 @@ public:
bool Print(std::string topic, std::string msg, const Val* flags);
bool Event(std::string topic, broker::message msg, int flags);
bool Event(std::string topic, const RecordVal* args, const Val* flags);
bool AutoEvent(std::string topic, const Val* event, const Val* flags);
bool AutoEventStop(const std::string& topic, const Val* event);
RecordVal* MakeEventArgs(const val_list* args);
bool SubscribeToPrints(std::string topic_prefix);

View file

@ -100,6 +100,19 @@ function Comm::event%(topic: string, args: Comm::EventArgs,
return new Val(rval, TYPE_BOOL);
%}
function Comm::auto_event%(topic: string, ev: any,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->AutoEvent(topic->CheckString(), ev, flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::auto_event_stop%(topic: string, ev: any%): bool
%{
auto rval = comm_mgr->AutoEventStop(topic->CheckString(), ev);
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_events%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString());