logging: Dedicated log flush timer

Log flushing is currently triggered based on the threading heartbeat timer
of WriterBackends and the hard-coded WRITE_BUFFER_SIZE 1000.

This change introduces a separate timer that is managed by the logger
manager instead of piggy-backing on the heartbeat timer, as well as a
const &redef for the buffer size.

This allows to modify the log flush frequency and batch size independently
of the threading heartbeat interval. Later, this will allow to re-use the
buffering and flushing logic of writer frontends for non-Broker cluster
backends, too.

One change here is that even frontends that do not have a backend will
be flushed regularly. This is wanted for non-Broker backends and should be
very cheap. Possibly, Broker can piggy back on this timer down the road, too,
rather than using its own script-level timer (see Broker::log_flush()).
This commit is contained in:
Arne Welzel 2024-09-27 11:37:23 +02:00
parent 77b9510c8a
commit 0d925e935e
11 changed files with 87 additions and 23 deletions

View file

@ -16,6 +16,7 @@
#include "zeek/NetVar.h"
#include "zeek/OpaqueVal.h"
#include "zeek/RunState.h"
#include "zeek/Timer.h"
#include "zeek/Type.h"
#include "zeek/broker/Manager.h"
#include "zeek/input.h"
@ -39,6 +40,21 @@ extern zeek::OpaqueTypePtr log_delay_token_type;
namespace zeek::logging {
namespace detail {
// A timer that regularly flushes the write buffer of all WriterFrontends.
class LogFlushWriteBufferTimer : public zeek::detail::Timer {
public:
explicit LogFlushWriteBufferTimer(double t) : Timer(t, zeek::detail::TIMER_LOG_FLUSH_WRITE_BUFFER) {}
void Dispatch(double t, bool is_expire) override {
zeek::log_mgr->FlushAllWriteBuffers();
if ( ! is_expire )
zeek::log_mgr->StartLogFlushTimer();
}
};
using DelayTokenType = zeek_uint_t;
class DelayInfo;
@ -414,7 +430,6 @@ void Manager::Stream::DispatchDelayExpiredTimer(double t, bool is_expire) {
ScheduleLogDelayExpiredTimer(delay_queue.front()->ExpireTime());
}
Manager::Manager()
: plugin::ComponentManager<logging::Component>("Log", "Writer"),
total_log_stream_writes_family(telemetry_mgr->CounterFamily("zeek", "log-stream-writes", {"module", "stream"},
@ -598,6 +613,9 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) {
DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", streams[idx]->name.c_str(),
event ? streams[idx]->event->Name() : "<none>");
if ( ! log_flush_timer )
StartLogFlushTimer();
return true;
}
@ -2033,4 +2051,20 @@ bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, con
return result;
}
void Manager::FlushAllWriteBuffers() {
for ( const auto* s : zeek::log_mgr->streams ) {
if ( ! s ) // may store nullptr
continue;
for ( const auto& [_, info] : s->writers )
info->writer->FlushWriteBuffer();
}
}
void Manager::StartLogFlushTimer() {
double next_t = zeek::run_state::network_time + BifConst::Log::flush_interval;
log_flush_timer = new detail::LogFlushWriteBufferTimer(next_t);
zeek::detail::timer_mgr->Add(log_flush_timer);
}
} // namespace zeek::logging