Handle potential contention when running sqlite expiration

This commit is contained in:
Tim Wojtulewicz 2025-05-22 14:29:37 -07:00
parent fb165c5ef1
commit 0e5b3def84
2 changed files with 198 additions and 25 deletions

View file

@ -12,6 +12,8 @@
#include "zeek/Val.h"
#include "zeek/storage/ReturnCode.h"
#include "const.bif.netvar_h"
using namespace std::chrono_literals;
namespace zeek::storage::backend::sqlite {
@ -90,7 +92,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
if ( auto open_res =
CheckError(sqlite3_open_v2(full_path.c_str(), &db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX, NULL));
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL));
open_res.code != ReturnCode::SUCCESS ) {
sqlite3_close_v2(db);
db = nullptr;
@ -110,7 +112,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
auto ks = vl->AsListVal()->Idx(0)->AsStringVal();
auto ks_sv = ks->ToStdStringView();
auto vs = v->GetVal()->AsStringVal();
auto vs = iter.value->GetVal()->AsStringVal();
auto vs_sv = vs->ToStdStringView();
auto pragma_res = RunPragma(ks_sv, vs_sv);
@ -120,11 +122,23 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
}
}
std::string create = "create table if not exists " + table_name + " (";
create.append("key_str blob primary key, value_str blob not null, expire_time real);");
// Open a second connection to the database. This one is used for expiration and exists to prevent
// simultaneous multi-threaded access to the same connection.
if ( auto open_res =
CheckError(sqlite3_open_v2(full_path.c_str(), &expire_db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL));
open_res.code != ReturnCode::SUCCESS ) {
Close(nullptr);
return open_res;
}
sqlite3_busy_timeout(expire_db, busy_timeout * 1000);
std::string cmd = "create table if not exists " + table_name + " (";
cmd.append("key_str blob primary key, value_str blob not null, expire_time real);");
char* errorMsg = nullptr;
if ( int res = sqlite3_exec(db, create.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
if ( int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg);
Error(err.c_str());
sqlite3_free(errorMsg);
@ -132,34 +146,85 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
static std::array<std::string, 5> statements =
{util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?)", table_name.c_str()),
util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?) ON CONFLICT(key_str) "
"DO UPDATE SET value_str=?",
table_name.c_str()),
util::fmt("select value_str from %s where key_str=?", table_name.c_str()),
util::fmt("delete from %s where key_str=?", table_name.c_str()),
util::fmt("delete from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?",
table_name.c_str())};
sqlite3_free(errorMsg);
std::array<unique_stmt_ptr, 5> stmt_ptrs;
// Create a table for controlling expiration contention. The ukey column here ensures that only
// one row exists for this backend's table.
cmd = util::fmt("create table if not exists zeek_storage_expiry_runs (ukey primary key, last_run double);");
if ( int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg);
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
sqlite3_free(errorMsg);
// Attempt to insert an initial value into the table if this is the first run with
// this file. This may result in a SQLITE_CONSTRAINT if the row already exists. That's
// not an error, as it's possible if the file already existed.
cmd = util::fmt("insert into zeek_storage_expiry_runs (ukey, last_run) values('%s', 0);", table_name.c_str());
if ( int res = sqlite3_exec(db, cmd.c_str(), nullptr, nullptr, &errorMsg);
res != SQLITE_OK && res != SQLITE_CONSTRAINT ) {
std::string err =
util::fmt("Error inserting initial row into expiration control table: (%d) %s", res, errorMsg);
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
sqlite3_free(errorMsg);
static std::array<std::pair<std::string, sqlite3*>, 8> statements =
{std::make_pair(util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?)",
table_name.c_str()),
db),
std::make_pair(util::
fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?) ON CONFLICT(key_str) "
"DO UPDATE SET value_str=?",
table_name.c_str()),
db),
std::make_pair(util::fmt("select value_str from %s where key_str=?", table_name.c_str()), db),
std::make_pair(util::fmt("delete from %s where key_str=?", table_name.c_str()), db),
std::make_pair(
util::fmt("select count(*) from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?",
table_name.c_str()),
expire_db),
std::make_pair(util::fmt("delete from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?",
table_name.c_str()),
expire_db),
std::make_pair(util::fmt("select last_run from zeek_storage_expiry_runs where ukey = '%s'",
table_name.c_str()),
expire_db),
std::make_pair(util::fmt("update zeek_storage_expiry_runs set last_run = ? where ukey = '%s'",
table_name.c_str()),
expire_db)};
std::vector<unique_stmt_ptr> stmt_ptrs;
int i = 0;
for ( const auto& stmt : statements ) {
for ( const auto& [stmt, stmt_db] : statements ) {
sqlite3_stmt* ps;
if ( auto prep_res = CheckError(sqlite3_prepare_v2(db, stmt.c_str(), static_cast<int>(stmt.size()), &ps, NULL));
if ( auto prep_res =
CheckError(sqlite3_prepare_v2(stmt_db, stmt.c_str(), static_cast<int>(stmt.size()), &ps, NULL));
prep_res.code != ReturnCode::SUCCESS ) {
Close(nullptr);
return prep_res;
}
stmt_ptrs[i++] = unique_stmt_ptr(ps, sqlite3_finalize);
stmt_ptrs.emplace_back(ps, sqlite3_finalize);
}
put_stmt = std::move(stmt_ptrs[0]);
put_update_stmt = std::move(stmt_ptrs[1]);
get_stmt = std::move(stmt_ptrs[2]);
erase_stmt = std::move(stmt_ptrs[3]);
expire_stmt = std::move(stmt_ptrs[4]);
check_expire_stmt = std::move(stmt_ptrs[4]);
expire_stmt = std::move(stmt_ptrs[5]);
get_expiry_last_run_stmt = std::move(stmt_ptrs[6]);
update_expiry_last_run_stmt = std::move(stmt_ptrs[7]);
return {ReturnCode::SUCCESS};
}
@ -176,7 +241,10 @@ OperationResult SQLite::DoClose(ResultCallback* cb) {
put_update_stmt.reset();
get_stmt.reset();
erase_stmt.reset();
check_expire_stmt.reset();
expire_stmt.reset();
get_expiry_last_run_stmt.reset();
update_expiry_last_run_stmt.reset();
char* errmsg;
if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg);
@ -188,11 +256,20 @@ OperationResult SQLite::DoClose(ResultCallback* cb) {
}
if ( int res = sqlite3_close_v2(db); res != SQLITE_OK ) {
op_res.code = ReturnCode::DISCONNECTION_FAILED;
if ( op_res.err_str.empty() )
op_res.err_str = "Sqlite could not close connection";
op_res.err_str = "Sqlite could not close main db connection";
}
db = nullptr;
if ( int res = sqlite3_close_v2(expire_db); res != SQLITE_OK ) {
op_res.code = ReturnCode::DISCONNECTION_FAILED;
if ( op_res.err_str.empty() )
op_res.err_str = "Sqlite could not close expire db connection";
}
expire_db = nullptr;
}
return op_res;
@ -290,18 +367,109 @@ OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) {
* derived classes.
*/
void SQLite::DoExpire(double current_network_time) {
auto stmt = unique_stmt_ptr(expire_stmt.get(), sqlite3_reset);
int status;
char* errMsg = nullptr;
unique_stmt_ptr stmt;
int status = sqlite3_bind_double(stmt.get(), 1, current_network_time);
// Begin an exclusive transaction here to lock the database for this one process. That
// will ensure there isn't a TOCTOU bug in the time check below.
while ( true ) {
status = sqlite3_exec(expire_db, "begin immediate transaction", nullptr, nullptr, &errMsg);
sqlite3_free(errMsg);
if ( status == SQLITE_OK )
break;
else
// If any other status is returned here, give up. Notably, this includes
// SQLITE_BUSY which will be returned if there was already a transaction
// running. If one node got in and made the transaction, expiration is
// happening so the rest don't need to retry.
return;
}
// Automatically rollback the transaction when this object is deleted.
auto deferred_rollback = util::Deferred([this]() {
char* errMsg = nullptr;
sqlite3_exec(expire_db, "rollback transaction", nullptr, nullptr, &errMsg);
sqlite3_free(errMsg);
});
// Check if there's anything to expire.
stmt = unique_stmt_ptr(check_expire_stmt.get(), sqlite3_reset);
status = sqlite3_bind_double(stmt.get(), 1, current_network_time);
while ( status != SQLITE_ROW ) {
status = sqlite3_step(stmt.get());
if ( status == SQLITE_ROW ) {
auto num_to_expire = sqlite3_column_int(stmt.get(), 0);
DBG_LOG(DBG_STORAGE, "Expiration has %d elements to expire", num_to_expire);
if ( num_to_expire == 0 )
return;
}
else
return;
}
// Check if the expiration control key is less than the interval. Exit if not.
stmt = unique_stmt_ptr(get_expiry_last_run_stmt.get(), sqlite3_reset);
while ( status != SQLITE_ROW ) {
status = sqlite3_step(stmt.get());
if ( status == SQLITE_ROW ) {
double last_run = sqlite3_column_double(stmt.get(), 0);
DBG_LOG(DBG_STORAGE, "Expiration last run: %f diff: %f interval: %f", last_run,
current_network_time - last_run, zeek::BifConst::Storage::expire_interval);
if ( current_network_time > 0 &&
(current_network_time - last_run) < zeek::BifConst::Storage::expire_interval )
return;
}
else
return;
}
// Update the expiration control key
stmt = unique_stmt_ptr(update_expiry_last_run_stmt.get(), sqlite3_reset);
status = sqlite3_bind_double(stmt.get(), 1, current_network_time);
if ( status != SQLITE_OK ) {
// TODO: do something with the error?
std::string err =
util::fmt("Error preparing statement to update expiration control time: %s", sqlite3_errmsg(expire_db));
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
Error(err.c_str());
sqlite3_free(errMsg);
return;
}
status = sqlite3_step(stmt.get());
if ( status != SQLITE_ROW ) {
// TODO: should this return an error somehow? Reporter warning?
if ( status != SQLITE_ROW && status != SQLITE_DONE ) {
std::string err = util::fmt("Error updating expiration control time: %s", errMsg);
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
Error(err.c_str());
sqlite3_free(errMsg);
return;
}
// Delete the values.
stmt = unique_stmt_ptr(expire_stmt.get(), sqlite3_reset);
status = sqlite3_bind_double(stmt.get(), 1, current_network_time);
if ( status != SQLITE_OK ) {
std::string err = util::fmt("Error preparing statement to expire elements: %s", sqlite3_errmsg(expire_db));
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
Error(err.c_str());
sqlite3_free(errMsg);
return;
}
status = sqlite3_step(stmt.get());
if ( status != SQLITE_ROW && status != SQLITE_DONE ) {
std::string err = util::fmt("Error expiring elements: %s", sqlite3_errmsg(expire_db));
DBG_LOG(DBG_STORAGE, "%s", err.c_str());
Error(err.c_str());
}
sqlite3_exec(expire_db, "commit transaction", nullptr, nullptr, &errMsg);
sqlite3_free(errMsg);
}
// returns true in case of error

View file

@ -53,6 +53,7 @@ private:
OperationResult RunPragma(std::string_view name, std::optional<std::string_view> value = std::nullopt);
sqlite3* db = nullptr;
sqlite3* expire_db = nullptr;
using sqlite_stmt_func = std::function<void(sqlite3_stmt*)>;
using unique_stmt_ptr = std::unique_ptr<sqlite3_stmt, sqlite_stmt_func>;
@ -61,7 +62,11 @@ private:
unique_stmt_ptr put_update_stmt;
unique_stmt_ptr get_stmt;
unique_stmt_ptr erase_stmt;
unique_stmt_ptr check_expire_stmt;
unique_stmt_ptr expire_stmt;
unique_stmt_ptr get_expiry_last_run_stmt;
unique_stmt_ptr update_expiry_last_run_stmt;
std::string full_path;
std::string table_name;