mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge remote-tracking branch 'origin/topic/jsiwek/broker-less-copies'
* origin/topic/jsiwek/broker-less-copies: Reduce data copying in Broker message processing
This commit is contained in:
commit
574d2c363a
4 changed files with 33 additions and 18 deletions
15
CHANGES
15
CHANGES
|
@ -1,4 +1,19 @@
|
||||||
|
|
||||||
|
2.6-352 | 2019-05-28 17:57:36 -0700
|
||||||
|
|
||||||
|
* Reduce data copying in Broker message processing (Jon Siwek, Corelight)
|
||||||
|
|
||||||
|
* Improve Broker I/O loop integration: less mutex locking (Jon Siwek, Corelight)
|
||||||
|
|
||||||
|
Checking a subscriber for available messages required locking a mutex,
|
||||||
|
but we should never actually need to do that in the main-loop to check
|
||||||
|
for Broker readiness since we can rely on file descriptor polling.
|
||||||
|
|
||||||
|
* Improve processing of broker data store responses (Jon Siwek, Corelight)
|
||||||
|
|
||||||
|
Now retrieves and processes all N available responses at once instead
|
||||||
|
of one-by-one-until-empty.
|
||||||
|
|
||||||
2.6-345 | 2019-05-28 11:32:16 -0700
|
2.6-345 | 2019-05-28 11:32:16 -0700
|
||||||
|
|
||||||
* RDP: Add parsing and logging of channels requested by the client. (Vlad Grigorescu)
|
* RDP: Add parsing and logging of channels requested by the client. (Vlad Grigorescu)
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
2.6-345
|
2.6-352
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 474a6d902b1c41912b216c06a74085ff3b102bad
|
Subproject commit fc077e2f767252006bf7a3f3f1368c0165c987c3
|
|
@ -83,17 +83,17 @@ struct scoped_reporter_location {
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
static std::string RenderMessage(std::string topic, broker::data x)
|
static std::string RenderMessage(std::string topic, const broker::data& x)
|
||||||
{
|
{
|
||||||
return fmt("%s -> %s", broker::to_string(x).c_str(), topic.c_str());
|
return fmt("%s -> %s", broker::to_string(x).c_str(), topic.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::string RenderEvent(std::string topic, std::string name, broker::data args)
|
static std::string RenderEvent(std::string topic, std::string name, const broker::data& args)
|
||||||
{
|
{
|
||||||
return fmt("%s(%s) -> %s", name.c_str(), broker::to_string(args).c_str(), topic.c_str());
|
return fmt("%s(%s) -> %s", name.c_str(), broker::to_string(args).c_str(), topic.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::string RenderMessage(broker::store::response x)
|
static std::string RenderMessage(const broker::store::response& x)
|
||||||
{
|
{
|
||||||
return fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : "<no answer>"), x.id);
|
return fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : "<no answer>"), x.id);
|
||||||
}
|
}
|
||||||
|
@ -358,7 +358,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
|
||||||
DBG_LOG(DBG_BROKER, "Publishing event: %s",
|
DBG_LOG(DBG_BROKER, "Publishing event: %s",
|
||||||
RenderEvent(topic, name, args).c_str());
|
RenderEvent(topic, name, args).c_str());
|
||||||
broker::zeek::Event ev(std::move(name), std::move(args));
|
broker::zeek::Event ev(std::move(name), std::move(args));
|
||||||
bstate->endpoint.publish(move(topic), std::move(ev));
|
bstate->endpoint.publish(move(topic), ev.move_data());
|
||||||
++statistics.num_events_outgoing;
|
++statistics.num_events_outgoing;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -420,8 +420,8 @@ bool Manager::PublishIdentifier(std::string topic, std::string id)
|
||||||
|
|
||||||
broker::zeek::IdentifierUpdate msg(move(id), move(*data));
|
broker::zeek::IdentifierUpdate msg(move(id), move(*data));
|
||||||
DBG_LOG(DBG_BROKER, "Publishing id-update: %s",
|
DBG_LOG(DBG_BROKER, "Publishing id-update: %s",
|
||||||
RenderMessage(topic, msg).c_str());
|
RenderMessage(topic, msg.as_data()).c_str());
|
||||||
bstate->endpoint.publish(move(topic), move(msg));
|
bstate->endpoint.publish(move(topic), msg.move_data());
|
||||||
++statistics.num_ids_outgoing;
|
++statistics.num_ids_outgoing;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -471,14 +471,14 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer,
|
||||||
auto bwriter_id = broker::enum_value(move(writer_id));
|
auto bwriter_id = broker::enum_value(move(writer_id));
|
||||||
broker::zeek::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data));
|
broker::zeek::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data));
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg).c_str());
|
DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg.as_data()).c_str());
|
||||||
|
|
||||||
if ( peer.node != NoPeer.node )
|
if ( peer.node != NoPeer.node )
|
||||||
// Direct message.
|
// Direct message.
|
||||||
bstate->endpoint.publish(peer, move(topic), move(msg));
|
bstate->endpoint.publish(peer, move(topic), msg.move_data());
|
||||||
else
|
else
|
||||||
// Broadcast.
|
// Broadcast.
|
||||||
bstate->endpoint.publish(move(topic), move(msg));
|
bstate->endpoint.publish(move(topic), msg.move_data());
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -560,7 +560,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
|
||||||
broker::zeek::LogWrite msg(move(bstream_id), move(bwriter_id), move(path),
|
broker::zeek::LogWrite msg(move(bstream_id), move(bwriter_id), move(path),
|
||||||
move(serial_data));
|
move(serial_data));
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg).c_str());
|
DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg.as_data()).c_str());
|
||||||
|
|
||||||
if ( log_buffers.size() <= (unsigned int)stream_id_num )
|
if ( log_buffers.size() <= (unsigned int)stream_id_num )
|
||||||
log_buffers.resize(stream_id_num + 1);
|
log_buffers.resize(stream_id_num + 1);
|
||||||
|
@ -568,7 +568,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
|
||||||
auto& lb = log_buffers[stream_id_num];
|
auto& lb = log_buffers[stream_id_num];
|
||||||
++lb.message_count;
|
++lb.message_count;
|
||||||
auto& pending_batch = lb.msgs[topic];
|
auto& pending_batch = lb.msgs[topic];
|
||||||
pending_batch.emplace_back(std::move(msg));
|
pending_batch.emplace_back(msg.move_data());
|
||||||
|
|
||||||
if ( lb.message_count >= log_batch_size ||
|
if ( lb.message_count >= log_batch_size ||
|
||||||
(network_time - lb.last_flush >= log_batch_interval ) )
|
(network_time - lb.last_flush >= log_batch_interval ) )
|
||||||
|
@ -594,7 +594,7 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_si
|
||||||
batch.reserve(log_batch_size + 1);
|
batch.reserve(log_batch_size + 1);
|
||||||
pending_batch.swap(batch);
|
pending_batch.swap(batch);
|
||||||
broker::zeek::Batch msg(std::move(batch));
|
broker::zeek::Batch msg(std::move(batch));
|
||||||
endpoint.publish(topic, move(msg));
|
endpoint.publish(topic, msg.move_data());
|
||||||
}
|
}
|
||||||
|
|
||||||
auto rval = message_count;
|
auto rval = message_count;
|
||||||
|
@ -963,7 +963,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
|
||||||
if ( ! ev.valid() )
|
if ( ! ev.valid() )
|
||||||
{
|
{
|
||||||
reporter->Warning("received invalid broker Event: %s",
|
reporter->Warning("received invalid broker Event: %s",
|
||||||
broker::to_string(ev).data());
|
broker::to_string(ev.as_data()).data());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1036,7 +1036,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
|
||||||
|
|
||||||
bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
||||||
{
|
{
|
||||||
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());
|
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str());
|
||||||
if ( ! lc.valid() )
|
if ( ! lc.valid() )
|
||||||
{
|
{
|
||||||
reporter->Warning("received invalid broker LogCreate: %s",
|
reporter->Warning("received invalid broker LogCreate: %s",
|
||||||
|
@ -1106,7 +1106,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
||||||
|
|
||||||
bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
||||||
{
|
{
|
||||||
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str());
|
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str());
|
||||||
|
|
||||||
if ( ! lw.valid() )
|
if ( ! lw.valid() )
|
||||||
{
|
{
|
||||||
|
@ -1193,7 +1193,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
||||||
|
|
||||||
bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
|
bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
|
||||||
{
|
{
|
||||||
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str());
|
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str());
|
||||||
|
|
||||||
if ( ! iu.valid() )
|
if ( ! iu.valid() )
|
||||||
{
|
{
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue