From c3a6916572436cffd4fc0e63bbdb8bcea584cd69 Mon Sep 17 00:00:00 2001 From: Seth Hall Date: Fri, 4 Jan 2013 16:54:13 -0500 Subject: [PATCH] More cluster tuning for the metrics framework. - Fixed several state maintenance issues for intermediate updates. - Added a new tuning variable Metrics::max_outstanding_global_views which limits the number of in-flight intermediate updates per metric filter. - Changed the default global view threshold percent to 20% (up from 10%) --- scripts/base/frameworks/metrics/cluster.bro | 60 +++++++++++++-------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/scripts/base/frameworks/metrics/cluster.bro b/scripts/base/frameworks/metrics/cluster.bro index f14f3b1518..01e127e4bf 100644 --- a/scripts/base/frameworks/metrics/cluster.bro +++ b/scripts/base/frameworks/metrics/cluster.bro @@ -20,7 +20,13 @@ export { ## requirement that the manager requests a global view for the index ## since it may opt not to if it requested a global view for the index ## recently. - const cluster_request_global_view_percent = 0.1 &redef; + const cluster_request_global_view_percent = 0.2 &redef; + + ## This is to deal with intermediate update overload. A manager will only allow + ## this many intermediate update requests to the workers to be inflight at + ## any given time. Requested intermediate updates are currently thrown out + ## and not performed. In practice this should hopefully have a minimal effect. + const max_outstanding_global_views = 10 &redef; ## Intermediate updates can cause overload situations on very large clusters. ## This option may help reduce load and correct intermittent problems. @@ -55,21 +61,17 @@ export { } -# This variable is maintained by all hosts for different purposes. Non-managers -# maintain it to know what indexes they have recently sent as intermediate -# updates so they don't overwhelm their manager. Managers maintain it so they -# don't overwhelm workers with intermediate index requests. The count that is -# yielded is the number of times the percentage threshold has been crossed and -# an intermediate result has been received. The manager may optionally request -# the index again before data expires from here if too many workers are crossing -# the percentage threshold (not implemented yet!). -global recent_global_view_indexes: table[string, string, Index] of count &create_expire=1min &default=0; - # Add events to the cluster framework to make this work. redef Cluster::manager2worker_events += /Metrics::cluster_(filter_request|index_request)/; redef Cluster::worker2manager_events += /Metrics::cluster_(filter_response|index_response|index_intermediate_response)/; @if ( Cluster::local_node_type() != Cluster::MANAGER ) +# This variable is maintained to know what indexes they have recently sent as +# intermediate updates so they don't overwhelm their manager. The count that is +# yielded is the number of times the percentage threshold has been crossed and +# an intermediate result has been received. +global recent_global_view_indexes: table[string, string, Index] of count &create_expire=1min &default=0; + # This is done on all non-manager node types in the event that a metric is # being collected somewhere other than a worker. function data_added(filter: Filter, index: Index, val: ResultVal) @@ -134,9 +136,14 @@ event Metrics::cluster_index_request(uid: string, id: string, filter_name: strin { if ( [id, filter_name] in store && index in store[id, filter_name] ) { - local data = store[id, filter_name][index]; #print fmt("WORKER %s: received the cluster_index_request event for %s=%s.", Cluster::node, index2str(index), data); - event Metrics::cluster_index_response(uid, id, filter_name, index, data); + event Metrics::cluster_index_response(uid, id, filter_name, index, store[id, filter_name][index]); + } + else + { + # We need to send an empty response if we don't have the data so that the manager + # can know that it heard back from all of the workers. + event Metrics::cluster_index_response(uid, id, filter_name, index, [$begin=network_time(), $end=network_time()]); } } @@ -164,8 +171,12 @@ global done_with: table[string] of count &read_expire=1min &default=0; # they are getting a global view for a certain index. global index_requests: table[string, string, string, Index] of ResultVal &read_expire=1min; +# This variable is maintained by managers to prevent overwhelming communication due +# to too many intermediate updates. Each metric filter is tracked separately so that +# one metric won't overwhelm and degrade other quieter metrics. +global outstanding_global_views: table[string, string] of count; -# Manager's handle logging. +# Managers handle logging. event Metrics::finish_period(filter: Filter) { #print fmt("%.6f MANAGER: breaking %s filter for %s metric", network_time(), filter$name, filter$id); @@ -194,26 +205,28 @@ function data_added(filter: Filter, index: Index, val: ResultVal) event Metrics::cluster_index_response(uid: string, id: string, filter_name: string, index: Index, val: ResultVal) { #print fmt("%0.6f MANAGER: receiving index data from %s - %s=%s", network_time(), get_event_peer()$descr, index2str(index), val); - if ( [uid, id, filter_name, index] in index_requests ) + + # We only want to try and do a value merge if there are actually measured datapoints + # in the ResultVal. + if ( val$num > 0 && [uid, id, filter_name, index] in index_requests ) index_requests[uid, id, filter_name, index] = merge_result_vals(index_requests[uid, id, filter_name, index], val); else index_requests[uid, id, filter_name, index] = val; - local ir = index_requests[uid, id, filter_name, index]; - # Mark that this worker is done. ++done_with[uid]; #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); - if ( Cluster::worker_count == done_with[uid] ) { + local ir = index_requests[uid, id, filter_name, index]; if ( check_thresholds(filter_store[id, filter_name], index, ir, 1.0) ) { threshold_crossed(filter_store[id, filter_name], index, ir); } delete done_with[uid]; delete index_requests[uid, id, filter_name, index]; + --outstanding_global_views[id, filter_name]; } } @@ -223,11 +236,16 @@ event Metrics::cluster_index_intermediate_response(id: string, filter_name: stri #print fmt("MANAGER: receiving intermediate index data from %s", get_event_peer()$descr); #print fmt("MANAGER: requesting index data for %s", index2str(index)); - # If a worker recently sent this as an intermediate update, don't request it. - if ( [id, filter_name, index] in recent_global_view_indexes ) + if ( [id, filter_name] in outstanding_global_views && + |outstanding_global_views[id, filter_name]| > max_outstanding_global_views ) + { + # Don't do this intermediate update. Perhaps at some point in the future + # we will queue and randomly select from these ignored intermediate + # update requests. return; + } - ++recent_global_view_indexes[id, filter_name, index]; + ++outstanding_global_views[id, filter_name]; local uid = unique_id(""); event Metrics::cluster_index_request(uid, id, filter_name, index);