mirror of
https://github.com/zeek/zeek.git
synced 2025-10-04 15:48:19 +00:00
Cluster support for the metrics framework returns and all tests work again.
This commit is contained in:
parent
6600e62ea3
commit
47f58e6340
10 changed files with 76 additions and 67 deletions
|
@ -41,7 +41,7 @@ export {
|
|||
|
||||
## This event is sent by nodes in response to a
|
||||
## :bro:id:`Metrics::cluster_index_request` event.
|
||||
global cluster_index_response: event(uid: string, id: string, filter_name: string, index: Index, val: count);
|
||||
global cluster_index_response: event(uid: string, id: string, filter_name: string, index: Index, data: DataPoint);
|
||||
|
||||
## This is sent by workers to indicate that they crossed the percent of the
|
||||
## current threshold by the percentage defined globally in
|
||||
|
@ -76,7 +76,7 @@ global done_with: table[string] of count &create_expire=5mins &default=0;
|
|||
|
||||
# This variable is maintained by managers to track intermediate responses as
|
||||
# they are getting a global view for a certain index.
|
||||
global index_requests: table[string, string, string, Index] of count &create_expire=5mins &default=0;
|
||||
global index_requests: table[string, string, string, Index] of DataPoint &create_expire=5mins &default=[];
|
||||
|
||||
# This variable is maintained by all hosts for different purposes. Non-managers
|
||||
# maintain it to know what indexes they have recently sent as intermediate
|
||||
|
@ -157,12 +157,12 @@ event Metrics::cluster_filter_request(uid: string, id: string, filter_name: stri
|
|||
|
||||
event Metrics::cluster_index_request(uid: string, id: string, filter_name: string, index: Index)
|
||||
{
|
||||
local val=0;
|
||||
local data: DataPoint;
|
||||
if ( index in store[id, filter_name] )
|
||||
val = store[id, filter_name][index];
|
||||
data = store[id, filter_name][index];
|
||||
|
||||
# fmt("WORKER %s: received the cluster_index_request event for %s=%d.", Cluster::node, index2str(index), val);
|
||||
event Metrics::cluster_index_response(uid, id, filter_name, index, val);
|
||||
event Metrics::cluster_index_response(uid, id, filter_name, index, data);
|
||||
}
|
||||
|
||||
@endif
|
||||
|
@ -195,21 +195,19 @@ function data_added(filter: Filter, index: Index, val: count)
|
|||
do_notice(filter, index, val);
|
||||
}
|
||||
|
||||
event Metrics::cluster_index_response(uid: string, id: string, filter_name: string, index: Index, val: count)
|
||||
event Metrics::cluster_index_response(uid: string, id: string, filter_name: string, index: Index, data: DataPoint)
|
||||
{
|
||||
#print fmt("%0.6f MANAGER: receiving index data from %s", network_time(), get_event_peer()$descr);
|
||||
|
||||
if ( [uid, id, filter_name, index] !in index_requests )
|
||||
index_requests[uid, id, filter_name, index] = 0;
|
||||
|
||||
index_requests[uid, id, filter_name, index] += val;
|
||||
index_requests[uid, id, filter_name, index] = merge_data_points(index_requests[uid, id, filter_name, index], data);
|
||||
local ir = index_requests[uid, id, filter_name, index];
|
||||
|
||||
++done_with[uid];
|
||||
if ( Cluster::worker_count == done_with[uid] )
|
||||
{
|
||||
if ( check_notice(filter_store[id, filter_name], index, ir) )
|
||||
do_notice(filter_store[id, filter_name], index, ir);
|
||||
local size = ir?$num ? ir$num : |ir$unique_vals|;
|
||||
if ( check_notice(filter_store[id, filter_name], index, size) )
|
||||
do_notice(filter_store[id, filter_name], index, size);
|
||||
delete done_with[uid];
|
||||
delete index_requests[uid, id, filter_name, index];
|
||||
}
|
||||
|
@ -233,12 +231,13 @@ event Metrics::cluster_filter_response(uid: string, id: string, filter_name: str
|
|||
local local_data = filter_results[uid, id, filter_name];
|
||||
for ( index in data )
|
||||
{
|
||||
if ( index !in local_data )
|
||||
local_data[index] = 0;
|
||||
local_data[index] += data[index];
|
||||
if ( index in local_data )
|
||||
local_data[index] = merge_data_points(local_data[index], data[index]);
|
||||
else
|
||||
local_data[index] = data[index];
|
||||
}
|
||||
|
||||
# Mark another worker as being "done" for this uid.
|
||||
# Mark another worker as being "done" for this uid.
|
||||
if ( done )
|
||||
++done_with[uid];
|
||||
|
||||
|
|
|
@ -192,6 +192,32 @@ function index2str(index: Index): string
|
|||
return fmt("metric_index(%s)", out);
|
||||
}
|
||||
|
||||
function merge_data_points(dp1: DataPoint, dp2: DataPoint): DataPoint
|
||||
{
|
||||
local result: DataPoint;
|
||||
if ( dp1?$num || dp2?$num )
|
||||
{
|
||||
result$num = 0;
|
||||
if ( dp1?$num )
|
||||
result$num += dp1$num;
|
||||
if ( dp2?$num )
|
||||
result$num += dp2$num;
|
||||
}
|
||||
|
||||
if ( dp1?$unique_vals || dp2?$unique_vals )
|
||||
{
|
||||
result$unique_vals = set();
|
||||
if ( dp1?$unique_vals )
|
||||
for ( val1 in dp1$unique_vals )
|
||||
add result$unique_vals[val1];
|
||||
if ( dp2?$unique_vals )
|
||||
for ( val2 in dp2$unique_vals )
|
||||
add result$unique_vals[val2];
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
function write_log(ts: time, filter: Filter, data: MetricTable)
|
||||
{
|
||||
for ( index in data )
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue