From 78999d147d7a98a064d25854eea38468c1af7c5e Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 19 Sep 2024 13:14:14 +0200 Subject: [PATCH] logging/Manager: Extract another CreateWriter() helper For other cluster backends, CreateWriter() will use a logger's filter configuration rather than receiving all configuration through CreateLog. Extract a helper out from WriteToFilters() for reuse. --- src/logging/Manager.cc | 95 ++++++++++++++++++++++-------------------- src/logging/Manager.h | 17 ++++++++ 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index d295ddd603..b422ef783b 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1116,13 +1116,11 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c path = filter->path = filter->path_val->AsString()->CheckString(); } - WriterBackend::WriterInfo* info = nullptr; WriterFrontend* writer = nullptr; if ( w != stream->writers.end() ) { // We know this writer already. writer = w->second->writer; - info = w->second->info; if ( ! w->second->hook_initialized ) { auto wi = w->second; @@ -1136,52 +1134,18 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c else { // No, need to create one. - - // Copy the fields for WriterFrontend::Init() as it - // will take ownership. - threading::Field** arg_fields = new threading::Field*[filter->num_fields]; - - for ( int j = 0; j < filter->num_fields; ++j ) { - // Rename fields if a field name map is set. - if ( filter->field_name_map ) { - const char* name = filter->fields[j]->name; - if ( const auto& val = filter->field_name_map->Find(make_intrusive(name)) ) { - delete[] filter->fields[j]->name; - auto [data, len] = val->AsStringVal()->CheckStringWithSize(); - filter->fields[j]->name = util::copy_string(data, len); - } - } - arg_fields[j] = new threading::Field(*filter->fields[j]); - } - - info = new WriterBackend::WriterInfo; - info->path = util::copy_string(path.c_str(), path.size()); - info->network_time = run_state::network_time; - - auto* filter_config_table = filter->config->AsTable(); - for ( const auto& fcte : *filter_config_table ) { - auto k = fcte.GetHashKey(); - auto* v = fcte.value; - - auto index = filter->config->RecreateIndex(*k); - string key = index->Idx(0)->AsString()->CheckString(); - string value = v->GetVal()->AsString()->CheckString(); - info->config.emplace(util::copy_string(key.c_str(), key.size()), - util::copy_string(value.c_str(), value.size())); - } - - // CreateWriter() will set the other fields in info. - - writer = CreateWriter(stream->id, filter->writer, info, filter->num_fields, arg_fields, filter->local, - filter->remote, false, filter->name); + writer = CreateWriterForFilter(filter, path, WriterOrigin::LOCAL); if ( ! writer ) return false; // Find the newly inserted WriterInfo record. w = stream->writers.find(wpp); + assert(w != stream->writers.end()); } + assert(writer); + // Alright, can do the write now. auto rec = RecordToLogRecord(stream, filter, columns.get()); @@ -1193,10 +1157,10 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c for ( auto& v : rec ) vals.emplace_back(&v); - bool res = - zeek::plugin_mgr->HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup( - filter->writer->InternalInt()), - filter->name, *info, filter->num_fields, filter->fields, &vals[0]); + bool res = zeek::plugin_mgr->HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup( + filter->writer->InternalInt()), + filter->name, *writer->info, filter->num_fields, filter->fields, + &vals[0]); if ( ! res ) { DBG_LOG(DBG_LOGGING, "Hook prevented writing to filter '%s' on stream '%s'", filter->name.c_str(), stream->name.c_str()); @@ -1205,11 +1169,9 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c } } - assert(w != stream->writers.end()); w->second->total_writes->Inc(); // Write takes ownership of vals. - assert(writer); writer->Write(std::move(rec)); #ifdef DEBUG @@ -1728,6 +1690,47 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken return winfo->writer; } +WriterFrontend* Manager::CreateWriterForFilter(Filter* filter, const std::string& path, WriterOrigin from) { + // Copy the fields for WriterFrontend::Init() as it + // will take ownership. + threading::Field** arg_fields = new threading::Field*[filter->num_fields]; + + for ( int j = 0; j < filter->num_fields; ++j ) { + // Rename fields if a field name map is set. + if ( filter->field_name_map ) { + const char* name = filter->fields[j]->name; + if ( const auto& val = filter->field_name_map->Find(make_intrusive(name)) ) { + delete[] filter->fields[j]->name; + auto [data, len] = val->AsStringVal()->CheckStringWithSize(); + filter->fields[j]->name = util::copy_string(data, len); + } + } + arg_fields[j] = new threading::Field(*filter->fields[j]); + } + + auto* info = new WriterBackend::WriterInfo; + info->path = util::copy_string(path.c_str(), path.size()); + info->network_time = run_state::network_time; + + auto* filter_config_table = filter->config->AsTable(); + for ( const auto& fcte : *filter_config_table ) { + auto k = fcte.GetHashKey(); + auto* v = fcte.value; + + auto index = filter->config->RecreateIndex(*k); + string key = index->Idx(0)->AsString()->CheckString(); + string value = v->GetVal()->AsString()->CheckString(); + info->config.emplace(util::copy_string(key.c_str(), key.size()), + util::copy_string(value.c_str(), value.size())); + } + + // CreateWriter() will set the other fields in info. + + bool from_remote = from == Manager::WriterOrigin::REMOTE; + return CreateWriter(filter->id, filter->writer, info, filter->num_fields, arg_fields, filter->local, filter->remote, + from_remote, filter->name); +} + bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, detail::LogRecord&& rec) { Stream* stream = FindStream(id); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 4233daed69..ef61d781ae 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -378,6 +378,23 @@ private: struct Stream; struct WriterInfo; + /** + * Helper enum for CreateWriterForFilter to avoid bool params. + */ + enum class WriterOrigin { + REMOTE, + LOCAL, + }; + + /** + * Helper to create a new writer for a filter with the given path. + * + * @param filter the filter for which to create the writer. + * @param path the path for the new writer + * @param from whether instantiated for a remote log, or locally created. + */ + WriterFrontend* CreateWriterForFilter(Filter* filter, const std::string& path, WriterOrigin origin); + bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, const std::string& path, const std::list& indices);