diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index b1fb29ed03..e5e5312987 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -16,6 +16,8 @@ // Anonymous callback handler methods for the hiredis async API. namespace { +bool during_expire = false; + class Tracer { public: Tracer(const std::string& where) : where(where) {} // DBG_LOG(zeek::DBG_STORAGE, "%s", where.c_str()); } @@ -56,17 +58,21 @@ void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) { backend->HandleEraseResult(static_cast(reply), callback); } -void redisZRANGEBYSCORE(redisAsyncContext* ctx, void* reply, void* privdata) { - auto t = Tracer("zrangebyscore"); +void redisZADD(redisAsyncContext* ctx, void* reply, void* privdata) { + auto t = Tracer("generic"); auto backend = static_cast(ctx->data); - backend->HandleZRANGEBYSCORE(static_cast(reply)); + + // We don't care about the reply from the ZADD, m1ostly because blocking to poll + // for it adds a bunch of complication to DoPut() with having to handle the + // reply from SET first. + backend->HandleGeneric(nullptr); + freeReplyObject(reply); } void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) { auto t = Tracer("generic"); auto backend = static_cast(ctx->data); - backend->HandleGeneric(); - freeReplyObject(reply); + backend->HandleGeneric(static_cast(reply)); } // Because we called redisPollAttach in DoOpen(), privdata here is a @@ -74,12 +80,17 @@ void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) { // data, which contains the backend. Because we overrode these callbacks in // DoOpen, we still want to mimic their callbacks to redisPollTick functions // correctly. +// +// Additionally, if we're in the middle of running a manual Expire() because +// we're reading a pcap, don't add the file descriptor into iosource_mgr. Manual +// calls to Poll() during that will handle reading/writing any data, and we +// don't want the contention with the main loop. void redisAddRead(void* privdata) { auto t = Tracer("addread"); auto rpe = static_cast(privdata); auto backend = static_cast(rpe->context->data); - if ( rpe->reading == 0 ) + if ( rpe->reading == 0 && ! during_expire ) zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ); rpe->reading = 1; } @@ -89,7 +100,7 @@ void redisDelRead(void* privdata) { auto rpe = static_cast(privdata); auto backend = static_cast(rpe->context->data); - if ( rpe->reading == 1 ) + if ( rpe->reading == 1 && ! during_expire ) zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ); rpe->reading = 0; } @@ -99,7 +110,7 @@ void redisAddWrite(void* privdata) { auto rpe = static_cast(privdata); auto backend = static_cast(rpe->context->data); - if ( rpe->writing == 0 ) + if ( rpe->writing == 0 && ! during_expire ) zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE); rpe->writing = 1; } @@ -109,11 +120,21 @@ void redisDelWrite(void* privdata) { auto t = Tracer("delwrite"); auto backend = static_cast(rpe->context->data); - if ( rpe->writing == 1 ) + if ( rpe->writing == 1 && ! during_expire ) zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE); rpe->writing = 0; } +// Creates a unique_lock based on a condition against a mutex. This is used to +// conditionally lock the expire_mutex. We only need to do it while reading +// pcaps. The only thread contention happens during Expire(), which only happens +// when reading pcaps. It's not worth the cycles to lock the mutex otherwise, +// and hiredis will deal with other cross-command contention correctly as long +// as it's in a single thread. +std::unique_lock conditionally_lock(bool condition, std::mutex& mutex) { + return condition ? std::unique_lock(mutex) : std::unique_lock(); +} + } // namespace namespace zeek::storage::backend::redis { @@ -213,6 +234,8 @@ OperationResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) { * Finalizes the backend when it's being closed. */ OperationResult Redis::DoClose(OperationResultCallback* cb) { + auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex); + connected = false; redisAsyncDisconnect(async_ctx); @@ -240,6 +263,8 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex if ( ! connected && ! async_ctx ) return {ReturnCode::NOT_CONNECTED}; + auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex); + std::string format = "SET %s:%s %s"; if ( ! overwrite ) format.append(" NX"); @@ -251,9 +276,9 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex // Use built-in expiration if reading live data, since time will move // forward consistently. If reading pcaps, we'll do something else. if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) { - format.append(" PXAT %d"); + format.append(" PXAT %" PRIu64); status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), - json_value.data(), static_cast(expiration_time * 1e6)); + json_value.data(), static_cast(expiration_time * 1e3)); } else status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), @@ -272,7 +297,7 @@ OperationResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double ex format.append(" NX"); format += " %f %s"; - status = redisAsyncCommand(async_ctx, redisGeneric, NULL, format.c_str(), key_prefix.data(), expiration_time, + status = redisAsyncCommand(async_ctx, redisZADD, NULL, format.c_str(), key_prefix.data(), expiration_time, json_key.data()); if ( connected && status == REDIS_ERR ) return {ReturnCode::OPERATION_FAILED, util::fmt("ZADD operation failed: %s", async_ctx->errstr)}; @@ -291,6 +316,8 @@ OperationResult Redis::DoGet(ValPtr key, OperationResultCallback* cb) { if ( ! connected && ! async_ctx ) return {ReturnCode::NOT_CONNECTED}; + auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex); + int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(), key->ToJSON()->ToStdStringView().data()); @@ -312,6 +339,8 @@ OperationResult Redis::DoErase(ValPtr key, OperationResultCallback* cb) { if ( ! connected && ! async_ctx ) return {ReturnCode::NOT_CONNECTED}; + auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex); + int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(), key->ToJSON()->ToStdStringView().data()); @@ -328,12 +357,17 @@ void Redis::Expire() { if ( ! connected || ! zeek::run_state::reading_traces ) return; - int status = redisAsyncCommand(async_ctx, redisZRANGEBYSCORE, NULL, "ZRANGEBYSCORE %s_expire -inf %f", - key_prefix.data(), run_state::network_time); + auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex); + + during_expire = true; + + int status = redisAsyncCommand(async_ctx, redisGeneric, NULL, "ZRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), + run_state::network_time); if ( status == REDIS_ERR ) { // TODO: do something with the error? printf("ZRANGEBYSCORE command failed: %s\n", async_ctx->errstr); + during_expire = false; return; } @@ -347,22 +381,29 @@ void Redis::Expire() { if ( reply->elements == 0 ) { freeReplyObject(reply); + during_expire = false; return; } - // The data from the reply to ZRANGEBYSCORE gets deleted as part of the - // commands below so we don't need to free it manually. Doing so results in - // a double-free. + std::vector elements; + for ( size_t i = 0; i < reply->elements; i++ ) + elements.emplace_back(reply->element[i]->str); + + freeReplyObject(reply); // TODO: it's possible to pass multiple keys to a DEL operation but it requires // building an array of the strings, building up the DEL command with entries, // and passing the array as a block somehow. There's no guarantee it'd be faster // anyways. - for ( size_t i = 0; i < reply->elements; i++ ) { - status = - redisAsyncCommand(async_ctx, redisGeneric, NULL, "DEL %s:%s", key_prefix.data(), reply->element[i]->str); + for ( const auto& e : elements ) { + status = redisAsyncCommand(async_ctx, redisGeneric, NULL, "DEL %s:%s", key_prefix.data(), e.c_str()); ++active_ops; Poll(); + + redisReply* reply = reply_queue.front(); + reply_queue.pop_front(); + freeReplyObject(reply); + // TODO: do we care if this failed? } // Remove all of the elements from the range-set that match the time range. @@ -371,6 +412,11 @@ void Redis::Expire() { ++active_ops; Poll(); + + reply = reply_queue.front(); + reply_queue.pop_front(); + freeReplyObject(reply); + // TODO: do we care if this failed? } void Redis::HandlePutResult(redisReply* reply, OperationResultCallback* callback) { @@ -421,9 +467,11 @@ void Redis::HandleEraseResult(redisReply* reply, OperationResultCallback* callba } } -void Redis::HandleZRANGEBYSCORE(redisReply* reply) { +void Redis::HandleGeneric(redisReply* reply) { --active_ops; - reply_queue.push_back(reply); + + if ( reply ) + reply_queue.push_back(reply); } void Redis::OnConnect(int status) { @@ -459,6 +507,8 @@ void Redis::OnDisconnect(int status) { } void Redis::ProcessFd(int fd, int flags) { + auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex); + if ( (flags & IOSource::ProcessFlags::READ) != 0 ) redisAsyncHandleRead(async_ctx); if ( (flags & IOSource::ProcessFlags::WRITE) != 0 ) diff --git a/src/storage/backend/redis/Redis.h b/src/storage/backend/redis/Redis.h index b5a65d12d2..7bdfce9285 100644 --- a/src/storage/backend/redis/Redis.h +++ b/src/storage/backend/redis/Redis.h @@ -2,6 +2,8 @@ #pragma once +#include + #include "zeek/iosource/IOSource.h" #include "zeek/storage/Backend.h" @@ -75,11 +77,7 @@ public: void HandlePutResult(redisReply* reply, OperationResultCallback* callback); void HandleGetResult(redisReply* reply, OperationResultCallback* callback); void HandleEraseResult(redisReply* reply, OperationResultCallback* callback); - void HandleZRANGEBYSCORE(redisReply* reply); - - // HandleGeneric exists so that async-running-as-sync operations can remove - // themselves from the list of active operations. - void HandleGeneric() { --active_ops; } + void HandleGeneric(redisReply* reply); protected: void Poll() override; @@ -95,6 +93,7 @@ private: std::deque reply_queue; OpenResultCallback* open_cb; + std::mutex expire_mutex; std::string server_addr; std::string key_prefix; diff --git a/src/storage/storage.bif b/src/storage/storage.bif index 013b95e8be..c7c55ef2e4 100644 --- a/src/storage/storage.bif +++ b/src/storage/storage.bif @@ -344,6 +344,9 @@ function Storage::Sync::__put%(backend: opaque of Storage::BackendHandle, key: a return op_result; } + if ( expire_time > 0.0 ) + expire_time += run_state::network_time; + auto cb = new OperationResultCallback(); auto key_v = IntrusivePtr{NewRef{}, key}; auto val_v = IntrusivePtr{NewRef{}, value}; diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-expiration/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-expiration/out index e2421b325f..428580958a 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-expiration/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-expiration/out @@ -1,6 +1,10 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. open result, [code=Storage::SUCCESS, error_str=, value=] -put result, [code=Storage::SUCCESS, error_str=, value=] -get result, [code=Storage::SUCCESS, error_str=, value=value7890] +put result 1, [code=Storage::SUCCESS, error_str=, value=] +put result 2, [code=Storage::SUCCESS, error_str=, value=] +get result, [code=Storage::SUCCESS, error_str=, value=value1234] get result same as inserted, T -get result after expiration, [code=Storage::KEY_NOT_FOUND, error_str=, value=] +get result 2, [code=Storage::SUCCESS, error_str=, value=value2345] +get result 2 same as inserted, T +get result 1 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=, value=] +get result 2 after expiration, [code=Storage::SUCCESS, error_str=, value=value2345] diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-native-expiration/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-native-expiration/out new file mode 100644 index 0000000000..428580958a --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-native-expiration/out @@ -0,0 +1,10 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +open result, [code=Storage::SUCCESS, error_str=, value=] +put result 1, [code=Storage::SUCCESS, error_str=, value=] +put result 2, [code=Storage::SUCCESS, error_str=, value=] +get result, [code=Storage::SUCCESS, error_str=, value=value1234] +get result same as inserted, T +get result 2, [code=Storage::SUCCESS, error_str=, value=value2345] +get result 2 same as inserted, T +get result 1 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=, value=] +get result 2 after expiration, [code=Storage::SUCCESS, error_str=, value=value2345] diff --git a/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek b/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek index c7223c4277..3d0ae7042c 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek @@ -19,13 +19,19 @@ redef exit_only_after_terminate = T; type str: string; global b: opaque of Storage::BackendHandle; -global key: string = "key1234"; -global value: string = "value7890"; +global key1: string = "key1234"; +global value1: string = "value1234"; + +global key2: string = "key2345"; +global value2: string = "value2345"; event check_removed() { - local res2 = Storage::Sync::get(b, key); - print "get result after expiration", res2; + local res = Storage::Sync::get(b, key1); + print "get result 1 after expiration", res; + + res = Storage::Sync::get(b, key2); + print "get result 2 after expiration", res; Storage::Sync::close_backend(b); terminate(); @@ -42,13 +48,23 @@ event setup_test() b = open_res$value; - local res = Storage::Sync::put(b, [ $key=key, $value=value, $expire_time=2secs ]); - print "put result", res; + # Insert a key that will expire in the time allotted + local res = Storage::Sync::put(b, [ $key=key1, $value=value1, $expire_time=2secs ]); + print "put result 1", res; - local res2 = Storage::Sync::get(b, key); - print "get result", res2; - if ( res2$code == Storage::SUCCESS && res2?$value ) - print "get result same as inserted", value == ( res2$value as string ); + # Insert a key that won't expire + res = Storage::Sync::put(b, [ $key=key2, $value=value2, $expire_time=20secs ]); + print "put result 2", res; + + res = Storage::Sync::get(b, key1); + print "get result", res; + if ( res$code == Storage::SUCCESS && res?$value ) + print "get result same as inserted", value1 == ( res$value as string ); + + res = Storage::Sync::get(b, key2); + print "get result 2", res; + if ( res$code == Storage::SUCCESS && res?$value ) + print "get result 2 same as inserted", value2 == ( res$value as string ); schedule 5secs { check_removed() }; } diff --git a/testing/btest/scripts/base/frameworks/storage/redis-native-expiration.zeek b/testing/btest/scripts/base/frameworks/storage/redis-native-expiration.zeek new file mode 100644 index 0000000000..bbcf58a13d --- /dev/null +++ b/testing/btest/scripts/base/frameworks/storage/redis-native-expiration.zeek @@ -0,0 +1,78 @@ +# @TEST-DOC: Tests expiration of data from Redis when reading a pcap + +# @TEST-REQUIRES: have-redis +# @TEST-PORT: REDIS_PORT + +# @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp} +# @TEST-EXEC: zeek -B storage -b %INPUT > out +# @TEST-EXEC: btest-bg-wait -k 1 + +# @TEST-EXEC: btest-diff out + +@load base/frameworks/storage/sync +@load policy/frameworks/storage/backend/redis + +redef Storage::expire_interval = 2secs; +redef exit_only_after_terminate = T; + +# Create a typename here that can be passed down into open_backend() +type str: string; + +global b: opaque of Storage::BackendHandle; +global key1: string = "key1234"; +global value1: string = "value1234"; + +global key2: string = "key2345"; +global value2: string = "value2345"; + +event check_removed() + { + local res = Storage::Sync::get(b, key1); + print "get result 1 after expiration", res; + + res = Storage::Sync::get(b, key2); + print "get result 2 after expiration", res; + + Storage::Sync::close_backend(b); + terminate(); + } + +event setup_test() + { + local opts: Storage::BackendOptions; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; + + local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); + print "open result", open_res; + + b = open_res$value; + + # Insert a key that will expire in the time allotted + local res = Storage::Sync::put(b, [ $key=key1, $value=value1, $expire_time=2secs ]); + print "put result 1", res; + + # Insert a key that won't expire + res = Storage::Sync::put(b, [ $key=key2, $value=value2, $expire_time=20secs ]); + print "put result 2", res; + + res = Storage::Sync::get(b, key1); + print "get result", res; + if ( res$code == Storage::SUCCESS && res?$value ) + print "get result same as inserted", value1 == ( res$value as string ); + + res = Storage::Sync::get(b, key2); + print "get result 2", res; + if ( res$code == Storage::SUCCESS && res?$value ) + print "get result 2 same as inserted", value2 == ( res$value as string ); + + schedule 5secs { check_removed() }; + } + +event zeek_init() + { + # We need network time to be set to something other than zero for the + # expiration time to be set correctly. Schedule an event on a short + # timer so packets start getting read and do the setup there. + schedule 100msecs { setup_test() }; + }