Merge remote-tracking branch 'origin/topic/awelzel/pluggable-cluster-backends-part1'

* origin/topic/awelzel/pluggable-cluster-backends-part1:
  btest: Test Broker::make_event() together with Cluster::publish_hrw()
  btest: Add cluster dir, minimal test for enum value
  broker: Add shim plugin adding a backend component
  zeek-setup: Instantiate backend::manager
  cluster: Add to src/CMakeLists.txt
  cluster: Add Components and ComponentManager for new components
  cluster/Backend: Interface for cluster backends
  cluster/Serializer: Interface for event and log serializers
  logging: Introduce logging/Types.h
  SerialTypes/Field: Allow default construction and add move constructor
  DebugLogger: Add cluster debugging stream
  plugin: Add component enums for pluggable cluster backends
  broker: Pass frame to MakeEvent()
This commit is contained in:
Arne Welzel 2024-11-22 12:32:21 +01:00
commit 97f05b2f8c
34 changed files with 1542 additions and 28 deletions

44
CHANGES
View file

@ -1,3 +1,47 @@
7.1.0-dev.607 | 2024-11-22 12:32:21 +0100
* btest: Test Broker::make_event() together with Cluster::publish_hrw() (Arne Welzel, Corelight)
* btest: Add cluster dir, minimal test for enum value (Arne Welzel, Corelight)
* broker: Add shim plugin adding a backend component (Arne Welzel, Corelight)
For broker, this isn't really functioning, but at least makes the
CLUSTER_BACKEND_BROKER enum available.
* zeek-setup: Instantiate backend::manager (Arne Welzel, Corelight)
Required to allow registration of cluster components.
* cluster: Add to src/CMakeLists.txt (Arne Welzel, Corelight)
* cluster: Add Components and ComponentManager for new components (Arne Welzel, Corelight)
* cluster/Backend: Interface for cluster backends (Arne Welzel, Corelight)
* cluster/Serializer: Interface for event and log serializers (Arne Welzel, Corelight)
* logging: Introduce logging/Types.h (Arne Welzel, Corelight)
Header and implementation for types shared between the cluster and
logging framework. The logging framework will be adapted later to
use these. For now, the new cluster components will simply reference
them.
* SerialTypes/Field: Allow default construction and add move constructor (Arne Welzel, Corelight)
This is in preparation of using SerialTypes to serialize and
unserialize complete log batches which include Field instances
and not just Value's.
* DebugLogger: Add cluster debugging stream (Arne Welzel, Corelight)
* plugin: Add component enums for pluggable cluster backends (Arne Welzel, Corelight)
* broker: Pass frame to MakeEvent() (Arne Welzel, Corelight)
This was lost in a prior change.
7.1.0-dev.592 | 2024-11-21 16:38:55 +0100
* sumstat/non-cluster: Move last epoch processing to zeek_done() (Arne Welzel, Corelight)

View file

@ -1 +1 @@
7.1.0-dev.592
7.1.0-dev.607

View file

@ -280,6 +280,14 @@ export {
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global nodeid_topic: function(id: string): string;
## An event instance for cluster pub/sub.
type Event: record {
## The event handler to be invoked on the remote node.
ev: any;
## The arguments for the event.
args: vector of any;
};
}
# Track active nodes per type.

View file

@ -194,6 +194,7 @@ gen_zam_target(${GEN_ZAM_SRC_DIR})
option(USE_SQLITE "Should Zeek use SQLite?" ON)
add_subdirectory(analyzer)
add_subdirectory(cluster)
add_subdirectory(packet_analysis)
add_subdirectory(broker)
add_subdirectory(telemetry)

View file

@ -19,7 +19,8 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] =
{"main-loop", 0, false}, {"dpd", 0, false}, {"packet-analysis", 0, false}, {"file-analysis", 0, false},
{"tm", 0, false}, {"logging", 0, false}, {"input", 0, false}, {"threading", 0, false},
{"plugins", 0, false}, {"zeekygen", 0, false}, {"pktio", 0, false}, {"broker", 0, false},
{"scripts", 0, false}, {"supervisor", 0, false}, {"hashkey", 0, false}, {"spicy", 0, false}};
{"scripts", 0, false}, {"supervisor", 0, false}, {"hashkey", 0, false}, {"spicy", 0, false},
{"cluster", 0, false}};
DebugLogger::~DebugLogger() {
if ( file && file != stderr )

View file

@ -54,6 +54,7 @@ enum DebugStream {
DBG_SUPERVISOR, // Process supervisor
DBG_HASHKEY, // HashKey buffers
DBG_SPICY, // Spicy functionality
DBG_CLUSTER, // Cluster functionality
NUM_DBGS // Has to be last
};

View file

@ -12,3 +12,6 @@ zeek_add_subdir_library(
data.bif
messaging.bif
store.bif)
# Small plugin shim to make the CLUSTER_BACKEND_BROKER enum value available.
zeek_add_plugin(Zeek Cluster_Backend_Broker SOURCES Plugin.cc)

28
src/broker/Plugin.cc Normal file
View file

@ -0,0 +1,28 @@
#include "zeek/broker/Plugin.h"
#include <memory>
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/Component.h"
#include "zeek/cluster/Serializer.h"
using namespace zeek::plugin::Zeek_Cluster_Backend_Broker;
zeek::plugin::Configuration Plugin::Configure() {
// For now, there's always the broker_mgr instance that's explicitly
// instantiated in zeek-setup.cc. Don't even allow to instantiate
// a second one via the plugin mechanism. In the future, this could
// be changed so that broker is instantiated on demand only.
auto fail_instantiate = [](std::unique_ptr<cluster::EventSerializer>,
std::unique_ptr<cluster::LogSerializer>) -> std::unique_ptr<cluster::Backend> {
zeek::reporter->FatalError("do not instantiate broker explicitly");
return nullptr;
};
AddComponent(new cluster::BackendComponent("BROKER", fail_instantiate));
zeek::plugin::Configuration config;
config.name = "Zeek::Cluster_Backend_Broker";
config.description = "Cluster backend using Broker";
return config;
}

14
src/broker/Plugin.h Normal file
View file

@ -0,0 +1,14 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include "zeek/plugin/Plugin.h"
namespace zeek::plugin::Zeek_Cluster_Backend_Broker {
class Plugin : public zeek::plugin::Plugin {
public:
zeek::plugin::Configuration Configure() override;
} plugin;
} // namespace zeek::plugin::Zeek_Cluster_Backend_Broker

View file

@ -99,7 +99,7 @@ function Broker::make_event%(...%): Broker::Event
%{
zeek::Broker::Manager::ScriptScopeGuard ssg;
auto ev = zeek::broker_mgr->MakeEvent(ArgsSpan{*@ARGS@});
auto ev = zeek::broker_mgr->MakeEvent(ArgsSpan{*@ARGS@}, frame);
return zeek::cast_intrusive<RecordVal>(ev);
%}

214
src/cluster/Backend.cc Normal file
View file

@ -0,0 +1,214 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/cluster/Backend.h"
#include <optional>
#include "zeek/Desc.h"
#include "zeek/Event.h"
#include "zeek/EventRegistry.h"
#include "zeek/Func.h"
#include "zeek/Reporter.h"
#include "zeek/Type.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/iosource/Manager.h"
using namespace zeek::cluster;
std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) {
const auto& func_type = handler->GetType<zeek::FuncType>();
if ( func_type->Flavor() != zeek::FUNC_FLAVOR_EVENT ) {
zeek::reporter->Error("unexpected function type for %s: %s", handler->AsFunc()->GetName().c_str(),
func_type->FlavorString().c_str());
return std::nullopt;
}
const auto& types = func_type->ParamList()->GetTypes();
if ( args.size() != types.size() ) {
zeek::reporter->Error("bad number of arguments for %s: got %zu, expect %zu",
handler->AsFunc()->GetName().c_str(), args.size(), types.size());
return std::nullopt;
}
zeek::Args result(args.size());
for ( size_t i = 0; i < args.size(); i++ ) {
const auto& a = args[i];
const auto& got_type = a->GetType();
const auto& expected_type = types[i];
if ( ! same_type(got_type, expected_type) ) {
zeek::reporter->Error("event parameter #%zu type mismatch, got %s, expecting %s", i + 1,
zeek::obj_desc_short(got_type.get()).c_str(),
zeek::obj_desc_short(expected_type.get()).c_str());
return std::nullopt;
}
result[i] = args[i];
}
return result;
}
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const {
auto checked_args = detail::check_args(handler, args);
if ( ! checked_args )
return std::nullopt;
if ( timestamp == 0.0 )
timestamp = zeek::event_mgr.CurrentEventTime();
const auto& eh = zeek::event_registry->Lookup(handler->AsFuncPtr()->GetName());
if ( ! eh ) {
zeek::reporter->Error("event registry lookup of '%s' failed", obj_desc(handler.get()).c_str());
return std::nullopt;
}
return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp};
}
// Default implementation doing the serialization.
bool Backend::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) {
cluster::detail::byte_buffer buf;
if ( ! event_serializer->SerializeEvent(buf, event) )
return false;
return DoPublishEvent(topic, event_serializer->Name(), buf);
}
// Default implementation doing log record serialization.
bool Backend::DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header,
zeek::Span<zeek::logging::detail::LogRecord> records) {
cluster::detail::byte_buffer buf;
if ( ! log_serializer->SerializeLogWrite(buf, header, records) )
return false;
return DoPublishLogWrites(header, log_serializer->Name(), buf);
}
bool Backend::ProcessEventMessage(const std::string_view& topic, const std::string_view& format,
const detail::byte_buffer_span payload) {
if ( format != event_serializer->Name() ) {
zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(),
event_serializer->Name().c_str());
return false;
}
auto r = event_serializer->UnserializeEvent(payload);
if ( ! r ) {
auto escaped =
util::get_escaped_string(std::string(reinterpret_cast<const char*>(payload.data()), payload.size()), false);
zeek::reporter->Error("Failed to unserialize message: %s: %s", std::string{topic}.c_str(), escaped.c_str());
return false;
}
auto& event = *r;
zeek::event_mgr.Enqueue(event.Handler(), std::move(event.args), util::detail::SOURCE_BROKER, 0, nullptr,
event.timestamp);
return true;
}
bool Backend::ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload) {
// We could also dynamically lookup the right de-serializer, but
// for now assume we just receive what is configured.
if ( format != log_serializer->Name() ) {
zeek::reporter->Error("Got log message in format '%s', but have deserializer '%s'", std::string{format}.c_str(),
log_serializer->Name().c_str());
return false;
}
auto result = log_serializer->UnserializeLogWrite(payload);
if ( ! result ) {
zeek::reporter->Error("Failed to unserialize log message using '%s'", std::string{format}.c_str());
return false;
}
// TODO: Send the whole batch to the logging manager.
// return zeek::log_mgr->WritesFromRemote(result->header, std::move(result->records));
zeek::reporter->FatalError("not implemented");
return false;
}
bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span payload) {
return DoProcessBackendMessage(tag, payload);
}
namespace {
bool register_io_source(zeek::iosource::IOSource* src, int fd, bool dont_count) {
constexpr bool manage_lifetime = true;
zeek::iosource_mgr->Register(src, dont_count, manage_lifetime);
if ( ! zeek::iosource_mgr->RegisterFd(fd, src) ) {
zeek::reporter->Error("Failed to register messages_flare with IO manager");
return false;
}
return true;
}
} // namespace
bool ThreadedBackend::DoInit() {
// Register as counting during DoInit() to avoid Zeek from shutting down.
return register_io_source(this, messages_flare.FD(), false);
}
void ThreadedBackend::DoInitPostScript() {
// Register non-counting after parsing scripts.
register_io_source(this, messages_flare.FD(), true);
}
void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) {
bool fire = false;
// Enqueue under lock.
{
std::scoped_lock lock(messages_mtx);
fire = messages.empty();
if ( messages.empty() ) {
messages = std::move(qmessages);
}
else {
messages.reserve(messages.size() + qmessages.size());
for ( auto& qmsg : qmessages )
messages.emplace_back(std::move(qmsg));
}
}
if ( fire )
messages_flare.Fire();
}
void ThreadedBackend::Process() {
QueueMessages to_process;
{
std::scoped_lock lock(messages_mtx);
to_process = std::move(messages);
messages_flare.Extinguish();
messages.clear();
}
for ( const auto& msg : to_process ) {
// sonarlint wants to use std::visit. not sure...
if ( auto* emsg = std::get_if<EventMessage>(&msg) ) {
ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span());
}
else if ( auto* lmsg = std::get_if<LogMessage>(&msg) ) {
ProcessLogMessage(lmsg->format, lmsg->payload_span());
}
else if ( auto* bmsg = std::get_if<BackendMessage>(&msg) ) {
ProcessBackendMessage(bmsg->tag, bmsg->payload_span());
}
else {
zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index());
}
}
}

399
src/cluster/Backend.h Normal file
View file

@ -0,0 +1,399 @@
// See the file "COPYING" in the main distribution directory for copyright.
// The interface for cluster backends and remote events.
#pragma once
#include <memory>
#include <mutex>
#include <optional>
#include <string_view>
#include <variant>
#include "zeek/EventHandler.h"
#include "zeek/Flare.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Span.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/iosource/IOSource.h"
#include "zeek/logging/Types.h"
namespace zeek {
class FuncVal;
using FuncValPtr = IntrusivePtr<FuncVal>;
class Val;
using ValPtr = IntrusivePtr<Val>;
using ArgsSpan = Span<const ValPtr>;
namespace cluster {
namespace detail {
/**
* Cluster event class.
*/
class Event {
public:
/**
* Constructor.
*/
Event(const EventHandlerPtr& handler, zeek::Args args, double timestamp = 0.0)
: handler(handler), args(std::move(args)), timestamp(timestamp) {}
EventHandlerPtr handler;
zeek::Args args;
double timestamp; // TODO: This should be more generic, possibly holding a
// vector of key/value metadata, rather than just
// the timestamp.
std::string_view HandlerName() const { return handler->Name(); }
const EventHandlerPtr& Handler() const { return handler; }
};
/**
* Validate that the provided args are suitable for handler.
*
* @param handler An event handler.
* @param args The provide arguments for the handler as a span.
*
* @return A zeek::Args instance if successful, else std::nullopt.
*/
std::optional<zeek::Args> check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args);
} // namespace detail
/**
* Interface for a cluster backend implementing publish/subscribe communication.
* Serialization of events should be done using the serializers injected into
* the constructor.
*/
class Backend {
public:
virtual ~Backend() = default;
/**
* Hook invoked after all scripts have been parsed.
*/
void InitPostScript() { DoInitPostScript(); }
/**
* Method invoked from the Cluster::Backend::__init() bif.
*/
bool Init() { return DoInit(); }
/**
* Hook invoked when Zeek is about to terminate.
*/
void Terminate() { DoTerminate(); }
/**
* Create a cluster::detail::Event instance given an event handler and the
* script function arguments to it.
*
* @param handler A function val representing an event handler.
* @param args The arguments for the event handler.
* @param timestamp The network time to add to the event as metadata.
*/
std::optional<detail::Event> MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp = 0.0) const;
/**
* Publish a cluster::detail::Event instance to a given topic.
*
* @param topic The topic string to publish the event to.
* @param event The event to publish.
*
* @return true if the event was successfully published.
*/
bool PublishEvent(const std::string& topic, const cluster::detail::Event& event) {
return DoPublishEvent(topic, event);
}
/**
* Register interest in messages that use a certain topic prefix.
*
* @param topic_prefix a prefix to match against remote message topics.
* @return true if it's a new event subscription and it is now registered.
*/
bool Subscribe(const std::string& topic_prefix) { return DoSubscribe(topic_prefix); }
/**
* Unregister interest in messages on a certain topic.
*
* @param topic_prefix a prefix previously supplied to Subscribe()
* @return true if interest in topic prefix is no longer advertised.
*/
bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); }
/**
* Publish multiple log records.
*
* All log records belong to the (stream, filter, path) tuple that is
* described by \a header.
*
* @param header Fixed information about the stream, writer, filter and schema of the records.
* @param records A span of logging::detail::LogRecords to be published.
*/
bool PublishLogWrites(const zeek::logging::detail::LogWriteHeader& header,
zeek::Span<zeek::logging::detail::LogRecord> records) {
return DoPublishLogWrites(header, records);
}
protected:
/**
* Constructor.
*/
Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls)
: event_serializer(std::move(es)), log_serializer(std::move(ls)) {}
/**
* Process an incoming event message.
*/
bool ProcessEventMessage(const std::string_view& topic, const std::string_view& format,
detail::byte_buffer_span payload);
/**
* Process an incoming log message.
*/
bool ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload);
private:
/**
* Called after all Zeek scripts have been loaded.
*
* A cluster backend should initialize itself based on script variables,
* register any IO sources, etc. It should not yet start any connections, that
* should happen in DoInit() instead.
*/
virtual void DoInitPostScript() = 0;
/**
* Called from Cluster::Backend::__init().
*
* Backend implementations should start connections with
* remote systems or other nodes, open listening ports or
* do whatever is needed to be functional.
*/
virtual bool DoInit() = 0;
/**
* Called at termination time.
*
* This should be used to shut down connectivity. Any last messages
* to be published should be sent from script land, rather than in
* DoTerminate(). A backend may wait for a bounded and configurable
* amount of time to flush any last messages out.
*/
virtual void DoTerminate() = 0;
/**
* Publish a cluster::detail::Event to the given topic.
*
* The default implementation serializes to a detail::byte_buffer and
* calls DoPublishEvent() with the resulting buffer.
*
* This hook method only exists for the existing Broker implementation that
* short-circuits serialization. Other backends should not override this.
*/
virtual bool DoPublishEvent(const std::string& topic, const cluster::detail::Event& event);
/**
* Send a serialized cluster::detail::Event to the given topic.
*
* Semantics of this call are "fire-and-forget". An implementation should
* ensure the message is enqueued for delivery, but may not have been sent out
* let alone received by any subscribers of the topic when this call returns.
*
* If the backend has not established a connection, the published message is
* allowed to be discarded.
*
* @param topic a topic string associated with the message.
* @param format the format/serializer used for serialization of the message payload.
* @param buf the serialized Event.
* @return true if the message has been published successfully.
*/
virtual bool DoPublishEvent(const std::string& topic, const std::string& format,
const detail::byte_buffer& buf) = 0;
/**
* Register interest in messages that use a certain topic prefix.
*
* If the backend hasn't yet established a connection, any subscriptions
* should be queued until they can be processed.
*
* @param topic_prefix a prefix to match against remote message topics.
*
* @return true if it's a new event subscription and now registered.
*/
virtual bool DoSubscribe(const std::string& topic_prefix) = 0;
/**
* Unregister interest in messages on a certain topic.
*
* @param topic_prefix a prefix previously supplied to Subscribe()
* @return true if interest in topic prefix is no longer advertised.
*/
virtual bool DoUnsubscribe(const std::string& topic_prefix) = 0;
/**
* Serialize a log batch, then forward it to DoPublishLogWrites() below.
* The default implementation serializes to a detail::byte_buffer and
* calls DoPublishLogWrites() with the resulting buffer.
*
* This hook method only exists for the existing Broker implementation that
* short-circuits serialization. Other backends should not override this.
*
* @param header The header describing the writer frontend where the records originate from.
* @param records Records to be serialized.
*
* @return true if the message has been published successfully.
*/
virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header,
zeek::Span<zeek::logging::detail::LogRecord> records);
/**
* Send out a serialized log batch.
*
* A backend implementation may use the values from \a header to
* construct a topic to write the logs to.
*
* Semantics of this call are "fire-and-forget". An implementation should
* ensure the message is enqueued for delivery, but may not have been sent out
* let alone received by the destination when this call returns.
*
* Sharding log writes to multiple receivers (logger nodes) is backend specific.
* Broker, for example, involves Zeek script layer cluster pool concepts.
* Other backends may use appropriate native mechanisms that may be more
* efficient.
*
* @param header the header describing the writer frontend where the records originate from.
* @param format the format/serializer used for serialization of the message payload.
* @param buf the serialized log batch. This is the message payload.
* @return true if the message has been published successfully.
*/
virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format,
detail::byte_buffer& buf) = 0;
std::unique_ptr<EventSerializer> event_serializer;
std::unique_ptr<LogSerializer> log_serializer;
};
/**
* A cluster backend may receive event and log messages asynchronously
* through threads. The following structs can be used with QueueForProcessing()
* to enqueue these messages onto the main IO loop for processing.
*
* EventMessage and LogMessage are processed in a generic fashion in
* the Process() method. The BackendMessage can be intercepted with
* DoProcessBackendMessage(). DoProcessBackendMessage() is guaranteed
* to run on Zeek's main thread.
*/
/**
* A message on a topic for events was received.
*/
struct EventMessage {
std::string topic;
std::string format;
detail::byte_buffer payload;
auto payload_span() const { return Span(payload.data(), payload.size()); };
};
/**
* A message that represents log records.
*/
struct LogMessage {
std::string format;
detail::byte_buffer payload;
auto payload_span() const { return Span(payload.data(), payload.size()); };
};
/**
* A backend specific message.
*
* This provides a mechanism to transfer auxiliary information
* from a background thread to Zeek's main thread.
*/
struct BackendMessage {
int tag;
detail::byte_buffer payload;
auto payload_span() const { return Span(payload.data(), payload.size()); };
};
using QueueMessage = std::variant<EventMessage, LogMessage, BackendMessage>;
using QueueMessages = std::vector<QueueMessage>;
/**
* Support for backends that use background threads or invoke
* callbacks on non-main threads.
*/
class ThreadedBackend : public Backend, public zeek::iosource::IOSource {
public:
using Backend::Backend;
protected:
/**
* To be used by implementations to enqueue messages for processing on the IO loop.
*
* It's safe to call this method from any thread.
*
* @param messages Messages to be enqueued.
*/
void QueueForProcessing(QueueMessages&& messages);
void Process() override;
double GetNextTimeout() override { return -1; }
/**
* The DoInitPostScript() implementation of ThreadedBackend
* registers itself as a non-counting IO source.
*
* Classes deriving from ThreadedBackend and providing their
* own DoInitPostScript() method should invoke the ThreadedBackend's
* implementation to register themselves as a non-counting
* IO source with the IO loop.
*/
void DoInitPostScript() override;
/**
* The default DoInit() implementation of ThreadedBackend
* registers itself as a counting IO source to keep the IO
* loop alive after initialization.
*
* Classes deriving from ThreadedBackend and providing their
* own DoInit() method should invoke the ThreadedBackend's
* implementation to register themselves as a counting
* IO source with the IO loop.
*/
bool DoInit() override;
private:
/**
* Process a backend specific message queued as BackendMessage.
*/
bool ProcessBackendMessage(int tag, detail::byte_buffer_span payload);
/**
* If a cluster backend produces messages of type BackendMessage,
* this method will be invoked by the main thread to process it.
*/
virtual bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { return false; };
// Members used for communication with the main thread.
std::mutex messages_mtx;
std::vector<QueueMessage> messages;
zeek::detail::Flare messages_flare;
};
// Cluster backend instance used for publish() and subscribe() calls.
extern Backend* backend;
} // namespace cluster
} // namespace zeek

View file

@ -0,0 +1,9 @@
zeek_add_subdir_library(
cluster
INCLUDE_DIRS
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
SOURCES
Component.cc
Backend.cc
Manager.cc)

56
src/cluster/Component.cc Normal file
View file

@ -0,0 +1,56 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/cluster/Component.h"
#include "zeek/Desc.h"
#include "zeek/Tag.h"
#include "zeek/cluster/Manager.h"
#include "zeek/util.h"
using namespace zeek::cluster;
BackendComponent::BackendComponent(const std::string& name, factory_callback arg_factory)
: plugin::Component(plugin::component::CLUSTER_BACKEND, name, 0, cluster::manager->Backends().GetTagType()) {
factory = arg_factory;
}
void BackendComponent::Initialize() {
InitializeTag();
cluster::manager->Backends().RegisterComponent(this, "CLUSTER_BACKEND_");
}
void BackendComponent::DoDescribe(ODesc* d) const {
d->Add("Cluster::CLUSTER_BACKEND_");
d->Add(CanonicalName());
}
EventSerializerComponent::EventSerializerComponent(const std::string& name, factory_callback arg_factory)
: plugin::Component(plugin::component::EVENT_SERIALIZER, name, 0,
cluster::manager->EventSerializers().GetTagType()) {
factory = arg_factory;
}
void EventSerializerComponent::Initialize() {
InitializeTag();
cluster::manager->EventSerializers().RegisterComponent(this, "EVENT_SERIALIZER_");
}
void EventSerializerComponent::DoDescribe(ODesc* d) const {
d->Add("Cluster::EVENT_SERIALIZER_");
d->Add(CanonicalName());
}
LogSerializerComponent::LogSerializerComponent(const std::string& name, factory_callback arg_factory)
: plugin::Component(plugin::component::LOG_SERIALIZER, name, 0, cluster::manager->LogSerializers().GetTagType()) {
factory = arg_factory;
}
void LogSerializerComponent::Initialize() {
InitializeTag();
cluster::manager->LogSerializers().RegisterComponent(this, "LOG_SERIALIZER_");
}
void LogSerializerComponent::DoDescribe(ODesc* d) const {
d->Add("Cluster::LOG_SERIALIZER_");
d->Add(CanonicalName());
}

131
src/cluster/Component.h Normal file
View file

@ -0,0 +1,131 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include <memory>
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/plugin/Component.h"
namespace zeek::cluster {
class BackendComponent : public plugin::Component {
public:
using factory_callback = std::unique_ptr<Backend> (*)(std::unique_ptr<EventSerializer>,
std::unique_ptr<LogSerializer>);
/**
* Constructor.
*
* @param name The name of the cluster backend. A Zeek script-level enum
* with the name Cluster::CLUSTER_BACKEND_<NAME> will be created.
*
* @param factory A factory function to instantiate instances of the
* cluster backend.
*/
BackendComponent(const std::string& name, factory_callback factory);
/**
* Destructor.
*/
~BackendComponent() override = default;
/**
* Initialization function. This function has to be called before any
* plugin component functionality is used; it is used to add the
* plugin component to the list of components and to initialize tags
*/
void Initialize() override;
/**
* Returns the analyzer's factory function.
*/
factory_callback Factory() const { return factory; }
protected:
void DoDescribe(ODesc* d) const override;
private:
factory_callback factory;
};
class EventSerializerComponent : public plugin::Component {
public:
using factory_callback = std::unique_ptr<EventSerializer> (*)();
/**
* Constructor.
*
* @param name The name of the event serializer. A Zeek script-level enum
* with the name Cluster::EVENT_SERIALIZER_<NAME> will be created.
*
* @param factory A factory function to instantiate instances of the
* event serializer.
*/
EventSerializerComponent(const std::string& name, factory_callback factory);
/**
* Destructor.
*/
~EventSerializerComponent() override = default;
/**
* Initialization function. This function has to be called before any
* plugin component functionality is used; it is used to add the
* plugin component to the list of components and to initialize tags
*/
void Initialize() override;
/**
* Returns the analyzer's factory function.
*/
factory_callback Factory() const { return factory; }
protected:
void DoDescribe(ODesc* d) const override;
private:
factory_callback factory;
};
class LogSerializerComponent : public plugin::Component {
public:
using factory_callback = std::unique_ptr<LogSerializer> (*)();
/**
* Constructor.
*
* @param name The name of the log serializer. A Zeek script-level enum
* with the name Cluster::LOG_SERIALIZER_<NAME> will be created.
*
* @param factory A factory function to instantiate instances of the
* log serializer.
*/
LogSerializerComponent(const std::string& name, factory_callback factory);
/**
* Destructor.
*/
~LogSerializerComponent() override = default;
/**
* Initialization function. This function has to be called before any
* plugin component functionality is used; it is used to add the
* plugin component to the list of components and to initialize tags
*/
void Initialize() override;
/**
* Returns the analyzer's factory function.
*/
factory_callback Factory() const { return factory; }
protected:
void DoDescribe(ODesc* d) const override;
private:
factory_callback factory;
};
} // namespace zeek::cluster

27
src/cluster/Manager.cc Normal file
View file

@ -0,0 +1,27 @@
#include "zeek/cluster/Manager.h"
#include "zeek/cluster/Serializer.h"
using namespace zeek::cluster;
Manager::Manager()
: backends(plugin::ComponentManager<BackendComponent>("Cluster", "BackendTag")),
event_serializers(plugin::ComponentManager<EventSerializerComponent>("Cluster", "EventSerializerTag")),
log_serializers(plugin::ComponentManager<LogSerializerComponent>("Cluster", "LogSerializerTag")) {}
std::unique_ptr<Backend> Manager::InstantiateBackend(const zeek::EnumValPtr& tag,
std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer) {
const BackendComponent* c = Backends().Lookup(tag);
return c ? c->Factory()(std::move(event_serializer), std::move(log_serializer)) : nullptr;
}
std::unique_ptr<EventSerializer> Manager::InstantiateEventSerializer(const zeek::EnumValPtr& tag) {
const EventSerializerComponent* c = EventSerializers().Lookup(tag);
return c ? c->Factory()() : nullptr;
}
std::unique_ptr<LogSerializer> Manager::InstantiateLogSerializer(const zeek::EnumValPtr& tag) {
const LogSerializerComponent* c = LogSerializers().Lookup(tag);
return c ? c->Factory()() : nullptr;
}

80
src/cluster/Manager.h Normal file
View file

@ -0,0 +1,80 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include <memory>
#include "zeek/cluster/Component.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/plugin/ComponentManager.h"
namespace zeek::cluster {
/**
* Manager to allow registration of cluster components.
*
* This manager holds three component manager for event and log serializers
* components, as well as backend components themselves.
*/
class Manager {
public:
Manager();
/**
* Instantiate a cluster backend with the given enum value and
* pre-instantiated event and log serializers.
*
* @param tag The enum value identifying the backend.
* @param event_serializer The event serializer to inject.
* @param log_serializer The log serializer to inject.
*
* @return New ClusterBackend instance, or null if there's no such component.
*/
std::unique_ptr<Backend> InstantiateBackend(const EnumValPtr& tag,
std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer);
/**
* Instantiate a event serializer with the given enum value.
*
* @param tag The enum value identifying a serializer.
*
* @return New Serializer instance, or null if there's no such component.
*/
std::unique_ptr<EventSerializer> InstantiateEventSerializer(const EnumValPtr& tag);
/**
* Instantiate a log serializer with the given enum value.
*
* @param tag The enum value identifying a serializer.
*
* @return New Serializer instance, or null if there's no such component.
*/
std::unique_ptr<LogSerializer> InstantiateLogSerializer(const EnumValPtr& tag);
/**
* @return The ComponentManager for backends.
*/
plugin::ComponentManager<BackendComponent>& Backends() { return backends; };
/**
* @return The ComponentManager for event serializers.
*/
plugin::ComponentManager<EventSerializerComponent>& EventSerializers() { return event_serializers; };
/**
* @return The ComponentManager for serializers.
*/
plugin::ComponentManager<LogSerializerComponent>& LogSerializers() { return log_serializers; };
private:
plugin::ComponentManager<BackendComponent> backends;
plugin::ComponentManager<EventSerializerComponent> event_serializers;
plugin::ComponentManager<LogSerializerComponent> log_serializers;
};
// This manager instance only exists for plugins to register components,
// not for actual cluster functionality.
extern Manager* manager;
} // namespace zeek::cluster

107
src/cluster/Serializer.h Normal file
View file

@ -0,0 +1,107 @@
// See the file "COPYING" in the main distribution directory for copyright.
// Interfaces to be implemented by event and log serializer components.
#pragma once
#include <optional>
#include <string>
#include <vector>
#include "zeek/Span.h"
#include "zeek/logging/Types.h"
namespace zeek::cluster {
namespace detail {
class Event;
using byte_buffer = std::vector<std::byte>;
using byte_buffer_span = Span<const std::byte>;
} // namespace detail
/**
* This class handles encoding of events into byte buffers and back.
*
* An event and its parameters can be serialized as a message which
* another node can unserialize and then enqueue as an event.
*/
class EventSerializer {
public:
virtual ~EventSerializer() = default;
/**
* Serialize an event into the given byte buffer.
*
* @param buf The buffer to use for serialization.
* @param event The event to serialize.
*
* @returns True on success, false in exceptional cases (e.g. unsupported serialization).
*/
virtual bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) = 0;
/**
* Unserialize an event from a given byte buffer.
*
* @param buf A span representing a received remote event.
*
* @returns The event, or std::nullopt on error.
*/
virtual std::optional<cluster::detail::Event> UnserializeEvent(detail::byte_buffer_span buf) = 0;
/**
* @returns The name of this event serializer instance.
*/
const std::string& Name() { return name; }
protected:
/**
* Constructor.
*/
EventSerializer(std::string name) : name(std::move(name)) {}
private:
std::string name;
};
/**
* Interface for a serializer for logging::LogRecord instances.
*/
class LogSerializer {
public:
/**
* Constructor.
*/
explicit LogSerializer(std::string name) : name(std::move(name)) {};
virtual ~LogSerializer() = default;
/**
* Serialize log records into a byte buffer.
*
* @param buf The buffer to serialize into.
* @param header The log batch header.
* @param records The actual log writes.
*/
virtual bool SerializeLogWrite(detail::byte_buffer& buf, const logging::detail::LogWriteHeader& header,
zeek::Span<logging::detail::LogRecord> records) = 0;
/**
* Unserialize log writes from a given byte buffer.
*
* @param buf The span representing received log writes.
*/
virtual std::optional<logging::detail::LogWriteBatch> UnserializeLogWrite(detail::byte_buffer_span buf) = 0;
/**
* @returns The name of this log serializer instance.
*/
const std::string& Name() { return name; }
private:
std::string name;
};
} // namespace zeek::cluster

View file

@ -5,6 +5,7 @@ zeek_add_subdir_library(
Manager.cc
WriterBackend.cc
WriterFrontend.cc
Types.cc
BIFS
logging.bif)

48
src/logging/Types.cc Normal file
View file

@ -0,0 +1,48 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/logging/Types.h"
#include "zeek/Desc.h"
#include "zeek/Type.h"
#include "zeek/Val.h"
namespace zeek::logging::detail {
LogWriteHeader::LogWriteHeader() = default;
LogWriteHeader::LogWriteHeader(EnumValPtr arg_stream_id, EnumValPtr arg_writer_id, std::string arg_filter_name,
std::string arg_path)
: stream_id(std::move(arg_stream_id)),
writer_id(std::move(arg_writer_id)),
filter_name(std::move(arg_filter_name)),
path(std::move(arg_path)) {
stream_name = obj_desc_short(stream_id.get());
writer_name = obj_desc_short(writer_id.get());
}
LogWriteHeader& LogWriteHeader::operator=(const LogWriteHeader& other) = default;
LogWriteHeader::~LogWriteHeader() = default;
bool LogWriteHeader::PopulateEnumVals() {
static const auto& stream_id_type = zeek::id::find_type<zeek::EnumType>("Log::ID");
static const auto& writer_id_type = zeek::id::find_type<zeek::EnumType>("Log::Writer");
if ( stream_name.empty() || writer_name.empty() )
return false;
auto sid = stream_id_type->Lookup(stream_name);
if ( sid < 0 )
return false;
auto wid = writer_id_type->Lookup(writer_name);
if ( wid < 0 )
return false;
stream_id = stream_id_type->GetEnumVal(sid);
writer_id = writer_id_type->GetEnumVal(wid);
return true;
}
} // namespace zeek::logging::detail

99
src/logging/Types.h Normal file
View file

@ -0,0 +1,99 @@
// See the file "COPYING" in the main distribution directory for copyright.
// Header for types shared between cluster and logging components.
//
// Currently these are in detail, but over time may move into the
// public namespace once established.
#pragma once
#include <string>
#include <vector>
#include "zeek/IntrusivePtr.h"
#include "zeek/threading/SerialTypes.h"
namespace zeek {
class EnumVal;
using EnumValPtr = IntrusivePtr<EnumVal>;
namespace logging::detail {
/**
* A single log record.
*
* This is what a Zeek record value passed into Log::write()
* is converted into before passed to a local log writer or
* send via the cluster to a remote node.
*/
using LogRecord = std::vector<threading::Value>;
/**
* A struct holding all necessary information that relates to
* log writes for a given path. These values are constant over
* the lifetime of a \a WriterFrontend.
*
* Note that the constructor, destructor and assignment operator are
* defaulted in Types.cc. This is to avoid a Val.h include here.
*/
struct LogWriteHeader {
/**
* Default constructor.
*/
LogWriteHeader();
/**
* Constructor that populates stream_name and writer_name.
*
* @param stream_id Enum value representing the stream.
* @param writer_id Enum value representing the writer.
* @param filter_name The filter name of the writer frontend.
* @param path The path of the writer frontend.
*/
LogWriteHeader(EnumValPtr stream_id, EnumValPtr writer_id, std::string filter_name, std::string path);
/**
* Assignment operator.
*/
LogWriteHeader& operator=(const LogWriteHeader& other);
/**
* Destructor.
*/
~LogWriteHeader();
/**
* Helper to populate stream_id and writer_id after the
* stream_name and writer_name members were set.
*
* @return true if matching enum values were, else false.
*/
bool PopulateEnumVals();
EnumValPtr stream_id; // The enum identifying the stream.
std::string stream_name; // The name of the stream, e.g. Conn::LOG
EnumValPtr writer_id; // The enum identifying the writer. Mostly for backwards compat with broker.
std::string writer_name; // The name of the writer, e.g. WRITER_ASCII.
std::string filter_name; // The name of the filter.
std::string path; // The path as configured or produced by the filter's path_func.
std::vector<threading::Field> fields; // The schema describing a log record.
};
/**
* A batch of log records including their header.
*
* This is the object created when unserialzing a log-write
* from another cluster node.
*
* This layout currently implies that during de-serialization
* data is copied into LogRecord / threading::Value structures.
* If the need for zero-copy approaches arises, might need a
* different approach to free the underlying buffer (capnproto).
*/
struct LogWriteBatch {
LogWriteHeader header;
std::vector<LogRecord> records;
};
} // namespace logging::detail
} // namespace zeek

View file

@ -39,6 +39,12 @@ void Component::Describe(ODesc* d) const {
case component::SESSION_ADAPTER: d->Add("Session Adapter"); break;
case component::CLUSTER_BACKEND: d->Add("Cluster Backend"); break;
case component::EVENT_SERIALIZER: d->Add("Event Serializer"); break;
case component::LOG_SERIALIZER: d->Add("Log Serializer"); break;
default:
reporter->InternalWarning("unknown component type in plugin::Component::Describe");
d->Add("<unknown component type>");

View file

@ -21,15 +21,18 @@ namespace component {
* Component types.
*/
enum Type {
READER, /// An input reader (not currently used).
WRITER, /// A logging writer (not currently used).
ANALYZER, /// A protocol analyzer.
PACKET_ANALYZER, /// A packet analyzer.
FILE_ANALYZER, /// A file analyzer.
IOSOURCE, /// An I/O source, excluding packet sources.
PKTSRC, /// A packet source.
PKTDUMPER, /// A packet dumper.
SESSION_ADAPTER, /// A session adapter analyzer.
READER, /// An input reader (not currently used).
WRITER, /// A logging writer (not currently used).
ANALYZER, /// A protocol analyzer.
PACKET_ANALYZER, /// A packet analyzer.
FILE_ANALYZER, /// A file analyzer.
IOSOURCE, /// An I/O source, excluding packet sources.
PKTSRC, /// A packet source.
PKTDUMPER, /// A packet dumper.
SESSION_ADAPTER, /// A session adapter analyzer.
CLUSTER_BACKEND, /// A cluster backend.
EVENT_SERIALIZER, /// A serializer for events, used by cluster backends.
LOG_SERIALIZER, /// A serializer for log batches, used by cluster backends.
};
} // namespace component

View file

@ -19,13 +19,21 @@ namespace zeek::threading {
* Definition of a log file, i.e., one column of a log stream.
*/
struct Field {
const char* name; //! Name of the field.
const char* name = nullptr; //! Name of the field.
//! Needed by input framework. Port fields have two names (one for the
//! port, one for the type), and this specifies the secondary name.
const char* secondary_name;
TypeTag type; //! Type of the field.
TypeTag subtype; //! Inner type for sets and vectors.
bool optional; //! True if field is optional.
const char* secondary_name = nullptr;
TypeTag type = TYPE_ERROR; //! Type of the field.
TypeTag subtype = TYPE_ERROR; //! Inner type for sets and vectors.
bool optional = false; //! True if field is optional.
/**
* Constructor.
*
* For Read() usage. Initializes with TYPE_ERROR.
*/
Field() = default;
/**
* Constructor.
@ -47,6 +55,23 @@ struct Field {
subtype(other.subtype),
optional(other.optional) {}
/**
* Move constructor.
*/
Field(Field&& other) noexcept {
name = other.name;
secondary_name = other.secondary_name;
type = other.type;
subtype = other.subtype;
optional = other.optional;
other.name = nullptr;
other.secondary_name = nullptr;
other.type = TYPE_ERROR;
other.subtype = TYPE_ERROR;
other.optional = false;
}
~Field() {
delete[] name;
delete[] secondary_name;
@ -91,10 +116,6 @@ struct Field {
* thread-safe.
*/
std::string TypeName() const;
private:
// Force usage of constructor above.
Field() {}
};
/**

View file

@ -50,6 +50,7 @@
#include "zeek/analyzer/Manager.h"
#include "zeek/binpac_zeek.h"
#include "zeek/broker/Manager.h"
#include "zeek/cluster/Manager.h"
#include "zeek/file_analysis/Manager.h"
#include "zeek/input.h"
#include "zeek/input/Manager.h"
@ -177,6 +178,8 @@ zeek::detail::trigger::Manager* zeek::detail::trigger_mgr = nullptr;
zeek::spicy::Manager* zeek::spicy_mgr = nullptr;
#endif
zeek::cluster::Manager* zeek::cluster::manager = nullptr;
std::vector<std::string> zeek::detail::zeek_script_prefixes;
zeek::detail::Stmt* zeek::detail::stmts = nullptr;
zeek::EventRegistry* zeek::event_registry = nullptr;
@ -389,6 +392,7 @@ static void terminate_zeek() {
delete packet_mgr;
delete analyzer_mgr;
delete file_mgr;
delete cluster::manager;
// broker_mgr, timer_mgr, supervisor, and dns_mgr are deleted via iosource_mgr
delete iosource_mgr;
delete event_registry;
@ -666,6 +670,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
log_mgr = new logging::Manager();
input_mgr = new input::Manager();
file_mgr = new file_analysis::Manager();
cluster::manager = new cluster::Manager();
auto broker_real_time = ! options.pcap_file && ! options.deterministic_mode;
broker_mgr = new Broker::Manager(broker_real_time);
trigger_mgr = new trigger::Manager();

View file

@ -313,6 +313,14 @@ void ScriptInfo::DoInitPostScript() {
const auto& id = zeek::detail::global_scope()->Find("Log::Writer");
types.push_back(new IdentifierInfo(id, this));
}
else if ( name == "base/frameworks/cluster/main.zeek" ) {
const auto& backend_id = zeek::detail::global_scope()->Find("Cluster::BackendTag");
types.push_back(new IdentifierInfo(backend_id, this));
const auto& event_serializer_id = zeek::detail::global_scope()->Find("Cluster::EventSerializerTag");
types.push_back(new IdentifierInfo(event_serializer_id, this));
const auto& log_serializer_id = zeek::detail::global_scope()->Find("Cluster::LogSerializerTag");
types.push_back(new IdentifierInfo(log_serializer_id, this));
}
}
vector<string> ScriptInfo::GetComments() const { return comments; }

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Zeek::Cluster_Backend_Broker - Cluster backend using Broker (built-in)
[Cluster Backend] BROKER (Cluster::CLUSTER_BACKEND_BROKER)
Cluster::CLUSTER_BACKEND_BROKER, Cluster::BackendTag

View file

@ -7,10 +7,10 @@
#open XXXX-XX-XX-XX-XX-XX
#fields ts fuid uid id.orig_h id.orig_p id.resp_h id.resp_p source depth analyzers mime_type filename duration local_orig is_orig seen_bytes total_bytes missing_bytes overflow_bytes timedout parent_fuid md5 sha1 sha256
#types time string string addr port addr port string count set[string] string string interval bool bool count count count count bool string string string string
XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43
XXXXXXXXXX.XXXXXX Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d
XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0
XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43
XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d
XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0
XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43
XXXXXXXXXX.XXXXXX Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d
XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0
XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43
XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d
XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0
#close XXXX-XX-XX-XX-XX-XX

View file

@ -0,0 +1,52 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
1st stuff
hrw, 0, T
hrw, 1, T
hrw, 2, T
hrw, 3, T
hrw, 13, T
hrw, 37, T
hrw, 42, T
hrw, 101, T
rr, T
rr, T
rr, T
rr, T
rr, T
rr, T
rr, T
rr, T
2nd stuff
hrw, 0, T
hrw, 1, T
hrw, 2, T
hrw, 3, T
hrw, 13, T
hrw, 37, T
hrw, 42, T
hrw, 101, T
rr, T
rr, T
rr, T
rr, T
rr, T
rr, T
rr, T
rr, T
no stuff
hrw, 0, F
hrw, 1, F
hrw, 2, F
hrw, 3, F
hrw, 13, F
hrw, 37, F
hrw, 42, F
hrw, 101, F
rr, F
rr, F
rr, F
rr, F
rr, F
rr, F
rr, F
rr, F

View file

@ -0,0 +1,10 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
got distributed event hrw, 1
got distributed event hrw, 13
got distributed event hrw, 37
got distributed event hrw, 42
got distributed event hrw, 101
got distributed event rr, 0
got distributed event rr, 2
got distributed event rr, 13
got distributed event rr, 42

View file

@ -0,0 +1,24 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
got distributed event hrw, 0
got distributed event hrw, 2
got distributed event hrw, 3
got distributed event rr, 1
got distributed event rr, 3
got distributed event rr, 37
got distributed event rr, 101
got distributed event hrw, 0
got distributed event hrw, 1
got distributed event hrw, 2
got distributed event hrw, 3
got distributed event hrw, 13
got distributed event hrw, 37
got distributed event hrw, 42
got distributed event hrw, 101
got distributed event rr, 0
got distributed event rr, 1
got distributed event rr, 2
got distributed event rr, 3
got distributed event rr, 13
got distributed event rr, 37
got distributed event rr, 42
got distributed event rr, 101

View file

@ -4,7 +4,7 @@
build_dir = build
[btest]
TestDirs = af_packet doc bifs language core scripts coverage signatures plugins broker spicy supervisor telemetry javascript misc opt dns_mgr
TestDirs = af_packet doc bifs language core scripts coverage signatures plugins broker spicy supervisor telemetry javascript misc opt dns_mgr cluster
TmpDir = %(testbase)s/.tmp
BaselineDir = %(testbase)s/Baseline
IgnoreDirs = .svn CVS .tmp

View file

@ -0,0 +1,10 @@
# @TEST-DOC: Test cluster backend enum
#
# @TEST-EXEC: zeek -NN Zeek::Cluster_Backend_Broker >>out
# @TEST-EXEC: zeek -b %INPUT >>out
# @TEST-EXEC: btest-diff out
event zeek_init()
{
print Cluster::CLUSTER_BACKEND_BROKER, type_name(Cluster::CLUSTER_BACKEND_BROKER);
}

View file

@ -0,0 +1,99 @@
# @TEST-DOC: Broker::make_event() together with Cluster::publish_hrw() and Cluster::publish_rr()
# @TEST-PORT: BROKER_PORT1
# @TEST-PORT: BROKER_PORT2
# @TEST-PORT: BROKER_PORT3
# @TEST-PORT: BROKER_PORT4
# @TEST-PORT: BROKER_PORT5
#
# @TEST-EXEC: zeek -b --parse-only %INPUT
# @TEST-EXEC: btest-bg-run manager-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -b %INPUT
# @TEST-EXEC: btest-bg-run proxy-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy-1 zeek -b %INPUT
# @TEST-EXEC: btest-bg-run proxy-2 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy-2 zeek -b %INPUT
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff manager-1/.stdout
# @TEST-EXEC: btest-diff proxy-1/.stdout
# @TEST-EXEC: btest-diff proxy-2/.stdout
@load policy/frameworks/cluster/experimental
@TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1"],
["proxy-2"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1"],
};
@TEST-END-FILE
global q = 0;
event go_away()
{
terminate();
}
event distributed_event_hrw(c: count)
{
print "got distributed event hrw", c;
}
event distributed_event_rr(c: count)
{
print "got distributed event rr", c;
}
function send_stuff(heading: string)
{
print heading;
local v: vector of count = vector(0, 1, 2, 3, 13, 37, 42, 101);
local e: Broker::Event;
for ( i in v )
{
e = Broker::make_event(distributed_event_hrw, v[i]);
print "hrw", v[i], Cluster::publish_hrw(Cluster::proxy_pool, v[i], e);
}
local rr_key = "test";
for ( i in v )
{
e = Broker::make_event(distributed_event_rr, v[i]);
print "rr", Cluster::publish_rr(Cluster::proxy_pool, rr_key, e);
}
}
event Cluster::Experimental::cluster_started()
{
if ( Cluster::node != "manager-1" )
return;
send_stuff("1st stuff");
local e = Broker::make_event(go_away);
Broker::publish(Cluster::node_topic("proxy-1"), e);
}
event Cluster::node_down(name: string, id: string)
{
if ( Cluster::node != "manager-1" )
return;
if ( name == "proxy-1" )
{
send_stuff("2nd stuff");
local e = Broker::make_event(go_away);
Broker::publish(Cluster::node_topic("proxy-2"), e);
}
if ( name == "proxy-2" )
{
send_stuff("no stuff");
terminate();
}
}
event Cluster::node_down(name: string, id: string)
{
if ( name == "manager-1" )
terminate();
}