diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 4e9ccb7dd2..cfd20eba39 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -2716,7 +2716,8 @@ bool RemoteSerializer::ProcessLogCreateWriter() id_val = new EnumVal(id, BifType::Enum::Log::ID); writer_val = new EnumVal(writer, BifType::Enum::Log::Writer); - if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields, true, false) ) + if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields, + true, false, true) ) goto error; Unref(id_val); diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 7a182a78b7..4c6d2e92fd 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -86,6 +86,7 @@ struct Manager::WriterInfo { Func* postprocessor; WriterFrontend* writer; WriterBackend::WriterInfo* info; + bool from_remote; string instantiating_filter; }; @@ -240,6 +241,29 @@ Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer) return 0; } +bool Manager::CompareFields(const Filter* filter, const WriterFrontend* writer) + { + if ( filter->num_fields != writer->NumFields() ) + return false; + + for ( int i = 0; i < filter->num_fields; ++ i) + if ( filter->fields[i]->type != writer->Fields()[i]->type ) + return false; + + return true; + } + +bool Manager::CheckFilterWriterConflict(const WriterInfo* winfo, const Filter* filter) + { + if ( winfo->from_remote ) + // If the writer was instantiated as a result of remote logging, then + // a filter and writer are only compatible if field types match + return ! CompareFields(filter, winfo->writer); + else + // If the writer was instantiated locally, it is bound to one filter + return winfo->instantiating_filter != filter->name; + } + void Manager::RemoveDisabledWriters(Stream* stream) { list disabled; @@ -756,10 +780,9 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) Stream::WriterMap::iterator w = stream->writers.find(wpp); if ( w != stream->writers.end() && - w->second->instantiating_filter != filter->name ) + CheckFilterWriterConflict(w->second, filter) ) { - // Auto-correct path due to conflict with another filter over the - // same writer/path pair + // Auto-correct path due to conflict over the writer/path pairs. string instantiator = w->second->instantiating_filter; string new_path; unsigned int i = 2; @@ -771,7 +794,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) wpp.second = new_path; w = stream->writers.find(wpp); } while ( w != stream->writers.end() && - w->second->instantiating_filter != filter->name ); + CheckFilterWriterConflict(w->second, filter) ); Unref(filter->path_val); filter->path_val = new StringVal(new_path.c_str()); @@ -824,8 +847,8 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) // CreateWriter() will set the other fields in info. writer = CreateWriter(stream->id, filter->writer, - info, filter->num_fields, - arg_fields, filter->local, filter->remote, filter->name); + info, filter->num_fields, arg_fields, filter->local, + filter->remote, false, filter->name); if ( ! writer ) { @@ -1024,7 +1047,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, } WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, - int num_fields, const threading::Field* const* fields, bool local, bool remote, + int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote, const string& instantiating_filter) { Stream* stream = FindStream(id); @@ -1049,6 +1072,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken winfo->interval = 0; winfo->postprocessor = 0; winfo->info = info; + winfo->from_remote = from_remote; winfo->instantiating_filter = instantiating_filter; // Search for a corresponding filter for the writer/path pair and use its diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 864a23ca88..90ad944bc6 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -166,7 +166,7 @@ protected: // Takes ownership of fields and info. WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, int num_fields, const threading::Field* const* fields, - bool local, bool remote, const string& instantiating_filter=""); + bool local, bool remote, bool from_remote, const string& instantiating_filter=""); // Takes ownership of values.. bool Write(EnumVal* id, EnumVal* writer, string path, @@ -200,6 +200,8 @@ private: void Rotate(WriterInfo* info); Filter* FindFilter(EnumVal* id, StringVal* filter); WriterInfo* FindWriter(WriterFrontend* writer); + bool CompareFields(const Filter* filter, const WriterFrontend* writer); + bool CheckFilterWriterConflict(const WriterInfo* winfo, const Filter* filter); vector streams; // Indexed by stream enum. int rotations_pending; // Number of rotations not yet finished.