diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index bc9f2e8f7f..c842f56b01 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -1,11 +1,14 @@ #include "zeek/broker/Manager.h" #include +#include #include #include #include #include #include +#include +#include #include "zeek/DebugLogger.h" #include "zeek/Desc.h" @@ -30,6 +33,26 @@ using namespace std; namespace { +broker::data&& convert_if_broker_variant(broker::data&& arg) { return std::move(arg); } + +broker::data& convert_if_broker_variant(broker::data& arg) { return arg; } + +broker::data&& convert_if_broker_variant_or_move(broker::data& arg) { return std::move(arg); } + +broker::vector& broker_vector_from(broker::data& arg) { return broker::get(arg); } + +// Converts a string_view into a string to make sure that we can safely call `.c_str()` on the result. +template +std::enable_if_t, std::string_view>, std::string> c_str_safe(View&& arg) { + return std::string{arg}; +} + +// Passes through a string without copying it (already safe to call `.c_str()` on it). +template +std::enable_if_t, std::string>, const std::string&> c_str_safe(String&& arg) { + return arg; +} + void print_escaped(std::string& buf, std::string_view str) { buf.push_back('"'); for ( auto c : str ) { @@ -168,33 +191,38 @@ struct scoped_reporter_location { }; #ifdef DEBUG -static std::string RenderMessage(const broker::data& d) { return util::json_escape_utf8(broker::to_string(d)); } +namespace { -static std::string RenderMessage(std::string topic, const broker::data& x) { - return util::fmt("%s -> %s", RenderMessage(x).c_str(), topic.c_str()); -} +std::string RenderMessage(const broker::data& d) { return util::json_escape_utf8(broker::to_string(d)); } -static std::string RenderEvent(std::string topic, std::string name, const broker::data& args) { - return util::fmt("%s(%s) -> %s", name.c_str(), RenderMessage(args).c_str(), topic.c_str()); -} - -static std::string RenderMessage(const broker::store::response& x) { +std::string RenderMessage(const broker::store::response& x) { return util::fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : ""), x.id); } -static std::string RenderMessage(const broker::vector* xs) { return broker::to_string(*xs); } +std::string RenderMessage(const broker::vector* xs) { return broker::to_string(*xs); } -static std::string RenderMessage(const broker::vector& xs) { return broker::to_string(xs); } +std::string RenderMessage(const broker::vector& xs) { return broker::to_string(xs); } -static std::string RenderMessage(broker::status_view s) { return broker::to_string(s.code()); } +std::string RenderMessage(const broker::status& s) { return broker::to_string(s.code()); } -static std::string RenderMessage(broker::error_view e) { +std::string RenderMessage(const broker::error& e) { if ( auto ctx = e.context() ) return util::fmt("%s (%s)", to_string(e.code()).c_str(), to_string(*ctx).c_str()); else return util::fmt("%s (null)", to_string(e.code()).c_str()); } +template +std::string RenderMessage(const std::string& topic, const DataOrVariant& x) { + return util::fmt("%s -> %s", RenderMessage(x).c_str(), topic.c_str()); +} + +template +std::string RenderEvent(const std::string& topic, const std::string& name, const DataOrVariant& args) { + return util::fmt("%s(%s) -> %s", name.c_str(), RenderMessage(args).c_str(), topic.c_str()); +} + +} // namespace #endif Manager::Manager(bool arg_use_real_time) { @@ -721,7 +749,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int reporter->Error( "Failed to remotely log: log_topic func did not return" " a value for stream %s at path %s", - stream_id, path.data()); + stream_id, c_str_safe(path).c_str()); return false; } @@ -939,11 +967,11 @@ void Manager::Process() { bool had_input = ! messages.empty(); for ( auto& message : messages ) { - auto& topic = broker::get_topic(message); + auto&& topic = broker::get_topic(message); if ( broker::is_prefix(topic, broker::topic::statuses_str) ) { - if ( auto stat = broker::make_status_view(get_data(message)) ) { - ProcessStatus(stat); + if ( auto stat = broker::to(get_data(message)) ) { + ProcessStatus(*stat); } else { auto str = to_string(message); @@ -953,8 +981,8 @@ void Manager::Process() { } if ( broker::is_prefix(topic, broker::topic::errors_str) ) { - if ( auto err = broker::make_error_view(get_data(message)) ) { - ProcessError(err); + if ( auto err = broker::to(get_data(message)) ) { + ProcessError(*err); } else { auto str = to_string(message); @@ -964,7 +992,7 @@ void Manager::Process() { } if ( broker::is_prefix(topic, broker::topic::store_events_str) ) { - ProcessStoreEvent(broker::move_data(message)); + ProcessStoreEvent(convert_if_broker_variant(broker::move_data(message))); continue; } @@ -1151,12 +1179,12 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Batch& ev) { void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { if ( ! ev.valid() ) { - reporter->Warning("received invalid broker Event: %s", broker::to_string(ev.as_data()).data()); + reporter->Warning("received invalid broker Event: %s", broker::to_string(ev.as_data()).c_str()); return; } - const auto& name = ev.name(); - const auto& args = ev.args(); + auto&& name = ev.name(); + auto&& args = ev.args(); double ts; if ( auto ev_ts = ev.ts() ) @@ -1165,7 +1193,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { // Default to current network time, if the received event did not contain a timestamp. ts = run_state::network_time; - DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", name.data(), ts, RenderMessage(args).data()); + DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", c_str_safe(name).c_str(), ts, RenderMessage(args).c_str()); ++statistics.num_events_incoming; auto handler = event_registry->Lookup(name); @@ -1179,7 +1207,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { if ( strncmp(p.data(), topic.data(), p.size()) != 0 ) continue; - DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", name.data(), RenderMessage(args).data()); + DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", c_str_safe(name).c_str(), + RenderMessage(args).c_str()); return; } @@ -1189,7 +1218,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { reporter->Warning( "got event message '%s' with invalid # of args," " got %zd, expected %zu", - name.data(), args.size(), arg_types.size()); + c_str_safe(name).c_str(), args.size(), arg_types.size()); return; } @@ -1199,7 +1228,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { for ( size_t i = 0; i < args.size(); ++i ) { auto got_type = args[i].get_type_name(); const auto& expected_type = arg_types[i]; - auto arg = args[i]; + auto arg = convert_if_broker_variant(args[i]); auto val = detail::data_to_val(arg, expected_type.get()); if ( val ) @@ -1213,7 +1242,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { // fields. Produce an error message that shows what we // received. std::string elements; - for ( const auto& e : broker::get(args[i]) ) { + for ( auto&& e : broker_vector_from(args[i]) ) { if ( ! elements.empty() ) elements += ", "; @@ -1224,7 +1253,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { expected_type->GetName().c_str()); } - reporter->Warning("failed to convert remote event '%s' arg #%zu, %s", name.data(), i, msg_addl.c_str()); + reporter->Warning("failed to convert remote event '%s' arg #%zu, %s", c_str_safe(name).c_str(), i, + msg_addl.c_str()); // If we got a vector and expected a function this is // possibly because of a mismatch between @@ -1246,7 +1276,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) { DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str()); if ( ! lc.valid() ) { - reporter->Warning("received invalid broker LogCreate: %s", broker::to_string(lc.as_data()).data()); + reporter->Warning("received invalid broker LogCreate: %s", broker::to_string(lc.as_data()).c_str()); return false; } @@ -1265,27 +1295,27 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) { } auto writer_info = std::make_unique(); - if ( ! writer_info->FromBroker(std::move(lc.writer_info())) ) { + if ( ! writer_info->FromBroker(convert_if_broker_variant_or_move(lc.writer_info())) ) { reporter->Warning("failed to unpack remote log writer info"); return false; } // Get log fields. - auto fields_data = get_if(&lc.fields_data()); - - if ( ! fields_data ) { + if ( ! lc.fields_data().is_list() ) { reporter->Warning("failed to unpack remote log fields"); return false; } + auto&& fields_data = broker_vector_from(lc.fields_data()); - auto num_fields = fields_data->size(); + auto num_fields = fields_data.size(); auto fields = new threading::Field*[num_fields]; for ( size_t i = 0; i < num_fields; ++i ) { - if ( auto field = detail::data_to_threading_field(std::move((*fields_data)[i])) ) + if ( auto field = detail::data_to_threading_field(fields_data[i]) ) fields[i] = field; else { - reporter->Warning("failed to convert remote log field # %zu", i); + reporter->Warning("failed to convert remote log field #%zu: %s", i, + broker::to_string(fields_data[i]).c_str()); delete[] fields; return false; } @@ -1305,19 +1335,19 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str()); if ( ! lw.valid() ) { - reporter->Warning("received invalid broker LogWrite: %s", broker::to_string(lw.as_data()).data()); + reporter->Warning("received invalid broker LogWrite: %s", broker::to_string(lw.as_data()).c_str()); return false; } ++statistics.num_logs_incoming; - auto& stream_id_name = lw.stream_id().name; + auto&& stream_id_name = lw.stream_id().name; // Get stream ID. auto wrapped_stream_id = broker::data{lw.stream_id()}; auto stream_id = detail::data_to_val(wrapped_stream_id, log_id_type); if ( ! stream_id ) { - reporter->Warning("failed to unpack remote log stream id: %s", stream_id_name.data()); + reporter->Warning("failed to unpack remote log stream id: %s", c_str_safe(stream_id_name).c_str()); return false; } @@ -1325,7 +1355,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { auto wrapped_writer_id = broker::data{lw.writer_id()}; auto writer_id = detail::data_to_val(wrapped_writer_id, writer_id_type); if ( ! writer_id ) { - reporter->Warning("failed to unpack remote log writer id for stream: %s", stream_id_name.data()); + reporter->Warning("failed to unpack remote log writer id for stream: %s", c_str_safe(stream_id_name).c_str()); return false; } @@ -1340,7 +1370,8 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { bool success = fmt.Read(&num_fields, "num_fields"); if ( ! success ) { - reporter->Warning("failed to unserialize remote log num fields for stream: %s", stream_id_name.data()); + reporter->Warning("failed to unserialize remote log num fields for stream: %s", + c_str_safe(stream_id_name).c_str()); return false; } @@ -1354,7 +1385,8 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { delete vals[j]; delete[] vals; - reporter->Warning("failed to unserialize remote log field %d for stream: %s", i, stream_id_name.data()); + reporter->Warning("failed to unserialize remote log field %d for stream: %s", i, + c_str_safe(stream_id_name).c_str()); return false; } @@ -1369,13 +1401,13 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str()); if ( ! iu.valid() ) { - reporter->Warning("received invalid broker IdentifierUpdate: %s", broker::to_string(iu.as_data()).data()); + reporter->Warning("received invalid broker IdentifierUpdate: %s", broker::to_string(iu.as_data()).c_str()); return false; } ++statistics.num_ids_incoming; - auto id_name = std::move(iu.id_name()); - auto id_value = std::move(iu.id_value()); + auto id_name = c_str_safe(iu.id_name()); + auto id_value = convert_if_broker_variant_or_move(iu.id_value()); const auto& id = zeek::detail::global_scope()->Find(id_name); if ( ! id ) { @@ -1395,7 +1427,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i return true; } -void Manager::ProcessStatus(broker::status_view stat) { +void Manager::ProcessStatus(broker::status& stat) { DBG_LOG(DBG_BROKER, "Received status message: %s", RenderMessage(stat).c_str()); auto ctx = stat.context(); @@ -1425,7 +1457,7 @@ void Manager::ProcessStatus(broker::status_view stat) { case broker::sc::endpoint_unreachable: event = ::Broker::endpoint_unreachable; break; - default: reporter->Warning("Unhandled Broker status: %s", to_string(stat).data()); break; + default: reporter->Warning("Unhandled Broker status: %s", to_string(stat).c_str()); break; } if ( ! event ) @@ -1440,7 +1472,7 @@ void Manager::ProcessStatus(broker::status_view stat) { auto network_info = make_intrusive(ni); if ( ctx->network ) { - network_info->Assign(0, ctx->network->address.data()); + network_info->Assign(0, ctx->network->address.c_str()); network_info->Assign(1, val_mgr->Port(ctx->network->port, TRANSPORT_TCP)); } else { @@ -1459,7 +1491,7 @@ void Manager::ProcessStatus(broker::status_view stat) { event_mgr.Enqueue(event, std::move(endpoint_info), std::move(msg)); } -void Manager::ProcessError(broker::error_view err) { +void Manager::ProcessError(broker::error& err) { DBG_LOG(DBG_BROKER, "Received error message: %s", RenderMessage(err).c_str()); if ( ! ::Broker::error ) diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 7384fe6218..50020602cc 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -425,8 +425,8 @@ private: bool ProcessMessage(std::string_view topic, broker::zeek::LogCreate& lc); bool ProcessMessage(std::string_view topic, broker::zeek::LogWrite& lw); bool ProcessMessage(std::string_view topic, broker::zeek::IdentifierUpdate& iu); - void ProcessStatus(broker::status_view stat); - void ProcessError(broker::error_view err); + void ProcessStatus(broker::status& stat); + void ProcessError(broker::error& err); void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); // Initializes the masters for Broker backed Zeek tables when using the &backend attribute