diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 01844e596c..dbd474a582 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -681,8 +681,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) { stream->total_writes->Inc(); - // Send to each of our filters. - for ( list::iterator i = stream->filters.begin(); i != stream->filters.end(); ++i ) { + return WriteToFilters(stream, columns, stream_veto ? PolicyVerdict::VETO : PolicyVerdict::PASS); +} + +bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr columns, PolicyVerdict stream_verdict) { + for ( list::const_iterator i = stream->filters.begin(); i != stream->filters.end(); ++i ) { Filter* filter = *i; string path = filter->path; @@ -692,12 +695,15 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) { // handlers/bodies. Doing this skips sampling and // plugin hooks, though, so for now we do invoke. if ( filter->policy ) { - auto v = filter->policy->Invoke(columns, IntrusivePtr{NewRef{}, id}, IntrusivePtr{NewRef{}, filter->fval}); + auto v = filter->policy->Invoke(columns, IntrusivePtr{NewRef{}, stream->id}, + IntrusivePtr{NewRef{}, filter->fval}); if ( v && ! v->AsBool() ) continue; } - if ( stream_veto ) + // Even if Log::log_stream_policy vetoed, we, invoke filter policy + // hooks. Skip actually writing here. + if ( stream_verdict == PolicyVerdict::VETO ) continue; if ( filter->path_func ) { @@ -717,7 +723,8 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) { // Can be TYPE_ANY here. rec_arg = columns; - auto v = filter->path_func->Invoke(IntrusivePtr{NewRef{}, id}, std::move(path_arg), std::move(rec_arg)); + auto v = + filter->path_func->Invoke(IntrusivePtr{NewRef{}, stream->id}, std::move(path_arg), std::move(rec_arg)); if ( ! v ) return false; @@ -743,7 +750,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) { Stream::WriterPathPair wpp(filter->writer->AsEnum(), path); // See if we already have a writer for this path. - Stream::WriterMap::iterator w = stream->writers.find(wpp); + Stream::WriterMap::const_iterator w = stream->writers.find(wpp); if ( w != stream->writers.end() && CheckFilterWriterConflict(w->second, filter) ) { // Auto-correct path due to conflict over the writer/path pairs. @@ -1003,7 +1010,7 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { return lval; } -threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, RecordVal* columns) { +threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns) { RecordValPtr ext_rec; if ( filter->num_ext_fields > 0 ) { diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 3a42d4ca5f..7ca1500639 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -283,7 +283,7 @@ private: bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, const std::string& path, const std::list& indices); - threading::Value** RecordToFilterVals(Stream* stream, Filter* filter, RecordVal* columns); + threading::Value** RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns); threading::Value* ValToLogVal(std::optional& val, Type* ty); Stream* FindStream(EnumVal* id); @@ -294,6 +294,13 @@ private: bool CompareFields(const Filter* filter, const WriterFrontend* writer); bool CheckFilterWriterConflict(const WriterInfo* winfo, const Filter* filter); + // Verdict of a PolicyHook. + enum class PolicyVerdict { + PASS, + VETO, + }; + bool WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr columns, PolicyVerdict stream_verdict); + bool RemoveStream(unsigned int idx); std::vector streams; // Indexed by stream enum.