Implement string- and container-length filtering at the log record level

This commit is contained in:
Tim Wojtulewicz 2025-07-25 13:19:47 -07:00
parent cc59bfa5d8
commit e2e7ab28da
11 changed files with 227 additions and 10 deletions

View file

@ -504,6 +504,22 @@ void Manager::InitPostScript() {
rotation_format_func = id::find_func("Log::rotation_format_func");
log_stream_policy_hook = id::find_func("Log::log_stream_policy");
max_log_record_size = id::find_val("Log::max_log_record_size")->AsCount();
max_field_string_bytes = id::find_val("Log::max_field_string_bytes")->AsCount();
if ( max_field_string_bytes == 0 )
max_field_string_bytes = std::numeric_limits<size_t>::max();
max_total_string_bytes = id::find_val("Log::max_total_string_bytes")->AsCount();
if ( max_total_string_bytes == 0 )
max_total_string_bytes = std::numeric_limits<size_t>::max();
max_field_container_elements = id::find_val("Log::max_field_container_elements")->AsCount();
if ( max_field_container_elements == 0 )
max_field_container_elements = std::numeric_limits<size_t>::max();
max_total_container_elements = id::find_val("Log::max_total_container_elements")->AsCount();
if ( max_total_container_elements == 0 )
max_total_container_elements = std::numeric_limits<size_t>::max();
}
WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, EnumVal* tag) {
@ -1149,6 +1165,8 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
// Alright, can do the write now.
size_t total_size = 0;
total_string_bytes = 0;
total_container_elements = 0;
auto rec = RecordToLogRecord(stream, filter, columns.get(), total_size);
if ( total_size > max_log_record_size ) {
@ -1464,12 +1482,20 @@ threading::Value Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty, size_t
case TYPE_STRING: {
const String* s = val->AsString()->AsString();
char* buf = new char[s->Len()];
memcpy(buf, s->Bytes(), s->Len());
size_t allowed_bytes = std::min(
{static_cast<size_t>(s->Len()), max_field_string_bytes, max_total_string_bytes - total_string_bytes});
if ( allowed_bytes == 0 )
return lval;
char* buf = new char[allowed_bytes];
memcpy(buf, s->Bytes(), allowed_bytes);
lval.val.string_val.data = buf;
lval.val.string_val.length = s->Len();
total_size += lval.val.string_val.length;
lval.val.string_val.length = allowed_bytes;
total_size += allowed_bytes;
total_string_bytes += allowed_bytes;
break;
}
@ -1508,10 +1534,15 @@ threading::Value Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty, size_t
auto& set_t = tbl_t->GetIndexTypes()[0];
bool is_managed = ZVal::IsManagedType(set_t);
zeek_int_t set_length = set->Length();
lval.val.set_val.vals = new threading::Value*[set_length];
size_t allowed_elements = std::min({static_cast<size_t>(set->Length()), max_field_container_elements,
max_total_container_elements - total_container_elements});
for ( zeek_int_t i = 0; i < set_length && total_size < max_log_record_size; i++ ) {
if ( allowed_elements == 0 )
return lval;
lval.val.set_val.vals = new threading::Value*[allowed_elements];
for ( size_t i = 0; i < allowed_elements && total_size < max_log_record_size; i++ ) {
std::optional<ZVal> s_i = ZVal(set->Idx(i), set_t);
lval.val.set_val.vals[i] = new threading::Value(ValToLogVal(s_i, set_t.get(), total_size));
if ( is_managed )
@ -1519,22 +1550,32 @@ threading::Value Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty, size_t
lval.val.set_val.size++;
}
total_container_elements += lval.val.set_val.size;
break;
}
case TYPE_VECTOR: {
VectorVal* vec = val->AsVector();
zeek_int_t vec_length = vec->Size();
lval.val.vector_val.vals = new threading::Value*[vec_length];
size_t allowed_elements = std::min({static_cast<size_t>(vec->Size()), max_field_container_elements,
max_total_container_elements - total_container_elements});
if ( allowed_elements == 0 )
return lval;
lval.val.vector_val.vals = new threading::Value*[allowed_elements];
auto& vv = vec->RawVec();
auto& vt = vec->GetType()->Yield();
for ( zeek_int_t i = 0; i < vec_length && total_size < max_log_record_size; i++ ) {
for ( size_t i = 0; i < allowed_elements && total_size < max_log_record_size; i++ ) {
lval.val.vector_val.vals[i] = new threading::Value(ValToLogVal(vv[i], vt.get(), total_size));
lval.val.vector_val.size++;
}
total_container_elements += lval.val.vector_val.size;
break;
}