diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index 3fded1452a..a30c17e23a 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -217,7 +217,7 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira if ( async_mode ) { int status; - if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) + if ( expiration_time > 0.0 ) status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), json_value.data(), static_cast(expire_time * 1e6)); else @@ -242,6 +242,22 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira freeReplyObject(reply); } + // If reading pcaps insert into a secondary set that's ordered by expiration + // time that gets checked by Expire(). + if ( expiration_time > 0.0 && zeek::run_state::reading_traces ) { + format = "ZADD %s_expire"; + if ( ! overwrite ) + format.append(" NX"); + format += " %f %s"; + + redisReply* reply = + (redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), expire_time, json_key.data()); + if ( ! reply ) + return util::fmt("ZADD operation failed: %s", ctx->errstr); + + freeReplyObject(reply); + } + return std::nullopt; } @@ -303,6 +319,50 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) { return std::nullopt; } +void Redis::Expire() { + if ( ! connected || ! zeek::run_state::reading_traces ) + return; + + redisReply* reply = + (redisReply*)redisCommand(ctx, "ZRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), run_state::network_time); + + if ( ! reply ) { + // TODO: do something with the error? + printf("ZRANGEBYSCORE command failed: %s\n", ctx->errstr); + return; + } + + if ( reply->elements == 0 ) { + freeReplyObject(reply); + return; + } + + // TODO: it's possible to pass multiple keys to a DEL operation but it requires + // building an array of the strings, building up the DEL command with entries, + // and passing the array as a block somehow. There's no guarantee it'd be faster + // anyways. + for ( size_t i = 0; i < reply->elements; i++ ) { + auto del_reply = (redisReply*)redisCommand(ctx, "DEL %s:%s", key_prefix.data(), reply->element[i]->str); + + // Don't bother checking the response here. The only error that would matter is if the key + // didn't exist, but that would mean it was already removed for some other reason. + + freeReplyObject(del_reply); + } + + freeReplyObject(reply); + reply = (redisReply*)redisCommand(ctx, "ZREMRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), + run_state::network_time); + + if ( ! reply ) { + // TODO: do something with the error? + printf("ZREMRANGEBYSCORE command failed: %s\n", ctx->errstr); + return; + } + + freeReplyObject(reply); +} + void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) { ErrorResult res; if ( ! connected ) diff --git a/src/storage/backend/redis/Redis.h b/src/storage/backend/redis/Redis.h index 68f1969489..2d283f2965 100644 --- a/src/storage/backend/redis/Redis.h +++ b/src/storage/backend/redis/Redis.h @@ -58,6 +58,12 @@ public: */ ErrorResult DoErase(ValPtr key, ErrorResultCallback* cb = nullptr) override; + /** + * Removes any entries in the backend that have expired. Can be overridden by + * derived classes. + */ + void Expire() override; + // IOSource interface double GetNextTimeout() override { return -1; } void Process() override {} diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-expiration/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-expiration/out new file mode 100644 index 0000000000..88d16bb4e3 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-expiration/out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +put result, T +get result, value7890 +get result same as inserted, T +get result after expiration, F diff --git a/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek b/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek new file mode 100644 index 0000000000..810bbb102d --- /dev/null +++ b/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek @@ -0,0 +1,60 @@ +# @TEST-DOC: Tests expiration of data from Redis when reading a pcap + +# @TEST-REQUIRES: have-redis +# @TEST-PORT: REDIS_PORT + +# Generate a redis.conf file with the port defined above, but without the /tcp at the end of +# it. This also sets some paths in the conf to the testing directory. +# @TEST-EXEC: cat $FILES/redis.conf | sed "s|%REDIS_PORT%|${REDIS_PORT%/tcp}|g" | sed "s|%RUN_PATH%|$(pwd)|g" > ./redis.conf +# @TEST-EXEC: btest-bg-run redis redis-server ../redis.conf +# @TEST-EXEC: zcat <$TRACES/echo-connections.pcap.gz | zeek -B storage -b -Cr - %INPUT > out +# @TEST-EXEC: btest-bg-wait -k 1 + +# @TEST-EXEC: btest-diff out + +@load base/frameworks/storage +@load policy/frameworks/storage/backend/redis + +redef Storage::expire_interval = 2 secs; +redef exit_only_after_terminate = T; + +# Create a typename here that can be passed down into open_backend() +type str: string; + +global b: opaque of Storage::BackendHandle; +global key: string = "key1234"; +global value: string = "value7890"; + +event check_removed() { + local res2 = Storage::get(b, key, F); + print "get result after expiration", res2; + + Storage::close_backend(b); + terminate(); +} + +event setup_test() { + local opts : Storage::Backend::Redis::Options; + opts$server_host = "127.0.0.1"; + opts$server_port = to_port(getenv("REDIS_PORT")); + opts$key_prefix = "testing"; + opts$async_mode = F; + + b = Storage::open_backend(Storage::REDIS, opts, str, str); + + local res = Storage::put(b, [$key=key, $value=value, $async_mode=F, $expire_time=2 secs]); + print "put result", res; + + local res2 = Storage::get(b, key, F); + print "get result", res2; + print "get result same as inserted", value == (res2 as string); + + schedule 5 secs { check_removed() }; +} + +event zeek_init() { + # We need network time to be set to something other than zero for the + # expiration time to be set correctly. Schedule an event on a short + # timer so packets start getting read and do the setup there. + schedule 100 msecs { setup_test() }; +}