Allow tuning Broker log batching via scripts

Via redefining "Broker::log_batch_size" or "Broker::log_batch_interval"
Jon Siwek 2019-05-08 12:42:18 -07:00
parent 312713810f
commit cb6b9a1f1a
6 changed files with 29 additions and 17 deletions
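
The change exposes the two batching knobs to script land. A minimal sketch of how a site might tune them, assuming the redefs go in local.zeek and using purely illustrative values:

    # Illustrative values only: send a batch once 1000 entries have
    # accumulated for a stream, or after 250 msec, whichever comes first.
    redef Broker::log_batch_size = 1000;
    redef Broker::log_batch_interval = 250msec;

Both are &redef constants, so their final values are fixed once scripts are parsed; the C++ Manager reads them in InitPostScript() (see the Manager.cc hunks below).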


@@ -1,4 +1,10 @@
+2.6-277 | 2019-05-08 12:42:18 -0700
+
+  * Allow tuning Broker log batching via scripts (Jon Siwek, Corelight)
+
+    Via redefining "Broker::log_batch_size" or "Broker::log_batch_interval"
+
 2.6-276 | 2019-05-08 09:03:27 -0700
 
   * Force the Broker IOSource to idle periodically, preventing packet


@@ -1 +1 @@
-2.6-276
+2.6-277

doc

@@ -1 +1 @@
-Subproject commit 736323fe8a78fd8478325c134afe38c075e297a7
+Subproject commit 6eb2d810ad7b2f66f6739bc23dc82ba5d6b27ec1


@@ -61,6 +61,14 @@ export {
 	## control mechanisms).
 	const congestion_queue_size = 200 &redef;
 
+	## The max number of log entries per log stream to batch together when
+	## sending log messages to a remote logger.
+	const log_batch_size = 400 &redef;
+
+	## Max time to buffer log messages before sending the current set out as a
+	## batch.
+	const log_batch_interval = 1sec &redef;
+
 	## Max number of threads to use for Broker/CAF functionality. The
 	## BRO_BROKER_MAX_THREADS environment variable overrides this setting.
 	const max_threads = 1 &redef;
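
Since the two new options are ordinary script-level constants, they can also be read back at runtime, e.g. to report the effective settings a node started with. A small sketch, assuming a zeek_init handler (output wording is illustrative):

    event zeek_init()
        {
        # Report the batching settings in effect on this node.
        print "Broker::log_batch_size", Broker::log_batch_size;
        print "Broker::log_batch_interval", Broker::log_batch_interval;
        }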


@@ -23,14 +23,6 @@ using namespace std;
 
 namespace bro_broker {
 
-// Max number of log messages buffered per stream before we send them out as
-// a batch.
-static const int LOG_BATCH_SIZE = 400;
-
-// Max secs to buffer log messages before sending the current set out as a
-// batch.
-static const double LOG_BUFFER_INTERVAL = 1.0;
-
 static inline Val* get_option(const char* option)
 	{
 	auto id = global_scope()->Lookup(option);
@@ -141,6 +133,8 @@ Manager::Manager(bool arg_reading_pcaps)
 	after_zeek_init = false;
 	peer_count = 0;
 	times_processed_without_idle = 0;
+	log_batch_size = 0;
+	log_batch_interval = 0;
 	log_topic_func = nullptr;
 	vector_of_data_type = nullptr;
 	log_id_type = nullptr;
@@ -157,6 +151,8 @@ void Manager::InitPostScript()
 	{
 	DBG_LOG(DBG_BROKER, "Initializing");
 
+	log_batch_size = get_option("Broker::log_batch_size")->AsCount();
+	log_batch_interval = get_option("Broker::log_batch_interval")->AsInterval();
 	default_log_topic_prefix =
 		get_option("Broker::default_log_topic_prefix")->AsString()->CheckString();
 	log_topic_func = get_option("Broker::log_topic")->AsFunc();
@@ -574,14 +570,14 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
 	auto& pending_batch = lb.msgs[topic];
 	pending_batch.emplace_back(std::move(msg));
 
-	if ( lb.message_count >= LOG_BATCH_SIZE ||
-	     (network_time - lb.last_flush >= LOG_BUFFER_INTERVAL) )
-		statistics.num_logs_outgoing += lb.Flush(bstate->endpoint);
+	if ( lb.message_count >= log_batch_size ||
+	     (network_time - lb.last_flush >= log_batch_interval) )
+		statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size);
 
 	return true;
 	}
 
-size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint)
+size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_size)
 	{
 	if ( endpoint.is_shutdown() )
 		return 0;
@@ -595,7 +591,7 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint)
 		auto& topic = kv.first;
 		auto& pending_batch = kv.second;
 		broker::vector batch;
-		batch.reserve(LOG_BATCH_SIZE + 1);
+		batch.reserve(log_batch_size + 1);
 		pending_batch.swap(batch);
 		broker::bro::Batch msg(std::move(batch));
 		endpoint.publish(topic, move(msg));
@@ -613,7 +609,7 @@ size_t Manager::FlushLogBuffers()
 	auto rval = 0u;
 
 	for ( auto& lb : log_buffers )
-		rval += lb.Flush(bstate->endpoint);
+		rval += lb.Flush(bstate->endpoint, log_batch_size);
 
 	return rval;
 	}


@@ -353,7 +353,7 @@
 		double last_flush;
 		size_t message_count;
 
-		size_t Flush(broker::endpoint& endpoint);
+		size_t Flush(broker::endpoint& endpoint, size_t batch_size);
 	};
 
 	// Data stores
@@ -385,6 +385,8 @@
 	int peer_count;
 	int times_processed_without_idle;
 
+	size_t log_batch_size;
+	double log_batch_interval;
 	Func* log_topic_func;
 	VectorType* vector_of_data_type;
 	EnumType* log_id_type;