Merge remote-tracking branch 'origin/topic/timw/storage-framework-followup'

* origin/topic/timw/storage-framework-followup:
  Redis: Handle other errors from requests, fix KEY_EXISTS for put operations
  SQLite: handle existing keys when overwrite=F correctly
  Remove unnecessary type aliases from storage btests
  Avoid thread-leak in scripts.base.frameworks.file-analysis.bifs.enable-disable btest
  Fix data-race with calling DBG_LOG from a separate thread
  Fix data-race with ReturnCode objects in Sqlite::DoExpire
  Fix data race with calling run_state::network_time from a separate thread
  Add NEWS entry for Storage, reduce CHANGES spam
  Fix Coverity findings in the SQLite backend
  Remove inclusion of non-existent expected-lite path during dynamic plugin builds
  Squash code from OperationResultCallback into ResultCallback
  Add hiredis to generate-docs workflow to enable Redis backend
Tim Wojtulewicz 2025-03-21 11:56:40 -07:00
commit bc38dbcc99
36 changed files with 268 additions and 342 deletions


@@ -56,6 +56,7 @@ jobs:
g++ \
gcc \
git \
libhiredis-dev \
libfl-dev \
libfl2 \
libkrb5-dev \

CHANGES

@@ -1,3 +1,35 @@
7.2.0-dev.418 | 2025-03-21 11:56:40 -0700
* Redis: Handle other errors from requests, fix KEY_EXISTS for put operations (Tim Wojtulewicz, Corelight)
* SQLite: handle existing keys when overwrite=F correctly (Tim Wojtulewicz, Corelight)
* Remove unnecessary type aliases from storage btests (Tim Wojtulewicz, Corelight)
* Avoid thread-leak in scripts.base.frameworks.file-analysis.bifs.enable-disable btest (Tim Wojtulewicz, Corelight)
This btest uses the exit() BIF to shut down, which immediately calls
::exit() and kills Zeek without doing any shutdown. This will sometimes
leave the thread running the storage manager, which causes TSan to
complain about a thread leak. Switch to use the terminate() BIF instead
which cleanly shuts down all of Zeek.
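The distinction above can be sketched in plain C++ (names are ours, not Zeek's): a worker thread that is joined before the process ends is guaranteed finished, while calling ``std::exit()`` tears the process down without this join, leaving the thread running, which is the leak TSan flagged. Zeek's storage manager uses ``std::jthread``, whose destructor performs this join automatically.

```cpp
#include <atomic>
#include <thread>

// A minimal sketch of why a clean shutdown matters: joining the worker
// guarantees it has completed before we proceed. std::exit() would skip
// this entirely, leaving the thread alive at process teardown.
bool clean_shutdown_joins_worker() {
    std::atomic<bool> finished{false};
    std::thread worker([&finished] { finished.store(true); });
    worker.join(); // after join(), the worker is guaranteed done
    return finished.load();
}
```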
* Fix data-race with calling DBG_LOG from a separate thread (Tim Wojtulewicz, Corelight)
* Fix data-race with ReturnCode objects in Sqlite::DoExpire (Tim Wojtulewicz, Corelight)
* Fix data race with calling run_state::network_time from a separate thread (Tim Wojtulewicz, Corelight)
* Add NEWS entry for Storage, reduce CHANGES spam (Tim Wojtulewicz, Corelight)
* Fix Coverity findings in the SQLite backend (Tim Wojtulewicz, Corelight)
* Remove inclusion of non-existent expected-lite path during dynamic plugin builds (Tim Wojtulewicz, Corelight)
* Squash code from OperationResultCallback into ResultCallback (Tim Wojtulewicz, Corelight)
* Add hiredis to generate-docs workflow to enable Redis backend (Tim Wojtulewicz, Corelight)
7.2.0-dev.405 | 2025-03-21 09:43:18 +0100
* testing/btest: Remove btest-bg-run sleep 1 (Arne Welzel, Corelight)
@@ -67,121 +99,9 @@
7.2.0-dev.378 | 2025-03-18 11:43:48 -0700
* Cleanup/update comments across the storage C++ files (Tim Wojtulewicz, Corelight)
* Merged new Storage framework (Tim Wojtulewicz, Corelight)
* Split storage.bif file into events/sync/async, add more comments (Tim Wojtulewicz, Corelight)
* Update comments in script files, run zeek-format on all of them (Tim Wojtulewicz, Corelight)
* Allow sync methods to be called from when conditions, add related btest (Tim Wojtulewicz, Corelight)
* Redis: Handle disconnection correctly via callback (Tim Wojtulewicz, Corelight)
* Redis: Fix sync erase, add btest for it (Tim Wojtulewicz, Corelight)
* Remove default argument for callbacks, reorder function arguments (Tim Wojtulewicz, Corelight)
* Remove file-local expire_running variable (Tim Wojtulewicz, Corelight)
* Pass network time down to Expire() (Tim Wojtulewicz, Corelight)
* Add IN_PROGRESS return code, handle for async backends (Tim Wojtulewicz, Corelight)
* Store sqlite3_stmts directly instead of looking up from a map (Tim Wojtulewicz, Corelight)
* Reduce code duplication in storage.bif (Tim Wojtulewicz, Corelight)
* Add OperationResult::MakeVal, use it to reduce some code duplication (Tim Wojtulewicz, Corelight)
* Rearrange visibility of Backend methods, add DoPoll/DoExpire, add return comments (Tim Wojtulewicz, Corelight)
* Implement Storage::backend_opened and Storage::backend_lost events (Tim Wojtulewicz, Corelight)
* SQLite: expand expiration test (Tim Wojtulewicz, Corelight)
* SQLite: Handle other return values from sqlite3_step (Tim Wojtulewicz, Corelight)
* Redis: Fix thread-contention issues with Expire(), add more tests (Tim Wojtulewicz, Corelight)
* Change how redis-server is run during btests, removing redis.conf (Tim Wojtulewicz, Corelight)
* Completely rework return values from storage operations (Tim Wojtulewicz, Corelight)
* Update some btests due to timing changes (Tim Wojtulewicz, Corelight)
* Split sync/async handling into the BIF methods (Tim Wojtulewicz, Corelight)
* Redis: Rework everything to only use async mode (Tim Wojtulewicz, Corelight)
* Run expiration on a separate thread (Tim Wojtulewicz, Corelight)
* Pass network-time-based expiration time to backends instead of an interval (Tim Wojtulewicz, Corelight)
* Make backend options a record, move actual options to be sub-records (Tim Wojtulewicz, Corelight)
* Always register backend for expiration, check for open during loop (Tim Wojtulewicz, Corelight)
* Split sync and async into separate script-land namespaces (Tim Wojtulewicz, Corelight)
* Remove Backend::SupportsAsync (Tim Wojtulewicz, Corelight)
* Add btest that uses a Redis backend in a cluster (Tim Wojtulewicz, Corelight)
* Return generic result for get operations that includes error messages (Tim Wojtulewicz, Corelight)
* Allow opening and closing backends to be async (Tim Wojtulewicz, Corelight)
* Redis: Support non-native expiration when reading traces (Tim Wojtulewicz, Corelight)
* Redis: Add btests for the redis backend (Tim Wojtulewicz, Corelight)
* Redis: Force storage sync mode when reading pcaps, default to async mode (Tim Wojtulewicz, Corelight)
* Redis: Add new backend (Tim Wojtulewicz, Corelight)
* SQLite: Fix some issues with expiration, including in the btest (Tim Wojtulewicz, Corelight)
* SQLite: Add additional btests, which also cover general storage functionality (Tim Wojtulewicz, Corelight)
- New erase/overwrite tests
- Change existing sqlite-basic test to use async
- Test passing bad keys to validate backend type checking
- New test for compound keys and values
* SQLite: Add pragma integrity_check (Tim Wojtulewicz, Corelight)
* SQLite: Add tuning options to configuration (Tim Wojtulewicz, Corelight)
* SQLite: Handle automated expiration (Tim Wojtulewicz, Corelight)
* SQLite: Store/lookup prepared statements instead of recreating (Tim Wojtulewicz, Corelight)
* Add basic SQLite storage backend (Tim Wojtulewicz, Corelight)
* Add infrastructure for asynchronous storage operations (Tim Wojtulewicz, Corelight)
* Add infrastructure for automated expiration of storage entries (Tim Wojtulewicz, Corelight)
This is used for backends that don't support expiration natively.
* Change args to Storage::put to be a record (Tim Wojtulewicz, Corelight)
The number of args being passed to the put() methods was getting fairly
long, with more on the horizon. Changing to a record simplifies things.
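The shape of this refactor can be illustrated with a hedged C++ sketch (the struct and field names are ours, not Zeek's actual record type): a growing argument list collapses into one options aggregate, so new fields can be added later without touching every call site.

```cpp
#include <string>

// Illustrative stand-in for the script-level record passed to put().
struct PutArgs {
    std::string key;
    std::string value;
    bool overwrite = true;       // defaults mirror the old trailing arguments
    double expiration_time = 0;  // 0 means "no expiration" in this sketch
};

// Before: put(key, value, overwrite, expiration_time, ...).
// After: a single aggregate keeps call sites short and self-describing.
bool put(const PutArgs& args) {
    return ! args.key.empty();  // stand-in for a real backend insert
}
```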
* Pass key/value types for validation when opening backends (Tim Wojtulewicz, Corelight)
* Lay out initial parts for the Storage framework (Tim Wojtulewicz, Corelight)
This includes a manager, component manager, BIF and script code, and
parts to support new storage backend plugins.
* DebugLogger: add stream for storage (Tim Wojtulewicz, Corelight)
* plugin: Add component enum for storage backends (Tim Wojtulewicz, Corelight)
* Add martinmoene/expected-lite as a submodule (Tim Wojtulewicz, Corelight)
See the NEWS for more details about this new feature.
7.2.0-dev.325 | 2025-03-18 09:07:40 +0100
@@ -11009,7 +10929,7 @@
Using "in" to query the language const. This also handles the case of not
having a best guess and continues using the existing behavior.
Given
keyboard_layout = 1033 (0x0409), "keyboard-English - United States"
keyboard_layout = 66569 (0x00010409), "keyboard-English - United States (Best Guess)"


@@ -354,10 +354,6 @@ target_include_directories(
zeek_dynamic_plugin_base SYSTEM
INTERFACE $<INSTALL_INTERFACE:include/zeek/3rdparty/prometheus-cpp/include>)
target_include_directories(
zeek_dynamic_plugin_base SYSTEM
INTERFACE $<INSTALL_INTERFACE:include/zeek/3rdparty/expected-lite/include>)
# Convenience function for adding an OBJECT library that feeds directly into the
# main target(s).
#

NEWS

@@ -29,6 +29,27 @@ New Functionality
- The new ``is_valid_subnet()`` function mirrors ``is_valid_ip()``, for subnets.
- A new Storage framework was merged into the Zeek tree. The intention with this framework
is to eventually replace the storage functionality that Broker provides, including
direct storage via calls such as ``Cluster::create_store`` and ``Broker::put_unique`` as
well as storage-backed tables via the ``&backend`` attribute. This is an initial version
for testing, and will be expanded upon in the future. The current state of the framework
is as follows:
- A new API was added for storage backend plugins.
- Script-level functions for opening and closing backends, and insertion, retrieval, and
erasure of elements are available.
- Backends can support both asynchronous mode (using ``when`` statements) and
synchronous mode (blocking until the operation completes). BIF methods were added
under new ``Storage::Async`` and ``Storage::Sync`` modules for these two modes. The
modes can be used interchangeably with the same backend handle.
- SQLite and Redis backends exist in the Zeek tree by default. We are working on a
backend for NATS that will be available as an external plugin, but it is not quite
ready yet. Both of the existing backends support usage in a cluster environment.
Changed Functionality
---------------------


@@ -1 +1 @@
7.2.0-dev.405
7.2.0-dev.418


@@ -34,10 +34,7 @@ void ResultCallback::Timeout() {
trigger->Cache(assoc, OperationResult::MakeVal(ReturnCode::TIMEOUT).release());
}
OperationResultCallback::OperationResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void* assoc)
: ResultCallback(std::move(trigger), assoc) {}
void OperationResultCallback::Complete(OperationResult res) {
void ResultCallback::Complete(OperationResult res) {
// If this is a sync callback, there isn't a trigger to process. Store the result and bail.
if ( IsSyncCallback() ) {
result = std::move(res);
@@ -65,15 +62,7 @@ void OpenResultCallback::Complete(OperationResult res) {
// passed back to the trigger or the one stored for sync backends.
res.value = backend;
// If this is a sync callback, there isn't a trigger to process. Store the result and bail.
if ( IsSyncCallback() ) {
result = std::move(res);
return;
}
auto res_val = res.BuildVal();
trigger->Cache(assoc, res_val.get());
trigger->Release();
ResultCallback::Complete(std::move(res));
}
OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, TypePtr kt, TypePtr vt) {
@@ -88,10 +77,9 @@ OperationResult Backend::Open(OpenResultCallback* cb, Type
return ret;
}
OperationResult Backend::Close(OperationResultCallback* cb) { return DoClose(cb); }
OperationResult Backend::Close(ResultCallback* cb) { return DoClose(cb); }
OperationResult Backend::Put(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
OperationResult Backend::Put(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
// The intention for this method is to do some other heavy lifting in regard
// to backends that need to pass data through the manager instead of directly
// through the workers. For the first versions of the storage framework it
@@ -110,7 +98,7 @@ OperationResult Backend::Put(OperationResultCallback* cb, ValPtr key, ValPtr val
return DoPut(cb, std::move(key), std::move(value), overwrite, expiration_time);
}
OperationResult Backend::Get(OperationResultCallback* cb, ValPtr key) {
OperationResult Backend::Get(ResultCallback* cb, ValPtr key) {
// See the note in Put().
if ( ! same_type(key->GetType(), key_type) ) {
auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH};
@@ -121,7 +109,7 @@ OperationResult Backend::Get(OperationResultCallback* cb, ValPtr key) {
return DoGet(cb, std::move(key));
}
OperationResult Backend::Erase(OperationResultCallback* cb, ValPtr key) {
OperationResult Backend::Erase(ResultCallback* cb, ValPtr key) {
// See the note in Put().
if ( ! same_type(key->GetType(), key_type) ) {
auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH};


@@ -76,25 +76,13 @@ public:
* Completes a callback, releasing the trigger if it was valid or storing the result
* for later usage if needed.
*/
virtual void Complete(OperationResult res) = 0;
virtual void Complete(OperationResult res);
OperationResult Result() const { return result; }
protected:
zeek::detail::trigger::TriggerPtr trigger;
const void* assoc = nullptr;
};
/**
* A callback that returns an `OperationResult` when it is complete. This is used by most
* of the storage operations for returning status.
*/
class OperationResultCallback : public ResultCallback {
public:
OperationResultCallback() = default;
OperationResultCallback(detail::trigger::TriggerPtr trigger, const void* assoc);
void Complete(OperationResult res) override;
OperationResult Result() { return result; }
private:
OperationResult result;
};
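The squash deleted above can be modeled with a self-contained mock (these are illustrative classes, not Zeek's real ones): instead of a separate ``OperationResultCallback`` subclass, the base ``ResultCallback`` now owns the result and a default, no-longer-pure ``Complete()`` that stores it for the synchronous path.

```cpp
#include <string>
#include <utility>

// Simplified stand-ins for the real types.
struct OperationResult {
    int code = 0;
    std::string err;
};

class ResultCallback {
public:
    virtual ~ResultCallback() = default;
    bool IsSyncCallback() const { return sync; }

    // No longer pure virtual: the common "store the result for later"
    // behavior lives in the base class; async subclasses override only
    // what differs (e.g. handing the result to a trigger).
    virtual void Complete(OperationResult res) { result = std::move(res); }

    OperationResult Result() const { return result; }

protected:
    bool sync = true;
    OperationResult result;
};
```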
@@ -127,7 +115,7 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Put(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite = true,
OperationResult Put(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite = true,
double expiration_time = 0);
/**
@@ -139,7 +127,7 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Get(OperationResultCallback* cb, ValPtr key);
OperationResult Get(ResultCallback* cb, ValPtr key);
/**
* Erases the value for a key from the backend.
@@ -150,7 +138,7 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Erase(OperationResultCallback* cb, ValPtr key);
OperationResult Erase(ResultCallback* cb, ValPtr key);
/**
* Returns whether the backend is opened.
@@ -212,7 +200,7 @@ protected:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult Close(OperationResultCallback* cb);
OperationResult Close(ResultCallback* cb);
/**
* Removes any entries in the backend that have expired. Can be overridden by
@@ -260,26 +248,26 @@ private:
* Workhorse method for calls to `Manager::CloseBackend()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoClose(OperationResultCallback* cb) = 0;
virtual OperationResult DoClose(ResultCallback* cb) = 0;
/**
* Workhorse method for calls to `Backend::Put()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
virtual OperationResult DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) = 0;
/**
* Workhorse method for calls to `Backend::Get()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoGet(OperationResultCallback* cb, ValPtr key) = 0;
virtual OperationResult DoGet(ResultCallback* cb, ValPtr key) = 0;
/**
* Workhorse method for calls to `Backend::Erase()`. See that method for
* documentation of the arguments. This must be overridden by all backends.
*/
virtual OperationResult DoErase(OperationResultCallback* cb, ValPtr key) = 0;
virtual OperationResult DoErase(ResultCallback* cb, ValPtr key) = 0;
/**
* Optional method for backends to override to provide direct polling. This should be
@@ -337,11 +325,9 @@ public:
IntrusivePtr<detail::BackendHandleVal> backend);
void Complete(OperationResult res) override;
OperationResult Result() const { return result; }
IntrusivePtr<detail::BackendHandleVal> Backend() const { return backend; }
private:
OperationResult result{};
IntrusivePtr<detail::BackendHandleVal> backend;
};


@@ -23,7 +23,7 @@ void detail::ExpirationTimer::Dispatch(double t, bool is_expire) {
// in the interim.
if ( ! expire_running.test_and_set() ) {
DBG_LOG(DBG_STORAGE, "Starting new expiration thread");
storage_mgr->expiration_thread = std::jthread([]() { storage_mgr->Expire(); });
storage_mgr->expiration_thread = std::jthread([t]() { storage_mgr->Expire(t); });
}
storage_mgr->StartExpirationTimer();
@@ -89,7 +89,7 @@ OperationResult Manager::OpenBackend(BackendPtr backend, OpenResultCallback* cb,
return res;
}
OperationResult Manager::CloseBackend(BackendPtr backend, OperationResultCallback* cb) {
OperationResult Manager::CloseBackend(BackendPtr backend, ResultCallback* cb) {
// Expiration runs on a separate thread and loops over the vector of backends. The mutex
// here ensures exclusive access. This one happens in a block because we can remove the
// backend from the vector before actually closing it.
@@ -107,17 +107,14 @@ OperationResult Manager::CloseBackend(BackendPtr backend, OperationResultCallbac
return res;
}
void Manager::Expire() {
void Manager::Expire(double t) {
// Expiration runs on a separate thread and loops over the vector of backends. The mutex
// here ensures exclusive access.
std::unique_lock<std::mutex> lk(backends_mtx);
DBG_LOG(DBG_STORAGE, "Expiration running, have %zu backends to check", backends.size());
double current_network_time = run_state::network_time;
for ( auto it = backends.begin(); it != backends.end() && ! run_state::terminating; ++it ) {
if ( (*it)->IsOpen() )
(*it)->Expire(current_network_time);
(*it)->Expire(t);
}
expire_running.clear();
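Both race fixes visible above follow one pattern, sketched here with our own names (not Zeek's actual manager): the dispatcher captures the timestamp ``t`` by value into the lambda, so the worker never reads the shared ``run_state::network_time``, and an ``atomic_flag`` guard ensures only one expiration runs at a time. The worker is joined immediately here purely to keep the sketch deterministic; the real code lets the thread run freely.

```cpp
#include <atomic>
#include <thread>

std::atomic_flag expire_running = ATOMIC_FLAG_INIT;
std::atomic<int> expirations{0};

bool try_start_expiration(double t) {
    // test_and_set() returns the previous value: true means a prior
    // expiration round is still running, so skip this one.
    if ( expire_running.test_and_set() )
        return false;

    std::thread worker([t] {
        (void)t;                 // worker uses its captured copy of t,
        ++expirations;           // never the shared network-time global
        expire_running.clear();  // allow the next round to start
    });
    worker.join();  // joined here only to keep the sketch deterministic
    return true;
}
```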


@@ -71,14 +71,16 @@ public:
* @return A struct describing the result of the operation, containing a code, an
* optional error string, and a ValPtr for operations that return values.
*/
OperationResult CloseBackend(BackendPtr backend, OperationResultCallback* cb);
OperationResult CloseBackend(BackendPtr backend, ResultCallback* cb);
/**
* Runs an expire operation on all open backends. This is called by the expiration
* timer and shouldn't be called directly otherwise, since it should only happen on a
* separate thread.
*
* @param t The network time that the expiration started.
*/
void Expire();
void Expire(double t);
protected:
friend class storage::detail::ExpirationTimer;


@@ -38,21 +38,21 @@ void redisOnDisconnect(const redisAsyncContext* ctx, int status) {
void redisPut(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("put");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::ResultCallback*>(privdata);
backend->HandlePutResult(static_cast<redisReply*>(reply), callback);
}
void redisGet(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("get");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::ResultCallback*>(privdata);
backend->HandleGetResult(static_cast<redisReply*>(reply), callback);
}
void redisErase(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("erase");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
auto callback = static_cast<zeek::storage::OperationResultCallback*>(privdata);
auto callback = static_cast<zeek::storage::ResultCallback*>(privdata);
backend->HandleEraseResult(static_cast<redisReply*>(reply), callback);
}
@@ -60,7 +60,7 @@ void redisZADD(redisAsyncContext* ctx, void* reply, void* privdata) {
auto t = Tracer("generic");
auto backend = static_cast<zeek::storage::backend::redis::Redis*>(ctx->data);
// We don't care about the reply from the ZADD, m1ostly because blocking to poll
// We don't care about the reply from the ZADD, mostly because blocking to poll
// for it adds a bunch of complication to DoPut() with having to handle the
// reply from SET first.
backend->HandleGeneric(nullptr);
@@ -232,7 +232,7 @@ OperationResult Redis::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
/**
* Finalizes the backend when it's being closed.
*/
OperationResult Redis::DoClose(OperationResultCallback* cb) {
OperationResult Redis::DoClose(ResultCallback* cb) {
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
connected = false;
@@ -247,8 +247,7 @@ OperationResult Redis::DoClose(OperationResultCallback* cb) {
/**
* The workhorse method for Put(). This must be implemented by plugins.
*/
OperationResult Redis::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
OperationResult Redis::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED};
@@ -301,7 +300,7 @@ OperationResult Redis::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr val
/**
* The workhorse method for Get(). This must be implemented for plugins.
*/
OperationResult Redis::DoGet(OperationResultCallback* cb, ValPtr key) {
OperationResult Redis::DoGet(ResultCallback* cb, ValPtr key) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED};
@@ -324,7 +323,7 @@ OperationResult Redis::DoGet(OperationResultCallback* cb, ValPtr key) {
/**
* The workhorse method for Erase(). This must be implemented for plugins.
*/
OperationResult Redis::DoErase(OperationResultCallback* cb, ValPtr key) {
OperationResult Redis::DoErase(ResultCallback* cb, ValPtr key) {
// The async context will queue operations until it's connected fully.
if ( ! connected && ! async_ctx )
return {ReturnCode::NOT_CONNECTED};
@@ -409,35 +408,49 @@ void Redis::DoExpire(double current_network_time) {
// TODO: do we care if this failed?
}
void Redis::HandlePutResult(redisReply* reply, OperationResultCallback* callback) {
void Redis::HandlePutResult(redisReply* reply, ResultCallback* callback) {
--active_ops;
OperationResult res{ReturnCode::SUCCESS};
if ( ! connected )
res = {ReturnCode::NOT_CONNECTED};
else if ( ! reply )
res = {ReturnCode::OPERATION_FAILED, "Async put operation returned null reply"};
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = {ReturnCode::OPERATION_FAILED, util::fmt("Async put operation failed: %s", reply->str)};
res = {ReturnCode::OPERATION_FAILED, "put operation returned null reply"};
else if ( reply->type == REDIS_REPLY_NIL )
// For a SET operation, a NIL reply indicates a conflict with the NX flag.
res = {ReturnCode::KEY_EXISTS};
else if ( reply->type == REDIS_REPLY_ERROR )
res = ParseReplyError("put", reply->str);
freeReplyObject(reply);
CompleteCallback(callback, res);
}
void Redis::HandleGetResult(redisReply* reply, OperationResultCallback* callback) {
void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) {
--active_ops;
OperationResult res;
if ( ! connected )
res = {ReturnCode::NOT_CONNECTED};
else
res = ParseGetReply(reply);
if ( ! reply )
res = {ReturnCode::OPERATION_FAILED, "get operation returned null reply"};
else if ( reply->type == REDIS_REPLY_NIL )
res = {ReturnCode::KEY_NOT_FOUND};
else if ( reply->type == REDIS_REPLY_ERROR )
res = ParseReplyError("get", reply->str);
else {
auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil);
if ( std::holds_alternative<ValPtr>(val) )
res = {ReturnCode::SUCCESS, "", std::get<ValPtr>(val)};
else
res = {ReturnCode::OPERATION_FAILED, std::get<std::string>(val)};
}
freeReplyObject(reply);
CompleteCallback(callback, res);
}
void Redis::HandleEraseResult(redisReply* reply, OperationResultCallback* callback) {
void Redis::HandleEraseResult(redisReply* reply, ResultCallback* callback) {
--active_ops;
OperationResult res{ReturnCode::SUCCESS};
@@ -445,9 +458,9 @@ void Redis::HandleEraseResult(redisReply* reply, OperationResultCallba
if ( ! connected )
res = {ReturnCode::NOT_CONNECTED};
else if ( ! reply )
res = {ReturnCode::OPERATION_FAILED, "Async erase operation returned null reply"};
else if ( reply && reply->type == REDIS_REPLY_ERROR )
res = {ReturnCode::OPERATION_FAILED, util::fmt("Async erase operation failed: %s", reply->str)};
res = {ReturnCode::OPERATION_FAILED, "erase operation returned null reply"};
else if ( reply->type == REDIS_REPLY_ERROR )
res = ParseReplyError("erase", reply->str);
freeReplyObject(reply);
CompleteCallback(callback, res);
@@ -506,22 +519,14 @@ void Redis::ProcessFd(int fd, int flags) {
redisAsyncHandleWrite(async_ctx);
}
OperationResult Redis::ParseGetReply(redisReply* reply) const {
OperationResult res;
if ( ! reply )
res = {ReturnCode::OPERATION_FAILED, "GET returned null reply"};
else if ( ! reply->str )
res = {ReturnCode::KEY_NOT_FOUND};
else {
auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil);
if ( std::holds_alternative<ValPtr>(val) )
res = {ReturnCode::SUCCESS, "", std::get<ValPtr>(val)};
else
res = {ReturnCode::OPERATION_FAILED, std::get<std::string>(val)};
}
return res;
OperationResult Redis::ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const {
if ( async_ctx->err == REDIS_ERR_TIMEOUT )
return {ReturnCode::TIMEOUT};
else if ( async_ctx->err == REDIS_ERR_IO )
return {ReturnCode::OPERATION_FAILED, util::fmt("%s operation IO error: %s", op_str.data(), strerror(errno))};
else
return {ReturnCode::OPERATION_FAILED,
util::fmt("%s operation failed: %s", op_str.data(), reply_err_str.data())};
}
void Redis::DoPoll() {


@@ -37,9 +37,9 @@ public:
void OnConnect(int status);
void OnDisconnect(int status);
void HandlePutResult(redisReply* reply, OperationResultCallback* callback);
void HandleGetResult(redisReply* reply, OperationResultCallback* callback);
void HandleEraseResult(redisReply* reply, OperationResultCallback* callback);
void HandlePutResult(redisReply* reply, ResultCallback* callback);
void HandleGetResult(redisReply* reply, ResultCallback* callback);
void HandleEraseResult(redisReply* reply, ResultCallback* callback);
void HandleGeneric(redisReply* reply);
/**
@@ -51,15 +51,15 @@ public:
private:
OperationResult DoOpen(OpenResultCallback* cb, RecordValPtr options) override;
OperationResult DoClose(OperationResultCallback* cb) override;
OperationResult DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
OperationResult DoClose(ResultCallback* cb) override;
OperationResult DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) override;
OperationResult DoGet(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoErase(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoGet(ResultCallback* cb, ValPtr key) override;
OperationResult DoErase(ResultCallback* cb, ValPtr key) override;
void DoExpire(double current_network_time) override;
void DoPoll() override;
OperationResult ParseGetReply(redisReply* reply) const;
OperationResult ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const;
redisAsyncContext* async_ctx = nullptr;
@@ -69,7 +69,7 @@ private:
std::deque<redisReply*> reply_queue;
OpenResultCallback* open_cb;
OperationResultCallback* close_cb;
ResultCallback* close_cb;
std::mutex expire_mutex;
std::string server_addr;


@@ -20,7 +20,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
"SQLite reports that it is not threadsafe. Zeek needs a threadsafe version of "
"SQLite. Aborting";
Error(res.c_str());
return {ReturnCode::INITIALIZATION_FAILED, res};
return {ReturnCode::INITIALIZATION_FAILED, std::move(res)};
}
// Allow connections to same DB to use single data/schema cache. Also
@@ -52,7 +52,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, err};
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
if ( int res = sqlite3_exec(db, "pragma integrity_check", NULL, NULL, &errorMsg); res != SQLITE_OK ) {
@@ -60,7 +60,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, err};
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
auto tuning_params = backend_options->GetField<TableVal>("tuning_params")->ToMap();
@@ -74,7 +74,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, err};
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
}
@@ -115,7 +115,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
/**
* Finalizes the backend when it's being closed.
*/
OperationResult SQLite::DoClose(OperationResultCallback* cb) {
OperationResult SQLite::DoClose(ResultCallback* cb) {
OperationResult op_res{ReturnCode::SUCCESS};
if ( db ) {
@@ -127,9 +127,10 @@ OperationResult SQLite::DoClose(OperationResultCallback* cb) {
char* errmsg;
if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg); res != SQLITE_OK ) {
// We're shutting down so capture the error message here for informational
// reasons, but don't do anything else with it.
op_res = {ReturnCode::DISCONNECTION_FAILED, util::fmt("Sqlite failed to optimize at shutdown: %s", errmsg)};
sqlite3_free(&errmsg);
// TODO: we're shutting down. does this error matter other than being informational?
sqlite3_free(errmsg);
}
if ( int res = sqlite3_close_v2(db); res != SQLITE_OK ) {
@@ -146,8 +147,7 @@ OperationResult SQLite::DoClose(OperationResultCallback* cb) {
/**
* The workhorse method for Put(). This must be implemented by plugins.
*/
OperationResult SQLite::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
if ( ! db )
return {ReturnCode::NOT_CONNECTED};
@@ -193,7 +193,7 @@ OperationResult SQLite::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr va
/**
* The workhorse method for Get(). This must be implemented for plugins.
*/
OperationResult SQLite::DoGet(OperationResultCallback* cb, ValPtr key) {
OperationResult SQLite::DoGet(ResultCallback* cb, ValPtr key) {
if ( ! db )
return {ReturnCode::NOT_CONNECTED};
@@ -213,7 +213,7 @@ OperationResult SQLite::DoGet(OperationResultCallback* cb, ValPtr key) {
/**
* The workhorse method for Erase(). This must be implemented for plugins.
*/
OperationResult SQLite::DoErase(OperationResultCallback* cb, ValPtr key) {
OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) {
if ( ! db )
return {ReturnCode::NOT_CONNECTED};
@@ -237,12 +237,18 @@ OperationResult SQLite::DoErase(OperationResultCallback* cb, ValPtr key) {
void SQLite::DoExpire(double current_network_time) {
auto stmt = expire_stmt.get();
if ( auto res = CheckError(sqlite3_bind_double(stmt, 1, current_network_time)); res.code != ReturnCode::SUCCESS ) {
sqlite3_reset(stmt);
// TODO: do something with the error here?
int status = sqlite3_bind_double(stmt, 1, current_network_time);
if ( status != SQLITE_OK ) {
// TODO: do something with the error?
}
else {
status = sqlite3_step(stmt);
if ( status != SQLITE_ROW ) {
// TODO: should this return an error somehow? Reporter warning?
}
}
Step(stmt, false);
sqlite3_reset(stmt);
}
// returns true in case of error
@ -285,6 +291,8 @@ OperationResult SQLite::Step(sqlite3_stmt* stmt, bool parse_value) {
else if ( step_status == SQLITE_BUSY )
// TODO: this could retry a number of times instead of just failing
ret = {ReturnCode::TIMEOUT};
else if ( step_status == SQLITE_CONSTRAINT )
ret = {ReturnCode::KEY_EXISTS};
else
ret = {ReturnCode::OPERATION_FAILED};


@ -24,11 +24,11 @@ public:
private:
OperationResult DoOpen(OpenResultCallback* cb, RecordValPtr options) override;
OperationResult DoClose(OperationResultCallback* cb) override;
OperationResult DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
OperationResult DoClose(ResultCallback* cb) override;
OperationResult DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) override;
OperationResult DoGet(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoErase(OperationResultCallback* cb, ValPtr key) override;
OperationResult DoGet(ResultCallback* cb, ValPtr key) override;
OperationResult DoErase(ResultCallback* cb, ValPtr key) override;
void DoExpire(double current_network_time) override;
/**


@ -107,7 +107,7 @@ function Storage::Async::__close_backend%(backend: opaque of Storage::BackendHan
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());
@ -128,7 +128,7 @@ function Storage::Async::__put%(backend: opaque of Storage::BackendHandle, key:
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());
@ -153,7 +153,7 @@ function Storage::Async::__get%(backend: opaque of Storage::BackendHandle, key:
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());
@ -174,7 +174,7 @@ function Storage::Async::__erase%(backend: opaque of Storage::BackendHandle, key
if ( ! trigger )
return nullptr;
auto cb = new OperationResultCallback(trigger, frame->GetTriggerAssoc());
auto cb = new ResultCallback(trigger, frame->GetTriggerAssoc());
auto b = cast_handle(backend);
if ( ! b ) {
cb->Complete(b.error());


@ -68,7 +68,7 @@ function Storage::Sync::__close_backend%(backend: opaque of Storage::BackendHand
if ( ! b )
op_result = b.error();
else {
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
op_result = storage_mgr->CloseBackend((*b)->backend, cb);
// If the backend only supports async, block until it's ready and then pull the result out of
@ -96,7 +96,7 @@ function Storage::Sync::__put%(backend: opaque of Storage::BackendHandle, key: a
if ( expire_time > 0.0 )
expire_time += run_state::network_time;
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
auto val_v = IntrusivePtr<Val>{NewRef{}, value};
op_result = (*b)->backend->Put(cb, key_v, val_v, overwrite, expire_time);
@ -122,7 +122,7 @@ function Storage::Sync::__get%(backend: opaque of Storage::BackendHandle, key: a
if ( ! b )
op_result = b.error();
else {
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
op_result = (*b)->backend->Get(cb, key_v);
@ -147,7 +147,7 @@ function Storage::Sync::__erase%(backend: opaque of Storage::BackendHandle, key:
if ( ! b )
op_result = b.error();
else {
auto cb = new OperationResultCallback();
auto cb = new ResultCallback();
auto key_v = IntrusivePtr<Val>{NewRef{}, key};
op_result = (*b)->backend->Erase(cb, key_v);


@ -3,3 +3,9 @@ open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value7890]
get result same as inserted, T
put result, [code=Storage::KEY_EXISTS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value7890]
get result same as originally inserted, T
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]
get result same as overwritten, T


@ -3,8 +3,11 @@ open_result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
get result same as inserted, T
overwrite put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678]
get result same as inserted, T
put result, [code=Storage::KEY_EXISTS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
get result same as originally inserted, T
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value2345]
get result same as overwritten, T
Storage::backend_opened, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]]
Storage::backend_lost, Storage::REDIS, [redis=[server_host=127.0.0.1, server_port=xxxx/tcp, server_unix_socket=<uninitialized>, key_prefix=testing]], Client disconnected


@ -34,7 +34,7 @@ OperationResult StorageDummy::DoOpen(OpenResultCallback* cb, RecordValPtr option
/**
* Finalizes the backend when it's being closed.
*/
OperationResult StorageDummy::DoClose(OperationResultCallback* cb) {
OperationResult StorageDummy::DoClose(ResultCallback* cb) {
open = false;
return {ReturnCode::SUCCESS};
}
@ -42,7 +42,7 @@ OperationResult StorageDummy::DoClose(OperationResultCallback* cb) {
/**
* The workhorse method for Put(). This must be implemented by plugins.
*/
OperationResult StorageDummy::DoPut(OperationResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
OperationResult StorageDummy::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
double expiration_time) {
auto json_key = key->ToJSON()->ToStdString();
auto json_value = value->ToJSON()->ToStdString();
@ -53,7 +53,7 @@ OperationResult StorageDummy::DoPut(OperationResultCallback* cb, ValPtr key, Val
/**
* The workhorse method for Get(). This must be implemented for plugins.
*/
OperationResult StorageDummy::DoGet(OperationResultCallback* cb, ValPtr key) {
OperationResult StorageDummy::DoGet(ResultCallback* cb, ValPtr key) {
auto json_key = key->ToJSON();
auto it = data.find(json_key->ToStdString());
if ( it == data.end() )
@ -71,7 +71,7 @@ OperationResult StorageDummy::DoGet(OperationResultCallback* cb, ValPtr key) {
/**
* The workhorse method for Erase(). This must be implemented for plugins.
*/
OperationResult StorageDummy::DoErase(OperationResultCallback* cb, ValPtr key) {
OperationResult StorageDummy::DoErase(ResultCallback* cb, ValPtr key) {
auto json_key = key->ToJSON();
auto it = data.find(json_key->ToStdString());
if ( it == data.end() )


@ -26,7 +26,7 @@ public:
/**
* Finalizes the backend when it's being closed.
*/
zeek::storage::OperationResult DoClose(zeek::storage::OperationResultCallback* cb = nullptr) override;
zeek::storage::OperationResult DoClose(zeek::storage::ResultCallback* cb = nullptr) override;
/**
* Returns whether the backend is opened.
@ -36,19 +36,18 @@ public:
/**
* The workhorse method for Put().
*/
zeek::storage::OperationResult DoPut(zeek::storage::OperationResultCallback* cb, zeek::ValPtr key,
zeek::ValPtr value, bool overwrite = true,
double expiration_time = 0) override;
zeek::storage::OperationResult DoPut(zeek::storage::ResultCallback* cb, zeek::ValPtr key, zeek::ValPtr value,
bool overwrite = true, double expiration_time = 0) override;
/**
* The workhorse method for Get().
*/
zeek::storage::OperationResult DoGet(zeek::storage::OperationResultCallback* cb, zeek::ValPtr key) override;
zeek::storage::OperationResult DoGet(zeek::storage::ResultCallback* cb, zeek::ValPtr key) override;
/**
* The workhorse method for Erase().
*/
zeek::storage::OperationResult DoErase(zeek::storage::OperationResultCallback* cb, zeek::ValPtr key) override;
zeek::storage::OperationResult DoErase(zeek::storage::ResultCallback* cb, zeek::ValPtr key) override;
private:
std::map<std::string, std::string> data;


@ -10,9 +10,6 @@
@load base/frameworks/storage/sync
# Create a typename here that can be passed down into get().
type str: string;
type StorageDummyOpts : record {
open_fail: bool;
};
@ -30,7 +27,7 @@ event zeek_init() {
# Test basic operation. The second get() should return an error
# as the key should have been erased.
local open_res = Storage::Sync::open_backend(Storage::STORAGEDUMMY, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::STORAGEDUMMY, opts, string, string);
print "open result", open_res;
local b = open_res$value;
local put_res = Storage::Sync::put(b, [$key=key, $value=value, $overwrite=F]);
@ -56,7 +53,7 @@ event zeek_init() {
# Test failing to open the handle and test closing an invalid handle.
opts$dummy$open_fail = T;
open_res = Storage::Sync::open_backend(Storage::STORAGEDUMMY, opts, str, str);
open_res = Storage::Sync::open_backend(Storage::STORAGEDUMMY, opts, string, string);
print "open result 2", open_res;
local close_res = Storage::Sync::close_backend(open_res$value);
print "close result of closed handle", close_res;


@ -26,5 +26,5 @@ event zeek_init()
event pe_dos_header(f: fa_file, h: PE::DOSHeader)
{
print "got pe_dos_header event";
exit(0);
terminate();
}


@ -7,9 +7,6 @@
@load base/frameworks/storage/sync
@load policy/frameworks/storage/backend/sqlite
# Create a typename here that can be passed down into get().
type str: string;
event zeek_init()
{
# Create a database file in the .tmp directory with a 'testing' table
@ -20,7 +17,7 @@ event zeek_init()
# Test inserting/retrieving a key/value pair that we know won't be in
# the backend yet.
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, string, string);
print "open result", open_res;
local b = open_res$value;


@ -9,9 +9,6 @@
redef Storage::expire_interval = 2 secs;
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into get().
type str: string;
global b: opaque of Storage::BackendHandle;
global key1: string = "key1234";
global value1: string = "value1234";
@ -36,7 +33,7 @@ event setup_test()
local opts : Storage::BackendOptions;
opts$sqlite = [$database_path = "storage-test.sqlite", $table_name = "testing"];
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, string, string);
print "open result", open_res;
b = open_res$value;


@ -12,22 +12,45 @@ type str: string;
event zeek_init() {
local opts : Storage::BackendOptions;
opts$sqlite = [$database_path = "storage-test.sqlite", $table_name = "testing"];
opts$sqlite = [$database_path = "testing.sqlite", $table_name = "testing"];
local key = "key1234";
local key = "key1111";
local value = "value7890";
local value2 = "value2345";
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
print "open result", open_res;
local b = open_res$value;
local res = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
print "open result", res;
local b = res$value;
local res = Storage::Sync::put(b, [$key=key, $value=value]);
# Put a first value. This should return Storage::SUCCESS.
res = Storage::Sync::put(b, [$key=key, $value=value]);
print "put result", res;
local res2 = Storage::Sync::get(b, key);
print "get result", res2;
if ( res2$code == Storage::SUCCESS && res2?$value )
print "get result same as inserted", value == (res2$value as string);
# Get the first value, validate that it's what we inserted.
res = Storage::Sync::get(b, key);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as inserted", value == (res$value as string);
# This will return a Storage::KEY_EXISTS since we don't want overwriting.
res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=F]);
print "put result", res;
# Verify that the overwrite didn't actually happen.
res = Storage::Sync::get(b, key);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as originally inserted", value == (res$value as string);
	# This will return a Storage::SUCCESS since we're asking for an overwrite.
res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=T]);
print "put result", res;
# Verify that the overwrite happened.
res = Storage::Sync::get(b, key);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as overwritten", value2 == (res$value as string);
Storage::Sync::close_backend(b);
}


@ -13,9 +13,6 @@
@load base/frameworks/storage/async
@load policy/frameworks/storage/backend/redis
# Create a typename here that can be passed down into open_backend()
type str: string;
event zeek_init()
{
local opts: Storage::BackendOptions;
@ -25,7 +22,7 @@ event zeek_init()
local key = "key1234";
local value = "value5678";
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
print "open result", open_res;
local b = open_res$value;


@ -15,9 +15,6 @@
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into open_backend()
type str: string;
event zeek_init()
{
local opts: Storage::BackendOptions;
@ -28,7 +25,7 @@ event zeek_init()
local value = "value5678";
when [opts, key, value] ( local open_res = Storage::Async::open_backend(
Storage::REDIS, opts, str, str) )
Storage::REDIS, opts, string, string) )
{
print "open result", open_res;
local b = open_res$value;


@ -33,7 +33,6 @@ global redis_data_written: event() &is_used;
@if ( Cluster::local_node_type() == Cluster::WORKER )
global backend: opaque of Storage::BackendHandle;
type str: string;
event zeek_init()
{
@ -41,7 +40,7 @@ event zeek_init()
opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
backend = open_res$value;
}


@ -14,9 +14,6 @@
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into open_backend()
type str: string;
event Storage::backend_opened(tag: string, config: any) {
print "Storage::backend_opened", tag, config;
}
@ -35,7 +32,7 @@ event zeek_init()
local key = "key1234";
local value = "value1234";
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
print "open_result", open_res;
# Kill the redis server so the backend will disconnect and fire the backend_lost event.


@ -12,9 +12,6 @@
@load base/frameworks/storage/sync
@load policy/frameworks/storage/backend/redis
# Create a typename here that can be passed down into open_backend()
type str: string;
event zeek_init()
{
local opts: Storage::BackendOptions;
@ -24,7 +21,7 @@ event zeek_init()
local key = "key1234";
local value = "value1234";
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
print "open_result", open_res;
local b = open_res$value;


@ -15,9 +15,6 @@
redef Storage::expire_interval = 2secs;
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into open_backend()
type str: string;
global b: opaque of Storage::BackendHandle;
global key1: string = "key1234";
global value1: string = "value1234";
@ -43,7 +40,7 @@ event setup_test()
opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
print "open result", open_res;
b = open_res$value;


@ -15,9 +15,6 @@
redef Storage::expire_interval = 2secs;
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into open_backend()
type str: string;
global b: opaque of Storage::BackendHandle;
global key1: string = "key1234";
global value1: string = "value1234";
@ -43,7 +40,7 @@ event setup_test()
opts$redis = [ $server_host="127.0.0.1", $server_port=to_port(getenv(
"REDIS_PORT")), $key_prefix="testing" ];
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
print "open result", open_res;
b = open_res$value;


@ -12,9 +12,6 @@
@load base/frameworks/storage/sync
@load policy/frameworks/storage/backend/redis
# Create a typename here that can be passed down into open_backend()
type str: string;
event Storage::backend_opened(tag: string, config: any) {
print "Storage::backend_opened", tag, config;
}
@ -32,28 +29,42 @@ event zeek_init()
local key = "key1234";
local value = "value1234";
local value2 = "value2345";
local open_res = Storage::Sync::open_backend(Storage::REDIS, opts, str, str);
print "open_result", open_res;
local res = Storage::Sync::open_backend(Storage::REDIS, opts, string, string);
print "open_result", res;
local b = open_res$value;
local b = res$value;
local res = Storage::Sync::put(b, [ $key=key, $value=value ]);
# Put a first value. This should return Storage::SUCCESS.
res = Storage::Sync::put(b, [$key=key, $value=value]);
print "put result", res;
local res2 = Storage::Sync::get(b, key);
print "get result", res2;
if ( res2$code == Storage::SUCCESS && res2?$value )
print "get result same as inserted", value == ( res2$value as string );
# Get the first value, validate that it's what we inserted.
res = Storage::Sync::get(b, key);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as inserted", value == (res$value as string);
local value2 = "value5678";
res = Storage::Sync::put(b, [ $key=key, $value=value2, $overwrite=T ]);
print "overwrite put result", res;
# This will return a Storage::KEY_EXISTS since we don't want overwriting.
res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=F]);
print "put result", res;
res2 = Storage::Sync::get(b, key);
print "get result", res2;
if ( res2$code == Storage::SUCCESS && res2?$value )
print "get result same as inserted", value2 == ( res2$value as string );
# Verify that the overwrite didn't actually happen.
res = Storage::Sync::get(b, key);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as originally inserted", value == (res$value as string);
	# This will return a Storage::SUCCESS since we're asking for an overwrite.
res = Storage::Sync::put(b, [$key=key, $value=value2, $overwrite=T]);
print "put result", res;
# Verify that the overwrite happened.
res = Storage::Sync::get(b, key);
print "get result", res;
if ( res$code == Storage::SUCCESS && res?$value )
print "get result same as overwritten", value2 == (res$value as string);
Storage::Sync::close_backend(b);
}


@ -9,9 +9,6 @@
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into get().
type str: string;
event zeek_init()
{
# Create a database file in the .tmp directory with a 'testing' table
@ -23,7 +20,7 @@ event zeek_init()
# Test inserting/retrieving a key/value pair that we know won't be in
# the backend yet.
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, string, string);
print "open result", open_res;
local b = open_res$value;


@ -8,9 +8,6 @@
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into get().
type str: string;
event Storage::backend_opened(tag: string, config: any) {
print "Storage::backend_opened", tag, config;
}
@ -27,7 +24,7 @@ event zeek_init()
# Test inserting/retrieving a key/value pair that we know won't be in
# the backend yet.
when [opts, key, value] ( local open_res = Storage::Sync::open_backend(
Storage::SQLITE, opts, str, str) )
Storage::SQLITE, opts, string, string) )
{
print "open result", open_res;
local b = open_res$value;


@ -8,9 +8,6 @@
redef exit_only_after_terminate = T;
# Create a typename here that can be passed down into get().
type str: string;
event Storage::backend_opened(tag: string, config: any) {
print "Storage::backend_opened", tag, config;
}
@ -27,7 +24,7 @@ event zeek_init()
# Test inserting/retrieving a key/value pair that we know won't be in
# the backend yet.
when [opts, key, value] ( local open_res = Storage::Async::open_backend(
Storage::SQLITE, opts, str, str) )
Storage::SQLITE, opts, string, string) )
{
print "open result", open_res;
local b = open_res$value;


@ -7,9 +7,6 @@
@load base/frameworks/reporter
@load policy/frameworks/storage/backend/sqlite
# Create a typename here that can be passed down into open_backend.
type str: string;
event zeek_init() {
# Test opening a database with an invalid path
local opts : Storage::BackendOptions;
@ -17,12 +14,12 @@ event zeek_init() {
$table_name = "testing"];
# This should report an error in .stderr and reporter.log
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
local open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, string, string);
print "Open result", open_res;
# Open a valid database file
opts$sqlite$database_path = "test.sqlite";
open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, str, str);
open_res = Storage::Sync::open_backend(Storage::SQLITE, opts, string, string);
print "Open result 2", open_res;
local b = open_res$value;