mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Merge remote-tracking branch 'origin/topic/timw/storage-expire-contention'
* origin/topic/timw/storage-expire-contention: Add busy_timeout script-level option, override any busy_timeout pragma Handle potential contention when running sqlite expiration Add expiration to sqlite-cluster.btest Use unique_ptr to avoid needing to call sqlite3_reset manually Move Deferred class from ZeroMQ to util
This commit is contained in:
commit
e6492f7c7b
11 changed files with 299 additions and 85 deletions
19
CHANGES
19
CHANGES
|
@ -1,3 +1,22 @@
|
||||||
|
8.0.0-dev.375 | 2025-06-05 12:43:26 -0700
|
||||||
|
|
||||||
|
* Add busy_timeout script-level option, override any busy_timeout pragma (Tim Wojtulewicz, Corelight)
|
||||||
|
|
||||||
|
* Handle potential contention when running sqlite expiration (Tim Wojtulewicz, Corelight)
|
||||||
|
|
||||||
|
* Add expiration to sqlite-cluster.btest (Tim Wojtulewicz, Corelight)
|
||||||
|
|
||||||
|
* Use unique_ptr to avoid needing to call sqlite3_reset manually (Tim Wojtulewicz, Corelight)
|
||||||
|
|
||||||
|
* Move Deferred class from ZeroMQ to util (Tim Wojtulewicz, Corelight)
|
||||||
|
|
||||||
|
* Temporarily disable pppoe-over-qinq-test for spicy-ssl (Johanna Amann, Corelight)
|
||||||
|
|
||||||
|
The analyzer.log changes exposed a new bug in the Spicy SSL
|
||||||
|
implemenataion.
|
||||||
|
|
||||||
|
Relates to GH-4547
|
||||||
|
|
||||||
8.0.0-dev.367 | 2025-06-05 08:22:25 -0700
|
8.0.0-dev.367 | 2025-06-05 08:22:25 -0700
|
||||||
|
|
||||||
* Silence -Wnontrivial-memcall warning in ConnKey methods (Tim Wojtulewicz, Corelight)
|
* Silence -Wnontrivial-memcall warning in ConnKey methods (Tim Wojtulewicz, Corelight)
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
8.0.0-dev.367
|
8.0.0-dev.375
|
||||||
|
|
|
@ -20,14 +20,19 @@ export {
|
||||||
## different between the two.
|
## different between the two.
|
||||||
table_name: string;
|
table_name: string;
|
||||||
|
|
||||||
|
## The timeout for the connection to the database. This is set
|
||||||
|
## per-connection. It is equivalent to setting a ``busy_timeout`` pragma
|
||||||
|
## value, but that value will be ignored in favor of this field.
|
||||||
|
busy_timeout: interval &default=5 secs;
|
||||||
|
|
||||||
## Key/value table for passing pragma commands when opening the database.
|
## Key/value table for passing pragma commands when opening the database.
|
||||||
## These must be pairs that can be passed to the ``pragma`` command in
|
## These must be pairs that can be passed to the ``pragma`` command in
|
||||||
## sqlite. The ``integrity_check`` pragma is run automatically and does
|
## sqlite. The ``integrity_check`` pragma is run automatically and does
|
||||||
## not need to be included here. For pragmas without a second argument,
|
## not need to be included here. For pragmas without a second argument,
|
||||||
## set the value to an empty string.
|
## set the value to an empty string. Setting the ``busy_timeout`` pragma
|
||||||
|
## here will be ignored.
|
||||||
pragma_commands: table[string] of string &ordered &default=table(
|
pragma_commands: table[string] of string &ordered &default=table(
|
||||||
["integrity_check"] = "",
|
["integrity_check"] = "",
|
||||||
["busy_timeout"] = "5000",
|
|
||||||
["journal_mode"] = "WAL",
|
["journal_mode"] = "WAL",
|
||||||
["synchronous"] = "normal",
|
["synchronous"] = "normal",
|
||||||
["temp_store"] = "memory"
|
["temp_store"] = "memory"
|
||||||
|
|
|
@ -571,16 +571,6 @@ void ZeroMQBackend::Run() {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Helper class running at destruction.
|
|
||||||
class Deferred {
|
|
||||||
public:
|
|
||||||
Deferred(std::function<void()> deferred) : closer(std::move(deferred)) {}
|
|
||||||
~Deferred() { closer(); }
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::function<void()> closer;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct SocketInfo {
|
struct SocketInfo {
|
||||||
zmq::socket_ref socket;
|
zmq::socket_ref socket;
|
||||||
std::string name;
|
std::string name;
|
||||||
|
@ -595,7 +585,7 @@ void ZeroMQBackend::Run() {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Called when Run() terminates.
|
// Called when Run() terminates.
|
||||||
auto deferred_close = Deferred([this]() {
|
auto deferred_close = util::Deferred([this]() {
|
||||||
child_inproc.close();
|
child_inproc.close();
|
||||||
xpub.close();
|
xpub.close();
|
||||||
xsub.close();
|
xsub.close();
|
||||||
|
|
|
@ -12,6 +12,8 @@
|
||||||
#include "zeek/Val.h"
|
#include "zeek/Val.h"
|
||||||
#include "zeek/storage/ReturnCode.h"
|
#include "zeek/storage/ReturnCode.h"
|
||||||
|
|
||||||
|
#include "const.bif.netvar_h"
|
||||||
|
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
namespace zeek::storage::backend::sqlite {
|
namespace zeek::storage::backend::sqlite {
|
||||||
|
@ -82,6 +84,8 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
|
||||||
full_path = zeek::filesystem::path(path->ToStdString()).string();
|
full_path = zeek::filesystem::path(path->ToStdString()).string();
|
||||||
table_name = backend_options->GetField<StringVal>("table_name")->ToStdString();
|
table_name = backend_options->GetField<StringVal>("table_name")->ToStdString();
|
||||||
|
|
||||||
|
auto busy_timeout = backend_options->GetField<IntervalVal>("busy_timeout")->Get();
|
||||||
|
|
||||||
auto pragma_timeout_val = backend_options->GetField<IntervalVal>("pragma_timeout");
|
auto pragma_timeout_val = backend_options->GetField<IntervalVal>("pragma_timeout");
|
||||||
pragma_timeout = std::chrono::milliseconds(static_cast<int64_t>(pragma_timeout_val->Get() * 1000));
|
pragma_timeout = std::chrono::milliseconds(static_cast<int64_t>(pragma_timeout_val->Get() * 1000));
|
||||||
|
|
||||||
|
@ -90,41 +94,54 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
|
||||||
|
|
||||||
if ( auto open_res =
|
if ( auto open_res =
|
||||||
CheckError(sqlite3_open_v2(full_path.c_str(), &db,
|
CheckError(sqlite3_open_v2(full_path.c_str(), &db,
|
||||||
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX, NULL));
|
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL));
|
||||||
open_res.code != ReturnCode::SUCCESS ) {
|
open_res.code != ReturnCode::SUCCESS ) {
|
||||||
sqlite3_close_v2(db);
|
sqlite3_close_v2(db);
|
||||||
db = nullptr;
|
db = nullptr;
|
||||||
return open_res;
|
return open_res;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Should we use sqlite3_busy_timeout here instead of using the pragma? That would
|
sqlite3_busy_timeout(db, busy_timeout * 1000);
|
||||||
// at least let us skip over one. The busy timeout is per-connection as well, so it'll
|
|
||||||
// never fail to run like the other pragmas can.
|
|
||||||
// sqlite3_busy_timeout(db, 2000);
|
|
||||||
|
|
||||||
auto pragmas = backend_options->GetField<TableVal>("pragma_commands");
|
auto pragmas = backend_options->GetField<TableVal>("pragma_commands");
|
||||||
for ( const auto& iter : *(pragmas->Get()) ) {
|
for ( const auto& iter : *(pragmas->Get()) ) {
|
||||||
auto k = iter.GetHashKey();
|
auto k = iter.GetHashKey();
|
||||||
auto v = iter.value;
|
|
||||||
auto vl = pragmas->GetTableHash()->RecoverVals(*k);
|
auto vl = pragmas->GetTableHash()->RecoverVals(*k);
|
||||||
|
|
||||||
auto ks = vl->AsListVal()->Idx(0)->AsStringVal();
|
auto ks = vl->AsListVal()->Idx(0)->AsStringVal();
|
||||||
auto ks_sv = ks->ToStdStringView();
|
auto ks_sv = ks->ToStdStringView();
|
||||||
auto vs = v->GetVal()->AsStringVal();
|
|
||||||
|
if ( ks_sv == "busy_timeout" )
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto vs = iter.value->GetVal()->AsStringVal();
|
||||||
auto vs_sv = vs->ToStdStringView();
|
auto vs_sv = vs->ToStdStringView();
|
||||||
|
|
||||||
auto pragma_res = RunPragma(ks_sv, vs_sv);
|
auto pragma_res = RunPragma(ks_sv, vs_sv);
|
||||||
if ( pragma_res.code != ReturnCode::SUCCESS ) {
|
if ( pragma_res.code != ReturnCode::SUCCESS ) {
|
||||||
Error(pragma_res.err_str.c_str());
|
Error(pragma_res.err_str.c_str());
|
||||||
|
Close(nullptr);
|
||||||
return pragma_res;
|
return pragma_res;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string create = "create table if not exists " + table_name + " (";
|
// Open a second connection to the database. This one is used for expiration and exists to prevent
|
||||||
create.append("key_str blob primary key, value_str blob not null, expire_time real);");
|
// simultaneous multi-threaded access to the same connection.
|
||||||
|
if ( auto open_res =
|
||||||
|
CheckError(sqlite3_open_v2(full_path.c_str(), &expire_db,
|
||||||
|
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL));
|
||||||
|
open_res.code != ReturnCode::SUCCESS ) {
|
||||||
|
Close(nullptr);
|
||||||
|
return open_res;
|
||||||
|
}
|
||||||
|
|
||||||
|
sqlite3_busy_timeout(expire_db, busy_timeout * 1000);
|
||||||
|
|
||||||
|
std::string cmd = "create table if not exists " + table_name + " (";
|
||||||
|
cmd.append("key_str blob primary key, value_str blob not null, expire_time real);");
|
||||||
|
|
||||||
char* errorMsg = nullptr;
|
char* errorMsg = nullptr;
|
||||||
if ( int res = sqlite3_exec(db, create.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
|
if ( int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
|
||||||
std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg);
|
std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg);
|
||||||
Error(err.c_str());
|
Error(err.c_str());
|
||||||
sqlite3_free(errorMsg);
|
sqlite3_free(errorMsg);
|
||||||
|
@ -132,34 +149,85 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
|
||||||
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
|
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::array<std::string, 5> statements =
|
sqlite3_free(errorMsg);
|
||||||
{util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?)", table_name.c_str()),
|
|
||||||
util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?) ON CONFLICT(key_str) "
|
|
||||||
"DO UPDATE SET value_str=?",
|
|
||||||
table_name.c_str()),
|
|
||||||
util::fmt("select value_str from %s where key_str=?", table_name.c_str()),
|
|
||||||
util::fmt("delete from %s where key_str=?", table_name.c_str()),
|
|
||||||
util::fmt("delete from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?",
|
|
||||||
table_name.c_str())};
|
|
||||||
|
|
||||||
std::array<unique_stmt_ptr, 5> stmt_ptrs;
|
// Create a table for controlling expiration contention. The ukey column here ensures that only
|
||||||
|
// one row exists for this backend's table.
|
||||||
|
cmd = util::fmt("create table if not exists zeek_storage_expiry_runs (ukey primary key, last_run double);");
|
||||||
|
if ( int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
|
||||||
|
std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg);
|
||||||
|
Error(err.c_str());
|
||||||
|
sqlite3_free(errorMsg);
|
||||||
|
Close(nullptr);
|
||||||
|
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
|
||||||
|
}
|
||||||
|
|
||||||
|
sqlite3_free(errorMsg);
|
||||||
|
|
||||||
|
// Attempt to insert an initial value into the table if this is the first run with
|
||||||
|
// this file. This may result in a SQLITE_CONSTRAINT if the row already exists. That's
|
||||||
|
// not an error, as it's possible if the file already existed.
|
||||||
|
cmd = util::fmt("insert into zeek_storage_expiry_runs (ukey, last_run) values('%s', 0);", table_name.c_str());
|
||||||
|
if ( int res = sqlite3_exec(db, cmd.c_str(), nullptr, nullptr, &errorMsg);
|
||||||
|
res != SQLITE_OK && res != SQLITE_CONSTRAINT ) {
|
||||||
|
std::string err =
|
||||||
|
util::fmt("Error inserting initial row into expiration control table: (%d) %s", res, errorMsg);
|
||||||
|
Error(err.c_str());
|
||||||
|
sqlite3_free(errorMsg);
|
||||||
|
Close(nullptr);
|
||||||
|
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
|
||||||
|
}
|
||||||
|
|
||||||
|
sqlite3_free(errorMsg);
|
||||||
|
|
||||||
|
static std::array<std::pair<std::string, sqlite3*>, 8> statements =
|
||||||
|
{std::make_pair(util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?)",
|
||||||
|
table_name.c_str()),
|
||||||
|
db),
|
||||||
|
std::make_pair(util::
|
||||||
|
fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?) ON CONFLICT(key_str) "
|
||||||
|
"DO UPDATE SET value_str=?",
|
||||||
|
table_name.c_str()),
|
||||||
|
db),
|
||||||
|
std::make_pair(util::fmt("select value_str from %s where key_str=?", table_name.c_str()), db),
|
||||||
|
std::make_pair(util::fmt("delete from %s where key_str=?", table_name.c_str()), db),
|
||||||
|
|
||||||
|
std::make_pair(
|
||||||
|
util::fmt("select count(*) from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?",
|
||||||
|
table_name.c_str()),
|
||||||
|
expire_db),
|
||||||
|
std::make_pair(util::fmt("delete from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?",
|
||||||
|
table_name.c_str()),
|
||||||
|
expire_db),
|
||||||
|
std::make_pair(util::fmt("select last_run from zeek_storage_expiry_runs where ukey = '%s'",
|
||||||
|
table_name.c_str()),
|
||||||
|
expire_db),
|
||||||
|
std::make_pair(util::fmt("update zeek_storage_expiry_runs set last_run = ? where ukey = '%s'",
|
||||||
|
table_name.c_str()),
|
||||||
|
expire_db)};
|
||||||
|
|
||||||
|
std::vector<unique_stmt_ptr> stmt_ptrs;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for ( const auto& stmt : statements ) {
|
for ( const auto& [stmt, stmt_db] : statements ) {
|
||||||
sqlite3_stmt* ps;
|
sqlite3_stmt* ps;
|
||||||
if ( auto prep_res = CheckError(sqlite3_prepare_v2(db, stmt.c_str(), static_cast<int>(stmt.size()), &ps, NULL));
|
if ( auto prep_res =
|
||||||
|
CheckError(sqlite3_prepare_v2(stmt_db, stmt.c_str(), static_cast<int>(stmt.size()), &ps, NULL));
|
||||||
prep_res.code != ReturnCode::SUCCESS ) {
|
prep_res.code != ReturnCode::SUCCESS ) {
|
||||||
Close(nullptr);
|
Close(nullptr);
|
||||||
return prep_res;
|
return prep_res;
|
||||||
}
|
}
|
||||||
|
|
||||||
stmt_ptrs[i++] = unique_stmt_ptr(ps, [](sqlite3_stmt* stmt) { sqlite3_finalize(stmt); });
|
stmt_ptrs.emplace_back(ps, sqlite3_finalize);
|
||||||
}
|
}
|
||||||
|
|
||||||
put_stmt = std::move(stmt_ptrs[0]);
|
put_stmt = std::move(stmt_ptrs[0]);
|
||||||
put_update_stmt = std::move(stmt_ptrs[1]);
|
put_update_stmt = std::move(stmt_ptrs[1]);
|
||||||
get_stmt = std::move(stmt_ptrs[2]);
|
get_stmt = std::move(stmt_ptrs[2]);
|
||||||
erase_stmt = std::move(stmt_ptrs[3]);
|
erase_stmt = std::move(stmt_ptrs[3]);
|
||||||
expire_stmt = std::move(stmt_ptrs[4]);
|
check_expire_stmt = std::move(stmt_ptrs[4]);
|
||||||
|
expire_stmt = std::move(stmt_ptrs[5]);
|
||||||
|
get_expiry_last_run_stmt = std::move(stmt_ptrs[6]);
|
||||||
|
update_expiry_last_run_stmt = std::move(stmt_ptrs[7]);
|
||||||
|
|
||||||
return {ReturnCode::SUCCESS};
|
return {ReturnCode::SUCCESS};
|
||||||
}
|
}
|
||||||
|
@ -171,11 +239,15 @@ OperationResult SQLite::DoClose(ResultCallback* cb) {
|
||||||
OperationResult op_res{ReturnCode::SUCCESS};
|
OperationResult op_res{ReturnCode::SUCCESS};
|
||||||
|
|
||||||
if ( db ) {
|
if ( db ) {
|
||||||
|
// These will all call sqlite3_finalize as they're deleted.
|
||||||
put_stmt.reset();
|
put_stmt.reset();
|
||||||
put_update_stmt.reset();
|
put_update_stmt.reset();
|
||||||
get_stmt.reset();
|
get_stmt.reset();
|
||||||
erase_stmt.reset();
|
erase_stmt.reset();
|
||||||
|
check_expire_stmt.reset();
|
||||||
expire_stmt.reset();
|
expire_stmt.reset();
|
||||||
|
get_expiry_last_run_stmt.reset();
|
||||||
|
update_expiry_last_run_stmt.reset();
|
||||||
|
|
||||||
char* errmsg;
|
char* errmsg;
|
||||||
if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg);
|
if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg);
|
||||||
|
@ -187,11 +259,20 @@ OperationResult SQLite::DoClose(ResultCallback* cb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( int res = sqlite3_close_v2(db); res != SQLITE_OK ) {
|
if ( int res = sqlite3_close_v2(db); res != SQLITE_OK ) {
|
||||||
|
op_res.code = ReturnCode::DISCONNECTION_FAILED;
|
||||||
if ( op_res.err_str.empty() )
|
if ( op_res.err_str.empty() )
|
||||||
op_res.err_str = "Sqlite could not close connection";
|
op_res.err_str = "Sqlite could not close main db connection";
|
||||||
}
|
}
|
||||||
|
|
||||||
db = nullptr;
|
db = nullptr;
|
||||||
|
|
||||||
|
if ( int res = sqlite3_close_v2(expire_db); res != SQLITE_OK ) {
|
||||||
|
op_res.code = ReturnCode::DISCONNECTION_FAILED;
|
||||||
|
if ( op_res.err_str.empty() )
|
||||||
|
op_res.err_str = "Sqlite could not close expire db connection";
|
||||||
|
}
|
||||||
|
|
||||||
|
expire_db = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
return op_res;
|
return op_res;
|
||||||
|
@ -208,15 +289,14 @@ OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool
|
||||||
if ( ! key_data )
|
if ( ! key_data )
|
||||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
||||||
|
|
||||||
sqlite3_stmt* stmt;
|
unique_stmt_ptr stmt;
|
||||||
if ( ! overwrite )
|
if ( ! overwrite )
|
||||||
stmt = put_stmt.get();
|
stmt = unique_stmt_ptr(put_stmt.get(), sqlite3_reset);
|
||||||
else
|
else
|
||||||
stmt = put_update_stmt.get();
|
stmt = unique_stmt_ptr(put_update_stmt.get(), sqlite3_reset);
|
||||||
|
|
||||||
if ( auto res = CheckError(sqlite3_bind_blob(stmt, 1, key_data->data(), key_data->size(), SQLITE_STATIC));
|
if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 1, key_data->data(), key_data->size(), SQLITE_STATIC));
|
||||||
res.code != ReturnCode::SUCCESS ) {
|
res.code != ReturnCode::SUCCESS ) {
|
||||||
sqlite3_reset(stmt);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,26 +304,23 @@ OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool
|
||||||
if ( ! val_data )
|
if ( ! val_data )
|
||||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize value"};
|
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize value"};
|
||||||
|
|
||||||
if ( auto res = CheckError(sqlite3_bind_blob(stmt, 2, val_data->data(), val_data->size(), SQLITE_STATIC));
|
if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 2, val_data->data(), val_data->size(), SQLITE_STATIC));
|
||||||
res.code != ReturnCode::SUCCESS ) {
|
res.code != ReturnCode::SUCCESS ) {
|
||||||
sqlite3_reset(stmt);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( auto res = CheckError(sqlite3_bind_double(stmt, 3, expiration_time)); res.code != ReturnCode::SUCCESS ) {
|
if ( auto res = CheckError(sqlite3_bind_double(stmt.get(), 3, expiration_time)); res.code != ReturnCode::SUCCESS ) {
|
||||||
sqlite3_reset(stmt);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( overwrite ) {
|
if ( overwrite ) {
|
||||||
if ( auto res = CheckError(sqlite3_bind_blob(stmt, 4, val_data->data(), val_data->size(), SQLITE_STATIC));
|
if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 4, val_data->data(), val_data->size(), SQLITE_STATIC));
|
||||||
res.code != ReturnCode::SUCCESS ) {
|
res.code != ReturnCode::SUCCESS ) {
|
||||||
sqlite3_reset(stmt);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Step(stmt, false);
|
return Step(stmt.get(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -257,15 +334,14 @@ OperationResult SQLite::DoGet(ResultCallback* cb, ValPtr key) {
|
||||||
if ( ! key_data )
|
if ( ! key_data )
|
||||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
||||||
|
|
||||||
auto stmt = get_stmt.get();
|
auto stmt = unique_stmt_ptr(get_stmt.get(), sqlite3_reset);
|
||||||
|
|
||||||
if ( auto res = CheckError(sqlite3_bind_blob(stmt, 1, key_data->data(), key_data->size(), SQLITE_STATIC));
|
if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 1, key_data->data(), key_data->size(), SQLITE_STATIC));
|
||||||
res.code != ReturnCode::SUCCESS ) {
|
res.code != ReturnCode::SUCCESS ) {
|
||||||
sqlite3_reset(stmt);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Step(stmt, true);
|
return Step(stmt.get(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -279,15 +355,14 @@ OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) {
|
||||||
if ( ! key_data )
|
if ( ! key_data )
|
||||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
||||||
|
|
||||||
auto stmt = erase_stmt.get();
|
auto stmt = unique_stmt_ptr(erase_stmt.get(), sqlite3_reset);
|
||||||
|
|
||||||
if ( auto res = CheckError(sqlite3_bind_blob(stmt, 1, key_data->data(), key_data->size(), SQLITE_STATIC));
|
if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 1, key_data->data(), key_data->size(), SQLITE_STATIC));
|
||||||
res.code != ReturnCode::SUCCESS ) {
|
res.code != ReturnCode::SUCCESS ) {
|
||||||
sqlite3_reset(stmt);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Step(stmt, false);
|
return Step(stmt.get(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -295,20 +370,109 @@ OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) {
|
||||||
* derived classes.
|
* derived classes.
|
||||||
*/
|
*/
|
||||||
void SQLite::DoExpire(double current_network_time) {
|
void SQLite::DoExpire(double current_network_time) {
|
||||||
auto stmt = expire_stmt.get();
|
int status;
|
||||||
|
char* errMsg = nullptr;
|
||||||
|
unique_stmt_ptr stmt;
|
||||||
|
|
||||||
int status = sqlite3_bind_double(stmt, 1, current_network_time);
|
// Begin an exclusive transaction here to lock the database for this one process. That
|
||||||
if ( status != SQLITE_OK ) {
|
// will ensure there isn't a TOCTOU bug in the time check below.
|
||||||
// TODO: do something with the error?
|
while ( true ) {
|
||||||
|
status = sqlite3_exec(expire_db, "begin immediate transaction", nullptr, nullptr, &errMsg);
|
||||||
|
sqlite3_free(errMsg);
|
||||||
|
|
||||||
|
if ( status == SQLITE_OK )
|
||||||
|
break;
|
||||||
|
else
|
||||||
|
// If any other status is returned here, give up. Notably, this includes
|
||||||
|
// SQLITE_BUSY which will be returned if there was already a transaction
|
||||||
|
// running. If one node got in and made the transaction, expiration is
|
||||||
|
// happening so the rest don't need to retry.
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
status = sqlite3_step(stmt);
|
// Automatically rollback the transaction when this object is deleted.
|
||||||
if ( status != SQLITE_ROW ) {
|
auto deferred_rollback = util::Deferred([this]() {
|
||||||
// TODO: should this return an error somehow? Reporter warning?
|
char* errMsg = nullptr;
|
||||||
|
sqlite3_exec(expire_db, "rollback transaction", nullptr, nullptr, &errMsg);
|
||||||
|
sqlite3_free(errMsg);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Check if there's anything to expire.
|
||||||
|
stmt = unique_stmt_ptr(check_expire_stmt.get(), sqlite3_reset);
|
||||||
|
status = sqlite3_bind_double(stmt.get(), 1, current_network_time);
|
||||||
|
while ( status != SQLITE_ROW ) {
|
||||||
|
status = sqlite3_step(stmt.get());
|
||||||
|
if ( status == SQLITE_ROW ) {
|
||||||
|
auto num_to_expire = sqlite3_column_int(stmt.get(), 0);
|
||||||
|
|
||||||
|
DBG_LOG(DBG_STORAGE, "Expiration has %d elements to expire", num_to_expire);
|
||||||
|
if ( num_to_expire == 0 )
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlite3_reset(stmt);
|
// Check if the expiration control key is less than the interval. Exit if not.
|
||||||
|
stmt = unique_stmt_ptr(get_expiry_last_run_stmt.get(), sqlite3_reset);
|
||||||
|
while ( status != SQLITE_ROW ) {
|
||||||
|
status = sqlite3_step(stmt.get());
|
||||||
|
if ( status == SQLITE_ROW ) {
|
||||||
|
double last_run = sqlite3_column_double(stmt.get(), 0);
|
||||||
|
|
||||||
|
DBG_LOG(DBG_STORAGE, "Expiration last run: %f diff: %f interval: %f", last_run,
|
||||||
|
current_network_time - last_run, zeek::BifConst::Storage::expire_interval);
|
||||||
|
|
||||||
|
if ( current_network_time > 0 &&
|
||||||
|
(current_network_time - last_run) < zeek::BifConst::Storage::expire_interval )
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the expiration control key
|
||||||
|
stmt = unique_stmt_ptr(update_expiry_last_run_stmt.get(), sqlite3_reset);
|
||||||
|
status = sqlite3_bind_double(stmt.get(), 1, current_network_time);
|
||||||
|
if ( status != SQLITE_OK ) {
|
||||||
|
std::string err =
|
||||||
|
util::fmt("Error preparing statement to update expiration control time: %s", sqlite3_errmsg(expire_db));
|
||||||
|
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
|
||||||
|
Error(err.c_str());
|
||||||
|
sqlite3_free(errMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
status = sqlite3_step(stmt.get());
|
||||||
|
if ( status != SQLITE_ROW && status != SQLITE_DONE ) {
|
||||||
|
std::string err = util::fmt("Error updating expiration control time: %s", errMsg);
|
||||||
|
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
|
||||||
|
Error(err.c_str());
|
||||||
|
sqlite3_free(errMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the values.
|
||||||
|
stmt = unique_stmt_ptr(expire_stmt.get(), sqlite3_reset);
|
||||||
|
|
||||||
|
status = sqlite3_bind_double(stmt.get(), 1, current_network_time);
|
||||||
|
if ( status != SQLITE_OK ) {
|
||||||
|
std::string err = util::fmt("Error preparing statement to expire elements: %s", sqlite3_errmsg(expire_db));
|
||||||
|
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
|
||||||
|
Error(err.c_str());
|
||||||
|
sqlite3_free(errMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
status = sqlite3_step(stmt.get());
|
||||||
|
if ( status != SQLITE_ROW && status != SQLITE_DONE ) {
|
||||||
|
std::string err = util::fmt("Error expiring elements: %s", sqlite3_errmsg(expire_db));
|
||||||
|
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
|
||||||
|
Error(err.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
sqlite3_exec(expire_db, "commit transaction", nullptr, nullptr, &errMsg);
|
||||||
|
sqlite3_free(errMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns true in case of error
|
// returns true in case of error
|
||||||
|
@ -330,7 +494,6 @@ OperationResult SQLite::Step(sqlite3_stmt* stmt, bool parse_value) {
|
||||||
size_t blob_size = sqlite3_column_bytes(stmt, 0);
|
size_t blob_size = sqlite3_column_bytes(stmt, 0);
|
||||||
|
|
||||||
auto val = serializer->Unserialize({blob, blob_size}, val_type);
|
auto val = serializer->Unserialize({blob, blob_size}, val_type);
|
||||||
sqlite3_reset(stmt);
|
|
||||||
|
|
||||||
if ( val )
|
if ( val )
|
||||||
ret = {ReturnCode::SUCCESS, "", val.value()};
|
ret = {ReturnCode::SUCCESS, "", val.value()};
|
||||||
|
@ -355,8 +518,6 @@ OperationResult SQLite::Step(sqlite3_stmt* stmt, bool parse_value) {
|
||||||
else
|
else
|
||||||
ret = {ReturnCode::OPERATION_FAILED};
|
ret = {ReturnCode::OPERATION_FAILED};
|
||||||
|
|
||||||
sqlite3_reset(stmt);
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,14 +53,20 @@ private:
|
||||||
OperationResult RunPragma(std::string_view name, std::optional<std::string_view> value = std::nullopt);
|
OperationResult RunPragma(std::string_view name, std::optional<std::string_view> value = std::nullopt);
|
||||||
|
|
||||||
sqlite3* db = nullptr;
|
sqlite3* db = nullptr;
|
||||||
|
sqlite3* expire_db = nullptr;
|
||||||
|
|
||||||
|
using sqlite_stmt_func = std::function<void(sqlite3_stmt*)>;
|
||||||
|
using unique_stmt_ptr = std::unique_ptr<sqlite3_stmt, sqlite_stmt_func>;
|
||||||
|
|
||||||
using stmt_deleter = std::function<void(sqlite3_stmt*)>;
|
|
||||||
using unique_stmt_ptr = std::unique_ptr<sqlite3_stmt, stmt_deleter>;
|
|
||||||
unique_stmt_ptr put_stmt;
|
unique_stmt_ptr put_stmt;
|
||||||
unique_stmt_ptr put_update_stmt;
|
unique_stmt_ptr put_update_stmt;
|
||||||
unique_stmt_ptr get_stmt;
|
unique_stmt_ptr get_stmt;
|
||||||
unique_stmt_ptr erase_stmt;
|
unique_stmt_ptr erase_stmt;
|
||||||
|
|
||||||
|
unique_stmt_ptr check_expire_stmt;
|
||||||
unique_stmt_ptr expire_stmt;
|
unique_stmt_ptr expire_stmt;
|
||||||
|
unique_stmt_ptr get_expiry_last_run_stmt;
|
||||||
|
unique_stmt_ptr update_expiry_last_run_stmt;
|
||||||
|
|
||||||
std::string full_path;
|
std::string full_path;
|
||||||
std::string table_name;
|
std::string table_name;
|
||||||
|
|
12
src/util.h
12
src/util.h
|
@ -662,5 +662,17 @@ inline std::vector<std::wstring_view> split(const wchar_t* s, const wchar_t* del
|
||||||
return split(std::wstring_view(s), std::wstring_view(delim));
|
return split(std::wstring_view(s), std::wstring_view(delim));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class that runs a function at destruction.
|
||||||
|
*/
|
||||||
|
class Deferred {
|
||||||
|
public:
|
||||||
|
Deferred(std::function<void()> deferred) : deferred(std::move(deferred)) {}
|
||||||
|
~Deferred() { deferred(); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::function<void()> deferred;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace util
|
} // namespace util
|
||||||
} // namespace zeek
|
} // namespace zeek
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, pragma_commands={
|
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
|
||||||
[integrity_check] = ,
|
[integrity_check] = ,
|
||||||
[busy_timeout] = 5000,
|
|
||||||
[journal_mode] = WAL,
|
[journal_mode] = WAL,
|
||||||
[synchronous] = normal,
|
[synchronous] = normal,
|
||||||
[temp_store] = memory
|
[temp_store] = memory
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
worker-1, put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
|
worker-1, put result 1, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
|
||||||
|
worker-1, put result 2, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
|
||||||
sqlite_data_written
|
sqlite_data_written
|
||||||
worker-1, [code=Storage::SUCCESS, error_str=<uninitialized>, value=5678]
|
worker-1, get result 1 after expiration, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
|
||||||
|
worker-1, get result 2 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>]
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
sqlite_data_written
|
sqlite_data_written
|
||||||
worker-2, [code=Storage::SUCCESS, error_str=<uninitialized>, value=5678]
|
worker-2, get result 1 after expiration, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value1234]
|
||||||
|
worker-2, get result 2 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>]
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b %INPUT
|
# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b %INPUT
|
||||||
# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT
|
# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT
|
||||||
# @TEST-EXEC: btest-bg-run worker-2 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -b %INPUT
|
# @TEST-EXEC: btest-bg-run worker-2 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -b %INPUT
|
||||||
# @TEST-EXEC: btest-bg-wait -k 5
|
# @TEST-EXEC: btest-bg-wait -k 10
|
||||||
# @TEST-EXEC: btest-diff worker-1/.stdout
|
# @TEST-EXEC: btest-diff worker-1/.stdout
|
||||||
# @TEST-EXEC: btest-diff worker-2/.stdout
|
# @TEST-EXEC: btest-diff worker-2/.stdout
|
||||||
# @TEST-EXEC:
|
# @TEST-EXEC:
|
||||||
|
@ -20,11 +20,18 @@
|
||||||
@load policy/frameworks/storage/backend/sqlite
|
@load policy/frameworks/storage/backend/sqlite
|
||||||
@load policy/frameworks/cluster/experimental
|
@load policy/frameworks/cluster/experimental
|
||||||
|
|
||||||
|
redef Storage::expire_interval = 2 secs;
|
||||||
|
|
||||||
global sqlite_data_written: event() &is_used;
|
global sqlite_data_written: event() &is_used;
|
||||||
|
|
||||||
@if ( Cluster::local_node_type() == Cluster::WORKER )
|
@if ( Cluster::local_node_type() == Cluster::WORKER )
|
||||||
|
|
||||||
global backend: opaque of Storage::BackendHandle;
|
global backend: opaque of Storage::BackendHandle;
|
||||||
|
global key1: string = "key1234";
|
||||||
|
global value1: string = "value1234";
|
||||||
|
|
||||||
|
global key2: string = "key2345";
|
||||||
|
global value2: string = "value2345";
|
||||||
|
|
||||||
event zeek_init()
|
event zeek_init()
|
||||||
{
|
{
|
||||||
|
@ -40,13 +47,22 @@ event zeek_init()
|
||||||
backend = open_res$value;
|
backend = open_res$value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event check_removed()
|
||||||
|
{
|
||||||
|
local res = Storage::Sync::get(backend, key1);
|
||||||
|
print Cluster::node, "get result 1 after expiration", res;
|
||||||
|
|
||||||
|
res = Storage::Sync::get(backend, key2);
|
||||||
|
print Cluster::node, "get result 2 after expiration", res;
|
||||||
|
|
||||||
|
Storage::Sync::close_backend(backend);
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
|
||||||
event sqlite_data_written()
|
event sqlite_data_written()
|
||||||
{
|
{
|
||||||
print "sqlite_data_written";
|
print "sqlite_data_written";
|
||||||
local res = Storage::Sync::get(backend, "1234");
|
schedule 5secs { check_removed() };
|
||||||
print Cluster::node, res;
|
|
||||||
Storage::Sync::close_backend(backend);
|
|
||||||
terminate();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@else
|
@else
|
||||||
|
@ -72,8 +88,11 @@ event sqlite_data_written()
|
||||||
|
|
||||||
event Cluster::Experimental::cluster_started()
|
event Cluster::Experimental::cluster_started()
|
||||||
{
|
{
|
||||||
local res = Storage::Sync::put(backend, [ $key="1234", $value="5678" ]);
|
local res = Storage::Sync::put(backend, [ $key=key1, $value=value1 ]);
|
||||||
print Cluster::node, "put result", res;
|
print Cluster::node, "put result 1", res;
|
||||||
|
|
||||||
|
res = Storage::Sync::put(backend, [ $key=key2, $value=value2, $expire_time=2 sec ]);
|
||||||
|
print Cluster::node, "put result 2", res;
|
||||||
|
|
||||||
local e = Cluster::make_event(sqlite_data_written);
|
local e = Cluster::make_event(sqlite_data_written);
|
||||||
Cluster::publish(Cluster::manager_topic, e);
|
Cluster::publish(Cluster::manager_topic, e);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue