diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 7c70ba3b25..1b0ba2cdaf 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -28,6 +28,7 @@ #include "zeek/broker/store.bif.h" #include "zeek/iosource/Manager.h" #include "zeek/logging/Manager.h" +#include "zeek/logging/Types.h" #include "zeek/telemetry/Manager.h" #include "zeek/util.h" @@ -252,7 +253,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const } // namespace #endif -Manager::Manager(bool arg_use_real_time) { +Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr) { bound_port = 0; use_real_time = arg_use_real_time; peer_count = 0; @@ -262,7 +263,7 @@ Manager::Manager(bool arg_use_real_time) { writer_id_type = nullptr; } -void Manager::InitPostScript() { +void Manager::DoInitPostScript() { DBG_LOG(DBG_BROKER, "Initializing"); log_batch_size = get_option("Broker::log_batch_size")->AsCount(); @@ -404,7 +405,7 @@ void Manager::InitializeBrokerStoreForwarding() { } } -void Manager::Terminate() { +void Manager::DoTerminate() { FlushLogBuffers(); iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); @@ -545,6 +546,22 @@ std::vector Manager::Peers() const { std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); } +bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) { + broker::vector xs; + xs.reserve(event.args.size()); + for ( const auto& a : event.args ) { + auto r = detail::val_to_data(a.get()); + if ( ! r ) { + Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str()); + return false; + } + xs.emplace_back(std::move(r.value())); + } + + std::string name(event.HandlerName()); + return PublishEvent(topic, name, std::move(xs), event.timestamp); +} + bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { if ( bstate->endpoint.is_shutdown() ) return true; @@ -930,7 +947,7 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args, zeek::detail::Frame* frame) return rval; } -bool Manager::Subscribe(const string& topic_prefix) { +bool Manager::DoSubscribe(const string& topic_prefix) { DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str()); bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done); @@ -948,7 +965,7 @@ bool Manager::Forward(string topic_prefix) { return true; } -bool Manager::Unsubscribe(const string& topic_prefix) { +bool Manager::DoUnsubscribe(const string& topic_prefix) { for ( size_t i = 0; i < forwarded_prefixes.size(); ++i ) if ( forwarded_prefixes[i] == topic_prefix ) { DBG_LOG(DBG_BROKER, "Unforwarding topic prefix %s", topic_prefix.c_str()); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 96ba237589..a27e61ec14 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -10,13 +10,16 @@ #include #include #include +#include #include #include #include "zeek/IntrusivePtr.h" #include "zeek/Span.h" #include "zeek/broker/Data.h" +#include "zeek/cluster/Backend.h" #include "zeek/iosource/IOSource.h" +#include "zeek/logging/Types.h" #include "zeek/logging/WriterBackend.h" namespace zeek { @@ -75,7 +78,7 @@ struct Stats { * Manages various forms of communication between peer Zeek processes * or other external applications via use of the Broker messaging library. */ -class Manager : public iosource::IOSource { +class Manager : public zeek::cluster::Backend, public iosource::IOSource { public: /** Broker protocol to expect on a listening port. */ enum class BrokerProtocol { @@ -95,17 +98,6 @@ public: */ ~Manager() override = default; - /** - * Initialization of the manager. This is called late during Zeek's - * initialization after any scripts are processed. - */ - void InitPostScript(); - - /** - * Shuts Broker down at termination. - */ - void Terminate(); - /** * Returns true if any Broker communication is currently active. */ @@ -193,6 +185,8 @@ public: return PublishEvent(std::move(topic), std::move(name), std::move(broker::get(args.value_)), ts); } + using cluster::Backend::PublishEvent; + /** * Send an event to any interested peers. * @param topic a topic string associated with the message. @@ -277,15 +271,6 @@ public: */ zeek::RecordValPtr MakeEvent(ArgsSpan args, zeek::detail::Frame* frame); - /** - * Register interest in peer event messages that use a certain topic prefix. - * @param topic_prefix a prefix to match against remote message topics. - * e.g. an empty prefix will match everything and "a" will match "alice" - * and "amy" but not "bob". - * @return true if it's a new event subscription and it is now registered. - */ - bool Subscribe(const std::string& topic_prefix); - /** * Register interest in peer event messages that use a certain topic prefix, * but that should not be raised locally, just forwarded to any subscribing @@ -297,14 +282,6 @@ public: */ bool Forward(std::string topic_prefix); - /** - * Unregister interest in peer event messages. - * @param topic_prefix a prefix previously supplied to a successful call - * to zeek::Broker::Manager::Subscribe() or zeek::Broker::Manager::Forward(). - * @return true if interest in topic prefix is no longer advertised. - */ - bool Unsubscribe(const std::string& topic_prefix); - /** * Create a new *master* data store. * @param name The name of the store. @@ -394,6 +371,48 @@ public: }; private: + // Register interest in peer event messages that use a certain topic prefix. + bool DoSubscribe(const std::string& topic_prefix) override; + + // Unregister interest in peer event messages. + bool DoUnsubscribe(const std::string& topic_prefix) override; + + // Initialization of the manager. This is called late during Zeek's + // initialization after any scripts are processed. + void DoInitPostScript() override; + + // Broker doesn't do anything during Broker::Backend::init(). + bool DoInit() override { return true; } + + // Shuts Broker down at termination. + void DoTerminate() override; + + // Broker overrides this to do its own serialization. + bool DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) override; + + // This should never be reached, broker itself doesn't call this and overrides + // the generic DoPublishEvent() method that would call this. + bool DoPublishEvent(const std::string& topic, const std::string& format, + const cluster::detail::byte_buffer& buf) override { + throw std::logic_error("not implemented"); + } + + // WriterFrontend instances are broker-aware and never call this + // method and instead call the existing PublishLogWrite() method. + // + // TODO: Move log buffering out of broker and implement. + bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, + zeek::Span records) override { + // Not implemented by broker. + throw std::logic_error("not implemented"); + } + + bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, + cluster::detail::byte_buffer& buf) override { + // Not implemented by broker. + throw std::logic_error("not implemented"); + } + // Process events used for Broker store backed zeek tables void ProcessStoreEvent(broker::data msg); // Common functionality for processing insert and update events.