Add storage metrics for operations, expirations, data transferred

This commit is contained in:
Tim Wojtulewicz 2025-06-26 11:19:31 -07:00
parent cab0883254
commit a0ffe7f748
12 changed files with 293 additions and 6 deletions

View file

@ -29,4 +29,7 @@ export {
## backend.
expire_time: interval &default=0sec;
};
## The histogram buckets to use for operation latency metrics, in seconds.
const latency_metric_bounds: vector of double = { 0.001, 0.01, 0.1, 1.0, } &redef;
}
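Because the constant is marked &redef, deployments can tune the buckets without touching the framework. A minimal, hypothetical sketch of a site policy widening the buckets for slower remote backends (values are in seconds, matching the default above; not part of this commit):

# Hypothetical site tuning: add coarser buckets for slow remote backends.
redef Storage::latency_metric_bounds = vector(0.001, 0.01, 0.1, 1.0, 5.0, 30.0);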

View file

@ -8,9 +8,30 @@
#include "zeek/storage/Manager.h"
#include "zeek/storage/ReturnCode.h"
#include "zeek/storage/storage-events.bif.h"
#include "zeek/telemetry/Counter.h"
#include "zeek/telemetry/Histogram.h"
#include "zeek/telemetry/Manager.h"
namespace zeek::storage {
namespace detail {
OperationMetrics::OperationMetrics(const telemetry::CounterFamilyPtr& results_family,
const telemetry::HistogramFamilyPtr& latency_family, std::string_view operation_type,
std::string_view backend_type, std::string_view backend_config)
: success(results_family->GetOrAdd(
{{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "success"}})),
fail(results_family->GetOrAdd(
{{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "fail"}})),
error(results_family->GetOrAdd(
{{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "error"}})),
timeouts(results_family->GetOrAdd(
{{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}, {"result", "timeout"}})),
latency(latency_family->GetOrAdd(
{{"operation", operation_type}, {"type", backend_type}, {"config", backend_config}})) {}
} // namespace detail
RecordValPtr OperationResult::BuildVal() { return MakeVal(code, err_str, value); }
RecordValPtr OperationResult::MakeVal(EnumValPtr code, std::string_view err_str, ValPtr value) {
@ -32,9 +53,14 @@ ResultCallback::ResultCallback(zeek::detail::trigger::TriggerPtr trigger, const
void ResultCallback::Timeout() {
if ( ! IsSyncCallback() )
trigger->Cache(assoc, OperationResult::MakeVal(ReturnCode::TIMEOUT).get());
if ( operation_metrics )
operation_metrics->timeouts->Inc();
}
void ResultCallback::Complete(OperationResult res) {
UpdateOperationMetrics(res.code);
// If this is a sync callback, there isn't a trigger to process. Store the result and bail.
if ( IsSyncCallback() ) {
result = std::move(res);
@ -46,6 +72,25 @@ void ResultCallback::Complete(OperationResult res) {
trigger->Release();
}
void ResultCallback::Init(detail::OperationMetrics* m) {
operation_metrics = m;
start_time = util::current_time(true);
}
void ResultCallback::UpdateOperationMetrics(EnumValPtr c) {
if ( operation_metrics ) {
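// SUCCESS feeds the success counter; KEY_EXISTS and KEY_NOT_FOUND are expected outcomes
// of normal insert/lookup operations and count as failures; any other code except
// IN_PROGRESS counts as an error.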
if ( c == ReturnCode::SUCCESS )
operation_metrics->success->Inc();
else if ( c == ReturnCode::KEY_EXISTS || c == ReturnCode::KEY_NOT_FOUND )
operation_metrics->fail->Inc();
else if ( c != ReturnCode::IN_PROGRESS )
operation_metrics->error->Inc();
// Store the latency between start and completion in seconds.
operation_metrics->latency->Observe(util::current_time(true) - start_time);
}
}
OpenResultCallback::OpenResultCallback(IntrusivePtr<detail::BackendHandleVal> backend)
: ResultCallback(), backend(std::move(backend)) {}
@ -55,6 +100,7 @@ OpenResultCallback::OpenResultCallback(zeek::detail::trigger::TriggerPtr trigger
void OpenResultCallback::Complete(OperationResult res) {
if ( res.code == ReturnCode::SUCCESS ) {
backend->backend->InitMetrics();
backend->backend->EnqueueBackendOpened();
}
@ -68,6 +114,12 @@ void OpenResultCallback::Complete(OperationResult res) {
Backend::Backend(uint8_t modes, std::string_view tag_name) : modes(modes) {
tag = storage_mgr->BackendMgr().GetComponentTag(std::string{tag_name});
tag_str = zeek::obj_desc_short(tag.AsVal().get());
// The rest of the metrics are initialized after the backend opens, but this one has to
// be created here because the open callback may use it before Open() fully returns.
backends_opened_metric =
telemetry_mgr->CounterInstance("zeek", "storage_backends_opened", {}, "Number of backends opened", "");
}
OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, TypePtr kt, TypePtr vt) {
@ -88,12 +140,22 @@ OperationResult Backend::Open(OpenResultCallback* cb, RecordValPtr options, Type
if ( ! ret.value )
ret.value = cb->Backend();
if ( ret.code == ReturnCode::SUCCESS )
InitMetrics();
// Complete sync callbacks to make sure the metrics get initialized and the
// backend_opened event gets posted.
if ( cb->IsSyncCallback() )
CompleteCallback(cb, ret);
return ret;
}
OperationResult Backend::Close(ResultCallback* cb) { return DoClose(cb); }
OperationResult Backend::Put(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite, double expiration_time) {
cb->Init(put_metrics.get());
// The intention for this method is to do some other heavy lifting in regard
// to backends that need to pass data through the manager instead of directly
// through the workers. For the first versions of the storage framework it
@ -109,10 +171,16 @@ OperationResult Backend::Put(ResultCallback* cb, ValPtr key, ValPtr value, bool
return ret;
}
return DoPut(cb, std::move(key), std::move(value), overwrite, expiration_time);
auto ret = DoPut(cb, std::move(key), std::move(value), overwrite, expiration_time);
if ( cb->IsSyncCallback() )
cb->UpdateOperationMetrics(ret.code);
return ret;
}
OperationResult Backend::Get(ResultCallback* cb, ValPtr key) {
cb->Init(get_metrics.get());
// See the note in Put().
if ( ! same_type(key->GetType(), key_type) ) {
auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH};
@ -120,10 +188,16 @@ OperationResult Backend::Get(ResultCallback* cb, ValPtr key) {
return ret;
}
return DoGet(cb, std::move(key));
auto ret = DoGet(cb, std::move(key));
if ( cb->IsSyncCallback() )
cb->UpdateOperationMetrics(ret.code);
return ret;
}
OperationResult Backend::Erase(ResultCallback* cb, ValPtr key) {
cb->Init(erase_metrics.get());
// See the note in Put().
if ( ! same_type(key->GetType(), key_type) ) {
auto ret = OperationResult{ReturnCode::KEY_TYPE_MISMATCH};
@ -131,7 +205,11 @@ OperationResult Backend::Erase(ResultCallback* cb, ValPtr key) {
return ret;
}
return DoErase(cb, std::move(key));
auto ret = DoErase(cb, std::move(key));
if ( cb->IsSyncCallback() )
cb->UpdateOperationMetrics(ret.code);
return ret;
}
void Backend::CompleteCallback(ResultCallback* cb, const OperationResult& data) const {
@ -145,12 +223,57 @@ void Backend::CompleteCallback(ResultCallback* cb, const OperationResult& data)
}
}
void Backend::EnqueueBackendOpened() { event_mgr.Enqueue(Storage::backend_opened, tag.AsVal(), backend_options); }
void Backend::EnqueueBackendOpened() {
event_mgr.Enqueue(Storage::backend_opened, tag.AsVal(), backend_options);
backends_opened_metric->Inc();
}
void Backend::EnqueueBackendLost(std::string_view reason) {
event_mgr.Enqueue(Storage::backend_lost, tag.AsVal(), backend_options, make_intrusive<StringVal>(reason));
}
void Backend::InitMetrics() {
if ( metrics_initialized )
return;
metrics_initialized = true;
auto results_family =
telemetry_mgr->CounterFamily("zeek", "storage_backend_operation_results",
{"operation", "type", "config", "result"}, "Storage operation results");
auto bounds_val = zeek::id::find_val<zeek::VectorVal>("Storage::latency_metric_bounds");
std::vector<double> bounds(bounds_val->Size());
for ( unsigned int i = 0; i < bounds_val->Size(); i++ )
bounds[i] = bounds_val->DoubleAt(i);
auto latency_family =
telemetry_mgr->HistogramFamily("zeek", "storage_backend_operation_latency", {"operation", "type", "config"},
bounds, "Storage operation latency", "seconds");
std::string metrics_config = GetConfigMetricsLabel();
put_metrics =
std::make_unique<detail::OperationMetrics>(results_family, latency_family, "put", Tag(), metrics_config);
get_metrics =
std::make_unique<detail::OperationMetrics>(results_family, latency_family, "get", Tag(), metrics_config);
erase_metrics =
std::make_unique<detail::OperationMetrics>(results_family, latency_family, "erase", Tag(), metrics_config);
bytes_written_metric = telemetry_mgr->CounterInstance("zeek", "storage_backend_data_written",
{{"type", Tag()}, {"config", metrics_config}},
"Storage data written to backend", "bytes");
bytes_read_metric = telemetry_mgr->CounterInstance("zeek", "storage_backend_data_read",
{{"type", Tag()}, {"config", metrics_config}},
"Storage data read from backend", "bytes");
expired_entries_metric = telemetry_mgr->CounterInstance("zeek", "storage_backend_expired_entries",
{{"type", Tag()}, {"config", metrics_config}},
"Storage expired entries removed by backend", "");
}
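With these families in place, each backend instance exposes per-operation result counts and latency histograms keyed by the operation, type, and config labels. For example, assuming the telemetry framework's usual naming conventions (the "zeek" prefix plus a _total suffix on counters), a successful put on a SQLite backend would surface on the Prometheus endpoint roughly as zeek_storage_backend_operation_results_total{operation="put", type="Storage::STORAGE_BACKEND_SQLITE", config="<path>-<table>", result="success"}.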
void Backend::IncBytesWrittenMetric(size_t written) { bytes_written_metric->Inc(written); }
void Backend::IncBytesReadMetric(size_t read) { bytes_read_metric->Inc(read); }
void Backend::IncExpiredEntriesMetric(size_t expired) { expired_entries_metric->Inc(expired); }
zeek::OpaqueTypePtr detail::backend_opaque;
IMPLEMENT_OPAQUE_VALUE(detail::BackendHandleVal)

View file

@ -2,6 +2,8 @@
#pragma once
#include <memory>
#include "zeek/OpaqueVal.h"
#include "zeek/Tag.h"
#include "zeek/Val.h"
@ -12,8 +14,33 @@ class Trigger;
using TriggerPtr = IntrusivePtr<Trigger>;
} // namespace zeek::detail::trigger
namespace zeek::telemetry {
class Counter;
using CounterPtr = std::shared_ptr<Counter>;
class CounterFamily;
using CounterFamilyPtr = std::shared_ptr<CounterFamily>;
class Histogram;
using HistogramPtr = std::shared_ptr<Histogram>;
class HistogramFamily;
using HistogramFamilyPtr = std::shared_ptr<HistogramFamily>;
} // namespace zeek::telemetry
namespace zeek::storage {
namespace detail {
struct OperationMetrics {
telemetry::CounterPtr success;
telemetry::CounterPtr fail;
telemetry::CounterPtr error;
telemetry::CounterPtr timeouts;
telemetry::HistogramPtr latency;
OperationMetrics(const telemetry::CounterFamilyPtr& results_family,
const telemetry::HistogramFamilyPtr& latency_family, std::string_view operation_type,
std::string_view backend_type, std::string_view backend_config);
};
} // namespace detail
class Manager;
/**
@ -82,10 +109,37 @@ public:
OperationResult Result() const { return result; }
/**
* Stores the collection of metric instruments to update when the operation completes and
* records the operation's start time for the latency metric. This is left unset for
* open/close callbacks.
*/
void Init(detail::OperationMetrics* m);
/**
* Update the metrics based on a return value.
*/
void UpdateOperationMetrics(EnumValPtr c);
/**
* Adds to the amount of data transferred in the operation. Async backends can use this
* to record the amount transferred in Put operations so it can be added to the metrics
* when the operation finishes.
*/
void AddDataTransferredSize(size_t size) { transferred_size += size; }
/**
* Returns the amount of data transferred in this operation.
*/
size_t GetDataTransferredSize() const { return transferred_size; }
protected:
zeek::detail::trigger::TriggerPtr trigger;
const void* assoc = nullptr;
OperationResult result;
detail::OperationMetrics* operation_metrics = nullptr;
double start_time = 0.0;
size_t transferred_size = 0;
};
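To illustrate how the transferred-size plumbing is meant to be used (the Redis changes later in this commit follow the same pattern), here is a hedged sketch of an asynchronous backend; MyAsyncBackend, QueueWrite, and WriteFinished are hypothetical names, not part of this commit:

// Illustrative sketch only; not part of this commit.
OperationResult MyAsyncBackend::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool overwrite,
                                      double expiration_time) {
    auto key_data = serializer->Serialize(key);
    auto val_data = serializer->Serialize(value);
    // Record the size at submission time; the operation completes later on the event loop.
    cb->AddDataTransferredSize(key_data->size() + val_data->size());
    QueueWrite(*key_data, *val_data, cb); // hypothetical helper issuing the async write
    return {ReturnCode::IN_PROGRESS};
}

void MyAsyncBackend::WriteFinished(ResultCallback* cb, OperationResult res) {
    // Fold the recorded size into the backend's bytes-written counter and finish the callback.
    IncBytesWrittenMetric(cb->GetDataTransferredSize());
    CompleteCallback(cb, res);
}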
class OpenResultCallback;
@ -101,7 +155,7 @@ public:
/**
* Returns a descriptive tag representing the source for debugging.
*/
const char* Tag() { return tag_str.c_str(); }
const char* Tag() const { return tag_str.c_str(); }
/**
* Store a new key/value pair in the backend.
@ -234,6 +288,34 @@ protected:
*/
void CompleteCallback(ResultCallback* cb, const OperationResult& data) const;
/**
* Returns a Prometheus-compatible string used as a label value to differentiate metrics
* from different backend instances.
*/
std::string GetConfigMetricsLabel() const { return DoGetConfigMetricsLabel(); }
/**
* Utility method to increment the metric for the number of bytes written by a backend.
*
* @param written The number of bytes written by the last operation.
*/
void IncBytesWrittenMetric(size_t written);
/**
* Utility method to increment the metric for the number of bytes read by a backend.
*
* @param read The number of bytes read by the last operation.
*/
void IncBytesReadMetric(size_t read);
/**
* Utility method to increment the metric for the number of entries expired and removed
* from the backend.
*
* @param expired The number of elements removed by the last operation.
*/
void IncExpiredEntriesMetric(size_t expired);
TypePtr key_type;
TypePtr val_type;
RecordValPtr backend_options;
@ -291,7 +373,30 @@ private:
*/
virtual void DoExpire(double current_network_time) {}
/**
* Returns a Prometheus-compatible string used as a label value to differentiate metrics
* from different backend instances.
*/
virtual std::string DoGetConfigMetricsLabel() const = 0;
/**
* Initializes the instruments for various storage metrics.
*/
void InitMetrics();
uint8_t modes;
bool metrics_initialized = false;
// These are owned by the backend but are passed into the callbacks to be
// updated when they complete or time out.
std::unique_ptr<detail::OperationMetrics> put_metrics;
std::unique_ptr<detail::OperationMetrics> get_metrics;
std::unique_ptr<detail::OperationMetrics> erase_metrics;
telemetry::CounterPtr bytes_written_metric;
telemetry::CounterPtr bytes_read_metric;
telemetry::CounterPtr backends_opened_metric;
telemetry::CounterPtr expired_entries_metric;
};
using BackendPtr = zeek::IntrusivePtr<Backend>;

View file

@ -11,6 +11,7 @@
#include "zeek/Val.h"
#include "zeek/iosource/Manager.h"
#include "zeek/storage/ReturnCode.h"
#include "zeek/telemetry/Counter.h"
#include "hiredis/adapters/poll.h"
#include "hiredis/async.h"
@ -246,6 +247,17 @@ constexpr char REQUIRED_VERSION[] = "6.2.0";
storage::BackendPtr Redis::Instantiate() { return make_intrusive<Redis>(); }
std::string Redis::DoGetConfigMetricsLabel() const {
static auto running_under_test = id::find_val("running_under_test")->AsBool();
std::string tag;
if ( running_under_test )
tag = util::fmt("server_addr-%s", key_prefix.c_str());
else
tag = util::fmt("%s-%s", server_addr.c_str(), key_prefix.c_str());
return tag;
}
/**
* Called by the manager system to open the backend.
*/
@ -406,6 +418,8 @@ OperationResult Redis::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool
if ( connected && status == REDIS_ERR )
return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue put operation: %s", async_ctx->errstr)};
cb->AddDataTransferredSize(key_data->size() + val_data->size());
++active_ops;
// If reading pcaps insert into a secondary set that's ordered by expiration
@ -537,6 +551,8 @@ void Redis::DoExpire(double current_network_time) {
// TODO: do we care if this failed?
}
IncExpiredEntriesMetric(elements.size());
freeReplyObject(reply);
// Remove all of the elements from the range-set that match the time range.
@ -566,6 +582,8 @@ void Redis::HandlePutResult(redisReply* reply, ResultCallback* callback) {
else if ( reply->type == REDIS_REPLY_ERROR )
res = ParseReplyError("put", reply->str);
IncBytesWrittenMetric(callback->GetDataTransferredSize());
freeReplyObject(reply);
CompleteCallback(callback, res);
}
@ -583,6 +601,7 @@ void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) {
else if ( reply->type == REDIS_REPLY_ERROR )
res = ParseReplyError("get", reply->str);
else {
IncBytesReadMetric(reply->len);
auto val = serializer->Unserialize({(std::byte*)reply->str, reply->len}, val_type);
if ( val )
res = {ReturnCode::SUCCESS, "", val.value()};

View file

@ -27,7 +27,7 @@ public:
*
* @return The debugging name.
*/
const char* Tag() override { return tag_str.c_str(); }
const char* Tag() override { return Backend::Tag(); }
// IOSource interface
double GetNextTimeout() override { return -1; }
@ -61,6 +61,7 @@ private:
OperationResult DoErase(ResultCallback* cb, ValPtr key) override;
void DoExpire(double current_network_time) override;
void DoPoll() override;
std::string DoGetConfigMetricsLabel() const override;
OperationResult ParseReplyError(std::string_view op_str, std::string_view reply_err_str) const;
OperationResult CheckServerVersion();

View file

@ -12,6 +12,7 @@
#include "zeek/Func.h"
#include "zeek/Val.h"
#include "zeek/storage/ReturnCode.h"
#include "zeek/telemetry/Counter.h"
#include "const.bif.netvar_h"
@ -75,6 +76,11 @@ OperationResult SQLite::RunPragma(std::string_view name, std::optional<std::stri
storage::BackendPtr SQLite::Instantiate() { return make_intrusive<SQLite>(); }
std::string SQLite::DoGetConfigMetricsLabel() const {
std::string tag = util::fmt("%s-%s", full_path.c_str(), table_name.c_str());
return tag;
}
/**
* Called by the manager system to open the backend.
*/
@ -362,6 +368,8 @@ OperationResult SQLite::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool
step_result.code = ReturnCode::KEY_EXISTS;
}
IncBytesWrittenMetric(val_data->size());
return step_result;
}
@ -391,6 +399,7 @@ OperationResult SQLite::DoGet(ResultCallback* cb, ValPtr key) {
auto value_parser = [this](sqlite3_stmt* stmt) -> OperationResult {
auto blob = static_cast<const std::byte*>(sqlite3_column_blob(stmt, 0));
size_t blob_size = sqlite3_column_bytes(stmt, 0);
IncBytesReadMetric(blob_size);
auto val = serializer->Unserialize({blob, blob_size}, val_type);
@ -530,6 +539,11 @@ void SQLite::DoExpire(double current_network_time) {
Error(err.c_str());
}
// Get the number of changes from the delete statement. Because we're inside a transaction this
// should match the earlier num_to_expire value, but it reflects the exact number of rows removed.
int changes = sqlite3_changes(expire_db);
IncExpiredEntriesMetric(changes);
sqlite3_exec(expire_db, "commit transaction", nullptr, nullptr, &errMsg);
sqlite3_free(errMsg);
}

View file

@ -34,6 +34,7 @@ private:
OperationResult DoGet(ResultCallback* cb, ValPtr key) override;
OperationResult DoErase(ResultCallback* cb, ValPtr key) override;
void DoExpire(double current_network_time) override;
std::string DoGetConfigMetricsLabel() const override;
/**
* Checks whether a status code returned by an sqlite call is a success.

View file

@ -4,3 +4,9 @@ put result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitiali
get result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=value5678]
get result same as inserted, T
closed succesfully
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
[integrity_check] = ,
[journal_mode] = WAL,
[synchronous] = normal,
[temp_store] = memory
}, pragma_timeout=500.0 msecs, pragma_wait_on_busy=5.0 msecs]]

View file

@ -2,3 +2,9 @@
open result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<opaque of BackendHandleVal>]
erase result, [code=Storage::SUCCESS, error_str=<uninitialized>, value=<uninitialized>]
get result, [code=Storage::KEY_NOT_FOUND, error_str=<uninitialized>, value=<uninitialized>]
Storage::backend_opened, Storage::STORAGE_BACKEND_SQLITE, [serializer=Storage::STORAGE_SERIALIZER_JSON, sqlite=[database_path=storage-test.sqlite, table_name=testing, busy_timeout=5.0 secs, pragma_commands={
[integrity_check] = ,
[journal_mode] = WAL,
[synchronous] = normal,
[temp_store] = memory
}, pragma_timeout=500.0 msecs, pragma_wait_on_busy=5.0 msecs]]

View file

@ -87,4 +87,6 @@ OperationResult StorageDummy::DoErase(ResultCallback* cb, ValPtr key) {
return {ReturnCode::KEY_NOT_FOUND};
}
std::string StorageDummy::DoGetConfigMetricsLabel() const { return "storage-dummy"; }
} // namespace btest::storage::backend

View file

@ -49,6 +49,8 @@ public:
*/
zeek::storage::OperationResult DoErase(zeek::storage::ResultCallback* cb, zeek::ValPtr key) override;
std::string DoGetConfigMetricsLabel() const override;
private:
std::map<zeek::byte_buffer, zeek::byte_buffer> data;
bool open = false;

View file

@ -7,6 +7,11 @@
@load base/frameworks/storage/sync
@load policy/frameworks/storage/backend/sqlite
event Storage::backend_opened(tag: Storage::Backend, config: any)
{
print "Storage::backend_opened", tag, config;
}
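Not part of this commit, but as a hedged illustration of how a test could also inspect the new counters through the telemetry framework's script API (assuming Telemetry::collect_metrics is available), one could add:

event zeek_done()
	{
	# Dump the new per-operation result counters for this backend.
	local ms = Telemetry::collect_metrics("zeek", "storage_backend_operation_results*");
	for ( i in ms )
		print ms[i];
	}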
event zeek_init()
{
# Create a database file in the .tmp directory with a 'testing' table