diff --git a/scripts/base/frameworks/sumstats/cluster.zeek b/scripts/base/frameworks/sumstats/cluster.zeek index 3016b5bb42..9d003d12f6 100644 --- a/scripts/base/frameworks/sumstats/cluster.zeek +++ b/scripts/base/frameworks/sumstats/cluster.zeek @@ -79,8 +79,8 @@ function data_added(ss: SumStat, key: Key, result: Result) if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) ) { # kick off intermediate update - Broker::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response, - ss$name, key); + Cluster::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response, + ss$name, key); add recent_global_view_keys[ss$name, key]; } } @@ -91,15 +91,15 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) { if ( |sending_results[uid]| == 0 ) { - Broker::publish(Cluster::manager_topic, SumStats::send_no_key, - uid, ss_name); + Cluster::publish(Cluster::manager_topic, SumStats::send_no_key, + uid, ss_name); } else { for ( key in sending_results[uid] ) { - Broker::publish(Cluster::manager_topic, SumStats::send_a_key, - uid, ss_name, key); + Cluster::publish(Cluster::manager_topic, SumStats::send_a_key, + uid, ss_name, key); # break to only send one. break; } @@ -115,8 +115,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) { for ( key in result_store[ss_name] ) { - Broker::publish(Cluster::manager_topic, SumStats::send_a_key, - uid, ss_name, key); + Cluster::publish(Cluster::manager_topic, SumStats::send_a_key, + uid, ss_name, key); # break to only send one. break; } @@ -124,8 +124,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) } else { - Broker::publish(Cluster::manager_topic, SumStats::send_no_key, - uid, ss_name); + Cluster::publish(Cluster::manager_topic, SumStats::send_no_key, + uid, ss_name); } } @@ -151,31 +151,31 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean { if ( uid in sending_results && key in sending_results[uid] ) { - Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, - uid, ss_name, key, sending_results[uid][key], cleanup); + Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, sending_results[uid][key], cleanup); delete sending_results[uid][key]; } else { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, - uid, ss_name, key, table(), cleanup); + Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, table(), cleanup); } } else { if ( ss_name in result_store && key in result_store[ss_name] ) { - Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, - uid, ss_name, key, result_store[ss_name][key], cleanup); + Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, result_store[ss_name][key], cleanup); } else { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, - uid, ss_name, key, table(), cleanup); + Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result, + uid, ss_name, key, table(), cleanup); } } } @@ -252,14 +252,14 @@ event SumStats::finish_epoch(ss: SumStat) stats_keys[uid] = set(); # Request data from peers. - Broker::publish(Cluster::worker_topic, SumStats::cluster_ss_request, - uid, ss$name, T); + Cluster::publish(Cluster::worker_topic, SumStats::cluster_ss_request, + uid, ss$name, T); done_with[uid] = 0; #print fmt("get_key by uid: %s", uid); - Broker::publish(Cluster::worker_topic, SumStats::get_a_key, - uid, ss$name, T); + Cluster::publish(Cluster::worker_topic, SumStats::get_a_key, + uid, ss$name, T); } # Schedule the next finish_epoch event. @@ -274,8 +274,8 @@ function data_added(ss: SumStat, key: Key, result: Result) if ( check_thresholds(ss, key, result, 1.0) ) { threshold_crossed(ss, key, result); - Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, - ss$name, key, threshold_tracker[ss$name][key]); + Cluster::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, + ss$name, key, threshold_tracker[ss$name][key]); } } @@ -292,8 +292,8 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key, if ( check_thresholds(ss, key, ir, 1.0) ) { threshold_crossed(ss, key, ir); - Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, - ss_name, key, threshold_tracker[ss_name][key]); + Cluster::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, + ss_name, key, threshold_tracker[ss_name][key]); } if ( cleanup ) @@ -329,8 +329,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool) } done_with[uid] = 0; - Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, - uid, ss_name, key, cleanup); + Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result, + uid, ss_name, key, cleanup); delete stats_keys[uid][key]; } else @@ -338,8 +338,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool) # Get more keys! And this breaks us out of the evented loop. done_with[uid] = 0; #print fmt("get_key by uid: %s", uid); - Broker::publish(Cluster::worker_topic, SumStats::get_a_key, - uid, ss_name, cleanup); + Cluster::publish(Cluster::worker_topic, SumStats::get_a_key, + uid, ss_name, cleanup); } } @@ -464,8 +464,8 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key) add outstanding_global_views[ss_name][uid]; done_with[uid] = 0; #print fmt("requesting results for: %s", uid); - Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, - uid, ss_name, key, F); + Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result, + uid, ss_name, key, F); } function request_key(ss_name: string, key: Key): Result @@ -475,8 +475,8 @@ function request_key(ss_name: string, key: Key): Result key_requests[uid] = table(); add dynamic_requests[uid]; - Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, - uid, ss_name, key, F); + Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result, + uid, ss_name, key, F); return when [uid, ss_name, key] ( uid in done_with && Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] ) {