mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Merge remote-tracking branch 'origin/master' into topic/johanna/243
This commit is contained in:
commit
824ccde6fc
1222 changed files with 2435 additions and 2096 deletions
|
@ -1,6 +1,6 @@
|
|||
|
||||
#include <broker/broker.hh>
|
||||
#include <broker/bro.hh>
|
||||
#include <broker/zeek.hh>
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include <unistd.h>
|
||||
|
@ -24,14 +24,6 @@ using namespace std;
|
|||
|
||||
namespace bro_broker {
|
||||
|
||||
// Max number of log messages buffered per stream before we send them out as
|
||||
// a batch.
|
||||
static const int LOG_BATCH_SIZE = 400;
|
||||
|
||||
// Max secs to buffer log messages before sending the current set out as a
|
||||
// batch.
|
||||
static const double LOG_BUFFER_INTERVAL = 1.0;
|
||||
|
||||
static inline Val* get_option(const char* option)
|
||||
{
|
||||
auto id = global_scope()->Lookup(option);
|
||||
|
@ -141,6 +133,9 @@ Manager::Manager(bool arg_reading_pcaps)
|
|||
reading_pcaps = arg_reading_pcaps;
|
||||
after_zeek_init = false;
|
||||
peer_count = 0;
|
||||
times_processed_without_idle = 0;
|
||||
log_batch_size = 0;
|
||||
log_batch_interval = 0;
|
||||
log_topic_func = nullptr;
|
||||
vector_of_data_type = nullptr;
|
||||
log_id_type = nullptr;
|
||||
|
@ -157,6 +152,8 @@ void Manager::InitPostScript()
|
|||
{
|
||||
DBG_LOG(DBG_BROKER, "Initializing");
|
||||
|
||||
log_batch_size = get_option("Broker::log_batch_size")->AsCount();
|
||||
log_batch_interval = get_option("Broker::log_batch_interval")->AsInterval();
|
||||
default_log_topic_prefix =
|
||||
get_option("Broker::default_log_topic_prefix")->AsString()->CheckString();
|
||||
log_topic_func = get_option("Broker::log_topic")->AsFunc();
|
||||
|
@ -361,7 +358,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
|
|||
|
||||
DBG_LOG(DBG_BROKER, "Publishing event: %s",
|
||||
RenderEvent(topic, name, args).c_str());
|
||||
broker::bro::Event ev(std::move(name), std::move(args));
|
||||
broker::zeek::Event ev(std::move(name), std::move(args));
|
||||
bstate->endpoint.publish(move(topic), std::move(ev));
|
||||
++statistics.num_events_outgoing;
|
||||
return true;
|
||||
|
@ -422,7 +419,7 @@ bool Manager::PublishIdentifier(std::string topic, std::string id)
|
|||
return false;
|
||||
}
|
||||
|
||||
broker::bro::IdentifierUpdate msg(move(id), move(*data));
|
||||
broker::zeek::IdentifierUpdate msg(move(id), move(*data));
|
||||
DBG_LOG(DBG_BROKER, "Publishing id-update: %s",
|
||||
RenderMessage(topic, msg).c_str());
|
||||
bstate->endpoint.publish(move(topic), move(msg));
|
||||
|
@ -473,7 +470,7 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer,
|
|||
std::string topic = default_log_topic_prefix + stream_id;
|
||||
auto bstream_id = broker::enum_value(move(stream_id));
|
||||
auto bwriter_id = broker::enum_value(move(writer_id));
|
||||
broker::bro::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data));
|
||||
broker::zeek::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data));
|
||||
|
||||
DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg).c_str());
|
||||
|
||||
|
@ -561,7 +558,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
|
|||
|
||||
auto bstream_id = broker::enum_value(move(stream_id));
|
||||
auto bwriter_id = broker::enum_value(move(writer_id));
|
||||
broker::bro::LogWrite msg(move(bstream_id), move(bwriter_id), move(path),
|
||||
broker::zeek::LogWrite msg(move(bstream_id), move(bwriter_id), move(path),
|
||||
move(serial_data));
|
||||
|
||||
DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg).c_str());
|
||||
|
@ -574,14 +571,14 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
|
|||
auto& pending_batch = lb.msgs[topic];
|
||||
pending_batch.emplace_back(std::move(msg));
|
||||
|
||||
if ( lb.message_count >= LOG_BATCH_SIZE ||
|
||||
(network_time - lb.last_flush >= LOG_BUFFER_INTERVAL) )
|
||||
statistics.num_logs_outgoing += lb.Flush(bstate->endpoint);
|
||||
if ( lb.message_count >= log_batch_size ||
|
||||
(network_time - lb.last_flush >= log_batch_interval ) )
|
||||
statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint)
|
||||
size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_size)
|
||||
{
|
||||
if ( endpoint.is_shutdown() )
|
||||
return 0;
|
||||
|
@ -595,9 +592,9 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint)
|
|||
auto& topic = kv.first;
|
||||
auto& pending_batch = kv.second;
|
||||
broker::vector batch;
|
||||
batch.reserve(LOG_BATCH_SIZE + 1);
|
||||
batch.reserve(log_batch_size + 1);
|
||||
pending_batch.swap(batch);
|
||||
broker::bro::Batch msg(std::move(batch));
|
||||
broker::zeek::Batch msg(std::move(batch));
|
||||
endpoint.publish(topic, move(msg));
|
||||
}
|
||||
|
||||
|
@ -613,7 +610,7 @@ size_t Manager::FlushLogBuffers()
|
|||
auto rval = 0u;
|
||||
|
||||
for ( auto& lb : log_buffers )
|
||||
rval += lb.Flush(bstate->endpoint);
|
||||
rval += lb.Flush(bstate->endpoint, log_batch_interval);
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
@ -842,31 +839,31 @@ double Manager::NextTimestamp(double* local_network_time)
|
|||
|
||||
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
|
||||
{
|
||||
switch ( broker::bro::Message::type(msg) ) {
|
||||
case broker::bro::Message::Type::Invalid:
|
||||
switch ( broker::zeek::Message::type(msg) ) {
|
||||
case broker::zeek::Message::Type::Invalid:
|
||||
reporter->Warning("received invalid broker message: %s",
|
||||
broker::to_string(msg).data());
|
||||
break;
|
||||
|
||||
case broker::bro::Message::Type::Event:
|
||||
case broker::zeek::Message::Type::Event:
|
||||
ProcessEvent(topic, std::move(msg));
|
||||
break;
|
||||
|
||||
case broker::bro::Message::Type::LogCreate:
|
||||
case broker::zeek::Message::Type::LogCreate:
|
||||
ProcessLogCreate(std::move(msg));
|
||||
break;
|
||||
|
||||
case broker::bro::Message::Type::LogWrite:
|
||||
case broker::zeek::Message::Type::LogWrite:
|
||||
ProcessLogWrite(std::move(msg));
|
||||
break;
|
||||
|
||||
case broker::bro::Message::Type::IdentifierUpdate:
|
||||
case broker::zeek::Message::Type::IdentifierUpdate:
|
||||
ProcessIdentifierUpdate(std::move(msg));
|
||||
break;
|
||||
|
||||
case broker::bro::Message::Type::Batch:
|
||||
case broker::zeek::Message::Type::Batch:
|
||||
{
|
||||
broker::bro::Batch batch(std::move(msg));
|
||||
broker::zeek::Batch batch(std::move(msg));
|
||||
|
||||
if ( ! batch.valid() )
|
||||
{
|
||||
|
@ -945,11 +942,36 @@ void Manager::Process()
|
|||
}
|
||||
}
|
||||
|
||||
SetIdle(! had_input);
|
||||
if ( had_input )
|
||||
{
|
||||
++times_processed_without_idle;
|
||||
|
||||
// The max number of Process calls allowed to happen in a row without
|
||||
// idling is chosen a bit arbitrarily, except 12 is around half of the
|
||||
// SELECT_FREQUENCY (25).
|
||||
//
|
||||
// But probably the general idea should be for it to have some relation
|
||||
// to the SELECT_FREQUENCY: less than it so other busy IOSources can
|
||||
// fit several Process loops in before the next poll event (e.g. the
|
||||
// select() call ), but still large enough such that we don't have to
|
||||
// wait long before the next poll ourselves after being forced to idle.
|
||||
if ( times_processed_without_idle > 12 )
|
||||
{
|
||||
times_processed_without_idle = 0;
|
||||
SetIdle(true);
|
||||
}
|
||||
else
|
||||
SetIdle(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
times_processed_without_idle = 0;
|
||||
SetIdle(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
|
||||
void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
|
||||
{
|
||||
if ( ! ev.valid() )
|
||||
{
|
||||
|
@ -1025,7 +1047,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
|
|||
}
|
||||
}
|
||||
|
||||
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
|
||||
bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());
|
||||
if ( ! lc.valid() )
|
||||
|
@ -1095,7 +1117,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
|
||||
bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str());
|
||||
|
||||
|
@ -1182,7 +1204,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Manager::ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu)
|
||||
bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str());
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue