sumstats: Move to Cluster::publish()

This commit is contained in:
Arne Welzel 2024-12-11 18:04:45 +00:00
parent fdf783df65
commit f58a2c2ca8

View file

@ -79,7 +79,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
{
# kick off intermediate update
Broker::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response,
Cluster::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response,
ss$name, key);
add recent_global_view_keys[ss$name, key];
}
@ -91,14 +91,14 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{
if ( |sending_results[uid]| == 0 )
{
Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
Cluster::publish(Cluster::manager_topic, SumStats::send_no_key,
uid, ss_name);
}
else
{
for ( key in sending_results[uid] )
{
Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
Cluster::publish(Cluster::manager_topic, SumStats::send_a_key,
uid, ss_name, key);
# break to only send one.
break;
@ -115,7 +115,7 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{
for ( key in result_store[ss_name] )
{
Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
Cluster::publish(Cluster::manager_topic, SumStats::send_a_key,
uid, ss_name, key);
# break to only send one.
break;
@ -124,7 +124,7 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
}
else
{
Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
Cluster::publish(Cluster::manager_topic, SumStats::send_no_key,
uid, ss_name);
}
}
@ -151,7 +151,7 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{
if ( uid in sending_results && key in sending_results[uid] )
{
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, sending_results[uid][key], cleanup);
delete sending_results[uid][key];
}
@ -159,7 +159,7 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, table(), cleanup);
}
}
@ -167,14 +167,14 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{
if ( ss_name in result_store && key in result_store[ss_name] )
{
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, result_store[ss_name][key], cleanup);
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, table(), cleanup);
}
}
@ -252,13 +252,13 @@ event SumStats::finish_epoch(ss: SumStat)
stats_keys[uid] = set();
# Request data from peers.
Broker::publish(Cluster::worker_topic, SumStats::cluster_ss_request,
Cluster::publish(Cluster::worker_topic, SumStats::cluster_ss_request,
uid, ss$name, T);
done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid);
Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
Cluster::publish(Cluster::worker_topic, SumStats::get_a_key,
uid, ss$name, T);
}
@ -274,7 +274,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, 1.0) )
{
threshold_crossed(ss, key, result);
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
Cluster::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
ss$name, key, threshold_tracker[ss$name][key]);
}
}
@ -292,7 +292,7 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
if ( check_thresholds(ss, key, ir, 1.0) )
{
threshold_crossed(ss, key, ir);
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
Cluster::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
ss_name, key, threshold_tracker[ss_name][key]);
}
@ -329,7 +329,7 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
}
done_with[uid] = 0;
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, cleanup);
delete stats_keys[uid][key];
}
@ -338,7 +338,7 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
# Get more keys! And this breaks us out of the evented loop.
done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid);
Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
Cluster::publish(Cluster::worker_topic, SumStats::get_a_key,
uid, ss_name, cleanup);
}
}
@ -464,7 +464,7 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
add outstanding_global_views[ss_name][uid];
done_with[uid] = 0;
#print fmt("requesting results for: %s", uid);
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, F);
}
@ -475,7 +475,7 @@ function request_key(ss_name: string, key: Key): Result
key_requests[uid] = table();
add dynamic_requests[uid];
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, F);
return when [uid, ss_name, key] ( uid in done_with &&
Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )