Removed the when statement from general use in the sumstats cluster code.

This commit is contained in:
Seth Hall 2013-08-26 13:57:42 -04:00
parent b3a5c5f412
commit 72d2cd363e

View file

@ -95,37 +95,6 @@ function data_added(ss: SumStat, key: Key, result: Result)
} }
} }
#event SumStats::send_data(uid: string, ss_name: string, cleanup: bool)
# {
# #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
#
# local local_data: ResultTable = table();
# local incoming_data: ResultTable = cleanup ? data : copy(data);
#
# local num_added = 0;
# for ( key in incoming_data )
# {
# local_data[key] = incoming_data[key];
# delete incoming_data[key];
#
# # Only send cluster_send_in_groups_of at a time. Queue another
# # event to send the next group.
# if ( cluster_send_in_groups_of == ++num_added )
# break;
# }
#
# local done = F;
# # If data is empty, this sumstat is done.
# if ( |incoming_data| == 0 )
# done = T;
#
# # Note: copy is needed to compensate serialization caching issue. This should be
# # changed to something else later.
# event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
# if ( ! done )
# schedule 0.01 sec { SumStats::send_data(uid, T) };
# }
event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{ {
if ( uid in sending_results ) if ( uid in sending_results )
@ -204,6 +173,8 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{ {
if ( ss_name in result_store && key in result_store[ss_name] ) if ( ss_name in result_store && key in result_store[ss_name] )
{ {
# Note: copy is needed to compensate serialization caching issue. This should be
# changed to something else later.
event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
} }
else else
@ -231,7 +202,12 @@ event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, t
# This variable is maintained by manager nodes as they collect and aggregate # This variable is maintained by manager nodes as they collect and aggregate
# results. # results.
# Index on a uid. # Index on a uid.
global stats_keys: table[string] of set[Key] &create_expire=1min; global stats_keys: table[string] of set[Key] &create_expire=1min
&expire_func=function(s: table[string] of set[Key], idx: string): interval
{
Reporter::warning(fmt("SumStat key request for the %s SumStat took longer than 1 minute and was automatically cancelled.", idx));
return 0secs;
};
# This variable is maintained by manager nodes to track how many "dones" they # This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid # collected per collection unique id. Once the number of results for a uid
@ -246,11 +222,15 @@ global done_with: table[string] of count &create_expire=1min &default=0;
# Indexed on a uid. # Indexed on a uid.
global key_requests: table[string] of Result &create_expire=1min; global key_requests: table[string] of Result &create_expire=1min;
# Store uids for dynamic requests here to avoid cleanup on the uid.
# (This needs to be done differently!)
global dynamic_requests: set[string] &create_expire=1min;
# This variable is maintained by managers to prevent overwhelming communication due # This variable is maintained by managers to prevent overwhelming communication due
# to too many intermediate updates. Each sumstat is tracked separately so that # to too many intermediate updates. Each sumstat is tracked separately so that
# one won't overwhelm and degrade other quieter sumstats. # one won't overwhelm and degrade other quieter sumstats.
# Indexed on a sumstat id. # Indexed on a sumstat id.
global outstanding_global_views: table[string] of count &create_expire=1min &default=0; global outstanding_global_views: table[string] of count &read_expire=1min &default=0;
const zero_time = double_to_time(0.0); const zero_time = double_to_time(0.0);
# Managers handle logging. # Managers handle logging.
@ -337,12 +317,6 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
{ {
done_with[uid] = 0; done_with[uid] = 0;
event SumStats::cluster_get_result(uid, ss_name, key, cleanup); event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
#print "done getting result";
handle_end_of_result_collection(uid, ss_name, key, cleanup);
request_all_current_keys(uid, ss_name, cleanup);
}
delete stats_keys[uid][key]; delete stats_keys[uid][key];
break; # only a single key break; # only a single key
} }
@ -392,7 +366,7 @@ event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
#print fmt("send_a_key %s", key); #print fmt("send_a_key %s", key);
if ( uid !in stats_keys ) if ( uid !in stats_keys )
{ {
# no clue what happened here Reporter::warning(fmt("Manager received a uid for an unknown request. SumStat: %s, Key: %s", ss_name, key));
return; return;
} }
@ -442,11 +416,14 @@ event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, resu
#print fmt("MANAGER: got a result for %s %s from %s", uid, key, get_event_peer()$descr); #print fmt("MANAGER: got a result for %s %s from %s", uid, key, get_event_peer()$descr);
++done_with[uid]; ++done_with[uid];
#if ( Cluster::worker_count == done_with[uid] ) if ( uid !in dynamic_requests &&
# { uid in done_with && Cluster::worker_count == done_with[uid] )
# print "done"; {
# handle_end_of_result_collection(uid, ss_name, key, cleanup); handle_end_of_result_collection(uid, ss_name, key, cleanup);
# }
if ( cleanup )
request_all_current_keys(uid, ss_name, cleanup);
}
} }
# Managers handle intermediate updates here. # Managers handle intermediate updates here.
@ -470,110 +447,14 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
done_with[uid] = 0; done_with[uid] = 0;
#print fmt("requesting results for: %s", uid); #print fmt("requesting results for: %s", uid);
event SumStats::cluster_get_result(uid, ss_name, key, F); event SumStats::cluster_get_result(uid, ss_name, key, F);
when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
#print fmt("workers: %d done: %d", Cluster::worker_count, done_with[uid]);
handle_end_of_result_collection(uid, ss_name, key, F);
} }
timeout 1.1min
{
Reporter::warning(fmt("Dynamic SumStat intermediate key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
}
}
#event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
# {
# #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
#
# # Mark another worker as being "done" for this uid.
# if ( done )
# ++done_with[uid];
#
# # We had better only be getting requests for stuff that exists.
# if ( ss_name !in stats_store )
# return;
#
# if ( uid !in stats_keys )
# stats_keys[uid] = table();
#
# local local_data = stats_keys[uid];
# local ss = stats_store[ss_name];
#
# for ( key in data )
# {
# if ( key in local_data )
# local_data[key] = compose_results(local_data[key], data[key]);
# else
# local_data[key] = data[key];
#
# # If a stat is done being collected, thresholds for each key
# # need to be checked so we're doing it here to avoid doubly
# # iterating over each key.
# if ( Cluster::worker_count == done_with[uid] )
# {
# if ( check_thresholds(ss, key, local_data[key], 1.0) )
# {
# threshold_crossed(ss, key, local_data[key]);
# event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
# }
# }
# }
#
# # If the data has been collected from all peers, we are done and ready to finish.
# if ( cleanup && Cluster::worker_count == done_with[uid] )
# {
# local now = network_time();
# if ( ss?$epoch_result )
# {
# for ( key in local_data )
# ss$epoch_result(now, key, local_data[key]);
# }
#
# if ( ss?$epoch_finished )
# ss$epoch_finished(now);
#
# # Clean up
# delete stats_keys[uid];
# delete done_with[uid];
# reset(ss);
# }
# }
#function request(ss_name: string): ResultTable
# {
# # This only needs to be implemented this way for cluster compatibility.
# local uid = unique_id("dyn-");
# stats_keys[uid] = table();
# done_with[uid] = 0;
# event SumStats::cluster_ss_request(uid, ss_name, F);
#
# return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
# {
# if ( uid in stats_keys )
# {
# local ss_result = stats_keys[uid];
# # Clean up
# delete stats_keys[uid];
# delete done_with[uid];
# reset(stats_store[ss_name]);
# return ss_result;
# }
# else
# return table();
# }
# timeout 1.1min
# {
# Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name));
# return table();
# }
# }
function request_key(ss_name: string, key: Key): Result function request_key(ss_name: string, key: Key): Result
{ {
local uid = unique_id(""); local uid = unique_id("");
done_with[uid] = 0; done_with[uid] = 0;
key_requests[uid] = table(); key_requests[uid] = table();
add dynamic_requests[uid];
event SumStats::cluster_get_result(uid, ss_name, key, F); event SumStats::cluster_get_result(uid, ss_name, key, F);
return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
@ -583,13 +464,14 @@ function request_key(ss_name: string, key: Key): Result
# Clean up # Clean up
delete key_requests[uid]; delete key_requests[uid];
delete done_with[uid]; delete done_with[uid];
delete dynamic_requests[uid];
return result; return result;
} }
timeout 1.1min timeout 1.1min
{ {
Reporter::warning(fmt("Dynamic SumStat key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key)); Reporter::warning(fmt("Dynamic SumStat key request for %s in SumStat %s took longer than 1 minute and was automatically cancelled.", key, ss_name));
return table(); return Result();
} }
} }