From e545fe825684a2c16fdde3563d17354c1e440a17 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Tue, 1 Apr 2025 16:14:35 -0700 Subject: [PATCH] Ground work for pluggable storage serializers --- scripts/base/frameworks/storage/async.zeek | 5 +- scripts/base/frameworks/storage/main.zeek | 17 ++++--- scripts/base/frameworks/storage/sync.zeek | 5 +- src/plugin/Component.cc | 2 + src/plugin/Component.h | 27 ++++++----- src/storage/Backend.cc | 11 ++++- src/storage/Backend.h | 4 +- src/storage/CMakeLists.txt | 1 + src/storage/Component.cc | 25 ++++++++-- src/storage/Component.h | 55 ++++++++++++++++++++-- src/storage/Manager.cc | 44 ++++++++++++----- src/storage/Manager.h | 20 +++++++- src/storage/ReturnCode.cc | 10 ++++ src/storage/ReturnCode.h | 2 + src/storage/Serializer.h | 50 ++++++++++++++++++++ src/storage/backend/redis/Plugin.cc | 2 +- src/storage/backend/sqlite/Plugin.cc | 2 +- src/storage/serializer/CMakeLists.txt | 0 src/storage/storage-async.bif | 4 +- src/storage/storage-sync.bif | 4 +- src/zeekygen/ScriptInfo.cc | 2 + 21 files changed, 235 insertions(+), 57 deletions(-) create mode 100644 src/storage/Serializer.h create mode 100644 src/storage/serializer/CMakeLists.txt diff --git a/scripts/base/frameworks/storage/async.zeek b/scripts/base/frameworks/storage/async.zeek index 3a30755afe..04398585f1 100644 --- a/scripts/base/frameworks/storage/async.zeek +++ b/scripts/base/frameworks/storage/async.zeek @@ -25,9 +25,8 @@ export { ## Returns: A record containing the status of the operation, and either an error ## string on failure or a value on success. The value returned here will ## be an ``opaque of BackendHandle``. 
- global open_backend: function(btype: Storage::Backend, - options: Storage::BackendOptions, key_type: any, val_type: any) - : Storage::OperationResult; + global open_backend: function(btype: Storage::Backend, options: Storage::BackendOptions, + key_type: any, val_type: any): Storage::OperationResult; ## Closes an existing backend connection asynchronously. This method must be ## called via a :zeek:see:`when` condition or an error will be returned. diff --git a/scripts/base/frameworks/storage/main.zeek b/scripts/base/frameworks/storage/main.zeek index 5c05503c33..a1793dac37 100644 --- a/scripts/base/frameworks/storage/main.zeek +++ b/scripts/base/frameworks/storage/main.zeek @@ -7,23 +7,26 @@ export { ## :zeek:see:`Storage::Async::open_backend` and ## :zeek:see:`Storage::Sync::open_backend`. Backend plugins can redef this record ## to add relevant fields to it. - type BackendOptions: record { }; + type BackendOptions: record { + ## The serializer used for converting Zeek data. + serializer: Storage::Serializer &default=Storage::JSON; + }; ## Record for passing arguments to :zeek:see:`Storage::Async::put` and ## :zeek:see:`Storage::Sync::put`. type PutArgs: record { - # The key to store the value under. + ## The key to store the value under. key: any; - # The value to store associated with the key. + ## The value to store associated with the key. value: any; - # Indicates whether this value should overwrite an existing entry for the - # key. + ## Indicates whether this value should overwrite an existing entry for the + ## key. overwrite: bool &default=T; - # An interval of time until the entry is automatically removed from the - # backend. + ## An interval of time until the entry is automatically removed from the + ## backend. 
expire_time: interval &default=0sec; }; } diff --git a/scripts/base/frameworks/storage/sync.zeek b/scripts/base/frameworks/storage/sync.zeek index acc06dfbb4..042cf96420 100644 --- a/scripts/base/frameworks/storage/sync.zeek +++ b/scripts/base/frameworks/storage/sync.zeek @@ -23,9 +23,8 @@ export { ## Returns: A record containing the status of the operation, and either an error ## string on failure or a value on success. The value returned here will ## be an ``opaque of BackendHandle``. - global open_backend: function(btype: Storage::Backend, - options: Storage::BackendOptions, key_type: any, val_type: any) - : Storage::OperationResult; + global open_backend: function(btype: Storage::Backend, options: Storage::BackendOptions, + key_type: any, val_type: any): Storage::OperationResult; ## Closes an existing backend connection. ## diff --git a/src/plugin/Component.cc b/src/plugin/Component.cc index 7374622910..fda9bde98d 100644 --- a/src/plugin/Component.cc +++ b/src/plugin/Component.cc @@ -47,6 +47,8 @@ void Component::Describe(ODesc* d) const { case component::STORAGE_BACKEND: d->Add("Storage Backend"); break; + case component::STORAGE_SERIALIZER: d->Add("Storage Serializer"); break; + default: reporter->InternalWarning("unknown component type in plugin::Component::Describe"); d->Add(""); diff --git a/src/plugin/Component.h b/src/plugin/Component.h index 259fc4ebc8..62f11b0cb9 100644 --- a/src/plugin/Component.h +++ b/src/plugin/Component.h @@ -21,19 +21,20 @@ namespace component { * Component types. */ enum Type { - READER, /// An input reader (not currently used). - WRITER, /// A logging writer (not currently used). - ANALYZER, /// A protocol analyzer. - PACKET_ANALYZER, /// A packet analyzer. - FILE_ANALYZER, /// A file analyzer. - IOSOURCE, /// An I/O source, excluding packet sources. - PKTSRC, /// A packet source. - PKTDUMPER, /// A packet dumper. - SESSION_ADAPTER, /// A session adapter analyzer. - CLUSTER_BACKEND, /// A cluster backend. 
- EVENT_SERIALIZER, /// A serializer for events, used by cluster backends. - LOG_SERIALIZER, /// A serializer for log batches, used by cluster backends. - STORAGE_BACKEND, /// A backend for the storage framework. + READER, /// An input reader (not currently used). + WRITER, /// A logging writer (not currently used). + ANALYZER, /// A protocol analyzer. + PACKET_ANALYZER, /// A packet analyzer. + FILE_ANALYZER, /// A file analyzer. + IOSOURCE, /// An I/O source, excluding packet sources. + PKTSRC, /// A packet source. + PKTDUMPER, /// A packet dumper. + SESSION_ADAPTER, /// A session adapter analyzer. + CLUSTER_BACKEND, /// A cluster backend. + EVENT_SERIALIZER, /// A serializer for events, used by cluster backends. + LOG_SERIALIZER, /// A serializer for log batches, used by cluster backends. + STORAGE_BACKEND, /// A backend for the storage framework. + STORAGE_SERIALIZER, /// A serializer for the storage framework. }; } // namespace component diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc index e71b9e2603..a8d951b0dc 100644 --- a/src/storage/Backend.cc +++ b/src/storage/Backend.cc @@ -66,7 +66,7 @@ void OpenResultCallback::Complete(OperationResult res) { } Backend::Backend(uint8_t modes, std::string_view tag_name) : modes(modes) { - tag = storage_mgr->GetComponentTag(std::string{tag_name}); + tag = storage_mgr->BackendMgr().GetComponentTag(std::string{tag_name}); tag_str = zeek::obj_desc_short(tag.AsVal().get()); } @@ -75,6 +75,15 @@ OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, Type val_type = std::move(vt); backend_options = options; + auto stype = options->GetField("serializer"); + zeek::Tag stag{stype}; + + auto s = storage_mgr->InstantiateSerializer(stag); + if ( ! s ) + return {ReturnCode::INITIALIZATION_FAILED, s.error()}; + + serializer = std::move(s.value()); + auto ret = DoOpen(cb, std::move(options)); if ( ! 
ret.value ) ret.value = cb->Backend(); diff --git a/src/storage/Backend.h b/src/storage/Backend.h index 702fde362b..17fb5e33bf 100644 --- a/src/storage/Backend.h +++ b/src/storage/Backend.h @@ -5,6 +5,7 @@ #include "zeek/OpaqueVal.h" #include "zeek/Tag.h" #include "zeek/Val.h" +#include "zeek/storage/Serializer.h" namespace zeek::detail::trigger { class Trigger; @@ -58,7 +59,7 @@ struct OperationResult { class ResultCallback { public: ResultCallback() = default; - ResultCallback(detail::trigger::TriggerPtr trigger, const void* assoc); + ResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc); virtual ~ResultCallback() = default; /** @@ -239,6 +240,7 @@ protected: zeek::Tag tag; std::string tag_str; + std::unique_ptr serializer; private: /** diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index b64c7d6016..c68b7b1af8 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -11,3 +11,4 @@ zeek_add_subdir_library( storage-sync.bif) add_subdirectory(backend) +add_subdirectory(serializer) diff --git a/src/storage/Component.cc b/src/storage/Component.cc index 1f5bb1da6e..cc0ee4cc4c 100644 --- a/src/storage/Component.cc +++ b/src/storage/Component.cc @@ -7,19 +7,34 @@ namespace zeek::storage { -Component::Component(const std::string& name, factory_callback arg_factory) - : plugin::Component(plugin::component::STORAGE_BACKEND, name, 0, storage_mgr->GetTagType()) { +BackendComponent::BackendComponent(const std::string& name, factory_callback arg_factory) + : plugin::Component(plugin::component::STORAGE_BACKEND, name, 0, storage_mgr->BackendMgr().GetTagType()) { factory = arg_factory; } -void Component::Initialize() { +void BackendComponent::Initialize() { InitializeTag(); - storage_mgr->RegisterComponent(this); + storage_mgr->BackendMgr().RegisterComponent(this); } -void Component::DoDescribe(ODesc* d) const { +void BackendComponent::DoDescribe(ODesc* d) const { d->Add("Storage::STORAGE_BACKEND_"); 
d->Add(CanonicalName()); } +SerializerComponent::SerializerComponent(const std::string& name, factory_callback arg_factory) + : plugin::Component(plugin::component::STORAGE_SERIALIZER, name, 0, storage_mgr->SerializerMgr().GetTagType()) { + factory = arg_factory; +} + +void SerializerComponent::Initialize() { + InitializeTag(); + storage_mgr->SerializerMgr().RegisterComponent(this); +} + +void SerializerComponent::DoDescribe(ODesc* d) const { + d->Add("Storage::STORAGE_SERIALIZER_"); + d->Add(CanonicalName()); +} + } // namespace zeek::storage diff --git a/src/storage/Component.h b/src/storage/Component.h index 6d7f5d1e6f..d7768b1d1a 100644 --- a/src/storage/Component.h +++ b/src/storage/Component.h @@ -7,11 +7,12 @@ namespace zeek::storage { class Backend; +class Serializer; /** * Component description for plugins providing storage backends. */ -class Component : public plugin::Component { +class BackendComponent : public plugin::Component { public: using factory_callback = IntrusivePtr (*)(); @@ -27,12 +28,60 @@ public: * method inside the class that just allocates and returns a new * instance. */ - Component(const std::string& name, factory_callback factory); + BackendComponent(const std::string& name, factory_callback factory); /** * Destructor. */ - ~Component() override = default; + ~BackendComponent() override = default; + + /** + * Initialization function. This function has to be called before any + * plugin component functionality is used; it is used to add the + * plugin component to the list of components and to initialize tags + */ + void Initialize() override; + + /** + * Returns the backend's factory function. + */ + factory_callback Factory() const { return factory; } + +protected: + /** + * Overridden from plugin::Component. + */ + void DoDescribe(ODesc* d) const override; + +private: + factory_callback factory; +}; + +/** + * Component description for plugins providing serialization for storage data.
+ */ +class SerializerComponent : public plugin::Component { +public: + using factory_callback = std::unique_ptr (*)(); + + /** + * Constructor. + * + * @param name The name of the provided serializer. This name is used + * across the system to identify the serializer. + * + * @param factory A factory function to instantiate instances of the + * serializer's class, which must be derived directly or indirectly from + * storage::Serializer. This is typically a static \c Instantiate() + * method inside the class that just allocates and returns a new + * instance. + */ + SerializerComponent(const std::string& name, factory_callback factory); + + /** + * Destructor. + */ + ~SerializerComponent() override = default; /** * Initialization function. This function has to be called before any diff --git a/src/storage/Manager.cc b/src/storage/Manager.cc index 3ca04d5530..abd2848832 100644 --- a/src/storage/Manager.cc +++ b/src/storage/Manager.cc @@ -28,7 +28,9 @@ void detail::ExpirationTimer::Dispatch(double t, bool is_expire) { storage_mgr->StartExpirationTimer(); } -Manager::Manager() : plugin::ComponentManager("Storage", "Backend") {} +Manager::Manager() + : backend_mgr(plugin::ComponentManager("Storage", "Backend")), + serializer_mgr(plugin::ComponentManager("Storage", "Serializer")) {} Manager::~Manager() { // TODO: should we shut down any existing backends? force-poll until all of their existing @@ -48,24 +50,40 @@ void Manager::InitPostScript() { StartExpirationTimer(); } -zeek::expected Manager::Instantiate(const Tag& type) { - Component* c = Lookup(type); - if ( ! c ) { +zeek::expected Manager::InstantiateBackend(const Tag& type) { + BackendComponent* c = backend_mgr.Lookup(type); + if ( ! c ) return zeek::unexpected( - util::fmt("Request to open unknown backend (%d:%d)", type.Type(), type.Subtype())); - } + util::fmt("Request to instantiate unknown backend type (%d:%d)", type.Type(), type.Subtype())); - if ( ! c->Factory() ) { + if ( ! 
c->Factory() ) return zeek::unexpected( - util::fmt("Factory invalid for backend %s", GetComponentName(type).c_str())); - } + util::fmt("Factory invalid for backend %s", backend_mgr.GetComponentName(type).c_str())); - BackendPtr bp = c->Factory()(); + auto bp = c->Factory()(); - if ( ! bp ) { + if ( ! bp ) return zeek::unexpected( - util::fmt("Failed to instantiate backend %s", GetComponentName(type).c_str())); - } + util::fmt("Failed to instantiate backend %s", backend_mgr.GetComponentName(type).c_str())); + + return bp; +} + +zeek::expected, std::string> Manager::InstantiateSerializer(const Tag& type) { + SerializerComponent* c = serializer_mgr.Lookup(type); + if ( ! c ) + return zeek::unexpected( + util::fmt("Request to instantiate unknown serializer type (%d:%d)", type.Type(), type.Subtype())); + + if ( ! c->Factory() ) + return zeek::unexpected( + util::fmt("Factory invalid for serializer %s", serializer_mgr.GetComponentName(type).c_str())); + + auto bp = c->Factory()(); + + if ( ! bp ) + return zeek::unexpected( + util::fmt("Failed to instantiate serializer %s", serializer_mgr.GetComponentName(type).c_str())); return bp; } diff --git a/src/storage/Manager.h b/src/storage/Manager.h index ae3c1bab66..cdbc208b04 100644 --- a/src/storage/Manager.h +++ b/src/storage/Manager.h @@ -10,6 +10,7 @@ #include "zeek/plugin/ComponentManager.h" #include "zeek/storage/Backend.h" #include "zeek/storage/Component.h" +#include "zeek/storage/Serializer.h" namespace zeek::storage { @@ -24,7 +25,7 @@ public: } // namespace detail -class Manager final : public plugin::ComponentManager { +class Manager final { public: Manager(); ~Manager(); @@ -43,7 +44,16 @@ public: * @return A std::expected containing either a valid BackendPtr with the result of the * operation or a string containing an error message for failure. */ - zeek::expected Instantiate(const Tag& type); + zeek::expected InstantiateBackend(const Tag& type); + + /** + * Instantiates a new serializer object. 
+ * + * @param type The tag for the type of serializer being instantiated. + * @return A std::expected containing either a valid pointer to the new serializer + * or a string containing an error message for failure. + */ + zeek::expected, std::string> InstantiateSerializer(const Tag& type); /** * Opens a new storage backend. @@ -82,6 +92,9 @@ public: */ void Expire(double t); + plugin::ComponentManager& BackendMgr() { return backend_mgr; } + plugin::ComponentManager& SerializerMgr() { return serializer_mgr; } + protected: friend class storage::detail::ExpirationTimer; void RunExpireThread(); @@ -94,6 +107,9 @@ protected: private: std::vector backends; std::mutex backends_mtx; + + plugin::ComponentManager backend_mgr; + plugin::ComponentManager serializer_mgr; }; } // namespace zeek::storage diff --git a/src/storage/ReturnCode.cc b/src/storage/ReturnCode.cc index 22a3e8a2e6..3b78c6e4ae 100644 --- a/src/storage/ReturnCode.cc +++ b/src/storage/ReturnCode.cc @@ -19,6 +19,8 @@ EnumValPtr ReturnCode::CONNECTION_FAILED; EnumValPtr ReturnCode::DISCONNECTION_FAILED; EnumValPtr ReturnCode::INITIALIZATION_FAILED; EnumValPtr ReturnCode::IN_PROGRESS; +EnumValPtr ReturnCode::SERIALIZATION_FAILED; +EnumValPtr ReturnCode::UNSERIALIZATION_FAILED; void ReturnCode::Initialize() { static const auto& return_code_type = zeek::id::find_type("Storage::ReturnCode"); @@ -61,6 +63,12 @@ void ReturnCode::Initialize() { tmp = return_code_type->Lookup("Storage::IN_PROGRESS"); IN_PROGRESS = return_code_type->GetEnumVal(tmp); + + tmp = return_code_type->Lookup("Storage::SERIALIZATION_FAILED"); + SERIALIZATION_FAILED = return_code_type->GetEnumVal(tmp); + + tmp = return_code_type->Lookup("Storage::UNSERIALIZATION_FAILED"); + UNSERIALIZATION_FAILED = return_code_type->GetEnumVal(tmp); } void ReturnCode::Cleanup() { @@ -77,6 +85,8 @@ void ReturnCode::Cleanup() { DISCONNECTION_FAILED.reset(); INITIALIZATION_FAILED.reset(); IN_PROGRESS.reset(); + SERIALIZATION_FAILED.reset(); + 
UNSERIALIZATION_FAILED.reset(); } } // namespace zeek::storage diff --git a/src/storage/ReturnCode.h b/src/storage/ReturnCode.h index 8b110641f0..db0bda7f16 100644 --- a/src/storage/ReturnCode.h +++ b/src/storage/ReturnCode.h @@ -32,6 +32,8 @@ public: static EnumValPtr DISCONNECTION_FAILED; static EnumValPtr INITIALIZATION_FAILED; static EnumValPtr IN_PROGRESS; + static EnumValPtr SERIALIZATION_FAILED; + static EnumValPtr UNSERIALIZATION_FAILED; }; } // namespace storage diff --git a/src/storage/Serializer.h b/src/storage/Serializer.h new file mode 100644 index 0000000000..dea136fcb2 --- /dev/null +++ b/src/storage/Serializer.h @@ -0,0 +1,50 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/Span.h" +#include "zeek/Val.h" + +namespace zeek::storage { + +namespace detail { +using byte_buffer = std::vector; +using byte_buffer_span = Span; +} // namespace detail + +/** + * Base class for a serializer used by storage backends. + */ +class Serializer { +public: + virtual ~Serializer() = default; + + /** + * Serializes Zeek Val data into another format. + * + * @param val The data to serialize. + * + * @return On success, a byte buffer containing the serialized data. std::nullopt will + * be returned on failure. + */ + virtual std::optional Serialize(ValPtr val) = 0; + + /** + * Unserializes a byte buffer into Zeek Val objects of a specific type. + * + * @param buf The byte data to unserialize. + * @param type The Zeek script-level type to unserialize the data into. + * + * @return A zeek::expected containing either the unserialized Val data on success, or + * a string containing an error message on failure. 
+ */ + virtual zeek::expected Unserialize(detail::byte_buffer_span buf, TypePtr type) = 0; + +protected: + Serializer(std::string name) : name(std::move(name)) {} + +private: + std::string name; +}; + +} // namespace zeek::storage diff --git a/src/storage/backend/redis/Plugin.cc b/src/storage/backend/redis/Plugin.cc index 751a3996c9..a94707ca45 100644 --- a/src/storage/backend/redis/Plugin.cc +++ b/src/storage/backend/redis/Plugin.cc @@ -10,7 +10,7 @@ namespace zeek::storage::backend::redis { class Plugin : public plugin::Plugin { public: plugin::Configuration Configure() override { - AddComponent(new storage::Component("REDIS", backend::redis::Redis::Instantiate)); + AddComponent(new storage::BackendComponent("REDIS", backend::redis::Redis::Instantiate)); plugin::Configuration config; config.name = "Zeek::Storage_Backend_Redis"; diff --git a/src/storage/backend/sqlite/Plugin.cc b/src/storage/backend/sqlite/Plugin.cc index f6d56e77fb..8498e747d8 100644 --- a/src/storage/backend/sqlite/Plugin.cc +++ b/src/storage/backend/sqlite/Plugin.cc @@ -10,7 +10,7 @@ namespace zeek::storage::backend::sqlite { class Plugin : public plugin::Plugin { public: plugin::Configuration Configure() override { - AddComponent(new storage::Component("SQLITE", backend::sqlite::SQLite::Instantiate)); + AddComponent(new storage::BackendComponent("SQLITE", backend::sqlite::SQLite::Instantiate)); plugin::Configuration config; config.name = "Zeek::Storage_Backend_SQLite"; diff --git a/src/storage/serializer/CMakeLists.txt b/src/storage/serializer/CMakeLists.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/storage/storage-async.bif b/src/storage/storage-async.bif index ff56cef0cd..a705569236 100644 --- a/src/storage/storage-async.bif +++ b/src/storage/storage-async.bif @@ -80,9 +80,9 @@ function Storage::Async::__open_backend%(btype: Storage::Backend, options: any, return nullptr; auto btype_val = IntrusivePtr{NewRef{}, btype->AsEnumVal()}; - Tag tag{btype_val}; + Tag 
btag{btype_val}; - auto b = storage_mgr->Instantiate(tag); + auto b = storage_mgr->InstantiateBackend(btag); if ( ! b.has_value() ) { trigger->Cache( diff --git a/src/storage/storage-sync.bif b/src/storage/storage-sync.bif index a3a9e78e42..38c977bd3b 100644 --- a/src/storage/storage-sync.bif +++ b/src/storage/storage-sync.bif @@ -31,9 +31,9 @@ module Storage::Sync; function Storage::Sync::__open_backend%(btype: Storage::Backend, options: any, key_type: any, val_type: any%): Storage::OperationResult %{ auto btype_val = IntrusivePtr{NewRef{}, btype->AsEnumVal()}; - Tag tag{btype_val}; + Tag btag{btype_val}; - auto b = storage_mgr->Instantiate(tag); + auto b = storage_mgr->InstantiateBackend(btag); if ( ! b.has_value() ) { emit_builtin_error(b.error().c_str()); diff --git a/src/zeekygen/ScriptInfo.cc b/src/zeekygen/ScriptInfo.cc index 71464fa87d..8495b384f1 100644 --- a/src/zeekygen/ScriptInfo.cc +++ b/src/zeekygen/ScriptInfo.cc @@ -324,6 +324,8 @@ void ScriptInfo::DoInitPostScript() { else if ( name == "base/frameworks/storage/main.zeek" ) { const auto& backend_id = zeek::detail::global_scope()->Find("Storage::Backend"); types.push_back(new IdentifierInfo(backend_id, this)); + const auto& serializer_id = zeek::detail::global_scope()->Find("Storage::Serializer"); + types.push_back(new IdentifierInfo(serializer_id, this)); } }