logging/Manager: Extract another CreateWriter() helper

For other cluster backends, CreateWriter() will use a logger's filter
configuration rather than receiving all configuration through CreateLog.
Extract a helper out from WriteToFilters() for reuse.
This commit is contained in:
Arne Welzel 2024-09-19 13:14:14 +02:00
parent 16cca62292
commit 78999d147d
2 changed files with 66 additions and 46 deletions

View file

@ -1116,13 +1116,11 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
path = filter->path = filter->path_val->AsString()->CheckString();
}
WriterBackend::WriterInfo* info = nullptr;
WriterFrontend* writer = nullptr;
if ( w != stream->writers.end() ) {
// We know this writer already.
writer = w->second->writer;
info = w->second->info;
if ( ! w->second->hook_initialized ) {
auto wi = w->second;
@ -1136,52 +1134,18 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
else {
// No, need to create one.
// Copy the fields for WriterFrontend::Init() as it
// will take ownership.
threading::Field** arg_fields = new threading::Field*[filter->num_fields];
for ( int j = 0; j < filter->num_fields; ++j ) {
// Rename fields if a field name map is set.
if ( filter->field_name_map ) {
const char* name = filter->fields[j]->name;
if ( const auto& val = filter->field_name_map->Find(make_intrusive<StringVal>(name)) ) {
delete[] filter->fields[j]->name;
auto [data, len] = val->AsStringVal()->CheckStringWithSize();
filter->fields[j]->name = util::copy_string(data, len);
}
}
arg_fields[j] = new threading::Field(*filter->fields[j]);
}
info = new WriterBackend::WriterInfo;
info->path = util::copy_string(path.c_str(), path.size());
info->network_time = run_state::network_time;
auto* filter_config_table = filter->config->AsTable();
for ( const auto& fcte : *filter_config_table ) {
auto k = fcte.GetHashKey();
auto* v = fcte.value;
auto index = filter->config->RecreateIndex(*k);
string key = index->Idx(0)->AsString()->CheckString();
string value = v->GetVal()->AsString()->CheckString();
info->config.emplace(util::copy_string(key.c_str(), key.size()),
util::copy_string(value.c_str(), value.size()));
}
// CreateWriter() will set the other fields in info.
writer = CreateWriter(stream->id, filter->writer, info, filter->num_fields, arg_fields, filter->local,
filter->remote, false, filter->name);
writer = CreateWriterForFilter(filter, path, WriterOrigin::LOCAL);
if ( ! writer )
return false;
// Find the newly inserted WriterInfo record.
w = stream->writers.find(wpp);
assert(w != stream->writers.end());
}
assert(writer);
// Alright, can do the write now.
auto rec = RecordToLogRecord(stream, filter, columns.get());
@ -1193,10 +1157,10 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
for ( auto& v : rec )
vals.emplace_back(&v);
bool res =
zeek::plugin_mgr->HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup(
filter->writer->InternalInt()),
filter->name, *info, filter->num_fields, filter->fields, &vals[0]);
bool res = zeek::plugin_mgr->HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup(
filter->writer->InternalInt()),
filter->name, *writer->info, filter->num_fields, filter->fields,
&vals[0]);
if ( ! res ) {
DBG_LOG(DBG_LOGGING, "Hook prevented writing to filter '%s' on stream '%s'", filter->name.c_str(),
stream->name.c_str());
@ -1205,11 +1169,9 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
}
}
assert(w != stream->writers.end());
w->second->total_writes->Inc();
// Write takes ownership of vals.
assert(writer);
writer->Write(std::move(rec));
#ifdef DEBUG
@ -1728,6 +1690,47 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
return winfo->writer;
}
WriterFrontend* Manager::CreateWriterForFilter(Filter* filter, const std::string& path, WriterOrigin from) {
// Copy the fields for WriterFrontend::Init() as it
// will take ownership.
threading::Field** arg_fields = new threading::Field*[filter->num_fields];
for ( int j = 0; j < filter->num_fields; ++j ) {
// Rename fields if a field name map is set.
if ( filter->field_name_map ) {
const char* name = filter->fields[j]->name;
if ( const auto& val = filter->field_name_map->Find(make_intrusive<StringVal>(name)) ) {
delete[] filter->fields[j]->name;
auto [data, len] = val->AsStringVal()->CheckStringWithSize();
filter->fields[j]->name = util::copy_string(data, len);
}
}
arg_fields[j] = new threading::Field(*filter->fields[j]);
}
auto* info = new WriterBackend::WriterInfo;
info->path = util::copy_string(path.c_str(), path.size());
info->network_time = run_state::network_time;
auto* filter_config_table = filter->config->AsTable();
for ( const auto& fcte : *filter_config_table ) {
auto k = fcte.GetHashKey();
auto* v = fcte.value;
auto index = filter->config->RecreateIndex(*k);
string key = index->Idx(0)->AsString()->CheckString();
string value = v->GetVal()->AsString()->CheckString();
info->config.emplace(util::copy_string(key.c_str(), key.size()),
util::copy_string(value.c_str(), value.size()));
}
// CreateWriter() will set the other fields in info.
bool from_remote = from == Manager::WriterOrigin::REMOTE;
return CreateWriter(filter->id, filter->writer, info, filter->num_fields, arg_fields, filter->local, filter->remote,
from_remote, filter->name);
}
bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, detail::LogRecord&& rec) {
Stream* stream = FindStream(id);

View file

@ -378,6 +378,23 @@ private:
struct Stream;
struct WriterInfo;
/**
* Helper enum for CreateWriterForFilter to avoid bool params.
*/
enum class WriterOrigin {
REMOTE,
LOCAL,
};
/**
* Helper to create a new writer for a filter with the given path.
*
* @param filter the filter for which to create the writer.
* @param path the path for the new writer
* @param from whether instantiated for a remote log, or locally created.
*/
WriterFrontend* CreateWriterForFilter(Filter* filter, const std::string& path, WriterOrigin origin);
bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude,
const std::string& path, const std::list<int>& indices);