Add IN_PROGRESS return code, handle for async backends

This commit is contained in:
Tim Wojtulewicz 2025-03-09 20:48:26 -07:00
parent 60aa987e06
commit c7503654e8
6 changed files with 21 additions and 11 deletions

View file

@ -6246,7 +6246,10 @@ export {
## Generic disconnection failure. ## Generic disconnection failure.
DISCONNECTION_FAILED, DISCONNECTION_FAILED,
## Generic initialization failure. ## Generic initialization failure.
INITIALIZATION_FAILED INITIALIZATION_FAILED,
## Returned from async operations when the backend is waiting
## for a result.
IN_PROGRESS,
} &redef; } &redef;
## Returned as the result of the various storage operations. ## Returned as the result of the various storage operations.

View file

@ -76,7 +76,7 @@ zeek::expected<BackendPtr, std::string> Manager::Instantiate(const Tag& type) {
OperationResult Manager::OpenBackend(BackendPtr backend, RecordValPtr options, TypePtr key_type, TypePtr val_type, OperationResult Manager::OpenBackend(BackendPtr backend, RecordValPtr options, TypePtr key_type, TypePtr val_type,
OpenResultCallback* cb) { OpenResultCallback* cb) {
auto res = backend->Open(std::move(options), std::move(key_type), std::move(val_type), cb); auto res = backend->Open(std::move(options), std::move(key_type), std::move(val_type), cb);
if ( res.code != ReturnCode::SUCCESS ) { if ( res.code != ReturnCode::SUCCESS && res.code != ReturnCode::IN_PROGRESS ) {
res.err_str = util::fmt("Failed to open backend %s: %s", backend->Tag(), res.err_str.c_str()); res.err_str = util::fmt("Failed to open backend %s: %s", backend->Tag(), res.err_str.c_str());
return res; return res;
} }

View file

@ -18,6 +18,7 @@ EnumValPtr ReturnCode::KEY_EXISTS;
EnumValPtr ReturnCode::CONNECTION_FAILED; EnumValPtr ReturnCode::CONNECTION_FAILED;
EnumValPtr ReturnCode::DISCONNECTION_FAILED; EnumValPtr ReturnCode::DISCONNECTION_FAILED;
EnumValPtr ReturnCode::INITIALIZATION_FAILED; EnumValPtr ReturnCode::INITIALIZATION_FAILED;
EnumValPtr ReturnCode::IN_PROGRESS;
void ReturnCode::Initialize() { void ReturnCode::Initialize() {
static const auto& return_code_type = zeek::id::find_type<zeek::EnumType>("Storage::ReturnCode"); static const auto& return_code_type = zeek::id::find_type<zeek::EnumType>("Storage::ReturnCode");
@ -57,6 +58,9 @@ void ReturnCode::Initialize() {
tmp = return_code_type->Lookup("Storage::INITIALIZATION_FAILED"); tmp = return_code_type->Lookup("Storage::INITIALIZATION_FAILED");
INITIALIZATION_FAILED = return_code_type->GetEnumVal(tmp); INITIALIZATION_FAILED = return_code_type->GetEnumVal(tmp);
tmp = return_code_type->Lookup("Storage::IN_PROGRESS");
IN_PROGRESS = return_code_type->GetEnumVal(tmp);
} }
void ReturnCode::Cleanup() { void ReturnCode::Cleanup() {
@ -72,6 +76,7 @@ void ReturnCode::Cleanup() {
CONNECTION_FAILED.reset(); CONNECTION_FAILED.reset();
DISCONNECTION_FAILED.reset(); DISCONNECTION_FAILED.reset();
INITIALIZATION_FAILED.reset(); INITIALIZATION_FAILED.reset();
IN_PROGRESS.reset();
} }
} // namespace zeek::storage } // namespace zeek::storage

View file

@ -31,6 +31,7 @@ public:
static EnumValPtr CONNECTION_FAILED; static EnumValPtr CONNECTION_FAILED;
static EnumValPtr DISCONNECTION_FAILED; static EnumValPtr DISCONNECTION_FAILED;
static EnumValPtr INITIALIZATION_FAILED; static EnumValPtr INITIALIZATION_FAILED;
static EnumValPtr IN_PROGRESS;
}; };
} // namespace storage } // namespace storage

View file

@ -228,7 +228,7 @@ OperationResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
async_ctx->ev.addWrite = redisAddWrite; async_ctx->ev.addWrite = redisAddWrite;
async_ctx->ev.delWrite = redisDelWrite; async_ctx->ev.delWrite = redisDelWrite;
return {ReturnCode::SUCCESS}; return {ReturnCode::IN_PROGRESS};
} }
/** /**
@ -247,8 +247,6 @@ OperationResult Redis::DoClose(OperationResultCallback* cb) {
// TODO: handle response // TODO: handle response
} }
CompleteCallback(cb, {ReturnCode::SUCCESS});
redisAsyncFree(async_ctx); redisAsyncFree(async_ctx);
async_ctx = nullptr; async_ctx = nullptr;
@ -306,7 +304,7 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex
++active_ops; ++active_ops;
} }
return {ReturnCode::SUCCESS}; return {ReturnCode::IN_PROGRESS};
} }
/** /**
@ -329,7 +327,7 @@ OperationResult Redis::DoGet(ValPtr key, OperationResultCallback* cb) {
// There isn't a result to return here. That happens in HandleGetResult for // There isn't a result to return here. That happens in HandleGetResult for
// async operations. // async operations.
return {ReturnCode::SUCCESS}; return {ReturnCode::IN_PROGRESS};
} }
/** /**
@ -350,7 +348,7 @@ OperationResult Redis::DoErase(ValPtr key, OperationResultCallback* cb) {
++active_ops; ++active_ops;
return {ReturnCode::SUCCESS}; return {ReturnCode::IN_PROGRESS};
} }
void Redis::DoExpire() { void Redis::DoExpire() {

View file

@ -54,9 +54,12 @@ static zeek::expected<storage::detail::BackendHandleVal*, OperationResult> cast_
static void handle_async_result(const IntrusivePtr<Backend>& backend, ResultCallback* cb, static void handle_async_result(const IntrusivePtr<Backend>& backend, ResultCallback* cb,
const OperationResult& op_result) { const OperationResult& op_result) {
if ( ! backend->SupportsAsync() ) { if ( op_result.code != ReturnCode::IN_PROGRESS || ! backend->SupportsAsync() ) {
// If the backend doesn't support async, we blocked in order to get here already. Handle the // We need to complete the callback early if:
// callback manually. // 1. The operation didn't start up successfully. For async operations, this means
// it didn't report back IN_PROGRESS.
// 2. The backend doesn't support async. This means we already blocked in order
// to get here already.
cb->Complete(op_result); cb->Complete(op_result);
delete cb; delete cb;
} }