SumStats tests pass.

This commit is contained in:
Seth Hall 2013-04-15 15:28:11 -04:00
parent fbe967e16a
commit 437815454d
6 changed files with 66 additions and 66 deletions

View file

@ -35,34 +35,34 @@ export {
## Event sent by the manager in a cluster to initiate the ## Event sent by the manager in a cluster to initiate the
## collection of metrics values for a measurement. ## collection of metrics values for a measurement.
global cluster_measurement_request: event(uid: string, mid: string); global cluster_ss_request: event(uid: string, ssid: string);
## Event sent by nodes that are collecting metrics after receiving ## Event sent by nodes that are collecting metrics after receiving
## a request for the metric measurement from the manager. ## a request for the metric measurement from the manager.
global cluster_measurement_response: event(uid: string, mid: string, data: ResultTable, done: bool); global cluster_ss_response: event(uid: string, ssid: string, data: ResultTable, done: bool);
## This event is sent by the manager in a cluster to initiate the ## This event is sent by the manager in a cluster to initiate the
## collection of a single key value from a measurement. It's typically ## collection of a single key value from a measurement. It's typically
## used to get intermediate updates before the break interval triggers ## used to get intermediate updates before the break interval triggers
## to speed detection of a value crossing a threshold. ## to speed detection of a value crossing a threshold.
global cluster_key_request: event(uid: string, mid: string, key: Key); global cluster_key_request: event(uid: string, ssid: string, key: Key);
## This event is sent by nodes in response to a ## This event is sent by nodes in response to a
## :bro:id:`SumStats::cluster_key_request` event. ## :bro:id:`SumStats::cluster_key_request` event.
global cluster_key_response: event(uid: string, mid: string, key: Key, result: Result); global cluster_key_response: event(uid: string, ssid: string, key: Key, result: Result);
## This is sent by workers to indicate that they crossed the percent of the ## This is sent by workers to indicate that they crossed the percent of the
## current threshold by the percentage defined globally in ## current threshold by the percentage defined globally in
## :bro:id:`SumStats::cluster_request_global_view_percent` ## :bro:id:`SumStats::cluster_request_global_view_percent`
global cluster_key_intermediate_response: event(mid: string, key: SumStats::Key); global cluster_key_intermediate_response: event(ssid: string, key: SumStats::Key);
## This event is scheduled internally on workers to send result chunks. ## This event is scheduled internally on workers to send result chunks.
global send_data: event(uid: string, mid: string, data: ResultTable); global send_data: event(uid: string, ssid: string, data: ResultTable);
} }
# Add events to the cluster framework to make this work. # Add events to the cluster framework to make this work.
redef Cluster::manager2worker_events += /SumStats::cluster_(measurement_request|key_request)/; redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|key_request)/;
redef Cluster::worker2manager_events += /SumStats::cluster_(measurement_response|key_response|key_intermediate_response)/; redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_response|key_intermediate_response)/;
@if ( Cluster::local_node_type() != Cluster::MANAGER ) @if ( Cluster::local_node_type() != Cluster::MANAGER )
# This variable is maintained to know what keys have recently sent as # This variable is maintained to know what keys have recently sent as
@ -99,7 +99,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
} }
} }
event SumStats::send_data(uid: string, mid: string, data: ResultTable) event SumStats::send_data(uid: string, ssid: string, data: ResultTable)
{ {
#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
@ -121,39 +121,39 @@ event SumStats::send_data(uid: string, mid: string, data: ResultTable)
if ( |data| == 0 ) if ( |data| == 0 )
done = T; done = T;
event SumStats::cluster_measurement_response(uid, mid, local_data, done); event SumStats::cluster_ss_response(uid, ssid, local_data, done);
if ( ! done ) if ( ! done )
schedule 0.01 sec { SumStats::send_data(uid, mid, data) }; schedule 0.01 sec { SumStats::send_data(uid, ssid, data) };
} }
event SumStats::cluster_measurement_request(uid: string, mid: string) event SumStats::cluster_ss_request(uid: string, ssid: string)
{ {
#print fmt("WORKER %s: received the cluster_measurement_request event for %s.", Cluster::node, id); #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
# Initiate sending all of the data for the requested measurement. # Initiate sending all of the data for the requested measurement.
if ( mid in result_store ) if ( ssid in result_store )
event SumStats::send_data(uid, mid, result_store[mid]); event SumStats::send_data(uid, ssid, result_store[ssid]);
else else
event SumStats::send_data(uid, mid, table()); event SumStats::send_data(uid, ssid, table());
# Lookup the actual measurement and reset it, the reference to the data # Lookup the actual measurement and reset it, the reference to the data
# currently stored will be maintained internally by the send_data event. # currently stored will be maintained internally by the send_data event.
if ( mid in stats_store ) if ( ssid in stats_store )
reset(stats_store[mid]); reset(stats_store[ssid]);
} }
event SumStats::cluster_key_request(uid: string, mid: string, key: Key) event SumStats::cluster_key_request(uid: string, ssid: string, key: Key)
{ {
if ( mid in result_store && key in result_store[mid] ) if ( ssid in result_store && key in result_store[ssid] )
{ {
#print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data); #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data);
event SumStats::cluster_key_response(uid, mid, key, result_store[mid][key]); event SumStats::cluster_key_response(uid, ssid, key, result_store[ssid][key]);
} }
else else
{ {
# We need to send an empty response if we don't have the data so that the manager # We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers. # can know that it heard back from all of the workers.
event SumStats::cluster_key_response(uid, mid, key, table()); event SumStats::cluster_key_response(uid, ssid, key, table());
} }
} }
@ -195,16 +195,16 @@ event SumStats::finish_epoch(ss: SumStat)
#print fmt("%.6f MANAGER: breaking %s measurement for %s metric", network_time(), measurement$name, measurement$id); #print fmt("%.6f MANAGER: breaking %s measurement for %s metric", network_time(), measurement$name, measurement$id);
local uid = unique_id(""); local uid = unique_id("");
if ( uid in measurement_results ) if ( uid in stats_results )
delete measurement_results[uid]; delete stats_results[uid];
stats_results[uid] = table(); stats_results[uid] = table();
# Request data from peers. # Request data from peers.
event SumStats::cluster_measurement_request(uid, ss$id); event SumStats::cluster_ss_request(uid, ss$id);
} }
# Schedule the next finish_epoch event. # Schedule the next finish_epoch event.
schedule m$epoch { SumStats::finish_epoch(m) }; schedule ss$epoch { SumStats::finish_epoch(ss) };
} }
# This is unlikely to be called often, but it's here in case there are measurements # This is unlikely to be called often, but it's here in case there are measurements
@ -252,7 +252,7 @@ event SumStats::cluster_key_intermediate_response(ssid: string, key: Key)
#print fmt("MANAGER: requesting key data for %s", key2str(key)); #print fmt("MANAGER: requesting key data for %s", key2str(key));
if ( ssid in outstanding_global_views && if ( ssid in outstanding_global_views &&
|outstanding_global_views[mid]| > max_outstanding_global_views ) |outstanding_global_views[ssid]| > max_outstanding_global_views )
{ {
# Don't do this intermediate update. Perhaps at some point in the future # Don't do this intermediate update. Perhaps at some point in the future
# we will queue and randomly select from these ignored intermediate # we will queue and randomly select from these ignored intermediate
@ -266,7 +266,7 @@ event SumStats::cluster_key_intermediate_response(ssid: string, key: Key)
event SumStats::cluster_key_request(uid, ssid, key); event SumStats::cluster_key_request(uid, ssid, key);
} }
event SumStats::cluster_measurement_response(uid: string, ssid: string, data: ResultTable, done: bool) event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable, done: bool)
{ {
#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);

View file

@ -120,7 +120,7 @@ export {
}; };
## Create a summary statistic. ## Create a summary statistic.
global create: function(m: SumStats::SumStat); global create: function(ss: SumStats::SumStat);
## Add data into an observation stream. This should be ## Add data into an observation stream. This should be
## called when a script has measured some point value. ## called when a script has measured some point value.
@ -158,13 +158,13 @@ type Thresholding: record {
# Internal use only. For tracking thresholds per sumstat and key. # Internal use only. For tracking thresholds per sumstat and key.
global threshold_tracker: table[string] of table[Key] of Thresholding &optional; global threshold_tracker: table[string] of table[Key] of Thresholding &optional;
redef record SumStats += { redef record SumStat += {
# Internal use only (mostly for cluster coherency). # Internal use only (mostly for cluster coherency).
id: string &optional; id: string &optional;
}; };
# Store of sumstats indexed on the sumstat id. # Store of sumstats indexed on the sumstat id.
global stats_store: table[string] of SumStats = table(); global stats_store: table[string] of SumStat = table();
# Store of reducers indexed on the data point stream id. # Store of reducers indexed on the data point stream id.
global reducer_store: table[string] of set[Reducer] = table(); global reducer_store: table[string] of set[Reducer] = table();
@ -179,7 +179,7 @@ global thresholds_store: table[string, Key] of bool = table();
# key values are updated and the new val is given as the `val` argument. # key values are updated and the new val is given as the `val` argument.
# It's only prototyped here because cluster and non-cluster have separate # It's only prototyped here because cluster and non-cluster have separate
# implementations. # implementations.
global data_added: function(m: SumStats, key: Key, result: Result); global data_added: function(ss: SumStat, key: Key, result: Result);
# Prototype the hook point for plugins to do calculations. # Prototype the hook point for plugins to do calculations.
global add_to_reducer_hook: hook(r: Reducer, val: double, data: Observation, rv: ResultVal); global add_to_reducer_hook: hook(r: Reducer, val: double, data: Observation, rv: ResultVal);
@ -190,7 +190,7 @@ global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: Res
# Event that is used to "finish" measurements and adapt the measurement # Event that is used to "finish" measurements and adapt the measurement
# framework for clustered or non-clustered usage. # framework for clustered or non-clustered usage.
global finish_epoch: event(m: SumStats); global finish_epoch: event(ss: SumStat);
function key2str(key: Key): string function key2str(key: Key): string
{ {

View file

@ -2,23 +2,23 @@
module SumStats; module SumStats;
event SumStats::finish_epoch(m: SumStats) event SumStats::finish_epoch(ss: SumStat)
{ {
if ( m$id in result_store ) if ( ss$id in result_store )
{ {
local data = result_store[m$id]; local data = result_store[ss$id];
if ( m?$epoch_finished ) if ( ss?$epoch_finished )
m$epoch_finished(data); ss$epoch_finished(data);
reset(m); reset(ss);
} }
schedule m$epoch { SumStats::finish_epoch(m) }; schedule ss$epoch { SumStats::finish_epoch(ss) };
} }
function data_added(m: SumStats, key: Key, result: Result) function data_added(ss: SumStat, key: Key, result: Result)
{ {
if ( check_thresholds(m, key, result, 1.0) ) if ( check_thresholds(ss, key, result, 1.0) )
threshold_crossed(m, key, result); threshold_crossed(ss, key, result);
} }

View file

@ -29,9 +29,9 @@
@load base/frameworks/communication @load base/frameworks/communication
@load base/frameworks/control @load base/frameworks/control
@load base/frameworks/cluster @load base/frameworks/cluster
@load base/frameworks/measurement
@load base/frameworks/intel @load base/frameworks/intel
@load base/frameworks/reporter @load base/frameworks/reporter
@load base/frameworks/sumstats
@load base/frameworks/tunnels @load base/frameworks/tunnels
@load base/protocols/conn @load base/protocols/conn

View file

@ -1,6 +1,6 @@
THRESHOLD_SERIES: hit a threshold series value at 3 for measurement_key(host=1.2.3.4) THRESHOLD_SERIES: hit a threshold series value at 3 for sumstats_key(host=1.2.3.4)
THRESHOLD_SERIES: hit a threshold series value at 6 for measurement_key(host=1.2.3.4) THRESHOLD_SERIES: hit a threshold series value at 6 for sumstats_key(host=1.2.3.4)
THRESHOLD: hit a threshold value at 6 for measurement_key(host=1.2.3.4) THRESHOLD: hit a threshold value at 6 for sumstats_key(host=1.2.3.4)
THRESHOLD_SERIES: hit a threshold series value at 1001 for measurement_key(host=7.2.1.5) THRESHOLD_SERIES: hit a threshold series value at 1001 for sumstats_key(host=7.2.1.5)
THRESHOLD: hit a threshold value at 1001 for measurement_key(host=7.2.1.5) THRESHOLD: hit a threshold value at 1001 for sumstats_key(host=7.2.1.5)
THRESHOLD WITH RATIO BETWEEN REDUCERS: hit a threshold value at 55x for measurement_key(host=7.2.1.5) THRESHOLD WITH RATIO BETWEEN REDUCERS: hit a threshold value at 55x for sumstats_key(host=7.2.1.5)

View file

@ -22,14 +22,14 @@ global n = 0;
event bro_init() &priority=5 event bro_init() &priority=5
{ {
local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)]; local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)];
SumStats::create([$epoch=5secs, SumStats::create([$epoch=5secs,
$reducers=set(r1), $reducers=set(r1),
$epoch_finished(rt: SumStats::ResultTable) = $epoch_finished(rt: SumStats::ResultTable) =
{ {
for ( key in rt ) for ( key in rt )
{ {
local r = rt[key]["test.metric"]; local r = rt[key]["test"];
print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique);
} }
@ -49,23 +49,23 @@ event ready_for_data()
{ {
if ( Cluster::node == "worker-1" ) if ( Cluster::node == "worker-1" )
{ {
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=34]); SumStats::observe("test", [$host=1.2.3.4], [$num=34]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=30]); SumStats::observe("test", [$host=1.2.3.4], [$num=30]);
SumStats::observe("test.metric", [$host=6.5.4.3], [$num=1]); SumStats::observe("test", [$host=6.5.4.3], [$num=1]);
SumStats::observe("test.metric", [$host=7.2.1.5], [$num=54]); SumStats::observe("test", [$host=7.2.1.5], [$num=54]);
} }
if ( Cluster::node == "worker-2" ) if ( Cluster::node == "worker-2" )
{ {
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=75]); SumStats::observe("test", [$host=1.2.3.4], [$num=75]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=30]); SumStats::observe("test", [$host=1.2.3.4], [$num=30]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=3]); SumStats::observe("test", [$host=1.2.3.4], [$num=3]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=57]); SumStats::observe("test", [$host=1.2.3.4], [$num=57]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=52]); SumStats::observe("test", [$host=1.2.3.4], [$num=52]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=61]); SumStats::observe("test", [$host=1.2.3.4], [$num=61]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=95]); SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test.metric", [$host=6.5.4.3], [$num=5]); SumStats::observe("test", [$host=6.5.4.3], [$num=5]);
SumStats::observe("test.metric", [$host=7.2.1.5], [$num=91]); SumStats::observe("test", [$host=7.2.1.5], [$num=91]);
SumStats::observe("test.metric", [$host=10.10.10.10], [$num=5]); SumStats::observe("test", [$host=10.10.10.10], [$num=5]);
} }
} }