Prepare Broker manager for broker::variant

- With `broker::data`, we always have actual `std::string` objects that
  we can pass to C functions expecting a null-terminated string.
  However, `broker::variant` will return a `std::string_view` where we
  have previously received a `std::string`. Hence, we add an extra level
  of indirection that ensures that views are converted to
  null-terminated strings and also use `c_str()` where we have
  previously used `data()`. The former is not present on a
  `std::string_view`. Using this member function instead acts as an
  extra level of insurance that we do not accidentally pass the bytes
  from a view to a C function.
- Switch from error and status views to actual error and status objects.
  The view types from Broker only work with `broker::data` and thus
  won't be available with `broker::variant`.
This commit is contained in:
Dominik Charousset 2023-12-22 12:15:26 +01:00 committed by Christian Kreibich
parent a16179eae7
commit 4e3acfe8fc
2 changed files with 84 additions and 52 deletions

View file

@ -1,11 +1,14 @@
#include "zeek/broker/Manager.h"
#include <broker/broker.hh>
#include <broker/config.hh>
#include <broker/configuration.hh>
#include <broker/zeek.hh>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <string>
#include <string_view>
#include "zeek/DebugLogger.h"
#include "zeek/Desc.h"
@ -30,6 +33,26 @@ using namespace std;
namespace {
broker::data&& convert_if_broker_variant(broker::data&& arg) { return std::move(arg); }
broker::data& convert_if_broker_variant(broker::data& arg) { return arg; }
broker::data&& convert_if_broker_variant_or_move(broker::data& arg) { return std::move(arg); }
broker::vector& broker_vector_from(broker::data& arg) { return broker::get<broker::vector>(arg); }
// Converts a string_view into a string to make sure that we can safely call `.c_str()` on the result.
template<class View>
std::enable_if_t<std::is_same_v<std::decay_t<View>, std::string_view>, std::string> c_str_safe(View&& arg) {
return std::string{arg};
}
// Passes through a string without copying it (already safe to call `.c_str()` on it).
template<class String>
std::enable_if_t<std::is_same_v<std::decay_t<String>, std::string>, const std::string&> c_str_safe(String&& arg) {
return arg;
}
void print_escaped(std::string& buf, std::string_view str) {
buf.push_back('"');
for ( auto c : str ) {
@ -168,33 +191,38 @@ struct scoped_reporter_location {
};
#ifdef DEBUG
static std::string RenderMessage(const broker::data& d) { return util::json_escape_utf8(broker::to_string(d)); }
namespace {
static std::string RenderMessage(std::string topic, const broker::data& x) {
return util::fmt("%s -> %s", RenderMessage(x).c_str(), topic.c_str());
}
std::string RenderMessage(const broker::data& d) { return util::json_escape_utf8(broker::to_string(d)); }
static std::string RenderEvent(std::string topic, std::string name, const broker::data& args) {
return util::fmt("%s(%s) -> %s", name.c_str(), RenderMessage(args).c_str(), topic.c_str());
}
static std::string RenderMessage(const broker::store::response& x) {
std::string RenderMessage(const broker::store::response& x) {
return util::fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : "<no answer>"), x.id);
}
static std::string RenderMessage(const broker::vector* xs) { return broker::to_string(*xs); }
std::string RenderMessage(const broker::vector* xs) { return broker::to_string(*xs); }
static std::string RenderMessage(const broker::vector& xs) { return broker::to_string(xs); }
std::string RenderMessage(const broker::vector& xs) { return broker::to_string(xs); }
static std::string RenderMessage(broker::status_view s) { return broker::to_string(s.code()); }
std::string RenderMessage(const broker::status& s) { return broker::to_string(s.code()); }
static std::string RenderMessage(broker::error_view e) {
std::string RenderMessage(const broker::error& e) {
if ( auto ctx = e.context() )
return util::fmt("%s (%s)", to_string(e.code()).c_str(), to_string(*ctx).c_str());
else
return util::fmt("%s (null)", to_string(e.code()).c_str());
}
template<class DataOrVariant>
std::string RenderMessage(const std::string& topic, const DataOrVariant& x) {
return util::fmt("%s -> %s", RenderMessage(x).c_str(), topic.c_str());
}
template<class DataOrVariant>
std::string RenderEvent(const std::string& topic, const std::string& name, const DataOrVariant& args) {
return util::fmt("%s(%s) -> %s", name.c_str(), RenderMessage(args).c_str(), topic.c_str());
}
} // namespace
#endif
Manager::Manager(bool arg_use_real_time) {
@ -721,7 +749,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
reporter->Error(
"Failed to remotely log: log_topic func did not return"
" a value for stream %s at path %s",
stream_id, path.data());
stream_id, c_str_safe(path).c_str());
return false;
}
@ -939,11 +967,11 @@ void Manager::Process() {
bool had_input = ! messages.empty();
for ( auto& message : messages ) {
auto& topic = broker::get_topic(message);
auto&& topic = broker::get_topic(message);
if ( broker::is_prefix(topic, broker::topic::statuses_str) ) {
if ( auto stat = broker::make_status_view(get_data(message)) ) {
ProcessStatus(stat);
if ( auto stat = broker::to<broker::status>(get_data(message)) ) {
ProcessStatus(*stat);
}
else {
auto str = to_string(message);
@ -953,8 +981,8 @@ void Manager::Process() {
}
if ( broker::is_prefix(topic, broker::topic::errors_str) ) {
if ( auto err = broker::make_error_view(get_data(message)) ) {
ProcessError(err);
if ( auto err = broker::to<broker::error>(get_data(message)) ) {
ProcessError(*err);
}
else {
auto str = to_string(message);
@ -964,7 +992,7 @@ void Manager::Process() {
}
if ( broker::is_prefix(topic, broker::topic::store_events_str) ) {
ProcessStoreEvent(broker::move_data(message));
ProcessStoreEvent(convert_if_broker_variant(broker::move_data(message)));
continue;
}
@ -1151,12 +1179,12 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Batch& ev) {
void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
if ( ! ev.valid() ) {
reporter->Warning("received invalid broker Event: %s", broker::to_string(ev.as_data()).data());
reporter->Warning("received invalid broker Event: %s", broker::to_string(ev.as_data()).c_str());
return;
}
const auto& name = ev.name();
const auto& args = ev.args();
auto&& name = ev.name();
auto&& args = ev.args();
double ts;
if ( auto ev_ts = ev.ts() )
@ -1165,7 +1193,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
// Default to current network time, if the received event did not contain a timestamp.
ts = run_state::network_time;
DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", name.data(), ts, RenderMessage(args).data());
DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", c_str_safe(name).c_str(), ts, RenderMessage(args).c_str());
++statistics.num_events_incoming;
auto handler = event_registry->Lookup(name);
@ -1179,7 +1207,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
if ( strncmp(p.data(), topic.data(), p.size()) != 0 )
continue;
DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", name.data(), RenderMessage(args).data());
DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", c_str_safe(name).c_str(),
RenderMessage(args).c_str());
return;
}
@ -1189,7 +1218,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
reporter->Warning(
"got event message '%s' with invalid # of args,"
" got %zd, expected %zu",
name.data(), args.size(), arg_types.size());
c_str_safe(name).c_str(), args.size(), arg_types.size());
return;
}
@ -1199,7 +1228,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
for ( size_t i = 0; i < args.size(); ++i ) {
auto got_type = args[i].get_type_name();
const auto& expected_type = arg_types[i];
auto arg = args[i];
auto arg = convert_if_broker_variant(args[i]);
auto val = detail::data_to_val(arg, expected_type.get());
if ( val )
@ -1213,7 +1242,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
// fields. Produce an error message that shows what we
// received.
std::string elements;
for ( const auto& e : broker::get<broker::vector>(args[i]) ) {
for ( auto&& e : broker_vector_from(args[i]) ) {
if ( ! elements.empty() )
elements += ", ";
@ -1224,7 +1253,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
expected_type->GetName().c_str());
}
reporter->Warning("failed to convert remote event '%s' arg #%zu, %s", name.data(), i, msg_addl.c_str());
reporter->Warning("failed to convert remote event '%s' arg #%zu, %s", c_str_safe(name).c_str(), i,
msg_addl.c_str());
// If we got a vector and expected a function this is
// possibly because of a mismatch between
@ -1246,7 +1276,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) {
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str());
if ( ! lc.valid() ) {
reporter->Warning("received invalid broker LogCreate: %s", broker::to_string(lc.as_data()).data());
reporter->Warning("received invalid broker LogCreate: %s", broker::to_string(lc.as_data()).c_str());
return false;
}
@ -1265,27 +1295,27 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) {
}
auto writer_info = std::make_unique<logging::WriterBackend::WriterInfo>();
if ( ! writer_info->FromBroker(std::move(lc.writer_info())) ) {
if ( ! writer_info->FromBroker(convert_if_broker_variant_or_move(lc.writer_info())) ) {
reporter->Warning("failed to unpack remote log writer info");
return false;
}
// Get log fields.
auto fields_data = get_if<broker::vector>(&lc.fields_data());
if ( ! fields_data ) {
if ( ! lc.fields_data().is_list() ) {
reporter->Warning("failed to unpack remote log fields");
return false;
}
auto&& fields_data = broker_vector_from(lc.fields_data());
auto num_fields = fields_data->size();
auto num_fields = fields_data.size();
auto fields = new threading::Field*[num_fields];
for ( size_t i = 0; i < num_fields; ++i ) {
if ( auto field = detail::data_to_threading_field(std::move((*fields_data)[i])) )
if ( auto field = detail::data_to_threading_field(fields_data[i]) )
fields[i] = field;
else {
reporter->Warning("failed to convert remote log field # %zu", i);
reporter->Warning("failed to convert remote log field #%zu: %s", i,
broker::to_string(fields_data[i]).c_str());
delete[] fields;
return false;
}
@ -1305,19 +1335,19 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str());
if ( ! lw.valid() ) {
reporter->Warning("received invalid broker LogWrite: %s", broker::to_string(lw.as_data()).data());
reporter->Warning("received invalid broker LogWrite: %s", broker::to_string(lw.as_data()).c_str());
return false;
}
++statistics.num_logs_incoming;
auto& stream_id_name = lw.stream_id().name;
auto&& stream_id_name = lw.stream_id().name;
// Get stream ID.
auto wrapped_stream_id = broker::data{lw.stream_id()};
auto stream_id = detail::data_to_val(wrapped_stream_id, log_id_type);
if ( ! stream_id ) {
reporter->Warning("failed to unpack remote log stream id: %s", stream_id_name.data());
reporter->Warning("failed to unpack remote log stream id: %s", c_str_safe(stream_id_name).c_str());
return false;
}
@ -1325,7 +1355,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
auto wrapped_writer_id = broker::data{lw.writer_id()};
auto writer_id = detail::data_to_val(wrapped_writer_id, writer_id_type);
if ( ! writer_id ) {
reporter->Warning("failed to unpack remote log writer id for stream: %s", stream_id_name.data());
reporter->Warning("failed to unpack remote log writer id for stream: %s", c_str_safe(stream_id_name).c_str());
return false;
}
@ -1340,7 +1370,8 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
bool success = fmt.Read(&num_fields, "num_fields");
if ( ! success ) {
reporter->Warning("failed to unserialize remote log num fields for stream: %s", stream_id_name.data());
reporter->Warning("failed to unserialize remote log num fields for stream: %s",
c_str_safe(stream_id_name).c_str());
return false;
}
@ -1354,7 +1385,8 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
delete vals[j];
delete[] vals;
reporter->Warning("failed to unserialize remote log field %d for stream: %s", i, stream_id_name.data());
reporter->Warning("failed to unserialize remote log field %d for stream: %s", i,
c_str_safe(stream_id_name).c_str());
return false;
}
@ -1369,13 +1401,13 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str());
if ( ! iu.valid() ) {
reporter->Warning("received invalid broker IdentifierUpdate: %s", broker::to_string(iu.as_data()).data());
reporter->Warning("received invalid broker IdentifierUpdate: %s", broker::to_string(iu.as_data()).c_str());
return false;
}
++statistics.num_ids_incoming;
auto id_name = std::move(iu.id_name());
auto id_value = std::move(iu.id_value());
auto id_name = c_str_safe(iu.id_name());
auto id_value = convert_if_broker_variant_or_move(iu.id_value());
const auto& id = zeek::detail::global_scope()->Find(id_name);
if ( ! id ) {
@ -1395,7 +1427,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i
return true;
}
void Manager::ProcessStatus(broker::status_view stat) {
void Manager::ProcessStatus(broker::status& stat) {
DBG_LOG(DBG_BROKER, "Received status message: %s", RenderMessage(stat).c_str());
auto ctx = stat.context();
@ -1425,7 +1457,7 @@ void Manager::ProcessStatus(broker::status_view stat) {
case broker::sc::endpoint_unreachable: event = ::Broker::endpoint_unreachable; break;
default: reporter->Warning("Unhandled Broker status: %s", to_string(stat).data()); break;
default: reporter->Warning("Unhandled Broker status: %s", to_string(stat).c_str()); break;
}
if ( ! event )
@ -1440,7 +1472,7 @@ void Manager::ProcessStatus(broker::status_view stat) {
auto network_info = make_intrusive<RecordVal>(ni);
if ( ctx->network ) {
network_info->Assign(0, ctx->network->address.data());
network_info->Assign(0, ctx->network->address.c_str());
network_info->Assign(1, val_mgr->Port(ctx->network->port, TRANSPORT_TCP));
}
else {
@ -1459,7 +1491,7 @@ void Manager::ProcessStatus(broker::status_view stat) {
event_mgr.Enqueue(event, std::move(endpoint_info), std::move(msg));
}
void Manager::ProcessError(broker::error_view err) {
void Manager::ProcessError(broker::error& err) {
DBG_LOG(DBG_BROKER, "Received error message: %s", RenderMessage(err).c_str());
if ( ! ::Broker::error )

View file

@ -425,8 +425,8 @@ private:
bool ProcessMessage(std::string_view topic, broker::zeek::LogCreate& lc);
bool ProcessMessage(std::string_view topic, broker::zeek::LogWrite& lw);
bool ProcessMessage(std::string_view topic, broker::zeek::IdentifierUpdate& iu);
void ProcessStatus(broker::status_view stat);
void ProcessError(broker::error_view err);
void ProcessStatus(broker::status& stat);
void ProcessError(broker::error& err);
void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response);
void FlushPendingQueries();
// Initializes the masters for Broker backed Zeek tables when using the &backend attribute