From a0ae06b3cdd3e37a0e8393809661d1e9a2cbf3b0 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Fri, 1 Mar 2024 13:43:37 -0700 Subject: [PATCH] Convert telemetry code to use prometheus-cpp --- CMakeLists.txt | 10 + src/EventHandler.cc | 4 +- src/EventHandler.h | 10 +- src/OpaqueVal.cc | 26 +- src/OpaqueVal.h | 32 +- src/broker/Manager.cc | 2 - src/logging/Manager.cc | 12 +- src/logging/Manager.h | 4 +- src/session/Manager.cc | 36 +- src/telemetry/CMakeLists.txt | 9 +- src/telemetry/Counter.h | 260 ++++++------ src/telemetry/Gauge.h | 337 +++++++-------- src/telemetry/Histogram.h | 339 +++++++++------- src/telemetry/Manager.cc | 745 ++++++++++++++-------------------- src/telemetry/Manager.h | 358 ++++++++-------- src/telemetry/MetricFamily.cc | 78 ++++ src/telemetry/MetricFamily.h | 69 +++- src/telemetry/ProcessStats.cc | 204 ++++++++++ src/telemetry/ProcessStats.h | 23 ++ src/telemetry/Timer.h | 8 +- src/telemetry/telemetry.bif | 141 ++++--- src/zeek-setup.cc | 5 +- 22 files changed, 1517 insertions(+), 1195 deletions(-) create mode 100644 src/telemetry/MetricFamily.cc create mode 100644 src/telemetry/ProcessStats.cc create mode 100644 src/telemetry/ProcessStats.h diff --git a/CMakeLists.txt b/CMakeLists.txt index cbf3f5ec32..0053285c72 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -236,6 +236,11 @@ if (ZEEK_STANDALONE) set(zeek_exe_access PRIVATE) else () add_library(zeek_lib STATIC) + + if (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") + target_link_libraries(zeek_exe PRIVATE util) + target_link_libraries(zeek_exe PRIVATE procstat) + endif () endif () if (TARGET zeek_lib) @@ -248,6 +253,11 @@ if (TARGET zeek_lib) install(TARGETS zeek_lib LIBRARY DESTINATION lib) # Tell zeek_target_link_libraries to add library dependencies as PRIVATE. set(zeek_lib_access PRIVATE) + + if (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") + target_link_libraries(zeek_exe PRIVATE util) + target_link_libraries(zeek_exe PRIVATE procstat) + endif () endif () # When building our fuzzers, we also need one extra top-level target that diff --git a/src/EventHandler.cc b/src/EventHandler.cc index 1a26082a77..46be3fb7c3 100644 --- a/src/EventHandler.cc +++ b/src/EventHandler.cc @@ -49,7 +49,7 @@ void EventHandler::Call(Args* vl, bool no_remote, double ts) { telemetry_mgr->CounterFamily("zeek", "event-handler-invocations", {"name"}, "Number of times the given event handler was called", "1", true); - call_count = eh_invocations_family.GetOrAdd({{"name", name}}); + call_count = eh_invocations_family->GetOrAdd({{"name", name}}); } call_count->Inc(); @@ -113,4 +113,6 @@ void EventHandler::NewEvent(Args* vl) { event_mgr.Dispatch(ev); } +uint64_t EventHandler::CallCount() const { return call_count ? call_count->Value() : 0; } + } // namespace zeek diff --git a/src/EventHandler.h b/src/EventHandler.h index 1ef05213c5..b062db6bfb 100644 --- a/src/EventHandler.h +++ b/src/EventHandler.h @@ -9,7 +9,6 @@ #include "zeek/Type.h" #include "zeek/ZeekArgs.h" #include "zeek/ZeekList.h" -#include "zeek/telemetry/Counter.h" namespace zeek { @@ -17,6 +16,10 @@ namespace run_state { extern double network_time; } // namespace run_state +namespace telemetry { +class IntCounter; +} + class Func; using FuncPtr = IntrusivePtr; @@ -60,7 +63,8 @@ public: void SetGenerateAlways(bool arg_generate_always = true) { generate_always = arg_generate_always; } bool GenerateAlways() const { return generate_always; } - uint64_t CallCount() const { return call_count ? call_count->Value() : 0; } + // Returns the number of times this EventHandler has been called since startup. + uint64_t CallCount() const; private: void NewEvent(zeek::Args* vl); // Raise new_event() meta event. @@ -74,7 +78,7 @@ private: bool generate_always; // Initialize this lazy, so we don't expose metrics for 0 values. - std::optional call_count; + std::shared_ptr call_count; std::unordered_set auto_publish; }; diff --git a/src/OpaqueVal.cc b/src/OpaqueVal.cc index 1a2a90360a..bce619fd5d 100644 --- a/src/OpaqueVal.cc +++ b/src/OpaqueVal.cc @@ -1096,29 +1096,31 @@ std::optional TelemetryVal::DoSerializeData() const { return std::nu bool TelemetryVal::DoUnserializeData(BrokerDataView) { return false; } -TelemetryVal::TelemetryVal(telemetry::IntCounter) : OpaqueVal(int_counter_metric_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(int_counter_metric_type) {} -TelemetryVal::TelemetryVal(telemetry::IntCounterFamily) : OpaqueVal(int_counter_metric_family_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(int_counter_metric_family_type) {} -TelemetryVal::TelemetryVal(telemetry::DblCounter) : OpaqueVal(dbl_counter_metric_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(dbl_counter_metric_type) {} -TelemetryVal::TelemetryVal(telemetry::DblCounterFamily) : OpaqueVal(dbl_counter_metric_family_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(dbl_counter_metric_family_type) {} -TelemetryVal::TelemetryVal(telemetry::IntGauge) : OpaqueVal(int_gauge_metric_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(int_gauge_metric_type) {} -TelemetryVal::TelemetryVal(telemetry::IntGaugeFamily) : OpaqueVal(int_gauge_metric_family_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(int_gauge_metric_family_type) {} -TelemetryVal::TelemetryVal(telemetry::DblGauge) : OpaqueVal(dbl_gauge_metric_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(dbl_gauge_metric_type) {} -TelemetryVal::TelemetryVal(telemetry::DblGaugeFamily) : OpaqueVal(dbl_gauge_metric_family_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(dbl_gauge_metric_family_type) {} -TelemetryVal::TelemetryVal(telemetry::IntHistogram) : OpaqueVal(int_histogram_metric_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(int_histogram_metric_type) {} -TelemetryVal::TelemetryVal(telemetry::IntHistogramFamily) : OpaqueVal(int_histogram_metric_family_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) + : OpaqueVal(int_histogram_metric_family_type) {} -TelemetryVal::TelemetryVal(telemetry::DblHistogram) : OpaqueVal(dbl_histogram_metric_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) : OpaqueVal(dbl_histogram_metric_type) {} -TelemetryVal::TelemetryVal(telemetry::DblHistogramFamily) : OpaqueVal(dbl_histogram_metric_family_type) {} +TelemetryVal::TelemetryVal(std::shared_ptr) + : OpaqueVal(dbl_histogram_metric_family_type) {} IMPLEMENT_OPAQUE_VALUE(IntCounterMetricVal) IMPLEMENT_OPAQUE_VALUE(IntCounterMetricFamilyVal) diff --git a/src/OpaqueVal.h b/src/OpaqueVal.h index 76e67e3a88..7e94cbcc2c 100644 --- a/src/OpaqueVal.h +++ b/src/OpaqueVal.h @@ -448,18 +448,18 @@ private: */ class TelemetryVal : public OpaqueVal { protected: - explicit TelemetryVal(telemetry::IntCounter); - explicit TelemetryVal(telemetry::IntCounterFamily); - explicit TelemetryVal(telemetry::DblCounter); - explicit TelemetryVal(telemetry::DblCounterFamily); - explicit TelemetryVal(telemetry::IntGauge); - explicit TelemetryVal(telemetry::IntGaugeFamily); - explicit TelemetryVal(telemetry::DblGauge); - explicit TelemetryVal(telemetry::DblGaugeFamily); - explicit TelemetryVal(telemetry::IntHistogram); - explicit TelemetryVal(telemetry::IntHistogramFamily); - explicit TelemetryVal(telemetry::DblHistogram); - explicit TelemetryVal(telemetry::DblHistogramFamily); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); + explicit TelemetryVal(std::shared_ptr); std::optional DoSerializeData() const override; bool DoUnserializeData(BrokerDataView data) override; @@ -468,11 +468,11 @@ protected: template class TelemetryValImpl : public TelemetryVal { public: - using HandleType = Handle; + using HandleType = std::shared_ptr; - explicit TelemetryValImpl(Handle hdl) : TelemetryVal(hdl), hdl(hdl) {} + explicit TelemetryValImpl(HandleType hdl) : TelemetryVal(hdl), hdl(hdl) {} - Handle GetHandle() const noexcept { return hdl; } + HandleType GetHandle() const noexcept { return hdl; } static zeek::OpaqueValPtr OpaqueInstantiate() { reporter->Error("TelemetryValImpl::OpaqueInstantiate is unsupported"); @@ -485,7 +485,7 @@ protected: const char* OpaqueName() const override { return Handle::OpaqueName; } private: - Handle hdl; + HandleType hdl; }; using IntCounterMetricVal = TelemetryValImpl; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index a80f6a3746..6030cf6138 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -415,8 +415,6 @@ void Manager::InitPostScript() { bstate->subscriber.add_topic(broker::topic::store_events(), true); - telemetry_mgr->InitPostBrokerSetup(bstate->endpoint); - InitializeBrokerStoreForwarding(); } diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index e79cdaf09f..a3758336bc 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -223,9 +223,9 @@ struct Manager::WriterInfo { bool hook_initialized = false; string instantiating_filter; - telemetry::IntCounter total_writes; + std::shared_ptr total_writes; - WriterInfo(telemetry::IntCounter total_writes) : total_writes(total_writes) {} + WriterInfo(std::shared_ptr total_writes) : total_writes(std::move(total_writes)) {} }; struct Manager::Stream { @@ -244,7 +244,7 @@ struct Manager::Stream { bool enable_remote = false; - std::optional total_writes; // Initialized on first write. + std::shared_ptr total_writes; // Initialized on first write. // State about delayed writes for this Stream. detail::DelayQueue delay_queue; @@ -947,7 +947,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) { if ( ! stream->total_writes ) { std::string module_name = zeek::detail::extract_module_name(stream->name.c_str()); std::initializer_list labels{{"module", module_name}, {"stream", stream->name}}; - stream->total_writes = total_log_stream_writes_family.GetOrAdd(labels); + stream->total_writes = total_log_stream_writes_family->GetOrAdd(labels); } stream->total_writes->Inc(); @@ -1173,7 +1173,7 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c } assert(w != stream->writers.end()); - w->second->total_writes.Inc(); + w->second->total_writes->Inc(); // Write takes ownership of vals. assert(writer); @@ -1616,7 +1616,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken {"filter-name", instantiating_filter}, {"path", info->path}}; - WriterInfo* winfo = new WriterInfo(zeek::log_mgr->total_log_writer_writes_family.GetOrAdd(labels)); + WriterInfo* winfo = new WriterInfo(zeek::log_mgr->total_log_writer_writes_family->GetOrAdd(labels)); winfo->type = writer->Ref()->AsEnumVal(); winfo->writer = nullptr; winfo->open_time = run_state::network_time; diff --git a/src/logging/Manager.h b/src/logging/Manager.h index d1e39d55d2..863bd1307a 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -403,8 +403,8 @@ private: FuncPtr rotation_format_func; FuncPtr log_stream_policy_hook; - telemetry::IntCounterFamily total_log_stream_writes_family; - telemetry::IntCounterFamily total_log_writer_writes_family; + std::shared_ptr total_log_stream_writes_family; + std::shared_ptr total_log_writer_writes_family; zeek_uint_t last_delay_token = 0; std::vector active_writes; diff --git a/src/session/Manager.cc b/src/session/Manager.cc index 6452e79ac2..9aead4db4f 100644 --- a/src/session/Manager.cc +++ b/src/session/Manager.cc @@ -32,22 +32,22 @@ namespace detail { class ProtocolStats { public: struct Protocol { - telemetry::IntGauge active; - telemetry::IntCounter total; + std::shared_ptr active; + std::shared_ptr total; ssize_t max = 0; - Protocol(telemetry::IntGaugeFamily active_family, telemetry::IntCounterFamily total_family, - std::string protocol) - : active(active_family.GetOrAdd({{"protocol", protocol}})), - total(total_family.GetOrAdd({{"protocol", protocol}})) {} + Protocol(const std::shared_ptr& active_family, + const std::shared_ptr& total_family, std::string protocol) + : active(active_family->GetOrAdd({{"protocol", protocol}})), + total(total_family->GetOrAdd({{"protocol", protocol}})) {} }; using ProtocolMap = std::map; ProtocolMap::iterator InitCounters(const std::string& protocol) { - telemetry::IntGaugeFamily active_family = + auto active_family = telemetry_mgr->GaugeFamily("zeek", "active-sessions", {"protocol"}, "Active Zeek Sessions"); - telemetry::IntCounterFamily total_family = + auto total_family = telemetry_mgr->CounterFamily("zeek", "total-sessions", {"protocol"}, "Total number of sessions", "1", true); auto [it, inserted] = entries.insert({protocol, Protocol{active_family, total_family, protocol}}); @@ -116,7 +116,7 @@ void Manager::Remove(Session* s) { else { Connection* c = static_cast(s); if ( auto* stat_block = stats->GetCounters(c->TransportIdentifier()) ) - stat_block->active.Dec(); + stat_block->active->Dec(); } // Mark that the session isn't in the table so that in case the @@ -190,18 +190,18 @@ void Manager::Clear() { void Manager::GetStats(Stats& s) { auto* tcp_stats = stats->GetCounters("tcp"); s.max_TCP_conns = tcp_stats->max; - s.num_TCP_conns = tcp_stats->active.Value(); - s.cumulative_TCP_conns = tcp_stats->total.Value(); + s.num_TCP_conns = tcp_stats->active->Value(); + s.cumulative_TCP_conns = tcp_stats->total->Value(); auto* udp_stats = stats->GetCounters("udp"); s.max_UDP_conns = udp_stats->max; - s.num_UDP_conns = udp_stats->active.Value(); - s.cumulative_UDP_conns = udp_stats->total.Value(); + s.num_UDP_conns = udp_stats->active->Value(); + s.cumulative_UDP_conns = udp_stats->total->Value(); auto* icmp_stats = stats->GetCounters("icmp"); s.max_ICMP_conns = icmp_stats->max; - s.num_ICMP_conns = icmp_stats->active.Value(); - s.cumulative_ICMP_conns = icmp_stats->total.Value(); + s.num_ICMP_conns = icmp_stats->active->Value(); + s.cumulative_ICMP_conns = icmp_stats->total->Value(); s.num_fragments = zeek::detail::fragment_mgr->Size(); s.max_fragments = zeek::detail::fragment_mgr->MaxFragments(); @@ -238,10 +238,10 @@ void Manager::InsertSession(detail::Key key, Session* session) { std::string protocol = session->TransportIdentifier(); if ( auto* stat_block = stats->GetCounters(protocol) ) { - stat_block->active.Inc(); - stat_block->total.Inc(); + stat_block->active->Inc(); + stat_block->total->Inc(); - if ( stat_block->active.Value() > stat_block->max ) + if ( stat_block->active->Value() > stat_block->max ) stat_block->max++; } } diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt index 1c71ba42a3..ed7e5284ee 100644 --- a/src/telemetry/CMakeLists.txt +++ b/src/telemetry/CMakeLists.txt @@ -1 +1,8 @@ -zeek_add_subdir_library(telemetry SOURCES Manager.cc BIFS telemetry.bif) +zeek_add_subdir_library( + telemetry + SOURCES + Manager.cc + MetricFamily.cc + ProcessStats.cc + BIFS + telemetry.bif) diff --git a/src/telemetry/Counter.h b/src/telemetry/Counter.h index 4877c22d89..b0b5d20a6c 100644 --- a/src/telemetry/Counter.h +++ b/src/telemetry/Counter.h @@ -8,190 +8,182 @@ #include "zeek/Span.h" #include "zeek/telemetry/MetricFamily.h" +#include "zeek/telemetry/telemetry.bif.h" -#include "broker/telemetry/fwd.hh" +#include "prometheus/counter.h" +#include "prometheus/family.h" namespace zeek::telemetry { -class DblCounterFamily; -class IntCounterFamily; -class Manager; - -/** - * A handle to a metric that represents an integer value that can only go up. - */ -class IntCounter { +template +class BaseCounter { public: - friend class IntCounterFamily; - - static inline const char* OpaqueName = "IntCounterMetricVal"; - - IntCounter() = delete; - IntCounter(const IntCounter&) noexcept = default; - IntCounter& operator=(const IntCounter&) noexcept = default; + using Handle = prometheus::Counter; + using FamilyType = prometheus::Family; /** * Increments the value by 1. */ - void Inc() noexcept { broker::telemetry::inc(hdl); } + void Inc() noexcept { Inc(1); } /** * Increments the value by @p amount. * @pre `amount >= 0` */ - void Inc(int64_t amount) noexcept { broker::telemetry::inc(hdl, amount); } + void Inc(BaseType amount) noexcept { handle.Increment(amount); } /** * Increments the value by 1. * @return The new value. */ - int64_t operator++() noexcept { return broker::telemetry::inc(hdl); } + BaseType operator++() noexcept { + Inc(1); + return Value(); + } - /** - * @return The current value. - */ - int64_t Value() const noexcept { return broker::telemetry::value(hdl); } + BaseType Value() const noexcept { return static_cast(handle.Value()); } - /** - * @return Whether @c this and @p other refer to the same counter. - */ - constexpr bool IsSameAs(const IntCounter& other) const noexcept { return hdl == other.hdl; } + bool operator==(const BaseCounter& rhs) const noexcept { return &handle == &rhs.handle; } + bool operator!=(const BaseCounter& rhs) const noexcept { return &handle != &rhs.handle; } -private: - using Handle = broker::telemetry::int_counter_hdl*; + bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } - explicit IntCounter(Handle hdl) noexcept : hdl(hdl) {} +protected: + explicit BaseCounter(FamilyType& family, const prometheus::Labels& labels) noexcept + : handle(family.Add(labels)), labels(labels) {} - Handle hdl; + Handle& handle; + prometheus::Labels labels; + BaseType last_value = 0; }; /** - * Checks whether two @ref IntCounter handles are identical. - * @return Whether @p lhs and @p rhs refer to the same object. - * @note compare their @c value instead to check for equality. + * A handle to a metric that represents an integer value that can only go up. */ -constexpr bool operator==(const IntCounter& lhs, const IntCounter& rhs) noexcept { return lhs.IsSameAs(rhs); } +class IntCounter : public BaseCounter { +public: + static inline const char* OpaqueName = "IntCounterMetricVal"; + explicit IntCounter(FamilyType& family, const prometheus::Labels& labels) noexcept : BaseCounter(family, labels) {} +}; -/// @relates IntCounter -constexpr bool operator!=(const IntCounter& lhs, const IntCounter& rhs) noexcept { return ! (lhs == rhs); } +/** + * A handle to a metric that represents a double value that can only go up. + */ +class DblCounter : public BaseCounter { +public: + static inline const char* OpaqueName = "DblCounterMetricVal"; + explicit DblCounter(FamilyType& family, const prometheus::Labels& labels) noexcept : BaseCounter(family, labels) {} +}; + +template +class BaseCounterFamily : public MetricFamily, + public std::enable_shared_from_this> { +public: + BaseCounterFamily(std::string_view prefix, std::string_view name, Span labels, + std::string_view helptext, std::shared_ptr registry, + std::string_view unit = "", bool is_sum = false) + : MetricFamily(prefix, name, labels, helptext, unit, is_sum), + family(prometheus::BuildCounter().Name(full_name).Help(std::string{helptext}).Register(*registry)) {} + + /** + * Returns the metrics handle for given labels, creating a new instance + * lazily if necessary. + */ + std::shared_ptr GetOrAdd(Span labels) { + prometheus::Labels p_labels = BuildPrometheusLabels(labels); + + auto check = [&](const std::shared_ptr& counter) { return counter->CompareLabels(p_labels); }; + + if ( auto it = std::find_if(counters.begin(), counters.end(), check); it != counters.end() ) + return *it; + + auto counter = std::make_shared(family, p_labels); + counters.push_back(counter); + return counter; + } + + /** + * @copydoc GetOrAdd + */ + std::shared_ptr GetOrAdd(std::initializer_list labels) { + return GetOrAdd(Span{labels.begin(), labels.size()}); + } + + std::vector>& GetAllCounters() { return counters; } + + std::vector Collect() const override { + static auto string_vec_type = zeek::id::find_type("string_vec"); + static auto metric_record_type = zeek::id::find_type("Telemetry::Metric"); + static auto opts_idx = metric_record_type->FieldOffset("opts"); + static auto labels_idx = metric_record_type->FieldOffset("labels"); + static auto value_idx = metric_record_type->FieldOffset("value"); + static auto count_value_idx = metric_record_type->FieldOffset("count_value"); + + RecordValPtr opts_record = GetMetricOptsRecord(); + + std::vector records; + for ( const auto& ctr : counters ) { + auto label_values_vec = make_intrusive(string_vec_type); + for ( const auto& [label_key, label] : ctr->Labels() ) + if ( label_key != "endpoint" ) + label_values_vec->Append(make_intrusive(label)); + + auto r = make_intrusive(metric_record_type); + r->Assign(labels_idx, label_values_vec); + r->Assign(opts_idx, opts_record); + + if constexpr ( std::is_same_v ) + r->Assign(value_idx, zeek::make_intrusive(ctr->Value())); + else { + r->Assign(value_idx, zeek::make_intrusive(static_cast(ctr->Value()))); + r->Assign(count_value_idx, val_mgr->Count(ctr->Value())); + } + + records.push_back(std::move(r)); + } + + return records; + } + +protected: + prometheus::Family& family; + std::vector> counters; +}; /** * Manages a collection of IntCounter metrics. */ -class IntCounterFamily : public MetricFamily { +class IntCounterFamily : public BaseCounterFamily { public: - friend class Manager; - static inline const char* OpaqueName = "IntCounterMetricFamilyVal"; - using InstanceType = IntCounter; + explicit IntCounterFamily(std::string_view prefix, std::string_view name, Span labels, + std::string_view helptext, std::shared_ptr registry, + std::string_view unit = "", bool is_sum = false) + : BaseCounterFamily(prefix, name, labels, helptext, std::move(registry), unit, is_sum) {} IntCounterFamily(const IntCounterFamily&) noexcept = default; - IntCounterFamily& operator=(const IntCounterFamily&) noexcept = default; + IntCounterFamily& operator=(const IntCounterFamily&) noexcept = delete; - /** - * Returns the metrics handle for given labels, creating a new instance - * lazily if necessary. - */ - IntCounter GetOrAdd(Span labels) { return IntCounter{int_counter_get_or_add(hdl, labels)}; } - - /** - * @copydoc GetOrAdd - */ - IntCounter GetOrAdd(std::initializer_list labels) { - return GetOrAdd(Span{labels.begin(), labels.size()}); - } - -private: - using Handle = broker::telemetry::int_counter_family_hdl*; - - explicit IntCounterFamily(Handle hdl) : MetricFamily(upcast(hdl)) {} + zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::INT_COUNTER; } }; -/** - * A handle to a metric that represents a floating point value that can only go - * up. - */ -class DblCounter { -public: - friend class DblCounterFamily; - - static inline const char* OpaqueName = "DblCounterMetricVal"; - - DblCounter() = delete; - DblCounter(const DblCounter&) noexcept = default; - DblCounter& operator=(const DblCounter&) noexcept = default; - - /** - * Increments the value by 1. - */ - void Inc() noexcept { broker::telemetry::inc(hdl); } - - /** - * Increments the value by @p amount. - * @pre `amount >= 0` - */ - void Inc(double amount) noexcept { broker::telemetry::inc(hdl, amount); } - - /** - * @return The current value. - */ - double Value() const noexcept { return broker::telemetry::value(hdl); } - - /** - * @return Whether @c this and @p other refer to the same counter. - */ - constexpr bool IsSameAs(const DblCounter& other) const noexcept { return hdl == other.hdl; } - -private: - using Handle = broker::telemetry::dbl_counter_hdl*; - - explicit DblCounter(Handle hdl) noexcept : hdl(hdl) {} - - Handle hdl; -}; - -/** - * Checks whether two @ref DblCounter handles are identical. - * @return Whether @p lhs and @p rhs refer to the same object. - * @note compare their @c value instead to check for equality. - */ -constexpr bool operator==(const DblCounter& lhs, const DblCounter& rhs) noexcept { return lhs.IsSameAs(rhs); } - -/// @relates DblCounter -constexpr bool operator!=(const DblCounter& lhs, const DblCounter& rhs) noexcept { return ! (lhs == rhs); } - /** * Manages a collection of DblCounter metrics. */ -class DblCounterFamily : public MetricFamily { +class DblCounterFamily : public BaseCounterFamily { public: - friend class Manager; - static inline const char* OpaqueName = "DblCounterMetricFamilyVal"; - using InstanceType = DblCounter; + explicit DblCounterFamily(std::string_view prefix, std::string_view name, Span labels, + std::string_view helptext, std::shared_ptr registry, + std::string_view unit = "", bool is_sum = false) + : BaseCounterFamily(prefix, name, labels, helptext, std::move(registry), unit, is_sum) {} DblCounterFamily(const DblCounterFamily&) noexcept = default; - DblCounterFamily& operator=(const DblCounterFamily&) noexcept = default; + DblCounterFamily& operator=(const DblCounterFamily&) noexcept = delete; - /** - * Returns the metrics handle for given labels, creating a new instance - * lazily if necessary. - */ - DblCounter GetOrAdd(Span labels) { return DblCounter{dbl_counter_get_or_add(hdl, labels)}; } - - /** - * @copydoc GetOrAdd - */ - DblCounter GetOrAdd(std::initializer_list labels) { - return GetOrAdd(Span{labels.begin(), labels.size()}); - } - -private: - using Handle = broker::telemetry::dbl_counter_family_hdl*; - - explicit DblCounterFamily(Handle hdl) : MetricFamily(upcast(hdl)) {} + zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::DOUBLE_COUNTER; } }; namespace detail { diff --git a/src/telemetry/Gauge.h b/src/telemetry/Gauge.h index 22bd131394..68c08fa9d6 100644 --- a/src/telemetry/Gauge.h +++ b/src/telemetry/Gauge.h @@ -8,211 +8,216 @@ #include "zeek/Span.h" #include "zeek/telemetry/MetricFamily.h" +#include "zeek/telemetry/telemetry.bif.h" -#include "broker/telemetry/fwd.hh" +#include "prometheus/family.h" +#include "prometheus/gauge.h" namespace zeek::telemetry { -class DblGaugeFamily; -class IntGaugeFamily; -class Manager; +template +class BaseGauge { +public: + using Handle = prometheus::Gauge; + using FamilyType = prometheus::Family; + + /** + * Increments the value by 1. + */ + void Inc() noexcept { Inc(1); } + + /** + * Increments the value by @p amount. + */ + void Inc(BaseType amount) noexcept { handle.Increment(amount); } + + /** + * Increments the value by 1. + * @return The new value. + */ + BaseType operator++() noexcept { + Inc(1); + return Value(); + } + + /** + * Decrements the value by 1. + */ + void Dec() noexcept { Dec(1); } + + /** + * Decrements the value by @p amount. + */ + void Dec(BaseType amount) noexcept { handle.Decrement(amount); } + + /** + * Decrements the value by 1. + * @return The new value. + */ + BaseType operator--() noexcept { + Dec(1); + return Value(); + } + + BaseType Value() const noexcept { return static_cast(handle.Value()); } + + /** + * Directly sets the value of the gauge. + */ + void Set(BaseType v) { handle.Set(v); } + + bool operator==(const BaseGauge& rhs) const noexcept { return &handle == &rhs.handle; } + bool operator!=(const BaseGauge& rhs) const noexcept { return &handle != &rhs.handle; } + + bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } + +protected: + explicit BaseGauge(FamilyType& family, const prometheus::Labels& labels) noexcept + : handle(family.Add(labels)), labels(labels) {} + + Handle& handle; + prometheus::Labels labels; + BaseType last_value = 0; +}; /** * A handle to a metric that represents an integer value. Gauges are more * permissive than counters and also allow decrementing the value. */ -class IntGauge { +class IntGauge : public BaseGauge { public: - friend class IntGaugeFamily; - static inline const char* OpaqueName = "IntGaugeMetricVal"; - IntGauge() = delete; - IntGauge(const IntGauge&) noexcept = default; - IntGauge& operator=(const IntGauge&) noexcept = default; + explicit IntGauge(FamilyType& family, const prometheus::Labels& labels) noexcept : BaseGauge(family, labels) {} - /** - * Increments the value by 1. - */ - void Inc() noexcept { broker::telemetry::inc(hdl); } - - /** - * Increments the value by @p amount. - */ - void Inc(int64_t amount) noexcept { broker::telemetry::inc(hdl, amount); } - - /** - * Increments the value by 1. - * @return The new value. - */ - int64_t operator++() noexcept { return broker::telemetry::inc(hdl); } - - /** - * Decrements the value by 1. - */ - void Dec() noexcept { broker::telemetry::dec(hdl); } - - /** - * Decrements the value by @p amount. - */ - void Dec(int64_t amount) noexcept { broker::telemetry::dec(hdl, amount); } - - /** - * Decrements the value by 1. - * @return The new value. - */ - int64_t operator--() noexcept { return broker::telemetry::dec(hdl); } - - /** - * @return The current value. - */ - int64_t Value() const noexcept { return broker::telemetry::value(hdl); } - - /** - * @return Whether @c this and @p other refer to the same counter. - */ - constexpr bool IsSameAs(const IntGauge& other) const noexcept { return hdl == other.hdl; } - -private: - using Handle = broker::telemetry::int_gauge_hdl*; - - explicit IntGauge(Handle hdl) noexcept : hdl(hdl) {} - - Handle hdl; + IntGauge(const IntGauge&) = delete; + IntGauge& operator=(const IntGauge&) = delete; }; /** - * Checks whether two @ref IntGauge handles are identical. - * @return Whether @p lhs and @p rhs refer to the same object. - * @note compare their @c value instead to check for equality. + * A handle to a metric that represents a double value. Gauges are more + * permissive than counters and also allow decrementing the value. */ -constexpr bool operator==(const IntGauge& lhs, const IntGauge& rhs) noexcept { return lhs.IsSameAs(rhs); } +class DblGauge : public BaseGauge { +public: + static inline const char* OpaqueName = "DblGaugeMetricVal"; -/// @relates IntGauge -constexpr bool operator!=(const IntGauge& lhs, const IntGauge& rhs) noexcept { return ! (lhs == rhs); } + explicit DblGauge(FamilyType& family, const prometheus::Labels& labels) noexcept : BaseGauge(family, labels) {} + + DblGauge(const DblGauge&) = delete; + DblGauge& operator=(const DblGauge&) = delete; +}; + +template +class BaseGaugeFamily : public MetricFamily, public std::enable_shared_from_this> { +public: + BaseGaugeFamily(std::string_view prefix, std::string_view name, Span labels, + std::string_view helptext, std::shared_ptr registry, + std::string_view unit = "", bool is_sum = false) + : MetricFamily(prefix, name, labels, helptext, unit, is_sum), + family(prometheus::BuildGauge().Name(full_name).Help(std::string{helptext}).Register(*registry)) {} + + /** + * Returns the metrics handle for given labels, creating a new instance + * lazily if necessary. + */ + std::shared_ptr GetOrAdd(Span labels) { + prometheus::Labels p_labels = BuildPrometheusLabels(labels); + + auto check = [&](const std::shared_ptr& gauge) { return gauge->CompareLabels(p_labels); }; + + if ( auto it = std::find_if(gauges.begin(), gauges.end(), check); it != gauges.end() ) + return *it; + + auto gauge = std::make_shared(family, p_labels); + gauges.push_back(gauge); + return gauge; + } + + /** + * @copydoc GetOrAdd + */ + std::shared_ptr GetOrAdd(std::initializer_list labels) { + return GetOrAdd(Span{labels.begin(), labels.size()}); + } + + std::vector>& GetAllGauges() { return gauges; } + + std::vector Collect() const override { + static auto string_vec_type = zeek::id::find_type("string_vec"); + static auto metric_record_type = zeek::id::find_type("Telemetry::Metric"); + static auto opts_idx = metric_record_type->FieldOffset("opts"); + static auto labels_idx = metric_record_type->FieldOffset("labels"); + static auto value_idx = metric_record_type->FieldOffset("value"); + static auto count_value_idx = metric_record_type->FieldOffset("count_value"); + + RecordValPtr opts_record = GetMetricOptsRecord(); + + std::vector records; + for ( const auto& g : gauges ) { + auto label_values_vec = make_intrusive(string_vec_type); + for ( const auto& [label_key, label] : g->Labels() ) + if ( label_key != "endpoint" ) + label_values_vec->Append(make_intrusive(label)); + + auto r = make_intrusive(metric_record_type); + r->Assign(labels_idx, label_values_vec); + r->Assign(opts_idx, opts_record); + + if constexpr ( std::is_same_v ) + r->Assign(value_idx, zeek::make_intrusive(g->Value())); + else { + r->Assign(value_idx, zeek::make_intrusive(static_cast(g->Value()))); + r->Assign(count_value_idx, val_mgr->Count(g->Value())); + } + + records.push_back(std::move(r)); + } + + return records; + } + +protected: + prometheus::Family& family; + std::vector> gauges; +}; /** * Manages a collection of IntGauge metrics. */ -class IntGaugeFamily : public MetricFamily { +class IntGaugeFamily : public BaseGaugeFamily { public: - friend class Manager; - static inline const char* OpaqueName = "IntGaugeMetricFamilyVal"; - using InstanceType = IntGauge; + IntGaugeFamily(std::string_view prefix, std::string_view name, Span labels, + std::string_view helptext, std::shared_ptr registry, + std::string_view unit = "", bool is_sum = false) + : BaseGaugeFamily(prefix, name, labels, helptext, std::move(registry), unit, is_sum) {} + IntGaugeFamily(const IntGaugeFamily&) noexcept = default; - IntGaugeFamily& operator=(const IntGaugeFamily&) noexcept = default; + IntGaugeFamily& operator=(const IntGaugeFamily&) noexcept = delete; - /** - * Returns the metrics handle for given labels, creating a new instance - * lazily if necessary. - */ - IntGauge GetOrAdd(Span labels) { return IntGauge{int_gauge_get_or_add(hdl, labels)}; } - - /** - * @copydoc GetOrAdd - */ - IntGauge GetOrAdd(std::initializer_list labels) { return GetOrAdd(Span{labels.begin(), labels.size()}); } - -private: - using Handle = broker::telemetry::int_gauge_family_hdl*; - - explicit IntGaugeFamily(Handle hdl) : MetricFamily(upcast(hdl)) {} + zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::INT_GAUGE; } }; -/** - * A handle to a metric that represents a floating point value. Gauges are more - * permissive than counters and also allow decrementing the value. - */ -class DblGauge { -public: - friend class DblGaugeFamily; - - static inline const char* OpaqueName = "DblGaugeMetricVal"; - - DblGauge() = delete; - DblGauge(const DblGauge&) noexcept = default; - DblGauge& operator=(const DblGauge&) noexcept = default; - - /** - * Increments the value by 1. - */ - void Inc() noexcept { broker::telemetry::inc(hdl); } - - /** - * Increments the value by @p amount. - */ - void Inc(double amount) noexcept { broker::telemetry::inc(hdl, amount); } - - /** - * Increments the value by 1. - */ - void Dec() noexcept { broker::telemetry::dec(hdl); } - - /** - * Increments the value by @p amount. - */ - void Dec(double amount) noexcept { broker::telemetry::dec(hdl, amount); } - - /** - * @return The current value. - */ - double Value() const noexcept { return broker::telemetry::value(hdl); } - - /** - * @return Whether @c this and @p other refer to the same counter. - */ - constexpr bool IsSameAs(const DblGauge& other) const noexcept { return hdl == other.hdl; } - -private: - using Handle = broker::telemetry::dbl_gauge_hdl*; - - explicit DblGauge(Handle hdl) noexcept : hdl(hdl) {} - - Handle hdl; -}; - -/** - * Checks whether two @ref DblGauge handles are identical. - * @return Whether @p lhs and @p rhs refer to the same object. - * @note compare their @c value instead to check for equality. - */ -constexpr bool operator==(const DblGauge& lhs, const DblGauge& rhs) noexcept { return lhs.IsSameAs(rhs); } - -/// @relates DblGauge -constexpr bool operator!=(const DblGauge& lhs, const DblGauge& rhs) noexcept { return ! (lhs == rhs); } - /** * Manages a collection of DblGauge metrics. */ -class DblGaugeFamily : public MetricFamily { +class DblGaugeFamily : public BaseGaugeFamily { public: - friend class Manager; - static inline const char* OpaqueName = "DblGaugeMetricFamilyVal"; - using InstanceType = DblGauge; + DblGaugeFamily(std::string_view prefix, std::string_view name, Span labels, + std::string_view helptext, std::shared_ptr registry, + std::string_view unit = "", bool is_sum = false) + : BaseGaugeFamily(prefix, name, labels, helptext, std::move(registry), unit, is_sum) {} + DblGaugeFamily(const DblGaugeFamily&) noexcept = default; - DblGaugeFamily& operator=(const DblGaugeFamily&) noexcept = default; + DblGaugeFamily& operator=(const DblGaugeFamily&) noexcept = delete; - /** - * Returns the metrics handle for given labels, creating a new instance - * lazily if necessary. - */ - DblGauge GetOrAdd(Span labels) { return DblGauge{dbl_gauge_get_or_add(hdl, labels)}; } - - /** - * @copydoc GetOrAdd - */ - DblGauge GetOrAdd(std::initializer_list labels) { return GetOrAdd(Span{labels.begin(), labels.size()}); } - -private: - using Handle = broker::telemetry::dbl_gauge_family_hdl*; - - explicit DblGaugeFamily(Handle hdl) : MetricFamily(upcast(hdl)) {} + zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::DOUBLE_GAUGE; } }; namespace detail { diff --git a/src/telemetry/Histogram.h b/src/telemetry/Histogram.h index 2dd2dd3512..0fe5bfb8cd 100644 --- a/src/telemetry/Histogram.h +++ b/src/telemetry/Histogram.h @@ -8,193 +8,234 @@ #include "zeek/Span.h" #include "zeek/telemetry/MetricFamily.h" +#include "zeek/telemetry/telemetry.bif.h" -#include "broker/telemetry/fwd.hh" +#include "prometheus/family.h" +#include "prometheus/histogram.h" namespace zeek::telemetry { -class DblHistogramFamily; -class IntHistogramFamily; -class Manager; +template +class BaseHistogram { +public: + using Handle = prometheus::Histogram; + using FamilyType = prometheus::Family; + + /** + * Increments all buckets with an upper bound less than or equal to @p value + * by one and adds @p value to the total sum of all observed values. + */ + void Observe(BaseType value) noexcept { handle.Observe(value); } + + /// @return The sum of all observed values. + // TODO + BaseType Sum() const noexcept { + auto metric = handle.Collect(); + return static_cast(metric.histogram.sample_sum); + } + + bool operator==(const BaseHistogram& rhs) const noexcept { return &handle == &rhs.handle; } + bool operator!=(const BaseHistogram& rhs) const noexcept { return &handle != &rhs.handle; } + + bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } + +protected: + explicit BaseHistogram(FamilyType& family, const prometheus::Labels& labels, + prometheus::Histogram::BucketBoundaries bounds) noexcept + : handle(family.Add(labels, std::move(bounds))), labels(labels) {} + + Handle& handle; + prometheus::Labels labels; +}; /** * A handle to a metric that represents an aggregable distribution of observed * measurements with integer precision. Sorts individual measurements into * configurable buckets. */ -class IntHistogram { +class IntHistogram : public BaseHistogram { public: - friend class IntHistogramFamily; - static inline const char* OpaqueName = "IntHistogramMetricVal"; + explicit IntHistogram(FamilyType& family, const prometheus::Labels& labels, + prometheus::Histogram::BucketBoundaries bounds) noexcept + : BaseHistogram(family, labels, std::move(bounds)) {} + IntHistogram() = delete; - IntHistogram(const IntHistogram&) noexcept = default; - IntHistogram& operator=(const IntHistogram&) noexcept = default; - - /** - * Increments all buckets with an upper bound less than or equal to @p value - * by one and adds @p value to the total sum of all observed values. - */ - void Observe(int64_t value) noexcept { return broker::telemetry::observe(hdl, value); } - - /// @return The sum of all observed values. - int64_t Sum() const noexcept { return broker::telemetry::sum(hdl); } - - /// @return The number of buckets, including the implicit "infinite" bucket. - size_t NumBuckets() const noexcept { return broker::telemetry::num_buckets(hdl); } - - /// @return The number of observations in the bucket at @p index. - /// @pre index < NumBuckets() - int64_t CountAt(size_t index) const noexcept { return broker::telemetry::count_at(hdl, index); } - - /// @return The upper bound of the bucket at @p index. - /// @pre index < NumBuckets() - int64_t UpperBoundAt(size_t index) const noexcept { return broker::telemetry::upper_bound_at(hdl, index); } - - /** - * @return Whether @c this and @p other refer to the same histogram. - */ - constexpr bool IsSameAs(const IntHistogram& other) const noexcept { return hdl == other.hdl; } - -private: - using Handle = broker::telemetry::int_histogram_hdl*; - - explicit IntHistogram(Handle hdl) noexcept : hdl(hdl) {} - - Handle hdl; -}; - -/** - * Checks whether two @ref IntHistogram handles are identical. - * @return Whether @p lhs and @p rhs refer to the same object. - */ -constexpr bool operator==(const IntHistogram& lhs, const IntHistogram& rhs) noexcept { return lhs.IsSameAs(rhs); } - -/// @relates IntHistogram -constexpr bool operator!=(const IntHistogram& lhs, const IntHistogram& rhs) noexcept { return ! (lhs == rhs); } - -/** - * Manages a collection of IntHistogram metrics. - */ -class IntHistogramFamily : public MetricFamily { -public: - friend class Manager; - - static inline const char* OpaqueName = "IntHistogramMetricFamilyVal"; - - using InstanceType = IntHistogram; - - IntHistogramFamily(const IntHistogramFamily&) noexcept = default; - IntHistogramFamily& operator=(const IntHistogramFamily&) noexcept = default; - - /** - * Returns the metrics handle for given labels, creating a new instance - * lazily if necessary. - */ - IntHistogram GetOrAdd(Span labels) { return IntHistogram{int_histogram_get_or_add(hdl, labels)}; } - - /** - * @copydoc GetOrAdd - */ - IntHistogram GetOrAdd(std::initializer_list labels) { - return GetOrAdd(Span{labels.begin(), labels.size()}); - } - -private: - using Handle = broker::telemetry::int_histogram_family_hdl*; - - explicit IntHistogramFamily(Handle hdl) : MetricFamily(upcast(hdl)) {} + IntHistogram(const IntHistogram&) noexcept = delete; + IntHistogram& operator=(const IntHistogram&) noexcept = delete; }; /** * A handle to a metric that represents an aggregable distribution of observed - * measurements with floating point precision. Sorts individual measurements - * into configurable buckets. + * measurements with integer precision. Sorts individual measurements into + * configurable buckets. */ -class DblHistogram { +class DblHistogram : public BaseHistogram { public: - friend class DblHistogramFamily; - static inline const char* OpaqueName = "DblHistogramMetricVal"; + explicit DblHistogram(FamilyType& family, const prometheus::Labels& labels, + prometheus::Histogram::BucketBoundaries bounds) noexcept + : BaseHistogram(family, labels, std::move(bounds)) {} + DblHistogram() = delete; - DblHistogram(const DblHistogram&) noexcept = default; - DblHistogram& operator=(const DblHistogram&) noexcept = default; - - /** - * Increments all buckets with an upper bound less than or equal to @p value - * by one and adds @p value to the total sum of all observed values. - */ - void Observe(double value) noexcept { broker::telemetry::observe(hdl, value); } - - /// @return The sum of all observed values. - double Sum() const noexcept { return broker::telemetry::sum(hdl); } - - /// @return The number of buckets, including the implicit "infinite" bucket. - size_t NumBuckets() const noexcept { return broker::telemetry::num_buckets(hdl); } - - /// @return The number of observations in the bucket at @p index. - /// @pre index < NumBuckets() - int64_t CountAt(size_t index) const noexcept { return broker::telemetry::count_at(hdl, index); } - - /// @return The upper bound of the bucket at @p index. - /// @pre index < NumBuckets() - double UpperBoundAt(size_t index) const noexcept { return broker::telemetry::upper_bound_at(hdl, index); } - - /** - * @return Whether @c this and @p other refer to the same histogram. - */ - constexpr bool IsSameAs(const DblHistogram& other) const noexcept { return hdl == other.hdl; } - -private: - using Handle = broker::telemetry::dbl_histogram_hdl*; - - explicit DblHistogram(Handle hdl) noexcept : hdl(hdl) {} - - Handle hdl; + DblHistogram(const DblHistogram&) noexcept = delete; + DblHistogram& operator=(const DblHistogram&) noexcept = delete; }; -/** - * Checks whether two @ref DblHistogram handles are identical. - * @return Whether @p lhs and @p rhs refer to the same object. - */ -constexpr bool operator==(const DblHistogram& lhs, const DblHistogram& rhs) noexcept { return lhs.IsSameAs(rhs); } - -/// @relates DblHistogram -constexpr bool operator!=(const DblHistogram& lhs, const DblHistogram& rhs) noexcept { return ! (lhs == rhs); } - -/** - * Manages a collection of DblHistogram metrics. - */ -class DblHistogramFamily : public MetricFamily { +template +class BaseHistogramFamily : public MetricFamily, + public std::enable_shared_from_this> { public: - friend class Manager; - - static inline const char* OpaqueName = "DblHistogramMetricFamilyVal"; - - using InstanceType = DblHistogram; - - DblHistogramFamily(const DblHistogramFamily&) noexcept = default; - DblHistogramFamily& operator=(const DblHistogramFamily&) noexcept = default; - /** * Returns the metrics handle for given labels, creating a new instance * lazily if necessary. */ - DblHistogram GetOrAdd(Span labels) { return DblHistogram{dbl_histogram_get_or_add(hdl, labels)}; } + std::shared_ptr GetOrAdd(Span labels) { + prometheus::Labels p_labels = BuildPrometheusLabels(labels); + + auto check = [&](const std::shared_ptr& histo) { return histo->CompareLabels(p_labels); }; + + if ( auto it = std::find_if(histograms.begin(), histograms.end(), check); it != histograms.end() ) + return *it; + + auto histogram = std::make_shared(family, p_labels, boundaries); + histograms.push_back(histogram); + return histogram; + } /** * @copydoc GetOrAdd */ - DblHistogram GetOrAdd(std::initializer_list labels) { + std::shared_ptr GetOrAdd(std::initializer_list labels) { return GetOrAdd(Span{labels.begin(), labels.size()}); } -private: - using Handle = broker::telemetry::dbl_histogram_family_hdl*; + std::vector Collect() const override { + static auto string_vec_type = zeek::id::find_type("string_vec"); + static auto double_vec_type = zeek::id::find_type("double_vec"); + static auto count_vec_type = zeek::id::find_type("index_vec"); + static auto histogram_metric_type = zeek::id::find_type("Telemetry::HistogramMetric"); + static auto labels_idx = histogram_metric_type->FieldOffset("labels"); + static auto values_idx = histogram_metric_type->FieldOffset("values"); + static auto count_values_idx = histogram_metric_type->FieldOffset("count_values"); - explicit DblHistogramFamily(Handle hdl) : MetricFamily(upcast(hdl)) {} + static auto observations_idx = histogram_metric_type->FieldOffset("observations"); + static auto count_observations_idx = histogram_metric_type->FieldOffset("count_observations"); + + static auto sum_idx = histogram_metric_type->FieldOffset("sum"); + static auto count_sum_idx = histogram_metric_type->FieldOffset("count_sum"); + + static auto opts_idx = histogram_metric_type->FieldOffset("opts"); + static auto opts_rt = zeek::id::find_type("Telemetry::MetricOpts"); + static auto bounds_idx = opts_rt->FieldOffset("bounds"); + static auto count_bounds_idx = opts_rt->FieldOffset("count_bounds"); + + RecordValPtr opts_record = GetMetricOptsRecord(); + + std::vector records; + for ( const auto& h : histograms ) { + auto label_values_vec = make_intrusive(string_vec_type); + for ( const auto& [label_key, label] : h->Labels() ) + if ( label_key != "endpoint" ) + label_values_vec->Append(make_intrusive(label)); + + auto r = make_intrusive(histogram_metric_type); + r->Assign(labels_idx, label_values_vec); + r->Assign(opts_idx, opts_record); + + auto histo_data = h->Collect(); + + auto counts_double_vec = make_intrusive(double_vec_type); + auto counts_count_vec = make_intrusive(count_vec_type); + uint64_t last = 0.0; + for ( const auto& b : histo_data.bucket ) { + counts_double_vec->Append( + zeek::make_intrusive(static_cast(b.cumulative_count - last))); + counts_count_vec->Append(val_mgr->Count(b.cumulative_count - last)); + last = b.cumulative_count; + } + + // TODO: these could be generated at creation time instead of repeatedly here + auto bounds_vec = make_intrusive(double_vec_type); + auto count_bounds_vec = make_intrusive(count_vec_type); + for ( auto b : boundaries ) { + bounds_vec->Append(zeek::make_intrusive(b)); + count_bounds_vec->Append(val_mgr->Count(static_cast(b))); + } + + bounds_vec->Append(zeek::make_intrusive(std::numeric_limits::infinity())); + count_bounds_vec->Append(val_mgr->Count(std::numeric_limits::infinity())); + + r->Assign(values_idx, counts_double_vec); + r->Assign(observations_idx, zeek::make_intrusive(static_cast(histo_data.sample_count))); + r->Assign(sum_idx, zeek::make_intrusive(histo_data.sample_sum)); + + RecordValPtr local_opts_record = r->GetField(opts_idx); + local_opts_record->Assign(bounds_idx, bounds_vec); + + if constexpr ( ! std::is_same_v ) { + r->Assign(count_values_idx, counts_count_vec); + r->Assign(count_observations_idx, val_mgr->Count(histo_data.sample_count)); + r->Assign(count_sum_idx, val_mgr->Count(static_cast(histo_data.sample_sum))); + r->Assign(count_bounds_idx, count_bounds_vec); + } + + records.push_back(std::move(r)); + } + + return records; + } + +protected: + BaseHistogramFamily(std::string_view prefix, std::string_view name, Span labels, + Span default_upper_bounds, std::string_view helptext, + std::shared_ptr registry, std::string_view unit = "") + : MetricFamily(prefix, name, labels, helptext, unit, false), + family(prometheus::BuildHistogram().Name(full_name).Help(std::string{helptext}).Register(*registry)) { + std::copy(default_upper_bounds.begin(), default_upper_bounds.end(), std::back_inserter(boundaries)); + } + + prometheus::Family& family; + prometheus::Histogram::BucketBoundaries boundaries; + std::vector> histograms; +}; + +/** + * Manages a collection of IntHistogram metrics. + */ +class IntHistogramFamily : public BaseHistogramFamily { +public: + static inline const char* OpaqueName = "IntHistogramMetricFamilyVal"; + + IntHistogramFamily(std::string_view prefix, std::string_view name, Span labels, + Span default_upper_bounds, std::string_view helptext, + std::shared_ptr registry, std::string_view unit = "") + : BaseHistogramFamily(prefix, name, labels, default_upper_bounds, helptext, std::move(registry), unit) {} + + IntHistogramFamily(const IntHistogramFamily&) noexcept = delete; + IntHistogramFamily& operator=(const IntHistogramFamily&) noexcept = delete; + + zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::INT_HISTOGRAM; } +}; + +/** + * Manages a collection of DblHistogram metrics. + */ +class DblHistogramFamily : public BaseHistogramFamily { +public: + static inline const char* OpaqueName = "DblHistogramMetricFamilyVal"; + + DblHistogramFamily(std::string_view prefix, std::string_view name, Span labels, + Span default_upper_bounds, std::string_view helptext, + std::shared_ptr registry, std::string_view unit = "") + : BaseHistogramFamily(prefix, name, labels, default_upper_bounds, helptext, std::move(registry), unit) {} + + DblHistogramFamily(const DblHistogramFamily&) noexcept = delete; + DblHistogramFamily& operator=(const DblHistogramFamily&) noexcept = delete; + + zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::DOUBLE_HISTOGRAM; } }; namespace detail { diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 32cefe662c..ebaf7354cd 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -2,422 +2,291 @@ #include "zeek/telemetry/Manager.h" -#include +#include #include #include #include "zeek/3rdparty/doctest.h" #include "zeek/ID.h" +#include "zeek/ZeekString.h" #include "zeek/broker/Manager.h" +#include "zeek/telemetry/ProcessStats.h" #include "zeek/telemetry/Timer.h" #include "zeek/telemetry/telemetry.bif.h" -#include "broker/telemetry/metric_registry.hh" - -namespace { -using NativeManager = broker::telemetry::metric_registry; -using NativeManagerImpl = broker::telemetry::metric_registry_impl; -using NativeManagerImplPtr = zeek::IntrusivePtr; -using DoubleValPtr = zeek::IntrusivePtr; - -std::vector extract_label_values(broker::telemetry::const_label_list labels) { - auto get_value = [](const auto& label) { return label.second; }; - std::vector v; - std::transform(labels.begin(), labels.end(), std::back_inserter(v), get_value); - return v; -} - -// Convert an int64_t or double to a DoubleValPtr. int64_t is casted. -template -DoubleValPtr as_double_val(T val) { - if constexpr ( std::is_same_v ) { - return zeek::make_intrusive(static_cast(val)); - } - else { - static_assert(std::is_same_v); - return zeek::make_intrusive(val); - } -}; - -} // namespace - namespace zeek::telemetry { -Manager::Manager() { - auto reg = NativeManager::pre_init_instance(); - NativeManagerImplPtr ptr{NewRef{}, reg.pimpl()}; - pimpl.swap(ptr); +Manager::Manager() { prometheus_registry = std::make_shared(); } + +void Manager::InitPostScript() { + std::string prometheus_url; + if ( auto env = getenv("ZEEK_METRICS_PORT") ) + prometheus_url = util::fmt("localhost:%s", env); + else if ( auto env = getenv("BROKER_METRICS_PORT") ) { + // Remove this in v7.1 when the Broker variables are removed + reporter->Warning("BROKER_METRICS_PORT is deprecated, use ZEEK_METRICS_PORT."); + prometheus_url = util::fmt("localhost:%s", env); + } + else { + auto metrics_port = id::find_val("Telemetry::metrics_port")->AsPortVal(); + if ( metrics_port->Port() == 0 ) + // Remove this in v7.1 when the Broker variables are removed + metrics_port = id::find_val("Broker::metrics_port")->AsPortVal(); + + if ( metrics_port->Port() != 0 ) + prometheus_url = util::fmt("localhost:%u", metrics_port->Port()); + } + + if ( ! prometheus_url.empty() ) { + printf("prometheus configured\n"); + + prometheus_exposer = std::make_unique(prometheus_url); + prometheus_exposer->RegisterCollectable(prometheus_registry); + + // Import topics are only enabled if Prometheus is enabled, because we don't care + // to get imported metrics if we're just going to drop them on the floor. + auto topics = import_topics; + if ( auto env = getenv("ZEEK_METRICS_IMPORT_TOPICS") ) { + topics = util::split(std::string{env}, ":"); + } + else if ( auto env = getenv("BROKER_METRICS_IMPORT_TOPICS") ) { + // Remove this in v7.1 when the Broker variables are removed + reporter->Warning("BROKER_METRICS_IMPORT_TOPICS is deprecated, use ZEEK_METRICS_IMPORT_TOPICS."); + topics = util::split(std::string{env}, ":"); + } + else { + auto script_topics = id::find_val("Telemetry::metrics_import_topics")->AsVectorVal(); + if ( script_topics->Size() == 0 ) + // Remove this in v7.1 when the Broker variables are removed + script_topics = id::find_val("Broker::metrics_import_topics")->AsVectorVal(); + + for ( int i = 0; i < script_topics->Size(); i++ ) + topics.push_back(script_topics->StringValAt(i)->ToStdString()); + } + + for ( const auto& topic : topics ) { + broker_mgr->Subscribe(topic); + } + } + + if ( export_topic.empty() ) { + if ( auto env = getenv("ZEEK_METRICS_EXPORT_TOPIC") ) + export_topic = env; + else if ( auto env = getenv("BROKER_METRICS_EXPORT_TOPIC") ) { + // Remove this in v7.1 when the Broker variables are removed + reporter->Warning("BROKER_METRICS_EXPORT_TOPIC is deprecated, use ZEEK_METRICS_EXPORT_TOPIC."); + export_topic = env; + } + else { + auto script_topic = id::find_val("Telemetry::metrics_export_topic")->AsStringVal(); + if ( script_topic->Len() == 0 ) + // Remove this in v7.1 when the Broker variables are removed + script_topic = id::find_val("Broker::metrics_export_topic")->AsStringVal(); + + export_topic = script_topic->ToStdString(); + } + } + + if ( export_endpoint.empty() ) { + if ( auto env = getenv("ZEEK_METRICS_ENDPOINT_NAME") ) + export_endpoint = env; + else if ( auto env = getenv("BROKER_METRICS_ENDPOINT_NAME") ) { + // Remove this in v7.1 when the Broker variables are removed + reporter->Warning("BROKER_METRICS_ENDPOINT_NAME is deprecated, use ZEEK_METRICS_ENDPOINT_NAME."); + export_endpoint = env; + } + else { + auto script_endpoint = id::find_val("Telemetry::metrics_export_endpoint_name")->AsStringVal(); + if ( script_endpoint->Len() == 0 ) + // Remove this in v7.1 when the Broker variables are removed + script_endpoint = id::find_val("Broker::metrics_export_endpoint_name")->AsStringVal(); + + export_endpoint = script_endpoint->ToStdString(); + } + } + + if ( export_interval == 0 ) { + if ( auto env = getenv("ZEEK_METRICS_EXPORT_INTERVAL") ) + export_interval = std::strtod(env, nullptr); + else if ( auto env = getenv("BROKER_METRICS_EXPORT_INTERVAL") ) { + reporter->Warning("BROKER_METRICS_EXPORT_INTERVAL is deprecated, use ZEEK_METRICS_EXPORT_INTERVAL."); + export_interval = std::strtod(env, nullptr); + } + else { + export_interval = id::find_val("Telemetry::metrics_export_interval")->AsInterval(); + if ( export_interval == 0 ) + // Remove this in v7.1 when the Broker variables are removed + export_interval = id::find_val("Broker::metrics_export_interval")->AsInterval(); + } + } + + if ( export_prefixes.empty() ) { + if ( auto env = getenv("ZEEK_METRICS_EXPORT_PREFIXES") ) { + export_prefixes = util::split(std::string{env}, ":"); + } + else if ( auto env = getenv("BROKER_METRICS_EXPORT_PREFIXES") ) { + reporter->Warning("BROKER_METRICS_EXPORT_PREFIXES is deprecated, use ZEEK_METRICS_EXPORT_PREFIXES."); + export_prefixes = util::split(std::string{env}, ":"); + } + else { + auto script_topics = id::find_val("Telemetry::metrics_export_prefixes")->AsVectorVal(); + if ( script_topics->Size() == 0 ) + // Remove this in v7.1 when the Broker variables are removed + script_topics = id::find_val("Broker::metrics_export_prefixes")->AsVectorVal(); + + for ( int i = 0; i < script_topics->Size(); i++ ) + export_prefixes.push_back(script_topics->StringValAt(i)->ToStdString()); + } + } + + // printf("topic: %s\n", export_topic.c_str()); + // printf("endpoint: %s\n", export_endpoint.c_str()); + // printf("interval: %f\n", export_interval); + // printf("prefixes: %zu\n", export_prefixes.size()); + + if ( ! export_topic.empty() && ! export_endpoint.empty() && export_interval > 0 ) { + printf("topic exporter configured\n"); + } + +#ifdef HAVE_PROCESS_STAT_METRICS + static auto get_stats = [this]() -> const detail::process_stats* { + double now = util::current_time(); + if ( this->process_stats_last_updated < now - 0.01 ) { + this->current_process_stats = detail::get_process_stats(); + this->process_stats_last_updated = now; + } + + return &this->current_process_stats; + }; +/* + rss_gauge = + GaugeInstance("process", "resident_memory", {}, "Resident memory size", "bytes", false, + [](metrics_api::ObserverResult r, void* state) { + auto* s = get_stats(); + opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr>>(r) + ->Observe(s->rss); + }); + + vms_gauge = + GaugeInstance("process", "virtual_memory", {}, "Virtual memory size", "bytes", false, + [](metrics_api::ObserverResult r, void* state) { + auto* s = get_stats(); + opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr>>(r) + ->Observe(s->vms); + }); + + cpu_gauge = GaugeInstance("process", "cpu", {}, "Total user and system CPU time spent", "seconds", false, + [](metrics_api::ObserverResult r, void* state) { + auto* s = get_stats(); + opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr>>(r) + ->Observe(s->cpu); + }); + + fds_gauge = + GaugeInstance("process", "open_fds", {}, "Number of open file descriptors", "", false, + [](metrics_api::ObserverResult r, void* state) { + auto* s = get_stats(); + opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr>>(r) + ->Observe(s->fds); + }); +*/ +#endif } -void Manager::InitPostScript() {} +std::shared_ptr Manager::LookupFamily(std::string_view prefix, std::string_view name) const { + auto check = [&](const auto& fam) { return fam.second->Prefix() == prefix && fam.second->Name() == name; }; -void Manager::InitPostBrokerSetup(broker::endpoint& ep) { - auto reg = NativeManager::merge(NativeManager{pimpl.get()}, ep); - NativeManagerImplPtr ptr{NewRef{}, reg.pimpl()}; - pimpl.swap(ptr); + if ( auto it = std::find_if(families.begin(), families.end(), check); it != families.end() ) + return it->second; + + return nullptr; } // -- collect metric stuff ----------------------------------------------------- -template -zeek::RecordValPtr Manager::GetMetricOptsRecord(Manager::MetricType metric_type, - const broker::telemetry::metric_family_hdl* family) { - static auto string_vec_type = zeek::id::find_type("string_vec"); - static auto double_vec_type = zeek::id::find_type("double_vec"); - static auto count_vec_type = zeek::id::find_type("index_vec"); - static auto metric_opts_type = zeek::id::find_type("Telemetry::MetricOpts"); +ValPtr Manager::CollectMetrics(std::string_view prefix_pattern, std::string_view name_pattern) { + static auto metrics_vector_type = zeek::id::find_type("any_vec"); + VectorValPtr ret_val = make_intrusive(metrics_vector_type); - static auto prefix_idx = metric_opts_type->FieldOffset("prefix"); - static auto name_idx = metric_opts_type->FieldOffset("name"); - static auto help_text_idx = metric_opts_type->FieldOffset("help_text"); - static auto unit_idx = metric_opts_type->FieldOffset("unit"); - static auto is_total_idx = metric_opts_type->FieldOffset("is_total"); - static auto labels_idx = metric_opts_type->FieldOffset("labels"); - static auto bounds_idx = metric_opts_type->FieldOffset("bounds"); - static auto metric_type_idx = metric_opts_type->FieldOffset("metric_type"); + // Build a map of all of the families that match the patterns based on their full prefixed + // name. This will let us match those families against the items returned from the otel reader. + for ( const auto& [name, family] : families ) { + // Histograms are handled by CollectHistogramMetrics and should be ignored here. + if ( family->MetricType() == BifEnum::Telemetry::MetricType::INT_HISTOGRAM || + family->MetricType() == BifEnum::Telemetry::MetricType::DOUBLE_HISTOGRAM ) + continue; - if ( const auto& it = metric_opts_cache.find(family); it != metric_opts_cache.end() ) - return it->second; - - auto r = make_intrusive(metric_opts_type); - r->Assign(prefix_idx, make_intrusive(broker::telemetry::prefix(family))); - r->Assign(name_idx, make_intrusive(broker::telemetry::name(family))); - r->Assign(help_text_idx, make_intrusive(broker::telemetry::helptext(family))); - r->Assign(unit_idx, make_intrusive(broker::telemetry::unit(family))); - r->Assign(is_total_idx, val_mgr->Bool(broker::telemetry::is_sum(family))); - - auto label_names_vec = make_intrusive(string_vec_type); - for ( const auto& l : broker::telemetry::label_names(family) ) - label_names_vec->Append(make_intrusive(l)); - - r->Assign(labels_idx, label_names_vec); - - // This is mapping Manager.h enums to bif values depending on - // the template type and whether this is a counter, gauge or - // histogram. - zeek_int_t metric_type_int = -1; - if constexpr ( std::is_same_v ) { - switch ( metric_type ) { - case MetricType::Counter: metric_type_int = BifEnum::Telemetry::MetricType::DOUBLE_COUNTER; break; - case MetricType::Gauge: metric_type_int = BifEnum::Telemetry::MetricType::DOUBLE_GAUGE; break; - case MetricType::Histogram: metric_type_int = BifEnum::Telemetry::MetricType::DOUBLE_HISTOGRAM; break; - } - } - else { - switch ( metric_type ) { - case MetricType::Counter: metric_type_int = BifEnum::Telemetry::MetricType::INT_COUNTER; break; - case MetricType::Gauge: metric_type_int = BifEnum::Telemetry::MetricType::INT_GAUGE; break; - case MetricType::Histogram: metric_type_int = BifEnum::Telemetry::MetricType::INT_HISTOGRAM; break; + if ( family->Matches(prefix_pattern, name_pattern) ) { + auto records = family->Collect(); + for ( const auto& r : records ) + ret_val->Append(r); } } - if ( metric_type_int < 0 ) - reporter->FatalError("Unable to lookup metric type %d", int(metric_type)); - - r->Assign(metric_type_idx, zeek::BifType::Enum::Telemetry::MetricType->GetEnumVal(metric_type_int)); - - // Add bounds and optionally count_bounds into the MetricOpts record. - static auto opts_rt = zeek::id::find_type("Telemetry::MetricOpts"); - static auto opts_rt_idx_bounds = opts_rt->FieldOffset("bounds"); - static auto opts_rt_idx_count_bounds = opts_rt->FieldOffset("count_bounds"); - - if ( metric_type == MetricType::Histogram ) { - auto add_double_bounds = [](auto& r, const auto* histogram_family) { - size_t buckets = broker::telemetry::num_buckets(histogram_family); - auto bounds_vec = make_intrusive(double_vec_type); - for ( size_t i = 0; i < buckets; i++ ) - bounds_vec->Append(as_double_val(broker::telemetry::upper_bound_at(histogram_family, i))); - - r->Assign(opts_rt_idx_bounds, bounds_vec); - }; - - if constexpr ( std::is_same_v ) { - auto histogram_family = broker::telemetry::as_int_histogram_family(family); - add_double_bounds(r, histogram_family); - - // Add count_bounds to int64_t histograms - size_t buckets = broker::telemetry::num_buckets(histogram_family); - auto count_bounds_vec = make_intrusive(count_vec_type); - for ( size_t i = 0; i < buckets; i++ ) - count_bounds_vec->Append(val_mgr->Count(broker::telemetry::upper_bound_at(histogram_family, i))); - - r->Assign(opts_rt_idx_count_bounds, count_bounds_vec); - } - else { - static_assert(std::is_same_v); - add_double_bounds(r, broker::telemetry::as_dbl_histogram_family(family)); - } - } - - metric_opts_cache.insert({family, r}); - - return r; + return ret_val; } -zeek::RecordValPtr Manager::CollectedValueMetric::AsMetricRecord() const { - static auto string_vec_type = zeek::id::find_type("string_vec"); - static auto metric_record_type = zeek::id::find_type("Telemetry::Metric"); - static auto opts_idx = metric_record_type->FieldOffset("opts"); - static auto labels_idx = metric_record_type->FieldOffset("labels"); - static auto value_idx = metric_record_type->FieldOffset("value"); - static auto count_value_idx = metric_record_type->FieldOffset("count_value"); +ValPtr Manager::CollectHistogramMetrics(std::string_view prefix_pattern, std::string_view name_pattern) { + static auto metrics_vector_type = zeek::id::find_type("any_vec"); + VectorValPtr ret_val = make_intrusive(metrics_vector_type); - auto r = make_intrusive(metric_record_type); + // Build a map of all of the families that match the patterns based on their full prefixed + // name. This will let us match those families against the items returned from the otel reader. + for ( const auto& [name, family] : families ) { + if ( family->MetricType() != BifEnum::Telemetry::MetricType::INT_HISTOGRAM && + family->MetricType() != BifEnum::Telemetry::MetricType::DOUBLE_HISTOGRAM ) + continue; - auto label_values_vec = make_intrusive(string_vec_type); - for ( const auto& l : label_values ) - label_values_vec->Append(make_intrusive(l)); - - r->Assign(labels_idx, label_values_vec); - - auto fn = [&](auto val) { - using val_t = decltype(val); - auto opts_record = telemetry_mgr->GetMetricOptsRecord(metric_type, family); - r->Assign(opts_idx, opts_record); - r->Assign(value_idx, as_double_val(val)); - if constexpr ( std::is_same_v ) - r->Assign(count_value_idx, val_mgr->Count(val)); - }; - - std::visit(fn, value); - - return r; -} - -zeek::RecordValPtr Manager::CollectedHistogramMetric::AsHistogramMetricRecord() const { - static auto string_vec_type = zeek::id::find_type("string_vec"); - static auto double_vec_type = zeek::id::find_type("double_vec"); - static auto count_vec_type = zeek::id::find_type("index_vec"); - static auto histogram_metric_type = zeek::id::find_type("Telemetry::HistogramMetric"); - static auto opts_idx = histogram_metric_type->FieldOffset("opts"); - static auto labels_idx = histogram_metric_type->FieldOffset("labels"); - static auto values_idx = histogram_metric_type->FieldOffset("values"); - static auto count_values_idx = histogram_metric_type->FieldOffset("count_values"); - static auto observations_idx = histogram_metric_type->FieldOffset("observations"); - static auto sum_idx = histogram_metric_type->FieldOffset("sum"); - static auto count_observations_idx = histogram_metric_type->FieldOffset("count_observations"); - static auto count_sum_idx = histogram_metric_type->FieldOffset("count_sum"); - - auto r = make_intrusive(histogram_metric_type); - - auto label_values_vec = make_intrusive(string_vec_type); - for ( const auto& l : label_values ) - label_values_vec->Append(make_intrusive(l)); - - r->Assign(labels_idx, label_values_vec); - - auto fn = [&](const auto& histogram_data) { - using val_t = std::decay_t; - auto opts_record = telemetry_mgr->GetMetricOptsRecord(MetricType::Histogram, family); - r->Assign(opts_idx, opts_record); - - val_t observations = 0; - auto values_vec = make_intrusive(double_vec_type); - auto count_values_vec = make_intrusive(count_vec_type); - - for ( const auto& b : histogram_data.buckets ) { - observations += b.count; - values_vec->Append(as_double_val(b.count)); - if constexpr ( std::is_same_v ) - count_values_vec->Append(val_mgr->Count(b.count)); + if ( family->Matches(prefix_pattern, name_pattern) ) { + auto records = family->Collect(); + for ( const auto& r : records ) + ret_val->Append(r); } + } - r->Assign(values_idx, values_vec); - r->Assign(sum_idx, as_double_val(histogram_data.sum)); - r->Assign(observations_idx, as_double_val(observations)); - - // Add extra fields just for int64_t based histograms with type count - if constexpr ( std::is_same_v ) { - r->Assign(count_values_idx, count_values_vec); - r->Assign(count_sum_idx, val_mgr->Count(histogram_data.sum)); - r->Assign(count_observations_idx, val_mgr->Count(observations)); - } - }; - - std::visit(fn, histogram); - - return r; + return ret_val; } /** - * Encapsulate matching of prefix and name against a broker::telemetry::metric_family_hdl + * Changes the frequency for publishing scraped metrics to the target topic. + * Passing a zero-length interval has no effect. + * @param value Interval between two scrapes in seconds. */ -class MetricFamilyMatcher { -public: - MetricFamilyMatcher(std::string_view prefix, std::string_view name) : prefix_pattern(prefix), name_pattern(name) {} - - /** - * @return true if the given family's prefix and name match, else false; - */ - bool operator()(const broker::telemetry::metric_family_hdl* family) { - auto prefix = std::string{broker::telemetry::prefix(family)}; - auto name = std::string{broker::telemetry::name(family)}; - - return fnmatch(prefix_pattern.c_str(), prefix.c_str(), 0) != FNM_NOMATCH && - fnmatch(name_pattern.c_str(), name.c_str(), 0) != FNM_NOMATCH; - } - -private: - std::string prefix_pattern; - std::string name_pattern; -}; +void Manager::SetMetricsExportInterval(double value) { export_interval = value; } /** - * A collector implementation for counters and gauges. + * Sets a new target topic for the metrics. Passing an empty string has no + * effect. + * @param value The new topic for publishing local metrics to. */ -class MetricsCollector : public broker::telemetry::metrics_collector { - using MetricType = Manager::MetricType; - -public: - MetricsCollector(std::string_view prefix, std::string_view name) : matches(prefix, name) {} - - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::dbl_counter_hdl* counter, - broker::telemetry::const_label_list labels) override { - if ( matches(family) ) - metrics.emplace_back(MetricType::Counter, family, extract_label_values(labels), - broker::telemetry::value(counter)); - } - - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::int_counter_hdl* counter, - broker::telemetry::const_label_list labels) override { - if ( matches(family) ) - metrics.emplace_back(MetricType::Counter, family, extract_label_values(labels), - broker::telemetry::value(counter)); - } - - void operator()(const broker::telemetry::metric_family_hdl* family, const broker::telemetry::dbl_gauge_hdl* gauge, - broker::telemetry::const_label_list labels) override { - if ( matches(family) ) - metrics.emplace_back(MetricType::Gauge, family, extract_label_values(labels), - broker::telemetry::value(gauge)); - } - - void operator()(const broker::telemetry::metric_family_hdl* family, const broker::telemetry::int_gauge_hdl* gauge, - broker::telemetry::const_label_list labels) override { - if ( matches(family) ) - metrics.emplace_back(MetricType::Gauge, family, extract_label_values(labels), - broker::telemetry::value(gauge)); - } - - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::dbl_histogram_hdl* histogram, - broker::telemetry::const_label_list labels) override { - // Ignored - } - - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::int_histogram_hdl* histogram, - broker::telemetry::const_label_list labels) override { - // Ignored - } - - std::vector& GetResult() { return metrics; } - -private: - MetricFamilyMatcher matches; - std::vector metrics; -}; - -std::vector Manager::CollectMetrics(std::string_view prefix, std::string_view name) { - auto collector = MetricsCollector(prefix, name); - - pimpl->collect(collector); - - return std::move(collector.GetResult()); -} +void Manager::SetMetricsExportTopic(std::string value) { export_topic = std::move(value); } /** - * A collector implementation for histograms. + * Sets the import topics for a node importing metrics. + * + * @param topics List of topics from which to import metrics. */ -class HistogramMetricsCollector : public broker::telemetry::metrics_collector { - using MetricType = Manager::MetricType; +void Manager::SetMetricsImportTopics(std::vector topics) { import_topics = std::move(topics); } -public: - HistogramMetricsCollector(std::string_view prefix, std::string_view name) : matches(prefix, name) {} +/** + * Sets a new ID for the metrics exporter. Passing an empty string has no + * effect. + * @param value The new ID of the exporter in published metrics. + */ +void Manager::SetMetricsExportEndpointName(std::string value) { export_endpoint = std::move(value); } - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::dbl_counter_hdl* counter, - broker::telemetry::const_label_list labels) override { - // Ignored - } +/** + * Sets a prefix selection for the metrics exporter. An empty vector selects + * *all* metrics. + * @param filter List of selected metric prefixes or an empty vector for + * selecting all metrics. + */ +void Manager::SetMetricsExportPrefixes(std::vector filter) { export_prefixes = std::move(filter); } - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::int_counter_hdl* counter, - broker::telemetry::const_label_list labels) override { - // Ignored - } - - void operator()(const broker::telemetry::metric_family_hdl* family, const broker::telemetry::dbl_gauge_hdl* gauge, - broker::telemetry::const_label_list labels) override { - // Ignored - } - - void operator()(const broker::telemetry::metric_family_hdl* family, const broker::telemetry::int_gauge_hdl* gauge, - broker::telemetry::const_label_list labels) override { - // Ignored - } - - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::dbl_histogram_hdl* histogram, - broker::telemetry::const_label_list labels) override { - if ( ! matches(family) ) - return; - - size_t num_buckets = broker::telemetry::num_buckets(histogram); - - Manager::CollectedHistogramMetric::DblHistogramData histogram_data; - histogram_data.buckets.reserve(num_buckets); - - for ( size_t i = 0; i < num_buckets; i++ ) { - double c = broker::telemetry::count_at(histogram, i); - double ub = broker::telemetry::upper_bound_at(histogram, i); - histogram_data.buckets.emplace_back(c, ub); - } - - histogram_data.sum = broker::telemetry::sum(histogram); - - metrics.emplace_back(family, extract_label_values(labels), std::move(histogram_data)); - } - - void operator()(const broker::telemetry::metric_family_hdl* family, - const broker::telemetry::int_histogram_hdl* histogram, - broker::telemetry::const_label_list labels) override { - if ( ! matches(family) ) - return; - - size_t num_buckets = broker::telemetry::num_buckets(histogram); - - Manager::CollectedHistogramMetric::IntHistogramData histogram_data; - histogram_data.buckets.reserve(num_buckets); - - for ( size_t i = 0; i < num_buckets; i++ ) { - int64_t c = broker::telemetry::count_at(histogram, i); - int64_t ub = broker::telemetry::upper_bound_at(histogram, i); - histogram_data.buckets.emplace_back(c, ub); - } - - histogram_data.sum = broker::telemetry::sum(histogram); - - metrics.emplace_back(family, extract_label_values(labels), std::move(histogram_data)); - } - - std::vector& GetResult() { return metrics; } - -private: - MetricFamilyMatcher matches; - std::vector metrics; -}; - -std::vector Manager::CollectHistogramMetrics(std::string_view prefix, - std::string_view name) { - auto collector = HistogramMetricsCollector(prefix, name); - - pimpl->collect(collector); - - return std::move(collector.GetResult()); -} } // namespace zeek::telemetry @@ -444,42 +313,42 @@ SCENARIO("telemetry managers provide access to counter families") { WHEN("retrieving an IntCounter family") { auto family = mgr.CounterFamily("zeek", "requests", {"method"}, "test", "1", true); THEN("the family object stores the parameters") { - CHECK_EQ(family.Prefix(), "zeek"sv); - CHECK_EQ(family.Name(), "requests"sv); - CHECK_EQ(toVector(family.LabelNames()), std::vector{"method"s}); - CHECK_EQ(family.Helptext(), "test"sv); - CHECK_EQ(family.Unit(), "1"sv); - CHECK_EQ(family.IsSum(), true); + CHECK_EQ(family->Prefix(), "zeek"sv); + CHECK_EQ(family->Name(), "requests"sv); + CHECK_EQ(toVector(family->LabelNames()), std::vector{"method"s}); + CHECK_EQ(family->Helptext(), "test"sv); + CHECK_EQ(family->Unit(), "1"sv); + CHECK_EQ(family->IsSum(), true); } AND_THEN("GetOrAdd returns the same metric for the same labels") { - auto first = family.GetOrAdd({{"method", "get"}}); - auto second = family.GetOrAdd({{"method", "get"}}); + auto first = family->GetOrAdd({{"method", "get"}}); + auto second = family->GetOrAdd({{"method", "get"}}); CHECK_EQ(first, second); } AND_THEN("GetOrAdd returns different metric for the disjoint labels") { - auto first = family.GetOrAdd({{"method", "get"}}); - auto second = family.GetOrAdd({{"method", "put"}}); + auto first = family->GetOrAdd({{"method", "get"}}); + auto second = family->GetOrAdd({{"method", "put"}}); CHECK_NE(first, second); } } WHEN("retrieving a DblCounter family") { auto family = mgr.CounterFamily("zeek", "runtime", {"query"}, "test", "seconds", true); THEN("the family object stores the parameters") { - CHECK_EQ(family.Prefix(), "zeek"sv); - CHECK_EQ(family.Name(), "runtime"sv); - CHECK_EQ(toVector(family.LabelNames()), std::vector{"query"s}); - CHECK_EQ(family.Helptext(), "test"sv); - CHECK_EQ(family.Unit(), "seconds"sv); - CHECK_EQ(family.IsSum(), true); + CHECK_EQ(family->Prefix(), "zeek"sv); + CHECK_EQ(family->Name(), "runtime"sv); + CHECK_EQ(toVector(family->LabelNames()), std::vector{"query"s}); + CHECK_EQ(family->Helptext(), "test"sv); + CHECK_EQ(family->Unit(), "seconds"sv); + CHECK_EQ(family->IsSum(), true); } AND_THEN("GetOrAdd returns the same metric for the same labels") { - auto first = family.GetOrAdd({{"query", "foo"}}); - auto second = family.GetOrAdd({{"query", "foo"}}); + auto first = family->GetOrAdd({{"query", "foo"}}); + auto second = family->GetOrAdd({{"query", "foo"}}); CHECK_EQ(first, second); } AND_THEN("GetOrAdd returns different metric for the disjoint labels") { - auto first = family.GetOrAdd({{"query", "foo"}}); - auto second = family.GetOrAdd({{"query", "bar"}}); + auto first = family->GetOrAdd({{"query", "foo"}}); + auto second = family->GetOrAdd({{"query", "bar"}}); CHECK_NE(first, second); } } @@ -490,44 +359,44 @@ SCENARIO("telemetry managers provide access to gauge families") { GIVEN("a telemetry manager") { Manager mgr; WHEN("retrieving an IntGauge family") { - auto family = mgr.GaugeFamily("zeek", "open-connections", {"protocol"}, "test"); + auto family = mgr.GaugeFamily("zeek", "open-connections", {"protocol"}, "test", "1"); THEN("the family object stores the parameters") { - CHECK_EQ(family.Prefix(), "zeek"sv); - CHECK_EQ(family.Name(), "open-connections"sv); - CHECK_EQ(toVector(family.LabelNames()), std::vector{"protocol"s}); - CHECK_EQ(family.Helptext(), "test"sv); - CHECK_EQ(family.Unit(), "1"sv); - CHECK_EQ(family.IsSum(), false); + CHECK_EQ(family->Prefix(), "zeek"sv); + CHECK_EQ(family->Name(), "open_connections"sv); + CHECK_EQ(toVector(family->LabelNames()), std::vector{"protocol"s}); + CHECK_EQ(family->Helptext(), "test"sv); + CHECK_EQ(family->Unit(), "1"sv); + CHECK_EQ(family->IsSum(), false); } AND_THEN("GetOrAdd returns the same metric for the same labels") { - auto first = family.GetOrAdd({{"protocol", "tcp"}}); - auto second = family.GetOrAdd({{"protocol", "tcp"}}); + auto first = family->GetOrAdd({{"protocol", "tcp"}}); + auto second = family->GetOrAdd({{"protocol", "tcp"}}); CHECK_EQ(first, second); } AND_THEN("GetOrAdd returns different metric for the disjoint labels") { - auto first = family.GetOrAdd({{"protocol", "tcp"}}); - auto second = family.GetOrAdd({{"protocol", "quic"}}); + auto first = family->GetOrAdd({{"protocol", "tcp"}}); + auto second = family->GetOrAdd({{"protocol", "quic"}}); CHECK_NE(first, second); } } WHEN("retrieving a DblGauge family") { auto family = mgr.GaugeFamily("zeek", "water-level", {"river"}, "test", "meters"); THEN("the family object stores the parameters") { - CHECK_EQ(family.Prefix(), "zeek"sv); - CHECK_EQ(family.Name(), "water-level"sv); - CHECK_EQ(toVector(family.LabelNames()), std::vector{"river"s}); - CHECK_EQ(family.Helptext(), "test"sv); - CHECK_EQ(family.Unit(), "meters"sv); - CHECK_EQ(family.IsSum(), false); + CHECK_EQ(family->Prefix(), "zeek"sv); + CHECK_EQ(family->Name(), "water_level"sv); + CHECK_EQ(toVector(family->LabelNames()), std::vector{"river"s}); + CHECK_EQ(family->Helptext(), "test"sv); + CHECK_EQ(family->Unit(), "meters"sv); + CHECK_EQ(family->IsSum(), false); } AND_THEN("GetOrAdd returns the same metric for the same labels") { - auto first = family.GetOrAdd({{"river", "Sacramento"}}); - auto second = family.GetOrAdd({{"river", "Sacramento"}}); + auto first = family->GetOrAdd({{"river", "Sacramento"}}); + auto second = family->GetOrAdd({{"river", "Sacramento"}}); CHECK_EQ(first, second); } AND_THEN("GetOrAdd returns different metric for the disjoint labels") { - auto first = family.GetOrAdd({{"query", "Sacramento"}}); - auto second = family.GetOrAdd({{"query", "San Joaquin"}}); + auto first = family->GetOrAdd({{"query", "Sacramento"}}); + auto second = family->GetOrAdd({{"query", "San Joaquin"}}); CHECK_NE(first, second); } } @@ -541,21 +410,21 @@ SCENARIO("telemetry managers provide access to histogram families") { int64_t buckets[] = {10, 20}; auto family = mgr.HistogramFamily("zeek", "payload-size", {"protocol"}, buckets, "test", "bytes"); THEN("the family object stores the parameters") { - CHECK_EQ(family.Prefix(), "zeek"sv); - CHECK_EQ(family.Name(), "payload-size"sv); - CHECK_EQ(toVector(family.LabelNames()), std::vector{"protocol"s}); - CHECK_EQ(family.Helptext(), "test"sv); - CHECK_EQ(family.Unit(), "bytes"sv); - CHECK_EQ(family.IsSum(), false); + CHECK_EQ(family->Prefix(), "zeek"sv); + CHECK_EQ(family->Name(), "payload_size"sv); + CHECK_EQ(toVector(family->LabelNames()), std::vector{"protocol"s}); + CHECK_EQ(family->Helptext(), "test"sv); + CHECK_EQ(family->Unit(), "bytes"sv); + CHECK_EQ(family->IsSum(), false); } AND_THEN("GetOrAdd returns the same metric for the same labels") { - auto first = family.GetOrAdd({{"protocol", "tcp"}}); - auto second = family.GetOrAdd({{"protocol", "tcp"}}); + auto first = family->GetOrAdd({{"protocol", "tcp"}}); + auto second = family->GetOrAdd({{"protocol", "tcp"}}); CHECK_EQ(first, second); } AND_THEN("GetOrAdd returns different metric for the disjoint labels") { - auto first = family.GetOrAdd({{"protocol", "tcp"}}); - auto second = family.GetOrAdd({{"protocol", "udp"}}); + auto first = family->GetOrAdd({{"protocol", "tcp"}}); + auto second = family->GetOrAdd({{"protocol", "udp"}}); CHECK_NE(first, second); } } @@ -563,31 +432,31 @@ SCENARIO("telemetry managers provide access to histogram families") { double buckets[] = {10.0, 20.0}; auto family = mgr.HistogramFamily("zeek", "parse-time", {"protocol"}, buckets, "test", "seconds"); THEN("the family object stores the parameters") { - CHECK_EQ(family.Prefix(), "zeek"sv); - CHECK_EQ(family.Name(), "parse-time"sv); - CHECK_EQ(toVector(family.LabelNames()), std::vector{"protocol"s}); - CHECK_EQ(family.Helptext(), "test"sv); - CHECK_EQ(family.Unit(), "seconds"sv); - CHECK_EQ(family.IsSum(), false); + CHECK_EQ(family->Prefix(), "zeek"sv); + CHECK_EQ(family->Name(), "parse_time"sv); + CHECK_EQ(toVector(family->LabelNames()), std::vector{"protocol"s}); + CHECK_EQ(family->Helptext(), "test"sv); + CHECK_EQ(family->Unit(), "seconds"sv); + CHECK_EQ(family->IsSum(), false); } AND_THEN("GetOrAdd returns the same metric for the same labels") { - auto first = family.GetOrAdd({{"protocol", "tcp"}}); - auto second = family.GetOrAdd({{"protocol", "tcp"}}); + auto first = family->GetOrAdd({{"protocol", "tcp"}}); + auto second = family->GetOrAdd({{"protocol", "tcp"}}); CHECK_EQ(first, second); } AND_THEN("GetOrAdd returns different metric for the disjoint labels") { - auto first = family.GetOrAdd({{"protocol", "tcp"}}); - auto second = family.GetOrAdd({{"protocol", "udp"}}); + auto first = family->GetOrAdd({{"protocol", "tcp"}}); + auto second = family->GetOrAdd({{"protocol", "udp"}}); CHECK_NE(first, second); } AND_THEN("Timers add observations to histograms") { - auto hg = family.GetOrAdd({{"protocol", "tst"}}); - CHECK_EQ(hg.Sum(), 0.0); + auto hg = family->GetOrAdd({{"protocol", "tst"}}); + CHECK_EQ(hg->Sum(), 0.0); { Timer observer{hg}; std::this_thread::sleep_for(1ms); } - CHECK_NE(hg.Sum(), 0.0); + CHECK_NE(hg->Sum(), 0.0); } } } diff --git a/src/telemetry/Manager.h b/src/telemetry/Manager.h index 96fa6d994f..9c0c46c075 100644 --- a/src/telemetry/Manager.h +++ b/src/telemetry/Manager.h @@ -2,11 +2,11 @@ #pragma once +#include #include #include +#include #include -#include -#include #include #include "zeek/IntrusivePtr.h" @@ -14,149 +14,52 @@ #include "zeek/telemetry/Counter.h" #include "zeek/telemetry/Gauge.h" #include "zeek/telemetry/Histogram.h" +#include "zeek/telemetry/ProcessStats.h" -#include "broker/telemetry/fwd.hh" - -namespace broker { -class endpoint; -} +#include "prometheus/exposer.h" +#include "prometheus/registry.h" namespace zeek { class RecordVal; using RecordValPtr = IntrusivePtr; } // namespace zeek -namespace zeek::Broker { -class Manager; -} - namespace zeek::telemetry { +class OtelReader; + /** * Manages a collection of metric families. */ -class Manager { +class Manager final { public: - friend class Broker::Manager; - Manager(); Manager(const Manager&) = delete; Manager& operator=(const Manager&) = delete; - virtual ~Manager() = default; + ~Manager() = default; /** * Initialization of the manager. This is called late during Zeek's * initialization after any scripts are processed. */ - virtual void InitPostScript(); + void InitPostScript(); /** - * Supported metric types. - */ - enum class MetricType { Counter, Gauge, Histogram }; - - /** - * Captures information about counter and gauge metrics. - */ - struct CollectedValueMetric { - /** - * Constructor. - * @param metric_type The type of this metric. - * @param family Broker layer family handle for this metric. - * @param label_values The string values for each of the metric's labels. - * @param value The metric's current value. - */ - CollectedValueMetric(MetricType metric_type, const broker::telemetry::metric_family_hdl* family, - std::vector label_values, std::variant value) - : metric_type(metric_type), family(family), label_values(std::move(label_values)), value(value) {} - - /** - * @return A script layer Telemetry::Metric record for this metric. - */ - zeek::RecordValPtr AsMetricRecord() const; - - enum MetricType metric_type; - const broker::telemetry::metric_family_hdl* family; - std::vector label_values; - std::variant value; - }; - - /** - * Captures information about histogram metrics. - */ - struct CollectedHistogramMetric { - /** - * Helper struct representing a single bucket of a histogram. - * @tparam T The data type used by the histogram (double or int64_t). - */ - template - struct Bucket { - Bucket(T count, T upper_bound) : count(count), upper_bound(upper_bound) {} - - T count; - T upper_bound; - }; - - /** - * Helper struct representing a histogram as sum and buckets. - * @tparam T The data type used by the histogram (double or int64_t). - */ - template - struct HistogramData { - T sum; - std::vector> buckets; - }; - - using DblHistogramData = HistogramData; - using IntHistogramData = HistogramData; - - /** - * Constructor. - * @param family Broker layer family handle for this metric. - * @param label_values The string values for each of the metric's labels. - * @param histogram The histogram's data (sum and individual buckets). - */ - CollectedHistogramMetric(const broker::telemetry::metric_family_hdl* family, - std::vector label_values, - std::variant histogram) - - : family(family), label_values(std::move(label_values)), histogram(std::move(histogram)) {} - - const broker::telemetry::metric_family_hdl* family; - std::vector label_values; - std::variant histogram; - - /** - * @return A script layer Telemetry::HistogramMetric record for this histogram. - */ - zeek::RecordValPtr AsHistogramMetricRecord() const; - }; - - /** - * @return A script layer Telemetry::MetricOpts record for the given metric family. - * @param metric_typ The type of metric. - * @param family Broker layer family handle for the family. - * @tparam T The underlying data type (double or int64_t) - */ - template - zeek::RecordValPtr GetMetricOptsRecord(MetricType metric_type, const broker::telemetry::metric_family_hdl* family); - - /** - * @return All counter and gauge metrics and their values matching prefix and name. + * @return A VectorVal containing all counter and gauge metrics and their values matching prefix and name. * @param prefix The prefix pattern to use for filtering. Supports globbing. * @param name The name pattern to use for filtering. Supports globbing. */ - std::vector CollectMetrics(std::string_view prefix, std::string_view name); + ValPtr CollectMetrics(std::string_view prefix, std::string_view name); /** - * @return All histogram metrics and their data matching prefix and name. + * @return A VectorVal containing all histogram metrics and their values matching prefix and name. * @param prefix The prefix pattern to use for filtering. Supports globbing. * @param name The name pattern to use for filtering. Supports globbing. */ - std::vector CollectHistogramMetrics(std::string_view prefix, std::string_view name); + ValPtr CollectHistogramMetrics(std::string_view prefix, std::string_view name); /** * @return A counter metric family. Creates the family lazily if necessary. @@ -165,27 +68,41 @@ public: * @param labels Names for all label dimensions of the metric. * @param helptext Short explanation of the metric. * @param unit Unit of measurement. - * @param is_sum Indicates whether this metric accumulates something, where - * only the total value is of interest. + * @param is_sum Indicates whether this metric accumulates something, where only the total value is of interest. + * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by + * the metrics subsystem whenever data is requested. */ template auto CounterFamily(std::string_view prefix, std::string_view name, Span labels, - std::string_view helptext, std::string_view unit = "1", bool is_sum = false) { + std::string_view helptext, std::string_view unit = "", bool is_sum = false) { + auto fam = LookupFamily(prefix, name); + if constexpr ( std::is_same::value ) { - auto fam = int_counter_fam(Ptr(), prefix, name, labels, helptext, unit, is_sum); - return IntCounterFamily{fam}; + if ( fam ) + return std::static_pointer_cast(fam); + + auto int_fam = + std::make_shared(prefix, name, labels, helptext, prometheus_registry, unit, is_sum); + families.insert_or_assign(int_fam->FullName(), int_fam); + return int_fam; } else { static_assert(std::is_same::value, "metrics only support int64_t and double values"); - auto fam = dbl_counter_fam(Ptr(), prefix, name, labels, helptext, unit, is_sum); - return DblCounterFamily{fam}; + + if ( fam ) + return std::static_pointer_cast(fam); + + auto dbl_fam = + std::make_shared(prefix, name, labels, helptext, prometheus_registry, unit, is_sum); + families.insert_or_assign(dbl_fam->FullName(), dbl_fam); + return dbl_fam; } } /// @copydoc CounterFamily template auto CounterFamily(std::string_view prefix, std::string_view name, std::initializer_list labels, - std::string_view helptext, std::string_view unit = "1", bool is_sum = false) { + std::string_view helptext, std::string_view unit = "", bool is_sum = false) { auto lbl_span = Span{labels.begin(), labels.size()}; return CounterFamily(prefix, name, lbl_span, helptext, unit, is_sum); } @@ -198,25 +115,28 @@ public: * @param labels Values for all label dimensions of the metric. * @param helptext Short explanation of the metric. * @param unit Unit of measurement. - * @param is_sum Indicates whether this metric accumulates something, where - * only the total value is of interest. + * @param is_sum Indicates whether this metric accumulates something, where only the total value is of interest. + * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by + * the metrics subsystem whenever data is requested. */ template - Counter CounterInstance(std::string_view prefix, std::string_view name, Span labels, - std::string_view helptext, std::string_view unit = "1", bool is_sum = false) { + std::shared_ptr> CounterInstance(std::string_view prefix, std::string_view name, + Span labels, std::string_view helptext, + std::string_view unit = "", bool is_sum = false) { return WithLabelNames(labels, [&, this](auto labelNames) { auto family = CounterFamily(prefix, name, labelNames, helptext, unit, is_sum); - return family.getOrAdd(labels); + return family->GetOrAdd(labels); }); } /// @copydoc counterInstance template - Counter CounterInstance(std::string_view prefix, std::string_view name, - std::initializer_list labels, std::string_view helptext, - std::string_view unit = "1", bool is_sum = false) { + std::shared_ptr> CounterInstance(std::string_view prefix, std::string_view name, + std::initializer_list labels, + std::string_view helptext, std::string_view unit = "", + bool is_sum = false) { auto lbl_span = Span{labels.begin(), labels.size()}; - return CounterInstance(prefix, name, lbl_span, helptext, unit, is_sum); + return CounterInstance(prefix, name, lbl_span, helptext, unit, is_sum); } /** @@ -226,27 +146,40 @@ public: * @param labels Names for all label dimensions of the metric. * @param helptext Short explanation of the metric. * @param unit Unit of measurement. - * @param is_sum Indicates whether this metric accumulates something, where - * only the total value is of interest. + * @param is_sum Indicates whether this metric accumulates something, where only the total value is of interest. + * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by + * the metrics subsystem whenever data is requested. */ template auto GaugeFamily(std::string_view prefix, std::string_view name, Span labels, - std::string_view helptext, std::string_view unit = "1", bool is_sum = false) { + std::string_view helptext, std::string_view unit = "", bool is_sum = false) { + auto fam = LookupFamily(prefix, name); + if constexpr ( std::is_same::value ) { - auto fam = int_gauge_fam(Ptr(), prefix, name, labels, helptext, unit, is_sum); - return IntGaugeFamily{fam}; + if ( fam ) + return std::static_pointer_cast(fam); + + auto int_fam = + std::make_shared(prefix, name, labels, helptext, prometheus_registry, unit, is_sum); + families.insert_or_assign(int_fam->FullName(), int_fam); + return int_fam; } else { static_assert(std::is_same::value, "metrics only support int64_t and double values"); - auto fam = dbl_gauge_fam(Ptr(), prefix, name, labels, helptext, unit, is_sum); - return DblGaugeFamily{fam}; + if ( fam ) + return std::static_pointer_cast(fam); + + auto dbl_fam = + std::make_shared(prefix, name, labels, helptext, prometheus_registry, unit, is_sum); + families.insert_or_assign(dbl_fam->FullName(), dbl_fam); + return dbl_fam; } } /// @copydoc GaugeFamily template auto GaugeFamily(std::string_view prefix, std::string_view name, std::initializer_list labels, - std::string_view helptext, std::string_view unit = "1", bool is_sum = false) { + std::string_view helptext, std::string_view unit = "", bool is_sum = false) { auto lbl_span = Span{labels.begin(), labels.size()}; return GaugeFamily(prefix, name, lbl_span, helptext, unit, is_sum); } @@ -259,25 +192,27 @@ public: * @param labels Values for all label dimensions of the metric. * @param helptext Short explanation of the metric. * @param unit Unit of measurement. - * @param is_sum Indicates whether this metric accumulates something, where - * only the total value is of interest. + * @param is_sum Indicates whether this metric accumulates something, where only the total value is of interest. + * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by + * the metrics subsystem whenever data is requested. */ template - Gauge GaugeInstance(std::string_view prefix, std::string_view name, Span labels, - std::string_view helptext, std::string_view unit = "1", bool is_sum = false) { + std::shared_ptr> GaugeInstance(std::string_view prefix, std::string_view name, + Span labels, std::string_view helptext, + std::string_view unit = "", bool is_sum = false) { return WithLabelNames(labels, [&, this](auto labelNames) { auto family = GaugeFamily(prefix, name, labelNames, helptext, unit, is_sum); - return family.getOrAdd(labels); + return family->GetOrAdd(labels); }); } /// @copydoc GaugeInstance template - Gauge GaugeInstance(std::string_view prefix, std::string_view name, - std::initializer_list labels, std::string_view helptext, - std::string_view unit = "1", bool is_sum = false) { + std::shared_ptr> GaugeInstance(std::string_view prefix, std::string_view name, + std::initializer_list labels, std::string_view helptext, + std::string_view unit = "", bool is_sum = false) { auto lbl_span = Span{labels.begin(), labels.size()}; - return GaugeInstance(prefix, name, lbl_span, helptext, unit, is_sum); + return GaugeInstance(prefix, name, lbl_span, helptext, unit, is_sum); } // Forces the compiler to use the type `Span` instead of trying to @@ -315,15 +250,27 @@ public: template auto HistogramFamily(std::string_view prefix, std::string_view name, Span labels, ConstSpan default_upper_bounds, std::string_view helptext, - std::string_view unit = "1", bool is_sum = false) { + std::string_view unit = "") { + auto fam = LookupFamily(prefix, name); + if constexpr ( std::is_same::value ) { - auto fam = int_histogram_fam(Ptr(), prefix, name, labels, default_upper_bounds, helptext, unit, is_sum); - return IntHistogramFamily{fam}; + if ( fam ) + return std::static_pointer_cast(fam); + + auto int_fam = std::make_shared(prefix, name, labels, default_upper_bounds, helptext, + prometheus_registry, unit); + families.insert_or_assign(int_fam->FullName(), int_fam); + return int_fam; } else { static_assert(std::is_same::value, "metrics only support int64_t and double values"); - auto fam = dbl_histogram_fam(Ptr(), prefix, name, labels, default_upper_bounds, helptext, unit, is_sum); - return DblHistogramFamily{fam}; + if ( fam ) + return std::static_pointer_cast(fam); + + auto dbl_fam = std::make_shared(prefix, name, labels, default_upper_bounds, helptext, + prometheus_registry, unit); + families.insert_or_assign(dbl_fam->FullName(), dbl_fam); + return dbl_fam; } } @@ -331,9 +278,9 @@ public: template auto HistogramFamily(std::string_view prefix, std::string_view name, std::initializer_list labels, ConstSpan default_upper_bounds, std::string_view helptext, - std::string_view unit = "1", bool is_sum = false) { + std::string_view unit = "") { auto lbl_span = Span{labels.begin(), labels.size()}; - return HistogramFamily(prefix, name, lbl_span, default_upper_bounds, helptext, unit, is_sum); + return HistogramFamily(prefix, name, lbl_span, default_upper_bounds, helptext, unit); } /** @@ -357,29 +304,76 @@ public: * @p default_upper_bounds via run-time configuration. */ template - Histogram HistogramInstance(std::string_view prefix, std::string_view name, Span labels, - ConstSpan default_upper_bounds, std::string_view helptext, - std::string_view unit = "1", bool is_sum = false) { + std::shared_ptr> HistogramInstance(std::string_view prefix, std::string_view name, + Span labels, + ConstSpan default_upper_bounds, + std::string_view helptext, std::string_view unit = "") { return WithLabelNames(labels, [&, this](auto labelNames) { - auto family = - HistogramFamily(prefix, name, labelNames, default_upper_bounds, helptext, unit, is_sum); - return family.getOrAdd(labels); + auto family = HistogramFamily(prefix, name, labelNames, default_upper_bounds, helptext, unit); + return family->GetOrAdd(labels); }); } /// @copdoc HistogramInstance template - Histogram HistogramInstance(std::string_view prefix, std::string_view name, - std::initializer_list labels, - ConstSpan default_upper_bounds, std::string_view helptext, - std::string_view unit = "1", bool is_sum = false) { + std::shared_ptr> HistogramInstance(std::string_view prefix, std::string_view name, + std::initializer_list labels, + std::initializer_list default_upper_bounds, + std::string_view helptext, std::string_view unit = "") { auto lbls = Span{labels.begin(), labels.size()}; - return HistogramInstance(prefix, name, lbls, default_upper_bounds, helptext, unit, is_sum); + auto bounds = Span{default_upper_bounds.begin(), default_upper_bounds.size()}; + return HistogramInstance(prefix, name, lbls, bounds, helptext, unit); + } + + /** + * Changes the frequency for publishing scraped metrics to the target topic. + * Passing a zero-length interval has no effect. + * @param value Interval between two scrapes in seconds. + */ + void SetMetricsExportInterval(double value); + + /** + * Sets a new target topic for the metrics. Passing an empty string has no + * effect. + * @param value The new topic for publishing local metrics to. + */ + void SetMetricsExportTopic(std::string value); + + /** + * Sets the import topics for a node importing metrics. + * + * @param topics List of topics from which to import metrics. + */ + void SetMetricsImportTopics(std::vector topics); + + /** + * Sets a new ID for the metrics exporter. Passing an empty string has no + * effect. + * @param value The new ID of the exporter in published metrics. + */ + void SetMetricsExportEndpointName(std::string value); + + /** + * Sets a prefix selection for the metrics exporter. An empty vector selects + * *all* metrics. + * @param filter List of selected metric prefixes or an empty vector for + * selecting all metrics. + */ + void SetMetricsExportPrefixes(std::vector filter); + + bool IsExporting() const { return ! export_topic.empty() && ! export_endpoint.empty(); } + + const std::string& MetricsSchema() const { return metrics_schema; } + + std::shared_ptr GetFamilyByFullName(const std::string& full_name) const { + if ( auto it = families.find(full_name); it != families.end() ) + return it->second; + return nullptr; } protected: template - static void WithLabelNames(Span xs, F continuation) { + static auto WithLabelNames(Span xs, F continuation) { if ( xs.size() <= 10 ) { std::string_view buf[10]; for ( size_t index = 0; index < xs.size(); ++index ) @@ -390,29 +384,41 @@ protected: else { std::vector buf; for ( auto x : xs ) - buf.emplace_back(x.first, x.second); + buf.emplace_back(x.first); return continuation(Span{buf}); } } - broker::telemetry::metric_registry_impl* Ptr() { return pimpl.get(); } - - // Connects all the dots after the Broker Manager constructed the endpoint - // for this Zeek instance. Called from Broker::Manager::InitPostScript(). - void InitPostBrokerSetup(broker::endpoint&); - - IntrusivePtr pimpl; - private: - // Caching of metric_family_hdl instances to their Zeek record representation. - std::unordered_map metric_opts_cache; + std::shared_ptr LookupFamily(std::string_view prefix, std::string_view name) const; + + std::string metrics_schema; + + std::shared_ptr otel_reader; + std::map> families; + + detail::process_stats current_process_stats; + double process_stats_last_updated = 0.0; + + std::shared_ptr rss_gauge; + std::shared_ptr vms_gauge; + std::shared_ptr cpu_gauge; + std::shared_ptr fds_gauge; + + std::string export_topic; + std::vector import_topics; + std::string export_endpoint; + std::vector export_prefixes; + double export_interval = 0.0; + + std::shared_ptr prometheus_registry; + std::unique_ptr prometheus_exposer; }; } // namespace zeek::telemetry namespace zeek { - extern telemetry::Manager* telemetry_mgr; } // namespace zeek diff --git a/src/telemetry/MetricFamily.cc b/src/telemetry/MetricFamily.cc new file mode 100644 index 0000000000..035661013f --- /dev/null +++ b/src/telemetry/MetricFamily.cc @@ -0,0 +1,78 @@ +#include "zeek/telemetry/MetricFamily.h" + +#include + +#include "zeek/Val.h" +#include "zeek/telemetry/telemetry.bif.h" + +namespace zeek::telemetry { + +MetricFamily::MetricFamily(std::string_view prefix, std::string_view name, Span lbls, + std::string_view helptext, std::string_view unit, bool is_sum) + : prefix(prefix), helptext(helptext), unit(unit), is_sum(is_sum) { + this->name = util::strreplace(std::string{name}, "-", "_"); + for ( const auto& lbl : lbls ) { + labels.emplace_back(lbl); + } + + full_name = util::fmt("%s_%s", this->prefix.c_str(), this->name.c_str()); +} + +RecordValPtr MetricFamily::GetMetricOptsRecord() const { + if ( record_val ) + return record_val; + + static auto string_vec_type = zeek::id::find_type("string_vec"); + static auto metric_opts_type = zeek::id::find_type("Telemetry::MetricOpts"); + + static auto prefix_idx = metric_opts_type->FieldOffset("prefix"); + static auto name_idx = metric_opts_type->FieldOffset("name"); + static auto help_text_idx = metric_opts_type->FieldOffset("help_text"); + static auto unit_idx = metric_opts_type->FieldOffset("unit"); + static auto is_total_idx = metric_opts_type->FieldOffset("is_total"); + static auto labels_idx = metric_opts_type->FieldOffset("labels"); + static auto bounds_idx = metric_opts_type->FieldOffset("bounds"); + static auto metric_type_idx = metric_opts_type->FieldOffset("metric_type"); + + record_val = make_intrusive(metric_opts_type); + record_val->Assign(prefix_idx, make_intrusive(prefix)); + record_val->Assign(name_idx, make_intrusive(name)); + record_val->Assign(help_text_idx, make_intrusive(helptext)); + record_val->Assign(unit_idx, make_intrusive(unit)); + record_val->Assign(is_total_idx, val_mgr->Bool(is_sum)); + + auto label_names_vec = make_intrusive(string_vec_type); + for ( const auto& lbl : labels ) + label_names_vec->Append(make_intrusive(lbl)); + + record_val->Assign(labels_idx, label_names_vec); + + record_val->Assign(metric_type_idx, zeek::BifType::Enum::Telemetry::MetricType->GetEnumVal(MetricType())); + + return record_val; +} + +bool MetricFamily::Matches(std::string_view prefix_pattern, std::string_view name_pattern) const noexcept { + return fnmatch(prefix_pattern.data(), prefix.c_str(), 0) != FNM_NOMATCH && + fnmatch(name_pattern.data(), name.c_str(), 0) != FNM_NOMATCH; +} + +prometheus::Labels MetricFamily::BuildPrometheusLabels(Span labels) { + prometheus::Labels p_labels; + + bool found_endpoint = false; + for ( const auto& lbl : labels ) { + p_labels.emplace(util::strreplace(std::string{lbl.first}, "-", "_"), lbl.second); + if ( lbl.first == "endpoint" ) + found_endpoint = true; + } + + if ( ! found_endpoint ) { + auto endpoint = id::find_val("Telemetry::metrics_export_endpoint_name")->AsStringVal(); + p_labels.emplace("endpoint", endpoint->ToStdString()); + } + + return p_labels; +} + +} // namespace zeek::telemetry diff --git a/src/telemetry/MetricFamily.h b/src/telemetry/MetricFamily.h index c68bec4b30..d7797512b0 100644 --- a/src/telemetry/MetricFamily.h +++ b/src/telemetry/MetricFamily.h @@ -7,8 +7,9 @@ #include #include "zeek/Span.h" +#include "zeek/Val.h" -#include "broker/telemetry/metric_family.hh" +#include "prometheus/labels.h" namespace zeek::telemetry { @@ -27,50 +28,92 @@ public: MetricFamily(const MetricFamily&) noexcept = default; MetricFamily& operator=(const MetricFamily&) noexcept = default; + virtual ~MetricFamily() = default; + /** * @return The prefix (namespace) this family belongs to. Builtin metrics * of Zeek return @c zeek. Custom metrics, e.g., created in a * script, may use a prefix that represents the application/script * or protocol (e.g. @c http) name. */ - std::string_view Prefix() const noexcept { return broker::telemetry::prefix(hdl); } + std::string_view Prefix() const noexcept { return prefix; } /** * @return The human-readable name of the metric, e.g., * @p open-connections. */ - std::string_view Name() const noexcept { return broker::telemetry::name(hdl); } + std::string_view Name() const noexcept { return name; } + + /** + * @return The complete name for the family including prefix. + */ + std::string FullName() const noexcept { return full_name; } /** * @return The names for all label dimensions. */ - Span LabelNames() const noexcept { return broker::telemetry::label_names(hdl); } + Span LabelNames() const noexcept { return labels; } /** * @return A short explanation of the metric. */ - std::string_view Helptext() const noexcept { return broker::telemetry::helptext(hdl); } + std::string_view Helptext() const noexcept { return helptext; } /** - * @return The unit of measurement, preferably a base unit such as - * @c bytes or @c seconds. Dimensionless counts return the - * pseudo-unit @c 1. + * @return The unit of measurement, preferably a base unit such as @c bytes + * or @c seconds. */ - std::string_view Unit() const noexcept { return broker::telemetry::unit(hdl); } + std::string_view Unit() const noexcept { return unit; } /** * @return Whether metrics of this family accumulate values, where only the * total value is of interest. For example, the total number of * HTTP requests. */ - bool IsSum() const noexcept { return broker::telemetry::is_sum(hdl); } + bool IsSum() const noexcept { return is_sum; } + + /** + * Converts the family data into script layer record. This record + * lazily-allocated and reused for each instrument associated with this + * family. + * + * @return A script layer Telemetry::Metric record for this family. + */ + RecordValPtr GetMetricOptsRecord() const; + + /** + * @return The type of this metric, defined as one of the values in the + * script-layer Telemetry::MetricType enum. + */ + virtual zeek_int_t MetricType() const noexcept = 0; + + /** + * @return Whether the prefix and name of this family matches the patterns + * provided. + */ + bool Matches(std::string_view prefix_pattern, std::string_view name_pattern) const noexcept; + + virtual std::vector Collect() const = 0; protected: - using Handle = broker::telemetry::metric_family_hdl*; + MetricFamily(std::string_view prefix, std::string_view name, Span lbls, + std::string_view helptext, std::string_view unit, bool is_sum = false); - explicit MetricFamily(Handle hdl) : hdl(hdl) {} + /** + * Builds a set of labels for prometheus based on a set of labels from + * Zeek. This adds an 'endpoint' label if it's missing from the set. + */ + static prometheus::Labels BuildPrometheusLabels(Span labels); - Handle hdl; + std::string prefix; + std::string name; + std::string full_name; + std::vector labels; + std::string helptext; + std::string unit; + bool is_sum = false; + + mutable RecordValPtr record_val; }; } // namespace zeek::telemetry diff --git a/src/telemetry/ProcessStats.cc b/src/telemetry/ProcessStats.cc new file mode 100644 index 0000000000..b5c3c48022 --- /dev/null +++ b/src/telemetry/ProcessStats.cc @@ -0,0 +1,204 @@ +#include "zeek/telemetry/ProcessStats.h" + +#include "zeek/util.h" + +#ifdef __APPLE__ + +#include +#include +#include +#include +#include +#include +#include + +namespace zeek::telemetry::detail { + +process_stats get_process_stats() { + process_stats result; + + // Fetch memory usage. + { + mach_task_basic_info info; + mach_msg_type_number_t count = MACH_TASK_BASIC_INFO_COUNT; + if ( task_info(mach_task_self(), MACH_TASK_BASIC_INFO, reinterpret_cast(&info), &count) == + KERN_SUCCESS ) { + result.rss = static_cast(info.resident_size); + result.vms = static_cast(info.virtual_size); + } + } + // Fetch CPU time. + { + task_thread_times_info info; + mach_msg_type_number_t count = TASK_THREAD_TIMES_INFO_COUNT; + if ( task_info(mach_task_self(), TASK_THREAD_TIMES_INFO, reinterpret_cast(&info), &count) == + KERN_SUCCESS ) { + // Round to milliseconds. + result.cpu += info.user_time.seconds; + result.cpu += ceil(info.user_time.microseconds / 1000.0) / 1000.0; + result.cpu += info.system_time.seconds; + result.cpu += ceil(info.system_time.microseconds / 1000.0) / 1000.0; + } + } + // Fetch open file handles. + { + // proc_pidinfo is undocumented, but this is what lsof also uses. + auto suggested_buf_size = proc_pidinfo(getpid(), PROC_PIDLISTFDS, 0, nullptr, 0); + if ( suggested_buf_size > 0 ) { + auto buf_size = suggested_buf_size; + auto buf = malloc(buf_size); // TODO: could be thread-local + auto res = proc_pidinfo(getpid(), PROC_PIDLISTFDS, 0, buf, buf_size); + free(buf); + if ( res > 0 ) + result.fds = static_cast(res / sizeof(proc_fdinfo)); + } + } + + return result; +} + +} // namespace zeek::telemetry::detail + +#elif defined(HAVE_LINUX) + +#include +std::atomic global_ticks_per_second; +std::atomic global_page_size; + +namespace zeek::telemetry::detail { + +/// Caches the result from a `sysconf` call in a cache variable to avoid +/// frequent syscalls. Sets `cache_var` to -1 in case of an error. Initially, +/// `cache_var` must be 0 and we assume a successful syscall would always return +/// some value > 0. If `cache_var` is > 0 then this function simply returns the +/// cached value directly. +bool load_system_setting(std::atomic& cache_var, long& var, int name, [[maybe_unused]] const char* pretty_name) { + var = cache_var.load(); + switch ( var ) { + case -1: return false; + case 0: + var = sysconf(name); + if ( var <= 0 ) { + var = -1; + cache_var = var; + return false; + } + else { + cache_var = var; + return true; + } + default: return true; + } +} + +#define TRY_LOAD(varname, confname) load_system_setting(global_##varname, varname, confname, #confname) + +process_stats get_process_stats() { + process_stats result; + + long ticks_per_second = 0; + long page_size = 0; + + if ( ! TRY_LOAD(ticks_per_second, _SC_CLK_TCK) || ! TRY_LOAD(page_size, _SC_PAGE_SIZE) ) + return result; + + if ( auto f = fopen("/proc/self/stat", "r") ) { + unsigned long utime_ticks = 0; + unsigned long stime_ticks = 0; + unsigned long vmsize_bytes = 0; + unsigned long rss_pages = 0; + + auto rd = fscanf(f, + "%*d " // 1. PID + "%*s " // 2. Executable + "%*c " // 3. State + "%*d " // 4. Parent PID + "%*d " // 5. Process group ID + "%*d " // 6. Session ID + "%*d " // 7. Controlling terminal + "%*d " // 8. Foreground process group ID + "%*u " // 9. Flags + "%*u " // 10. Number of minor faults + "%*u " // 11. Number of minor faults of waited-for children + "%*u " // 12. Number of major faults + "%*u " // 13. Number of major faults of waited-for children + "%lu " // 14. CPU user time in ticks + "%lu " // 15. CPU kernel time in ticks + "%*d " // 16. CPU user time of waited-for children + "%*d " // 17. CPU kernel time of waited-for children + "%*d " // 18. Priority + "%*d " // 19. Nice value + "%*d " // 20. Num threads + "%*d " // 21. Obsolete since 2.6 + "%*u " // 22. Time the process started after system boot + "%lu " // 23. Virtual memory size in bytes + "%ld", // 24. Resident set size in pages + &utime_ticks, &stime_ticks, &vmsize_bytes, &rss_pages); + fclose(f); + + if ( rd != 4 ) + return result; + + result.rss = rss_pages * page_size; + result.vms = vmsize_bytes; + result.cpu = static_cast(utime_ticks + stime_ticks) / ticks_per_second; + + zeek::filesystem::path fd_path{"/proc/self/fd"}; + result.fds = + std::distance(zeek::filesystem::directory_iterator{fd_path}, zeek::filesystem::directory_iterator{}); + } + + return result; +} + +} // namespace zeek::telemetry::detail + +#elif defined(__FreeBSD__) + +// Force these includes into a specific order so that the libraries can find +// all of the required types. +// clang-format off +#include +#include +#include +#include +#include +#include +#include +// clang-format on + +namespace zeek::telemetry::detail { + +process_stats get_process_stats() { + process_stats result; + + struct kinfo_proc* kp = kinfo_getproc(getpid()); + result.vms = kp->ki_size; + result.rss = kp->ki_rssize * getpagesize(); + result.cpu = static_cast(kp->ki_runtime) / 1000000.0; + + struct procstat* procstat = procstat_open_sysctl(); + struct filestat_list* files = procstat_getfiles(procstat, kp, 0); + struct filestat* file = nullptr; + + // Use one of the looping methods from sys/queue.h instead of + // implementing this by hand. + STAILQ_FOREACH(file, files, next) + result.fds++; + + procstat_freeprocs(procstat, kp); + procstat_close(procstat); + + return result; +} + +#else + +process_stats get_process_stats() { + process_stats result = {0}; + return result; +} + +} // namespace zeek::telemetry::detail + +#endif diff --git a/src/telemetry/ProcessStats.h b/src/telemetry/ProcessStats.h new file mode 100644 index 0000000000..02581362cc --- /dev/null +++ b/src/telemetry/ProcessStats.h @@ -0,0 +1,23 @@ +#pragma once + +#include "zeek/zeek-config.h" + +#include + +namespace zeek::telemetry::detail { + +struct process_stats { + int64_t rss = 0; + int64_t vms = 0; + double cpu = 0.0; + int64_t fds = 0; +}; + +#if defined(__APPLE__) || defined(HAVE_LINUX) || defined(__FreeBSD__) + +#define HAVE_PROCESS_STAT_METRICS +process_stats get_process_stats(); + +#endif + +} // namespace zeek::telemetry::detail diff --git a/src/telemetry/Timer.h b/src/telemetry/Timer.h index 4c947a8d73..02048325c0 100644 --- a/src/telemetry/Timer.h +++ b/src/telemetry/Timer.h @@ -15,7 +15,7 @@ class [[nodiscard]] Timer { public: using Clock = std::chrono::steady_clock; - explicit Timer(DblHistogram h) : h_(h) { start_ = Clock::now(); } + explicit Timer(std::shared_ptr h) : h_(std::move(h)) { start_ = Clock::now(); } Timer(const Timer&) = delete; @@ -30,14 +30,14 @@ public: auto Started() const noexcept { return start_; } /// Calls `h.Observe` with the time passed since `start`. - static void Observe(DblHistogram h, Clock::time_point start) { + static void Observe(const std::shared_ptr& h, Clock::time_point start) { using DblSec = std::chrono::duration; if ( auto end = Clock::now(); end > start ) - h.Observe(std::chrono::duration_cast(end - start).count()); + h->Observe(std::chrono::duration_cast(end - start).count()); } private: - DblHistogram h_; + std::shared_ptr h_; Clock::time_point start_; }; diff --git a/src/telemetry/telemetry.bif b/src/telemetry/telemetry.bif index 7b3b423a86..f46a1f39eb 100644 --- a/src/telemetry/telemetry.bif +++ b/src/telemetry/telemetry.bif @@ -12,6 +12,7 @@ enum MetricType %{ %} %%{ + #include "zeek/telemetry/Counter.h" #include "zeek/telemetry/Gauge.h" #include "zeek/telemetry/Histogram.h" @@ -112,7 +113,7 @@ function Telemetry::__int_counter_family%(prefix: string, name: string, labels: string_vec, helptext: string &default = "Zeek Script Metric", - unit: string &default = "1", + unit: string &default = "", is_sum: bool &default = F%): opaque of int_counter_metric_family %{ auto lbl_vec = sv_vec(labels->AsVectorVal()); @@ -129,9 +130,9 @@ function Telemetry::__int_counter_metric_get_or_add%(family: opaque of int_count { auto hdl = ptr->GetHandle(); auto lbl_map = sv_tbl(labels->AsTableVal()); - if ( is_valid(lbl_map, hdl.LabelNames()) ) + if ( is_valid(lbl_map, hdl->LabelNames()) ) { - auto res = hdl.GetOrAdd(lbl_map); + auto res = hdl->GetOrAdd(lbl_map); return zeek::make_intrusive(res); } else @@ -150,14 +151,14 @@ function Telemetry::__int_counter_metric_get_or_add%(family: opaque of int_count function Telemetry::__int_counter_inc%(val: opaque of int_counter_metric, amount: int &default = 1%): bool %{ - return with(val, "Telemetry::int_counter_inc: invalid handle.", [amount](auto hdl) { hdl.Inc(amount); }); + return with(val, "Telemetry::int_counter_inc: invalid handle.", [amount](auto hdl) { hdl->Inc(amount); }); %} function Telemetry::__int_counter_value%(val: opaque of int_counter_metric%): int %{ if ( auto ptr = dynamic_cast(val) ) { - return zeek::val_mgr->Int(ptr->GetHandle().Value()); + return zeek::val_mgr->Int(ptr->GetHandle()->Value()); } else { @@ -172,7 +173,7 @@ function Telemetry::__dbl_counter_family%(prefix: string, name: string, labels: string_vec, helptext: string &default = "Zeek Script Metric", - unit: string &default = "1", + unit: string &default = "", is_sum: bool &default = F%): opaque of dbl_counter_metric_family %{ auto lbl_vec = sv_vec(labels->AsVectorVal()); @@ -189,9 +190,9 @@ function Telemetry::__dbl_counter_metric_get_or_add%(family: opaque of dbl_count { auto hdl = ptr->GetHandle(); auto lbl_map = sv_tbl(labels->AsTableVal()); - if ( is_valid(lbl_map, hdl.LabelNames()) ) + if ( is_valid(lbl_map, hdl->LabelNames()) ) { - auto res = hdl.GetOrAdd(lbl_map); + auto res = hdl->GetOrAdd(lbl_map); return zeek::make_intrusive(res); } else @@ -210,14 +211,14 @@ function Telemetry::__dbl_counter_metric_get_or_add%(family: opaque of dbl_count function Telemetry::__dbl_counter_inc%(val: opaque of dbl_counter_metric, amount: double &default = 1.0%): bool %{ - return with(val, "Telemetry::dbl_counter_inc: invalid handle.", [amount](auto hdl) { hdl.Inc(amount); }); + return with(val, "Telemetry::dbl_counter_inc: invalid handle.", [amount](auto hdl) { hdl->Inc(amount); }); %} function Telemetry::__dbl_counter_value%(val: opaque of dbl_counter_metric%): double %{ if ( auto ptr = dynamic_cast(val) ) { - return zeek::make_intrusive(ptr->GetHandle().Value()); + return zeek::make_intrusive(ptr->GetHandle()->Value()); } else { @@ -232,7 +233,7 @@ function Telemetry::__int_gauge_family%(prefix: string, name: string, labels: string_vec, helptext: string &default = "Zeek Script Metric", - unit: string &default = "1", + unit: string &default = "", is_sum: bool &default = F%): opaque of int_gauge_metric_family %{ auto lbl_vec = sv_vec(labels->AsVectorVal()); @@ -249,9 +250,9 @@ function Telemetry::__int_gauge_metric_get_or_add%(family: opaque of int_gauge_m { auto hdl = ptr->GetHandle(); auto lbl_map = sv_tbl(labels->AsTableVal()); - if ( is_valid(lbl_map, hdl.LabelNames()) ) + if ( is_valid(lbl_map, hdl->LabelNames()) ) { - auto res = hdl.GetOrAdd(lbl_map); + auto res = hdl->GetOrAdd(lbl_map); return zeek::make_intrusive(res); } else @@ -270,20 +271,20 @@ function Telemetry::__int_gauge_metric_get_or_add%(family: opaque of int_gauge_m function Telemetry::__int_gauge_inc%(val: opaque of int_gauge_metric, amount: int &default = 1%): bool %{ - return with(val, "Telemetry::int_gauge_inc: invalid handle.", [amount](auto hdl) { hdl.Inc(amount); }); + return with(val, "Telemetry::int_gauge_inc: invalid handle.", [amount](auto hdl) { hdl->Inc(amount); }); %} function Telemetry::__int_gauge_dec%(val: opaque of int_gauge_metric, amount: int &default = 1%): bool %{ - return with(val, "Telemetry::int_gauge_dec: invalid handle.", [amount](auto hdl) { hdl.Dec(amount); }); + return with(val, "Telemetry::int_gauge_dec: invalid handle.", [amount](auto hdl) { hdl->Dec(amount); }); %} function Telemetry::__int_gauge_value%(val: opaque of int_gauge_metric%): int %{ if ( auto ptr = dynamic_cast(val) ) { - return zeek::val_mgr->Int(ptr->GetHandle().Value()); + return zeek::val_mgr->Int(ptr->GetHandle()->Value()); } else { @@ -298,7 +299,7 @@ function Telemetry::__dbl_gauge_family%(prefix: string, name: string, labels: string_vec, helptext: string &default = "Zeek Script Metric", - unit: string &default = "1", + unit: string &default = "", is_sum: bool &default = F%): opaque of dbl_gauge_metric_family %{ auto lbl_vec = sv_vec(labels->AsVectorVal()); @@ -315,9 +316,9 @@ function Telemetry::__dbl_gauge_metric_get_or_add%(family: opaque of dbl_gauge_m { auto hdl = ptr->GetHandle(); auto lbl_map = sv_tbl(labels->AsTableVal()); - if ( is_valid(lbl_map, hdl.LabelNames()) ) + if ( is_valid(lbl_map, hdl->LabelNames()) ) { - auto res = hdl.GetOrAdd(lbl_map); + auto res = hdl->GetOrAdd(lbl_map); return zeek::make_intrusive(res); } else @@ -336,20 +337,20 @@ function Telemetry::__dbl_gauge_metric_get_or_add%(family: opaque of dbl_gauge_m function Telemetry::__dbl_gauge_inc%(val: opaque of dbl_gauge_metric, amount: double &default = 1.0%): bool %{ - return with(val, "Telemetry::dbl_gauge_inc: invalid handle.", [amount](auto hdl) { hdl.Inc(amount); }); + return with(val, "Telemetry::dbl_gauge_inc: invalid handle.", [amount](auto hdl) { hdl->Inc(amount); }); %} function Telemetry::__dbl_gauge_dec%(val: opaque of dbl_gauge_metric, amount: double &default = 1.0%): bool %{ - return with(val, "Telemetry::dbl_gauge_dec: invalid handle.", [amount](auto hdl) { hdl.Dec(amount); }); + return with(val, "Telemetry::dbl_gauge_dec: invalid handle.", [amount](auto hdl) { hdl->Dec(amount); }); %} function Telemetry::__dbl_gauge_value%(val: opaque of dbl_gauge_metric%): double %{ if ( auto ptr = dynamic_cast(val) ) { - return zeek::make_intrusive(ptr->GetHandle().Value()); + return zeek::make_intrusive(ptr->GetHandle()->Value()); } else { @@ -365,14 +366,14 @@ function Telemetry::__int_histogram_family%(prefix: string, labels: string_vec, bounds: int_vec, helptext: string &default = "Zeek Script Metric", - unit: string &default = "1", + unit: string &default = "", is_sum: bool &default = F%): opaque of int_histogram_metric_family %{ auto lbl_vec = sv_vec(labels->AsVectorVal()); auto std_bounds = to_std_vec(bounds); auto hdl = telemetry_mgr->HistogramFamily(sv(prefix), sv(name), lbl_vec, std_bounds, sv(helptext), - sv(unit), is_sum); + sv(unit)); return zeek::make_intrusive(hdl); %} @@ -384,9 +385,9 @@ function Telemetry::__int_histogram_metric_get_or_add%(family: opaque of int_his { auto hdl = ptr->GetHandle(); auto lbl_map = sv_tbl(labels->AsTableVal()); - if ( is_valid(lbl_map, hdl.LabelNames()) ) + if ( is_valid(lbl_map, hdl->LabelNames()) ) { - auto res = hdl.GetOrAdd(lbl_map); + auto res = hdl->GetOrAdd(lbl_map); return zeek::make_intrusive(res); } else @@ -405,14 +406,14 @@ function Telemetry::__int_histogram_metric_get_or_add%(family: opaque of int_his function Telemetry::__int_histogram_observe%(val: opaque of int_histogram_metric, measurement: int%): bool %{ - return with(val, "Telemetry::int_histogram_inc: invalid handle.", [measurement](auto hdl) { hdl.Observe(measurement); }); + return with(val, "Telemetry::int_histogram_inc: invalid handle.", [measurement](auto hdl) { hdl->Observe(measurement); }); %} function Telemetry::__int_histogram_sum%(val: opaque of int_histogram_metric%): int %{ if ( auto ptr = dynamic_cast(val) ) { - return zeek::val_mgr->Int(ptr->GetHandle().Sum()); + return zeek::val_mgr->Int(ptr->GetHandle()->Sum()); } else { @@ -428,15 +429,14 @@ function Telemetry::__dbl_histogram_family%(prefix: string, labels: string_vec, bounds: double_vec, helptext: string &default = "Zeek Script Metric", - unit: string &default = "1", + unit: string &default = "", is_sum: bool &default = F%): opaque of dbl_histogram_metric_family %{ auto lbl_vec = sv_vec(labels->AsVectorVal()); auto std_bounds = to_std_vec(bounds); auto hdl = telemetry_mgr->HistogramFamily(sv(prefix), sv(name), lbl_vec, std_bounds, - sv(helptext), sv(unit), - is_sum); + sv(helptext), sv(unit)); return zeek::make_intrusive(hdl); %} @@ -448,9 +448,9 @@ function Telemetry::__dbl_histogram_metric_get_or_add%(family: opaque of dbl_his { auto hdl = ptr->GetHandle(); auto lbl_map = sv_tbl(labels->AsTableVal()); - if ( is_valid(lbl_map, hdl.LabelNames()) ) + if ( is_valid(lbl_map, hdl->LabelNames()) ) { - auto res = hdl.GetOrAdd(lbl_map); + auto res = hdl->GetOrAdd(lbl_map); return zeek::make_intrusive(res); } else @@ -469,14 +469,14 @@ function Telemetry::__dbl_histogram_metric_get_or_add%(family: opaque of dbl_his function Telemetry::__dbl_histogram_observe%(val: opaque of dbl_histogram_metric, measurement: double%): bool %{ - return with(val, "Telemetry::dbl_histogram_inc: invalid handle.", [measurement](auto hdl) { hdl.Observe(measurement); }); + return with(val, "Telemetry::dbl_histogram_inc: invalid handle.", [measurement](auto hdl) { hdl->Observe(measurement); }); %} function Telemetry::__dbl_histogram_sum%(val: opaque of dbl_histogram_metric%): double %{ if ( auto ptr = dynamic_cast(val) ) { - return zeek::make_intrusive(ptr->GetHandle().Sum()); + return zeek::make_intrusive(ptr->GetHandle()->Sum()); } else { @@ -487,26 +487,61 @@ function Telemetry::__dbl_histogram_sum%(val: opaque of dbl_histogram_metric%): function Telemetry::__collect_metrics%(prefix: string, name: string%): any_vec %{ - auto metrics = telemetry_mgr->CollectMetrics(sv(prefix), sv(name)); - - static auto metrics_vector_type = zeek::id::find_type("any_vec"); - auto vec = zeek::make_intrusive(metrics_vector_type); - - for ( const auto& m : metrics ) - vec->Append(m.AsMetricRecord()); - - return vec; + return telemetry_mgr->CollectMetrics(sv(prefix), sv(name)); %} function Telemetry::__collect_histogram_metrics%(prefix: string, name: string%): any_vec %{ - auto metrics = telemetry_mgr->CollectHistogramMetrics(sv(prefix), sv(name)); - - static auto metrics_vector_type = zeek::id::find_type("any_vec"); - auto vec = zeek::make_intrusive(metrics_vector_type); - - for ( const auto& m : metrics ) - vec->Append(m.AsHistogramMetricRecord()); - - return vec; + return telemetry_mgr->CollectHistogramMetrics(sv(prefix), sv(name)); %} + +function Telemetry::__set_metrics_export_interval%(value: interval%): bool + %{ + // This BIF may run prior to telemetry::Manager::InitPostScript. In this case, + // telemetry_mgr is still null but we can safely ignore this event because the + // Manager is going to initialize Telemetry using the most recent value of the + // corresponding option. + if ( telemetry_mgr ) + telemetry_mgr->SetMetricsExportInterval(value); + return zeek::val_mgr->True(); + %} + +function Telemetry::__set_metrics_export_topic%(value: string%): bool + %{ + if ( telemetry_mgr ) + telemetry_mgr->SetMetricsExportTopic(value->ToStdString()); + return zeek::val_mgr->True(); + %} + +function Telemetry::__set_metrics_import_topics%(filter: string_vec%): bool + %{ + if ( telemetry_mgr ) + { + std::vector slist; + auto* vval = filter->AsVectorVal(); + for ( unsigned index = 0; index < vval->Size(); ++index ) + slist.emplace_back(vval->StringValAt(index)->ToStdString()); + telemetry_mgr->SetMetricsImportTopics(std::move(slist)); + } + return zeek::val_mgr->True(); + %} + +function Telemetry::__set_metrics_export_endpoint_name%(value: string%): bool + %{ + if ( telemetry_mgr ) + telemetry_mgr->SetMetricsExportEndpointName(value->ToStdString()); + return zeek::val_mgr->True(); + %} + +function Telemetry::__set_metrics_export_prefixes%(filter: string_vec%): bool + %{ + if ( telemetry_mgr ) + { + std::vector slist; + auto* vval = filter->AsVectorVal(); + for ( unsigned index = 0; index < vval->Size(); ++index ) + slist.emplace_back(vval->StringValAt(index)->ToStdString()); + telemetry_mgr->SetMetricsExportPrefixes(std::move(slist)); + } + return zeek::val_mgr->True(); + %} \ No newline at end of file diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index cb1b019c74..bfae1ad005 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -811,7 +811,6 @@ SetupResult setup(int argc, char** argv, Options* zopts) { RecordType::InitPostScript(); - telemetry_mgr->InitPostScript(); iosource_mgr->InitPostScript(); log_mgr->InitPostScript(); plugin_mgr->InitPostScript(); @@ -820,6 +819,10 @@ SetupResult setup(int argc, char** argv, Options* zopts) { timer_mgr->InitPostScript(); event_mgr.InitPostScript(); + // telemetry_mgr has be initialized after broker manager since it might + // register for a topic and would fail to do so otherwise. + telemetry_mgr->InitPostScript(); + if ( supervisor_mgr ) supervisor_mgr->InitPostScript();