mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Redis: Support non-native expiration when reading traces
This commit is contained in:
parent
08bebaa426
commit
ea87c773cd
4 changed files with 132 additions and 1 deletions
|
@ -217,7 +217,7 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
|
||||||
|
|
||||||
if ( async_mode ) {
|
if ( async_mode ) {
|
||||||
int status;
|
int status;
|
||||||
if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces )
|
if ( expiration_time > 0.0 )
|
||||||
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
|
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
|
||||||
json_value.data(), static_cast<uint64_t>(expire_time * 1e6));
|
json_value.data(), static_cast<uint64_t>(expire_time * 1e6));
|
||||||
else
|
else
|
||||||
|
@ -242,6 +242,22 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
|
||||||
freeReplyObject(reply);
|
freeReplyObject(reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If reading pcaps insert into a secondary set that's ordered by expiration
|
||||||
|
// time that gets checked by Expire().
|
||||||
|
if ( expiration_time > 0.0 && zeek::run_state::reading_traces ) {
|
||||||
|
format = "ZADD %s_expire";
|
||||||
|
if ( ! overwrite )
|
||||||
|
format.append(" NX");
|
||||||
|
format += " %f %s";
|
||||||
|
|
||||||
|
redisReply* reply =
|
||||||
|
(redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), expire_time, json_key.data());
|
||||||
|
if ( ! reply )
|
||||||
|
return util::fmt("ZADD operation failed: %s", ctx->errstr);
|
||||||
|
|
||||||
|
freeReplyObject(reply);
|
||||||
|
}
|
||||||
|
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,6 +319,50 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Redis::Expire() {
|
||||||
|
if ( ! connected || ! zeek::run_state::reading_traces )
|
||||||
|
return;
|
||||||
|
|
||||||
|
redisReply* reply =
|
||||||
|
(redisReply*)redisCommand(ctx, "ZRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), run_state::network_time);
|
||||||
|
|
||||||
|
if ( ! reply ) {
|
||||||
|
// TODO: do something with the error?
|
||||||
|
printf("ZRANGEBYSCORE command failed: %s\n", ctx->errstr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( reply->elements == 0 ) {
|
||||||
|
freeReplyObject(reply);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: it's possible to pass multiple keys to a DEL operation but it requires
|
||||||
|
// building an array of the strings, building up the DEL command with entries,
|
||||||
|
// and passing the array as a block somehow. There's no guarantee it'd be faster
|
||||||
|
// anyways.
|
||||||
|
for ( size_t i = 0; i < reply->elements; i++ ) {
|
||||||
|
auto del_reply = (redisReply*)redisCommand(ctx, "DEL %s:%s", key_prefix.data(), reply->element[i]->str);
|
||||||
|
|
||||||
|
// Don't bother checking the response here. The only error that would matter is if the key
|
||||||
|
// didn't exist, but that would mean it was already removed for some other reason.
|
||||||
|
|
||||||
|
freeReplyObject(del_reply);
|
||||||
|
}
|
||||||
|
|
||||||
|
freeReplyObject(reply);
|
||||||
|
reply = (redisReply*)redisCommand(ctx, "ZREMRANGEBYSCORE %s_expire -inf %f", key_prefix.data(),
|
||||||
|
run_state::network_time);
|
||||||
|
|
||||||
|
if ( ! reply ) {
|
||||||
|
// TODO: do something with the error?
|
||||||
|
printf("ZREMRANGEBYSCORE command failed: %s\n", ctx->errstr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
freeReplyObject(reply);
|
||||||
|
}
|
||||||
|
|
||||||
void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) {
|
void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) {
|
||||||
ErrorResult res;
|
ErrorResult res;
|
||||||
if ( ! connected )
|
if ( ! connected )
|
||||||
|
|
|
@ -58,6 +58,12 @@ public:
|
||||||
*/
|
*/
|
||||||
ErrorResult DoErase(ValPtr key, ErrorResultCallback* cb = nullptr) override;
|
ErrorResult DoErase(ValPtr key, ErrorResultCallback* cb = nullptr) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes any entries in the backend that have expired. Can be overridden by
|
||||||
|
* derived classes.
|
||||||
|
*/
|
||||||
|
void Expire() override;
|
||||||
|
|
||||||
// IOSource interface
|
// IOSource interface
|
||||||
double GetNextTimeout() override { return -1; }
|
double GetNextTimeout() override { return -1; }
|
||||||
void Process() override {}
|
void Process() override {}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
put result, T
|
||||||
|
get result, value7890
|
||||||
|
get result same as inserted, T
|
||||||
|
get result after expiration, F
|
|
@ -0,0 +1,60 @@
|
||||||
|
# @TEST-DOC: Tests expiration of data from Redis when reading a pcap
|
||||||
|
|
||||||
|
# @TEST-REQUIRES: have-redis
|
||||||
|
# @TEST-PORT: REDIS_PORT
|
||||||
|
|
||||||
|
# Generate a redis.conf file with the port defined above, but without the /tcp at the end of
|
||||||
|
# it. This also sets some paths in the conf to the testing directory.
|
||||||
|
# @TEST-EXEC: cat $FILES/redis.conf | sed "s|%REDIS_PORT%|${REDIS_PORT%/tcp}|g" | sed "s|%RUN_PATH%|$(pwd)|g" > ./redis.conf
|
||||||
|
# @TEST-EXEC: btest-bg-run redis redis-server ../redis.conf
|
||||||
|
# @TEST-EXEC: zcat <$TRACES/echo-connections.pcap.gz | zeek -B storage -b -Cr - %INPUT > out
|
||||||
|
# @TEST-EXEC: btest-bg-wait -k 1
|
||||||
|
|
||||||
|
# @TEST-EXEC: btest-diff out
|
||||||
|
|
||||||
|
@load base/frameworks/storage
|
||||||
|
@load policy/frameworks/storage/backend/redis
|
||||||
|
|
||||||
|
redef Storage::expire_interval = 2 secs;
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
# Create a typename here that can be passed down into open_backend()
|
||||||
|
type str: string;
|
||||||
|
|
||||||
|
global b: opaque of Storage::BackendHandle;
|
||||||
|
global key: string = "key1234";
|
||||||
|
global value: string = "value7890";
|
||||||
|
|
||||||
|
event check_removed() {
|
||||||
|
local res2 = Storage::get(b, key, F);
|
||||||
|
print "get result after expiration", res2;
|
||||||
|
|
||||||
|
Storage::close_backend(b);
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
event setup_test() {
|
||||||
|
local opts : Storage::Backend::Redis::Options;
|
||||||
|
opts$server_host = "127.0.0.1";
|
||||||
|
opts$server_port = to_port(getenv("REDIS_PORT"));
|
||||||
|
opts$key_prefix = "testing";
|
||||||
|
opts$async_mode = F;
|
||||||
|
|
||||||
|
b = Storage::open_backend(Storage::REDIS, opts, str, str);
|
||||||
|
|
||||||
|
local res = Storage::put(b, [$key=key, $value=value, $async_mode=F, $expire_time=2 secs]);
|
||||||
|
print "put result", res;
|
||||||
|
|
||||||
|
local res2 = Storage::get(b, key, F);
|
||||||
|
print "get result", res2;
|
||||||
|
print "get result same as inserted", value == (res2 as string);
|
||||||
|
|
||||||
|
schedule 5 secs { check_removed() };
|
||||||
|
}
|
||||||
|
|
||||||
|
event zeek_init() {
|
||||||
|
# We need network time to be set to something other than zero for the
|
||||||
|
# expiration time to be set correctly. Schedule an event on a short
|
||||||
|
# timer so packets start getting read and do the setup there.
|
||||||
|
schedule 100 msecs { setup_test() };
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue