logging/Manager: Split Write()

If we delay in the stream policy hook, we'll need to resume writing
to the attached filters later on. Prepare for that by splitting out
the filter processing.
This commit is contained in:
Arne Welzel 2023-11-12 16:03:23 +01:00
parent 2d0fa13e18
commit 3afd6242c7
2 changed files with 22 additions and 8 deletions

View file

@ -681,8 +681,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) {
stream->total_writes->Inc();
// Send to each of our filters.
for ( list<Filter*>::iterator i = stream->filters.begin(); i != stream->filters.end(); ++i ) {
return WriteToFilters(stream, columns, stream_veto ? PolicyVerdict::VETO : PolicyVerdict::PASS);
}
bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr columns, PolicyVerdict stream_verdict) {
for ( list<Filter*>::const_iterator i = stream->filters.begin(); i != stream->filters.end(); ++i ) {
Filter* filter = *i;
string path = filter->path;
@ -692,12 +695,15 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) {
// handlers/bodies. Doing this skips sampling and
// plugin hooks, though, so for now we do invoke.
if ( filter->policy ) {
auto v = filter->policy->Invoke(columns, IntrusivePtr{NewRef{}, id}, IntrusivePtr{NewRef{}, filter->fval});
auto v = filter->policy->Invoke(columns, IntrusivePtr{NewRef{}, stream->id},
IntrusivePtr{NewRef{}, filter->fval});
if ( v && ! v->AsBool() )
continue;
}
if ( stream_veto )
// Even if Log::log_stream_policy vetoed, we, invoke filter policy
// hooks. Skip actually writing here.
if ( stream_verdict == PolicyVerdict::VETO )
continue;
if ( filter->path_func ) {
@ -717,7 +723,8 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) {
// Can be TYPE_ANY here.
rec_arg = columns;
auto v = filter->path_func->Invoke(IntrusivePtr{NewRef{}, id}, std::move(path_arg), std::move(rec_arg));
auto v =
filter->path_func->Invoke(IntrusivePtr{NewRef{}, stream->id}, std::move(path_arg), std::move(rec_arg));
if ( ! v )
return false;
@ -743,7 +750,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) {
Stream::WriterPathPair wpp(filter->writer->AsEnum(), path);
// See if we already have a writer for this path.
Stream::WriterMap::iterator w = stream->writers.find(wpp);
Stream::WriterMap::const_iterator w = stream->writers.find(wpp);
if ( w != stream->writers.end() && CheckFilterWriterConflict(w->second, filter) ) {
// Auto-correct path due to conflict over the writer/path pairs.
@ -1003,7 +1010,7 @@ threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
return lval;
}
threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, RecordVal* columns) {
threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns) {
RecordValPtr ext_rec;
if ( filter->num_ext_fields > 0 ) {