From bec965b66f7df28f023ff34600ed26a4f1735c20 Mon Sep 17 00:00:00 2001 From: Seth Hall Date: Tue, 21 May 2013 15:52:59 -0400 Subject: [PATCH] Large update for the SumStats framework. - On-demand access to sumstats results through "return from" functions named SumStats::request and Sumstats::request_key. Both functions are tested in standalone and clustered modes. - $name field has returned to SumStats which simplifies cluster code and makes the on-demand access stuff possible. - Clustered results can only be collected for 1 minute from their time of creation now instead of time of last read. - Thresholds use doubles instead of counts everywhere now. - Calculation dependency resolution occurs at start up time now instead of doing it at observation time which provide a minor cpu performance improvement. A new plugin registration mechanism was created to support this change. - AppStats now has a minimal doc string and is broken into hook-based plugins. - AppStats and traceroute detection added to local.bro --- scripts/base/frameworks/sumstats/cluster.bro | 200 +++++++++------ scripts/base/frameworks/sumstats/main.bro | 241 +++++++++++++----- .../base/frameworks/sumstats/non-cluster.bro | 29 ++- .../frameworks/sumstats/plugins/average.bro | 9 +- .../base/frameworks/sumstats/plugins/max.bro | 8 +- .../base/frameworks/sumstats/plugins/min.bro | 9 +- .../frameworks/sumstats/plugins/sample.bro | 13 +- .../frameworks/sumstats/plugins/std-dev.bro | 15 +- .../base/frameworks/sumstats/plugins/sum.bro | 30 ++- .../frameworks/sumstats/plugins/unique.bro | 8 +- .../frameworks/sumstats/plugins/variance.bro | 10 +- scripts/policy/misc/app-stats/__load__.bro | 2 + .../{app-metrics.bro => app-stats/main.bro} | 48 +--- .../misc/app-stats/plugins/__load__.bro | 6 + .../misc/app-stats/plugins/facebook.bro | 12 + .../policy/misc/app-stats/plugins/gmail.bro | 12 + .../policy/misc/app-stats/plugins/google.bro | 12 + .../policy/misc/app-stats/plugins/netflix.bro | 12 + .../policy/misc/app-stats/plugins/pandora.bro | 12 + .../policy/misc/app-stats/plugins/youtube.bro | 12 + .../policy/misc/detect-traceroute/main.bro | 9 +- scripts/policy/misc/scan.bro | 18 +- .../protocols/ftp/detect-bruteforcing.bro | 7 +- scripts/policy/protocols/http/detect-sqli.bro | 12 +- .../protocols/ssh/detect-bruteforcing.bro | 7 +- scripts/site/local.bro | 7 + .../manager-1..stdout | 7 + .../.stdout | 5 + .../frameworks/sumstats/basic-cluster.bro | 3 +- .../base/frameworks/sumstats/basic.bro | 21 +- .../sumstats/cluster-intermediate-update.bro | 7 +- .../frameworks/sumstats/on-demand-cluster.bro | 93 +++++++ .../base/frameworks/sumstats/on-demand.bro | 45 ++++ .../base/frameworks/sumstats/thresholding.bro | 23 +- 34 files changed, 687 insertions(+), 277 deletions(-) create mode 100644 scripts/policy/misc/app-stats/__load__.bro rename scripts/policy/misc/{app-metrics.bro => app-stats/main.bro} (58%) create mode 100644 scripts/policy/misc/app-stats/plugins/__load__.bro create mode 100644 scripts/policy/misc/app-stats/plugins/facebook.bro create mode 100644 scripts/policy/misc/app-stats/plugins/gmail.bro create mode 100644 scripts/policy/misc/app-stats/plugins/google.bro create mode 100644 scripts/policy/misc/app-stats/plugins/netflix.bro create mode 100644 scripts/policy/misc/app-stats/plugins/pandora.bro create mode 100644 scripts/policy/misc/app-stats/plugins/youtube.bro create mode 100644 testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout create mode 100644 testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout create mode 100644 testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro create mode 100644 testing/btest/scripts/base/frameworks/sumstats/on-demand.bro diff --git a/scripts/base/frameworks/sumstats/cluster.bro b/scripts/base/frameworks/sumstats/cluster.bro index 9ee63a674e..23e20ae9c0 100644 --- a/scripts/base/frameworks/sumstats/cluster.bro +++ b/scripts/base/frameworks/sumstats/cluster.bro @@ -27,39 +27,34 @@ export { ## performed. In practice this should hopefully have a minimal effect. const max_outstanding_global_views = 10 &redef; - ## Intermediate updates can cause overload situations on very large clusters. This - ## option may help reduce load and correct intermittent problems. The goal for this - ## option is also meant to be temporary. - const enable_intermediate_updates = T &redef; - ## Event sent by the manager in a cluster to initiate the collection of values for ## a sumstat. - global cluster_ss_request: event(uid: string, ssid: string); + global cluster_ss_request: event(uid: string, ss_name: string, cleanup: bool); ## Event sent by nodes that are collecting sumstats after receiving a request for ## the sumstat from the manager. - global cluster_ss_response: event(uid: string, ssid: string, data: ResultTable, done: bool); + global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool); ## This event is sent by the manager in a cluster to initiate the collection of ## a single key value from a sumstat. It's typically used to get intermediate ## updates before the break interval triggers to speed detection of a value ## crossing a threshold. - global cluster_key_request: event(uid: string, ssid: string, key: Key); + global cluster_key_request: event(uid: string, ss_name: string, key: Key, cleanup: bool); ## This event is sent by nodes in response to a ## :bro:id:`SumStats::cluster_key_request` event. - global cluster_key_response: event(uid: string, ssid: string, key: Key, result: Result); + global cluster_key_response: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool); ## This is sent by workers to indicate that they crossed the percent ## of the current threshold by the percentage defined globally in ## :bro:id:`SumStats::cluster_request_global_view_percent` - global cluster_key_intermediate_response: event(ssid: string, key: SumStats::Key); + global cluster_key_intermediate_response: event(ss_name: string, key: SumStats::Key); ## This event is scheduled internally on workers to send result chunks. - global send_data: event(uid: string, ssid: string, data: ResultTable); + global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool); ## This event is generated when a threshold is crossed. - global cluster_threshold_crossed: event(ssid: string, key: SumStats::Key, thold: Thresholding); + global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold: Thresholding); } # Add events to the cluster framework to make this work. @@ -74,44 +69,38 @@ redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_resp # an intermediate result has been received. global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0; -event bro_init() &priority=-100 - { - # The manager is the only host allowed to track these. - stats_store = table(); - reducer_store = table(); - } - # This is done on all non-manager node types in the event that a sumstat is # being collected somewhere other than a worker. function data_added(ss: SumStat, key: Key, result: Result) { # If an intermediate update for this value was sent recently, don't send # it again. - if ( [ss$id, key] in recent_global_view_keys ) + if ( [ss$name, key] in recent_global_view_keys ) return; # If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that # crosses the full threshold then it's a candidate to send as an # intermediate update. - if ( enable_intermediate_updates && - check_thresholds(ss, key, result, cluster_request_global_view_percent) ) + if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) ) { # kick off intermediate update - event SumStats::cluster_key_intermediate_response(ss$id, key); - ++recent_global_view_keys[ss$id, key]; + event SumStats::cluster_key_intermediate_response(ss$name, key); + ++recent_global_view_keys[ss$name, key]; } } -event SumStats::send_data(uid: string, ssid: string, data: ResultTable) +event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool) { #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); local local_data: ResultTable = table(); + local incoming_data: ResultTable = cleanup ? data : copy(data); + local num_added = 0; - for ( key in data ) + for ( key in incoming_data ) { - local_data[key] = data[key]; - delete data[key]; + local_data[key] = incoming_data[key]; + delete incoming_data[key]; # Only send cluster_send_in_groups_of at a time. Queue another # event to send the next group. @@ -121,56 +110,56 @@ event SumStats::send_data(uid: string, ssid: string, data: ResultTable) local done = F; # If data is empty, this sumstat is done. - if ( |data| == 0 ) + if ( |incoming_data| == 0 ) done = T; - event SumStats::cluster_ss_response(uid, ssid, local_data, done); + event SumStats::cluster_ss_response(uid, ss_name, local_data, done); if ( ! done ) - schedule 0.01 sec { SumStats::send_data(uid, ssid, data) }; + schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) }; } -event SumStats::cluster_ss_request(uid: string, ssid: string) +event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool) { #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id); # Initiate sending all of the data for the requested stats. - if ( ssid in result_store ) - event SumStats::send_data(uid, ssid, result_store[ssid]); + if ( ss_name in result_store ) + event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup); else - event SumStats::send_data(uid, ssid, table()); + event SumStats::send_data(uid, ss_name, table(), cleanup); # Lookup the actual sumstats and reset it, the reference to the data # currently stored will be maintained internally by the send_data event. - if ( ssid in stats_store ) - reset(stats_store[ssid]); + if ( ss_name in stats_store && cleanup ) + reset(stats_store[ss_name]); } -event SumStats::cluster_key_request(uid: string, ssid: string, key: Key) +event SumStats::cluster_key_request(uid: string, ss_name: string, key: Key, cleanup: bool) { - if ( ssid in result_store && key in result_store[ssid] ) + if ( ss_name in result_store && key in result_store[ss_name] ) { #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data); - event SumStats::cluster_key_response(uid, ssid, key, result_store[ssid][key]); + event SumStats::cluster_key_response(uid, ss_name, key, result_store[ss_name][key], cleanup); } else { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - event SumStats::cluster_key_response(uid, ssid, key, table()); + event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup); } } -event SumStats::cluster_threshold_crossed(ssid: string, key: SumStats::Key, thold: Thresholding) +event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, thold: Thresholding) { - if ( ssid !in threshold_tracker ) - threshold_tracker[ssid] = table(); + if ( ss_name !in threshold_tracker ) + threshold_tracker[ss_name] = table(); - threshold_tracker[ssid][key] = thold; + threshold_tracker[ss_name][key] = thold; } -event SumStats::thresholds_reset(ssid: string) +event SumStats::thresholds_reset(ss_name: string) { - threshold_tracker[ssid] = table(); + threshold_tracker[ss_name] = table(); } @endif @@ -181,7 +170,7 @@ event SumStats::thresholds_reset(ssid: string) # This variable is maintained by manager nodes as they collect and aggregate # results. # Index on a uid. -global stats_results: table[string] of ResultTable &read_expire=1min; +global stats_results: table[string] of ResultTable &create_expire=1min &default=table(); # This variable is maintained by manager nodes to track how many "dones" they # collected per collection unique id. Once the number of results for a uid @@ -189,18 +178,18 @@ global stats_results: table[string] of ResultTable &read_expire=1min; # result is written out and deleted from here. # Indexed on a uid. # TODO: add an &expire_func in case not all results are received. -global done_with: table[string] of count &read_expire=1min &default=0; +global done_with: table[string] of count &create_expire=1min &default=0; # This variable is maintained by managers to track intermediate responses as # they are getting a global view for a certain key. # Indexed on a uid. -global key_requests: table[string] of Result &read_expire=1min; +global key_requests: table[string] of Result &create_expire=1min; # This variable is maintained by managers to prevent overwhelming communication due # to too many intermediate updates. Each sumstat is tracked separately so that # one won't overwhelm and degrade other quieter sumstats. # Indexed on a sumstat id. -global outstanding_global_views: table[string] of count &default=0; +global outstanding_global_views: table[string] of count &create_expire=1min &default=0; const zero_time = double_to_time(0.0); # Managers handle logging. @@ -216,7 +205,7 @@ event SumStats::finish_epoch(ss: SumStat) stats_results[uid] = table(); # Request data from peers. - event SumStats::cluster_ss_request(uid, ss$id); + event SumStats::cluster_ss_request(uid, ss$name, T); } # Schedule the next finish_epoch event. @@ -230,20 +219,20 @@ function data_added(ss: SumStat, key: Key, result: Result) if ( check_thresholds(ss, key, result, 1.0) ) { threshold_crossed(ss, key, result); - event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]); + event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); } } -event SumStats::cluster_key_response(uid: string, ssid: string, key: Key, result: Result) +event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool) { #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result); # We only want to try and do a value merge if there are actually measured datapoints # in the Result. - if ( uid in key_requests ) - key_requests[uid] = compose_results(key_requests[uid], result); - else + if ( uid !in key_requests || |key_requests[uid]| == 0 ) key_requests[uid] = result; + else + key_requests[uid] = compose_results(key_requests[uid], result); # Mark that a worker is done. ++done_with[uid]; @@ -251,30 +240,39 @@ event SumStats::cluster_key_response(uid: string, ssid: string, key: Key, result #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); if ( Cluster::worker_count == done_with[uid] ) { - local ss = stats_store[ssid]; + local ss = stats_store[ss_name]; local ir = key_requests[uid]; if ( check_thresholds(ss, key, ir, 1.0) ) { threshold_crossed(ss, key, ir); - event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]); + event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); } - delete done_with[uid]; - delete key_requests[uid]; - # Check that there is an outstanding view before subtracting. - if ( outstanding_global_views[ssid] > 0 ) - --outstanding_global_views[ssid]; + if ( cleanup ) + { + # We only want to delete the data if this is a non dynamic + # request because the dynamic requests use when statements + # and the data needs to remain available. + delete key_requests[uid]; + delete done_with[uid]; + + # Check that there is an outstanding view before subtracting. + # Global views only apply to non-dynamic requests. Dynamic + # requests must be serviced. + if ( outstanding_global_views[ss_name] > 0 ) + --outstanding_global_views[ss_name]; + } } } # Managers handle intermediate updates here. -event SumStats::cluster_key_intermediate_response(ssid: string, key: Key) +event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key) { #print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr); #print fmt("MANAGER: requesting key data for %s", key2str(key)); - if ( ssid in outstanding_global_views && - |outstanding_global_views[ssid]| > max_outstanding_global_views ) + if ( ss_name in outstanding_global_views && + |outstanding_global_views[ss_name]| > max_outstanding_global_views ) { # Don't do this intermediate update. Perhaps at some point in the future # we will queue and randomly select from these ignored intermediate @@ -282,13 +280,14 @@ event SumStats::cluster_key_intermediate_response(ssid: string, key: Key) return; } - ++outstanding_global_views[ssid]; + ++outstanding_global_views[ss_name]; local uid = unique_id(""); - event SumStats::cluster_key_request(uid, ssid, key); + done_with[uid] = 0; + event SumStats::cluster_key_request(uid, ss_name, key, T); } -event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable, done: bool) +event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool) { #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); @@ -297,7 +296,7 @@ event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable ++done_with[uid]; local local_data = stats_results[uid]; - local ss = stats_store[ssid]; + local ss = stats_store[ss_name]; for ( key in data ) { @@ -314,13 +313,14 @@ event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable if ( check_thresholds(ss, key, local_data[key], 1.0) ) { threshold_crossed(ss, key, local_data[key]); - event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]); + event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]); } } } # If the data has been collected from all peers, we are done and ready to finish. - if ( Cluster::worker_count == done_with[uid] ) + if ( Cluster::worker_count == done_with[uid] && + /^dyn-/ !in uid ) { if ( ss?$epoch_finished ) ss$epoch_finished(local_data); @@ -328,14 +328,60 @@ event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable # Clean up delete stats_results[uid]; delete done_with[uid]; - # Not sure I need to reset the sumstat on the manager. reset(ss); } } -event remote_connection_handshake_done(p: event_peer) &priority=5 +function request(ss_name: string): ResultTable { - send_id(p, "SumStats::stats_store"); - send_id(p, "SumStats::reducer_store"); + # This only needs to be implemented this way for cluster compatibility. + local uid = unique_id("dyn-"); + stats_results[uid] = table(); + done_with[uid] = 0; + event SumStats::cluster_ss_request(uid, ss_name, F); + + return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) + { + if ( uid in stats_results ) + { + local ss_result = stats_results[uid]; + # Clean up + delete stats_results[uid]; + delete done_with[uid]; + reset(stats_store[ss_name]); + return ss_result; + } + else + return table(); + } + timeout 1.1min + { + Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name)); + return table(); + } } + +function request_key(ss_name: string, key: Key): Result + { + local uid = unique_id("dyn-"); + done_with[uid] = 0; + key_requests[uid] = table(); + + event SumStats::cluster_key_request(uid, ss_name, key, F); + return when ( uid in done_with && Cluster::worker_count == done_with[uid] ) + { + local result = key_requests[uid]; + # Clean up + delete key_requests[uid]; + delete done_with[uid]; + + return result; + } + timeout 1.1min + { + Reporter::warning(fmt("Dynamic SumStat key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key)); + return table(); + } + } + @endif diff --git a/scripts/base/frameworks/sumstats/main.bro b/scripts/base/frameworks/sumstats/main.bro index 6864966766..b1e5761483 100644 --- a/scripts/base/frameworks/sumstats/main.bro +++ b/scripts/base/frameworks/sumstats/main.bro @@ -87,6 +87,10 @@ export { ## is no assurance provided as to where the callbacks ## will be executed on clusters. type SumStat: record { + ## An arbitrary name for the sumstat so that it can + ## be referred to later. + name: string; + ## The interval at which this filter should be "broken" ## and the '$epoch_finished' callback called. The ## results are also reset at this time so any threshold @@ -102,22 +106,22 @@ export { ## :bro:see:`Result` structure which will be used ## for thresholding. ## This is required if a $threshold value is given. - threshold_val: function(key: SumStats::Key, result: SumStats::Result): count &optional; + threshold_val: function(key: SumStats::Key, result: SumStats::Result): double &optional; ## The threshold value for calling the ## $threshold_crossed callback. - threshold: count &optional; + threshold: double &optional; ## A series of thresholds for calling the ## $threshold_crossed callback. - threshold_series: vector of count &optional; + threshold_series: vector of double &optional; ## A callback that is called when a threshold is crossed. threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional; ## A callback with the full collection of Results for ## this SumStat. - epoch_finished: function(rt: SumStats::ResultTable) &optional; + epoch_finished: function(rt: SumStats::ResultTable) &optional; }; ## Create a summary statistic. @@ -134,19 +138,37 @@ export { ## obs: The data point to send into the stream. global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation); - ## This record is primarily used for internal threshold tracking. - type Thresholding: record { - # Internal use only. Indicates if a simple threshold was already crossed. - is_threshold_crossed: bool &default=F; + ## Dynamically request a sumstat. This function should be + ## used sparingly and not as a replacement for the callbacks + ## from the :bro:see:`SumStat` record. The function is only + ## available for use within "when" statements as an asynchronous + ## function. + ## + ## ss_name: SumState name. + ## + ## Returns: The result table for the requested sumstat. + global request: function(ss_name: string): ResultTable; - # Internal use only. Current key for threshold series. - threshold_series_index: count &default=0; - }; + ## Dynamically request a sumstat key. This function should be + ## used sparingly and not as a replacement for the callbacks + ## from the :bro:see:`SumStat` record. The function is only + ## available for use within "when" statements as an asynchronous + ## function. + ## + ## ss_name: SumStat name. + ## + ## key: The SumStat key being requested. + ## + ## Returns: The result for the requested sumstat key. + global request_key: function(ss_name: string, key: Key): Result; + + ## This record is primarily used for internal threshold tracking. + type Thresholding: record {}; ## This event is generated when thresholds are reset for a SumStat. ## - ## ssid: SumStats ID that thresholds were reset for. - global thresholds_reset: event(ssid: string); + ## name: SumStats name that thresholds were reset for. + global thresholds_reset: event(name: string); ## Helper function to represent a :bro:type:`SumStats::Key` value as ## a simple string. @@ -157,19 +179,43 @@ export { global key2str: function(key: SumStats::Key): string; } +# The function prototype for plugins to do calculations. +type ObserveFunc: function(r: Reducer, val: double, data: Observation, rv: ResultVal); + redef record Reducer += { - # Internal use only. Provides a reference back to the related SumStats by it's ID. - sid: string &optional; + # Internal use only. Provides a reference back to the related SumStats by its name. + ssname: string &optional; + + calc_funcs: vector of Calculation &optional; }; +redef record Thresholding += { + # Internal use only. Indicates if a simple threshold was already crossed. + is_threshold_crossed: bool &default=F; + + # Internal use only. Current key for threshold series. + threshold_series_index: count &default=0; +}; + + # Internal use only. For tracking thresholds per sumstat and key. global threshold_tracker: table[string] of table[Key] of Thresholding &optional; redef record SumStat += { + # Internal use only. + ssname: string &optional; + # Internal use only (mostly for cluster coherency). id: string &optional; }; +# Prototype the hook point for plugins to initialize any result values. +global init_resultval_hook: hook(r: Reducer, rv: ResultVal); + +# Prototype the hook point for plugins to merge Results. +global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal); + + # Store of sumstats indexed on the sumstat id. global stats_store: table[string] of SumStat = table(); @@ -182,20 +228,20 @@ global result_store: table[string] of ResultTable = table(); # Store of threshold information. global thresholds_store: table[string, Key] of bool = table(); +# Store the calculations. +global calc_store: table[Calculation] of ObserveFunc = table(); + +# Store the dependencies for Calculations. +global calc_deps: table[Calculation] of vector of Calculation = table(); + +# Hook for registering observation calculation plugins. +global register_observe_plugins: hook(); + # This is called whenever key values are updated and the new val is given as the # `val` argument. It's only prototyped here because cluster and non-cluster have # separate implementations. global data_added: function(ss: SumStat, key: Key, result: Result); -# Prototype the hook point for plugins to do calculations. -global observe_hook: hook(r: Reducer, val: double, data: Observation, rv: ResultVal); - -# Prototype the hook point for plugins to initialize any result values. -global init_resultval_hook: hook(r: Reducer, rv: ResultVal); - -# Prototype the hook point for plugins to merge Results. -global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal); - # Event that is used to "finish" measurements and adapt the measurement # framework for clustered or non-clustered usage. global finish_epoch: event(ss: SumStat); @@ -210,6 +256,24 @@ function key2str(key: Key): string return fmt("sumstats_key(%s)", out); } +function register_observe_plugin(calc: Calculation, func: ObserveFunc) + { + calc_store[calc] = func; + } + +function add_observe_plugin_dependency(calc: Calculation, depends_on: Calculation) + { + if ( calc !in calc_deps ) + calc_deps[calc] = vector(); + calc_deps[calc][|calc_deps[calc]|] = depends_on; + } + +event bro_init() &priority=100000 + { + # Call all of the plugin registration hooks + hook register_observe_plugins(); + } + function init_resultval(r: Reducer): ResultVal { local rv: ResultVal = [$begin=network_time(), $end=network_time()]; @@ -234,25 +298,17 @@ function compose_results(r1: Result, r2: Result): Result { local result: Result = table(); - if ( |r1| > |r2| ) + for ( id in r1 ) { - for ( data_id in r1 ) - { - if ( data_id in r2 ) - result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); - else - result[data_id] = r1[data_id]; - } + result[id] = r1[id]; } - else + + for ( id in r2 ) { - for ( data_id in r2 ) - { - if ( data_id in r1 ) - result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); - else - result[data_id] = r2[data_id]; - } + if ( id in r1 ) + result[id] = compose_resultvals(r1[id], r2[id]); + else + result[id] = r2[id]; } return result; @@ -261,18 +317,42 @@ function compose_results(r1: Result, r2: Result): Result function reset(ss: SumStat) { - if ( ss$id in result_store ) - delete result_store[ss$id]; + if ( ss$name in result_store ) + delete result_store[ss$name]; - result_store[ss$id] = table(); + result_store[ss$name] = table(); if ( ss?$threshold || ss?$threshold_series ) { - threshold_tracker[ss$id] = table(); - event SumStats::thresholds_reset(ss$id); + threshold_tracker[ss$name] = table(); + event SumStats::thresholds_reset(ss$name); } } +# This could potentially recurse forever, but plugin authors +# should be making sure they aren't causing reflexive dependencies. +function add_calc_deps(calcs: vector of Calculation, c: Calculation) + { + #print fmt("Checking for deps for %s", c); + for ( i in calc_deps[c] ) + { + local skip_calc=F; + for ( j in calcs ) + { + if ( calcs[j] == calc_deps[c][i] ) + skip_calc=T; + } + if ( ! skip_calc ) + { + if ( calc_deps[c][i] in calc_deps ) + add_calc_deps(calcs, calc_deps[c][i]); + calcs[|c|] = calc_deps[c][i]; + #print fmt("add dep for %s [%s] ", c, calc_deps[c][i]); + } + } + + } + function create(ss: SumStat) { if ( (ss?$threshold || ss?$threshold_series) && ! ss?$threshold_val ) @@ -280,14 +360,32 @@ function create(ss: SumStat) Reporter::error("SumStats given a threshold with no $threshold_val function"); } - if ( ! ss?$id ) - ss$id=unique_id(""); - threshold_tracker[ss$id] = table(); - stats_store[ss$id] = ss; + threshold_tracker[ss$name] = table(); + stats_store[ss$name] = ss; for ( reducer in ss$reducers ) { - reducer$sid = ss$id; + reducer$ssname = ss$name; + reducer$calc_funcs = vector(); + for ( calc in reducer$apply ) + { + # Add in dependencies recursively. + if ( calc in calc_deps ) + add_calc_deps(reducer$calc_funcs, calc); + + # Don't add this calculation to the vector if + # it was already added by something else as a + # dependency. + local skip_calc=F; + for ( j in reducer$calc_funcs ) + { + if ( calc == reducer$calc_funcs[j] ) + skip_calc=T; + } + if ( ! skip_calc ) + reducer$calc_funcs[|reducer$calc_funcs|] = calc; + } + if ( reducer$stream !in reducer_store ) reducer_store[reducer$stream] = set(); add reducer_store[reducer$stream][reducer]; @@ -313,7 +411,7 @@ function observe(id: string, key: Key, obs: Observation) if ( r?$pred && ! r$pred(key, obs) ) next; - local ss = stats_store[r$sid]; + local ss = stats_store[r$ssname]; # If there is a threshold and no epoch_finished callback # we don't need to continue counting since the data will @@ -324,17 +422,21 @@ function observe(id: string, key: Key, obs: Observation) # future if on demand access is provided to the # SumStats results. if ( ! ss?$epoch_finished && - r$sid in threshold_tracker && - key in threshold_tracker[r$sid] && ( ss?$threshold && - threshold_tracker[r$sid][key]$is_threshold_crossed ) || + r$ssname in threshold_tracker && + key in threshold_tracker[r$ssname] && + threshold_tracker[r$ssname][key]$is_threshold_crossed ) || ( ss?$threshold_series && - threshold_tracker[r$sid][key]$threshold_series_index+1 == |ss$threshold_series| ) ) + r$ssname in threshold_tracker && + key in threshold_tracker[r$ssname] && + threshold_tracker[r$ssname][key]$threshold_series_index == |ss$threshold_series| ) ) + { next; + } - if ( r$sid !in result_store ) - result_store[ss$id] = table(); - local results = result_store[r$sid]; + if ( r$ssname !in result_store ) + result_store[r$ssname] = table(); + local results = result_store[r$ssname]; if ( key !in results ) results[key] = table(); @@ -350,10 +452,13 @@ function observe(id: string, key: Key, obs: Observation) # If a string was given, fall back to 1.0 as the value. local val = 1.0; - if ( obs?$num || obs?$dbl ) - val = obs?$dbl ? obs$dbl : obs$num; + if ( obs?$num ) + val = obs$num; + else if ( obs?$dbl ) + val = obs$dbl; - hook observe_hook(r, val, obs, result_val); + for ( i in r$calc_funcs ) + calc_store[r$calc_funcs[i]](r, val, obs, result_val); data_added(ss, key, result); } } @@ -366,6 +471,8 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou return F; # Add in the extra ResultVals to make threshold_vals easier to write. + # This length comparison should work because we just need to make + # sure that we have the same number of reducers and results. if ( |ss$reducers| != |result| ) { for ( reducer in ss$reducers ) @@ -378,11 +485,11 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou local watch = ss$threshold_val(key, result); if ( modify_pct < 1.0 && modify_pct > 0.0 ) - watch = double_to_count(floor(watch/modify_pct)); + watch = watch/modify_pct; - if ( ss$id !in threshold_tracker ) - threshold_tracker[ss$id] = table(); - local t_tracker = threshold_tracker[ss$id]; + if ( ss$name !in threshold_tracker ) + threshold_tracker[ss$name] = table(); + local t_tracker = threshold_tracker[ss$name]; if ( key !in t_tracker ) { @@ -398,7 +505,7 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou } if ( ss?$threshold_series && - |ss$threshold_series| >= tt$threshold_series_index && + |ss$threshold_series| > tt$threshold_series_index && watch >= ss$threshold_series[tt$threshold_series_index] ) { # A threshold series was given and the value crossed the next @@ -426,7 +533,7 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result) } ss$threshold_crossed(key, result); - local tt = threshold_tracker[ss$id][key]; + local tt = threshold_tracker[ss$name][key]; tt$is_threshold_crossed = T; # Bump up to the next threshold series index if a threshold series is being used. diff --git a/scripts/base/frameworks/sumstats/non-cluster.bro b/scripts/base/frameworks/sumstats/non-cluster.bro index f27d4b5cfb..265261f1bd 100644 --- a/scripts/base/frameworks/sumstats/non-cluster.bro +++ b/scripts/base/frameworks/sumstats/non-cluster.bro @@ -4,9 +4,9 @@ module SumStats; event SumStats::finish_epoch(ss: SumStat) { - if ( ss$id in result_store ) + if ( ss$name in result_store ) { - local data = result_store[ss$id]; + local data = result_store[ss$name]; if ( ss?$epoch_finished ) ss$epoch_finished(data); @@ -16,9 +16,32 @@ event SumStats::finish_epoch(ss: SumStat) schedule ss$epoch { SumStats::finish_epoch(ss) }; } - function data_added(ss: SumStat, key: Key, result: Result) { if ( check_thresholds(ss, key, result, 1.0) ) threshold_crossed(ss, key, result); } + +function request(ss_name: string): ResultTable + { + # This only needs to be implemented this way for cluster compatibility. + return when ( T ) + { + if ( ss_name in result_store ) + return result_store[ss_name]; + else + return table(); + } + } + +function request_key(ss_name: string, key: Key): Result + { + # This only needs to be implemented this way for cluster compatibility. + return when ( T ) + { + if ( ss_name in result_store && key in result_store[ss_name] ) + return result_store[ss_name][key]; + else + return table(); + } + } \ No newline at end of file diff --git a/scripts/base/frameworks/sumstats/plugins/average.bro b/scripts/base/frameworks/sumstats/plugins/average.bro index a409bb9408..8f7f7b568f 100644 --- a/scripts/base/frameworks/sumstats/plugins/average.bro +++ b/scripts/base/frameworks/sumstats/plugins/average.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats +@load ../main module SumStats; @@ -14,17 +14,18 @@ export { }; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( AVERAGE in r$apply ) + register_observe_plugin(AVERAGE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$average ) rv$average = val; else rv$average += (val - rv$average) / rv$num; - } + }); } + hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$average && rv2?$average ) diff --git a/scripts/base/frameworks/sumstats/plugins/max.bro b/scripts/base/frameworks/sumstats/plugins/max.bro index 6167d31f10..d43ad9dc38 100644 --- a/scripts/base/frameworks/sumstats/plugins/max.bro +++ b/scripts/base/frameworks/sumstats/plugins/max.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats +@load ../main module SumStats; @@ -14,15 +14,15 @@ export { }; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( MAX in r$apply ) + register_observe_plugin(MAX, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$max ) rv$max = val; else if ( val > rv$max ) rv$max = val; - } + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) diff --git a/scripts/base/frameworks/sumstats/plugins/min.bro b/scripts/base/frameworks/sumstats/plugins/min.bro index a15ed0e733..014755cf32 100644 --- a/scripts/base/frameworks/sumstats/plugins/min.bro +++ b/scripts/base/frameworks/sumstats/plugins/min.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats +@load ../main module SumStats; @@ -14,17 +14,18 @@ export { }; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( MIN in r$apply ) + register_observe_plugin(MIN, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$min ) rv$min = val; else if ( val < rv$min ) rv$min = val; - } + }); } + hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$min && rv2?$min ) diff --git a/scripts/base/frameworks/sumstats/plugins/sample.bro b/scripts/base/frameworks/sumstats/plugins/sample.bro index d0587bde08..92d17e503d 100644 --- a/scripts/base/frameworks/sumstats/plugins/sample.bro +++ b/scripts/base/frameworks/sumstats/plugins/sample.bro @@ -1,9 +1,14 @@ -@load base/frameworks/sumstats @load base/utils/queue +@load ../main module SumStats; export { + redef enum Calculation += { + ## Collect a sample of the last few observations. + SAMPLE + }; + redef record Reducer += { ## A number of sample Observations to collect. samples: count &default=0; @@ -27,14 +32,14 @@ function get_samples(rv: ResultVal): vector of Observation return s; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( r$samples > 0 ) + register_observe_plugin(SAMPLE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$samples ) rv$samples = Queue::init([$max_len=r$samples]); Queue::put(rv$samples, obs); - } + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) diff --git a/scripts/base/frameworks/sumstats/plugins/std-dev.bro b/scripts/base/frameworks/sumstats/plugins/std-dev.bro index 6411fe4bce..2e5b95b212 100644 --- a/scripts/base/frameworks/sumstats/plugins/std-dev.bro +++ b/scripts/base/frameworks/sumstats/plugins/std-dev.bro @@ -1,5 +1,5 @@ @load ./variance -@load base/frameworks/sumstats +@load ../main module SumStats; @@ -21,11 +21,18 @@ function calc_std_dev(rv: ResultVal) rv$std_dev = sqrt(rv$variance); } -# This depends on the variance plugin which uses priority -5 -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-10 +hook std_dev_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) { - if ( STD_DEV in r$apply ) + calc_std_dev(rv); + } + +hook register_observe_plugins() &priority=-10 + { + register_observe_plugin(STD_DEV, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { calc_std_dev(rv); + }); + add_observe_plugin_dependency(STD_DEV, VARIANCE); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10 diff --git a/scripts/base/frameworks/sumstats/plugins/sum.bro b/scripts/base/frameworks/sumstats/plugins/sum.bro index 3e5b28e2be..074b4b72f3 100644 --- a/scripts/base/frameworks/sumstats/plugins/sum.bro +++ b/scripts/base/frameworks/sumstats/plugins/sum.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats +@load ../main module SumStats; @@ -14,19 +14,19 @@ export { sum: double &default=0.0; }; - type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count; - global sum_threshold: function(data_id: string): threshold_function; + #type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count; + #global sum_threshold: function(data_id: string): threshold_function; } -function sum_threshold(data_id: string): threshold_function - { - return function(key: SumStats::Key, result: SumStats::Result): count - { - print fmt("data_id: %s", data_id); - print result; - return double_to_count(result[data_id]$sum); - }; - } +#function sum_threshold(data_id: string): threshold_function +# { +# return function(key: SumStats::Key, result: SumStats::Result): count +# { +# print fmt("data_id: %s", data_id); +# print result; +# return double_to_count(result[data_id]$sum); +# }; +# } hook init_resultval_hook(r: Reducer, rv: ResultVal) { @@ -34,10 +34,12 @@ hook init_resultval_hook(r: Reducer, rv: ResultVal) rv$sum = 0; } -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( SUM in r$apply ) + register_observe_plugin(SUM, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { rv$sum += val; + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) diff --git a/scripts/base/frameworks/sumstats/plugins/unique.bro b/scripts/base/frameworks/sumstats/plugins/unique.bro index a407a487a2..15fc28adb6 100644 --- a/scripts/base/frameworks/sumstats/plugins/unique.bro +++ b/scripts/base/frameworks/sumstats/plugins/unique.bro @@ -1,4 +1,4 @@ -@load base/frameworks/sumstats +@load ../main module SumStats; @@ -23,15 +23,15 @@ redef record ResultVal += { unique_vals: set[Observation] &optional; }; -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) +hook register_observe_plugins() { - if ( UNIQUE in r$apply ) + register_observe_plugin(UNIQUE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( ! rv?$unique_vals ) rv$unique_vals=set(); add rv$unique_vals[obs]; rv$unique = |rv$unique_vals|; - } + }); } hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) diff --git a/scripts/base/frameworks/sumstats/plugins/variance.bro b/scripts/base/frameworks/sumstats/plugins/variance.bro index 1e7a00ea97..12d30cc4fe 100644 --- a/scripts/base/frameworks/sumstats/plugins/variance.bro +++ b/scripts/base/frameworks/sumstats/plugins/variance.bro @@ -1,5 +1,5 @@ @load ./average -@load base/frameworks/sumstats +@load ../main module SumStats; @@ -28,17 +28,17 @@ function calc_variance(rv: ResultVal) rv$variance = (rv$num > 1) ? rv$var_s/(rv$num-1) : 0.0; } -# Reduced priority since this depends on the average -hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-5 +hook register_observe_plugins() &priority=-5 { - if ( VARIANCE in r$apply ) + register_observe_plugin(VARIANCE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( rv$num > 1 ) rv$var_s += ((val - rv$prev_avg) * (val - rv$average)); calc_variance(rv); rv$prev_avg = rv$average; - } + }); + add_observe_plugin_dependency(VARIANCE, AVERAGE); } # Reduced priority since this depends on the average diff --git a/scripts/policy/misc/app-stats/__load__.bro b/scripts/policy/misc/app-stats/__load__.bro new file mode 100644 index 0000000000..c468d055ee --- /dev/null +++ b/scripts/policy/misc/app-stats/__load__.bro @@ -0,0 +1,2 @@ +@load ./main +@load ./plugins \ No newline at end of file diff --git a/scripts/policy/misc/app-metrics.bro b/scripts/policy/misc/app-stats/main.bro similarity index 58% rename from scripts/policy/misc/app-metrics.bro rename to scripts/policy/misc/app-stats/main.bro index 3df38ad8ad..e4a38c6893 100644 --- a/scripts/policy/misc/app-metrics.bro +++ b/scripts/policy/misc/app-stats/main.bro @@ -1,3 +1,6 @@ +#! AppStats collects information about web applications in use +#! on the network. + @load base/protocols/http @load base/protocols/ssl @load base/frameworks/sumstats @@ -30,13 +33,17 @@ redef record connection += { resp_hostname: string &optional; }; +global add_sumstats: hook(id: conn_id, hostname: string, size: count); + + event bro_init() &priority=3 { Log::create_stream(AppStats::LOG, [$columns=Info]); local r1: SumStats::Reducer = [$stream="apps.bytes", $apply=set(SumStats::SUM)]; local r2: SumStats::Reducer = [$stream="apps.hits", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=break_interval, + SumStats::create([$name="app-metrics", + $epoch=break_interval, $reducers=set(r1, r2), $epoch_finished(data: SumStats::ResultTable) = { @@ -55,41 +62,6 @@ event bro_init() &priority=3 }]); } -function add_sumstats(id: conn_id, hostname: string, size: count) - { - if ( /\.youtube\.com$/ in hostname && size > 512*1024 ) - { - SumStats::observe("apps.bytes", [$str="youtube"], [$num=size]); - SumStats::observe("apps.hits", [$str="youtube"], [$str=cat(id$orig_h)]); - } - else if ( /(\.facebook\.com|\.fbcdn\.net)$/ in hostname && size > 20 ) - { - SumStats::observe("apps.bytes", [$str="facebook"], [$num=size]); - SumStats::observe("apps.hits", [$str="facebook"], [$str=cat(id$orig_h)]); - } - else if ( /\.google\.com$/ in hostname && size > 20 ) - { - SumStats::observe("apps.bytes", [$str="google"], [$num=size]); - SumStats::observe("apps.hits", [$str="google"], [$str=cat(id$orig_h)]); - } - else if ( /\.nflximg\.com$/ in hostname && size > 200*1024 ) - { - SumStats::observe("apps.bytes", [$str="netflix"], [$num=size]); - SumStats::observe("apps.hits", [$str="netflix"], [$str=cat(id$orig_h)]); - } - else if ( /\.(pandora|p-cdn)\.com$/ in hostname && size > 512*1024 ) - { - SumStats::observe("apps.bytes", [$str="pandora"], [$num=size]); - SumStats::observe("apps.hits", [$str="pandora"], [$str=cat(id$orig_h)]); - } - else if ( /\.gmail\.com$/ in hostname && size > 20 ) - { - SumStats::observe("apps.bytes", [$str="gmail"], [$num=size]); - SumStats::observe("apps.hits", [$str="gmail"], [$str=cat(id$orig_h)]); - } -} - - event ssl_established(c: connection) { if ( c?$ssl && c$ssl?$server_name ) @@ -99,11 +71,11 @@ event ssl_established(c: connection) event connection_finished(c: connection) { if ( c?$resp_hostname ) - add_sumstats(c$id, c$resp_hostname, c$resp$size); + hook add_sumstats(c$id, c$resp_hostname, c$resp$size); } event HTTP::log_http(rec: HTTP::Info) { if( rec?$host ) - add_sumstats(rec$id, rec$host, rec$response_body_len); + hook add_sumstats(rec$id, rec$host, rec$response_body_len); } diff --git a/scripts/policy/misc/app-stats/plugins/__load__.bro b/scripts/policy/misc/app-stats/plugins/__load__.bro new file mode 100644 index 0000000000..7a3ea2da81 --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/__load__.bro @@ -0,0 +1,6 @@ +@load ./facebook +@load ./gmail +@load ./google +@load ./netflix +@load ./pandora +@load ./youtube \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/facebook.bro b/scripts/policy/misc/app-stats/plugins/facebook.bro new file mode 100644 index 0000000000..edcb02b72a --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/facebook.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.(facebook\.com|fbcdn\.net)$/ in hostname && size > 20 ) + { + SumStats::observe("apps.bytes", [$str="facebook"], [$num=size]); + SumStats::observe("apps.hits", [$str="facebook"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/gmail.bro b/scripts/policy/misc/app-stats/plugins/gmail.bro new file mode 100644 index 0000000000..1642fb7651 --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/gmail.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.gmail\.com$/ in hostname && size > 20 ) + { + SumStats::observe("apps.bytes", [$str="gmail"], [$num=size]); + SumStats::observe("apps.hits", [$str="gmail"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/google.bro b/scripts/policy/misc/app-stats/plugins/google.bro new file mode 100644 index 0000000000..e1da3a9068 --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/google.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.google\.com$/ in hostname && size > 20 ) + { + SumStats::observe("apps.bytes", [$str="google"], [$num=size]); + SumStats::observe("apps.hits", [$str="google"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/netflix.bro b/scripts/policy/misc/app-stats/plugins/netflix.bro new file mode 100644 index 0000000000..5d429f0caf --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/netflix.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.nflximg\.com$/ in hostname && size > 200*1024 ) + { + SumStats::observe("apps.bytes", [$str="netflix"], [$num=size]); + SumStats::observe("apps.hits", [$str="netflix"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/pandora.bro b/scripts/policy/misc/app-stats/plugins/pandora.bro new file mode 100644 index 0000000000..6cfbfab72d --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/pandora.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.(pandora|p-cdn)\.com$/ in hostname && size > 512*1024 ) + { + SumStats::observe("apps.bytes", [$str="pandora"], [$num=size]); + SumStats::observe("apps.hits", [$str="pandora"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/app-stats/plugins/youtube.bro b/scripts/policy/misc/app-stats/plugins/youtube.bro new file mode 100644 index 0000000000..af872cfdac --- /dev/null +++ b/scripts/policy/misc/app-stats/plugins/youtube.bro @@ -0,0 +1,12 @@ +@load ../main + +module AppStats; + +hook add_sumstats(id: conn_id, hostname: string, size: count) + { + if ( /\.youtube\.com$/ in hostname && size > 512*1024 ) + { + SumStats::observe("apps.bytes", [$str="youtube"], [$num=size]); + SumStats::observe("apps.hits", [$str="youtube"], [$str=cat(id$orig_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/misc/detect-traceroute/main.bro b/scripts/policy/misc/detect-traceroute/main.bro index c194d03e13..0cb7082919 100644 --- a/scripts/policy/misc/detect-traceroute/main.bro +++ b/scripts/policy/misc/detect-traceroute/main.bro @@ -29,7 +29,7 @@ export { ## Defines the threshold for ICMP Time Exceeded messages for a src-dst pair. ## This threshold only comes into play after a host is found to be ## sending low ttl packets. - const icmp_time_exceeded_threshold = 3 &redef; + const icmp_time_exceeded_threshold: double = 3 &redef; ## Interval at which to watch for the ## :bro:id:`ICMPTimeExceeded::icmp_time_exceeded_threshold` variable to be crossed. @@ -57,16 +57,17 @@ event bro_init() &priority=5 local r1: SumStats::Reducer = [$stream="traceroute.time_exceeded", $apply=set(SumStats::UNIQUE)]; local r2: SumStats::Reducer = [$stream="traceroute.low_ttl_packet", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=icmp_time_exceeded_interval, + SumStats::create([$name="traceroute-detection", + $epoch=icmp_time_exceeded_interval, $reducers=set(r1, r2), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { # Give a threshold value of zero depending on if the host # sends a low ttl packet. if ( require_low_ttl_packets && result["traceroute.low_ttl_packet"]$sum == 0 ) - return 0; + return 0.0; else - return result["traceroute.time_exceeded"]$unique; + return result["traceroute.time_exceeded"]$unique+0; }, $threshold=icmp_time_exceeded_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/misc/scan.bro b/scripts/policy/misc/scan.bro index f3dcaf2291..f9c47a0311 100644 --- a/scripts/policy/misc/scan.bro +++ b/scripts/policy/misc/scan.bro @@ -39,15 +39,11 @@ export { ## The threshold of a unique number of hosts a scanning host has to have failed ## connections with on a single port. - const addr_scan_threshold = 25 &redef; + const addr_scan_threshold = 25.0 &redef; ## The threshold of a number of unique ports a scanning host has to have failed ## connections with on a single victim host. - const port_scan_threshold = 15 &redef; - - ## Custom thresholds based on service for address scan. This is primarily - ## useful for setting reduced thresholds for specific ports. - const addr_scan_custom_thresholds: table[port] of count &redef; + const port_scan_threshold = 15.0 &redef; global Scan::addr_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port); global Scan::port_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port); @@ -56,11 +52,12 @@ export { event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="scan.addr.fail", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=addr_scan_interval, + SumStats::create([$name="addr-scan", + $epoch=addr_scan_interval, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["scan.addr.fail"]$unique); + return result["scan.addr.fail"]$unique+0.0; }, #$threshold_func=check_addr_scan_threshold, $threshold=addr_scan_threshold, @@ -80,11 +77,12 @@ event bro_init() &priority=5 # Note: port scans are tracked similar to: table[src_ip, dst_ip] of set(port); local r2: SumStats::Reducer = [$stream="scan.port.fail", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=port_scan_interval, + SumStats::create([$name="port-scan", + $epoch=port_scan_interval, $reducers=set(r2), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["scan.port.fail"]$unique); + return result["scan.port.fail"]$unique+0.0; }, $threshold=port_scan_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/protocols/ftp/detect-bruteforcing.bro b/scripts/policy/protocols/ftp/detect-bruteforcing.bro index 21c9c403c7..36dfafb53a 100644 --- a/scripts/policy/protocols/ftp/detect-bruteforcing.bro +++ b/scripts/policy/protocols/ftp/detect-bruteforcing.bro @@ -17,7 +17,7 @@ export { ## How many rejected usernames or passwords are required before being ## considered to be bruteforcing. - const bruteforce_threshold = 20 &redef; + const bruteforce_threshold: double = 20 &redef; ## The time period in which the threshold needs to be crossed before ## being reset. @@ -28,11 +28,12 @@ export { event bro_init() { local r1: SumStats::Reducer = [$stream="ftp.failed_auth", $apply=set(SumStats::UNIQUE)]; - SumStats::create([$epoch=bruteforce_measurement_interval, + SumStats::create([$name="ftp-detect-bruteforcing", + $epoch=bruteforce_measurement_interval, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return result["ftp.failed_auth"]$num; + return result["ftp.failed_auth"]$num+0.0; }, $threshold=bruteforce_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/protocols/http/detect-sqli.bro b/scripts/policy/protocols/http/detect-sqli.bro index 11dba0dc46..1d8d9d0904 100644 --- a/scripts/policy/protocols/http/detect-sqli.bro +++ b/scripts/policy/protocols/http/detect-sqli.bro @@ -28,7 +28,7 @@ export { ## Defines the threshold that determines if an SQL injection attack ## is ongoing based on the number of requests that appear to be SQL ## injection attacks. - const sqli_requests_threshold = 50 &redef; + const sqli_requests_threshold: double = 50.0 &redef; ## Interval at which to watch for the ## :bro:id:`HTTP::sqli_requests_threshold` variable to be crossed. @@ -64,11 +64,12 @@ event bro_init() &priority=3 # determine when it looks like an actual attack and how to respond when # thresholds are crossed. local r1: SumStats::Reducer = [$stream="http.sqli.attacker", $apply=set(SumStats::SUM), $samples=collect_SQLi_samples]; - SumStats::create([$epoch=sqli_requests_interval, + SumStats::create([$name="detect-sqli-attackers", + $epoch=sqli_requests_interval, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["http.sqli.attacker"]$sum); + return result["http.sqli.attacker"]$sum; }, $threshold=sqli_requests_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = @@ -82,11 +83,12 @@ event bro_init() &priority=3 }]); local r2: SumStats::Reducer = [$stream="http.sqli.victim", $apply=set(SumStats::SUM), $samples=collect_SQLi_samples]; - SumStats::create([$epoch=sqli_requests_interval, + SumStats::create([$name="detect-sqli-victims", + $epoch=sqli_requests_interval, $reducers=set(r2), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["http.sqli.victim"]$sum); + return result["http.sqli.victim"]$sum; }, $threshold=sqli_requests_threshold, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/policy/protocols/ssh/detect-bruteforcing.bro b/scripts/policy/protocols/ssh/detect-bruteforcing.bro index 309905e939..161a314b8c 100644 --- a/scripts/policy/protocols/ssh/detect-bruteforcing.bro +++ b/scripts/policy/protocols/ssh/detect-bruteforcing.bro @@ -27,7 +27,7 @@ export { ## The number of failed SSH connections before a host is designated as ## guessing passwords. - const password_guesses_limit = 30 &redef; + const password_guesses_limit: double = 30 &redef; ## The amount of time to remember presumed non-successful logins to build ## model of a password guesser. @@ -43,11 +43,12 @@ export { event bro_init() { local r1: SumStats::Reducer = [$stream="ssh.login.failure", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=guessing_timeout, + SumStats::create([$name="detect-ssh-bruteforcing", + $epoch=guessing_timeout, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["ssh.login.failure"]$sum); + return result["ssh.login.failure"]$sum; }, $threshold=password_guesses_limit, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = diff --git a/scripts/site/local.bro b/scripts/site/local.bro index dfebd9923a..90832b4a07 100644 --- a/scripts/site/local.bro +++ b/scripts/site/local.bro @@ -11,6 +11,13 @@ # Load the scan detection script. @load misc/scan +# Log some information about web applications being used by users +# on your network. +@load misc/app-metrics + +# Detect traceroute being run on the network. +@load misc/detect-traceroute + # Generate notices when vulnerable versions of software are discovered. # The default is to only monitor software found in the address space defined # as "local". Refer to the software framework's documentation for more diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout new file mode 100644 index 0000000000..ed14a1c753 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand-cluster/manager-1..stdout @@ -0,0 +1,7 @@ +Complete SumStat request + Host: 6.5.4.3 -> 1 + Host: 10.10.10.10 -> 5 + Host: 1.2.3.4 -> 169 + Host: 7.2.1.5 -> 145 +SumStat key request + Host: 7.2.1.5 -> 145 diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout new file mode 100644 index 0000000000..876c368eb3 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.on-demand/.stdout @@ -0,0 +1,5 @@ +Complete SumStat request + Host: 1.2.3.4 -> 42 + Host: 4.3.2.1 -> 7 +Key request for 1.2.3.4 + Host: 1.2.3.4 -> 42 diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro index 1b7903ca1a..956c43a57b 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro @@ -23,7 +23,8 @@ global n = 0; event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)]; - SumStats::create([$epoch=5secs, + SumStats::create([$name="test", + $epoch=5secs, $reducers=set(r1), $epoch_finished(rt: SumStats::ResultTable) = { diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic.bro b/testing/btest/scripts/base/frameworks/sumstats/basic.bro index 0b2851bf10..54160cbf54 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic.bro @@ -11,16 +11,17 @@ event bro_init() &priority=5 SumStats::MIN, SumStats::STD_DEV, SumStats::UNIQUE)]; - SumStats::create([$epoch=3secs, - $reducers=set(r1), - $epoch_finished(data: SumStats::ResultTable) = - { - for ( key in data ) - { - local r = data[key]["test.metric"]; - print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); - } - } + SumStats::create([$name="test", + $epoch=3secs, + $reducers=set(r1), + $epoch_finished(data: SumStats::ResultTable) = + { + for ( key in data ) + { + local r = data[key]["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); + } + } ]); SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]); diff --git a/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro b/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro index 303a0dc852..d54e432030 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/cluster-intermediate-update.bro @@ -20,13 +20,14 @@ redef Log::default_rotation_interval = 0secs; event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=1hr, + SumStats::create([$name="test", + $epoch=1hr, $reducers=set(r1), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["test.metric"]$sum); + return result["test.metric"]$sum; }, - $threshold=100, + $threshold=100.0, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum); diff --git a/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro new file mode 100644 index 0000000000..9ee0511b47 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/on-demand-cluster.bro @@ -0,0 +1,93 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 15 + +# @TEST-EXEC: btest-diff manager-1/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +global n = 0; + +event bro_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)]; + SumStats::create([$name="test sumstat", + $epoch=1hr, + $reducers=set(r1)]); + } + +event remote_connection_closed(p: event_peer) + { + terminate(); + } + +global ready_for_data: event(); +redef Cluster::manager2worker_events += /^ready_for_data$/; + +event ready_for_data() + { + if ( Cluster::node == "worker-1" ) + { + SumStats::observe("test", [$host=1.2.3.4], [$num=34]); + SumStats::observe("test", [$host=1.2.3.4], [$num=30]); + SumStats::observe("test", [$host=6.5.4.3], [$num=1]); + SumStats::observe("test", [$host=7.2.1.5], [$num=54]); + } + if ( Cluster::node == "worker-2" ) + { + SumStats::observe("test", [$host=1.2.3.4], [$num=75]); + SumStats::observe("test", [$host=1.2.3.4], [$num=30]); + SumStats::observe("test", [$host=7.2.1.5], [$num=91]); + SumStats::observe("test", [$host=10.10.10.10], [$num=5]); + } + } + + +event on_demand2() + { + local host = 7.2.1.5; + when ( local result = SumStats::request_key("test sumstat", [$host=host]) ) + { + print "SumStat key request"; + print fmt(" Host: %s -> %.0f", host, result["test"]$sum); + terminate(); + } + } + +event on_demand() + { + when ( local results = SumStats::request("test sumstat") ) + { + print "Complete SumStat request"; + for ( key in results ) + print fmt(" Host: %s -> %.0f", key$host, results[key]["test"]$sum); + + event on_demand2(); + } + } + +global peer_count = 0; +event remote_connection_handshake_done(p: event_peer) &priority=-5 + { + ++peer_count; + if ( peer_count == 2 ) + { + if ( Cluster::local_node_type() == Cluster::MANAGER ) + event ready_for_data(); + + schedule 1sec { on_demand() }; + } + } + diff --git a/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro b/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro new file mode 100644 index 0000000000..f93e1a72dc --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/on-demand.bro @@ -0,0 +1,45 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +redef exit_only_after_terminate=T; + + +event on_demand() + { + when ( local results = SumStats::request("test") ) + { + print "Complete SumStat request"; + for ( key in results ) + { + print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum); + } + } + } + +event on_demand_key() + { + local host = 1.2.3.4; + when ( local result = SumStats::request_key("test", [$host=host]) ) + { + print fmt("Key request for %s", host); + print fmt(" Host: %s -> %.0f", host, result["test.reducer"]$sum); + terminate(); + } + } + +event bro_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test.reducer", + $apply=set(SumStats::SUM)]; + SumStats::create([$name="test", + $epoch=1hr, + $reducers=set(r1)]); + + # Seed some data but notice there are no callbacks defined in the sumstat! + SumStats::observe("test.reducer", [$host=1.2.3.4], [$num=42]); + SumStats::observe("test.reducer", [$host=4.3.2.1], [$num=7]); + + schedule 0.1 secs { on_demand() }; + schedule 1 secs { on_demand_key() }; + } + diff --git a/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro b/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro index ddc053bd23..b00b30a375 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/thresholding.bro @@ -8,14 +8,15 @@ redef enum Notice::Type += { event bro_init() &priority=5 { local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=3secs, + SumStats::create([$name="test1", + $epoch=3secs, $reducers=set(r1), #$threshold_val = SumStats::sum_threshold("test.metric"), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["test.metric"]$sum); + return result["test.metric"]$sum; }, - $threshold=5, + $threshold=5.0, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { local r = result["test.metric"]; @@ -24,14 +25,15 @@ event bro_init() &priority=5 ]); local r2: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=3secs, + SumStats::create([$name="test2", + $epoch=3secs, $reducers=set(r2), #$threshold_val = SumStats::sum_threshold("test.metric"), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { - return double_to_count(result["test.metric"]$sum); + return result["test.metric"]$sum; }, - $threshold_series=vector(3,6,800), + $threshold_series=vector(3.0,6.0,800.0), $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { local r = result["test.metric"]; @@ -41,19 +43,20 @@ event bro_init() &priority=5 local r3: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; local r4: SumStats::Reducer = [$stream="test.metric2", $apply=set(SumStats::SUM)]; - SumStats::create([$epoch=3secs, + SumStats::create([$name="test3", + $epoch=3secs, $reducers=set(r3, r4), $threshold_val(key: SumStats::Key, result: SumStats::Result) = { # Calculate a ratio between sums of two reducers. if ( "test.metric2" in result && "test.metric" in result && result["test.metric"]$sum > 0 ) - return double_to_count(result["test.metric2"]$sum / result["test.metric"]$sum); + return result["test.metric2"]$sum / result["test.metric"]$sum; else - return 0; + return 0.0; }, # Looking for metric2 sum to be 5 times the sum of metric - $threshold=5, + $threshold=5.0, $threshold_crossed(key: SumStats::Key, result: SumStats::Result) = { local thold = result["test.metric2"]$sum / result["test.metric"]$sum;