diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index f4451c3f79..97aee69d3a 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -3,6 +3,7 @@ #include "zeek/RunState.h" #include "zeek/Span.h" #include "zeek/broker/Manager.h" +#include "zeek/cluster/Backend.h" #include "zeek/logging/Manager.h" #include "zeek/logging/WriterBackend.h" #include "zeek/threading/SerialTypes.h" @@ -185,12 +186,30 @@ void WriterFrontend::Write(detail::LogRecord&& arg_vals) { return; } + // If remote logging is enabled *and* broker is used as cluster backend, + // push the single log record directly to broker_mgr, it uses its own + // buffering logic currently. + // + // Other cluster backends leverage the write buffering logic in the + // WriterFrontend. See FlushWriteBuffer(). + const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr; + if ( remote ) { - broker_mgr->PublishLogWrite(header.stream_id.get(), header.writer_id.get(), info->path, vals); + if ( broker_is_cluster_backend ) { + zeek::broker_mgr->PublishLogWrite(header.stream_id.get(), header.writer_id.get(), info->path, vals); + + if ( ! backend ) // nothing left do do if we do not log locally + return; + } + } + else if ( ! backend ) { + assert(! remote); + // Not remote and no backend, we're done. + return; } - if ( ! backend ) - return; + // Either non-broker remote or local logging. + assert(backend || (remote && ! broker_is_cluster_backend)); write_buffer.WriteRecord(std::move(vals)); @@ -207,8 +226,16 @@ void WriterFrontend::FlushWriteBuffer() { // Nothing to do. return; + auto records = std::move(write_buffer).TakeRecords(); + + // We've already pushed to broker during Write(). If another backend + // is used, push all the buffered log records to it now. + const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr; + if ( remote && ! broker_is_cluster_backend ) + zeek::cluster::backend->PublishLogWrites(header, Span{records}); + if ( backend ) - backend->SendIn(new WriteMessage(backend, num_fields, std::move(write_buffer).TakeRecords())); + backend->SendIn(new WriteMessage(backend, num_fields, std::move(records))); } void WriterFrontend::SetBuf(bool enabled) {