From 3946856f06b25fc91b885b941ab634a3fa6f4950 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 10 Apr 2025 18:38:03 +0200 Subject: [PATCH] cluster/Backend: Add name and lookup component tag This adds two new accessors on Backend, Name() and Tag() that can be used for introspection of a Backend instance. --- src/broker/Manager.cc | 2 +- src/cluster/Backend.cc | 20 ++++++++++++++------ src/cluster/Backend.h | 18 ++++++++++++++++-- src/cluster/backend/zeromq/ZeroMQ.cc | 2 +- src/zeek-setup.cc | 7 +++++-- 5 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index ce94c72b69..3ea29b8b58 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -277,7 +277,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const } // namespace #endif -Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr), iosource::IOSource(true) { +Manager::Manager(bool arg_use_real_time) : Backend("Broker", nullptr, nullptr, nullptr), iosource::IOSource(true) { bound_port = 0; use_real_time = arg_use_real_time; peer_count = 0; diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 202b38fd46..8fba80b48b 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -10,6 +10,7 @@ #include "zeek/Func.h" #include "zeek/Reporter.h" #include "zeek/Type.h" +#include "zeek/cluster/Manager.h" #include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" #include "zeek/logging/Manager.h" @@ -69,9 +70,16 @@ std::optional detail::check_args(const zeek::FuncValPtr& handler, ze return result; } -Backend::Backend(std::unique_ptr es, std::unique_ptr ls, +Backend::Backend(std::string_view arg_name, std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) - : event_serializer(std::move(es)), log_serializer(std::move(ls)), event_handling_strategy(std::move(ehs)) {} + : name(arg_name), + event_serializer(std::move(es)), + log_serializer(std::move(ls)), + event_handling_strategy(std::move(ehs)) { + tag = zeek::cluster::manager->Backends().GetComponentTag(name); + if ( ! tag ) + reporter->InternalError("unknown cluster backend name '%s'; mismatch with tag component?", name.c_str()); +} std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const { auto checked_args = detail::check_args(handler, args); @@ -158,10 +166,10 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span pa return DoProcessBackendMessage(tag, payload); } -ThreadedBackend::ThreadedBackend(std::unique_ptr es, std::unique_ptr ls, - std::unique_ptr ehs) - : Backend(std::move(es), std::move(ls), std::move(ehs)) { - onloop = new zeek::detail::OnLoopProcess(this, "ThreadedBackend"); +ThreadedBackend::ThreadedBackend(std::string_view name, std::unique_ptr es, + std::unique_ptr ls, std::unique_ptr ehs) + : Backend(name, std::move(es), std::move(ls), std::move(ehs)) { + onloop = new zeek::detail::OnLoopProcess(this, Name()); onloop->Register(true); // Register as don't count first } diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 3f27c293a7..5f0495ac18 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -12,6 +12,7 @@ #include "zeek/EventHandler.h" #include "zeek/IntrusivePtr.h" #include "zeek/Span.h" +#include "zeek/Tag.h" #include "zeek/cluster/Serializer.h" #include "zeek/logging/Types.h" @@ -269,6 +270,16 @@ public: return DoPublishLogWrites(header, records); } + /** + * @return This backend's implementation name. + */ + const std::string& Name() const { return name; } + + /** + * @return This backend's implementation component tag. + */ + const zeek::Tag& Tag() const { return tag; } + /** * @return This backend's node identifier. */ @@ -278,11 +289,12 @@ protected: /** * Constructor. * + * @param name The name corresponding to the component tag. * @param es The event serializer to use. * @param ls The log batch serializer to use. * @param ehs The event handling strategy to use for this backend. */ - Backend(std::unique_ptr es, std::unique_ptr ls, + Backend(std::string_view name, std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs); /** @@ -430,6 +442,8 @@ private: virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format, detail::byte_buffer& buf) = 0; + std::string name; + zeek::Tag tag; std::unique_ptr event_serializer; std::unique_ptr log_serializer; std::unique_ptr event_handling_strategy; @@ -496,7 +510,7 @@ protected: /** * Constructor. */ - ThreadedBackend(std::unique_ptr es, std::unique_ptr ls, + ThreadedBackend(std::string_view name, std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs); /** diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index f05ab88d48..43a48aec89 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -64,7 +64,7 @@ constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) - : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { + : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)) { log_push = zmq::socket_t(ctx, zmq::socket_type::push); main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); } diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 050319621c..d08d21ef88 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -683,8 +683,6 @@ SetupResult setup(int argc, char** argv, Options* zopts) { input_mgr = new input::Manager(); file_mgr = new file_analysis::Manager(); cluster::manager = new cluster::Manager(); - auto broker_real_time = ! options.pcap_file && ! options.deterministic_mode; - broker_mgr = new Broker::Manager(broker_real_time); trigger_mgr = new trigger::Manager(); #ifdef HAVE_SPICY spicy_mgr = new spicy::Manager(); // registers as plugin with the plugin manager @@ -695,6 +693,11 @@ SetupResult setup(int argc, char** argv, Options* zopts) { file_mgr->InitPreScript(); zeekygen_mgr->InitPreScript(); + // Needs the "broker plugin" loaded during plugin_mgr->InitPreScript() + // before Broker::Manager can be instantiated. + auto broker_real_time = ! options.pcap_file && ! options.deterministic_mode; + broker_mgr = new Broker::Manager(broker_real_time); + // This has to happen before ActivateDynamicPlugin() below or the list of plugins in the // manager will be missing the plugins we want to try to add to the path. plugin_mgr->ExtendZeekPathForPlugins();