diff --git a/auxil/prometheus-cpp b/auxil/prometheus-cpp index 2fec7205d1..4649065e2a 160000 --- a/auxil/prometheus-cpp +++ b/auxil/prometheus-cpp @@ -1 +1 @@ -Subproject commit 2fec7205d1a9cb4829b86c943d599696d53de85c +Subproject commit 4649065e2a1dd21c81e41cd6007dce5486b77fc0 diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index 30b49def26..85a1dc0f20 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -5883,6 +5883,13 @@ export { type MetricVector : vector of Metric; type HistogramMetricVector : vector of HistogramMetric; + + ## Maximum amount of time for CivetWeb HTTP threads to + ## wait for metric callbacks to complete on the IO loop. + const callback_timeout: interval = 5sec &redef; + + ## Number of CivetWeb threads to use. + const civetweb_threads: count = 2 &redef; } module GLOBAL; diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt index d61cbdb671..a760dcd13c 100644 --- a/src/telemetry/CMakeLists.txt +++ b/src/telemetry/CMakeLists.txt @@ -9,6 +9,7 @@ zeek_add_subdir_library( ProcessStats.cc Utils.cc BIFS + consts.bif telemetry.bif) # We don't need to include the civetweb headers across the whole project, only diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 04c47ba3ef..6ae4e06ec6 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -6,6 +6,7 @@ // CivetServer is from the civetweb submodule in prometheus-cpp #include +#include #include #include #include @@ -16,19 +17,32 @@ #include "zeek/3rdparty/doctest.h" #include "zeek/ID.h" +#include "zeek/RunState.h" #include "zeek/ZeekString.h" #include "zeek/broker/Manager.h" +#include "zeek/iosource/Manager.h" #include "zeek/telemetry/ProcessStats.h" #include "zeek/telemetry/Timer.h" +#include "zeek/telemetry/consts.bif.h" #include "zeek/telemetry/telemetry.bif.h" #include "zeek/threading/formatters/detail/json.h" namespace zeek::telemetry { -Manager::Manager() { prometheus_registry = std::make_shared(); } +/** + * Prometheus Collectable interface used to insert Zeek callback processing + * before the Prometheus registry's collection of metric data. + */ +class ZeekCollectable : public prometheus::Collectable { +public: + std::vector Collect() const override { + telemetry_mgr->WaitForPrometheusCallbacks(); + return {}; + } +}; + +Manager::Manager() : IOSource(true) { prometheus_registry = std::make_shared(); } -// This can't be defined as =default because of the use of unique_ptr with a forward-declared type -// in Manager.h Manager::~Manager() {} void Manager::InitPostScript() { @@ -75,7 +89,9 @@ void Manager::InitPostScript() { if ( ! getenv("ZEEKCTL_CHECK_CONFIG") ) { try { - prometheus_exposer = std::make_unique(prometheus_url, 2, callbacks); + prometheus_exposer = + std::make_unique(prometheus_url, BifConst::Telemetry::civetweb_threads, + callbacks); // CivetWeb stores a copy of the callbacks, so we're safe to delete the pointer here delete callbacks; @@ -84,6 +100,13 @@ void Manager::InitPostScript() { prometheus_url.c_str()); } + // This has to be inserted before the registry below. The exposer + // processes the collectors in order of insertion. We want to make + // sure that the callbacks get called and the values in the metrics + // are updated before prometheus-cpp scrapes them. + zeek_collectable = std::make_shared(); + prometheus_exposer->RegisterCollectable(zeek_collectable); + prometheus_exposer->RegisterCollectable(prometheus_registry); } } @@ -130,6 +153,21 @@ void Manager::InitPostScript() { return metric; }); #endif + + iosource_mgr->RegisterFd(collector_flare.FD(), this); +} + +void Manager::Terminate() { + // Notify the collector condition so that it doesn't hang waiting for + // a collector request to complete. + collector_cv.notify_all(); + + // Shut down the exposer first of all so we stop getting requests for + // data. This keeps us from getting a request on another thread while + // we're shutting down. + prometheus_exposer.reset(); + + iosource_mgr->UnregisterFd(collector_flare.FD(), this); } // -- collect metric stuff ----------------------------------------------------- @@ -545,6 +583,39 @@ HistogramPtr Manager::HistogramInstance(std::string_view prefix, std::string_vie return HistogramInstance(prefix, name, lbls, bounds_span, helptext, unit); } +void Manager::ProcessFd(int fd, int flags) { + std::unique_lock lk(collector_cv_mtx); + + collector_flare.Extinguish(); + + prometheus_registry->UpdateViaCallbacks(); + collector_response_idx = collector_request_idx; + + lk.unlock(); + collector_cv.notify_all(); +} + +void Manager::WaitForPrometheusCallbacks() { + std::unique_lock lk(collector_cv_mtx); + + ++collector_request_idx; + uint64_t expected_idx = collector_request_idx; + collector_flare.Fire(); + + // It should *not* take 5 seconds to go through all of the callbacks, but + // set this to have a timeout anyways just to avoid a deadlock. + bool res = collector_cv.wait_for(lk, + std::chrono::microseconds( + static_cast(BifConst::Telemetry::callback_timeout * 1000000)), + [expected_idx]() { + return telemetry_mgr->collector_response_idx >= expected_idx || + zeek::run_state::terminating; + }); + + if ( ! res ) + fprintf(stderr, "Timeout waiting for prometheus callbacks\n"); +} + } // namespace zeek::telemetry // -- unit tests --------------------------------------------------------------- diff --git a/src/telemetry/Manager.h b/src/telemetry/Manager.h index c4c2537f1a..d967fe43c0 100644 --- a/src/telemetry/Manager.h +++ b/src/telemetry/Manager.h @@ -9,8 +9,10 @@ #include #include +#include "zeek/Flare.h" #include "zeek/IntrusivePtr.h" #include "zeek/Span.h" +#include "zeek/iosource/IOSource.h" #include "zeek/telemetry/Counter.h" #include "zeek/telemetry/Gauge.h" #include "zeek/telemetry/Histogram.h" @@ -29,15 +31,16 @@ class Registry; namespace zeek::telemetry { +class ZeekCollectable; + /** * Manages a collection of metric families. */ -class Manager final { +class Manager final : public iosource::IOSource { public: Manager(); Manager(const Manager&) = delete; - Manager& operator=(const Manager&) = delete; ~Manager(); @@ -50,6 +53,8 @@ public: */ void InitPostScript(); + void Terminate(); + /** * @return A VectorVal containing all counter and gauge metrics and their values matching prefix and name. * @param prefix The prefix pattern to use for filtering. Supports globbing. @@ -88,8 +93,8 @@ public: * @param labels Values for all label dimensions of the metric. * @param helptext Short explanation of the metric. * @param unit Unit of measurement. - * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by - * the metrics subsystem whenever data is requested. + * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called + * by the metrics subsystem whenever data is requested. */ CounterPtr CounterInstance(std::string_view prefix, std::string_view name, Span labels, std::string_view helptext, std::string_view unit = "", @@ -124,8 +129,8 @@ public: * @param labels Values for all label dimensions of the metric. * @param helptext Short explanation of the metric. * @param unit Unit of measurement. - * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called by - * the metrics subsystem whenever data is requested. + * @param callback Passing a callback method will enable asynchronous mode. The callback method will be called + * by the metrics subsystem whenever data is requested. */ GaugePtr GaugeInstance(std::string_view prefix, std::string_view name, Span labels, std::string_view helptext, std::string_view unit = "", @@ -212,6 +217,12 @@ public: */ std::shared_ptr GetRegistry() const { return prometheus_registry; } + // IOSource interface + double GetNextTimeout() override { return -1.0; } + void Process() override {} + const char* Tag() override { return "Telemetry::Manager"; } + void ProcessFd(int fd, int flags) override; + protected: template static auto WithLabelNames(Span xs, F continuation) { @@ -231,6 +242,15 @@ protected: } } + friend class ZeekCollectable; + + /** + * Fires the flare for prometheus-cpp callback handling and waits for it to complete. + * This can be called from other threads to ensure the callback handling happens on + * the main thread. + */ + void WaitForPrometheusCallbacks(); + private: RecordValPtr GetMetricOptsRecord(const prometheus::MetricFamily& metric_family); void BuildClusterJson(); @@ -250,6 +270,14 @@ private: std::unique_ptr prometheus_exposer; std::string cluster_json; + + std::shared_ptr zeek_collectable; + zeek::detail::Flare collector_flare; + std::condition_variable collector_cv; + std::mutex collector_cv_mtx; + // Only modified under collector_cv_mtx! + uint64_t collector_request_idx = 0; + uint64_t collector_response_idx = 0; }; } // namespace zeek::telemetry diff --git a/src/telemetry/consts.bif b/src/telemetry/consts.bif new file mode 100644 index 0000000000..76c256dfa1 --- /dev/null +++ b/src/telemetry/consts.bif @@ -0,0 +1,2 @@ +const Telemetry::callback_timeout: interval; +const Telemetry::civetweb_threads: count; diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index b05af74f20..3f5549b78a 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -376,6 +376,7 @@ static void terminate_zeek() { input_mgr->Terminate(); thread_mgr->Terminate(); broker_mgr->Terminate(); + telemetry_mgr->Terminate(); event_mgr.Drain(); @@ -716,6 +717,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) { // when that variable is defined. auto early_shutdown = [] { broker_mgr->Terminate(); + telemetry_mgr->Terminate(); delete iosource_mgr; delete telemetry_mgr; }; diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index aee483157a..5466f0ea13 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -146,6 +146,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/files/magic/__load__.zeek scripts/base/frameworks/telemetry/options.zeek build/scripts/base/bif/__load__.zeek + build/scripts/base/bif/consts.bif.zeek build/scripts/base/bif/telemetry.bif.zeek build/scripts/base/bif/zeekygen.bif.zeek build/scripts/base/bif/pcap.bif.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 40a1c5b84c..07dc32da6d 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -146,6 +146,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/files/magic/__load__.zeek scripts/base/frameworks/telemetry/options.zeek build/scripts/base/bif/__load__.zeek + build/scripts/base/bif/consts.bif.zeek build/scripts/base/bif/telemetry.bif.zeek build/scripts/base/bif/zeekygen.bif.zeek build/scripts/base/bif/pcap.bif.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index c3b551dc84..2514d98015 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -464,6 +464,7 @@ 0.000000 MetaHookPost LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./consts.bif.zeek, <...>/consts.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./contents, <...>/contents.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./control, <...>/control.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./data.bif.zeek, <...>/data.bif.zeek) -> -1 @@ -758,6 +759,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./consts.bif.zeek, <...>/consts.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./contents, <...>/contents.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./control, <...>/control.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./data.bif.zeek, <...>/data.bif.zeek) -> (-1, ) @@ -1384,6 +1386,7 @@ 0.000000 MetaHookPre LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) +0.000000 MetaHookPre LoadFile(0, ./consts.bif.zeek, <...>/consts.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./contents, <...>/contents.zeek) 0.000000 MetaHookPre LoadFile(0, ./control, <...>/control.zeek) 0.000000 MetaHookPre LoadFile(0, ./data.bif.zeek, <...>/data.bif.zeek) @@ -1678,6 +1681,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) +0.000000 MetaHookPre LoadFileExtended(0, ./consts.bif.zeek, <...>/consts.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./contents, <...>/contents.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./control, <...>/control.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./data.bif.zeek, <...>/data.bif.zeek) @@ -2305,6 +2309,7 @@ 0.000000 | HookLoadFile ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFile ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFile ./const.bif.zeek <...>/const.bif.zeek +0.000000 | HookLoadFile ./consts.bif.zeek <...>/consts.bif.zeek 0.000000 | HookLoadFile ./contents <...>/contents.zeek 0.000000 | HookLoadFile ./control <...>/control.zeek 0.000000 | HookLoadFile ./data.bif.zeek <...>/data.bif.zeek @@ -2599,6 +2604,7 @@ 0.000000 | HookLoadFileExtended ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFileExtended ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFileExtended ./const.bif.zeek <...>/const.bif.zeek +0.000000 | HookLoadFileExtended ./consts.bif.zeek <...>/consts.bif.zeek 0.000000 | HookLoadFileExtended ./contents <...>/contents.zeek 0.000000 | HookLoadFileExtended ./control <...>/control.zeek 0.000000 | HookLoadFileExtended ./data.bif.zeek <...>/data.bif.zeek