From 015608fef62e334be6d41881a3b2abd45564f2a7 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Sun, 24 Sep 2023 15:52:21 +0200 Subject: [PATCH] Broker: Adapt to upstream API adjustments --- auxil/broker | 2 +- src/broker/Manager.cc | 129 +++++++++++++----------------------------- src/broker/Manager.h | 13 +++-- 3 files changed, 46 insertions(+), 98 deletions(-) diff --git a/auxil/broker b/auxil/broker index f9af5f0ed2..86ed39b80c 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit f9af5f0ed2b87e01790fe06a0658cc54f1c32974 +Subproject commit 86ed39b80c272cff44bb95f58eeda1bc5ff6f65c diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index da81ceb7d4..8080541caa 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -861,8 +861,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int auto& lb = log_buffers[stream_id_num]; ++lb.message_count; - auto& pending_batch = lb.msgs[topic]; - pending_batch.emplace_back(msg.move_data()); + lb.msgs[topic].add(std::move(msg)); if ( lb.message_count >= log_batch_size ) statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size); @@ -879,15 +878,10 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_si // No logs buffered for this stream. return 0; - for ( auto& kv : msgs ) + for ( auto& [topic, pending_batch] : msgs ) { - auto& topic = kv.first; - auto& pending_batch = kv.second; - broker::vector batch; - batch.reserve(log_batch_size + 1); - pending_batch.swap(batch); - broker::zeek::Batch msg(std::move(batch)); - endpoint.publish(topic, msg.move_data()); + if ( ! pending_batch.empty() ) + endpoint.publish(topic, pending_batch.build()); } auto rval = message_count; @@ -1097,55 +1091,6 @@ bool Manager::Unsubscribe(const string& topic_prefix) return true; } -void Manager::DispatchMessage(const broker::topic& topic, broker::data msg) - { - switch ( broker::zeek::Message::type(msg) ) - { - case broker::zeek::Message::Type::Invalid: - reporter->Warning("received invalid broker message: %s", broker::to_string(msg).data()); - break; - - case broker::zeek::Message::Type::Event: - ProcessEvent(topic, std::move(msg)); - break; - - case broker::zeek::Message::Type::LogCreate: - ProcessLogCreate(std::move(msg)); - break; - - case broker::zeek::Message::Type::LogWrite: - ProcessLogWrite(std::move(msg)); - break; - - case broker::zeek::Message::Type::IdentifierUpdate: - ProcessIdentifierUpdate(std::move(msg)); - break; - - case broker::zeek::Message::Type::Batch: - { - broker::zeek::Batch batch(std::move(msg)); - - if ( ! batch.valid() ) - { - reporter->Warning("received invalid broker Batch: %s", - broker::to_string(batch.as_data()).data()); - return; - } - - for ( auto& i : batch.batch() ) - DispatchMessage(topic, std::move(i)); - - break; - } - - default: - // We ignore unknown types so that we could add more in the - // future if we had too. - reporter->Warning("received unknown broker message: %s", broker::to_string(msg).data()); - break; - } - } - void Manager::Process() { auto messages = bstate->subscriber.poll(); @@ -1197,8 +1142,13 @@ void Manager::Process() // different memory region if other threads keep references to the // message. Since `topic` still points into the original memory // region, we may no longer access it after this point. - auto unshared_topic = broker::move_topic(message); - DispatchMessage(unshared_topic, broker::move_data(message)); + auto topic_str = broker::get_topic_str(message); + broker::zeek::visit_as_message( + [this, topic_str](auto& msg) + { + ProcessMessage(topic_str, msg); + }, + std::move(message)); } catch ( std::runtime_error& e ) { @@ -1382,7 +1332,21 @@ void Manager::ProcessStoreEvent(broker::data msg) } } -void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) +void Manager::ProcessMessage(std::string_view topic, broker::zeek::Invalid& ev) + { + reporter->Warning("received invalid broker message: %s", broker::to_string(ev).c_str()); + } + +void Manager::ProcessMessage(std::string_view topic, broker::zeek::Batch& ev) + { + ev.for_each( + [this, topic](auto& inner) + { + ProcessMessage(topic, inner); + }); + } + +void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { if ( ! ev.valid() ) { @@ -1391,8 +1355,8 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) return; } - auto name = std::move(ev.name()); - auto args = std::move(ev.args()); + const auto& name = ev.name(); + const auto& args = ev.args(); double ts; if ( auto ev_ts = ev.ts() ) @@ -1408,14 +1372,12 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) if ( ! handler ) return; - auto& topic_string = topic.string(); - for ( const auto& p : forwarded_prefixes ) { - if ( p.size() > topic_string.size() ) + if ( p.size() > topic.size() ) continue; - if ( strncmp(p.data(), topic_string.data(), p.size()) != 0 ) + if ( strncmp(p.data(), topic.data(), p.size()) != 0 ) continue; DBG_LOG(DBG_BROKER, "Skip processing of forwarded event: %s %s", name.data(), @@ -1486,7 +1448,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER, 0, nullptr, ts); } -bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc) +bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) { DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str()); if ( ! lc.valid() ) @@ -1552,7 +1514,7 @@ bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc) return true; } -bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw) +bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str()); @@ -1584,27 +1546,12 @@ bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw) return false; } - auto path = get_if(&lw.path()); + auto path = std::string{lw.path_str()}; - if ( ! path ) - { - reporter->Warning("failed to unpack remote log values (bad path variant) for stream: %s", - stream_id_name.data()); - return false; - } - - auto serial_data = get_if(&lw.serial_data()); - - if ( ! serial_data ) - { - reporter->Warning( - "failed to unpack remote log values (bad serial_data variant) for stream: %s", - stream_id_name.data()); - return false; - } + auto serial_data = lw.serial_data_str(); zeek::detail::BinarySerializationFormat fmt; - fmt.StartRead(serial_data->data(), serial_data->size()); + fmt.StartRead(serial_data.data(), serial_data.size()); int num_fields; bool success = fmt.Read(&num_fields, "num_fields"); @@ -1635,13 +1582,13 @@ bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw) } } - log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), std::move(*path), - num_fields, vals); + log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, num_fields, + vals); fmt.EndRead(); return true; } -bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu) +bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& iu) { DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str()); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 60bc376fca..8dbb4797eb 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -419,17 +419,18 @@ public: void SetMetricsExportPrefixes(std::vector filter); private: - void DispatchMessage(const broker::topic& topic, broker::data msg); // Process events used for Broker store backed zeek tables void ProcessStoreEvent(broker::data msg); // Common functionality for processing insert and update events. void ProcessStoreEventInsertUpdate(const TableValPtr& table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert); - void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); - bool ProcessLogCreate(broker::zeek::LogCreate lc); - bool ProcessLogWrite(broker::zeek::LogWrite lw); - bool ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu); + void ProcessMessage(std::string_view topic, broker::zeek::Batch& ev); + void ProcessMessage(std::string_view topic, broker::zeek::Event& ev); + void ProcessMessage(std::string_view topic, broker::zeek::Invalid& ev); + bool ProcessMessage(std::string_view topic, broker::zeek::LogCreate& lc); + bool ProcessMessage(std::string_view topic, broker::zeek::LogWrite& lw); + bool ProcessMessage(std::string_view topic, broker::zeek::IdentifierUpdate& iu); void ProcessStatus(broker::status_view stat); void ProcessError(broker::error_view err); void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response); @@ -452,7 +453,7 @@ private: struct LogBuffer { // Indexed by topic string. - std::unordered_map msgs; + std::unordered_map msgs; size_t message_count; size_t Flush(broker::endpoint& endpoint, size_t batch_size);