Redis: Fix thread-contention issues with Expire(), add more tests

This commit is contained in:
Tim Wojtulewicz 2025-02-27 16:39:41 -07:00
parent b81e876ec8
commit cca1d4f988
7 changed files with 200 additions and 40 deletions

View file

@ -16,6 +16,8 @@
// Anonymous callback handler methods for the hiredis async API. // Anonymous callback handler methods for the hiredis async API.
namespace { namespace {
bool during_expire = false;
class Tracer { class Tracer {
public: public:
Tracer(const std::string& where) : where(where) {} // DBG_LOG(zeek::DBG_STORAGE, "%s", where.c_str()); } Tracer(const std::string& where) : where(where) {} // DBG_LOG(zeek::DBG_STORAGE, "%s", where.c_str()); }
@ -56,17 +58,21 @@ void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) {
backend->HandleEraseResult(static_cast<redisReply*>(reply), callback); backend->HandleEraseResult(static_cast<redisReply*>(reply), callback);
} }
void redisZRANGEBYSCORE(redisAsyncContext* ctx, void* reply, void* privdata) { void redisZADD(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("zrangebyscore"); auto t = Tracer("generic");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
backend->HandleZRANGEBYSCORE(static_cast<redisReply*>(reply));
// We don't care about the reply from the ZADD, mostly because blocking to poll
// for it adds a bunch of complication to DoPut() with having to handle the
// reply from SET first.
backend->HandleGeneric(nullptr);
freeReplyObject(reply);
} }
void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) { void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("generic"); auto t = Tracer("generic");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
backend->HandleGeneric(); backend->HandleGeneric(static_cast<redisReply*>(reply));
freeReplyObject(reply);
} }
// Because we called redisPollAttach in DoOpen(), privdata here is a // Because we called redisPollAttach in DoOpen(), privdata here is a
@ -74,12 +80,17 @@ void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) {
// data, which contains the backend. Because we overrode these callbacks in // data, which contains the backend. Because we overrode these callbacks in
// DoOpen, we still want to mimic their callbacks to redisPollTick functions // DoOpen, we still want to mimic their callbacks to redisPollTick functions
// correctly. // correctly.
//
// Additionally, if we're in the middle of running a manual Expire() because
// we're reading a pcap, don't add the file descriptor into iosource_mgr. Manual
// calls to Poll() during that will handle reading/writing any data, and we
// don't want the contention with the main loop.
void redisAddRead(void* privdata) { void redisAddRead(void* privdata) {
auto t = Tracer("addread"); auto t = Tracer("addread");
auto rpe = static_cast<redisPollEvents*>(privdata); auto rpe = static_cast<redisPollEvents*>(privdata);
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
if ( rpe->reading == 0 ) if ( rpe->reading == 0 && ! during_expire )
zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ); zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ);
rpe->reading = 1; rpe->reading = 1;
} }
@ -89,7 +100,7 @@ void redisDelRead(void* privdata) {
auto rpe = static_cast<redisPollEvents*>(privdata); auto rpe = static_cast<redisPollEvents*>(privdata);
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
if ( rpe->reading == 1 ) if ( rpe->reading == 1 && ! during_expire )
zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ); zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ);
rpe->reading = 0; rpe->reading = 0;
} }
@ -99,7 +110,7 @@ void redisAddWrite(void* privdata) {
auto rpe = static_cast<redisPollEvents*>(privdata); auto rpe = static_cast<redisPollEvents*>(privdata);
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
if ( rpe->writing == 0 ) if ( rpe->writing == 0 && ! during_expire )
zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE); zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE);
rpe->writing = 1; rpe->writing = 1;
} }
@ -109,11 +120,21 @@ void redisDelWrite(void* privdata) {
auto t = Tracer("delwrite"); auto t = Tracer("delwrite");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
if ( rpe->writing == 1 ) if ( rpe->writing == 1 && ! during_expire )
zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE); zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE);
rpe->writing = 0; rpe->writing = 0;
} }
// Returns a std::unique_lock that owns `mutex` only when `condition` is true;
// otherwise returns an empty (non-owning) lock. Used to guard expire_mutex
// solely while reading pcaps: Expire() — the only source of cross-thread
// contention — runs only in that mode, so we skip the locking cost otherwise.
// hiredis itself handles ordinary cross-command contention correctly as long
// as all commands stay on a single thread.
std::unique_lock<std::mutex> conditionally_lock(bool condition, std::mutex& mutex) {
    std::unique_lock<std::mutex> scoped_lock;
    if ( condition )
        scoped_lock = std::unique_lock<std::mutex>(mutex);
    return scoped_lock;
}
} // namespace } // namespace
namespace zeek::storage::backend::redis { namespace zeek::storage::backend::redis {
@ -213,6 +234,8 @@ OperationResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
* Finalizes the backend when it's being closed. * Finalizes the backend when it's being closed.
*/ */
OperationResult Redis::DoClose(OperationResultCallback* cb) { OperationResult Redis::DoClose(OperationResultCallback* cb) {
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
connected = false; connected = false;
redisAsyncDisconnect(async_ctx); redisAsyncDisconnect(async_ctx);
@ -240,6 +263,8 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex
if ( ! connected && ! async_ctx ) if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED}; return {ReturnCode::NOT_CONNECTED};
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
std::string format = "SET %s:%s %s"; std::string format = "SET %s:%s %s";
if ( ! overwrite ) if ( ! overwrite )
format.append(" NX"); format.append(" NX");
@ -251,9 +276,9 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex
// Use built-in expiration if reading live data, since time will move // Use built-in expiration if reading live data, since time will move
// forward consistently. If reading pcaps, we'll do something else. // forward consistently. If reading pcaps, we'll do something else.
if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) { if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) {
format.append(" PXAT %d"); format.append(" PXAT %" PRIu64);
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
json_value.data(), static_cast<uint64_t>(expiration_time * 1e6)); json_value.data(), static_cast<uint64_t>(expiration_time * 1e3));
} }
else else
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
@ -272,7 +297,7 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex
format.append(" NX"); format.append(" NX");
format += " %f %s"; format += " %f %s";
status = redisAsyncCommand(async_ctx, redisGeneric, NULL, format.c_str(), key_prefix.data(), expiration_time, status = redisAsyncCommand(async_ctx, redisZADD, NULL, format.c_str(), key_prefix.data(), expiration_time,
json_key.data()); json_key.data());
if ( connected && status == REDIS_ERR ) if ( connected && status == REDIS_ERR )
return {ReturnCode::OPERATION_FAILED, util::fmt("ZADD operation failed: %s", async_ctx->errstr)}; return {ReturnCode::OPERATION_FAILED, util::fmt("ZADD operation failed: %s", async_ctx->errstr)};
@ -291,6 +316,8 @@ OperationResult Redis::DoGet(ValPtr key, OperationResultCallback* cb) {
if ( ! connected && ! async_ctx ) if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED}; return {ReturnCode::NOT_CONNECTED};
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(), int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(),
key->ToJSON()->ToStdStringView().data()); key->ToJSON()->ToStdStringView().data());
@ -312,6 +339,8 @@ OperationResult Redis::DoErase(ValPtr key, OperationResultCallback* cb) {
if ( ! connected && ! async_ctx ) if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED}; return {ReturnCode::NOT_CONNECTED};
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(), int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(),
key->ToJSON()->ToStdStringView().data()); key->ToJSON()->ToStdStringView().data());
@ -328,12 +357,17 @@ void Redis::Expire() {
if ( ! connected || ! zeek::run_state::reading_traces ) if ( ! connected || ! zeek::run_state::reading_traces )
return; return;
int status = redisAsyncCommand(async_ctx, redisZRANGEBYSCORE, NULL, "ZRANGEBYSCORE %s_expire -inf %f", auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
key_prefix.data(), run_state::network_time);
during_expire = true;
int status = redisAsyncCommand(async_ctx, redisGeneric, NULL, "ZRANGEBYSCORE %s_expire -inf %f", key_prefix.data(),
run_state::network_time);
if ( status == REDIS_ERR ) { if ( status == REDIS_ERR ) {
// TODO: do something with the error? // TODO: do something with the error?
printf("ZRANGEBYSCORE command failed: %s\n", async_ctx->errstr); printf("ZRANGEBYSCORE command failed: %s\n", async_ctx->errstr);
during_expire = false;
return; return;
} }
@ -347,22 +381,29 @@ void Redis::Expire() {
if ( reply->elements == 0 ) { if ( reply->elements == 0 ) {
freeReplyObject(reply); freeReplyObject(reply);
during_expire = false;
return; return;
} }
// The data from the reply to ZRANGEBYSCORE gets deleted as part of the std::vector<std::string> elements;
// commands below so we don't need to free it manually. Doing so results in for ( size_t i = 0; i < reply->elements; i++ )
// a double-free. elements.emplace_back(reply->element[i]->str);
freeReplyObject(reply);
// TODO: it's possible to pass multiple keys to a DEL operation but it requires // TODO: it's possible to pass multiple keys to a DEL operation but it requires
// building an array of the strings, building up the DEL command with entries, // building an array of the strings, building up the DEL command with entries,
// and passing the array as a block somehow. There's no guarantee it'd be faster // and passing the array as a block somehow. There's no guarantee it'd be faster
// anyways. // anyways.
for ( size_t i = 0; i < reply->elements; i++ ) { for ( const auto& e : elements ) {
status = status = redisAsyncCommand(async_ctx, redisGeneric, NULL, "DEL %s:%s", key_prefix.data(), e.c_str());
redisAsyncCommand(async_ctx, redisGeneric, NULL, "DEL %s:%s", key_prefix.data(), reply->element[i]->str);
++active_ops; ++active_ops;
Poll(); Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
freeReplyObject(reply);
// TODO: do we care if this failed?
} }
// Remove all of the elements from the range-set that match the time range. // Remove all of the elements from the range-set that match the time range.
@ -371,6 +412,11 @@ void Redis::Expire() {
++active_ops; ++active_ops;
Poll(); Poll();
reply = reply_queue.front();
reply_queue.pop_front();
freeReplyObject(reply);
// TODO: do we care if this failed?
} }
void Redis::HandlePutResult(redisReply* reply, OperationResultCallback* callback) { void Redis::HandlePutResult(redisReply* reply, OperationResultCallback* callback) {
@ -421,8 +467,10 @@ void Redis::HandleEraseResult(redisReply* reply, OperationResultCallback* callba
} }
} }
void Redis::HandleZRANGEBYSCORE(redisReply* reply) { void Redis::HandleGeneric(redisReply* reply) {
--active_ops; --active_ops;
if ( reply )
reply_queue.push_back(reply); reply_queue.push_back(reply);
} }
@ -459,6 +507,8 @@ void Redis::OnDisconnect(int status) {
} }
void Redis::ProcessFd(int fd, int flags) { void Redis::ProcessFd(int fd, int flags) {
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
if ( (flags & IOSource::ProcessFlags::READ) != 0 ) if ( (flags & IOSource::ProcessFlags::READ) != 0 )
redisAsyncHandleRead(async_ctx); redisAsyncHandleRead(async_ctx);
if ( (flags & IOSource::ProcessFlags::WRITE) != 0 ) if ( (flags & IOSource::ProcessFlags::WRITE) != 0 )

View file

@ -2,6 +2,8 @@
#pragma once #pragma once
#include <mutex>
#include "zeek/iosource/IOSource.h" #include "zeek/iosource/IOSource.h"
#include "zeek/storage/Backend.h" #include "zeek/storage/Backend.h"
@ -75,11 +77,7 @@ public:
void HandlePutResult(redisReply* reply, OperationResultCallback* callback); void HandlePutResult(redisReply* reply, OperationResultCallback* callback);
void HandleGetResult(redisReply* reply, OperationResultCallback* callback); void HandleGetResult(redisReply* reply, OperationResultCallback* callback);
void HandleEraseResult(redisReply* reply, OperationResultCallback* callback); void HandleEraseResult(redisReply* reply, OperationResultCallback* callback);
void HandleZRANGEBYSCORE(redisReply* reply); void HandleGeneric(redisReply* reply);
// HandleGeneric exists so that async-running-as-sync operations can remove
// themselves from the list of active operations.
void HandleGeneric() { --active_ops; }
protected: protected:
void Poll() override; void Poll() override;
@ -95,6 +93,7 @@ private:
std::deque<redisReply*> reply_queue; std::deque<redisReply*> reply_queue;
OpenResultCallback* open_cb; OpenResultCallback* open_cb;
std::mutex expire_mutex;
std::string server_addr; std::string server_addr;
std::string key_prefix; std::string key_prefix;

View file

@ -344,6 +344,9 @@ function Storage::Sync::__put%(backend: opaque of Storage::BackendHandle, key: a
return op_result; return op_result;
} }
if ( expire_time > 0.0 )
expire_time += run_state::network_time;
auto cb = new OperationResultCallback(); auto cb = new OperationResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key}; auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto val_v = IntrusivePtr<Val>{NewRef{}, value}; auto val_v = IntrusivePtr<Val>{NewRef{}, value};

View file

@ -1,6 +1,10 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>] open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>] put result 1, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value7890] put result 2, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
get result same as inserted, T get result same as inserted, T
get result after expiration, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>] get result 2, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]
get result 2 same as inserted, T
get result 1 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>]
get result 2 after expiration, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]

View file

@ -0,0 +1,10 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
put result 1, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
put result 2, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
get result same as inserted, T
get result 2, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]
get result 2 same as inserted, T
get result 1 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>]
get result 2 after expiration, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]

View file

@ -19,13 +19,19 @@ redef exit_only_after_terminate = T;
type str: string; type str: string;
global b: opaque of Storage::BackendHandle; global b: opaque of Storage::BackendHandle;
global key: string = "key1234"; global key1: string = "key1234";
global value: string = "value7890"; global value1: string = "value1234";
global key2: string = "key2345";
global value2: string = "value2345";
event check_removed() event check_removed()
{ {
local res2 = Storage::Sync::get(b, key); local res = Storage::Sync::get(b, key1);
print "get result after expiration", res2; print "get result 1 after expiration", res;
res = Storage::Sync::get(b, key2);
print "get result 2 after expiration", res;
Storage::Sync::close_backend(b); Storage::Sync::close_backend(b);
terminate(); terminate();
@ -42,13 +48,23 @@ event setup_test()
b = open_res$value; b = open_res$value;
local res = Storage::Sync::put(b, [ $key=key, $value=value, $expire_time=2secs ]); # Insert a key that will expire in the time allotted
print "put result", res; local res = Storage::Sync::put(b, [ $key=key1, $value=value1, $expire_time=2secs ]);
print "put result 1", res;
local res2 = Storage::Sync::get(b, key); # Insert a key that won't expire
print "get result", res2; res = Storage::Sync::put(b, [ $key=key2, $value=value2, $expire_time=20secs ]);
if ( res2$code == Storage::SUCCESS && res2?$value ) print "put result 2", res;
print "get result same as inserted", value == ( res2$value as string );
res = Storage::Sync::get(b, key1);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as inserted", value1 == ( res$value as string );
res = Storage::Sync::get(b, key2);
print "get result 2", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result 2 same as inserted", value2 == ( res$value as string );
schedule 5secs { check_removed() }; schedule 5secs { check_removed() };
} }

View file

@ -0,0 +1,78 @@
# @TEST-DOC: Tests expiration of data from Redis when reading a pcap
# @TEST-REQUIRES: have-redis
# @TEST-PORT: REDIS_PORT
# @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp}
# @TEST-EXEC: zeek -B storage -b %INPUT > out
# @TEST-EXEC: btest-bg-wait -k 1
# @TEST-EXEC: btest-diff out
@load base/frameworks/storage/sync
@load policy/frameworks/storage/backend/redis
redef Storage::expire_interval = 2secs;
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into open_backend()
type str: string;
global b: opaque of Storage::BackendHandle;
global key1: string = "key1234";
global value1: string = "value1234";
global key2: string = "key2345";
global value2: string = "value2345";
event check_removed()
{
local res = Storage::Sync::get(b, key1);
print "get result 1 after expiration", res;
res = Storage::Sync::get(b, key2);
print "get result 2 after expiration", res;
Storage::Sync::close_backend(b);
terminate();
}
event setup_test()
{
local opts: Storage::BackendOptions;
opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
print "open result", open_res;
b = open_res$value;
# Insert a key that will expire in the time allotted
local res = Storage::Sync::put(b, [ $key=key1, $value=value1, $expire_time=2secs ]);
print "put result 1", res;
# Insert a key that won't expire
res = Storage::Sync::put(b, [ $key=key2, $value=value2, $expire_time=20secs ]);
print "put result 2", res;
res = Storage::Sync::get(b, key1);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as inserted", value1 == ( res$value as string );
res = Storage::Sync::get(b, key2);
print "get result 2", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result 2 same as inserted", value2 == ( res$value as string );
schedule 5secs { check_removed() };
}
event zeek_init()
{
# We need network time to be set to something other than zero for the
# expiration time to be set correctly. Schedule an event on a short
# timer so packets start getting read and do the setup there.
schedule 100msecs { setup_test() };
}