Hopefully fixing a strange error.

Seth Hall 2013-05-22 14:59:31 -04:00
parent 0a18b62d12
commit c4a1f30a87


@@ -33,7 +33,7 @@ export {
 	## Event sent by nodes that are collecting sumstats after receiving a request for
 	## the sumstat from the manager.
-	global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool);
+	global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool);
 
 	## This event is sent by the manager in a cluster to initiate the collection of
 	## a single key value from a sumstat. It's typically used to get intermediate
@@ -115,7 +115,7 @@ event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, clean
 	# Note: copy is needed to compensate serialization caching issue. This should be
 	# changed to something else later.
-	event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done);
+	event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
 	if ( ! done )
 		schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) };
 	}
@@ -175,7 +175,7 @@ event SumStats::thresholds_reset(ss_name: string)
 # This variable is maintained by manager nodes as they collect and aggregate
 # results.
 # Index on a uid.
-global stats_results: table[string] of ResultTable &create_expire=1min &default=table();
+global stats_results: table[string] of ResultTable &create_expire=1min;
 
 # This variable is maintained by manager nodes to track how many "dones" they
 # collected per collection unique id. Once the number of results for a uid
@@ -292,7 +292,7 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
 	event SumStats::cluster_key_request(uid, ss_name, key, T);
 	}
 
-event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool)
+event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
 	{
 	#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
@@ -300,6 +300,13 @@ event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTa
 	if ( done )
 		++done_with[uid];
 
+	# We had better only be getting requests for stuff that exists.
+	if ( ss_name !in stats_store )
+		return;
+
+	if ( uid !in stats_results )
+		stats_results[uid] = table();
+
 	local local_data = stats_results[uid];
 	local ss = stats_store[ss_name];
@@ -324,8 +331,7 @@ event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTa
 		}
 
 	# If the data has been collected from all peers, we are done and ready to finish.
-	if ( Cluster::worker_count == done_with[uid] &&
-	     /^dyn-/ !in uid )
+	if ( cleanup && Cluster::worker_count == done_with[uid] )
 		{
 		if ( ss?$epoch_finished )
 			ss$epoch_finished(local_data);
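
Note on the stats_results change: the diff drops the &default=table() attribute on the outer table and instead has the manager-side handler create the per-uid inner table explicitly before using it. A minimal standalone sketch of that pattern, using placeholder names and types rather than the real SumStats ones:

global results: table[string] of table[string] of count &create_expire=1min;

event bro_init()
	{
	local uid = "example-uid";
	# Create the inner table explicitly before writing into it, instead of
	# relying on a &default attribute on the outer table.
	if ( uid !in results )
		results[uid] = table();
	results[uid]["some-key"] = 1;
	}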