Split sync/async handling into the BIF methods

This commit is contained in:
Tim Wojtulewicz 2025-02-19 14:58:59 -07:00
parent c247de8ec3
commit e766af7322
8 changed files with 260 additions and 181 deletions

View file

@ -3,10 +3,8 @@
#include "zeek/storage/Backend.h"
#include "zeek/Desc.h"
#include "zeek/RunState.h"
#include "zeek/Trigger.h"
#include "zeek/broker/Data.h"
#include "zeek/storage/Manager.h"
namespace zeek::storage {
@ -16,84 +14,84 @@ ResultCallback::ResultCallback(zeek::detail::trigger::TriggerPtr trigger, const
ResultCallback::~ResultCallback() {}
void ResultCallback::Timeout() {
auto v = make_intrusive<StringVal>("Timeout during request");
trigger->Cache(assoc, v.get());
if ( ! IsSyncCallback() ) {
auto v = make_intrusive<StringVal>("Timeout during request");
trigger->Cache(assoc, v.get());
}
}
void ResultCallback::ValComplete(Val* result) {
trigger->Cache(assoc, result);
if ( ! IsSyncCallback() ) {
trigger->Cache(assoc, result);
trigger->Release();
}
Unref(result);
trigger->Release();
}
ErrorResultCallback::ErrorResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc)
: ResultCallback(std::move(trigger), assoc) {}
void ErrorResultCallback::Complete(const ErrorResult& res) {
zeek::Val* result;
if ( IsSyncCallback() )
result = res;
zeek::Val* val_result;
if ( res )
result = new StringVal(res.value());
val_result = new StringVal(res.value());
else
result = val_mgr->Bool(true).get();
val_result = val_mgr->Bool(true).get();
ValComplete(result);
ValComplete(val_result);
}
ValResultCallback::ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc)
: ResultCallback(std::move(trigger), assoc) {}
void ValResultCallback::Complete(const ValResult& res) {
if ( IsSyncCallback() )
result = res;
static auto val_result_type = zeek::id::find_type<zeek::RecordType>("val_result");
auto* result = new zeek::RecordVal(val_result_type);
auto* val_result = new zeek::RecordVal(val_result_type);
if ( res )
result->Assign(0, res.value());
val_result->Assign(0, res.value());
else
result->Assign(1, zeek::make_intrusive<StringVal>(res.error()));
val_result->Assign(1, zeek::make_intrusive<StringVal>(res.error()));
ValComplete(result);
ValComplete(val_result);
}
OpenResultCallback::OpenResultCallback(detail::BackendHandleVal* backend) : ResultCallback(), backend(backend) {}
OpenResultCallback::OpenResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc,
detail::BackendHandleVal* backend)
: ResultCallback(std::move(trigger), assoc), backend(backend) {}
void OpenResultCallback::Complete(const ErrorResult& res) {
zeek::Val* result;
if ( IsSyncCallback() )
result = res;
zeek::Val* val_result;
if ( res )
result = new StringVal(res.value());
val_result = new StringVal(res.value());
else
result = backend;
val_result = backend;
ValComplete(result);
ValComplete(val_result);
}
ErrorResult Backend::Open(RecordValPtr options, TypePtr kt, TypePtr vt, OpenResultCallback* cb) {
key_type = std::move(kt);
val_type = std::move(vt);
auto res = DoOpen(std::move(options));
if ( (! native_async || zeek::run_state::reading_traces) && cb ) {
cb->Complete(res);
delete cb;
}
return res;
return DoOpen(std::move(options));
}
ErrorResult Backend::Close(ErrorResultCallback* cb) {
auto res = DoClose(cb);
if ( (! native_async || zeek::run_state::reading_traces) && cb ) {
cb->Complete(res);
delete cb;
}
return res;
}
ErrorResult Backend::Close(ErrorResultCallback* cb) { return DoClose(cb); }
ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expiration_time, ErrorResultCallback* cb) {
// The intention for this method is to do some other heavy lifting in regard
@ -107,14 +105,7 @@ ErrorResult Backend::Put(ValPtr key, ValPtr value, bool overwrite, double expira
return util::fmt("type of value passed (%s) does not match backend's value type (%s)",
obj_desc_short(value->GetType().get()).c_str(), val_type->GetName().c_str());
auto res = DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb);
if ( ! native_async && cb ) {
cb->Complete(res);
delete cb;
}
return res;
return DoPut(std::move(key), std::move(value), overwrite, expiration_time, cb);
}
ValResult Backend::Get(ValPtr key, ValResultCallback* cb) {
@ -123,14 +114,7 @@ ValResult Backend::Get(ValPtr key, ValResultCallback* cb) {
return zeek::unexpected<std::string>(util::fmt("type of key passed (%s) does not match backend's key type (%s)",
key->GetType()->GetName().c_str(), key_type->GetName().c_str()));
auto res = DoGet(std::move(key), cb);
if ( ! native_async && cb ) {
cb->Complete(res);
delete cb;
}
return res;
return DoGet(std::move(key), cb);
}
ErrorResult Backend::Erase(ValPtr key, ErrorResultCallback* cb) {
@ -139,16 +123,31 @@ ErrorResult Backend::Erase(ValPtr key, ErrorResultCallback* cb) {
return util::fmt("type of key passed (%s) does not match backend's key type (%s)",
key->GetType()->GetName().c_str(), key_type->GetName().c_str());
auto res = DoErase(std::move(key), cb);
return DoErase(std::move(key), cb);
}
if ( ! native_async && cb ) {
cb->Complete(res);
void Backend::CompleteCallback(ValResultCallback* cb, const ValResult& data) const {
cb->Complete(data);
if ( ! cb->IsSyncCallback() ) {
delete cb;
}
return res;
}
void Backend::CompleteCallback(ErrorResultCallback* cb, const ErrorResult& data) const {
cb->Complete(data);
if ( ! cb->IsSyncCallback() ) {
delete cb;
}
}
void Backend::CompleteCallback(OpenResultCallback* cb, const ErrorResult& data) const {
cb->Complete(data);
if ( ! cb->IsSyncCallback() ) {
delete cb;
}
}
zeek::OpaqueTypePtr detail::backend_opaque;
IMPLEMENT_OPAQUE_VALUE(detail::BackendHandleVal)

View file

@ -29,34 +29,48 @@ using ValResult = zeek::expected<ValPtr, std::string>;
// code reuse in the other callback methods.
class ResultCallback {
public:
ResultCallback() = default;
ResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc);
virtual ~ResultCallback();
void Timeout();
bool IsSyncCallback() const { return ! trigger; }
protected:
void ValComplete(Val* result);
private:
IntrusivePtr<zeek::detail::trigger::Trigger> trigger;
const void* assoc;
const void* assoc = nullptr;
};
// A callback result that returns an ErrorResult.
class ErrorResultCallback : public ResultCallback {
public:
ErrorResultCallback() = default;
ErrorResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc);
virtual void Complete(const ErrorResult& res);
ErrorResult Result() { return result; }
private:
ErrorResult result;
};
// A callback result that returns a ValResult.
class ValResultCallback : public ResultCallback {
public:
ValResultCallback() = default;
ValResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc);
void Complete(const ValResult& res);
ValResult Result() { return result; }
private:
ValResult result;
};
class OpenResultCallback;
enum SupportedModes : uint8_t { SYNC = 0x01, ASYNC = 0x02 };
class Backend : public zeek::Obj {
public:
/**
@ -105,6 +119,15 @@ public:
*/
virtual bool IsOpen() = 0;
bool SupportsSync() const { return (modes & SupportedModes::SYNC) == SupportedModes::SYNC; }
bool SupportsAsync() const { return (modes & SupportedModes::ASYNC) == SupportedModes::ASYNC; }
/**
* Optional method to allow a backend to poll for data. This can be used to
* mimic sync mode even if the backend only supports async.
*/
virtual void Poll() {}
protected:
// Allow the manager to call Open/Close.
friend class storage::Manager;
@ -112,14 +135,12 @@ protected:
/**
* Constructor
*
* @param native_async Denotes whether this backend can handle async request
* natively. If set to false, the Put/Get/Erase methods will call the
* callback after their corresponding Do methods return. If set to true, the
* backend needs to call the callback itself.
* @param modes A combination of values from SupportedModes. These modes
# define whether a backend only supports sync or async or both.
* @param tag A string representation of the tag for this backend. This
* is passed from the Manager through the component factory.
*/
Backend(bool native_async, std::string_view tag) : tag(tag), native_async(native_async) {}
Backend(uint8_t modes, std::string_view tag) : tag(tag), modes(modes) {}
/**
* Called by the manager system to open the backend.
@ -176,13 +197,17 @@ protected:
*/
virtual void Expire() {}
void CompleteCallback(ValResultCallback* cb, const ValResult& data) const;
void CompleteCallback(ErrorResultCallback* cb, const ErrorResult& data) const;
void CompleteCallback(OpenResultCallback* cb, const ErrorResult& data) const;
TypePtr key_type;
TypePtr val_type;
std::string tag;
private:
bool native_async = false;
uint8_t modes;
};
using BackendPtr = zeek::IntrusivePtr<Backend>;
@ -210,12 +235,15 @@ protected:
// A callback for the Backend::Open() method that returns an error or a backend handle.
class OpenResultCallback : public ResultCallback {
public:
OpenResultCallback(detail::BackendHandleVal* backend);
OpenResultCallback(IntrusivePtr<zeek::detail::trigger::Trigger> trigger, const void* assoc,
detail::BackendHandleVal* backend);
void Complete(const ErrorResult& res);
ErrorResult Result() { return result; }
private:
detail::BackendHandleVal* backend;
ErrorResult result;
detail::BackendHandleVal* backend = nullptr;
};
} // namespace zeek::storage

View file

@ -17,10 +17,8 @@ namespace {
class Tracer {
public:
Tracer(const std::string& where) : where(where) { // printf("%s\n", where.c_str());
}
~Tracer() { // printf("%s done\n", where.c_str());
}
Tracer(const std::string& where) : where(where) {} // DBG_LOG(zeek::DBG_STORAGE, "%s", where.c_str()); }
~Tracer() {} // DBG_LOG(zeek::DBG_STORAGE, "%s done", where.c_str()); }
std::string where;
};
@ -58,7 +56,7 @@ void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) {
}
void redisZRANGEBYSCORE(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("erase");
auto t = Tracer("zrangebyscore");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
backend->HandleZRANGEBYSCORE(static_cast<redisReply*>(reply));
}
@ -201,11 +199,6 @@ ErrorResult Redis::DoOpen(RecordValPtr options, OpenResultCallback* cb) {
async_ctx->ev.addWrite = redisAddWrite;
async_ctx->ev.delWrite = redisDelWrite;
if ( ! cb )
// Polling here will eventually call OnConnect, which will set the flag
// that we're connected.
Poll();
return std::nullopt;
}
@ -218,7 +211,7 @@ ErrorResult Redis::DoClose(ErrorResultCallback* cb) {
redisAsyncDisconnect(async_ctx);
++active_ops;
if ( ! cb && ! zeek::run_state::terminating ) {
if ( cb->IsSyncCallback() && ! zeek::run_state::terminating ) {
Poll();
// TODO: handle response
}
@ -261,25 +254,6 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
++active_ops;
if ( ! cb ) {
Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
ErrorResult res;
if ( ! connected )
res = util::fmt("Connection is not open");
else if ( ! reply )
res = util::fmt("Async put operation returned null reply");
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = util::fmt("Async put operation failed: %s", reply->str);
freeReplyObject(reply);
if ( res.has_value() )
return res;
}
// If reading pcaps insert into a secondary set that's ordered by expiration
// time that gets checked by Expire().
if ( expiration_time > 0.0 && zeek::run_state::reading_traces ) {
@ -296,12 +270,6 @@ ErrorResult Redis::DoPut(ValPtr key, ValPtr value, bool overwrite, double expira
++active_ops;
}
if ( ! cb ) {
// We don't care about the result from the ZADD, just that we wait
// for it to finish.
Poll();
}
return std::nullopt;
}
@ -321,16 +289,6 @@ ValResult Redis::DoGet(ValPtr key, ValResultCallback* cb) {
++active_ops;
if ( ! cb ) {
Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
auto res = ParseGetReply(reply);
freeReplyObject(reply);
return res;
}
// There isn't a result to return here. That happens in HandleGetResult for
// async operations.
return zeek::unexpected<std::string>("");
@ -352,13 +310,6 @@ ErrorResult Redis::DoErase(ValPtr key, ErrorResultCallback* cb) {
++active_ops;
if ( ! cb ) {
Poll();
redisReply* reply = reply_queue.front();
reply_queue.pop_front();
freeReplyObject(reply);
}
return std::nullopt;
}
@ -415,46 +366,37 @@ void Redis::Expire() {
void Redis::HandlePutResult(redisReply* reply, ErrorResultCallback* callback) {
--active_ops;
if ( callback ) {
ErrorResult res;
if ( ! connected )
res = util::fmt("Connection is not open");
else if ( ! reply )
res = util::fmt("Async put operation returned null reply");
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = util::fmt("Async put operation failed: %s", reply->str);
ErrorResult res;
if ( ! connected )
res = util::fmt("Connection is not open");
else if ( ! reply )
res = util::fmt("Async put operation returned null reply");
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = util::fmt("Async put operation failed: %s", reply->str);
freeReplyObject(reply);
callback->Complete(res);
delete callback;
}
else
reply_queue.push_back(reply);
freeReplyObject(reply);
CompleteCallback(callback, res);
}
void Redis::HandleGetResult(redisReply* reply, ValResultCallback* callback) {
--active_ops;
if ( callback ) {
ValResult res;
if ( ! connected )
res = zeek::unexpected<std::string>("Connection is not open");
else
res = ParseGetReply(reply);
ValResult res;
if ( ! connected )
res = zeek::unexpected<std::string>("Connection is not open");
else
res = ParseGetReply(reply);
callback->Complete(res);
freeReplyObject(reply);
delete callback;
}
else {
reply_queue.push_back(reply);
}
freeReplyObject(reply);
CompleteCallback(callback, res);
}
void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback) {
--active_ops;
if ( callback ) {
if ( callback->IsSyncCallback() )
reply_queue.push_back(reply);
else {
ErrorResult res;
if ( ! connected )
res = "Connection is not open";
@ -464,11 +406,8 @@ void Redis::HandleEraseResult(redisReply* reply, ErrorResultCallback* callback)
res = util::fmt("Async erase operation failed: %s", reply->str);
freeReplyObject(reply);
callback->Complete(res);
delete callback;
CompleteCallback(callback, res);
}
else
reply_queue.push_back(reply);
}
void Redis::HandleZRANGEBYSCORE(redisReply* reply) {

View file

@ -13,7 +13,7 @@ struct redisPollEvents;
namespace zeek::storage::backend::redis {
class Redis : public Backend, public iosource::IOSource {
public:
Redis(std::string_view tag) : Backend(true, tag), IOSource(true) {}
Redis(std::string_view tag) : Backend(SupportedModes::ASYNC, tag), IOSource(true) {}
~Redis() override = default;
static BackendPtr Instantiate(std::string_view tag);
@ -81,9 +81,11 @@ public:
// themselves from the list of active operations.
void HandleGeneric() { --active_ops; }
protected:
void Poll() override;
private:
ValResult ParseGetReply(redisReply* reply) const;
void Poll();
redisAsyncContext* async_ctx = nullptr;

View file

@ -12,7 +12,7 @@ namespace zeek::storage::backend::sqlite {
class SQLite : public Backend {
public:
SQLite(std::string_view tag) : Backend(false, tag) {}
SQLite(std::string_view tag) : Backend(SupportedModes::SYNC, tag) {}
~SQLite() override = default;
static BackendPtr Instantiate(std::string_view tag);

View file

@ -8,12 +8,12 @@ using namespace zeek;
using namespace zeek::storage;
static zeek::detail::trigger::TriggerPtr init_trigger(zeek::detail::Frame* frame) {
auto trigger = frame->GetTrigger();
auto trigger = frame->GetTrigger();
if ( ! trigger ) {
emit_builtin_error("Asynchronous storage operations must be called via a when-condition");
return nullptr;
}
if ( ! trigger ) {
emit_builtin_error("Asynchronous storage operations must be called via a when-condition");
return nullptr;
}
if ( auto timeout = trigger->TimeoutValue(); timeout < 0 ) {
emit_builtin_error("Async Storage operations must specify a timeout block");
@ -56,7 +56,7 @@ function Storage::Async::__open_backend%(btype: Storage::Backend, options: any,
%{
auto trigger = init_trigger(frame);
if ( ! trigger )
return val_mgr->Bool(false);
return val_mgr->Bool(false);
auto btype_val = IntrusivePtr<EnumVal>{NewRef{}, btype->AsEnumVal()};
Tag tag{btype_val};
@ -74,7 +74,19 @@ function Storage::Async::__open_backend%(btype: Storage::Backend, options: any,
auto kt = key_type->AsTypeVal()->GetType()->AsTypeType()->GetType();
auto vt = val_type->AsTypeVal()->GetType()->AsTypeType()->GetType();
auto options_val = IntrusivePtr<RecordVal>{NewRef{}, options->AsRecordVal()};
storage_mgr->OpenBackend(b.value(), options_val, kt, vt, cb);
auto open_res = storage_mgr->OpenBackend(b.value(), options_val, kt, vt, cb);
if ( ! b.value()->SupportsAsync() ) {
// If the backend doesn't support async, we blocked in order to get here already. Handle the
// callback manually.
cb->Complete(open_res);
delete cb;
}
else if ( run_state::reading_traces ) {
// If the backend is truly async and we're reading traces, we need to fake being in sync mode
// because otherwise time doesn't move forward correctly.
b.value()->Poll();
}
return nullptr;
%}
@ -94,7 +106,19 @@ function Storage::Async::__close_backend%(backend: opaque of Storage::BackendHan
return val_mgr->Bool(true);
auto cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc());
storage_mgr->CloseBackend(b->backend, cb);
auto close_res = storage_mgr->CloseBackend(b->backend, cb);
if ( ! b->backend->SupportsAsync() ) {
// If the backend doesn't support async, we blocked in order to get here already. Handle the
// callback manually.
cb->Complete(close_res);
delete cb;
}
else if ( run_state::reading_traces ) {
// If the backend is truly async and we're reading traces, we need to fake being in sync mode
// because otherwise time doesn't move forward correctly.
b->backend->Poll();
}
return nullptr;
%}
@ -120,7 +144,19 @@ function Storage::Async::__put%(backend: opaque of Storage::BackendHandle, key:
auto cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc());
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto val_v = IntrusivePtr<Val>{NewRef{}, value};
b->backend->Put(key_v, val_v, overwrite, expire_time, cb);
auto put_res = b->backend->Put(key_v, val_v, overwrite, expire_time, cb);
if ( ! b->backend->SupportsAsync() ) {
// If the backend doesn't support async, we blocked in order to get here already. Handle the
// callback manually.
cb->Complete(put_res);
delete cb;
}
else if ( run_state::reading_traces ) {
// If the backend is truly async and we're reading traces, we need to fake being in sync mode
// because otherwise time doesn't move forward correctly.
b->backend->Poll();
}
return nullptr;
%}
@ -148,7 +184,19 @@ function Storage::Async::__get%(backend: opaque of Storage::BackendHandle, key:
auto cb = new ValResultCallback(trigger, frame->GetTriggerAssoc());
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto result = b->backend->Get(key_v, cb);
auto get_res = b->backend->Get(key_v, cb);
if ( ! b->backend->SupportsAsync() ) {
// If the backend doesn't support async, we blocked in order to get here already. Handle the
// callback manually.
cb->Complete(get_res);
delete cb;
}
else if ( run_state::reading_traces ) {
// If the backend is truly async and we're reading traces, we need to fake being in sync mode
// because otherwise time doesn't move forward correctly.
b->backend->Poll();
}
return nullptr;
%}
@ -169,7 +217,19 @@ function Storage::Async::__erase%(backend: opaque of Storage::BackendHandle, key
auto cb = new ErrorResultCallback(trigger, frame->GetTriggerAssoc());
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
b->backend->Erase(key_v, cb);
auto erase_res = b->backend->Erase(key_v, cb);
if ( ! b->backend->SupportsAsync() ) {
// If the backend doesn't support async, we blocked in order to get here already. Handle the
// callback manually.
cb->Complete(erase_res);
delete cb;
}
else if ( run_state::reading_traces ) {
// If the backend is truly async and we're reading traces, we need to fake being in sync mode
// because otherwise time doesn't move forward correctly.
b->backend->Poll();
}
return nullptr;
%}
@ -188,17 +248,29 @@ function Storage::Sync::__open_backend%(btype: Storage::Backend, options: any, k
return val_mgr->Bool(false);
}
auto bh = make_intrusive<storage::detail::BackendHandleVal>(b.value());
auto cb = new OpenResultCallback(bh.get());
auto kt = key_type->AsTypeVal()->GetType()->AsTypeType()->GetType();
auto vt = val_type->AsTypeVal()->GetType()->AsTypeType()->GetType();
auto options_val = IntrusivePtr<RecordVal>{NewRef{}, options->AsRecordVal()};
auto open_res = storage_mgr->OpenBackend(b.value(), options_val, kt, vt, nullptr);
auto open_res = storage_mgr->OpenBackend(b.value(), options_val, kt, vt, cb);
// If the backend only supports async, block until it's ready and then pull the result out of
// the callback.
if ( ! b.value()->SupportsSync() ) {
b.value()->Poll();
open_res = cb->Result();
}
delete cb;
if ( open_res.has_value() ) {
emit_builtin_error(open_res.value().c_str());
return val_mgr->Bool(false);
}
return make_intrusive<storage::detail::BackendHandleVal>(b.value());
return bh;
%}
function Storage::Sync::__close_backend%(backend: opaque of Storage::BackendHandle%) : bool
@ -212,10 +284,20 @@ function Storage::Sync::__close_backend%(backend: opaque of Storage::BackendHand
// Return true here since the backend is already closed
return val_mgr->Bool(true);
auto result = storage_mgr->CloseBackend(b->backend, nullptr);
auto cb = new ErrorResultCallback();
auto close_res = storage_mgr->CloseBackend(b->backend, cb);
if ( result.has_value() ) {
emit_builtin_error(result.value().c_str());
// If the backend only supports async, block until it's ready and then pull the result out of
// the callback.
if ( ! b->backend->SupportsSync() ) {
b->backend->Poll();
close_res = cb->Result();
}
delete cb;
if ( close_res.has_value() ) {
emit_builtin_error(close_res.value().c_str());
return val_mgr->Bool(false);
}
@ -233,12 +315,22 @@ function Storage::Sync::__put%(backend: opaque of Storage::BackendHandle, key: a
else if ( ! b->backend->IsOpen() )
return val_mgr->Bool(false);
auto cb = new ErrorResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto val_v = IntrusivePtr<Val>{NewRef{}, value};
auto result = b->backend->Put(key_v, val_v, overwrite, expire_time, nullptr);
auto put_res = b->backend->Put(key_v, val_v, overwrite, expire_time, cb);
if ( result.has_value() ) {
emit_builtin_error(util::fmt("Failed to store data: %s", result.value().c_str()));
// If the backend only supports async, block until it's ready and then pull the result out of
// the callback.
if ( ! b->backend->SupportsSync() ) {
b->backend->Poll();
put_res = cb->Result();
}
delete cb;
if ( put_res.has_value() ) {
emit_builtin_error(util::fmt("Failed to store data: %s", put_res.value().c_str()));
return val_mgr->Bool(false);
}
@ -261,15 +353,25 @@ function Storage::Sync::__get%(backend: opaque of Storage::BackendHandle, key: a
}
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto result = b->backend->Get(key_v, nullptr);
auto cb = new ValResultCallback();
auto get_res = b->backend->Get(key_v, cb);
if ( ! result.has_value() ) {
// If the backend only supports async, block until it's ready and then pull the result out of
// the callback.
if ( ! b->backend->SupportsSync() ) {
b->backend->Poll();
get_res = cb->Result();
}
delete cb;
if ( ! get_res.has_value() ) {
val_result->Assign(1, make_intrusive<StringVal>(
util::fmt("Failed to retrieve data: %s", result.error().c_str())));
util::fmt("Failed to retrieve data: %s", get_res.error().c_str())));
return val_result;
}
val_result->Assign(0, result.value());
val_result->Assign(0, get_res.value());
return val_result;
%}
@ -283,11 +385,21 @@ function Storage::Sync::__erase%(backend: opaque of Storage::BackendHandle, key:
else if ( ! b->backend->IsOpen() )
return val_mgr->Bool(false);
auto cb = new ErrorResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto result = b->backend->Erase(key_v, nullptr);
auto erase_res = b->backend->Erase(key_v, cb);
if ( result.has_value() ) {
emit_builtin_error(util::fmt("Failed to erase data for key: %s", result.value().c_str()));
// If the backend only supports async, block until it's ready and then pull the result out of
// the callback.
if ( ! b->backend->SupportsSync() ) {
b->backend->Poll();
erase_res = cb->Result();
}
delete cb;
if ( erase_res.has_value() ) {
emit_builtin_error(util::fmt("Failed to erase data for key: %s", erase_res.value().c_str()));
return val_mgr->Bool(false);
}

View file

@ -13,7 +13,7 @@ namespace btest::storage::backend {
*/
class StorageDummy : public zeek::storage::Backend {
public:
StorageDummy(std::string_view tag) : Backend(false, tag) {}
StorageDummy(std::string_view tag) : Backend(zeek::storage::SupportedModes::SYNC, tag) {}
~StorageDummy() override = default;
static zeek::storage::BackendPtr Instantiate(std::string_view tag);

View file

@ -1,6 +1,5 @@
# @TEST-DOC: Tests that Redis storage backend defaults back to sync mode reading pcaps
# @TEST-KNOWN-FAILURE: Currently broken due to the redis async rework
# @TEST-REQUIRES: have-redis
# @TEST-PORT: REDIS_PORT