mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
sumstats: Remove Broker::auto_publish()
This commit is contained in:
parent
416887157c
commit
883ae3694c
1 changed files with 36 additions and 33 deletions
|
@ -61,14 +61,6 @@ global recent_global_view_keys: set[string, Key] &create_expire=1min;
|
||||||
|
|
||||||
@if ( Cluster::local_node_type() != Cluster::MANAGER )
|
@if ( Cluster::local_node_type() != Cluster::MANAGER )
|
||||||
|
|
||||||
event zeek_init() &priority=100
|
|
||||||
{
|
|
||||||
Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_send_result);
|
|
||||||
Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response);
|
|
||||||
Broker::auto_publish(Cluster::manager_topic, SumStats::send_a_key);
|
|
||||||
Broker::auto_publish(Cluster::manager_topic, SumStats::send_no_key);
|
|
||||||
}
|
|
||||||
|
|
||||||
# Result tables indexed on a uid that are currently being sent to the
|
# Result tables indexed on a uid that are currently being sent to the
|
||||||
# manager.
|
# manager.
|
||||||
global sending_results: table[string] of ResultTable = table() &read_expire=1min;
|
global sending_results: table[string] of ResultTable = table() &read_expire=1min;
|
||||||
|
@ -87,7 +79,8 @@ function data_added(ss: SumStat, key: Key, result: Result)
|
||||||
if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
|
if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
|
||||||
{
|
{
|
||||||
# kick off intermediate update
|
# kick off intermediate update
|
||||||
event SumStats::cluster_key_intermediate_response(ss$name, key);
|
Broker::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response,
|
||||||
|
ss$name, key);
|
||||||
add recent_global_view_keys[ss$name, key];
|
add recent_global_view_keys[ss$name, key];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,13 +91,15 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
|
||||||
{
|
{
|
||||||
if ( |sending_results[uid]| == 0 )
|
if ( |sending_results[uid]| == 0 )
|
||||||
{
|
{
|
||||||
event SumStats::send_no_key(uid, ss_name);
|
Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
|
||||||
|
uid, ss_name);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
for ( key in sending_results[uid] )
|
for ( key in sending_results[uid] )
|
||||||
{
|
{
|
||||||
event SumStats::send_a_key(uid, ss_name, key);
|
Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
|
||||||
|
uid, ss_name, key);
|
||||||
# break to only send one.
|
# break to only send one.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -120,7 +115,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
|
||||||
{
|
{
|
||||||
for ( key in result_store[ss_name] )
|
for ( key in result_store[ss_name] )
|
||||||
{
|
{
|
||||||
event SumStats::send_a_key(uid, ss_name, key);
|
Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
|
||||||
|
uid, ss_name, key);
|
||||||
# break to only send one.
|
# break to only send one.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -128,7 +124,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
event SumStats::send_no_key(uid, ss_name);
|
Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
|
||||||
|
uid, ss_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,16 +151,20 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
|
||||||
{
|
{
|
||||||
if ( uid in sending_results && key in sending_results[uid] )
|
if ( uid in sending_results && key in sending_results[uid] )
|
||||||
{
|
{
|
||||||
|
# XXX: Is that comment stale?
|
||||||
|
#
|
||||||
# Note: copy is needed to compensate serialization caching issue. This should be
|
# Note: copy is needed to compensate serialization caching issue. This should be
|
||||||
# changed to something else later.
|
# changed to something else later.
|
||||||
event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup);
|
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
|
||||||
|
uid, ss_name, key, copy(sending_results[uid][key]), cleanup);
|
||||||
delete sending_results[uid][key];
|
delete sending_results[uid][key];
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
# We need to send an empty response if we don't have the data so that the manager
|
# We need to send an empty response if we don't have the data so that the manager
|
||||||
# can know that it heard back from all of the workers.
|
# can know that it heard back from all of the workers.
|
||||||
event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
|
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
|
||||||
|
uid, ss_name, key, table(), cleanup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -172,13 +173,15 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
|
||||||
{
|
{
|
||||||
# Note: copy is needed to compensate serialization caching issue. This should be
|
# Note: copy is needed to compensate serialization caching issue. This should be
|
||||||
# changed to something else later.
|
# changed to something else later.
|
||||||
event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
|
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
|
||||||
|
uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
# We need to send an empty response if we don't have the data so that the manager
|
# We need to send an empty response if we don't have the data so that the manager
|
||||||
# can know that it heard back from all of the workers.
|
# can know that it heard back from all of the workers.
|
||||||
event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
|
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
|
||||||
|
uid, ss_name, key, table(), cleanup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -209,14 +212,6 @@ function request_key(ss_name: string, key: Key): Result
|
||||||
|
|
||||||
@if ( Cluster::local_node_type() == Cluster::MANAGER )
|
@if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||||
|
|
||||||
event zeek_init() &priority=100
|
|
||||||
{
|
|
||||||
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_ss_request);
|
|
||||||
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_get_result);
|
|
||||||
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed);
|
|
||||||
Broker::auto_publish(Cluster::worker_topic, SumStats::get_a_key);
|
|
||||||
}
|
|
||||||
|
|
||||||
# This variable is maintained by manager nodes as they collect and aggregate
|
# This variable is maintained by manager nodes as they collect and aggregate
|
||||||
# results.
|
# results.
|
||||||
# Index on a uid.
|
# Index on a uid.
|
||||||
|
@ -263,12 +258,14 @@ event SumStats::finish_epoch(ss: SumStat)
|
||||||
stats_keys[uid] = set();
|
stats_keys[uid] = set();
|
||||||
|
|
||||||
# Request data from peers.
|
# Request data from peers.
|
||||||
event SumStats::cluster_ss_request(uid, ss$name, T);
|
Broker::publish(Cluster::worker_topic, SumStats::cluster_ss_request,
|
||||||
|
uid, ss$name, T);
|
||||||
|
|
||||||
done_with[uid] = 0;
|
done_with[uid] = 0;
|
||||||
|
|
||||||
#print fmt("get_key by uid: %s", uid);
|
#print fmt("get_key by uid: %s", uid);
|
||||||
event SumStats::get_a_key(uid, ss$name, T);
|
Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
|
||||||
|
uid, ss$name, T);
|
||||||
}
|
}
|
||||||
|
|
||||||
# Schedule the next finish_epoch event.
|
# Schedule the next finish_epoch event.
|
||||||
|
@ -283,7 +280,8 @@ function data_added(ss: SumStat, key: Key, result: Result)
|
||||||
if ( check_thresholds(ss, key, result, 1.0) )
|
if ( check_thresholds(ss, key, result, 1.0) )
|
||||||
{
|
{
|
||||||
threshold_crossed(ss, key, result);
|
threshold_crossed(ss, key, result);
|
||||||
event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
|
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
|
||||||
|
ss$name, key, threshold_tracker[ss$name][key]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,7 +298,8 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
|
||||||
if ( check_thresholds(ss, key, ir, 1.0) )
|
if ( check_thresholds(ss, key, ir, 1.0) )
|
||||||
{
|
{
|
||||||
threshold_crossed(ss, key, ir);
|
threshold_crossed(ss, key, ir);
|
||||||
event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]);
|
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
|
||||||
|
ss_name, key, threshold_tracker[ss_name][key]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( cleanup )
|
if ( cleanup )
|
||||||
|
@ -336,7 +335,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
done_with[uid] = 0;
|
done_with[uid] = 0;
|
||||||
event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
|
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
|
||||||
|
uid, ss_name, key, cleanup);
|
||||||
delete stats_keys[uid][key];
|
delete stats_keys[uid][key];
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -344,7 +344,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
|
||||||
# Get more keys! And this breaks us out of the evented loop.
|
# Get more keys! And this breaks us out of the evented loop.
|
||||||
done_with[uid] = 0;
|
done_with[uid] = 0;
|
||||||
#print fmt("get_key by uid: %s", uid);
|
#print fmt("get_key by uid: %s", uid);
|
||||||
event SumStats::get_a_key(uid, ss_name, cleanup);
|
Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
|
||||||
|
uid, ss_name, cleanup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -469,7 +470,8 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
|
||||||
add outstanding_global_views[ss_name][uid];
|
add outstanding_global_views[ss_name][uid];
|
||||||
done_with[uid] = 0;
|
done_with[uid] = 0;
|
||||||
#print fmt("requesting results for: %s", uid);
|
#print fmt("requesting results for: %s", uid);
|
||||||
event SumStats::cluster_get_result(uid, ss_name, key, F);
|
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
|
||||||
|
uid, ss_name, key, F);
|
||||||
}
|
}
|
||||||
|
|
||||||
function request_key(ss_name: string, key: Key): Result
|
function request_key(ss_name: string, key: Key): Result
|
||||||
|
@ -479,7 +481,8 @@ function request_key(ss_name: string, key: Key): Result
|
||||||
key_requests[uid] = table();
|
key_requests[uid] = table();
|
||||||
add dynamic_requests[uid];
|
add dynamic_requests[uid];
|
||||||
|
|
||||||
event SumStats::cluster_get_result(uid, ss_name, key, F);
|
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
|
||||||
|
uid, ss_name, key, F);
|
||||||
return when [uid, ss_name, key] ( uid in done_with &&
|
return when [uid, ss_name, key] ( uid in done_with &&
|
||||||
Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
|
Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
|
||||||
{
|
{
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue