From 850b20e12bc2ea659c6ae05859e0685076333d00 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Mon, 2 Jun 2025 13:46:49 -0700 Subject: [PATCH 1/5] Move Deferred class from ZeroMQ to util --- src/cluster/backend/zeromq/ZeroMQ.cc | 12 +----------- src/util.h | 12 ++++++++++++ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 5c1104afbd..c2c5ac52a9 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -571,16 +571,6 @@ void ZeroMQBackend::Run() { } }; - // Helper class running at destruction. - class Deferred { - public: - Deferred(std::function deferred) : closer(std::move(deferred)) {} - ~Deferred() { closer(); } - - private: - std::function closer; - }; - struct SocketInfo { zmq::socket_ref socket; std::string name; @@ -595,7 +585,7 @@ void ZeroMQBackend::Run() { }; // Called when Run() terminates. - auto deferred_close = Deferred([this]() { + auto deferred_close = util::Deferred([this]() { child_inproc.close(); xpub.close(); xsub.close(); diff --git a/src/util.h b/src/util.h index 7beb6a9001..4f50d3474c 100644 --- a/src/util.h +++ b/src/util.h @@ -662,5 +662,17 @@ inline std::vector split(const wchar_t* s, const wchar_t* del return split(std::wstring_view(s), std::wstring_view(delim)); } +/** + * Helper class that runs a function at destruction. 
+ */ +class Deferred { +public: + Deferred(std::function deferred) : deferred(std::move(deferred)) {} + ~Deferred() { deferred(); } + +private: + std::function deferred; +}; + } // namespace util } // namespace zeek From 97a2ec379e08e8852e7862d78ed6443629468851 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 22 May 2025 14:25:22 -0700 Subject: [PATCH 2/5] Use unique_ptr to avoid needing to call sqlite3_reset manually --- src/storage/backend/sqlite/SQLite.cc | 56 ++++++++++++---------------- src/storage/backend/sqlite/SQLite.h | 5 ++- 2 files changed, 26 insertions(+), 35 deletions(-) diff --git a/src/storage/backend/sqlite/SQLite.cc b/src/storage/backend/sqlite/SQLite.cc index 6a6be32ba7..87e7a66652 100644 --- a/src/storage/backend/sqlite/SQLite.cc +++ b/src/storage/backend/sqlite/SQLite.cc @@ -152,7 +152,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) { return prep_res; } - stmt_ptrs[i++] = unique_stmt_ptr(ps, [](sqlite3_stmt* stmt) { sqlite3_finalize(stmt); }); + stmt_ptrs[i++] = unique_stmt_ptr(ps, sqlite3_finalize); } put_stmt = std::move(stmt_ptrs[0]); @@ -171,6 +171,7 @@ OperationResult SQLite::DoClose(ResultCallback* cb) { OperationResult op_res{ReturnCode::SUCCESS}; if ( db ) { + // These will all call sqlite3_finalize as they're deleted. put_stmt.reset(); put_update_stmt.reset(); get_stmt.reset(); @@ -208,15 +209,14 @@ OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool if ( ! key_data ) return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"}; - sqlite3_stmt* stmt; + unique_stmt_ptr stmt; if ( ! 
overwrite ) - stmt = put_stmt.get(); + stmt = unique_stmt_ptr(put_stmt.get(), sqlite3_reset); else - stmt = put_update_stmt.get(); + stmt = unique_stmt_ptr(put_update_stmt.get(), sqlite3_reset); - if ( auto res = CheckError(sqlite3_bind_blob(stmt, 1, key_data->data(), key_data->size(), SQLITE_STATIC)); + if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 1, key_data->data(), key_data->size(), SQLITE_STATIC)); res.code != ReturnCode::SUCCESS ) { - sqlite3_reset(stmt); return res; } @@ -224,26 +224,23 @@ OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool if ( ! val_data ) return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize value"}; - if ( auto res = CheckError(sqlite3_bind_blob(stmt, 2, val_data->data(), val_data->size(), SQLITE_STATIC)); + if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 2, val_data->data(), val_data->size(), SQLITE_STATIC)); res.code != ReturnCode::SUCCESS ) { - sqlite3_reset(stmt); return res; } - if ( auto res = CheckError(sqlite3_bind_double(stmt, 3, expiration_time)); res.code != ReturnCode::SUCCESS ) { - sqlite3_reset(stmt); + if ( auto res = CheckError(sqlite3_bind_double(stmt.get(), 3, expiration_time)); res.code != ReturnCode::SUCCESS ) { return res; } if ( overwrite ) { - if ( auto res = CheckError(sqlite3_bind_blob(stmt, 4, val_data->data(), val_data->size(), SQLITE_STATIC)); + if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 4, val_data->data(), val_data->size(), SQLITE_STATIC)); res.code != ReturnCode::SUCCESS ) { - sqlite3_reset(stmt); return res; } } - return Step(stmt, false); + return Step(stmt.get(), false); } /** @@ -257,15 +254,14 @@ OperationResult SQLite::DoGet(ResultCallback* cb, ValPtr key) { if ( ! 
key_data ) return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"}; - auto stmt = get_stmt.get(); + auto stmt = unique_stmt_ptr(get_stmt.get(), sqlite3_reset); - if ( auto res = CheckError(sqlite3_bind_blob(stmt, 1, key_data->data(), key_data->size(), SQLITE_STATIC)); + if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 1, key_data->data(), key_data->size(), SQLITE_STATIC)); res.code != ReturnCode::SUCCESS ) { - sqlite3_reset(stmt); return res; } - return Step(stmt, true); + return Step(stmt.get(), true); } /** @@ -279,15 +275,14 @@ OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) { if ( ! key_data ) return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"}; - auto stmt = erase_stmt.get(); + auto stmt = unique_stmt_ptr(erase_stmt.get(), sqlite3_reset); - if ( auto res = CheckError(sqlite3_bind_blob(stmt, 1, key_data->data(), key_data->size(), SQLITE_STATIC)); + if ( auto res = CheckError(sqlite3_bind_blob(stmt.get(), 1, key_data->data(), key_data->size(), SQLITE_STATIC)); res.code != ReturnCode::SUCCESS ) { - sqlite3_reset(stmt); return res; } - return Step(stmt, false); + return Step(stmt.get(), false); } /** @@ -295,20 +290,18 @@ OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) { * derived classes. */ void SQLite::DoExpire(double current_network_time) { - auto stmt = expire_stmt.get(); + auto stmt = unique_stmt_ptr(expire_stmt.get(), sqlite3_reset); - int status = sqlite3_bind_double(stmt, 1, current_network_time); + int status = sqlite3_bind_double(stmt.get(), 1, current_network_time); if ( status != SQLITE_OK ) { // TODO: do something with the error? - } - else { - status = sqlite3_step(stmt); - if ( status != SQLITE_ROW ) { - // TODO: should this return an error somehow? Reporter warning? - } + return; } - sqlite3_reset(stmt); + status = sqlite3_step(stmt.get()); + if ( status != SQLITE_ROW ) { + // TODO: should this return an error somehow? Reporter warning? 
+ } } // returns true in case of error @@ -330,7 +323,6 @@ OperationResult SQLite::Step(sqlite3_stmt* stmt, bool parse_value) { size_t blob_size = sqlite3_column_bytes(stmt, 0); auto val = serializer->Unserialize({blob, blob_size}, val_type); - sqlite3_reset(stmt); if ( val ) ret = {ReturnCode::SUCCESS, "", val.value()}; @@ -355,8 +347,6 @@ OperationResult SQLite::Step(sqlite3_stmt* stmt, bool parse_value) { else ret = {ReturnCode::OPERATION_FAILED}; - sqlite3_reset(stmt); - return ret; } diff --git a/src/storage/backend/sqlite/SQLite.h b/src/storage/backend/sqlite/SQLite.h index a5db127fb7..29cae4fd89 100644 --- a/src/storage/backend/sqlite/SQLite.h +++ b/src/storage/backend/sqlite/SQLite.h @@ -54,8 +54,9 @@ private: sqlite3* db = nullptr; - using stmt_deleter = std::function; - using unique_stmt_ptr = std::unique_ptr; + using sqlite_stmt_func = std::function; + using unique_stmt_ptr = std::unique_ptr; + unique_stmt_ptr put_stmt; unique_stmt_ptr put_update_stmt; unique_stmt_ptr get_stmt; From fb165c5ef11cc3ffbc63b6227ca76cbcca9c4c83 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 22 May 2025 14:29:06 -0700 Subject: [PATCH 3/5] Add expiration to sqlite-cluster.btest --- .../worker-1..stdout | 6 ++-- .../worker-2..stdout | 3 +- .../frameworks/storage/sqlite-cluster.zeek | 33 +++++++++++++++---- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-1..stdout index 33644c6261..75adea5b51 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-1..stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-1..stdout @@ -1,4 +1,6 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
-worker-1, put result, [code=Storage::SUCCESS, error_str=, value=] +worker-1, put result 1, [code=Storage::SUCCESS, error_str=, value=] +worker-1, put result 2, [code=Storage::SUCCESS, error_str=, value=] sqlite_data_written -worker-1, [code=Storage::SUCCESS, error_str=, value=5678] +worker-1, get result 1 after expiration, [code=Storage::SUCCESS, error_str=, value=value1234] +worker-1, get result 2 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=, value=] diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-2..stdout b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-2..stdout index fbd008b81b..a7988331a2 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-2..stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-cluster/worker-2..stdout @@ -1,3 +1,4 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. sqlite_data_written -worker-2, [code=Storage::SUCCESS, error_str=, value=5678] +worker-2, get result 1 after expiration, [code=Storage::SUCCESS, error_str=, value=value1234] +worker-2, get result 2 after expiration, [code=Storage::KEY_NOT_FOUND, error_str=, value=] diff --git a/testing/btest/scripts/base/frameworks/storage/sqlite-cluster.zeek b/testing/btest/scripts/base/frameworks/storage/sqlite-cluster.zeek index 7d4f5db5e4..f06c536a24 100644 --- a/testing/btest/scripts/base/frameworks/storage/sqlite-cluster.zeek +++ b/testing/btest/scripts/base/frameworks/storage/sqlite-cluster.zeek @@ -10,7 +10,7 @@ # @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b %INPUT # @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT # @TEST-EXEC: btest-bg-run worker-2 ZEEKPATH=$ZEEKPATH:.. 
CLUSTER_NODE=worker-2 zeek -b %INPUT -# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-bg-wait -k 10 # @TEST-EXEC: btest-diff worker-1/.stdout # @TEST-EXEC: btest-diff worker-2/.stdout # @TEST-EXEC: @@ -20,11 +20,18 @@ @load policy/frameworks/storage/backend/sqlite @load policy/frameworks/cluster/experimental +redef Storage::expire_interval = 2 secs; + global sqlite_data_written: event() &is_used; @if ( Cluster::local_node_type() == Cluster::WORKER ) global backend: opaque of Storage::BackendHandle; +global key1: string = "key1234"; +global value1: string = "value1234"; + +global key2: string = "key2345"; +global value2: string = "value2345"; event zeek_init() { @@ -40,13 +47,22 @@ event zeek_init() backend = open_res$value; } +event check_removed() + { + local res = Storage::Sync::get(backend, key1); + print Cluster::node, "get result 1 after expiration", res; + + res = Storage::Sync::get(backend, key2); + print Cluster::node, "get result 2 after expiration", res; + + Storage::Sync::close_backend(backend); + terminate(); + } + event sqlite_data_written() { print "sqlite_data_written"; - local res = Storage::Sync::get(backend, "1234"); - print Cluster::node, res; - Storage::Sync::close_backend(backend); - terminate(); + schedule 5secs { check_removed() }; } @else @@ -72,8 +88,11 @@ event sqlite_data_written() event Cluster::Experimental::cluster_started() { - local res = Storage::Sync::put(backend, [ $key="1234", $value="5678" ]); - print Cluster::node, "put result", res; + local res = Storage::Sync::put(backend, [ $key=key1, $value=value1 ]); + print Cluster::node, "put result 1", res; + + res = Storage::Sync::put(backend, [ $key=key2, $value=value2, $expire_time=2 sec ]); + print Cluster::node, "put result 2", res; local e = Cluster::make_event(sqlite_data_written); Cluster::publish(Cluster::manager_topic, e); From 0e5b3def84733207fedcecc6b510a6c11ac0dadd Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 22 May 2025 14:29:37 -0700 Subject: [PATCH 
4/5] Handle potential contention when running sqlite expiration --- src/storage/backend/sqlite/SQLite.cc | 218 ++++++++++++++++++++++++--- src/storage/backend/sqlite/SQLite.h | 5 + 2 files changed, 198 insertions(+), 25 deletions(-) diff --git a/src/storage/backend/sqlite/SQLite.cc b/src/storage/backend/sqlite/SQLite.cc index 87e7a66652..596ae6374f 100644 --- a/src/storage/backend/sqlite/SQLite.cc +++ b/src/storage/backend/sqlite/SQLite.cc @@ -12,6 +12,8 @@ #include "zeek/Val.h" #include "zeek/storage/ReturnCode.h" +#include "const.bif.netvar_h" + using namespace std::chrono_literals; namespace zeek::storage::backend::sqlite { @@ -90,7 +92,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) { if ( auto open_res = CheckError(sqlite3_open_v2(full_path.c_str(), &db, - SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX, NULL)); + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL)); open_res.code != ReturnCode::SUCCESS ) { sqlite3_close_v2(db); db = nullptr; @@ -110,7 +112,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) { auto ks = vl->AsListVal()->Idx(0)->AsStringVal(); auto ks_sv = ks->ToStdStringView(); - auto vs = v->GetVal()->AsStringVal(); + auto vs = iter.value->GetVal()->AsStringVal(); auto vs_sv = vs->ToStdStringView(); auto pragma_res = RunPragma(ks_sv, vs_sv); @@ -120,11 +122,23 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) { } } - std::string create = "create table if not exists " + table_name + " ("; - create.append("key_str blob primary key, value_str blob not null, expire_time real);"); + // Open a second connection to the database. This one is used for expiration and exists to prevent + // simultaneous multi-threaded access to the same connection. 
+ if ( auto open_res = + CheckError(sqlite3_open_v2(full_path.c_str(), &expire_db, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL)); + open_res.code != ReturnCode::SUCCESS ) { + Close(nullptr); + return open_res; + } + + sqlite3_busy_timeout(expire_db, busy_timeout * 1000); + + std::string cmd = "create table if not exists " + table_name + " ("; + cmd.append("key_str blob primary key, value_str blob not null, expire_time real);"); char* errorMsg = nullptr; - if ( int res = sqlite3_exec(db, create.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) { + if ( int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) { std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg); Error(err.c_str()); sqlite3_free(errorMsg); @@ -132,34 +146,85 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) { return {ReturnCode::INITIALIZATION_FAILED, std::move(err)}; } - static std::array statements = - {util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?)", table_name.c_str()), - util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?) ON CONFLICT(key_str) " - "DO UPDATE SET value_str=?", - table_name.c_str()), - util::fmt("select value_str from %s where key_str=?", table_name.c_str()), - util::fmt("delete from %s where key_str=?", table_name.c_str()), - util::fmt("delete from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?", - table_name.c_str())}; + sqlite3_free(errorMsg); - std::array stmt_ptrs; + // Create a table for controlling expiration contention. The ukey column here ensures that only + // one row exists for this backend's table. 
+ cmd = util::fmt("create table if not exists zeek_storage_expiry_runs (ukey primary key, last_run double);"); + if ( int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) { + std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg); + Error(err.c_str()); + sqlite3_free(errorMsg); + Close(nullptr); + return {ReturnCode::INITIALIZATION_FAILED, std::move(err)}; + } + + sqlite3_free(errorMsg); + + // Attempt to insert an initial value into the table if this is the first run with + // this file. This may result in a SQLITE_CONSTRAINT if the row already exists. That's + // not an error, as it's possible if the file already existed. + cmd = util::fmt("insert into zeek_storage_expiry_runs (ukey, last_run) values('%s', 0);", table_name.c_str()); + if ( int res = sqlite3_exec(db, cmd.c_str(), nullptr, nullptr, &errorMsg); + res != SQLITE_OK && res != SQLITE_CONSTRAINT ) { + std::string err = + util::fmt("Error inserting initial row into expiration control table: (%d) %s", res, errorMsg); + Error(err.c_str()); + sqlite3_free(errorMsg); + Close(nullptr); + return {ReturnCode::INITIALIZATION_FAILED, std::move(err)}; + } + + sqlite3_free(errorMsg); + + std::array, 8> statements = + {std::make_pair(util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?)", + table_name.c_str()), + db), + std::make_pair(util:: + fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?) 
ON CONFLICT(key_str) " + "DO UPDATE SET value_str=?", + table_name.c_str()), + db), + std::make_pair(util::fmt("select value_str from %s where key_str=?", table_name.c_str()), db), + std::make_pair(util::fmt("delete from %s where key_str=?", table_name.c_str()), db), + + std::make_pair( + util::fmt("select count(*) from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?", + table_name.c_str()), + expire_db), + std::make_pair(util::fmt("delete from %s where expire_time > 0 and expire_time != 0 and expire_time <= ?", + table_name.c_str()), + expire_db), + std::make_pair(util::fmt("select last_run from zeek_storage_expiry_runs where ukey = '%s'", + table_name.c_str()), + expire_db), + std::make_pair(util::fmt("update zeek_storage_expiry_runs set last_run = ? where ukey = '%s'", + table_name.c_str()), + expire_db)}; + + std::vector stmt_ptrs; int i = 0; - for ( const auto& stmt : statements ) { + for ( const auto& [stmt, stmt_db] : statements ) { sqlite3_stmt* ps; - if ( auto prep_res = CheckError(sqlite3_prepare_v2(db, stmt.c_str(), static_cast(stmt.size()), &ps, NULL)); + if ( auto prep_res = + CheckError(sqlite3_prepare_v2(stmt_db, stmt.c_str(), static_cast(stmt.size()), &ps, NULL)); prep_res.code != ReturnCode::SUCCESS ) { Close(nullptr); return prep_res; } - stmt_ptrs[i++] = unique_stmt_ptr(ps, sqlite3_finalize); + stmt_ptrs.emplace_back(ps, sqlite3_finalize); } put_stmt = std::move(stmt_ptrs[0]); put_update_stmt = std::move(stmt_ptrs[1]); get_stmt = std::move(stmt_ptrs[2]); erase_stmt = std::move(stmt_ptrs[3]); - expire_stmt = std::move(stmt_ptrs[4]); + check_expire_stmt = std::move(stmt_ptrs[4]); + expire_stmt = std::move(stmt_ptrs[5]); + get_expiry_last_run_stmt = std::move(stmt_ptrs[6]); + update_expiry_last_run_stmt = std::move(stmt_ptrs[7]); return {ReturnCode::SUCCESS}; } @@ -176,7 +241,10 @@ OperationResult SQLite::DoClose(ResultCallback* cb) { put_update_stmt.reset(); get_stmt.reset(); erase_stmt.reset(); + check_expire_stmt.reset(); 
expire_stmt.reset(); + get_expiry_last_run_stmt.reset(); + update_expiry_last_run_stmt.reset(); char* errmsg; if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg); @@ -188,11 +256,20 @@ OperationResult SQLite::DoClose(ResultCallback* cb) { } if ( int res = sqlite3_close_v2(db); res != SQLITE_OK ) { + op_res.code = ReturnCode::DISCONNECTION_FAILED; if ( op_res.err_str.empty() ) - op_res.err_str = "Sqlite could not close connection"; + op_res.err_str = "Sqlite could not close main db connection"; } db = nullptr; + + if ( int res = sqlite3_close_v2(expire_db); res != SQLITE_OK ) { + op_res.code = ReturnCode::DISCONNECTION_FAILED; + if ( op_res.err_str.empty() ) + op_res.err_str = "Sqlite could not close expire db connection"; + } + + expire_db = nullptr; } return op_res; @@ -290,18 +367,109 @@ OperationResult SQLite::DoErase(ResultCallback* cb, ValPtr key) { * derived classes. */ void SQLite::DoExpire(double current_network_time) { - auto stmt = unique_stmt_ptr(expire_stmt.get(), sqlite3_reset); + int status; + char* errMsg = nullptr; + unique_stmt_ptr stmt; - int status = sqlite3_bind_double(stmt.get(), 1, current_network_time); + // Begin an exclusive transaction here to lock the database for this one process. That + // will ensure there isn't a TOCTOU bug in the time check below. + while ( true ) { + status = sqlite3_exec(expire_db, "begin immediate transaction", nullptr, nullptr, &errMsg); + sqlite3_free(errMsg); + + if ( status == SQLITE_OK ) + break; + else + // If any other status is returned here, give up. Notably, this includes + // SQLITE_BUSY which will be returned if there was already a transaction + // running. If one node got in and made the transaction, expiration is + // happening so the rest don't need to retry. + return; + } + + // Automatically rollback the transaction when this object is deleted. 
+ auto deferred_rollback = util::Deferred([this]() { + char* errMsg = nullptr; + sqlite3_exec(expire_db, "rollback transaction", nullptr, nullptr, &errMsg); + sqlite3_free(errMsg); + }); + + // Check if there's anything to expire. + stmt = unique_stmt_ptr(check_expire_stmt.get(), sqlite3_reset); + status = sqlite3_bind_double(stmt.get(), 1, current_network_time); + while ( status != SQLITE_ROW ) { + status = sqlite3_step(stmt.get()); + if ( status == SQLITE_ROW ) { + auto num_to_expire = sqlite3_column_int(stmt.get(), 0); + + DBG_LOG(DBG_STORAGE, "Expiration has %d elements to expire", num_to_expire); + if ( num_to_expire == 0 ) + return; + } + else + return; + } + + // Check if the expiration control key is less than the interval. Exit if not. + stmt = unique_stmt_ptr(get_expiry_last_run_stmt.get(), sqlite3_reset); + for ( status = SQLITE_OK; status != SQLITE_ROW; ) { + status = sqlite3_step(stmt.get()); + if ( status == SQLITE_ROW ) { + double last_run = sqlite3_column_double(stmt.get(), 0); + + DBG_LOG(DBG_STORAGE, "Expiration last run: %f diff: %f interval: %f", last_run, + current_network_time - last_run, zeek::BifConst::Storage::expire_interval); + + if ( current_network_time > 0 && + (current_network_time - last_run) < zeek::BifConst::Storage::expire_interval ) + return; + } + else + return; + } + + // Update the expiration control key + stmt = unique_stmt_ptr(update_expiry_last_run_stmt.get(), sqlite3_reset); + status = sqlite3_bind_double(stmt.get(), 1, current_network_time); if ( status != SQLITE_OK ) { - // TODO: do something with the error? + std::string err = + util::fmt("Error preparing statement to update expiration control time: %s", sqlite3_errmsg(expire_db)); + DBG_LOG(DBG_STORAGE, "%s", err.c_str()); + Error(err.c_str()); + sqlite3_free(errMsg); return; } status = sqlite3_step(stmt.get()); - if ( status != SQLITE_ROW ) { - // TODO: should this return an error somehow? Reporter warning? 
+ if ( status != SQLITE_ROW && status != SQLITE_DONE ) { + std::string err = util::fmt("Error updating expiration control time: %s", sqlite3_errmsg(expire_db)); + DBG_LOG(DBG_STORAGE, "%s", err.c_str()); + Error(err.c_str()); + sqlite3_free(errMsg); + return; } + + // Delete the values. + stmt = unique_stmt_ptr(expire_stmt.get(), sqlite3_reset); + + status = sqlite3_bind_double(stmt.get(), 1, current_network_time); + if ( status != SQLITE_OK ) { + std::string err = util::fmt("Error preparing statement to expire elements: %s", sqlite3_errmsg(expire_db)); + DBG_LOG(DBG_STORAGE, "%s", err.c_str()); + Error(err.c_str()); + sqlite3_free(errMsg); + return; + } + + status = sqlite3_step(stmt.get()); + if ( status != SQLITE_ROW && status != SQLITE_DONE ) { + std::string err = util::fmt("Error expiring elements: %s", sqlite3_errmsg(expire_db)); + DBG_LOG(DBG_STORAGE, "%s", err.c_str()); + Error(err.c_str()); + } + + sqlite3_exec(expire_db, "commit transaction", nullptr, nullptr, &errMsg); + sqlite3_free(errMsg); } // returns true in case of error diff --git a/src/storage/backend/sqlite/SQLite.h b/src/storage/backend/sqlite/SQLite.h index 29cae4fd89..9b4677737d 100644 --- a/src/storage/backend/sqlite/SQLite.h +++ b/src/storage/backend/sqlite/SQLite.h @@ -53,6 +53,7 @@ private: OperationResult RunPragma(std::string_view name, std::optional value = std::nullopt); sqlite3* db = nullptr; + sqlite3* expire_db = nullptr; using sqlite_stmt_func = std::function; using unique_stmt_ptr = std::unique_ptr; @@ -61,7 +62,11 @@ private: unique_stmt_ptr put_update_stmt; unique_stmt_ptr get_stmt; unique_stmt_ptr erase_stmt; + + unique_stmt_ptr check_expire_stmt; unique_stmt_ptr expire_stmt; + unique_stmt_ptr get_expiry_last_run_stmt; + unique_stmt_ptr update_expiry_last_run_stmt; std::string full_path; std::string table_name; From be71196fa7441e1cf765900cc763b2e8c39cfef4 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Tue, 3 Jun 2025 15:03:40 -0700 Subject: [PATCH 5/5] Add busy_timeout script-level 
option, override any busy_timeout pragma --- .../frameworks/storage/backend/sqlite/main.zeek | 9 +++++++-- src/storage/backend/sqlite/SQLite.cc | 13 ++++++++----- .../out | 3 +-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/scripts/policy/frameworks/storage/backend/sqlite/main.zeek b/scripts/policy/frameworks/storage/backend/sqlite/main.zeek index c8c5f6ca87..505aee00de 100644 --- a/scripts/policy/frameworks/storage/backend/sqlite/main.zeek +++ b/scripts/policy/frameworks/storage/backend/sqlite/main.zeek @@ -20,14 +20,19 @@ export { ## different between the two. table_name: string; + ## The timeout for the connection to the database. This is set + ## per-connection. It is equivalent to setting a ``busy_timeout`` pragma + ## value, but that value will be ignored in favor of this field. + busy_timeout: interval &default=5 secs; + ## Key/value table for passing pragma commands when opening the database. ## These must be pairs that can be passed to the ``pragma`` command in ## sqlite. The ``integrity_check`` pragma is run automatically and does ## not need to be included here. For pragmas without a second argument, - ## set the value to an empty string. + ## set the value to an empty string. Setting the ``busy_timeout`` pragma + ## here will be ignored. 
pragma_commands: table[string] of string &ordered &default=table( ["integrity_check"] = "", - ["busy_timeout"] = "5000", ["journal_mode"] = "WAL", ["synchronous"] = "normal", ["temp_store"] = "memory" diff --git a/src/storage/backend/sqlite/SQLite.cc b/src/storage/backend/sqlite/SQLite.cc index 596ae6374f..309407bb6c 100644 --- a/src/storage/backend/sqlite/SQLite.cc +++ b/src/storage/backend/sqlite/SQLite.cc @@ -84,6 +84,8 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) { full_path = zeek::filesystem::path(path->ToStdString()).string(); table_name = backend_options->GetField("table_name")->ToStdString(); + auto busy_timeout = backend_options->GetField("busy_timeout")->Get(); + auto pragma_timeout_val = backend_options->GetField("pragma_timeout"); pragma_timeout = std::chrono::milliseconds(static_cast(pragma_timeout_val->Get() * 1000)); @@ -99,25 +101,26 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) { return open_res; } - // TODO: Should we use sqlite3_busy_timeout here instead of using the pragma? That would - // at least let us skip over one. The busy timeout is per-connection as well, so it'll - // never fail to run like the other pragmas can. 
- // sqlite3_busy_timeout(db, 2000); + sqlite3_busy_timeout(db, busy_timeout * 1000); auto pragmas = backend_options->GetField("pragma_commands"); for ( const auto& iter : *(pragmas->Get()) ) { auto k = iter.GetHashKey(); - auto v = iter.value; auto vl = pragmas->GetTableHash()->RecoverVals(*k); auto ks = vl->AsListVal()->Idx(0)->AsStringVal(); auto ks_sv = ks->ToStdStringView(); + + if ( ks_sv == "busy_timeout" ) + continue; + auto vs = iter.value->GetVal()->AsStringVal(); auto vs_sv = vs->ToStdStringView(); auto pragma_res = RunPragma(ks_sv, vs_sv); if ( pragma_res.code != ReturnCode::SUCCESS ) { Error(pragma_res.err_str.c_str()); + Close(nullptr); return pragma_res; } } diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out index 0d8cb07abb..055e0cd530 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic/out @@ -1,7 +1,6 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, pragma_commands={ +Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ [integrity_check] = , -[busy_timeout] = 5000, [journal_mode] = WAL, [synchronous] = normal, [temp_store] = memory