From 855c530b640ff7455c303cf2ba0746bf73ec0bee Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Fri, 21 Mar 2025 09:37:30 -0700 Subject: [PATCH] Redis: Handle other errors from requests, fix KEY_EXISTS for put operations --- src/storage/backend/redis/Redis.cc | 56 ++++++++++--------- src/storage/backend/redis/Redis.h | 2 +- .../out | 9 ++- .../base/frameworks/storage/redis-sync.zeek | 44 ++++++++++----- 4 files changed, 67 insertions(+), 44 deletions(-) diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index 2478f0ad24..95ba8eba23 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -60,7 +60,7 @@ void redisZADD(redisAsyncContext* ctx, void* reply, void* privdata) { auto t = Tracer("generic"); auto backend = static_cast(ctx->data); - // We don't care about the reply from the ZADD, m1ostly because blocking to poll + // We don't care about the reply from the ZADD, mostly because blocking to poll // for it adds a bunch of complication to DoPut() with having to handle the // reply from SET first. backend->HandleGeneric(nullptr); @@ -415,9 +415,12 @@ void Redis::HandlePutResult(redisReply* reply, ResultCallback* callback) { if ( ! connected ) res = {ReturnCode::NOT_CONNECTED}; else if ( ! reply ) - res = {ReturnCode::OPERATION_FAILED, "Async put operation returned null reply"}; - else if ( reply && reply->type == REDIS_REPLY_ERROR ) - res = {ReturnCode::OPERATION_FAILED, util::fmt("Async put operation failed: %s", reply->str)}; + res = {ReturnCode::OPERATION_FAILED, "put operation returned null reply"}; + else if ( reply->type == REDIS_REPLY_NIL ) + // For a SET operation, a NIL reply indicates a conflict with the NX flag. + res = {ReturnCode::KEY_EXISTS}; + else if ( reply->type == REDIS_REPLY_ERROR ) + res = ParseReplyError("put", reply->str); freeReplyObject(reply); CompleteCallback(callback, res); @@ -429,8 +432,19 @@ void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) { OperationResult res; if ( ! connected ) res = {ReturnCode::NOT_CONNECTED}; - else - res = ParseGetReply(reply); + if ( ! reply ) + res = {ReturnCode::OPERATION_FAILED, "get operation returned null reply"}; + else if ( reply->type == REDIS_REPLY_NIL ) + res = {ReturnCode::KEY_NOT_FOUND}; + else if ( reply->type == REDIS_REPLY_ERROR ) + res = ParseReplyError("get", reply->str); + else { + auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil); + if ( std::holds_alternative(val) ) + res = {ReturnCode::SUCCESS, "", std::get(val)}; + else + res = {ReturnCode::OPERATION_FAILED, std::get(val)}; + } freeReplyObject(reply); CompleteCallback(callback, res); @@ -444,9 +458,9 @@ void Redis::HandleEraseResult(redisReply* reply, ResultCallback* callback) { if ( ! connected ) res = {ReturnCode::NOT_CONNECTED}; else if ( ! reply ) - res = {ReturnCode::OPERATION_FAILED, "Async erase operation returned null reply"}; - else if ( reply && reply->type == REDIS_REPLY_ERROR ) - res = {ReturnCode::OPERATION_FAILED, util::fmt("Async erase operation failed: %s", reply->str)}; + res = {ReturnCode::OPERATION_FAILED, "erase operation returned null reply"}; + else if ( reply->type == REDIS_REPLY_ERROR ) + res = ParseReplyError("erase", reply->str); freeReplyObject(reply); CompleteCallback(callback, res); @@ -505,22 +519,14 @@ void Redis::ProcessFd(int fd, int flags) { redisAsyncHandleWrite(async_ctx); } -OperationResult Redis::ParseGetReply(redisReply* reply) const { - OperationResult res; - - if ( ! reply ) - res = {ReturnCode::OPERATION_FAILED, "GET returned null reply"}; - else if ( ! reply->str ) - res = {ReturnCode::KEY_NOT_FOUND}; - else { - auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil); - if ( std::holds_alternative(val) ) - res = {ReturnCode::SUCCESS, "", std::get(val)}; - else - res = {ReturnCode::OPERATION_FAILED, std::get(val)}; - } - - return res; +OperationResult Redis::ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const { + if ( async_ctx->err == REDIS_ERR_TIMEOUT ) + return {ReturnCode::TIMEOUT}; + else if ( async_ctx->err == REDIS_ERR_IO ) + return {ReturnCode::OPERATION_FAILED, util::fmt("%s operation IO error: %s", op_str.data(), strerror(errno))}; + else + return {ReturnCode::OPERATION_FAILED, + util::fmt("%s operation failed: %s", op_str.data(), reply_err_str.data())}; } void Redis::DoPoll() { diff --git a/src/storage/backend/redis/Redis.h b/src/storage/backend/redis/Redis.h index 951a2a93d6..64607c2f20 100644 --- a/src/storage/backend/redis/Redis.h +++ b/src/storage/backend/redis/Redis.h @@ -59,7 +59,7 @@ private: void DoExpire(double current_network_time) override; void DoPoll() override; - OperationResult ParseGetReply(redisReply* reply) const; + OperationResult ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const; redisAsyncContext* async_ctx = nullptr; diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out index a8fa18538e..0053e1555d 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out @@ -3,8 +3,11 @@ open_result, [code=Storage::SUCCESS, error_str=, value=, value=] get result, [code=Storage::SUCCESS, error_str=, value=value1234] get result same as inserted, T -overwrite put result, [code=Storage::SUCCESS, error_str=, value=] -get result, [code=Storage::SUCCESS, error_str=, value=value5678] -get result same as inserted, T +put result, [code=Storage::KEY_EXISTS, error_str=, value=] +get result, [code=Storage::SUCCESS, error_str=, value=value1234] +get result same as originally inserted, T +put result, [code=Storage::SUCCESS, error_str=, value=] +get result, [code=Storage::SUCCESS, error_str=, value=value2345] +get result same as overwritten, T Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing]] Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing]], Client disconnected diff --git a/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek b/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek index 0142b9fef3..82e9bf41d0 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek @@ -29,28 +29,42 @@ event zeek_init() local key = "key1234"; local value = "value1234"; + local value2 = "value2345"; - local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string); - print "open_result", open_res; + local res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string); + print "open_result", res; - local b = open_res$value; + local b = res$value; - local res = Storage::Sync::put(b, [ $key=key, $value=value ]); + # Put a first value. This should return Storage::SUCCESS. + res = Storage::Sync::put(b, [$key=key, $value=value]); print "put result", res; - local res2 = Storage::Sync::get(b, key); - print "get result", res2; - if ( res2$code == Storage::SUCCESS && res2?$value ) - print "get result same as inserted", value == ( res2$value as string ); + # Get the first value, validate that it's what we inserted. + res = Storage::Sync::get(b, key); + print "get result", res; + if ( res$code == Storage::SUCCESS && res?$value ) + print "get result same as inserted", value == (res$value as string); - local value2 = "value5678"; - res = Storage::Sync::put(b, [ $key=key, $value=value2, $overwrite=T ]); - print "overwrite put result", res; + # This will return a Storage::KEY_EXISTS since we don't want overwriting. + res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=F]); + print "put result", res; - res2 = Storage::Sync::get(b, key); - print "get result", res2; - if ( res2$code == Storage::SUCCESS && res2?$value ) - print "get result same as inserted", value2 == ( res2$value as string ); + # Verify that the overwrite didn't actually happen. + res = Storage::Sync::get(b, key); + print "get result", res; + if ( res$code == Storage::SUCCESS && res?$value ) + print "get result same as originally inserted", value == (res$value as string); + + # This will return a Storage::SUCESSS since we're asking for an overwrite. + res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=T]); + print "put result", res; + + # Verify that the overwrite happened. + res = Storage::Sync::get(b, key); + print "get result", res; + if ( res$code == Storage::SUCCESS && res?$value ) + print "get result same as overwritten", value2 == (res$value as string); Storage::Sync::close_backend(b); }