mirror of
https://github.com/zeek/zeek.git
synced 2025-10-05 16:18:19 +00:00
Improve log filter compatibility with remote logging.
If a log filter attempts to write to a path for which a writer is already instantiated due to remote logging, it will re-use the writer as long as the fields of the filter and writer are compatible, else the filter path will be auto-adjusted to not conflict with existing writer's. Conflicts between two local filters are still always auto-adjusted even if field types agree (since they could still be semantically different). Addresses #842.
This commit is contained in:
parent
38c2ee6894
commit
7b2c3db488
3 changed files with 36 additions and 9 deletions
|
@ -2716,7 +2716,8 @@ bool RemoteSerializer::ProcessLogCreateWriter()
|
||||||
id_val = new EnumVal(id, BifType::Enum::Log::ID);
|
id_val = new EnumVal(id, BifType::Enum::Log::ID);
|
||||||
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
|
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
|
||||||
|
|
||||||
if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields, true, false) )
|
if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields,
|
||||||
|
true, false, true) )
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
Unref(id_val);
|
Unref(id_val);
|
||||||
|
|
|
@ -86,6 +86,7 @@ struct Manager::WriterInfo {
|
||||||
Func* postprocessor;
|
Func* postprocessor;
|
||||||
WriterFrontend* writer;
|
WriterFrontend* writer;
|
||||||
WriterBackend::WriterInfo* info;
|
WriterBackend::WriterInfo* info;
|
||||||
|
bool from_remote;
|
||||||
string instantiating_filter;
|
string instantiating_filter;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -240,6 +241,29 @@ Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Manager::CompareFields(const Filter* filter, const WriterFrontend* writer)
|
||||||
|
{
|
||||||
|
if ( filter->num_fields != writer->NumFields() )
|
||||||
|
return false;
|
||||||
|
|
||||||
|
for ( int i = 0; i < filter->num_fields; ++ i)
|
||||||
|
if ( filter->fields[i]->type != writer->Fields()[i]->type )
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Manager::CheckFilterWriterConflict(const WriterInfo* winfo, const Filter* filter)
|
||||||
|
{
|
||||||
|
if ( winfo->from_remote )
|
||||||
|
// If the writer was instantiated as a result of remote logging, then
|
||||||
|
// a filter and writer are only compatible if field types match
|
||||||
|
return ! CompareFields(filter, winfo->writer);
|
||||||
|
else
|
||||||
|
// If the writer was instantiated locally, it is bound to one filter
|
||||||
|
return winfo->instantiating_filter != filter->name;
|
||||||
|
}
|
||||||
|
|
||||||
void Manager::RemoveDisabledWriters(Stream* stream)
|
void Manager::RemoveDisabledWriters(Stream* stream)
|
||||||
{
|
{
|
||||||
list<Stream::WriterPathPair> disabled;
|
list<Stream::WriterPathPair> disabled;
|
||||||
|
@ -756,10 +780,9 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
||||||
Stream::WriterMap::iterator w = stream->writers.find(wpp);
|
Stream::WriterMap::iterator w = stream->writers.find(wpp);
|
||||||
|
|
||||||
if ( w != stream->writers.end() &&
|
if ( w != stream->writers.end() &&
|
||||||
w->second->instantiating_filter != filter->name )
|
CheckFilterWriterConflict(w->second, filter) )
|
||||||
{
|
{
|
||||||
// Auto-correct path due to conflict with another filter over the
|
// Auto-correct path due to conflict over the writer/path pairs.
|
||||||
// same writer/path pair
|
|
||||||
string instantiator = w->second->instantiating_filter;
|
string instantiator = w->second->instantiating_filter;
|
||||||
string new_path;
|
string new_path;
|
||||||
unsigned int i = 2;
|
unsigned int i = 2;
|
||||||
|
@ -771,7 +794,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
||||||
wpp.second = new_path;
|
wpp.second = new_path;
|
||||||
w = stream->writers.find(wpp);
|
w = stream->writers.find(wpp);
|
||||||
} while ( w != stream->writers.end() &&
|
} while ( w != stream->writers.end() &&
|
||||||
w->second->instantiating_filter != filter->name );
|
CheckFilterWriterConflict(w->second, filter) );
|
||||||
|
|
||||||
Unref(filter->path_val);
|
Unref(filter->path_val);
|
||||||
filter->path_val = new StringVal(new_path.c_str());
|
filter->path_val = new StringVal(new_path.c_str());
|
||||||
|
@ -824,8 +847,8 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
||||||
// CreateWriter() will set the other fields in info.
|
// CreateWriter() will set the other fields in info.
|
||||||
|
|
||||||
writer = CreateWriter(stream->id, filter->writer,
|
writer = CreateWriter(stream->id, filter->writer,
|
||||||
info, filter->num_fields,
|
info, filter->num_fields, arg_fields, filter->local,
|
||||||
arg_fields, filter->local, filter->remote, filter->name);
|
filter->remote, false, filter->name);
|
||||||
|
|
||||||
if ( ! writer )
|
if ( ! writer )
|
||||||
{
|
{
|
||||||
|
@ -1024,7 +1047,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
|
||||||
}
|
}
|
||||||
|
|
||||||
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
|
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
|
||||||
int num_fields, const threading::Field* const* fields, bool local, bool remote,
|
int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote,
|
||||||
const string& instantiating_filter)
|
const string& instantiating_filter)
|
||||||
{
|
{
|
||||||
Stream* stream = FindStream(id);
|
Stream* stream = FindStream(id);
|
||||||
|
@ -1049,6 +1072,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
|
||||||
winfo->interval = 0;
|
winfo->interval = 0;
|
||||||
winfo->postprocessor = 0;
|
winfo->postprocessor = 0;
|
||||||
winfo->info = info;
|
winfo->info = info;
|
||||||
|
winfo->from_remote = from_remote;
|
||||||
winfo->instantiating_filter = instantiating_filter;
|
winfo->instantiating_filter = instantiating_filter;
|
||||||
|
|
||||||
// Search for a corresponding filter for the writer/path pair and use its
|
// Search for a corresponding filter for the writer/path pair and use its
|
||||||
|
|
|
@ -166,7 +166,7 @@ protected:
|
||||||
// Takes ownership of fields and info.
|
// Takes ownership of fields and info.
|
||||||
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
|
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
|
||||||
int num_fields, const threading::Field* const* fields,
|
int num_fields, const threading::Field* const* fields,
|
||||||
bool local, bool remote, const string& instantiating_filter="");
|
bool local, bool remote, bool from_remote, const string& instantiating_filter="");
|
||||||
|
|
||||||
// Takes ownership of values..
|
// Takes ownership of values..
|
||||||
bool Write(EnumVal* id, EnumVal* writer, string path,
|
bool Write(EnumVal* id, EnumVal* writer, string path,
|
||||||
|
@ -200,6 +200,8 @@ private:
|
||||||
void Rotate(WriterInfo* info);
|
void Rotate(WriterInfo* info);
|
||||||
Filter* FindFilter(EnumVal* id, StringVal* filter);
|
Filter* FindFilter(EnumVal* id, StringVal* filter);
|
||||||
WriterInfo* FindWriter(WriterFrontend* writer);
|
WriterInfo* FindWriter(WriterFrontend* writer);
|
||||||
|
bool CompareFields(const Filter* filter, const WriterFrontend* writer);
|
||||||
|
bool CheckFilterWriterConflict(const WriterInfo* winfo, const Filter* filter);
|
||||||
|
|
||||||
vector<Stream *> streams; // Indexed by stream enum.
|
vector<Stream *> streams; // Indexed by stream enum.
|
||||||
int rotations_pending; // Number of rotations not yet finished.
|
int rotations_pending; // Number of rotations not yet finished.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue