Mirror of https://github.com/zeek/zeek.git
More cluster tuning for the metrics framework.
- Fixed several state maintenance issues for intermediate updates.
- Added a new tuning variable, Metrics::max_outstanding_global_views, which limits the number of in-flight intermediate updates per metric filter.
- Changed the default global view threshold percent to 20% (up from 10%).
parent 50827d8df0
commit c3a6916572

1 changed file with 39 additions and 21 deletions
@@ -20,7 +20,13 @@ export {
 	## requirement that the manager requests a global view for the index
 	## since it may opt not to if it requested a global view for the index
 	## recently.
-	const cluster_request_global_view_percent = 0.1 &redef;
+	const cluster_request_global_view_percent = 0.2 &redef;
+
+	## This is to deal with intermediate update overload. A manager will only allow
+	## this many intermediate update requests to the workers to be inflight at
+	## any given time. Requested intermediate updates are currently thrown out
+	## and not performed. In practice this should hopefully have a minimal effect.
+	const max_outstanding_global_views = 10 &redef;
 
 	## Intermediate updates can cause overload situations on very large clusters.
 	## This option may help reduce load and correct intermittent problems.
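Both constants are exported with &redef, so a site can tune them without touching the framework itself. A minimal sketch of operator tuning (the values and the @load path are illustrative, not part of this commit):

	@load base/frameworks/metrics

	# Ask for a global view once a worker-local value reaches 25% of a threshold.
	redef Metrics::cluster_request_global_view_percent = 0.25;

	# Allow at most 5 in-flight intermediate update requests per metric filter.
	redef Metrics::max_outstanding_global_views = 5;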
@@ -55,21 +61,17 @@ export {
 }
 
 
-# This variable is maintained by all hosts for different purposes. Non-managers
-# maintain it to know what indexes they have recently sent as intermediate
-# updates so they don't overwhelm their manager. Managers maintain it so they
-# don't overwhelm workers with intermediate index requests. The count that is
-# yielded is the number of times the percentage threshold has been crossed and
-# an intermediate result has been received. The manager may optionally request
-# the index again before data expires from here if too many workers are crossing
-# the percentage threshold (not implemented yet!).
-global recent_global_view_indexes: table[string, string, Index] of count &create_expire=1min &default=0;
-
 # Add events to the cluster framework to make this work.
 redef Cluster::manager2worker_events += /Metrics::cluster_(filter_request|index_request)/;
 redef Cluster::worker2manager_events += /Metrics::cluster_(filter_response|index_response|index_intermediate_response)/;
 
 @if ( Cluster::local_node_type() != Cluster::MANAGER )
+# This variable is maintained to know what indexes they have recently sent as
+# intermediate updates so they don't overwhelm their manager. The count that is
+# yielded is the number of times the percentage threshold has been crossed and
+# an intermediate result has been received.
+global recent_global_view_indexes: table[string, string, Index] of count &create_expire=1min &default=0;
+
 # This is done on all non-manager node types in the event that a metric is
 # being collected somewhere other than a worker.
 function data_added(filter: Filter, index: Index, val: ResultVal)
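For context, the worker-side use of this table (outside the displayed hunk) follows a suppression pattern roughly like the sketch below. The names and signatures (check_thresholds, filter$id, the intermediate-response event arguments) are assumed from surrounding code, not quoted from the commit:

	# Hedged sketch of a non-manager's data_added(): only send an intermediate
	# update if the local value crossed the percentage threshold and this index
	# hasn't been reported within the last minute (&create_expire re-arms it).
	function data_added(filter: Filter, index: Index, val: ResultVal)
		{
		if ( check_thresholds(filter, index, val, cluster_request_global_view_percent) &&
		     [filter$id, filter$name, index] !in recent_global_view_indexes )
			{
			++recent_global_view_indexes[filter$id, filter$name, index];
			event Metrics::cluster_index_intermediate_response(filter$id, filter$name, index);
			}
		}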
@@ -134,9 +136,14 @@ event Metrics::cluster_index_request(uid: string, id: string, filter_name: string, index: Index)
 	{
-	local data = store[id, filter_name][index];
-	#print fmt("WORKER %s: received the cluster_index_request event for %s=%s.", Cluster::node, index2str(index), data);
-	event Metrics::cluster_index_response(uid, id, filter_name, index, data);
+	if ( [id, filter_name] in store && index in store[id, filter_name] )
+		{
+		event Metrics::cluster_index_response(uid, id, filter_name, index, store[id, filter_name][index]);
+		}
+	else
+		{
+		# We need to send an empty response if we don't have the data so that the manager
+		# can know that it heard back from all of the workers.
+		event Metrics::cluster_index_response(uid, id, filter_name, index, [$begin=network_time(), $end=network_time()]);
+		}
 	}
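The empty response exists purely so the manager's reply counter stays accurate; paired with the val$num > 0 guard added further down, it never perturbs the merged result. An illustration of the convention, assuming ResultVal's num field defaults to 0 (as that guard implies):

	# An "empty" ResultVal carries only timestamps; the manager counts it as a
	# reply from the worker but skips merging it into the global view.
	local empty: ResultVal = [$begin=network_time(), $end=network_time()];
	if ( empty$num == 0 )
		print "counted toward done_with, ignored by the merge";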
@@ -164,8 +171,12 @@ global done_with: table[string] of count &read_expire=1min &default=0;
 # they are getting a global view for a certain index.
 global index_requests: table[string, string, string, Index] of ResultVal &read_expire=1min;
 
+# This variable is maintained by managers to prevent overwhelming communication due
+# to too many intermediate updates. Each metric filter is tracked separately so that
+# one metric won't overwhelm and degrade other quieter metrics.
+global outstanding_global_views: table[string, string] of count;
+
-# Manager's handle logging.
+# Managers handle logging.
 event Metrics::finish_period(filter: Filter)
 	{
 	#print fmt("%.6f MANAGER: breaking %s filter for %s metric", network_time(), filter$name, filter$id);
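Note that, unlike done_with, this table is declared without &default, so reads must be membership-guarded, which is exactly what the intermediate-response handler below does before comparing against max_outstanding_global_views. An illustrative helper showing the guarded-read idiom (hypothetical name, not in the commit):

	# Hypothetical accessor: return 0 for filters with no outstanding views.
	function outstanding_for(id: string, filter_name: string): count
		{
		return [id, filter_name] in outstanding_global_views ?
		       outstanding_global_views[id, filter_name] : 0;
		}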
@@ -194,26 +205,28 @@ function data_added(filter: Filter, index: Index, val: ResultVal)
 event Metrics::cluster_index_response(uid: string, id: string, filter_name: string, index: Index, val: ResultVal)
 	{
 	#print fmt("%0.6f MANAGER: receiving index data from %s - %s=%s", network_time(), get_event_peer()$descr, index2str(index), val);
-	if ( [uid, id, filter_name, index] in index_requests )
+
+	# We only want to try and do a value merge if there are actually measured datapoints
+	# in the ResultVal.
+	if ( val$num > 0 && [uid, id, filter_name, index] in index_requests )
 		index_requests[uid, id, filter_name, index] = merge_result_vals(index_requests[uid, id, filter_name, index], val);
 	else
 		index_requests[uid, id, filter_name, index] = val;
-
-	local ir = index_requests[uid, id, filter_name, index];
 
+	# Mark that this worker is done.
 	++done_with[uid];
 
 	#print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
 
 	if ( Cluster::worker_count == done_with[uid] )
 		{
+		local ir = index_requests[uid, id, filter_name, index];
 		if ( check_thresholds(filter_store[id, filter_name], index, ir, 1.0) )
 			{
 			threshold_crossed(filter_store[id, filter_name], index, ir);
 			}
 		delete done_with[uid];
 		delete index_requests[uid, id, filter_name, index];
+		--outstanding_global_views[id, filter_name];
 		}
 	}
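The completion test relies on every worker replying exactly once per uid, which the empty-response change above guarantees. The counting pattern in isolation (hypothetical event name; assumes the cluster framework for Cluster::worker_count):

	global got_reply: event(uid: string);   # stand-in for cluster_index_response
	global done_with: table[string] of count &read_expire=1min &default=0;

	event got_reply(uid: string)
		{
		++done_with[uid];
		if ( done_with[uid] == Cluster::worker_count )
			{
			# Every worker answered once: finalize, then drop per-request state.
			delete done_with[uid];
			}
		}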
@@ -223,11 +236,16 @@ event Metrics::cluster_index_intermediate_response(id: string, filter_name: string, index: Index)
 	#print fmt("MANAGER: receiving intermediate index data from %s", get_event_peer()$descr);
 	#print fmt("MANAGER: requesting index data for %s", index2str(index));
 
-	# If a worker recently sent this as an intermediate update, don't request it.
-	if ( [id, filter_name, index] in recent_global_view_indexes )
+	if ( [id, filter_name] in outstanding_global_views &&
+	     |outstanding_global_views[id, filter_name]| > max_outstanding_global_views )
+		{
+		# Don't do this intermediate update. Perhaps at some point in the future
+		# we will queue and randomly select from these ignored intermediate
+		# update requests.
+		return;
+		}
 
-	++recent_global_view_indexes[id, filter_name, index];
+	++outstanding_global_views[id, filter_name];
 
 	local uid = unique_id("");
 	event Metrics::cluster_index_request(uid, id, filter_name, index);
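Paired with the --outstanding_global_views decrement in cluster_index_response, this forms a small per-[metric, filter] credit scheme: each granted intermediate view consumes a slot, and completing the global view returns it. A self-contained sketch of that lifecycle (illustrative names; &default=0 added here for safety):

	global inflight: table[string, string] of count &default=0;
	const cap = 10 &redef;

	function try_acquire(id: string, fname: string): bool
		{
		if ( inflight[id, fname] > cap )
			return F;              # over budget: drop this intermediate update
		++inflight[id, fname];         # slot consumed until the view completes
		return T;
		}

	function release(id: string, fname: string)
		{
		--inflight[id, fname];         # called when the global view finishes
		}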