Make total_size counter a member in logging::Manager

This commit is contained in:
Tim Wojtulewicz 2025-08-11 14:19:44 -07:00
parent 98a77b5f25
commit 29425688da
2 changed files with 25 additions and 29 deletions

View file

@ -1170,12 +1170,12 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
assert(info); assert(info);
// Alright, can do the write now. // Alright, can do the write now.
size_t total_size = 0; total_record_size = 0;
total_string_bytes = 0; total_string_bytes = 0;
total_container_elements = 0; total_container_elements = 0;
auto rec = RecordToLogRecord(w->second, filter, stream, columns.get(), total_size); auto rec = RecordToLogRecord(w->second, filter, stream, columns.get());
if ( total_size > max_log_record_size ) { if ( total_record_size > max_log_record_size ) {
reporter->Weird("log_record_too_large", util::fmt("%s", stream->name.c_str())); reporter->Weird("log_record_too_large", util::fmt("%s", stream->name.c_str()));
w->second->total_discarded_writes->Inc(); w->second->total_discarded_writes->Inc();
continue; continue;
@ -1426,8 +1426,7 @@ static size_t calculate_allowed(size_t field_size, size_t max_field_size, size_t
} }
threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, std::optional<ZVal>& val, Type* ty, threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, std::optional<ZVal>& val, Type* ty) {
size_t& total_size) {
if ( ! val ) if ( ! val )
return {ty->Tag(), false}; return {ty->Tag(), false};
@ -1437,7 +1436,7 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
case TYPE_BOOL: case TYPE_BOOL:
case TYPE_INT: case TYPE_INT:
lval.val.int_val = val->AsInt(); lval.val.int_val = val->AsInt();
total_size += sizeof(lval.val.int_val); total_record_size += sizeof(lval.val.int_val);
break; break;
case TYPE_ENUM: { case TYPE_ENUM: {
@ -1456,14 +1455,14 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
lval.val.string_val.length = 0; lval.val.string_val.length = 0;
} }
total_size += lval.val.string_val.length; total_record_size += lval.val.string_val.length;
break; break;
} }
case TYPE_COUNT: case TYPE_COUNT:
lval.val.uint_val = val->AsCount(); lval.val.uint_val = val->AsCount();
total_size += sizeof(lval.val.uint_val); total_record_size += sizeof(lval.val.uint_val);
break; break;
case TYPE_PORT: { case TYPE_PORT: {
@ -1480,25 +1479,25 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
lval.val.port_val.port = p & ~PORT_SPACE_MASK; lval.val.port_val.port = p & ~PORT_SPACE_MASK;
lval.val.port_val.proto = pt; lval.val.port_val.proto = pt;
total_size += lval.val.port_val.size(); total_record_size += lval.val.port_val.size();
break; break;
} }
case TYPE_SUBNET: case TYPE_SUBNET:
val->AsSubNet()->Get().ConvertToThreadingValue(&lval.val.subnet_val); val->AsSubNet()->Get().ConvertToThreadingValue(&lval.val.subnet_val);
total_size += lval.val.subnet_val.size(); total_record_size += lval.val.subnet_val.size();
break; break;
case TYPE_ADDR: case TYPE_ADDR:
val->AsAddr()->Get().ConvertToThreadingValue(&lval.val.addr_val); val->AsAddr()->Get().ConvertToThreadingValue(&lval.val.addr_val);
total_size += lval.val.addr_val.size(); total_record_size += lval.val.addr_val.size();
break; break;
case TYPE_DOUBLE: case TYPE_DOUBLE:
case TYPE_TIME: case TYPE_TIME:
case TYPE_INTERVAL: case TYPE_INTERVAL:
lval.val.double_val = val->AsDouble(); lval.val.double_val = val->AsDouble();
total_size += sizeof(lval.val.double_val); total_record_size += sizeof(lval.val.double_val);
break; break;
case TYPE_STRING: { case TYPE_STRING: {
@ -1519,7 +1518,7 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
lval.val.string_val.data = buf; lval.val.string_val.data = buf;
lval.val.string_val.length = allowed_bytes; lval.val.string_val.length = allowed_bytes;
total_size += allowed_bytes; total_record_size += allowed_bytes;
total_string_bytes += allowed_bytes; total_string_bytes += allowed_bytes;
break; break;
} }
@ -1530,7 +1529,7 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
auto len = strlen(s); auto len = strlen(s);
lval.val.string_val.data = util::copy_string(s, len); lval.val.string_val.data = util::copy_string(s, len);
lval.val.string_val.length = len; lval.val.string_val.length = len;
total_size += lval.val.string_val.length; total_record_size += lval.val.string_val.length;
break; break;
} }
@ -1542,7 +1541,7 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
auto len = strlen(s); auto len = strlen(s);
lval.val.string_val.data = util::copy_string(s, len); lval.val.string_val.data = util::copy_string(s, len);
lval.val.string_val.length = len; lval.val.string_val.length = len;
total_size += lval.val.string_val.length; total_record_size += lval.val.string_val.length;
break; break;
} }
@ -1573,10 +1572,9 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
lval.val.set_val.vals = new threading::Value*[allowed_elements]; lval.val.set_val.vals = new threading::Value*[allowed_elements];
for ( size_t i = 0; i < allowed_elements && total_size < max_log_record_size; i++ ) { for ( size_t i = 0; i < allowed_elements && total_record_size < max_log_record_size; i++ ) {
std::optional<ZVal> s_i = ZVal(set->Idx(i), set_t); std::optional<ZVal> s_i = ZVal(set->Idx(i), set_t);
lval.val.set_val.vals[i] = lval.val.set_val.vals[i] = new threading::Value(ValToLogVal(info, stream, s_i, set_t.get()));
new threading::Value(ValToLogVal(info, stream, s_i, set_t.get(), total_size));
if ( is_managed ) if ( is_managed )
ZVal::DeleteManagedType(*s_i); ZVal::DeleteManagedType(*s_i);
lval.val.set_val.size++; lval.val.set_val.size++;
@ -1607,9 +1605,8 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
auto& vv = vec->RawVec(); auto& vv = vec->RawVec();
auto& vt = vec->GetType()->Yield(); auto& vt = vec->GetType()->Yield();
for ( size_t i = 0; i < allowed_elements && total_size < max_log_record_size; i++ ) { for ( size_t i = 0; i < allowed_elements && total_record_size < max_log_record_size; i++ ) {
lval.val.vector_val.vals[i] = lval.val.vector_val.vals[i] = new threading::Value(ValToLogVal(info, stream, vv[i], vt.get()));
new threading::Value(ValToLogVal(info, stream, vv[i], vt.get(), total_size));
lval.val.vector_val.size++; lval.val.vector_val.size++;
} }
@ -1624,8 +1621,8 @@ threading::Value Manager::ValToLogVal(WriterInfo* info, const Stream* stream, st
return lval; return lval;
} }
detail::LogRecord Manager::RecordToLogRecord(WriterInfo* info, Filter* filter, const Stream* stream, RecordVal* columns, detail::LogRecord Manager::RecordToLogRecord(WriterInfo* info, Filter* filter, const Stream* stream,
size_t& total_size) { RecordVal* columns) {
RecordValPtr ext_rec; RecordValPtr ext_rec;
if ( filter->num_ext_fields > 0 ) { if ( filter->num_ext_fields > 0 ) {
@ -1675,9 +1672,9 @@ detail::LogRecord Manager::RecordToLogRecord(WriterInfo* info, Filter* filter, c
} }
if ( val ) if ( val )
vals.emplace_back(ValToLogVal(info, stream, val, vt, total_size)); vals.emplace_back(ValToLogVal(info, stream, val, vt));
if ( total_size > max_log_record_size ) { if ( total_record_size > max_log_record_size ) {
return {}; return {};
} }
} }

View file

@ -422,10 +422,8 @@ private:
bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude,
const std::string& path, const std::list<int>& indices); const std::string& path, const std::list<int>& indices);
detail::LogRecord RecordToLogRecord(WriterInfo* info, Filter* filter, const Stream* stream, RecordVal* columns, detail::LogRecord RecordToLogRecord(WriterInfo* info, Filter* filter, const Stream* stream, RecordVal* columns);
size_t& total_size); threading::Value ValToLogVal(WriterInfo* info, const Stream* stream, std::optional<ZVal>& val, Type* ty);
threading::Value ValToLogVal(WriterInfo* info, const Stream* stream, std::optional<ZVal>& val, Type* ty,
size_t& total_size);
Stream* FindStream(EnumVal* id); Stream* FindStream(EnumVal* id);
void RemoveDisabledWriters(Stream* stream); void RemoveDisabledWriters(Stream* stream);
@ -452,6 +450,7 @@ private:
FuncPtr log_stream_policy_hook; FuncPtr log_stream_policy_hook;
size_t max_log_record_size = 0; size_t max_log_record_size = 0;
size_t total_record_size = 0;
size_t total_string_bytes = 0; size_t total_string_bytes = 0;
size_t total_container_elements = 0; size_t total_container_elements = 0;