From cb6b9a1f1a914fb462a0bf6acea26b08fef1429d Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Wed, 8 May 2019 12:42:18 -0700 Subject: [PATCH] Allow tuning Broker log batching via scripts Via redefining "Broker::log_batch_size" or "Broker::log_batch_interval" --- CHANGES | 6 ++++++ VERSION | 2 +- doc | 2 +- scripts/base/frameworks/broker/main.zeek | 8 ++++++++ src/broker/Manager.cc | 24 ++++++++++-------------- src/broker/Manager.h | 4 +++- 6 files changed, 29 insertions(+), 17 deletions(-) diff --git a/CHANGES b/CHANGES index adb24b8ed4..c4d2d26a68 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,10 @@ +2.6-277 | 2019-05-08 12:42:18 -0700 + + * Allow tuning Broker log batching via scripts (Jon Siwek, Corelight) + + Via redefining "Broker::log_batch_size" or "Broker::log_batch_interval" + 2.6-276 | 2019-05-08 09:03:27 -0700 * Force the Broker IOSource to idle periodically, preventing packet diff --git a/VERSION b/VERSION index 5a9089c29d..64298b5057 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.6-276 +2.6-277 diff --git a/doc b/doc index 736323fe8a..6eb2d810ad 160000 --- a/doc +++ b/doc @@ -1 +1 @@ -Subproject commit 736323fe8a78fd8478325c134afe38c075e297a7 +Subproject commit 6eb2d810ad7b2f66f6739bc23dc82ba5d6b27ec1 diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index f64ff0ce14..a61f81f239 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -61,6 +61,14 @@ export { ## control mechanisms). const congestion_queue_size = 200 &redef; + ## The max number of log entries per log stream to batch together when + ## sending log messages to a remote logger. + const log_batch_size = 400 &redef; + + ## Max time to buffer log messages before sending the current set out as a + ## batch. + const log_batch_interval = 1sec &redef; + ## Max number of threads to use for Broker/CAF functionality. The ## BRO_BROKER_MAX_THREADS environment variable overrides this setting. const max_threads = 1 &redef; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index ebde5229d3..fbc15b3c18 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -23,14 +23,6 @@ using namespace std; namespace bro_broker { -// Max number of log messages buffered per stream before we send them out as -// a batch. -static const int LOG_BATCH_SIZE = 400; - -// Max secs to buffer log messages before sending the current set out as a -// batch. -static const double LOG_BUFFER_INTERVAL = 1.0; - static inline Val* get_option(const char* option) { auto id = global_scope()->Lookup(option); @@ -141,6 +133,8 @@ Manager::Manager(bool arg_reading_pcaps) after_zeek_init = false; peer_count = 0; times_processed_without_idle = 0; + log_batch_size = 0; + log_batch_interval = 0; log_topic_func = nullptr; vector_of_data_type = nullptr; log_id_type = nullptr; @@ -157,6 +151,8 @@ void Manager::InitPostScript() { DBG_LOG(DBG_BROKER, "Initializing"); + log_batch_size = get_option("Broker::log_batch_size")->AsCount(); + log_batch_interval = get_option("Broker::log_batch_interval")->AsInterval(); default_log_topic_prefix = get_option("Broker::default_log_topic_prefix")->AsString()->CheckString(); log_topic_func = get_option("Broker::log_topic")->AsFunc(); @@ -574,14 +570,14 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int auto& pending_batch = lb.msgs[topic]; pending_batch.emplace_back(std::move(msg)); - if ( lb.message_count >= LOG_BATCH_SIZE || - (network_time - lb.last_flush >= LOG_BUFFER_INTERVAL) ) - statistics.num_logs_outgoing += lb.Flush(bstate->endpoint); + if ( lb.message_count >= log_batch_size || + (network_time - lb.last_flush >= log_batch_interval ) ) + statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size); return true; } -size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint) +size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_size) { if ( endpoint.is_shutdown() ) return 0; @@ -595,7 +591,7 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint) auto& topic = kv.first; auto& pending_batch = kv.second; broker::vector batch; - batch.reserve(LOG_BATCH_SIZE + 1); + batch.reserve(log_batch_size + 1); pending_batch.swap(batch); broker::bro::Batch msg(std::move(batch)); endpoint.publish(topic, move(msg)); @@ -613,7 +609,7 @@ size_t Manager::FlushLogBuffers() auto rval = 0u; for ( auto& lb : log_buffers ) - rval += lb.Flush(bstate->endpoint); + rval += lb.Flush(bstate->endpoint, log_batch_interval); return rval; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 901cd4d06c..004ad01dc9 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -353,7 +353,7 @@ private: double last_flush; size_t message_count; - size_t Flush(broker::endpoint& endpoint); + size_t Flush(broker::endpoint& endpoint, size_t batch_size); }; // Data stores @@ -385,6 +385,8 @@ private: int peer_count; int times_processed_without_idle; + size_t log_batch_size; + double log_batch_interval; Func* log_topic_func; VectorType* vector_of_data_type; EnumType* log_id_type;