Run expiration on a separate thread

This commit is contained in:
Tim Wojtulewicz 2025-02-07 16:55:40 -07:00
parent cad48cebd4
commit 40f60f26b3
4 changed files with 44 additions and 10 deletions

View file

@ -6222,8 +6222,9 @@ module Storage;
export {
## The interval used by the storage framework for automatic expiration
## of elements in all backends that don't support it natively.
const expire_interval = 5.0 secs &redef;
## of elements in all backends that don't support it natively, or if
## using expiration while reading pcap files.
const expire_interval = 15.0 secs &redef;
}
module GLOBAL;

@ -1 +1 @@
Subproject commit 0d15c8be14851914c8fd2d378bb700dae7e7b991
Subproject commit 059a4a369f2a52d8013f0645b69e1bf2194e97c6

View file

@ -2,17 +2,31 @@
#include "zeek/storage/Manager.h"
#include <atomic>
#include "zeek/Desc.h"
#include "zeek/RunState.h"
#include "const.bif.netvar_h"
std::atomic_flag expire_running;
namespace zeek::storage {
void detail::ExpirationTimer::Dispatch(double t, bool is_expire) {
if ( is_expire )
return;
storage_mgr->Expire();
// If there isn't an active thread, spin up a new one. Expiration may take
// some time to complete and we want it to get all the way done before we
// start another one running. If this causes us to skip a cycle, that's not
// a big deal as the next cycle will catch anything that should be expired
// in the interim.
if ( ! expire_running.test_and_set() ) {
DBG_LOG(DBG_STORAGE, "Starting new expiration thread");
storage_mgr->expiration_thread = std::jthread([]() { storage_mgr->Expire(); });
}
storage_mgr->StartExpirationTimer();
}
@ -62,7 +76,9 @@ ErrorResult Manager::OpenBackend(BackendPtr backend, RecordValPtr options, TypeP
}
ErrorResult Manager::CloseBackend(BackendPtr backend, ErrorResultCallback* cb) {
// Remove from the list always, even if the close may fail below and even in an async context.
// Expiration runs on a separate thread and loops over the vector of backends. The mutex
// here ensures exclusive access. This one happens in a block because we can remove the
// backend from the vector before actually closing it.
{
std::unique_lock<std::mutex> lk(backends_mtx);
auto it = std::find(backends.begin(), backends.end(), backend);
@ -80,22 +96,34 @@ ErrorResult Manager::CloseBackend(BackendPtr backend, ErrorResultCallback* cb) {
}
void Manager::Expire() {
DBG_LOG(DBG_STORAGE, "Expire running, have %zu backends to check", backends.size());
// Expiration runs on a separate thread and loops over the vector of backends. The mutex
// here ensures exclusive access.
std::unique_lock<std::mutex> lk(backends_mtx);
for ( const auto& b : backends ) {
if ( b->IsOpen() )
b->Expire();
DBG_LOG(DBG_STORAGE, "Expiration running, have %zu backends to check", backends.size());
for ( auto it = backends.begin(); it != backends.end() && ! run_state::terminating; ++it ) {
if ( (*it)->IsOpen() )
(*it)->Expire();
}
expire_running.clear();
}
void Manager::StartExpirationTimer() {
zeek::detail::timer_mgr->Add(
new detail::ExpirationTimer(run_state::network_time + zeek::BifConst::Storage::expire_interval));
DBG_LOG(DBG_STORAGE, "Next expiration check at %f",
run_state::network_time + zeek::BifConst::Storage::expire_interval);
}
void Manager::RegisterBackend(BackendPtr backend) {
// Expiration runs on a separate thread and loops over the vector of backends. The mutex
// here ensures exclusive access.
std::unique_lock<std::mutex> lk(backends_mtx);
backends.push_back(std::move(backend));
DBG_LOG(DBG_STORAGE, "Registered backends: %zu", backends.size());
}
} // namespace zeek::storage

View file

@ -3,7 +3,9 @@
#pragma once
#include <mutex>
#include <thread>
#include "zeek/3rdparty/jthread.hpp"
#include "zeek/Timer.h"
#include "zeek/plugin/ComponentManager.h"
#include "zeek/storage/Backend.h"
@ -65,10 +67,13 @@ public:
*/
ErrorResult CloseBackend(BackendPtr backend, ErrorResultCallback* cb = nullptr);
void Expire();
protected:
friend class storage::detail::ExpirationTimer;
void Expire();
void RunExpireThread();
void StartExpirationTimer();
std::jthread expiration_thread;
friend class storage::OpenResultCallback;
void RegisterBackend(BackendPtr backend);