More cleanup and fixed to the metrics framework.

This commit is contained in:
Seth Hall 2012-11-19 23:43:15 -05:00
parent 5b81cfe7e2
commit 95b12262e4
2 changed files with 79 additions and 38 deletions

View file

@ -1,5 +1,7 @@
##! The metrics framework provides a way to count and measure data.
@load base/utils/queue
module Metrics;
export {
@ -12,15 +14,23 @@ export {
## This is the interval for how often threshold based notices will happen
## after they have already fired.
const threshold_series_restart_interval = 1hr &redef;
const threshold_crossed_restart_interval = 1hr &redef;
type Calculation: enum {
## Sums the values given. For string values,
## this will be the number of strings given.
SUM,
## Find the minimum value.
MIN,
## Find the maximum value.
MAX,
## Find the variance of the values.
VARIANCE,
## Find the standard deviation of the values.
STD_DEV,
## Calculate the average of the values.
AVG,
## Calculate the number of unique values.
UNIQUE,
};
@ -45,9 +55,13 @@ export {
} &log;
## Represents data being added for a single metric data point.
## Only supply a single value here at a time.
type DataPoint: record {
## Count value.
num: count &optional;
## Double value.
dbl: double &optional;
## String value.
str: string &optional;
};
@ -82,7 +96,7 @@ export {
## A sample of something being measured. This is helpful in
## some cases for collecting information to do further detection
## or better logging for forensic purposes.
sample: set[DataPoint] &optional;
samples: vector of string &optional;
};
## The record type that is used for logging metrics.
@ -145,7 +159,7 @@ export {
threshold_func: function(index: Metrics::Index, val: Metrics::ResultVal): bool &optional;
## A function callback that is called when a threshold is crossed.
threshold_crossed: function(index: Metrics::Index, val: Metrics::ResultVal) &optional;
## A number of sample DataPoints to collect for the threshold
## A number of sample DataPoint strings to collect for the threshold
## crossing callback.
samples: count &optional;
};
@ -193,6 +207,17 @@ redef record ResultVal += {
# since we will like move to a probalistic data structure in the future.
# TODO: in the future this will optionally be a hyperloglog structure
unique_vals: set[DataPoint] &optional;
# Internal use only. This is the queue where samples
# are maintained since the queue is self managing for
# the number of samples requested.
sample_queue: Queue::Queue &optional;
# Internal use only. Indicates if a simple threshold was already crossed.
is_threshold_crossed: bool &default=F;
# Internal use only. Current index for threshold series.
threshold_series_index: count &default=0;
};
# Type to store a table of metrics values.
@ -207,9 +232,6 @@ global filter_store: table[string, string] of Filter = table();
# This is indexed by metric id and filter name.
global store: table[string, string] of MetricTable = table() &default=table();
# This stores the current threshold index for filters using $threshold_series.
global threshold_series_index: table[string, string, Index] of count = {} &create_expire=threshold_series_restart_interval &default=0;
# This is hook for watching thresholds being crossed. It is called whenever
# index values are updated and the new val is given as the `val` argument.
# It's only prototyped here because cluster and non-cluster has separate
@ -311,6 +333,7 @@ function merge_result_vals(rv1: ResultVal, rv2: ResultVal): ResultVal
else if ( rv2?$var_s )
result$var_s = rv2$var_s;
# Merge $unique_vals
if ( rv1?$unique_vals || rv2?$unique_vals )
{
result$unique_vals = set();
@ -322,6 +345,21 @@ function merge_result_vals(rv1: ResultVal, rv2: ResultVal): ResultVal
add result$unique_vals[val2];
}
# Merge $sample_queue
if ( rv1?$sample_queue && rv2?$sample_queue )
result$sample_queue = Queue::merge(rv1$sample_queue, rv2$sample_queue);
else if ( rv1?$sample_queue )
result$sample_queue = rv1$sample_queue;
else if ( rv2?$sample_queue )
result$sample_queue = rv2$sample_queue;
# Merge $threshold_series_index
result$threshold_series_index = (rv1$threshold_series_index > rv2$threshold_series_index) ? rv1$threshold_series_index : rv2$threshold_series_index;
# Merge $is_threshold_crossed
if ( rv1$is_threshold_crossed || rv2$is_threshold_crossed )
result$is_threshold_crossed = T;
do_calculated_fields(result);
return result;
}
@ -412,6 +450,13 @@ function add_data(id: string, index: Index, data: DataPoint)
++result$num;
if ( filter?$samples && data?$str )
{
if ( ! result?$sample_queue )
result$sample_queue = Queue::init([$max_len=filter$samples]);
Queue::push(result$sample_queue, data$str);
}
if ( SUM in filter$measure )
{
if ( ! result?$sum ) result$sum = 0;
@ -422,7 +467,7 @@ function add_data(id: string, index: Index, data: DataPoint)
{
if ( ! result?$min )
result$min = val;
else if (val < result$min)
else if ( val < result$min )
result$min = val;
}
@ -430,7 +475,7 @@ function add_data(id: string, index: Index, data: DataPoint)
{
if ( ! result?$max )
result$max = val;
else if (val > result$max)
else if ( val > result$max )
result$max = val;
}
@ -485,22 +530,24 @@ function check_thresholds(filter: Filter, index: Index, val: ResultVal, modify_p
if ( modify_pct < 1.0 && modify_pct > 0.0 )
watch = watch/modify_pct;
if ( filter?$threshold && watch >= filter$threshold )
if ( ! val$is_threshold_crossed &&
filter?$threshold && watch >= filter$threshold )
{
# A default threshold was given and the value crossed it.
return T;
}
if ( filter?$threshold_series &&
|filter$threshold_series| >= threshold_series_index[filter$id, filter$name, index] &&
watch >= filter$threshold_series[threshold_series_index[filter$id, filter$name, index]] )
|filter$threshold_series| >= val$threshold_series_index &&
watch >= filter$threshold_series[val$threshold_series_index] )
{
# A threshold series was given and the value crossed the next
# value in the series.
return T;
}
if ( filter?$threshold_func &&
if ( ! val$is_threshold_crossed &&
filter?$threshold_func &&
filter$threshold_func(index, val) )
{
# The threshold function indicated it was crossed.
@ -512,20 +559,16 @@ function check_thresholds(filter: Filter, index: Index, val: ResultVal, modify_p
function threshold_crossed(filter: Filter, index: Index, val: ResultVal)
{
if ( filter?$threshold_crossed )
filter$threshold_crossed(index, val);
if ( ! filter?$threshold_crossed )
return;
# If I don't reset here, the value just keeps
# retriggering once the threshold has been exceeded.
if ( !filter?$threshold_series )
{
reset(filter);
}
else
{
# This just needs set to some value so that it doesn't refire the
# notice until it expires from the table or it crosses the next
# threshold in the case of vectors of thresholds.
++threshold_series_index[filter$id, filter$name, index];
}
if ( val?$sample_queue )
val$samples = Queue::get_str_vector(val$sample_queue);
filter$threshold_crossed(index, val);
val$is_threshold_crossed = T;
# Bump up to the next threshold series index if a threshold series is being used.
if ( filter?$threshold_series )
++val$threshold_series_index;
}

View file

@ -4,7 +4,5 @@ THRESHOLD_FUNC: hit a threshold function value at 2 for metric_index(host=6.5.4.
THRESHOLD_FUNC: hit a threshold function value at 1 for metric_index(host=7.2.1.5)
THRESHOLD: hit a threshold value at 6 for metric_index(host=1.2.3.4)
THRESHOLD_SERIES: hit a threshold series value at 6 for metric_index(host=1.2.3.4)
THRESHOLD_FUNC: hit a threshold function value at 3 for metric_index(host=1.2.3.4)
THRESHOLD: hit a threshold value at 1000 for metric_index(host=7.2.1.5)
THRESHOLD: hit a threshold value at 1001 for metric_index(host=7.2.1.5)
THRESHOLD_SERIES: hit a threshold series value at 1001 for metric_index(host=7.2.1.5)
THRESHOLD_FUNC: hit a threshold function value at 1000 for metric_index(host=7.2.1.5)