diff --git a/scripts/base/frameworks/measurement/cluster.bro b/scripts/base/frameworks/measurement/cluster.bro index 9c35b85b32..481b306417 100644 --- a/scripts/base/frameworks/measurement/cluster.bro +++ b/scripts/base/frameworks/measurement/cluster.bro @@ -33,70 +33,78 @@ export { ## The goal for this option is also meant to be temporary. const enable_intermediate_updates = T &redef; - # Event sent by the manager in a cluster to initiate the - # collection of metrics values for a measurement. + ## Event sent by the manager in a cluster to initiate the + ## collection of metrics values for a measurement. global cluster_measurement_request: event(uid: string, mid: string); - # Event sent by nodes that are collecting metrics after receiving - # a request for the metric measurement from the manager. + ## Event sent by nodes that are collecting metrics after receiving + ## a request for the metric measurement from the manager. global cluster_measurement_response: event(uid: string, mid: string, data: ResultTable, done: bool); - # This event is sent by the manager in a cluster to initiate the - # collection of a single key value from a measurement. It's typically - # used to get intermediate updates before the break interval triggers - # to speed detection of a value crossing a threshold. + ## This event is sent by the manager in a cluster to initiate the + ## collection of a single key value from a measurement. It's typically + ## used to get intermediate updates before the break interval triggers + ## to speed detection of a value crossing a threshold. global cluster_key_request: event(uid: string, mid: string, key: Key); - # This event is sent by nodes in response to a - # :bro:id:`Measurement::cluster_key_request` event. - global cluster_key_response: event(uid: string, mid: string, key: Key, result: ResultTable); + ## This event is sent by nodes in response to a + ## :bro:id:`Measurement::cluster_key_request` event. + global cluster_key_response: event(uid: string, mid: string, key: Key, result: Result); - # This is sent by workers to indicate that they crossed the percent of the - # current threshold by the percentage defined globally in - # :bro:id:`Measurement::cluster_request_global_view_percent` + ## This is sent by workers to indicate that they crossed the percent of the + ## current threshold by the percentage defined globally in + ## :bro:id:`Measurement::cluster_request_global_view_percent` global cluster_key_intermediate_response: event(mid: string, key: Measurement::Key); - # This event is scheduled internally on workers to send result chunks. - global send_data: event(uid: string, id: string, measurement_name: string, data: ResultTable); + ## This event is scheduled internally on workers to send result chunks. + global send_data: event(uid: string, mid: string, data: ResultTable); } # Add events to the cluster framework to make this work. redef Cluster::manager2worker_events += /Measurement::cluster_(measurement_request|key_request)/; +redef Cluster::manager2worker_events += /Measurement::new_measurement/; redef Cluster::worker2manager_events += /Measurement::cluster_(measurement_response|key_response|key_intermediate_response)/; @if ( Cluster::local_node_type() != Cluster::MANAGER ) -# This variable is maintained to know what keysthey have recently sent as +# This variable is maintained to know what keys have recently sent as # intermediate updates so they don't overwhelm their manager. The count that is # yielded is the number of times the percentage threshold has been crossed and # an intermediate result has been received. -global recent_global_view_keys: table[string, string, Key] of count &create_expire=1min &default=0; +global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0; + +event bro_init() &priority=-100 + { + # The manager is the only host allowed to track these. + measurement_store = table(); + reducer_store = table(); + } # This is done on all non-manager node types in the event that a metric is # being collected somewhere other than a worker. -function data_added(measurement: Filter, key: Key, val: Result) +function data_added(m: Measurement, key: Key, result: Result) { # If an intermediate update for this value was sent recently, don't send # it again. - if ( [measurement$id, measurement$name, key] in recent_global_view_keys ) + if ( [m$id, key] in recent_global_view_keys ) return; # If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that # crosses the full threshold then it's a candidate to send as an # intermediate update. if ( enable_intermediate_updates && - check_thresholds(measurement, key, val, cluster_request_global_view_percent) ) + check_thresholds(m, key, result, cluster_request_global_view_percent) ) { # kick off intermediate update - event Measurement::cluster_key_intermediate_response(measurement$id, measurement$name, key); - ++recent_global_view_keys[measurement$id, measurement$name, key]; + event Measurement::cluster_key_intermediate_response(m$id, key); + ++recent_global_view_keys[m$id, key]; } } -event Measurement::send_data(uid: string, id: string, data: ResultTable) +event Measurement::send_data(uid: string, mid: string, data: ResultTable) { #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); - - local local_data: ResultTable; + + local local_data: ResultTable = table(); local num_added = 0; for ( key in data ) { @@ -114,9 +122,9 @@ event Measurement::send_data(uid: string, id: string, data: ResultTable) if ( |data| == 0 ) done = T; - event Measurement::cluster_measurement_response(uid, local_data, done); + event Measurement::cluster_measurement_response(uid, mid, local_data, done); if ( ! done ) - event Measurement::send_data(uid, mid, data); + schedule 0.01 sec { Measurement::send_data(uid, mid, data) }; } event Measurement::cluster_measurement_request(uid: string, mid: string) @@ -128,12 +136,13 @@ event Measurement::cluster_measurement_request(uid: string, mid: string) # Lookup the actual measurement and reset it, the reference to the data # currently stored will be maintained internally by the send_data event. - reset(measurement_store[mid]); + if ( mid in measurement_store ) + reset(measurement_store[mid]); } event Measurement::cluster_key_request(uid: string, mid: string, key: Key) { - if ( [mid] in result_store && key in result_store[mid] ) + if ( mid in result_store && key in result_store[mid] ) { #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data); event Measurement::cluster_key_response(uid, mid, key, result_store[mid][key]); @@ -142,7 +151,7 @@ event Measurement::cluster_key_request(uid: string, mid: string, key: Key) { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - event Measurement::cluster_key_response(uid, mid, key, [$begin=network_time(), $end=network_time()]); + event Measurement::cluster_key_response(uid, mid, key, table()); } } @@ -152,62 +161,70 @@ event Measurement::cluster_key_request(uid: string, mid: string, key: Key) @if ( Cluster::local_node_type() == Cluster::MANAGER ) # This variable is maintained by manager nodes as they collect and aggregate -# results. It's index on a uid. +# results. +# Index on a uid. global measurement_results: table[string] of ResultTable &read_expire=1min; # This variable is maintained by manager nodes to track how many "dones" they # collected per collection unique id. Once the number of results for a uid # matches the number of peer nodes that results should be coming from, the # result is written out and deleted from here. +# Indexed on a uid. # TODO: add an &expire_func in case not all results are received. global done_with: table[string] of count &read_expire=1min &default=0; # This variable is maintained by managers to track intermediate responses as -# they are getting a global view for a certain key. Indexed on a uid. +# they are getting a global view for a certain key. +# Indexed on a uid. global key_requests: table[string] of Result &read_expire=1min; # This variable is maintained by managers to prevent overwhelming communication due # to too many intermediate updates. Each measurement is tracked separately so that -# one metric won't overwhelm and degrade other quieter metrics. Indexed on a -# measurement id. +# one won't overwhelm and degrade other quieter measurements. +# Indexed on a measurement id. global outstanding_global_views: table[string] of count &default=0; +const zero_time = double_to_time(0.0); # Managers handle logging. -event Measurement::finish_period(m: Measurement) +event Measurement::finish_epoch(m: Measurement) { - #print fmt("%.6f MANAGER: breaking %s measurement for %s metric", network_time(), measurement$name, measurement$id); - local uid = unique_id(""); - - if ( uid in measurement_results ) - delete measurement_results[uid]; - measurement_results[uid] = table(); - - # Request data from peers. - event Measurement::cluster_measurement_request(uid, m$id); - # Schedule the next finish_period event. - schedule m$epoch { Measurement::finish_period(m) }; + if ( network_time() > zero_time ) + { + #print fmt("%.6f MANAGER: breaking %s measurement for %s metric", network_time(), measurement$name, measurement$id); + local uid = unique_id(""); + + if ( uid in measurement_results ) + delete measurement_results[uid]; + measurement_results[uid] = table(); + + # Request data from peers. + event Measurement::cluster_measurement_request(uid, m$id); + } + + # Schedule the next finish_epoch event. + schedule m$epoch { Measurement::finish_epoch(m) }; } # This is unlikely to be called often, but it's here in case there are measurements # being collected by managers. function data_added(m: Measurement, key: Key, result: Result) { - #if ( check_thresholds(m, key, val, 1.0) ) - # threshold_crossed(m, key, val); + if ( check_thresholds(m, key, result, 1.0) ) + threshold_crossed(m, key, result); } event Measurement::cluster_key_response(uid: string, mid: string, key: Key, result: Result) { - #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), val); + #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result); # We only want to try and do a value merge if there are actually measured datapoints # in the Result. - if ( result$num > 0 && uid in key_requests ) - key_requests[uid] = compose_resultvals(key_requests[uid], result); + if ( uid in key_requests ) + key_requests[uid] = compose_results(key_requests[uid], result); else key_requests[uid] = result; - # Mark that this worker is done. + # Mark that a worker is done. ++done_with[uid]; #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); @@ -232,7 +249,7 @@ event Measurement::cluster_key_intermediate_response(mid: string, key: Key) #print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr); #print fmt("MANAGER: requesting key data for %s", key2str(key)); - if ( [mid] in outstanding_global_views && + if ( mid in outstanding_global_views && |outstanding_global_views[mid]| > max_outstanding_global_views ) { # Don't do this intermediate update. Perhaps at some point in the future @@ -261,7 +278,7 @@ event Measurement::cluster_measurement_response(uid: string, mid: string, data: for ( key in data ) { if ( key in local_data ) - local_data[key] = compose_resultvals(local_data[key], data[key]); + local_data[key] = compose_results(local_data[key], data[key]); else local_data[key] = data[key]; @@ -280,8 +297,8 @@ event Measurement::cluster_measurement_response(uid: string, mid: string, data: # If the data has been collected from all peers, we are done and ready to finish. if ( Cluster::worker_count == done_with[uid] ) { - if ( m?$period_finished ) - m$period_finished(local_data); + if ( m?$epoch_finished ) + m$epoch_finished(local_data); # Clean up delete measurement_results[uid]; @@ -291,4 +308,9 @@ event Measurement::cluster_measurement_response(uid: string, mid: string, data: } } +event remote_connection_handshake_done(p: event_peer) &priority=5 + { + send_id(p, "Measurement::measurement_store"); + send_id(p, "Measurement::reducer_store"); + } @endif diff --git a/scripts/base/frameworks/measurement/main.bro b/scripts/base/frameworks/measurement/main.bro index a7f22ed3b7..a8e2950f5a 100644 --- a/scripts/base/frameworks/measurement/main.bro +++ b/scripts/base/frameworks/measurement/main.bro @@ -19,21 +19,21 @@ export { ## value in a Host header. This is an example of a non-host based ## metric since multiple IP addresses could respond for the same Host ## header value. - str: string &optional; + str: string &optional; ## Host is the value to which this metric applies. - host: addr &optional; - } &log; + host: addr &optional; + }; ## Represents data being added for a single metric data point. ## Only supply a single value here at a time. type DataPoint: record { ## Count value. - num: count &optional; + num: count &optional; ## Double value. - dbl: double &optional; + dbl: double &optional; ## String value. - str: string &optional; + str: string &optional; }; type Reducer: record { @@ -45,7 +45,7 @@ export { ## A predicate so that you can decide per key if you would like ## to accept the data being inserted. - pred: function(key: Measurement::Key, data: Measurement::DataPoint): bool &optional; + pred: function(key: Measurement::Key, point: Measurement::DataPoint): bool &optional; ## A function to normalize the key. This can be used to aggregate or ## normalize the entire key. @@ -54,44 +54,45 @@ export { ## Value calculated for a data point stream fed into a reducer. ## Most of the fields are added by plugins. - type Result: record { + type ResultVal: record { ## The time when the first data point was added to this result value. - begin: time &log; + begin: time; ## The time when the last data point was added to this result value. - end: time &log; + end: time; ## The number of measurements received. - num: count &log &default=0; + num: count &default=0; }; - ## Type to store a table of measurement results. First table is - ## indexed on the measurement Key and the enclosed table is - ## indexed on the data id that the Key was relevant for. - type ResultTable: table[Key] of table[string] of Result; + ## Type to store results for multiple reducers. + type Result: table[string] of ResultVal; - ## Filters define how the data from a metric is aggregated and handled. - ## Filters can be used to set how often the measurements are cut - ## and logged or how the data within them is aggregated. + ## Type to store a table of measurement results indexed by the measurement key. + type ResultTable: table[Key] of Result; + + ## Measurements represent an aggregation of reducers along with + ## mechanisms to handle various situations like the epoch ending + ## or thresholds being crossed. type Measurement: record { ## The interval at which this filter should be "broken" and the - ## callback called. The counters are also reset to zero at - ## this time so any threshold based detection needs to be set to a - ## number that should be expected to happen within this period. + ## '$epoch_finished' callback called. The results are also reset + ## at this time so any threshold based detection needs to be set to a + ## number that should be expected to happen within this epoch. epoch: interval; ## The reducers for the measurement indexed by data id. reducers: set[Reducer]; - ## Optionally provide a function to calculate a value from the Result + ## Provide a function to calculate a value from the :bro:see:`Result` ## structure which will be used for thresholding. - threshold_val: function(result: Measurement::Result): count &optional; + threshold_val: function(key: Measurement::Key, result: Measurement::Result): count &optional; ## The threshold value for calling the $threshold_crossed callback. - threshold: count &optional; + threshold: count &optional; ## A series of thresholds for calling the $threshold_crossed callback. - threshold_series: vector of count &optional; + threshold_series: vector of count &optional; ## A callback that is called when a threshold is crossed. threshold_crossed: function(key: Measurement::Key, result: Measurement::Result) &optional; @@ -100,22 +101,21 @@ export { ## It's best to not access any global state outside of the variables ## given to the callback because there is no assurance provided as to ## where the callback will be executed on clusters. - period_finished: function(data: Measurement::ResultTable) &optional; + epoch_finished: function(rt: Measurement::ResultTable) &optional; }; ## Create a measurement. global create: function(m: Measurement::Measurement); - ## Add data into a metric. This should be called when - ## a script has measured some point value and is ready to increment the - ## counters. + ## Add data into a data point stream. This should be called when + ## a script has measured some point value. ## - ## id: The metric identifier that the data represents. + ## id: The stream identifier that the data point represents. ## - ## key: The metric key that the value is to be added to. + ## key: The measurement key that the value is to be added to. ## - ## data: The data point to send into the stream. - global add_data: function(id: string, key: Measurement::Key, data: Measurement::DataPoint); + ## point: The data point to send into the stream. + global add_data: function(id: string, key: Measurement::Key, point: Measurement::DataPoint); ## Helper function to represent a :bro:type:`Measurement::Key` value as ## a simple string. @@ -124,15 +124,19 @@ export { ## ## Returns: A string representation of the metric key. global key2str: function(key: Measurement::Key): string; - + + ## This event is generated for each new measurement that is created. + ## + ## m: The record which describes a measurement. + global new_measurement: event(m: Measurement); } redef record Reducer += { - # Internal use only. Measurement ID. + # Internal use only. Provides a reference back to the related Measurement by it's ID. mid: string &optional; }; -redef record Result += { +type Thresholding: record { # Internal use only. Indicates if a simple threshold was already crossed. is_threshold_crossed: bool &default=F; @@ -143,16 +147,22 @@ redef record Result += { redef record Measurement += { # Internal use only (mostly for cluster coherency). id: string &optional; + + # Internal use only. For tracking tresholds per key. + threshold_tracker: table[Key] of Thresholding &optional; }; -# Store of reducers indexed on the data id. +# Store of measurements indexed on the measurement id. +global measurement_store: table[string] of Measurement = table(); + +# Store of reducers indexed on the data point stream id. global reducer_store: table[string] of set[Reducer] = table(); # Store of results indexed on the measurement id. global result_store: table[string] of ResultTable = table(); -# Store of measurements indexed on the measurement id. -global measurement_store: table[string] of Measurement = table(); +# Store of threshold information. +global thresholds_store: table[string, Key] of bool = table(); # This is called whenever # key values are updated and the new val is given as the `val` argument. @@ -161,13 +171,15 @@ global measurement_store: table[string] of Measurement = table(); global data_added: function(m: Measurement, key: Key, result: Result); # Prototype the hook point for plugins to do calculations. -global add_to_reducer: hook(r: Reducer, val: double, data: DataPoint, result: Result); +global add_to_reducer_hook: hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal); +# Prototype the hook point for plugins to initialize any result values. +global init_resultval_hook: hook(r: Reducer, rv: ResultVal); # Prototype the hook point for plugins to merge Results. -global compose_resultvals_hook: hook(result: Result, rv1: Result, rv2: Result); +global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal); # Event that is used to "finish" measurements and adapt the measurement # framework for clustered or non-clustered usage. -global finish_period: event(m: Measurement); +global finish_epoch: event(m: Measurement); function key2str(key: Key): string { @@ -176,12 +188,19 @@ function key2str(key: Key): string out = fmt("%shost=%s", out, key$host); if ( key?$str ) out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", key$str); - return fmt("metric_key(%s)", out); + return fmt("measurement_key(%s)", out); } -function compose_resultvals(rv1: Result, rv2: Result): Result +function init_resultval(r: Reducer): ResultVal { - local result: Result; + local rv: ResultVal = [$begin=network_time(), $end=network_time()]; + hook init_resultval_hook(r, rv); + return rv; + } + +function compose_resultvals(rv1: ResultVal, rv2: ResultVal): ResultVal + { + local result: ResultVal; # Merge $begin (take the earliest one) result$begin = (rv1$begin < rv2$begin) ? rv1$begin : rv2$begin; @@ -192,18 +211,40 @@ function compose_resultvals(rv1: Result, rv2: Result): Result # Merge $num result$num = rv1$num + rv2$num; - # Merge $threshold_series_index - result$threshold_series_index = (rv1$threshold_series_index > rv2$threshold_series_index) ? rv1$threshold_series_index : rv2$threshold_series_index; - - # Merge $is_threshold_crossed - if ( rv1$is_threshold_crossed || rv2$is_threshold_crossed ) - result$is_threshold_crossed = T; - hook compose_resultvals_hook(result, rv1, rv2); return result; } + +function compose_results(r1: Result, r2: Result): Result + { + local result: Result = table(); + + if ( |r1| > |r2| ) + { + for ( data_id in r1 ) + { + if ( data_id in r2 ) + result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); + else + result[data_id] = r1[data_id]; + } + } + else + { + for ( data_id in r2 ) + { + if ( data_id in r1 ) + result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); + else + result[data_id] = r2[data_id]; + } + } + return result; + } + + function reset(m: Measurement) { if ( m$id in result_store ) @@ -214,7 +255,10 @@ function reset(m: Measurement) function create(m: Measurement) { - m$id=unique_id(""); + if ( ! m?$id ) + m$id=unique_id(""); + local tmp: table[Key] of Thresholding = table(); + m$threshold_tracker = tmp; measurement_store[m$id] = m; for ( reducer in m$reducers ) @@ -226,20 +270,20 @@ function create(m: Measurement) } reset(m); - schedule m$epoch { Measurement::finish_period(m) }; + schedule m$epoch { Measurement::finish_epoch(m) }; } -function add_data(data_id: string, key: Key, data: DataPoint) +function add_data(id: string, key: Key, point: DataPoint) { # Try to add the data to all of the defined reducers. - if ( data_id !in reducer_store ) + if ( id !in reducer_store ) return; - for ( r in reducer_store[data_id] ) + for ( r in reducer_store[id] ) { # If this reducer has a predicate, run the predicate # and skip this key if the predicate return false. - if ( r?$pred && ! r$pred(key, data) ) + if ( r?$pred && ! r$pred(key, point) ) next; if ( r?$normalize_key ) @@ -249,20 +293,21 @@ function add_data(data_id: string, key: Key, data: DataPoint) local results = result_store[m$id]; if ( key !in results ) results[key] = table(); - if ( data_id !in results[key] ) - results[key][data_id] = [$begin=network_time(), $end=network_time()]; + if ( id !in results[key] ) + results[key][id] = init_resultval(r); - local result = results[key][data_id]; - ++result$num; + local result = results[key]; + local result_val = result[id]; + ++result_val$num; # Continually update the $end field. - result$end=network_time(); + result_val$end=network_time(); # If a string was given, fall back to 1.0 as the value. local val = 1.0; - if ( data?$num || data?$dbl ) - val = data?$dbl ? data$dbl : data$num; + if ( point?$num || point?$dbl ) + val = point?$dbl ? point$dbl : point$num; - hook add_to_reducer(r, val, data, result); + hook add_to_reducer_hook(r, val, point, result_val); data_added(m, key, result); } } @@ -274,28 +319,37 @@ function check_thresholds(m: Measurement, key: Key, result: Result, modify_pct: if ( ! (m?$threshold || m?$threshold_series) ) return F; - local watch = 0.0; - #if ( val?$unique ) - # watch = val$unique; - #else if ( val?$sum ) - # watch = val$sum; + if ( key !in m$threshold_tracker ) + { + local tmp: Thresholding; + m$threshold_tracker[key] = tmp; + } - if ( m?$threshold_val ) - watch = m$threshold_val(result); + # Add in the extra ResultVals to make threshold_vals easier to write. + if ( |m$reducers| != |result| ) + { + for ( reducer in m$reducers ) + { + if ( reducer$stream !in result ) + result[reducer$stream] = init_resultval(reducer); + } + } + + local watch = m$threshold_val(key, result); if ( modify_pct < 1.0 && modify_pct > 0.0 ) - watch = watch/modify_pct; + watch = double_to_count(floor(watch/modify_pct)); - if ( ! result$is_threshold_crossed && - m?$threshold && watch >= m$threshold ) + local tt = m$threshold_tracker[key]; + if ( m?$threshold && ! tt$is_threshold_crossed && watch >= m$threshold ) { - # A default threshold was given and the value crossed it. + # Value crossed the threshold. return T; } if ( m?$threshold_series && - |m$threshold_series| >= result$threshold_series_index && - watch >= m$threshold_series[result$threshold_series_index] ) + |m$threshold_series| >= tt$threshold_series_index && + watch >= m$threshold_series[tt$threshold_series_index] ) { # A threshold series was given and the value crossed the next # value in the series. @@ -307,17 +361,29 @@ function check_thresholds(m: Measurement, key: Key, result: Result, modify_pct: function threshold_crossed(m: Measurement, key: Key, result: Result) { + # If there is no callback, there is no point in any of this. if ( ! m?$threshold_crossed ) return; #if ( val?$sample_queue ) # val$samples = Queue::get_str_vector(val$sample_queue); + # Add in the extra ResultVals to make threshold_crossed callbacks easier to write. + if ( |m$reducers| != |result| ) + { + for ( reducer in m$reducers ) + { + if ( reducer$stream !in result ) + result[reducer$stream] = init_resultval(reducer); + } + } + m$threshold_crossed(key, result); - result$is_threshold_crossed = T; + local tt = m$threshold_tracker[key]; + tt$is_threshold_crossed = T; # Bump up to the next threshold series index if a threshold series is being used. if ( m?$threshold_series ) - ++result$threshold_series_index; + ++tt$threshold_series_index; } diff --git a/scripts/base/frameworks/measurement/non-cluster.bro b/scripts/base/frameworks/measurement/non-cluster.bro index 7a0a2a2c3e..35ff9dc935 100644 --- a/scripts/base/frameworks/measurement/non-cluster.bro +++ b/scripts/base/frameworks/measurement/non-cluster.bro @@ -2,18 +2,18 @@ module Measurement; -event Measurement::finish_period(m: Measurement) +event Measurement::finish_epoch(m: Measurement) { if ( m$id in result_store ) { local data = result_store[m$id]; - if ( m?$period_finished ) - m$period_finished(data); + if ( m?$epoch_finished ) + m$epoch_finished(data); reset(m); } - - schedule m$epoch { Measurement::finish_period(m) }; + + schedule m$epoch { Measurement::finish_epoch(m) }; } diff --git a/scripts/base/frameworks/measurement/plugins/average.bro b/scripts/base/frameworks/measurement/plugins/average.bro index 629cb2fc7b..172e8c788d 100644 --- a/scripts/base/frameworks/measurement/plugins/average.bro +++ b/scripts/base/frameworks/measurement/plugins/average.bro @@ -7,24 +7,24 @@ export { AVERAGE }; - redef record Result += { + redef record ResultVal += { ## For numeric data, this calculates the average of all values. - average: double &log &optional; + average: double &optional; }; } -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) { if ( AVERAGE in r$apply ) { - if ( ! result?$average ) - result$average = val; + if ( ! rv?$average ) + rv$average = val; else - result$average += (val - result$average) / result$num; + rv$average += (val - rv$average) / rv$num; } } -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$average && rv2?$average ) result$average = ((rv1$average*rv1$num) + (rv2$average*rv2$num))/(rv1$num+rv2$num); diff --git a/scripts/base/frameworks/measurement/plugins/max.bro b/scripts/base/frameworks/measurement/plugins/max.bro index 5138e3f684..02b536f849 100644 --- a/scripts/base/frameworks/measurement/plugins/max.bro +++ b/scripts/base/frameworks/measurement/plugins/max.bro @@ -7,24 +7,24 @@ export { MAX }; - redef record Result += { + redef record ResultVal += { ## For numeric data, this tracks the maximum value given. - max: double &log &optional; + max: double &optional; }; } -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) { if ( MAX in r$apply ) { - if ( ! result?$max ) - result$max = val; - else if ( val > result$max ) - result$max = val; + if ( ! rv?$max ) + rv$max = val; + else if ( val > rv$max ) + rv$max = val; } } -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$max && rv2?$max ) result$max = (rv1$max > rv2$max) ? rv1$max : rv2$max; diff --git a/scripts/base/frameworks/measurement/plugins/min.bro b/scripts/base/frameworks/measurement/plugins/min.bro index ebdbb39373..944ee9fcb4 100644 --- a/scripts/base/frameworks/measurement/plugins/min.bro +++ b/scripts/base/frameworks/measurement/plugins/min.bro @@ -7,24 +7,24 @@ export { MIN }; - redef record Result += { + redef record ResultVal += { ## For numeric data, this tracks the minimum value given. - min: double &log &optional; + min: double &optional; }; } -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) { if ( MIN in r$apply ) { - if ( ! result?$min ) - result$min = val; - else if ( val < result$min ) - result$min = val; + if ( ! rv?$min ) + rv$min = val; + else if ( val < rv$min ) + rv$min = val; } } -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$min && rv2?$min ) result$min = (rv1$min < rv2$min) ? rv1$min : rv2$min; diff --git a/scripts/base/frameworks/measurement/plugins/sample.bro b/scripts/base/frameworks/measurement/plugins/sample.bro index 3edd92ce13..e0084e88d1 100644 --- a/scripts/base/frameworks/measurement/plugins/sample.bro +++ b/scripts/base/frameworks/measurement/plugins/sample.bro @@ -8,7 +8,7 @@ export { samples: count &default=0; }; - redef record Result += { + redef record ResultVal += { ## A sample of something being measured. This is helpful in ## some cases for collecting information to do further detection ## or better logging for forensic purposes. @@ -16,24 +16,24 @@ export { }; } -redef record Result += { +redef record ResultVal += { # Internal use only. This is the queue where samples # are maintained since the queue is self managing for # the number of samples requested. sample_queue: Queue::Queue &optional; }; -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) { if ( r$samples > 0 ) { - if ( ! result?$sample_queue ) - result$sample_queue = Queue::init([$max_len=r$samples]); - Queue::push(result$sample_queue, data$str); + if ( ! rv?$sample_queue ) + rv$sample_queue = Queue::init([$max_len=r$samples]); + Queue::push(rv$sample_queue, data$str); } } -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { # Merge $sample_queue if ( rv1?$sample_queue && rv2?$sample_queue ) diff --git a/scripts/base/frameworks/measurement/plugins/std-dev.bro b/scripts/base/frameworks/measurement/plugins/std-dev.bro index 6d13d7fc51..bcf2cdcb00 100644 --- a/scripts/base/frameworks/measurement/plugins/std-dev.bro +++ b/scripts/base/frameworks/measurement/plugins/std-dev.bro @@ -9,28 +9,31 @@ export { STD_DEV }; - redef record Result += { + redef record ResultVal += { ## For numeric data, this calculates the standard deviation. - std_dev: double &log &optional; + std_dev: double &optional; }; } +function calc_std_dev(rv: ResultVal) + { + if ( rv?$variance ) + rv$std_dev = sqrt(rv$variance); + } + # This depends on the variance plugin which uses priority -5 -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) &priority=-10 { if ( STD_DEV in r$apply ) { - if ( result?$variance ) - result$std_dev = sqrt(result$variance); + if ( rv?$variance ) + calc_std_dev(rv); + else + rv$std_dev = 0.0; } } -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) &priority=-10 +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10 { - if ( rv1?$sum || rv2?$sum ) - { - result$sum = rv1?$sum ? rv1$sum : 0; - if ( rv2?$sum ) - result$sum += rv2$sum; - } - } \ No newline at end of file + calc_std_dev(result); + } diff --git a/scripts/base/frameworks/measurement/plugins/sum.bro b/scripts/base/frameworks/measurement/plugins/sum.bro index 7e8b6ff692..8f989317d8 100644 --- a/scripts/base/frameworks/measurement/plugins/sum.bro +++ b/scripts/base/frameworks/measurement/plugins/sum.bro @@ -8,23 +8,32 @@ export { SUM }; - redef record Result += { + redef record ResultVal += { ## For numeric data, this tracks the sum of all values. - sum: double &log &optional; + sum: double &default=0.0; }; + + type threshold_function: function(key: Measurement::Key, result: Measurement::Result): count; + global sum_threshold: function(data_id: string): threshold_function; } -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +function sum_threshold(data_id: string): threshold_function { - if ( SUM in r$apply ) + return function(key: Measurement::Key, result: Measurement::Result): count { - if ( ! result?$sum ) - result$sum = 0; - result$sum += val; - } + print fmt("data_id: %s", data_id); + print result; + return double_to_count(result[data_id]$sum); + }; } -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) + { + if ( SUM in r$apply ) + rv$sum += val; + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$sum || rv2?$sum ) { diff --git a/scripts/base/frameworks/measurement/plugins/unique.bro b/scripts/base/frameworks/measurement/plugins/unique.bro index 4f30206a4e..5160f0df91 100644 --- a/scripts/base/frameworks/measurement/plugins/unique.bro +++ b/scripts/base/frameworks/measurement/plugins/unique.bro @@ -7,14 +7,14 @@ export { UNIQUE }; - redef record Result += { + redef record ResultVal += { ## If cardinality is being tracked, the number of unique ## items is tracked here. - unique: count &log &optional; + unique: count &optional; }; } -redef record Result += { +redef record ResultVal += { # Internal use only. This is not meant to be publically available # because we don't want to trust that we can inspect the values # since we will like move to a probalistic data structure in the future. @@ -22,18 +22,18 @@ redef record Result += { unique_vals: set[DataPoint] &optional; }; -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) { if ( UNIQUE in r$apply ) { - if ( ! result?$unique_vals ) - result$unique_vals=set(); - add result$unique_vals[data]; - result$unique = |result$unique_vals|; + if ( ! rv?$unique_vals ) + rv$unique_vals=set(); + add rv$unique_vals[data]; + rv$unique = |rv$unique_vals|; } } -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { if ( rv1?$unique_vals || rv2?$unique_vals ) { diff --git a/scripts/base/frameworks/measurement/plugins/variance.bro b/scripts/base/frameworks/measurement/plugins/variance.bro index 07a7293539..dc94f39840 100644 --- a/scripts/base/frameworks/measurement/plugins/variance.bro +++ b/scripts/base/frameworks/measurement/plugins/variance.bro @@ -8,40 +8,40 @@ export { VARIANCE }; - redef record Result += { + redef record ResultVal += { ## For numeric data, this calculates the variance. - variance: double &log &optional; + variance: double &optional; }; } -redef record Result += { +redef record ResultVal += { # Internal use only. Used for incrementally calculating variance. prev_avg: double &optional; # Internal use only. For calculating incremental variance. - var_s: double &optional; + var_s: double &default=0.0; }; -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) +function calc_variance(rv: ResultVal) { - if ( VARIANCE in r$apply ) - result$prev_avg = result$average; + rv$variance = (rv$num > 1) ? rv$var_s/(rv$num-1) : 0.0; } # Reduced priority since this depends on the average -hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) &priority=-5 +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) &priority=-5 { if ( VARIANCE in r$apply ) { - if ( ! result?$var_s ) - result$var_s = 0.0; - result$var_s += (val - result$prev_avg) * (val - result$average); - result$variance = (val > 0) ? result$var_s/val : 0.0; + if ( rv$num > 1 ) + rv$var_s += ((val - rv$prev_avg) * (val - rv$average)); + + calc_variance(rv); + rv$prev_avg = rv$average; } } # Reduced priority since this depends on the average -hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) &priority=-5 +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-5 { if ( rv1?$var_s && rv2?$var_s ) { @@ -62,4 +62,6 @@ hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) &priority result$prev_avg = rv1$prev_avg; else if ( rv2?$prev_avg ) result$prev_avg = rv2$prev_avg; + + calc_variance(result); } \ No newline at end of file diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.basic-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic-cluster/manager-1..stdout new file mode 100644 index 0000000000..ea8904d2e6 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic-cluster/manager-1..stdout @@ -0,0 +1,4 @@ +Host: 6.5.4.3 - num:2 - sum:6.0 - avg:3.0 - max:5.0 - min:1.0 - var:8.0 - std_dev:2.8 - unique:2 +Host: 10.10.10.10 - num:1 - sum:5.0 - avg:5.0 - max:5.0 - min:5.0 - var:0.0 - std_dev:0.0 - unique:1 +Host: 1.2.3.4 - num:9 - sum:437.0 - avg:48.6 - max:95.0 - min:3.0 - var:758.8 - std_dev:27.5 - unique:8 +Host: 7.2.1.5 - num:2 - sum:145.0 - avg:72.5 - max:91.0 - min:54.0 - var:684.5 - std_dev:26.2 - unique:2 diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.basic/.stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic/.stdout new file mode 100644 index 0000000000..208b6103b7 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic/.stdout @@ -0,0 +1,3 @@ +Host: 6.5.4.3 - num:1 - sum:2.0 - var:0.0 - avg:2.0 - max:2.0 - min:2.0 - std_dev:0.0 - unique:1 +Host: 1.2.3.4 - num:5 - sum:221.0 - var:1144.2 - avg:44.2 - max:94.0 - min:5.0 - std_dev:33.8 - unique:4 +Host: 7.2.1.5 - num:1 - sum:1.0 - var:0.0 - avg:1.0 - max:1.0 - min:1.0 - std_dev:0.0 - unique:1 diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.cluster-intermediate-update/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.cluster-intermediate-update/manager-1..stdout new file mode 100644 index 0000000000..2a53389dc3 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.cluster-intermediate-update/manager-1..stdout @@ -0,0 +1 @@ +A test metric threshold was crossed with a value of: 100.0 diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.thresholding/.stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.thresholding/.stdout new file mode 100644 index 0000000000..09c65c3864 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.thresholding/.stdout @@ -0,0 +1,6 @@ +THRESHOLD_SERIES: hit a threshold series value at 3 for measurement_key(host=1.2.3.4) +THRESHOLD: hit a threshold value at 6 for measurement_key(host=1.2.3.4) +THRESHOLD_SERIES: hit a threshold series value at 6 for measurement_key(host=1.2.3.4) +THRESHOLD: hit a threshold value at 1001 for measurement_key(host=7.2.1.5) +THRESHOLD_SERIES: hit a threshold series value at 1001 for measurement_key(host=7.2.1.5) +THRESHOLD WITH RATIO BETWEEN REDUCERS: hit a threshold value at 55x for measurement_key(host=7.2.1.5) diff --git a/testing/btest/scripts/base/frameworks/measurement/basic-cluster.bro b/testing/btest/scripts/base/frameworks/measurement/basic-cluster.bro new file mode 100644 index 0000000000..e2f5e4e7d5 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/measurement/basic-cluster.bro @@ -0,0 +1,83 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 15 + +# @TEST-EXEC: btest-diff manager-1/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +global n = 0; + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM, Measurement::MIN, Measurement::MAX, Measurement::AVERAGE, Measurement::STD_DEV, Measurement::VARIANCE, Measurement::UNIQUE)]; + Measurement::create([$epoch=5secs, + $reducers=set(r1), + $epoch_finished(rt: Measurement::ResultTable) = + { + for ( key in rt ) + { + local r = rt[key]["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); + } + + terminate(); + } + ]); + } + +event remote_connection_closed(p: event_peer) + { + terminate(); + } + +global ready_for_data: event(); +redef Cluster::manager2worker_events += /^ready_for_data$/; + +event ready_for_data() + { + if ( Cluster::node == "worker-1" ) + { + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=34]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=30]); + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=1]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=54]); + } + if ( Cluster::node == "worker-2" ) + { + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=75]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=30]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=3]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=57]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=52]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=61]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=95]); + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=5]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=91]); + Measurement::add_data("test.metric", [$host=10.10.10.10], [$num=5]); + } + } + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +global peer_count = 0; +event remote_connection_handshake_done(p: event_peer) &priority=-5 + { + ++peer_count; + if ( peer_count == 2 ) + event ready_for_data(); + } + +@endif diff --git a/testing/btest/scripts/base/frameworks/measurement/basic.bro b/testing/btest/scripts/base/frameworks/measurement/basic.bro new file mode 100644 index 0000000000..e9dd21e0ef --- /dev/null +++ b/testing/btest/scripts/base/frameworks/measurement/basic.bro @@ -0,0 +1,34 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="test.metric", + $apply=set(Measurement::SUM, + Measurement::VARIANCE, + Measurement::AVERAGE, + Measurement::MAX, + Measurement::MIN, + Measurement::STD_DEV, + Measurement::UNIQUE)]; + Measurement::create([$epoch=3secs, + $reducers=set(r1), + $epoch_finished(data: Measurement::ResultTable) = + { + for ( key in data ) + { + local r = data[key]["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); + } + } + ]); + + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=5]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=22]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=94]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=50]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=50]); + + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=2]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=1]); + } diff --git a/testing/btest/scripts/base/frameworks/metrics/cluster-intermediate-update.bro b/testing/btest/scripts/base/frameworks/measurement/cluster-intermediate-update.bro similarity index 56% rename from testing/btest/scripts/base/frameworks/metrics/cluster-intermediate-update.bro rename to testing/btest/scripts/base/frameworks/measurement/cluster-intermediate-update.bro index b16645dbe6..56f44db2eb 100644 --- a/testing/btest/scripts/base/frameworks/metrics/cluster-intermediate-update.bro +++ b/testing/btest/scripts/base/frameworks/measurement/cluster-intermediate-update.bro @@ -19,14 +19,20 @@ redef Log::default_rotation_interval = 0secs; event bro_init() &priority=5 { - Metrics::add_filter("test.metric", - [$every=1hr, - $measure=set(Metrics::SUM), + local r1: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=1hr, + $reducers=set(r1), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["test.metric"]$sum); + }, $threshold=100, - $threshold_crossed(index: Metrics::Index, val: Metrics::ResultVal) = { - print "A test metric threshold was crossed!"; + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum); terminate(); - }]); + } + ]); } event remote_connection_closed(p: event_peer) @@ -39,13 +45,16 @@ event do_metrics(i: count) # Worker-1 will trigger an intermediate update and then if everything # works correctly, the data from worker-2 will hit the threshold and # should trigger the notice. - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=i]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=i]); } -event bro_init() +event remote_connection_handshake_done(p: event_peer) { - if ( Cluster::node == "worker-1" ) - schedule 2sec { do_metrics(99) }; - if ( Cluster::node == "worker-2" ) - event do_metrics(1); + if ( p$descr == "manager-1" ) + { + if ( Cluster::node == "worker-1" ) + schedule 0.1sec { do_metrics(1) }; + if ( Cluster::node == "worker-2" ) + schedule 0.5sec { do_metrics(99) }; + } } diff --git a/testing/btest/scripts/base/frameworks/measurement/thresholding.bro b/testing/btest/scripts/base/frameworks/measurement/thresholding.bro new file mode 100644 index 0000000000..d25350930e --- /dev/null +++ b/testing/btest/scripts/base/frameworks/measurement/thresholding.bro @@ -0,0 +1,73 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +redef enum Notice::Type += { + Test_Notice, +}; + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=3secs, + $reducers=set(r1), + #$threshold_val = Measurement::sum_threshold("test.metric"), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["test.metric"]$sum); + }, + $threshold=5, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["test.metric"]; + print fmt("THRESHOLD: hit a threshold value at %.0f for %s", r$sum, Measurement::key2str(key)); + } + ]); + + local r2: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=3secs, + $reducers=set(r2), + #$threshold_val = Measurement::sum_threshold("test.metric"), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["test.metric"]$sum); + }, + $threshold_series=vector(3,6,800), + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["test.metric"]; + print fmt("THRESHOLD_SERIES: hit a threshold series value at %.0f for %s", r$sum, Measurement::key2str(key)); + } + ]); + + local r3: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + local r4: Measurement::Reducer = [$stream="test.metric2", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=3secs, + $reducers=set(r3, r4), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + # Calculate a ratio between sums of two reducers. + if ( "test.metric2" in result && "test.metric" in result && + result["test.metric"]$sum > 0 ) + return double_to_count(result["test.metric2"]$sum / result["test.metric"]$sum); + else + return 0; + }, + # Looking for metric2 sum to be 5 times the sum of metric + $threshold=5, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local thold = result["test.metric2"]$sum / result["test.metric"]$sum; + print fmt("THRESHOLD WITH RATIO BETWEEN REDUCERS: hit a threshold value at %.0fx for %s", thold, Measurement::key2str(key)); + } + ]); + + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=3]); + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=2]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=1]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=3]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=1000]); + Measurement::add_data("test.metric2", [$host=7.2.1.5], [$num=10]); + Measurement::add_data("test.metric2", [$host=7.2.1.5], [$num=1000]); + Measurement::add_data("test.metric2", [$host=7.2.1.5], [$num=54321]); + + } diff --git a/testing/btest/scripts/base/frameworks/metrics/basic-cluster.bro b/testing/btest/scripts/base/frameworks/metrics/basic-cluster.bro deleted file mode 100644 index c68a4f7beb..0000000000 --- a/testing/btest/scripts/base/frameworks/metrics/basic-cluster.bro +++ /dev/null @@ -1,88 +0,0 @@ -# @TEST-SERIALIZE: comm -# -# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT -# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT -# @TEST-EXEC: sleep 1 -# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT -# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT -# @TEST-EXEC: btest-bg-wait 15 -# @TEST-EXEC: btest-diff manager-1/metrics.log - -@TEST-START-FILE cluster-layout.bro -redef Cluster::nodes = { - ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], - ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1", $workers=set("worker-1", "worker-2")], - ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth0"], - ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth1"], -}; -@TEST-END-FILE - -redef Log::default_rotation_interval = 0secs; - -global n = 0; - -event bro_init() &priority=5 - { - Metrics::add_filter("test.metric", - [$every=3secs, - $measure=set(Metrics::SUM, Metrics::MIN, Metrics::MAX, Metrics::AVG, Metrics::STD_DEV, Metrics::VARIANCE, Metrics::UNIQUE), - $period_finished(ts: time, metric_name: string, filter_name: string, data: Metrics::MetricTable) = - { - Metrics::write_log(ts, metric_name, filter_name, data); - if ( ++n == 3 ) - { - terminate_communication(); - terminate(); - } - } - ]); - } - -event remote_connection_closed(p: event_peer) - { - terminate(); - } - -global ready_for_data: event(); - -redef Cluster::manager2worker_events += /ready_for_data/; - -@if ( Cluster::local_node_type() == Cluster::WORKER ) - -event ready_for_data() - { - if ( Cluster::node == "worker-1" ) - { - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=34]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=30]); - Metrics::add_data("test.metric", [$host=6.5.4.3], [$num=1]); - Metrics::add_data("test.metric", [$host=7.2.1.5], [$num=54]); - } - if ( Cluster::node == "worker-2" ) - { - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=75]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=30]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=3]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=57]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=52]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=61]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=95]); - Metrics::add_data("test.metric", [$host=6.5.4.3], [$num=5]); - Metrics::add_data("test.metric", [$host=7.2.1.5], [$num=91]); - } - } - -@endif - -@if ( Cluster::local_node_type() == Cluster::MANAGER ) - -global peer_count = 0; - -event remote_connection_handshake_done(p: event_peer) - { - ++peer_count; - if ( peer_count == 3 ) - event ready_for_data(); - } - -@endif diff --git a/testing/btest/scripts/base/frameworks/metrics/basic.bro b/testing/btest/scripts/base/frameworks/metrics/basic.bro deleted file mode 100644 index e665f2ea5c..0000000000 --- a/testing/btest/scripts/base/frameworks/metrics/basic.bro +++ /dev/null @@ -1,20 +0,0 @@ -# @TEST-EXEC: bro %INPUT -# @TEST-EXEC: btest-diff metrics.log - -event bro_init() &priority=5 - { - Metrics::add_filter("test.metric", - [$name="foo-bar", - $every=3secs, - $measure=set(Metrics::SUM, Metrics::VARIANCE, Metrics::AVG, Metrics::MAX, Metrics::MIN, Metrics::STD_DEV), - $period_finished=Metrics::write_log]); - - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=5]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=22]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=94]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=50]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=50]); - - Metrics::add_data("test.metric", [$host=6.5.4.3], [$num=2]); - Metrics::add_data("test.metric", [$host=7.2.1.5], [$num=1]); - } diff --git a/testing/btest/scripts/base/frameworks/metrics/thresholding.bro b/testing/btest/scripts/base/frameworks/metrics/thresholding.bro deleted file mode 100644 index f39443fc2a..0000000000 --- a/testing/btest/scripts/base/frameworks/metrics/thresholding.bro +++ /dev/null @@ -1,44 +0,0 @@ -# @TEST-EXEC: bro %INPUT -# @TEST-EXEC: btest-diff .stdout - - -redef enum Notice::Type += { - Test_Notice, -}; - -event bro_init() &priority=5 - { - Metrics::add_filter("test.metric", - [$name="foobar", - $every=3secs, - $measure=set(Metrics::SUM), - $threshold=5, - $threshold_crossed(index: Metrics::Index, val: Metrics::ResultVal) = { - print fmt("THRESHOLD: hit a threshold value at %.0f for %s", val$sum, Metrics::index2str(index)); - }]); - - Metrics::add_filter("test.metric", - [$name="foobar2", - $every=3secs, - $measure=set(Metrics::SUM), - $threshold_series=vector(3,6,800), - $threshold_crossed(index: Metrics::Index, val: Metrics::ResultVal) = { - print fmt("THRESHOLD_SERIES: hit a threshold series value at %.0f for %s", val$sum, Metrics::index2str(index)); - }]); - Metrics::add_filter("test.metric", - [$every=3secs, - $measure=set(Metrics::SUM), - $threshold_func(index: Metrics::Index, val: Metrics::ResultVal) = { - # This causes any data added to be cross the threshold. - return T; - }, - $threshold_crossed(index: Metrics::Index, val: Metrics::ResultVal) = { - print fmt("THRESHOLD_FUNC: hit a threshold function value at %.0f for %s", val$sum, Metrics::index2str(index)); - }]); - - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=3]); - Metrics::add_data("test.metric", [$host=6.5.4.3], [$num=2]); - Metrics::add_data("test.metric", [$host=7.2.1.5], [$num=1]); - Metrics::add_data("test.metric", [$host=1.2.3.4], [$num=3]); - Metrics::add_data("test.metric", [$host=7.2.1.5], [$num=1000]); - }