Process metric callbacks from the main-loop thread

This avoids the callbacks from being processed on the worker thread
spawned by Civetweb. It fixes data race issues with lookups involving
global variables, amongst other threading issues.
This commit is contained in:
Tim Wojtulewicz 2024-07-26 11:12:54 -07:00
parent 9d9cc51e9d
commit 7ac7ce1d2b
10 changed files with 130 additions and 11 deletions

View file

@ -9,8 +9,10 @@
#include <string_view>
#include <vector>
#include "zeek/Flare.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Span.h"
#include "zeek/iosource/IOSource.h"
#include "zeek/telemetry/Counter.h"
#include "zeek/telemetry/Gauge.h"
#include "zeek/telemetry/Histogram.h"
@ -29,15 +31,16 @@ class Registry;
namespace zeek::telemetry {
class ZeekCollectable;
/**
* Manages a collection of metric families.
*/
class Manager final {
class Manager final : public iosource::IOSource {
public:
Manager();
Manager(const Manager&) = delete;
Manager& operator=(const Manager&) = delete;
~Manager();
@ -50,6 +53,8 @@ public:
*/
void InitPostScript();
void Terminate();
/**
* @return A VectorVal containing all counter and gauge metrics and their values matching prefix and name.
* @param prefix The prefix pattern to use for filtering. Supports globbing.
@ -88,8 +93,8 @@ public:
* @param labels Values for all label dimensions of the metric.
* @param helptext Short explanation of the metric.
* @param unit Unit of measurement.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by
* the metrics subsystem whenever data is requested.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called
* by the metrics subsystem whenever data is requested.
*/
CounterPtr CounterInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit = "",
@ -124,8 +129,8 @@ public:
* @param labels Values for all label dimensions of the metric.
* @param helptext Short explanation of the metric.
* @param unit Unit of measurement.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by
* the metrics subsystem whenever data is requested.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called
* by the metrics subsystem whenever data is requested.
*/
GaugePtr GaugeInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit = "",
@ -212,6 +217,12 @@ public:
*/
std::shared_ptr<prometheus::Registry> GetRegistry() const { return prometheus_registry; }
// IOSource interface
double GetNextTimeout() override { return -1.0; }
void Process() override {}
const char* Tag() override { return "Telemetry::Manager"; }
void ProcessFd(int fd, int flags) override;
protected:
template<class F>
static auto WithLabelNames(Span<const LabelView> xs, F continuation) {
@ -231,6 +242,15 @@ protected:
}
}
friend class ZeekCollectable;
/**
* Fires the flare for prometheus-cpp callback handling and waits for it to complete.
* This can be called from other threads to ensure the callback handling happens on
* the main thread.
*/
void WaitForPrometheusCallbacks();
private:
RecordValPtr GetMetricOptsRecord(const prometheus::MetricFamily& metric_family);
void BuildClusterJson();
@ -250,6 +270,14 @@ private:
std::unique_ptr<prometheus::Exposer> prometheus_exposer;
std::string cluster_json;
std::shared_ptr<ZeekCollectable> zeek_collectable;
zeek::detail::Flare collector_flare;
std::condition_variable collector_cv;
std::mutex collector_cv_mtx;
// Only modified under collector_cv_mtx!
uint64_t collector_request_idx = 0;
uint64_t collector_response_idx = 0;
};
} // namespace zeek::telemetry