diff --git a/doc/scripts/DocSourcesList.cmake b/doc/scripts/DocSourcesList.cmake index 2efa45ef38..97150f84aa 100644 --- a/doc/scripts/DocSourcesList.cmake +++ b/doc/scripts/DocSourcesList.cmake @@ -203,7 +203,13 @@ rest_target(${psd} policy/frameworks/software/vulnerable.bro) rest_target(${psd} policy/integration/barnyard2/main.bro) rest_target(${psd} policy/integration/barnyard2/types.bro) rest_target(${psd} policy/integration/collective-intel/main.bro) -rest_target(${psd} policy/misc/app-metrics.bro) +rest_target(${psd} policy/misc/app-stats/main.bro) +rest_target(${psd} policy/misc/app-stats/plugins/facebook.bro) +rest_target(${psd} policy/misc/app-stats/plugins/gmail.bro) +rest_target(${psd} policy/misc/app-stats/plugins/google.bro) +rest_target(${psd} policy/misc/app-stats/plugins/netflix.bro) +rest_target(${psd} policy/misc/app-stats/plugins/pandora.bro) +rest_target(${psd} policy/misc/app-stats/plugins/youtube.bro) rest_target(${psd} policy/misc/capture-loss.bro) rest_target(${psd} policy/misc/detect-traceroute/main.bro) rest_target(${psd} policy/misc/load-balancing.bro) diff --git a/scripts/base/frameworks/sumstats/cluster.bro b/scripts/base/frameworks/sumstats/cluster.bro index 0c005d72a6..d5c5bc440a 100644 --- a/scripts/base/frameworks/sumstats/cluster.bro +++ b/scripts/base/frameworks/sumstats/cluster.bro @@ -33,17 +33,17 @@ export { ## Event sent by nodes that are collecting sumstats after receiving a request for ## the sumstat from the manager. - global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool); + #global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool); ## This event is sent by the manager in a cluster to initiate the collection of ## a single key value from a sumstat. It's typically used to get intermediate ## updates before the break interval triggers to speed detection of a value ## crossing a threshold. - global cluster_keys_request: event(uid: string, ss_name: string, key: set[Key], cleanup: bool); + global cluster_get_result: event(uid: string, ss_name: string, key: Key, cleanup: bool); ## This event is sent by nodes in response to a - ## :bro:id:`SumStats::cluster_keys_request` event. - global cluster_key_response: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool); + ## :bro:id:`SumStats::cluster_get_result` event. + global cluster_send_result: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool); ## This is sent by workers to indicate that they crossed the percent ## of the current threshold by the percentage defined globally in @@ -53,14 +53,20 @@ export { ## This event is scheduled internally on workers to send result chunks. global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool); + global get_a_key: event(uid: string, ss_name: string); + + global send_a_key: event(uid: string, ss_name: string, key: Key); + global send_no_key: event(uid: string, ss_name: string); + ## This event is generated when a threshold is crossed. global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold_index: count); } # Add events to the cluster framework to make this work. -redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|key_request|threshold_crossed)/; -redef Cluster::manager2worker_events += /SumStats::thresholds_reset/; -redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_response|key_intermediate_response)/; +redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|get_result|threshold_crossed)/; +redef Cluster::manager2worker_events += /SumStats::(thresholds_reset|get_a_key)/; +redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|send_result|key_intermediate_response)/; +redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/; @if ( Cluster::local_node_type() != Cluster::MANAGER ) # This variable is maintained to know what keys have recently sent as @@ -69,6 +75,10 @@ redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_resp # an intermediate result has been received. global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0; +# Result tables indexed on a uid that are currently being sent to the +# manager. +global sending_results: table[string] of ResultTable = table(); + # This is done on all non-manager node types in the event that a sumstat is # being collected somewhere other than a worker. function data_added(ss: SumStat, key: Key, result: Result) @@ -89,7 +99,7 @@ function data_added(ss: SumStat, key: Key, result: Result) } } -#event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool) +#event SumStats::send_data(uid: string, ss_name: string, cleanup: bool) # { # #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); # @@ -117,42 +127,86 @@ function data_added(ss: SumStat, key: Key, result: Result) # # changed to something else later. # event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup); # if ( ! done ) -# schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) }; +# schedule 0.01 sec { SumStats::send_data(uid, T) }; # } -#event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool) -# { -# #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id); -# -# # Initiate sending all of the data for the requested stats. -# if ( ss_name in result_store ) -# event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup); -# else -# event SumStats::send_data(uid, ss_name, table(), cleanup); -# -# # Lookup the actual sumstats and reset it, the reference to the data -# # currently stored will be maintained internally by the send_data event. -# if ( ss_name in stats_store && cleanup ) -# reset(stats_store[ss_name]); -# } - -event SumStats::cluster_keys_request(uid: string, ss_name: string, keys: set[Key], cleanup: bool) +event SumStats::get_a_key(uid: string, ss_name: string) { - for ( key in keys ) + if ( uid in sending_results ) { - if ( ss_name in result_store && key in result_store[ss_name] ) - { - #print fmt("WORKER %s: received the cluster_keys_request event for %s=%s.", Cluster::node, key2str(key), data); + if ( |sending_results[uid]| == 0 ) + event SumStats::send_no_key(uid, ss_name); + for ( key in sending_results[uid] ) + { + event SumStats::send_a_key(uid, ss_name, key); + # break to only send one. + break; + } + } + else if ( ss_name in result_store && |result_store[ss_name]| > 0 ) + { + if ( |result_store[ss_name]| == 0 ) + event SumStats::send_no_key(uid, ss_name); + + for ( key in result_store[ss_name] ) + { + event SumStats::send_a_key(uid, ss_name, key); + # break to only send one. + break; + } + } + else + { + event SumStats::send_no_key(uid, ss_name); + } + } + +event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool) + { + #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id); + + # Create a back store for the result + sending_results[uid] = (ss_name in result_store) ? copy(result_store[ss_name]) : table(); + + # Lookup the actual sumstats and reset it, the reference to the data + # currently stored will be maintained internally from the + # sending_results table. + if ( cleanup && ss_name in stats_store ) + reset(stats_store[ss_name]); + } + +event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, cleanup: bool) + { + #print fmt("WORKER %s: received the cluster_get_result event for %s=%s.", Cluster::node, key2str(key), data); + + if ( cleanup ) # data will implicitly be in sending_results (i know this isn't great) + { + if ( uid in sending_results && key in sending_results[uid] ) + { # Note: copy is needed to compensate serialization caching issue. This should be # changed to something else later. - event SumStats::cluster_key_response(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); + event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup); + delete sending_results[uid][key]; } else { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup); + event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup); + } + } + else + { + if ( ss_name in result_store && key in result_store[ss_name] ) + { + event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); + } + else + { + # We need to send an empty response if we don't have the data so that the manager + # can know that it heard back from all of the workers. + event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup); } } } @@ -205,7 +259,7 @@ event SumStats::finish_epoch(ss: SumStat) { if ( network_time() > zero_time ) { - #print fmt("%.6f MANAGER: breaking %s sumstat for %s sumstat", network_time(), ss$name, ss$id); + #print fmt("%.6f MANAGER: breaking %s sumstat", network_time(), ss$name); local uid = unique_id(""); if ( uid in stats_results ) @@ -214,6 +268,9 @@ event SumStats::finish_epoch(ss: SumStat) # Request data from peers. event SumStats::cluster_ss_request(uid, ss$name, T); + + done_with[uid] = 0; + event SumStats::get_a_key(uid, ss$name); } # Schedule the next finish_epoch event. @@ -231,8 +288,129 @@ function data_added(ss: SumStat, key: Key, result: Result) } } -event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool) +function handle_end_of_result_collection(uid: string, ss_name: string, key: Key, cleanup: bool) { + #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); + local ss = stats_store[ss_name]; + local ir = key_requests[uid]; + if ( check_thresholds(ss, key, ir, 1.0) ) + { + threshold_crossed(ss, key, ir); + event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]); + } + + delete key_requests[uid]; + delete done_with[uid]; + + if ( cleanup ) + { + # This is done here because "cleanup" implicitly means + # it's the end of an epoch. + if ( ss?$epoch_result && |ir| > 0 ) + { + local now = network_time(); + ss$epoch_result(now, key, ir); + } + + # Check that there is an outstanding view before subtracting. + # Global views only apply to non-dynamic requests. Dynamic + # requests must be serviced. + if ( outstanding_global_views[ss_name] > 0 ) + --outstanding_global_views[ss_name]; + } + + if ( uid in stats_results ) + delete stats_results[uid][key]; + } + +function request_all_current_keys(uid: string, ss_name: string, cleanup: bool) + { + #print "request_all_current_keys"; + if ( uid in stats_results && |stats_results[uid]| > 0 ) + { + #print fmt(" -- %d remaining keys here", |stats_results[uid]|); + for ( key in stats_results[uid] ) + { + done_with[uid] = 0; + event SumStats::cluster_get_result(uid, ss_name, key, cleanup); + when ( uid in done_with && Cluster::worker_count == done_with[uid] ) + { + handle_end_of_result_collection(uid, ss_name, key, cleanup); + request_all_current_keys(uid, ss_name, cleanup); + } + break; # only a single key + } + } + else + { + # Get more keys! And this breaks us out of the evented loop. + done_with[uid] = 0; + event SumStats::get_a_key(uid, ss_name); + } + } + +event SumStats::send_no_key(uid: string, ss_name: string) + { + #print "send_no_key"; + ++done_with[uid]; + if ( Cluster::worker_count == done_with[uid] ) + { + delete done_with[uid]; + + if ( |stats_results[uid]| > 0 ) + { + #print "we need more keys!"; + # Now that we have a key from each worker, lets + # grab all of the results. + request_all_current_keys(uid, ss_name, T); + } + else + { + #print "we're out of keys!"; + local ss = stats_store[ss_name]; + if ( ss?$epoch_finished ) + ss$epoch_finished(network_time()); + } + } + } + +event SumStats::send_a_key(uid: string, ss_name: string, key: Key) + { + #print fmt("send_a_key %s", key); + if ( uid !in stats_results ) + { + # no clue what happened here + return; + } + + if ( key !in stats_results[uid] ) + stats_results[uid][key] = table(); + + ++done_with[uid]; + if ( Cluster::worker_count == done_with[uid] ) + { + delete done_with[uid]; + + if ( |stats_results[uid]| > 0 ) + { + #print "we need more keys!"; + # Now that we have a key from each worker, lets + # grab all of the results. + request_all_current_keys(uid, ss_name, T); + } + else + { + #print "we're out of keys!"; + local ss = stats_store[ss_name]; + if ( ss?$epoch_finished ) + ss$epoch_finished(network_time()); + } + } + } + +event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool) + { + #print "cluster_send_result"; #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result); # We only want to try and do a value merge if there are actually measured datapoints @@ -244,37 +422,6 @@ event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, res # Mark that a worker is done. ++done_with[uid]; - - #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); - if ( Cluster::worker_count == done_with[uid] ) - { - local ss = stats_store[ss_name]; - local ir = key_requests[uid]; - if ( check_thresholds(ss, key, ir, 1.0) ) - { - threshold_crossed(ss, key, ir); - event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); - } - if () - { - - } - - if ( cleanup ) - { - # We only want to delete the data if this is a non dynamic - # request because the dynamic requests use when statements - # and the data needs to remain available. - delete key_requests[uid]; - delete done_with[uid]; - - # Check that there is an outstanding view before subtracting. - # Global views only apply to non-dynamic requests. Dynamic - # requests must be serviced. - if ( outstanding_global_views[ss_name] > 0 ) - --outstanding_global_views[ss_name]; - } - } } # Managers handle intermediate updates here. @@ -296,96 +443,103 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key) local uid = unique_id(""); done_with[uid] = 0; - event SumStats::cluster_keys_request(uid, ss_name, set(key), T); + event SumStats::cluster_get_result(uid, ss_name, key, T); } -event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool) - { - #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); +#event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool) +# { +# #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); +# +# # Mark another worker as being "done" for this uid. +# if ( done ) +# ++done_with[uid]; +# +# # We had better only be getting requests for stuff that exists. +# if ( ss_name !in stats_store ) +# return; +# +# if ( uid !in stats_results ) +# stats_results[uid] = table(); +# +# local local_data = stats_results[uid]; +# local ss = stats_store[ss_name]; +# +# for ( key in data ) +# { +# if ( key in local_data ) +# local_data[key] = compose_results(local_data[key], data[key]); +# else +# local_data[key] = data[key]; +# +# # If a stat is done being collected, thresholds for each key +# # need to be checked so we're doing it here to avoid doubly +# # iterating over each key. +# if ( Cluster::worker_count == done_with[uid] ) +# { +# if ( check_thresholds(ss, key, local_data[key], 1.0) ) +# { +# threshold_crossed(ss, key, local_data[key]); +# event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); +# } +# } +# } +# +# # If the data has been collected from all peers, we are done and ready to finish. +# if ( cleanup && Cluster::worker_count == done_with[uid] ) +# { +# local now = network_time(); +# if ( ss?$epoch_result ) +# { +# for ( key in local_data ) +# ss$epoch_result(now, key, local_data[key]); +# } +# +# if ( ss?$epoch_finished ) +# ss$epoch_finished(now); +# +# # Clean up +# delete stats_results[uid]; +# delete done_with[uid]; +# reset(ss); +# } +# } - # Mark another worker as being "done" for this uid. - if ( done ) - ++done_with[uid]; - - # We had better only be getting requests for stuff that exists. - if ( ss_name !in stats_store ) - return; - - if ( uid !in stats_results ) - stats_results[uid] = table(); - - local local_data = stats_results[uid]; - local ss = stats_store[ss_name]; - - for ( key in data ) - { - if ( key in local_data ) - local_data[key] = compose_results(local_data[key], data[key]); - else - local_data[key] = data[key]; - - # If a stat is done being collected, thresholds for each key - # need to be checked so we're doing it here to avoid doubly - # iterating over each key. - if ( Cluster::worker_count == done_with[uid] ) - { - if ( check_thresholds(ss, key, local_data[key], 1.0) ) - { - threshold_crossed(ss, key, local_data[key]); - event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); - } - } - } - - # If the data has been collected from all peers, we are done and ready to finish. - if ( cleanup && Cluster::worker_count == done_with[uid] ) - { - if ( ss?$epoch_finished ) - ss$epoch_finished(local_data); - - # Clean up - delete stats_results[uid]; - delete done_with[uid]; - reset(ss); - } - } - -function request(ss_name: string): ResultTable - { - # This only needs to be implemented this way for cluster compatibility. - local uid = unique_id("dyn-"); - stats_results[uid] = table(); - done_with[uid] = 0; - event SumStats::cluster_ss_request(uid, ss_name, F); - - return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) - { - if ( uid in stats_results ) - { - local ss_result = stats_results[uid]; - # Clean up - delete stats_results[uid]; - delete done_with[uid]; - reset(stats_store[ss_name]); - return ss_result; - } - else - return table(); - } - timeout 1.1min - { - Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name)); - return table(); - } - } +#function request(ss_name: string): ResultTable +# { +# # This only needs to be implemented this way for cluster compatibility. +# local uid = unique_id("dyn-"); +# stats_results[uid] = table(); +# done_with[uid] = 0; +# event SumStats::cluster_ss_request(uid, ss_name, F); +# +# return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) +# { +# if ( uid in stats_results ) +# { +# local ss_result = stats_results[uid]; +# # Clean up +# delete stats_results[uid]; +# delete done_with[uid]; +# reset(stats_store[ss_name]); +# return ss_result; +# } +# else +# return table(); +# } +# timeout 1.1min +# { +# Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name)); +# return table(); +# } +# } function request_key(ss_name: string, key: Key): Result { - local uid = unique_id("dyn-"); + local uid = unique_id(""); done_with[uid] = 0; key_requests[uid] = table(); - event SumStats::cluster_keys_request(uid, ss_name, set(key), F); + event SumStats::cluster_get_result(uid, ss_name, key, F); return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) { local result = key_requests[uid]; diff --git a/scripts/base/frameworks/sumstats/main.bro b/scripts/base/frameworks/sumstats/main.bro index 7a98783df8..282b03da6b 100644 --- a/scripts/base/frameworks/sumstats/main.bro +++ b/scripts/base/frameworks/sumstats/main.bro @@ -118,7 +118,12 @@ export { ## A callback that receives each of the results at the ## end of the analysis epoch. The function will be ## called once for each key. - epoch_result: function(ts: time, key::SumStats::Key, result: SumStats::Result) &optional; + epoch_result: function(ts: time, key: SumStats::Key, result: SumStats::Result) &optional; + + ## A callback that will be called when a single collection + ## interval is completed. The ts value will be the time of + ## when the collection started. + epoch_finished: function(ts:time) &optional; }; ## Create a summary statistic. diff --git a/scripts/base/frameworks/sumstats/non-cluster.bro b/scripts/base/frameworks/sumstats/non-cluster.bro index b7e18bd55a..97e1817598 100644 --- a/scripts/base/frameworks/sumstats/non-cluster.bro +++ b/scripts/base/frameworks/sumstats/non-cluster.bro @@ -6,13 +6,19 @@ event SumStats::finish_epoch(ss: SumStat) { if ( ss$name in result_store ) { + local now = network_time(); + if ( ss?$epoch_result ) { local data = result_store[ss$name]; # TODO: don't block here. for ( key in data ) - ss$epoch_result(network_time(), key, data[key]); + ss$epoch_result(now, key, data[key]); } + + if ( ss?$epoch_finished ) + ss$epoch_finished(now); + reset(ss); } diff --git a/scripts/policy/misc/app-stats/main.bro b/scripts/policy/misc/app-stats/main.bro index e4a38c6893..24c9ac2ade 100644 --- a/scripts/policy/misc/app-stats/main.bro +++ b/scripts/policy/misc/app-stats/main.bro @@ -45,20 +45,16 @@ event bro_init() &priority=3 SumStats::create([$name="app-metrics", $epoch=break_interval, $reducers=set(r1, r2), - $epoch_finished(data: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = { local l: Info; - l$ts = network_time(); - l$ts_delta = break_interval; - for ( key in data ) - { - local result = data[key]; - l$app = key$str; - l$bytes = double_to_count(floor(result["apps.bytes"]$sum)); - l$hits = result["apps.hits"]$num; - l$uniq_hosts = result["apps.hits"]$unique; - Log::write(LOG, l); - } + l$ts = network_time(); + l$ts_delta = break_interval; + l$app = key$str; + l$bytes = double_to_count(floor(result["apps.bytes"]$sum)); + l$hits = result["apps.hits"]$num; + l$uniq_hosts = result["apps.hits"]$unique; + Log::write(LOG, l); }]); } diff --git a/scripts/test-all-policy.bro b/scripts/test-all-policy.bro index dcf50b538e..f0900fda07 100644 --- a/scripts/test-all-policy.bro +++ b/scripts/test-all-policy.bro @@ -35,7 +35,15 @@ @load integration/barnyard2/types.bro @load integration/collective-intel/__load__.bro @load integration/collective-intel/main.bro -@load misc/app-metrics.bro +@load misc/app-stats/__load__.bro +@load misc/app-stats/main.bro +@load misc/app-stats/plugins/__load__.bro +@load misc/app-stats/plugins/facebook.bro +@load misc/app-stats/plugins/gmail.bro +@load misc/app-stats/plugins/google.bro +@load misc/app-stats/plugins/netflix.bro +@load misc/app-stats/plugins/pandora.bro +@load misc/app-stats/plugins/youtube.bro @load misc/capture-loss.bro @load misc/detect-traceroute/__load__.bro @load misc/detect-traceroute/main.bro diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout index a5428dd3b7..810cdb0ae8 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout @@ -1,3 +1,3 @@ A test metric threshold was crossed with a value of: 101.0 -End of epoch handler was called 101.0 +End of epoch handler was called diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout index ed14a1c753..0445fc68b2 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout @@ -1,7 +1,2 @@ -Complete SumStat request - Host: 6.5.4.3 -> 1 - Host: 10.10.10.10 -> 5 - Host: 1.2.3.4 -> 169 - Host: 7.2.1.5 -> 145 SumStat key request Host: 7.2.1.5 -> 145 diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout index 876c368eb3..7d62edb7f7 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout @@ -1,5 +1,2 @@ -Complete SumStat request - Host: 1.2.3.4 -> 42 - Host: 4.3.2.1 -> 7 Key request for 1.2.3.4 Host: 1.2.3.4 -> 42 diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro index 956c43a57b..2206673c3c 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro @@ -26,14 +26,13 @@ event bro_init() &priority=5 SumStats::create([$name="test", $epoch=5secs, $reducers=set(r1), - $epoch_finished(rt: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + local r = result["test"]; + print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); + }, + $epoch_finished(ts: time) = { - for ( key in rt ) - { - local r = rt[key]["test"]; - print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); - } - terminate(); }]); } diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic.bro b/testing/btest/scripts/base/frameworks/sumstats/basic.bro index 54160cbf54..906d69a6f3 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic.bro @@ -14,13 +14,10 @@ event bro_init() &priority=5 SumStats::create([$name="test", $epoch=3secs, $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = { - for ( key in data ) - { - local r = data[key]["test.metric"]; - print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); - } + local r = result["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); } ]); diff --git a/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro b/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro index 97986aeddf..4fb6b817d3 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro @@ -23,11 +23,13 @@ event bro_init() &priority=5 SumStats::create([$name="test", $epoch=10secs, $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + print result["test.metric"]$sum; + }, + $epoch_finished(ts: time) = { print "End of epoch handler was called"; - for ( res in data ) - print data[res]["test.metric"]$sum; terminate(); }, $threshold_val(key: SumStats::Key, result: SumStats::Result) = diff --git a/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro index 29fcd14c64..48068d8cfe 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro @@ -22,7 +22,7 @@ global n = 0; event bro_init() &priority=5 { - local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)]; + local r1 = SumStats::Reducer($stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)); SumStats::create([$name="test sumstat", $epoch=1hr, $reducers=set(r1)]); @@ -61,23 +61,24 @@ event on_demand2() when ( local result = SumStats::request_key("test sumstat", [$host=host]) ) { print "SumStat key request"; - print fmt(" Host: %s -> %.0f", host, result["test"]$sum); + if ( "test" in result ) + print fmt(" Host: %s -> %.0f", host, result["test"]$sum); terminate(); } } event on_demand() { - when ( local results = SumStats::request("test sumstat") ) - { - print "Complete SumStat request"; - print fmt(" Host: %s -> %.0f", 6.5.4.3, results[[$host=6.5.4.3]]["test"]$sum); - print fmt(" Host: %s -> %.0f", 10.10.10.10, results[[$host=10.10.10.10]]["test"]$sum); - print fmt(" Host: %s -> %.0f", 1.2.3.4, results[[$host=1.2.3.4]]["test"]$sum); - print fmt(" Host: %s -> %.0f", 7.2.1.5, results[[$host=7.2.1.5]]["test"]$sum); + #when ( local results = SumStats::request("test sumstat") ) + # { + # print "Complete SumStat request"; + # print fmt(" Host: %s -> %.0f", 6.5.4.3, results[[$host=6.5.4.3]]["test"]$sum); + # print fmt(" Host: %s -> %.0f", 10.10.10.10, results[[$host=10.10.10.10]]["test"]$sum); + # print fmt(" Host: %s -> %.0f", 1.2.3.4, results[[$host=1.2.3.4]]["test"]$sum); + # print fmt(" Host: %s -> %.0f", 7.2.1.5, results[[$host=7.2.1.5]]["test"]$sum); event on_demand2(); - } + # } } global peer_count = 0; diff --git a/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro b/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro index f93e1a72dc..78aba726ca 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro @@ -4,17 +4,18 @@ redef exit_only_after_terminate=T; -event on_demand() - { - when ( local results = SumStats::request("test") ) - { - print "Complete SumStat request"; - for ( key in results ) - { - print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum); - } - } - } +## Requesting a full sumstats resulttable is not supported yet. +#event on_demand() +# { +# when ( local results = SumStats::request("test") ) +# { +# print "Complete SumStat request"; +# for ( key in results ) +# { +# print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum); +# } +# } +# } event on_demand_key() { @@ -39,7 +40,7 @@ event bro_init() &priority=5 SumStats::observe("test.reducer", [$host=1.2.3.4], [$num=42]); SumStats::observe("test.reducer", [$host=4.3.2.1], [$num=7]); - schedule 0.1 secs { on_demand() }; + #schedule 0.1 secs { on_demand() }; schedule 1 secs { on_demand_key() }; } diff --git a/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro index 794625c25b..1f2bab0229 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro @@ -23,21 +23,18 @@ event bro_init() &priority=5 SumStats::create([$name="test", $epoch=5secs, $reducers=set(r1), - $epoch_finished(rt: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = { - local hosts: vector of addr = vector(6.5.4.3, 10.10.10.10, 1.2.3.4, 7.2.1.5); - for ( i in hosts ) - { - local key = [$host=hosts[i]]; - local r = rt[key]["test"]; + local r = result["test"]; + print fmt("Host: %s Sampled observations: %d", key$host, r$sample_elements); + local sample_nums: vector of count = vector(); + for ( sample in r$samples ) + sample_nums[|sample_nums|] =r$samples[sample]$num; - print fmt("Host: %s Sampled observations: %d", key$host, r$sample_elements); - local sample_nums: vector of count = vector(); - for ( sample in r$samples ) - sample_nums[|sample_nums|] =r$samples[sample]$num; - - print fmt(" %s", sort(sample_nums)); - } + print fmt(" %s", sort(sample_nums)); + }, + $epoch_finished(ts: time) = + { terminate(); }]); } diff --git a/testing/btest/scripts/base/frameworks/sumstats/sample.bro b/testing/btest/scripts/base/frameworks/sumstats/sample.bro index 595c1e710a..4ba395b463 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/sample.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/sample.bro @@ -8,15 +8,12 @@ event bro_init() &priority=5 SumStats::create([$name="test", $epoch=3secs, $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = { - for ( key in data ) - { - print key$host; - local r = data[key]["test.metric"]; - print r$samples; - print r$sample_elements; - } + print key$host; + local r = result["test.metric"]; + print r$samples; + print r$sample_elements; }]); SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]);