diff --git a/aux/broker b/aux/broker index 1e8d675790..425bab3bf4 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 1e8d6757909750524c15f8eaf3c297243bc55425 +Subproject commit 425bab3bf420898d8dbd14280f94aee9d420f617 diff --git a/src/comm/Data.cc b/src/comm/Data.cc index 58d5b30085..b279b97529 100644 --- a/src/comm/Data.cc +++ b/src/comm/Data.cc @@ -332,7 +332,7 @@ struct val_converter { rval->Assign(i, item_val); } - return nullptr; + return rval; } }; diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index d803d64ae7..ffe68970a8 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -7,9 +7,16 @@ #include "Var.h" #include "Reporter.h" #include "comm/comm.bif.h" +#include "logging/Manager.h" using namespace std; +VectorType* comm::Manager::vector_of_data_type; +EnumType* comm::Manager::log_id_type; +int comm::Manager::send_flags_self_idx; +int comm::Manager::send_flags_peers_idx; +int comm::Manager::send_flags_unsolicited_idx; + bool comm::Manager::InitPreScript() { return true; @@ -33,6 +40,8 @@ bool comm::Manager::InitPostScript() send_flags_peers_idx = require_field(send_flags_type, "peers"); send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited"); + log_id_type = internal_type("Log::ID")->AsEnumType(); + comm::opaque_of_data_type = new OpaqueType("Comm::Data"); vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref()); @@ -103,7 +112,7 @@ bool comm::Manager::Disconnect(const string& addr, uint16_t port) bool comm::Manager::Print(string topic, string msg, const Val* flags) { - endpoint->send(move(topic), broker::message{move(msg)}, get_flags(flags)); + endpoint->send(move(topic), broker::message{move(msg)}, GetFlags(flags)); return true; } @@ -113,6 +122,34 @@ bool comm::Manager::Event(std::string topic, broker::message msg, int flags) return true; } +bool comm::Manager::Log(const EnumVal* stream, const RecordVal* columns, + int flags) + { + auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum()); + + if ( ! stream_name ) + { + reporter->Error("Failed to remotely log: stream %d doesn't have name", + stream->AsEnum()); + return false; + } + + auto opt_column_data = val_to_data(columns); + + if ( ! opt_column_data ) + { + reporter->Error("Failed to remotely log stream %s: unsupported types", + stream_name); + return false; + } + + broker::message msg{broker::enum_value{stream_name}, + move(*opt_column_data)}; + std::string topic = std::string("bro/log/") + stream_name; + endpoint->send(move(topic), move(msg), flags); + return true; + } + bool comm::Manager::Event(std::string topic, const RecordVal* args, const Val* flags) { @@ -132,7 +169,7 @@ bool comm::Manager::Event(std::string topic, const RecordVal* args, msg.emplace_back(data_val->data); } - endpoint->send(move(topic), move(msg), get_flags(flags)); + endpoint->send(move(topic), move(msg), GetFlags(flags)); return true; } @@ -161,7 +198,7 @@ bool comm::Manager::AutoEvent(string topic, const Val* event, const Val* flags) return false; } - handler->AutoRemote(move(topic), get_flags(flags)); + handler->AutoRemote(move(topic), GetFlags(flags)); return true; } @@ -294,7 +331,23 @@ bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix) return event_subscriptions.erase(topic_prefix); } -int comm::Manager::get_flags(const Val* flags) +bool comm::Manager::SubscribeToLogs(string topic_prefix) + { + auto& q = log_subscriptions[topic_prefix]; + + if ( q ) + return false; + + q = broker::message_queue(move(topic_prefix), *endpoint); + return true; + } + +bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix) + { + return log_subscriptions.erase(topic_prefix); + } + +int comm::Manager::GetFlags(const Val* flags) { auto r = flags->AsRecordVal(); int rval = 0; @@ -327,6 +380,9 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, for ( const auto& ps : event_subscriptions ) read->Insert(ps.second.fd()); + + for ( const auto& ps : log_subscriptions ) + read->Insert(ps.second.fd()); } double comm::Manager::NextTimestamp(double* local_network_time) @@ -493,5 +549,78 @@ void comm::Manager::Process() } } + struct unref_guard { + unref_guard(Val* v) : val(v) {} + ~unref_guard() { Unref(val); } + Val* val; + }; + + for ( const auto& ls : log_subscriptions ) + { + auto log_messages = ls.second.want_pop(); + + if ( log_messages.empty() ) + continue; + + idle = false; + + for ( auto& lm : log_messages ) + { + if ( lm.size() != 2 ) + { + reporter->Warning("got bad remote log size: %zd (expect 2)", + lm.size()); + continue; + } + + if ( ! broker::get(lm[0]) ) + { + reporter->Warning("got remote log w/o stream id: %d", + static_cast(broker::which(lm[0]))); + continue; + } + + if ( ! broker::get(lm[1]) ) + { + reporter->Warning("got remote log w/o columns: %d", + static_cast(broker::which(lm[1]))); + continue; + } + + auto stream_id = data_to_val(move(lm[0]), log_id_type); + + if ( ! stream_id ) + { + reporter->Warning("failed to unpack remote log stream id"); + continue; + } + + unref_guard stream_id_unreffer{stream_id}; + auto columns_type = log_mgr->StreamColumns(stream_id->AsEnumVal()); + + if ( ! columns_type ) + { + reporter->Warning("got remote log for unknown stream: %s", + stream_id->Type()->AsEnumType()->Lookup( + stream_id->AsEnum())); + continue; + } + + auto columns = data_to_val(move(lm[1]), columns_type); + + if ( ! columns ) + { + reporter->Warning("failed to unpack remote log stream columns" + " for stream: %s", + stream_id->Type()->AsEnumType()->Lookup( + stream_id->AsEnum())); + continue; + } + + log_mgr->Write(stream_id->AsEnumVal(), columns->AsRecordVal()); + Unref(columns); + } + } + SetIdle(idle); } diff --git a/src/comm/Manager.h b/src/comm/Manager.h index 70bec51ded..3c1e80827b 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -8,6 +8,7 @@ #include #include "Reporter.h" #include "iosource/IOSource.h" +#include "Val.h" namespace comm { @@ -34,6 +35,8 @@ public: bool Event(std::string topic, broker::message msg, int flags); bool Event(std::string topic, const RecordVal* args, const Val* flags); + bool Log(const EnumVal* stream_id, const RecordVal* columns, int flags); + bool AutoEvent(std::string topic, const Val* event, const Val* flags); bool AutoEventStop(const std::string& topic, const Val* event); @@ -48,9 +51,13 @@ public: bool UnsubscribeToEvents(const std::string& topic_prefix); -private: + bool SubscribeToLogs(std::string topic_prefix); - int get_flags(const Val* flags); + bool UnsubscribeToLogs(const std::string& topic_prefix); + + static int GetFlags(const Val* flags); + +private: // IOSource interface overrides: void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, @@ -67,12 +74,14 @@ private: std::map, broker::peering> peers; std::map print_subscriptions; std::map event_subscriptions; + std::map log_subscriptions; - int send_flags_self_idx; - int send_flags_peers_idx; - int send_flags_unsolicited_idx; + static VectorType* vector_of_data_type; + static EnumType* log_id_type; + static int send_flags_self_idx; + static int send_flags_peers_idx; + static int send_flags_unsolicited_idx; - VectorType* vector_of_data_type; }; } // namespace comm diff --git a/src/comm/comm.bif b/src/comm/comm.bif index c185120126..e1c2bc533f 100644 --- a/src/comm/comm.bif +++ b/src/comm/comm.bif @@ -2,6 +2,7 @@ %%{ #include "comm/Manager.h" #include "comm/Data.h" +#include "logging/Manager.h" %%} module Comm; @@ -124,3 +125,36 @@ function Comm::unsubscribe_to_events%(topic_prefix: string%): bool auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} + +function +Comm::enable_remote_logs%(id: Log::ID, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(), + comm::Manager::GetFlags(flags)); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::disable_remote_logs%(id: Log::ID%): bool + %{ + auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::remote_logs_enabled%(id: Log::ID%): bool + %{ + auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::subscribe_to_logs%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 1fe5db3b26..d6d7fbb908 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -16,6 +16,10 @@ #include "WriterBackend.h" #include "logging.bif.h" +#ifdef ENABLE_BROKER +#include "comm/Manager.h" +#endif + using namespace logging; struct Manager::Filter { @@ -69,6 +73,11 @@ struct Manager::Stream { WriterMap writers; // Writers indexed by id/path pair. +#ifdef ENABLE_BROKER + bool enable_remote; + int remote_flags; +#endif + ~Stream(); }; @@ -287,6 +296,11 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) streams[idx]->event = event ? event_registry->Lookup(event->Name()) : 0; streams[idx]->columns = columns->Ref()->AsRecordType(); +#ifdef ENABLE_BROKER + streams[idx]->enable_remote = internal_val("Log::enable_remote_logging")->AsBool(); + streams[idx]->remote_flags = broker::PEERS; +#endif + DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : ""); @@ -828,6 +842,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) #endif } +#ifdef ENABLE_BROKER + if ( stream->enable_remote ) + comm_mgr->Log(id, columns, stream->remote_flags); +#endif + Unref(columns); if ( error ) @@ -1206,6 +1225,53 @@ void Manager::Terminate() } } +#ifdef ENABLE_BROKER + +bool Manager::EnableRemoteLogs(EnumVal* stream_id, int flags) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return false; + + stream->enable_remote = true; + stream->remote_flags = flags; + return true; + } + +bool Manager::DisableRemoteLogs(EnumVal* stream_id) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return false; + + stream->enable_remote = false; + return true; + } + +bool Manager::RemoteLogsAreEnabled(EnumVal* stream_id) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return false; + + return stream->enable_remote; + } + +RecordType* Manager::StreamColumns(EnumVal* stream_id) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return nullptr; + + return stream->columns; + } + +#endif + // Timer which on dispatching rotates the filter. class RotationTimer : public Timer { public: diff --git a/src/logging/Manager.h b/src/logging/Manager.h index b8264927a3..8130a1ddd4 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -157,6 +157,16 @@ public: */ void Terminate(); +#ifdef ENABLE_BROKER + bool EnableRemoteLogs(EnumVal* stream_id, int flags); + + bool DisableRemoteLogs(EnumVal* stream_id); + + bool RemoteLogsAreEnabled(EnumVal* stream_id); + + RecordType* StreamColumns(EnumVal* stream_id); +#endif + protected: friend class WriterFrontend; friend class RotationFinishedMessage;