diff --git a/scripts/base/frameworks/storage/async.zeek b/scripts/base/frameworks/storage/async.zeek index 04398585f1..fa836759f4 100644 --- a/scripts/base/frameworks/storage/async.zeek +++ b/scripts/base/frameworks/storage/async.zeek @@ -81,30 +81,46 @@ export { function open_backend(btype: Storage::Backend, options: Storage::BackendOptions, key_type: any, val_type: any): Storage::OperationResult { - return Storage::Async::__open_backend(btype, options, key_type, val_type); + if ( options$forced_sync ) + return Storage::Sync::__open_backend(btype, options, key_type, val_type); + else + return Storage::Async::__open_backend(btype, options, key_type, val_type); } function close_backend(backend: opaque of Storage::BackendHandle) : Storage::OperationResult { - return Storage::Async::__close_backend(backend); + if ( Storage::is_forced_sync(backend) ) + return Storage::Sync::__close_backend(backend); + else + return Storage::Async::__close_backend(backend); } function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs) : Storage::OperationResult { - return Storage::Async::__put(backend, args$key, args$value, args$overwrite, - args$expire_time); + if ( Storage::is_forced_sync(backend) ) + return Storage::Sync::__put(backend, args$key, args$value, args$overwrite, + args$expire_time); + else + return Storage::Async::__put(backend, args$key, args$value, args$overwrite, + args$expire_time); } function get(backend: opaque of Storage::BackendHandle, key: any) : Storage::OperationResult { - return Storage::Async::__get(backend, key); + if ( Storage::is_forced_sync(backend) ) + return Storage::Sync::__get(backend, key); + else + return Storage::Async::__get(backend, key); } function erase(backend: opaque of Storage::BackendHandle, key: any) : Storage::OperationResult { - return Storage::Async::__erase(backend, key); + if ( Storage::is_forced_sync(backend) ) + return Storage::Sync::__erase(backend, key); + else + return Storage::Async::__erase(backend, key); } diff --git a/scripts/base/frameworks/storage/main.zeek b/scripts/base/frameworks/storage/main.zeek index 51ab13c27c..4a3af7c6b8 100644 --- a/scripts/base/frameworks/storage/main.zeek +++ b/scripts/base/frameworks/storage/main.zeek @@ -3,6 +3,9 @@ module Storage; export { + # Default value for the BackendOptions::forced_sync field. + const default_forced_sync: bool = F &redef; + ## Base record for backend options that can be passed to ## :zeek:see:`Storage::Async::open_backend` and ## :zeek:see:`Storage::Sync::open_backend`. Backend plugins can redef this record @@ -10,6 +13,11 @@ export { type BackendOptions: record { ## The serializer used for converting Zeek data. serializer: Storage::Serializer &default=Storage::STORAGE_SERIALIZER_JSON; + + ## Sets the backend into forced-synchronous mode. All operations will run + ## in synchronous mode, even if the async functions are called. This + ## should generally only be set to ``T`` during testing. + forced_sync : bool &default=Storage::default_forced_sync; }; ## Record for passing arguments to :zeek:see:`Storage::Async::put` and diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc index 533d11eaa0..8545f0e8c0 100644 --- a/src/storage/Backend.cc +++ b/src/storage/Backend.cc @@ -127,6 +127,8 @@ OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, Type val_type = std::move(vt); backend_options = options; + forced_sync = options->GetField("forced_sync")->Get(); + auto stype = options->GetField("serializer"); zeek::Tag stag{stype}; diff --git a/src/storage/Backend.h b/src/storage/Backend.h index 8aa660cc68..5e5c0799e4 100644 --- a/src/storage/Backend.h +++ b/src/storage/Backend.h @@ -216,6 +216,11 @@ public: */ const RecordValPtr& Options() const { return backend_options; } + /** + * Returns the state of the forced_sync option that was passed in the options record to Open(). + */ + bool IsForcedSync() const { return forced_sync; } + protected: // Allow the manager to call Open/Close. friend class storage::Manager; @@ -385,6 +390,7 @@ private: void InitMetrics(); uint8_t modes; + bool forced_sync = false; bool metrics_initialized = false; // These are owned by the backend but are passed into the callbacks to be diff --git a/src/storage/storage.bif b/src/storage/storage.bif index 0c6413333f..c497e542f9 100644 --- a/src/storage/storage.bif +++ b/src/storage/storage.bif @@ -22,3 +22,18 @@ function Storage::is_open%(backend: opaque of Storage::BackendHandle%) : bool auto b = storage::detail::BackendHandleVal::CastFromAny(backend); return zeek::val_mgr->Bool(b.has_value()); %} + +## Checks whether a storage backend was opened in forced-synchronous mode. +## +## backend: A handle to the backend to check. +## +## Returns: T if the forced_synchronous option was set to T, F otherwise or if the +## handle is invalid. +function Storage::is_forced_sync%(backend: opaque of Storage::BackendHandle%) : bool + %{ + auto b = storage::detail::BackendHandleVal::CastFromAny(backend); + if ( ! b.has_value() ) + return zeek::val_mgr->Bool(false); + + return zeek::val_mgr->Bool(b.value()->backend->IsForcedSync()); + %} diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-disconnect/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-disconnect/out index 6fde062ec8..8b91e6b95d 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-disconnect/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-disconnect/out @@ -1,4 +1,4 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. open_result, [code=Storage::SUCCESS, error_str=, value=] -Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]] -Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]], Server closed the connection +Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]] +Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]], Server closed the connection diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-forced-sync/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-forced-sync/out new file mode 100644 index 0000000000..96d4a9da72 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-forced-sync/out @@ -0,0 +1,28 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +open result, [code=Storage::SUCCESS, error_str=, value=] +put result, [code=Storage::SUCCESS, error_str=, value=] +get result, [code=Storage::SUCCESS, error_str=, value=value5678] +get result same as inserted, T +inner get done +inner put done +outer open done + +Post-operation metrics: +Telemetry::COUNTER, zeek, zeek_storage_backends_opened_total, [], [], 1.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_data_read_bytes_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 34.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_expired_entries_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, error, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, fail, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, success, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, error, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, fail, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, success, Storage::STORAGE_BACKEND_REDIS], 1.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, error, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, fail, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, success, Storage::STORAGE_BACKEND_REDIS], 1.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0 +Telemetry::COUNTER, zeek, zeek_storage_backend_data_written_bytes_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 18.0 + +close result, [code=Storage::SUCCESS, error_str=, value=] diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out index 565f0c8832..bf33393768 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.redis-sync/out @@ -10,7 +10,7 @@ put result, [code=Storage::SUCCESS, error_str=, value=, value=value2345] get result same as overwritten, T get result, [code=Storage::KEY_NOT_FOUND, error_str=, value=] -Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]] +Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]] Post-operation metrics: Telemetry::COUNTER, zeek, zeek_storage_backends_opened_total, [], [], 1.0 @@ -30,4 +30,4 @@ Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0 Telemetry::COUNTER, zeek, zeek_storage_backend_data_written_bytes_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 54.0 -Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]], Client disconnected +Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=, password=]], Client disconnected diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out index 3690e561c6..1efa1c4bff 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out @@ -4,7 +4,7 @@ put result, [code=Storage::SUCCESS, error_str=, value=, value=value5678] get result same as inserted, T closed succesfully -Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ +Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ [integrity_check] = , [journal_mode] = WAL, [synchronous] = normal, diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out index e93c61da36..b5a4e5b15f 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out @@ -1,5 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ +Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ [integrity_check] = , [journal_mode] = WAL, [synchronous] = normal, diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out index 9a0d8c8402..1f91c462fc 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out @@ -2,7 +2,7 @@ open result, [code=Storage::SUCCESS, error_str=, value=] erase result, [code=Storage::SUCCESS, error_str=, value=] get result, [code=Storage::KEY_NOT_FOUND, error_str=, value=] -Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=storage-test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ +Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, sqlite=[database_path=storage-test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ [integrity_check] = , [journal_mode] = WAL, [synchronous] = normal, diff --git a/testing/btest/scripts/base/frameworks/storage/redis-async.zeek b/testing/btest/scripts/base/frameworks/storage/redis-async.zeek index b16cd62747..c3e109cd04 100644 --- a/testing/btest/scripts/base/frameworks/storage/redis-async.zeek +++ b/testing/btest/scripts/base/frameworks/storage/redis-async.zeek @@ -68,7 +68,7 @@ event zeek_init() if ( get_res$code == Storage::SUCCESS && get_res?$value ) print "get result same as inserted", value == ( get_res$value as string ); - schedule 100 msec { print_metrics_and_close() }; + event print_metrics_and_close(); } timeout 5sec { diff --git a/testing/btest/scripts/base/frameworks/storage/redis-forced-sync.zeek b/testing/btest/scripts/base/frameworks/storage/redis-forced-sync.zeek new file mode 100644 index 0000000000..e0665ae76f --- /dev/null +++ b/testing/btest/scripts/base/frameworks/storage/redis-forced-sync.zeek @@ -0,0 +1,97 @@ +# @TEST-DOC: Tests basic Redis storage backend functions in async mode when using the forced_synchronous flag. + +# @TEST-REQUIRES: have-redis +# @TEST-PORT: REDIS_PORT + +# @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp} +# @TEST-EXEC: zeek -b %INPUT | sed "s|-${REDIS_PORT%/tcp}-|-XXXX-|g" > out +# @TEST-EXEC: btest-bg-wait -k 0 + +# @TEST-EXEC: btest-diff out + +@load base/frameworks/storage/async +@load policy/frameworks/storage/backend/redis +@load base/frameworks/telemetry + +# Make sure the telemetry output is in a fixed order. +redef running_under_test = T; + +redef exit_only_after_terminate = T; +global b : opaque of Storage::BackendHandle; + +event print_metrics_and_close() + { + print ""; + print "Post-operation metrics:"; + local storage_metrics = Telemetry::collect_metrics("zeek", "storage*"); + for (_, m in storage_metrics) + { + print m$opts$metric_type, m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value; + } + print ""; + + when [] ( local close_res = Storage::Async::close_backend(b) ) + { + print "close result", close_res; + terminate(); + } + timeout 5sec + { + print "close result", close_res; + terminate(); + } + } + +event zeek_init() + { + local opts: Storage::BackendOptions; + opts$forced_sync = T; + opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv( + "REDIS_PORT")), $key_prefix="testing" ]; + + local key = "key1234"; + local value = "value5678"; + + when [opts, key, value] ( local open_res = Storage::Async::open_backend( + Storage::STORAGE_BACKEND_REDIS, opts, string, string) ) + { + print "open result", open_res; + b = open_res$value; + + when [key, value] ( local put_res = Storage::Async::put(b, [ $key=key, + $value=value ]) ) + { + print "put result", put_res; + + when [key, value] ( local get_res = Storage::Async::get(b, key) ) + { + print "get result", get_res; + if ( get_res$code == Storage::SUCCESS && get_res?$value ) + print "get result same as inserted", value == ( get_res$value as string ); + + event print_metrics_and_close(); + } + timeout 5sec + { + print "get request timed out"; + terminate(); + } + + print "inner get done"; + } + timeout 5sec + { + print "put request timed out"; + terminate(); + } + + print "inner put done"; + } + timeout 5sec + { + print "open request timed out"; + terminate(); + } + + print "outer open done"; + }