Add validity checking/warnings for Broker messages

This commit is contained in:
Jon Siwek 2019-02-06 13:17:39 -06:00
parent 1ee96516e8
commit 018f687c33
4 changed files with 50 additions and 2 deletions

View file

@ -840,6 +840,11 @@ double Manager::NextTimestamp(double* local_network_time)
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
{
switch ( broker::bro::Message::type(msg) ) {
case broker::bro::Message::Type::Invalid:
reporter->Warning("received invalid broker message: %s",
broker::to_string(msg).data());
break;
case broker::bro::Message::Type::Event:
ProcessEvent(topic, std::move(msg));
break;
@ -860,6 +865,13 @@ void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
{
broker::bro::Batch batch(std::move(msg));
if ( ! batch.valid() )
{
reporter->Warning("received invalid broker Batch: %s",
broker::to_string(batch).data());
return;
}
for ( auto& i : batch.batch() )
DispatchMessage(topic, std::move(i));
@ -869,6 +881,8 @@ void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
default:
// We ignore unknown types so that we could add more in the
// future if we had too.
reporter->Warning("received unknown broker message: %s",
broker::to_string(msg).data());
break;
}
}
@ -934,6 +948,13 @@ void Manager::Process()
void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
{
if ( ! ev.valid() )
{
reporter->Warning("received invalid broker Event: %s",
broker::to_string(ev).data());
return;
}
auto name = std::move(ev.name());
auto args = std::move(ev.args());
@ -1001,6 +1022,12 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
{
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());
if ( ! lc.valid() )
{
reporter->Warning("received invalid broker LogCreate: %s",
broker::to_string(lc).data());
return false;
}
auto stream_id = data_to_val(std::move(lc.stream_id()), log_id_type);
if ( ! stream_id )
@ -1066,6 +1093,13 @@ bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
{
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str());
if ( ! lw.valid() )
{
reporter->Warning("received invalid broker LogWrite: %s",
broker::to_string(lw).data());
return false;
}
++statistics.num_logs_incoming;
auto& stream_id_name = lw.stream_id().name;
@ -1145,6 +1179,14 @@ bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
bool Manager::ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu)
{
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str());
if ( ! iu.valid() )
{
reporter->Warning("received invalid broker IdentifierUpdate: %s",
broker::to_string(iu).data());
return false;
}
++statistics.num_ids_incoming;
auto id_name = std::move(iu.id_name());
auto id_value = std::move(iu.id_value());