Ground work for pluggable storage serializers

This commit is contained in:
Tim Wojtulewicz 2025-04-01 16:14:35 -07:00
parent faac36f4cd
commit e545fe8256
21 changed files with 235 additions and 57 deletions

View file

@ -25,9 +25,8 @@ export {
## Returns: A record containing the status of the operation, and either an error
## string on failure or a value on success. The value returned here will
## be an ``opaque of BackendHandle``.
global open_backend: function(btype: Storage::Backend,
options: Storage::BackendOptions, key_type: any, val_type: any)
: Storage::OperationResult;
global open_backend: function(btype: Storage::Backend, options: Storage::BackendOptions,
key_type: any, val_type: any): Storage::OperationResult;
## Closes an existing backend connection asynchronously. This method must be
## called via a :zeek:see:`when` condition or an error will be returned.

View file

@ -7,23 +7,26 @@ export {
## :zeek:see:`Storage::Async::open_backend` and
## :zeek:see:`Storage::Sync::open_backend`. Backend plugins can redef this record
## to add relevant fields to it.
type BackendOptions: record { };
type BackendOptions: record {
## The serializer used for converting Zeek data.
serializer: Storage::Serializer &default=Storage::JSON;
};
## Record for passing arguments to :zeek:see:`Storage::Async::put` and
## :zeek:see:`Storage::Sync::put`.
type PutArgs: record {
# The key to store the value under.
## The key to store the value under.
key: any;
# The value to store associated with the key.
## The value to store associated with the key.
value: any;
# Indicates whether this value should overwrite an existing entry for the
# key.
## Indicates whether this value should overwrite an existing entry for the
## key.
overwrite: bool &default=T;
# An interval of time until the entry is automatically removed from the
# backend.
## An interval of time until the entry is automatically removed from the
## backend.
expire_time: interval &default=0sec;
};
}

View file

@ -23,9 +23,8 @@ export {
## Returns: A record containing the status of the operation, and either an error
## string on failure or a value on success. The value returned here will
## be an ``opaque of BackendHandle``.
global open_backend: function(btype: Storage::Backend,
options: Storage::BackendOptions, key_type: any, val_type: any)
: Storage::OperationResult;
global open_backend: function(btype: Storage::Backend, options: Storage::BackendOptions,
key_type: any, val_type: any): Storage::OperationResult;
## Closes an existing backend connection.
##

View file

@ -47,6 +47,8 @@ void Component::Describe(ODesc* d) const {
case component::STORAGE_BACKEND: d->Add("Storage Backend"); break;
case component::STORAGE_SERIALIZER: d->Add("Storage Serializer"); break;
default:
reporter->InternalWarning("unknown component type in plugin::Component::Describe");
d->Add("<unknown component type>");

View file

@ -34,6 +34,7 @@ enum Type {
EVENT_SERIALIZER, /// A serializer for events, used by cluster backends.
LOG_SERIALIZER, /// A serializer for log batches, used by cluster backends.
STORAGE_BACKEND, /// A backend for the storage framework.
STORAGE_SERIALIZER, /// A serializer for the storage framework.
};
} // namespace component

View file

@ -66,7 +66,7 @@ void OpenResultCallback::Complete(OperationResult res) {
}
Backend::Backend(uint8_t modes, std::string_view tag_name) : modes(modes) {
tag = storage_mgr->GetComponentTag(std::string{tag_name});
tag = storage_mgr->BackendMgr().GetComponentTag(std::string{tag_name});
tag_str = zeek::obj_desc_short(tag.AsVal().get());
}
@ -75,6 +75,15 @@ OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, Type
val_type = std::move(vt);
backend_options = options;
auto stype = options->GetField<EnumVal>("serializer");
zeek::Tag stag{stype};
auto s = storage_mgr->InstantiateSerializer(stag);
if ( ! s )
return {ReturnCode::INITIALIZATION_FAILED, s.error()};
serializer = std::move(s.value());
auto ret = DoOpen(cb, std::move(options));
if ( ! ret.value )
ret.value = cb->Backend();

View file

@ -5,6 +5,7 @@
#include "zeek/OpaqueVal.h"
#include "zeek/Tag.h"
#include "zeek/Val.h"
#include "zeek/storage/Serializer.h"
namespace zeek::detail::trigger {
class Trigger;
@ -58,7 +59,7 @@ struct OperationResult {
class ResultCallback {
public:
ResultCallback() = default;
ResultCallback(detail::trigger::TriggerPtr trigger, const void* assoc);
ResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc);
virtual ~ResultCallback() = default;
/**
@ -239,6 +240,7 @@ protected:
zeek::Tag tag;
std::string tag_str;
std::unique_ptr<Serializer> serializer;
private:
/**

View file

@ -11,3 +11,4 @@ zeek_add_subdir_library(
storage-sync.bif)
add_subdirectory(backend)
add_subdirectory(serializer)

View file

@ -7,19 +7,34 @@
namespace zeek::storage {
Component::Component(const std::string& name, factory_callback arg_factory)
: plugin::Component(plugin::component::STORAGE_BACKEND, name, 0, storage_mgr->GetTagType()) {
BackendComponent::BackendComponent(const std::string& name, factory_callback arg_factory)
: plugin::Component(plugin::component::STORAGE_BACKEND, name, 0, storage_mgr->BackendMgr().GetTagType()) {
factory = arg_factory;
}
void Component::Initialize() {
void BackendComponent::Initialize() {
InitializeTag();
storage_mgr->RegisterComponent(this);
storage_mgr->BackendMgr().RegisterComponent(this);
}
void Component::DoDescribe(ODesc* d) const {
void BackendComponent::DoDescribe(ODesc* d) const {
d->Add("Storage::STORAGE_BACKEND_");
d->Add(CanonicalName());
}
SerializerComponent::SerializerComponent(const std::string& name, factory_callback arg_factory)
: plugin::Component(plugin::component::STORAGE_SERIALIZER, name, 0, storage_mgr->SerializerMgr().GetTagType()) {
factory = arg_factory;
}
void SerializerComponent::Initialize() {
InitializeTag();
storage_mgr->SerializerMgr().RegisterComponent(this);
}
void SerializerComponent::DoDescribe(ODesc* d) const {
d->Add("Storage::STORAGE_SERIALIZER_");
d->Add(CanonicalName());
}
} // namespace zeek::storage

View file

@ -7,11 +7,12 @@
namespace zeek::storage {
class Backend;
class Serializer;
/**
* Component description for plugins providing storage backends.
*/
class Component : public plugin::Component {
class BackendComponent : public plugin::Component {
public:
using factory_callback = IntrusivePtr<Backend> (*)();
@ -27,12 +28,60 @@ public:
* method inside the class that just allocates and returns a new
* instance.
*/
Component(const std::string& name, factory_callback factory);
BackendComponent(const std::string& name, factory_callback factory);
/**
* Destructor.
*/
~Component() override = default;
~BackendComponent() override = default;
/**
* Initialization function. This function has to be called before any
* plugin component functionality is used; it is used to add the
* plugin component to the list of components and to initialize tags
*/
void Initialize() override;
/**
* Returns the backend's factory function.
*/
factory_callback Factory() const { return factory; }
protected:
/**
* Overridden from plugin::Component.
*/
void DoDescribe(ODesc* d) const override;
private:
factory_callback factory;
};
/**
* Component description for plugins providing serialization for storage data..
*/
class SerializerComponent : public plugin::Component {
public:
using factory_callback = std::unique_ptr<Serializer> (*)();
/**
* Constructor.
*
* @param name The name of the provided backend. This name is used
* across the system to identify the backend.
*
* @param factory A factory function to instantiate instances of the
* backend's class, which must be derived directly or indirectly from
* storage::Backend. This is typically a static \c Instantiate()
* method inside the class that just allocates and returns a new
* instance.
*/
SerializerComponent(const std::string& name, factory_callback factory);
/**
* Destructor.
*/
~SerializerComponent() override = default;
/**
* Initialization function. This function has to be called before any

View file

@ -28,7 +28,9 @@ void detail::ExpirationTimer::Dispatch(double t, bool is_expire) {
storage_mgr->StartExpirationTimer();
}
Manager::Manager() : plugin::ComponentManager<storage::Component>("Storage", "Backend") {}
Manager::Manager()
: backend_mgr(plugin::ComponentManager<storage::BackendComponent>("Storage", "Backend")),
serializer_mgr(plugin::ComponentManager<storage::SerializerComponent>("Storage", "Serializer")) {}
Manager::~Manager() {
// TODO: should we shut down any existing backends? force-poll until all of their existing
@ -48,24 +50,40 @@ void Manager::InitPostScript() {
StartExpirationTimer();
}
zeek::expected<BackendPtr, std::string> Manager::Instantiate(const Tag& type) {
Component* c = Lookup(type);
if ( ! c ) {
zeek::expected<BackendPtr, std::string> Manager::InstantiateBackend(const Tag& type) {
BackendComponent* c = backend_mgr.Lookup(type);
if ( ! c )
return zeek::unexpected<std::string>(
util::fmt("Request to open unknown backend (%d:%d)", type.Type(), type.Subtype()));
util::fmt("Request to instantiate unknown backend type (%d:%d)", type.Type(), type.Subtype()));
if ( ! c->Factory() )
return zeek::unexpected<std::string>(
util::fmt("Factory invalid for backend %s", backend_mgr.GetComponentName(type).c_str()));
auto bp = c->Factory()();
if ( ! bp )
return zeek::unexpected<std::string>(
util::fmt("Failed to instantiate backend %s", backend_mgr.GetComponentName(type).c_str()));
return bp;
}
if ( ! c->Factory() ) {
zeek::expected<std::unique_ptr<Serializer>, std::string> Manager::InstantiateSerializer(const Tag& type) {
SerializerComponent* c = serializer_mgr.Lookup(type);
if ( ! c )
return zeek::unexpected<std::string>(
util::fmt("Factory invalid for backend %s", GetComponentName(type).c_str()));
}
util::fmt("Request to instantiate unknown serializer type (%d:%d)", type.Type(), type.Subtype()));
BackendPtr bp = c->Factory()();
if ( ! bp ) {
if ( ! c->Factory() )
return zeek::unexpected<std::string>(
util::fmt("Failed to instantiate backend %s", GetComponentName(type).c_str()));
}
util::fmt("Factory invalid for serializer %s", serializer_mgr.GetComponentName(type).c_str()));
auto bp = c->Factory()();
if ( ! bp )
return zeek::unexpected<std::string>(
util::fmt("Failed to instantiate serializer %s", serializer_mgr.GetComponentName(type).c_str()));
return bp;
}

View file

@ -10,6 +10,7 @@
#include "zeek/plugin/ComponentManager.h"
#include "zeek/storage/Backend.h"
#include "zeek/storage/Component.h"
#include "zeek/storage/Serializer.h"
namespace zeek::storage {
@ -24,7 +25,7 @@ public:
} // namespace detail
class Manager final : public plugin::ComponentManager<Component> {
class Manager final {
public:
Manager();
~Manager();
@ -43,7 +44,16 @@ public:
* @return A std::expected containing either a valid BackendPtr with the result of the
* operation or a string containing an error message for failure.
*/
zeek::expected<BackendPtr, std::string> Instantiate(const Tag& type);
zeek::expected<BackendPtr, std::string> InstantiateBackend(const Tag& type);
/**
* Instantiates a new serializer object.
*
* @param type The tag for the type of backend being opened.
* @return A std::expected containing either a valid BackendPtr with the result of the
* operation or a string containing an error message for failure.
*/
zeek::expected<std::unique_ptr<Serializer>, std::string> InstantiateSerializer(const Tag& type);
/**
* Opens a new storage backend.
@ -82,6 +92,9 @@ public:
*/
void Expire(double t);
plugin::ComponentManager<BackendComponent>& BackendMgr() { return backend_mgr; }
plugin::ComponentManager<SerializerComponent>& SerializerMgr() { return serializer_mgr; }
protected:
friend class storage::detail::ExpirationTimer;
void RunExpireThread();
@ -94,6 +107,9 @@ protected:
private:
std::vector<BackendPtr> backends;
std::mutex backends_mtx;
plugin::ComponentManager<BackendComponent> backend_mgr;
plugin::ComponentManager<SerializerComponent> serializer_mgr;
};
} // namespace zeek::storage

View file

@ -19,6 +19,8 @@ EnumValPtr ReturnCode::CONNECTION_FAILED;
EnumValPtr ReturnCode::DISCONNECTION_FAILED;
EnumValPtr ReturnCode::INITIALIZATION_FAILED;
EnumValPtr ReturnCode::IN_PROGRESS;
EnumValPtr ReturnCode::SERIALIZATION_FAILED;
EnumValPtr ReturnCode::UNSERIALIZATION_FAILED;
void ReturnCode::Initialize() {
static const auto& return_code_type = zeek::id::find_type<zeek::EnumType>("Storage::ReturnCode");
@ -61,6 +63,12 @@ void ReturnCode::Initialize() {
tmp = return_code_type->Lookup("Storage::IN_PROGRESS");
IN_PROGRESS = return_code_type->GetEnumVal(tmp);
tmp = return_code_type->Lookup("Storage::SERIALIZATION_FAILED");
SERIALIZATION_FAILED = return_code_type->GetEnumVal(tmp);
tmp = return_code_type->Lookup("Storage::UNSERIALIZATION_FAILED");
UNSERIALIZATION_FAILED = return_code_type->GetEnumVal(tmp);
}
void ReturnCode::Cleanup() {
@ -77,6 +85,8 @@ void ReturnCode::Cleanup() {
DISCONNECTION_FAILED.reset();
INITIALIZATION_FAILED.reset();
IN_PROGRESS.reset();
SERIALIZATION_FAILED.reset();
UNSERIALIZATION_FAILED.reset();
}
} // namespace zeek::storage

View file

@ -32,6 +32,8 @@ public:
static EnumValPtr DISCONNECTION_FAILED;
static EnumValPtr INITIALIZATION_FAILED;
static EnumValPtr IN_PROGRESS;
static EnumValPtr SERIALIZATION_FAILED;
static EnumValPtr UNSERIALIZATION_FAILED;
};
} // namespace storage

50
src/storage/Serializer.h Normal file
View file

@ -0,0 +1,50 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include "zeek/Span.h"
#include "zeek/Val.h"
namespace zeek::storage {
namespace detail {
using byte_buffer = std::vector<std::byte>;
using byte_buffer_span = Span<const std::byte>;
} // namespace detail
/**
* Base class for a serializer used by storage backends.
*/
class Serializer {
public:
virtual ~Serializer() = default;
/**
* Serializes Zeek Val data into another format.
*
* @param val The data to serialize.
*
* @return On success, a byte buffer containing the serialized data. std::nullopt will
* be returned on failure.
*/
virtual std::optional<detail::byte_buffer> Serialize(ValPtr val) = 0;
/**
* Unserializes a byte buffer into Zeek Val objects of a specific type.
*
* @param buf The byte data to unserialize.
* @param type The Zeek script-level type to unserialize the data into.
*
* @return A zeek::expected containing either the unserialized Val data on success, or
* a string containing an error message on failure.
*/
virtual zeek::expected<ValPtr, std::string> Unserialize(detail::byte_buffer_span buf, TypePtr type) = 0;
protected:
Serializer(std::string name) : name(std::move(name)) {}
private:
std::string name;
};
} // namespace zeek::storage

View file

@ -10,7 +10,7 @@ namespace zeek::storage::backend::redis {
class Plugin : public plugin::Plugin {
public:
plugin::Configuration Configure() override {
AddComponent(new storage::Component("REDIS", backend::redis::Redis::Instantiate));
AddComponent(new storage::BackendComponent("REDIS", backend::redis::Redis::Instantiate));
plugin::Configuration config;
config.name = "Zeek::Storage_Backend_Redis";

View file

@ -10,7 +10,7 @@ namespace zeek::storage::backend::sqlite {
class Plugin : public plugin::Plugin {
public:
plugin::Configuration Configure() override {
AddComponent(new storage::Component("SQLITE", backend::sqlite::SQLite::Instantiate));
AddComponent(new storage::BackendComponent("SQLITE", backend::sqlite::SQLite::Instantiate));
plugin::Configuration config;
config.name = "Zeek::Storage_Backend_SQLite";

View file

View file

@ -80,9 +80,9 @@ function Storage::Async::__open_backend%(btype: Storage::Backend, options: any,
return nullptr;
auto btype_val = IntrusivePtr<EnumVal>{NewRef{}, btype->AsEnumVal()};
Tag tag{btype_val};
Tag btag{btype_val};
auto b = storage_mgr->Instantiate(tag);
auto b = storage_mgr->InstantiateBackend(btag);
if ( ! b.has_value() ) {
trigger->Cache(

View file

@ -31,9 +31,9 @@ module Storage::Sync;
function Storage::Sync::__open_backend%(btype: Storage::Backend, options: any, key_type: any, val_type: any%): Storage::OperationResult
%{
auto btype_val = IntrusivePtr<EnumVal>{NewRef{}, btype->AsEnumVal()};
Tag tag{btype_val};
Tag btag{btype_val};
auto b = storage_mgr->Instantiate(tag);
auto b = storage_mgr->InstantiateBackend(btag);
if ( ! b.has_value() ) {
emit_builtin_error(b.error().c_str());

View file

@ -324,6 +324,8 @@ void ScriptInfo::DoInitPostScript() {
else if ( name == "base/frameworks/storage/main.zeek" ) {
const auto& backend_id = zeek::detail::global_scope()->Find("Storage::Backend");
types.push_back(new IdentifierInfo(backend_id, this));
const auto& serializer_id = zeek::detail::global_scope()->Find("Storage::Serializer");
types.push_back(new IdentifierInfo(serializer_id, this));
}
}