diff --git a/CHANGES b/CHANGES index 6b1a1327d9..d1f1dde6a6 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,35 @@ +2.1-1026 | 2013-08-02 22:35:09 -0400 + + * Fix the SumStats top-k plugin and test. (Seth Hall) + + * Rework of SumStats API to reduce high instantaneous memory + use on clusters. (Seth Hall) + + * Large update for the SumStats framework. + + - On-demand access to sumstats results through "return from" + functions named SumStats::request and Sumstats::request_key. + Both functions are tested in standalone and clustered modes. + + - $name field has returned to SumStats which simplifies cluster + code and makes the on-demand access stuff possible. + + - Clustered results can only be collected for 1 minute from their + time of creation now instead of time of last read. + + - Thresholds use doubles instead of counts everywhere now. + + - Calculation dependency resolution occurs at start up time now + instead of doing it at observation time which provide a minor + cpu performance improvement. A new plugin registration mechanism + was created to support this change. + + - AppStats now has a minimal doc string and is broken into hook-based + plugins. + + - AppStats and traceroute detection added to local.bro (Seth Hall) + 2.1-1009 | 2013-08-02 17:19:08 -0700 * A number of exec module and raw input reader fixes. (Jon Siwek) diff --git a/VERSION b/VERSION index db04f73c84..b6505c675b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.1-1009 +2.1-1026 diff --git a/doc/scripts/DocSourcesList.cmake b/doc/scripts/DocSourcesList.cmake index 570de8bb6f..e405834ac6 100644 --- a/doc/scripts/DocSourcesList.cmake +++ b/doc/scripts/DocSourcesList.cmake @@ -206,7 +206,13 @@ rest_target(${psd} policy/frameworks/software/vulnerable.bro) rest_target(${psd} policy/integration/barnyard2/main.bro) rest_target(${psd} policy/integration/barnyard2/types.bro) rest_target(${psd} policy/integration/collective-intel/main.bro) -rest_target(${psd} policy/misc/app-metrics.bro) +rest_target(${psd} policy/misc/app-stats/main.bro) +rest_target(${psd} policy/misc/app-stats/plugins/facebook.bro) +rest_target(${psd} policy/misc/app-stats/plugins/gmail.bro) +rest_target(${psd} policy/misc/app-stats/plugins/google.bro) +rest_target(${psd} policy/misc/app-stats/plugins/netflix.bro) +rest_target(${psd} policy/misc/app-stats/plugins/pandora.bro) +rest_target(${psd} policy/misc/app-stats/plugins/youtube.bro) rest_target(${psd} policy/misc/capture-loss.bro) rest_target(${psd} policy/misc/detect-traceroute/main.bro) rest_target(${psd} policy/misc/load-balancing.bro) diff --git a/scripts/base/frameworks/sumstats/cluster.bro b/scripts/base/frameworks/sumstats/cluster.bro index be0a5b5ded..d2db09f312 100644 --- a/scripts/base/frameworks/sumstats/cluster.bro +++ b/scripts/base/frameworks/sumstats/cluster.bro @@ -10,10 +10,6 @@ module SumStats; export { - ## Allows a user to decide how large of result groups the workers should transmit - ## values for cluster stats aggregation. - const cluster_send_in_groups_of = 50 &redef; - ## The percent of the full threshold value that needs to be met on a single worker ## for that worker to send the value to its manager in order for it to request a ## global view for that value. There is no requirement that the manager requests @@ -27,45 +23,46 @@ export { ## performed. In practice this should hopefully have a minimal effect. const max_outstanding_global_views = 10 &redef; - ## Intermediate updates can cause overload situations on very large clusters. This - ## option may help reduce load and correct intermittent problems. The goal for this - ## option is also meant to be temporary. - const enable_intermediate_updates = T &redef; - ## Event sent by the manager in a cluster to initiate the collection of values for ## a sumstat. - global cluster_ss_request: event(uid: string, ssid: string); + global cluster_ss_request: event(uid: string, ss_name: string, cleanup: bool); ## Event sent by nodes that are collecting sumstats after receiving a request for ## the sumstat from the manager. - global cluster_ss_response: event(uid: string, ssid: string, data: ResultTable, done: bool); + #global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool); ## This event is sent by the manager in a cluster to initiate the collection of ## a single key value from a sumstat. It's typically used to get intermediate ## updates before the break interval triggers to speed detection of a value ## crossing a threshold. - global cluster_key_request: event(uid: string, ssid: string, key: Key); + global cluster_get_result: event(uid: string, ss_name: string, key: Key, cleanup: bool); ## This event is sent by nodes in response to a - ## :bro:id:`SumStats::cluster_key_request` event. - global cluster_key_response: event(uid: string, ssid: string, key: Key, result: Result); + ## :bro:id:`SumStats::cluster_get_result` event. + global cluster_send_result: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool); ## This is sent by workers to indicate that they crossed the percent ## of the current threshold by the percentage defined globally in ## :bro:id:`SumStats::cluster_request_global_view_percent` - global cluster_key_intermediate_response: event(ssid: string, key: SumStats::Key); + global cluster_key_intermediate_response: event(ss_name: string, key: SumStats::Key); ## This event is scheduled internally on workers to send result chunks. - global send_data: event(uid: string, ssid: string, data: ResultTable); + global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool); + + global get_a_key: event(uid: string, ss_name: string, cleanup: bool &default=F); + + global send_a_key: event(uid: string, ss_name: string, key: Key); + global send_no_key: event(uid: string, ss_name: string); ## This event is generated when a threshold is crossed. - global cluster_threshold_crossed: event(ssid: string, key: SumStats::Key, thold: Thresholding); + global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold_index: count); } # Add events to the cluster framework to make this work. -redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|key_request|threshold_crossed)/; -redef Cluster::manager2worker_events += /SumStats::thresholds_reset/; -redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_response|key_intermediate_response)/; +redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|get_result|threshold_crossed)/; +redef Cluster::manager2worker_events += /SumStats::(thresholds_reset|get_a_key)/; +redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|send_result|key_intermediate_response)/; +redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/; @if ( Cluster::local_node_type() != Cluster::MANAGER ) # This variable is maintained to know what keys have recently sent as @@ -74,12 +71,9 @@ redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_resp # an intermediate result has been received. global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0; -event bro_init() &priority=-100 - { - # The manager is the only host allowed to track these. - stats_store = table(); - reducer_store = table(); - } +# Result tables indexed on a uid that are currently being sent to the +# manager. +global sending_results: table[string] of ResultTable = table(); # This is done on all non-manager node types in the event that a sumstat is # being collected somewhere other than a worker. @@ -87,95 +81,151 @@ function data_added(ss: SumStat, key: Key, result: Result) { # If an intermediate update for this value was sent recently, don't send # it again. - if ( [ss$id, key] in recent_global_view_keys ) + if ( [ss$name, key] in recent_global_view_keys ) return; # If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that # crosses the full threshold then it's a candidate to send as an # intermediate update. - if ( enable_intermediate_updates && - check_thresholds(ss, key, result, cluster_request_global_view_percent) ) + if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) ) { # kick off intermediate update - event SumStats::cluster_key_intermediate_response(ss$id, key); - ++recent_global_view_keys[ss$id, key]; + event SumStats::cluster_key_intermediate_response(ss$name, key); + ++recent_global_view_keys[ss$name, key]; } } -event SumStats::send_data(uid: string, ssid: string, data: ResultTable) +#event SumStats::send_data(uid: string, ss_name: string, cleanup: bool) +# { +# #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); +# +# local local_data: ResultTable = table(); +# local incoming_data: ResultTable = cleanup ? data : copy(data); +# +# local num_added = 0; +# for ( key in incoming_data ) +# { +# local_data[key] = incoming_data[key]; +# delete incoming_data[key]; +# +# # Only send cluster_send_in_groups_of at a time. Queue another +# # event to send the next group. +# if ( cluster_send_in_groups_of == ++num_added ) +# break; +# } +# +# local done = F; +# # If data is empty, this sumstat is done. +# if ( |incoming_data| == 0 ) +# done = T; +# +# # Note: copy is needed to compensate serialization caching issue. This should be +# # changed to something else later. +# event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup); +# if ( ! done ) +# schedule 0.01 sec { SumStats::send_data(uid, T) }; +# } + +event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool) { - #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); - - local local_data: ResultTable = table(); - local num_added = 0; - for ( key in data ) + if ( uid in sending_results ) { - local_data[key] = data[key]; - delete data[key]; - - # Only send cluster_send_in_groups_of at a time. Queue another - # event to send the next group. - if ( cluster_send_in_groups_of == ++num_added ) - break; + if ( |sending_results[uid]| == 0 ) + { + event SumStats::send_no_key(uid, ss_name); + } + else + { + for ( key in sending_results[uid] ) + { + event SumStats::send_a_key(uid, ss_name, key); + # break to only send one. + break; + } + } + } + else if ( !cleanup && ss_name in result_store && |result_store[ss_name]| > 0 ) + { + if ( |result_store[ss_name]| == 0 ) + { + event SumStats::send_no_key(uid, ss_name); + } + else + { + for ( key in result_store[ss_name] ) + { + event SumStats::send_a_key(uid, ss_name, key); + # break to only send one. + break; + } + } + } + else + { + event SumStats::send_no_key(uid, ss_name); } - - local done = F; - # If data is empty, this sumstat is done. - if ( |data| == 0 ) - done = T; - - # Note: copy is needed to compensate serialization caching issue. This should be - # changed to something else later. - event SumStats::cluster_ss_response(uid, ssid, copy(local_data), done); - if ( ! done ) - schedule 0.01 sec { SumStats::send_data(uid, ssid, data) }; } -event SumStats::cluster_ss_request(uid: string, ssid: string) +event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool) { #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id); - # Initiate sending all of the data for the requested stats. - if ( ssid in result_store ) - event SumStats::send_data(uid, ssid, result_store[ssid]); - else - event SumStats::send_data(uid, ssid, table()); + # Create a back store for the result + sending_results[uid] = (ss_name in result_store) ? copy(result_store[ss_name]) : table(); # Lookup the actual sumstats and reset it, the reference to the data - # currently stored will be maintained internally by the send_data event. - if ( ssid in stats_store ) - reset(stats_store[ssid]); + # currently stored will be maintained internally from the + # sending_results table. + if ( cleanup && ss_name in stats_store ) + reset(stats_store[ss_name]); } -event SumStats::cluster_key_request(uid: string, ssid: string, key: Key) +event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, cleanup: bool) { - if ( ssid in result_store && key in result_store[ssid] ) - { - #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data); + #print fmt("WORKER %s: received the cluster_get_result event for %s=%s.", Cluster::node, key2str(key), data); - # Note: copy is needed to compensate serialization caching issue. This should be - # changed to something else later. - event SumStats::cluster_key_response(uid, ssid, key, copy(result_store[ssid][key])); - } - else + if ( cleanup ) # data will implicitly be in sending_results (i know this isn't great) { - # We need to send an empty response if we don't have the data so that the manager - # can know that it heard back from all of the workers. - event SumStats::cluster_key_response(uid, ssid, key, table()); + if ( uid in sending_results && key in sending_results[uid] ) + { + # Note: copy is needed to compensate serialization caching issue. This should be + # changed to something else later. + event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup); + delete sending_results[uid][key]; + } + else + { + # We need to send an empty response if we don't have the data so that the manager + # can know that it heard back from all of the workers. + event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup); + } + } + else + { + if ( ss_name in result_store && key in result_store[ss_name] ) + { + event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup); + } + else + { + # We need to send an empty response if we don't have the data so that the manager + # can know that it heard back from all of the workers. + event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup); + } } } -event SumStats::cluster_threshold_crossed(ssid: string, key: SumStats::Key, thold: Thresholding) +event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, thold_index: count) { - if ( ssid !in threshold_tracker ) - threshold_tracker[ssid] = table(); + if ( ss_name !in threshold_tracker ) + threshold_tracker[ss_name] = table(); - threshold_tracker[ssid][key] = thold; + threshold_tracker[ss_name][key] = thold_index; } -event SumStats::thresholds_reset(ssid: string) +event SumStats::thresholds_reset(ss_name: string) { - threshold_tracker[ssid] = table(); + delete threshold_tracker[ss_name]; } @endif @@ -186,7 +236,7 @@ event SumStats::thresholds_reset(ssid: string) # This variable is maintained by manager nodes as they collect and aggregate # results. # Index on a uid. -global stats_results: table[string] of ResultTable &read_expire=1min; +global stats_keys: table[string] of set[Key] &create_expire=1min; # This variable is maintained by manager nodes to track how many "dones" they # collected per collection unique id. Once the number of results for a uid @@ -194,18 +244,18 @@ global stats_results: table[string] of ResultTable &read_expire=1min; # result is written out and deleted from here. # Indexed on a uid. # TODO: add an &expire_func in case not all results are received. -global done_with: table[string] of count &read_expire=1min &default=0; +global done_with: table[string] of count &create_expire=1min &default=0; # This variable is maintained by managers to track intermediate responses as # they are getting a global view for a certain key. # Indexed on a uid. -global key_requests: table[string] of Result &read_expire=1min; +global key_requests: table[string] of Result &create_expire=1min; # This variable is maintained by managers to prevent overwhelming communication due # to too many intermediate updates. Each sumstat is tracked separately so that # one won't overwhelm and degrade other quieter sumstats. # Indexed on a sumstat id. -global outstanding_global_views: table[string] of count &default=0; +global outstanding_global_views: table[string] of count &create_expire=1min &default=0; const zero_time = double_to_time(0.0); # Managers handle logging. @@ -213,15 +263,19 @@ event SumStats::finish_epoch(ss: SumStat) { if ( network_time() > zero_time ) { - #print fmt("%.6f MANAGER: breaking %s sumstat for %s sumstat", network_time(), ss$name, ss$id); + #print fmt("%.6f MANAGER: breaking %s sumstat", network_time(), ss$name); local uid = unique_id(""); - if ( uid in stats_results ) - delete stats_results[uid]; - stats_results[uid] = table(); + if ( uid in stats_keys ) + delete stats_keys[uid]; + stats_keys[uid] = set(); # Request data from peers. - event SumStats::cluster_ss_request(uid, ss$id); + event SumStats::cluster_ss_request(uid, ss$name, T); + + done_with[uid] = 0; + #print fmt("get_key by uid: %s", uid); + event SumStats::get_a_key(uid, ss$name, T); } # Schedule the next finish_epoch event. @@ -235,51 +289,160 @@ function data_added(ss: SumStat, key: Key, result: Result) if ( check_thresholds(ss, key, result, 1.0) ) { threshold_crossed(ss, key, result); - event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]); + event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); } } -event SumStats::cluster_key_response(uid: string, ssid: string, key: Key, result: Result) +function handle_end_of_result_collection(uid: string, ss_name: string, key: Key, cleanup: bool) { + #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); + local ss = stats_store[ss_name]; + local ir = key_requests[uid]; + if ( check_thresholds(ss, key, ir, 1.0) ) + { + threshold_crossed(ss, key, ir); + event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]); + } + + if ( cleanup ) + { + # This is done here because "cleanup" implicitly means + # it's the end of an epoch. + if ( ss?$epoch_result && |ir| > 0 ) + { + local now = network_time(); + ss$epoch_result(now, key, ir); + } + + # Check that there is an outstanding view before subtracting. + # Global views only apply to non-dynamic requests. Dynamic + # requests must be serviced. + if ( outstanding_global_views[ss_name] > 0 ) + --outstanding_global_views[ss_name]; + } + + delete key_requests[uid]; + delete done_with[uid]; + } + +function request_all_current_keys(uid: string, ss_name: string, cleanup: bool) + { + #print "request_all_current_keys"; + if ( uid in stats_keys && |stats_keys[uid]| > 0 ) + { + #print fmt(" -- %d remaining keys here", |stats_keys[uid]|); + for ( key in stats_keys[uid] ) + { + done_with[uid] = 0; + event SumStats::cluster_get_result(uid, ss_name, key, cleanup); + when ( uid in done_with && Cluster::worker_count == done_with[uid] ) + { + #print "done getting result"; + handle_end_of_result_collection(uid, ss_name, key, cleanup); + request_all_current_keys(uid, ss_name, cleanup); + } + delete stats_keys[uid][key]; + break; # only a single key + } + } + else + { + # Get more keys! And this breaks us out of the evented loop. + done_with[uid] = 0; + #print fmt("get_key by uid: %s", uid); + event SumStats::get_a_key(uid, ss_name, cleanup); + } + } + +event SumStats::send_no_key(uid: string, ss_name: string) + { + #print "send_no_key"; + ++done_with[uid]; + if ( Cluster::worker_count == done_with[uid] ) + { + delete done_with[uid]; + + if ( |stats_keys[uid]| > 0 ) + { + #print "we need more keys!"; + # Now that we have a key from each worker, lets + # grab all of the results. + request_all_current_keys(uid, ss_name, T); + } + else + { + #print "we're out of keys!"; + local ss = stats_store[ss_name]; + if ( ss?$epoch_finished ) + ss$epoch_finished(network_time()); + } + } + } + +event SumStats::send_a_key(uid: string, ss_name: string, key: Key) + { + #print fmt("send_a_key %s", key); + if ( uid !in stats_keys ) + { + # no clue what happened here + return; + } + + if ( key !in stats_keys[uid] ) + add stats_keys[uid][key]; + + ++done_with[uid]; + if ( Cluster::worker_count == done_with[uid] ) + { + delete done_with[uid]; + + if ( |stats_keys[uid]| > 0 ) + { + #print "we need more keys!"; + # Now that we have a key from each worker, lets + # grab all of the results. + request_all_current_keys(uid, ss_name, T); + } + else + { + #print "we're out of keys!"; + local ss = stats_store[ss_name]; + if ( ss?$epoch_finished ) + ss$epoch_finished(network_time()); + } + } + } + +event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool) + { + #print "cluster_send_result"; #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result); # We only want to try and do a value merge if there are actually measured datapoints # in the Result. - if ( uid in key_requests ) - key_requests[uid] = compose_results(key_requests[uid], result); - else + if ( uid !in key_requests || |key_requests[uid]| == 0 ) key_requests[uid] = result; + else + key_requests[uid] = compose_results(key_requests[uid], result); # Mark that a worker is done. ++done_with[uid]; - #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); - if ( Cluster::worker_count == done_with[uid] ) - { - local ss = stats_store[ssid]; - local ir = key_requests[uid]; - if ( check_thresholds(ss, key, ir, 1.0) ) - { - threshold_crossed(ss, key, ir); - event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]); - } - - delete done_with[uid]; - delete key_requests[uid]; - # Check that there is an outstanding view before subtracting. - if ( outstanding_global_views[ssid] > 0 ) - --outstanding_global_views[ssid]; - } + #if ( Cluster::worker_count == done_with[uid] ) + # { + # print "done"; + # handle_end_of_result_collection(uid, ss_name, key, cleanup); + # } } # Managers handle intermediate updates here. -event SumStats::cluster_key_intermediate_response(ssid: string, key: Key) +event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key) { #print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr); #print fmt("MANAGER: requesting key data for %s", key2str(key)); - if ( ssid in outstanding_global_views && - |outstanding_global_views[ssid]| > max_outstanding_global_views ) + if ( ss_name in outstanding_global_views && + |outstanding_global_views[ss_name]| > max_outstanding_global_views ) { # Don't do this intermediate update. Perhaps at some point in the future # we will queue and randomly select from these ignored intermediate @@ -287,60 +450,131 @@ event SumStats::cluster_key_intermediate_response(ssid: string, key: Key) return; } - ++outstanding_global_views[ssid]; + ++outstanding_global_views[ss_name]; local uid = unique_id(""); - event SumStats::cluster_key_request(uid, ssid, key); - } - -event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable, done: bool) - { - #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); - - # Mark another worker as being "done" for this uid. - if ( done ) - ++done_with[uid]; - - local local_data = stats_results[uid]; - local ss = stats_store[ssid]; - - for ( key in data ) + done_with[uid] = 0; + event SumStats::cluster_get_result(uid, ss_name, key, F); + when ( uid in done_with && Cluster::worker_count == done_with[uid] ) { - if ( key in local_data ) - local_data[key] = compose_results(local_data[key], data[key]); - else - local_data[key] = data[key]; - - # If a stat is done being collected, thresholds for each key - # need to be checked so we're doing it here to avoid doubly - # iterating over each key. - if ( Cluster::worker_count == done_with[uid] ) - { - if ( check_thresholds(ss, key, local_data[key], 1.0) ) - { - threshold_crossed(ss, key, local_data[key]); - event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]); - } - } + handle_end_of_result_collection(uid, ss_name, key, F); + } + timeout 1.1min + { + Reporter::warning(fmt("Dynamic SumStat intermediate key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key)); } - # If the data has been collected from all peers, we are done and ready to finish. - if ( Cluster::worker_count == done_with[uid] ) - { - if ( ss?$epoch_finished ) - ss$epoch_finished(local_data); + } +#event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool) +# { +# #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); +# +# # Mark another worker as being "done" for this uid. +# if ( done ) +# ++done_with[uid]; +# +# # We had better only be getting requests for stuff that exists. +# if ( ss_name !in stats_store ) +# return; +# +# if ( uid !in stats_keys ) +# stats_keys[uid] = table(); +# +# local local_data = stats_keys[uid]; +# local ss = stats_store[ss_name]; +# +# for ( key in data ) +# { +# if ( key in local_data ) +# local_data[key] = compose_results(local_data[key], data[key]); +# else +# local_data[key] = data[key]; +# +# # If a stat is done being collected, thresholds for each key +# # need to be checked so we're doing it here to avoid doubly +# # iterating over each key. +# if ( Cluster::worker_count == done_with[uid] ) +# { +# if ( check_thresholds(ss, key, local_data[key], 1.0) ) +# { +# threshold_crossed(ss, key, local_data[key]); +# event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); +# } +# } +# } +# +# # If the data has been collected from all peers, we are done and ready to finish. +# if ( cleanup && Cluster::worker_count == done_with[uid] ) +# { +# local now = network_time(); +# if ( ss?$epoch_result ) +# { +# for ( key in local_data ) +# ss$epoch_result(now, key, local_data[key]); +# } +# +# if ( ss?$epoch_finished ) +# ss$epoch_finished(now); +# +# # Clean up +# delete stats_keys[uid]; +# delete done_with[uid]; +# reset(ss); +# } +# } + +#function request(ss_name: string): ResultTable +# { +# # This only needs to be implemented this way for cluster compatibility. +# local uid = unique_id("dyn-"); +# stats_keys[uid] = table(); +# done_with[uid] = 0; +# event SumStats::cluster_ss_request(uid, ss_name, F); +# +# return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) +# { +# if ( uid in stats_keys ) +# { +# local ss_result = stats_keys[uid]; +# # Clean up +# delete stats_keys[uid]; +# delete done_with[uid]; +# reset(stats_store[ss_name]); +# return ss_result; +# } +# else +# return table(); +# } +# timeout 1.1min +# { +# Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name)); +# return table(); +# } +# } + +function request_key(ss_name: string, key: Key): Result + { + local uid = unique_id(""); + done_with[uid] = 0; + key_requests[uid] = table(); + + event SumStats::cluster_get_result(uid, ss_name, key, F); + return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) + { + #print "done with request_key"; + local result = key_requests[uid]; # Clean up - delete stats_results[uid]; + delete key_requests[uid]; delete done_with[uid]; - # Not sure I need to reset the sumstat on the manager. - reset(ss); + + return result; + } + timeout 1.1min + { + Reporter::warning(fmt("Dynamic SumStat key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key)); + return table(); } } -event remote_connection_handshake_done(p: event_peer) &priority=5 - { - send_id(p, "SumStats::stats_store"); - send_id(p, "SumStats::reducer_store"); - } @endif diff --git a/scripts/base/frameworks/sumstats/main.bro b/scripts/base/frameworks/sumstats/main.bro index cc2aba2362..282b03da6b 100644 --- a/scripts/base/frameworks/sumstats/main.bro +++ b/scripts/base/frameworks/sumstats/main.bro @@ -74,10 +74,6 @@ export { ## Type to store results for multiple reducers. type Result: table[string] of ResultVal; - ## Type to store a table of sumstats results indexed - ## by keys. - type ResultTable: table[Key] of Result; - ## SumStats represent an aggregation of reducers along with ## mechanisms to handle various situations like the epoch ending ## or thresholds being crossed. @@ -87,8 +83,12 @@ export { ## is no assurance provided as to where the callbacks ## will be executed on clusters. type SumStat: record { + ## An arbitrary name for the sumstat so that it can + ## be referred to later. + name: string; + ## The interval at which this filter should be "broken" - ## and the '$epoch_finished' callback called. The + ## and the '$epoch_result' callback called. The ## results are also reset at this time so any threshold ## based detection needs to be set to a ## value that should be expected to happen within @@ -102,22 +102,28 @@ export { ## :bro:see:`SumStats::Result` structure which will be used ## for thresholding. ## This is required if a $threshold value is given. - threshold_val: function(key: SumStats::Key, result: SumStats::Result): count &optional; + threshold_val: function(key: SumStats::Key, result: SumStats::Result): double &optional; ## The threshold value for calling the ## $threshold_crossed callback. - threshold: count &optional; + threshold: double &optional; ## A series of thresholds for calling the ## $threshold_crossed callback. - threshold_series: vector of count &optional; + threshold_series: vector of double &optional; ## A callback that is called when a threshold is crossed. threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional; - ## A callback with the full collection of Results for - ## this SumStat. - epoch_finished: function(rt: SumStats::ResultTable) &optional; + ## A callback that receives each of the results at the + ## end of the analysis epoch. The function will be + ## called once for each key. + epoch_result: function(ts: time, key: SumStats::Key, result: SumStats::Result) &optional; + + ## A callback that will be called when a single collection + ## interval is completed. The ts value will be the time of + ## when the collection started. + epoch_finished: function(ts:time) &optional; }; ## Create a summary statistic. @@ -134,19 +140,23 @@ export { ## obs: The data point to send into the stream. global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation); - ## This record is primarily used for internal threshold tracking. - type Thresholding: record { - # Internal use only. Indicates if a simple threshold was already crossed. - is_threshold_crossed: bool &default=F; - - # Internal use only. Current key for threshold series. - threshold_series_index: count &default=0; - }; + ## Dynamically request a sumstat key. This function should be + ## used sparingly and not as a replacement for the callbacks + ## from the :bro:see:`SumStat` record. The function is only + ## available for use within "when" statements as an asynchronous + ## function. + ## + ## ss_name: SumStat name. + ## + ## key: The SumStat key being requested. + ## + ## Returns: The result for the requested sumstat key. + global request_key: function(ss_name: string, key: Key): Result; ## This event is generated when thresholds are reset for a SumStat. ## - ## ssid: SumStats ID that thresholds were reset for. - global thresholds_reset: event(ssid: string); + ## name: SumStats name that thresholds were reset for. + global thresholds_reset: event(name: string); ## Helper function to represent a :bro:type:`SumStats::Key` value as ## a simple string. @@ -157,18 +167,49 @@ export { global key2str: function(key: SumStats::Key): string; } +# Type to store a table of sumstats results indexed by keys. +type ResultTable: table[Key] of Result; + +# The function prototype for plugins to do calculations. +type ObserveFunc: function(r: Reducer, val: double, data: Observation, rv: ResultVal); + redef record Reducer += { - # Internal use only. Provides a reference back to the related SumStats by it's ID. - sid: string &optional; + # Internal use only. Provides a reference back to the related SumStats by its name. + ssname: string &optional; + + calc_funcs: vector of Calculation &optional; }; # Internal use only. For tracking thresholds per sumstat and key. -global threshold_tracker: table[string] of table[Key] of Thresholding &optional; +# In the case of a single threshold, 0 means the threshold isn't crossed. +# In the case of a threshold series, the number tracks the threshold offset. +global threshold_tracker: table[string] of table[Key] of count; -redef record SumStat += { - # Internal use only (mostly for cluster coherency). - id: string &optional; -}; +function increment_threshold_tracker(ss_name: string, key: Key) + { + if ( ss_name !in threshold_tracker ) + threshold_tracker[ss_name] = table(); + if ( key !in threshold_tracker[ss_name] ) + threshold_tracker[ss_name][key] = 0; + + ++threshold_tracker[ss_name][key]; + } + +function get_threshold_index(ss_name: string, key: Key): count + { + if ( ss_name !in threshold_tracker ) + return 0; + if ( key !in threshold_tracker[ss_name] ) + return 0; + + return threshold_tracker[ss_name][key]; + } + +# Prototype the hook point for plugins to initialize any result values. +global init_resultval_hook: hook(r: Reducer, rv: ResultVal); + +# Prototype the hook point for plugins to merge Results. +global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal); # Store of sumstats indexed on the sumstat id. global stats_store: table[string] of SumStat = table(); @@ -182,20 +223,20 @@ global result_store: table[string] of ResultTable = table(); # Store of threshold information. global thresholds_store: table[string, Key] of bool = table(); +# Store the calculations. +global calc_store: table[Calculation] of ObserveFunc = table(); + +# Store the dependencies for Calculations. +global calc_deps: table[Calculation] of vector of Calculation = table(); + +# Hook for registering observation calculation plugins. +global register_observe_plugins: hook(); + # This is called whenever key values are updated and the new val is given as the # `val` argument. It's only prototyped here because cluster and non-cluster have # separate implementations. global data_added: function(ss: SumStat, key: Key, result: Result); -# Prototype the hook point for plugins to do calculations. -global observe_hook: hook(r: Reducer, val: double, data: Observation, rv: ResultVal); - -# Prototype the hook point for plugins to initialize any result values. -global init_resultval_hook: hook(r: Reducer, rv: ResultVal); - -# Prototype the hook point for plugins to merge Results. -global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal); - # Event that is used to "finish" measurements and adapt the measurement # framework for clustered or non-clustered usage. global finish_epoch: event(ss: SumStat); @@ -210,6 +251,24 @@ function key2str(key: Key): string return fmt("sumstats_key(%s)", out); } +function register_observe_plugin(calc: Calculation, func: ObserveFunc) + { + calc_store[calc] = func; + } + +function add_observe_plugin_dependency(calc: Calculation, depends_on: Calculation) + { + if ( calc !in calc_deps ) + calc_deps[calc] = vector(); + calc_deps[calc][|calc_deps[calc]|] = depends_on; + } + +event bro_init() &priority=100000 + { + # Call all of the plugin registration hooks + hook register_observe_plugins(); + } + function init_resultval(r: Reducer): ResultVal { local rv: ResultVal = [$begin=network_time(), $end=network_time()]; @@ -234,25 +293,17 @@ function compose_results(r1: Result, r2: Result): Result { local result: Result = table(); - if ( |r1| > |r2| ) + for ( id in r1 ) { - for ( data_id in r1 ) - { - if ( data_id in r2 ) - result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); - else - result[data_id] = r1[data_id]; - } + result[id] = r1[id]; } - else + + for ( id in r2 ) { - for ( data_id in r2 ) - { - if ( data_id in r1 ) - result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); - else - result[data_id] = r2[data_id]; - } + if ( id in r1 ) + result[id] = compose_resultvals(r1[id], r2[id]); + else + result[id] = r2[id]; } return result; @@ -261,18 +312,43 @@ function compose_results(r1: Result, r2: Result): Result function reset(ss: SumStat) { - if ( ss$id in result_store ) - delete result_store[ss$id]; + if ( ss$name in result_store ) + delete result_store[ss$name]; - result_store[ss$id] = table(); + result_store[ss$name] = table(); - if ( ss?$threshold || ss?$threshold_series ) + if ( ss$name in threshold_tracker ) { - threshold_tracker[ss$id] = table(); - event SumStats::thresholds_reset(ss$id); + delete threshold_tracker[ss$name]; + threshold_tracker[ss$name] = table(); + event SumStats::thresholds_reset(ss$name); } } +# This could potentially recurse forever, but plugin authors +# should be making sure they aren't causing reflexive dependencies. +function add_calc_deps(calcs: vector of Calculation, c: Calculation) + { + #print fmt("Checking for deps for %s", c); + for ( i in calc_deps[c] ) + { + local skip_calc=F; + for ( j in calcs ) + { + if ( calcs[j] == calc_deps[c][i] ) + skip_calc=T; + } + if ( ! skip_calc ) + { + if ( calc_deps[c][i] in calc_deps ) + add_calc_deps(calcs, calc_deps[c][i]); + calcs[|c|] = calc_deps[c][i]; + #print fmt("add dep for %s [%s] ", c, calc_deps[c][i]); + } + } + + } + function create(ss: SumStat) { if ( (ss?$threshold || ss?$threshold_series) && ! ss?$threshold_val ) @@ -280,14 +356,34 @@ function create(ss: SumStat) Reporter::error("SumStats given a threshold with no $threshold_val function"); } - if ( ! ss?$id ) - ss$id=unique_id(""); - threshold_tracker[ss$id] = table(); - stats_store[ss$id] = ss; + stats_store[ss$name] = ss; + + if ( ss?$threshold || ss?$threshold_series ) + threshold_tracker[ss$name] = table(); for ( reducer in ss$reducers ) { - reducer$sid = ss$id; + reducer$ssname = ss$name; + reducer$calc_funcs = vector(); + for ( calc in reducer$apply ) + { + # Add in dependencies recursively. + if ( calc in calc_deps ) + add_calc_deps(reducer$calc_funcs, calc); + + # Don't add this calculation to the vector if + # it was already added by something else as a + # dependency. + local skip_calc=F; + for ( j in reducer$calc_funcs ) + { + if ( calc == reducer$calc_funcs[j] ) + skip_calc=T; + } + if ( ! skip_calc ) + reducer$calc_funcs[|reducer$calc_funcs|] = calc; + } + if ( reducer$stream !in reducer_store ) reducer_store[reducer$stream] = set(); add reducer_store[reducer$stream][reducer]; @@ -313,9 +409,9 @@ function observe(id: string, key: Key, obs: Observation) if ( r?$pred && ! r$pred(key, obs) ) next; - local ss = stats_store[r$sid]; + local ss = stats_store[r$ssname]; - # If there is a threshold and no epoch_finished callback + # If there is a threshold and no epoch_result callback # we don't need to continue counting since the data will # never be accessed. This was leading # to some state management issues when measuring @@ -323,18 +419,21 @@ function observe(id: string, key: Key, obs: Observation) # NOTE: this optimization could need removed in the # future if on demand access is provided to the # SumStats results. - if ( ! ss?$epoch_finished && - r$sid in threshold_tracker && - key in threshold_tracker[r$sid] && + if ( ! ss?$epoch_result && + r$ssname in threshold_tracker && ( ss?$threshold && - threshold_tracker[r$sid][key]$is_threshold_crossed ) || + key in threshold_tracker[r$ssname] && + threshold_tracker[r$ssname][key] != 0 ) || ( ss?$threshold_series && - threshold_tracker[r$sid][key]$threshold_series_index+1 == |ss$threshold_series| ) ) + key in threshold_tracker[r$ssname] && + threshold_tracker[r$ssname][key] == |ss$threshold_series| ) ) + { next; + } - if ( r$sid !in result_store ) - result_store[ss$id] = table(); - local results = result_store[r$sid]; + if ( r$ssname !in result_store ) + result_store[r$ssname] = table(); + local results = result_store[r$ssname]; if ( key !in results ) results[key] = table(); @@ -350,10 +449,13 @@ function observe(id: string, key: Key, obs: Observation) # If a string was given, fall back to 1.0 as the value. local val = 1.0; - if ( obs?$num || obs?$dbl ) - val = obs?$dbl ? obs$dbl : obs$num; + if ( obs?$num ) + val = obs$num; + else if ( obs?$dbl ) + val = obs$dbl; - hook observe_hook(r, val, obs, result_val); + for ( i in r$calc_funcs ) + calc_store[r$calc_funcs[i]](r, val, obs, result_val); data_added(ss, key, result); } } @@ -362,10 +464,12 @@ function observe(id: string, key: Key, obs: Observation) # mid-break-interval threshold crossing detection for cluster deployments. function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: double): bool { - if ( ! (ss?$threshold || ss?$threshold_series) ) + if ( ! (ss?$threshold || ss?$threshold_series || ss?$threshold_crossed) ) return F; # Add in the extra ResultVals to make threshold_vals easier to write. + # This length comparison should work because we just need to make + # sure that we have the same number of reducers and results. if ( |ss$reducers| != |result| ) { for ( reducer in ss$reducers ) @@ -378,28 +482,21 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou local watch = ss$threshold_val(key, result); if ( modify_pct < 1.0 && modify_pct > 0.0 ) - watch = double_to_count(floor(watch/modify_pct)); + watch = watch/modify_pct; - if ( ss$id !in threshold_tracker ) - threshold_tracker[ss$id] = table(); - local t_tracker = threshold_tracker[ss$id]; + local t_index = get_threshold_index(ss$name, key); - if ( key !in t_tracker ) - { - local ttmp: Thresholding; - t_tracker[key] = ttmp; - } - local tt = t_tracker[key]; - - if ( ss?$threshold && ! tt$is_threshold_crossed && watch >= ss$threshold ) + if ( ss?$threshold && + t_index == 0 && # Check that the threshold hasn't already been crossed. + watch >= ss$threshold ) { # Value crossed the threshold. return T; } if ( ss?$threshold_series && - |ss$threshold_series| >= tt$threshold_series_index && - watch >= ss$threshold_series[tt$threshold_series_index] ) + |ss$threshold_series| > t_index && # Check if there are more thresholds. + watch >= ss$threshold_series[t_index] ) { # A threshold series was given and the value crossed the next # value in the series. @@ -415,6 +512,8 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result) if ( ! ss?$threshold_crossed ) return; + increment_threshold_tracker(ss$name,key); + # Add in the extra ResultVals to make threshold_crossed callbacks easier to write. if ( |ss$reducers| != |result| ) { @@ -426,11 +525,5 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result) } ss$threshold_crossed(key, result); - local tt = threshold_tracker[ss$id][key]; - tt$is_threshold_crossed = T; - - # Bump up to the next threshold series index if a threshold series is being used. - if ( ss?$threshold_series ) - ++tt$threshold_series_index; } diff --git a/scripts/base/frameworks/sumstats/non-cluster.bro b/scripts/base/frameworks/sumstats/non-cluster.bro index f27d4b5cfb..97e1817598 100644 --- a/scripts/base/frameworks/sumstats/non-cluster.bro +++ b/scripts/base/frameworks/sumstats/non-cluster.bro @@ -4,11 +4,20 @@ module SumStats; event SumStats::finish_epoch(ss: SumStat) { - if ( ss$id in result_store ) + if ( ss$name in result_store ) { - local data = result_store[ss$id]; + local now = network_time(); + + if ( ss?$epoch_result ) + { + local data = result_store[ss$name]; + # TODO: don't block here. + for ( key in data ) + ss$epoch_result(now, key, data[key]); + } + if ( ss?$epoch_finished ) - ss$epoch_finished(data); + ss$epoch_finished(now); reset(ss); } @@ -16,9 +25,32 @@ event SumStats::finish_epoch(ss: SumStat) schedule ss$epoch { SumStats::finish_epoch(ss) }; } - function data_added(ss: SumStat, key: Key, result: Result) { if ( check_thresholds(ss, key, result, 1.0) ) threshold_crossed(ss, key, result); } + +function request(ss_name: string): ResultTable + { + # This only needs to be implemented this way for cluster compatibility. + return when ( T ) + { + if ( ss_name in result_store ) + return result_store[ss_name]; + else + return table(); + } + } + +function request_key(ss_name: string, key: Key): Result + { + # This only needs to be implemented this way for cluster compatibility. + return when ( T ) + { + if ( ss_name in result_store && key in result_store[ss_name] ) + return result_store[ss_name][key]; + else + return table(); + } + } \ No newline at end of file diff --git a/scripts/base/frameworks/sumstats/plugins/average.bro b/scripts/base/frameworks/sumstats/plugins/average.bro index ad82a91d20..8f7f7b568f 100644 --- a/scripts/base/frameworks/sumstats/plugins/average.bro +++ b/scripts/base/frameworks/sumstats/plugins/average.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats/main +@load ../main module SumStats; @@ -14,17 +14,18 @@ export { }; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( AVERAGE in r$apply ) + register_observe_plugin(AVERAGE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$average ) rv$average = val; else rv$average += (val - rv$average) / rv$num; - } + }); } + hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$average && rv2?$average ) diff --git a/scripts/base/frameworks/sumstats/plugins/last.bro b/scripts/base/frameworks/sumstats/plugins/last.bro index daebe30cf5..24376f8a2b 100644 --- a/scripts/base/frameworks/sumstats/plugins/last.bro +++ b/scripts/base/frameworks/sumstats/plugins/last.bro @@ -33,16 +33,20 @@ function get_last(rv: ResultVal): vector of Observation return s; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( LAST in r$apply && r$num_last_elements > 0 ) + register_observe_plugin(LAST, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { - if ( ! rv?$last_elements ) - rv$last_elements = Queue::init([$max_len=r$num_last_elements]); - Queue::put(rv$last_elements, obs); - } + if ( r$num_last_elements > 0 ) + { + if ( ! rv?$last_elements ) + rv$last_elements = Queue::init([$max_len=r$num_last_elements]); + Queue::put(rv$last_elements, obs); + } + }); } + hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { # Merge $samples diff --git a/scripts/base/frameworks/sumstats/plugins/max.bro b/scripts/base/frameworks/sumstats/plugins/max.bro index f9ff9258ee..d43ad9dc38 100644 --- a/scripts/base/frameworks/sumstats/plugins/max.bro +++ b/scripts/base/frameworks/sumstats/plugins/max.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats/main +@load ../main module SumStats; @@ -14,15 +14,15 @@ export { }; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( MAX in r$apply ) + register_observe_plugin(MAX, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$max ) rv$max = val; else if ( val > rv$max ) rv$max = val; - } + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) diff --git a/scripts/base/frameworks/sumstats/plugins/min.bro b/scripts/base/frameworks/sumstats/plugins/min.bro index 95d492f428..014755cf32 100644 --- a/scripts/base/frameworks/sumstats/plugins/min.bro +++ b/scripts/base/frameworks/sumstats/plugins/min.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats/main +@load ../main module SumStats; @@ -14,17 +14,18 @@ export { }; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( MIN in r$apply ) + register_observe_plugin(MIN, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$min ) rv$min = val; else if ( val < rv$min ) rv$min = val; - } + }); } + hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$min && rv2?$min ) diff --git a/scripts/base/frameworks/sumstats/plugins/sample.bro b/scripts/base/frameworks/sumstats/plugins/sample.bro index 9ce85c7795..c270ef42fd 100644 --- a/scripts/base/frameworks/sumstats/plugins/sample.bro +++ b/scripts/base/frameworks/sumstats/plugins/sample.bro @@ -47,15 +47,14 @@ function sample_add_sample(obs:Observation, rv: ResultVal) if ( ra < rv$num_samples ) rv$samples[ra] = obs; } - } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( SAMPLE in r$apply ) + register_observe_plugin(SAMPLE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { sample_add_sample(obs, rv); - } + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) @@ -75,7 +74,6 @@ hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) return; } - if ( |rv1$samples| != num_samples && |rv2$samples| < num_samples ) { if ( |rv1$samples| != rv1$sample_elements || |rv2$samples| < rv2$sample_elements ) diff --git a/scripts/base/frameworks/sumstats/plugins/std-dev.bro b/scripts/base/frameworks/sumstats/plugins/std-dev.bro index 0f32e25a68..2e5b95b212 100644 --- a/scripts/base/frameworks/sumstats/plugins/std-dev.bro +++ b/scripts/base/frameworks/sumstats/plugins/std-dev.bro @@ -1,5 +1,5 @@ -@load base/frameworks/sumstats/main @load ./variance +@load ../main module SumStats; @@ -21,11 +21,18 @@ function calc_std_dev(rv: ResultVal) rv$std_dev = sqrt(rv$variance); } -# This depends on the variance plugin which uses priority -5 -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-10 +hook std_dev_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) { - if ( STD_DEV in r$apply ) + calc_std_dev(rv); + } + +hook register_observe_plugins() &priority=-10 + { + register_observe_plugin(STD_DEV, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { calc_std_dev(rv); + }); + add_observe_plugin_dependency(STD_DEV, VARIANCE); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10 diff --git a/scripts/base/frameworks/sumstats/plugins/sum.bro b/scripts/base/frameworks/sumstats/plugins/sum.bro index db2246742b..074b4b72f3 100644 --- a/scripts/base/frameworks/sumstats/plugins/sum.bro +++ b/scripts/base/frameworks/sumstats/plugins/sum.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats/main +@load ../main module SumStats; @@ -14,19 +14,19 @@ export { sum: double &default=0.0; }; - type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count; - global sum_threshold: function(data_id: string): threshold_function; + #type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count; + #global sum_threshold: function(data_id: string): threshold_function; } -function sum_threshold(data_id: string): threshold_function - { - return function(key: SumStats::Key, result: SumStats::Result): count - { - print fmt("data_id: %s", data_id); - print result; - return double_to_count(result[data_id]$sum); - }; - } +#function sum_threshold(data_id: string): threshold_function +# { +# return function(key: SumStats::Key, result: SumStats::Result): count +# { +# print fmt("data_id: %s", data_id); +# print result; +# return double_to_count(result[data_id]$sum); +# }; +# } hook init_resultval_hook(r: Reducer, rv: ResultVal) { @@ -34,10 +34,12 @@ hook init_resultval_hook(r: Reducer, rv: ResultVal) rv$sum = 0; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( SUM in r$apply ) + register_observe_plugin(SUM, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { rv$sum += val; + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) diff --git a/scripts/base/frameworks/sumstats/plugins/topk.bro b/scripts/base/frameworks/sumstats/plugins/topk.bro index 58f8168f5b..cb90af962e 100644 --- a/scripts/base/frameworks/sumstats/plugins/topk.bro +++ b/scripts/base/frameworks/sumstats/plugins/topk.bro @@ -18,18 +18,20 @@ export { } +hook register_observe_plugins() + { + register_observe_plugin(TOPK, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { + topk_add(rv$topk, obs); + }); + } + hook init_resultval_hook(r: Reducer, rv: ResultVal) { if ( TOPK in r$apply && ! rv?$topk ) rv$topk = topk_init(r$topk_size); } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) - { - if ( TOPK in r$apply ) - topk_add(rv$topk, obs); - } - hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$topk ) diff --git a/scripts/base/frameworks/sumstats/plugins/unique.bro b/scripts/base/frameworks/sumstats/plugins/unique.bro index b8bfc6a4e2..011949ce2f 100644 --- a/scripts/base/frameworks/sumstats/plugins/unique.bro +++ b/scripts/base/frameworks/sumstats/plugins/unique.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats/main +@load ../main module SumStats; @@ -23,15 +23,15 @@ redef record ResultVal += { unique_vals: set[Observation] &optional; }; -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( UNIQUE in r$apply ) + register_observe_plugin(UNIQUE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$unique_vals ) rv$unique_vals=set(); add rv$unique_vals[obs]; rv$unique = |rv$unique_vals|; - } + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) diff --git a/scripts/base/frameworks/sumstats/plugins/variance.bro b/scripts/base/frameworks/sumstats/plugins/variance.bro index 773c7d697c..12d30cc4fe 100644 --- a/scripts/base/frameworks/sumstats/plugins/variance.bro +++ b/scripts/base/frameworks/sumstats/plugins/variance.bro @@ -1,5 +1,5 @@ -@load base/frameworks/sumstats/main @load ./average +@load ../main module SumStats; @@ -28,17 +28,17 @@ function calc_variance(rv: ResultVal) rv$variance = (rv$num > 1) ? rv$var_s/(rv$num-1) : 0.0; } -# Reduced priority since this depends on the average -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-5 +hook register_observe_plugins() &priority=-5 { - if ( VARIANCE in r$apply ) + register_observe_plugin(VARIANCE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( rv$num > 1 ) rv$var_s += ((val - rv$prev_avg) * (val - rv$average)); calc_variance(rv); rv$prev_avg = rv$average; - } + }); + add_observe_plugin_dependency(VARIANCE, AVERAGE); } # Reduced priority since this depends on the average diff --git a/scripts/policy/misc/app-metrics.bro b/scripts/policy/misc/app-metrics.bro deleted file mode 100644 index 3df38ad8ad..0000000000 --- a/scripts/policy/misc/app-metrics.bro +++ /dev/null @@ -1,109 +0,0 @@ -@load base/protocols/http -@load base/protocols/ssl -@load base/frameworks/sumstats - -module AppStats; - -export { - redef enum Log::ID += { LOG }; - - type Info: record { - ## Timestamp when the log line was finished and written. - ts: time &log; - ## Time interval that the log line covers. - ts_delta: interval &log; - ## The name of the "app", like "facebook" or "netflix". - app: string &log; - ## The number of unique local hosts using the app. - uniq_hosts: count &log; - ## The number of hits to the app in total. - hits: count &log; - ## The total number of bytes received by users of the app. - bytes: count &log; - }; - - ## The frequency of logging the stats collected by this script. - const break_interval = 15mins &redef; -} - -redef record connection += { - resp_hostname: string &optional; -}; - -event bro_init() &priority=3 - { - Log::create_stream(AppStats::LOG, [$columns=Info]); - - local r1: SumStats::Reducer = [$stream="apps.bytes", $apply=set(SumStats::SUM)]; - local r2: SumStats::Reducer = [$stream="apps.hits", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=break_interval, - $reducers=set(r1, r2), - $epoch_finished(data: SumStats::ResultTable) = - { - local l: Info; - l$ts = network_time(); - l$ts_delta = break_interval; - for ( key in data ) - { - local result = data[key]; - l$app = key$str; - l$bytes = double_to_count(floor(result["apps.bytes"]$sum)); - l$hits = result["apps.hits"]$num; - l$uniq_hosts = result["apps.hits"]$unique; - Log::write(LOG, l); - } - }]); - } - -function add_sumstats(id: conn_id, hostname: string, size: count) - { - if ( /\.youtube\.com$/ in hostname && size > 512*1024 ) - { - SumStats::observe("apps.bytes", [$str="youtube"], [$num=size]); - SumStats::observe("apps.hits", [$str="youtube"], [$str=cat(id$orig_h)]); - } - else if ( /(\.facebook\.com|\.fbcdn\.net)$/ in hostname && size > 20 ) - { - SumStats::observe("apps.bytes", [$str="facebook"], [$num=size]); - SumStats::observe("apps.hits", [$str="facebook"], [$str=cat(id$orig_h)]); - } - else if ( /\.google\.com$/ in hostname && size > 20 ) - { - SumStats::observe("apps.bytes", [$str="google"], [$num=size]); - SumStats::observe("apps.hits", [$str="google"], [$str=cat(id$orig_h)]); - } - else if ( /\.nflximg\.com$/ in hostname && size > 200*1024 ) - { - SumStats::observe("apps.bytes", [$str="netflix"], [$num=size]); - SumStats::observe("apps.hits", [$str="netflix"], [$str=cat(id$orig_h)]); - } - else if ( /\.(pandora|p-cdn)\.com$/ in hostname && size > 512*1024 ) - { - SumStats::observe("apps.bytes", [$str="pandora"], [$num=size]); - SumStats::observe("apps.hits", [$str="pandora"], [$str=cat(id$orig_h)]); - } - else if ( /\.gmail\.com$/ in hostname && size > 20 ) - { - SumStats::observe("apps.bytes", [$str="gmail"], [$num=size]); - SumStats::observe("apps.hits", [$str="gmail"], [$str=cat(id$orig_h)]); - } -} - - -event ssl_established(c: connection) - { - if ( c?$ssl && c$ssl?$server_name ) - c$resp_hostname = c$ssl$server_name; - } - -event connection_finished(c: connection) - { - if ( c?$resp_hostname ) - add_sumstats(c$id, c$resp_hostname, c$resp$size); - } - -event HTTP::log_http(rec: HTTP::Info) - { - if( rec?$host ) - add_sumstats(rec$id, rec$host, rec$response_body_len); - } diff --git a/scripts/policy/misc/app-stats/__load__.bro b/scripts/policy/misc/app-stats/__load__.bro new file mode 100644 index 0000000000..c468d055ee --- /dev/null +++ b/scripts/policy/misc/app-stats/__load__.bro @@ -0,0 +1,2 @@ +@load ./main +@load ./plugins \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/main.bro b/scripts/policy/misc/app-stats/main.bro new file mode 100644 index 0000000000..24c9ac2ade --- /dev/null +++ b/scripts/policy/misc/app-stats/main.bro @@ -0,0 +1,77 @@ +#! AppStats collects information about web applications in use +#! on the network. + +@load base/protocols/http +@load base/protocols/ssl +@load base/frameworks/sumstats + +module AppStats; + +export { + redef enum Log::ID += { LOG }; + + type Info: record { + ## Timestamp when the log line was finished and written. + ts: time &log; + ## Time interval that the log line covers. + ts_delta: interval &log; + ## The name of the "app", like "facebook" or "netflix". + app: string &log; + ## The number of unique local hosts using the app. + uniq_hosts: count &log; + ## The number of hits to the app in total. + hits: count &log; + ## The total number of bytes received by users of the app. + bytes: count &log; + }; + + ## The frequency of logging the stats collected by this script. + const break_interval = 15mins &redef; +} + +redef record connection += { + resp_hostname: string &optional; +}; + +global add_sumstats: hook(id: conn_id, hostname: string, size: count); + + +event bro_init() &priority=3 + { + Log::create_stream(AppStats::LOG, [$columns=Info]); + + local r1: SumStats::Reducer = [$stream="apps.bytes", $apply=set(SumStats::SUM)]; + local r2: SumStats::Reducer = [$stream="apps.hits", $apply=set(SumStats::UNIQUE)]; + SumStats::create([$name="app-metrics", + $epoch=break_interval, + $reducers=set(r1, r2), + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + local l: Info; + l$ts = network_time(); + l$ts_delta = break_interval; + l$app = key$str; + l$bytes = double_to_count(floor(result["apps.bytes"]$sum)); + l$hits = result["apps.hits"]$num; + l$uniq_hosts = result["apps.hits"]$unique; + Log::write(LOG, l); + }]); + } + +event ssl_established(c: connection) + { + if ( c?$ssl && c$ssl?$server_name ) + c$resp_hostname = c$ssl$server_name; + } + +event connection_finished(c: connection) + { + if ( c?$resp_hostname ) + hook add_sumstats(c$id, c$resp_hostname, c$resp$size); + } + +event HTTP::log_http(rec: HTTP::Info) + { + if( rec?$host ) + hook add_sumstats(rec$id, rec$host, rec$response_body_len); + } diff --git a/scripts/policy/misc/app-stats/plugins/__load__.bro b/scripts/policy/misc/app-stats/plugins/__load__.bro new file mode 100644 index 0000000000..7a3ea2da81 --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/__load__.bro @@ -0,0 +1,6 @@ +@load ./facebook +@load ./gmail +@load ./google +@load ./netflix +@load ./pandora +@load ./youtube \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/facebook.bro b/scripts/policy/misc/app-stats/plugins/facebook.bro new file mode 100644 index 0000000000..edcb02b72a --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/facebook.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.(facebook\.com|fbcdn\.net)$/ in hostname && size > 20 ) + { + SumStats::observe("apps.bytes", [$str="facebook"], [$num=size]); + SumStats::observe("apps.hits", [$str="facebook"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/gmail.bro b/scripts/policy/misc/app-stats/plugins/gmail.bro new file mode 100644 index 0000000000..1642fb7651 --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/gmail.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.gmail\.com$/ in hostname && size > 20 ) + { + SumStats::observe("apps.bytes", [$str="gmail"], [$num=size]); + SumStats::observe("apps.hits", [$str="gmail"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/google.bro b/scripts/policy/misc/app-stats/plugins/google.bro new file mode 100644 index 0000000000..e1da3a9068 --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/google.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.google\.com$/ in hostname && size > 20 ) + { + SumStats::observe("apps.bytes", [$str="google"], [$num=size]); + SumStats::observe("apps.hits", [$str="google"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/netflix.bro b/scripts/policy/misc/app-stats/plugins/netflix.bro new file mode 100644 index 0000000000..5d429f0caf --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/netflix.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.nflximg\.com$/ in hostname && size > 200*1024 ) + { + SumStats::observe("apps.bytes", [$str="netflix"], [$num=size]); + SumStats::observe("apps.hits", [$str="netflix"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/pandora.bro b/scripts/policy/misc/app-stats/plugins/pandora.bro new file mode 100644 index 0000000000..6cfbfab72d --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/pandora.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.(pandora|p-cdn)\.com$/ in hostname && size > 512*1024 ) + { + SumStats::observe("apps.bytes", [$str="pandora"], [$num=size]); + SumStats::observe("apps.hits", [$str="pandora"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/youtube.bro b/scripts/policy/misc/app-stats/plugins/youtube.bro new file mode 100644 index 0000000000..af872cfdac --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/youtube.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.youtube\.com$/ in hostname && size > 512*1024 ) + { + SumStats::observe("apps.bytes", [$str="youtube"], [$num=size]); + SumStats::observe("apps.hits", [$str="youtube"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/detect-traceroute/main.bro b/scripts/policy/misc/detect-traceroute/main.bro index 3ed315746f..6b472f2948 100644 --- a/scripts/policy/misc/detect-traceroute/main.bro +++ b/scripts/policy/misc/detect-traceroute/main.bro @@ -29,7 +29,7 @@ export { ## Defines the threshold for ICMP Time Exceeded messages for a src-dst pair. ## This threshold only comes into play after a host is found to be ## sending low ttl packets. - const icmp_time_exceeded_threshold = 3 &redef; + const icmp_time_exceeded_threshold: double = 3 &redef; ## Interval at which to watch for the ## :bro:id:`Traceroute::icmp_time_exceeded_threshold` variable to be @@ -57,16 +57,17 @@ event bro_init() &priority=5 local r1: SumStats::Reducer = [$stream="traceroute.time_exceeded", $apply=set(SumStats::UNIQUE)]; local r2: SumStats::Reducer = [$stream="traceroute.low_ttl_packet", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=icmp_time_exceeded_interval, + SumStats::create([$name="traceroute-detection", + $epoch=icmp_time_exceeded_interval, $reducers=set(r1, r2), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { # Give a threshold value of zero depending on if the host # sends a low ttl packet. if ( require_low_ttl_packets && result["traceroute.low_ttl_packet"]$sum == 0 ) - return 0; + return 0.0; else - return result["traceroute.time_exceeded"]$unique; + return result["traceroute.time_exceeded"]$unique+0; }, $threshold=icmp_time_exceeded_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/misc/scan.bro b/scripts/policy/misc/scan.bro index 31caf527b7..909ccac02b 100644 --- a/scripts/policy/misc/scan.bro +++ b/scripts/policy/misc/scan.bro @@ -40,15 +40,11 @@ export { ## The threshold of a unique number of hosts a scanning host has to have failed ## connections with on a single port. - const addr_scan_threshold = 25 &redef; + const addr_scan_threshold = 25.0 &redef; ## The threshold of a number of unique ports a scanning host has to have failed ## connections with on a single victim host. - const port_scan_threshold = 15 &redef; - - ## Custom thresholds based on service for address scan. This is primarily - ## useful for setting reduced thresholds for specific ports. - const addr_scan_custom_thresholds: table[port] of count &redef; + const port_scan_threshold = 15.0 &redef; global Scan::addr_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port); global Scan::port_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port); @@ -57,11 +53,12 @@ export { event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="scan.addr.fail", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=addr_scan_interval, + SumStats::create([$name="addr-scan", + $epoch=addr_scan_interval, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["scan.addr.fail"]$unique); + return result["scan.addr.fail"]$unique+0.0; }, #$threshold_func=check_addr_scan_threshold, $threshold=addr_scan_threshold, @@ -81,11 +78,12 @@ event bro_init() &priority=5 # Note: port scans are tracked similar to: table[src_ip, dst_ip] of set(port); local r2: SumStats::Reducer = [$stream="scan.port.fail", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=port_scan_interval, + SumStats::create([$name="port-scan", + $epoch=port_scan_interval, $reducers=set(r2), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["scan.port.fail"]$unique); + return result["scan.port.fail"]$unique+0.0; }, $threshold=port_scan_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/protocols/ftp/detect-bruteforcing.bro b/scripts/policy/protocols/ftp/detect-bruteforcing.bro index 21c9c403c7..36dfafb53a 100644 --- a/scripts/policy/protocols/ftp/detect-bruteforcing.bro +++ b/scripts/policy/protocols/ftp/detect-bruteforcing.bro @@ -17,7 +17,7 @@ export { ## How many rejected usernames or passwords are required before being ## considered to be bruteforcing. - const bruteforce_threshold = 20 &redef; + const bruteforce_threshold: double = 20 &redef; ## The time period in which the threshold needs to be crossed before ## being reset. @@ -28,11 +28,12 @@ export { event bro_init() { local r1: SumStats::Reducer = [$stream="ftp.failed_auth", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=bruteforce_measurement_interval, + SumStats::create([$name="ftp-detect-bruteforcing", + $epoch=bruteforce_measurement_interval, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return result["ftp.failed_auth"]$num; + return result["ftp.failed_auth"]$num+0.0; }, $threshold=bruteforce_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/protocols/http/detect-sqli.bro b/scripts/policy/protocols/http/detect-sqli.bro index 8671bbd165..79d8d6f2f9 100644 --- a/scripts/policy/protocols/http/detect-sqli.bro +++ b/scripts/policy/protocols/http/detect-sqli.bro @@ -28,7 +28,7 @@ export { ## Defines the threshold that determines if an SQL injection attack ## is ongoing based on the number of requests that appear to be SQL ## injection attacks. - const sqli_requests_threshold = 50 &redef; + const sqli_requests_threshold: double = 50.0 &redef; ## Interval at which to watch for the ## :bro:id:`HTTP::sqli_requests_threshold` variable to be crossed. @@ -64,11 +64,12 @@ event bro_init() &priority=3 # determine when it looks like an actual attack and how to respond when # thresholds are crossed. local r1: SumStats::Reducer = [$stream="http.sqli.attacker", $apply=set(SumStats::SUM, SumStats::SAMPLE), $num_samples=collect_SQLi_samples]; - SumStats::create([$epoch=sqli_requests_interval, + SumStats::create([$name="detect-sqli-attackers", + $epoch=sqli_requests_interval, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["http.sqli.attacker"]$sum); + return result["http.sqli.attacker"]$sum; }, $threshold=sqli_requests_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = @@ -82,11 +83,12 @@ event bro_init() &priority=3 }]); local r2: SumStats::Reducer = [$stream="http.sqli.victim", $apply=set(SumStats::SUM, SumStats::SAMPLE), $num_samples=collect_SQLi_samples]; - SumStats::create([$epoch=sqli_requests_interval, + SumStats::create([$name="detect-sqli-victims", + $epoch=sqli_requests_interval, $reducers=set(r2), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["http.sqli.victim"]$sum); + return result["http.sqli.victim"]$sum; }, $threshold=sqli_requests_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/protocols/ssh/detect-bruteforcing.bro b/scripts/policy/protocols/ssh/detect-bruteforcing.bro index ada418e61f..7988ecb0ad 100644 --- a/scripts/policy/protocols/ssh/detect-bruteforcing.bro +++ b/scripts/policy/protocols/ssh/detect-bruteforcing.bro @@ -27,7 +27,7 @@ export { ## The number of failed SSH connections before a host is designated as ## guessing passwords. - const password_guesses_limit = 30 &redef; + const password_guesses_limit: double = 30 &redef; ## The amount of time to remember presumed non-successful logins to build ## model of a password guesser. @@ -42,20 +42,29 @@ export { event bro_init() { - local r1: SumStats::Reducer = [$stream="ssh.login.failure", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=guessing_timeout, + local r1: SumStats::Reducer = [$stream="ssh.login.failure", $apply=set(SumStats::SUM, SumStats::SAMPLE), $num_samples=5]; + SumStats::create([$name="detect-ssh-bruteforcing", + $epoch=guessing_timeout, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["ssh.login.failure"]$sum); + return result["ssh.login.failure"]$sum; }, $threshold=password_guesses_limit, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { local r = result["ssh.login.failure"]; + local sub_msg = fmt("Sampled servers: "); + local samples = r$samples; + for ( i in samples ) + { + if ( samples[i]?$str ) + sub_msg = fmt("%s%s %s", sub_msg, i==0 ? "":",", samples[i]$str); + } # Generate the notice. NOTICE([$note=Password_Guessing, $msg=fmt("%s appears to be guessing SSH passwords (seen in %d connections).", key$host, r$num), + $sub=sub_msg, $src=key$host, $identifier=cat(key$host)]); }]); @@ -78,5 +87,5 @@ event SSH::heuristic_failed_login(c: connection) # be ignored. if ( ! (id$orig_h in ignore_guessers && id$resp_h in ignore_guessers[id$orig_h]) ) - SumStats::observe("ssh.login.failure", [$host=id$orig_h], [$num=1]); + SumStats::observe("ssh.login.failure", [$host=id$orig_h], [$str=cat(id$resp_h)]); } diff --git a/scripts/site/local.bro b/scripts/site/local.bro index e4b3a44e7a..5b4af4d87e 100644 --- a/scripts/site/local.bro +++ b/scripts/site/local.bro @@ -11,6 +11,13 @@ # Load the scan detection script. @load misc/scan +# Log some information about web applications being used by users +# on your network. +@load misc/app-stats + +# Detect traceroute being run on the network. +@load misc/detect-traceroute + # Generate notices when vulnerable versions of software are discovered. # The default is to only monitor software found in the address space defined # as "local". Refer to the software framework's documentation for more diff --git a/scripts/test-all-policy.bro b/scripts/test-all-policy.bro index dcf50b538e..f0900fda07 100644 --- a/scripts/test-all-policy.bro +++ b/scripts/test-all-policy.bro @@ -35,7 +35,15 @@ @load integration/barnyard2/types.bro @load integration/collective-intel/__load__.bro @load integration/collective-intel/main.bro -@load misc/app-metrics.bro +@load misc/app-stats/__load__.bro +@load misc/app-stats/main.bro +@load misc/app-stats/plugins/__load__.bro +@load misc/app-stats/plugins/facebook.bro +@load misc/app-stats/plugins/gmail.bro +@load misc/app-stats/plugins/google.bro +@load misc/app-stats/plugins/netflix.bro +@load misc/app-stats/plugins/pandora.bro +@load misc/app-stats/plugins/youtube.bro @load misc/capture-loss.bro @load misc/detect-traceroute/__load__.bro @load misc/detect-traceroute/main.bro diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout index a5428dd3b7..810cdb0ae8 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.cluster-intermediate-update/manager-1..stdout @@ -1,3 +1,3 @@ A test metric threshold was crossed with a value of: 101.0 -End of epoch handler was called 101.0 +End of epoch handler was called diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout new file mode 100644 index 0000000000..0445fc68b2 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout @@ -0,0 +1,2 @@ +SumStat key request + Host: 7.2.1.5 -> 145 diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout new file mode 100644 index 0000000000..7d62edb7f7 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout @@ -0,0 +1,2 @@ +Key request for 1.2.3.4 + Host: 1.2.3.4 -> 42 diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.sample-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.sample-cluster/manager-1..stdout new file mode 100644 index 0000000000..2eb4687e41 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.sample-cluster/manager-1..stdout @@ -0,0 +1,8 @@ +Host: 6.5.4.3 Sampled observations: 2 + [2, 5] +Host: 10.10.10.10 Sampled observations: 1 + [5] +Host: 1.2.3.4 Sampled observations: 34 + [5, 22, 52, 91, 94] +Host: 7.2.1.5 Sampled observations: 2 + [1, 91] diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.sample-cluster/out b/testing/btest/Baseline/scripts.base.frameworks.sumstats.sample-cluster/out deleted file mode 100644 index 2451b82f45..0000000000 --- a/testing/btest/Baseline/scripts.base.frameworks.sumstats.sample-cluster/out +++ /dev/null @@ -1,18 +0,0 @@ -1 -1.2.3.4 -10.10.10.10 -2 -2 -34 -6.5.4.3 -7.2.1.5 -[num=1, dbl=, str=] -[num=2, dbl=, str=] -[num=22, dbl=, str=] -[num=5, dbl=, str=] -[num=5, dbl=, str=] -[num=5, dbl=, str=] -[num=52, dbl=, str=] -[num=91, dbl=, str=] -[num=91, dbl=, str=] -[num=94, dbl=, str=] diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro index 1b7903ca1a..2206673c3c 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro @@ -23,16 +23,16 @@ global n = 0; event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)]; - SumStats::create([$epoch=5secs, + SumStats::create([$name="test", + $epoch=5secs, $reducers=set(r1), - $epoch_finished(rt: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + local r = result["test"]; + print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); + }, + $epoch_finished(ts: time) = { - for ( key in rt ) - { - local r = rt[key]["test"]; - print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); - } - terminate(); }]); } diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic.bro b/testing/btest/scripts/base/frameworks/sumstats/basic.bro index 0b2851bf10..906d69a6f3 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic.bro @@ -11,16 +11,14 @@ event bro_init() &priority=5 SumStats::MIN, SumStats::STD_DEV, SumStats::UNIQUE)]; - SumStats::create([$epoch=3secs, - $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = - { - for ( key in data ) - { - local r = data[key]["test.metric"]; - print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); - } - } + SumStats::create([$name="test", + $epoch=3secs, + $reducers=set(r1), + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + local r = result["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); + } ]); SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]); diff --git a/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro b/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro index bed1793721..4fb6b817d3 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro @@ -20,20 +20,23 @@ redef Log::default_rotation_interval = 0secs; event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=10secs, + SumStats::create([$name="test", + $epoch=10secs, $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = - { - print "End of epoch handler was called"; - for ( res in data ) - print data[res]["test.metric"]$sum; - terminate(); - }, + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + print result["test.metric"]$sum; + }, + $epoch_finished(ts: time) = + { + print "End of epoch handler was called"; + terminate(); + }, $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["test.metric"]$sum); + return result["test.metric"]$sum; }, - $threshold=100, + $threshold=100.0, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum); diff --git a/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro new file mode 100644 index 0000000000..48068d8cfe --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro @@ -0,0 +1,96 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 15 + +# @TEST-EXEC: btest-diff manager-1/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +global n = 0; + +event bro_init() &priority=5 + { + local r1 = SumStats::Reducer($stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)); + SumStats::create([$name="test sumstat", + $epoch=1hr, + $reducers=set(r1)]); + } + +event remote_connection_closed(p: event_peer) + { + terminate(); + } + +global ready_for_data: event(); +redef Cluster::manager2worker_events += /^ready_for_data$/; + +event ready_for_data() + { + if ( Cluster::node == "worker-1" ) + { + SumStats::observe("test", [$host=1.2.3.4], [$num=34]); + SumStats::observe("test", [$host=1.2.3.4], [$num=30]); + SumStats::observe("test", [$host=6.5.4.3], [$num=1]); + SumStats::observe("test", [$host=7.2.1.5], [$num=54]); + } + if ( Cluster::node == "worker-2" ) + { + SumStats::observe("test", [$host=1.2.3.4], [$num=75]); + SumStats::observe("test", [$host=1.2.3.4], [$num=30]); + SumStats::observe("test", [$host=7.2.1.5], [$num=91]); + SumStats::observe("test", [$host=10.10.10.10], [$num=5]); + } + } + + +event on_demand2() + { + local host = 7.2.1.5; + when ( local result = SumStats::request_key("test sumstat", [$host=host]) ) + { + print "SumStat key request"; + if ( "test" in result ) + print fmt(" Host: %s -> %.0f", host, result["test"]$sum); + terminate(); + } + } + +event on_demand() + { + #when ( local results = SumStats::request("test sumstat") ) + # { + # print "Complete SumStat request"; + # print fmt(" Host: %s -> %.0f", 6.5.4.3, results[[$host=6.5.4.3]]["test"]$sum); + # print fmt(" Host: %s -> %.0f", 10.10.10.10, results[[$host=10.10.10.10]]["test"]$sum); + # print fmt(" Host: %s -> %.0f", 1.2.3.4, results[[$host=1.2.3.4]]["test"]$sum); + # print fmt(" Host: %s -> %.0f", 7.2.1.5, results[[$host=7.2.1.5]]["test"]$sum); + + event on_demand2(); + # } + } + +global peer_count = 0; +event remote_connection_handshake_done(p: event_peer) &priority=-5 + { + ++peer_count; + if ( peer_count == 2 ) + { + if ( Cluster::local_node_type() == Cluster::MANAGER ) + event ready_for_data(); + + schedule 1sec { on_demand() }; + } + } + diff --git a/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro b/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro new file mode 100644 index 0000000000..78aba726ca --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro @@ -0,0 +1,46 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +redef exit_only_after_terminate=T; + + +## Requesting a full sumstats resulttable is not supported yet. +#event on_demand() +# { +# when ( local results = SumStats::request("test") ) +# { +# print "Complete SumStat request"; +# for ( key in results ) +# { +# print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum); +# } +# } +# } + +event on_demand_key() + { + local host = 1.2.3.4; + when ( local result = SumStats::request_key("test", [$host=host]) ) + { + print fmt("Key request for %s", host); + print fmt(" Host: %s -> %.0f", host, result["test.reducer"]$sum); + terminate(); + } + } + +event bro_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test.reducer", + $apply=set(SumStats::SUM)]; + SumStats::create([$name="test", + $epoch=1hr, + $reducers=set(r1)]); + + # Seed some data but notice there are no callbacks defined in the sumstat! + SumStats::observe("test.reducer", [$host=1.2.3.4], [$num=42]); + SumStats::observe("test.reducer", [$host=4.3.2.1], [$num=7]); + + #schedule 0.1 secs { on_demand() }; + schedule 1 secs { on_demand_key() }; + } + diff --git a/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro index 1b0f0eec94..1f2bab0229 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/sample-cluster.bro @@ -5,8 +5,7 @@ # @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT # @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT # @TEST-EXEC: btest-bg-wait 15 -# @TEST-EXEC: cat manager-1/.stdout | sort > out -# @TEST-EXEC: btest-diff out +# @TEST-EXEC: btest-diff manager-1/.stdout @TEST-START-FILE cluster-layout.bro redef Cluster::nodes = { @@ -18,25 +17,24 @@ redef Cluster::nodes = { redef Log::default_rotation_interval = 0secs; -global n = 0; - event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SAMPLE), $num_samples=5]; - SumStats::create([$epoch=5secs, + SumStats::create([$name="test", + $epoch=5secs, $reducers=set(r1), - $epoch_finished(rt: SumStats::ResultTable) = + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = { - for ( key in rt ) - { - print key$host; - local r = rt[key]["test"]; - for ( sample in r$samples ) { - print r$samples[sample]; - } - print r$sample_elements; - } + local r = result["test"]; + print fmt("Host: %s Sampled observations: %d", key$host, r$sample_elements); + local sample_nums: vector of count = vector(); + for ( sample in r$samples ) + sample_nums[|sample_nums|] =r$samples[sample]$num; + print fmt(" %s", sort(sample_nums)); + }, + $epoch_finished(ts: time) = + { terminate(); }]); } diff --git a/testing/btest/scripts/base/frameworks/sumstats/sample.bro b/testing/btest/scripts/base/frameworks/sumstats/sample.bro index 04d7b4f256..4ba395b463 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/sample.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/sample.bro @@ -5,19 +5,16 @@ event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SAMPLE), $num_samples=2]; - SumStats::create([$epoch=3secs, - $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = - { - for ( key in data ) - { - print key$host; - local r = data[key]["test.metric"]; - print r$samples; - print r$sample_elements; - } - } - ]); + SumStats::create([$name="test", + $epoch=3secs, + $reducers=set(r1), + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + print key$host; + local r = result["test.metric"]; + print r$samples; + print r$sample_elements; + }]); SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]); SumStats::observe("test.metric", [$host=1.2.3.4], [$num=22]); diff --git a/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro b/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro index ddc053bd23..b00b30a375 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro @@ -8,14 +8,15 @@ redef enum Notice::Type += { event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=3secs, + SumStats::create([$name="test1", + $epoch=3secs, $reducers=set(r1), #$threshold_val = SumStats::sum_threshold("test.metric"), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["test.metric"]$sum); + return result["test.metric"]$sum; }, - $threshold=5, + $threshold=5.0, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { local r = result["test.metric"]; @@ -24,14 +25,15 @@ event bro_init() &priority=5 ]); local r2: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=3secs, + SumStats::create([$name="test2", + $epoch=3secs, $reducers=set(r2), #$threshold_val = SumStats::sum_threshold("test.metric"), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["test.metric"]$sum); + return result["test.metric"]$sum; }, - $threshold_series=vector(3,6,800), + $threshold_series=vector(3.0,6.0,800.0), $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { local r = result["test.metric"]; @@ -41,19 +43,20 @@ event bro_init() &priority=5 local r3: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; local r4: SumStats::Reducer = [$stream="test.metric2", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=3secs, + SumStats::create([$name="test3", + $epoch=3secs, $reducers=set(r3, r4), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { # Calculate a ratio between sums of two reducers. if ( "test.metric2" in result && "test.metric" in result && result["test.metric"]$sum > 0 ) - return double_to_count(result["test.metric2"]$sum / result["test.metric"]$sum); + return result["test.metric2"]$sum / result["test.metric"]$sum; else - return 0; + return 0.0; }, # Looking for metric2 sum to be 5 times the sum of metric - $threshold=5, + $threshold=5.0, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { local thold = result["test.metric2"]$sum / result["test.metric"]$sum; diff --git a/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro index 0ade38e86c..d26cee4244 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro @@ -23,27 +23,24 @@ event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::TOPK)]; - SumStats::create([$epoch=5secs, - $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = - { - for ( key in data ) - { - local r = data[key]["test.metric"]; - - local s: vector of SumStats::Observation; - s = topk_get_top(r$topk, 5); - - print fmt("Top entries for key %s", key$str); - for ( element in s ) - { - print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); - } - - terminate(); - } - } - ]); + SumStats::create([$name="topk-test", + $epoch=5secs, + $reducers=set(r1), + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + local r = result["test.metric"]; + local s: vector of SumStats::Observation; + s = topk_get_top(r$topk, 5); + print fmt("Top entries for key %s", key$str); + for ( element in s ) + { + print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); + } + }, + $epoch_finished(ts: time) = + { + terminate(); + }]); } diff --git a/testing/btest/scripts/base/frameworks/sumstats/topk.bro b/testing/btest/scripts/base/frameworks/sumstats/topk.bro index 22a5af1bc7..99c301c669 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/topk.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/topk.bro @@ -5,26 +5,21 @@ event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::TOPK)]; - SumStats::create([$epoch=3secs, - $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = - { - for ( key in data ) - { - local r = data[key]["test.metric"]; - - local s: vector of SumStats::Observation; - s = topk_get_top(r$topk, 5); - - print fmt("Top entries for key %s", key$str); - for ( element in s ) - { - print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); - } - - } - } - ]); + SumStats::create([$name="topk-test", + $epoch=3secs, + $reducers=set(r1), + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + local r = result["test.metric"]; + local s: vector of SumStats::Observation; + s = topk_get_top(r$topk, 5); + + print fmt("Top entries for key %s", key$str); + for ( element in s ) + { + print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); + } + }]); const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100};