Squash code from OperationResultCallback into ResultCallback

This commit is contained in:
Tim Wojtulewicz 2025-03-18 18:34:23 -07:00
parent cd66b32428
commit e5b06367f7
12 changed files with 62 additions and 91 deletions

View file

@ -34,10 +34,7 @@ void ResultCallback::Timeout() {
trigger->Cache(assoc, OperationResult::MakeVal(ReturnCode::TIMEOUT).release());
}
OperationResultCallback::OperationResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc)
: ResultCallback(std::move(trigger), assoc) {}
void OperationResultCallback::Complete(OperationResult res) {
void ResultCallback::Complete(OperationResult res) {
// If this is a sync callback, there isn't a trigger to process. Store the result and bail.
if ( IsSyncCallback() ) {
result = std::move(res);
@ -65,15 +62,7 @@ void OpenResultCallback::Complete(OperationResult res) {
// passed back to the trigger or the one stored for sync backends.
res.value = backend;
// If this is a sync callback, there isn't a trigger to process. Store the result and bail.
if ( IsSyncCallback() ) {
result = std::move(res);
return;
}
auto res_val = res.BuildVal();
trigger->Cache(assoc, res_val.get());
trigger->Release();
ResultCallback::Complete(std::move(res));
}
OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, TypePtr kt, TypePtr vt) {
@ -88,10 +77,9 @@ OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, Type
return ret;
}
OperationResult Backend::Close(OperationResultCallback* cb) { return DoClose(cb); }
OperationResult Backend::Close(ResultCallback* cb) { return DoClose(cb); }
OperationResult Backend::Put(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
OperationResult Backend::Put(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
// The intention for this method is to do some other heavy lifting in regard
// to backends that need to pass data through the manager instead of directly
// through the workers. For the first versions of the storage framework it
@ -110,7 +98,7 @@ OperationResult Backend::Put(OperationResultCallback* cb, ValPtr key, ValPtr val
return DoPut(cb, std::move(key), std::move(value), overwrite, expiration_time);
}
OperationResult Backend::Get(OperationResultCallback* cb, ValPtr key) {
OperationResult Backend::Get(ResultCallback* cb, ValPtr key) {
// See the note in Put().
if ( ! same_type(key->GetType(), key_type) ) {
auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH};
@ -121,7 +109,7 @@ OperationResult Backend::Get(OperationResultCallback* cb, ValPtr key) {
return DoGet(cb, std::move(key));
}
OperationResult Backend::Erase(OperationResultCallback* cb, ValPtr key) {
OperationResult Backend::Erase(ResultCallback* cb, ValPtr key) {
// See the note in Put().
if ( ! same_type(key->GetType(), key_type) ) {
auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH};

View file

@ -76,25 +76,13 @@ public:
* Completes a callback, releasing the trigger if it was valid or storing the result
* for later usage if needed.
*/
virtual void Complete(OperationResult res) = 0;
virtual void Complete(OperationResult res);
OperationResult Result() const { return result; }
protected:
zeek::detail::trigger::TriggerPtr trigger;
const void* assoc = nullptr;
};
/**
* A callback that returns an `OperationResult` when it is complete. This is used by most
* of the storage operations for returning status.
*/
class OperationResultCallback : public ResultCallback {
public:
OperationResultCallback() = default;
OperationResultCallback(detail::trigger::TriggerPtr trigger, const void* assoc);
void Complete(OperationResult res) override;
OperationResult Result() { return result; }
private:
OperationResult result;
};
@ -127,7 +115,7 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Put(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite = true,
OperationResult Put(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite = true,
double expiration_time = 0);
/**
@ -139,7 +127,7 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Get(OperationResultCallback* cb, ValPtr key);
OperationResult Get(ResultCallback* cb, ValPtr key);
/**
* Erases the value for a key from the backend.
@ -150,7 +138,7 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Erase(OperationResultCallback* cb, ValPtr key);
OperationResult Erase(ResultCallback* cb, ValPtr key);
/**
* Returns whether the backend is opened.
@ -212,7 +200,7 @@ protected:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Close(OperationResultCallback* cb);
OperationResult Close(ResultCallback* cb);
/**
* Removes any entries in the backend that have expired. Can be overridden by
@ -260,26 +248,26 @@ private:
* Workhorse method for calls to `Manager::CloseBackend()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoClose(OperationResultCallback* cb) = 0;
virtual OperationResult DoClose(ResultCallback* cb) = 0;
/**
* Workhorse method for calls to `Backend::Put()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
virtual OperationResult DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) = 0;
/**
* Workhorse method for calls to `Backend::Get()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoGet(OperationResultCallback* cb, ValPtr key) = 0;
virtual OperationResult DoGet(ResultCallback* cb, ValPtr key) = 0;
/**
* Workhorse method for calls to `Backend::Erase()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoErase(OperationResultCallback* cb, ValPtr key) = 0;
virtual OperationResult DoErase(ResultCallback* cb, ValPtr key) = 0;
/**
* Optional method for backends to override to provide direct polling. This should be
@ -337,11 +325,9 @@ public:
IntrusivePtr<detail::BackendHandleVal> backend);
void Complete(OperationResult res) override;
OperationResult Result() const { return result; }
IntrusivePtr<detail::BackendHandleVal> Backend() const { return backend; }
private:
OperationResult result{};
IntrusivePtr<detail::BackendHandleVal> backend;
};

View file

@ -89,7 +89,7 @@ OperationResult Manager::OpenBackend(BackendPtr backend, OpenResultCallback* cb,
return res;
}
OperationResult Manager::CloseBackend(BackendPtr backend, OperationResultCallback* cb) {
OperationResult Manager::CloseBackend(BackendPtr backend, ResultCallback* cb) {
// Expiration runs on a separate thread and loops over the vector of backends. The mutex
// here ensures exclusive access. This one happens in a block because we can remove the
// backend from the vector before actually closing it.

View file

@ -71,7 +71,7 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult CloseBackend(BackendPtr backend, OperationResultCallback* cb);
OperationResult CloseBackend(BackendPtr backend, ResultCallback* cb);
/**
* Runs an expire operation on all open backends. This is called by the expiration

View file

@ -38,21 +38,21 @@ void redisOnDisconnect(const redisAsyncContext* ctx, int status) {
void redisPut(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("put");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::ResultCallback*>(privdata);
backend->HandlePutResult(static_cast<redisReply*>(reply), callback);
}
void redisGet(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("get");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::ResultCallback*>(privdata);
backend->HandleGetResult(static_cast<redisReply*>(reply), callback);
}
void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("erase");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::ResultCallback*>(privdata);
backend->HandleEraseResult(static_cast<redisReply*>(reply), callback);
}
@ -232,7 +232,7 @@ OperationResult Redis::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
/**
* Finalizes the backend when it's being closed.
*/
OperationResult Redis::DoClose(OperationResultCallback* cb) {
OperationResult Redis::DoClose(ResultCallback* cb) {
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
connected = false;
@ -247,8 +247,7 @@ OperationResult Redis::DoClose(OperationResultCallback* cb) {
/**
* The workhorse method for Put(). This must be implemented by plugins.
*/
OperationResult Redis::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
OperationResult Redis::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED};
@ -301,7 +300,7 @@ OperationResult Redis::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr val
/**
* The workhorse method for Get(). This must be implemented for plugins.
*/
OperationResult Redis::DoGet(OperationResultCallback* cb, ValPtr key) {
OperationResult Redis::DoGet(ResultCallback* cb, ValPtr key) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED};
@ -324,7 +323,7 @@ OperationResult Redis::DoGet(OperationResultCallback* cb, ValPtr key) {
/**
* The workhorse method for Erase(). This must be implemented for plugins.
*/
OperationResult Redis::DoErase(OperationResultCallback* cb, ValPtr key) {
OperationResult Redis::DoErase(ResultCallback* cb, ValPtr key) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED};
@ -409,7 +408,7 @@ void Redis::DoExpire(double current_network_time) {
// TODO: do we care if this failed?
}
void Redis::HandlePutResult(redisReply* reply, OperationResultCallback* callback) {
void Redis::HandlePutResult(redisReply* reply, ResultCallback* callback) {
--active_ops;
OperationResult res{ReturnCode::SUCCESS};
@ -424,7 +423,7 @@ void Redis::HandlePutResult(redisReply* reply, OperationResultCallback* callback
CompleteCallback(callback, res);
}
void Redis::HandleGetResult(redisReply* reply, OperationResultCallback* callback) {
void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) {
--active_ops;
OperationResult res;
@ -437,7 +436,7 @@ void Redis::HandleGetResult(redisReply* reply, OperationResultCallback* callback
CompleteCallback(callback, res);
}
void Redis::HandleEraseResult(redisReply* reply, OperationResultCallback* callback) {
void Redis::HandleEraseResult(redisReply* reply, ResultCallback* callback) {
--active_ops;
OperationResult res{ReturnCode::SUCCESS};

View file

@ -37,9 +37,9 @@ public:
void OnConnect(int status);
void OnDisconnect(int status);
void HandlePutResult(redisReply* reply, OperationResultCallback* callback);
void HandleGetResult(redisReply* reply, OperationResultCallback* callback);
void HandleEraseResult(redisReply* reply, OperationResultCallback* callback);
void HandlePutResult(redisReply* reply, ResultCallback* callback);
void HandleGetResult(redisReply* reply, ResultCallback* callback);
void HandleEraseResult(redisReply* reply, ResultCallback* callback);
void HandleGeneric(redisReply* reply);
/**
@ -51,11 +51,11 @@ public:
private:
OperationResult DoOpen(OpenResultCallback* cb, RecordValPtr options) override;
OperationResult DoClose(OperationResultCallback* cb) override;
OperationResult DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
OperationResult DoClose(ResultCallback* cb) override;
OperationResult DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) override;
OperationResult DoGet(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoErase(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoGet(ResultCallback* cb, ValPtr key) override;
OperationResult DoErase(ResultCallback* cb, ValPtr key) override;
void DoExpire(double current_network_time) override;
void DoPoll() override;
@ -69,7 +69,7 @@ private:
std::deque<redisReply*> reply_queue;
OpenResultCallback* open_cb;
OperationResultCallback* close_cb;
ResultCallback* close_cb;
std::mutex expire_mutex;
std::string server_addr;

View file

@ -115,7 +115,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
/**
* Finalizes the backend when it's being closed.
*/
OperationResult SQLite::DoClose(OperationResultCallback* cb) {
OperationResult SQLite::DoClose(ResultCallback* cb) {
OperationResult op_res{ReturnCode::SUCCESS};
if ( db ) {
@ -146,8 +146,7 @@ OperationResult SQLite::DoClose(OperationResultCallback* cb) {
/**
* The workhorse method for Put(). This must be implemented by plugins.
*/
OperationResult SQLite::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
if ( ! db )
return {ReturnCode::NOT_CONNECTED};
@ -193,7 +192,7 @@ OperationResult SQLite::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr va
/**
* The workhorse method for Get(). This must be implemented for plugins.
*/
OperationResult SQLite::DoGet(OperationResultCallback* cb, ValPtr key) {
OperationResult SQLite::DoGet(ResultCallback* cb, ValPtr key) {
if ( ! db )
return {ReturnCode::NOT_CONNECTED};
@ -213,7 +212,7 @@ OperationResult SQLite::DoGet(OperationResultCallback* cb, ValPtr key) {
/**
* The workhorse method for Erase(). This must be implemented for plugins.
*/
OperationResult SQLite::DoErase(OperationResultCallback* cb, ValPtr key) {
OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) {
if ( ! db )
return {ReturnCode::NOT_CONNECTED};

View file

@ -24,11 +24,11 @@ public:
private:
OperationResult DoOpen(OpenResultCallback* cb, RecordValPtr options) override;
OperationResult DoClose(OperationResultCallback* cb) override;
OperationResult DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
OperationResult DoClose(ResultCallback* cb) override;
OperationResult DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) override;
OperationResult DoGet(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoErase(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoGet(ResultCallback* cb, ValPtr key) override;
OperationResult DoErase(ResultCallback* cb, ValPtr key) override;
void DoExpire(double current_network_time) override;
/**

View file

@ -107,7 +107,7 @@ function Storage::Async::__close_backend%(backend: opaque of Storage::BackendHan
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());
@ -128,7 +128,7 @@ function Storage::Async::__put%(backend: opaque of Storage::BackendHandle, key:
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());
@ -153,7 +153,7 @@ function Storage::Async::__get%(backend: opaque of Storage::BackendHandle, key:
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());
@ -174,7 +174,7 @@ function Storage::Async::__erase%(backend: opaque of Storage::BackendHandle, key
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());

View file

@ -68,7 +68,7 @@ function Storage::Sync::__close_backend%(backend: opaque of Storage::BackendHand
if ( ! b )
op_result = b.error();
else {
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
op_result = storage_mgr->CloseBackend((*b)->backend, cb);
// If the backend only supports async, block until it's ready and then pull the result out of
@ -96,7 +96,7 @@ function Storage::Sync::__put%(backend: opaque of Storage::BackendHandle, key: a
if ( expire_time > 0.0 )
expire_time += run_state::network_time;
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto val_v = IntrusivePtr<Val>{NewRef{}, value};
op_result = (*b)->backend->Put(cb, key_v, val_v, overwrite, expire_time);
@ -122,7 +122,7 @@ function Storage::Sync::__get%(backend: opaque of Storage::BackendHandle, key: a
if ( ! b )
op_result = b.error();
else {
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
op_result = (*b)->backend->Get(cb, key_v);
@ -147,7 +147,7 @@ function Storage::Sync::__erase%(backend: opaque of Storage::BackendHandle, key:
if ( ! b )
op_result = b.error();
else {
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
op_result = (*b)->backend->Erase(cb, key_v);

View file

@ -34,7 +34,7 @@ OperationResult StorageDummy::DoOpen(OpenResultCallback* cb, RecordValPtr option
/**
* Finalizes the backend when it's being closed.
*/
OperationResult StorageDummy::DoClose(OperationResultCallback* cb) {
OperationResult StorageDummy::DoClose(ResultCallback* cb) {
open = false;
return {ReturnCode::SUCCESS};
}
@ -42,7 +42,7 @@ OperationResult StorageDummy::DoClose(OperationResultCallback* cb) {
/**
* The workhorse method for Put(). This must be implemented by plugins.
*/
OperationResult StorageDummy::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
OperationResult StorageDummy::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
auto json_key = key->ToJSON()->ToStdString();
auto json_value = value->ToJSON()->ToStdString();
@ -53,7 +53,7 @@ OperationResult StorageDummy::DoPut(OperationResultCallback* cb, ValPtr key, Val
/**
* The workhorse method for Get(). This must be implemented for plugins.
*/
OperationResult StorageDummy::DoGet(OperationResultCallback* cb, ValPtr key) {
OperationResult StorageDummy::DoGet(ResultCallback* cb, ValPtr key) {
auto json_key = key->ToJSON();
auto it = data.find(json_key->ToStdString());
if ( it == data.end() )
@ -71,7 +71,7 @@ OperationResult StorageDummy::DoGet(OperationResultCallback* cb, ValPtr key) {
/**
* The workhorse method for Erase(). This must be implemented for plugins.
*/
OperationResult StorageDummy::DoErase(OperationResultCallback* cb, ValPtr key) {
OperationResult StorageDummy::DoErase(ResultCallback* cb, ValPtr key) {
auto json_key = key->ToJSON();
auto it = data.find(json_key->ToStdString());
if ( it == data.end() )

View file

@ -26,7 +26,7 @@ public:
/**
* Finalizes the backend when it's being closed.
*/
zeek::storage::OperationResult DoClose(zeek::storage::OperationResultCallback* cb = nullptr) override;
zeek::storage::OperationResult DoClose(zeek::storage::ResultCallback* cb = nullptr) override;
/**
* Returns whether the backend is opened.
@ -36,19 +36,18 @@ public:
/**
* The workhorse method for Put().
*/
zeek::storage::OperationResult DoPut(zeek::storage::OperationResultCallback* cb, zeek::ValPtr key,
zeek::ValPtr value, bool overwrite = true,
double expiration_time = 0) override;
zeek::storage::OperationResult DoPut(zeek::storage::ResultCallback* cb, zeek::ValPtr key, zeek::ValPtr value,
bool overwrite = true, double expiration_time = 0) override;
/**
* The workhorse method for Get().
*/
zeek::storage::OperationResult DoGet(zeek::storage::OperationResultCallback* cb, zeek::ValPtr key) override;
zeek::storage::OperationResult DoGet(zeek::storage::ResultCallback* cb, zeek::ValPtr key) override;
/**
* The workhorse method for Erase().
*/
zeek::storage::OperationResult DoErase(zeek::storage::OperationResultCallback* cb, zeek::ValPtr key) override;
zeek::storage::OperationResult DoErase(zeek::storage::ResultCallback* cb, zeek::ValPtr key) override;
private:
std::map<std::string, std::string> data;