Redis: Handle disconnection correctly via callback

This commit is contained in:
Tim Wojtulewicz 2025-03-10 15:32:03 -07:00
parent b067a6e588
commit a40db844eb
3 changed files with 21 additions and 17 deletions

View file

@ -6233,7 +6233,7 @@ export {
NOT_CONNECTED, NOT_CONNECTED,
## Operation timed out. ## Operation timed out.
TIMEOUT, TIMEOUT,
## Connection to backed was lost. ## Connection to backed was lost unexpectedly.
CONNECTION_LOST, CONNECTION_LOST,
## Generic operation failed. ## Generic operation failed.
OPERATION_FAILED, OPERATION_FAILED,
@ -6241,7 +6241,8 @@ export {
KEY_NOT_FOUND, KEY_NOT_FOUND,
## Key requested for overwrite already exists. ## Key requested for overwrite already exists.
KEY_EXISTS, KEY_EXISTS,
## Generic connection failure. ## Generic connection-setup failure. This is not if the connection
## was lost, but if it failed to be setup in the first place.
CONNECTION_FAILED, CONNECTION_FAILED,
## Generic disconnection failure. ## Generic disconnection failure.
DISCONNECTION_FAILED, DISCONNECTION_FAILED,

View file

@ -173,6 +173,9 @@ OperationResult Redis::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
struct timeval timeout = {5, 0}; struct timeval timeout = {5, 0};
opt.connect_timeout = &timeout; opt.connect_timeout = &timeout;
// The connection request below should be operation #1.
active_ops = 1;
async_ctx = redisAsyncConnectWithOptions(&opt); async_ctx = redisAsyncConnectWithOptions(&opt);
if ( async_ctx == nullptr || async_ctx->err ) { if ( async_ctx == nullptr || async_ctx->err ) {
// This block doesn't necessarily mean the connection failed. It means // This block doesn't necessarily mean the connection failed. It means
@ -189,8 +192,6 @@ OperationResult Redis::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
return {ReturnCode::CONNECTION_FAILED, errmsg}; return {ReturnCode::CONNECTION_FAILED, errmsg};
} }
++active_ops;
// There's no way to pass privdata down to the connect handler like there is for // There's no way to pass privdata down to the connect handler like there is for
// the other callbacks. Store the open callback so that it can be dealt with from // the other callbacks. Store the open callback so that it can be dealt with from
// OnConnect(). // OnConnect().
@ -236,19 +237,12 @@ OperationResult Redis::DoClose(OperationResultCallback* cb) {
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex); auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
connected = false; connected = false;
close_cb = cb;
redisAsyncDisconnect(async_ctx); redisAsyncDisconnect(async_ctx);
++active_ops; ++active_ops;
if ( cb->IsSyncCallback() && ! zeek::run_state::terminating ) { return {ReturnCode::IN_PROGRESS};
Poll();
// TODO: handle response
}
redisAsyncFree(async_ctx);
async_ctx = nullptr;
return {ReturnCode::SUCCESS};
} }
/** /**
@ -487,13 +481,21 @@ void Redis::OnConnect(int status) {
void Redis::OnDisconnect(int status) { void Redis::OnDisconnect(int status) {
DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event"); DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event");
--active_ops;
connected = false; connected = false;
if ( status == REDIS_ERR ) {
if ( status == REDIS_ERR ) // An error status indicates that the connection was lost unexpectedly and not
// via a request from backend.
EnqueueBackendLost(async_ctx->errstr); EnqueueBackendLost(async_ctx->errstr);
else }
else {
--active_ops;
EnqueueBackendLost("Client disconnected"); EnqueueBackendLost("Client disconnected");
CompleteCallback(close_cb, {ReturnCode::SUCCESS});
}
redisAsyncFree(async_ctx);
async_ctx = nullptr;
} }
void Redis::ProcessFd(int fd, int flags) { void Redis::ProcessFd(int fd, int flags) {

View file

@ -69,6 +69,7 @@ private:
std::deque<redisReply*> reply_queue; std::deque<redisReply*> reply_queue;
OpenResultCallback* open_cb; OpenResultCallback* open_cb;
OperationResultCallback* close_cb;
std::mutex expire_mutex; std::mutex expire_mutex;
std::string server_addr; std::string server_addr;