cluster/Backend: Add name and lookup component tag

This adds two new accessors on Backend, Name() and Tag() that can
be used for introspection of a Backend instance.
This commit is contained in:
Arne Welzel 2025-04-10 18:38:03 +02:00
parent 214629e054
commit 3946856f06
5 changed files with 37 additions and 12 deletions

View file

@ -277,7 +277,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const
} // namespace } // namespace
#endif #endif
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr), iosource::IOSource(true) { Manager::Manager(bool arg_use_real_time) : Backend("Broker", nullptr, nullptr, nullptr), iosource::IOSource(true) {
bound_port = 0; bound_port = 0;
use_real_time = arg_use_real_time; use_real_time = arg_use_real_time;
peer_count = 0; peer_count = 0;

View file

@ -10,6 +10,7 @@
#include "zeek/Func.h" #include "zeek/Func.h"
#include "zeek/Reporter.h" #include "zeek/Reporter.h"
#include "zeek/Type.h" #include "zeek/Type.h"
#include "zeek/cluster/Manager.h"
#include "zeek/cluster/OnLoop.h" #include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
@ -69,9 +70,16 @@ std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, ze
return result; return result;
} }
Backend::Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls, Backend::Backend(std::string_view arg_name, std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs) std::unique_ptr<detail::EventHandlingStrategy> ehs)
: event_serializer(std::move(es)), log_serializer(std::move(ls)), event_handling_strategy(std::move(ehs)) {} : name(arg_name),
event_serializer(std::move(es)),
log_serializer(std::move(ls)),
event_handling_strategy(std::move(ehs)) {
tag = zeek::cluster::manager->Backends().GetComponentTag(name);
if ( ! tag )
reporter->InternalError("unknown cluster backend name '%s'; mismatch with tag component?", name.c_str());
}
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const { std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const {
auto checked_args = detail::check_args(handler, args); auto checked_args = detail::check_args(handler, args);
@ -158,10 +166,10 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span pa
return DoProcessBackendMessage(tag, payload); return DoProcessBackendMessage(tag, payload);
} }
ThreadedBackend::ThreadedBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls, ThreadedBackend::ThreadedBackend(std::string_view name, std::unique_ptr<EventSerializer> es,
std::unique_ptr<detail::EventHandlingStrategy> ehs) std::unique_ptr<LogSerializer> ls, std::unique_ptr<detail::EventHandlingStrategy> ehs)
: Backend(std::move(es), std::move(ls), std::move(ehs)) { : Backend(name, std::move(es), std::move(ls), std::move(ehs)) {
onloop = new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ThreadedBackend"); onloop = new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, Name());
onloop->Register(true); // Register as don't count first onloop->Register(true); // Register as don't count first
} }

View file

@ -12,6 +12,7 @@
#include "zeek/EventHandler.h" #include "zeek/EventHandler.h"
#include "zeek/IntrusivePtr.h" #include "zeek/IntrusivePtr.h"
#include "zeek/Span.h" #include "zeek/Span.h"
#include "zeek/Tag.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/logging/Types.h" #include "zeek/logging/Types.h"
@ -269,6 +270,16 @@ public:
return DoPublishLogWrites(header, records); return DoPublishLogWrites(header, records);
} }
/**
* @return This backend's implementation name.
*/
const std::string& Name() const { return name; }
/**
* @return This backend's implementation component tag.
*/
const zeek::Tag& Tag() const { return tag; }
/** /**
* @return This backend's node identifier. * @return This backend's node identifier.
*/ */
@ -278,11 +289,12 @@ protected:
/** /**
* Constructor. * Constructor.
* *
* @param name The name corresponding to the component tag.
* @param es The event serializer to use. * @param es The event serializer to use.
* @param ls The log batch serializer to use. * @param ls The log batch serializer to use.
* @param ehs The event handling strategy to use for this backend. * @param ehs The event handling strategy to use for this backend.
*/ */
Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls, Backend(std::string_view name, std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs); std::unique_ptr<detail::EventHandlingStrategy> ehs);
/** /**
@ -430,6 +442,8 @@ private:
virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format, virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format,
detail::byte_buffer& buf) = 0; detail::byte_buffer& buf) = 0;
std::string name;
zeek::Tag tag;
std::unique_ptr<EventSerializer> event_serializer; std::unique_ptr<EventSerializer> event_serializer;
std::unique_ptr<LogSerializer> log_serializer; std::unique_ptr<LogSerializer> log_serializer;
std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy; std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy;
@ -496,7 +510,7 @@ protected:
/** /**
* Constructor. * Constructor.
*/ */
ThreadedBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls, ThreadedBackend(std::string_view name, std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs); std::unique_ptr<detail::EventHandlingStrategy> ehs);
/** /**

View file

@ -64,7 +64,7 @@ constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) {
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls, ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs) std::unique_ptr<detail::EventHandlingStrategy> ehs)
: ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)) {
log_push = zmq::socket_t(ctx, zmq::socket_type::push); log_push = zmq::socket_t(ctx, zmq::socket_type::push);
main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
} }

View file

@ -683,8 +683,6 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
input_mgr = new input::Manager(); input_mgr = new input::Manager();
file_mgr = new file_analysis::Manager(); file_mgr = new file_analysis::Manager();
cluster::manager = new cluster::Manager(); cluster::manager = new cluster::Manager();
auto broker_real_time = ! options.pcap_file && ! options.deterministic_mode;
broker_mgr = new Broker::Manager(broker_real_time);
trigger_mgr = new trigger::Manager(); trigger_mgr = new trigger::Manager();
#ifdef HAVE_SPICY #ifdef HAVE_SPICY
spicy_mgr = new spicy::Manager(); // registers as plugin with the plugin manager spicy_mgr = new spicy::Manager(); // registers as plugin with the plugin manager
@ -695,6 +693,11 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
file_mgr->InitPreScript(); file_mgr->InitPreScript();
zeekygen_mgr->InitPreScript(); zeekygen_mgr->InitPreScript();
// Needs the "broker plugin" loaded during plugin_mgr->InitPreScript()
// before Broker::Manager can be instantiated.
auto broker_real_time = ! options.pcap_file && ! options.deterministic_mode;
broker_mgr = new Broker::Manager(broker_real_time);
// This has to happen before ActivateDynamicPlugin() below or the list of plugins in the // This has to happen before ActivateDynamicPlugin() below or the list of plugins in the
// manager will be missing the plugins we want to try to add to the path. // manager will be missing the plugins we want to try to add to the path.
plugin_mgr->ExtendZeekPathForPlugins(); plugin_mgr->ExtendZeekPathForPlugins();