Redis: Rework everything to only use async mode

This commit is contained in:
Tim Wojtulewicz 2025-02-14 14:24:55 -08:00
parent 40f60f26b3
commit c247de8ec3
11 changed files with 410 additions and 317 deletions
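The heart of the rework: the backend now always talks to Redis through hiredis' asynchronous API, and the old synchronous entry points are emulated by queueing the async command and ticking the poll adapter until the reply handler has run. Below is a minimal standalone sketch of that pattern — not the Zeek code, and it assumes a hiredis release that ships adapters/poll.h plus a Redis server on localhost:

    #include <cstdio>

    #include "hiredis/adapters/poll.h"
    #include "hiredis/async.h"

    static int active_ops = 0;  // outstanding async commands, mirroring Redis::active_ops below

    static void on_reply(redisAsyncContext*, void* reply, void*) {
        --active_ops;  // hiredis frees the reply after this callback returns
        std::printf("SET acknowledged: %s\n", reply ? "ok" : "null reply");
    }

    int main() {
        redisAsyncContext* ac = redisAsyncConnect("127.0.0.1", 6379);
        if ( ac == nullptr || ac->err )
            return 1;

        redisPollAttach(ac);  // poll-based adapter; no external event loop required

        redisAsyncCommand(ac, on_reply, nullptr, "SET %s %s", "testing:key", "value");
        ++active_ops;

        // "Sync" behavior on top of the async API: tick the adapter until the
        // reply handler has drained the outstanding operation -- the same idea
        // as Redis::Poll() in this commit.
        while ( active_ops > 0 )
            redisPollTick(ac, 0.5);

        redisAsyncFree(ac);  // tears the connection down immediately
        return 0;
    }

In the backend itself this shows up as the new active_ops counter, the reply_queue for results of operations invoked without a callback, and Poll() spinning redisPollTick() until the counter reaches zero.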

View file

@ -22,17 +22,6 @@ export {
# but preferably should be set to a unique value per Redis # but preferably should be set to a unique value per Redis
# backend opened. # backend opened.
key_prefix: string &default=""; key_prefix: string &default="";
# Redis only supports sync and async separately. You cannot do
# both with the same connection. If this flag is true, the
# connection will be async and will only allow commands via
# ``when`` commands. You will still need to set the
# ``async_mode`` flags of the put, get, and erase methods to
# match this flag. This flag is overridden when reading pcaps
# and the backend will be forced into synchronous mode, since
# time won't move forward the same as when capturing live
# traffic.
async_mode: bool &default=T;
}; };
redef record Storage::BackendOptions += { redef record Storage::BackendOptions += {

View file

@ -109,7 +109,7 @@ ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expira
auto res = DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb); auto res = DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb);
if ( (! native_async || zeek::run_state::reading_traces) && cb ) { if ( ! native_async && cb ) {
cb->Complete(res); cb->Complete(res);
delete cb; delete cb;
} }
@ -125,7 +125,7 @@ ValResult Backend::Get(ValPtr key, ValResultCallback* cb) {
auto res = DoGet(std::move(key), cb); auto res = DoGet(std::move(key), cb);
if ( (! native_async || zeek::run_state::reading_traces) && cb ) { if ( ! native_async && cb ) {
cb->Complete(res); cb->Complete(res);
delete cb; delete cb;
} }
@ -141,7 +141,7 @@ ErrorResult Backend::Erase(ValPtr key, ErrorResultCallback* cb) {
auto res = DoErase(std::move(key), cb); auto res = DoErase(std::move(key), cb);
if ( (! native_async || zeek::run_state::reading_traces) && cb ) { if ( ! native_async && cb ) {
cb->Complete(res); cb->Complete(res);
delete cb; delete cb;
} }

View file

@ -8,6 +8,7 @@
#include "zeek/Val.h" #include "zeek/Val.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "hiredis/adapters/poll.h"
#include "hiredis/async.h" #include "hiredis/async.h"
#include "hiredis/hiredis.h" #include "hiredis/hiredis.h"
@ -16,8 +17,10 @@ namespace {
class Tracer { class Tracer {
public: public:
Tracer(const std::string& where) : where(where) { /*printf("%s\n", where.c_str());*/ } Tracer(const std::string& where) : where(where) { // printf("%s\n", where.c_str());
~Tracer() { /* printf("%s done\n", where.c_str()); */ } }
~Tracer() { // printf("%s done\n", where.c_str());
}
std::string where; std::string where;
}; };
@ -54,25 +57,62 @@ void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) {
backend->HandleEraseResult(static_cast<redisReply*>(reply), callback); backend->HandleEraseResult(static_cast<redisReply*>(reply), callback);
} }
void redisZRANGEBYSCORE(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("erase");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
backend->HandleZRANGEBYSCORE(static_cast<redisReply*>(reply));
}
void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("generic");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
backend->HandleGeneric();
freeReplyObject(reply);
}
// Because we called redisPollAttach in DoOpen(), privdata here is a
// redisPollEvents object. We can go through that object to get the context's
// data, which contains the backend. Because we overrode these callbacks in
// DoOpen(), we still need to mimic what the adapter's own callbacks do so
// that redisPollTick() keeps working correctly.
void redisAddRead(void* privdata) { void redisAddRead(void* privdata) {
auto t = Tracer("addread"); auto t = Tracer("addread");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(privdata); auto rpe = static_cast<redisPollEvents*>(privdata);
backend->OnAddRead(); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
if ( rpe->reading == 0 )
zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ);
rpe->reading = 1;
} }
void redisDelRead(void* privdata) { void redisDelRead(void* privdata) {
auto t = Tracer("delread"); auto t = Tracer("delread");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(privdata); auto rpe = static_cast<redisPollEvents*>(privdata);
backend->OnDelRead(); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
if ( rpe->reading == 1 )
zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ);
rpe->reading = 0;
} }
void redisAddWrite(void* privdata) { void redisAddWrite(void* privdata) {
auto t = Tracer("addwrite"); auto t = Tracer("addwrite");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(privdata); auto rpe = static_cast<redisPollEvents*>(privdata);
backend->OnAddWrite(); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
if ( rpe->writing == 0 )
zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE);
rpe->writing = 1;
} }
void redisDelWrite(void* privdata) { void redisDelWrite(void* privdata) {
auto rpe = static_cast<redisPollEvents*>(privdata);
auto t = Tracer("delwrite"); auto t = Tracer("delwrite");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(privdata); auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
backend->OnDelWrite();
if ( rpe->writing == 1 )
zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE);
rpe->writing = 0;
} }
} // namespace } // namespace
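The four overrides above rely on the poll adapter keeping its redisPollEvents object as the callbacks' privdata. Here is a standalone sketch of the same trick, forwarding fd interest to an arbitrary outer loop; treating the redisPollEvents fields as stable is an assumption about hiredis internals, the printf calls stand in for iosource_mgr registration, and the sketch assumes a reachable local server:

    #include <cstdio>

    #include "hiredis/adapters/poll.h"
    #include "hiredis/async.h"

    static void my_add_read(void* privdata) {
        auto* rpe = static_cast<redisPollEvents*>(privdata);
        if ( rpe->reading == 0 )
            std::printf("outer loop: watch fd %d for READ\n", (int)rpe->fd);
        rpe->reading = 1;  // keep the adapter's bookkeeping intact, as its own hook would
    }

    static void my_del_read(void* privdata) {
        auto* rpe = static_cast<redisPollEvents*>(privdata);
        if ( rpe->reading == 1 )
            std::printf("outer loop: stop watching fd %d for READ\n", (int)rpe->fd);
        rpe->reading = 0;
    }

    static bool done = false;
    static void on_pong(redisAsyncContext*, void*, void*) { done = true; }

    int main() {
        redisAsyncContext* ac = redisAsyncConnect("127.0.0.1", 6379);
        if ( ac == nullptr || ac->err )
            return 1;

        redisPollAttach(ac);           // installs the ev hooks, privdata is its redisPollEvents
        ac->ev.addRead = my_add_read;  // swap the hooks, keep the adapter's privdata
        ac->ev.delRead = my_del_read;

        redisAsyncCommand(ac, on_pong, nullptr, "PING");
        while ( ! done )
            redisPollTick(ac, 0.1);    // drives connect, write and read

        redisAsyncFree(ac);
        return 0;
    }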
@ -87,13 +127,8 @@ storage::BackendPtr Redis::Instantiate(std::string_view tag) { return make_intru
ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) { ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
RecordValPtr backend_options = options->GetField<RecordVal>("redis"); RecordValPtr backend_options = options->GetField<RecordVal>("redis");
// When reading traces we disable storage async mode globally (see src/storage/Backend.cc) since
// time moves forward based on the pcap and not based on real time.
async_mode = backend_options->GetField<BoolVal>("async_mode")->Get() && ! zeek::run_state::reading_traces;
key_prefix = backend_options->GetField<StringVal>("key_prefix")->ToStdString(); key_prefix = backend_options->GetField<StringVal>("key_prefix")->ToStdString();
DBG_LOG(DBG_STORAGE, "Redis backend: running in async mode? %d", async_mode);
redisOptions opt = {0}; redisOptions opt = {0};
StringValPtr host = backend_options->GetField<StringVal>("server_host"); StringValPtr host = backend_options->GetField<StringVal>("server_host");
@ -118,7 +153,6 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
struct timeval timeout = {5, 0}; struct timeval timeout = {5, 0};
opt.connect_timeout = &timeout; opt.connect_timeout = &timeout;
if ( async_mode ) {
async_ctx = redisAsyncConnectWithOptions(&opt); async_ctx = redisAsyncConnectWithOptions(&opt);
if ( async_ctx == nullptr || async_ctx->err ) { if ( async_ctx == nullptr || async_ctx->err ) {
// This block doesn't necessarily mean the connection failed. It means // This block doesn't necessarily mean the connection failed. It means
@ -135,40 +169,42 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
return errmsg; return errmsg;
} }
++active_ops;
// TODO: Sort out how to pass the zeek callbacks for both open/done to the async // TODO: Sort out how to pass the zeek callbacks for both open/done to the async
// callbacks from hiredis so they can return errors. // callbacks from hiredis so they can return errors.
// The context is passed to the handler methods. Setting this data object // The context is passed to the handler methods. Setting this data object
// pointer allows us to look up the backend in the handlers. // pointer allows us to look up the backend in the handlers.
async_ctx->data = this; async_ctx->data = this;
async_ctx->ev.data = this;
redisPollAttach(async_ctx);
redisAsyncSetConnectCallback(async_ctx, redisOnConnect); redisAsyncSetConnectCallback(async_ctx, redisOnConnect);
redisAsyncSetDisconnectCallback(async_ctx, redisOnDisconnect); redisAsyncSetDisconnectCallback(async_ctx, redisOnDisconnect);
// redisAsyncSetConnectCallback sets the flag in the redisPollEvent for writing
// so we can add this to our loop as well.
zeek::iosource_mgr->RegisterFd(async_ctx->c.fd, this, zeek::iosource::IOSource::WRITE);
// These four callbacks handle the file descriptor coming and going for read // These four callbacks handle the file descriptor coming and going for read
// and write operations for hiredis. Their subsequent callbacks will // and write operations for hiredis. Their subsequent callbacks will
// register/unregister with iosource_mgr as needed. I tried just registering // register/unregister with iosource_mgr as needed. I tried just registering
// full time for both read and write but it leads to weird syncing issues // full time for both read and write but it leads to weird syncing issues
// within the hiredis code. This is safer in regards to the library, even if // within the hiredis code. This is safer in regards to the library, even if
// it results in waking up our IO loop more frequently. // it results in waking up our IO loop more frequently.
//
// redisPollAttach sets these to functions internal to the poll attachment,
// but we override them for our own uses. See the callbacks for more info
// about why.
async_ctx->ev.addRead = redisAddRead; async_ctx->ev.addRead = redisAddRead;
async_ctx->ev.delRead = redisDelRead; async_ctx->ev.delRead = redisDelRead;
async_ctx->ev.addWrite = redisAddWrite; async_ctx->ev.addWrite = redisAddWrite;
async_ctx->ev.delWrite = redisDelWrite; async_ctx->ev.delWrite = redisDelWrite;
}
else {
ctx = redisConnectWithOptions(&opt);
if ( ctx == nullptr || ctx->err ) {
if ( ctx )
return util::fmt("Failed to open connection to Redis server at %s", server_addr.c_str());
else
return util::fmt("Failed to open connection to Redis server at %s: %s", server_addr.c_str(),
ctx->errstr);
}
connected = true; if ( ! cb )
} // Polling here will eventually call OnConnect, which will set the flag
// that we're connected.
Poll();
return std::nullopt; return std::nullopt;
} }
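For reference, the connect sequence above boils down to filling a redisOptions with host, port and a connect timeout and handing it to redisAsyncConnectWithOptions(); as the comment in the diff notes, an error at this point only means the attempt could not be initiated, and the real outcome arrives later through the connect callback. A stripped-down sketch with an illustrative host and port, and no event adapter attached:

    #include "hiredis/async.h"
    #include "hiredis/hiredis.h"

    int main() {
        redisOptions opt = {0};
        REDIS_OPTIONS_SET_TCP(&opt, "127.0.0.1", 6379);
        struct timeval timeout = {5, 0};  // five-second connect timeout, as above
        opt.connect_timeout = &timeout;

        redisAsyncContext* ac = redisAsyncConnectWithOptions(&opt);
        if ( ac == nullptr || ac->err ) {
            // err set here means the attempt could not even be started (bad
            // address, allocation failure, ...); a refused connection is only
            // reported later, via the connect callback.
            if ( ac )
                redisAsyncFree(ac);
            return 1;
        }

        // A real program would now attach an event adapter (redisPollAttach or
        // its own loop) and set the connect/disconnect callbacks.
        redisAsyncFree(ac);
        return 0;
    }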
@ -179,19 +215,16 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
ErrorResult Redis::DoClose(ErrorResultCallback* cb) { ErrorResult Redis::DoClose(ErrorResultCallback* cb) {
connected = false; connected = false;
if ( async_mode ) {
// This will probably result in an error since hiredis should have
// already removed the file descriptor via the delRead and delWrite
// callbacks, but do it anyways just to be sure.
iosource_mgr->UnregisterFd(async_ctx->c.fd, this, IOSource::READ | IOSource::WRITE);
redisAsyncDisconnect(async_ctx); redisAsyncDisconnect(async_ctx);
++active_ops;
if ( ! cb && ! zeek::run_state::terminating ) {
Poll();
// TODO: handle response
}
redisAsyncFree(async_ctx); redisAsyncFree(async_ctx);
async_ctx = nullptr; async_ctx = nullptr;
}
else {
redisFree(ctx);
ctx = nullptr;
}
return std::nullopt; return std::nullopt;
} }
@ -208,39 +241,43 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
if ( ! overwrite ) if ( ! overwrite )
format.append(" NX"); format.append(" NX");
// Use built-in expiration if reading live data, since time will move
// forward consistently. If reading pcaps, we'll do something else.
if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces )
format.append(" PXAT %d");
auto json_key = key->ToJSON()->ToStdString(); auto json_key = key->ToJSON()->ToStdString();
auto json_value = value->ToJSON()->ToStdString(); auto json_value = value->ToJSON()->ToStdString();
if ( async_mode ) {
int status; int status;
if ( expiration_time > 0.0 ) // Use built-in expiration if reading live data, since time will move
// forward consistently. If reading pcaps, we'll do something else.
if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) {
format.append(" PXAT %d");
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
json_value.data(), static_cast<uint64_t>(expiration_time * 1e6)); json_value.data(), static_cast<uint64_t>(expiration_time * 1e6));
}
else else
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
json_value.data()); json_value.data());
if ( connected && status == REDIS_ERR ) if ( connected && status == REDIS_ERR )
return util::fmt("Failed to queue async put operation: %s", async_ctx->errstr); return util::fmt("Failed to queue put operation: %s", async_ctx->errstr);
}
else {
redisReply* reply;
if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces )
reply = (redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), json_key.data(),
json_value.data(), static_cast<uint64_t>(expiration_time * 1e6));
else
reply =
(redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), json_key.data(), json_value.data());
if ( ! reply ) ++active_ops;
return util::fmt("Put operation failed: %s", ctx->errstr);
if ( ! cb ) {
Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
ErrorResult res;
if ( ! connected )
res = util::fmt("Connection is not open");
else if ( ! reply )
res = util::fmt("Async put operation returned null reply");
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = util::fmt("Async put operation failed: %s", reply->str);
freeReplyObject(reply); freeReplyObject(reply);
if ( res.has_value() )
return res;
} }
// If reading pcaps insert into a secondary set that's ordered by expiration // If reading pcaps insert into a secondary set that's ordered by expiration
@ -251,12 +288,18 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
format.append(" NX"); format.append(" NX");
format += " %f %s"; format += " %f %s";
redisReply* reply = status = redisAsyncCommand(async_ctx, redisGeneric, NULL, format.c_str(), key_prefix.data(), expiration_time,
(redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), expiration_time, json_key.data()); json_key.data());
if ( ! reply ) if ( connected && status == REDIS_ERR )
return util::fmt("ZADD operation failed: %s", ctx->errstr); return util::fmt("ZADD operation failed: %s", async_ctx->errstr);
freeReplyObject(reply); ++active_ops;
}
if ( ! cb ) {
// We don't care about the result from the ZADD, just that we wait
// for it to finish.
Poll();
} }
return std::nullopt; return std::nullopt;
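To make the two expiration strategies above concrete: on live traffic the key's lifetime is handed to Redis itself with an absolute PXAT deadline (Unix time in milliseconds, available since Redis 6.2), while during pcap replay the deadline is instead recorded in a "<prefix>_expire" sorted set keyed by network time. A standalone illustration with made-up values, using hiredis' plain synchronous API only to keep the sketch short — the backend issues the same commands through redisAsyncCommand:

    #include <cstdio>
    #include <string>

    #include "hiredis/hiredis.h"

    int main() {
        redisContext* c = redisConnect("127.0.0.1", 6379);
        if ( c == nullptr || c->err )
            return 1;

        // Live capture: let Redis expire the key itself, via an absolute PXAT
        // deadline given as Unix time in milliseconds.
        std::string expire_at_ms = std::to_string(1893456000000LL);  // some point in 2030
        redisReply* r = (redisReply*)redisCommand(c, "SET %s %s PXAT %s",
                                                  "testing:key1234", "value5678",
                                                  expire_at_ms.c_str());
        freeReplyObject(r);

        // Pcap replay: Redis must not expire on wall-clock time, so the
        // deadline (network time) goes into a sorted set that Expire() sweeps.
        r = (redisReply*)redisCommand(c, "ZADD %s %f %s", "testing_expire",
                                      1739577600.0, "key1234");
        freeReplyObject(r);

        redisFree(c);
        return 0;
    }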
@ -270,27 +313,28 @@ ValResult Redis::DoGet(ValPtr key, ValResultCallback* cb) {
if ( ! connected && ! async_ctx ) if ( ! connected && ! async_ctx )
return zeek::unexpected<std::string>("Connection is not open"); return zeek::unexpected<std::string>("Connection is not open");
if ( async_mode ) {
int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(), int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(),
key->ToJSON()->ToStdStringView().data()); key->ToJSON()->ToStdStringView().data());
if ( connected && status == REDIS_ERR ) if ( connected && status == REDIS_ERR )
return zeek::unexpected<std::string>( return zeek::unexpected<std::string>(util::fmt("Failed to queue get operation: %s", async_ctx->errstr));
util::fmt("Failed to queue async get operation: %s", async_ctx->errstr));
// There isn't a result to return here. That happens in HandleGetResult. ++active_ops;
if ( ! cb ) {
Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
auto res = ParseGetReply(reply);
freeReplyObject(reply);
return res;
}
// There isn't a result to return here. That happens in HandleGetResult for
// async operations.
return zeek::unexpected<std::string>(""); return zeek::unexpected<std::string>("");
} }
else {
auto reply =
(redisReply*)redisCommand(ctx, "GET %s:%s", key_prefix.data(), key->ToJSON()->ToStdStringView().data());
if ( ! reply )
return zeek::unexpected<std::string>(util::fmt("Get operation failed: %s", ctx->errstr));
return ParseGetReply(reply);
}
}
/** /**
* The workhorse method for Erase(). This must be implemented for plugins. * The workhorse method for Erase(). This must be implemented for plugins.
@ -300,20 +344,18 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) {
if ( ! connected && ! async_ctx ) if ( ! connected && ! async_ctx )
return "Connection is not open"; return "Connection is not open";
if ( async_mode ) {
int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(), int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(),
key->ToJSON()->ToStdStringView().data()); key->ToJSON()->ToStdStringView().data());
if ( connected && status == REDIS_ERR ) if ( connected && status == REDIS_ERR )
return util::fmt("Failed to queue async erase operation failed: %s", async_ctx->errstr); return util::fmt("Failed to queue erase operation failed: %s", async_ctx->errstr);
}
else {
redisReply* reply =
(redisReply*)redisCommand(ctx, "DEL %s:%s", key_prefix.data(), key->ToJSON()->ToStdStringView().data());
if ( ! reply ) ++active_ops;
return util::fmt("Put operation failed: %s", ctx->errstr);
if ( ! cb ) {
Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
freeReplyObject(reply); freeReplyObject(reply);
} }
@ -321,19 +363,28 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) {
} }
void Redis::Expire() { void Redis::Expire() {
// Expiration is handled natively by Redis if not reading traces.
if ( ! connected || ! zeek::run_state::reading_traces ) if ( ! connected || ! zeek::run_state::reading_traces )
return; return;
redisReply* reply = int status = redisAsyncCommand(async_ctx, redisZRANGEBYSCORE, NULL, "ZRANGEBYSCORE %s_expire -inf %f",
(redisReply*)redisCommand(ctx, "ZRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), run_state::network_time); key_prefix.data(), run_state::network_time);
if ( ! reply ) { if ( status == REDIS_ERR ) {
// TODO: do something with the error? // TODO: do something with the error?
printf("ZRANGEBYSCORE command failed: %s\n", ctx->errstr); printf("ZRANGEBYSCORE command failed: %s\n", async_ctx->errstr);
return; return;
} }
if ( reply->elements == 0 ) { ++active_ops;
// Expire always happens in a synchronous fashion. Block here until we've received
// a response.
Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
if ( reply && reply->elements == 0 ) {
freeReplyObject(reply); freeReplyObject(reply);
return; return;
} }
@ -343,28 +394,28 @@ void Redis::Expire() {
// and passing the array as a block somehow. There's no guarantee it'd be faster // and passing the array as a block somehow. There's no guarantee it'd be faster
// anyways. // anyways.
for ( size_t i = 0; i < reply->elements; i++ ) { for ( size_t i = 0; i < reply->elements; i++ ) {
auto del_reply = (redisReply*)redisCommand(ctx, "DEL %s:%s", key_prefix.data(), reply->element[i]->str); status =
redisAsyncCommand(async_ctx, redisGeneric, NULL, "DEL %s:%s", key_prefix.data(), reply->element[i]->str);
// Don't bother checking the response here. The only error that would matter is if the key ++active_ops;
// didn't exist, but that would mean it was already removed for some other reason. Poll();
freeReplyObject(del_reply);
} }
freeReplyObject(reply); // Remove all of the elements from the range-set that match the time range.
reply = (redisReply*)redisCommand(ctx, "ZREMRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), redisAsyncCommand(async_ctx, redisGeneric, NULL, "ZREMRANGEBYSCORE %s_expire -inf %f", key_prefix.data(),
run_state::network_time); run_state::network_time);
if ( ! reply ) { ++active_ops;
// TODO: do something with the error? Poll();
printf("ZREMRANGEBYSCORE command failed: %s\n", ctx->errstr);
return;
}
freeReplyObject(reply); // This can't be freed until the other commands finish because the memory for
// the strings doesn't get copied when making the DEL commands.
// freeReplyObject(reply);
} }
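And the replay-side counterpart: Expire() enforces the deadlines DoPut() stashed in the sorted set by fetching every member whose score is at or below network time, deleting those keys, then trimming the set. The backend issues these through the async context and blocks in Poll(); the following standalone sketch shows the same sweep with the synchronous API and an illustrative prefix and time:

    #include <cstddef>
    #include <cstdio>

    #include "hiredis/hiredis.h"

    // Sweep keys whose recorded deadline is <= "now" (network time during replay).
    static void expire_sweep(redisContext* c, const char* prefix, double now) {
        auto* expired = (redisReply*)redisCommand(c, "ZRANGEBYSCORE %s_expire -inf %f",
                                                  prefix, now);
        if ( expired == nullptr || expired->type != REDIS_REPLY_ARRAY ) {
            freeReplyObject(expired);
            return;
        }

        // Delete each expired key. The members stored in the sorted set are the
        // JSON-encoded keys, so they can be appended to the prefix directly.
        for ( size_t i = 0; i < expired->elements; i++ ) {
            auto* del = (redisReply*)redisCommand(c, "DEL %s:%s", prefix,
                                                  expired->element[i]->str);
            freeReplyObject(del);
        }

        // Finally trim the bookkeeping set itself.
        auto* trim = (redisReply*)redisCommand(c, "ZREMRANGEBYSCORE %s_expire -inf %f",
                                               prefix, now);
        freeReplyObject(trim);
        freeReplyObject(expired);
    }

    int main() {
        redisContext* c = redisConnect("127.0.0.1", 6379);
        if ( c == nullptr || c->err )
            return 1;
        expire_sweep(c, "testing", 1739577600.0);
        redisFree(c);
        return 0;
    }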
void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) { void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) {
--active_ops;
if ( callback ) {
ErrorResult res; ErrorResult res;
if ( ! connected ) if ( ! connected )
res = util::fmt("Connection is not open"); res = util::fmt("Connection is not open");
@ -377,8 +428,14 @@ void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) {
callback->Complete(res); callback->Complete(res);
delete callback; delete callback;
} }
else
reply_queue.push_back(reply);
}
void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) { void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) {
--active_ops;
if ( callback ) {
ValResult res; ValResult res;
if ( ! connected ) if ( ! connected )
res = zeek::unexpected<std::string>("Connection is not open"); res = zeek::unexpected<std::string>("Connection is not open");
@ -386,10 +443,18 @@ void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) {
res = ParseGetReply(reply); res = ParseGetReply(reply);
callback->Complete(res); callback->Complete(res);
freeReplyObject(reply);
delete callback; delete callback;
} }
else {
reply_queue.push_back(reply);
}
}
void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback) { void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback) {
--active_ops;
if ( callback ) {
ErrorResult res; ErrorResult res;
if ( ! connected ) if ( ! connected )
res = "Connection is not open"; res = "Connection is not open";
@ -399,13 +464,22 @@ void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback)
res = util::fmt("Async erase operation failed: %s", reply->str); res = util::fmt("Async erase operation failed: %s", reply->str);
freeReplyObject(reply); freeReplyObject(reply);
callback->Complete(res); callback->Complete(res);
delete callback; delete callback;
} }
else
reply_queue.push_back(reply);
}
void Redis::HandleZRANGEBYSCORE(redisReply* reply) {
--active_ops;
reply_queue.push_back(reply);
}
void Redis::OnConnect(int status) { void Redis::OnConnect(int status) {
DBG_LOG(DBG_STORAGE, "Redis backend: connection event"); DBG_LOG(DBG_STORAGE, "Redis backend: connection event");
--active_ops;
if ( status == REDIS_OK ) { if ( status == REDIS_OK ) {
connected = true; connected = true;
return; return;
@ -416,6 +490,8 @@ void Redis::OnConnect(int status) {
void Redis::OnDisconnect(int status) { void Redis::OnDisconnect(int status) {
DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event"); DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event");
--active_ops;
if ( status == REDIS_OK ) { if ( status == REDIS_OK ) {
// TODO: this was an intentional disconnect, nothing to do? // TODO: this was an intentional disconnect, nothing to do?
} }
@ -426,31 +502,6 @@ void Redis::OnDisconnect(int status) {
connected = false; connected = false;
} }
void Redis::OnAddRead() {
if ( ! async_ctx )
return;
iosource_mgr->RegisterFd(async_ctx->c.fd, this, IOSource::READ);
}
void Redis::OnDelRead() {
if ( ! async_ctx )
return;
iosource_mgr->UnregisterFd(async_ctx->c.fd, this, IOSource::READ);
}
void Redis::OnAddWrite() {
if ( ! async_ctx )
return;
iosource_mgr->RegisterFd(async_ctx->c.fd, this, IOSource::WRITE);
}
void Redis::OnDelWrite() {
if ( ! async_ctx )
return;
iosource_mgr->UnregisterFd(async_ctx->c.fd, this, IOSource::WRITE);
}
void Redis::ProcessFd(int fd, int flags) { void Redis::ProcessFd(int fd, int flags) {
if ( (flags & IOSource::ProcessFlags::READ) != 0 ) if ( (flags & IOSource::ProcessFlags::READ) != 0 )
redisAsyncHandleRead(async_ctx); redisAsyncHandleRead(async_ctx);
@ -473,8 +524,12 @@ ValResult Redis::ParseGetReply(redisReply* reply) const {
res = zeek::unexpected<std::string>(std::get<std::string>(val)); res = zeek::unexpected<std::string>(std::get<std::string>(val));
} }
freeReplyObject(reply);
return res; return res;
} }
void Redis::Poll() {
while ( active_ops > 0 )
redisPollTick(async_ctx, 0.5);
}
} // namespace zeek::storage::backend::redis } // namespace zeek::storage::backend::redis

View file

@ -6,12 +6,11 @@
#include "zeek/storage/Backend.h" #include "zeek/storage/Backend.h"
// Forward declare some types from hiredis to avoid including the header // Forward declare some types from hiredis to avoid including the header
struct redisContext;
struct redisAsyncContext; struct redisAsyncContext;
struct redisReply; struct redisReply;
struct redisPollEvents;
namespace zeek::storage::backend::redis { namespace zeek::storage::backend::redis {
class Redis : public Backend, public iosource::IOSource { class Redis : public Backend, public iosource::IOSource {
public: public:
Redis(std::string_view tag) : Backend(true, tag), IOSource(true) {} Redis(std::string_view tag) : Backend(true, tag), IOSource(true) {}
@ -73,25 +72,30 @@ public:
void OnConnect(int status); void OnConnect(int status);
void OnDisconnect(int status); void OnDisconnect(int status);
void OnAddRead();
void OnDelRead();
void OnAddWrite();
void OnDelWrite();
void HandlePutResult(redisReply* reply, ErrorResultCallback* callback); void HandlePutResult(redisReply* reply, ErrorResultCallback* callback);
void HandleGetResult(redisReply* reply, ValResultCallback* callback); void HandleGetResult(redisReply* reply, ValResultCallback* callback);
void HandleEraseResult(redisReply* reply, ErrorResultCallback* callback); void HandleEraseResult(redisReply* reply, ErrorResultCallback* callback);
void HandleZRANGEBYSCORE(redisReply* reply);
// HandleGeneric exists so that async-running-as-sync operations can remove
// themselves from the list of active operations.
void HandleGeneric() { --active_ops; }
private: private:
ValResult ParseGetReply(redisReply* reply) const; ValResult ParseGetReply(redisReply* reply) const;
void Poll();
redisContext* ctx = nullptr;
redisAsyncContext* async_ctx = nullptr; redisAsyncContext* async_ctx = nullptr;
bool connected = true;
// When an operation is invoked without a callback (i.e. driven
// synchronously), this queues the replies that arrive while Poll() spins
// the async context.
std::deque<redisReply*> reply_queue;
std::string server_addr; std::string server_addr;
std::string key_prefix; std::string key_prefix;
bool async_mode = false; std::atomic<bool> connected = false;
int active_ops = 0;
}; };
} // namespace zeek::storage::backend::redis } // namespace zeek::storage::backend::redis

View file

@ -1,5 +1,6 @@
# @TEST-DOC: Tests that Redis storage backend defaults back to sync mode reading pcaps # @TEST-DOC: Tests that Redis storage backend defaults back to sync mode reading pcaps
# @TEST-KNOWN-FAILURE: Currently broken due to the redis async rework
# @TEST-REQUIRES: have-redis # @TEST-REQUIRES: have-redis
# @TEST-PORT: REDIS_PORT # @TEST-PORT: REDIS_PORT
@ -19,30 +20,37 @@
# Create a typename here that can be passed down into open_backend() # Create a typename here that can be passed down into open_backend()
type str: string; type str: string;
event zeek_init() { event zeek_init()
{
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;
opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = T]; opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local key = "key1234"; local key = "key1234";
local value = "value5678"; local value = "value5678";
local b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); local b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
when [b, key, value] ( local res = Storage::Async::put(b, [$key=key, $value=value]) ) { when [b, key, value] ( local res = Storage::Async::put(b, [ $key=key,
$value=value ]) )
{
print "put result", res; print "put result", res;
when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) { when [b, key, value] ( local res2 = Storage::Async::get(b, key) )
{
print "get result", res2; print "get result", res2;
if ( res2?$val ) if ( res2?$val )
print "get result same as inserted", value == ( res2$val as string ); print "get result same as inserted", value == ( res2$val as string );
Storage::Sync::close_backend(b); Storage::Sync::close_backend(b);
} }
timeout 5 sec { timeout 5sec
print "get requeest timed out"; {
print "get request timed out";
} }
} }
timeout 5 sec { timeout 5sec
{
print "put request timed out"; print "put request timed out";
} }
} }

View file

@ -21,19 +21,24 @@ redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into open_backend() # Create a typename here that can be passed down into open_backend()
type str: string; type str: string;
event zeek_init() { event zeek_init()
{
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;
opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = T]; opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local key = "key1234"; local key = "key1234";
local value = "value5678"; local value = "value5678";
local b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); local b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
when [b, key, value] ( local res = Storage::Async::put(b, [$key=key, $value=value]) ) { when [b, key, value] ( local res = Storage::Async::put(b, [ $key=key,
$value=value ]) )
{
print "put result", res; print "put result", res;
when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) { when [b, key, value] ( local res2 = Storage::Async::get(b, key) )
{
print "get result", res2; print "get result", res2;
if ( res2?$val ) if ( res2?$val )
print "get result same as inserted", value == ( res2$val as string ); print "get result same as inserted", value == ( res2$val as string );
@ -42,12 +47,14 @@ event zeek_init() {
terminate(); terminate();
} }
timeout 5 sec { timeout 5sec
print "get requeest timed out"; {
print "get request timed out";
terminate(); terminate();
} }
} }
timeout 5 sec { timeout 5sec
{
print "put request timed out"; print "put request timed out";
terminate(); terminate();
} }

View file

@ -39,14 +39,17 @@ global redis_data_written: event() &is_used;
global backend: opaque of Storage::BackendHandle; global backend: opaque of Storage::BackendHandle;
type str: string; type str: string;
event zeek_init() { event zeek_init()
{
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;
opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = F]; opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
backend = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); backend = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
} }
event redis_data_written() { event redis_data_written()
{
print "redis_data_written"; print "redis_data_written";
local res = Storage::Sync::get(backend, "1234"); local res = Storage::Sync::get(backend, "1234");
print Cluster::node, res; print Cluster::node, res;
@ -58,13 +61,15 @@ event redis_data_written() {
global node_count: count = 0; global node_count: count = 0;
event Cluster::node_down(name: string, id: string) { event Cluster::node_down(name: string, id: string)
{
++node_count; ++node_count;
if ( node_count == 2 ) if ( node_count == 2 )
terminate(); terminate();
} }
event redis_data_written() { event redis_data_written()
{
local e = Cluster::make_event(redis_data_written); local e = Cluster::make_event(redis_data_written);
Cluster::publish(Cluster::worker_topic, e); Cluster::publish(Cluster::worker_topic, e);
} }
@ -73,7 +78,8 @@ event redis_data_written() {
@if ( Cluster::node == "worker-1" ) @if ( Cluster::node == "worker-1" )
event Cluster::Experimental::cluster_started() { event Cluster::Experimental::cluster_started()
{
local res = Storage::Sync::put(backend, [ $key="1234", $value="5678" ]); local res = Storage::Sync::put(backend, [ $key="1234", $value="5678" ]);
print Cluster::node, "put result", res; print Cluster::node, "put result", res;

View file

@ -25,7 +25,8 @@ global b: opaque of Storage::BackendHandle;
global key: string = "key1234"; global key: string = "key1234";
global value: string = "value7890"; global value: string = "value7890";
event check_removed() { event check_removed()
{
local res2 = Storage::Sync::get(b, key); local res2 = Storage::Sync::get(b, key);
print "get result after expiration", res2; print "get result after expiration", res2;
@ -33,9 +34,11 @@ event check_removed() {
terminate(); terminate();
} }
event setup_test() { event setup_test()
{
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;
opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = F]; opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
@ -50,7 +53,8 @@ event setup_test() {
schedule 5secs { check_removed() }; schedule 5secs { check_removed() };
} }
event zeek_init() { event zeek_init()
{
# We need network time to be set to something other than zero for the # We need network time to be set to something other than zero for the
# expiration time to be set correctly. Schedule an event on a short # expiration time to be set correctly. Schedule an event on a short
# timer so packets start getting read and do the setup there. # timer so packets start getting read and do the setup there.

View file

@ -18,9 +18,11 @@
# Create a typename here that can be passed down into open_backend() # Create a typename here that can be passed down into open_backend()
type str: string; type str: string;
event zeek_init() { event zeek_init()
{
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;
opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = F]; opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local key = "key1234"; local key = "key1234";
local value = "value1234"; local value = "value1234";

View file

@ -12,7 +12,8 @@ redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into get(). # Create a typename here that can be passed down into get().
type str: string; type str: string;
event zeek_init() { event zeek_init()
{
# Create a database file in the .tmp directory with a 'testing' table # Create a database file in the .tmp directory with a 'testing' table
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;
opts$sqlite = [ $database_path="test.sqlite", $table_name="testing" ]; opts$sqlite = [ $database_path="test.sqlite", $table_name="testing" ];
@ -24,10 +25,13 @@ event zeek_init() {
# the backend yet. # the backend yet.
local b = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str); local b = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
when [b, key, value] ( local res = Storage::Async::put(b, [$key=key, $value=value]) ) { when [b, key, value] ( local res = Storage::Async::put(b, [ $key=key,
$value=value ]) )
{
print "put result", res; print "put result", res;
when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) { when [b, key, value] ( local res2 = Storage::Async::get(b, key) )
{
print "get result", res2; print "get result", res2;
if ( res2?$val ) if ( res2?$val )
print "get result same as inserted", value == ( res2$val as string ); print "get result same as inserted", value == ( res2$val as string );
@ -36,12 +40,14 @@ event zeek_init() {
terminate(); terminate();
} }
timeout 5 sec { timeout 5sec
print "get requeest timed out"; {
print "get request timed out";
terminate(); terminate();
} }
} }
timeout 5 sec { timeout 5sec
{
print "put request timed out"; print "put request timed out";
terminate(); terminate();
} }

View file

@ -11,7 +11,8 @@ redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into get(). # Create a typename here that can be passed down into get().
type str: string; type str: string;
event zeek_init() { event zeek_init()
{
# Create a database file in the .tmp directory with a 'testing' table # Create a database file in the .tmp directory with a 'testing' table
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;
opts$sqlite = [ $database_path="test.sqlite", $table_name="testing" ]; opts$sqlite = [ $database_path="test.sqlite", $table_name="testing" ];
@ -21,36 +22,47 @@ event zeek_init() {
# Test inserting/retrieving a key/value pair that we know won't be in # Test inserting/retrieving a key/value pair that we know won't be in
# the backend yet. # the backend yet.
when [opts, key, value] ( local b = Storage::Async::open_backend(Storage::SQLITE, opts, str, str) ) { when [opts, key, value] ( local b = Storage::Async::open_backend(
Storage::SQLITE, opts, str, str) )
{
print "open successful"; print "open successful";
when [b, key, value] ( local put_res = Storage::Async::put(b, [$key=key, $value=value]) ) { when [b, key, value] ( local put_res = Storage::Async::put(b, [ $key=key,
$value=value ]) )
{
print "put result", put_res; print "put result", put_res;
when [b, key, value] ( local get_res = Storage::Async::get(b, key) ) { when [b, key, value] ( local get_res = Storage::Async::get(b, key) )
{
print "get result", get_res; print "get result", get_res;
if ( get_res?$val ) if ( get_res?$val )
print "get result same as inserted", value == ( get_res$val as string ); print "get result same as inserted", value == ( get_res$val as string );
when [b] ( local close_res = Storage::Async::close_backend(b) ) { when [b] ( local close_res = Storage::Async::close_backend(b) )
{
print "closed succesfully"; print "closed succesfully";
terminate(); terminate();
} timeout 5 sec { }
timeout 5sec
{
print "close request timed out"; print "close request timed out";
terminate(); terminate();
} }
} }
timeout 5 sec { timeout 5sec
print "get requeest timed out"; {
print "get request timed out";
terminate(); terminate();
} }
} }
timeout 5 sec { timeout 5sec
{
print "put request timed out"; print "put request timed out";
terminate(); terminate();
} }
} }
timeout 5 sec { timeout 5sec
{
print "open request timed out"; print "open request timed out";
terminate(); terminate();
} }