Add infrastructure for automated expiration of storage entries

This is used for backends that don't support expiration natively.
This commit is contained in:
Tim Wojtulewicz 2024-12-06 15:29:15 -07:00
parent 8dee733a7d
commit d07d27453a
12 changed files with 96 additions and 21 deletions

View file

@ -16,6 +16,10 @@ export {
# Indicates whether this value should overwrite an existing entry # Indicates whether this value should overwrite an existing entry
# for the key. # for the key.
overwrite: bool &default=T; overwrite: bool &default=T;
# An interval of time until the entry is automatically removed from the
# backend.
expire_time: interval &default=0sec;
}; };
## Opens a new backend connection based on a configuration object. ## Opens a new backend connection based on a configuration object.
@ -95,7 +99,7 @@ function close_backend(backend: opaque of Storage::BackendHandle): bool
function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs): bool function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs): bool
{ {
return Storage::__put(backend, args$key, args$value, args$overwrite); return Storage::__put(backend, args$key, args$value, args$overwrite, args$expire_time);
} }
function get(backend: opaque of Storage::BackendHandle, key: any): any function get(backend: opaque of Storage::BackendHandle, key: any): any

View file

@ -6210,6 +6210,14 @@ export {
}; };
} }
module Storage;
export {
## The interval used by the storage framework for automatic expiration
## of elements in all backends that don't support it natively.
const expire_interval = 5.0 secs &redef;
}
module GLOBAL; module GLOBAL;
@load base/bif/event.bif @load base/bif/event.bif

View file

@ -51,6 +51,7 @@ const char* TimerNames[] = {
"UnknownProtocolExpire", "UnknownProtocolExpire",
"LogDelayExpire", "LogDelayExpire",
"LogFlushWriteBufferTimer", "LogFlushWriteBufferTimer",
"StorageExpire",
}; };
const char* timer_type_to_string(TimerType type) { return TimerNames[type]; } const char* timer_type_to_string(TimerType type) { return TimerNames[type]; }

View file

@ -58,8 +58,9 @@ enum TimerType : uint8_t {
TIMER_UNKNOWN_PROTOCOL_EXPIRE, TIMER_UNKNOWN_PROTOCOL_EXPIRE,
TIMER_LOG_DELAY_EXPIRE, TIMER_LOG_DELAY_EXPIRE,
TIMER_LOG_FLUSH_WRITE_BUFFER, TIMER_LOG_FLUSH_WRITE_BUFFER,
TIMER_STORAGE_EXPIRE,
}; };
constexpr int NUM_TIMER_TYPES = int(TIMER_LOG_FLUSH_WRITE_BUFFER) + 1; constexpr int NUM_TIMER_TYPES = int(TIMER_STORAGE_EXPIRE) + 1;
extern const char* timer_type_to_string(TimerType type); extern const char* timer_type_to_string(TimerType type);

View file

@ -32,3 +32,5 @@ const Threading::heartbeat_interval: interval;
const Log::flush_interval: interval; const Log::flush_interval: interval;
const Log::write_buffer_size: count; const Log::write_buffer_size: count;
const Storage::expire_interval: interval;

View file

@ -14,7 +14,7 @@ ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt) {
return DoOpen(std::move(options)); return DoOpen(std::move(options));
} }
ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite) { ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
// The intention for this method is to do some other heavy lifting in regard // The intention for this method is to do some other heavy lifting in regard
// to backends that need to pass data through the manager instead of directly // to backends that need to pass data through the manager instead of directly
// through the workers. For the first versions of the storage framework it // through the workers. For the first versions of the storage framework it
@ -26,7 +26,7 @@ ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite) {
return util::fmt("type of value passed (%s) does not match backend's value type (%s)", return util::fmt("type of value passed (%s) does not match backend's value type (%s)",
obj_desc_short(value->GetType().get()).c_str(), val_type->GetName().c_str()); obj_desc_short(value->GetType().get()).c_str(), val_type->GetName().c_str());
return DoPut(std::move(key), std::move(value), overwrite); return DoPut(std::move(key), std::move(value), overwrite, expiration_time);
} }
ValResult Backend::Get(ValPtr key) { ValResult Backend::Get(ValPtr key) {

View file

@ -32,10 +32,12 @@ public:
* @param key the key for the pair * @param key the key for the pair
* @param value the value for the pair * @param value the value for the pair
* @param overwrite whether an existing value for a key should be overwritten. * @param overwrite whether an existing value for a key should be overwritten.
* @return A result pair containing a bool with the success state, and a * @param expiration_time the time when this entry should be automatically
* possible error string if the operation failed. * removed. Set to zero to disable expiration.
* @return An optional value potentially containing an error string if
* needed. Will be unset if the operation succeeded.
*/ */
ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true); ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0);
/** /**
* Retrieve a value from the backend for a provided key. * Retrieve a value from the backend for a provided key.
@ -99,7 +101,7 @@ protected:
/** /**
* The workhorse method for Put(). * The workhorse method for Put().
*/ */
virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true) = 0; virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0) = 0;
/** /**
* The workhorse method for Get(). * The workhorse method for Get().
@ -111,6 +113,12 @@ protected:
*/ */
virtual ErrorResult DoErase(ValPtr key) = 0; virtual ErrorResult DoErase(ValPtr key) = 0;
/**
* Removes any entries in the backend that have expired. Can be overridden by
* derived classes.
*/
virtual void Expire() {}
TypePtr key_type; TypePtr key_type;
TypePtr val_type; TypePtr val_type;

View file

@ -4,11 +4,24 @@
#include "zeek/Desc.h" #include "zeek/Desc.h"
#include "const.bif.netvar_h"
namespace zeek::storage { namespace zeek::storage {
void detail::ExpirationTimer::Dispatch(double t, bool is_expire) {
if ( is_expire )
return;
storage_mgr->Expire();
storage_mgr->StartExpirationTimer();
}
Manager::Manager() : plugin::ComponentManager<storage::Component>("Storage", "Backend") {} Manager::Manager() : plugin::ComponentManager<storage::Component>("Storage", "Backend") {}
void Manager::InitPostScript() { detail::backend_opaque = make_intrusive<OpaqueType>("Storage::Backend"); } void Manager::InitPostScript() {
detail::backend_opaque = make_intrusive<OpaqueType>("Storage::Backend");
StartExpirationTimer();
}
zeek::expected<BackendPtr, std::string> Manager::OpenBackend(const Tag& type, RecordValPtr options, TypePtr key_type, zeek::expected<BackendPtr, std::string> Manager::OpenBackend(const Tag& type, RecordValPtr options, TypePtr key_type,
TypePtr val_type) { TypePtr val_type) {
@ -40,20 +53,39 @@ zeek::expected<BackendPtr, std::string> Manager::OpenBackend(const Tag& type, Re
// TODO: post Storage::backend_opened event // TODO: post Storage::backend_opened event
{
std::unique_lock<std::mutex> lk(backends_mtx);
backends.push_back(bp); backends.push_back(bp);
}
return bp; return bp;
} }
void Manager::CloseBackend(BackendPtr backend) { void Manager::CloseBackend(BackendPtr backend) {
{
std::unique_lock<std::mutex> lk(backends_mtx);
auto it = std::find(backends.begin(), backends.end(), backend); auto it = std::find(backends.begin(), backends.end(), backend);
if ( it == backends.end() ) if ( it == backends.end() )
return; return;
backends.erase(it); backends.erase(it);
}
backend->Close(); backend->Close();
// TODO: post Storage::backend_lost event // TODO: post Storage::backend_lost event
} }
void Manager::Expire() {
DBG_LOG(DBG_STORAGE, "Expire running, have %zu backends to check", backends.size());
std::unique_lock<std::mutex> lk(backends_mtx);
for ( const auto& b : backends )
b->Expire();
}
void Manager::StartExpirationTimer() {
zeek::detail::timer_mgr->Add(
new detail::ExpirationTimer(run_state::network_time + zeek::BifConst::Storage::expire_interval));
}
} // namespace zeek::storage } // namespace zeek::storage

View file

@ -2,12 +2,26 @@
#pragma once #pragma once
#include <mutex>
#include "zeek/Timer.h"
#include "zeek/plugin/ComponentManager.h" #include "zeek/plugin/ComponentManager.h"
#include "zeek/storage/Backend.h" #include "zeek/storage/Backend.h"
#include "zeek/storage/Component.h" #include "zeek/storage/Component.h"
namespace zeek::storage { namespace zeek::storage {
namespace detail {
class ExpirationTimer final : public zeek::detail::Timer {
public:
ExpirationTimer(double t) : zeek::detail::Timer(t, zeek::detail::TIMER_STORAGE_EXPIRE) {}
~ExpirationTimer() override {}
void Dispatch(double t, bool is_expire) override;
};
} // namespace detail
class Manager final : public plugin::ComponentManager<Component> { class Manager final : public plugin::ComponentManager<Component> {
public: public:
Manager(); Manager();
@ -40,12 +54,14 @@ public:
*/ */
void CloseBackend(BackendPtr backend); void CloseBackend(BackendPtr backend);
// TODO: protected:
// - Hooks for storage-backed tables? friend class storage::detail::ExpirationTimer;
// - Handling aggregation from workers on a single manager? void Expire();
void StartExpirationTimer();
private: private:
std::vector<BackendPtr> backends; std::vector<BackendPtr> backends;
std::mutex backends_mtx;
}; };
} // namespace zeek::storage } // namespace zeek::storage

View file

@ -49,7 +49,8 @@ function Storage::__close_backend%(backend: opaque of Storage::BackendHandle%) :
return val_mgr->Bool(true); return val_mgr->Bool(true);
%} %}
function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, value: any, overwrite: bool%): bool function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, value: any,
overwrite: bool, expire_time: interval%): bool
%{ %{
auto b = dynamic_cast<storage::detail::BackendHandleVal*>(backend); auto b = dynamic_cast<storage::detail::BackendHandleVal*>(backend);
if ( ! b ) { if ( ! b ) {
@ -63,7 +64,7 @@ function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, va
auto key_v = IntrusivePtr<Val>{NewRef{}, key}; auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto val_v = IntrusivePtr<Val>{NewRef{}, value}; auto val_v = IntrusivePtr<Val>{NewRef{}, value};
auto result = b->backend->Put(key_v, val_v, overwrite); auto result = b->backend->Put(key_v, val_v, overwrite, expire_time);
if ( result.has_value() ) { if ( result.has_value() ) {
emit_builtin_error(util::fmt("Failed to store data: %s", result.value().c_str())); emit_builtin_error(util::fmt("Failed to store data: %s", result.value().c_str()));
return val_mgr->Bool(false); return val_mgr->Bool(false);

View file

@ -36,7 +36,8 @@ void StorageDummy::Close() { open = false; }
/** /**
* The workhorse method for Put(). This must be implemented by plugins. * The workhorse method for Put(). This must be implemented by plugins.
*/ */
zeek::storage::ErrorResult StorageDummy::DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite) { zeek::storage::ErrorResult StorageDummy::DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite,
double expiration_time) {
auto json_key = key->ToJSON()->ToStdString(); auto json_key = key->ToJSON()->ToStdString();
auto json_value = value->ToJSON()->ToStdString(); auto json_value = value->ToJSON()->ToStdString();
data[json_key] = json_value; data[json_key] = json_value;

View file

@ -36,7 +36,8 @@ public:
/** /**
* The workhorse method for Put(). * The workhorse method for Put().
*/ */
zeek::storage::ErrorResult DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite = true) override; zeek::storage::ErrorResult DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite = true,
double expiration_time = 0) override;
/** /**
* The workhorse method for Get(). * The workhorse method for Get().