Port broker::data variant usages to use CAF API directly

Old code still all worked, but made use of Broker functions which
now just redirect to CAF ones.
This commit is contained in:
Jon Siwek 2018-07-17 14:20:19 -05:00
parent 4c072409f0
commit 9caad8a042
8 changed files with 125 additions and 118 deletions

View file

@ -942,13 +942,13 @@ void Manager::Process()
{
had_input = true;
if ( auto stat = broker::get_if<broker::status>(status_msg) )
if ( auto stat = caf::get_if<broker::status>(&status_msg) )
{
ProcessStatus(std::move(*stat));
continue;
}
if ( auto err = broker::get_if<broker::error>(status_msg) )
if ( auto err = caf::get_if<broker::error>(&status_msg) )
{
ProcessError(std::move(*err));
continue;
@ -1048,7 +1048,7 @@ void Manager::ProcessRelayEvent(broker::bro::RelayEvent ev)
++statistics.num_events_incoming;
for ( auto& t : ev.topics() )
PublishEvent(std::move(broker::get<std::string>(t)),
PublishEvent(std::move(caf::get<std::string>(t)),
std::move(ev.name()),
std::move(ev.args()));
}
@ -1060,7 +1060,7 @@ void Manager::ProcessHandleAndRelayEvent(broker::bro::HandleAndRelayEvent ev)
ProcessEvent(ev.name(), ev.args());
for ( auto& t : ev.topics() )
PublishEvent(std::move(broker::get<std::string>(t)),
PublishEvent(std::move(caf::get<std::string>(t)),
std::move(ev.name()),
std::move(ev.args()));
}
@ -1095,40 +1095,38 @@ bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
}
// Get log fields.
auto fields_data = caf::get_if<broker::vector>(&lc.fields_data());
try
{
auto fields_data = std::move(broker::get<broker::vector>(lc.fields_data()));
auto num_fields = fields_data.size();
auto fields = new threading::Field* [num_fields];
for ( auto i = 0u; i < num_fields; ++i )
{
if ( auto field = data_to_threading_field(std::move(fields_data[i])) )
fields[i] = field;
else
{
reporter->Warning("failed to convert remote log field # %d", i);
return false;
}
}
if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.get(), num_fields, fields) )
{
ODesc d;
stream_id->Describe(&d);
reporter->Warning("failed to create remote log stream for %s locally", d.Description());
}
writer_info.release(); // log_mgr took ownership.
return true;
}
catch (const broker::bad_variant_access& e)
if ( ! fields_data )
{
reporter->Warning("failed to unpack remote log fields");
return false;
}
auto num_fields = fields_data->size();
auto fields = new threading::Field* [num_fields];
for ( auto i = 0u; i < num_fields; ++i )
{
if ( auto field = data_to_threading_field(std::move((*fields_data)[i])) )
fields[i] = field;
else
{
reporter->Warning("failed to convert remote log field # %d", i);
delete [] fields;
return false;
}
}
if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.get(), num_fields, fields) )
{
ODesc d;
stream_id->Describe(&d);
reporter->Warning("failed to create remote log stream for %s locally", d.Description());
}
writer_info.release(); // log_mgr took ownership.
return true;
}
bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
@ -1159,52 +1157,56 @@ bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
}
unref_guard writer_id_unreffer{writer_id};
auto path = caf::get_if<std::string>(&lw.path());
try
if ( ! path )
{
auto& path = broker::get<std::string>(lw.path());
auto& serial_data = broker::get<std::string>(lw.serial_data());
BinarySerializationFormat fmt;
fmt.StartRead(serial_data.data(), serial_data.size());
int num_fields;
bool success = fmt.Read(&num_fields, "num_fields");
if ( ! success )
{
reporter->Warning("failed to unserialize remote log num fields for stream: %s", stream_id_name.data());
return false;
}
auto vals = new threading::Value* [num_fields];
for ( int i = 0; i < num_fields; ++i )
{
vals[i] = new threading::Value;
if ( ! vals[i]->Read(&fmt) )
{
for ( int j = 0; j <=i; ++j )
delete vals[j];
delete [] vals;
reporter->Warning("failed to unserialize remote log field %d for stream: %s", i, stream_id_name.data());
return false;
}
}
log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), std::move(path), num_fields, vals);
fmt.EndRead();
return true;
}
catch ( const broker::bad_variant_access& e)
{
reporter->Warning("failed to unpack remote log values (bad variant) for stream: %s", stream_id_name.data());
reporter->Warning("failed to unpack remote log values (bad path variant) for stream: %s", stream_id_name.data());
return false;
}
auto serial_data = caf::get_if<std::string>(&lw.serial_data());
if ( ! serial_data )
{
reporter->Warning("failed to unpack remote log values (bad serial_data variant) for stream: %s", stream_id_name.data());
return false;
}
BinarySerializationFormat fmt;
fmt.StartRead(serial_data->data(), serial_data->size());
int num_fields;
bool success = fmt.Read(&num_fields, "num_fields");
if ( ! success )
{
reporter->Warning("failed to unserialize remote log num fields for stream: %s", stream_id_name.data());
return false;
}
auto vals = new threading::Value* [num_fields];
for ( int i = 0; i < num_fields; ++i )
{
vals[i] = new threading::Value;
if ( ! vals[i]->Read(&fmt) )
{
for ( int j = 0; j <=i; ++j )
delete vals[j];
delete [] vals;
reporter->Warning("failed to unserialize remote log field %d for stream: %s", i, stream_id_name.data());
return false;
}
}
log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(),
std::move(*path), num_fields, vals);
fmt.EndRead();
return true;
}
bool Manager::ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu)