mirror of
https://github.com/zeek/zeek.git
synced 2025-10-17 05:58:20 +00:00
Merge remote-tracking branch 'origin/topic/jazoff/bit-1649'
* origin/topic/jazoff/bit-1649: Track outstanding_global_views updates by uid Also track recent_global_view_keys on manager BIT-1649 #merged
This commit is contained in:
commit
ff114709db
3 changed files with 25 additions and 19 deletions
6
CHANGES
6
CHANGES
|
@ -1,4 +1,10 @@
|
||||||
|
|
||||||
|
2.4-947 | 2016-08-16 12:10:02 -0700
|
||||||
|
|
||||||
|
* Fix issues with handling of indermediate sumstats updates. (Justin Azoff)
|
||||||
|
|
||||||
|
* Address coverity errors. (Johanna Amann)
|
||||||
|
|
||||||
2.4-943 | 2016-08-15 17:03:14 -0700
|
2.4-943 | 2016-08-15 17:03:14 -0700
|
||||||
|
|
||||||
* Add 'bro-config' script. (Jon Siwek)
|
* Add 'bro-config' script. (Jon Siwek)
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
2.4-943
|
2.4-947
|
||||||
|
|
|
@ -61,12 +61,11 @@ redef Cluster::manager2worker_events += /SumStats::(get_a_key)/;
|
||||||
redef Cluster::worker2manager_events += /SumStats::cluster_(send_result|key_intermediate_response)/;
|
redef Cluster::worker2manager_events += /SumStats::cluster_(send_result|key_intermediate_response)/;
|
||||||
redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/;
|
redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/;
|
||||||
|
|
||||||
|
# This variable is maintained to know what keys have recently sent or received
|
||||||
|
# intermediate updates so they don't overwhelm the manager.
|
||||||
|
global recent_global_view_keys: set[string, Key] &create_expire=1min;
|
||||||
|
|
||||||
@if ( Cluster::local_node_type() != Cluster::MANAGER )
|
@if ( Cluster::local_node_type() != Cluster::MANAGER )
|
||||||
# This variable is maintained to know what keys have recently sent as
|
|
||||||
# intermediate updates so they don't overwhelm their manager. The count that is
|
|
||||||
# yielded is the number of times the percentage threshold has been crossed and
|
|
||||||
# an intermediate result has been received.
|
|
||||||
global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0;
|
|
||||||
|
|
||||||
# Result tables indexed on a uid that are currently being sent to the
|
# Result tables indexed on a uid that are currently being sent to the
|
||||||
# manager.
|
# manager.
|
||||||
|
@ -76,8 +75,7 @@ global sending_results: table[string] of ResultTable = table() &read_expire=1min
|
||||||
# being collected somewhere other than a worker.
|
# being collected somewhere other than a worker.
|
||||||
function data_added(ss: SumStat, key: Key, result: Result)
|
function data_added(ss: SumStat, key: Key, result: Result)
|
||||||
{
|
{
|
||||||
# If an intermediate update for this value was sent recently, don't send
|
# If an intermediate update for this key was sent recently, don't send it again
|
||||||
# it again.
|
|
||||||
if ( [ss$name, key] in recent_global_view_keys )
|
if ( [ss$name, key] in recent_global_view_keys )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -88,7 +86,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
|
||||||
{
|
{
|
||||||
# kick off intermediate update
|
# kick off intermediate update
|
||||||
event SumStats::cluster_key_intermediate_response(ss$name, key);
|
event SumStats::cluster_key_intermediate_response(ss$name, key);
|
||||||
++recent_global_view_keys[ss$name, key];
|
add recent_global_view_keys[ss$name, key];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,7 +237,7 @@ global dynamic_requests: set[string] &read_expire=1min;
|
||||||
# to too many intermediate updates. Each sumstat is tracked separately so that
|
# to too many intermediate updates. Each sumstat is tracked separately so that
|
||||||
# one won't overwhelm and degrade other quieter sumstats.
|
# one won't overwhelm and degrade other quieter sumstats.
|
||||||
# Indexed on a sumstat id.
|
# Indexed on a sumstat id.
|
||||||
global outstanding_global_views: table[string] of count &read_expire=1min &default=0;
|
global outstanding_global_views: table[string] of set[string] &read_expire=1min;
|
||||||
|
|
||||||
const zero_time = double_to_time(0.0);
|
const zero_time = double_to_time(0.0);
|
||||||
# Managers handle logging.
|
# Managers handle logging.
|
||||||
|
@ -305,12 +303,10 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
|
||||||
ss$epoch_result(now, key, ir);
|
ss$epoch_result(now, key, ir);
|
||||||
}
|
}
|
||||||
|
|
||||||
# Check that there is an outstanding view before subtracting.
|
|
||||||
# Global views only apply to non-dynamic requests. Dynamic
|
|
||||||
# requests must be serviced.
|
|
||||||
if ( outstanding_global_views[ss_name] > 0 )
|
|
||||||
--outstanding_global_views[ss_name];
|
|
||||||
}
|
}
|
||||||
|
# Check if this was an intermediate update
|
||||||
|
if ( ss_name in outstanding_global_views )
|
||||||
|
delete outstanding_global_views[ss_name][uid];
|
||||||
|
|
||||||
delete key_requests[uid];
|
delete key_requests[uid];
|
||||||
delete done_with[uid];
|
delete done_with[uid];
|
||||||
|
@ -441,9 +437,14 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
|
||||||
{
|
{
|
||||||
#print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr);
|
#print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr);
|
||||||
#print fmt("MANAGER: requesting key data for %s", key);
|
#print fmt("MANAGER: requesting key data for %s", key);
|
||||||
|
# If an intermediate update for this key was handled recently, don't do it again
|
||||||
|
if ( [ss_name, key] in recent_global_view_keys )
|
||||||
|
return;
|
||||||
|
add recent_global_view_keys[ss_name, key];
|
||||||
|
|
||||||
if ( ss_name in outstanding_global_views &&
|
if ( ss_name !in outstanding_global_views)
|
||||||
|outstanding_global_views[ss_name]| > max_outstanding_global_views )
|
outstanding_global_views[ss_name] = set();
|
||||||
|
else if ( |outstanding_global_views[ss_name]| > max_outstanding_global_views )
|
||||||
{
|
{
|
||||||
# Don't do this intermediate update. Perhaps at some point in the future
|
# Don't do this intermediate update. Perhaps at some point in the future
|
||||||
# we will queue and randomly select from these ignored intermediate
|
# we will queue and randomly select from these ignored intermediate
|
||||||
|
@ -451,9 +452,8 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
++outstanding_global_views[ss_name];
|
|
||||||
|
|
||||||
local uid = unique_id("");
|
local uid = unique_id("");
|
||||||
|
add outstanding_global_views[ss_name][uid];
|
||||||
done_with[uid] = 0;
|
done_with[uid] = 0;
|
||||||
#print fmt("requesting results for: %s", uid);
|
#print fmt("requesting results for: %s", uid);
|
||||||
event SumStats::cluster_get_result(uid, ss_name, key, F);
|
event SumStats::cluster_get_result(uid, ss_name, key, F);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue