From 72d2cd363eb8dc3e41f83d73152b9923c6723afc Mon Sep 17 00:00:00 2001
From: Seth Hall
Date: Mon, 26 Aug 2013 13:57:42 -0400
Subject: [PATCH] Removed the when statement from general use in the sumstats
 cluster code.

---
 scripts/base/frameworks/sumstats/cluster.bro | 170 +++----------------
 1 file changed, 26 insertions(+), 144 deletions(-)

diff --git a/scripts/base/frameworks/sumstats/cluster.bro b/scripts/base/frameworks/sumstats/cluster.bro
index 6c72d5ff17..b6c0668f40 100644
--- a/scripts/base/frameworks/sumstats/cluster.bro
+++ b/scripts/base/frameworks/sumstats/cluster.bro
@@ -95,37 +95,6 @@ function data_added(ss: SumStat, key: Key, result: Result)
 		{
 		}
 	}
-#event SumStats::send_data(uid: string, ss_name: string, cleanup: bool)
-#	{
-#	#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
-#
-#	local local_data: ResultTable = table();
-#	local incoming_data: ResultTable = cleanup ? data : copy(data);
-#
-#	local num_added = 0;
-#	for ( key in incoming_data )
-#		{
-#		local_data[key] = incoming_data[key];
-#		delete incoming_data[key];
-#
-#		# Only send cluster_send_in_groups_of at a time. Queue another
-#		# event to send the next group.
-#		if ( cluster_send_in_groups_of == ++num_added )
-#			break;
-#		}
-#
-#	local done = F;
-#	# If data is empty, this sumstat is done.
-#	if ( |incoming_data| == 0 )
-#		done = T;
-#
-#	# Note: copy is needed to compensate serialization caching issue. This should be
-#	# changed to something else later.
-#	event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
-#	if ( ! done )
-#		schedule 0.01 sec { SumStats::send_data(uid, T) };
-#	}
-
 event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
 	{
 	if ( uid in sending_results )
@@ -204,6 +173,8 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
 	{
 	if ( ss_name in result_store && key in result_store[ss_name] )
 		{
+		# Note: copy is needed to compensate serialization caching issue. This should be
+		# changed to something else later.
 		event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
 		}
 	else
@@ -231,7 +202,12 @@ event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, t
 # This variable is maintained by manager nodes as they collect and aggregate
 # results.
 # Index on a uid.
-global stats_keys: table[string] of set[Key] &create_expire=1min;
+global stats_keys: table[string] of set[Key] &create_expire=1min
+	&expire_func=function(s: table[string] of set[Key], idx: string): interval
+		{
+		Reporter::warning(fmt("SumStat key request for the %s SumStat took longer than 1 minute and was automatically cancelled.", idx));
+		return 0secs;
+		};
 
 # This variable is maintained by manager nodes to track how many "dones" they
 # collected per collection unique id. Once the number of results for a uid
@@ -246,11 +222,15 @@ global done_with: table[string] of count &create_expire=1min &default=0;
 # Indexed on a uid.
 global key_requests: table[string] of Result &create_expire=1min;
 
+# Store uids for dynamic requests here to avoid cleanup on the uid.
+# (This needs to be done differently!)
+global dynamic_requests: set[string] &create_expire=1min;
+
 # This variable is maintained by managers to prevent overwhelming communication due
 # to too many intermediate updates. Each sumstat is tracked separately so that
 # one won't overwhelm and degrade other quieter sumstats.
 # Indexed on a sumstat id.
-global outstanding_global_views: table[string] of count &create_expire=1min &default=0;
+global outstanding_global_views: table[string] of count &read_expire=1min &default=0;
 
 const zero_time = double_to_time(0.0);
 # Managers handle logging.
@@ -337,12 +317,6 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
 			{
 			done_with[uid] = 0;
 			event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
-			when ( uid in done_with && Cluster::worker_count == done_with[uid] )
-				{
-				#print "done getting result";
-				handle_end_of_result_collection(uid, ss_name, key, cleanup);
-				request_all_current_keys(uid, ss_name, cleanup);
-				}
 			delete stats_keys[uid][key];
 			break; # only a single key
 			}
@@ -392,7 +366,7 @@ event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
 	#print fmt("send_a_key %s", key);
 	if ( uid !in stats_keys )
 		{
-		# no clue what happened here
+		Reporter::warning(fmt("Manager received a uid for an unknown request. SumStat: %s, Key: %s", ss_name, key));
 		return;
 		}
 
@@ -442,11 +416,14 @@ event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, resu
 	#print fmt("MANAGER: got a result for %s %s from %s", uid, key, get_event_peer()$descr);
 
 	++done_with[uid];
-	#if ( Cluster::worker_count == done_with[uid] )
-	#	{
-	#	print "done";
-	#	handle_end_of_result_collection(uid, ss_name, key, cleanup);
-	#	}
+	if ( uid !in dynamic_requests &&
+	     uid in done_with && Cluster::worker_count == done_with[uid] )
+		{
+		handle_end_of_result_collection(uid, ss_name, key, cleanup);
+
+		if ( cleanup )
+			request_all_current_keys(uid, ss_name, cleanup);
+		}
 	}
 
 # Managers handle intermediate updates here.
@@ -470,110 +447,14 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
 	done_with[uid] = 0;
 	#print fmt("requesting results for: %s", uid);
 	event SumStats::cluster_get_result(uid, ss_name, key, F);
-	when ( uid in done_with && Cluster::worker_count == done_with[uid] )
-		{
-		#print fmt("workers: %d done: %d", Cluster::worker_count, done_with[uid]);
-		handle_end_of_result_collection(uid, ss_name, key, F);
-		}
-	timeout 1.1min
-		{
-		Reporter::warning(fmt("Dynamic SumStat intermediate key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
-		}
 	}
 
-#event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
-#	{
-#	#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
-#
-#	# Mark another worker as being "done" for this uid.
-#	if ( done )
-#		++done_with[uid];
-#
-#	# We had better only be getting requests for stuff that exists.
-#	if ( ss_name !in stats_store )
-#		return;
-#
-#	if ( uid !in stats_keys )
-#		stats_keys[uid] = table();
-#
-#	local local_data = stats_keys[uid];
-#	local ss = stats_store[ss_name];
-#
-#	for ( key in data )
-#		{
-#		if ( key in local_data )
-#			local_data[key] = compose_results(local_data[key], data[key]);
-#		else
-#			local_data[key] = data[key];
-#
-#		# If a stat is done being collected, thresholds for each key
-#		# need to be checked so we're doing it here to avoid doubly
-#		# iterating over each key.
-#		if ( Cluster::worker_count == done_with[uid] )
-#			{
-#			if ( check_thresholds(ss, key, local_data[key], 1.0) )
-#				{
-#				threshold_crossed(ss, key, local_data[key]);
-#				event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
-#				}
-#			}
-#		}
-#
-#	# If the data has been collected from all peers, we are done and ready to finish.
-#	if ( cleanup && Cluster::worker_count == done_with[uid] )
-#		{
-#		local now = network_time();
-#		if ( ss?$epoch_result )
-#			{
-#			for ( key in local_data )
-#				ss$epoch_result(now, key, local_data[key]);
-#			}
-#
-#		if ( ss?$epoch_finished )
-#			ss$epoch_finished(now);
-#
-#		# Clean up
-#		delete stats_keys[uid];
-#		delete done_with[uid];
-#		reset(ss);
-#		}
-#	}
-
-#function request(ss_name: string): ResultTable
-#	{
-#	# This only needs to be implemented this way for cluster compatibility.
-#	local uid = unique_id("dyn-");
-#	stats_keys[uid] = table();
-#	done_with[uid] = 0;
-#	event SumStats::cluster_ss_request(uid, ss_name, F);
-#
-#	return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
-#		{
-#		if ( uid in stats_keys )
-#			{
-#			local ss_result = stats_keys[uid];
-#			# Clean up
-#			delete stats_keys[uid];
-#			delete done_with[uid];
-#			reset(stats_store[ss_name]);
-#			return ss_result;
-#			}
-#		else
-#			return table();
-#		}
-#	timeout 1.1min
-#		{
-#		Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name));
-#		return table();
-#		}
-#	}
-
 function request_key(ss_name: string, key: Key): Result
 	{
 	local uid = unique_id("");
 	done_with[uid] = 0;
 	key_requests[uid] = table();
+	add dynamic_requests[uid];
 	event SumStats::cluster_get_result(uid, ss_name, key, F);
 
 	return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
@@ -583,13 +464,14 @@ function request_key(ss_name: string, key: Key): Result
 		{
 		# Clean up
 		delete key_requests[uid];
 		delete done_with[uid];
+		delete dynamic_requests[uid];
 		return result;
 		}
 	timeout 1.1min
 		{
-		Reporter::warning(fmt("Dynamic SumStat key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
-		return table();
+		Reporter::warning(fmt("Dynamic SumStat key request for %s in SumStat %s took longer than 1 minute and was automatically cancelled.", key, ss_name));
+		return Result();
 		}
 	}
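
Note (not part of the patch): after this change, the one `when` left in this code path is the `return when` inside request_key(), with the 1.1 min timeout shown above. The sketch below illustrates roughly how a manager-side script would consume that asynchronous API; the SumStat name "ssh.bruteforcing", the observation stream "ssh.failed_auth", and check_host() are hypothetical placeholders, not names from this patch.

@load base/frameworks/sumstats

# Sketch: ask for the current cluster-wide result for a single key.
# request_key() contains a "return when", so it may only be called from
# inside a when() condition; the body runs once every worker has answered,
# and request_key()'s own 1.1 min timeout hands back an empty Result()
# if the workers never do.
function check_host(host: addr)
	{
	when ( local r = SumStats::request_key("ssh.bruteforcing", [$host=host]) )
		{
		if ( "ssh.failed_auth" in r )
			print fmt("%s has %d failed-login observations so far", host, r["ssh.failed_auth"]$num);
		}
	timeout 30secs
		{
		print fmt("no cluster-wide answer for %s", host);
		}
	}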