Add flag to force synchronous mode when calling storage script-land functions

This commit is contained in:
Tim Wojtulewicz 2025-07-21 17:12:40 -07:00
parent ee5ffdf42c
commit 7e3ed2010d
13 changed files with 186 additions and 14 deletions

View file

@ -81,18 +81,28 @@ export {
function open_backend(btype: Storage::Backend, options: Storage::BackendOptions,
key_type: any, val_type: any): Storage::OperationResult
{
if ( options$forced_sync )
return Storage::Sync::__open_backend(btype, options, key_type, val_type);
else
return Storage::Async::__open_backend(btype, options, key_type, val_type);
}
function close_backend(backend: opaque of Storage::BackendHandle)
: Storage::OperationResult
{
if ( Storage::is_forced_sync(backend) )
return Storage::Sync::__close_backend(backend);
else
return Storage::Async::__close_backend(backend);
}
function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs)
: Storage::OperationResult
{
if ( Storage::is_forced_sync(backend) )
return Storage::Sync::__put(backend, args$key, args$value, args$overwrite,
args$expire_time);
else
return Storage::Async::__put(backend, args$key, args$value, args$overwrite,
args$expire_time);
}
@ -100,11 +110,17 @@ function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs)
function get(backend: opaque of Storage::BackendHandle, key: any)
: Storage::OperationResult
{
if ( Storage::is_forced_sync(backend) )
return Storage::Sync::__get(backend, key);
else
return Storage::Async::__get(backend, key);
}
function erase(backend: opaque of Storage::BackendHandle, key: any)
: Storage::OperationResult
{
if ( Storage::is_forced_sync(backend) )
return Storage::Sync::__erase(backend, key);
else
return Storage::Async::__erase(backend, key);
}

View file

@ -3,6 +3,9 @@
module Storage;
export {
# Default value for the BackendOptions::forced_sync field.
const default_forced_sync: bool = F &redef;
## Base record for backend options that can be passed to
## :zeek:see:`Storage::Async::open_backend` and
## :zeek:see:`Storage::Sync::open_backend`. Backend plugins can redef this record
@ -10,6 +13,11 @@ export {
type BackendOptions: record {
## The serializer used for converting Zeek data.
serializer: Storage::Serializer &default=Storage::STORAGE_SERIALIZER_JSON;
## Sets the backend into forced-synchronous mode. All operations will run
## in synchronous mode, even if the async functions are called. This
## should generally only be set to ``T`` during testing.
forced_sync : bool &default=Storage::default_forced_sync;
};
## Record for passing arguments to :zeek:see:`Storage::Async::put` and

View file

@ -127,6 +127,8 @@ OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, Type
val_type = std::move(vt);
backend_options = options;
forced_sync = options->GetField<BoolVal>("forced_sync")->Get();
auto stype = options->GetField<EnumVal>("serializer");
zeek::Tag stag{stype};

View file

@ -216,6 +216,11 @@ public:
*/
const RecordValPtr& Options() const { return backend_options; }
/**
* Returns the state of the forced_sync option that was passed in the options record to Open().
*/
bool IsForcedSync() const { return forced_sync; }
protected:
// Allow the manager to call Open/Close.
friend class storage::Manager;
@ -385,6 +390,7 @@ private:
void InitMetrics();
uint8_t modes;
bool forced_sync = false;
bool metrics_initialized = false;
// These are owned by the backend but are passed into the callbacks to be

View file

@ -22,3 +22,18 @@ function Storage::is_open%(backend: opaque of Storage::BackendHandle%) : bool
auto b = storage::detail::BackendHandleVal::CastFromAny(backend);
return zeek::val_mgr->Bool(b.has_value());
%}
## Checks whether a storage backend was opened in forced-synchronous mode.
##
## backend: A handle to the backend to check.
##
## Returns: T if the forced_synchronous option was set to T, F otherwise or if the
## handle is invalid.
function Storage::is_forced_sync%(backend: opaque of Storage::BackendHandle%) : bool
%{
auto b = storage::detail::BackendHandleVal::CastFromAny(backend);
if ( ! b.has_value() )
return zeek::val_mgr->Bool(false);
return zeek::val_mgr->Bool(b.value()->backend->IsForcedSync());
%}

View file

@ -1,4 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
open_result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]]
Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]], Server closed the connection
Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]]
Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]], Server closed the connection

View file

@ -0,0 +1,28 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678]
get result same as inserted, T
inner get done
inner put done
outer open done
Post-operation metrics:
Telemetry::COUNTER, zeek, zeek_storage_backends_opened_total, [], [], 1.0
Telemetry::COUNTER, zeek, zeek_storage_backend_data_read_bytes_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 34.0
Telemetry::COUNTER, zeek, zeek_storage_backend_expired_entries_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, error, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, fail, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, success, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, erase, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, error, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, fail, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, success, Storage::STORAGE_BACKEND_REDIS], 1.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, get, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, error, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, fail, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, success, Storage::STORAGE_BACKEND_REDIS], 1.0
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_data_written_bytes_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 18.0
close result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]

View file

@ -10,7 +10,7 @@ put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitiali
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]
get result same as overwritten, T
get result, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>]
Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]]
Storage::backend_opened, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]]
Post-operation metrics:
Telemetry::COUNTER, zeek, zeek_storage_backends_opened_total, [], [], 1.0
@ -30,4 +30,4 @@ Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config,
Telemetry::COUNTER, zeek, zeek_storage_backend_operation_results_total, [config, operation, result, type], [server_addr-testing, put, timeout, Storage::STORAGE_BACKEND_REDIS], 0.0
Telemetry::COUNTER, zeek, zeek_storage_backend_data_written_bytes_total, [config, type], [server_addr-testing, Storage::STORAGE_BACKEND_REDIS], 54.0
Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]], Client disconnected
Storage::backend_lost, Storage::STORAGE_BACKEND_REDIS, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, redis=[server_host=127.0.0.1, server_port=XXXX/tcp, server_unix_socket=<uninitialized>, key_prefix=testing, connect_timeout=5.0 secs, operation_timeout=5.0 secs, username=<uninitialized>, password=<uninitialized>]], Client disconnected

View file

@ -4,7 +4,7 @@ put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitiali
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678]
get result same as inserted, T
closed succesfully
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
[integrity_check] = ,
[journal_mode] = WAL,
[synchronous] = normal,

View file

@ -1,5 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
[integrity_check] = ,
[journal_mode] = WAL,
[synchronous] = normal,

View file

@ -2,7 +2,7 @@
open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
erase result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>]
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=storage-test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, forced_sync=F, sqlite=[database_path=storage-test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
[integrity_check] = ,
[journal_mode] = WAL,
[synchronous] = normal,

View file

@ -68,7 +68,7 @@ event zeek_init()
if ( get_res$code == Storage::SUCCESS && get_res?$value )
print "get result same as inserted", value == ( get_res$value as string );
schedule 100 msec { print_metrics_and_close() };
event print_metrics_and_close();
}
timeout 5sec
{

View file

@ -0,0 +1,97 @@
# @TEST-DOC: Tests basic Redis storage backend functions in async mode when using the forced_synchronous flag.
# @TEST-REQUIRES: have-redis
# @TEST-PORT: REDIS_PORT
# @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp}
# @TEST-EXEC: zeek -b %INPUT | sed "s|-${REDIS_PORT%/tcp}-|-XXXX-|g" > out
# @TEST-EXEC: btest-bg-wait -k 0
# @TEST-EXEC: btest-diff out
@load base/frameworks/storage/async
@load policy/frameworks/storage/backend/redis
@load base/frameworks/telemetry
# Make sure the telemetry output is in a fixed order.
redef running_under_test = T;
redef exit_only_after_terminate = T;
global b : opaque of Storage::BackendHandle;
event print_metrics_and_close()
{
print "";
print "Post-operation metrics:";
local storage_metrics = Telemetry::collect_metrics("zeek", "storage*");
for (_, m in storage_metrics)
{
print m$opts$metric_type, m$opts$prefix, m$opts$name, m$label_names, m$label_values, m$value;
}
print "";
when [] ( local close_res = Storage::Async::close_backend(b) )
{
print "close result", close_res;
terminate();
}
timeout 5sec
{
print "close result", close_res;
terminate();
}
}
event zeek_init()
{
local opts: Storage::BackendOptions;
opts$forced_sync = T;
opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local key = "key1234";
local value = "value5678";
when [opts, key, value] ( local open_res = Storage::Async::open_backend(
Storage::STORAGE_BACKEND_REDIS, opts, string, string) )
{
print "open result", open_res;
b = open_res$value;
when [key, value] ( local put_res = Storage::Async::put(b, [ $key=key,
$value=value ]) )
{
print "put result", put_res;
when [key, value] ( local get_res = Storage::Async::get(b, key) )
{
print "get result", get_res;
if ( get_res$code == Storage::SUCCESS && get_res?$value )
print "get result same as inserted", value == ( get_res$value as string );
event print_metrics_and_close();
}
timeout 5sec
{
print "get request timed out";
terminate();
}
print "inner get done";
}
timeout 5sec
{
print "put request timed out";
terminate();
}
print "inner put done";
}
timeout 5sec
{
print "open request timed out";
terminate();
}
print "outer open done";
}