Redis: Handle other errors from requests, fix KEY_EXISTS for put operations

This commit is contained in:
Tim Wojtulewicz 2025-03-21 09:37:30 -07:00
parent 3d7fcfb428
commit 855c530b64
4 changed files with 67 additions and 44 deletions

View file

@ -60,7 +60,7 @@ void redisZADD(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("generic"); auto t = Tracer("generic");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
// We don't care about the reply from the ZADD, m1ostly because blocking to poll // We don't care about the reply from the ZADD, mostly because blocking to poll
// for it adds a bunch of complication to DoPut() with having to handle the // for it adds a bunch of complication to DoPut() with having to handle the
// reply from SET first. // reply from SET first.
backend->HandleGeneric(nullptr); backend->HandleGeneric(nullptr);
@ -415,9 +415,12 @@ void Redis::HandlePutResult(redisReply* reply, ResultCallback* callback) {
if ( ! connected ) if ( ! connected )
res = {ReturnCode::NOT_CONNECTED}; res = {ReturnCode::NOT_CONNECTED};
else if ( ! reply ) else if ( ! reply )
res = {ReturnCode::OPERATION_FAILED, "Async put operation returned null reply"}; res = {ReturnCode::OPERATION_FAILED, "put operation returned null reply"};
else if ( reply && reply->type == REDIS_REPLY_ERROR ) else if ( reply->type == REDIS_REPLY_NIL )
res = {ReturnCode::OPERATION_FAILED, util::fmt("Async put operation failed: %s", reply->str)}; // For a SET operation, a NIL reply indicates a conflict with the NX flag.
res = {ReturnCode::KEY_EXISTS};
else if ( reply->type == REDIS_REPLY_ERROR )
res = ParseReplyError("put", reply->str);
freeReplyObject(reply); freeReplyObject(reply);
CompleteCallback(callback, res); CompleteCallback(callback, res);
@ -429,8 +432,19 @@ void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) {
OperationResult res; OperationResult res;
if ( ! connected ) if ( ! connected )
res = {ReturnCode::NOT_CONNECTED}; res = {ReturnCode::NOT_CONNECTED};
else if ( ! reply )
res = ParseGetReply(reply); res = {ReturnCode::OPERATION_FAILED, "get operation returned null reply"};
else if ( reply->type == REDIS_REPLY_NIL )
res = {ReturnCode::KEY_NOT_FOUND};
else if ( reply->type == REDIS_REPLY_ERROR )
res = ParseReplyError("get", reply->str);
else {
auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil);
if ( std::holds_alternative<ValPtr>(val) )
res = {ReturnCode::SUCCESS, "", std::get<ValPtr>(val)};
else
res = {ReturnCode::OPERATION_FAILED, std::get<std::string>(val)};
}
freeReplyObject(reply); freeReplyObject(reply);
CompleteCallback(callback, res); CompleteCallback(callback, res);
@ -444,9 +458,9 @@ void Redis::HandleEraseResult(redisReply* reply, ResultCallback* callback) {
if ( ! connected ) if ( ! connected )
res = {ReturnCode::NOT_CONNECTED}; res = {ReturnCode::NOT_CONNECTED};
else if ( ! reply ) else if ( ! reply )
res = {ReturnCode::OPERATION_FAILED, "Async erase operation returned null reply"}; res = {ReturnCode::OPERATION_FAILED, "erase operation returned null reply"};
else if ( reply && reply->type == REDIS_REPLY_ERROR ) else if ( reply->type == REDIS_REPLY_ERROR )
res = {ReturnCode::OPERATION_FAILED, util::fmt("Async erase operation failed: %s", reply->str)}; res = ParseReplyError("erase", reply->str);
freeReplyObject(reply); freeReplyObject(reply);
CompleteCallback(callback, res); CompleteCallback(callback, res);
@ -505,22 +519,14 @@ void Redis::ProcessFd(int fd, int flags) {
redisAsyncHandleWrite(async_ctx); redisAsyncHandleWrite(async_ctx);
} }
OperationResult Redis::ParseGetReply(redisReply* reply) const { OperationResult Redis::ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const {
OperationResult res; if ( async_ctx->err == REDIS_ERR_TIMEOUT )
return {ReturnCode::TIMEOUT};
if ( ! reply ) else if ( async_ctx->err == REDIS_ERR_IO )
res = {ReturnCode::OPERATION_FAILED, "GET returned null reply"}; return {ReturnCode::OPERATION_FAILED, util::fmt("%s operation IO error: %s", op_str.data(), strerror(errno))};
else if ( ! reply->str ) else
res = {ReturnCode::KEY_NOT_FOUND}; return {ReturnCode::OPERATION_FAILED,
else { util::fmt("%s operation failed: %s", op_str.data(), reply_err_str.data())};
auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil);
if ( std::holds_alternative<ValPtr>(val) )
res = {ReturnCode::SUCCESS, "", std::get<ValPtr>(val)};
else
res = {ReturnCode::OPERATION_FAILED, std::get<std::string>(val)};
}
return res;
} }
void Redis::DoPoll() { void Redis::DoPoll() {

View file

@ -59,7 +59,7 @@ private:
void DoExpire(double current_network_time) override; void DoExpire(double current_network_time) override;
void DoPoll() override; void DoPoll() override;
OperationResult ParseGetReply(redisReply* reply) const; OperationResult ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const;
redisAsyncContext* async_ctx = nullptr; redisAsyncContext* async_ctx = nullptr;

View file

@ -3,8 +3,11 @@ open_result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>] put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234] get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
get result same as inserted, T get result same as inserted, T
overwrite put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>] put result, [code=Storage::KEY_EXISTS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678] get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
get result same as inserted, T get result same as originally inserted, T
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]
get result same as overwritten, T
Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]] Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]]
Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]], Client disconnected Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]], Client disconnected

View file

@ -29,28 +29,42 @@ event zeek_init()
local key = "key1234"; local key = "key1234";
local value = "value1234"; local value = "value1234";
local value2 = "value2345";
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string); local res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
print "open_result", open_res; print "open_result", res;
local b = open_res$value; local b = res$value;
local res = Storage::Sync::put(b, [ $key=key, $value=value ]); # Put a first value. This should return Storage::SUCCESS.
res = Storage::Sync::put(b, [$key=key, $value=value]);
print "put result", res; print "put result", res;
local res2 = Storage::Sync::get(b, key); # Get the first value, validate that it's what we inserted.
print "get result", res2; res = Storage::Sync::get(b, key);
if ( res2$code == Storage::SUCCESS && res2?$value ) print "get result", res;
print "get result same as inserted", value == ( res2$value as string ); if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as inserted", value == (res$value as string);
local value2 = "value5678"; # This will return a Storage::KEY_EXISTS since we don't want overwriting.
res = Storage::Sync::put(b, [ $key=key, $value=value2, $overwrite=T ]); res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=F]);
print "overwrite put result", res; print "put result", res;
res2 = Storage::Sync::get(b, key); # Verify that the overwrite didn't actually happen.
print "get result", res2; res = Storage::Sync::get(b, key);
if ( res2$code == Storage::SUCCESS && res2?$value ) print "get result", res;
print "get result same as inserted", value2 == ( res2$value as string ); if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as originally inserted", value == (res$value as string);
# This will return a Storage::SUCESSS since we're asking for an overwrite.
res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=T]);
print "put result", res;
# Verify that the overwrite happened.
res = Storage::Sync::get(b, key);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as overwritten", value2 == (res$value as string);
Storage::Sync::close_backend(b); Storage::Sync::close_backend(b);
} }