mirror of
https://github.com/zeek/zeek.git
synced 2025-10-06 16:48:19 +00:00
Improve remote logging via broker.
Only send fields with the &log attribute.
This commit is contained in:
parent
69693663eb
commit
25a4d0ebed
7 changed files with 59 additions and 26 deletions
|
@ -45,6 +45,7 @@ struct val_converter {
|
||||||
using result_type = Val*;
|
using result_type = Val*;
|
||||||
|
|
||||||
BroType* type;
|
BroType* type;
|
||||||
|
bool require_log_attr;
|
||||||
|
|
||||||
result_type operator()(bool a)
|
result_type operator()(bool a)
|
||||||
{
|
{
|
||||||
|
@ -316,14 +317,19 @@ struct val_converter {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
|
||||||
auto rt = type->AsRecordType();
|
auto rt = type->AsRecordType();
|
||||||
|
|
||||||
if ( a.fields.size() != static_cast<size_t>(rt->NumFields()) )
|
|
||||||
return nullptr;
|
|
||||||
|
|
||||||
auto rval = new RecordVal(rt);
|
auto rval = new RecordVal(rt);
|
||||||
|
|
||||||
for ( auto i = 0u; i < a.fields.size(); ++i )
|
for ( auto i = 0; i < rt->NumFields(); ++i )
|
||||||
{
|
{
|
||||||
|
if ( require_log_attr && ! rt->FieldDecl(i)->FindAttr(ATTR_LOG) )
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if ( i >= a.fields.size() )
|
||||||
|
{
|
||||||
|
Unref(rval);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
if ( ! a.fields[i] )
|
if ( ! a.fields[i] )
|
||||||
{
|
{
|
||||||
rval->Assign(i, nullptr);
|
rval->Assign(i, nullptr);
|
||||||
|
@ -346,9 +352,9 @@ struct val_converter {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Val* comm::data_to_val(broker::data d, BroType* type)
|
Val* comm::data_to_val(broker::data d, BroType* type, bool require_log_attr)
|
||||||
{
|
{
|
||||||
return broker::visit(val_converter{type}, d);
|
return broker::visit(val_converter{type, require_log_attr}, d);
|
||||||
}
|
}
|
||||||
|
|
||||||
broker::util::optional<broker::data> comm::val_to_data(Val* v)
|
broker::util::optional<broker::data> comm::val_to_data(Val* v)
|
||||||
|
|
|
@ -54,10 +54,12 @@ broker::util::optional<broker::data> val_to_data(Val* v);
|
||||||
* Convert a Broker data value to a Bro value.
|
* Convert a Broker data value to a Bro value.
|
||||||
* @param d a Broker data value.
|
* @param d a Broker data value.
|
||||||
* @param type the expected type of the value to return.
|
* @param type the expected type of the value to return.
|
||||||
|
* @param require_log_attr if true, skip over record fields that don't have the
|
||||||
|
* &log attribute.
|
||||||
* @return a pointer to a new Bro value or a nullptr if the conversion was not
|
* @return a pointer to a new Bro value or a nullptr if the conversion was not
|
||||||
* possible.
|
* possible.
|
||||||
*/
|
*/
|
||||||
Val* data_to_val(broker::data d, BroType* type);
|
Val* data_to_val(broker::data d, BroType* type, bool require_log_attr = false);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Bro value which wraps a Broker data value.
|
* A Bro value which wraps a Broker data value.
|
||||||
|
|
|
@ -193,7 +193,8 @@ bool comm::Manager::Event(std::string topic, broker::message msg, int flags)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags)
|
bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* info,
|
||||||
|
int flags)
|
||||||
{
|
{
|
||||||
if ( ! Enabled() )
|
if ( ! Enabled() )
|
||||||
return false;
|
return false;
|
||||||
|
@ -207,17 +208,38 @@ bool comm::Manager::Log(EnumVal* stream, RecordVal* columns, int flags)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto opt_column_data = val_to_data(columns);
|
broker::record column_data;
|
||||||
|
|
||||||
if ( ! opt_column_data )
|
for ( auto i = 0u; i < info->NumFields(); ++i )
|
||||||
{
|
{
|
||||||
reporter->Error("Failed to remotely log stream %s: unsupported types",
|
if ( ! info->FieldDecl(i)->FindAttr(ATTR_LOG) )
|
||||||
stream_name);
|
continue;
|
||||||
return false;
|
|
||||||
|
auto field_val = columns->LookupWithDefault(i);
|
||||||
|
|
||||||
|
if ( ! field_val )
|
||||||
|
{
|
||||||
|
column_data.fields.emplace_back(broker::record::field{});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto opt_field_data = val_to_data(field_val);
|
||||||
|
Unref(field_val);
|
||||||
|
|
||||||
|
if ( ! opt_field_data )
|
||||||
|
{
|
||||||
|
reporter->Error("Failed to remotely log stream %s: "
|
||||||
|
"unsupported type '%s'",
|
||||||
|
stream_name,
|
||||||
|
type_name(info->FieldDecl(i)->type->Tag()));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
column_data.fields.emplace_back(
|
||||||
|
broker::record::field{move(*opt_field_data)});
|
||||||
}
|
}
|
||||||
|
|
||||||
broker::message msg{broker::enum_value{stream_name},
|
broker::message msg{broker::enum_value{stream_name}, move(column_data)};
|
||||||
move(*opt_column_data)};
|
|
||||||
std::string topic = std::string("bro/log/") + stream_name;
|
std::string topic = std::string("bro/log/") + stream_name;
|
||||||
endpoint->send(move(topic), move(msg), flags);
|
endpoint->send(move(topic), move(msg), flags);
|
||||||
return true;
|
return true;
|
||||||
|
@ -837,7 +859,7 @@ void comm::Manager::Process()
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto columns = data_to_val(move(lm[1]), columns_type);
|
auto columns = data_to_val(move(lm[1]), columns_type, true);
|
||||||
|
|
||||||
if ( ! columns )
|
if ( ! columns )
|
||||||
{
|
{
|
||||||
|
|
|
@ -153,11 +153,13 @@ public:
|
||||||
* implicitly "bro/log/<stream-name>".
|
* implicitly "bro/log/<stream-name>".
|
||||||
* @param stream_id the stream to which the log entry belongs.
|
* @param stream_id the stream to which the log entry belongs.
|
||||||
* @param columns the data which comprises the log entry.
|
* @param columns the data which comprises the log entry.
|
||||||
|
* @param info the record type corresponding to the log's columns.
|
||||||
* @param flags tune the behavior of how the message is send.
|
* @param flags tune the behavior of how the message is send.
|
||||||
* See the Comm::SendFlags record type.
|
* See the Comm::SendFlags record type.
|
||||||
* @return true if the message is sent successfully.
|
* @return true if the message is sent successfully.
|
||||||
*/
|
*/
|
||||||
bool Log(EnumVal* stream_id, RecordVal* columns, int flags);
|
bool Log(EnumVal* stream_id, RecordVal* columns, RecordType* info,
|
||||||
|
int flags);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Automatically send an event to any interested peers whenever it is
|
* Automatically send an event to any interested peers whenever it is
|
||||||
|
|
|
@ -843,8 +843,8 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef ENABLE_BROKER
|
#ifdef ENABLE_BROKER
|
||||||
if ( stream->enable_remote )
|
if ( stream->enable_remote &&
|
||||||
if ( ! comm_mgr->Log(id, columns, stream->remote_flags) )
|
! comm_mgr->Log(id, columns, stream->columns, stream->remote_flags) )
|
||||||
stream->enable_remote = false;
|
stream->enable_remote = false;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
wrote log, [msg=ping, num=0]
|
wrote log, [msg=ping, num=0, nolog=no]
|
||||||
wrote log, [msg=ping, num=1]
|
wrote log, [msg=ping, num=1, nolog=no]
|
||||||
wrote log, [msg=ping, num=2]
|
wrote log, [msg=ping, num=2, nolog=no]
|
||||||
wrote log, [msg=ping, num=3]
|
wrote log, [msg=ping, num=3, nolog=no]
|
||||||
wrote log, [msg=ping, num=4]
|
wrote log, [msg=ping, num=4, nolog=no]
|
||||||
wrote log, [msg=ping, num=5]
|
wrote log, [msg=ping, num=5, nolog=no]
|
||||||
|
|
|
@ -20,6 +20,7 @@ export {
|
||||||
type Info: record {
|
type Info: record {
|
||||||
msg: string &log;
|
msg: string &log;
|
||||||
num: count &log;
|
num: count &log;
|
||||||
|
nolog: string &default="no";
|
||||||
};
|
};
|
||||||
|
|
||||||
global log_test: event(rec: Test::Info);
|
global log_test: event(rec: Test::Info);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue