broker integration: add remote logging

It now works a bit differently than before: whether to send a remote log
write is now a property of the logging stream, not the logging filter
and it's now up the the receiver side filters to instantiate the desired
writer.  i.e. the sender now has no say in what the receiver should use
as the log writer backend.

Under the new style of remote logging, the "Log::enable_remote_logging"
option is repurposed to set the default behavior for new logging
streams.  There's also "Comm::{enable,disable}_remote_logging()" to
explicitly set the desired behavior for a given logging stream.  To
receive remote logs, one calls "Comm::subscribe_to_logs(<topic>)", where
senders implicitly use topics of the form "bro/log/<stream id>".
This commit is contained in:
Jon Siwek 2015-01-26 14:24:42 -06:00
parent 5df71ddc91
commit 2b598e3d5a
7 changed files with 260 additions and 12 deletions

@ -1 +1 @@
Subproject commit 1e8d6757909750524c15f8eaf3c297243bc55425 Subproject commit 425bab3bf420898d8dbd14280f94aee9d420f617

View file

@ -332,7 +332,7 @@ struct val_converter {
rval->Assign(i, item_val); rval->Assign(i, item_val);
} }
return nullptr; return rval;
} }
}; };

View file

@ -7,9 +7,16 @@
#include "Var.h" #include "Var.h"
#include "Reporter.h" #include "Reporter.h"
#include "comm/comm.bif.h" #include "comm/comm.bif.h"
#include "logging/Manager.h"
using namespace std; using namespace std;
VectorType* comm::Manager::vector_of_data_type;
EnumType* comm::Manager::log_id_type;
int comm::Manager::send_flags_self_idx;
int comm::Manager::send_flags_peers_idx;
int comm::Manager::send_flags_unsolicited_idx;
bool comm::Manager::InitPreScript() bool comm::Manager::InitPreScript()
{ {
return true; return true;
@ -33,6 +40,8 @@ bool comm::Manager::InitPostScript()
send_flags_peers_idx = require_field(send_flags_type, "peers"); send_flags_peers_idx = require_field(send_flags_type, "peers");
send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited"); send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited");
log_id_type = internal_type("Log::ID")->AsEnumType();
comm::opaque_of_data_type = new OpaqueType("Comm::Data"); comm::opaque_of_data_type = new OpaqueType("Comm::Data");
vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref()); vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref());
@ -103,7 +112,7 @@ bool comm::Manager::Disconnect(const string& addr, uint16_t port)
bool comm::Manager::Print(string topic, string msg, const Val* flags) bool comm::Manager::Print(string topic, string msg, const Val* flags)
{ {
endpoint->send(move(topic), broker::message{move(msg)}, get_flags(flags)); endpoint->send(move(topic), broker::message{move(msg)}, GetFlags(flags));
return true; return true;
} }
@ -113,6 +122,34 @@ bool comm::Manager::Event(std::string topic, broker::message msg, int flags)
return true; return true;
} }
bool comm::Manager::Log(const EnumVal* stream, const RecordVal* columns,
int flags)
{
auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum());
if ( ! stream_name )
{
reporter->Error("Failed to remotely log: stream %d doesn't have name",
stream->AsEnum());
return false;
}
auto opt_column_data = val_to_data(columns);
if ( ! opt_column_data )
{
reporter->Error("Failed to remotely log stream %s: unsupported types",
stream_name);
return false;
}
broker::message msg{broker::enum_value{stream_name},
move(*opt_column_data)};
std::string topic = std::string("bro/log/") + stream_name;
endpoint->send(move(topic), move(msg), flags);
return true;
}
bool comm::Manager::Event(std::string topic, const RecordVal* args, bool comm::Manager::Event(std::string topic, const RecordVal* args,
const Val* flags) const Val* flags)
{ {
@ -132,7 +169,7 @@ bool comm::Manager::Event(std::string topic, const RecordVal* args,
msg.emplace_back(data_val->data); msg.emplace_back(data_val->data);
} }
endpoint->send(move(topic), move(msg), get_flags(flags)); endpoint->send(move(topic), move(msg), GetFlags(flags));
return true; return true;
} }
@ -161,7 +198,7 @@ bool comm::Manager::AutoEvent(string topic, const Val* event, const Val* flags)
return false; return false;
} }
handler->AutoRemote(move(topic), get_flags(flags)); handler->AutoRemote(move(topic), GetFlags(flags));
return true; return true;
} }
@ -294,7 +331,23 @@ bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix)
return event_subscriptions.erase(topic_prefix); return event_subscriptions.erase(topic_prefix);
} }
int comm::Manager::get_flags(const Val* flags) bool comm::Manager::SubscribeToLogs(string topic_prefix)
{
auto& q = log_subscriptions[topic_prefix];
if ( q )
return false;
q = broker::message_queue(move(topic_prefix), *endpoint);
return true;
}
bool comm::Manager::UnsubscribeToLogs(const string& topic_prefix)
{
return log_subscriptions.erase(topic_prefix);
}
int comm::Manager::GetFlags(const Val* flags)
{ {
auto r = flags->AsRecordVal(); auto r = flags->AsRecordVal();
int rval = 0; int rval = 0;
@ -327,6 +380,9 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
for ( const auto& ps : event_subscriptions ) for ( const auto& ps : event_subscriptions )
read->Insert(ps.second.fd()); read->Insert(ps.second.fd());
for ( const auto& ps : log_subscriptions )
read->Insert(ps.second.fd());
} }
double comm::Manager::NextTimestamp(double* local_network_time) double comm::Manager::NextTimestamp(double* local_network_time)
@ -493,5 +549,78 @@ void comm::Manager::Process()
} }
} }
struct unref_guard {
unref_guard(Val* v) : val(v) {}
~unref_guard() { Unref(val); }
Val* val;
};
for ( const auto& ls : log_subscriptions )
{
auto log_messages = ls.second.want_pop();
if ( log_messages.empty() )
continue;
idle = false;
for ( auto& lm : log_messages )
{
if ( lm.size() != 2 )
{
reporter->Warning("got bad remote log size: %zd (expect 2)",
lm.size());
continue;
}
if ( ! broker::get<broker::enum_value>(lm[0]) )
{
reporter->Warning("got remote log w/o stream id: %d",
static_cast<int>(broker::which(lm[0])));
continue;
}
if ( ! broker::get<broker::record>(lm[1]) )
{
reporter->Warning("got remote log w/o columns: %d",
static_cast<int>(broker::which(lm[1])));
continue;
}
auto stream_id = data_to_val(move(lm[0]), log_id_type);
if ( ! stream_id )
{
reporter->Warning("failed to unpack remote log stream id");
continue;
}
unref_guard stream_id_unreffer{stream_id};
auto columns_type = log_mgr->StreamColumns(stream_id->AsEnumVal());
if ( ! columns_type )
{
reporter->Warning("got remote log for unknown stream: %s",
stream_id->Type()->AsEnumType()->Lookup(
stream_id->AsEnum()));
continue;
}
auto columns = data_to_val(move(lm[1]), columns_type);
if ( ! columns )
{
reporter->Warning("failed to unpack remote log stream columns"
" for stream: %s",
stream_id->Type()->AsEnumType()->Lookup(
stream_id->AsEnum()));
continue;
}
log_mgr->Write(stream_id->AsEnumVal(), columns->AsRecordVal());
Unref(columns);
}
}
SetIdle(idle); SetIdle(idle);
} }

View file

@ -8,6 +8,7 @@
#include <map> #include <map>
#include "Reporter.h" #include "Reporter.h"
#include "iosource/IOSource.h" #include "iosource/IOSource.h"
#include "Val.h"
namespace comm { namespace comm {
@ -34,6 +35,8 @@ public:
bool Event(std::string topic, broker::message msg, int flags); bool Event(std::string topic, broker::message msg, int flags);
bool Event(std::string topic, const RecordVal* args, const Val* flags); bool Event(std::string topic, const RecordVal* args, const Val* flags);
bool Log(const EnumVal* stream_id, const RecordVal* columns, int flags);
bool AutoEvent(std::string topic, const Val* event, const Val* flags); bool AutoEvent(std::string topic, const Val* event, const Val* flags);
bool AutoEventStop(const std::string& topic, const Val* event); bool AutoEventStop(const std::string& topic, const Val* event);
@ -48,9 +51,13 @@ public:
bool UnsubscribeToEvents(const std::string& topic_prefix); bool UnsubscribeToEvents(const std::string& topic_prefix);
private: bool SubscribeToLogs(std::string topic_prefix);
int get_flags(const Val* flags); bool UnsubscribeToLogs(const std::string& topic_prefix);
static int GetFlags(const Val* flags);
private:
// IOSource interface overrides: // IOSource interface overrides:
void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
@ -67,12 +74,14 @@ private:
std::map<std::pair<std::string, uint16_t>, broker::peering> peers; std::map<std::pair<std::string, uint16_t>, broker::peering> peers;
std::map<std::string, broker::message_queue> print_subscriptions; std::map<std::string, broker::message_queue> print_subscriptions;
std::map<std::string, broker::message_queue> event_subscriptions; std::map<std::string, broker::message_queue> event_subscriptions;
std::map<std::string, broker::message_queue> log_subscriptions;
int send_flags_self_idx; static VectorType* vector_of_data_type;
int send_flags_peers_idx; static EnumType* log_id_type;
int send_flags_unsolicited_idx; static int send_flags_self_idx;
static int send_flags_peers_idx;
static int send_flags_unsolicited_idx;
VectorType* vector_of_data_type;
}; };
} // namespace comm } // namespace comm

View file

@ -2,6 +2,7 @@
%%{ %%{
#include "comm/Manager.h" #include "comm/Manager.h"
#include "comm/Data.h" #include "comm/Data.h"
#include "logging/Manager.h"
%%} %%}
module Comm; module Comm;
@ -124,3 +125,36 @@ function Comm::unsubscribe_to_events%(topic_prefix: string%): bool
auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString()); auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
function
Comm::enable_remote_logs%(id: Log::ID,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(),
comm::Manager::GetFlags(flags));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disable_remote_logs%(id: Log::ID%): bool
%{
auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal());
return new Val(rval, TYPE_BOOL);
%}
function Comm::remote_logs_enabled%(id: Log::ID%): bool
%{
auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal());
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_logs%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}

View file

@ -16,6 +16,10 @@
#include "WriterBackend.h" #include "WriterBackend.h"
#include "logging.bif.h" #include "logging.bif.h"
#ifdef ENABLE_BROKER
#include "comm/Manager.h"
#endif
using namespace logging; using namespace logging;
struct Manager::Filter { struct Manager::Filter {
@ -69,6 +73,11 @@ struct Manager::Stream {
WriterMap writers; // Writers indexed by id/path pair. WriterMap writers; // Writers indexed by id/path pair.
#ifdef ENABLE_BROKER
bool enable_remote;
int remote_flags;
#endif
~Stream(); ~Stream();
}; };
@ -287,6 +296,11 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval)
streams[idx]->event = event ? event_registry->Lookup(event->Name()) : 0; streams[idx]->event = event ? event_registry->Lookup(event->Name()) : 0;
streams[idx]->columns = columns->Ref()->AsRecordType(); streams[idx]->columns = columns->Ref()->AsRecordType();
#ifdef ENABLE_BROKER
streams[idx]->enable_remote = internal_val("Log::enable_remote_logging")->AsBool();
streams[idx]->remote_flags = broker::PEERS;
#endif
DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s",
streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : "<none>"); streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : "<none>");
@ -828,6 +842,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
#endif #endif
} }
#ifdef ENABLE_BROKER
if ( stream->enable_remote )
comm_mgr->Log(id, columns, stream->remote_flags);
#endif
Unref(columns); Unref(columns);
if ( error ) if ( error )
@ -1206,6 +1225,53 @@ void Manager::Terminate()
} }
} }
#ifdef ENABLE_BROKER
bool Manager::EnableRemoteLogs(EnumVal* stream_id, int flags)
{
auto stream = FindStream(stream_id);
if ( ! stream )
return false;
stream->enable_remote = true;
stream->remote_flags = flags;
return true;
}
bool Manager::DisableRemoteLogs(EnumVal* stream_id)
{
auto stream = FindStream(stream_id);
if ( ! stream )
return false;
stream->enable_remote = false;
return true;
}
bool Manager::RemoteLogsAreEnabled(EnumVal* stream_id)
{
auto stream = FindStream(stream_id);
if ( ! stream )
return false;
return stream->enable_remote;
}
RecordType* Manager::StreamColumns(EnumVal* stream_id)
{
auto stream = FindStream(stream_id);
if ( ! stream )
return nullptr;
return stream->columns;
}
#endif
// Timer which on dispatching rotates the filter. // Timer which on dispatching rotates the filter.
class RotationTimer : public Timer { class RotationTimer : public Timer {
public: public:

View file

@ -157,6 +157,16 @@ public:
*/ */
void Terminate(); void Terminate();
#ifdef ENABLE_BROKER
bool EnableRemoteLogs(EnumVal* stream_id, int flags);
bool DisableRemoteLogs(EnumVal* stream_id);
bool RemoteLogsAreEnabled(EnumVal* stream_id);
RecordType* StreamColumns(EnumVal* stream_id);
#endif
protected: protected:
friend class WriterFrontend; friend class WriterFrontend;
friend class RotationFinishedMessage; friend class RotationFinishedMessage;