Add facade types to avoid using raw Broker types

By avoiding to use `broker::data` directly, we gain a degree of freedom
that allows us to swap out `broker::data` for something else (e.g.,
`broker::variant`) in the future. Furthermore, it also helps us to keep
Broker types "local" to the Broker manager and gives us a nicer
interface.

Also replaces uses of `broker::expected` with `std::optional`. While an
`expected `can carry additional information as to why a value is not
present, nothing in Zeek ever cared about that. Hence, using
`std::optional` removes an unnecessary dependency on a Broker detail
while also being more efficient (no extra heap allocation when no value
is present).
This commit is contained in:
Dominik Charousset 2023-11-11 09:21:00 +01:00
parent bc0f85caa8
commit 647fdf7737
30 changed files with 1091 additions and 552 deletions

View file

@ -603,14 +603,14 @@ bool Manager::PublishIdentifier(std::string topic, std::string id) {
// receiving side, but not sure what use that would be.
return false;
auto data = detail::val_to_data(val.get());
auto data = BrokerData{};
if ( ! data ) {
if ( ! data.Convert(val) ) {
Error("Failed to publish ID with unsupported type: %s (%s)", id.c_str(), type_name(val->GetType()->Tag()));
return false;
}
broker::zeek::IdentifierUpdate msg(std::move(id), std::move(*data));
broker::zeek::IdentifierUpdate msg(std::move(id), std::move(data.value_));
DBG_LOG(DBG_BROKER, "Publishing id-update: %s", RenderMessage(topic, msg.as_data()).c_str());
bstate->endpoint.publish(std::move(topic), msg.move_data());
++statistics.num_ids_outgoing;
@ -888,7 +888,7 @@ RecordVal* Manager::MakeEvent(ValPList* args, zeek::detail::Frame* frame) {
if ( same_type(got_type, detail::DataVal::ScriptDataType()) )
data_val = {NewRef{}, (*args)[i]->AsRecordVal()};
else
data_val = detail::make_data_val((*args)[i]);
data_val = BrokerData::ToRecordVal((*args)[i]);
if ( ! data_val->HasField(0) ) {
rval->Remove(0);
@ -1026,10 +1026,11 @@ void Manager::ProcessStoreEventInsertUpdate(const TableValPtr& table, const std:
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
ValPtr zeek_key;
auto key_copy = key;
if ( its.size() == 1 )
zeek_key = detail::data_to_val(key, its[0].get());
zeek_key = detail::data_to_val(key_copy, its[0].get());
else
zeek_key = detail::data_to_val(key, table->GetType()->AsTableType()->GetIndices().get());
zeek_key = detail::data_to_val(key_copy, table->GetType()->AsTableType()->GetIndices().get());
if ( ! zeek_key ) {
reporter->Error(
@ -1045,7 +1046,8 @@ void Manager::ProcessStoreEventInsertUpdate(const TableValPtr& table, const std:
}
// it is a table
auto zeek_value = detail::data_to_val(data, table->GetType()->Yield().get());
auto data_copy = data;
auto zeek_value = detail::data_to_val(data_copy, table->GetType()->Yield().get());
if ( ! zeek_value ) {
reporter->Error(
"ProcessStoreEvent %s: could not convert value \"%s\" for key \"%s\" in "
@ -1204,7 +1206,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
for ( size_t i = 0; i < args.size(); ++i ) {
auto got_type = args[i].get_type_name();
const auto& expected_type = arg_types[i];
auto val = detail::data_to_val(args[i], expected_type.get());
auto arg = args[i];
auto val = detail::data_to_val(arg, expected_type.get());
if ( val )
vl.emplace_back(std::move(val));
@ -1254,13 +1257,15 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) {
return false;
}
auto stream_id = detail::data_to_val(std::move(lc.stream_id()), log_id_type);
auto wrapped_stream_id = broker::data{lc.stream_id()};
auto stream_id = detail::data_to_val(wrapped_stream_id, log_id_type);
if ( ! stream_id ) {
reporter->Warning("failed to unpack remote log stream id");
return false;
}
auto writer_id = detail::data_to_val(std::move(lc.writer_id()), writer_id_type);
auto wrapped_writer_id = broker::data{lc.writer_id()};
auto writer_id = detail::data_to_val(wrapped_writer_id, writer_id_type);
if ( ! writer_id ) {
reporter->Warning("failed to unpack remote log writer id");
return false;
@ -1315,7 +1320,8 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
auto& stream_id_name = lw.stream_id().name;
// Get stream ID.
auto stream_id = detail::data_to_val(std::move(lw.stream_id()), log_id_type);
auto wrapped_stream_id = broker::data{lw.stream_id()};
auto stream_id = detail::data_to_val(wrapped_stream_id, log_id_type);
if ( ! stream_id ) {
reporter->Warning("failed to unpack remote log stream id: %s", stream_id_name.data());
@ -1323,7 +1329,8 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
}
// Get writer ID.
auto writer_id = detail::data_to_val(std::move(lw.writer_id()), writer_id_type);
auto wrapped_writer_id = broker::data{lw.writer_id()};
auto writer_id = detail::data_to_val(wrapped_writer_id, writer_id_type);
if ( ! writer_id ) {
reporter->Warning("failed to unpack remote log writer id for stream: %s", stream_id_name.data());
return false;
@ -1383,7 +1390,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i
return false;
}
auto val = detail::data_to_val(std::move(id_value), id->GetType().get());
auto val = detail::data_to_val(id_value, id->GetType().get());
if ( ! val ) {
reporter->Error("Failed to receive ID with unsupported type: %s (%s)", id_name.c_str(),
@ -1516,8 +1523,10 @@ void Manager::ProcessStoreResponse(detail::StoreHandleVal* s, broker::store::res
return;
}
if ( response.answer )
request->second->Result(detail::query_result(detail::make_data_val(std::move(*response.answer))));
if ( response.answer ) {
BrokerData tmp{std::move(*response.answer)};
request->second->Result(detail::query_result(std::move(tmp).ToRecordVal()));
}
else if ( response.answer.error() == broker::ec::request_timeout ) {
// Fine, trigger's timeout takes care of things.
}
@ -1604,11 +1613,12 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const detail::Stor
table->DisableChangeNotifications();
for ( const auto& key : *set ) {
ValPtr zeek_key;
auto zeek_key = ValPtr{};
auto key_copy = key;
if ( its.size() == 1 )
zeek_key = detail::data_to_val(key, its[0].get());
zeek_key = detail::data_to_val(key_copy, its[0].get());
else
zeek_key = detail::data_to_val(key, table->GetType()->AsTableType()->GetIndices().get());
zeek_key = detail::data_to_val(key_copy, table->GetType()->AsTableType()->GetIndices().get());
if ( ! zeek_key ) {
reporter->Error(