Completely rework return values from storage operations

This commit is contained in:
Tim Wojtulewicz 2025-02-24 14:37:11 -07:00
parent 8ddda016ff
commit 9ed3e33f97
50 changed files with 859 additions and 586 deletions

View file

@ -7,6 +7,7 @@
#include "zeek/RunState.h"
#include "zeek/Val.h"
#include "zeek/iosource/Manager.h"
#include "zeek/storage/ReturnCode.h"
#include "hiredis/adapters/poll.h"
#include "hiredis/async.h"
@ -37,21 +38,21 @@ void redisOnDisconnect(const redisAsyncContext* ctx, int status) {
void redisPut(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("put");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::ErrorResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
backend->HandlePutResult(static_cast<redisReply*>(reply), callback);
}
void redisGet(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("get");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::ValResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
backend->HandleGetResult(static_cast<redisReply*>(reply), callback);
}
void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("erase");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::ErrorResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
backend->HandleEraseResult(static_cast<redisReply*>(reply), callback);
}
@ -122,7 +123,7 @@ storage::BackendPtr Redis::Instantiate(std::string_view tag) { return make_intru
/**
* Called by the manager system to open the backend.
*/
ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
OperationResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
RecordValPtr backend_options = options->GetField<RecordVal>("redis");
key_prefix = backend_options->GetField<StringVal>("key_prefix")->ToStdString();
@ -137,9 +138,10 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
}
else {
StringValPtr unix_sock = backend_options->GetField<StringVal>("server_unix_socket");
if ( ! unix_sock )
return util::fmt(
"Either server_host/server_port or server_unix_socket must be set in Redis options record");
if ( ! unix_sock ) {
return {ReturnCode::CONNECTION_FAILED,
"Either server_host/server_port or server_unix_socket must be set in Redis options record"};
}
server_addr = unix_sock->ToStdString();
REDIS_OPTIONS_SET_UNIX(&opt, server_addr.c_str());
@ -164,11 +166,16 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
redisAsyncFree(async_ctx);
async_ctx = nullptr;
return errmsg;
return {ReturnCode::CONNECTION_FAILED, errmsg};
}
++active_ops;
// There's no way to pass privdata down to the connect handler like there is for
// the other callbacks. Store the open callback so that it can be dealt with from
// OnConnect().
open_cb = cb;
// TODO: Sort out how to pass the zeek callbacks for both open/done to the async
// callbacks from hiredis so they can return errors.
@ -199,13 +206,13 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
async_ctx->ev.addWrite = redisAddWrite;
async_ctx->ev.delWrite = redisDelWrite;
return std::nullopt;
return {ReturnCode::SUCCESS};
}
/**
* Finalizes the backend when it's being closed.
*/
ErrorResult Redis::DoClose(ErrorResultCallback* cb) {
OperationResult Redis::DoClose(OperationResultCallback* cb) {
connected = false;
redisAsyncDisconnect(async_ctx);
@ -216,19 +223,22 @@ ErrorResult Redis::DoClose(ErrorResultCallback* cb) {
// TODO: handle response
}
CompleteCallback(cb, {ReturnCode::SUCCESS});
redisAsyncFree(async_ctx);
async_ctx = nullptr;
return std::nullopt;
return {ReturnCode::SUCCESS};
}
/**
* The workhorse method for Put(). This must be implemented by plugins.
*/
ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expiration_time, ErrorResultCallback* cb) {
OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expiration_time,
OperationResultCallback* cb) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return "Connection is not open";
return {ReturnCode::NOT_CONNECTED};
std::string format = "SET %s:%s %s";
if ( ! overwrite )
@ -250,7 +260,7 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
json_value.data());
if ( connected && status == REDIS_ERR )
return util::fmt("Failed to queue put operation: %s", async_ctx->errstr);
return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue put operation: %s", async_ctx->errstr)};
++active_ops;
@ -265,52 +275,52 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
status = redisAsyncCommand(async_ctx, redisGeneric, NULL, format.c_str(), key_prefix.data(), expiration_time,
json_key.data());
if ( connected && status == REDIS_ERR )
return util::fmt("ZADD operation failed: %s", async_ctx->errstr);
return {ReturnCode::OPERATION_FAILED, util::fmt("ZADD operation failed: %s", async_ctx->errstr)};
++active_ops;
}
return std::nullopt;
return {ReturnCode::SUCCESS};
}
/**
* The workhorse method for Get(). This must be implemented for plugins.
*/
ValResult Redis::DoGet(ValPtr key, ValResultCallback* cb) {
OperationResult Redis::DoGet(ValPtr key, OperationResultCallback* cb) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return zeek::unexpected<std::string>("Connection is not open");
return {ReturnCode::NOT_CONNECTED};
int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(),
key->ToJSON()->ToStdStringView().data());
if ( connected && status == REDIS_ERR )
return zeek::unexpected<std::string>(util::fmt("Failed to queue get operation: %s", async_ctx->errstr));
return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue get operation: %s", async_ctx->errstr)};
++active_ops;
// There isn't a result to return here. That happens in HandleGetResult for
// async operations.
return zeek::unexpected<std::string>("");
return {ReturnCode::SUCCESS};
}
/**
* The workhorse method for Erase(). This must be implemented for plugins.
*/
ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) {
OperationResult Redis::DoErase(ValPtr key, OperationResultCallback* cb) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return "Connection is not open";
return {ReturnCode::NOT_CONNECTED};
int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(),
key->ToJSON()->ToStdStringView().data());
if ( connected && status == REDIS_ERR )
return util::fmt("Failed to queue erase operation failed: %s", async_ctx->errstr);
return {ReturnCode::OPERATION_FAILED, async_ctx->errstr};
++active_ops;
return std::nullopt;
return {ReturnCode::SUCCESS};
}
void Redis::Expire() {
@ -335,11 +345,15 @@ void Redis::Expire() {
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
if ( reply && reply->elements == 0 ) {
if ( reply->elements == 0 ) {
freeReplyObject(reply);
return;
}
// The data from the reply to ZRANGEBYSCORE gets deleted as part of the
// commands below so we don't need to free it manually. Doing so results in
// a double-free.
// TODO: it's possible to pass multiple keys to a DEL operation but it requires
// building an array of the strings, building up the DEL command with entries,
// and passing the array as a block somehow. There's no guarantee it'd be faster
@ -357,33 +371,29 @@ void Redis::Expire() {
++active_ops;
Poll();
// This can't be freed until the other commands finish because the memory for
// the strings doesn't get copied when making the DEL commands.
// freeReplyObject(reply);
}
void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) {
void Redis::HandlePutResult(redisReply* reply, OperationResultCallback* callback) {
--active_ops;
ErrorResult res;
OperationResult res{ReturnCode::SUCCESS};
if ( ! connected )
res = util::fmt("Connection is not open");
res = {ReturnCode::NOT_CONNECTED};
else if ( ! reply )
res = util::fmt("Async put operation returned null reply");
res = {ReturnCode::OPERATION_FAILED, "Async put operation returned null reply"};
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = util::fmt("Async put operation failed: %s", reply->str);
res = {ReturnCode::OPERATION_FAILED, util::fmt("Async put operation failed: %s", reply->str)};
freeReplyObject(reply);
CompleteCallback(callback, res);
}
void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) {
void Redis::HandleGetResult(redisReply* reply, OperationResultCallback* callback) {
--active_ops;
ValResult res;
OperationResult res;
if ( ! connected )
res = zeek::unexpected<std::string>("Connection is not open");
res = {ReturnCode::NOT_CONNECTED};
else
res = ParseGetReply(reply);
@ -391,19 +401,20 @@ void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) {
CompleteCallback(callback, res);
}
void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback) {
void Redis::HandleEraseResult(redisReply* reply, OperationResultCallback* callback) {
--active_ops;
if ( callback->IsSyncCallback() )
reply_queue.push_back(reply);
else {
ErrorResult res;
OperationResult res{ReturnCode::SUCCESS};
if ( ! connected )
res = "Connection is not open";
res = {ReturnCode::NOT_CONNECTED};
else if ( ! reply )
res = util::fmt("Async erase operation returned null reply");
res = {ReturnCode::OPERATION_FAILED, "Async erase operation returned null reply"};
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = util::fmt("Async erase operation failed: %s", reply->str);
res = {ReturnCode::OPERATION_FAILED, util::fmt("Async erase operation failed: %s", reply->str)};
freeReplyObject(reply);
CompleteCallback(callback, res);
@ -421,9 +432,14 @@ void Redis::OnConnect(int status) {
if ( status == REDIS_OK ) {
connected = true;
CompleteCallback(open_cb, {ReturnCode::SUCCESS});
// TODO: post connect event
return;
}
connected = false;
CompleteCallback(open_cb, {ReturnCode::CONNECTION_FAILED});
// TODO: we could attempt to reconnect here
}
@ -436,6 +452,7 @@ void Redis::OnDisconnect(int status) {
}
else {
// TODO: this was unintentional, should we reconnect?
// TODO: post disconnect event
}
connected = false;
@ -448,19 +465,19 @@ void Redis::ProcessFd(int fd, int flags) {
redisAsyncHandleWrite(async_ctx);
}
ValResult Redis::ParseGetReply(redisReply* reply) const {
ValResult res;
OperationResult Redis::ParseGetReply(redisReply* reply) const {
OperationResult res;
if ( ! reply )
res = zeek::unexpected<std::string>("GET returned null reply");
res = {ReturnCode::OPERATION_FAILED, "GET returned null reply"};
else if ( ! reply->str )
res = zeek::unexpected<std::string>("GET returned key didn't exist");
res = {ReturnCode::KEY_NOT_FOUND};
else {
auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil);
if ( std::holds_alternative<ValPtr>(val) )
res = std::get<ValPtr>(val);
res = {ReturnCode::SUCCESS, "", std::get<ValPtr>(val)};
else
res = zeek::unexpected<std::string>(std::get<std::string>(val));
res = {ReturnCode::OPERATION_FAILED, std::get<std::string>(val)};
}
return res;