Changing semantics of Broker's remote logging to match old communication framework.

Broker had changed the semantics of remote logging: it sent over the
original Bro record containing the values to be logged, which on the
receiving side would then pass through the logging framework normally,
including triggering filters and events. The old communication system
however special-cases logs: it sends already processed log entries,
just as they go into the log files, and without any receiver-side
filtering etc. This is more efficient as it short-cuts the processing
path, and also avoids the more expensive Val serialization. It also
lets the sender determine the specifics of what gets logged (and how).

This commit changes Broker over to now use the same semantics as the
old communication system.

TODOs:
     - The new Broker code doesn't have consistent #ifdefs yet.

     - Right now, when a new log receiver connects, all existing logs
     are broadcast out again to all current clients. That doesn't do
     any harm, but is unnecessary. Need to add a way to send the
     existing logs to just the new client.
This commit is contained in:
Robin Sommer 2017-01-31 21:51:31 -08:00
parent 0dd0bfb5bb
commit a5e9a535a5
25 changed files with 1178 additions and 159 deletions

View file

@ -20,10 +20,17 @@ using namespace std;
// Definitions of bro_broker::Manager's static data members.
// Enable() sets log_id_type and writer_id_type to the script-level
// "Log::ID" and "Log::Writer" enum types, and sets
// send_flags_unsolicited_idx via require_field(send_flags_type, "unsolicited").
// NOTE(review): the other send_flags_*_idx members and vector_of_data_type
// are presumably initialized there as well -- not visible in this excerpt.
VectorType* bro_broker::Manager::vector_of_data_type;
EnumType* bro_broker::Manager::log_id_type;
EnumType* bro_broker::Manager::writer_id_type;
int bro_broker::Manager::send_flags_self_idx;
int bro_broker::Manager::send_flags_peers_idx;
int bro_broker::Manager::send_flags_unsolicited_idx;
// Scope guard that calls Unref() on the wrapped Val when the guard is
// destroyed, so early returns in the message-processing code cannot skip
// the Unref() of a temporary value.
struct unref_guard {
	// explicit: a raw Val* must never silently convert into a guard that
	// will Unref() it.
	explicit unref_guard(Val* v) : val(v) {}
	~unref_guard() { Unref(val); }

	// Non-copyable: a copy would Unref() the same Val a second time.
	unref_guard(const unref_guard&) = delete;
	unref_guard& operator=(const unref_guard&) = delete;

	Val* val;
};
bro_broker::Manager::Manager()
: iosource::IOSource(), next_timestamp(-1)
{
@ -83,6 +90,7 @@ bool bro_broker::Manager::Enable(Val* broker_endpoint_flags)
send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited");
log_id_type = internal_type("Log::ID")->AsEnumType();
writer_id_type = internal_type("Log::Writer")->AsEnumType();
bro_broker::opaque_of_data_type = new OpaqueType("Broker::Data");
bro_broker::opaque_of_set_iterator = new OpaqueType("Broker::SetIterator");
@ -206,8 +214,9 @@ bool bro_broker::Manager::Event(std::string topic, broker::message msg, int flag
return true;
}
bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* info,
int flags)
bool bro_broker::Manager::CreateLog(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info,
int num_fields, const threading::Field* const * fields, int flags,
const string& peer)
{
if ( ! Enabled() )
return false;
@ -221,40 +230,82 @@ bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* i
return false;
}
broker::record column_data;
auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum());
for ( auto i = 0u; i < static_cast<size_t>(info->NumFields()); ++i )
if ( ! writer_name )
{
if ( ! info->FieldDecl(i)->FindAttr(ATTR_LOG) )
continue;
reporter->Error("Failed to remotely log: writer %d doesn't have name",
writer->AsEnum());
return false;
}
auto field_val = columns->LookupWithDefault(i);
auto writer_info = info.ToBroker();
if ( ! field_val )
{
column_data.fields.emplace_back(broker::record::field{});
continue;
}
broker::vector fields_data;
auto opt_field_data = val_to_data(field_val);
Unref(field_val);
for ( auto i = 0; i < num_fields; ++i )
{
auto field_data = threading_field_to_data(fields[i]);
fields_data.push_back(move(field_data));
}
if ( ! opt_field_data )
// TODO: If peer is given, send message to just that one destination.
std::string topic = std::string("bro/log/") + stream_name;
auto bstream_name = broker::enum_value(move(stream_name));
auto bwriter_name = broker::enum_value(move(writer_name));
broker::message msg{move("create"), move(bstream_name), move(bwriter_name), move(writer_info), move(fields_data)};
endpoint->send(move(topic), move(msg), flags);
return true;
}
bool bro_broker::Manager::Log(EnumVal* stream, EnumVal* writer, string path, int num_vals, const threading::Value* const * vals, int flags)
{
if ( ! Enabled() )
return false;
auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum());
if ( ! stream_name )
{
reporter->Error("Failed to remotely log: stream %d doesn't have name",
stream->AsEnum());
return false;
}
auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum());
if ( ! writer_name )
{
reporter->Error("Failed to remotely log: writer %d doesn't have name",
writer->AsEnum());
return false;
}
broker::vector vals_data;
for ( auto i = 0; i < num_vals; ++i )
{
auto field_data = threading_val_to_data(vals[i]);
if ( ! field_data )
{
reporter->Error("Failed to remotely log stream %s: "
"unsupported type '%s'",
stream_name,
type_name(info->FieldDecl(i)->type->Tag()));
"unsupported type for field #%d",
stream_name, i);
return false;
}
column_data.fields.emplace_back(
broker::record::field{move(*opt_field_data)});
vals_data.push_back(move(*field_data));
}
broker::message msg{broker::enum_value{stream_name}, move(column_data)};
std::string topic = std::string("bro/log/") + stream_name;
auto bstream_name = broker::enum_value(move(stream_name));
auto bwriter_name = broker::enum_value(move(writer_name));
broker::message msg{move("write"), move(bstream_name), move(bwriter_name), move(path), move(vals_data)};
endpoint->send(move(topic), move(msg), flags);
return true;
}
@ -639,6 +690,8 @@ void bro_broker::Manager::Process()
{
switch ( u.status ) {
case broker::outgoing_connection_status::tag::established:
log_mgr->SendAllWritersTo(u.peer_name);
if ( Broker::outgoing_connection_established )
{
val_list* vl = new val_list;
@ -684,6 +737,8 @@ void bro_broker::Manager::Process()
{
switch ( u.status ) {
case broker::incoming_connection_status::tag::established:
log_mgr->SendAllWritersTo(u.peer_name);
if ( Broker::incoming_connection_established )
{
val_list* vl = new val_list;
@ -809,12 +864,6 @@ void bro_broker::Manager::Process()
}
}
// Scope guard: calls Unref() on the wrapped Val when the guard is destroyed.
struct unref_guard {
unref_guard(Val* v) : val(v) {}
~unref_guard() { Unref(val); }
// The guarded value; Unref()'d by the destructor.
Val* val;
};
for ( auto& ls : log_subscriptions )
{
auto log_messages = ls.second.q.want_pop();
@ -826,59 +875,19 @@ void bro_broker::Manager::Process()
for ( auto& lm : log_messages )
{
if ( lm.size() != 2 )
if ( lm.size() < 1 )
{
reporter->Warning("got bad remote log size: %zd (expect 2)",
lm.size());
reporter->Warning("got bad remote log message, no type field");
continue;
}
if ( ! broker::get<broker::enum_value>(lm[0]) )
{
reporter->Warning("got remote log w/o stream id: %d",
static_cast<int>(broker::which(lm[0])));
continue;
}
if ( ! broker::get<broker::record>(lm[1]) )
{
reporter->Warning("got remote log w/o columns: %d",
static_cast<int>(broker::which(lm[1])));
continue;
}
auto stream_id = data_to_val(move(lm[0]), log_id_type);
if ( ! stream_id )
{
reporter->Warning("failed to unpack remote log stream id");
continue;
}
unref_guard stream_id_unreffer{stream_id};
auto columns_type = log_mgr->StreamColumns(stream_id->AsEnumVal());
if ( ! columns_type )
{
reporter->Warning("got remote log for unknown stream: %s",
stream_id->Type()->AsEnumType()->Lookup(
stream_id->AsEnum()));
continue;
}
auto columns = data_to_val(move(lm[1]), columns_type, true);
if ( ! columns )
{
reporter->Warning("failed to unpack remote log stream columns"
" for stream: %s",
stream_id->Type()->AsEnumType()->Lookup(
stream_id->AsEnum()));
continue;
}
log_mgr->Write(stream_id->AsEnumVal(), columns->AsRecordVal());
Unref(columns);
if ( lm[0] == "create" )
ProcessCreateLog(std::move(lm));
else if ( lm[0] == "write" )
ProcessWriteLog(std::move(lm));
else
reporter->Warning("got remote log w/o known type: %d",
static_cast<int>(broker::which(lm[0])));
}
}
@ -979,6 +988,202 @@ void bro_broker::Manager::Process()
next_timestamp = -1;
}
// Handles a remote "create" log message by asking the log manager to set up
// a corresponding writer locally.
//
// Expected message layout (exactly 5 elements):
//   [0] message type tag (already inspected by the caller)
//   [1] stream ID   (broker::enum_value, converted using log_id_type)
//   [2] writer ID   (broker::enum_value, converted using writer_id_type)
//   [3] writer info (broker::record, unpacked via WriterInfo::FromBroker)
//   [4] log fields  (broker::vector, one entry per threading::Field)
//
// Returns false (after emitting a warning) if the message is malformed or
// cannot be unpacked. Returns true otherwise, including the case where
// the log manager fails to create the writer, which only triggers a warning.
bool bro_broker::Manager::ProcessCreateLog(broker::message msg)
	{
	if ( msg.size() != 5 )
		{
		reporter->Warning("got bad remote log create size: %zd (expected 5)",
		                  msg.size());
		return false;
		}

	unsigned int idx = 1; // Skip type at index 0.

	// Get stream ID.

	if ( ! broker::get<broker::enum_value>(msg[idx]) )
		{
		reporter->Warning("got remote log create w/o stream id: %d",
		                  static_cast<int>(broker::which(msg[idx])));
		return false;
		}

	auto stream_id = data_to_val(move(msg[idx]), log_id_type);

	if ( ! stream_id )
		{
		reporter->Warning("failed to unpack remote log stream id");
		return false;
		}

	// Unref the temporary Val on every exit path from here on.
	unref_guard stream_id_unreffer{stream_id};
	++idx;

	// Get writer ID.

	if ( ! broker::get<broker::enum_value>(msg[idx]) )
		{
		reporter->Warning("got remote log w/o writer id: %d",
		                  static_cast<int>(broker::which(msg[idx])));
		return false;
		}

	auto writer_id = data_to_val(move(msg[idx]), writer_id_type);

	if ( ! writer_id )
		{
		reporter->Warning("failed to unpack remote log writer id");
		return false;
		}

	unref_guard writer_id_unreffer{writer_id};
	++idx;

	// Get writer info.

	if ( ! broker::get<broker::record>(msg[idx]) )
		{
		reporter->Warning("got remote log create w/o writer info id: %d",
		                  static_cast<int>(broker::which(msg[idx])));
		return false;
		}

	// Held by unique_ptr so that the early returns below free it.
	auto writer_info = std::unique_ptr<logging::WriterBackend::WriterInfo>(new logging::WriterBackend::WriterInfo);

	if ( ! writer_info->FromBroker(std::move(msg[idx])) )
		{
		reporter->Warning("failed to unpack remote log writer info");
		return false;
		}

	++idx;

	// Get log fields.

	auto fields_data = broker::get<broker::vector>(msg[idx]);

	if ( ! fields_data )
		{
		reporter->Warning("failed to unpack remote log fields");
		return false;
		}

	auto num_fields = fields_data->size();
	auto fields = new threading::Field* [num_fields];

	for ( auto i = 0u; i < num_fields; ++i )
		{
		if ( auto field = data_to_threading_field((*fields_data)[i]) )
			fields[i] = field;
		else
			{
			reporter->Warning("failed to convert remote log field # %d", i);

			// Nothing has been handed to the log manager yet, so the
			// fields converted so far and the array itself are still
			// ours to free; otherwise they would leak on this path.
			for ( auto j = 0u; j < i; ++j )
				delete fields[j];

			delete [] fields;
			return false;
			}
		}

	if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.get(), num_fields, fields) )
		{
		ODesc d;
		stream_id->Describe(&d);
		reporter->Warning("failed to create remote log stream for %s locally", d.Description());
		}

	// NOTE(review): released even when the call above reported failure;
	// confirm that log_mgr frees writer_info (and fields) on that path too.
	writer_info.release(); // log_mgr took ownership.
	return true;
	}
// Handles a remote "write" log message by forwarding the already processed
// log values to the log manager.
//
// Expected message layout (exactly 5 elements):
//   [0] message type tag (already inspected by the caller)
//   [1] stream ID  (broker::enum_value, converted using log_id_type)
//   [2] writer ID  (broker::enum_value, converted using writer_id_type)
//   [3] path       (std::string)
//   [4] log values (broker::vector, one entry per threading::Value)
//
// Returns false (after emitting a warning) if the message is malformed or
// cannot be unpacked; returns true once the values have been passed to
// log_mgr->WriteFromRemote().
bool bro_broker::Manager::ProcessWriteLog(broker::message msg)
	{
	if ( msg.size() != 5 )
		{
		reporter->Warning("got bad remote log size: %zd (expected 5)",
		                  msg.size());
		return false;
		}

	unsigned int idx = 1; // Skip type at index 0.

	// Get stream ID.

	if ( ! broker::get<broker::enum_value>(msg[idx]) )
		{
		reporter->Warning("got remote log w/o stream id: %d",
		                  static_cast<int>(broker::which(msg[idx])));
		return false;
		}

	auto stream_id = data_to_val(move(msg[idx]), log_id_type);

	if ( ! stream_id )
		{
		reporter->Warning("failed to unpack remote log stream id");
		return false;
		}

	// Unref the temporary Val on every exit path from here on.
	unref_guard stream_id_unreffer{stream_id};
	++idx;

	// Get writer ID.

	if ( ! broker::get<broker::enum_value>(msg[idx]) )
		{
		reporter->Warning("got remote log w/o writer id: %d",
		                  static_cast<int>(broker::which(msg[idx])));
		return false;
		}

	auto writer_id = data_to_val(move(msg[idx]), writer_id_type);

	if ( ! writer_id )
		{
		reporter->Warning("failed to unpack remote log writer id");
		return false;
		}

	unref_guard writer_id_unreffer{writer_id};
	++idx;

	// Get path.

	auto path = broker::get<std::string>(msg[idx]);

	if ( ! path )
		{
		reporter->Warning("failed to unpack remote log path");
		return false;
		}

	++idx;

	// Get log values.

	auto vals_data = broker::get<broker::vector>(msg[idx]);

	if ( ! vals_data )
		{
		reporter->Warning("failed to unpack remote log values");
		return false;
		}

	auto num_vals = vals_data->size();
	auto vals = new threading::Value* [num_vals];

	for ( auto i = 0u; i < num_vals; ++i )
		{
		if ( auto val = data_to_threading_val((*vals_data)[i]) )
			vals[i] = val;
		else
			{
			reporter->Warning("failed to convert remote log arg # %d", i);

			// Nothing has been handed to the log manager yet, so the
			// values converted so far and the array itself are still
			// ours to free; otherwise they would leak on this path.
			for ( auto j = 0u; j < i; ++j )
				delete vals[j];

			delete [] vals;
			return false;
			}
		}

	// NOTE(review): vals is not freed here, so WriteFromRemote presumably
	// takes ownership of the array and its elements -- confirm.
	log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), *path, num_vals, vals);
	return true;
	}
bool bro_broker::Manager::AddStore(StoreHandleVal* handle)
{
if ( ! Enabled() )