mirror of
https://github.com/zeek/zeek.git
synced 2025-10-13 03:58:20 +00:00
SumStats test checkpoint.
This commit is contained in:
parent
437815454d
commit
1cac89e4f8
16 changed files with 55 additions and 116 deletions
|
@ -34,15 +34,15 @@ export {
|
|||
const enable_intermediate_updates = T &redef;
|
||||
|
||||
## Event sent by the manager in a cluster to initiate the
|
||||
## collection of metrics values for a measurement.
|
||||
## collection of metrics values for a sumstat.
|
||||
global cluster_ss_request: event(uid: string, ssid: string);
|
||||
|
||||
## Event sent by nodes that are collecting metrics after receiving
|
||||
## a request for the metric measurement from the manager.
|
||||
## a request for the metric sumstat from the manager.
|
||||
global cluster_ss_response: event(uid: string, ssid: string, data: ResultTable, done: bool);
|
||||
|
||||
## This event is sent by the manager in a cluster to initiate the
|
||||
## collection of a single key value from a measurement. It's typically
|
||||
## collection of a single key value from a sumstat. It's typically
|
||||
## used to get intermediate updates before the break interval triggers
|
||||
## to speed detection of a value crossing a threshold.
|
||||
global cluster_key_request: event(uid: string, ssid: string, key: Key);
|
||||
|
@ -130,13 +130,13 @@ event SumStats::cluster_ss_request(uid: string, ssid: string)
|
|||
{
|
||||
#print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
|
||||
|
||||
# Initiate sending all of the data for the requested measurement.
|
||||
# Initiate sending all of the data for the requested stats.
|
||||
if ( ssid in result_store )
|
||||
event SumStats::send_data(uid, ssid, result_store[ssid]);
|
||||
else
|
||||
event SumStats::send_data(uid, ssid, table());
|
||||
|
||||
# Lookup the actual measurement and reset it, the reference to the data
|
||||
# Lookup the actual sumstats and reset it, the reference to the data
|
||||
# currently stored will be maintained internally by the send_data event.
|
||||
if ( ssid in stats_store )
|
||||
reset(stats_store[ssid]);
|
||||
|
@ -181,9 +181,9 @@ global done_with: table[string] of count &read_expire=1min &default=0;
|
|||
global key_requests: table[string] of Result &read_expire=1min;
|
||||
|
||||
# This variable is maintained by managers to prevent overwhelming communication due
|
||||
# to too many intermediate updates. Each measurement is tracked separately so that
|
||||
# one won't overwhelm and degrade other quieter measurements.
|
||||
# Indexed on a measurement id.
|
||||
# to too many intermediate updates. Each sumstat is tracked separately so that
|
||||
# one won't overwhelm and degrade other quieter sumstats.
|
||||
# Indexed on a sumstat id.
|
||||
global outstanding_global_views: table[string] of count &default=0;
|
||||
|
||||
const zero_time = double_to_time(0.0);
|
||||
|
@ -192,7 +192,7 @@ event SumStats::finish_epoch(ss: SumStat)
|
|||
{
|
||||
if ( network_time() > zero_time )
|
||||
{
|
||||
#print fmt("%.6f MANAGER: breaking %s measurement for %s metric", network_time(), measurement$name, measurement$id);
|
||||
#print fmt("%.6f MANAGER: breaking %s sumstat for %s metric", network_time(), ss$name, ss$id);
|
||||
local uid = unique_id("");
|
||||
|
||||
if ( uid in stats_results )
|
||||
|
@ -207,8 +207,8 @@ event SumStats::finish_epoch(ss: SumStat)
|
|||
schedule ss$epoch { SumStats::finish_epoch(ss) };
|
||||
}
|
||||
|
||||
# This is unlikely to be called often, but it's here in case there are measurements
|
||||
# being collected by managers.
|
||||
# This is unlikely to be called often, but it's here in
|
||||
# case there are sumstats being collected by managers.
|
||||
function data_added(ss: SumStat, key: Key, result: Result)
|
||||
{
|
||||
if ( check_thresholds(ss, key, result, 1.0) )
|
||||
|
@ -305,7 +305,7 @@ event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable
|
|||
# Clean up
|
||||
delete stats_results[uid];
|
||||
delete done_with[uid];
|
||||
# Not sure I need to reset the measurement on the manager.
|
||||
# Not sure I need to reset the sumstat on the manager.
|
||||
reset(ss);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ export {
|
|||
};
|
||||
}
|
||||
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal)
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
|
||||
{
|
||||
if ( AVERAGE in r$apply )
|
||||
{
|
||||
|
|
|
@ -14,7 +14,7 @@ export {
|
|||
};
|
||||
}
|
||||
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal)
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
|
||||
{
|
||||
if ( MAX in r$apply )
|
||||
{
|
||||
|
|
|
@ -14,7 +14,7 @@ export {
|
|||
};
|
||||
}
|
||||
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal)
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
|
||||
{
|
||||
if ( MIN in r$apply )
|
||||
{
|
||||
|
|
|
@ -29,13 +29,13 @@ function get_samples(rv: ResultVal): vector of Observation
|
|||
return s;
|
||||
}
|
||||
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal)
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
|
||||
{
|
||||
if ( r$samples > 0 )
|
||||
{
|
||||
if ( ! rv?$samples )
|
||||
rv$samples = Queue::init([$max_len=r$samples]);
|
||||
Queue::put(rv$samples, data);
|
||||
Queue::put(rv$samples, obs);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,13 +22,10 @@ function calc_std_dev(rv: ResultVal)
|
|||
}
|
||||
|
||||
# This depends on the variance plugin which uses priority -5
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal) &priority=-10
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-10
|
||||
{
|
||||
if ( STD_DEV in r$apply )
|
||||
{
|
||||
if ( rv?$variance )
|
||||
calc_std_dev(rv);
|
||||
}
|
||||
calc_std_dev(rv);
|
||||
}
|
||||
|
||||
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10
|
||||
|
|
|
@ -34,7 +34,7 @@ hook init_resultval_hook(r: Reducer, rv: ResultVal)
|
|||
rv$sum = 0;
|
||||
}
|
||||
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal)
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
|
||||
{
|
||||
if ( SUM in r$apply )
|
||||
rv$sum += val;
|
||||
|
|
|
@ -23,13 +23,13 @@ redef record ResultVal += {
|
|||
unique_vals: set[Observation] &optional;
|
||||
};
|
||||
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal)
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
|
||||
{
|
||||
if ( UNIQUE in r$apply )
|
||||
{
|
||||
if ( ! rv?$unique_vals )
|
||||
rv$unique_vals=set();
|
||||
add rv$unique_vals[data];
|
||||
add rv$unique_vals[obs];
|
||||
rv$unique = |rv$unique_vals|;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ function calc_variance(rv: ResultVal)
|
|||
}
|
||||
|
||||
# Reduced priority since this depends on the average
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, data: Observation, rv: ResultVal) &priority=-5
|
||||
hook add_to_reducer_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-5
|
||||
{
|
||||
if ( VARIANCE in r$apply )
|
||||
{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue