diff --git a/CHANGES b/CHANGES index bb3e3d3ea2..416a825e62 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,19 @@ +2.6-352 | 2019-05-28 17:57:36 -0700 + + * Reduce data copying in Broker message processing (Jon Siwek, Corelight) + + * Improve Broker I/O loop integration: less mutex locking (Jon Siwek, Corelight) + + Checking a subscriber for available messages required locking a mutex, + but we should never actually need to do that in the main-loop to check + for Broker readiness since we can rely on file descriptor polling. + + * Improve processing of broker data store responses (Jon Siwek, Corelight) + + Now retrieves and processes all N available responses at once instead + of one-by-one-until-empty. + 2.6-345 | 2019-05-28 11:32:16 -0700 * RDP: Add parsing and logging of channels requested by the client. (Vlad Grigorescu) diff --git a/VERSION b/VERSION index b4b13dd274..0eeddbc5fc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.6-345 +2.6-352 diff --git a/aux/broker b/aux/broker index 474a6d902b..fc077e2f76 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 474a6d902b1c41912b216c06a74085ff3b102bad +Subproject commit fc077e2f767252006bf7a3f3f1368c0165c987c3 diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 9288c576e4..f5e374e239 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -83,17 +83,17 @@ struct scoped_reporter_location { }; #ifdef DEBUG -static std::string RenderMessage(std::string topic, broker::data x) +static std::string RenderMessage(std::string topic, const broker::data& x) { return fmt("%s -> %s", broker::to_string(x).c_str(), topic.c_str()); } -static std::string RenderEvent(std::string topic, std::string name, broker::data args) +static std::string RenderEvent(std::string topic, std::string name, const broker::data& args) { return fmt("%s(%s) -> %s", name.c_str(), broker::to_string(args).c_str(), topic.c_str()); } -static std::string RenderMessage(broker::store::response x) +static std::string RenderMessage(const broker::store::response& x) { return fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : ""), x.id); } @@ -358,7 +358,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args) DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); broker::zeek::Event ev(std::move(name), std::move(args)); - bstate->endpoint.publish(move(topic), std::move(ev)); + bstate->endpoint.publish(move(topic), ev.move_data()); ++statistics.num_events_outgoing; return true; } @@ -420,8 +420,8 @@ bool Manager::PublishIdentifier(std::string topic, std::string id) broker::zeek::IdentifierUpdate msg(move(id), move(*data)); DBG_LOG(DBG_BROKER, "Publishing id-update: %s", - RenderMessage(topic, msg).c_str()); - bstate->endpoint.publish(move(topic), move(msg)); + RenderMessage(topic, msg.as_data()).c_str()); + bstate->endpoint.publish(move(topic), msg.move_data()); ++statistics.num_ids_outgoing; return true; } @@ -471,14 +471,14 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer, auto bwriter_id = broker::enum_value(move(writer_id)); broker::zeek::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data)); - DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg).c_str()); + DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg.as_data()).c_str()); if ( peer.node != NoPeer.node ) // Direct message. - bstate->endpoint.publish(peer, move(topic), move(msg)); + bstate->endpoint.publish(peer, move(topic), msg.move_data()); else // Broadcast. - bstate->endpoint.publish(move(topic), move(msg)); + bstate->endpoint.publish(move(topic), msg.move_data()); return true; } @@ -560,7 +560,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int broker::zeek::LogWrite msg(move(bstream_id), move(bwriter_id), move(path), move(serial_data)); - DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg).c_str()); + DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg.as_data()).c_str()); if ( log_buffers.size() <= (unsigned int)stream_id_num ) log_buffers.resize(stream_id_num + 1); @@ -568,7 +568,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int auto& lb = log_buffers[stream_id_num]; ++lb.message_count; auto& pending_batch = lb.msgs[topic]; - pending_batch.emplace_back(std::move(msg)); + pending_batch.emplace_back(msg.move_data()); if ( lb.message_count >= log_batch_size || (network_time - lb.last_flush >= log_batch_interval ) ) @@ -594,7 +594,7 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_si batch.reserve(log_batch_size + 1); pending_batch.swap(batch); broker::zeek::Batch msg(std::move(batch)); - endpoint.publish(topic, move(msg)); + endpoint.publish(topic, msg.move_data()); } auto rval = message_count; @@ -963,7 +963,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) if ( ! ev.valid() ) { reporter->Warning("received invalid broker Event: %s", - broker::to_string(ev).data()); + broker::to_string(ev.as_data()).data()); return; } @@ -1036,7 +1036,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc) { - DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); + DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str()); if ( ! lc.valid() ) { reporter->Warning("received invalid broker LogCreate: %s", @@ -1106,7 +1106,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc) bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw) { - DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str()); + DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str()); if ( ! lw.valid() ) { @@ -1193,7 +1193,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw) bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu) { - DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str()); + DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str()); if ( ! iu.valid() ) {