From 135094428e9992d42b1208e3f1181775e7d4e22f Mon Sep 17 00:00:00 2001
From: Seth Hall
Date: Fri, 2 Aug 2013 16:30:34 -0400
Subject: [PATCH] Hopefully fix the SumStats cluster support.

---
 scripts/base/frameworks/sumstats/cluster.bro | 104 +++++++++++--------
 1 file changed, 58 insertions(+), 46 deletions(-)

diff --git a/scripts/base/frameworks/sumstats/cluster.bro b/scripts/base/frameworks/sumstats/cluster.bro
index d5c5bc440a..b13fff0431 100644
--- a/scripts/base/frameworks/sumstats/cluster.bro
+++ b/scripts/base/frameworks/sumstats/cluster.bro
@@ -10,10 +10,6 @@
 module SumStats;
 
 export {
-	## Allows a user to decide how large of result groups the workers should transmit
-	## values for cluster stats aggregation.
-	const cluster_send_in_groups_of = 50 &redef;
-
 	## The percent of the full threshold value that needs to be met on a single worker
 	## for that worker to send the value to its manager in order for it to request a
 	## global view for that value. There is no requirement that the manager requests
@@ -53,7 +49,7 @@ export {
 	## This event is scheduled internally on workers to send result chunks.
 	global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool);
 
-	global get_a_key: event(uid: string, ss_name: string);
+	global get_a_key: event(uid: string, ss_name: string, cleanup: bool &default=F);
 
 	global send_a_key: event(uid: string, ss_name: string, key: Key);
 	global send_no_key: event(uid: string, ss_name: string);
@@ -130,30 +126,38 @@ function data_added(ss: SumStat, key: Key, result: Result)
 #			schedule 0.01 sec { SumStats::send_data(uid, T) };
 #	}
 
-event SumStats::get_a_key(uid: string, ss_name: string)
+event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
 	{
 	if ( uid in sending_results )
 		{
 		if ( |sending_results[uid]| == 0 )
-			event SumStats::send_no_key(uid, ss_name);
-
-		for ( key in sending_results[uid] )
 			{
-			event SumStats::send_a_key(uid, ss_name, key);
-			# break to only send one.
-			break;
+			event SumStats::send_no_key(uid, ss_name);
+			}
+		else
+			{
+			for ( key in sending_results[uid] )
+				{
+				event SumStats::send_a_key(uid, ss_name, key);
+				# break to only send one.
+				break;
+				}
 			}
 		}
-	else if ( ss_name in result_store && |result_store[ss_name]| > 0 )
+	else if ( !cleanup && ss_name in result_store && |result_store[ss_name]| > 0 )
 		{
 		if ( |result_store[ss_name]| == 0 )
-			event SumStats::send_no_key(uid, ss_name);
-
-		for ( key in result_store[ss_name] )
 			{
-			event SumStats::send_a_key(uid, ss_name, key);
-			# break to only send one.
-			break;
+			event SumStats::send_no_key(uid, ss_name);
+			}
+		else
+			{
+			for ( key in result_store[ss_name] )
+				{
+				event SumStats::send_a_key(uid, ss_name, key);
+				# break to only send one.
+				break;
+				}
 			}
 		}
 	else
@@ -232,7 +236,7 @@ event SumStats::thresholds_reset(ss_name: string)
 # This variable is maintained by manager nodes as they collect and aggregate
 # results.
 # Index on a uid.
-global stats_results: table[string] of ResultTable &create_expire=1min;
+global stats_keys: table[string] of set[Key] &create_expire=1min;
 
 # This variable is maintained by manager nodes to track how many "dones" they
 # collected per collection unique id. Once the number of results for a uid
@@ -262,15 +266,16 @@ event SumStats::finish_epoch(ss: SumStat)
 		#print fmt("%.6f MANAGER: breaking %s sumstat", network_time(), ss$name);
 		local uid = unique_id("");
 
-		if ( uid in stats_results )
-			delete stats_results[uid];
-		stats_results[uid] = table();
+		if ( uid in stats_keys )
+			delete stats_keys[uid];
+		stats_keys[uid] = set();
 
 		# Request data from peers.
 		event SumStats::cluster_ss_request(uid, ss$name, T);
 
 		done_with[uid] = 0;
-		event SumStats::get_a_key(uid, ss$name);
+		#print fmt("get_key by uid: %s", uid);
+		event SumStats::get_a_key(uid, ss$name, T);
 		}
 
 	# Schedule the next finish_epoch event.
@@ -299,9 +304,6 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
 		event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]);
 		}
 
-	delete key_requests[uid];
-	delete done_with[uid];
-
 	if ( cleanup )
 		{
 		# This is done here because "cleanup" implicitly means
@@ -319,25 +321,27 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
 			--outstanding_global_views[ss_name];
 		}
 
-	if ( uid in stats_results )
-		delete stats_results[uid][key];
+	delete key_requests[uid];
+	delete done_with[uid];
 	}
 
 function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
 	{
 	#print "request_all_current_keys";
-	if ( uid in stats_results && |stats_results[uid]| > 0 )
+	if ( uid in stats_keys && |stats_keys[uid]| > 0 )
 		{
-		#print fmt("   -- %d remaining keys here", |stats_results[uid]|);
-		for ( key in stats_results[uid] )
+		#print fmt("   -- %d remaining keys here", |stats_keys[uid]|);
+		for ( key in stats_keys[uid] )
 			{
 			done_with[uid] = 0;
 			event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
 			when ( uid in done_with && Cluster::worker_count == done_with[uid] )
 				{
+				#print "done getting result";
 				handle_end_of_result_collection(uid, ss_name, key, cleanup);
 				request_all_current_keys(uid, ss_name, cleanup);
 				}
+			delete stats_keys[uid][key];
 			break; # only a single key
 			}
 		}
@@ -345,7 +349,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
 		{
 		# Get more keys! And this breaks us out of the evented loop.
 		done_with[uid] = 0;
-		event SumStats::get_a_key(uid, ss_name);
+		#print fmt("get_key by uid: %s", uid);
+		event SumStats::get_a_key(uid, ss_name, cleanup);
 		}
 	}
 
@@ -357,7 +362,7 @@ event SumStats::send_no_key(uid: string, ss_name: string)
 		{
 		delete done_with[uid];
 
-		if ( |stats_results[uid]| > 0 )
+		if ( |stats_keys[uid]| > 0 )
 			{
 			#print "we need more keys!";
 			# Now that we have a key from each worker, lets
@@ -377,21 +382,21 @@ event SumStats::send_no_key(uid: string, ss_name: string)
 event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
 	{
 	#print fmt("send_a_key %s", key);
-	if ( uid !in stats_results )
+	if ( uid !in stats_keys )
 		{
 		# no clue what happened here
 		return;
 		}
 
-	if ( key !in stats_results[uid] )
-		stats_results[uid][key] = table();
+	if ( key !in stats_keys[uid] )
+		add stats_keys[uid][key];
 
 	++done_with[uid];
 	if ( Cluster::worker_count == done_with[uid] )
 		{
 		delete done_with[uid];
 
-		if ( |stats_results[uid]| > 0 )
+		if ( |stats_keys[uid]| > 0 )
 			{
 			#print "we need more keys!";
 			# Now that we have a key from each worker, lets
@@ -422,6 +427,12 @@ event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, resu
 
 	# Mark that a worker is done.
 	++done_with[uid];
+
+	#if ( Cluster::worker_count == done_with[uid] )
+	#	{
+	#	print "done";
+	#	handle_end_of_result_collection(uid, ss_name, key, cleanup);
+	#	}
 	}
 
 # Managers handle intermediate updates here.
@@ -458,10 +469,10 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
 #	if ( ss_name !in stats_store )
 #		return;
 #
-#	if ( uid !in stats_results )
-#		stats_results[uid] = table();
+#	if ( uid !in stats_keys )
+#		stats_keys[uid] = table();
 #
-#	local local_data = stats_results[uid];
+#	local local_data = stats_keys[uid];
 #	local ss = stats_store[ss_name];
 #
 #	for ( key in data )
@@ -498,7 +509,7 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
 #		ss$epoch_finished(now);
 #
 #		# Clean up
-#		delete stats_results[uid];
+#		delete stats_keys[uid];
 #		delete done_with[uid];
 #		reset(ss);
 #		}
@@ -508,17 +519,17 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
 #	{
 #	# This only needs to be implemented this way for cluster compatibility.
 #	local uid = unique_id("dyn-");
-#	stats_results[uid] = table();
+#	stats_keys[uid] = table();
 #	done_with[uid] = 0;
 #	event SumStats::cluster_ss_request(uid, ss_name, F);
 #
 #	return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
 #		{
-#		if ( uid in stats_results )
+#		if ( uid in stats_keys )
 #			{
-#			local ss_result = stats_results[uid];
+#			local ss_result = stats_keys[uid];
 #			# Clean up
-#			delete stats_results[uid];
+#			delete stats_keys[uid];
 #			delete done_with[uid];
 #			reset(stats_store[ss_name]);
 #			return ss_result;
@@ -542,6 +553,7 @@ function request_key(ss_name: string, key: Key): Result
 	event SumStats::cluster_get_result(uid, ss_name, key, F);
 	return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
 		{
+		#print "done with request_key";
 		local result = key_requests[uid];
 		# Clean up
 		delete key_requests[uid];
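
A note for readers tracing the new control flow (this commentary is not part of the patch): the change replaces bulk result shipping with a round-based pull, where the manager tracks only keys in stats_keys, collects one key's results per round, and re-enters the loop until the key set is drained, then asks workers for more keys via get_a_key. Below is a minimal standalone sketch of that loop shape in Bro script. The module name, the placeholder key values, and the collect_one function are invented for illustration; the real code re-enters via a "when" block after all workers reply, which is simplified here to direct recursion.

# Illustration only: a local model of the one-key-per-round collection loop.
module PullSketch;

# Stand-in for stats_keys[uid]: the keys a manager still has to resolve.
global keys_left: set[string] = set("key-a", "key-b", "key-c");

function collect_one(uid: string)
	{
	if ( |keys_left| == 0 )
		{
		# Mirrors request_all_current_keys falling through to get_a_key.
		print fmt("%s: no keys left, would ask workers for more", uid);
		return;
		}

	for ( k in keys_left )
		{
		# One key per round: request it, drop it from the set, and stop
		# iterating, matching the patched loop's delete-then-break.
		print fmt("%s: collecting result for %s", uid, k);
		delete keys_left[k];
		break;
		}

	# The patched code resumes here only after every worker has answered;
	# this sketch simply recurses.
	collect_one(uid);
	}

event bro_init()
	{
	collect_one(unique_id("sketch-"));
	}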