GH-780: Prevent log batches from indefinite buffering

Logs that got sent sparsely or burstily would get buffered for long
periods of time since the logic to flush them only does so on the next
log write.  In the worst case, a subsequent log write could never happen
and cause a log entry to be indefinitely buffered.

This fix introduces a recurring event/timer to simply flush all pending
logs at frequency of Broker::log_batch_interval.
This commit is contained in:
Jon Siwek 2020-02-05 13:03:21 -08:00
parent 0de6bba95e
commit 43e54c7930
17 changed files with 224 additions and 22 deletions

View file

@ -131,7 +131,6 @@ Manager::Manager(bool arg_reading_pcaps)
after_zeek_init = false;
peer_count = 0;
log_batch_size = 0;
log_batch_interval = 0;
log_topic_func = nullptr;
vector_of_data_type = nullptr;
log_id_type = nullptr;
@ -147,7 +146,6 @@ void Manager::InitPostScript()
DBG_LOG(DBG_BROKER, "Initializing");
log_batch_size = get_option("Broker::log_batch_size")->AsCount();
log_batch_interval = get_option("Broker::log_batch_interval")->AsInterval();
default_log_topic_prefix =
get_option("Broker::default_log_topic_prefix")->AsString()->CheckString();
log_topic_func = get_option("Broker::log_topic")->AsFunc();
@ -580,8 +578,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
auto& pending_batch = lb.msgs[topic];
pending_batch.emplace_back(msg.move_data());
if ( lb.message_count >= log_batch_size ||
(network_time - lb.last_flush >= log_batch_interval ) )
if ( lb.message_count >= log_batch_size )
statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size);
return true;
@ -608,7 +605,6 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_si
}
auto rval = message_count;
last_flush = network_time;
message_count = 0;
return rval;
}
@ -619,8 +615,9 @@ size_t Manager::FlushLogBuffers()
auto rval = 0u;
for ( auto& lb : log_buffers )
rval += lb.Flush(bstate->endpoint, log_batch_interval);
rval += lb.Flush(bstate->endpoint, log_batch_size);
statistics.num_logs_outgoing += rval;
return rval;
}