From 40f60f26b36fde9fd050f2cef28a8cdf096a81fc Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Fri, 7 Feb 2025 16:55:40 -0700 Subject: [PATCH] Run expiration on a separate thread --- scripts/base/init-bare.zeek | 5 +++-- src/3rdparty | 2 +- src/storage/Manager.cc | 40 +++++++++++++++++++++++++++++++------ src/storage/Manager.h | 7 ++++++- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index a94d29e006..625415586f 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -6222,8 +6222,9 @@ module Storage; export { ## The interval used by the storage framework for automatic expiration - ## of elements in all backends that don't support it natively. - const expire_interval = 5.0 secs &redef; + ## of elements in all backends that don't support it natively, or if + ## using expiration while reading pcap files. + const expire_interval = 15.0 secs &redef; } module GLOBAL; diff --git a/src/3rdparty b/src/3rdparty index 0d15c8be14..059a4a369f 160000 --- a/src/3rdparty +++ b/src/3rdparty @@ -1 +1 @@ -Subproject commit 0d15c8be14851914c8fd2d378bb700dae7e7b991 +Subproject commit 059a4a369f2a52d8013f0645b69e1bf2194e97c6 diff --git a/src/storage/Manager.cc b/src/storage/Manager.cc index 53afa52ebb..18f3c9cb35 100644 --- a/src/storage/Manager.cc +++ b/src/storage/Manager.cc @@ -2,17 +2,31 @@ #include "zeek/storage/Manager.h" +#include + #include "zeek/Desc.h" +#include "zeek/RunState.h" #include "const.bif.netvar_h" +std::atomic_flag expire_running; + namespace zeek::storage { void detail::ExpirationTimer::Dispatch(double t, bool is_expire) { if ( is_expire ) return; - storage_mgr->Expire(); + // If there isn't an active thread, spin up a new one. Expiration may take + // some time to complete and we want it to get all the way done before we + // start another one running. If this causes us to skip a cycle, that's not + // a big deal as the next cycle will catch anything that should be expired + // in the interim. + if ( ! expire_running.test_and_set() ) { + DBG_LOG(DBG_STORAGE, "Starting new expiration thread"); + storage_mgr->expiration_thread = std::jthread([]() { storage_mgr->Expire(); }); + } + storage_mgr->StartExpirationTimer(); } @@ -62,7 +76,9 @@ ErrorResult Manager::OpenBackend(BackendPtr backend, RecordValPtr options, TypeP } ErrorResult Manager::CloseBackend(BackendPtr backend, ErrorResultCallback* cb) { - // Remove from the list always, even if the close may fail below and even in an async context. + // Expiration runs on a separate thread and loops over the vector of backends. The mutex + // here ensures exclusive access. This one happens in a block because we can remove the + // backend from the vector before actually closing it. { std::unique_lock lk(backends_mtx); auto it = std::find(backends.begin(), backends.end(), backend); @@ -80,22 +96,34 @@ ErrorResult Manager::CloseBackend(BackendPtr backend, ErrorResultCallback* cb) { } void Manager::Expire() { - DBG_LOG(DBG_STORAGE, "Expire running, have %zu backends to check", backends.size()); + // Expiration runs on a separate thread and loops over the vector of backends. The mutex + // here ensures exclusive access. std::unique_lock lk(backends_mtx); - for ( const auto& b : backends ) { - if ( b->IsOpen() ) - b->Expire(); + + DBG_LOG(DBG_STORAGE, "Expiration running, have %zu backends to check", backends.size()); + + for ( auto it = backends.begin(); it != backends.end() && ! run_state::terminating; ++it ) { + if ( (*it)->IsOpen() ) + (*it)->Expire(); } + + expire_running.clear(); } void Manager::StartExpirationTimer() { zeek::detail::timer_mgr->Add( new detail::ExpirationTimer(run_state::network_time + zeek::BifConst::Storage::expire_interval)); + DBG_LOG(DBG_STORAGE, "Next expiration check at %f", + run_state::network_time + zeek::BifConst::Storage::expire_interval); } void Manager::RegisterBackend(BackendPtr backend) { + // Expiration runs on a separate thread and loops over the vector of backends. The mutex + // here ensures exclusive access. std::unique_lock lk(backends_mtx); + backends.push_back(std::move(backend)); + DBG_LOG(DBG_STORAGE, "Registered backends: %zu", backends.size()); } } // namespace zeek::storage diff --git a/src/storage/Manager.h b/src/storage/Manager.h index 2974f218ad..2f240e0486 100644 --- a/src/storage/Manager.h +++ b/src/storage/Manager.h @@ -3,7 +3,9 @@ #pragma once #include +#include +#include "zeek/3rdparty/jthread.hpp" #include "zeek/Timer.h" #include "zeek/plugin/ComponentManager.h" #include "zeek/storage/Backend.h" @@ -65,10 +67,13 @@ public: */ ErrorResult CloseBackend(BackendPtr backend, ErrorResultCallback* cb = nullptr); + void Expire(); + protected: friend class storage::detail::ExpirationTimer; - void Expire(); + void RunExpireThread(); void StartExpirationTimer(); + std::jthread expiration_thread; friend class storage::OpenResultCallback; void RegisterBackend(BackendPtr backend);