Merge remote-tracking branch 'origin/topic/timw/telemetry-threading'

* origin/topic/timw/telemetry-threading:
  Process metric callbacks from the main-loop thread

(cherry picked from commit 3c3853dc7d)
This commit is contained in:
Tim Wojtulewicz 2024-08-02 15:49:40 -07:00
parent 056bbe04ea
commit dd4597865a
10 changed files with 130 additions and 11 deletions

@ -1 +1 @@
Subproject commit 2fec7205d1a9cb4829b86c943d599696d53de85c
Subproject commit 4649065e2a1dd21c81e41cd6007dce5486b77fc0

View file

@ -5883,6 +5883,13 @@ export {
type MetricVector : vector of Metric;
type HistogramMetricVector : vector of HistogramMetric;
## Maximum amount of time for CivetWeb HTTP threads to
## wait for metric callbacks to complete on the IO loop.
const callback_timeout: interval = 5sec &redef;
## Number of CivetWeb threads to use.
const civetweb_threads: count = 2 &redef;
}
module GLOBAL;

View file

@ -9,6 +9,7 @@ zeek_add_subdir_library(
ProcessStats.cc
Utils.cc
BIFS
consts.bif
telemetry.bif)
# We don't need to include the civetweb headers across the whole project, only

View file

@ -6,6 +6,7 @@
// CivetServer is from the civetweb submodule in prometheus-cpp
#include <CivetServer.h>
#include <prometheus/collectable.h>
#include <prometheus/exposer.h>
#include <prometheus/registry.h>
#include <rapidjson/document.h>
@ -16,19 +17,32 @@
#include "zeek/3rdparty/doctest.h"
#include "zeek/ID.h"
#include "zeek/RunState.h"
#include "zeek/ZeekString.h"
#include "zeek/broker/Manager.h"
#include "zeek/iosource/Manager.h"
#include "zeek/telemetry/ProcessStats.h"
#include "zeek/telemetry/Timer.h"
#include "zeek/telemetry/consts.bif.h"
#include "zeek/telemetry/telemetry.bif.h"
#include "zeek/threading/formatters/detail/json.h"
namespace zeek::telemetry {
Manager::Manager() { prometheus_registry = std::make_shared<prometheus::Registry>(); }
/**
* Prometheus Collectable interface used to insert Zeek callback processing
* before the Prometheus registry's collection of metric data.
*/
class ZeekCollectable : public prometheus::Collectable {
public:
std::vector<prometheus::MetricFamily> Collect() const override {
telemetry_mgr->WaitForPrometheusCallbacks();
return {};
}
};
Manager::Manager() : IOSource(true) { prometheus_registry = std::make_shared<prometheus::Registry>(); }
// This can't be defined as =default because of the use of unique_ptr with a forward-declared type
// in Manager.h
Manager::~Manager() {}
void Manager::InitPostScript() {
@ -75,7 +89,9 @@ void Manager::InitPostScript() {
if ( ! getenv("ZEEKCTL_CHECK_CONFIG") ) {
try {
prometheus_exposer = std::make_unique<prometheus::Exposer>(prometheus_url, 2, callbacks);
prometheus_exposer =
std::make_unique<prometheus::Exposer>(prometheus_url, BifConst::Telemetry::civetweb_threads,
callbacks);
// CivetWeb stores a copy of the callbacks, so we're safe to delete the pointer here
delete callbacks;
@ -84,6 +100,13 @@ void Manager::InitPostScript() {
prometheus_url.c_str());
}
// This has to be inserted before the registry below. The exposer
// processes the collectors in order of insertion. We want to make
// sure that the callbacks get called and the values in the metrics
// are updated before prometheus-cpp scrapes them.
zeek_collectable = std::make_shared<ZeekCollectable>();
prometheus_exposer->RegisterCollectable(zeek_collectable);
prometheus_exposer->RegisterCollectable(prometheus_registry);
}
}
@ -130,6 +153,21 @@ void Manager::InitPostScript() {
return metric;
});
#endif
iosource_mgr->RegisterFd(collector_flare.FD(), this);
}
void Manager::Terminate() {
// Notify the collector condition so that it doesn't hang waiting for
// a collector request to complete.
collector_cv.notify_all();
// Shut down the exposer first of all so we stop getting requests for
// data. This keeps us from getting a request on another thread while
// we're shutting down.
prometheus_exposer.reset();
iosource_mgr->UnregisterFd(collector_flare.FD(), this);
}
// -- collect metric stuff -----------------------------------------------------
@ -545,6 +583,39 @@ HistogramPtr Manager::HistogramInstance(std::string_view prefix, std::string_vie
return HistogramInstance(prefix, name, lbls, bounds_span, helptext, unit);
}
void Manager::ProcessFd(int fd, int flags) {
std::unique_lock<std::mutex> lk(collector_cv_mtx);
collector_flare.Extinguish();
prometheus_registry->UpdateViaCallbacks();
collector_response_idx = collector_request_idx;
lk.unlock();
collector_cv.notify_all();
}
void Manager::WaitForPrometheusCallbacks() {
std::unique_lock<std::mutex> lk(collector_cv_mtx);
++collector_request_idx;
uint64_t expected_idx = collector_request_idx;
collector_flare.Fire();
// It should *not* take 5 seconds to go through all of the callbacks, but
// set this to have a timeout anyways just to avoid a deadlock.
bool res = collector_cv.wait_for(lk,
std::chrono::microseconds(
static_cast<long>(BifConst::Telemetry::callback_timeout * 1000000)),
[expected_idx]() {
return telemetry_mgr->collector_response_idx >= expected_idx ||
zeek::run_state::terminating;
});
if ( ! res )
fprintf(stderr, "Timeout waiting for prometheus callbacks\n");
}
} // namespace zeek::telemetry
// -- unit tests ---------------------------------------------------------------

View file

@ -9,8 +9,10 @@
#include <string_view>
#include <vector>
#include "zeek/Flare.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Span.h"
#include "zeek/iosource/IOSource.h"
#include "zeek/telemetry/Counter.h"
#include "zeek/telemetry/Gauge.h"
#include "zeek/telemetry/Histogram.h"
@ -29,15 +31,16 @@ class Registry;
namespace zeek::telemetry {
class ZeekCollectable;
/**
* Manages a collection of metric families.
*/
class Manager final {
class Manager final : public iosource::IOSource {
public:
Manager();
Manager(const Manager&) = delete;
Manager& operator=(const Manager&) = delete;
~Manager();
@ -50,6 +53,8 @@ public:
*/
void InitPostScript();
void Terminate();
/**
* @return A VectorVal containing all counter and gauge metrics and their values matching prefix and name.
* @param prefix The prefix pattern to use for filtering. Supports globbing.
@ -88,8 +93,8 @@ public:
* @param labels Values for all label dimensions of the metric.
* @param helptext Short explanation of the metric.
* @param unit Unit of measurement.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by
* the metrics subsystem whenever data is requested.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called
* by the metrics subsystem whenever data is requested.
*/
CounterPtr CounterInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit = "",
@ -124,8 +129,8 @@ public:
* @param labels Values for all label dimensions of the metric.
* @param helptext Short explanation of the metric.
* @param unit Unit of measurement.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by
* the metrics subsystem whenever data is requested.
* @param callback Passing a callback method will enable asynchronous mode. The callback method will be called
* by the metrics subsystem whenever data is requested.
*/
GaugePtr GaugeInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit = "",
@ -212,6 +217,12 @@ public:
*/
std::shared_ptr<prometheus::Registry> GetRegistry() const { return prometheus_registry; }
// IOSource interface
double GetNextTimeout() override { return -1.0; }
void Process() override {}
const char* Tag() override { return "Telemetry::Manager"; }
void ProcessFd(int fd, int flags) override;
protected:
template<class F>
static auto WithLabelNames(Span<const LabelView> xs, F continuation) {
@ -231,6 +242,15 @@ protected:
}
}
friend class ZeekCollectable;
/**
* Fires the flare for prometheus-cpp callback handling and waits for it to complete.
* This can be called from other threads to ensure the callback handling happens on
* the main thread.
*/
void WaitForPrometheusCallbacks();
private:
RecordValPtr GetMetricOptsRecord(const prometheus::MetricFamily& metric_family);
void BuildClusterJson();
@ -250,6 +270,14 @@ private:
std::unique_ptr<prometheus::Exposer> prometheus_exposer;
std::string cluster_json;
std::shared_ptr<ZeekCollectable> zeek_collectable;
zeek::detail::Flare collector_flare;
std::condition_variable collector_cv;
std::mutex collector_cv_mtx;
// Only modified under collector_cv_mtx!
uint64_t collector_request_idx = 0;
uint64_t collector_response_idx = 0;
};
} // namespace zeek::telemetry

2
src/telemetry/consts.bif Normal file
View file

@ -0,0 +1,2 @@
const Telemetry::callback_timeout: interval;
const Telemetry::civetweb_threads: count;

View file

@ -376,6 +376,7 @@ static void terminate_zeek() {
input_mgr->Terminate();
thread_mgr->Terminate();
broker_mgr->Terminate();
telemetry_mgr->Terminate();
event_mgr.Drain();
@ -716,6 +717,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
// when that variable is defined.
auto early_shutdown = [] {
broker_mgr->Terminate();
telemetry_mgr->Terminate();
delete iosource_mgr;
delete telemetry_mgr;
};

View file

@ -146,6 +146,7 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/files/magic/__load__.zeek
scripts/base/frameworks/telemetry/options.zeek
build/scripts/base/bif/__load__.zeek
build/scripts/base/bif/consts.bif.zeek
build/scripts/base/bif/telemetry.bif.zeek
build/scripts/base/bif/zeekygen.bif.zeek
build/scripts/base/bif/pcap.bif.zeek

View file

@ -146,6 +146,7 @@ scripts/base/init-frameworks-and-bifs.zeek
scripts/base/frameworks/files/magic/__load__.zeek
scripts/base/frameworks/telemetry/options.zeek
build/scripts/base/bif/__load__.zeek
build/scripts/base/bif/consts.bif.zeek
build/scripts/base/bif/telemetry.bif.zeek
build/scripts/base/bif/zeekygen.bif.zeek
build/scripts/base/bif/pcap.bif.zeek

View file

@ -464,6 +464,7 @@
0.000000 MetaHookPost LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./consts.bif.zeek, <...>/consts.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./contents, <...>/contents.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./control, <...>/control.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./data.bif.zeek, <...>/data.bif.zeek) -> -1
@ -758,6 +759,7 @@
0.000000 MetaHookPost LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./consts.bif.zeek, <...>/consts.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./contents, <...>/contents.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./control, <...>/control.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./data.bif.zeek, <...>/data.bif.zeek) -> (-1, <no content>)
@ -1384,6 +1386,7 @@
0.000000 MetaHookPre LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./consts.bif.zeek, <...>/consts.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./contents, <...>/contents.zeek)
0.000000 MetaHookPre LoadFile(0, ./control, <...>/control.zeek)
0.000000 MetaHookPre LoadFile(0, ./data.bif.zeek, <...>/data.bif.zeek)
@ -1678,6 +1681,7 @@
0.000000 MetaHookPre LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./consts.bif.zeek, <...>/consts.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./contents, <...>/contents.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./control, <...>/control.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./data.bif.zeek, <...>/data.bif.zeek)
@ -2305,6 +2309,7 @@
0.000000 | HookLoadFile ./comm.bif.zeek <...>/comm.bif.zeek
0.000000 | HookLoadFile ./communityid.bif.zeek <...>/communityid.bif.zeek
0.000000 | HookLoadFile ./const.bif.zeek <...>/const.bif.zeek
0.000000 | HookLoadFile ./consts.bif.zeek <...>/consts.bif.zeek
0.000000 | HookLoadFile ./contents <...>/contents.zeek
0.000000 | HookLoadFile ./control <...>/control.zeek
0.000000 | HookLoadFile ./data.bif.zeek <...>/data.bif.zeek
@ -2599,6 +2604,7 @@
0.000000 | HookLoadFileExtended ./comm.bif.zeek <...>/comm.bif.zeek
0.000000 | HookLoadFileExtended ./communityid.bif.zeek <...>/communityid.bif.zeek
0.000000 | HookLoadFileExtended ./const.bif.zeek <...>/const.bif.zeek
0.000000 | HookLoadFileExtended ./consts.bif.zeek <...>/consts.bif.zeek
0.000000 | HookLoadFileExtended ./contents <...>/contents.zeek
0.000000 | HookLoadFileExtended ./control <...>/control.zeek
0.000000 | HookLoadFileExtended ./data.bif.zeek <...>/data.bif.zeek