Implement Storage::backend_opened and Storage::backend_lost events

This commit is contained in:
Tim Wojtulewicz 2025-03-03 18:19:52 -07:00
parent a99a13dc4c
commit cc7b2dc890
10 changed files with 117 additions and 15 deletions

View file

@ -5,6 +5,7 @@
#include "zeek/Trigger.h" #include "zeek/Trigger.h"
#include "zeek/broker/Data.h" #include "zeek/broker/Data.h"
#include "zeek/storage/ReturnCode.h" #include "zeek/storage/ReturnCode.h"
#include "zeek/storage/storage.bif.h"
namespace zeek::storage { namespace zeek::storage {
@ -68,6 +69,10 @@ OpenResultCallback::OpenResultCallback(zeek::detail::trigger::TriggerPtr trigger
: ResultCallback(std::move(trigger), assoc), backend(std::move(backend)) {} : ResultCallback(std::move(trigger), assoc), backend(std::move(backend)) {}
void OpenResultCallback::Complete(OperationResult res) { void OpenResultCallback::Complete(OperationResult res) {
if ( res.code == ReturnCode::SUCCESS ) {
backend->backend->EnqueueBackendOpened();
}
// If this is a sync callback, there isn't a trigger to process. Store the result and bail. Always // If this is a sync callback, there isn't a trigger to process. Store the result and bail. Always
// set result's value to the backend pointer so that it comes across in the result. This ensures // set result's value to the backend pointer so that it comes across in the result. This ensures
// the handle is always available in the result even on failures. // the handle is always available in the result even on failures.
@ -94,6 +99,7 @@ void OpenResultCallback::Complete(OperationResult res) {
OperationResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt, OpenResultCallback* cb) { OperationResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt, OpenResultCallback* cb) {
key_type = std::move(kt); key_type = std::move(kt);
val_type = std::move(vt); val_type = std::move(vt);
backend_options = options;
auto ret = DoOpen(std::move(options), cb); auto ret = DoOpen(std::move(options), cb);
if ( ! ret.value ) if ( ! ret.value )
@ -153,6 +159,15 @@ void Backend::CompleteCallback(ResultCallback* cb, const OperationResult& data)
} }
} }
void Backend::EnqueueBackendOpened() {
event_mgr.Enqueue(Storage::backend_opened, make_intrusive<StringVal>(Tag()), backend_options);
}
void Backend::EnqueueBackendLost(std::string_view reason) {
event_mgr.Enqueue(Storage::backend_lost, make_intrusive<StringVal>(Tag()), backend_options,
make_intrusive<StringVal>(reason));
}
zeek::OpaqueTypePtr detail::backend_opaque; zeek::OpaqueTypePtr detail::backend_opaque;
IMPLEMENT_OPAQUE_VALUE(detail::BackendHandleVal) IMPLEMENT_OPAQUE_VALUE(detail::BackendHandleVal)

View file

@ -114,10 +114,15 @@ public:
*/ */
virtual void Poll() {} virtual void Poll() {}
const RecordValPtr& Options() const { return backend_options; }
protected: protected:
// Allow the manager to call Open/Close. // Allow the manager to call Open/Close.
friend class storage::Manager; friend class storage::Manager;
// Allow OpenResultCallback to call EnqueueConnectionEstablished.
friend class storage::OpenResultCallback;
/** /**
* Constructor * Constructor
* *
@ -183,10 +188,24 @@ protected:
*/ */
virtual void Expire() {} virtual void Expire() {}
/**
* Enqueues the Storage::backend_opened event. This is called automatically
* when an OpenResultCallback is completed successfully.
*/
void EnqueueBackendOpened();
/**
* Enqueues the Storage::backend_lost event with an optional reason
* string. This should be called by the backends whenever they lose their
* connection.
*/
void EnqueueBackendLost(std::string_view reason);
void CompleteCallback(ResultCallback* cb, const OperationResult& data) const; void CompleteCallback(ResultCallback* cb, const OperationResult& data) const;
TypePtr key_type; TypePtr key_type;
TypePtr val_type; TypePtr val_type;
RecordValPtr backend_options;
std::string tag; std::string tag;

View file

@ -8,6 +8,7 @@
#include "zeek/Val.h" #include "zeek/Val.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/storage/ReturnCode.h" #include "zeek/storage/ReturnCode.h"
#include "zeek/storage/storage.bif.h"
#include "hiredis/adapters/poll.h" #include "hiredis/adapters/poll.h"
#include "hiredis/async.h" #include "hiredis/async.h"
@ -481,7 +482,7 @@ void Redis::OnConnect(int status) {
if ( status == REDIS_OK ) { if ( status == REDIS_OK ) {
connected = true; connected = true;
CompleteCallback(open_cb, {ReturnCode::SUCCESS}); CompleteCallback(open_cb, {ReturnCode::SUCCESS});
// TODO: post connect event // The connection_established event is sent via the open callback handler.
return; return;
} }
@ -493,17 +494,14 @@ void Redis::OnConnect(int status) {
void Redis::OnDisconnect(int status) { void Redis::OnDisconnect(int status) {
DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event"); DBG_LOG(DBG_STORAGE, "Redis backend: disconnection event");
--active_ops; --active_ops;
if ( status == REDIS_OK ) {
// TODO: this was an intentional disconnect, nothing to do?
}
else {
// TODO: this was unintentional, should we reconnect?
// TODO: post disconnect event
}
connected = false; connected = false;
if ( status == REDIS_ERR )
EnqueueBackendLost(async_ctx->errstr);
else
EnqueueBackendLost("Client disconnected");
} }
void Redis::ProcessFd(int fd, int flags) { void Redis::ProcessFd(int fd, int flags) {

View file

@ -45,11 +45,13 @@ static IntrusivePtr<storage::detail::BackendHandleVal> make_backend_handle(Val*
module Storage; module Storage;
# Generated when a new backend connection is opened ## Generated automatically when a new backend connection is opened successfully.
event Storage::backend_opened%(%); event Storage::backend_opened%(tag: string, options: any%);
# Generated when a backend connection is lost ## May be generated when a backend connection is lost, both normally and
event Storage::backend_lost%(%); ## unexpectedly. This event depends on the backends implementing handling for
## it, and is not generated automatically by the storage framework.
event Storage::backend_lost%(tag: string, options: any, reason: string%);
module Storage::Async; module Storage::Async;

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
open_result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]]
Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]], Server closed the connection

View file

@ -6,3 +6,5 @@ get result same as inserted, T
overwrite put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>] overwrite put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678] get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678]
get result same as inserted, T get result same as inserted, T
Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]]
Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]], Client disconnected

View file

@ -1,4 +1,9 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Storage::backend_opened, Storage::SQLITE, [sqlite=[database_path=test.sqlite, table_name=testing, tuning_params={
[synchronous] = normal,
[temp_store] = memory,
[journal_mode] = WAL
}]]
open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>] open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>] put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678] get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678]

View file

@ -0,0 +1,44 @@
# @TEST-DOC: Tests basic Redis storage backend functions in sync mode, including overwriting
# @TEST-REQUIRES: have-redis
# @TEST-PORT: REDIS_PORT
# @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp}
# @TEST-EXEC: zeek -b %INPUT | sed 's|=[0-9]*/tcp|=xxxx/tcp|g' > out
# @TEST-EXEC: btest-bg-wait -k 0
# @TEST-EXEC: btest-diff out
@load base/frameworks/storage/sync
@load policy/frameworks/storage/backend/redis
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into open_backend()
type str: string;
event Storage::backend_opened(tag: string, config: any) {
print "Storage::backend_opened", tag, config;
}
event Storage::backend_lost(tag: string, config: any, reason: string) {
print "Storage::backend_lost", tag, config, reason;
terminate();
}
event zeek_init()
{
local opts: Storage::BackendOptions;
opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local key = "key1234";
local value = "value1234";
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
print "open_result", open_res;
# Kill the redis server so the backend will disconnect and fire the backend_lost event.
system("cat redis-server/redis.pid");
system("kill $(cat redis-server/redis.pid)");
}

View file

@ -4,7 +4,7 @@
# @TEST-PORT: REDIS_PORT # @TEST-PORT: REDIS_PORT
# @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp} # @TEST-EXEC: btest-bg-run redis-server run-redis-server ${REDIS_PORT%/tcp}
# @TEST-EXEC: zeek -b %INPUT > out # @TEST-EXEC: zeek -b %INPUT | sed 's|=[0-9]*/tcp|=xxxx/tcp|g' > out
# @TEST-EXEC: btest-bg-wait -k 0 # @TEST-EXEC: btest-bg-wait -k 0
# @TEST-EXEC: btest-diff out # @TEST-EXEC: btest-diff out
@ -15,6 +15,15 @@
# Create a typename here that can be passed down into open_backend() # Create a typename here that can be passed down into open_backend()
type str: string; type str: string;
event Storage::backend_opened(tag: string, config: any) {
print "Storage::backend_opened", tag, config;
}
event Storage::backend_lost(tag: string, config: any, reason: string) {
print "Storage::backend_lost", tag, config, reason;
terminate();
}
event zeek_init() event zeek_init()
{ {
local opts: Storage::BackendOptions; local opts: Storage::BackendOptions;

View file

@ -11,6 +11,10 @@ redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into get(). # Create a typename here that can be passed down into get().
type str: string; type str: string;
event Storage::backend_opened(tag: string, config: any) {
print "Storage::backend_opened", tag, config;
}
event zeek_init() event zeek_init()
{ {
# Create a database file in the .tmp directory with a 'testing' table # Create a database file in the .tmp directory with a 'testing' table