diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index f01c4496aa..48643239b8 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -2903,6 +2903,33 @@ export { } # end export +module Log; + +export { + ## Default interval for flushing the write buffers of all + ## enabled log streams. + ## + ## In earlier Zeek releases this was governed by :zeek:see:`Threading::heartbeat_interval`. + ## For Broker, see also :zeek::see:`Broker::log_batch_interval`. + ## + ## .. :zeek:see:`Log::flush` + ## .. :zeek:see:`Log::set_buf` + ## .. :zeek:see:`Log::write_buffer_size` + const flush_interval = 1.0sec &redef; + + ## Default maximum size of the log write buffer per filter/path pair. + ## If this many log writes are buffered, the writer frontend flushes + ## its writes to its backend before flush_interval expires. + ## + ## In earlier Zeek releases this was hard-coded to 1000. + ## + ## .. :zeek:see:`Log::flush` + ## .. :zeek:see:`Log::set_buf` + ## .. :zeek:see:`Log::flush_interval` + const write_buffer_size = 1000 &redef; + +} # end export + module POP3; export { diff --git a/src/Timer.cc b/src/Timer.cc index e54c4f6add..285e26df6a 100644 --- a/src/Timer.cc +++ b/src/Timer.cc @@ -50,6 +50,7 @@ const char* TimerNames[] = { "ThreadHeartbeat", "UnknownProtocolExpire", "LogDelayExpire", + "LogFlushWriteBufferTimer", }; const char* timer_type_to_string(TimerType type) { return TimerNames[type]; } diff --git a/src/Timer.h b/src/Timer.h index 45ba50314e..cf3f4f130d 100644 --- a/src/Timer.h +++ b/src/Timer.h @@ -57,8 +57,9 @@ enum TimerType : uint8_t { TIMER_THREAD_HEARTBEAT, TIMER_UNKNOWN_PROTOCOL_EXPIRE, TIMER_LOG_DELAY_EXPIRE, + TIMER_LOG_FLUSH_WRITE_BUFFER, }; -constexpr int NUM_TIMER_TYPES = int(TIMER_LOG_DELAY_EXPIRE) + 1; +constexpr int NUM_TIMER_TYPES = int(TIMER_LOG_FLUSH_WRITE_BUFFER) + 1; extern const char* timer_type_to_string(TimerType type); diff --git a/src/const.bif b/src/const.bif index 1ae177335d..f2b0423ba6 100644 --- a/src/const.bif +++ b/src/const.bif @@ -29,3 +29,6 @@ const Tunnel::ip_tunnel_timeout: interval; const Tunnel::validate_vxlan_checksums: bool; const Threading::heartbeat_interval: interval; + +const Log::flush_interval: interval; +const Log::write_buffer_size: count; diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 183532152f..5c6ad7fdab 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -16,6 +16,7 @@ #include "zeek/NetVar.h" #include "zeek/OpaqueVal.h" #include "zeek/RunState.h" +#include "zeek/Timer.h" #include "zeek/Type.h" #include "zeek/broker/Manager.h" #include "zeek/input.h" @@ -39,6 +40,21 @@ extern zeek::OpaqueTypePtr log_delay_token_type; namespace zeek::logging { namespace detail { + +// A timer that regularly flushes the write buffer of all WriterFrontends. +class LogFlushWriteBufferTimer : public zeek::detail::Timer { +public: + explicit LogFlushWriteBufferTimer(double t) : Timer(t, zeek::detail::TIMER_LOG_FLUSH_WRITE_BUFFER) {} + + void Dispatch(double t, bool is_expire) override { + zeek::log_mgr->FlushAllWriteBuffers(); + + if ( ! is_expire ) + zeek::log_mgr->StartLogFlushTimer(); + } +}; + + using DelayTokenType = zeek_uint_t; class DelayInfo; @@ -414,7 +430,6 @@ void Manager::Stream::DispatchDelayExpiredTimer(double t, bool is_expire) { ScheduleLogDelayExpiredTimer(delay_queue.front()->ExpireTime()); } - Manager::Manager() : plugin::ComponentManager("Log", "Writer"), total_log_stream_writes_family(telemetry_mgr->CounterFamily("zeek", "log-stream-writes", {"module", "stream"}, @@ -598,6 +613,9 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) { DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : ""); + if ( ! log_flush_timer ) + StartLogFlushTimer(); + return true; } @@ -2033,4 +2051,20 @@ bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, con return result; } +void Manager::FlushAllWriteBuffers() { + for ( const auto* s : zeek::log_mgr->streams ) { + if ( ! s ) // may store nullptr + continue; + + for ( const auto& [_, info] : s->writers ) + info->writer->FlushWriteBuffer(); + } +} + +void Manager::StartLogFlushTimer() { + double next_t = zeek::run_state::network_time + BifConst::Log::flush_interval; + log_flush_timer = new detail::LogFlushWriteBufferTimer(next_t); + zeek::detail::timer_mgr->Add(log_flush_timer); +} + } // namespace zeek::logging diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 81eedb47ef..4233daed69 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -32,6 +32,8 @@ class RotationTimer; namespace detail { +class LogFlushWriteBufferTimer; + class DelayInfo; using WriteIdx = uint64_t; @@ -348,6 +350,7 @@ protected: friend class RotationFinishedMessage; friend class RotationFailedMessage; friend class RotationTimer; + friend class detail::LogFlushWriteBufferTimer; // Instantiates a new WriterBackend of the given type (note that // doing so creates a new thread!). @@ -364,6 +367,12 @@ protected: bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name, double open, double close, bool success, bool terminating); + // Flush write buffers of all writers. + void FlushAllWriteBuffers(); + + // Start the regular log flushing timer. + void StartLogFlushTimer(); + private: struct Filter; struct Stream; @@ -404,6 +413,9 @@ private: zeek_uint_t last_delay_token = 0; std::vector active_writes; + + // Timer for flushing write buffers of frontends. + detail::LogFlushWriteBufferTimer* log_flush_timer = nullptr; }; } // namespace logging diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 29ccde38cf..46750cd7a4 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -46,17 +46,6 @@ private: bool terminating; }; -class FlushWriteBufferMessage final : public threading::OutputMessage { -public: - FlushWriteBufferMessage(WriterFrontend* writer) - : threading::OutputMessage("FlushWriteBuffer", writer) {} - - bool Process() override { - Object()->FlushWriteBuffer(); - return true; - } -}; - class DisableMessage final : public threading::OutputMessage { public: DisableMessage(WriterFrontend* writer) : threading::OutputMessage("Disable", writer) {} @@ -305,7 +294,6 @@ bool WriterBackend::OnHeartbeat(double network_time, double current_time) { if ( Failed() ) return true; - SendOut(new FlushWriteBufferMessage(frontend)); return DoHeartbeat(network_time, current_time); } diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index a5d90ebb60..121cc94dc6 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -89,7 +89,7 @@ private: WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) - : write_buffer(detail::WriteBuffer(WRITER_BUFFER_SIZE)) { + : write_buffer(detail::WriteBuffer(BifConst::Log::write_buffer_size)) { stream = arg_stream; writer = arg_writer; Ref(stream); diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 7dd44cfd7e..a8c565cd24 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -261,9 +261,7 @@ protected: int num_fields; // The number of log fields. const threading::Field* const* fields; // The log fields. - // Buffer for bulk writes. - static const int WRITER_BUFFER_SIZE = 1000; - detail::WriteBuffer write_buffer; // Buffer of size WRITER_BUFFER_SIZE. + detail::WriteBuffer write_buffer; // Buffer for bulk writes. }; } // namespace zeek::logging diff --git a/testing/btest/Baseline/language.expire_func-copy/output b/testing/btest/Baseline/language.expire_func-copy/output index f971c3d005..dedc219ade 100644 --- a/testing/btest/Baseline/language.expire_func-copy/output +++ b/testing/btest/Baseline/language.expire_func-copy/output @@ -18,10 +18,10 @@ @XXXXXXXXXX.XXXXXX expired [orig_h=fe80::20c:29ff:febd:6f01, orig_p=5353/udp, resp_h=ff02::fb, resp_p=5353/udp] @XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.1, orig_p=49657/tcp, resp_h=172.16.238.131, resp_p=80/tcp] @XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.1, orig_p=49656/tcp, resp_h=172.16.238.131, resp_p=22/tcp] -@XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.131, orig_p=45126/udp, resp_h=172.16.238.2, resp_p=53/udp] -@XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.1, orig_p=49659/tcp, resp_h=172.16.238.131, resp_p=21/tcp] @XXXXXXXXXX.XXXXXX expired copy [orig_h=172.16.238.131, orig_p=45126/udp, resp_h=172.16.238.2, resp_p=53/udp] @XXXXXXXXXX.XXXXXX expired copy [orig_h=172.16.238.1, orig_p=49659/tcp, resp_h=172.16.238.131, resp_p=21/tcp] +@XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.131, orig_p=45126/udp, resp_h=172.16.238.2, resp_p=53/udp] +@XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.1, orig_p=49659/tcp, resp_h=172.16.238.131, resp_p=21/tcp] @XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.131, orig_p=53102/udp, resp_h=172.16.238.2, resp_p=53/udp] @XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.131, orig_p=48621/udp, resp_h=172.16.238.2, resp_p=53/udp] @XXXXXXXXXX.XXXXXX expired [orig_h=172.16.238.131, orig_p=33109/udp, resp_h=172.16.238.2, resp_p=53/udp] diff --git a/testing/btest/Baseline/scripts.base.frameworks.netcontrol.rule-added-hook-2/netcontrol.log b/testing/btest/Baseline/scripts.base.frameworks.netcontrol.rule-added-hook-2/netcontrol.log index 7d415e9fde..3bde5dc695 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.netcontrol.rule-added-hook-2/netcontrol.log +++ b/testing/btest/Baseline/scripts.base.frameworks.netcontrol.rule-added-hook-2/netcontrol.log @@ -16,10 +16,10 @@ XXXXXXXXXX.XXXXXX 2 NetControl::RULE ADD NetControl::REQUESTED NetControl::DROP XXXXXXXXXX.XXXXXX 2 NetControl::RULE ADD NetControl::REQUESTED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-2 XXXXXXXXXX.XXXXXX 2 NetControl::RULE ADD NetControl::SUCCEEDED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-1 XXXXXXXXXX.XXXXXX 2 NetControl::RULE ADD NetControl::EXISTS NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-2 -XXXXXXXXXX.XXXXXX 2 NetControl::RULE EXPIRE NetControl::TIMEOUT NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-1 -XXXXXXXXXX.XXXXXX 2 NetControl::RULE REMOVE NetControl::REQUESTED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-1 XXXXXXXXXX.XXXXXX 2 NetControl::RULE EXPIRE NetControl::TIMEOUT NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-2 XXXXXXXXXX.XXXXXX 2 NetControl::RULE REMOVE NetControl::REQUESTED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-2 -XXXXXXXXXX.XXXXXX 2 NetControl::RULE REMOVE NetControl::SUCCEEDED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-1 +XXXXXXXXXX.XXXXXX 2 NetControl::RULE EXPIRE NetControl::TIMEOUT NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-1 +XXXXXXXXXX.XXXXXX 2 NetControl::RULE REMOVE NetControl::REQUESTED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-1 XXXXXXXXXX.XXXXXX 2 NetControl::RULE REMOVE NetControl::SUCCEEDED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-2 +XXXXXXXXXX.XXXXXX 2 NetControl::RULE REMOVE NetControl::SUCCEEDED NetControl::DROP NetControl::FORWARD NetControl::ADDRESS 192.168.18.50/32 - - 0 1.000000 - plugin-1 #close XXXX-XX-XX-XX-XX-XX