diff --git a/scripts/base/frameworks/storage/main.zeek b/scripts/base/frameworks/storage/main.zeek index 75d72fe706..51ab13c27c 100644 --- a/scripts/base/frameworks/storage/main.zeek +++ b/scripts/base/frameworks/storage/main.zeek @@ -29,4 +29,7 @@ export { ## backend. expire_time: interval &default=0sec; }; + + # The histogram buckets to use for operation latency metrics, in seconds. + const latency_metric_bounds: vector of double = { 0.001, 0.01, 0.1, 1.0, } &redef; } diff --git a/src/storage/Backend.cc b/src/storage/Backend.cc index 566ad56850..533d11eaa0 100644 --- a/src/storage/Backend.cc +++ b/src/storage/Backend.cc @@ -8,9 +8,30 @@ #include "zeek/storage/Manager.h" #include "zeek/storage/ReturnCode.h" #include "zeek/storage/storage-events.bif.h" +#include "zeek/telemetry/Counter.h" +#include "zeek/telemetry/Histogram.h" +#include "zeek/telemetry/Manager.h" namespace zeek::storage { +namespace detail { + +OperationMetrics::OperationMetrics(const telemetry::CounterFamilyPtr& results_family, + const telemetry::HistogramFamilyPtr& latency_family, std::string_view operation_type, + std::string_view backend_type, std::string_view backend_config) + : success(results_family->GetOrAdd( + {{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "success"}})), + fail(results_family->GetOrAdd( + {{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "fail"}})), + error(results_family->GetOrAdd( + {{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "error"}})), + timeouts(results_family->GetOrAdd( + {{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "timeout"}})), + latency(latency_family->GetOrAdd( + {{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}})) {} + +} // namespace detail + RecordValPtr OperationResult::BuildVal() { return MakeVal(code, err_str, value); } RecordValPtr 
OperationResult::MakeVal(EnumValPtr code, std::string_view err_str, ValPtr value) { @@ -32,9 +53,14 @@ ResultCallback::ResultCallback(zeek::detail::trigger::TriggerPtr trigger, const void ResultCallback::Timeout() { if ( ! IsSyncCallback() ) trigger->Cache(assoc, OperationResult::MakeVal(ReturnCode::TIMEOUT).get()); + + if ( operation_metrics ) + operation_metrics->timeouts->Inc(); } void ResultCallback::Complete(OperationResult res) { + UpdateOperationMetrics(res.code); + // If this is a sync callback, there isn't a trigger to process. Store the result and bail. if ( IsSyncCallback() ) { result = std::move(res); @@ -46,6 +72,25 @@ void ResultCallback::Complete(OperationResult res) { trigger->Release(); } +void ResultCallback::Init(detail::OperationMetrics* m) { + operation_metrics = m; + start_time = util::current_time(true); +} + +void ResultCallback::UpdateOperationMetrics(EnumValPtr c) { + if ( operation_metrics ) { + if ( c == ReturnCode::SUCCESS ) + operation_metrics->success->Inc(); + else if ( c == ReturnCode::KEY_EXISTS || c == ReturnCode::KEY_NOT_FOUND ) + operation_metrics->fail->Inc(); + else if ( c != ReturnCode::IN_PROGRESS ) + operation_metrics->error->Inc(); + + // Store the latency between start and completion in seconds, matching the histogram's unit.
+ operation_metrics->latency->Observe(util::current_time(true) - start_time); + } +} + OpenResultCallback::OpenResultCallback(IntrusivePtr backend) : ResultCallback(), backend(std::move(backend)) {} @@ -55,6 +100,7 @@ OpenResultCallback::OpenResultCallback(zeek::detail::trigger::TriggerPtr trigger void OpenResultCallback::Complete(OperationResult res) { if ( res.code == ReturnCode::SUCCESS ) { + backend->backend->InitMetrics(); backend->backend->EnqueueBackendOpened(); } @@ -68,6 +114,12 @@ void OpenResultCallback::Complete(OperationResult res) { Backend::Backend(uint8_t modes, std::string_view tag_name) : modes(modes) { tag = storage_mgr->BackendMgr().GetComponentTag(std::string{tag_name}); tag_str = zeek::obj_desc_short(tag.AsVal().get()); + + // The rest of the metrics are initialized after the backend opens, but this one has + // to be here because it's possible it gets used by the open callback before Open() + // fully returns. + backends_opened_metric = + telemetry_mgr->CounterInstance("zeek", "storage_backends_opened", {}, "Number of backends opened", ""); } OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, TypePtr kt, TypePtr vt) { @@ -88,12 +140,22 @@ OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, Type if ( ! ret.value ) ret.value = cb->Backend(); + if ( ret.code == ReturnCode::SUCCESS ) + InitMetrics(); + + // Complete sync callbacks to make sure the metrics get initialized plus that the + // backend_opened event gets posted. + if ( cb->IsSyncCallback() ) + CompleteCallback(cb, ret); + return ret; } OperationResult Backend::Close(ResultCallback* cb) { return DoClose(cb); } OperationResult Backend::Put(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) { + cb->Init(put_metrics.get()); + // The intention for this method is to do some other heavy lifting in regard // to backends that need to pass data through the manager instead of directly // through the workers. 
For the first versions of the storage framework it @@ -109,10 +171,16 @@ OperationResult Backend::Put(ResultCallback* cb, ValPtr key, ValPtr value, bool return ret; } - return DoPut(cb, std::move(key), std::move(value), overwrite, expiration_time); + auto ret = DoPut(cb, std::move(key), std::move(value), overwrite, expiration_time); + if ( cb->IsSyncCallback() ) + cb->UpdateOperationMetrics(ret.code); + + return ret; } OperationResult Backend::Get(ResultCallback* cb, ValPtr key) { + cb->Init(get_metrics.get()); + // See the note in Put(). if ( ! same_type(key->GetType(), key_type) ) { auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH}; @@ -120,10 +188,16 @@ OperationResult Backend::Get(ResultCallback* cb, ValPtr key) { return ret; } - return DoGet(cb, std::move(key)); + auto ret = DoGet(cb, std::move(key)); + if ( cb->IsSyncCallback() ) + cb->UpdateOperationMetrics(ret.code); + + return ret; } OperationResult Backend::Erase(ResultCallback* cb, ValPtr key) { + cb->Init(erase_metrics.get()); + // See the note in Put(). if ( ! 
same_type(key->GetType(), key_type) ) { auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH}; @@ -131,7 +205,11 @@ OperationResult Backend::Erase(ResultCallback* cb, ValPtr key) { return ret; } - return DoErase(cb, std::move(key)); + auto ret = DoErase(cb, std::move(key)); + if ( cb->IsSyncCallback() ) + cb->UpdateOperationMetrics(ret.code); + + return ret; } void Backend::CompleteCallback(ResultCallback* cb, const OperationResult& data) const { @@ -145,12 +223,57 @@ void Backend::CompleteCallback(ResultCallback* cb, const OperationResult& data) } } -void Backend::EnqueueBackendOpened() { event_mgr.Enqueue(Storage::backend_opened, tag.AsVal(), backend_options); } +void Backend::EnqueueBackendOpened() { + event_mgr.Enqueue(Storage::backend_opened, tag.AsVal(), backend_options); + backends_opened_metric->Inc(); +} void Backend::EnqueueBackendLost(std::string_view reason) { event_mgr.Enqueue(Storage::backend_lost, tag.AsVal(), backend_options, make_intrusive(reason)); } +void Backend::InitMetrics() { + if ( metrics_initialized ) + return; + + metrics_initialized = true; + auto results_family = + telemetry_mgr->CounterFamily("zeek", "storage_backend_operation_results", + {"operation", "type", "config", "result"}, "Storage operation results"); + + auto bounds_val = zeek::id::find_val("Storage::latency_metric_bounds"); + std::vector bounds(bounds_val->Size()); + for ( unsigned int i = 0; i < bounds_val->Size(); i++ ) + bounds[i] = bounds_val->DoubleAt(i); + + auto latency_family = + telemetry_mgr->HistogramFamily("zeek", "storage_backend_operation_latency", {"operation", "type", "config"}, + bounds, "Storage Operation Latency", "seconds"); + + std::string metrics_config = GetConfigMetricsLabel(); + put_metrics = + std::make_unique(results_family, latency_family, "put", Tag(), metrics_config); + get_metrics = + std::make_unique(results_family, latency_family, "get", Tag(), metrics_config); + erase_metrics = + std::make_unique(results_family, latency_family, "erase", 
Tag(), metrics_config); + + bytes_written_metric = telemetry_mgr->CounterInstance("zeek", "storage_backend_data_written", + {{"type", Tag()}, {"config", metrics_config}}, + "Storage data written to backend", "bytes"); + bytes_read_metric = telemetry_mgr->CounterInstance("zeek", "storage_backend_data_read", + {{"type", Tag()}, {"config", metrics_config}}, + "Storage data read from backend", "bytes"); + + expired_entries_metric = telemetry_mgr->CounterInstance("zeek", "storage_backend_expired_entries", + {{"type", Tag()}, {"config", metrics_config}}, + "Storage expired entries removed by backend", ""); +} + +void Backend::IncBytesWrittenMetric(size_t written) { bytes_written_metric->Inc(written); } +void Backend::IncBytesReadMetric(size_t read) { bytes_read_metric->Inc(read); } +void Backend::IncExpiredEntriesMetric(size_t expired) { expired_entries_metric->Inc(expired); } + zeek::OpaqueTypePtr detail::backend_opaque; IMPLEMENT_OPAQUE_VALUE(detail::BackendHandleVal) diff --git a/src/storage/Backend.h b/src/storage/Backend.h index ab4156f44d..8aa660cc68 100644 --- a/src/storage/Backend.h +++ b/src/storage/Backend.h @@ -2,6 +2,8 @@ #pragma once +#include + #include "zeek/OpaqueVal.h" #include "zeek/Tag.h" #include "zeek/Val.h" @@ -12,8 +14,33 @@ class Trigger; using TriggerPtr = IntrusivePtr; } // namespace zeek::detail::trigger +namespace zeek::telemetry { +class Counter; +using CounterPtr = std::shared_ptr; +class CounterFamily; +using CounterFamilyPtr = std::shared_ptr; +class Histogram; +using HistogramPtr = std::shared_ptr; +class HistogramFamily; +using HistogramFamilyPtr = std::shared_ptr; +} // namespace zeek::telemetry + namespace zeek::storage { +namespace detail { +struct OperationMetrics { + telemetry::CounterPtr success; + telemetry::CounterPtr fail; + telemetry::CounterPtr error; + telemetry::CounterPtr timeouts; + telemetry::HistogramPtr latency; + + OperationMetrics(const telemetry::CounterFamilyPtr& results_family, + const 
telemetry::HistogramFamilyPtr& latency_family, std::string_view operation_type, + std::string_view backend_type, std::string_view backend_config); +}; +} // namespace detail + class Manager; /** @@ -82,10 +109,37 @@ public: OperationResult Result() const { return result; } + /** + * Stores the collection of metrics instruments to update when the operation completes + * and sets the start time for an operation to be used to update the latency metric + * when the operation completes. This is unset for open/close callbacks. + */ + void Init(detail::OperationMetrics* m); + + /** + * Update the metrics based on a return value. + */ + void UpdateOperationMetrics(EnumValPtr c); + + /** + * Stores the amount of data transferred in the operation. This can be used by async + * backends to set the amount transferred in Put operations so it can be added to the + * metrics when the operation finishes. + */ + void AddDataTransferredSize(size_t size) { transferred_size += size; } + + /** + * Returns the amount of data transferred in this operation. + */ + size_t GetDataTransferredSize() const { return transferred_size; } + protected: zeek::detail::trigger::TriggerPtr trigger; const void* assoc = nullptr; OperationResult result; + detail::OperationMetrics* operation_metrics = nullptr; + double start_time = 0.0; + size_t transferred_size = 0; }; class OpenResultCallback; @@ -101,7 +155,7 @@ public: /** * Returns a descriptive tag representing the source for debugging. */ - const char* Tag() { return tag_str.c_str(); } + const char* Tag() const { return tag_str.c_str(); } /** * Store a new key/value pair in the backend. @@ -234,6 +288,34 @@ protected: */ void CompleteCallback(ResultCallback* cb, const OperationResult& data) const; + /** + * Returns a string compatible with Prometheus that's used as a tag to differentiate + * entries of backend instances. 
+ */ + std::string GetConfigMetricsLabel() const { return DoGetConfigMetricsLabel(); } + + /** + * Utility method to increase the metrics for number of bytes written by a backend. + * + * @param written The number of bytes written by the last operation. + */ + void IncBytesWrittenMetric(size_t written); + + /** + * Utility method to increase the metrics for number of bytes read by a backend. + * + * @param read The number of bytes read by the last operation. + */ + void IncBytesReadMetric(size_t read); + + /** + * Utility method to increase the metrics for number of entries expired and removed + * from the backend. + * + * @param expired The number of elements removed by the last operation. + */ + void IncExpiredEntriesMetric(size_t expired); + TypePtr key_type; TypePtr val_type; RecordValPtr backend_options; @@ -291,7 +373,30 @@ private: */ virtual void DoExpire(double current_network_time) {} + /** + * Returns a string compatible with Prometheus that's used as a tag to differentiate + * entries of backend instances. + */ + virtual std::string DoGetConfigMetricsLabel() const = 0; + + /** + * Initializes the instruments for various storage metrics. + */ + void InitMetrics(); + uint8_t modes; + bool metrics_initialized = false; + + // These are owned by the backend but are passed into the callbacks to be + // updated when those complete/timeout. 
+ std::unique_ptr put_metrics; + std::unique_ptr get_metrics; + std::unique_ptr erase_metrics; + + telemetry::CounterPtr bytes_written_metric; + telemetry::CounterPtr bytes_read_metric; + telemetry::CounterPtr backends_opened_metric; + telemetry::CounterPtr expired_entries_metric; }; using BackendPtr = zeek::IntrusivePtr; diff --git a/src/storage/backend/redis/Redis.cc b/src/storage/backend/redis/Redis.cc index a57090e1ec..9425f897d1 100644 --- a/src/storage/backend/redis/Redis.cc +++ b/src/storage/backend/redis/Redis.cc @@ -11,6 +11,7 @@ #include "zeek/Val.h" #include "zeek/iosource/Manager.h" #include "zeek/storage/ReturnCode.h" +#include "zeek/telemetry/Counter.h" #include "hiredis/adapters/poll.h" #include "hiredis/async.h" @@ -246,6 +247,17 @@ constexpr char REQUIRED_VERSION[] = "6.2.0"; storage::BackendPtr Redis::Instantiate() { return make_intrusive(); } +std::string Redis::DoGetConfigMetricsLabel() const { + static auto running_under_test = id::find_val("running_under_test")->AsBool(); + std::string tag; + if ( running_under_test ) + tag = util::fmt("server_addr-%s", key_prefix.c_str()); + else + tag = util::fmt("%s-%s", server_addr.c_str(), key_prefix.c_str()); + + return tag; +} + /** * Called by the manager system to open the backend. */ @@ -406,6 +418,8 @@ OperationResult Redis::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool if ( connected && status == REDIS_ERR ) return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue put operation: %s", async_ctx->errstr)}; + cb->AddDataTransferredSize(key_data->size() + val_data->size()); + ++active_ops; // If reading pcaps insert into a secondary set that's ordered by expiration @@ -537,6 +551,8 @@ void Redis::DoExpire(double current_network_time) { // TODO: do we care if this failed? } + IncExpiredEntriesMetric(elements.size()); + freeReplyObject(reply); // Remove all of the elements from the range-set that match the time range. 
@@ -566,6 +582,8 @@ void Redis::HandlePutResult(redisReply* reply, ResultCallback* callback) { else if ( reply->type == REDIS_REPLY_ERROR ) res = ParseReplyError("put", reply->str); + IncBytesWrittenMetric(callback->GetDataTransferredSize()); + freeReplyObject(reply); CompleteCallback(callback, res); } @@ -583,6 +601,7 @@ void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) { else if ( reply->type == REDIS_REPLY_ERROR ) res = ParseReplyError("get", reply->str); else { + IncBytesReadMetric(reply->len); auto val = serializer->Unserialize({(std::byte*)reply->str, reply->len}, val_type); if ( val ) res = {ReturnCode::SUCCESS, "", val.value()}; diff --git a/src/storage/backend/redis/Redis.h b/src/storage/backend/redis/Redis.h index c34617cdb3..5778c2343e 100644 --- a/src/storage/backend/redis/Redis.h +++ b/src/storage/backend/redis/Redis.h @@ -27,7 +27,7 @@ public: * * @return The debugging name. */ - const char* Tag() override { return tag_str.c_str(); } + const char* Tag() override { return Backend::Tag(); } // IOSource interface double GetNextTimeout() override { return -1; } @@ -61,6 +61,7 @@ private: OperationResult DoErase(ResultCallback* cb, ValPtr key) override; void DoExpire(double current_network_time) override; void DoPoll() override; + std::string DoGetConfigMetricsLabel() const override; OperationResult ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const; OperationResult CheckServerVersion(); diff --git a/src/storage/backend/sqlite/SQLite.cc b/src/storage/backend/sqlite/SQLite.cc index 7602b55328..b4213d38b9 100644 --- a/src/storage/backend/sqlite/SQLite.cc +++ b/src/storage/backend/sqlite/SQLite.cc @@ -12,6 +12,7 @@ #include "zeek/Func.h" #include "zeek/Val.h" #include "zeek/storage/ReturnCode.h" +#include "zeek/telemetry/Counter.h" #include "const.bif.netvar_h" @@ -75,6 +76,11 @@ OperationResult SQLite::RunPragma(std::string_view name, std::optional(); } +std::string SQLite::DoGetConfigMetricsLabel() const { 
+ std::string tag = util::fmt("%s-%s", full_path.c_str(), table_name.c_str()); + return tag; +} + /** * Called by the manager system to open the backend. */ @@ -362,6 +368,8 @@ OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool step_result.code = ReturnCode::KEY_EXISTS; } + IncBytesWrittenMetric(val_data->size()); + return step_result; } @@ -391,6 +399,7 @@ OperationResult SQLite::DoGet(ResultCallback* cb, ValPtr key) { auto value_parser = [this](sqlite3_stmt* stmt) -> OperationResult { auto blob = static_cast(sqlite3_column_blob(stmt, 0)); size_t blob_size = sqlite3_column_bytes(stmt, 0); + IncBytesReadMetric(blob_size); auto val = serializer->Unserialize({blob, blob_size}, val_type); @@ -530,6 +539,11 @@ void SQLite::DoExpire(double current_network_time) { Error(err.c_str()); } + // Get the number of changes from the delete statement on the expiration connection (sqlite3_changes is + // per-connection). This should match the num_to_expire value earlier because we're under a transaction. + int changes = sqlite3_changes(expire_db); + IncExpiredEntriesMetric(changes); + sqlite3_exec(expire_db, "commit transaction", nullptr, nullptr, &errMsg); sqlite3_free(errMsg); } diff --git a/src/storage/backend/sqlite/SQLite.h b/src/storage/backend/sqlite/SQLite.h index cd07dc8ad5..cdab9bf5d2 100644 --- a/src/storage/backend/sqlite/SQLite.h +++ b/src/storage/backend/sqlite/SQLite.h @@ -34,6 +34,7 @@ private: OperationResult DoGet(ResultCallback* cb, ValPtr key) override; OperationResult DoErase(ResultCallback* cb, ValPtr key) override; void DoExpire(double current_network_time) override; + std::string DoGetConfigMetricsLabel() const override; /** * Checks whether a status code returned by an sqlite call is a success. 
diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out index c9184cdc98..3690e561c6 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-basic-sync-in-when/out @@ -4,3 +4,9 @@ put result, [code=Storage::SUCCESS, error_str=, value=, value=value5678] get result same as inserted, T closed succesfully +Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ +[integrity_check] = , +[journal_mode] = WAL, +[synchronous] = normal, +[temp_store] = memory +}, pragma_timeout=500.0 msecs, pragma_wait_on_busy=5.0 msecs]] diff --git a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out index 5649e321fd..9a0d8c8402 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out +++ b/testing/btest/Baseline/scripts.base.frameworks.storage.sqlite-erase/out @@ -2,3 +2,9 @@ open result, [code=Storage::SUCCESS, error_str=, value=] erase result, [code=Storage::SUCCESS, error_str=, value=] get result, [code=Storage::KEY_NOT_FOUND, error_str=, value=] +Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=storage-test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={ +[integrity_check] = , +[journal_mode] = WAL, +[synchronous] = normal, +[temp_store] = memory +}, pragma_timeout=500.0 msecs, pragma_wait_on_busy=5.0 msecs]] diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.cc b/testing/btest/plugins/storage-plugin/src/StorageDummy.cc index 59471dac18..e588ebd695 100644 --- 
a/testing/btest/plugins/storage-plugin/src/StorageDummy.cc +++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.cc @@ -87,4 +87,6 @@ OperationResult StorageDummy::DoErase(ResultCallback* cb, ValPtr key) { return {ReturnCode::KEY_NOT_FOUND}; } +std::string StorageDummy::DoGetConfigMetricsLabel() const { return "storage-dummy"; } + } // namespace btest::storage::backend diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.h b/testing/btest/plugins/storage-plugin/src/StorageDummy.h index 3e1e5d2a9d..ac3ef43535 100644 --- a/testing/btest/plugins/storage-plugin/src/StorageDummy.h +++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.h @@ -49,6 +49,8 @@ public: */ zeek::storage::OperationResult DoErase(zeek::storage::ResultCallback* cb, zeek::ValPtr key) override; + std::string DoGetConfigMetricsLabel() const override; + private: std::map data; bool open = false; diff --git a/testing/btest/scripts/base/frameworks/storage/sqlite-erase.zeek b/testing/btest/scripts/base/frameworks/storage/sqlite-erase.zeek index b1b38bd77c..4ca830bd72 100644 --- a/testing/btest/scripts/base/frameworks/storage/sqlite-erase.zeek +++ b/testing/btest/scripts/base/frameworks/storage/sqlite-erase.zeek @@ -7,6 +7,11 @@ @load base/frameworks/storage/sync @load policy/frameworks/storage/backend/sqlite +event Storage::backend_opened(tag: Storage::Backend, config: any) + { + print "Storage::backend_opened", tag, config; + } + event zeek_init() { # Create a database file in the .tmp directory with a 'testing' table