logging/WriterFrontend: Add logic for non-broker cluster backends

If cluster::backend isn't broker_mgr, use the WriterFrontend's buffering
logic and send a whole batch of log writes during FlushWriteBuffer().
This is a different path than broker's own logging logic.

Preferably we adapt broker to a model where it isn't
buffering either.
This commit is contained in:
Arne Welzel 2024-11-14 18:02:44 +01:00
parent 15ea8a3be9
commit 10d93eff36

View file

@ -3,6 +3,7 @@
#include "zeek/RunState.h" #include "zeek/RunState.h"
#include "zeek/Span.h" #include "zeek/Span.h"
#include "zeek/broker/Manager.h" #include "zeek/broker/Manager.h"
#include "zeek/cluster/Backend.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/logging/WriterBackend.h" #include "zeek/logging/WriterBackend.h"
#include "zeek/threading/SerialTypes.h" #include "zeek/threading/SerialTypes.h"
@ -185,12 +186,30 @@ void WriterFrontend::Write(detail::LogRecord&& arg_vals) {
return; return;
} }
// If remote logging is enabled *and* broker is used as cluster backend,
// push the single log record directly to broker_mgr, it uses its own
// buffering logic currently.
//
// Other cluster backends leverage the write buffering logic in the
// WriterFrontend. See FlushWriteBuffer().
const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr;
if ( remote ) { if ( remote ) {
broker_mgr->PublishLogWrite(header.stream_id.get(), header.writer_id.get(), info->path, vals); if ( broker_is_cluster_backend ) {
zeek::broker_mgr->PublishLogWrite(header.stream_id.get(), header.writer_id.get(), info->path, vals);
if ( ! backend ) // nothing left do do if we do not log locally
return;
}
}
else if ( ! backend ) {
assert(! remote);
// Not remote and no backend, we're done.
return;
} }
if ( ! backend ) // Either non-broker remote or local logging.
return; assert(backend || (remote && ! broker_is_cluster_backend));
write_buffer.WriteRecord(std::move(vals)); write_buffer.WriteRecord(std::move(vals));
@ -207,8 +226,16 @@ void WriterFrontend::FlushWriteBuffer() {
// Nothing to do. // Nothing to do.
return; return;
auto records = std::move(write_buffer).TakeRecords();
// We've already pushed to broker during Write(). If another backend
// is used, push all the buffered log records to it now.
const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr;
if ( remote && ! broker_is_cluster_backend )
zeek::cluster::backend->PublishLogWrites(header, Span{records});
if ( backend ) if ( backend )
backend->SendIn(new WriteMessage(backend, num_fields, std::move(write_buffer).TakeRecords())); backend->SendIn(new WriteMessage(backend, num_fields, std::move(records)));
} }
void WriterFrontend::SetBuf(bool enabled) { void WriterFrontend::SetBuf(bool enabled) {