diff --git a/scripts/base/frameworks/storage/main.zeek b/scripts/base/frameworks/storage/main.zeek index f6fe09b960..e7fa684466 100644 --- a/scripts/base/frameworks/storage/main.zeek +++ b/scripts/base/frameworks/storage/main.zeek @@ -13,13 +13,19 @@ export { # The value to store associated with the key. value: any; - # Indicates whether this value should overwrite an existing entry - # for the key. + # Indicates whether this value should overwrite an existing entry for the + # key. overwrite: bool &default=T; # An interval of time until the entry is automatically removed from the # backend. expire_time: interval &default=0sec; + + # Indicates whether this operation should happen asynchronously. If this + # is true, the call to put must happen as part of a :zeek:see:`when` + # statement. This flag is overridden and set to F when reading pcaps, + # since time won't move forward the same as when caputring live traffic. + async_mode: bool &default=T; }; ## Opens a new backend connection based on a configuration object. @@ -58,7 +64,7 @@ export { ## ## Returns: A boolean indicating success or failure of the operation. Type ## comparison failures against the types passed to - ## :zeek:see:`Storage::open_backend` for the backend will cause false to + ## :zeek:see:`Storage::open_backend` for the backend will cause ``F`` to ## be returned. global put: function(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs): bool; @@ -68,11 +74,18 @@ export { ## ## key: The key to look up. ## + ## async_mode: Indicates whether this operation should happen asynchronously. If + ## this is T, the call must happen as part of a :zeek:see:`when` + ## statement. This flag is overridden and set to F when reading pcaps, + ## since time won't move forward the same as when caputring live + ## traffic. + ## ## Returns: A boolean indicating success or failure of the operation. Type ## comparison failures against the types passed to - ## :zeek:see:`Storage::open_backend` for the backend will cause false to + ## :zeek:see:`Storage::open_backend` for the backend will cause ``F`` to ## be returned. - global get: function(backend: opaque of Storage::BackendHandle, key: any): any; + global get: function(backend: opaque of Storage::BackendHandle, key: any, + async_mode: bool &default=T): any; ## Erases an entry from the backend. ## @@ -80,11 +93,18 @@ export { ## ## key: The key to erase. ## + ## async_mode: Indicates whether this operation should happen asynchronously. If + ## this is T, the call must happen as part of a :zeek:see:`when` + ## statement. This flag is overridden and set to F when reading pcaps, + ## since time won't move forward the same as when caputring live + ## traffic. + ## ## Returns: A boolean indicating success or failure of the operation. Type ## comparison failures against the types passed to - ## :zeek:see:`Storage::open_backend` for the backend will cause false to + ## :zeek:see:`Storage::open_backend` for the backend will cause ``F`` to ## be returned. - global erase: function(backend: opaque of Storage::BackendHandle, key: any): bool; + global erase: function(backend: opaque of Storage::BackendHandle, key: any, + async_mode: bool &default=T): bool; } function open_backend(btype: Storage::Backend, options: any, key_type: any, val_type: any): opaque of Storage::BackendHandle @@ -99,15 +119,15 @@ function close_backend(backend: opaque of Storage::BackendHandle): bool function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs): bool { - return Storage::__put(backend, args$key, args$value, args$overwrite, args$expire_time); + return Storage::__put(backend, args$key, args$value, args$overwrite, args$expire_time, args$async_mode); } -function get(backend: opaque of Storage::BackendHandle, key: any): any +function get(backend: opaque of Storage::BackendHandle, key: any, async_mode: bool &default=T): any { - return Storage::__get(backend, key); + return Storage::__get(backend, key, async_mode); } -function erase(backend: opaque of Storage::BackendHandle, key: any): bool +function erase(backend: opaque of Storage::BackendHandle, key: any, async_mode: bool &default=T): bool { - return Storage::__erase(backend, key); + return Storage::__erase(backend, key, async_mode); } diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc index 37e074eda3..bf4e6b69d7 100644 --- a/src/storage/Backend.cc +++ b/src/storage/Backend.cc @@ -3,10 +3,58 @@ #include "zeek/storage/Backend.h" #include "zeek/Desc.h" +#include "zeek/RunState.h" +#include "zeek/Trigger.h" #include "zeek/broker/Data.h" namespace zeek::storage { +ErrorResultCallback::ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc) + : trigger(std::move(trigger)), assoc(assoc) {} +ErrorResultCallback::~ErrorResultCallback() {} + +void ErrorResultCallback::Complete(const ErrorResult& res) { + zeek::Val* result; + + if ( res ) + result = new StringVal(res.value()); + else + result = val_mgr->Bool(true).get(); + + trigger->Cache(assoc, result); + Unref(result); + trigger->Release(); +} + +void ErrorResultCallback::Timeout() { + auto v = make_intrusive("Timeout during request"); + trigger->Cache(assoc, v.get()); +} + +ValResultCallback::ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc) + : trigger(std::move(trigger)), assoc(assoc) {} +ValResultCallback::~ValResultCallback() {} + +void ValResultCallback::Complete(const ValResult& res) { + zeek::Val* result; + + if ( res ) { + result = res.value().get(); + Ref(result); + } + else + result = new StringVal(res.error()); + + trigger->Cache(assoc, result); + Unref(result); + trigger->Release(); +} + +void ValResultCallback::Timeout() { + auto v = make_intrusive("Timeout during request"); + trigger->Cache(assoc, v.get()); +} + ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt) { key_type = std::move(kt); val_type = std::move(vt); @@ -14,7 +62,7 @@ ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt) { return DoOpen(std::move(options)); } -ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time) { +ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time, ErrorResultCallback* cb) { // The intention for this method is to do some other heavy lifting in regard // to backends that need to pass data through the manager instead of directly // through the workers. For the first versions of the storage framework it @@ -26,25 +74,46 @@ ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expira return util::fmt("type of value passed (%s) does not match backend's value type (%s)", obj_desc_short(value->GetType().get()).c_str(), val_type->GetName().c_str()); - return DoPut(std::move(key), std::move(value), overwrite, expiration_time); + auto res = DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb); + + if ( (! native_async || zeek::run_state::reading_traces) && cb ) { + cb->Complete(res); + delete cb; + } + + return res; } -ValResult Backend::Get(ValPtr key) { +ValResult Backend::Get(ValPtr key, ValResultCallback* cb) { // See the note in Put(). if ( ! same_type(key->GetType(), key_type) ) return zeek::unexpected(util::fmt("type of key passed (%s) does not match backend's key type (%s)", key->GetType()->GetName().c_str(), key_type->GetName().c_str())); - return DoGet(std::move(key)); + auto res = DoGet(std::move(key), cb); + + if ( (! native_async || zeek::run_state::reading_traces) && cb ) { + cb->Complete(res); + delete cb; + } + + return res; } -ErrorResult Backend::Erase(ValPtr key) { +ErrorResult Backend::Erase(ValPtr key, ErrorResultCallback* cb) { // See the note in Put(). if ( ! same_type(key->GetType(), key_type) ) return util::fmt("type of key passed (%s) does not match backend's key type (%s)", key->GetType()->GetName().c_str(), key_type->GetName().c_str()); - return DoErase(std::move(key)); + auto res = DoErase(std::move(key), cb); + + if ( (! native_async || zeek::run_state::reading_traces) && cb ) { + cb->Complete(res); + delete cb; + } + + return res; } zeek::OpaqueTypePtr detail::backend_opaque; diff --git a/src/storage/Backend.h b/src/storage/Backend.h index dd8aedc064..ea50b36c3d 100644 --- a/src/storage/Backend.h +++ b/src/storage/Backend.h @@ -6,6 +6,11 @@ #include "zeek/Val.h" #include "zeek/util.h" +namespace zeek::detail::trigger { +class Trigger; +using TriggerPtr = IntrusivePtr; +} // namespace zeek::detail::trigger + namespace zeek::storage { class Manager; @@ -19,6 +24,32 @@ using ErrorResult = std::optional; // string value will store an error message if the result is null. using ValResult = zeek::expected; +// A callback result that returns an ErrorResult. +class ErrorResultCallback { +public: + ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc); + ~ErrorResultCallback(); + void Complete(const ErrorResult& res); + void Timeout(); + +private: + zeek::detail::trigger::TriggerPtr trigger; + const void* assoc; +}; + +// A callback result that returns a ValResult. +class ValResultCallback { +public: + ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc); + ~ValResultCallback(); + void Complete(const ValResult& res); + void Timeout(); + +private: + zeek::detail::trigger::TriggerPtr trigger; + const void* assoc; +}; + class Backend : public zeek::Obj { public: /** @@ -29,39 +60,50 @@ public: /** * Store a new key/value pair in the backend. * - * @param key the key for the pair - * @param value the value for the pair + * @param key the key for the pair. + * @param value the value for the pair. * @param overwrite whether an existing value for a key should be overwritten. * @param expiration_time the time when this entry should be automatically * removed. Set to zero to disable expiration. + * @param cb An optional callback object if being called via an async context. * @return An optional value potentially containing an error string if - * needed. Will be unset if the operation succeeded. + * needed Will be unset if the operation succeeded. */ - ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0); + ErrorResult Put(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0, + ErrorResultCallback* cb = nullptr); /** * Retrieve a value from the backend for a provided key. * * @param key the key to lookup in the backend. + * @param cb An optional callback object if being called via an async context. * @return A std::expected containing either a valid ValPtr with the result * of the operation or a string containing an error message for failure. */ - ValResult Get(ValPtr key); + ValResult Get(ValPtr key, ValResultCallback* cb = nullptr); /** * Erases the value for a key from the backend. * + * @param key the key to erase + * @param cb An optional callback object if being called via an async context. * @return An optional value potentially containing an error string if * needed. Will be unset if the operation succeeded. * possible error string if the operation failed. */ - ErrorResult Erase(ValPtr key); + ErrorResult Erase(ValPtr key, ErrorResultCallback* cb = nullptr); /** * Returns whether the backend is opened. */ virtual bool IsOpen() = 0; + /** + * Returns whether the backend's connection supports asynchronous commands. + * Defaults to true, but can be overridden by backends. + */ + virtual bool SupportsAsync() { return true; } + protected: // Allow the manager to call Open/Close. friend class storage::Manager; @@ -69,10 +111,14 @@ protected: /** * Constructor * + * @param native_async Denotes whether this backend can handle async request + * natively. If set to false, the Put/Get/Erase methods will call the + * callback after their corresponding Do methods return. If set to true, the + * backend needs to call the callback itself. * @param tag A string representation of the tag for this backend. This * is passed from the Manager through the component factory. */ - Backend(std::string_view tag) : tag(tag) {} + Backend(bool native_async, std::string_view tag) : tag(tag), native_async(native_async) {} /** * Called by the manager system to open the backend. @@ -101,17 +147,18 @@ protected: /** * The workhorse method for Put(). */ - virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0) = 0; + virtual ErrorResult DoPut(ValPtr key, ValPtr value, bool overwrite = true, double expiration_time = 0, + ErrorResultCallback* cb = nullptr) = 0; /** * The workhorse method for Get(). */ - virtual ValResult DoGet(ValPtr key) = 0; + virtual ValResult DoGet(ValPtr key, ValResultCallback* cb = nullptr) = 0; /** * The workhorse method for Erase(). */ - virtual ErrorResult DoErase(ValPtr key) = 0; + virtual ErrorResult DoErase(ValPtr key, ErrorResultCallback* cb = nullptr) = 0; /** * Removes any entries in the backend that have expired. Can be overridden by @@ -123,6 +170,9 @@ protected: TypePtr val_type; std::string tag; + +private: + bool native_async = false; }; using BackendPtr = zeek::IntrusivePtr; diff --git a/src/storage/storage.bif b/src/storage/storage.bif index 960084e666..56a18a49aa 100644 --- a/src/storage/storage.bif +++ b/src/storage/storage.bif @@ -1,9 +1,35 @@ %%{ #include "zeek/storage/Backend.h" #include "zeek/storage/Manager.h" +#include "zeek/Trigger.h" +#include "zeek/Frame.h" using namespace zeek; using namespace zeek::storage; + +static zeek::detail::trigger::TriggerPtr init_trigger(zeek::detail::Frame* frame, const BackendPtr& b) { + if ( ! b->SupportsAsync() ) { + emit_builtin_error("Async mode requested but backend does not support async operations"); + return nullptr; + } + + auto trigger = frame->GetTrigger(); + + if ( ! trigger ) { + emit_builtin_error("Async storage operations can only be called inside when-conditions"); + return nullptr; + } + + if ( auto timeout = trigger->TimeoutValue(); timeout < 0 ) { + emit_builtin_error("Async Storage operations must specify a timeout block"); + return nullptr; + } + + frame->SetDelayed(); + trigger->Hold(); + + return {NewRef{}, trigger}; +} %%} module Storage; @@ -50,7 +76,7 @@ function Storage::__close_backend%(backend: opaque of Storage::BackendHandle%) : %} function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, value: any, - overwrite: bool, expire_time: interval%): bool + overwrite: bool, expire_time: interval, async_mode: bool &default=T%): bool %{ auto b = dynamic_cast(backend); if ( ! b ) { @@ -60,11 +86,23 @@ function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, va else if ( ! b->backend->IsOpen() ) return val_mgr->Bool(false); - // TODO: add support for when statements (see broker/store.bif) + ErrorResultCallback* cb = nullptr; + + if ( async_mode ) { + auto trigger = init_trigger(frame, b->backend); + if ( ! trigger ) + return val_mgr->Bool(false); + + cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc()); + } auto key_v = IntrusivePtr{NewRef{}, key}; auto val_v = IntrusivePtr{NewRef{}, value}; - auto result = b->backend->Put(key_v, val_v, overwrite, expire_time); + auto result = b->backend->Put(key_v, val_v, overwrite, expire_time, cb); + + if ( async_mode ) + return nullptr; + if ( result.has_value() ) { emit_builtin_error(util::fmt("Failed to store data: %s", result.value().c_str())); return val_mgr->Bool(false); @@ -73,7 +111,7 @@ function Storage::__put%(backend: opaque of Storage::BackendHandle, key: any, va return val_mgr->Bool(true); %} -function Storage::__get%(backend: opaque of Storage::BackendHandle, key: any%): any +function Storage::__get%(backend: opaque of Storage::BackendHandle, key: any, async_mode: bool &default=T%): any %{ auto b = dynamic_cast(backend); if ( ! b ) { @@ -83,10 +121,22 @@ function Storage::__get%(backend: opaque of Storage::BackendHandle, key: any%): else if ( ! b->backend->IsOpen() ) return val_mgr->Bool(false); - // TODO: add support for when statements (see broker/store.bif) + ValResultCallback* cb = nullptr; + + if ( async_mode ) { + auto trigger = init_trigger(frame, b->backend); + if ( ! trigger ) + return val_mgr->Bool(false); + + cb = new ValResultCallback(trigger, frame->GetTriggerAssoc()); + } auto key_v = IntrusivePtr{NewRef{}, key}; - auto result = b->backend->Get(key_v); + auto result = b->backend->Get(key_v, cb); + + if ( async_mode ) + return nullptr; + if ( ! result.has_value() ) { emit_builtin_error(util::fmt("Failed to retrieve data: %s", result.error().c_str())); return val_mgr->Bool(false); @@ -95,7 +145,7 @@ function Storage::__get%(backend: opaque of Storage::BackendHandle, key: any%): return result.value(); %} -function Storage::__erase%(backend: opaque of Storage::BackendHandle, key: any%): bool +function Storage::__erase%(backend: opaque of Storage::BackendHandle, key: any, async_mode: bool &default=T%): bool %{ auto b = dynamic_cast(backend); if ( ! b ) { @@ -105,10 +155,22 @@ function Storage::__erase%(backend: opaque of Storage::BackendHandle, key: any%) else if ( ! b->backend->IsOpen() ) return val_mgr->Bool(false); - // TODO: add support for when statements (see broker/store.bif) + ErrorResultCallback* cb = nullptr; + + if ( async_mode ) { + auto trigger = init_trigger(frame, b->backend); + if ( ! trigger ) + return val_mgr->Bool(false); + + cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc()); + } auto key_v = IntrusivePtr{NewRef{}, key}; - auto result = b->backend->Erase(key_v); + auto result = b->backend->Erase(key_v, cb); + + if ( async_mode ) + return nullptr; + if ( result.has_value() ) { emit_builtin_error(util::fmt("Failed to erase data for key: %s", result.value().c_str())); return val_mgr->Bool(false); diff --git a/testing/btest/Baseline/plugins.storage/zeek-stderr b/testing/btest/Baseline/plugins.storage/zeek-stderr index 3ef9134f8a..a1151e1362 100644 --- a/testing/btest/Baseline/plugins.storage/zeek-stderr +++ b/testing/btest/Baseline/plugins.storage/zeek-stderr @@ -1,4 +1,4 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/storage.zeek, line 37: Failed to retrieve data: Failed to find key (Storage::get(b, to_any_coerce key)) +error in <...>/storage.zeek, line 37: Failed to retrieve data: Failed to find key (Storage::get(b, to_any_coerce key, F)) error in <...>/storage.zeek, line 50: Failed to open backend STORAGEDUMMY: open_fail was set to true, returning error (Storage::open_backend(Storage::STORAGEDUMMY, to_any_coerce opts, to_any_coerce str, to_any_coerce str)) error in <...>/storage.zeek, line 51: Invalid storage handle (Storage::close_backend(b2) and F) diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.cc b/testing/btest/plugins/storage-plugin/src/StorageDummy.cc index 0d0fe83d3f..ce71dd5c1c 100644 --- a/testing/btest/plugins/storage-plugin/src/StorageDummy.cc +++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.cc @@ -37,7 +37,7 @@ void StorageDummy::Close() { open = false; } * The workhorse method for Put(). This must be implemented by plugins. */ zeek::storage::ErrorResult StorageDummy::DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite, - double expiration_time) { + double expiration_time, zeek::storage::ErrorResultCallback* cb) { auto json_key = key->ToJSON()->ToStdString(); auto json_value = value->ToJSON()->ToStdString(); data[json_key] = json_value; @@ -47,7 +47,7 @@ zeek::storage::ErrorResult StorageDummy::DoPut(zeek::ValPtr key, zeek::ValPtr va /** * The workhorse method for Get(). This must be implemented for plugins. */ -zeek::storage::ValResult StorageDummy::DoGet(zeek::ValPtr key) { +zeek::storage::ValResult StorageDummy::DoGet(zeek::ValPtr key, zeek::storage::ValResultCallback* cb) { auto json_key = key->ToJSON(); auto it = data.find(json_key->ToStdString()); if ( it == data.end() ) @@ -65,7 +65,7 @@ zeek::storage::ValResult StorageDummy::DoGet(zeek::ValPtr key) { /** * The workhorse method for Erase(). This must be implemented for plugins. */ -zeek::storage::ErrorResult StorageDummy::DoErase(zeek::ValPtr key) { +zeek::storage::ErrorResult StorageDummy::DoErase(zeek::ValPtr key, zeek::storage::ErrorResultCallback* cb) { auto json_key = key->ToJSON(); auto it = data.find(json_key->ToStdString()); if ( it == data.end() ) diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.h b/testing/btest/plugins/storage-plugin/src/StorageDummy.h index 1e2b6bc99e..b770080dca 100644 --- a/testing/btest/plugins/storage-plugin/src/StorageDummy.h +++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.h @@ -13,7 +13,7 @@ namespace btest::storage::backend { */ class StorageDummy : public zeek::storage::Backend { public: - StorageDummy(std::string_view tag) : Backend(tag) {} + StorageDummy(std::string_view tag) : Backend(false, tag) {} ~StorageDummy() override = default; static zeek::storage::BackendPtr Instantiate(std::string_view tag); @@ -37,17 +37,18 @@ public: * The workhorse method for Put(). */ zeek::storage::ErrorResult DoPut(zeek::ValPtr key, zeek::ValPtr value, bool overwrite = true, - double expiration_time = 0) override; + double expiration_time = 0, + zeek::storage::ErrorResultCallback* cb = nullptr) override; /** * The workhorse method for Get(). */ - zeek::storage::ValResult DoGet(zeek::ValPtr key) override; + zeek::storage::ValResult DoGet(zeek::ValPtr key, zeek::storage::ValResultCallback* cb = nullptr) override; /** * The workhorse method for Erase(). */ - zeek::storage::ErrorResult DoErase(zeek::ValPtr key) override; + zeek::storage::ErrorResult DoErase(zeek::ValPtr key, zeek::storage::ErrorResultCallback* cb = nullptr) override; private: std::map data; diff --git a/testing/btest/plugins/storage.zeek b/testing/btest/plugins/storage.zeek index 8f7ebabf46..d759e4b7da 100644 --- a/testing/btest/plugins/storage.zeek +++ b/testing/btest/plugins/storage.zeek @@ -27,14 +27,14 @@ event zeek_init() { # Test basic operation. The second get() should return an error # as the key should have been erased. local b = Storage::open_backend(Storage::STORAGEDUMMY, opts, str, str); - local put_res = Storage::put(b, [$key=key, $value=value, $overwrite=F]); - local get_res = Storage::get(b, key); + local put_res = Storage::put(b, [$key=key, $value=value, $overwrite=F, $async_mode=F]); + local get_res = Storage::get(b, key, F); if ( get_res is bool ) { print("Got an invalid value in response!"); } - local erase_res = Storage::erase(b, key); - get_res = Storage::get(b, key); + local erase_res = Storage::erase(b, key, F); + get_res = Storage::get(b, key, F); Storage::close_backend(b); # Test attempting to use the closed handle.