API updates for metrics framework.

- Removed default logging.  Now a function is available for the new
  $period_finished filter field to get the same behavior for logging
  named Metrics::write_log.

- Added index rollups for getting multiple metrics result values
  as the same time.
This commit is contained in:
Seth Hall 2012-12-18 01:08:59 -05:00
parent 69030fdff3
commit 69b7ce12d2
17 changed files with 304 additions and 162 deletions

View file

@ -60,18 +60,18 @@ global requested_results: table[string] of time = table() &create_expire=5mins;
# This variable is maintained by manager nodes as they collect and aggregate
# results.
global filter_results: table[string, string, string] of MetricTable &create_expire=5mins;
global filter_results: table[string, string, string] of MetricTable &read_expire=1min;
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
# matches the number of peer nodes that results should be coming from, the
# result is written out and deleted from here.
# TODO: add an &expire_func in case not all results are received.
global done_with: table[string] of count &create_expire=5mins &default=0;
global done_with: table[string] of count &read_expire=1min &default=0;
# This variable is maintained by managers to track intermediate responses as
# they are getting a global view for a certain index.
global index_requests: table[string, string, string, Index] of ResultVal &create_expire=5mins &default=[];
global index_requests: table[string, string, string, Index] of ResultVal &read_expire=1min;
# This variable is maintained by all hosts for different purposes. Non-managers
# maintain it to know what indexes they have recently sent as intermediate
@ -163,7 +163,7 @@ event Metrics::cluster_index_request(uid: string, id: string, filter_name: strin
@if ( Cluster::local_node_type() == Cluster::MANAGER )
# Manager's handle logging.
event Metrics::log_it(filter: Filter)
event Metrics::finish_period(filter: Filter)
{
#print fmt("%.6f MANAGER: breaking %s filter for %s metric", network_time(), filter$name, filter$id);
local uid = unique_id("");
@ -174,8 +174,8 @@ event Metrics::log_it(filter: Filter)
# Request data from peers.
event Metrics::cluster_filter_request(uid, filter$id, filter$name);
# Schedule the log_it event for the next break period.
schedule filter$every { Metrics::log_it(filter) };
# Schedule the next finish_period event.
schedule filter$every { Metrics::finish_period(filter) };
}
# This is unlikely to be called often, but it's here in case there are metrics
@ -237,6 +237,8 @@ event Metrics::cluster_filter_response(uid: string, id: string, filter_name: str
++done_with[uid];
local local_data = filter_results[uid, id, filter_name];
local filter = filter_store[id, filter_name];
for ( index in data )
{
if ( index in local_data )
@ -245,18 +247,18 @@ event Metrics::cluster_filter_response(uid: string, id: string, filter_name: str
local_data[index] = data[index];
# If a filter is done being collected, thresholds for each index
# need to checked so we're doing it here to avoid doubly iterating
# need to be checked so we're doing it here to avoid doubly iterating
# over each index.
if ( Cluster::worker_count == done_with[uid] )
{
if ( check_thresholds(filter_store[id, filter_name], index, local_data[index], 1.0) )
if ( check_thresholds(filter, index, local_data[index], 1.0) )
{
threshold_crossed(filter_store[id, filter_name], index, local_data[index]);
threshold_crossed(filter, index, local_data[index]);
}
}
}
# If the data has been collected from all peers, we are done and ready to log.
# If the data has been collected from all peers, we are done and ready to finish.
if ( Cluster::worker_count == done_with[uid] )
{
local ts = network_time();
@ -267,11 +269,30 @@ event Metrics::cluster_filter_response(uid: string, id: string, filter_name: str
delete requested_results[uid];
}
write_log(ts, filter_store[id, filter_name], local_data);
if ( filter?$rollup )
{
for ( index in local_data )
{
if ( index !in rollup_store )
rollup_store[index] = table();
rollup_store[index][id, filter_name] = local_data[index];
# If all of the result vals are stored then the rollup callback can be executed.
if ( |rollup_store[index]| == |rollups[filter$rollup]$filters| )
{
rollups[filter$rollup]$callback(index, rollup_store[index]);
}
}
}
if ( filter?$period_finished )
filter$period_finished(ts, filter$id, filter$name, local_data);
# Clean up
delete filter_results[uid, id, filter_name];
delete done_with[uid];
# Not sure I need to reset the filter on the manager.
reset(filter);
}
}