SumStats changes to how thresholding works to simplify and reduce memory use.

This commit is contained in:
Seth Hall 2013-05-23 10:12:17 -04:00
parent 6bd9ab3bd6
commit 4f4ef99a6b
2 changed files with 47 additions and 60 deletions

View file

@ -54,7 +54,7 @@ export {
global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool); global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool);
## This event is generated when a threshold is crossed. ## This event is generated when a threshold is crossed.
global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold: Thresholding); global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold_index: count);
} }
# Add events to the cluster framework to make this work. # Add events to the cluster framework to make this work.
@ -154,12 +154,12 @@ event SumStats::cluster_key_request(uid: string, ss_name: string, key: Key, clea
} }
} }
event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, thold: Thresholding) event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, thold_index: count)
{ {
if ( ss_name !in threshold_tracker ) if ( ss_name !in threshold_tracker )
threshold_tracker[ss_name] = table(); threshold_tracker[ss_name] = table();
threshold_tracker[ss_name][key] = thold; threshold_tracker[ss_name][key] = thold_index;
} }
event SumStats::thresholds_reset(ss_name: string) event SumStats::thresholds_reset(ss_name: string)

View file

@ -189,22 +189,30 @@ redef record Reducer += {
calc_funcs: vector of Calculation &optional; calc_funcs: vector of Calculation &optional;
}; };
redef record Thresholding += {
# Internal use only. Indicates if a simple threshold was already crossed.
is_threshold_crossed: bool &default=F;
# Internal use only. Current key for threshold series.
threshold_series_index: count &default=0;
};
# Internal use only. For tracking thresholds per sumstat and key. # Internal use only. For tracking thresholds per sumstat and key.
global threshold_tracker: table[string] of table[Key] of Thresholding &optional; # In the case of a single threshold, 0 means the threshold isn't crossed.
# In the case of a threshold series, the number tracks the threshold offset.
global threshold_tracker: table[string] of table[Key] of count;
redef record SumStat += { function increment_threshold_tracker(ss_name: string, key: Key)
# Internal use only. {
ssname: string &optional; if ( ss_name !in threshold_tracker )
}; threshold_tracker[ss_name] = table();
if ( key !in threshold_tracker[ss_name] )
threshold_tracker[ss_name][key] = 0;
++threshold_tracker[ss_name][key];
}
function get_threshold_index(ss_name: string, key: Key): count
{
if ( ss_name !in threshold_tracker )
return 0;
if ( key !in threshold_tracker[ss_name] )
return 0;
return threshold_tracker[ss_name][key];
}
# Prototype the hook point for plugins to initialize any result values. # Prototype the hook point for plugins to initialize any result values.
global init_resultval_hook: hook(r: Reducer, rv: ResultVal); global init_resultval_hook: hook(r: Reducer, rv: ResultVal);
@ -318,8 +326,7 @@ function reset(ss: SumStat)
result_store[ss$name] = table(); result_store[ss$name] = table();
if ( (ss?$threshold || ss?$threshold_series) && if ( ss$name in threshold_tracker )
ss$name in threshold_tracker )
{ {
delete threshold_tracker[ss$name]; delete threshold_tracker[ss$name];
threshold_tracker[ss$name] = table(); threshold_tracker[ss$name] = table();
@ -360,6 +367,9 @@ function create(ss: SumStat)
stats_store[ss$name] = ss; stats_store[ss$name] = ss;
if ( ss?$threshold || ss?$threshold_series )
threshold_tracker[ss$name] = table();
for ( reducer in ss$reducers ) for ( reducer in ss$reducers )
{ {
reducer$ssname = ss$name; reducer$ssname = ss$name;
@ -420,13 +430,11 @@ function observe(id: string, key: Key, obs: Observation)
# SumStats results. # SumStats results.
if ( ! ss?$epoch_finished && if ( ! ss?$epoch_finished &&
( ss?$threshold && ( ss?$threshold &&
r$ssname in threshold_tracker &&
key in threshold_tracker[r$ssname] && key in threshold_tracker[r$ssname] &&
threshold_tracker[r$ssname][key]$is_threshold_crossed ) || threshold_tracker[r$ssname][key] != 0 ) ||
( ss?$threshold_series && ( ss?$threshold_series &&
r$ssname in threshold_tracker &&
key in threshold_tracker[r$ssname] && key in threshold_tracker[r$ssname] &&
threshold_tracker[r$ssname][key]$threshold_series_index == |ss$threshold_series| ) ) threshold_tracker[r$ssname][key] == |ss$threshold_series| ) )
{ {
next; next;
} }
@ -464,7 +472,7 @@ function observe(id: string, key: Key, obs: Observation)
# mid-break-interval threshold crossing detection for cluster deployments. # mid-break-interval threshold crossing detection for cluster deployments.
function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: double): bool function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: double): bool
{ {
if ( ! (ss?$threshold || ss?$threshold_series) ) if ( ! (ss?$threshold || ss?$threshold_series || ss?$threshold_crossed) )
return F; return F;
# Add in the extra ResultVals to make threshold_vals easier to write. # Add in the extra ResultVals to make threshold_vals easier to write.
@ -484,42 +492,25 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou
if ( modify_pct < 1.0 && modify_pct > 0.0 ) if ( modify_pct < 1.0 && modify_pct > 0.0 )
watch = watch/modify_pct; watch = watch/modify_pct;
if ( ss$name !in threshold_tracker ) local t_index = get_threshold_index(ss$name, key);
threshold_tracker[ss$name] = table();
local t_tracker = threshold_tracker[ss$name];
if ( ss?$threshold ) if ( ss?$threshold &&
t_index == 0 && # Check that the threshold hasn't already been crossed.
watch >= ss$threshold )
{ {
local tt: Thresholding; # Value crossed the threshold.
if ( key in t_tracker ) return T;
tt = t_tracker[key];
if ( ! tt$is_threshold_crossed &&
watch >= ss$threshold )
{
t_tracker[key] = tt;
# Value crossed the threshold.
return T;
}
} }
if ( ss?$threshold_series ) if ( ss?$threshold_series &&
|ss$threshold_series| > t_index && # Check if there are more thresholds.
watch >= ss$threshold_series[t_index] )
{ {
local tt2: Thresholding; # A threshold series was given and the value crossed the next
if ( key in t_tracker ) # value in the series.
tt2 = t_tracker[key]; return T;
if ( |ss$threshold_series| > tt2$threshold_series_index &&
watch >= ss$threshold_series[tt2$threshold_series_index] )
{
t_tracker[key] = tt2;
# A threshold series was given and the value crossed the next
# value in the series.
return T;
}
} }
return F; return F;
} }
@ -529,6 +520,8 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result)
if ( ! ss?$threshold_crossed ) if ( ! ss?$threshold_crossed )
return; return;
increment_threshold_tracker(ss$name,key);
# Add in the extra ResultVals to make threshold_crossed callbacks easier to write. # Add in the extra ResultVals to make threshold_crossed callbacks easier to write.
if ( |ss$reducers| != |result| ) if ( |ss$reducers| != |result| )
{ {
@ -540,11 +533,5 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result)
} }
ss$threshold_crossed(key, result); ss$threshold_crossed(key, result);
local tt = threshold_tracker[ss$name][key];
tt$is_threshold_crossed = T;
# Bump up to the next threshold series index if a threshold series is being used.
if ( ss?$threshold_series )
++tt$threshold_series_index;
} }