logging/Manager: Implement new WriteBatchFromRemote()

Arne Welzel 2024-11-14 17:29:44 +01:00
parent 3dd1f8d38a
commit ab1d48c95a
3 changed files with 116 additions and 5 deletions


@@ -1731,6 +1731,95 @@ WriterFrontend* Manager::CreateWriterForFilter(Filter* filter, const std::string
                        from_remote, filter->name);
}
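
// Handle a batch of log records received from a remote node: validate the
// header against the local stream and filter configuration, then pass each
// record on to the matching writer frontend.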
bool Manager::WriteBatchFromRemote(const detail::LogWriteHeader& header, std::vector<detail::LogRecord>&& records) {
    Stream* stream = FindStream(header.stream_id.get());
    if ( ! stream ) {
        reporter->Error("Failed to find stream '%s' for remote write", header.stream_name.c_str());
        return false;
    }

    Filter* filter = nullptr;

    // Find a filter with a matching name.
    for ( const auto& f : stream->filters ) {
        if ( f->name == header.filter_name ) {
            filter = f;
            break;
        }
    }

    if ( ! filter ) {
        reporter->Error("Remote write failed: Did not find filter '%s' for stream '%s'", header.filter_name.c_str(),
                        header.stream_name.c_str());
        return false;
    }
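
    // A filter with remote=true forwards its writes to remote nodes itself;
    // accepting a remote write for such a filter would presumably risk
    // records bouncing between nodes, so reject it.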
    if ( filter->remote ) {
        reporter->Error("Remote write failed: Local filter '%s' of stream '%s' has remote=true", filter->name.c_str(),
                        obj_desc_short(header.stream_id.get()).c_str());
        return false;
    }

    // Basic validation of the incoming log record against the local filter configuration.
    if ( static_cast<int>(header.fields.size()) != filter->num_fields ) {
        reporter->Error("Remote write failed: Local filter '%s' of stream '%s' has %d fields, got %zu",
                        filter->name.c_str(), obj_desc_short(header.stream_id.get()).c_str(), filter->num_fields,
                        header.fields.size());
        return false;
    }

    // Schema check: the type of each field in the received header must match
    // the corresponding field of the local filter.
    for ( int i = 0; i < filter->num_fields; ++i ) {
        const auto ft = filter->fields[i]->type;
        const auto ht = header.fields[i].type;

        if ( ft != ht ) {
            reporter->Error("Remote write failed: fields[%d] got type %s, want %s for stream '%s' and filter '%s'", i,
                            zeek::type_name(ht), zeek::type_name(ft), header.stream_name.c_str(),
                            filter->name.c_str());
            return false;
        }
    }
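
    // The local filter must be configured with the same writer as the one
    // named in the remote header.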
    if ( header.writer_id.get() != filter->writer ) {
        reporter->Error("Remote write failed: Local filter '%s' of stream '%s' has writer '%s', got '%s'",
                        filter->name.c_str(), header.stream_name.c_str(), obj_desc_short(filter->writer).c_str(),
                        header.writer_name.c_str());
        return false;
    }

    // Look up whether there's already a (writer, path) pair for this stream.
    // Either use it, or create a new one if not.
    //
    // TODO: Might make sense to switch this to (stream, filter, path)
    // down the road.
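    //
    // As an example (assuming the ASCII writer), the pair
    // (Log::WRITER_ASCII, "conn") identifies the writer instance that
    // produces conn.log.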
    Stream::WriterPathPair wpp(filter->writer->AsEnum(), header.path);
    Stream::WriterMap::const_iterator w = stream->writers.find(wpp);
    if ( w == stream->writers.end() ) {
        DBG_LOG(DBG_LOGGING, "creating writer %s for filter %s of stream %s\n", header.writer_name.c_str(),
                filter->name.c_str(), header.stream_name.c_str());

        if ( ! CreateWriterForFilter(filter, header.path, WriterOrigin::REMOTE) ) {
            reporter->Error("Failed to create writer %s for filter '%s' of stream '%s'", header.writer_name.c_str(),
                            filter->name.c_str(), header.stream_name.c_str());
            return false;
        }
    }
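
    // CreateWriterForFilter() registers the newly created writer in
    // stream->writers, so the second lookup is expected to succeed.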
    w = stream->writers.find(wpp);
    assert(w != stream->writers.end());

    // Write each record individually so that the frontend's usual buffering
    // stays in effect.
    //
    // This is nice as it'll buffer log writes from different remote nodes
    // until the frontend is flushed. On the flip side, flushing the buffer
    // after every remote log batch might be more predictable.
    for ( auto& r : records )
        w->second->writer->Write(std::move(r));

    return true;
}

bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, detail::LogRecord&& rec) {
    Stream* stream = FindStream(id);