Add options to limit string and container field sizes at the stream level as well as globally

This commit is contained in:
Tim Wojtulewicz 2025-08-05 14:50:15 -07:00
parent 339d46ae26
commit 0ec2161b04
30 changed files with 420 additions and 129 deletions

View file

@ -243,7 +243,6 @@ struct Manager::WriterInfo {
bool from_remote = false;
bool hook_initialized = false;
string instantiating_filter;
string stream_name;
std::shared_ptr<telemetry::Counter> total_writes;
std::shared_ptr<telemetry::Counter> total_discarded_writes;
@ -289,10 +288,13 @@ struct Manager::Stream {
zeek_uint_t max_delay_queue_size = 1;
bool evicting = false;
size_t max_field_string_bytes = 0;
size_t max_total_string_bytes = 0;
size_t max_field_container_elements = 0;
size_t max_total_container_elements = 0;
~Stream();
const detail::DelayInfoPtr& GetDelayInfo(const detail::WriteContext& ctx);
void EnqueueWriteForDelay(const detail::WriteContext& ctx);
@ -520,22 +522,6 @@ void Manager::InitPostScript() {
rotation_format_func = id::find_func("Log::rotation_format_func");
log_stream_policy_hook = id::find_func("Log::log_stream_policy");
max_log_record_size = id::find_val("Log::max_log_record_size")->AsCount();
max_field_string_bytes = id::find_val("Log::max_field_string_bytes")->AsCount();
if ( max_field_string_bytes == 0 )
max_field_string_bytes = std::numeric_limits<size_t>::max();
max_total_string_bytes = id::find_val("Log::max_total_string_bytes")->AsCount();
if ( max_total_string_bytes == 0 )
max_total_string_bytes = std::numeric_limits<size_t>::max();
max_field_container_elements = id::find_val("Log::max_field_container_elements")->AsCount();
if ( max_field_container_elements == 0 )
max_field_container_elements = std::numeric_limits<size_t>::max();
max_total_container_elements = id::find_val("Log::max_total_container_elements")->AsCount();
if ( max_total_container_elements == 0 )
max_total_container_elements = std::numeric_limits<size_t>::max();
}
WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, EnumVal* tag) {
@ -691,9 +677,24 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) {
streams[idx]->columns = columns->Ref()->AsRecordType();
streams[idx]->max_delay_interval = sval->GetField("max_delay_interval")->AsInterval();
streams[idx]->max_delay_queue_size = sval->GetField("max_delay_queue_size")->AsCount();
streams[idx]->enable_remote = id::find_val("Log::enable_remote_logging")->AsBool();
streams[idx]->max_field_string_bytes = sval->GetField("max_field_string_bytes")->AsCount();
if ( streams[idx]->max_field_string_bytes == 0 )
streams[idx]->max_field_string_bytes = std::numeric_limits<size_t>::max();
streams[idx]->max_total_string_bytes = sval->GetField("max_total_string_bytes")->AsCount();
if ( streams[idx]->max_total_string_bytes == 0 )
streams[idx]->max_total_string_bytes = std::numeric_limits<size_t>::max();
streams[idx]->max_field_container_elements = sval->GetField("max_field_container_elements")->AsCount();
if ( streams[idx]->max_field_container_elements == 0 )
streams[idx]->max_field_container_elements = std::numeric_limits<size_t>::max();
streams[idx]->max_total_container_elements = sval->GetField("max_total_container_elements")->AsCount();
if ( streams[idx]->max_total_container_elements == 0 )
streams[idx]->max_total_container_elements = std::numeric_limits<size_t>::max();
DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", streams[idx]->name.c_str(),
event ? streams[idx]->event->Name() : "<none>");
@ -1183,7 +1184,7 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
size_t total_size = 0;
total_string_bytes = 0;
total_container_elements = 0;
auto rec = RecordToLogRecord(w->second, filter, columns.get(), total_size);
auto rec = RecordToLogRecord(w->second, filter, stream, columns.get(), total_size);
if ( total_size > max_log_record_size ) {
reporter->Weird("log_record_too_large", util::fmt("%s", stream->name.c_str()));
@ -1422,7 +1423,8 @@ bool Manager::SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t queue_size)
return true;
}
threading::Value Manager::ValToLogVal(WriterInfo* info, std::optional<ZVal>& val, Type* ty, size_t& total_size) {
threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, std::optional<ZVal>& val, Type* ty,
size_t& total_size) {
if ( ! val )
return {ty->Tag(), false};
@ -1499,11 +1501,11 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, std::optional<ZVal>& val
case TYPE_STRING: {
const String* s = val->AsString()->AsString();
size_t allowed_bytes = std::min(
{static_cast<size_t>(s->Len()), max_field_string_bytes, max_total_string_bytes - total_string_bytes});
size_t allowed_bytes = std::min({static_cast<size_t>(s->Len()), stream->max_field_string_bytes,
stream->max_total_string_bytes - total_string_bytes});
if ( allowed_bytes < static_cast<size_t>(s->Len()) ) {
reporter->Weird("log_string_field_truncated", util::fmt("%s", info->stream_name.c_str()));
reporter->Weird("log_string_field_truncated", util::fmt("%s", stream->name.c_str()));
info->total_truncated_string_fields->Inc();
}
@ -1555,11 +1557,12 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, std::optional<ZVal>& val
auto& set_t = tbl_t->GetIndexTypes()[0];
bool is_managed = ZVal::IsManagedType(set_t);
size_t allowed_elements = std::min({static_cast<size_t>(set->Length()), max_field_container_elements,
max_total_container_elements - total_container_elements});
size_t allowed_elements =
std::min({static_cast<size_t>(set->Length()), stream->max_field_container_elements,
stream->max_total_container_elements - total_container_elements});
if ( allowed_elements < static_cast<size_t>(set->Length()) ) {
reporter->Weird("log_container_field_truncated", util::fmt("%s", info->stream_name.c_str()));
reporter->Weird("log_container_field_truncated", util::fmt("%s", stream->name.c_str()));
info->total_truncated_containers->Inc();
}
@ -1570,7 +1573,8 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, std::optional<ZVal>& val
for ( size_t i = 0; i < allowed_elements && total_size < max_log_record_size; i++ ) {
std::optional<ZVal> s_i = ZVal(set->Idx(i), set_t);
lval.val.set_val.vals[i] = new threading::Value(ValToLogVal(info, s_i, set_t.get(), total_size));
lval.val.set_val.vals[i] =
new threading::Value(ValToLogVal(info, stream, s_i, set_t.get(), total_size));
if ( is_managed )
ZVal::DeleteManagedType(*s_i);
lval.val.set_val.size++;
@ -1584,11 +1588,11 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, std::optional<ZVal>& val
case TYPE_VECTOR: {
VectorVal* vec = val->AsVector();
size_t allowed_elements = std::min({static_cast<size_t>(vec->Size()), max_field_container_elements,
max_total_container_elements - total_container_elements});
size_t allowed_elements = std::min({static_cast<size_t>(vec->Size()), stream->max_field_container_elements,
stream->max_total_container_elements - total_container_elements});
if ( allowed_elements < static_cast<size_t>(vec->Size()) ) {
reporter->Weird("log_container_field_truncated", util::fmt("%s", info->stream_name.c_str()));
reporter->Weird("log_container_field_truncated", util::fmt("%s", stream->name.c_str()));
info->total_truncated_containers->Inc();
}
@ -1601,7 +1605,8 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, std::optional<ZVal>& val
auto& vt = vec->GetType()->Yield();
for ( size_t i = 0; i < allowed_elements && total_size < max_log_record_size; i++ ) {
lval.val.vector_val.vals[i] = new threading::Value(ValToLogVal(info, vv[i], vt.get(), total_size));
lval.val.vector_val.vals[i] =
new threading::Value(ValToLogVal(info, stream, vv[i], vt.get(), total_size));
lval.val.vector_val.size++;
}
@ -1616,7 +1621,8 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, std::optional<ZVal>& val
return lval;
}
detail::LogRecord Manager::RecordToLogRecord(WriterInfo* info, Filter* filter, RecordVal* columns, size_t& total_size) {
detail::LogRecord Manager::RecordToLogRecord(WriterInfo* info, Filter* filter, const Stream* stream, RecordVal* columns,
size_t& total_size) {
RecordValPtr ext_rec;
if ( filter->num_ext_fields > 0 ) {
@ -1666,7 +1672,7 @@ detail::LogRecord Manager::RecordToLogRecord(WriterInfo* info, Filter* filter, R
}
if ( val )
vals.emplace_back(ValToLogVal(info, val, vt, total_size));
vals.emplace_back(ValToLogVal(info, stream, val, vt, total_size));
if ( total_size > max_log_record_size ) {
return {};
@ -1734,7 +1740,6 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
winfo->from_remote = from_remote;
winfo->hook_initialized = false;
winfo->instantiating_filter = instantiating_filter;
winfo->stream_name = stream->name;
// Search for a corresponding filter for the writer/path pair and use its
// rotation settings. If no matching filter is found, fall back on