Allow opening and closing backends to be async

This commit is contained in:
Tim Wojtulewicz 2025-01-16 17:08:23 -07:00
parent ea87c773cd
commit 4695060d75
16 changed files with 271 additions and 107 deletions

View file

@ -42,17 +42,29 @@ export {
## validation of values passed to :zeek:see:`Storage::put` as well as ## validation of values passed to :zeek:see:`Storage::put` as well as
## for type conversions for return values from :zeek:see:`Storage::get`. ## for type conversions for return values from :zeek:see:`Storage::get`.
## ##
## async_mode: Indicates whether this operation should happen asynchronously. If
## this is T, the call must happen as part of a :zeek:see:`when`
## statement. This flag is overridden and set to F when reading pcaps,
## since time won't move forward the same as when caputring live
## traffic.
##
## Returns: A handle to the new backend connection, or ``F`` if the connection ## Returns: A handle to the new backend connection, or ``F`` if the connection
## failed. ## failed.
global open_backend: function(btype: Storage::Backend, options: any, key_type: any, global open_backend: function(btype: Storage::Backend, options: any, key_type: any,
val_type: any): opaque of Storage::BackendHandle; val_type: any, async_mode: bool &default=F): opaque of Storage::BackendHandle;
## Closes an existing backend connection. ## Closes an existing backend connection.
## ##
## backend: A handle to a backend connection. ## backend: A handle to a backend connection.
## ##
## async_mode: Indicates whether this operation should happen asynchronously. If
## this is T, the call must happen as part of a :zeek:see:`when`
## statement. This flag is overridden and set to F when reading pcaps,
## since time won't move forward the same as when caputring live
## traffic.
##
## Returns: A boolean indicating success or failure of the operation. ## Returns: A boolean indicating success or failure of the operation.
global close_backend: function(backend: opaque of Storage::BackendHandle): bool; global close_backend: function(backend: opaque of Storage::BackendHandle, async_mode: bool &default=F): bool;
## Inserts a new entry into a backend. ## Inserts a new entry into a backend.
## ##
@ -107,14 +119,14 @@ export {
async_mode: bool &default=T): bool; async_mode: bool &default=T): bool;
} }
function open_backend(btype: Storage::Backend, options: any, key_type: any, val_type: any): opaque of Storage::BackendHandle function open_backend(btype: Storage::Backend, options: any, key_type: any, val_type: any, async_mode: bool &default=F): opaque of Storage::BackendHandle
{ {
return Storage::__open_backend(btype, options, key_type, val_type); return Storage::__open_backend(btype, options, key_type, val_type, async_mode);
} }
function close_backend(backend: opaque of Storage::BackendHandle): bool function close_backend(backend: opaque of Storage::BackendHandle, async_mode: bool &default=F): bool
{ {
return Storage::__close_backend(backend); return Storage::__close_backend(backend, async_mode);
} }
function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs): bool function put(backend: opaque of Storage::BackendHandle, args: Storage::PutArgs): bool

View file

@ -6,12 +6,28 @@
#include "zeek/RunState.h" #include "zeek/RunState.h"
#include "zeek/Trigger.h" #include "zeek/Trigger.h"
#include "zeek/broker/Data.h" #include "zeek/broker/Data.h"
#include "zeek/storage/Manager.h"
namespace zeek::storage { namespace zeek::storage {
ErrorResultCallback::ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc) ResultCallback::ResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc)
: trigger(std::move(trigger)), assoc(assoc) {} : trigger(std::move(trigger)), assoc(assoc) {}
ErrorResultCallback::~ErrorResultCallback() {}
ResultCallback::~ResultCallback() {}
void ResultCallback::Timeout() {
auto v = make_intrusive<StringVal>("Timeout during request");
trigger->Cache(assoc, v.get());
}
void ResultCallback::ValComplete(Val* result) {
trigger->Cache(assoc, result);
Unref(result);
trigger->Release();
}
ErrorResultCallback::ErrorResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc)
: ResultCallback(std::move(trigger), assoc) {}
void ErrorResultCallback::Complete(const ErrorResult& res) { void ErrorResultCallback::Complete(const ErrorResult& res) {
zeek::Val* result; zeek::Val* result;
@ -21,19 +37,11 @@ void ErrorResultCallback::Complete(const ErrorResult& res) {
else else
result = val_mgr->Bool(true).get(); result = val_mgr->Bool(true).get();
trigger->Cache(assoc, result); ValComplete(result);
Unref(result);
trigger->Release();
}
void ErrorResultCallback::Timeout() {
auto v = make_intrusive<StringVal>("Timeout during request");
trigger->Cache(assoc, v.get());
} }
ValResultCallback::ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc) ValResultCallback::ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc)
: trigger(std::move(trigger)), assoc(assoc) {} : ResultCallback(std::move(trigger), assoc) {}
ValResultCallback::~ValResultCallback() {}
void ValResultCallback::Complete(const ValResult& res) { void ValResultCallback::Complete(const ValResult& res) {
zeek::Val* result; zeek::Val* result;
@ -45,21 +53,50 @@ void ValResultCallback::Complete(const ValResult& res) {
else else
result = new StringVal(res.error()); result = new StringVal(res.error());
trigger->Cache(assoc, result); ValComplete(result);
Unref(result);
trigger->Release();
} }
void ValResultCallback::Timeout() { OpenResultCallback::OpenResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc,
auto v = make_intrusive<StringVal>("Timeout during request"); detail::BackendHandleVal* backend)
trigger->Cache(assoc, v.get()); : ResultCallback(std::move(trigger), assoc), backend(backend) {}
void OpenResultCallback::Complete(const ErrorResult& res) {
zeek::Val* result;
if ( res ) {
result = new StringVal(res.value());
}
else {
storage_mgr->AddBackendToMap(backend->backend);
result = backend;
}
ValComplete(result);
} }
ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt) { ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt, OpenResultCallback* cb) {
key_type = std::move(kt); key_type = std::move(kt);
val_type = std::move(vt); val_type = std::move(vt);
return DoOpen(std::move(options)); auto res = DoOpen(std::move(options));
if ( (! native_async || zeek::run_state::reading_traces) && cb ) {
cb->Complete(res);
delete cb;
}
return res;
}
ErrorResult Backend::Close(ErrorResultCallback* cb) {
auto res = DoClose(cb);
if ( (! native_async || zeek::run_state::reading_traces) && cb ) {
cb->Complete(res);
delete cb;
}
return res;
} }
ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time, ErrorResultCallback* cb) { ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time, ErrorResultCallback* cb) {

View file

@ -24,32 +24,39 @@ using ErrorResult = std::optional<std::string>;
// string value will store an error message if the result is null. // string value will store an error message if the result is null.
using ValResult = zeek::expected<ValPtr, std::string>; using ValResult = zeek::expected<ValPtr, std::string>;
// A callback result that returns an ErrorResult.
class ErrorResultCallback { // Base callback object for async operations. This is just here to allow some
// code reuse in the other callback methods.
class ResultCallback {
public: public:
ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc); ResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc);
~ErrorResultCallback(); virtual ~ResultCallback();
void Complete(const ErrorResult& res);
void Timeout(); void Timeout();
protected:
void ValComplete(Val* result);
private: private:
zeek::detail::trigger::TriggerPtr trigger; IntrusivePtr<zeek::detail::trigger::Trigger> trigger;
const void* assoc; const void* assoc;
}; };
// A callback result that returns an ErrorResult.
class ErrorResultCallback : public ResultCallback {
public:
ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc);
virtual void Complete(const ErrorResult& res);
};
// A callback result that returns a ValResult. // A callback result that returns a ValResult.
class ValResultCallback { class ValResultCallback : public ResultCallback {
public: public:
ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc); ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc);
~ValResultCallback();
void Complete(const ValResult& res); void Complete(const ValResult& res);
void Timeout();
private:
zeek::detail::trigger::TriggerPtr trigger;
const void* assoc;
}; };
class OpenResultCallback;
class Backend : public zeek::Obj { class Backend : public zeek::Obj {
public: public:
/** /**
@ -89,7 +96,6 @@ public:
* @param cb An optional callback object if being called via an async context. * @param cb An optional callback object if being called via an async context.
* @return An optional value potentially containing an error string if * @return An optional value potentially containing an error string if
* needed. Will be unset if the operation succeeded. * needed. Will be unset if the operation succeeded.
* possible error string if the operation failed.
*/ */
ErrorResult Erase(ValPtr key, ErrorResultCallback* cb = nullptr); ErrorResult Erase(ValPtr key, ErrorResultCallback* cb = nullptr);
@ -128,21 +134,30 @@ protected:
* validation of types. * validation of types.
* @param vt The script-side type of the values stored in the backend. Used for * @param vt The script-side type of the values stored in the backend. Used for
* validation of types and conversion during retrieval. * validation of types and conversion during retrieval.
* @param cb An optional callback object if being called via an async context.
* @return An optional value potentially containing an error string if * @return An optional value potentially containing an error string if
* needed. Will be unset if the operation succeeded. * needed. Will be unset if the operation succeeded.
*/ */
ErrorResult Open(RecordValPtr options, TypePtr kt, TypePtr vt); ErrorResult Open(RecordValPtr options, TypePtr kt, TypePtr vt, OpenResultCallback* cb = nullptr);
/** /**
* Finalizes the backend when it's being closed. Can be overridden by * Finalizes the backend when it's being closed.
* derived classes. *
* @param cb An optional callback object if being called via an async context.
* @return An optional value potentially containing an error string if
* needed. Will be unset if the operation succeeded.
*/ */
virtual void Close() {} ErrorResult Close(ErrorResultCallback* cb = nullptr);
/** /**
* The workhorse method for Open(). * The workhorse method for Open().
*/ */
virtual ErrorResult DoOpen(RecordValPtr options) = 0; virtual ErrorResult DoOpen(RecordValPtr options, OpenResultCallback* cb = nullptr) = 0;
/**
* The workhorse method for Close().
*/
virtual ErrorResult DoClose(ErrorResultCallback* cb = nullptr) = 0;
/** /**
* The workhorse method for Put(). * The workhorse method for Put().
@ -196,4 +211,16 @@ protected:
}; };
} // namespace detail } // namespace detail
// A callback for the Backend::Open() method that returns an error or a backend handle.
class OpenResultCallback : public ResultCallback {
public:
OpenResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc,
detail::BackendHandleVal* backend);
void Complete(const ErrorResult& res);
private:
detail::BackendHandleVal* backend;
};
} // namespace zeek::storage } // namespace zeek::storage

View file

@ -23,8 +23,7 @@ void Manager::InitPostScript() {
StartExpirationTimer(); StartExpirationTimer();
} }
zeek::expected<BackendPtr, std::string> Manager::OpenBackend(const Tag& type, RecordValPtr options, TypePtr key_type, zeek::expected<BackendPtr, std::string> Manager::Instantiate(const Tag& type) {
TypePtr val_type) {
Component* c = Lookup(type); Component* c = Lookup(type);
if ( ! c ) { if ( ! c ) {
return zeek::unexpected<std::string>( return zeek::unexpected<std::string>(
@ -46,32 +45,37 @@ zeek::expected<BackendPtr, std::string> Manager::OpenBackend(const Tag& type, Re
util::fmt("Failed to instantiate backend %s", GetComponentName(type).c_str())); util::fmt("Failed to instantiate backend %s", GetComponentName(type).c_str()));
} }
if ( auto res = bp->Open(std::move(options), std::move(key_type), std::move(val_type)); res.has_value() ) {
return zeek::unexpected<std::string>(
util::fmt("Failed to open backend %s: %s", GetComponentName(type).c_str(), res.value().c_str()));
}
// TODO: post Storage::backend_opened event
{
std::unique_lock<std::mutex> lk(backends_mtx);
backends.push_back(bp);
}
return bp; return bp;
} }
void Manager::CloseBackend(BackendPtr backend) { ErrorResult Manager::OpenBackend(BackendPtr backend, RecordValPtr options, TypePtr key_type, TypePtr val_type,
OpenResultCallback* cb) {
if ( auto res = backend->Open(std::move(options), std::move(key_type), std::move(val_type), cb); res.has_value() ) {
return util::fmt("Failed to open backend %s: %s", backend->Tag(), res.value().c_str());
}
if ( ! cb )
AddBackendToMap(std::move(backend));
// TODO: post Storage::backend_opened event
return std::nullopt;
}
ErrorResult Manager::CloseBackend(BackendPtr backend, ErrorResultCallback* cb) {
// Remove from the list always, even if the close may fail below and even in an async context.
{ {
std::unique_lock<std::mutex> lk(backends_mtx); std::unique_lock<std::mutex> lk(backends_mtx);
auto it = std::find(backends.begin(), backends.end(), backend); auto it = std::find(backends.begin(), backends.end(), backend);
if ( it == backends.end() ) if ( it != backends.end() )
return; backends.erase(it);
backends.erase(it);
} }
backend->Close(); if ( auto res = backend->Close(cb); res.has_value() ) {
return util::fmt("Failed to close backend %s: %s", backend->Tag(), res.value().c_str());
}
return std::nullopt;
// TODO: post Storage::backend_lost event // TODO: post Storage::backend_lost event
} }
@ -88,4 +92,9 @@ void Manager::StartExpirationTimer() {
new detail::ExpirationTimer(run_state::network_time + zeek::BifConst::Storage::expire_interval)); new detail::ExpirationTimer(run_state::network_time + zeek::BifConst::Storage::expire_interval));
} }
void Manager::AddBackendToMap(BackendPtr backend) {
std::unique_lock<std::mutex> lk(backends_mtx);
backends.push_back(std::move(backend));
}
} // namespace zeek::storage } // namespace zeek::storage

View file

@ -34,9 +34,20 @@ public:
void InitPostScript(); void InitPostScript();
/** /**
* Opens a new storage backend. * Instantiates a new backend object. The backend will be in a closed state, and OpenBackend()
* will need to be called to fully initialize it.
* *
* @param type The tag for the type of backend being opened. * @param type The tag for the type of backend being opened.
* @return A std::expected containing either a valid BackendPtr with the
* result of the operation or a string containing an error message for
* failure.
*/
zeek::expected<BackendPtr, std::string> Instantiate(const Tag& type);
/**
* Opens a new storage backend.
*
* @param backend The backend object to open.
* @param options A record val representing the configuration for this type of * @param options A record val representing the configuration for this type of
* backend. * backend.
* @param key_type The script-side type of the keys stored in the backend. Used for * @param key_type The script-side type of the keys stored in the backend. Used for
@ -46,19 +57,22 @@ public:
* @return An optional value potentially containing an error string if needed. Will be * @return An optional value potentially containing an error string if needed. Will be
* unset if the operation succeeded. * unset if the operation succeeded.
*/ */
zeek::expected<BackendPtr, std::string> OpenBackend(const Tag& type, RecordValPtr options, TypePtr key_type, ErrorResult OpenBackend(BackendPtr backend, RecordValPtr options, TypePtr key_type, TypePtr val_type,
TypePtr val_type); OpenResultCallback* cb = nullptr);
/** /**
* Closes a storage backend. * Closes a storage backend.
*/ */
void CloseBackend(BackendPtr backend); ErrorResult CloseBackend(BackendPtr backend, ErrorResultCallback* cb = nullptr);
protected: protected:
friend class storage::detail::ExpirationTimer; friend class storage::detail::ExpirationTimer;
void Expire(); void Expire();
void StartExpirationTimer(); void StartExpirationTimer();
friend class storage::OpenResultCallback;
void AddBackendToMap(BackendPtr backend);
private: private:
std::vector<BackendPtr> backends; std::vector<BackendPtr> backends;
std::mutex backends_mtx; std::mutex backends_mtx;

View file

@ -88,7 +88,7 @@ storage::BackendPtr Redis::Instantiate(std::string_view tag) { return make_intru
* implementation must call \a Opened(); if not, it must call Error() * implementation must call \a Opened(); if not, it must call Error()
* with a corresponding message. * with a corresponding message.
*/ */
ErrorResult Redis::DoOpen(RecordValPtr options) { ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
// When reading traces we disable storage async mode globally (see src/storage/Backend.cc) since // When reading traces we disable storage async mode globally (see src/storage/Backend.cc) since
// time moves forward based on the pcap and not based on real time. // time moves forward based on the pcap and not based on real time.
async_mode = options->GetField<BoolVal>("async_mode")->Get() && ! zeek::run_state::reading_traces; async_mode = options->GetField<BoolVal>("async_mode")->Get() && ! zeek::run_state::reading_traces;
@ -137,6 +137,9 @@ ErrorResult Redis::DoOpen(RecordValPtr options) {
return errmsg; return errmsg;
} }
// TODO: Sort out how to pass the zeek callbacks for both open/done to the async
// callbacks from hiredis so they can return errors.
// The context is passed to the handler methods. Setting this data object // The context is passed to the handler methods. Setting this data object
// pointer allows us to look up the backend in the handlers. // pointer allows us to look up the backend in the handlers.
async_ctx->data = this; async_ctx->data = this;
@ -175,7 +178,7 @@ ErrorResult Redis::DoOpen(RecordValPtr options) {
/** /**
* Finalizes the backend when it's being closed. * Finalizes the backend when it's being closed.
*/ */
void Redis::Close() { ErrorResult Redis::DoClose(ErrorResultCallback* cb) {
connected = false; connected = false;
if ( async_mode ) { if ( async_mode ) {
@ -191,6 +194,8 @@ void Redis::Close() {
redisFree(ctx); redisFree(ctx);
ctx = nullptr; ctx = nullptr;
} }
return std::nullopt;
} }
/** /**

View file

@ -30,12 +30,12 @@ public:
/** /**
* Called by the manager system to open the backend. * Called by the manager system to open the backend.
*/ */
ErrorResult DoOpen(RecordValPtr options) override; ErrorResult DoOpen(RecordValPtr options, OpenResultCallback* cb = nullptr) override;
/** /**
* Finalizes the backend when it's being closed. * Finalizes the backend when it's being closed.
*/ */
void Close() override; ErrorResult DoClose(ErrorResultCallback* cb = nullptr) override;
/** /**
* Returns whether the backend is opened. * Returns whether the backend is opened.

View file

@ -13,7 +13,7 @@ storage::BackendPtr SQLite::Instantiate(std::string_view tag) { return make_intr
/** /**
* Called by the manager system to open the backend. * Called by the manager system to open the backend.
*/ */
ErrorResult SQLite::DoOpen(RecordValPtr options) { ErrorResult SQLite::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
if ( sqlite3_threadsafe() == 0 ) { if ( sqlite3_threadsafe() == 0 ) {
std::string res = std::string res =
"SQLite reports that it is not threadsafe. Zeek needs a threadsafe version of " "SQLite reports that it is not threadsafe. Zeek needs a threadsafe version of "
@ -104,7 +104,9 @@ ErrorResult SQLite::DoOpen(RecordValPtr options) {
/** /**
* Finalizes the backend when it's being closed. * Finalizes the backend when it's being closed.
*/ */
void SQLite::Close() { ErrorResult SQLite::DoClose(ErrorResultCallback* cb) {
ErrorResult err_res;
if ( db ) { if ( db ) {
for ( const auto& [k, stmt] : prepared_stmts ) { for ( const auto& [k, stmt] : prepared_stmts ) {
sqlite3_finalize(stmt); sqlite3_finalize(stmt);
@ -114,15 +116,19 @@ void SQLite::Close() {
char* errmsg; char* errmsg;
if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg); res != SQLITE_OK ) { if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg); res != SQLITE_OK ) {
Error(util::fmt("Sqlite failed to optimize at shutdown: %s", errmsg)); err_res = util::fmt("Sqlite failed to optimize at shutdown: %s", errmsg);
sqlite3_free(&errmsg); sqlite3_free(&errmsg);
} }
if ( int res = sqlite3_close_v2(db); res != SQLITE_OK ) if ( int res = sqlite3_close_v2(db); res != SQLITE_OK ) {
Error("Sqlite could not close connection"); if ( ! err_res.has_value() )
err_res = "Sqlite could not close connection";
}
db = nullptr; db = nullptr;
} }
return err_res;
} }
/** /**

View file

@ -20,12 +20,12 @@ public:
/** /**
* Called by the manager system to open the backend. * Called by the manager system to open the backend.
*/ */
ErrorResult DoOpen(RecordValPtr options) override; ErrorResult DoOpen(RecordValPtr options, OpenResultCallback* cb = nullptr) override;
/** /**
* Finalizes the backend when it's being closed. * Finalizes the backend when it's being closed.
*/ */
void Close() override; ErrorResult DoClose(ErrorResultCallback* cb = nullptr) override;
/** /**
* Returns whether the backend is opened. * Returns whether the backend is opened.

View file

@ -40,26 +40,46 @@ event Storage::backend_opened%(%);
# Generated when a backend connection is lost # Generated when a backend connection is lost
event Storage::backend_lost%(%); event Storage::backend_lost%(%);
function Storage::__open_backend%(btype: Storage::Backend, options: any, key_type: any, val_type: any%): opaque of Storage::BackendHandle function Storage::__open_backend%(btype: Storage::Backend, options: any, key_type: any, val_type: any, async_mode: bool%): opaque of Storage::BackendHandle
%{ %{
auto btype_val = IntrusivePtr<EnumVal>{NewRef{}, btype->AsEnumVal()}; auto btype_val = IntrusivePtr<EnumVal>{NewRef{}, btype->AsEnumVal()};
Tag tag{btype_val}; Tag tag{btype_val};
auto kt = key_type->AsTypeVal()->GetType()->AsTypeType()->GetType(); auto b = storage_mgr->Instantiate(tag);
auto vt = val_type->AsTypeVal()->GetType()->AsTypeType()->GetType();
auto options_val = IntrusivePtr<RecordVal>{NewRef{}, options->AsRecordVal()};
auto b = storage_mgr->OpenBackend(tag, options_val, kt, vt);
if ( ! b.has_value() ) { if ( ! b.has_value() ) {
emit_builtin_error(b.error().c_str()); emit_builtin_error(b.error().c_str());
return val_mgr->Bool(false); return val_mgr->Bool(false);
} }
return make_intrusive<storage::detail::BackendHandleVal>(b.value()); auto bo = make_intrusive<storage::detail::BackendHandleVal>(b.value());
OpenResultCallback* cb = nullptr;
if ( async_mode ) {
auto trigger = init_trigger(frame, b.value());
if ( ! trigger )
return val_mgr->Bool(false);
cb = new OpenResultCallback(trigger, frame->GetTriggerAssoc(), bo.release());
}
auto kt = key_type->AsTypeVal()->GetType()->AsTypeType()->GetType();
auto vt = val_type->AsTypeVal()->GetType()->AsTypeType()->GetType();
auto options_val = IntrusivePtr<RecordVal>{NewRef{}, options->AsRecordVal()};
auto open_res = storage_mgr->OpenBackend(b.value(), options_val, kt, vt, cb);
if ( async_mode )
return nullptr;
if ( open_res.has_value() ) {
emit_builtin_error(open_res.value().c_str());
return val_mgr->Bool(false);
}
return bo;
%} %}
function Storage::__close_backend%(backend: opaque of Storage::BackendHandle%) : bool function Storage::__close_backend%(backend: opaque of Storage::BackendHandle, async_mode: bool%) : bool
%{ %{
auto b = dynamic_cast<storage::detail::BackendHandleVal*>(backend); auto b = dynamic_cast<storage::detail::BackendHandleVal*>(backend);
if ( ! b ) { if ( ! b ) {
@ -70,7 +90,25 @@ function Storage::__close_backend%(backend: opaque of Storage::BackendHandle%) :
// Return true here since the backend is already closed // Return true here since the backend is already closed
return val_mgr->Bool(true); return val_mgr->Bool(true);
storage_mgr->CloseBackend(b->backend); ErrorResultCallback* cb = nullptr;
if ( async_mode ) {
auto trigger = init_trigger(frame, b->backend);
if ( ! trigger )
return val_mgr->Bool(false);
cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc());
}
auto result = storage_mgr->CloseBackend(b->backend, cb);
if ( async_mode )
return nullptr;
if ( result.has_value() ) {
emit_builtin_error(result.value().c_str());
return val_mgr->Bool(false);
}
return val_mgr->Bool(true); return val_mgr->Bool(true);
%} %}

View file

@ -1,4 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/storage.zeek, line 37: Failed to retrieve data: Failed to find key (Storage::get(b, to_any_coerce key, F)) error in <...>/storage.zeek, line 37: Failed to retrieve data: Failed to find key (Storage::get(b, to_any_coerce key, F))
error in <...>/storage.zeek, line 50: Failed to open backend STORAGEDUMMY: open_fail was set to true, returning error (Storage::open_backend(Storage::STORAGEDUMMY, to_any_coerce opts, to_any_coerce str, to_any_coerce str)) error in <...>/storage.zeek, line 50: Failed to open backend Storage::STORAGEDUMMY: open_fail was set to true, returning error (Storage::open_backend(Storage::STORAGEDUMMY, to_any_coerce opts, to_any_coerce str, to_any_coerce str, F))
error in <...>/storage.zeek, line 51: Invalid storage handle (Storage::close_backend(b2) and F) error in <...>/storage.zeek, line 51: Invalid storage handle (Storage::close_backend(b2, F) and F)

View file

@ -1,4 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
open successful
put result, T put result, T
get result, value5678 get result, value5678
get result same as inserted, T get result same as inserted, T
closed succesfully

View file

@ -1,3 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/sqlite-error-handling.zeek, line 20: Failed to open backend SQLITE: SQLite call failed: unable to open database file (Storage::open_backend(Storage::SQLITE, to_any_coerce opts, to_any_coerce str, to_any_coerce str)) error in <...>/sqlite-error-handling.zeek, line 20: Failed to open backend Storage::SQLITE: SQLite call failed: unable to open database file (Storage::open_backend(Storage::SQLITE, to_any_coerce opts, to_any_coerce str, to_any_coerce str, F))
error in <...>/sqlite-error-handling.zeek, line 28: Failed to store data: type of key passed (count) does not match backend's key type (str) (Storage::put(b, (coerce [$key=bad_key, $value=value, $async_mode=F] to Storage::PutArgs))) error in <...>/sqlite-error-handling.zeek, line 28: Failed to store data: type of key passed (count) does not match backend's key type (str) (Storage::put(b, (coerce [$key=bad_key, $value=value, $async_mode=F] to Storage::PutArgs)))

View file

@ -18,7 +18,7 @@ zeek::storage::BackendPtr StorageDummy::Instantiate(std::string_view tag) {
* implementation must call \a Opened(); if not, it must call Error() * implementation must call \a Opened(); if not, it must call Error()
* with a corresponding message. * with a corresponding message.
*/ */
zeek::storage::ErrorResult StorageDummy::DoOpen(zeek::RecordValPtr options) { zeek::storage::ErrorResult StorageDummy::DoOpen(zeek::RecordValPtr options, zeek::storage::OpenResultCallback* cb) {
bool open_fail = options->GetField<zeek::BoolVal>("open_fail")->Get(); bool open_fail = options->GetField<zeek::BoolVal>("open_fail")->Get();
if ( open_fail ) if ( open_fail )
return "open_fail was set to true, returning error"; return "open_fail was set to true, returning error";
@ -31,7 +31,10 @@ zeek::storage::ErrorResult StorageDummy::DoOpen(zeek::RecordValPtr options) {
/** /**
* Finalizes the backend when it's being closed. * Finalizes the backend when it's being closed.
*/ */
void StorageDummy::Close() { open = false; } zeek::storage::ErrorResult StorageDummy::DoClose(zeek::storage::ErrorResultCallback* cb) {
open = false;
return std::nullopt;
}
/** /**
* The workhorse method for Put(). This must be implemented by plugins. * The workhorse method for Put(). This must be implemented by plugins.

View file

@ -21,12 +21,13 @@ public:
/** /**
* Called by the manager system to open the backend. * Called by the manager system to open the backend.
*/ */
zeek::storage::ErrorResult DoOpen(zeek::RecordValPtr options) override; zeek::storage::ErrorResult DoOpen(zeek::RecordValPtr options,
zeek::storage::OpenResultCallback* cb = nullptr) override;
/** /**
* Finalizes the backend when it's being closed. * Finalizes the backend when it's being closed.
*/ */
void Close() override; zeek::storage::ErrorResult DoClose(zeek::storage::ErrorResultCallback* cb = nullptr) override;
/** /**
* Returns whether the backend is opened. * Returns whether the backend is opened.

View file

@ -22,26 +22,36 @@ event zeek_init() {
# Test inserting/retrieving a key/value pair that we know won't be in # Test inserting/retrieving a key/value pair that we know won't be in
# the backend yet. # the backend yet.
local b = Storage::open_backend(Storage::SQLITE, opts, str, str); when [opts, key, value] ( local b = Storage::open_backend(Storage::SQLITE, opts, str, str, T) ) {
print "open successful";
when [b, key, value] ( local res = Storage::put(b, [$key=key, $value=value]) ) { when [b, key, value] ( local put_res = Storage::put(b, [$key=key, $value=value]) ) {
print "put result", res; print "put result", put_res;
when [b, key, value] ( local res2 = Storage::get(b, key) ) { when [b, key, value] ( local get_res = Storage::get(b, key) ) {
print "get result", res2; print "get result", get_res;
print "get result same as inserted", value == (res2 as string); print "get result same as inserted", value == (get_res as string);
Storage::close_backend(b); when [b] ( local close_res = Storage::close_backend(b, T) ) {
print "closed succesfully";
terminate(); terminate();
} timeout 5 sec {
print "close request timed out";
terminate();
}
}
timeout 5 sec {
print "get requeest timed out";
terminate();
}
} }
timeout 5 sec { timeout 5 sec {
print "get requeest timed out"; print "put request timed out";
terminate(); terminate();
} }
} }
timeout 5 sec { timeout 5 sec {
print "put request timed out"; print "open request timed out";
terminate(); terminate();
} }
} }