diff --git a/src/EventHandler.cc b/src/EventHandler.cc index 0f25d63ba8..d623f43b66 100644 --- a/src/EventHandler.cc +++ b/src/EventHandler.cc @@ -5,6 +5,11 @@ #include "RemoteSerializer.h" #include "NetVar.h" +#ifdef ENABLE_BROKER +#include "comm/Manager.h" +#include "comm/Data.h" +#endif + EventHandler::EventHandler(const char* arg_name) { name = copy_string(arg_name); @@ -26,7 +31,12 @@ EventHandler::operator bool() const { return enabled && ((local && local->HasBodies()) || receivers.length() - || generate_always); + || generate_always +#ifdef ENABLE_BROKER + || ! auto_remote_send.empty() + // TODO: and require a subscriber interested in a topic or unsolicited flags? +#endif + ); } FuncType* EventHandler::FType() @@ -73,6 +83,44 @@ void EventHandler::Call(val_list* vl, bool no_remote) SerialInfo info(remote_serializer); remote_serializer->SendCall(&info, receivers[i], name, vl); } + +#ifdef ENABLE_BROKER + + if ( ! auto_remote_send.empty() ) + { + // TODO: also short-circuit based on interested subscribers/flags? + broker::message msg; + msg.reserve(vl->length() + 1); + msg.emplace_back(Name()); + bool valid_args = true; + + for ( auto i = 0u; i < vl->length(); ++i ) + { + auto opt_data = comm::val_to_data((*vl)[i]); + + if ( opt_data ) + msg.emplace_back(move(*opt_data)); + else + { + valid_args = false; + auto_remote_send.clear(); + reporter->Error("failed auto-remote event '%s', disabled", + Name()); + break; + } + } + + if ( valid_args ) + for ( auto it = auto_remote_send.begin(); + it != auto_remote_send.end(); ++it ) + { + if ( std::next(it) == auto_remote_send.end() ) + comm_mgr->Event(it->first, move(msg), it->second); + else + comm_mgr->Event(it->first, msg, it->second); + } + } +#endif } if ( local ) diff --git a/src/EventHandler.h b/src/EventHandler.h index 55ac33cffd..7729e2af27 100644 --- a/src/EventHandler.h +++ b/src/EventHandler.h @@ -4,7 +4,8 @@ #define EVENTHANDLER #include - +#include +#include #include "List.h" #include "BroList.h" @@ -28,6 +29,18 @@ public: void AddRemoteHandler(SourceID peer); void RemoveRemoteHandler(SourceID peer); +#ifdef ENABLE_BROKER + void AutoRemote(std::string topic, int flags) + { + auto_remote_send[std::move(topic)] = flags; + } + + void AutoRemoteStop(const std::string& topic) + { + auto_remote_send.erase(topic); + } +#endif + void Call(val_list* vl, bool no_remote = false); // Returns true if there is at least one local or remote handler. @@ -67,6 +80,10 @@ private: declare(List, SourceID); typedef List(SourceID) receiver_list; receiver_list receivers; + +#ifdef ENABLE_BROKER + std::map auto_remote_send; // topic -> flags +#endif }; // Encapsulates a ptr to an event handler to overload the boolean operator. diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index b4f118706a..d803d64ae7 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -107,6 +107,12 @@ bool comm::Manager::Print(string topic, string msg, const Val* flags) return true; } +bool comm::Manager::Event(std::string topic, broker::message msg, int flags) + { + endpoint->send(move(topic), move(msg), flags); + return true; + } + bool comm::Manager::Event(std::string topic, const RecordVal* args, const Val* flags) { @@ -130,6 +136,65 @@ bool comm::Manager::Event(std::string topic, const RecordVal* args, return true; } +bool comm::Manager::AutoEvent(string topic, const Val* event, const Val* flags) + { + if ( event->Type()->Tag() != TYPE_FUNC ) + { + reporter->Error("Comm::auto_event must operate on an event"); + return false; + } + + auto event_val = event->AsFunc(); + + if ( event_val->Flavor() != FUNC_FLAVOR_EVENT ) + { + reporter->Error("Comm::auto_event must operate on an event"); + return false; + } + + auto handler = event_registry->Lookup(event_val->Name()); + + if ( ! handler ) + { + reporter->Error("Comm::auto_event failed to lookup event '%s'", + event_val->Name()); + return false; + } + + handler->AutoRemote(move(topic), get_flags(flags)); + return true; + } + +bool comm::Manager::AutoEventStop(const string& topic, const Val* event) + { + if ( event->Type()->Tag() != TYPE_FUNC ) + { + reporter->Error("Comm::auto_event_stop must operate on an event"); + return false; + } + + auto event_val = event->AsFunc(); + + if ( event_val->Flavor() != FUNC_FLAVOR_EVENT ) + { + reporter->Error("Comm::auto_event_stop must operate on an event"); + return false; + } + + auto handler = event_registry->Lookup(event_val->Name()); + + if ( ! handler ) + { + reporter->Error("Comm::auto_event_stop failed to lookup event '%s'", + event_val->Name()); + return false; + } + + + handler->AutoRemoteStop(topic); + return true; + } + RecordVal* comm::Manager::MakeEventArgs(const val_list* args) { auto rval = new RecordVal(BifType::Record::Comm::EventArgs); diff --git a/src/comm/Manager.h b/src/comm/Manager.h index 020f78a03b..70bec51ded 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -31,8 +31,13 @@ public: bool Print(std::string topic, std::string msg, const Val* flags); + bool Event(std::string topic, broker::message msg, int flags); bool Event(std::string topic, const RecordVal* args, const Val* flags); + bool AutoEvent(std::string topic, const Val* event, const Val* flags); + + bool AutoEventStop(const std::string& topic, const Val* event); + RecordVal* MakeEventArgs(const val_list* args); bool SubscribeToPrints(std::string topic_prefix); diff --git a/src/comm/comm.bif b/src/comm/comm.bif index fe405222cc..c185120126 100644 --- a/src/comm/comm.bif +++ b/src/comm/comm.bif @@ -100,6 +100,19 @@ function Comm::event%(topic: string, args: Comm::EventArgs, return new Val(rval, TYPE_BOOL); %} +function Comm::auto_event%(topic: string, ev: any, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = comm_mgr->AutoEvent(topic->CheckString(), ev, flags); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::auto_event_stop%(topic: string, ev: any%): bool + %{ + auto rval = comm_mgr->AutoEventStop(topic->CheckString(), ev); + return new Val(rval, TYPE_BOOL); + %} + function Comm::subscribe_to_events%(topic_prefix: string%): bool %{ auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString());