// See the file "COPYING" in the main distribution directory for copyright. #include "zeek/storage/backend/redis/Redis.h" #include "zeek/DebugLogger.h" #include "zeek/Func.h" #include "zeek/RunState.h" #include "zeek/Val.h" #include "zeek/iosource/Manager.h" #include "zeek/storage/ReturnCode.h" #include "hiredis/adapters/poll.h" #include "hiredis/async.h" #include "hiredis/hiredis.h" // Anonymous callback handler methods for the hiredis async API. namespace { class Tracer { public: Tracer(const std::string& where) : where(where) {} // DBG_LOG(zeek::DBG_STORAGE, "%s", where.c_str()); } ~Tracer() {} // DBG_LOG(zeek::DBG_STORAGE, "%s done", where.c_str()); } std::string where; }; void redisOnConnect(const redisAsyncContext* ctx, int status) { auto t = Tracer("connect"); auto backend = static_cast(ctx->data); backend->OnConnect(status); } void redisOnDisconnect(const redisAsyncContext* ctx, int status) { auto t = Tracer("disconnect"); auto backend = static_cast(ctx->data); backend->OnDisconnect(status); } void redisPut(redisAsyncContext* ctx, void* reply, void* privdata) { auto t = Tracer("put"); auto backend = static_cast(ctx->data); auto callback = static_cast(privdata); backend->HandlePutResult(static_cast(reply), callback); } void redisGet(redisAsyncContext* ctx, void* reply, void* privdata) { auto t = Tracer("get"); auto backend = static_cast(ctx->data); auto callback = static_cast(privdata); backend->HandleGetResult(static_cast(reply), callback); } void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) { auto t = Tracer("erase"); auto backend = static_cast(ctx->data); auto callback = static_cast(privdata); backend->HandleEraseResult(static_cast(reply), callback); } void redisZADD(redisAsyncContext* ctx, void* reply, void* privdata) { auto t = Tracer("generic"); auto backend = static_cast(ctx->data); // We don't care about the reply from the ZADD, mostly because blocking to poll // for it adds a bunch of complication to DoPut() with having to handle the // reply from SET first. backend->HandleGeneric(nullptr); freeReplyObject(reply); } void redisGeneric(redisAsyncContext* ctx, void* reply, void* privdata) { auto t = Tracer("generic"); auto backend = static_cast(ctx->data); backend->HandleGeneric(static_cast(reply)); } // Because we called redisPollAttach in DoOpen(), privdata here is a // redisPollEvents object. We can go through that object to get the context's // data, which contains the backend. Because we overrode these callbacks in // DoOpen, we still want to mimic their callbacks to redisPollTick functions // correctly. // // Additionally, if we're in the middle of running a manual Expire() because // we're reading a pcap, don't add the file descriptor into iosource_mgr. Manual // calls to Poll() during that will handle reading/writing any data, and we // don't want the contention with the main loop. void redisAddRead(void* privdata) { auto t = Tracer("addread"); auto rpe = static_cast(privdata); auto backend = static_cast(rpe->context->data); if ( rpe->reading == 0 && ! backend->ExpireRunning() ) zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ); rpe->reading = 1; } void redisDelRead(void* privdata) { auto t = Tracer("delread"); auto rpe = static_cast(privdata); auto backend = static_cast(rpe->context->data); if ( rpe->reading == 1 && ! 
void redisDelRead(void* privdata) {
    auto t = Tracer("delread");
    auto rpe = static_cast<redisPollEvents*>(privdata);
    auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
    if ( rpe->reading == 1 && ! backend->ExpireRunning() )
        zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::READ);
    rpe->reading = 0;
}

void redisAddWrite(void* privdata) {
    auto t = Tracer("addwrite");
    auto rpe = static_cast<redisPollEvents*>(privdata);
    auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
    if ( rpe->writing == 0 && ! backend->ExpireRunning() )
        zeek::iosource_mgr->RegisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE);
    rpe->writing = 1;
}

void redisDelWrite(void* privdata) {
    auto t = Tracer("delwrite");
    auto rpe = static_cast<redisPollEvents*>(privdata);
    auto backend = static_cast<zeek::storage::backend::redis::Redis*>(rpe->context->data);
    if ( rpe->writing == 1 && ! backend->ExpireRunning() )
        zeek::iosource_mgr->UnregisterFd(rpe->fd, backend, zeek::iosource::IOSource::WRITE);
    rpe->writing = 0;
}

// Creates a unique_lock based on a condition against a mutex. This is used to
// conditionally lock the expire_mutex. We only need to do it while reading
// pcaps. The only thread contention happens during Expire(), which only happens
// when reading pcaps. It's not worth the cycles to lock the mutex otherwise,
// and hiredis will deal with other cross-command contention correctly as long
// as it's in a single thread.
std::unique_lock<std::mutex> conditionally_lock(bool condition, std::mutex& mutex) {
    return condition ? std::unique_lock<std::mutex>(mutex) : std::unique_lock<std::mutex>();
}

} // namespace

namespace zeek::storage::backend::redis {

storage::BackendPtr Redis::Instantiate() { return make_intrusive<Redis>(); }

/**
 * Called by the manager system to open the backend.
 */
OperationResult Redis::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
    RecordValPtr backend_options = options->GetField<RecordVal>("redis");
    key_prefix = backend_options->GetField<StringVal>("key_prefix")->ToStdString();

    redisOptions opt = {0};

    StringValPtr host = backend_options->GetField<StringVal>("server_host");
    if ( host ) {
        PortValPtr port = backend_options->GetField<PortVal>("server_port");
        server_addr = util::fmt("%s:%d", host->ToStdStringView().data(), port->Port());
        REDIS_OPTIONS_SET_TCP(&opt, host->ToStdStringView().data(), port->Port());
    }
    else {
        StringValPtr unix_sock = backend_options->GetField<StringVal>("server_unix_socket");
        if ( ! unix_sock ) {
            return {ReturnCode::CONNECTION_FAILED,
                    "Either server_host/server_port or server_unix_socket must be set in Redis options record"};
        }

        server_addr = unix_sock->ToStdString();
        REDIS_OPTIONS_SET_UNIX(&opt, server_addr.c_str());
    }

    opt.options |= REDIS_OPT_PREFER_IPV4;
    opt.options |= REDIS_OPT_NOAUTOFREEREPLIES;

    struct timeval timeout = {5, 0};
    opt.connect_timeout = &timeout;

    // The connection request below should be operation #1.
    active_ops = 1;

    async_ctx = redisAsyncConnectWithOptions(&opt);
    if ( async_ctx == nullptr || async_ctx->err ) {
        // This block doesn't necessarily mean the connection failed. It means
        // that hiredis failed to set up the async context. Connection failure
        // is returned later via the OnConnect callback.
        std::string errmsg = util::fmt("Failed to open connection to Redis server at %s", server_addr.c_str());
        if ( async_ctx ) {
            errmsg.append(": ");
            errmsg.append(async_ctx->errstr);
        }

        redisAsyncFree(async_ctx);
        async_ctx = nullptr;
        return {ReturnCode::CONNECTION_FAILED, errmsg};
    }

    // There's no way to pass privdata down to the connect handler like there is for
    // the other callbacks. Store the open callback so that it can be dealt with from
    // OnConnect().
    open_cb = cb;

    // TODO: Sort out how to pass the zeek callbacks for both open/done to the async
    // callbacks from hiredis so they can return errors.
    // The context is passed to the handler methods. Setting this data object
    // pointer allows us to look up the backend in the handlers.
    async_ctx->data = this;

    redisPollAttach(async_ctx);

    redisAsyncSetConnectCallback(async_ctx, redisOnConnect);
    redisAsyncSetDisconnectCallback(async_ctx, redisOnDisconnect);

    // redisAsyncSetConnectCallback sets the flag in the redisPollEvent for writing
    // so we can add this to our loop as well.
    zeek::iosource_mgr->RegisterFd(async_ctx->c.fd, this, zeek::iosource::IOSource::WRITE);

    // These four callbacks handle the file descriptor coming and going for read
    // and write operations for hiredis. Their subsequent callbacks will
    // register/unregister with iosource_mgr as needed. I tried just registering
    // full time for both read and write but it leads to weird syncing issues
    // within the hiredis code. This is safer in regards to the library, even if
    // it results in waking up our IO loop more frequently.
    //
    // redisPollAttach sets these to functions internal to the poll attachment,
    // but we override them for our own uses. See the callbacks for more info
    // about why.
    async_ctx->ev.addRead = redisAddRead;
    async_ctx->ev.delRead = redisDelRead;
    async_ctx->ev.addWrite = redisAddWrite;
    async_ctx->ev.delWrite = redisDelWrite;

    return {ReturnCode::IN_PROGRESS};
}

/**
 * Finalizes the backend when it's being closed.
 */
OperationResult Redis::DoClose(ResultCallback* cb) {
    auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);

    connected = false;
    close_cb = cb;

    redisAsyncDisconnect(async_ctx);
    ++active_ops;

    return {ReturnCode::IN_PROGRESS};
}
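
// Key layout used by the operations below: every entry is stored under
// "<key_prefix>:<serialized key>" via hiredis' binary-safe %b format specifier
// (pointer plus length). When reading pcaps, DoPut() additionally records the
// key in a sorted set named "<key_prefix>_expire", scored by its expiration
// time, so that DoExpire() can find and delete expired entries manually.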
/**
 * The workhorse method for Put(). This must be implemented by plugins.
 */
OperationResult Redis::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
    // The async context will queue operations until it's connected fully.
    if ( ! connected && ! async_ctx )
        return {ReturnCode::NOT_CONNECTED};

    auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);

    std::string format = "SET %s:%b %b";
    if ( ! overwrite )
        format.append(" NX");

    auto key_data = serializer->Serialize(key);
    if ( ! key_data )
        return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};

    auto val_data = serializer->Serialize(value);
    if ( ! val_data )
        return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize value"};

    int status;

    // Use built-in expiration if reading live data, since time will move
    // forward consistently. If reading pcaps, we'll do something else.
    if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) {
        format.append(" PXAT %" PRIu64);
        status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), key_data->data(),
                                   key_data->size(), val_data->data(), val_data->size(),
                                   static_cast<uint64_t>(expiration_time * 1e3));
    }
    else
        status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), key_data->data(),
                                   key_data->size(), val_data->data(), val_data->size());

    if ( connected && status == REDIS_ERR )
        return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue put operation: %s", async_ctx->errstr)};

    ++active_ops;

    // If reading pcaps, insert into a secondary set that's ordered by expiration
    // time and gets checked by Expire().
    if ( expiration_time > 0.0 && zeek::run_state::reading_traces ) {
        format = "ZADD %s_expire";
        if ( ! overwrite )
            format.append(" NX");
        format += " %f %b";

        status = redisAsyncCommand(async_ctx, redisZADD, NULL, format.c_str(), key_prefix.data(), expiration_time,
                                   key_data->data(), key_data->size());

        if ( connected && status == REDIS_ERR )
            return {ReturnCode::OPERATION_FAILED, util::fmt("ZADD operation failed: %s", async_ctx->errstr)};

        ++active_ops;
    }

    return {ReturnCode::IN_PROGRESS};
}

/**
 * The workhorse method for Get(). This must be implemented for plugins.
 */
OperationResult Redis::DoGet(ResultCallback* cb, ValPtr key) {
    // The async context will queue operations until it's connected fully.
    if ( ! connected && ! async_ctx )
        return {ReturnCode::NOT_CONNECTED};

    auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);

    auto key_data = serializer->Serialize(key);
    if ( ! key_data )
        return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};

    int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%b", key_prefix.data(), key_data->data(),
                                   key_data->size());

    if ( connected && status == REDIS_ERR )
        return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue get operation: %s", async_ctx->errstr)};

    ++active_ops;

    // There isn't a result to return here. That happens in HandleGetResult for
    // async operations.
    return {ReturnCode::IN_PROGRESS};
}

/**
 * The workhorse method for Erase(). This must be implemented for plugins.
 */
OperationResult Redis::DoErase(ResultCallback* cb, ValPtr key) {
    // The async context will queue operations until it's connected fully.
    if ( ! connected && ! async_ctx )
        return {ReturnCode::NOT_CONNECTED};

    auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);

    auto key_data = serializer->Serialize(key);
    if ( ! key_data )
        return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};

    int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%b", key_prefix.data(), key_data->data(),
                                   key_data->size());

    if ( connected && status == REDIS_ERR )
        return {ReturnCode::OPERATION_FAILED, async_ctx->errstr};

    ++active_ops;

    return {ReturnCode::IN_PROGRESS};
}
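
// Manual expiration, used only while reading pcaps: ZRANGEBYSCORE finds every
// key in "<key_prefix>_expire" whose expiration score is at or below the
// current network time, each of those keys is deleted, and ZREMRANGEBYSCORE
// then drops the same range from the tracking set. Each command Poll()s
// synchronously for its reply.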
for ( const auto& e : elements ) { status = redisAsyncCommand(async_ctx, redisGeneric, NULL, "DEL %s:%s", key_prefix.data(), e.c_str()); ++active_ops; Poll(); redisReply* reply = reply_queue.front(); reply_queue.pop_front(); freeReplyObject(reply); // TODO: do we care if this failed? } // Remove all of the elements from the range-set that match the time range. redisAsyncCommand(async_ctx, redisGeneric, NULL, "ZREMRANGEBYSCORE %s_expire -inf %f", key_prefix.data(), current_network_time); ++active_ops; Poll(); reply = reply_queue.front(); reply_queue.pop_front(); freeReplyObject(reply); // TODO: do we care if this failed? } void Redis::HandlePutResult(redisReply* reply, ResultCallback* callback) { --active_ops; OperationResult res{ReturnCode::SUCCESS}; if ( ! connected ) res = {ReturnCode::NOT_CONNECTED}; else if ( ! reply ) res = {ReturnCode::OPERATION_FAILED, "put operation returned null reply"}; else if ( reply->type == REDIS_REPLY_NIL ) // For a SET operation, a NIL reply indicates a conflict with the NX flag. res = {ReturnCode::KEY_EXISTS}; else if ( reply->type == REDIS_REPLY_ERROR ) res = ParseReplyError("put", reply->str); freeReplyObject(reply); CompleteCallback(callback, res); } void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) { --active_ops; OperationResult res; if ( ! connected ) res = {ReturnCode::NOT_CONNECTED}; if ( ! reply ) res = {ReturnCode::OPERATION_FAILED, "get operation returned null reply"}; else if ( reply->type == REDIS_REPLY_NIL ) res = {ReturnCode::KEY_NOT_FOUND}; else if ( reply->type == REDIS_REPLY_ERROR ) res = ParseReplyError("get", reply->str); else { auto val = serializer->Unserialize({(std::byte*)reply->str, reply->len}, val_type); if ( val ) res = {ReturnCode::SUCCESS, "", val.value()}; else res = {ReturnCode::OPERATION_FAILED, val.error()}; } freeReplyObject(reply); CompleteCallback(callback, res); } void Redis::HandleEraseResult(redisReply* reply, ResultCallback* callback) { --active_ops; OperationResult res{ReturnCode::SUCCESS}; if ( ! connected ) res = {ReturnCode::NOT_CONNECTED}; else if ( ! reply ) res = {ReturnCode::OPERATION_FAILED, "erase operation returned null reply"}; else if ( reply->type == REDIS_REPLY_ERROR ) res = ParseReplyError("erase", reply->str); freeReplyObject(reply); CompleteCallback(callback, res); } void Redis::HandleGeneric(redisReply* reply) { --active_ops; if ( reply ) reply_queue.push_back(reply); } void Redis::OnConnect(int status) { DBG_LOG(DBG_STORAGE, "Redis backend: connection event"); --active_ops; if ( status == REDIS_OK ) { connected = true; CompleteCallback(open_cb, {ReturnCode::SUCCESS}); // The connection_established event is sent via the open callback handler. return; } connected = false; CompleteCallback(open_cb, {ReturnCode::CONNECTION_FAILED}); // TODO: we could attempt to reconnect here } void Redis::OnDisconnect(int status) { DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event"); connected = false; if ( status == REDIS_ERR ) { // An error status indicates that the connection was lost unexpectedly and not // via a request from backend. 
void Redis::OnDisconnect(int status) {
    DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event");
    connected = false;

    if ( status == REDIS_ERR ) {
        // An error status indicates that the connection was lost unexpectedly and not
        // via a request from the backend.
        EnqueueBackendLost(async_ctx->errstr);
    }
    else {
        --active_ops;
        EnqueueBackendLost("Client disconnected");
        CompleteCallback(close_cb, {ReturnCode::SUCCESS});
    }

    redisAsyncFree(async_ctx);
    async_ctx = nullptr;
}

void Redis::ProcessFd(int fd, int flags) {
    auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);

    if ( (flags & IOSource::ProcessFlags::READ) != 0 )
        redisAsyncHandleRead(async_ctx);
    if ( (flags & IOSource::ProcessFlags::WRITE) != 0 )
        redisAsyncHandleWrite(async_ctx);
}

OperationResult Redis::ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const {
    if ( async_ctx->err == REDIS_ERR_TIMEOUT )
        return {ReturnCode::TIMEOUT};
    else if ( async_ctx->err == REDIS_ERR_IO )
        return {ReturnCode::OPERATION_FAILED,
                util::fmt("%s operation IO error: %s", op_str.data(), strerror(errno))};
    else
        return {ReturnCode::OPERATION_FAILED,
                util::fmt("%s operation failed: %s", op_str.data(), reply_err_str.data())};
}

void Redis::DoPoll() {
    while ( active_ops > 0 )
        redisPollTick(async_ctx, 0.5);
}

} // namespace zeek::storage::backend::redis