Hopefully fix the SumStats cluster support.

This commit is contained in:
Seth Hall 2013-08-02 16:30:34 -04:00
parent 7b8073556e
commit 135094428e

View file

@ -10,10 +10,6 @@
module SumStats;
export {
## Allows a user to decide how large of result groups the workers should transmit
## values for cluster stats aggregation.
const cluster_send_in_groups_of = 50 &redef;
## The percent of the full threshold value that needs to be met on a single worker
## for that worker to send the value to its manager in order for it to request a
## global view for that value. There is no requirement that the manager requests
@ -53,7 +49,7 @@ export {
## This event is scheduled internally on workers to send result chunks.
global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool);
global get_a_key: event(uid: string, ss_name: string);
global get_a_key: event(uid: string, ss_name: string, cleanup: bool &default=F);
global send_a_key: event(uid: string, ss_name: string, key: Key);
global send_no_key: event(uid: string, ss_name: string);
@ -130,13 +126,16 @@ function data_added(ss: SumStat, key: Key, result: Result)
# schedule 0.01 sec { SumStats::send_data(uid, T) };
# }
event SumStats::get_a_key(uid: string, ss_name: string)
event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{
if ( uid in sending_results )
{
if ( |sending_results[uid]| == 0 )
{
event SumStats::send_no_key(uid, ss_name);
}
else
{
for ( key in sending_results[uid] )
{
event SumStats::send_a_key(uid, ss_name, key);
@ -144,11 +143,15 @@ event SumStats::get_a_key(uid: string, ss_name: string)
break;
}
}
else if ( ss_name in result_store && |result_store[ss_name]| > 0 )
}
else if ( !cleanup && ss_name in result_store && |result_store[ss_name]| > 0 )
{
if ( |result_store[ss_name]| == 0 )
{
event SumStats::send_no_key(uid, ss_name);
}
else
{
for ( key in result_store[ss_name] )
{
event SumStats::send_a_key(uid, ss_name, key);
@ -156,6 +159,7 @@ event SumStats::get_a_key(uid: string, ss_name: string)
break;
}
}
}
else
{
event SumStats::send_no_key(uid, ss_name);
@ -232,7 +236,7 @@ event SumStats::thresholds_reset(ss_name: string)
# This variable is maintained by manager nodes as they collect and aggregate
# results.
# Index on a uid.
global stats_results: table[string] of ResultTable &create_expire=1min;
global stats_keys: table[string] of set[Key] &create_expire=1min;
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
@ -262,15 +266,16 @@ event SumStats::finish_epoch(ss: SumStat)
#print fmt("%.6f MANAGER: breaking %s sumstat", network_time(), ss$name);
local uid = unique_id("");
if ( uid in stats_results )
delete stats_results[uid];
stats_results[uid] = table();
if ( uid in stats_keys )
delete stats_keys[uid];
stats_keys[uid] = set();
# Request data from peers.
event SumStats::cluster_ss_request(uid, ss$name, T);
done_with[uid] = 0;
event SumStats::get_a_key(uid, ss$name);
#print fmt("get_key by uid: %s", uid);
event SumStats::get_a_key(uid, ss$name, T);
}
# Schedule the next finish_epoch event.
@ -299,9 +304,6 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]);
}
delete key_requests[uid];
delete done_with[uid];
if ( cleanup )
{
# This is done here because "cleanup" implicitly means
@ -319,25 +321,27 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
--outstanding_global_views[ss_name];
}
if ( uid in stats_results )
delete stats_results[uid][key];
delete key_requests[uid];
delete done_with[uid];
}
function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
{
#print "request_all_current_keys";
if ( uid in stats_results && |stats_results[uid]| > 0 )
if ( uid in stats_keys && |stats_keys[uid]| > 0 )
{
#print fmt(" -- %d remaining keys here", |stats_results[uid]|);
for ( key in stats_results[uid] )
#print fmt(" -- %d remaining keys here", |stats_keys[uid]|);
for ( key in stats_keys[uid] )
{
done_with[uid] = 0;
event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
#print "done getting result";
handle_end_of_result_collection(uid, ss_name, key, cleanup);
request_all_current_keys(uid, ss_name, cleanup);
}
delete stats_keys[uid][key];
break; # only a single key
}
}
@ -345,7 +349,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
{
# Get more keys! And this breaks us out of the evented loop.
done_with[uid] = 0;
event SumStats::get_a_key(uid, ss_name);
#print fmt("get_key by uid: %s", uid);
event SumStats::get_a_key(uid, ss_name, cleanup);
}
}
@ -357,7 +362,7 @@ event SumStats::send_no_key(uid: string, ss_name: string)
{
delete done_with[uid];
if ( |stats_results[uid]| > 0 )
if ( |stats_keys[uid]| > 0 )
{
#print "we need more keys!";
# Now that we have a key from each worker, lets
@ -377,21 +382,21 @@ event SumStats::send_no_key(uid: string, ss_name: string)
event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
{
#print fmt("send_a_key %s", key);
if ( uid !in stats_results )
if ( uid !in stats_keys )
{
# no clue what happened here
return;
}
if ( key !in stats_results[uid] )
stats_results[uid][key] = table();
if ( key !in stats_keys[uid] )
add stats_keys[uid][key];
++done_with[uid];
if ( Cluster::worker_count == done_with[uid] )
{
delete done_with[uid];
if ( |stats_results[uid]| > 0 )
if ( |stats_keys[uid]| > 0 )
{
#print "we need more keys!";
# Now that we have a key from each worker, lets
@ -422,6 +427,12 @@ event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, resu
# Mark that a worker is done.
++done_with[uid];
#if ( Cluster::worker_count == done_with[uid] )
# {
# print "done";
# handle_end_of_result_collection(uid, ss_name, key, cleanup);
# }
}
# Managers handle intermediate updates here.
@ -458,10 +469,10 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
# if ( ss_name !in stats_store )
# return;
#
# if ( uid !in stats_results )
# stats_results[uid] = table();
# if ( uid !in stats_keys )
# stats_keys[uid] = table();
#
# local local_data = stats_results[uid];
# local local_data = stats_keys[uid];
# local ss = stats_store[ss_name];
#
# for ( key in data )
@ -498,7 +509,7 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
# ss$epoch_finished(now);
#
# # Clean up
# delete stats_results[uid];
# delete stats_keys[uid];
# delete done_with[uid];
# reset(ss);
# }
@ -508,17 +519,17 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
# {
# # This only needs to be implemented this way for cluster compatibility.
# local uid = unique_id("dyn-");
# stats_results[uid] = table();
# stats_keys[uid] = table();
# done_with[uid] = 0;
# event SumStats::cluster_ss_request(uid, ss_name, F);
#
# return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
# {
# if ( uid in stats_results )
# if ( uid in stats_keys )
# {
# local ss_result = stats_results[uid];
# local ss_result = stats_keys[uid];
# # Clean up
# delete stats_results[uid];
# delete stats_keys[uid];
# delete done_with[uid];
# reset(stats_store[ss_name]);
# return ss_result;
@ -542,6 +553,7 @@ function request_key(ss_name: string, key: Key): Result
event SumStats::cluster_get_result(uid, ss_name, key, F);
return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
#print "done with request_key";
local result = key_requests[uid];
# Clean up
delete key_requests[uid];