Broker: Adapt to upstream API adjustments

This commit is contained in:
Dominik Charousset 2023-09-24 15:52:21 +02:00 committed by Tim Wojtulewicz
parent 738c39379f
commit 015608fef6
3 changed files with 46 additions and 98 deletions

@ -1 +1 @@
Subproject commit f9af5f0ed2b87e01790fe06a0658cc54f1c32974 Subproject commit 86ed39b80c272cff44bb95f58eeda1bc5ff6f65c

View file

@ -861,8 +861,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
auto& lb = log_buffers[stream_id_num]; auto& lb = log_buffers[stream_id_num];
++lb.message_count; ++lb.message_count;
auto& pending_batch = lb.msgs[topic]; lb.msgs[topic].add(std::move(msg));
pending_batch.emplace_back(msg.move_data());
if ( lb.message_count >= log_batch_size ) if ( lb.message_count >= log_batch_size )
statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size); statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size);
@ -879,15 +878,10 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_si
// No logs buffered for this stream. // No logs buffered for this stream.
return 0; return 0;
for ( auto& kv : msgs ) for ( auto& [topic, pending_batch] : msgs )
{ {
auto& topic = kv.first; if ( ! pending_batch.empty() )
auto& pending_batch = kv.second; endpoint.publish(topic, pending_batch.build());
broker::vector batch;
batch.reserve(log_batch_size + 1);
pending_batch.swap(batch);
broker::zeek::Batch msg(std::move(batch));
endpoint.publish(topic, msg.move_data());
} }
auto rval = message_count; auto rval = message_count;
@ -1097,55 +1091,6 @@ bool Manager::Unsubscribe(const string& topic_prefix)
return true; return true;
} }
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
{
switch ( broker::zeek::Message::type(msg) )
{
case broker::zeek::Message::Type::Invalid:
reporter->Warning("received invalid broker message: %s", broker::to_string(msg).data());
break;
case broker::zeek::Message::Type::Event:
ProcessEvent(topic, std::move(msg));
break;
case broker::zeek::Message::Type::LogCreate:
ProcessLogCreate(std::move(msg));
break;
case broker::zeek::Message::Type::LogWrite:
ProcessLogWrite(std::move(msg));
break;
case broker::zeek::Message::Type::IdentifierUpdate:
ProcessIdentifierUpdate(std::move(msg));
break;
case broker::zeek::Message::Type::Batch:
{
broker::zeek::Batch batch(std::move(msg));
if ( ! batch.valid() )
{
reporter->Warning("received invalid broker Batch: %s",
broker::to_string(batch.as_data()).data());
return;
}
for ( auto& i : batch.batch() )
DispatchMessage(topic, std::move(i));
break;
}
default:
// We ignore unknown types so that we could add more in the
// future if we had too.
reporter->Warning("received unknown broker message: %s", broker::to_string(msg).data());
break;
}
}
void Manager::Process() void Manager::Process()
{ {
auto messages = bstate->subscriber.poll(); auto messages = bstate->subscriber.poll();
@ -1197,8 +1142,13 @@ void Manager::Process()
// different memory region if other threads keep references to the // different memory region if other threads keep references to the
// message. Since `topic` still points into the original memory // message. Since `topic` still points into the original memory
// region, we may no longer access it after this point. // region, we may no longer access it after this point.
auto unshared_topic = broker::move_topic(message); auto topic_str = broker::get_topic_str(message);
DispatchMessage(unshared_topic, broker::move_data(message)); broker::zeek::visit_as_message(
[this, topic_str](auto& msg)
{
ProcessMessage(topic_str, msg);
},
std::move(message));
} }
catch ( std::runtime_error& e ) catch ( std::runtime_error& e )
{ {
@ -1382,7 +1332,21 @@ void Manager::ProcessStoreEvent(broker::data msg)
} }
} }
void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) void Manager::ProcessMessage(std::string_view topic, broker::zeek::Invalid& ev)
{
reporter->Warning("received invalid broker message: %s", broker::to_string(ev).c_str());
}
void Manager::ProcessMessage(std::string_view topic, broker::zeek::Batch& ev)
{
ev.for_each(
[this, topic](auto& inner)
{
ProcessMessage(topic, inner);
});
}
void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev)
{ {
if ( ! ev.valid() ) if ( ! ev.valid() )
{ {
@ -1391,8 +1355,8 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
return; return;
} }
auto name = std::move(ev.name()); const auto& name = ev.name();
auto args = std::move(ev.args()); const auto& args = ev.args();
double ts; double ts;
if ( auto ev_ts = ev.ts() ) if ( auto ev_ts = ev.ts() )
@ -1408,14 +1372,12 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
if ( ! handler ) if ( ! handler )
return; return;
auto& topic_string = topic.string();
for ( const auto& p : forwarded_prefixes ) for ( const auto& p : forwarded_prefixes )
{ {
if ( p.size() > topic_string.size() ) if ( p.size() > topic.size() )
continue; continue;
if ( strncmp(p.data(), topic_string.data(), p.size()) != 0 ) if ( strncmp(p.data(), topic.data(), p.size()) != 0 )
continue; continue;
DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", name.data(), DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", name.data(),
@ -1486,7 +1448,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER, 0, nullptr, ts); event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER, 0, nullptr, ts);
} }
bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc) bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc)
{ {
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str()); DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str());
if ( ! lc.valid() ) if ( ! lc.valid() )
@ -1552,7 +1514,7 @@ bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
return true; return true;
} }
bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw) bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw)
{ {
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str()); DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str());
@ -1584,27 +1546,12 @@ bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
return false; return false;
} }
auto path = get_if<std::string>(&lw.path()); auto path = std::string{lw.path_str()};
if ( ! path ) auto serial_data = lw.serial_data_str();
{
reporter->Warning("failed to unpack remote log values (bad path variant) for stream: %s",
stream_id_name.data());
return false;
}
auto serial_data = get_if<std::string>(&lw.serial_data());
if ( ! serial_data )
{
reporter->Warning(
"failed to unpack remote log values (bad serial_data variant) for stream: %s",
stream_id_name.data());
return false;
}
zeek::detail::BinarySerializationFormat fmt; zeek::detail::BinarySerializationFormat fmt;
fmt.StartRead(serial_data->data(), serial_data->size()); fmt.StartRead(serial_data.data(), serial_data.size());
int num_fields; int num_fields;
bool success = fmt.Read(&num_fields, "num_fields"); bool success = fmt.Read(&num_fields, "num_fields");
@ -1635,13 +1582,13 @@ bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
} }
} }
log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), std::move(*path), log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, num_fields,
num_fields, vals); vals);
fmt.EndRead(); fmt.EndRead();
return true; return true;
} }
bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu) bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& iu)
{ {
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str()); DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str());

View file

@ -419,17 +419,18 @@ public:
void SetMetricsExportPrefixes(std::vector<std::string> filter); void SetMetricsExportPrefixes(std::vector<std::string> filter);
private: private:
void DispatchMessage(const broker::topic& topic, broker::data msg);
// Process events used for Broker store backed zeek tables // Process events used for Broker store backed zeek tables
void ProcessStoreEvent(broker::data msg); void ProcessStoreEvent(broker::data msg);
// Common functionality for processing insert and update events. // Common functionality for processing insert and update events.
void ProcessStoreEventInsertUpdate(const TableValPtr& table, const std::string& store_id, void ProcessStoreEventInsertUpdate(const TableValPtr& table, const std::string& store_id,
const broker::data& key, const broker::data& data, const broker::data& key, const broker::data& data,
const broker::data& old_value, bool insert); const broker::data& old_value, bool insert);
void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); void ProcessMessage(std::string_view topic, broker::zeek::Batch& ev);
bool ProcessLogCreate(broker::zeek::LogCreate lc); void ProcessMessage(std::string_view topic, broker::zeek::Event& ev);
bool ProcessLogWrite(broker::zeek::LogWrite lw); void ProcessMessage(std::string_view topic, broker::zeek::Invalid& ev);
bool ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu); bool ProcessMessage(std::string_view topic, broker::zeek::LogCreate& lc);
bool ProcessMessage(std::string_view topic, broker::zeek::LogWrite& lw);
bool ProcessMessage(std::string_view topic, broker::zeek::IdentifierUpdate& iu);
void ProcessStatus(broker::status_view stat); void ProcessStatus(broker::status_view stat);
void ProcessError(broker::error_view err); void ProcessError(broker::error_view err);
void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response); void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response);
@ -452,7 +453,7 @@ private:
struct LogBuffer struct LogBuffer
{ {
// Indexed by topic string. // Indexed by topic string.
std::unordered_map<std::string, broker::vector> msgs; std::unordered_map<std::string, broker::zeek::BatchBuilder> msgs;
size_t message_count; size_t message_count;
size_t Flush(broker::endpoint& endpoint, size_t batch_size); size_t Flush(broker::endpoint& endpoint, size_t batch_size);