From 0e23a8bc9e24e535b2fff61f63232c6360c14638 Mon Sep 17 00:00:00 2001 From: Seth Hall Date: Tue, 30 Jul 2013 11:46:51 -0400 Subject: [PATCH] Beginning rework of SumStats API. --- scripts/base/frameworks/sumstats/cluster.bro | 133 +++++++++--------- scripts/base/frameworks/sumstats/main.bro | 31 +--- .../base/frameworks/sumstats/non-cluster.bro | 11 +- 3 files changed, 84 insertions(+), 91 deletions(-) diff --git a/scripts/base/frameworks/sumstats/cluster.bro b/scripts/base/frameworks/sumstats/cluster.bro index a69dd1ab54..0c005d72a6 100644 --- a/scripts/base/frameworks/sumstats/cluster.bro +++ b/scripts/base/frameworks/sumstats/cluster.bro @@ -39,10 +39,10 @@ export { ## a single key value from a sumstat. It's typically used to get intermediate ## updates before the break interval triggers to speed detection of a value ## crossing a threshold. - global cluster_key_request: event(uid: string, ss_name: string, key: Key, cleanup: bool); + global cluster_keys_request: event(uid: string, ss_name: string, key: set[Key], cleanup: bool); ## This event is sent by nodes in response to a - ## :bro:id:`SumStats::cluster_key_request` event. + ## :bro:id:`SumStats::cluster_keys_request` event. global cluster_key_response: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool); ## This is sent by workers to indicate that they crossed the percent @@ -89,68 +89,71 @@ function data_added(ss: SumStat, key: Key, result: Result) } } -event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool) +#event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool) +# { +# #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); +# +# local local_data: ResultTable = table(); +# local incoming_data: ResultTable = cleanup ? 
data : copy(data); +# +# local num_added = 0; +# for ( key in incoming_data ) +# { +# local_data[key] = incoming_data[key]; +# delete incoming_data[key]; +# +# # Only send cluster_send_in_groups_of at a time. Queue another +# # event to send the next group. +# if ( cluster_send_in_groups_of == ++num_added ) +# break; +# } +# +# local done = F; +# # If data is empty, this sumstat is done. +# if ( |incoming_data| == 0 ) +# done = T; +# +# # Note: copy is needed to compensate serialization caching issue. This should be +# # changed to something else later. +# event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup); +# if ( ! done ) +# schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) }; +# } + +#event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool) +# { +# #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id); +# +# # Initiate sending all of the data for the requested stats. +# if ( ss_name in result_store ) +# event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup); +# else +# event SumStats::send_data(uid, ss_name, table(), cleanup); +# +# # Lookup the actual sumstats and reset it, the reference to the data +# # currently stored will be maintained internally by the send_data event. +# if ( ss_name in stats_store && cleanup ) +# reset(stats_store[ss_name]); +# } + +event SumStats::cluster_keys_request(uid: string, ss_name: string, keys: set[Key], cleanup: bool) { - #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); - - local local_data: ResultTable = table(); - local incoming_data: ResultTable = cleanup ? 
data : copy(data); - - local num_added = 0; - for ( key in incoming_data ) + for ( key in keys ) { - local_data[key] = incoming_data[key]; - delete incoming_data[key]; + if ( ss_name in result_store && key in result_store[ss_name] ) + { + #print fmt("WORKER %s: received the cluster_keys_request event for %s=%s.", Cluster::node, key2str(key), data); - # Only send cluster_send_in_groups_of at a time. Queue another - # event to send the next group. - if ( cluster_send_in_groups_of == ++num_added ) - break; - } - - local done = F; - # If data is empty, this sumstat is done. - if ( |incoming_data| == 0 ) - done = T; - - # Note: copy is needed to compensate serialization caching issue. This should be - # changed to something else later. - event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup); - if ( ! done ) - schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) }; - } - -event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool) - { - #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id); - - # Initiate sending all of the data for the requested stats. - if ( ss_name in result_store ) - event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup); - else - event SumStats::send_data(uid, ss_name, table(), cleanup); - - # Lookup the actual sumstats and reset it, the reference to the data - # currently stored will be maintained internally by the send_data event. - if ( ss_name in stats_store && cleanup ) - reset(stats_store[ss_name]); - } - -event SumStats::cluster_key_request(uid: string, ss_name: string, key: Key, cleanup: bool) - { - if ( ss_name in result_store && key in result_store[ss_name] ) - { - #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data); - - # Note: copy is needed to compensate serialization caching issue. This should be - # changed to something else later. 
- event SumStats::cluster_key_response(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); - } - else - { - # We need to send an empty response if we don't have the data so that the manager - # can know that it heard back from all of the workers. - event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup); + # Note: copy is needed to compensate serialization caching issue. This should be + # changed to something else later. + event SumStats::cluster_key_response(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); + } + else + { + # We need to send an empty response if we don't have the data so that the manager + # can know that it heard back from all of the workers. + event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup); + } } } @@ -252,6 +255,6 @@ event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, res threshold_crossed(ss, key, ir); event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); } if ( cleanup ) { @@ -289,7 +296,7 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key) local uid = unique_id(""); done_with[uid] = 0; - event SumStats::cluster_key_request(uid, ss_name, key, T); + event SumStats::cluster_keys_request(uid, ss_name, set(key), T); } event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool) @@ -378,7 +385,7 @@ function request_key(ss_name: string, key: Key): Result done_with[uid] = 0; key_requests[uid] = table(); - event SumStats::cluster_key_request(uid, ss_name, key, F); + event SumStats::cluster_keys_request(uid, ss_name, set(key), F); return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) { local result = key_requests[uid]; diff --git a/scripts/base/frameworks/sumstats/main.bro b/scripts/base/frameworks/sumstats/main.bro index 3afa507c7a..8d5a668cde 100644 --- a/scripts/base/frameworks/sumstats/main.bro +++ 
b/scripts/base/frameworks/sumstats/main.bro @@ -74,10 +74,6 @@ export { ## Type to store results for multiple reducers. type Result: table[string] of ResultVal; - ## Type to store a table of sumstats results indexed - ## by keys. - type ResultTable: table[Key] of Result; - ## SumStats represent an aggregation of reducers along with ## mechanisms to handle various situations like the epoch ending ## or thresholds being crossed. @@ -92,7 +88,7 @@ export { name: string; ## The interval at which this filter should be "broken" - ## and the '$epoch_finished' callback called. The + ## and the '$epoch_result' callback called. The ## results are also reset at this time so any threshold ## based detection needs to be set to a ## value that should be expected to happen within @@ -119,15 +115,10 @@ export { ## A callback that is called when a threshold is crossed. threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional; - ## A callback with the full collection of Results for - ## this SumStat. - epoch_finished: function(rt: SumStats::ResultTable) &optional; - #epoch_finished: function(num_keys: count) &optional; - ## A callback that receives each of the results at the ## end of the analysis epoch. The function will be ## called once for each key. - #epoch_finished_result: function(key::SumStats::Key, result: SumStats::Result) &optional; + epoch_result: function(ts: time, key: SumStats::Key, result: SumStats::Result) &optional; }; ## Create a summary statistic. @@ -144,17 +135,6 @@ export { ## obs: The data point to send into the stream. global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation); - ## Dynamically request a sumstat. This function should be - ## used sparingly and not as a replacement for the callbacks - ## from the :bro:see:`SumStat` record. The function is only - ## available for use within "when" statements as an asynchronous - ## function. - ## - ## ss_name: SumState name. 
- ## - ## Returns: The result table for the requested sumstat. - global request: function(ss_name: string): ResultTable; - ## Dynamically request a sumstat key. This function should be ## used sparingly and not as a replacement for the callbacks ## from the :bro:see:`SumStat` record. The function is only @@ -182,6 +162,9 @@ export { global key2str: function(key: SumStats::Key): string; } +# Type to store a table of sumstats results indexed by keys. +type ResultTable: table[Key] of Result; + # The function prototype for plugins to do calculations. type ObserveFunc: function(r: Reducer, val: double, data: Observation, rv: ResultVal); @@ -423,7 +406,7 @@ function observe(id: string, key: Key, obs: Observation) local ss = stats_store[r$ssname]; - # If there is a threshold and no epoch_finished callback + # If there is a threshold and no epoch_result callback # we don't need to continue counting since the data will # never be accessed. This was leading # to some state management issues when measuring @@ -431,7 +414,7 @@ function observe(id: string, key: Key, obs: Observation) # NOTE: this optimization could need removed in the # future if on demand access is provided to the # SumStats results. - if ( ! ss?$epoch_finished && + if ( ! ss?$epoch_result && r$ssname in threshold_tracker && ( ss?$threshold && key in threshold_tracker[r$ssname] && diff --git a/scripts/base/frameworks/sumstats/non-cluster.bro b/scripts/base/frameworks/sumstats/non-cluster.bro index 265261f1bd..b7e18bd55a 100644 --- a/scripts/base/frameworks/sumstats/non-cluster.bro +++ b/scripts/base/frameworks/sumstats/non-cluster.bro @@ -6,10 +6,13 @@ event SumStats::finish_epoch(ss: SumStat) { if ( ss$name in result_store ) { - local data = result_store[ss$name]; - if ( ss?$epoch_finished ) - ss$epoch_finished(data); - + if ( ss?$epoch_result ) + { + local data = result_store[ss$name]; + # TODO: don't block here. + for ( key in data ) + ss$epoch_result(network_time(), key, data[key]); + } reset(ss); }