Beginning rework of SumStats API.

This commit is contained in:
Seth Hall 2013-07-30 11:46:51 -04:00
parent 4b9d8b2c73
commit 0e23a8bc9e
3 changed files with 84 additions and 91 deletions

View file

@ -39,10 +39,10 @@ export {
## a single key value from a sumstat. It's typically used to get intermediate ## a single key value from a sumstat. It's typically used to get intermediate
## updates before the break interval triggers to speed detection of a value ## updates before the break interval triggers to speed detection of a value
## crossing a threshold. ## crossing a threshold.
global cluster_key_request: event(uid: string, ss_name: string, key: Key, cleanup: bool); global cluster_keys_request: event(uid: string, ss_name: string, key: set[Key], cleanup: bool);
## This event is sent by nodes in response to a ## This event is sent by nodes in response to a
## :bro:id:`SumStats::cluster_key_request` event. ## :bro:id:`SumStats::cluster_keys_request` event.
global cluster_key_response: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool); global cluster_key_response: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool);
## This is sent by workers to indicate that they crossed the percent ## This is sent by workers to indicate that they crossed the percent
@ -89,68 +89,71 @@ function data_added(ss: SumStat, key: Key, result: Result)
} }
} }
event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool) #event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool)
# {
# #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
#
# local local_data: ResultTable = table();
# local incoming_data: ResultTable = cleanup ? data : copy(data);
#
# local num_added = 0;
# for ( key in incoming_data )
# {
# local_data[key] = incoming_data[key];
# delete incoming_data[key];
#
# # Only send cluster_send_in_groups_of at a time. Queue another
# # event to send the next group.
# if ( cluster_send_in_groups_of == ++num_added )
# break;
# }
#
# local done = F;
# # If data is empty, this sumstat is done.
# if ( |incoming_data| == 0 )
# done = T;
#
# # Note: copy is needed to compensate serialization caching issue. This should be
# # changed to something else later.
# event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
# if ( ! done )
# schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) };
# }
#event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool)
# {
# #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
#
# # Initiate sending all of the data for the requested stats.
# if ( ss_name in result_store )
# event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup);
# else
# event SumStats::send_data(uid, ss_name, table(), cleanup);
#
# # Lookup the actual sumstats and reset it, the reference to the data
# # currently stored will be maintained internally by the send_data event.
# if ( ss_name in stats_store && cleanup )
# reset(stats_store[ss_name]);
# }
event SumStats::cluster_keys_request(uid: string, ss_name: string, keys: set[Key], cleanup: bool)
{ {
#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); for ( key in keys )
local local_data: ResultTable = table();
local incoming_data: ResultTable = cleanup ? data : copy(data);
local num_added = 0;
for ( key in incoming_data )
{ {
local_data[key] = incoming_data[key]; if ( ss_name in result_store && key in result_store[ss_name] )
delete incoming_data[key]; {
#print fmt("WORKER %s: received the cluster_keys_request event for %s=%s.", Cluster::node, key2str(key), data);
# Only send cluster_send_in_groups_of at a time. Queue another # Note: copy is needed to compensate serialization caching issue. This should be
# event to send the next group. # changed to something else later.
if ( cluster_send_in_groups_of == ++num_added ) event SumStats::cluster_key_response(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
break; }
} else
{
local done = F; # We need to send an empty response if we don't have the data so that the manager
# If data is empty, this sumstat is done. # can know that it heard back from all of the workers.
if ( |incoming_data| == 0 ) event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup);
done = T; }
# Note: copy is needed to compensate serialization caching issue. This should be
# changed to something else later.
event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
if ( ! done )
schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) };
}
event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool)
{
#print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
# Initiate sending all of the data for the requested stats.
if ( ss_name in result_store )
event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup);
else
event SumStats::send_data(uid, ss_name, table(), cleanup);
# Lookup the actual sumstats and reset it, the reference to the data
# currently stored will be maintained internally by the send_data event.
if ( ss_name in stats_store && cleanup )
reset(stats_store[ss_name]);
}
event SumStats::cluster_key_request(uid: string, ss_name: string, key: Key, cleanup: bool)
{
if ( ss_name in result_store && key in result_store[ss_name] )
{
#print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data);
# Note: copy is needed to compensate serialization caching issue. This should be
# changed to something else later.
event SumStats::cluster_key_response(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup);
} }
} }
@ -252,6 +255,10 @@ event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, res
threshold_crossed(ss, key, ir); threshold_crossed(ss, key, ir);
event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
} }
if ()
{
}
if ( cleanup ) if ( cleanup )
{ {
@ -289,7 +296,7 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
local uid = unique_id(""); local uid = unique_id("");
done_with[uid] = 0; done_with[uid] = 0;
event SumStats::cluster_key_request(uid, ss_name, key, T); event SumStats::cluster_keys_request(uid, ss_name, set(key), T);
} }
event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool) event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
@ -378,7 +385,7 @@ function request_key(ss_name: string, key: Key): Result
done_with[uid] = 0; done_with[uid] = 0;
key_requests[uid] = table(); key_requests[uid] = table();
event SumStats::cluster_key_request(uid, ss_name, key, F); event SumStats::cluster_keys_request(uid, ss_name, set(key), F);
return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{ {
local result = key_requests[uid]; local result = key_requests[uid];

View file

@ -74,10 +74,6 @@ export {
## Type to store results for multiple reducers. ## Type to store results for multiple reducers.
type Result: table[string] of ResultVal; type Result: table[string] of ResultVal;
## Type to store a table of sumstats results indexed
## by keys.
type ResultTable: table[Key] of Result;
## SumStats represent an aggregation of reducers along with ## SumStats represent an aggregation of reducers along with
## mechanisms to handle various situations like the epoch ending ## mechanisms to handle various situations like the epoch ending
## or thresholds being crossed. ## or thresholds being crossed.
@ -92,7 +88,7 @@ export {
name: string; name: string;
## The interval at which this filter should be "broken" ## The interval at which this filter should be "broken"
## and the '$epoch_finished' callback called. The ## and the '$epoch_result' callback called. The
## results are also reset at this time so any threshold ## results are also reset at this time so any threshold
## based detection needs to be set to a ## based detection needs to be set to a
## value that should be expected to happen within ## value that should be expected to happen within
@ -119,15 +115,10 @@ export {
## A callback that is called when a threshold is crossed. ## A callback that is called when a threshold is crossed.
threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional; threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional;
## A callback with the full collection of Results for
## this SumStat.
epoch_finished: function(rt: SumStats::ResultTable) &optional;
#epoch_finished: function(num_keys: count) &optional;
## A callback that receives each of the results at the ## A callback that receives each of the results at the
## end of the analysis epoch. The function will be ## end of the analysis epoch. The function will be
## called once for each key. ## called once for each key.
#epoch_finished_result: function(key::SumStats::Key, result: SumStats::Result) &optional; epoch_result: function(ts: time, key::SumStats::Key, result: SumStats::Result) &optional;
}; };
## Create a summary statistic. ## Create a summary statistic.
@ -144,17 +135,6 @@ export {
## obs: The data point to send into the stream. ## obs: The data point to send into the stream.
global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation); global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation);
## Dynamically request a sumstat. This function should be
## used sparingly and not as a replacement for the callbacks
## from the :bro:see:`SumStat` record. The function is only
## available for use within "when" statements as an asynchronous
## function.
##
## ss_name: SumStat name.
##
## Returns: The result table for the requested sumstat.
global request: function(ss_name: string): ResultTable;
## Dynamically request a sumstat key. This function should be ## Dynamically request a sumstat key. This function should be
## used sparingly and not as a replacement for the callbacks ## used sparingly and not as a replacement for the callbacks
## from the :bro:see:`SumStat` record. The function is only ## from the :bro:see:`SumStat` record. The function is only
@ -182,6 +162,9 @@ export {
global key2str: function(key: SumStats::Key): string; global key2str: function(key: SumStats::Key): string;
} }
# Type to store a table of sumstats results indexed by keys.
type ResultTable: table[Key] of Result;
# The function prototype for plugins to do calculations. # The function prototype for plugins to do calculations.
type ObserveFunc: function(r: Reducer, val: double, data: Observation, rv: ResultVal); type ObserveFunc: function(r: Reducer, val: double, data: Observation, rv: ResultVal);
@ -423,7 +406,7 @@ function observe(id: string, key: Key, obs: Observation)
local ss = stats_store[r$ssname]; local ss = stats_store[r$ssname];
# If there is a threshold and no epoch_finished callback # If there is a threshold and no epoch_result callback
# we don't need to continue counting since the data will # we don't need to continue counting since the data will
# never be accessed. This was leading # never be accessed. This was leading
# to some state management issues when measuring # to some state management issues when measuring
@ -431,7 +414,7 @@ function observe(id: string, key: Key, obs: Observation)
# NOTE: this optimization could need to be removed in the # NOTE: this optimization could need to be removed in the
# future if on demand access is provided to the # future if on demand access is provided to the
# SumStats results. # SumStats results.
if ( ! ss?$epoch_finished && if ( ! ss?$epoch_result &&
r$ssname in threshold_tracker && r$ssname in threshold_tracker &&
( ss?$threshold && ( ss?$threshold &&
key in threshold_tracker[r$ssname] && key in threshold_tracker[r$ssname] &&

View file

@ -6,10 +6,13 @@ event SumStats::finish_epoch(ss: SumStat)
{ {
if ( ss$name in result_store ) if ( ss$name in result_store )
{ {
local data = result_store[ss$name]; if ( ss?$epoch_result )
if ( ss?$epoch_finished ) {
ss$epoch_finished(data); local data = result_store[ss$name];
# TODO: don't block here.
for ( key in data )
ss$epoch_result(network_time(), key, data[key]);
}
reset(ss); reset(ss);
} }