From d07d27453af793a0944d4a83c2e8d672fe7fc054 Mon Sep 17 00:00:00 2001
From: Tim Wojtulewicz
Date: Fri, 6 Dec 2024 15:29:15 -0700
Subject: [PATCH] Add infrastructure for automated expiration of storage entries

This is used for backends that don't support expiration natively.
---
 scripts/base/frameworks/storage/main.zeek     |  6 ++-
 scripts/base/init-bare.zeek                   |  8 ++++
 src/Timer.cc                                  |  1 +
 src/Timer.h                                   |  3 +-
 src/const.bif                                 |  2 +
 src/storage/Backend.cc                        |  4 +-
 src/storage/Backend.h                         | 16 +++++--
 src/storage/Manager.cc                        | 44 ++++++++++++++++---
 src/storage/Manager.h                         | 22 ++++++++--
 src/storage/storage.bif                       |  5 ++-
 .../storage-plugin/src/StorageDummy.cc        |  3 +-
 .../plugins/storage-plugin/src/StorageDummy.h |  3 +-
 12 files changed, 96 insertions(+), 21 deletions(-)

diff --git a/scripts/base/frameworks/storage/main.zeek b/scripts/base/frameworks/storage/main.zeek
index 8bfe4fd4e6..f6fe09b960 100644
--- a/scripts/base/frameworks/storage/main.zeek
+++ b/scripts/base/frameworks/storage/main.zeek
@@ -16,6 +16,10 @@ export {
 		# Indicates whether this value should overwrite an existing entry
 		# for the key.
 		overwrite: bool &default=T;
+
+		# An interval of time until the entry is automatically removed from the
+		# backend.
+		expire_time: interval &default=0sec;
 	};
 
 	## Opens a new backend connection based on a configuration object.
@@ -95,7 +99,7 @@ function close_backend(backend: opaque of Storage::BackendHandle): bool
 
 function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs): bool
 	{
-	return Storage::__put(backend, args$key, args$value, args$overwrite);
+	return Storage::__put(backend, args$key, args$value, args$overwrite, args$expire_time);
 	}
 
 function get(backend: opaque of Storage::BackendHandle, key: any): any
diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek
index 06fc0283d3..9593efd0f1 100644
--- a/scripts/base/init-bare.zeek
+++ b/scripts/base/init-bare.zeek
@@ -6210,6 +6210,14 @@ export {
 	};
 }
 
+module Storage;
+
+export {
+	## The interval used by the storage framework for automatic expiration
+	## of elements in all backends that don't support it natively.
+	const expire_interval = 5.0 secs &redef;
+}
+
 module GLOBAL;
 
 @load base/bif/event.bif
diff --git a/src/Timer.cc b/src/Timer.cc
index 285e26df6a..70fa7f4ff7 100644
--- a/src/Timer.cc
+++ b/src/Timer.cc
@@ -51,6 +51,7 @@ const char* TimerNames[] = {
     "UnknownProtocolExpire",
     "LogDelayExpire",
     "LogFlushWriteBufferTimer",
+    "StorageExpire",
 };
 
 const char* timer_type_to_string(TimerType type) { return TimerNames[type]; }
diff --git a/src/Timer.h b/src/Timer.h
index cf3f4f130d..509a6bec18 100644
--- a/src/Timer.h
+++ b/src/Timer.h
@@ -58,8 +58,9 @@ enum TimerType : uint8_t {
     TIMER_UNKNOWN_PROTOCOL_EXPIRE,
     TIMER_LOG_DELAY_EXPIRE,
     TIMER_LOG_FLUSH_WRITE_BUFFER,
+    TIMER_STORAGE_EXPIRE,
 };
-constexpr int NUM_TIMER_TYPES = int(TIMER_LOG_FLUSH_WRITE_BUFFER) + 1;
+constexpr int NUM_TIMER_TYPES = int(TIMER_STORAGE_EXPIRE) + 1;
 
 extern const char* timer_type_to_string(TimerType type);
 
diff --git a/src/const.bif b/src/const.bif
index f2b0423ba6..822b0d30a6 100644
--- a/src/const.bif
+++ b/src/const.bif
@@ -32,3 +32,5 @@ const Threading::heartbeat_interval: interval;
 
 const Log::flush_interval: interval;
 const Log::write_buffer_size: count;
+
+const Storage::expire_interval: interval;
diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc
index f064344d62..37e074eda3 100644
--- a/src/storage/Backend.cc
+++ b/src/storage/Backend.cc
@@ -14,7 +14,7 @@ ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt) {
     return DoOpen(std::move(options));
 }
 
-ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite) {
+ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
     // The intention for this method is to do some other heavy lifting in regard
     // to backends that need to pass data through the manager instead of directly
     // through the workers. For the first versions of the storage framework it
@@ -26,7 +26,7 @@ ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite) {
         return util::fmt("type of value passed (%s) does not match backend's value type (%s)",
                          obj_desc_short(value->GetType().get()).c_str(), val_type->GetName().c_str());
 
-    return DoPut(std::move(key), std::move(value), overwrite);
+    return DoPut(std::move(key), std::move(value), overwrite, expiration_time);
 }
 
 ValResult Backend::Get(ValPtr key) {
diff --git a/src/storage/Backend.h b/src/storage/Backend.h
index dccff707ee..dd8aedc064 100644
--- a/src/storage/Backend.h
+++ b/src/storage/Backend.h
@@ -32,10 +32,12 @@ public:
      * @param key the key for the pair
      * @param value the value for the pair
      * @param overwrite whether an existing value for a key should be overwritten.
-     * @return A result pair containing a bool with the success state, and a
-     * possible error string if the operation failed.
+     * @param expiration_time the time when this entry should be automatically removed; zero
+     * disables expiration. NOTE(review): Storage::__put forwards its interval unchanged - confirm units.
+     * @return An optional value potentially containing an error string if
+     * needed. Will be unset if the operation succeeded.
      */
-    ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true);
+    ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0);
 
     /**
      * Retrieve a value from the backend for a provided key.
@@ -99,7 +101,7 @@ protected:
     /**
      * The workhorse method for Put().
      */
-    virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true) = 0;
+    virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0) = 0;
 
     /**
      * The workhorse method for Get().
@@ -111,6 +113,12 @@ protected:
      */
     virtual ErrorResult DoErase(ValPtr key) = 0;
 
+    /**
+     * Removes any entries in the backend that have expired. Can be overridden by
+     * derived classes.
+     */
+    virtual void Expire() {}
+
     TypePtr key_type;
     TypePtr val_type;
 
diff --git a/src/storage/Manager.cc b/src/storage/Manager.cc
index 5c24ff38eb..987f54f55c 100644
--- a/src/storage/Manager.cc
+++ b/src/storage/Manager.cc
@@ -4,11 +4,24 @@
 
 #include "zeek/Desc.h"
 
+#include "const.bif.netvar_h"
+
 namespace zeek::storage {
 
+void detail::ExpirationTimer::Dispatch(double t, bool is_expire) {
+    if ( is_expire )
+        return;
+
+    storage_mgr->Expire();
+    storage_mgr->StartExpirationTimer();
+}
+
 Manager::Manager() : plugin::ComponentManager("Storage", "Backend") {}
 
-void Manager::InitPostScript() { detail::backend_opaque = make_intrusive("Storage::Backend"); }
+void Manager::InitPostScript() {
+    detail::backend_opaque = make_intrusive("Storage::Backend");
+    StartExpirationTimer();
+}
 
 zeek::expected Manager::OpenBackend(const Tag& type, RecordValPtr options, TypePtr key_type,
                                     TypePtr val_type) {
@@ -40,20 +53,39 @@ zeek::expected Manager::OpenBackend(const Tag& type, Re
 
     // TODO: post Storage::backend_opened event
 
-    backends.push_back(bp);
+    {
+        std::lock_guard<std::mutex> lk(backends_mtx);
+        backends.push_back(bp);
+    }
 
     return bp;
 }
 
 void Manager::CloseBackend(BackendPtr backend) {
-    auto it = std::find(backends.begin(), backends.end(), backend);
-    if ( it == backends.end() )
-        return;
+    {
+        std::lock_guard<std::mutex> lk(backends_mtx);
+        auto it = std::find(backends.begin(), backends.end(), backend);
+        if ( it == backends.end() )
+            return;
+
+        backends.erase(it);
+    }
 
-    backends.erase(it);
     backend->Close();
 
     // TODO: post Storage::backend_lost event
 }
 
+void Manager::Expire() {
+    std::lock_guard<std::mutex> lk(backends_mtx); // taken first so backends.size() below is read under the lock
+    DBG_LOG(DBG_STORAGE, "Expire running, have %zu backends to check", backends.size());
+    for ( const auto& b : backends )
+        b->Expire();
+}
+
+void Manager::StartExpirationTimer() {
+    zeek::detail::timer_mgr->Add(
+        new detail::ExpirationTimer(run_state::network_time + zeek::BifConst::Storage::expire_interval));
+}
+
 } // namespace zeek::storage
diff --git a/src/storage/Manager.h b/src/storage/Manager.h
index c0545e53a8..49957a678d 100644
--- a/src/storage/Manager.h
+++ b/src/storage/Manager.h
@@ -2,12 +2,26 @@
 
 #pragma once
 
+#include <mutex>
+
+#include "zeek/Timer.h"
 #include "zeek/plugin/ComponentManager.h"
 #include "zeek/storage/Backend.h"
 #include "zeek/storage/Component.h"
 
 namespace zeek::storage {
 
+namespace detail {
+
+class ExpirationTimer final : public zeek::detail::Timer {
+public:
+    explicit ExpirationTimer(double t) : zeek::detail::Timer(t, zeek::detail::TIMER_STORAGE_EXPIRE) {}
+    ~ExpirationTimer() override = default;
+    void Dispatch(double t, bool is_expire) override;
+};
+
+} // namespace detail
+
 class Manager final : public plugin::ComponentManager {
 public:
     Manager();
@@ -40,12 +54,14 @@ public:
      */
     void CloseBackend(BackendPtr backend);
 
-    // TODO:
-    // - Hooks for storage-backed tables?
-    // - Handling aggregation from workers on a single manager?
+protected:
+    friend class storage::detail::ExpirationTimer;
+    void Expire();
+    void StartExpirationTimer();
 
 private:
     std::vector backends;
+    std::mutex backends_mtx;
 };
 
 } // namespace zeek::storage
diff --git a/src/storage/storage.bif b/src/storage/storage.bif
index 98ac5978c9..960084e666 100644
--- a/src/storage/storage.bif
+++ b/src/storage/storage.bif
@@ -49,7 +49,8 @@ function Storage::__close_backend%(backend: opaque of Storage::BackendHandle%) :
 	return val_mgr->Bool(true);
 	%}
 
-function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, value: any, overwrite: bool%): bool
+function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, value: any,
+                         overwrite: bool, expire_time: interval%): bool
 	%{
 	auto b = dynamic_cast(backend);
 	if ( ! b ) {
@@ -63,7 +64,7 @@ function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, va
 	auto key_v = IntrusivePtr{NewRef{}, key};
 	auto val_v = IntrusivePtr{NewRef{}, value};
 
-	auto result = b->backend->Put(key_v, val_v, overwrite);
+	auto result = b->backend->Put(key_v, val_v, overwrite, expire_time);
 	if ( result.has_value() ) {
 		emit_builtin_error(util::fmt("Failed to store data: %s", result.value().c_str()));
 		return val_mgr->Bool(false);
diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.cc b/testing/btest/plugins/storage-plugin/src/StorageDummy.cc
index eb78d7eace..0d0fe83d3f 100644
--- a/testing/btest/plugins/storage-plugin/src/StorageDummy.cc
+++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.cc
@@ -36,7 +36,8 @@ void StorageDummy::Close() { open = false; }
 /**
  * The workhorse method for Put(). This must be implemented by plugins.
  */
-zeek::storage::ErrorResult StorageDummy::DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite) {
+zeek::storage::ErrorResult StorageDummy::DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite,
+                                               double expiration_time) {
     auto json_key = key->ToJSON()->ToStdString();
     auto json_value = value->ToJSON()->ToStdString();
     data[json_key] = json_value;
diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.h b/testing/btest/plugins/storage-plugin/src/StorageDummy.h
index fab247f1bf..1e2b6bc99e 100644
--- a/testing/btest/plugins/storage-plugin/src/StorageDummy.h
+++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.h
@@ -36,7 +36,8 @@ public:
     /**
      * The workhorse method for Put().
      */
-    zeek::storage::ErrorResult DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite = true) override;
+    zeek::storage::ErrorResult DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite = true,
+                                     double expiration_time = 0) override;
 
     /**
      * The workhorse method for Get().