logging: Introduce Log::delay() and Log::delay_finish()

This is a verbose, opinionated and fairly restrictive version of the log delay idea.
Main drivers are explicitness, foot-gun avoidance and implementation simplicity.

Calling the new Log::delay() function is only allowed within the execution
of a Log::log_stream_policy() hook for the currently active log write.

Conceptually, the delay is placed between the execution of the global stream
policy hook and the individual filter policy hooks. A post delay callback
can be registered with every Log::delay() invocation. Post delay callbacks
can (1) modify a log record as they see fit, (2) veto the forwarding of the
log record to the log filters and (3) extend the delay duration by calling
Log::delay() again. The last point allows delaying a record by an indefinite
amount of time, rather than a fixed maximum amount. This should be rare and
is therefore explicit.

Log::delay() increases an internal reference count and returns an opaque
token value to be passed to Log::delay_finish() to release a delay reference.
Once all references are released, the record is forwarded to all filters
attached to a stream when the delay completes.

This functionality separates Log::log_stream_policy() and individual filter
policy hooks. One consequence is that a common use-case of filter policy hooks,
removing unproductive log records, may run after a record was delayed. Users
can lift their filtering logic to the stream level (or replicate the condition
before the delay decision). The main motivation here is that deciding on a
stream-level delay in per-filter hooks is too late. Attaching multiple filters
to a stream can additionally result in hard to understand behavior.

On the flip side, filter policy hooks are guaranteed to run after the delay
and can be used for further mangling or filtering of a delayed record.
This commit is contained in:
Arne Welzel 2023-11-12 16:03:23 +01:00
parent dc552e647f
commit f0e67022fd
105 changed files with 3505 additions and 86 deletions

View file

@ -30,6 +30,104 @@ using namespace std;
namespace zeek::logging {
namespace detail {
// Internal integer representation of a delay token.
using DelayTokenType = zeek_uint_t;
// Forward declaration; DelayInfo instances are handled through DelayInfoPtr.
class DelayInfo;
using DelayInfoPtr = std::shared_ptr<DelayInfo>;
// Type tag a script-level token value is expected to carry (a count).
constexpr zeek::TypeTag ZeekDelayTokenTypeTag = TYPE_COUNT;
// Extracts the internal delay token from a script-level token value.
// The value is expected to carry the ZeekDelayTokenTypeTag type tag.
DelayTokenType to_internal_delay_token(const ValPtr& val) {
    assert(val->GetType()->Tag() == ZeekDelayTokenTypeTag);
    const DelayTokenType token = val->AsCount();
    return token;
}
// Delay information keyed by the write context of the delayed write.
using DelayWriteMap = std::map<WriteContext, DelayInfoPtr>;
// Delay information keyed by the internal delay token.
using DelayTokenMap = std::unordered_map<DelayTokenType, DelayInfoPtr>;
// List of delayed writes; DelayInfo remembers its position for removal.
using DelayQueue = std::list<DelayInfoPtr>;
// State kept for a single Log::write() operation that was delayed while
// the Log::log_stream_policy hook was executing.
class DelayInfo {
public:
    // A null DelayInfoPtr that can be handed out by reference.
    static const DelayInfoPtr nil;

    explicit DelayInfo(const WriteContext& ctx, const zeek::ValPtr token_val, double expire_time)
        : ctx(ctx), token_val(token_val), expire_time(expire_time) {}

    // Instances are meant to be managed through a shared pointer only,
    // hence neither copy construction nor assignment is available.
    DelayInfo(const DelayInfo&) = delete;
    DelayInfo& operator=(const DelayInfo&) = delete;

    // --- Delay reference counting ---
    void IncDelayRefs() { delay_refs += 1; }
    void DecDelayRefs() {
        assert(HasDelayRefs());
        delay_refs -= 1;
    }
    int DelayRefs() const { return delay_refs; }
    bool HasDelayRefs() const { return DelayRefs() > 0; }

    // --- Accessors for the delayed write ---
    const RecordValPtr& Record() const { return ctx.record; }
    const EnumValPtr& StreamId() const { return ctx.id; }
    const WriteContext& Context() const { return ctx; }
    const ValPtr& TokenVal() const { return token_val; }
    double ExpireTime() const { return expire_time; }

    // --- Delay queue bookkeeping ---
    // The stored position allows efficient removal from the delay queue.
    DelayQueue::const_iterator QueuePosition() const { return queue_position; }
    bool IsInQueue() const { return enqueued; }
    void SetQueuePosition(DelayQueue::const_iterator pos) {
        queue_position = pos;
        enqueued = true;
    }

    // --- Post delay callbacks ---
    const std::vector<FuncPtr>& PostDelayCallbacks() const { return post_delay_callbacks; }
    void AppendPostDelayCallback(FuncPtr f) { post_delay_callbacks.emplace_back(std::move(f)); }

private:
    // The write context this delay belongs to.
    WriteContext ctx;

    // Number of outstanding Log::delay() references; starts at one.
    int delay_refs = 1;

    // The token value identifying this delay.
    ValPtr token_val;

    // Expiration time, stamped on the first Log::delay() call during
    // Log::log_stream_policy execution.
    double expire_time = 0.0;

    // Invoked once all references were released or the delay expired.
    std::vector<FuncPtr> post_delay_callbacks;

    // Set once SetQueuePosition() has been called.
    bool enqueued = false;

    // Iterator referring to this instance within the delay queue.
    DelayQueue::const_iterator queue_position;
};
const DelayInfoPtr DelayInfo::nil = nullptr;
// Scope guard for dealing with nested Write() calls: pushes a write context
// onto the given vector of active writes on construction and pops the last
// entry again on destruction.
class ActiveWriteScope {
public:
    ActiveWriteScope(std::vector<WriteContext>& active_writes, WriteContext w) : active_writes{active_writes} {
        active_writes.push_back(std::move(w));
    }

    ~ActiveWriteScope() { active_writes.pop_back(); }

    // A copied guard would pop the vector a second time on destruction,
    // so copying (and thereby moving) is disabled.
    ActiveWriteScope(const ActiveWriteScope&) = delete;
    ActiveWriteScope& operator=(const ActiveWriteScope&) = delete;

private:
    std::vector<WriteContext>& active_writes;
};
} // namespace detail
struct Manager::Filter {
Val* fval = nullptr;
string name;
@ -96,9 +194,42 @@ struct Manager::Stream {
std::optional<telemetry::IntCounter> total_writes; // Initialized on first write.
// State about delayed writes for this Stream.
detail::DelayQueue delay_queue;
detail::DelayTokenMap delay_tokens;
detail::DelayWriteMap delayed_writes;
detail::WriteIdx write_idx = 0;
Manager::LogDelayExpiredTimer* delay_timer = nullptr;
double max_delay_interval = 0.0;
zeek_uint_t max_delay_queue_size = 1;
bool evicting = false;
~Stream();
const detail::DelayInfoPtr& GetDelayInfo(const detail::WriteContext& ctx);
void EnqueueWriteForDelay(const detail::WriteContext& ctx);
void EvictDelayedWrites();
void ScheduleLogDelayExpiredTimer(double t);
void DispatchDelayExpiredTimer(double t, bool is_expire);
};
// Timer for the head of the per stream delay queue. When dispatched, it
// forwards to DispatchDelayExpiredTimer() of the stream it was created for.
class Manager::LogDelayExpiredTimer : public zeek::detail::Timer {
public:
    LogDelayExpiredTimer(Manager::Stream* const stream, double t)
        : Timer(t, zeek::detail::TIMER_LOG_DELAY_EXPIRE), stream(stream) {}

    void Dispatch(double t, bool is_expire) override {
        stream->DispatchDelayExpiredTimer(t, is_expire);
    }

private:
    // The stream whose delay queue this timer serves.
    Manager::Stream* const stream;
};
Manager::Filter::~Filter() {
Unref(fval);
Unref(field_name_map);
@ -131,8 +262,119 @@ Manager::Stream::~Stream() {
for ( list<Filter*>::iterator f = filters.begin(); f != filters.end(); ++f )
delete *f;
if ( delay_timer )
zeek::detail::timer_mgr->Cancel(delay_timer);
delay_timer = nullptr;
}
// Looks up the delay information stored for the given write context.
// Returns a reference to detail::DelayInfo::nil if the write is not delayed.
const detail::DelayInfoPtr& Manager::Stream::GetDelayInfo(const detail::WriteContext& ctx) {
    const auto found = delayed_writes.find(ctx);

    if ( found == delayed_writes.end() )
        return detail::DelayInfo::nil;

    return found->second;
}
// Appends the delayed write identified by ctx to the delay queue, evicts
// records if the queue grew beyond its maximum size and then either completes
// the delay right away (no delay references left) or makes sure an expiration
// timer is scheduled.
//
// The write must have been registered in delayed_writes before.
void Manager::Stream::EnqueueWriteForDelay(const detail::WriteContext& ctx) {
    // Deliberately a copy of the shared pointer: eviction below may remove
    // every reference the stream's containers hold.
    const auto delay_info = GetDelayInfo(ctx);
    assert(delay_info != nullptr);

    const auto& position = delay_queue.insert(delay_queue.end(), delay_info);
    delay_info->SetQueuePosition(position);

    DBG_LOG(DBG_LOGGING, "Enqueue record %p with delay_refs=%d, expire_time=%lf", ctx.record.get(),
            delay_info->DelayRefs(), delay_info->ExpireTime());

    EvictDelayedWrites();

    // Eviction may have completed (or re-delayed under a new context) the
    // very write that was just enqueued, e.g. with a maximum queue size of
    // zero. Its state is gone then: completing it a second time would erase
    // a stale queue position, and there is nothing left to arm a timer for.
    if ( GetDelayInfo(ctx) != delay_info )
        return;

    // If all delays have already been resolved after Log::write() returned,
    // directly complete the delay.
    if ( ! delay_info->HasDelayRefs() ) {
        zeek::log_mgr->DelayCompleted(this, *delay_info);
        return;
    }

    ScheduleLogDelayExpiredTimer(delay_info->ExpireTime());
}
// Completes delayed writes from the head of the delay queue until the queue
// no longer exceeds max_delay_queue_size, then re-arms the expiration timer
// for the new head of the queue, if any.
void Manager::Stream::EvictDelayedWrites() {
    // Prevent recursion as DelayCompleted() may call EnqueueWriteForDelay()
    // in turn calling into eviction.
    DBG_LOG(DBG_LOGGING, "EvictDelayedWrites queue_size=%ld max=%ld evicting=%d", delay_queue.size(),
            max_delay_queue_size, evicting);
    if ( evicting )
        return;

    evicting = true;

    if ( delay_queue.size() > max_delay_queue_size ) {
        // The current timer refers to the head which is about to change.
        if ( delay_timer ) {
            zeek::detail::timer_mgr->Cancel(delay_timer);
            delay_timer = nullptr;
        }

        // It may happen that all records are re-delayed, which we allow,
        // but also trigger a warning. This could be caused by indefinite
        // redelaying through post_delay_callbacks.
        auto start_queue_size = delay_queue.size();
        decltype(start_queue_size) current = 0;

        while ( delay_queue.size() > max_delay_queue_size ) {
            ++current;

            // Copy the shared pointer: DelayCompleted() erases the queue
            // entry, which would leave a reference into the queue dangling.
            const auto evict_delay_info = delay_queue.front();

            DBG_LOG(DBG_LOGGING, "Evicting record %p", evict_delay_info->Record().get());

            // Delay completed will remove it from the queue, no need to pop.
            zeek::log_mgr->DelayCompleted(this, *evict_delay_info);

            if ( current == start_queue_size ) {
                reporter->Warning("unable to evict delayed records for stream %s queue_size=%ld, all re-delayed?",
                                  id->GetType<EnumType>()->Lookup(id->InternalInt()), delay_queue.size());
                break;
            }
        }

        // The queue can be empty now (e.g. a maximum queue size of zero);
        // calling front() on an empty list is undefined behavior.
        if ( ! delay_queue.empty() )
            ScheduleLogDelayExpiredTimer(delay_queue.front()->ExpireTime());
    }

    evicting = false;
}
// Makes sure an expiration timer firing no later than t is scheduled for
// this stream.
//
// An already scheduled timer that fires at or before t is kept. A timer that
// would fire later than t is replaced: this happens when a record re-delayed
// from a post delay callback armed a timer for its (later) expiration before
// the caller re-arms for the older record at the head of the queue.
void Manager::Stream::ScheduleLogDelayExpiredTimer(double t) {
    if ( delay_timer != nullptr ) {
        if ( delay_timer->Time() <= t )
            return;

        // The pending timer is too late for t, replace it with an earlier one.
        zeek::detail::timer_mgr->Cancel(delay_timer);
        delay_timer = nullptr;
    }

    delay_timer = new LogDelayExpiredTimer(this, t);
    zeek::detail::timer_mgr->Add(delay_timer);
}
// Invoked by LogDelayExpiredTimer. Completes delayed writes from the head of
// the delay queue: all of them if is_expire is set, otherwise only those
// whose expiration time is not later than t.
void Manager::Stream::DispatchDelayExpiredTimer(double t, bool is_expire) {
    // Forget the timer so that a new one can be scheduled below.
    delay_timer = nullptr;

    while ( ! delay_queue.empty() ) {
        const auto& head = delay_queue.front();

        // With is_expire the queue is drained. Otherwise, processing stops
        // at the first record that expires in the future.
        const bool expires_later = ! is_expire && head->ExpireTime() > t;
        if ( expires_later )
            break;

        assert(head->Record() != nullptr);
        DBG_LOG(DBG_LOGGING, "Delayed record %p expired", head->Record().get());
        zeek::log_mgr->DelayCompleted(this, *head);
    }

    // Nothing left to wait for.
    if ( delay_queue.empty() )
        return;

    // More records are pending, re-arm the timer for the new head.
    ScheduleLogDelayExpiredTimer(delay_queue.front()->ExpireTime());
}
Manager::Manager()
: plugin::ComponentManager<logging::Component>("Log", "Writer"),
total_log_stream_writes_family(telemetry_mgr->CounterFamily("zeek", "log-stream-writes", {"module", "stream"},
@ -660,19 +902,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) {
if ( stream->event )
event_mgr.Enqueue(stream->event, columns);
bool stream_veto = false;
if ( log_stream_policy_hook ) {
auto v = log_stream_policy_hook->Invoke(columns, IntrusivePtr{NewRef{}, id});
if ( v && ! v->AsBool() ) {
// We record the fact that this hook is vetoing
// the write, but continue on to the filter-
// level hooks to allow them to run anyway.
// They cannot "un-veto".
stream_veto = true;
}
}
if ( ! stream->total_writes ) {
std::string module_name = zeek::detail::extract_module_name(stream->name.c_str());
std::initializer_list<telemetry::LabelView> labels{{"module", module_name}, {"stream", stream->name}};
@ -681,6 +910,47 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) {
stream->total_writes->Inc();
bool stream_veto = false;
{
// Scope for active write.
uint64_t idx = ++stream->write_idx;
detail::WriteContext active_write{{zeek::NewRef{}, id}, columns, idx};
detail::ActiveWriteScope active_write_scope{active_writes, active_write};
if ( log_stream_policy_hook ) {
auto v = log_stream_policy_hook->Invoke(columns, IntrusivePtr{NewRef{}, id});
if ( v && ! v->AsBool() ) {
// We record the fact that this hook is vetoing
// the write, but continue on to the filter-
// level hooks to allow them to run anyway.
// They cannot "un-veto".
stream_veto = true;
}
}
// Assert a Log::write() happening during the Log::log_stream_policy
// didn't corrupt our notion of active_writes.
assert(active_writes.back().record == active_write.record);
assert(active_writes.back().idx == active_write.idx);
if ( const auto& delay_info = stream->GetDelayInfo(active_write); delay_info ) {
if ( ! stream_veto ) {
DBG_LOG(DBG_LOGGING, "Active write %p was delayed", delay_info->Record().get());
stream->EnqueueWriteForDelay(active_write);
// We're done for now.
return true;
}
// There's a stream veto, so we've never put anything into
// the queue. Do the cleanup here and fall through to the
// policy hooks.
stream->delay_tokens.erase(detail::to_internal_delay_token(delay_info->TokenVal()));
stream->delayed_writes.erase(active_writes.back());
}
} // scope for active write.
return WriteToFilters(stream, columns, stream_veto ? PolicyVerdict::VETO : PolicyVerdict::PASS);
}
@ -701,7 +971,7 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
continue;
}
// Even if Log::log_stream_policy vetoed, we, invoke filter policy
// Even if Log::log_stream_policy vetoed, we invoke filter policy
// hooks. Skip actually writing here.
if ( stream_verdict == PolicyVerdict::VETO )
continue;
@ -878,6 +1148,185 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
return true;
}
// Implements Log::delay(): delays the currently active Log::write() of the
// given record on stream id and optionally registers a post delay callback.
//
// Only valid while a write for exactly this stream id and record is the
// innermost active write. Returns the token value identifying the delay;
// repeated calls for the same active write return the same token and add a
// delay reference each. On error, a count of 0 is returned after reporting.
ValPtr Manager::Delay(const EnumValPtr& id, const RecordValPtr record, FuncPtr post_delay_cb) {
    if ( active_writes.size() == 0 ) {
        reporter->Error("invalid Log::delay() call: no active write context available");
        return zeek::val_mgr->Count(0);
    }

    const auto& active_write_ctx = active_writes.back();
    if ( active_write_ctx.id != id || active_write_ctx.record != record ) {
        reporter->Error("invalid Log::delay() call: active write mismatch");
        return zeek::val_mgr->Count(0);
    }

    DBG_LOG(DBG_LOGGING, "Delay() for %p RefCnt=%d", record.get(), record->RefCnt());

    ValPtr token_val;
    Stream* stream = FindStream(id.get());

    // Same guard as in DelayFinish() and the setters: never dereference a
    // stream that could not be found.
    if ( ! stream ) {
        reporter->Error("invalid Log::delay() call: unknown stream");
        return zeek::val_mgr->Count(0);
    }

    if ( const auto& delay_info = stream->GetDelayInfo(active_write_ctx); delay_info ) {
        // Previously delayed, return the same token to script-land.
        token_val = delay_info->TokenVal();
        delay_info->IncDelayRefs();

        if ( post_delay_cb )
            delay_info->AppendPostDelayCallback(post_delay_cb);
    }
    else {
        // This is the first time this Log::write() is delayed, allocate a
        // new token and return it to script-land.
        detail::DelayTokenType token = ++last_delay_token;
        token_val = zeek::val_mgr->Count(token);
        double expire_time = run_state::network_time + stream->max_delay_interval;
        auto new_delay_info = std::make_shared<detail::DelayInfo>(active_write_ctx, token_val, expire_time);

        if ( post_delay_cb )
            new_delay_info->AppendPostDelayCallback(post_delay_cb);

        // Immediately keep information via the token, too, so that DelayFinish()
        // works right away (even directly after Delay()).
        stream->delay_tokens[token] = new_delay_info;
        stream->delayed_writes.emplace(active_write_ctx, new_delay_info);

        assert(stream->GetDelayInfo(active_write_ctx) != detail::DelayInfo::nil);
    }

    DBG_LOG(DBG_LOGGING, "Delayed log record %p RefCnt=%d token=%ld post_delay_cb=%p", record.get(), record->RefCnt(),
            token_val->AsCount(), post_delay_cb.get());

    return token_val;
}
bool Manager::DelayFinish(const EnumValPtr& id, const RecordValPtr& record, const ValPtr& token_val) {
Stream* stream = FindStream(id.get());
if ( ! stream )
return false;
if ( token_val->GetType()->Tag() != detail::ZeekDelayTokenTypeTag ) {
reporter->Error("invalid delay token type %s", zeek::type_name(token_val->GetType()->Tag()));
return false;
}
detail::DelayTokenType token = detail::to_internal_delay_token(token_val);
DBG_LOG(DBG_LOGGING, "DelayFinish() for %p RefCnt=%d token=%ld", record.get(), record->RefCnt(), token);
const auto& it = stream->delay_tokens.find(token);
if ( it == stream->delay_tokens.end() ) {
reporter->Error("non-existing log record for token=%ld %p", token, record.get());
return false;
}
auto& delay_info = it->second;
if ( delay_info->Record() != record ) {
reporter->Error("record mismatch token=%ld %p and %p", token, record.get(), delay_info->Record().get());
return false;
}
if ( ! delay_info->HasDelayRefs() ) {
reporter->Error("delay reference underflow for token=%ld", token);
return false;
}
delay_info->DecDelayRefs();
// Only call DelayCompleted() if this was ever properly enqueued.
if ( delay_info->IsInQueue() && ! delay_info->HasDelayRefs() )
DelayCompleted(stream, *delay_info);
return true;
}
// Delaying has completed for the write described by delay_info.
//
// Runs the post delay callbacks with a fresh active write context. If a
// callback delayed the record again, the old state is dropped and the record
// is enqueued under the new context (returns true). Otherwise the record is
// forwarded to the filters unless a callback vetoed it, the state is dropped
// and the result of the filter write is returned (false on veto).
bool Manager::DelayCompleted(Stream* stream, detail::DelayInfo& delay_info) {
    const auto token = detail::to_internal_delay_token(delay_info.TokenVal());
    assert(stream->delay_tokens.find(token) != stream->delay_tokens.end());

    DBG_LOG(DBG_LOGGING, "DelayCompleted() for log record %p RefCnt=%d token=%ld", delay_info.Record().get(),
            delay_info.Record()->RefCnt(), token);

    // Removes everything the stream tracks for this delay.
    const auto clear_state = [&]() {
        stream->delay_queue.erase(delay_info.QueuePosition());
        stream->delay_tokens.erase(token);
        stream->delayed_writes.erase(delay_info.Context());
    };

    bool allow = true;

    {
        // Push a new active write when running the post delay callbacks. This
        // allows re-delaying the record and putting it at the end of the queue.
        uint64_t idx = ++stream->write_idx;
        detail::WriteContext write_context{delay_info.StreamId(), delay_info.Record(), idx};
        detail::ActiveWriteScope active_write_scope{active_writes, write_context};

        for ( const auto& cb : delay_info.PostDelayCallbacks() ) {
            if ( auto verdict = cb->Invoke(delay_info.Record(), delay_info.StreamId()) )
                allow &= verdict->AsBool();
        }

        DBG_LOG(DBG_LOGGING, "DelayCompleted() post_delay_callback for record %p outcome=%d", delay_info.Record().get(),
                allow);

        if ( const auto& redelayed = stream->GetDelayInfo(write_context); redelayed ) {
            // Post delay callbacks re-delayed, clean up the old state first.
            clear_state();

            DBG_LOG(DBG_LOGGING, "Enqueue re-delayed record %p as %ld (delay_refs=%d)", redelayed->Record().get(),
                    write_context.idx, redelayed->DelayRefs());
            stream->EnqueueWriteForDelay(write_context);
            return true;
        }
    }

    // If any of the callbacks vetoed, don't even let the filter policy hooks
    // see it. This is somewhat different from Log::log_stream_policy, but
    // seems somewhat saner.
    const bool res = allow && WriteToFilters(stream, delay_info.Record(), PolicyVerdict::PASS);

    clear_state();

    return res;
}
// Sets the maximum delay interval of stream id. Returns false if the stream
// is unknown or if delay is lower than the interval currently configured.
bool Manager::SetMaxDelayInterval(const EnumValPtr& id, double delay) {
    Stream* stream = FindStream(id.get());
    if ( stream == nullptr )
        return false;

    DBG_LOG(DBG_LOGGING, "SetMaxDelayInterval: stream=%s max_delay=%f", stream->name.c_str(), delay);

    // Script land is expected to never ask for a lower value; refuse it here
    // nevertheless. Supporting it could be done by updating the expiration
    // time of all pending writes in the queue and expiring from the head.
    const double current = stream->max_delay_interval;
    if ( delay < current ) {
        reporter->Warning("refusing to set lower delay %f < %f", delay, current);
        return false;
    }

    stream->max_delay_interval = delay;
    return true;
}
// Sets the maximum delay queue size of stream id and immediately evicts
// delayed writes exceeding the new limit. Returns false for unknown streams.
bool Manager::SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t queue_size) {
    Stream* stream = FindStream(id.get());
    if ( stream == nullptr )
        return false;

    DBG_LOG(DBG_LOGGING, "SetMaxDelayQueueSize: stream=%s queue_size=%ld", stream->name.c_str(), queue_size);

    stream->max_delay_queue_size = queue_size;

    // The queue may be larger than what is allowed now.
    stream->EvictDelayedWrites();

    return true;
}
threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
if ( ! val )
return new threading::Value(ty->Tag(), false);