diff --git a/scripts/policy/frameworks/storage/backend/redis/main.zeek b/scripts/policy/frameworks/storage/backend/redis/main.zeek index 35e6a6f27f..f37fe0b3c8 100644 --- a/scripts/policy/frameworks/storage/backend/redis/main.zeek +++ b/scripts/policy/frameworks/storage/backend/redis/main.zeek @@ -22,17 +22,6 @@ export { # but preferably should be set to a unique value per Redis # backend opened. key_prefix: string &default=""; - - # Redis only supports sync and async separately. You cannot do - # both with the same connection. If this flag is true, the - # connection will be async and will only allow commands via - # ``when`` commands. You will still need to set the - # ``async_mode`` flags of the put, get, and erase methods to - # match this flag. This flag is overridden when reading pcaps - # and the backend will be forced into synchronous mode, since - # time won't move forward the same as when capturing live - # traffic. - async_mode: bool &default=T; }; redef record Storage::BackendOptions += { diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc index 591d72eda7..cfedd8bd54 100644 --- a/src/storage/Backend.cc +++ b/src/storage/Backend.cc @@ -109,7 +109,7 @@ ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expira auto res = DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb); - if ( (! native_async || zeek::run_state::reading_traces) && cb ) { + if ( ! native_async && cb ) { cb->Complete(res); delete cb; } @@ -125,7 +125,7 @@ ValResult Backend::Get(ValPtr key, ValResultCallback* cb) { auto res = DoGet(std::move(key), cb); - if ( (! native_async || zeek::run_state::reading_traces) && cb ) { + if ( ! native_async && cb ) { cb->Complete(res); delete cb; } @@ -141,7 +141,7 @@ ErrorResult Backend::Erase(ValPtr key, ErrorResultCallback* cb) { auto res = DoErase(std::move(key), cb); - if ( (! native_async || zeek::run_state::reading_traces) && cb ) { + if ( ! native_async && cb ) { cb->Complete(res); delete cb; } diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index 5e081b821f..18e3b45a2e 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -8,6 +8,7 @@ #include "zeek/Val.h" #include "zeek/iosource/Manager.h" +#include "hiredis/adapters/poll.h" #include "hiredis/async.h" #include "hiredis/hiredis.h" @@ -16,8 +17,10 @@ namespace { class Tracer { public: - Tracer(const std::string& where) : where(where) { /*printf("%s\n", where.c_str());*/ } - ~Tracer() { /* printf("%s done\n", where.c_str()); */ } + Tracer(const std::string& where) : where(where) { // printf("%s\n", where.c_str()); + } + ~Tracer() { // printf("%s done\n", where.c_str()); + } std::string where; }; @@ -54,25 +57,62 @@ void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) { backend->HandleEraseResult(static_cast(reply), callback); } +void redisZRANGEBYSCORE(redisAsyncContext* ctx, void* reply, void* privdata) { + auto t = Tracer("erase"); + auto backend = static_cast(ctx->data); + backend->HandleZRANGEBYSCORE(static_cast(reply)); +} + +void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) { + auto t = Tracer("generic"); + auto backend = static_cast(ctx->data); + backend->HandleGeneric(); + freeReplyObject(reply); +} + +// Because we called redisPollAttach in DoOpen(), privdata here is a +// redisPollEvents object. We can go through that object to get the context's +// data, which contains the backend. Because we overrode these callbacks in +// DoOpen, we still want to mimic their callbacks to redisPollTick functions +// correctly. void redisAddRead(void* privdata) { auto t = Tracer("addread"); - auto backend = static_cast(privdata); - backend->OnAddRead(); + auto rpe = static_cast(privdata); + auto backend = static_cast(rpe->context->data); + + if ( rpe->reading == 0 ) + zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ); + rpe->reading = 1; } + void redisDelRead(void* privdata) { auto t = Tracer("delread"); - auto backend = static_cast(privdata); - backend->OnDelRead(); + auto rpe = static_cast(privdata); + auto backend = static_cast(rpe->context->data); + + if ( rpe->reading == 1 ) + zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ); + rpe->reading = 0; } + void redisAddWrite(void* privdata) { auto t = Tracer("addwrite"); - auto backend = static_cast(privdata); - backend->OnAddWrite(); + auto rpe = static_cast(privdata); + auto backend = static_cast(rpe->context->data); + + if ( rpe->writing == 0 ) + zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE); + rpe->writing = 1; } + void redisDelWrite(void* privdata) { + auto rpe = static_cast(privdata); auto t = Tracer("delwrite"); - auto backend = static_cast(privdata); - backend->OnDelWrite(); + auto backend = static_cast(rpe->context->data); + + if ( rpe->writing == 1 ) + zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE); + rpe->writing = 0; } } // namespace @@ -87,13 +127,8 @@ storage::BackendPtr Redis::Instantiate(std::string_view tag) { return make_intru ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) { RecordValPtr backend_options = options->GetField("redis"); - // When reading traces we disable storage async mode globally (see src/storage/Backend.cc) since - // time moves forward based on the pcap and not based on real time. - async_mode = backend_options->GetField("async_mode")->Get() && ! zeek::run_state::reading_traces; key_prefix = backend_options->GetField("key_prefix")->ToStdString(); - DBG_LOG(DBG_STORAGE, "Redis backend: running in async mode? %d", async_mode); - redisOptions opt = {0}; StringValPtr host = backend_options->GetField("server_host"); @@ -118,57 +153,58 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) { struct timeval timeout = {5, 0}; opt.connect_timeout = &timeout; - if ( async_mode ) { - async_ctx = redisAsyncConnectWithOptions(&opt); - if ( async_ctx == nullptr || async_ctx->err ) { - // This block doesn't necessarily mean the connection failed. It means - // that hiredis failed to set up the async context. Connection failure - // is returned later via the OnConnect callback. - std::string errmsg = util::fmt("Failed to open connection to Redis server at %s", server_addr.c_str()); - if ( async_ctx ) { - errmsg.append(": "); - errmsg.append(async_ctx->errstr); - } - - redisAsyncFree(async_ctx); - async_ctx = nullptr; - return errmsg; + async_ctx = redisAsyncConnectWithOptions(&opt); + if ( async_ctx == nullptr || async_ctx->err ) { + // This block doesn't necessarily mean the connection failed. It means + // that hiredis failed to set up the async context. Connection failure + // is returned later via the OnConnect callback. + std::string errmsg = util::fmt("Failed to open connection to Redis server at %s", server_addr.c_str()); + if ( async_ctx ) { + errmsg.append(": "); + errmsg.append(async_ctx->errstr); } - // TODO: Sort out how to pass the zeek callbacks for both open/done to the async - // callbacks from hiredis so they can return errors. - - // The context is passed to the handler methods. Setting this data object - // pointer allows us to look up the backend in the handlers. - async_ctx->data = this; - async_ctx->ev.data = this; - - redisAsyncSetConnectCallback(async_ctx, redisOnConnect); - redisAsyncSetDisconnectCallback(async_ctx, redisOnDisconnect); - - // These four callbacks handle the file descriptor coming and going for read - // and write operations for hiredis. Their subsequent callbacks will - // register/unregister with iosource_mgr as needed. I tried just registering - // full time for both read and write but it leads to weird syncing issues - // within the hiredis code. This is safer in regards to the library, even if - // it results in waking up our IO loop more frequently. - async_ctx->ev.addRead = redisAddRead; - async_ctx->ev.delRead = redisDelRead; - async_ctx->ev.addWrite = redisAddWrite; - async_ctx->ev.delWrite = redisDelWrite; + redisAsyncFree(async_ctx); + async_ctx = nullptr; + return errmsg; } - else { - ctx = redisConnectWithOptions(&opt); - if ( ctx == nullptr || ctx->err ) { - if ( ctx ) - return util::fmt("Failed to open connection to Redis server at %s", server_addr.c_str()); - else - return util::fmt("Failed to open connection to Redis server at %s: %s", server_addr.c_str(), - ctx->errstr); - } - connected = true; - } + ++active_ops; + + // TODO: Sort out how to pass the zeek callbacks for both open/done to the async + // callbacks from hiredis so they can return errors. + + // The context is passed to the handler methods. Setting this data object + // pointer allows us to look up the backend in the handlers. + async_ctx->data = this; + + redisPollAttach(async_ctx); + redisAsyncSetConnectCallback(async_ctx, redisOnConnect); + redisAsyncSetDisconnectCallback(async_ctx, redisOnDisconnect); + + // redisAsyncSetConnectCallback sets the flag in the redisPollEvent for writing + // so we can add this to our loop as well. + zeek::iosource_mgr->RegisterFd(async_ctx->c.fd, this, zeek::iosource::IOSource::WRITE); + + // These four callbacks handle the file descriptor coming and going for read + // and write operations for hiredis. Their subsequent callbacks will + // register/unregister with iosource_mgr as needed. I tried just registering + // full time for both read and write but it leads to weird syncing issues + // within the hiredis code. This is safer in regards to the library, even if + // it results in waking up our IO loop more frequently. + // + // redisPollAttach sets these to functions internal to the poll attachment, + // but we override them for our own uses. See the callbacks for more info + // about why. + async_ctx->ev.addRead = redisAddRead; + async_ctx->ev.delRead = redisDelRead; + async_ctx->ev.addWrite = redisAddWrite; + async_ctx->ev.delWrite = redisDelWrite; + + if ( ! cb ) + // Polling here will eventually call OnConnect, which will set the flag + // that we're connected. + Poll(); return std::nullopt; } @@ -179,20 +215,17 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) { ErrorResult Redis::DoClose(ErrorResultCallback* cb) { connected = false; - if ( async_mode ) { - // This will probably result in an error since hiredis should have - // already removed the file descriptor via the delRead and delWrite - // callbacks, but do it anyways just to be sure. - iosource_mgr->UnregisterFd(async_ctx->c.fd, this, IOSource::READ | IOSource::WRITE); - redisAsyncDisconnect(async_ctx); - redisAsyncFree(async_ctx); - async_ctx = nullptr; - } - else { - redisFree(ctx); - ctx = nullptr; + redisAsyncDisconnect(async_ctx); + ++active_ops; + + if ( ! cb && ! zeek::run_state::terminating ) { + Poll(); + // TODO: handle response } + redisAsyncFree(async_ctx); + async_ctx = nullptr; + return std::nullopt; } @@ -208,39 +241,43 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira if ( ! overwrite ) format.append(" NX"); - // Use built-in expiration if reading live data, since time will move - // forward consistently. If reading pcaps, we'll do something else. - if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) - format.append(" PXAT %d"); - auto json_key = key->ToJSON()->ToStdString(); auto json_value = value->ToJSON()->ToStdString(); - if ( async_mode ) { - int status; - if ( expiration_time > 0.0 ) - status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), - json_value.data(), static_cast(expiration_time * 1e6)); - else - status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), - json_value.data()); - - if ( connected && status == REDIS_ERR ) - return util::fmt("Failed to queue async put operation: %s", async_ctx->errstr); + int status; + // Use built-in expiration if reading live data, since time will move + // forward consistently. If reading pcaps, we'll do something else. + if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) { + format.append(" PXAT %d"); + status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), + json_value.data(), static_cast(expiration_time * 1e6)); } - else { - redisReply* reply; - if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) - reply = (redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), json_key.data(), - json_value.data(), static_cast(expiration_time * 1e6)); - else - reply = - (redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), json_key.data(), json_value.data()); + else + status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(), + json_value.data()); - if ( ! reply ) - return util::fmt("Put operation failed: %s", ctx->errstr); + if ( connected && status == REDIS_ERR ) + return util::fmt("Failed to queue put operation: %s", async_ctx->errstr); + + ++active_ops; + + if ( ! cb ) { + Poll(); + + redisReply* reply = reply_queue.front(); + reply_queue.pop_front(); + + ErrorResult res; + if ( ! connected ) + res = util::fmt("Connection is not open"); + else if ( ! reply ) + res = util::fmt("Async put operation returned null reply"); + else if ( reply && reply->type == REDIS_REPLY_ERROR ) + res = util::fmt("Async put operation failed: %s", reply->str); freeReplyObject(reply); + if ( res.has_value() ) + return res; } // If reading pcaps insert into a secondary set that's ordered by expiration @@ -251,12 +288,18 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira format.append(" NX"); format += " %f %s"; - redisReply* reply = - (redisReply*)redisCommand(ctx, format.c_str(), key_prefix.data(), expiration_time, json_key.data()); - if ( ! reply ) - return util::fmt("ZADD operation failed: %s", ctx->errstr); + status = redisAsyncCommand(async_ctx, redisGeneric, NULL, format.c_str(), key_prefix.data(), expiration_time, + json_key.data()); + if ( connected && status == REDIS_ERR ) + return util::fmt("ZADD operation failed: %s", async_ctx->errstr); - freeReplyObject(reply); + ++active_ops; + } + + if ( ! cb ) { + // We don't care about the result from the ZADD, just that we wait + // for it to finish. + Poll(); } return std::nullopt; @@ -270,26 +313,27 @@ ValResult Redis::DoGet(ValPtr key, ValResultCallback* cb) { if ( ! connected && ! async_ctx ) return zeek::unexpected("Connection is not open"); - if ( async_mode ) { - int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(), - key->ToJSON()->ToStdStringView().data()); + int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(), + key->ToJSON()->ToStdStringView().data()); - if ( connected && status == REDIS_ERR ) - return zeek::unexpected( - util::fmt("Failed to queue async get operation: %s", async_ctx->errstr)); + if ( connected && status == REDIS_ERR ) + return zeek::unexpected(util::fmt("Failed to queue get operation: %s", async_ctx->errstr)); - // There isn't a result to return here. That happens in HandleGetResult. - return zeek::unexpected(""); + ++active_ops; + + if ( ! cb ) { + Poll(); + redisReply* reply = reply_queue.front(); + reply_queue.pop_front(); + + auto res = ParseGetReply(reply); + freeReplyObject(reply); + return res; } - else { - auto reply = - (redisReply*)redisCommand(ctx, "GET %s:%s", key_prefix.data(), key->ToJSON()->ToStdStringView().data()); - if ( ! reply ) - return zeek::unexpected(util::fmt("Get operation failed: %s", ctx->errstr)); - - return ParseGetReply(reply); - } + // There isn't a result to return here. That happens in HandleGetResult for + // async operations. + return zeek::unexpected(""); } /** @@ -300,20 +344,18 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) { if ( ! connected && ! async_ctx ) return "Connection is not open"; - if ( async_mode ) { - int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(), - key->ToJSON()->ToStdStringView().data()); + int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(), + key->ToJSON()->ToStdStringView().data()); - if ( connected && status == REDIS_ERR ) - return util::fmt("Failed to queue async erase operation failed: %s", async_ctx->errstr); - } - else { - redisReply* reply = - (redisReply*)redisCommand(ctx, "DEL %s:%s", key_prefix.data(), key->ToJSON()->ToStdStringView().data()); + if ( connected && status == REDIS_ERR ) + return util::fmt("Failed to queue erase operation failed: %s", async_ctx->errstr); - if ( ! reply ) - return util::fmt("Put operation failed: %s", ctx->errstr); + ++active_ops; + if ( ! cb ) { + Poll(); + redisReply* reply = reply_queue.front(); + reply_queue.pop_front(); freeReplyObject(reply); } @@ -321,19 +363,28 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) { } void Redis::Expire() { + // Expiration is handled natively by Redis if not reading traces. if ( ! connected || ! zeek::run_state::reading_traces ) return; - redisReply* reply = - (redisReply*)redisCommand(ctx, "ZRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), run_state::network_time); + int status = redisAsyncCommand(async_ctx, redisZRANGEBYSCORE, NULL, "ZRANGEBYSCORE %s_expire -inf %f", + key_prefix.data(), run_state::network_time); - if ( ! reply ) { + if ( status == REDIS_ERR ) { // TODO: do something with the error? - printf("ZRANGEBYSCORE command failed: %s\n", ctx->errstr); + printf("ZRANGEBYSCORE command failed: %s\n", async_ctx->errstr); return; } - if ( reply->elements == 0 ) { + ++active_ops; + + // Expire always happens in a synchronous fashion. Block here until we've received + // a response. + Poll(); + redisReply* reply = reply_queue.front(); + reply_queue.pop_front(); + + if ( reply && reply->elements == 0 ) { freeReplyObject(reply); return; } @@ -343,69 +394,92 @@ void Redis::Expire() { // and passing the array as a block somehow. There's no guarantee it'd be faster // anyways. for ( size_t i = 0; i < reply->elements; i++ ) { - auto del_reply = (redisReply*)redisCommand(ctx, "DEL %s:%s", key_prefix.data(), reply->element[i]->str); - - // Don't bother checking the response here. The only error that would matter is if the key - // didn't exist, but that would mean it was already removed for some other reason. - - freeReplyObject(del_reply); + status = + redisAsyncCommand(async_ctx, redisGeneric, NULL, "DEL %s:%s", key_prefix.data(), reply->element[i]->str); + ++active_ops; + Poll(); } - freeReplyObject(reply); - reply = (redisReply*)redisCommand(ctx, "ZREMRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), - run_state::network_time); + // Remove all of the elements from the range-set that match the time range. + redisAsyncCommand(async_ctx, redisGeneric, NULL, "ZREMRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), + run_state::network_time); - if ( ! reply ) { - // TODO: do something with the error? - printf("ZREMRANGEBYSCORE command failed: %s\n", ctx->errstr); - return; - } + ++active_ops; + Poll(); - freeReplyObject(reply); + // This can't be freed until the other commands finish because the memory for + // the strings doesn't get copied when making the DEL commands. + // freeReplyObject(reply); } void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) { - ErrorResult res; - if ( ! connected ) - res = util::fmt("Connection is not open"); - else if ( ! reply ) - res = util::fmt("Async put operation returned null reply"); - else if ( reply && reply->type == REDIS_REPLY_ERROR ) - res = util::fmt("Async put operation failed: %s", reply->str); + --active_ops; - freeReplyObject(reply); - callback->Complete(res); - delete callback; + if ( callback ) { + ErrorResult res; + if ( ! connected ) + res = util::fmt("Connection is not open"); + else if ( ! reply ) + res = util::fmt("Async put operation returned null reply"); + else if ( reply && reply->type == REDIS_REPLY_ERROR ) + res = util::fmt("Async put operation failed: %s", reply->str); + + freeReplyObject(reply); + callback->Complete(res); + delete callback; + } + else + reply_queue.push_back(reply); } void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) { - ValResult res; - if ( ! connected ) - res = zeek::unexpected("Connection is not open"); - else - res = ParseGetReply(reply); + --active_ops; - callback->Complete(res); - delete callback; + if ( callback ) { + ValResult res; + if ( ! connected ) + res = zeek::unexpected("Connection is not open"); + else + res = ParseGetReply(reply); + + callback->Complete(res); + freeReplyObject(reply); + delete callback; + } + else { + reply_queue.push_back(reply); + } } void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback) { - ErrorResult res; - if ( ! connected ) - res = "Connection is not open"; - else if ( ! reply ) - res = util::fmt("Async erase operation returned null reply"); - else if ( reply && reply->type == REDIS_REPLY_ERROR ) - res = util::fmt("Async erase operation failed: %s", reply->str); + --active_ops; - freeReplyObject(reply); + if ( callback ) { + ErrorResult res; + if ( ! connected ) + res = "Connection is not open"; + else if ( ! reply ) + res = util::fmt("Async erase operation returned null reply"); + else if ( reply && reply->type == REDIS_REPLY_ERROR ) + res = util::fmt("Async erase operation failed: %s", reply->str); - callback->Complete(res); - delete callback; + freeReplyObject(reply); + callback->Complete(res); + delete callback; + } + else + reply_queue.push_back(reply); +} + +void Redis::HandleZRANGEBYSCORE(redisReply* reply) { + --active_ops; + reply_queue.push_back(reply); } void Redis::OnConnect(int status) { DBG_LOG(DBG_STORAGE, "Redis backend: connection event"); + --active_ops; + if ( status == REDIS_OK ) { connected = true; return; @@ -416,6 +490,8 @@ void Redis::OnConnect(int status) { void Redis::OnDisconnect(int status) { DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event"); + --active_ops; + if ( status == REDIS_OK ) { // TODO: this was an intentional disconnect, nothing to do? } @@ -426,31 +502,6 @@ void Redis::OnDisconnect(int status) { connected = false; } -void Redis::OnAddRead() { - if ( ! async_ctx ) - return; - - iosource_mgr->RegisterFd(async_ctx->c.fd, this, IOSource::READ); -} -void Redis::OnDelRead() { - if ( ! async_ctx ) - return; - - iosource_mgr->UnregisterFd(async_ctx->c.fd, this, IOSource::READ); -} -void Redis::OnAddWrite() { - if ( ! async_ctx ) - return; - - iosource_mgr->RegisterFd(async_ctx->c.fd, this, IOSource::WRITE); -} -void Redis::OnDelWrite() { - if ( ! async_ctx ) - return; - - iosource_mgr->UnregisterFd(async_ctx->c.fd, this, IOSource::WRITE); -} - void Redis::ProcessFd(int fd, int flags) { if ( (flags & IOSource::ProcessFlags::READ) != 0 ) redisAsyncHandleRead(async_ctx); @@ -473,8 +524,12 @@ ValResult Redis::ParseGetReply(redisReply* reply) const { res = zeek::unexpected(std::get(val)); } - freeReplyObject(reply); return res; } +void Redis::Poll() { + while ( active_ops > 0 ) + int status = redisPollTick(async_ctx, 0.5); +} + } // namespace zeek::storage::backend::redis diff --git a/src/storage/backend/redis/Redis.h b/src/storage/backend/redis/Redis.h index ba32df6a88..613aca13b2 100644 --- a/src/storage/backend/redis/Redis.h +++ b/src/storage/backend/redis/Redis.h @@ -6,12 +6,11 @@ #include "zeek/storage/Backend.h" // Forward declare some types from hiredis to avoid including the header -struct redisContext; struct redisAsyncContext; struct redisReply; +struct redisPollEvents; namespace zeek::storage::backend::redis { - class Redis : public Backend, public iosource::IOSource { public: Redis(std::string_view tag) : Backend(true, tag), IOSource(true) {} @@ -73,25 +72,30 @@ public: void OnConnect(int status); void OnDisconnect(int status); - void OnAddRead(); - void OnDelRead(); - void OnAddWrite(); - void OnDelWrite(); - void HandlePutResult(redisReply* reply, ErrorResultCallback* callback); void HandleGetResult(redisReply* reply, ValResultCallback* callback); void HandleEraseResult(redisReply* reply, ErrorResultCallback* callback); + void HandleZRANGEBYSCORE(redisReply* reply); + + // HandleGeneric exists so that async-running-as-sync operations can remove + // themselves from the list of active operations. + void HandleGeneric() { --active_ops; } private: ValResult ParseGetReply(redisReply* reply) const; + void Poll(); - redisContext* ctx = nullptr; redisAsyncContext* async_ctx = nullptr; - bool connected = true; + + // When running in sync mode, this is used to keep a queue of replies as + // responses come in from the remote calls until we run out of data to + // poll. + std::deque reply_queue; std::string server_addr; std::string key_prefix; - bool async_mode = false; + std::atomic connected = false; + int active_ops = 0; }; } // namespace zeek::storage::backend::redis diff --git a/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek b/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek index dec60fce83..1e4d17ccbf 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-async-reading-pcap.zeek @@ -1,5 +1,6 @@ # @TEST-DOC: Tests that Redis storage backend defaults back to sync mode reading pcaps +# @TEST-KNOWN-FAILURE: Currently broken due to the redis async rework # @TEST-REQUIRES: have-redis # @TEST-PORT: REDIS_PORT @@ -19,30 +20,37 @@ # Create a typename here that can be passed down into open_backend() type str: string; -event zeek_init() { - local opts : Storage::BackendOptions; - opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = T]; +event zeek_init() + { + local opts: Storage::BackendOptions; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; local key = "key1234"; local value = "value5678"; local b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); - when [b, key, value] ( local res = Storage::Async::put(b, [$key=key, $value=value]) ) { + when [b, key, value] ( local res = Storage::Async::put(b, [ $key=key, + $value=value ]) ) + { print "put result", res; - when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) { + when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) + { print "get result", res2; if ( res2?$val ) - print "get result same as inserted", value == (res2$val as string); + print "get result same as inserted", value == ( res2$val as string ); Storage::Sync::close_backend(b); + } + timeout 5sec + { + print "get request timed out"; + } } - timeout 5 sec { - print "get requeest timed out"; - } - } - timeout 5 sec { + timeout 5sec + { print "put request timed out"; + } } -} diff --git a/testing/btest/scripts/base/frameworks/storage/redis-async.zeek b/testing/btest/scripts/base/frameworks/storage/redis-async.zeek index 37947d349d..225da69719 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-async.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-async.zeek @@ -21,34 +21,41 @@ redef exit_only_after_terminate = T; # Create a typename here that can be passed down into open_backend() type str: string; -event zeek_init() { - local opts : Storage::BackendOptions; - opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = T]; +event zeek_init() + { + local opts: Storage::BackendOptions; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; local key = "key1234"; local value = "value5678"; local b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); - when [b, key, value] ( local res = Storage::Async::put(b, [$key=key, $value=value]) ) { + when [b, key, value] ( local res = Storage::Async::put(b, [ $key=key, + $value=value ]) ) + { print "put result", res; - when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) { + when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) + { print "get result", res2; if ( res2?$val ) - print "get result same as inserted", value == (res2$val as string); + print "get result same as inserted", value == ( res2$val as string ); Storage::Sync::close_backend(b); terminate(); - } - timeout 5 sec { - print "get requeest timed out"; + } + timeout 5sec + { + print "get request timed out"; terminate(); + } } - } - timeout 5 sec { + timeout 5sec + { print "put request timed out"; terminate(); + } } -} diff --git a/testing/btest/scripts/base/frameworks/storage/redis-cluster.zeek b/testing/btest/scripts/base/frameworks/storage/redis-cluster.zeek index 3336c31101..8d6ede1d90 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-cluster.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-cluster.zeek @@ -39,46 +39,52 @@ global redis_data_written: event() &is_used; global backend: opaque of Storage::BackendHandle; type str: string; -event zeek_init() { - local opts : Storage::BackendOptions; - opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = F]; +event zeek_init() + { + local opts: Storage::BackendOptions; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; backend = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); -} + } -event redis_data_written() { +event redis_data_written() + { print "redis_data_written"; local res = Storage::Sync::get(backend, "1234"); print Cluster::node, res; Storage::Sync::close_backend(backend); terminate(); -} + } @else global node_count: count = 0; -event Cluster::node_down(name: string, id: string) { +event Cluster::node_down(name: string, id: string) + { ++node_count; if ( node_count == 2 ) terminate(); -} + } -event redis_data_written() { +event redis_data_written() + { local e = Cluster::make_event(redis_data_written); Cluster::publish(Cluster::worker_topic, e); -} + } @endif @if ( Cluster::node == "worker-1" ) -event Cluster::Experimental::cluster_started() { - local res = Storage::Sync::put(backend, [$key="1234", $value="5678"]); +event Cluster::Experimental::cluster_started() + { + local res = Storage::Sync::put(backend, [ $key="1234", $value="5678" ]); print Cluster::node, "put result", res; local e = Cluster::make_event(redis_data_written); Cluster::publish(Cluster::manager_topic, e); -} + } @endif diff --git a/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek b/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek index 6c9b1398e8..fcc6d03439 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-expiration.zeek @@ -15,7 +15,7 @@ @load base/frameworks/storage/sync @load policy/frameworks/storage/backend/redis -redef Storage::expire_interval = 2 secs; +redef Storage::expire_interval = 2secs; redef exit_only_after_terminate = T; # Create a typename here that can be passed down into open_backend() @@ -25,34 +25,38 @@ global b: opaque of Storage::BackendHandle; global key: string = "key1234"; global value: string = "value7890"; -event check_removed() { +event check_removed() + { local res2 = Storage::Sync::get(b, key); print "get result after expiration", res2; Storage::Sync::close_backend(b); terminate(); -} + } -event setup_test() { - local opts : Storage::BackendOptions; - opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = F]; +event setup_test() + { + local opts: Storage::BackendOptions; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); - local res = Storage::Sync::put(b, [$key=key, $value=value, $expire_time=2 secs]); + local res = Storage::Sync::put(b, [ $key=key, $value=value, $expire_time=2secs ]); print "put result", res; local res2 = Storage::Sync::get(b, key); print "get result", res2; if ( res2?$val ) - print "get result same as inserted", value == (res2$val as string); + print "get result same as inserted", value == ( res2$val as string ); - schedule 5 secs { check_removed() }; -} + schedule 5secs { check_removed() }; + } -event zeek_init() { +event zeek_init() + { # We need network time to be set to something other than zero for the # expiration time to be set correctly. Schedule an event on a short # timer so packets start getting read and do the setup there. - schedule 100 msecs { setup_test() }; -} + schedule 100msecs { setup_test() }; + } diff --git a/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek b/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek index cd65443290..bf49a30230 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek @@ -18,31 +18,33 @@ # Create a typename here that can be passed down into open_backend() type str: string; -event zeek_init() { - local opts : Storage::BackendOptions; - opts$redis = [$server_host = "127.0.0.1", $server_port = to_port(getenv("REDIS_PORT")), $key_prefix = "testing", $async_mode = F]; +event zeek_init() + { + local opts: Storage::BackendOptions; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; local key = "key1234"; local value = "value1234"; local b = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); - local res = Storage::Sync::put(b, [$key=key, $value=value]); + local res = Storage::Sync::put(b, [ $key=key, $value=value ]); print "put result", res; local res2 = Storage::Sync::get(b, key); print "get result", res2; if ( res2?$val ) - print "get result same as inserted", value == (res2$val as string); + print "get result same as inserted", value == ( res2$val as string ); local value2 = "value5678"; - res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=T]); + res = Storage::Sync::put(b, [ $key=key, $value=value2, $overwrite=T ]); print "overwrite put result", res; res2 = Storage::Sync::get(b, key); print "get result", res2; if ( res2?$val ) - print "get result same as inserted", value2 == (res2$val as string); + print "get result same as inserted", value2 == ( res2$val as string ); Storage::Sync::close_backend(b); -} + } diff --git a/testing/btest/scripts/base/frameworks/storage/sqlite-basic-reading-pcap.zeek b/testing/btest/scripts/base/frameworks/storage/sqlite-basic-reading-pcap.zeek index 76989fa1c5..8a933de3c6 100644 --- a/testing/btest/scripts/base/frameworks/storage/sqlite-basic-reading-pcap.zeek +++ b/testing/btest/scripts/base/frameworks/storage/sqlite-basic-reading-pcap.zeek @@ -12,10 +12,11 @@ redef exit_only_after_terminate = T; # Create a typename here that can be passed down into get(). type str: string; -event zeek_init() { +event zeek_init() + { # Create a database file in the .tmp directory with a 'testing' table - local opts : Storage::BackendOptions; - opts$sqlite = [$database_path = "test.sqlite", $table_name = "testing"]; + local opts: Storage::BackendOptions; + opts$sqlite = [ $database_path="test.sqlite", $table_name="testing" ]; local key = "key1234"; local value = "value5678"; @@ -24,25 +25,30 @@ event zeek_init() { # the backend yet. local b = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str); - when [b, key, value] ( local res = Storage::Async::put(b, [$key=key, $value=value]) ) { + when [b, key, value] ( local res = Storage::Async::put(b, [ $key=key, + $value=value ]) ) + { print "put result", res; - when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) { + when [b, key, value] ( local res2 = Storage::Async::get(b, key) ) + { print "get result", res2; if ( res2?$val ) - print "get result same as inserted", value == (res2$val as string); + print "get result same as inserted", value == ( res2$val as string ); Storage::Sync::close_backend(b); terminate(); - } - timeout 5 sec { - print "get requeest timed out"; + } + timeout 5sec + { + print "get request timed out"; terminate(); + } } - } - timeout 5 sec { + timeout 5sec + { print "put request timed out"; terminate(); + } } -} diff --git a/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek b/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek index 63d262e55b..c815996f7c 100644 --- a/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek +++ b/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek @@ -11,47 +11,59 @@ redef exit_only_after_terminate = T; # Create a typename here that can be passed down into get(). type str: string; -event zeek_init() { +event zeek_init() + { # Create a database file in the .tmp directory with a 'testing' table - local opts : Storage::BackendOptions; - opts$sqlite = [$database_path = "test.sqlite", $table_name="testing"]; + local opts: Storage::BackendOptions; + opts$sqlite = [ $database_path="test.sqlite", $table_name="testing" ]; local key = "key1234"; local value = "value5678"; # Test inserting/retrieving a key/value pair that we know won't be in # the backend yet. - when [opts, key, value] ( local b = Storage::Async::open_backend(Storage::SQLITE, opts, str, str) ) { + when [opts, key, value] ( local b = Storage::Async::open_backend( + Storage::SQLITE, opts, str, str) ) + { print "open successful"; - when [b, key, value] ( local put_res = Storage::Async::put(b, [$key=key, $value=value]) ) { + when [b, key, value] ( local put_res = Storage::Async::put(b, [ $key=key, + $value=value ]) ) + { print "put result", put_res; - when [b, key, value] ( local get_res = Storage::Async::get(b, key) ) { + when [b, key, value] ( local get_res = Storage::Async::get(b, key) ) + { print "get result", get_res; if ( get_res?$val ) - print "get result same as inserted", value == (get_res$val as string); + print "get result same as inserted", value == ( get_res$val as string ); - when [b] ( local close_res = Storage::Async::close_backend(b) ) { + when [b] ( local close_res = Storage::Async::close_backend(b) ) + { print "closed succesfully"; terminate(); - } timeout 5 sec { + } + timeout 5sec + { print "close request timed out"; terminate(); + } + } + timeout 5sec + { + print "get request timed out"; + terminate(); } } - timeout 5 sec { - print "get requeest timed out"; - terminate(); - } - } - timeout 5 sec { + timeout 5sec + { print "put request timed out"; terminate(); + } } - } - timeout 5 sec { + timeout 5sec + { print "open request timed out"; terminate(); + } } -}