Merge remote-tracking branch 'origin/topic/timw/sqlite-cluster-test'

* origin/topic/timw/sqlite-cluster-test:
  SQLite: Move integrity_check to pragma table
  SQLite: Add backend option for pragma timeout
  SQLite: Rename tuning_params to pragma_commands, move running pragmas to utility method
  SQLite: Retry pragma statements at startup to avoid contention
  SQLite: Check for locked database as well as busy databases
  SQLite: Fix some string-sizing issues
  SQLite: Run pragmas on connection before creating table
  SQLite: Add busy_timeout pragma to default options
  Prefix sqlite-based btests with sqlite- to match redis tests
  Add sqlite cluster storage btest
This commit is contained in:
Tim Wojtulewicz 2025-05-21 09:40:31 -07:00
commit 1862e66097
18 changed files with 213 additions and 39 deletions

22
CHANGES
View file

@ -1,3 +1,25 @@
8.0.0-dev.179 | 2025-05-21 09:40:31 -0700
* SQLite: Move integrity_check to pragma table (Tim Wojtulewicz, Corelight)
* SQLite: Add backend option for pragma timeout (Tim Wojtulewicz, Corelight)
* SQLite: Rename tuning_params to pragma_commands, move running pragmas to utility method (Tim Wojtulewicz, Corelight)
* SQLite: Retry pragma statements at startup to avoid contention (Tim Wojtulewicz, Corelight)
* SQLite: Check for locked database as well as busy databases (Tim Wojtulewicz, Corelight)
* SQLite: Fix some string-sizing issues (Tim Wojtulewicz, Corelight)
* SQLite: Run pragmas on connection before creating table (Tim Wojtulewicz, Corelight)
* SQLite: Add busy_timeout pragma to default options (Tim Wojtulewicz, Corelight)
* Prefix sqlite-based btests with sqlite- to match redis tests (Tim Wojtulewicz, Corelight)
* Add sqlite cluster storage btest (Tim Wojtulewicz, Corelight)
8.0.0-dev.168 | 2025-05-21 13:38:46 +0200
* http/detect-sql-injection: Fix zeekygen comment (Arne Welzel, Corelight)

View file

@ -1 +1 @@
8.0.0-dev.168
8.0.0-dev.179

View file

@ -20,14 +20,28 @@ export {
## different between the two.
table_name: string;
## Key/value table for passing tuning parameters when opening the
## database. These must be pairs that can be passed to the ``pragma``
## command in sqlite.
tuning_params: table[string] of string &default=table(
## Key/value table for passing pragma commands when opening the database.
## These must be pairs that can be passed to the ``pragma`` command in
## sqlite. The ``integrity_check`` pragma is run automatically and does
## not need to be included here. For pragmas without a second argument,
## set the value to an empty string.
pragma_commands: table[string] of string &ordered &default=table(
["integrity_check"] = "",
["busy_timeout"] = "5000",
["journal_mode"] = "WAL",
["synchronous"] = "normal",
["temp_store"] = "memory"
);
) &ordered;
## The total amount of time that an SQLite backend will spend attempting
## to run an individual pragma command before giving up and returning an
## initialization error. Setting this to zero will result in the backend
## attempting forever until success.
pragma_timeout: interval &default=500 msec;
## The amount of time that at SQLite backend will wait between failures
## to run an individual pragma command.
pragma_wait_on_busy: interval &default=5 msec;
};
}

View file

@ -2,13 +2,56 @@
#include "zeek/storage/backend/sqlite/SQLite.h"
#include <thread>
#include "zeek/3rdparty/sqlite3.h"
#include "zeek/DebugLogger.h"
#include "zeek/Func.h"
#include "zeek/Val.h"
#include "zeek/storage/ReturnCode.h"
using namespace std::chrono_literals;
namespace zeek::storage::backend::sqlite {
OperationResult SQLite::RunPragma(std::string_view name, std::optional<std::string_view> value) {
char* errorMsg = nullptr;
std::chrono::milliseconds time_spent = 0ms;
std::string cmd = util::fmt("pragma %.*s", static_cast<int>(name.size()), name.data());
if ( value && ! value->empty() )
cmd += util::fmt(" = %.*s", static_cast<int>(value->size()), value->data());
DBG_LOG(DBG_STORAGE, "Executing pragma %s on %s", cmd.c_str(), full_path.c_str());
while ( pragma_timeout == 0ms || time_spent < pragma_timeout ) {
int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg);
if ( res == SQLITE_OK ) {
break;
}
else if ( res == SQLITE_BUSY ) {
// If we got back that the database is busy, it likely means that another process is trying to
// do their pragmas at startup too. Exponentially back off and try again after a sleep.
sqlite3_free(errorMsg);
std::this_thread::sleep_for(pragma_wait_on_busy);
time_spent += pragma_wait_on_busy;
}
else {
std::string err = util::fmt("Error while executing pragma '%s': %s", cmd.c_str(), errorMsg);
sqlite3_free(errorMsg);
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
}
if ( pragma_timeout != 0ms && time_spent >= pragma_timeout ) {
std::string err =
util::fmt("Database was busy while executing %.*s pragma", static_cast<int>(name.size()), name.data());
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
return {ReturnCode::SUCCESS};
}
storage::BackendPtr SQLite::Instantiate() { return make_intrusive<SQLite>(); }
/**
@ -34,6 +77,12 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
full_path = zeek::filesystem::path(path->ToStdString()).string();
table_name = backend_options->GetField<StringVal>("table_name")->ToStdString();
auto pragma_timeout_val = backend_options->GetField<IntervalVal>("pragma_timeout");
pragma_timeout = std::chrono::milliseconds(static_cast<int64_t>(pragma_timeout_val->Get() * 1000));
auto pragma_wof_val = backend_options->GetField<IntervalVal>("pragma_wait_on_busy");
pragma_wait_on_busy = std::chrono::milliseconds(static_cast<int64_t>(pragma_timeout_val->Get() * 1000));
if ( auto open_res =
CheckError(sqlite3_open_v2(full_path.c_str(), &db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX, NULL));
@ -43,41 +92,32 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
return open_res;
}
auto pragmas = backend_options->GetField<TableVal>("pragma_commands")->ToMap();
for ( const auto& [k, v] : pragmas ) {
auto ks = k->AsListVal()->Idx(0)->AsStringVal();
auto ks_sv = ks->ToStdStringView();
auto vs = v->AsStringVal();
auto vs_sv = vs->ToStdStringView();
auto pragma_res = RunPragma(ks_sv, vs_sv);
if ( pragma_res.code != ReturnCode::SUCCESS ) {
Error(pragma_res.err_str.c_str());
return pragma_res;
}
}
std::string create = "create table if not exists " + table_name + " (";
create.append("key_str blob primary key, value_str blob not null, expire_time real);");
char* errorMsg = nullptr;
if ( int res = sqlite3_exec(db, create.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
std::string err = util::fmt("Error executing table creation statement: %s", errorMsg);
std::string err = util::fmt("Error executing table creation statement: (%d) %s", res, errorMsg);
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
if ( int res = sqlite3_exec(db, "pragma integrity_check", NULL, NULL, &errorMsg); res != SQLITE_OK ) {
std::string err = util::fmt("Error executing integrity check: %s", errorMsg);
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
auto tuning_params = backend_options->GetField<TableVal>("tuning_params")->ToMap();
for ( const auto& [k, v] : tuning_params ) {
auto ks = k->AsListVal()->Idx(0)->AsStringVal();
auto vs = v->AsStringVal();
std::string cmd = util::fmt("pragma %s = %s", ks->ToStdStringView().data(), vs->ToStdStringView().data());
if ( int res = sqlite3_exec(db, cmd.c_str(), NULL, NULL, &errorMsg); res != SQLITE_OK ) {
std::string err = util::fmt("Error executing tuning pragma statement: %s", errorMsg);
Error(err.c_str());
sqlite3_free(errorMsg);
Close(nullptr);
return {ReturnCode::INITIALIZATION_FAILED, std::move(err)};
}
}
static std::array<std::string, 5> statements =
{util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?)", table_name.c_str()),
util::fmt("insert into %s (key_str, value_str, expire_time) values(?, ?, ?) ON CONFLICT(key_str) "
@ -92,7 +132,7 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
int i = 0;
for ( const auto& stmt : statements ) {
sqlite3_stmt* ps;
if ( auto prep_res = CheckError(sqlite3_prepare_v2(db, stmt.c_str(), stmt.size(), &ps, NULL));
if ( auto prep_res = CheckError(sqlite3_prepare_v2(db, stmt.c_str(), static_cast<int>(stmt.size()), &ps, NULL));
prep_res.code != ReturnCode::SUCCESS ) {
Close(nullptr);
return prep_res;
@ -107,8 +147,6 @@ OperationResult SQLite::DoOpen(OpenResultCallback* cb, RecordValPtr options) {
erase_stmt = std::move(stmt_ptrs[3]);
expire_stmt = std::move(stmt_ptrs[4]);
sqlite3_busy_timeout(db, 5000);
return {ReturnCode::SUCCESS};
}
@ -126,7 +164,8 @@ OperationResult SQLite::DoClose(ResultCallback* cb) {
expire_stmt.reset();
char* errmsg;
if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg); res != SQLITE_OK ) {
if ( int res = sqlite3_exec(db, "pragma optimize", NULL, NULL, &errmsg);
res != SQLITE_OK && res != SQLITE_BUSY ) {
// We're shutting down so capture the error message here for informational
// reasons, but don't do anything else with it.
op_res = {ReturnCode::DISCONNECTION_FAILED, util::fmt("Sqlite failed to optimize at shutdown: %s", errmsg)};
@ -294,7 +333,7 @@ OperationResult SQLite::Step(sqlite3_stmt* stmt, bool parse_value) {
else
ret = {ReturnCode::SUCCESS};
}
else if ( step_status == SQLITE_BUSY )
else if ( step_status == SQLITE_BUSY || step_status == SQLITE_LOCKED )
// TODO: this could retry a number of times instead of just failing
ret = {ReturnCode::TIMEOUT};
else if ( step_status == SQLITE_CONSTRAINT )

View file

@ -2,6 +2,8 @@
#pragma once
#include <chrono>
#include "zeek/storage/Backend.h"
// Forward declare these to avoid including sqlite3.h here
@ -45,6 +47,11 @@ private:
*/
OperationResult Step(sqlite3_stmt* stmt, bool parse_value = false);
/**
* Helper utility for running pragmas on the database.
*/
OperationResult RunPragma(std::string_view name, std::optional<std::string_view> value = std::nullopt);
sqlite3* db = nullptr;
using stmt_deleter = std::function<void(sqlite3_stmt*)>;
@ -57,6 +64,8 @@ private:
std::string full_path;
std::string table_name;
std::chrono::milliseconds pragma_timeout;
std::chrono::milliseconds pragma_wait_on_busy;
};
} // namespace zeek::storage::backend::sqlite

View file

@ -1,9 +1,11 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, tuning_params={
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, pragma_commands={
[synchronous] = normal,
[temp_store] = memory,
[journal_mode] = WAL
}]]
[journal_mode] = WAL,
[busy_timeout] = 5000,
[integrity_check] = ,
[temp_store] = memory
}, pragma_timeout=500.0 msecs, pragma_wait_on_failure=5.0 msecs]]
open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678]

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
worker-1, put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
sqlite_data_written
worker-1, [code=Storage::SUCCESS, error_str=<uninitialized>, value=5678]

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sqlite_data_written
worker-2, [code=Storage::SUCCESS, error_str=<uninitialized>, value=5678]

View file

@ -0,0 +1,81 @@
# @TEST-DOC: Tests SQLite storage in a cluster environment
# @TEST-PORT: BROKER_MANAGER_PORT
# @TEST-PORT: BROKER_WORKER1_PORT
# @TEST-PORT: BROKER_WORKER2_PORT
#
# @TEST-EXEC: cp $FILES/broker/cluster-layout.zeek .
# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b %INPUT
# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT
# @TEST-EXEC: btest-bg-run worker-2 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -b %INPUT
# @TEST-EXEC: btest-bg-wait -k 5
# @TEST-EXEC: btest-diff worker-1/.stdout
# @TEST-EXEC: btest-diff worker-2/.stdout
# @TEST-EXEC:
@load base/frameworks/storage/sync
@load base/frameworks/cluster
@load policy/frameworks/storage/backend/sqlite
@load policy/frameworks/cluster/experimental
global sqlite_data_written: event() &is_used;
@if ( Cluster::local_node_type() == Cluster::WORKER )
global backend: opaque of Storage::BackendHandle;
event zeek_init()
{
local opts: Storage::BackendOptions;
opts$sqlite = [ $database_path="../test.sqlite", $table_name="testing" ];
local open_res = Storage::Sync::open_backend(Storage::STORAGE_BACKEND_SQLITE, opts, string, string);
if ( open_res$code != Storage::SUCCESS ) {
print fmt("Worker %s failed to open backend: %s", Cluster::node, open_res$error_str);
terminate();
}
backend = open_res$value;
}
event sqlite_data_written()
{
print "sqlite_data_written";
local res = Storage::Sync::get(backend, "1234");
print Cluster::node, res;
Storage::Sync::close_backend(backend);
terminate();
}
@else
global node_count: count = 0;
event Cluster::node_down(name: string, id: string)
{
++node_count;
if ( node_count == 2 )
terminate();
}
event sqlite_data_written()
{
local e = Cluster::make_event(sqlite_data_written);
Cluster::publish(Cluster::worker_topic, e);
}
@endif
@if ( Cluster::node == "worker-1" )
event Cluster::Experimental::cluster_started()
{
local res = Storage::Sync::put(backend, [ $key="1234", $value="5678" ]);
print Cluster::node, "put result", res;
local e = Cluster::make_event(sqlite_data_written);
Cluster::publish(Cluster::manager_topic, e);
}
@endif