Merge remote-tracking branch 'origin/topic/johanna/GH-348'

* origin/topic/johanna/GH-348:
  Sumstats: allow users to manage epoch manually
  Sumstats: epoch_finished was not called under certain circumstances
This commit is contained in:
Johanna Amann 2020-12-04 18:39:19 +00:00
commit 47ceac2491
10 changed files with 291 additions and 20 deletions

View file

@ -89,16 +89,20 @@ export {
## is no assurance provided as to where the callbacks
## will be executed on clusters.
type SumStat: record {
## An arbitrary name for the sumstat so that it can
## An arbitrary name for the sumstat so that it can
## be referred to later.
name: string;
## The interval at which this filter should be "broken"
## and the *epoch_result* callback called. The
## The interval at which this sumstat should be "broken"
## and the *epoch_result* callback called. The
## results are also reset at this time so any threshold
## based detection needs to be set to a
## value that should be expected to happen within
## this epoch.
##
## Passing an epoch of zero (e.g. ``0 secs``) causes this
## sumstat to be set to manual epochs. You will have to manually
## end the epoch by calling :zeek:see:`SumStats::next_epoch`.
epoch: interval;
## The reducers for the SumStat.
@ -129,12 +133,12 @@ export {
threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional;
## A callback that receives each of the results at the
## end of the analysis epoch. The function will be
## end of the analysis epoch. The function will be
## called once for each key.
epoch_result: function(ts: time, key: SumStats::Key, result: SumStats::Result) &optional;
## A callback that will be called when a single collection
## interval is completed. The *ts* value will be the time of
## A callback that will be called when a single collection
## interval is completed. The *ts* value will be the time of
## when the collection started.
epoch_finished: function(ts:time) &optional;
};
@ -156,8 +160,8 @@ export {
global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation);
## Dynamically request a sumstat key. This function should be
## used sparingly and not as a replacement for the callbacks
## from the :zeek:see:`SumStats::SumStat` record. The function is only
## used sparingly and not as a replacement for the callbacks
## from the :zeek:see:`SumStats::SumStat` record. The function is only
## available for use within "when" statements as an asynchronous
## function.
##
@ -175,6 +179,23 @@ export {
##
## Returns: A string representation of the metric key.
global key2str: function(key: SumStats::Key): string;
## Manually end the current epoch for a sumstat. Calling this function will
## cause the end of the epoch processing of sumstats to start. Note that the
## epoch will not end immediately - especially in a cluster settings, a number
## of messages need to be exchanged between the cluster nodes.
##
## Note that this function only can be called if the sumstat was created with
## an epoch time of zero (manual epochs).
##
## In a cluster, this function must be called on the manager; it will not have
## any effect when called on workers.
##
## ss_name: SumStat name.
##
## Returns: true on success, false on failure. Failures can be: sumstat not found,
## or sumstat not created for manual epochs.
global next_epoch: function(ss_name: string): bool;
}
# The function prototype for plugins to do calculations.
@ -248,6 +269,19 @@ global data_added: function(ss: SumStat, key: Key, result: Result);
# framework for clustered or non-clustered usage.
global finish_epoch: event(ss: SumStat);
function next_epoch(ss_name: string): bool
{
if ( ss_name !in stats_store )
return F;
local ss = stats_store[ss_name];
if ( ss$epoch != 0secs )
return F;
event SumStats::finish_epoch(ss);
return T;
}
function key2str(key: Key): string
{
local out = "";
@ -331,7 +365,7 @@ function reset(ss: SumStat)
}
}
# This could potentially recurse forever, but plugin authors
# This could potentially recurse forever, but plugin authors
# should be making sure they aren't causing reflexive dependencies.
function add_calc_deps(calcs: vector of Calculation, c: Calculation)
{
@ -377,8 +411,8 @@ function create(ss: SumStat)
if ( calc in calc_deps )
add_calc_deps(reducer$calc_funcs, calc);
# Don't add this calculation to the vector if
# it was already added by something else as a
# Don't add this calculation to the vector if
# it was already added by something else as a
# dependency.
local skip_calc=F;
for ( j in reducer$calc_funcs )
@ -396,7 +430,10 @@ function create(ss: SumStat)
}
reset(ss);
schedule ss$epoch { SumStats::finish_epoch(ss) };
## do not schedule epoch if this is set to manual epochs.
if ( ss$epoch != 0secs )
schedule ss$epoch { SumStats::finish_epoch(ss) };
}
function observe(id: string, orig_key: Key, obs: Observation)