diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc
index 3737b9134f..d1e6ef9a4f 100644
--- a/src/cluster/Backend.cc
+++ b/src/cluster/Backend.cc
@@ -12,6 +12,7 @@
 #include "zeek/Type.h"
 #include "zeek/cluster/Serializer.h"
 #include "zeek/iosource/Manager.h"
+#include "zeek/logging/Manager.h"
 
 using namespace zeek::cluster;
 
@@ -129,10 +130,7 @@ bool Backend::ProcessLogMessage(const std::string_view& format, detail::byte_buf
         return false;
     }
 
-    // TODO: Send the whole batch to the logging manager.
-    // return zeek::log_mgr->WritesFromRemote(result->header, std::move(result->records));
-    zeek::reporter->FatalError("not implemented");
-    return false;
+    return zeek::log_mgr->WriteBatchFromRemote(result->header, std::move(result->records));
 }
 
 bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span payload) {
diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc
index 733ba86cc0..571b61c020 100644
--- a/src/logging/Manager.cc
+++ b/src/logging/Manager.cc
@@ -1731,6 +1731,95 @@ WriterFrontend* Manager::CreateWriterForFilter(Filter* filter, const std::string
                         from_remote, filter->name);
 }
 
+bool Manager::WriteBatchFromRemote(const detail::LogWriteHeader& header, std::vector<detail::LogRecord>&& records) {
+    Stream* stream = FindStream(header.stream_id.get());
+    if ( ! stream ) {
+        reporter->Error("Remote write failed: Did not find stream '%s'", header.stream_name.c_str());
+        return false;
+    }
+
+    Filter* filter = nullptr;
+
+    // Find a filter with a matching filter name.
+    for ( const auto& f : stream->filters ) {
+        if ( f->name == header.filter_name ) {
+            filter = f;
+            break;
+        }
+    }
+    if ( ! filter ) {
+        reporter->Error("Remote write failed: Did not find filter '%s' for stream '%s'", header.filter_name.c_str(),
+                        header.stream_name.c_str());
+        return false;
+    }
+
+    if ( filter->remote ) {
+        reporter->Error("Remote write failed: Local filter '%s' of stream '%s' has remote=true", filter->name.c_str(),
+                        obj_desc_short(header.stream_id.get()).c_str());
+        return false;
+    }
+
+    // Basic validation of incoming log record with local filter configuration.
+    if ( static_cast<int>(header.fields.size()) != filter->num_fields ) {
+        reporter->Error("Remote write failed: Local filter '%s' of stream '%s' has '%d' fields, got %zu",
+                        filter->name.c_str(), obj_desc_short(header.stream_id.get()).c_str(), filter->num_fields,
+                        header.fields.size());
+        return false;
+    }
+
+    // Schema checking of filter and received header.
+    for ( int i = 0; i < filter->num_fields; ++i ) {
+        const auto ft = filter->fields[i]->type;
+        const auto ht = header.fields[i].type;
+        if ( ft != ht ) {
+            reporter->Error("Remote write failed: fields[%d] got type %s, want %s for stream '%s' and filter '%s'", i,
+                            zeek::type_name(ht), zeek::type_name(ft), header.stream_name.c_str(), filter->name.c_str());
+            return false;
+        }
+    }
+
+    if ( header.writer_id.get() != filter->writer ) {
+        reporter->Error("Remote write failed: Local filter '%s' of stream '%s' has writer '%s', got '%s'",
+                        filter->name.c_str(), header.stream_name.c_str(), obj_desc_short(filter->writer).c_str(),
+                        header.writer_name.c_str());
+        return false;
+    }
+
+    // Lookup if there's a (writer, path) pair for this stream. Either use
+    // it or create a new one if not.
+    //
+    // TODO: Might make sense to switch this to (stream, filter, path)
+    // down the road.
+    Stream::WriterPathPair wpp(filter->writer->AsEnum(), header.path);
+
+    Stream::WriterMap::const_iterator w = stream->writers.find(wpp);
+    if ( w == stream->writers.end() ) {
+        DBG_LOG(DBG_LOGGING, "creating writer %s for filter %s of stream %s\n", header.writer_name.c_str(),
+                filter->name.c_str(), header.stream_name.c_str());
+
+        if ( ! CreateWriterForFilter(filter, header.path, WriterOrigin::REMOTE) ) {
+            reporter->Error("Failed to create writer %s for filter '%s' of stream '%s'", header.writer_name.c_str(),
+                            filter->name.c_str(), header.stream_name.c_str());
+            return false;
+        }
+    }
+
+    w = stream->writers.find(wpp);
+    assert(w != stream->writers.end());
+
+    // Write each record individually. This results in the
+    // frontend's buffering to be in effect.
+    //
+    // This is nice as it'll buffer log writes from different
+    // remote nodes until the frontend is flushed. On the flip
+    // side, could also see how flushing the buffer for every
+    // remote log batch might be more predictable.
+    for ( auto& r : records )
+        w->second->writer->Write(std::move(r));
+
+    return true;
+}
+
 bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, detail::LogRecord&& rec) {
     Stream* stream = FindStream(id);
 
diff --git a/src/logging/Manager.h b/src/logging/Manager.h
index ef61d781ae..e90c545990 100644
--- a/src/logging/Manager.h
+++ b/src/logging/Manager.h
@@ -10,6 +10,7 @@
 #include "zeek/Tag.h"
 #include "zeek/Val.h"
 #include "zeek/logging/Component.h"
+#include "zeek/logging/Types.h"
 #include "zeek/logging/WriterBackend.h"
 #include "zeek/plugin/ComponentManager.h"
 #include "zeek/telemetry/Manager.h"
@@ -282,11 +283,34 @@ public:
      * @param path The path of the target log stream to write to.
      *
      * @param rec Representation of the log record to write.
-
+     *
      * @return Returns true if the record was processed successfully.
      */
     bool WriteFromRemote(EnumVal* id, EnumVal* writer, const std::string& path, detail::LogRecord&& rec);
 
+    /**
+     * Writes out a batch of log entries received from remote nodes.
+     *
+     * The given records have passed through all policy filters and raised events
+     * on the sending node. They are only meant to be written out.
+     *
+     * In contrast to WriteFromRemote(), this method works on a whole batch of log
+     * records at once. As long as the receiving node has a matching filter
+     * attached to the stream and the fields within the header match the local
+     * filter's fields, an appropriate writer is created. WriteFromRemote() instead
+     * assumes the writer exists a priori.
+     *
+     * This method acts as a sink for \a records. An rvalue reference is used to
+     * make this explicit and prevent callers from copying all records by mistake.
+     *
+     * @param header The header describing the log records as deserialized from a remote message.
+     *
+     * @param records Records to be written out, the manager takes ownership of these.
+     *
+     * @return Returns true if the records were processed successfully.
+     */
+    bool WriteBatchFromRemote(const detail::LogWriteHeader& header, std::vector<detail::LogRecord>&& records);
+
     /**
      * Announces all instantiated writers to a given Broker peer.
      */