Add infrastructure for asynchronous storage operations

This commit is contained in:
Tim Wojtulewicz 2024-12-06 16:05:23 -07:00
parent d07d27453a
commit 7ad6a05f5b
8 changed files with 251 additions and 49 deletions

View file

@ -6,6 +6,11 @@
#include "zeek/Val.h"
#include "zeek/util.h"
namespace zeek::detail::trigger {
class Trigger;
using TriggerPtr = IntrusivePtr<Trigger>;
} // namespace zeek::detail::trigger
namespace zeek::storage {
class Manager;
@ -19,6 +24,32 @@ using ErrorResult = std::optional<std::string>;
// string value will store an error message if the result is null.
using ValResult = zeek::expected<ValPtr, std::string>;
// A callback result that returns an ErrorResult.
class ErrorResultCallback {
public:
ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc);
~ErrorResultCallback();
void Complete(const ErrorResult& res);
void Timeout();
private:
zeek::detail::trigger::TriggerPtr trigger;
const void* assoc;
};
// A callback result that returns a ValResult.
class ValResultCallback {
public:
ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc);
~ValResultCallback();
void Complete(const ValResult& res);
void Timeout();
private:
zeek::detail::trigger::TriggerPtr trigger;
const void* assoc;
};
class Backend : public zeek::Obj {
public:
/**
@ -29,39 +60,50 @@ public:
/**
* Store a new key/value pair in the backend.
*
* @param key the key for the pair
* @param value the value for the pair
* @param key the key for the pair.
* @param value the value for the pair.
* @param overwrite whether an existing value for a key should be overwritten.
* @param expiration_time the time when this entry should be automatically
* removed. Set to zero to disable expiration.
* @param cb An optional callback object if being called via an async context.
* @return An optional value potentially containing an error string if
* needed. Will be unset if the operation succeeded.
* needed Will be unset if the operation succeeded.
*/
ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0);
ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0,
ErrorResultCallback* cb = nullptr);
/**
* Retrieve a value from the backend for a provided key.
*
* @param key the key to lookup in the backend.
* @param cb An optional callback object if being called via an async context.
* @return A std::expected containing either a valid ValPtr with the result
* of the operation or a string containing an error message for failure.
*/
ValResult Get(ValPtr key);
ValResult Get(ValPtr key, ValResultCallback* cb = nullptr);
/**
* Erases the value for a key from the backend.
*
* @param key the key to erase
* @param cb An optional callback object if being called via an async context.
* @return An optional value potentially containing an error string if
* needed. Will be unset if the operation succeeded.
* possible error string if the operation failed.
*/
ErrorResult Erase(ValPtr key);
ErrorResult Erase(ValPtr key, ErrorResultCallback* cb = nullptr);
/**
* Returns whether the backend is opened.
*/
virtual bool IsOpen() = 0;
/**
* Returns whether the backend's connection supports asynchronous commands.
* Defaults to true, but can be overridden by backends.
*/
virtual bool SupportsAsync() { return true; }
protected:
// Allow the manager to call Open/Close.
friend class storage::Manager;
@ -69,10 +111,14 @@ protected:
/**
* Constructor
*
* @param native_async Denotes whether this backend can handle async request
* natively. If set to false, the Put/Get/Erase methods will call the
* callback after their corresponding Do methods return. If set to true, the
* backend needs to call the callback itself.
* @param tag A string representation of the tag for this backend. This
* is passed from the Manager through the component factory.
*/
Backend(std::string_view tag) : tag(tag) {}
Backend(bool native_async, std::string_view tag) : tag(tag), native_async(native_async) {}
/**
* Called by the manager system to open the backend.
@ -101,17 +147,18 @@ protected:
/**
* The workhorse method for Put().
*/
virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0) = 0;
virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0,
ErrorResultCallback* cb = nullptr) = 0;
/**
* The workhorse method for Get().
*/
virtual ValResult DoGet(ValPtr key) = 0;
virtual ValResult DoGet(ValPtr key, ValResultCallback* cb = nullptr) = 0;
/**
* The workhorse method for Erase().
*/
virtual ErrorResult DoErase(ValPtr key) = 0;
virtual ErrorResult DoErase(ValPtr key, ErrorResultCallback* cb = nullptr) = 0;
/**
* Removes any entries in the backend that have expired. Can be overridden by
@ -123,6 +170,9 @@ protected:
TypePtr val_type;
std::string tag;
private:
bool native_async = false;
};
using BackendPtr = zeek::IntrusivePtr<Backend>;