From 2b598e3d5a582b61ab43d43d52134a95e7e45336 Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Mon, 26 Jan 2015 14:24:42 -0600 Subject: [PATCH] broker integration: add remote logging It now works a bit differently than before: whether to send a remote log write is now a property of the logging stream, not the logging filter and it's now up the the receiver side filters to instantiate the desired writer. i.e. the sender now has no say in what the receiver should use as the log writer backend. Under the new style of remote logging, the "Log::enable_remote_logging" option is repurposed to set the default behavior for new logging streams. There's also "Comm::{enable,disable}_remote_logging()" to explicitly set the desired behavior for a given logging stream. To receive remote logs, one calls "Comm::subscribe_to_logs()", where senders implicitly use topics of the form "bro/log/". --- aux/broker | 2 +- src/comm/Data.cc | 2 +- src/comm/Manager.cc | 137 +++++++++++++++++++++++++++++++++++++++-- src/comm/Manager.h | 21 +++++-- src/comm/comm.bif | 34 ++++++++++ src/logging/Manager.cc | 66 ++++++++++++++++++++ src/logging/Manager.h | 10 +++ 7 files changed, 260 insertions(+), 12 deletions(-) diff --git a/aux/broker b/aux/broker index 1e8d675790..425bab3bf4 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 1e8d6757909750524c15f8eaf3c297243bc55425 +Subproject commit 425bab3bf420898d8dbd14280f94aee9d420f617 diff --git a/src/comm/Data.cc b/src/comm/Data.cc index 58d5b30085..b279b97529 100644 --- a/src/comm/Data.cc +++ b/src/comm/Data.cc @@ -332,7 +332,7 @@ struct val_converter { rval->Assign(i, item_val); } - return nullptr; + return rval; } }; diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index d803d64ae7..ffe68970a8 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -7,9 +7,16 @@ #include "Var.h" #include "Reporter.h" #include "comm/comm.bif.h" +#include "logging/Manager.h" using namespace std; +VectorType* comm::Manager::vector_of_data_type; +EnumType* comm::Manager::log_id_type; +int comm::Manager::send_flags_self_idx; +int comm::Manager::send_flags_peers_idx; +int comm::Manager::send_flags_unsolicited_idx; + bool comm::Manager::InitPreScript() { return true; @@ -33,6 +40,8 @@ bool comm::Manager::InitPostScript() send_flags_peers_idx = require_field(send_flags_type, "peers"); send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited"); + log_id_type = internal_type("Log::ID")->AsEnumType(); + comm::opaque_of_data_type = new OpaqueType("Comm::Data"); vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref()); @@ -103,7 +112,7 @@ bool comm::Manager::Disconnect(const string& addr, uint16_t port) bool comm::Manager::Print(string topic, string msg, const Val* flags) { - endpoint->send(move(topic), broker::message{move(msg)}, get_flags(flags)); + endpoint->send(move(topic), broker::message{move(msg)}, GetFlags(flags)); return true; } @@ -113,6 +122,34 @@ bool comm::Manager::Event(std::string topic, broker::message msg, int flags) return true; } +bool comm::Manager::Log(const EnumVal* stream, const RecordVal* columns, + int flags) + { + auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum()); + + if ( ! stream_name ) + { + reporter->Error("Failed to remotely log: stream %d doesn't have name", + stream->AsEnum()); + return false; + } + + auto opt_column_data = val_to_data(columns); + + if ( ! opt_column_data ) + { + reporter->Error("Failed to remotely log stream %s: unsupported types", + stream_name); + return false; + } + + broker::message msg{broker::enum_value{stream_name}, + move(*opt_column_data)}; + std::string topic = std::string("bro/log/") + stream_name; + endpoint->send(move(topic), move(msg), flags); + return true; + } + bool comm::Manager::Event(std::string topic, const RecordVal* args, const Val* flags) { @@ -132,7 +169,7 @@ bool comm::Manager::Event(std::string topic, const RecordVal* args, msg.emplace_back(data_val->data); } - endpoint->send(move(topic), move(msg), get_flags(flags)); + endpoint->send(move(topic), move(msg), GetFlags(flags)); return true; } @@ -161,7 +198,7 @@ bool comm::Manager::AutoEvent(string topic, const Val* event, const Val* flags) return false; } - handler->AutoRemote(move(topic), get_flags(flags)); + handler->AutoRemote(move(topic), GetFlags(flags)); return true; } @@ -294,7 +331,23 @@ bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix) return event_subscriptions.erase(topic_prefix); } -int comm::Manager::get_flags(const Val* flags) +bool comm::Manager::SubscribeToLogs(string topic_prefix) + { + auto& q = log_subscriptions[topic_prefix]; + + if ( q ) + return false; + + q = broker::message_queue(move(topic_prefix), *endpoint); + return true; + } + +bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix) + { + return log_subscriptions.erase(topic_prefix); + } + +int comm::Manager::GetFlags(const Val* flags) { auto r = flags->AsRecordVal(); int rval = 0; @@ -327,6 +380,9 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, for ( const auto& ps : event_subscriptions ) read->Insert(ps.second.fd()); + + for ( const auto& ps : log_subscriptions ) + read->Insert(ps.second.fd()); } double comm::Manager::NextTimestamp(double* local_network_time) @@ -493,5 +549,78 @@ void comm::Manager::Process() } } + struct unref_guard { + unref_guard(Val* v) : val(v) {} + ~unref_guard() { Unref(val); } + Val* val; + }; + + for ( const auto& ls : log_subscriptions ) + { + auto log_messages = ls.second.want_pop(); + + if ( log_messages.empty() ) + continue; + + idle = false; + + for ( auto& lm : log_messages ) + { + if ( lm.size() != 2 ) + { + reporter->Warning("got bad remote log size: %zd (expect 2)", + lm.size()); + continue; + } + + if ( ! broker::get(lm[0]) ) + { + reporter->Warning("got remote log w/o stream id: %d", + static_cast(broker::which(lm[0]))); + continue; + } + + if ( ! broker::get(lm[1]) ) + { + reporter->Warning("got remote log w/o columns: %d", + static_cast(broker::which(lm[1]))); + continue; + } + + auto stream_id = data_to_val(move(lm[0]), log_id_type); + + if ( ! stream_id ) + { + reporter->Warning("failed to unpack remote log stream id"); + continue; + } + + unref_guard stream_id_unreffer{stream_id}; + auto columns_type = log_mgr->StreamColumns(stream_id->AsEnumVal()); + + if ( ! columns_type ) + { + reporter->Warning("got remote log for unknown stream: %s", + stream_id->Type()->AsEnumType()->Lookup( + stream_id->AsEnum())); + continue; + } + + auto columns = data_to_val(move(lm[1]), columns_type); + + if ( ! columns ) + { + reporter->Warning("failed to unpack remote log stream columns" + " for stream: %s", + stream_id->Type()->AsEnumType()->Lookup( + stream_id->AsEnum())); + continue; + } + + log_mgr->Write(stream_id->AsEnumVal(), columns->AsRecordVal()); + Unref(columns); + } + } + SetIdle(idle); } diff --git a/src/comm/Manager.h b/src/comm/Manager.h index 70bec51ded..3c1e80827b 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -8,6 +8,7 @@ #include #include "Reporter.h" #include "iosource/IOSource.h" +#include "Val.h" namespace comm { @@ -34,6 +35,8 @@ public: bool Event(std::string topic, broker::message msg, int flags); bool Event(std::string topic, const RecordVal* args, const Val* flags); + bool Log(const EnumVal* stream_id, const RecordVal* columns, int flags); + bool AutoEvent(std::string topic, const Val* event, const Val* flags); bool AutoEventStop(const std::string& topic, const Val* event); @@ -48,9 +51,13 @@ public: bool UnsubscribeToEvents(const std::string& topic_prefix); -private: + bool SubscribeToLogs(std::string topic_prefix); - int get_flags(const Val* flags); + bool UnsubscribeToLogs(const std::string& topic_prefix); + + static int GetFlags(const Val* flags); + +private: // IOSource interface overrides: void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, @@ -67,12 +74,14 @@ private: std::map, broker::peering> peers; std::map print_subscriptions; std::map event_subscriptions; + std::map log_subscriptions; - int send_flags_self_idx; - int send_flags_peers_idx; - int send_flags_unsolicited_idx; + static VectorType* vector_of_data_type; + static EnumType* log_id_type; + static int send_flags_self_idx; + static int send_flags_peers_idx; + static int send_flags_unsolicited_idx; - VectorType* vector_of_data_type; }; } // namespace comm diff --git a/src/comm/comm.bif b/src/comm/comm.bif index c185120126..e1c2bc533f 100644 --- a/src/comm/comm.bif +++ b/src/comm/comm.bif @@ -2,6 +2,7 @@ %%{ #include "comm/Manager.h" #include "comm/Data.h" +#include "logging/Manager.h" %%} module Comm; @@ -124,3 +125,36 @@ function Comm::unsubscribe_to_events%(topic_prefix: string%): bool auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} + +function +Comm::enable_remote_logs%(id: Log::ID, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(), + comm::Manager::GetFlags(flags)); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::disable_remote_logs%(id: Log::ID%): bool + %{ + auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::remote_logs_enabled%(id: Log::ID%): bool + %{ + auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::subscribe_to_logs%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 1fe5db3b26..d6d7fbb908 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -16,6 +16,10 @@ #include "WriterBackend.h" #include "logging.bif.h" +#ifdef ENABLE_BROKER +#include "comm/Manager.h" +#endif + using namespace logging; struct Manager::Filter { @@ -69,6 +73,11 @@ struct Manager::Stream { WriterMap writers; // Writers indexed by id/path pair. +#ifdef ENABLE_BROKER + bool enable_remote; + int remote_flags; +#endif + ~Stream(); }; @@ -287,6 +296,11 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) streams[idx]->event = event ? event_registry->Lookup(event->Name()) : 0; streams[idx]->columns = columns->Ref()->AsRecordType(); +#ifdef ENABLE_BROKER + streams[idx]->enable_remote = internal_val("Log::enable_remote_logging")->AsBool(); + streams[idx]->remote_flags = broker::PEERS; +#endif + DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : ""); @@ -828,6 +842,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) #endif } +#ifdef ENABLE_BROKER + if ( stream->enable_remote ) + comm_mgr->Log(id, columns, stream->remote_flags); +#endif + Unref(columns); if ( error ) @@ -1206,6 +1225,53 @@ void Manager::Terminate() } } +#ifdef ENABLE_BROKER + +bool Manager::EnableRemoteLogs(EnumVal* stream_id, int flags) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return false; + + stream->enable_remote = true; + stream->remote_flags = flags; + return true; + } + +bool Manager::DisableRemoteLogs(EnumVal* stream_id) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return false; + + stream->enable_remote = false; + return true; + } + +bool Manager::RemoteLogsAreEnabled(EnumVal* stream_id) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return false; + + return stream->enable_remote; + } + +RecordType* Manager::StreamColumns(EnumVal* stream_id) + { + auto stream = FindStream(stream_id); + + if ( ! stream ) + return nullptr; + + return stream->columns; + } + +#endif + // Timer which on dispatching rotates the filter. class RotationTimer : public Timer { public: diff --git a/src/logging/Manager.h b/src/logging/Manager.h index b8264927a3..8130a1ddd4 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -157,6 +157,16 @@ public: */ void Terminate(); +#ifdef ENABLE_BROKER + bool EnableRemoteLogs(EnumVal* stream_id, int flags); + + bool DisableRemoteLogs(EnumVal* stream_id); + + bool RemoteLogsAreEnabled(EnumVal* stream_id); + + RecordType* StreamColumns(EnumVal* stream_id); +#endif + protected: friend class WriterFrontend; friend class RotationFinishedMessage;