diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc index b1ceb92a54..605cb900da 100644 --- a/src/storage/Backend.cc +++ b/src/storage/Backend.cc @@ -5,6 +5,7 @@ #include "zeek/Trigger.h" #include "zeek/broker/Data.h" #include "zeek/storage/ReturnCode.h" +#include "zeek/storage/storage.bif.h" namespace zeek::storage { @@ -68,6 +69,10 @@ OpenResultCallback::OpenResultCallback(zeek::detail::trigger::TriggerPtr trigger : ResultCallback(std::move(trigger), assoc), backend(std::move(backend)) {} void OpenResultCallback::Complete(OperationResult res) { + if ( res.code == ReturnCode::SUCCESS ) { + backend->backend->EnqueueBackendOpened(); + } + // If this is a sync callback, there isn't a trigger to process. Store the result and bail. Always // set result's value to the backend pointer so that it comes across in the result. This ensures // the handle is always available in the result even on failures. @@ -94,6 +99,7 @@ void OpenResultCallback::Complete(OperationResult res) { OperationResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt, OpenResultCallback* cb) { key_type = std::move(kt); val_type = std::move(vt); + backend_options = options; auto ret = DoOpen(std::move(options), cb); if ( ! ret.value ) @@ -153,6 +159,15 @@ void Backend::CompleteCallback(ResultCallback* cb, const OperationResult& data) } } +void Backend::EnqueueBackendOpened() { + event_mgr.Enqueue(Storage::backend_opened, make_intrusive(Tag()), backend_options); +} + +void Backend::EnqueueBackendLost(std::string_view reason) { + event_mgr.Enqueue(Storage::backend_lost, make_intrusive(Tag()), backend_options, + make_intrusive(reason)); +} + zeek::OpaqueTypePtr detail::backend_opaque; IMPLEMENT_OPAQUE_VALUE(detail::BackendHandleVal) diff --git a/src/storage/Backend.h b/src/storage/Backend.h index 139a1ab96c..828349526a 100644 --- a/src/storage/Backend.h +++ b/src/storage/Backend.h @@ -114,10 +114,15 @@ public: */ virtual void Poll() {} + const RecordValPtr& Options() const { return backend_options; } + protected: // Allow the manager to call Open/Close. friend class storage::Manager; + // Allow OpenResultCallback to call EnqueueConnectionEstablished. + friend class storage::OpenResultCallback; + /** * Constructor * @@ -183,10 +188,24 @@ protected: */ virtual void Expire() {} + /** + * Enqueues the Storage::backend_opened event. This is called automatically + * when an OpenResultCallback is completed successfully. + */ + void EnqueueBackendOpened(); + + /** + * Enqueues the Storage::backend_lost event with an optional reason + * string. This should be called by the backends whenever they lose their + * connection. + */ + void EnqueueBackendLost(std::string_view reason); + void CompleteCallback(ResultCallback* cb, const OperationResult& data) const; TypePtr key_type; TypePtr val_type; + RecordValPtr backend_options; std::string tag; diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index e5e5312987..bda140ef0a 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -8,6 +8,7 @@ #include "zeek/Val.h" #include "zeek/iosource/Manager.h" #include "zeek/storage/ReturnCode.h" +#include "zeek/storage/storage.bif.h" #include "hiredis/adapters/poll.h" #include "hiredis/async.h" @@ -481,7 +482,7 @@ void Redis::OnConnect(int status) { if ( status == REDIS_OK ) { connected = true; CompleteCallback(open_cb, {ReturnCode::SUCCESS}); - // TODO: post connect event + // The connection_established event is sent via the open callback handler. return; } @@ -493,17 +494,14 @@ void Redis::OnConnect(int status) { void Redis::OnDisconnect(int status) { DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event"); + --active_ops; - - if ( status == REDIS_OK ) { - // TODO: this was an intentional disconnect, nothing to do? - } - else { - // TODO: this was unintentional, should we reconnect? - // TODO: post disconnect event - } - connected = false; + + if ( status == REDIS_ERR ) + EnqueueBackendLost(async_ctx->errstr); + else + EnqueueBackendLost("Client disconnected"); } void Redis::ProcessFd(int fd, int flags) { diff --git a/src/storage/storage.bif b/src/storage/storage.bif index c7c55ef2e4..7213618c56 100644 --- a/src/storage/storage.bif +++ b/src/storage/storage.bif @@ -45,11 +45,13 @@ static IntrusivePtr make_backend_handle(Val* module Storage; -# Generated when a new backend connection is opened -event Storage::backend_opened%(%); +## Generated automatically when a new backend connection is opened successfully. +event Storage::backend_opened%(tag: string, options: any%); -# Generated when a backend connection is lost -event Storage::backend_lost%(%); +## May be generated when a backend connection is lost, both normally and +## unexpectedly. This event depends on the backends implementing handling for +## it, and is not generated automatically by the storage framework. +event Storage::backend_lost%(tag: string, options: any, reason: string%); module Storage::Async; diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-disconnect/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-disconnect/out new file mode 100644 index 0000000000..3308aef14e --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-disconnect/out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +open_result, [code=Storage::SUCCESS, error_str=, value=] +Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing]] +Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing]], Server closed the connection diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out index 2f1d82d855..a8fa18538e 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out @@ -6,3 +6,5 @@ get result same as inserted, T overwrite put result, [code=Storage::SUCCESS, error_str=, value=] get result, [code=Storage::SUCCESS, error_str=, value=value5678] get result same as inserted, T +Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing]] +Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing]], Client disconnected diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out index c9184cdc98..781e32a333 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out @@ -1,4 +1,9 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Storage::backend_opened, Storage::SQLITE, [sqlite=[database_path=test.sqlite, table_name=testing, tuning_params={ +[synchronous] = normal, +[temp_store] = memory, +[journal_mode] = WAL +}]] open result, [code=Storage::SUCCESS, error_str=, value=] put result, [code=Storage::SUCCESS, error_str=, value=] get result, [code=Storage::SUCCESS, error_str=, value=value5678] diff --git a/testing/btest/scripts/base/frameworks/storage/redis-disconnect.zeek b/testing/btest/scripts/base/frameworks/storage/redis-disconnect.zeek new file mode 100644 index 0000000000..c43dcbc99a --- /dev/null +++ b/testing/btest/scripts/base/frameworks/storage/redis-disconnect.zeek @@ -0,0 +1,44 @@ +# @TEST-DOC: Tests basic Redis storage backend functions in sync mode, including overwriting + +# @TEST-REQUIRES: have-redis +# @TEST-PORT: REDIS_PORT + +# @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp} +# @TEST-EXEC: zeek -b %INPUT | sed 's|=[0-9]*/tcp|=xxxx/tcp|g' > out +# @TEST-EXEC: btest-bg-wait -k 0 + +# @TEST-EXEC: btest-diff out + +@load base/frameworks/storage/sync +@load policy/frameworks/storage/backend/redis + +redef exit_only_after_terminate = T; + +# Create a typename here that can be passed down into open_backend() +type str: string; + +event Storage::backend_opened(tag: string, config: any) { + print "Storage::backend_opened", tag, config; +} + +event Storage::backend_lost(tag: string, config: any, reason: string) { + print "Storage::backend_lost", tag, config, reason; + terminate(); +} + +event zeek_init() + { + local opts: Storage::BackendOptions; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; + + local key = "key1234"; + local value = "value1234"; + + local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str); + print "open_result", open_res; + + # Kill the redis server so the backend will disconnect and fire the backend_lost event. + system("cat redis-server/redis.pid"); + system("kill $(cat redis-server/redis.pid)"); + } diff --git a/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek b/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek index f3d5d4b684..ee2dc88031 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-sync.zeek @@ -4,7 +4,7 @@ # @TEST-PORT: REDIS_PORT # @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp} -# @TEST-EXEC: zeek -b %INPUT > out +# @TEST-EXEC: zeek -b %INPUT | sed 's|=[0-9]*/tcp|=xxxx/tcp|g' > out # @TEST-EXEC: btest-bg-wait -k 0 # @TEST-EXEC: btest-diff out @@ -15,6 +15,15 @@ # Create a typename here that can be passed down into open_backend() type str: string; +event Storage::backend_opened(tag: string, config: any) { + print "Storage::backend_opened", tag, config; +} + +event Storage::backend_lost(tag: string, config: any, reason: string) { + print "Storage::backend_lost", tag, config, reason; + terminate(); +} + event zeek_init() { local opts: Storage::BackendOptions; diff --git a/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek b/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek index 92f2a34059..3f37dd352b 100644 --- a/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek +++ b/testing/btest/scripts/base/frameworks/storage/sqlite-basic.zeek @@ -11,6 +11,10 @@ redef exit_only_after_terminate = T; # Create a typename here that can be passed down into get(). type str: string; +event Storage::backend_opened(tag: string, config: any) { + print "Storage::backend_opened", tag, config; +} + event zeek_init() { # Create a database file in the .tmp directory with a 'testing' table