broker: Implement cluster::Backend interface

This commit is contained in:
Arne Welzel 2024-11-22 10:56:06 +01:00
parent 91a8fd0c63
commit 68a391d767
2 changed files with 70 additions and 34 deletions

View file

@ -28,6 +28,7 @@
#include "zeek/broker/store.bif.h" #include "zeek/broker/store.bif.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/logging/Types.h"
#include "zeek/telemetry/Manager.h" #include "zeek/telemetry/Manager.h"
#include "zeek/util.h" #include "zeek/util.h"
@ -252,7 +253,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const
} // namespace } // namespace
#endif #endif
Manager::Manager(bool arg_use_real_time) { Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr) {
bound_port = 0; bound_port = 0;
use_real_time = arg_use_real_time; use_real_time = arg_use_real_time;
peer_count = 0; peer_count = 0;
@ -262,7 +263,7 @@ Manager::Manager(bool arg_use_real_time) {
writer_id_type = nullptr; writer_id_type = nullptr;
} }
void Manager::InitPostScript() { void Manager::DoInitPostScript() {
DBG_LOG(DBG_BROKER, "Initializing"); DBG_LOG(DBG_BROKER, "Initializing");
log_batch_size = get_option("Broker::log_batch_size")->AsCount(); log_batch_size = get_option("Broker::log_batch_size")->AsCount();
@ -404,7 +405,7 @@ void Manager::InitializeBrokerStoreForwarding() {
} }
} }
void Manager::Terminate() { void Manager::DoTerminate() {
FlushLogBuffers(); FlushLogBuffers();
iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this);
@ -545,6 +546,22 @@ std::vector<broker::peer_info> Manager::Peers() const {
std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); } std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); }
bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) {
broker::vector xs;
xs.reserve(event.args.size());
for ( const auto& a : event.args ) {
auto r = detail::val_to_data(a.get());
if ( ! r ) {
Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str());
return false;
}
xs.emplace_back(std::move(r.value()));
}
std::string name(event.HandlerName());
return PublishEvent(topic, name, std::move(xs), event.timestamp);
}
bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) {
if ( bstate->endpoint.is_shutdown() ) if ( bstate->endpoint.is_shutdown() )
return true; return true;
@ -930,7 +947,7 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args, zeek::detail::Frame* frame)
return rval; return rval;
} }
bool Manager::Subscribe(const string& topic_prefix) { bool Manager::DoSubscribe(const string& topic_prefix) {
DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str());
bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done); bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done);
@ -948,7 +965,7 @@ bool Manager::Forward(string topic_prefix) {
return true; return true;
} }
bool Manager::Unsubscribe(const string& topic_prefix) { bool Manager::DoUnsubscribe(const string& topic_prefix) {
for ( size_t i = 0; i < forwarded_prefixes.size(); ++i ) for ( size_t i = 0; i < forwarded_prefixes.size(); ++i )
if ( forwarded_prefixes[i] == topic_prefix ) { if ( forwarded_prefixes[i] == topic_prefix ) {
DBG_LOG(DBG_BROKER, "Unforwarding topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Unforwarding topic prefix %s", topic_prefix.c_str());

View file

@ -10,13 +10,16 @@
#include <broker/peer_info.hh> #include <broker/peer_info.hh>
#include <broker/zeek.hh> #include <broker/zeek.hh>
#include <memory> #include <memory>
#include <stdexcept>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include "zeek/IntrusivePtr.h" #include "zeek/IntrusivePtr.h"
#include "zeek/Span.h" #include "zeek/Span.h"
#include "zeek/broker/Data.h" #include "zeek/broker/Data.h"
#include "zeek/cluster/Backend.h"
#include "zeek/iosource/IOSource.h" #include "zeek/iosource/IOSource.h"
#include "zeek/logging/Types.h"
#include "zeek/logging/WriterBackend.h" #include "zeek/logging/WriterBackend.h"
namespace zeek { namespace zeek {
@ -75,7 +78,7 @@ struct Stats {
* Manages various forms of communication between peer Zeek processes * Manages various forms of communication between peer Zeek processes
* or other external applications via use of the Broker messaging library. * or other external applications via use of the Broker messaging library.
*/ */
class Manager : public iosource::IOSource { class Manager : public zeek::cluster::Backend, public iosource::IOSource {
public: public:
/** Broker protocol to expect on a listening port. */ /** Broker protocol to expect on a listening port. */
enum class BrokerProtocol { enum class BrokerProtocol {
@ -95,17 +98,6 @@ public:
*/ */
~Manager() override = default; ~Manager() override = default;
/**
* Initialization of the manager. This is called late during Zeek's
* initialization after any scripts are processed.
*/
void InitPostScript();
/**
* Shuts Broker down at termination.
*/
void Terminate();
/** /**
* Returns true if any Broker communication is currently active. * Returns true if any Broker communication is currently active.
*/ */
@ -193,6 +185,8 @@ public:
return PublishEvent(std::move(topic), std::move(name), std::move(broker::get<broker::vector>(args.value_)), ts); return PublishEvent(std::move(topic), std::move(name), std::move(broker::get<broker::vector>(args.value_)), ts);
} }
using cluster::Backend::PublishEvent;
/** /**
* Send an event to any interested peers. * Send an event to any interested peers.
* @param topic a topic string associated with the message. * @param topic a topic string associated with the message.
@ -277,15 +271,6 @@ public:
*/ */
zeek::RecordValPtr MakeEvent(ArgsSpan args, zeek::detail::Frame* frame); zeek::RecordValPtr MakeEvent(ArgsSpan args, zeek::detail::Frame* frame);
/**
* Register interest in peer event messages that use a certain topic prefix.
* @param topic_prefix a prefix to match against remote message topics.
* e.g. an empty prefix will match everything and "a" will match "alice"
* and "amy" but not "bob".
* @return true if it's a new event subscription and it is now registered.
*/
bool Subscribe(const std::string& topic_prefix);
/** /**
* Register interest in peer event messages that use a certain topic prefix, * Register interest in peer event messages that use a certain topic prefix,
* but that should not be raised locally, just forwarded to any subscribing * but that should not be raised locally, just forwarded to any subscribing
@ -297,14 +282,6 @@ public:
*/ */
bool Forward(std::string topic_prefix); bool Forward(std::string topic_prefix);
/**
* Unregister interest in peer event messages.
* @param topic_prefix a prefix previously supplied to a successful call
* to zeek::Broker::Manager::Subscribe() or zeek::Broker::Manager::Forward().
* @return true if interest in topic prefix is no longer advertised.
*/
bool Unsubscribe(const std::string& topic_prefix);
/** /**
* Create a new *master* data store. * Create a new *master* data store.
* @param name The name of the store. * @param name The name of the store.
@ -394,6 +371,48 @@ public:
}; };
private: private:
// Register interest in peer event messages that use a certain topic prefix.
bool DoSubscribe(const std::string& topic_prefix) override;
// Unregister interest in peer event messages.
bool DoUnsubscribe(const std::string& topic_prefix) override;
// Initialization of the manager. This is called late during Zeek's
// initialization after any scripts are processed.
void DoInitPostScript() override;
// Broker doesn't do anything during Broker::Backend::init().
bool DoInit() override { return true; }
// Shuts Broker down at termination.
void DoTerminate() override;
// Broker overrides this to do its own serialization.
bool DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) override;
// This should never be reached, broker itself doesn't call this and overrides
// the generic DoPublishEvent() method that would call this.
bool DoPublishEvent(const std::string& topic, const std::string& format,
const cluster::detail::byte_buffer& buf) override {
throw std::logic_error("not implemented");
}
// WriterFrontend instances are broker-aware and never call this
// method and instead call the existing PublishLogWrite() method.
//
// TODO: Move log buffering out of broker and implement.
bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header,
zeek::Span<logging::detail::LogRecord> records) override {
// Not implemented by broker.
throw std::logic_error("not implemented");
}
bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format,
cluster::detail::byte_buffer& buf) override {
// Not implemented by broker.
throw std::logic_error("not implemented");
}
// Process events used for Broker store backed zeek tables // Process events used for Broker store backed zeek tables
void ProcessStoreEvent(broker::data msg); void ProcessStoreEvent(broker::data msg);
// Common functionality for processing insert and update events. // Common functionality for processing insert and update events.