Also track recent_global_view_keys on manager

Previously, recent_global_view_keys was only tracked on workers, causing
a popular key to be sent up and handled by the manager once for each
worker.

This records the key inside recent_global_view_keys on the manager after
the first update, making the rest of the updates no-ops.

Additionally, since the counter value was never used,
recent_global_view_keys has been changed from a table to a set.
This commit is contained in:
Justin Azoff 2016-07-29 12:26:04 -04:00
parent 4ad5d9073a
commit 1f7f16be9d

View file

@ -61,12 +61,11 @@ redef Cluster::manager2worker_events += /SumStats::(get_a_key)/;
redef Cluster::worker2manager_events += /SumStats::cluster_(send_result|key_intermediate_response)/; redef Cluster::worker2manager_events += /SumStats::cluster_(send_result|key_intermediate_response)/;
redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/; redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/;
# This variable is maintained to know what keys have recently sent or received
# intermediate updates so they don't overwhelm the manager.
global recent_global_view_keys: set[string, Key] &create_expire=1min;
@if ( Cluster::local_node_type() != Cluster::MANAGER ) @if ( Cluster::local_node_type() != Cluster::MANAGER )
# This variable is maintained to know what keys have recently sent as
# intermediate updates so they don't overwhelm their manager. The count that is
# yielded is the number of times the percentage threshold has been crossed and
# an intermediate result has been received.
global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0;
# Result tables indexed on a uid that are currently being sent to the # Result tables indexed on a uid that are currently being sent to the
# manager. # manager.
@ -76,8 +75,7 @@ global sending_results: table[string] of ResultTable = table() &read_expire=1min
# being collected somewhere other than a worker. # being collected somewhere other than a worker.
function data_added(ss: SumStat, key: Key, result: Result) function data_added(ss: SumStat, key: Key, result: Result)
{ {
# If an intermediate update for this value was sent recently, don't send # If an intermediate update for this key was sent recently, don't send it again
# it again.
if ( [ss$name, key] in recent_global_view_keys ) if ( [ss$name, key] in recent_global_view_keys )
return; return;
@ -88,7 +86,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
{ {
# kick off intermediate update # kick off intermediate update
event SumStats::cluster_key_intermediate_response(ss$name, key); event SumStats::cluster_key_intermediate_response(ss$name, key);
++recent_global_view_keys[ss$name, key]; add recent_global_view_keys[ss$name, key];
} }
} }
@ -441,6 +439,10 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
{ {
#print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr); #print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr);
#print fmt("MANAGER: requesting key data for %s", key); #print fmt("MANAGER: requesting key data for %s", key);
# If an intermediate update for this key was handled recently, don't do it again
if ( [ss_name, key] in recent_global_view_keys )
return;
add recent_global_view_keys[ss_name, key];
if ( ss_name in outstanding_global_views && if ( ss_name in outstanding_global_views &&
|outstanding_global_views[ss_name]| > max_outstanding_global_views ) |outstanding_global_views[ss_name]| > max_outstanding_global_views )