Reduce data copying in Broker message processing

This commit is contained in:
Jon Siwek 2019-05-07 20:15:31 -07:00
parent 9029acd7e5
commit dbb49b17f4

View file

@ -91,17 +91,17 @@ struct scoped_reporter_location {
}; };
#ifdef DEBUG #ifdef DEBUG
static std::string RenderMessage(std::string topic, broker::data x) static std::string RenderMessage(std::string topic, const broker::data& x)
{ {
return fmt("%s -> %s", broker::to_string(x).c_str(), topic.c_str()); return fmt("%s -> %s", broker::to_string(x).c_str(), topic.c_str());
} }
static std::string RenderEvent(std::string topic, std::string name, broker::data args) static std::string RenderEvent(std::string topic, std::string name, const broker::data& args)
{ {
return fmt("%s(%s) -> %s", name.c_str(), broker::to_string(args).c_str(), topic.c_str()); return fmt("%s(%s) -> %s", name.c_str(), broker::to_string(args).c_str(), topic.c_str());
} }
static std::string RenderMessage(broker::store::response x) static std::string RenderMessage(const broker::store::response& x)
{ {
return fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : "<no answer>"), x.id); return fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : "<no answer>"), x.id);
} }
@ -361,7 +361,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
DBG_LOG(DBG_BROKER, "Publishing event: %s", DBG_LOG(DBG_BROKER, "Publishing event: %s",
RenderEvent(topic, name, args).c_str()); RenderEvent(topic, name, args).c_str());
broker::bro::Event ev(std::move(name), std::move(args)); broker::bro::Event ev(std::move(name), std::move(args));
bstate->endpoint.publish(move(topic), std::move(ev)); bstate->endpoint.publish(move(topic), ev.move_data());
++statistics.num_events_outgoing; ++statistics.num_events_outgoing;
return true; return true;
} }
@ -423,8 +423,8 @@ bool Manager::PublishIdentifier(std::string topic, std::string id)
broker::bro::IdentifierUpdate msg(move(id), move(*data)); broker::bro::IdentifierUpdate msg(move(id), move(*data));
DBG_LOG(DBG_BROKER, "Publishing id-update: %s", DBG_LOG(DBG_BROKER, "Publishing id-update: %s",
RenderMessage(topic, msg).c_str()); RenderMessage(topic, msg.as_data()).c_str());
bstate->endpoint.publish(move(topic), move(msg)); bstate->endpoint.publish(move(topic), msg.move_data());
++statistics.num_ids_outgoing; ++statistics.num_ids_outgoing;
return true; return true;
} }
@ -474,14 +474,14 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer,
auto bwriter_id = broker::enum_value(move(writer_id)); auto bwriter_id = broker::enum_value(move(writer_id));
broker::bro::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data)); broker::bro::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data));
DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg).c_str()); DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg.as_data()).c_str());
if ( peer.node != NoPeer.node ) if ( peer.node != NoPeer.node )
// Direct message. // Direct message.
bstate->endpoint.publish(peer, move(topic), move(msg)); bstate->endpoint.publish(peer, move(topic), msg.move_data());
else else
// Broadcast. // Broadcast.
bstate->endpoint.publish(move(topic), move(msg)); bstate->endpoint.publish(move(topic), msg.move_data());
return true; return true;
} }
@ -563,7 +563,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
broker::bro::LogWrite msg(move(bstream_id), move(bwriter_id), move(path), broker::bro::LogWrite msg(move(bstream_id), move(bwriter_id), move(path),
move(serial_data)); move(serial_data));
DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg).c_str()); DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg.as_data()).c_str());
if ( log_buffers.size() <= (unsigned int)stream_id_num ) if ( log_buffers.size() <= (unsigned int)stream_id_num )
log_buffers.resize(stream_id_num + 1); log_buffers.resize(stream_id_num + 1);
@ -571,7 +571,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
auto& lb = log_buffers[stream_id_num]; auto& lb = log_buffers[stream_id_num];
++lb.message_count; ++lb.message_count;
auto& pending_batch = lb.msgs[topic]; auto& pending_batch = lb.msgs[topic];
pending_batch.emplace_back(std::move(msg)); pending_batch.emplace_back(msg.move_data());
if ( lb.message_count >= LOG_BATCH_SIZE || if ( lb.message_count >= LOG_BATCH_SIZE ||
(network_time - lb.last_flush >= LOG_BUFFER_INTERVAL) ) (network_time - lb.last_flush >= LOG_BUFFER_INTERVAL) )
@ -597,7 +597,7 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint)
batch.reserve(LOG_BATCH_SIZE + 1); batch.reserve(LOG_BATCH_SIZE + 1);
pending_batch.swap(batch); pending_batch.swap(batch);
broker::bro::Batch msg(std::move(batch)); broker::bro::Batch msg(std::move(batch));
endpoint.publish(topic, move(msg)); endpoint.publish(topic, msg.move_data());
} }
auto rval = message_count; auto rval = message_count;
@ -953,7 +953,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
if ( ! ev.valid() ) if ( ! ev.valid() )
{ {
reporter->Warning("received invalid broker Event: %s", reporter->Warning("received invalid broker Event: %s",
broker::to_string(ev).data()); broker::to_string(ev.as_data()).data());
return; return;
} }
@ -1026,7 +1026,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc) bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
{ {
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str());
if ( ! lc.valid() ) if ( ! lc.valid() )
{ {
reporter->Warning("received invalid broker LogCreate: %s", reporter->Warning("received invalid broker LogCreate: %s",
@ -1096,7 +1096,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw) bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
{ {
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str()); DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str());
if ( ! lw.valid() ) if ( ! lw.valid() )
{ {
@ -1183,7 +1183,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
bool Manager::ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu) bool Manager::ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu)
{ {
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str()); DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str());
if ( ! iu.valid() ) if ( ! iu.valid() )
{ {