diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index a71963f1fd..a45c5904f1 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -6246,7 +6246,10 @@ export { ## Generic disconnection failure. DISCONNECTION_FAILED, ## Generic initialization failure. - INITIALIZATION_FAILED + INITIALIZATION_FAILED, + ## Returned from async operations when the backend is waiting + ## for a result. + IN_PROGRESS, } &redef; ## Returned as the result of the various storage operations. diff --git a/src/storage/Manager.cc b/src/storage/Manager.cc index 6f991af5c2..0e1bf86ca4 100644 --- a/src/storage/Manager.cc +++ b/src/storage/Manager.cc @@ -76,7 +76,7 @@ zeek::expected Manager::Instantiate(const Tag& type) { OperationResult Manager::OpenBackend(BackendPtr backend, RecordValPtr options, TypePtr key_type, TypePtr val_type, OpenResultCallback* cb) { auto res = backend->Open(std::move(options), std::move(key_type), std::move(val_type), cb); - if ( res.code != ReturnCode::SUCCESS ) { + if ( res.code != ReturnCode::SUCCESS && res.code != ReturnCode::IN_PROGRESS ) { res.err_str = util::fmt("Failed to open backend %s: %s", backend->Tag(), res.err_str.c_str()); return res; } diff --git a/src/storage/ReturnCode.cc b/src/storage/ReturnCode.cc index 50d13554ce..22a3e8a2e6 100644 --- a/src/storage/ReturnCode.cc +++ b/src/storage/ReturnCode.cc @@ -18,6 +18,7 @@ EnumValPtr ReturnCode::KEY_EXISTS; EnumValPtr ReturnCode::CONNECTION_FAILED; EnumValPtr ReturnCode::DISCONNECTION_FAILED; EnumValPtr ReturnCode::INITIALIZATION_FAILED; +EnumValPtr ReturnCode::IN_PROGRESS; void ReturnCode::Initialize() { static const auto& return_code_type = zeek::id::find_type("Storage::ReturnCode"); @@ -57,6 +58,9 @@ void ReturnCode::Initialize() { tmp = return_code_type->Lookup("Storage::INITIALIZATION_FAILED"); INITIALIZATION_FAILED = return_code_type->GetEnumVal(tmp); + + tmp = return_code_type->Lookup("Storage::IN_PROGRESS"); + IN_PROGRESS = return_code_type->GetEnumVal(tmp); } void ReturnCode::Cleanup() { @@ -72,6 +76,7 @@ void ReturnCode::Cleanup() { CONNECTION_FAILED.reset(); DISCONNECTION_FAILED.reset(); INITIALIZATION_FAILED.reset(); + IN_PROGRESS.reset(); } } // namespace zeek::storage diff --git a/src/storage/ReturnCode.h b/src/storage/ReturnCode.h index 826408c9df..8b110641f0 100644 --- a/src/storage/ReturnCode.h +++ b/src/storage/ReturnCode.h @@ -31,6 +31,7 @@ public: static EnumValPtr CONNECTION_FAILED; static EnumValPtr DISCONNECTION_FAILED; static EnumValPtr INITIALIZATION_FAILED; + static EnumValPtr IN_PROGRESS; }; } // namespace storage diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index 7d1eb0a854..5ec512b3f7 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -228,7 +228,7 @@ OperationResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) { async_ctx->ev.addWrite = redisAddWrite; async_ctx->ev.delWrite = redisDelWrite; - return {ReturnCode::SUCCESS}; + return {ReturnCode::IN_PROGRESS}; } /** @@ -247,8 +247,6 @@ OperationResult Redis::DoClose(OperationResultCallback* cb) { // TODO: handle response } - CompleteCallback(cb, {ReturnCode::SUCCESS}); - redisAsyncFree(async_ctx); async_ctx = nullptr; @@ -306,7 +304,7 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex ++active_ops; } - return {ReturnCode::SUCCESS}; + return {ReturnCode::IN_PROGRESS}; } /** @@ -329,7 +327,7 @@ OperationResult Redis::DoGet(ValPtr key, OperationResultCallback* cb) { // There isn't a result to return here. That happens in HandleGetResult for // async operations. - return {ReturnCode::SUCCESS}; + return {ReturnCode::IN_PROGRESS}; } /** @@ -350,7 +348,7 @@ OperationResult Redis::DoErase(ValPtr key, OperationResultCallback* cb) { ++active_ops; - return {ReturnCode::SUCCESS}; + return {ReturnCode::IN_PROGRESS}; } void Redis::DoExpire() { diff --git a/src/storage/storage.bif b/src/storage/storage.bif index 393a21fe0c..82d85e6afb 100644 --- a/src/storage/storage.bif +++ b/src/storage/storage.bif @@ -54,9 +54,12 @@ static zeek::expected cast_ static void handle_async_result(const IntrusivePtr& backend, ResultCallback* cb, const OperationResult& op_result) { - if ( ! backend->SupportsAsync() ) { - // If the backend doesn't support async, we blocked in order to get here already. Handle the - // callback manually. + if ( op_result.code != ReturnCode::IN_PROGRESS || ! backend->SupportsAsync() ) { + // We need to complete the callback early if: + // 1. The operation didn't start up successfully. For async operations, this means + // it didn't report back IN_PROGRESS. + // 2. The backend doesn't support async. This means we already blocked in order + // to get here already. cb->Complete(op_result); delete cb; }