diff --git a/scripts/base/frameworks/sumstats/cluster.zeek b/scripts/base/frameworks/sumstats/cluster.zeek
index 5a975c5ee9..c5d45e7a7d 100644
--- a/scripts/base/frameworks/sumstats/cluster.zeek
+++ b/scripts/base/frameworks/sumstats/cluster.zeek
@@ -61,14 +61,6 @@ global recent_global_view_keys: set[string, Key] &create_expire=1min;
 
 @if ( Cluster::local_node_type() != Cluster::MANAGER )
 
-event zeek_init() &priority=100
-	{
-	Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_send_result);
-	Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response);
-	Broker::auto_publish(Cluster::manager_topic, SumStats::send_a_key);
-	Broker::auto_publish(Cluster::manager_topic, SumStats::send_no_key);
-	}
-
 # Result tables indexed on a uid that are currently being sent to the
 # manager.
 global sending_results: table[string] of ResultTable = table() &read_expire=1min;
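[review note] The worker-side zeek_init() above registered four events for
auto-publishing to the manager; with it gone, every raise site in the hunks
below publishes to Cluster::manager_topic explicitly. A minimal sketch of the
migration pattern, using a hypothetical event Example::ping that is not part
of this patch:

	module Example;

	export {
		# Hypothetical event, declared only to keep the sketch self-contained.
		global ping: event(uid: string);
	}

	# Before: registered once at startup; any local raise of Example::ping
	# on this node was then forwarded to the topic automatically.
	event zeek_init() &priority=100
		{
		Broker::auto_publish(Cluster::manager_topic, Example::ping);
		}

	function send_ping(uid: string)
		{
		# Before: a plain local raise, forwarded by the registration above.
		event Example::ping(uid);

		# After: no registration; publish to the topic at the call site.
		Broker::publish(Cluster::manager_topic, Example::ping, uid);
		}

One semantic difference to keep in mind while reading on: "event ...;" also
runs handlers on the raising node, whereas Broker::publish() only reaches
subscribers of the topic. That looks harmless here, since the handlers for
these worker-to-manager events are defined in the manager-only section of
this script.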
@@ -87,7 +79,8 @@ function data_added(ss: SumStat, key: Key, result: Result)
 	if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
 		{
 		# kick off intermediate update
-		event SumStats::cluster_key_intermediate_response(ss$name, key);
+		Broker::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response,
+		                ss$name, key);
 
 		add recent_global_view_keys[ss$name, key];
 		}
@@ -98,13 +91,15 @@
 		{
 		if ( |sending_results[uid]| == 0 )
 			{
-			event SumStats::send_no_key(uid, ss_name);
+			Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
+			                uid, ss_name);
 			}
 		else
 			{
 			for ( key in sending_results[uid] )
 				{
-				event SumStats::send_a_key(uid, ss_name, key);
+				Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
+				                uid, ss_name, key);
 				# break to only send one.
 				break;
 				}
 			}
 		}
@@ -120,7 +115,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
 		{
 		for ( key in result_store[ss_name] )
 			{
-			event SumStats::send_a_key(uid, ss_name, key);
+			Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
+			                uid, ss_name, key);
 			# break to only send one.
 			break;
 			}
@@ -128,7 +124,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
 		}
 	else
 		{
-		event SumStats::send_no_key(uid, ss_name);
+		Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
+		                uid, ss_name);
 		}
 	}
@@ -154,16 +151,20 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
 		{
 		if ( uid in sending_results && key in sending_results[uid] )
 			{
+			# XXX: Is that comment stale?
+			#
 			# Note: copy is needed to compensate serialization caching issue. This should be
 			# changed to something else later.
-			event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup);
+			Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
+			                uid, ss_name, key, copy(sending_results[uid][key]), cleanup);
 			delete sending_results[uid][key];
 			}
 		else
 			{
 			# We need to send an empty response if we don't have the data so that the manager
 			# can know that it heard back from all of the workers.
-			event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
+			Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
+			                uid, ss_name, key, table(), cleanup);
 			}
 		}
 	else
@@ -172,13 +173,15 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
 			{
 			# Note: copy is needed to compensate serialization caching issue. This should be
 			# changed to something else later.
-			event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
+			Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
+			                uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
 			}
 		else
 			{
 			# We need to send an empty response if we don't have the data so that the manager
 			# can know that it heard back from all of the workers.
-			event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
+			Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
+			                uid, ss_name, key, table(), cleanup);
 			}
 		}
 	}
@@ -209,14 +212,6 @@ function request_key(ss_name: string, key: Key): Result
 
 @if ( Cluster::local_node_type() == Cluster::MANAGER )
 
-event zeek_init() &priority=100
-	{
-	Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_ss_request);
-	Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_get_result);
-	Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed);
-	Broker::auto_publish(Cluster::worker_topic, SumStats::get_a_key);
-	}
-
 # This variable is maintained by manager nodes as they collect and aggregate
 # results.
 # Index on a uid.
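[review note] Same transformation for the manager: the four events bound for
workers lose their auto_publish registrations, and the hunks below publish
them to Cluster::worker_topic at each call site. As above, Broker::publish()
does not raise the event locally. That only matters if the publishing node
also has handlers for the event; here the worker handlers are compiled out on
the manager by the surrounding @if ( Cluster::local_node_type() ... ) guards,
so no behavior change is expected. If a local raise were ever wanted alongside
the publish, it would have to be made explicit, e.g. (hypothetical, not needed
in this patch):

	Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
	                uid, ss_name, cleanup);
	event SumStats::get_a_key(uid, ss_name, cleanup);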
@@ -263,12 +258,14 @@ event SumStats::finish_epoch(ss: SumStat)
 		stats_keys[uid] = set();
 
 		# Request data from peers.
-		event SumStats::cluster_ss_request(uid, ss$name, T);
+		Broker::publish(Cluster::worker_topic, SumStats::cluster_ss_request,
+		                uid, ss$name, T);
 
 		done_with[uid] = 0;
 		#print fmt("get_key by uid: %s", uid);
-		event SumStats::get_a_key(uid, ss$name, T);
+		Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
+		                uid, ss$name, T);
 		}
 
 	# Schedule the next finish_epoch event.
@@ -283,7 +280,8 @@ function data_added(ss: SumStat, key: Key, result: Result)
 	if ( check_thresholds(ss, key, result, 1.0) )
 		{
 		threshold_crossed(ss, key, result);
-		event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
+		Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
+		                ss$name, key, threshold_tracker[ss$name][key]);
 		}
 	}
@@ -300,7 +298,8 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
 	if ( check_thresholds(ss, key, ir, 1.0) )
 		{
 		threshold_crossed(ss, key, ir);
-		event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]);
+		Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
+		                ss_name, key, threshold_tracker[ss_name][key]);
 		}
 
 	if ( cleanup )
@@ -336,7 +335,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
 			}
 
 		done_with[uid] = 0;
-		event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
+		Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
+		                uid, ss_name, key, cleanup);
 		delete stats_keys[uid][key];
 		}
 	else
@@ -344,7 +344,8 @@
 		# Get more keys! And this breaks us out of the evented loop.
 		done_with[uid] = 0;
 		#print fmt("get_key by uid: %s", uid);
-		event SumStats::get_a_key(uid, ss_name, cleanup);
+		Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
+		                uid, ss_name, cleanup);
 		}
 	}
@@ -469,7 +470,8 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
 	add outstanding_global_views[ss_name][uid];
 	done_with[uid] = 0;
 	#print fmt("requesting results for: %s", uid);
-	event SumStats::cluster_get_result(uid, ss_name, key, F);
+	Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
+	                uid, ss_name, key, F);
 	}
 
 function request_key(ss_name: string, key: Key): Result
@@ -479,7 +481,8 @@ function request_key(ss_name: string, key: Key): Result
 	key_requests[uid] = table();
 	add dynamic_requests[uid];
 
-	event SumStats::cluster_get_result(uid, ss_name, key, F);
+	Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
+	                uid, ss_name, key, F);
 	return when [uid, ss_name, key] ( uid in done_with &&
 		Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
 		{
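[review note] The dynamic request path keeps its shape: the manager publishes
SumStats::cluster_get_result to all workers, counts the cluster_send_result
replies in done_with[uid], and the "when" resolves once every active worker
has answered. A condensed sketch of that pattern, with names from this file
but error and timeout handling simplified away:

	function request_key_sketch(ss_name: string, key: Key): Result
		{
		local uid = unique_id("");
		done_with[uid] = 0;
		key_requests[uid] = table();

		# Ask every worker for its slice of the result.
		Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
		                uid, ss_name, key, F);

		# Each incoming cluster_send_result bumps done_with[uid]; resolve
		# once all active workers have reported in.
		return when [uid, ss_name, key] ( uid in done_with &&
			Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
			{
			local result = key_requests[uid][key];
			delete done_with[uid];
			delete key_requests[uid];
			return result;
			}
		}

Separately, the new "XXX: Is that comment stale?" flags the old rationale for
copy() in cluster_get_result. Whether the serialization-caching concern still
applies under explicit Broker::publish() likely depends on when publish
serializes its arguments; if it serializes immediately, the copy() may indeed
be droppable, but that deserves verification rather than a drive-by change.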