diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc index cfedd8bd54..12bd1eb60b 100644 --- a/src/storage/Backend.cc +++ b/src/storage/Backend.cc @@ -3,10 +3,8 @@ #include "zeek/storage/Backend.h" #include "zeek/Desc.h" -#include "zeek/RunState.h" #include "zeek/Trigger.h" #include "zeek/broker/Data.h" -#include "zeek/storage/Manager.h" namespace zeek::storage { @@ -16,84 +14,84 @@ ResultCallback::ResultCallback(zeek::detail::trigger::TriggerPtr trigger, const ResultCallback::~ResultCallback() {} void ResultCallback::Timeout() { - auto v = make_intrusive("Timeout during request"); - trigger->Cache(assoc, v.get()); + if ( ! IsSyncCallback() ) { + auto v = make_intrusive("Timeout during request"); + trigger->Cache(assoc, v.get()); + } } void ResultCallback::ValComplete(Val* result) { - trigger->Cache(assoc, result); + if ( ! IsSyncCallback() ) { + trigger->Cache(assoc, result); + trigger->Release(); + } + Unref(result); - trigger->Release(); } ErrorResultCallback::ErrorResultCallback(IntrusivePtr trigger, const void* assoc) : ResultCallback(std::move(trigger), assoc) {} void ErrorResultCallback::Complete(const ErrorResult& res) { - zeek::Val* result; + if ( IsSyncCallback() ) + result = res; + + zeek::Val* val_result; if ( res ) - result = new StringVal(res.value()); + val_result = new StringVal(res.value()); else - result = val_mgr->Bool(true).get(); + val_result = val_mgr->Bool(true).get(); - ValComplete(result); + ValComplete(val_result); } ValResultCallback::ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc) : ResultCallback(std::move(trigger), assoc) {} void ValResultCallback::Complete(const ValResult& res) { + if ( IsSyncCallback() ) + result = res; + static auto val_result_type = zeek::id::find_type("val_result"); - auto* result = new zeek::RecordVal(val_result_type); + auto* val_result = new zeek::RecordVal(val_result_type); if ( res ) - result->Assign(0, res.value()); + val_result->Assign(0, res.value()); else - result->Assign(1, zeek::make_intrusive(res.error())); + val_result->Assign(1, zeek::make_intrusive(res.error())); - ValComplete(result); + ValComplete(val_result); } +OpenResultCallback::OpenResultCallback(detail::BackendHandleVal* backend) : ResultCallback(), backend(backend) {} + OpenResultCallback::OpenResultCallback(IntrusivePtr trigger, const void* assoc, detail::BackendHandleVal* backend) : ResultCallback(std::move(trigger), assoc), backend(backend) {} void OpenResultCallback::Complete(const ErrorResult& res) { - zeek::Val* result; + if ( IsSyncCallback() ) + result = res; + + zeek::Val* val_result; if ( res ) - result = new StringVal(res.value()); + val_result = new StringVal(res.value()); else - result = backend; + val_result = backend; - ValComplete(result); + ValComplete(val_result); } ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt, OpenResultCallback* cb) { key_type = std::move(kt); val_type = std::move(vt); - auto res = DoOpen(std::move(options)); - - if ( (! native_async || zeek::run_state::reading_traces) && cb ) { - cb->Complete(res); - delete cb; - } - - return res; + return DoOpen(std::move(options)); } -ErrorResult Backend::Close(ErrorResultCallback* cb) { - auto res = DoClose(cb); - - if ( (! native_async || zeek::run_state::reading_traces) && cb ) { - cb->Complete(res); - delete cb; - } - - return res; -} +ErrorResult Backend::Close(ErrorResultCallback* cb) { return DoClose(cb); } ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time, ErrorResultCallback* cb) { // The intention for this method is to do some other heavy lifting in regard @@ -107,14 +105,7 @@ ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expira return util::fmt("type of value passed (%s) does not match backend's value type (%s)", obj_desc_short(value->GetType().get()).c_str(), val_type->GetName().c_str()); - auto res = DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb); - - if ( ! native_async && cb ) { - cb->Complete(res); - delete cb; - } - - return res; + return DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb); } ValResult Backend::Get(ValPtr key, ValResultCallback* cb) { @@ -123,14 +114,7 @@ ValResult Backend::Get(ValPtr key, ValResultCallback* cb) { return zeek::unexpected(util::fmt("type of key passed (%s) does not match backend's key type (%s)", key->GetType()->GetName().c_str(), key_type->GetName().c_str())); - auto res = DoGet(std::move(key), cb); - - if ( ! native_async && cb ) { - cb->Complete(res); - delete cb; - } - - return res; + return DoGet(std::move(key), cb); } ErrorResult Backend::Erase(ValPtr key, ErrorResultCallback* cb) { @@ -139,16 +123,31 @@ ErrorResult Backend::Erase(ValPtr key, ErrorResultCallback* cb) { return util::fmt("type of key passed (%s) does not match backend's key type (%s)", key->GetType()->GetName().c_str(), key_type->GetName().c_str()); - auto res = DoErase(std::move(key), cb); + return DoErase(std::move(key), cb); +} - if ( ! native_async && cb ) { - cb->Complete(res); +void Backend::CompleteCallback(ValResultCallback* cb, const ValResult& data) const { + cb->Complete(data); + if ( ! cb->IsSyncCallback() ) { delete cb; } - - return res; } +void Backend::CompleteCallback(ErrorResultCallback* cb, const ErrorResult& data) const { + cb->Complete(data); + if ( ! cb->IsSyncCallback() ) { + delete cb; + } +} + +void Backend::CompleteCallback(OpenResultCallback* cb, const ErrorResult& data) const { + cb->Complete(data); + if ( ! cb->IsSyncCallback() ) { + delete cb; + } +} + + zeek::OpaqueTypePtr detail::backend_opaque; IMPLEMENT_OPAQUE_VALUE(detail::BackendHandleVal) diff --git a/src/storage/Backend.h b/src/storage/Backend.h index 071ce1f507..ab89fb619f 100644 --- a/src/storage/Backend.h +++ b/src/storage/Backend.h @@ -29,34 +29,48 @@ using ValResult = zeek::expected; // code reuse in the other callback methods. class ResultCallback { public: + ResultCallback() = default; ResultCallback(IntrusivePtr trigger, const void* assoc); virtual ~ResultCallback(); void Timeout(); + bool IsSyncCallback() const { return ! trigger; } protected: void ValComplete(Val* result); private: IntrusivePtr trigger; - const void* assoc; + const void* assoc = nullptr; }; // A callback result that returns an ErrorResult. class ErrorResultCallback : public ResultCallback { public: + ErrorResultCallback() = default; ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc); virtual void Complete(const ErrorResult& res); + ErrorResult Result() { return result; } + +private: + ErrorResult result; }; // A callback result that returns a ValResult. class ValResultCallback : public ResultCallback { public: + ValResultCallback() = default; ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc); void Complete(const ValResult& res); + ValResult Result() { return result; } + +private: + ValResult result; }; class OpenResultCallback; +enum SupportedModes : uint8_t { SYNC = 0x01, ASYNC = 0x02 }; + class Backend : public zeek::Obj { public: /** @@ -105,6 +119,15 @@ public: */ virtual bool IsOpen() = 0; + bool SupportsSync() const { return (modes & SupportedModes::SYNC) == SupportedModes::SYNC; } + bool SupportsAsync() const { return (modes & SupportedModes::ASYNC) == SupportedModes::ASYNC; } + + /** + * Optional method to allow a backend to poll for data. This can be used to + * mimic sync mode even if the backend only supports async. + */ + virtual void Poll() {} + protected: // Allow the manager to call Open/Close. friend class storage::Manager; @@ -112,14 +135,12 @@ protected: /** * Constructor * - * @param native_async Denotes whether this backend can handle async request - * natively. If set to false, the Put/Get/Erase methods will call the - * callback after their corresponding Do methods return. If set to true, the - * backend needs to call the callback itself. + * @param modes A combination of values from SupportedModes. These modes + # define whether a backend only supports sync or async or both. * @param tag A string representation of the tag for this backend. This * is passed from the Manager through the component factory. */ - Backend(bool native_async, std::string_view tag) : tag(tag), native_async(native_async) {} + Backend(uint8_t modes, std::string_view tag) : tag(tag), modes(modes) {} /** * Called by the manager system to open the backend. @@ -176,13 +197,17 @@ protected: */ virtual void Expire() {} + void CompleteCallback(ValResultCallback* cb, const ValResult& data) const; + void CompleteCallback(ErrorResultCallback* cb, const ErrorResult& data) const; + void CompleteCallback(OpenResultCallback* cb, const ErrorResult& data) const; + TypePtr key_type; TypePtr val_type; std::string tag; private: - bool native_async = false; + uint8_t modes; }; using BackendPtr = zeek::IntrusivePtr; @@ -210,12 +235,15 @@ protected: // A callback for the Backend::Open() method that returns an error or a backend handle. class OpenResultCallback : public ResultCallback { public: + OpenResultCallback(detail::BackendHandleVal* backend); OpenResultCallback(IntrusivePtr trigger, const void* assoc, detail::BackendHandleVal* backend); void Complete(const ErrorResult& res); + ErrorResult Result() { return result; } private: - detail::BackendHandleVal* backend; + ErrorResult result; + detail::BackendHandleVal* backend = nullptr; }; } // namespace zeek::storage diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index 18e3b45a2e..d23f62a4e4 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -17,10 +17,8 @@ namespace { class Tracer { public: - Tracer(const std::string& where) : where(where) { // printf("%s\n", where.c_str()); - } - ~Tracer() { // printf("%s done\n", where.c_str()); - } + Tracer(const std::string& where) : where(where) {} // DBG_LOG(zeek::DBG_STORAGE, "%s", where.c_str()); } + ~Tracer() {} // DBG_LOG(zeek::DBG_STORAGE, "%s done", where.c_str()); } std::string where; }; @@ -58,7 +56,7 @@ void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) { } void redisZRANGEBYSCORE(redisAsyncContext* ctx, void* reply, void* privdata) { - auto t = Tracer("erase"); + auto t = Tracer("zrangebyscore"); auto backend = static_cast(ctx->data); backend->HandleZRANGEBYSCORE(static_cast(reply)); } @@ -201,11 +199,6 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) { async_ctx->ev.addWrite = redisAddWrite; async_ctx->ev.delWrite = redisDelWrite; - if ( ! cb ) - // Polling here will eventually call OnConnect, which will set the flag - // that we're connected. - Poll(); - return std::nullopt; } @@ -218,7 +211,7 @@ ErrorResult Redis::DoClose(ErrorResultCallback* cb) { redisAsyncDisconnect(async_ctx); ++active_ops; - if ( ! cb && ! zeek::run_state::terminating ) { + if ( cb->IsSyncCallback() && ! zeek::run_state::terminating ) { Poll(); // TODO: handle response } @@ -261,25 +254,6 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira ++active_ops; - if ( ! cb ) { - Poll(); - - redisReply* reply = reply_queue.front(); - reply_queue.pop_front(); - - ErrorResult res; - if ( ! connected ) - res = util::fmt("Connection is not open"); - else if ( ! reply ) - res = util::fmt("Async put operation returned null reply"); - else if ( reply && reply->type == REDIS_REPLY_ERROR ) - res = util::fmt("Async put operation failed: %s", reply->str); - - freeReplyObject(reply); - if ( res.has_value() ) - return res; - } - // If reading pcaps insert into a secondary set that's ordered by expiration // time that gets checked by Expire(). if ( expiration_time > 0.0 && zeek::run_state::reading_traces ) { @@ -296,12 +270,6 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira ++active_ops; } - if ( ! cb ) { - // We don't care about the result from the ZADD, just that we wait - // for it to finish. - Poll(); - } - return std::nullopt; } @@ -321,16 +289,6 @@ ValResult Redis::DoGet(ValPtr key, ValResultCallback* cb) { ++active_ops; - if ( ! cb ) { - Poll(); - redisReply* reply = reply_queue.front(); - reply_queue.pop_front(); - - auto res = ParseGetReply(reply); - freeReplyObject(reply); - return res; - } - // There isn't a result to return here. That happens in HandleGetResult for // async operations. return zeek::unexpected(""); @@ -352,13 +310,6 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) { ++active_ops; - if ( ! cb ) { - Poll(); - redisReply* reply = reply_queue.front(); - reply_queue.pop_front(); - freeReplyObject(reply); - } - return std::nullopt; } @@ -415,46 +366,37 @@ void Redis::Expire() { void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) { --active_ops; - if ( callback ) { - ErrorResult res; - if ( ! connected ) - res = util::fmt("Connection is not open"); - else if ( ! reply ) - res = util::fmt("Async put operation returned null reply"); - else if ( reply && reply->type == REDIS_REPLY_ERROR ) - res = util::fmt("Async put operation failed: %s", reply->str); + ErrorResult res; + if ( ! connected ) + res = util::fmt("Connection is not open"); + else if ( ! reply ) + res = util::fmt("Async put operation returned null reply"); + else if ( reply && reply->type == REDIS_REPLY_ERROR ) + res = util::fmt("Async put operation failed: %s", reply->str); - freeReplyObject(reply); - callback->Complete(res); - delete callback; - } - else - reply_queue.push_back(reply); + freeReplyObject(reply); + CompleteCallback(callback, res); } void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) { --active_ops; - if ( callback ) { - ValResult res; - if ( ! connected ) - res = zeek::unexpected("Connection is not open"); - else - res = ParseGetReply(reply); + ValResult res; + if ( ! connected ) + res = zeek::unexpected("Connection is not open"); + else + res = ParseGetReply(reply); - callback->Complete(res); - freeReplyObject(reply); - delete callback; - } - else { - reply_queue.push_back(reply); - } + freeReplyObject(reply); + CompleteCallback(callback, res); } void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback) { --active_ops; - if ( callback ) { + if ( callback->IsSyncCallback() ) + reply_queue.push_back(reply); + else { ErrorResult res; if ( ! connected ) res = "Connection is not open"; @@ -464,11 +406,8 @@ void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback) res = util::fmt("Async erase operation failed: %s", reply->str); freeReplyObject(reply); - callback->Complete(res); - delete callback; + CompleteCallback(callback, res); } - else - reply_queue.push_back(reply); } void Redis::HandleZRANGEBYSCORE(redisReply* reply) { diff --git a/src/storage/backend/redis/Redis.h b/src/storage/backend/redis/Redis.h index 613aca13b2..3a7dd21dcc 100644 --- a/src/storage/backend/redis/Redis.h +++ b/src/storage/backend/redis/Redis.h @@ -13,7 +13,7 @@ struct redisPollEvents; namespace zeek::storage::backend::redis { class Redis : public Backend, public iosource::IOSource { public: - Redis(std::string_view tag) : Backend(true, tag), IOSource(true) {} + Redis(std::string_view tag) : Backend(SupportedModes::ASYNC, tag), IOSource(true) {} ~Redis() override = default; static BackendPtr Instantiate(std::string_view tag); @@ -81,9 +81,11 @@ public: // themselves from the list of active operations. void HandleGeneric() { --active_ops; } +protected: + void Poll() override; + private: ValResult ParseGetReply(redisReply* reply) const; - void Poll(); redisAsyncContext* async_ctx = nullptr; diff --git a/src/storage/backend/sqlite/SQLite.h b/src/storage/backend/sqlite/SQLite.h index f3eff1536a..ad0869bad2 100644 --- a/src/storage/backend/sqlite/SQLite.h +++ b/src/storage/backend/sqlite/SQLite.h @@ -12,7 +12,7 @@ namespace zeek::storage::backend::sqlite { class SQLite : public Backend { public: - SQLite(std::string_view tag) : Backend(false, tag) {} + SQLite(std::string_view tag) : Backend(SupportedModes::SYNC, tag) {} ~SQLite() override = default; static BackendPtr Instantiate(std::string_view tag); diff --git a/src/storage/storage.bif b/src/storage/storage.bif index 6c68e5f638..a29c5fd801 100644 --- a/src/storage/storage.bif +++ b/src/storage/storage.bif @@ -8,12 +8,12 @@ using namespace zeek; using namespace zeek::storage; static zeek::detail::trigger::TriggerPtr init_trigger(zeek::detail::Frame* frame) { - auto trigger = frame->GetTrigger(); + auto trigger = frame->GetTrigger(); - if ( ! trigger ) { - emit_builtin_error("Asynchronous storage operations must be called via a when-condition"); - return nullptr; - } + if ( ! trigger ) { + emit_builtin_error("Asynchronous storage operations must be called via a when-condition"); + return nullptr; + } if ( auto timeout = trigger->TimeoutValue(); timeout < 0 ) { emit_builtin_error("Async Storage operations must specify a timeout block"); @@ -56,7 +56,7 @@ function Storage::Async::__open_backend%(btype: Storage::Backend, options: any, %{ auto trigger = init_trigger(frame); if ( ! trigger ) - return val_mgr->Bool(false); + return val_mgr->Bool(false); auto btype_val = IntrusivePtr{NewRef{}, btype->AsEnumVal()}; Tag tag{btype_val}; @@ -74,7 +74,19 @@ function Storage::Async::__open_backend%(btype: Storage::Backend, options: any, auto kt = key_type->AsTypeVal()->GetType()->AsTypeType()->GetType(); auto vt = val_type->AsTypeVal()->GetType()->AsTypeType()->GetType(); auto options_val = IntrusivePtr{NewRef{}, options->AsRecordVal()}; - storage_mgr->OpenBackend(b.value(), options_val, kt, vt, cb); + auto open_res = storage_mgr->OpenBackend(b.value(), options_val, kt, vt, cb); + + if ( ! b.value()->SupportsAsync() ) { + // If the backend doesn't support async, we blocked in order to get here already. Handle the + // callback manually. + cb->Complete(open_res); + delete cb; + } + else if ( run_state::reading_traces ) { + // If the backend is truly async and we're reading traces, we need to fake being in sync mode + // because otherwise time doesn't move forward correctly. + b.value()->Poll(); + } return nullptr; %} @@ -94,7 +106,19 @@ function Storage::Async::__close_backend%(backend: opaque of Storage::BackendHan return val_mgr->Bool(true); auto cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc()); - storage_mgr->CloseBackend(b->backend, cb); + auto close_res = storage_mgr->CloseBackend(b->backend, cb); + + if ( ! b->backend->SupportsAsync() ) { + // If the backend doesn't support async, we blocked in order to get here already. Handle the + // callback manually. + cb->Complete(close_res); + delete cb; + } + else if ( run_state::reading_traces ) { + // If the backend is truly async and we're reading traces, we need to fake being in sync mode + // because otherwise time doesn't move forward correctly. + b->backend->Poll(); + } return nullptr; %} @@ -120,7 +144,19 @@ function Storage::Async::__put%(backend: opaque of Storage::BackendHandle, key: auto cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc()); auto key_v = IntrusivePtr{NewRef{}, key}; auto val_v = IntrusivePtr{NewRef{}, value}; - b->backend->Put(key_v, val_v, overwrite, expire_time, cb); + auto put_res = b->backend->Put(key_v, val_v, overwrite, expire_time, cb); + + if ( ! b->backend->SupportsAsync() ) { + // If the backend doesn't support async, we blocked in order to get here already. Handle the + // callback manually. + cb->Complete(put_res); + delete cb; + } + else if ( run_state::reading_traces ) { + // If the backend is truly async and we're reading traces, we need to fake being in sync mode + // because otherwise time doesn't move forward correctly. + b->backend->Poll(); + } return nullptr; %} @@ -148,7 +184,19 @@ function Storage::Async::__get%(backend: opaque of Storage::BackendHandle, key: auto cb = new ValResultCallback(trigger, frame->GetTriggerAssoc()); auto key_v = IntrusivePtr{NewRef{}, key}; - auto result = b->backend->Get(key_v, cb); + auto get_res = b->backend->Get(key_v, cb); + + if ( ! b->backend->SupportsAsync() ) { + // If the backend doesn't support async, we blocked in order to get here already. Handle the + // callback manually. + cb->Complete(get_res); + delete cb; + } + else if ( run_state::reading_traces ) { + // If the backend is truly async and we're reading traces, we need to fake being in sync mode + // because otherwise time doesn't move forward correctly. + b->backend->Poll(); + } return nullptr; %} @@ -169,7 +217,19 @@ function Storage::Async::__erase%(backend: opaque of Storage::BackendHandle, key auto cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc()); auto key_v = IntrusivePtr{NewRef{}, key}; - b->backend->Erase(key_v, cb); + auto erase_res = b->backend->Erase(key_v, cb); + + if ( ! b->backend->SupportsAsync() ) { + // If the backend doesn't support async, we blocked in order to get here already. Handle the + // callback manually. + cb->Complete(erase_res); + delete cb; + } + else if ( run_state::reading_traces ) { + // If the backend is truly async and we're reading traces, we need to fake being in sync mode + // because otherwise time doesn't move forward correctly. + b->backend->Poll(); + } return nullptr; %} @@ -188,17 +248,29 @@ function Storage::Sync::__open_backend%(btype: Storage::Backend, options: any, k return val_mgr->Bool(false); } + auto bh = make_intrusive(b.value()); + + auto cb = new OpenResultCallback(bh.get()); auto kt = key_type->AsTypeVal()->GetType()->AsTypeType()->GetType(); auto vt = val_type->AsTypeVal()->GetType()->AsTypeType()->GetType(); auto options_val = IntrusivePtr{NewRef{}, options->AsRecordVal()}; - auto open_res = storage_mgr->OpenBackend(b.value(), options_val, kt, vt, nullptr); + auto open_res = storage_mgr->OpenBackend(b.value(), options_val, kt, vt, cb); + + // If the backend only supports async, block until it's ready and then pull the result out of + // the callback. + if ( ! b.value()->SupportsSync() ) { + b.value()->Poll(); + open_res = cb->Result(); + } + + delete cb; if ( open_res.has_value() ) { emit_builtin_error(open_res.value().c_str()); return val_mgr->Bool(false); } - return make_intrusive(b.value()); + return bh; %} function Storage::Sync::__close_backend%(backend: opaque of Storage::BackendHandle%) : bool @@ -212,10 +284,20 @@ function Storage::Sync::__close_backend%(backend: opaque of Storage::BackendHand // Return true here since the backend is already closed return val_mgr->Bool(true); - auto result = storage_mgr->CloseBackend(b->backend, nullptr); + auto cb = new ErrorResultCallback(); + auto close_res = storage_mgr->CloseBackend(b->backend, cb); - if ( result.has_value() ) { - emit_builtin_error(result.value().c_str()); + // If the backend only supports async, block until it's ready and then pull the result out of + // the callback. + if ( ! b->backend->SupportsSync() ) { + b->backend->Poll(); + close_res = cb->Result(); + } + + delete cb; + + if ( close_res.has_value() ) { + emit_builtin_error(close_res.value().c_str()); return val_mgr->Bool(false); } @@ -233,12 +315,22 @@ function Storage::Sync::__put%(backend: opaque of Storage::BackendHandle, key: a else if ( ! b->backend->IsOpen() ) return val_mgr->Bool(false); + auto cb = new ErrorResultCallback(); auto key_v = IntrusivePtr{NewRef{}, key}; auto val_v = IntrusivePtr{NewRef{}, value}; - auto result = b->backend->Put(key_v, val_v, overwrite, expire_time, nullptr); + auto put_res = b->backend->Put(key_v, val_v, overwrite, expire_time, cb); - if ( result.has_value() ) { - emit_builtin_error(util::fmt("Failed to store data: %s", result.value().c_str())); + // If the backend only supports async, block until it's ready and then pull the result out of + // the callback. + if ( ! b->backend->SupportsSync() ) { + b->backend->Poll(); + put_res = cb->Result(); + } + + delete cb; + + if ( put_res.has_value() ) { + emit_builtin_error(util::fmt("Failed to store data: %s", put_res.value().c_str())); return val_mgr->Bool(false); } @@ -261,15 +353,25 @@ function Storage::Sync::__get%(backend: opaque of Storage::BackendHandle, key: a } auto key_v = IntrusivePtr{NewRef{}, key}; - auto result = b->backend->Get(key_v, nullptr); + auto cb = new ValResultCallback(); + auto get_res = b->backend->Get(key_v, cb); - if ( ! result.has_value() ) { + // If the backend only supports async, block until it's ready and then pull the result out of + // the callback. + if ( ! b->backend->SupportsSync() ) { + b->backend->Poll(); + get_res = cb->Result(); + } + + delete cb; + + if ( ! get_res.has_value() ) { val_result->Assign(1, make_intrusive( - util::fmt("Failed to retrieve data: %s", result.error().c_str()))); + util::fmt("Failed to retrieve data: %s", get_res.error().c_str()))); return val_result; } - val_result->Assign(0, result.value()); + val_result->Assign(0, get_res.value()); return val_result; %} @@ -283,11 +385,21 @@ function Storage::Sync::__erase%(backend: opaque of Storage::BackendHandle, key: else if ( ! b->backend->IsOpen() ) return val_mgr->Bool(false); + auto cb = new ErrorResultCallback(); auto key_v = IntrusivePtr{NewRef{}, key}; - auto result = b->backend->Erase(key_v, nullptr); + auto erase_res = b->backend->Erase(key_v, cb); - if ( result.has_value() ) { - emit_builtin_error(util::fmt("Failed to erase data for key: %s", result.value().c_str())); + // If the backend only supports async, block until it's ready and then pull the result out of + // the callback. + if ( ! b->backend->SupportsSync() ) { + b->backend->Poll(); + erase_res = cb->Result(); + } + + delete cb; + + if ( erase_res.has_value() ) { + emit_builtin_error(util::fmt("Failed to erase data for key: %s", erase_res.value().c_str())); return val_mgr->Bool(false); } diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.h b/testing/btest/plugins/storage-plugin/src/StorageDummy.h index e45acf3335..4c8a2dfc29 100644 --- a/testing/btest/plugins/storage-plugin/src/StorageDummy.h +++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.h @@ -13,7 +13,7 @@ namespace btest::storage::backend { */ class StorageDummy : public zeek::storage::Backend { public: - StorageDummy(std::string_view tag) : Backend(false, tag) {} + StorageDummy(std::string_view tag) : Backend(zeek::storage::SupportedModes::SYNC, tag) {} ~StorageDummy() override = default; static zeek::storage::BackendPtr Instantiate(std::string_view tag); diff --git a/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek b/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek index 1e4d17ccbf..2fe19ddb37 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek @@ -1,6 +1,5 @@ # @TEST-DOC: Tests that Redis storage backend defaults back to sync mode reading pcaps -# @TEST-KNOWN-FAILURE: Currently broken due to the redis async rework # @TEST-REQUIRES: have-redis # @TEST-PORT: REDIS_PORT