diff --git a/doc/scripts/DocSourcesList.cmake b/doc/scripts/DocSourcesList.cmake index 117430223e..4e957d03a0 100644 --- a/doc/scripts/DocSourcesList.cmake +++ b/doc/scripts/DocSourcesList.cmake @@ -46,9 +46,9 @@ rest_target(${psd} base/frameworks/logging/writers/ascii.bro) rest_target(${psd} base/frameworks/logging/writers/dataseries.bro) rest_target(${psd} base/frameworks/logging/writers/elasticsearch.bro) rest_target(${psd} base/frameworks/logging/writers/none.bro) -rest_target(${psd} base/frameworks/metrics/cluster.bro) -rest_target(${psd} base/frameworks/metrics/main.bro) -rest_target(${psd} base/frameworks/metrics/non-cluster.bro) +rest_target(${psd} base/frameworks/measurement/cluster.bro) +rest_target(${psd} base/frameworks/measurement/main.bro) +rest_target(${psd} base/frameworks/measurement/non-cluster.bro) rest_target(${psd} base/frameworks/notice/actions/add-geodata.bro) rest_target(${psd} base/frameworks/notice/actions/drop.bro) rest_target(${psd} base/frameworks/notice/actions/email_admin.bro) @@ -103,6 +103,7 @@ rest_target(${psd} base/utils/files.bro) rest_target(${psd} base/utils/numbers.bro) rest_target(${psd} base/utils/paths.bro) rest_target(${psd} base/utils/patterns.bro) +rest_target(${psd} base/utils/queue.bro) rest_target(${psd} base/utils/site.bro) rest_target(${psd} base/utils/strings.bro) rest_target(${psd} base/utils/thresholds.bro) @@ -130,13 +131,18 @@ rest_target(${psd} policy/integration/barnyard2/main.bro) rest_target(${psd} policy/integration/barnyard2/types.bro) rest_target(${psd} policy/integration/collective-intel/main.bro) rest_target(${psd} policy/misc/analysis-groups.bro) +rest_target(${psd} policy/misc/app-metrics.bro) rest_target(${psd} policy/misc/capture-loss.bro) +rest_target(${psd} policy/misc/detect-traceroute/main.bro) rest_target(${psd} policy/misc/loaded-scripts.bro) rest_target(${psd} policy/misc/profiling.bro) rest_target(${psd} policy/misc/stats.bro) rest_target(${psd} policy/misc/trim-trace-file.bro) +rest_target(${psd} policy/protocols/conn/conn-stats-per-host.bro) rest_target(${psd} policy/protocols/conn/known-hosts.bro) rest_target(${psd} policy/protocols/conn/known-services.bro) +rest_target(${psd} policy/protocols/conn/metrics.bro) +rest_target(${psd} policy/protocols/conn/scan.bro) rest_target(${psd} policy/protocols/conn/weirds.bro) rest_target(${psd} policy/protocols/dns/auth-addl.bro) rest_target(${psd} policy/protocols/dns/detect-external-names.bro) @@ -154,6 +160,7 @@ rest_target(${psd} policy/protocols/modbus/known-masters-slaves.bro) rest_target(${psd} policy/protocols/modbus/track-memmap.bro) rest_target(${psd} policy/protocols/smtp/blocklists.bro) rest_target(${psd} policy/protocols/smtp/detect-suspicious-orig.bro) +rest_target(${psd} policy/protocols/smtp/metrics.bro) rest_target(${psd} policy/protocols/smtp/software.bro) rest_target(${psd} policy/protocols/ssh/detect-bruteforcing.bro) rest_target(${psd} policy/protocols/ssh/geo-data.bro) diff --git a/scripts/base/frameworks/metrics/__load__.bro b/scripts/base/frameworks/measurement/__load__.bro similarity index 93% rename from scripts/base/frameworks/metrics/__load__.bro rename to scripts/base/frameworks/measurement/__load__.bro index 35f6b30fb5..c2b77e706a 100644 --- a/scripts/base/frameworks/metrics/__load__.bro +++ b/scripts/base/frameworks/measurement/__load__.bro @@ -1,4 +1,5 @@ @load ./main +@load ./plugins # The cluster framework must be loaded first. @load base/frameworks/cluster diff --git a/scripts/base/frameworks/measurement/cluster.bro b/scripts/base/frameworks/measurement/cluster.bro new file mode 100644 index 0000000000..481b306417 --- /dev/null +++ b/scripts/base/frameworks/measurement/cluster.bro @@ -0,0 +1,316 @@ +##! This implements transparent cluster support for the metrics framework. +##! Do not load this file directly. It's only meant to be loaded automatically +##! and will be depending on if the cluster framework has been enabled. +##! The goal of this script is to make metric calculation completely and +##! transparently automated when running on a cluster. + +@load base/frameworks/cluster +@load ./main + +module Measurement; + +export { + ## Allows a user to decide how large of result groups the + ## workers should transmit values for cluster metric aggregation. + const cluster_send_in_groups_of = 50 &redef; + + ## The percent of the full threshold value that needs to be met + ## on a single worker for that worker to send the value to its manager in + ## order for it to request a global view for that value. There is no + ## requirement that the manager requests a global view for the key + ## since it may opt not to if it requested a global view for the key + ## recently. + const cluster_request_global_view_percent = 0.2 &redef; + + ## This is to deal with intermediate update overload. A manager will only allow + ## this many intermediate update requests to the workers to be inflight at + ## any given time. Requested intermediate updates are currently thrown out + ## and not performed. In practice this should hopefully have a minimal effect. + const max_outstanding_global_views = 10 &redef; + + ## Intermediate updates can cause overload situations on very large clusters. + ## This option may help reduce load and correct intermittent problems. + ## The goal for this option is also meant to be temporary. + const enable_intermediate_updates = T &redef; + + ## Event sent by the manager in a cluster to initiate the + ## collection of metrics values for a measurement. + global cluster_measurement_request: event(uid: string, mid: string); + + ## Event sent by nodes that are collecting metrics after receiving + ## a request for the metric measurement from the manager. + global cluster_measurement_response: event(uid: string, mid: string, data: ResultTable, done: bool); + + ## This event is sent by the manager in a cluster to initiate the + ## collection of a single key value from a measurement. It's typically + ## used to get intermediate updates before the break interval triggers + ## to speed detection of a value crossing a threshold. + global cluster_key_request: event(uid: string, mid: string, key: Key); + + ## This event is sent by nodes in response to a + ## :bro:id:`Measurement::cluster_key_request` event. + global cluster_key_response: event(uid: string, mid: string, key: Key, result: Result); + + ## This is sent by workers to indicate that they crossed the percent of the + ## current threshold by the percentage defined globally in + ## :bro:id:`Measurement::cluster_request_global_view_percent` + global cluster_key_intermediate_response: event(mid: string, key: Measurement::Key); + + ## This event is scheduled internally on workers to send result chunks. + global send_data: event(uid: string, mid: string, data: ResultTable); +} + +# Add events to the cluster framework to make this work. +redef Cluster::manager2worker_events += /Measurement::cluster_(measurement_request|key_request)/; +redef Cluster::manager2worker_events += /Measurement::new_measurement/; +redef Cluster::worker2manager_events += /Measurement::cluster_(measurement_response|key_response|key_intermediate_response)/; + +@if ( Cluster::local_node_type() != Cluster::MANAGER ) +# This variable is maintained to know what keys have recently sent as +# intermediate updates so they don't overwhelm their manager. The count that is +# yielded is the number of times the percentage threshold has been crossed and +# an intermediate result has been received. +global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0; + +event bro_init() &priority=-100 + { + # The manager is the only host allowed to track these. + measurement_store = table(); + reducer_store = table(); + } + +# This is done on all non-manager node types in the event that a metric is +# being collected somewhere other than a worker. +function data_added(m: Measurement, key: Key, result: Result) + { + # If an intermediate update for this value was sent recently, don't send + # it again. + if ( [m$id, key] in recent_global_view_keys ) + return; + + # If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that + # crosses the full threshold then it's a candidate to send as an + # intermediate update. + if ( enable_intermediate_updates && + check_thresholds(m, key, result, cluster_request_global_view_percent) ) + { + # kick off intermediate update + event Measurement::cluster_key_intermediate_response(m$id, key); + ++recent_global_view_keys[m$id, key]; + } + } + +event Measurement::send_data(uid: string, mid: string, data: ResultTable) + { + #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); + + local local_data: ResultTable = table(); + local num_added = 0; + for ( key in data ) + { + local_data[key] = data[key]; + delete data[key]; + + # Only send cluster_send_in_groups_of at a time. Queue another + # event to send the next group. + if ( cluster_send_in_groups_of == ++num_added ) + break; + } + + local done = F; + # If data is empty, this metric is done. + if ( |data| == 0 ) + done = T; + + event Measurement::cluster_measurement_response(uid, mid, local_data, done); + if ( ! done ) + schedule 0.01 sec { Measurement::send_data(uid, mid, data) }; + } + +event Measurement::cluster_measurement_request(uid: string, mid: string) + { + #print fmt("WORKER %s: received the cluster_measurement_request event for %s.", Cluster::node, id); + + # Initiate sending all of the data for the requested measurement. + event Measurement::send_data(uid, mid, result_store[mid]); + + # Lookup the actual measurement and reset it, the reference to the data + # currently stored will be maintained internally by the send_data event. + if ( mid in measurement_store ) + reset(measurement_store[mid]); + } + +event Measurement::cluster_key_request(uid: string, mid: string, key: Key) + { + if ( mid in result_store && key in result_store[mid] ) + { + #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data); + event Measurement::cluster_key_response(uid, mid, key, result_store[mid][key]); + } + else + { + # We need to send an empty response if we don't have the data so that the manager + # can know that it heard back from all of the workers. + event Measurement::cluster_key_response(uid, mid, key, table()); + } + } + +@endif + + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +# This variable is maintained by manager nodes as they collect and aggregate +# results. +# Index on a uid. +global measurement_results: table[string] of ResultTable &read_expire=1min; + +# This variable is maintained by manager nodes to track how many "dones" they +# collected per collection unique id. Once the number of results for a uid +# matches the number of peer nodes that results should be coming from, the +# result is written out and deleted from here. +# Indexed on a uid. +# TODO: add an &expire_func in case not all results are received. +global done_with: table[string] of count &read_expire=1min &default=0; + +# This variable is maintained by managers to track intermediate responses as +# they are getting a global view for a certain key. +# Indexed on a uid. +global key_requests: table[string] of Result &read_expire=1min; + +# This variable is maintained by managers to prevent overwhelming communication due +# to too many intermediate updates. Each measurement is tracked separately so that +# one won't overwhelm and degrade other quieter measurements. +# Indexed on a measurement id. +global outstanding_global_views: table[string] of count &default=0; + +const zero_time = double_to_time(0.0); +# Managers handle logging. +event Measurement::finish_epoch(m: Measurement) + { + if ( network_time() > zero_time ) + { + #print fmt("%.6f MANAGER: breaking %s measurement for %s metric", network_time(), measurement$name, measurement$id); + local uid = unique_id(""); + + if ( uid in measurement_results ) + delete measurement_results[uid]; + measurement_results[uid] = table(); + + # Request data from peers. + event Measurement::cluster_measurement_request(uid, m$id); + } + + # Schedule the next finish_epoch event. + schedule m$epoch { Measurement::finish_epoch(m) }; + } + +# This is unlikely to be called often, but it's here in case there are measurements +# being collected by managers. +function data_added(m: Measurement, key: Key, result: Result) + { + if ( check_thresholds(m, key, result, 1.0) ) + threshold_crossed(m, key, result); + } + +event Measurement::cluster_key_response(uid: string, mid: string, key: Key, result: Result) + { + #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result); + + # We only want to try and do a value merge if there are actually measured datapoints + # in the Result. + if ( uid in key_requests ) + key_requests[uid] = compose_results(key_requests[uid], result); + else + key_requests[uid] = result; + + # Mark that a worker is done. + ++done_with[uid]; + + #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); + if ( Cluster::worker_count == done_with[uid] ) + { + local m = measurement_store[mid]; + local ir = key_requests[uid]; + if ( check_thresholds(m, key, ir, 1.0) ) + threshold_crossed(m, key, ir); + + delete done_with[uid]; + delete key_requests[uid]; + # Check that there is an outstanding view before subtracting. + if ( outstanding_global_views[mid] > 0 ) + --outstanding_global_views[mid]; + } + } + +# Managers handle intermediate updates here. +event Measurement::cluster_key_intermediate_response(mid: string, key: Key) + { + #print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr); + #print fmt("MANAGER: requesting key data for %s", key2str(key)); + + if ( mid in outstanding_global_views && + |outstanding_global_views[mid]| > max_outstanding_global_views ) + { + # Don't do this intermediate update. Perhaps at some point in the future + # we will queue and randomly select from these ignored intermediate + # update requests. + return; + } + + ++outstanding_global_views[mid]; + + local uid = unique_id(""); + event Measurement::cluster_key_request(uid, mid, key); + } + +event Measurement::cluster_measurement_response(uid: string, mid: string, data: ResultTable, done: bool) + { + #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); + + # Mark another worker as being "done" for this uid. + if ( done ) + ++done_with[uid]; + + local local_data = measurement_results[uid]; + local m = measurement_store[mid]; + + for ( key in data ) + { + if ( key in local_data ) + local_data[key] = compose_results(local_data[key], data[key]); + else + local_data[key] = data[key]; + + # If a measurement is done being collected, thresholds for each key + # need to be checked so we're doing it here to avoid doubly iterating + # over each key. + if ( Cluster::worker_count == done_with[uid] ) + { + if ( check_thresholds(m, key, local_data[key], 1.0) ) + { + threshold_crossed(m, key, local_data[key]); + } + } + } + + # If the data has been collected from all peers, we are done and ready to finish. + if ( Cluster::worker_count == done_with[uid] ) + { + if ( m?$epoch_finished ) + m$epoch_finished(local_data); + + # Clean up + delete measurement_results[uid]; + delete done_with[uid]; + # Not sure I need to reset the measurement on the manager. + reset(m); + } + } + +event remote_connection_handshake_done(p: event_peer) &priority=5 + { + send_id(p, "Measurement::measurement_store"); + send_id(p, "Measurement::reducer_store"); + } +@endif diff --git a/scripts/base/frameworks/measurement/main.bro b/scripts/base/frameworks/measurement/main.bro new file mode 100644 index 0000000000..f649dbe1f2 --- /dev/null +++ b/scripts/base/frameworks/measurement/main.bro @@ -0,0 +1,391 @@ +##! The metrics framework provides a way to count and measure data. + +@load base/utils/queue + +module Measurement; + +export { + ## The various calculations are all defined as plugins. + type Calculation: enum { + PLACEHOLDER + }; + + ## Represents a thing which is having measurement results collected for it. + type Key: record { + ## A non-address related metric or a sub-key for an address based metric. + ## An example might be successful SSH connections by client IP address + ## where the client string would be the key value. + ## Another example might be number of HTTP requests to a particular + ## value in a Host header. This is an example of a non-host based + ## metric since multiple IP addresses could respond for the same Host + ## header value. + str: string &optional; + + ## Host is the value to which this metric applies. + host: addr &optional; + }; + + ## Represents data being added for a single metric data point. + ## Only supply a single value here at a time. + type DataPoint: record { + ## Count value. + num: count &optional; + ## Double value. + dbl: double &optional; + ## String value. + str: string &optional; + }; + + type Reducer: record { + ## Data stream identifier for the reducer to attach to. + stream: string; + + ## The calculations to perform on the data points. + apply: set[Calculation]; + + ## A predicate so that you can decide per key if you would like + ## to accept the data being inserted. + pred: function(key: Measurement::Key, point: Measurement::DataPoint): bool &optional; + + ## A function to normalize the key. This can be used to aggregate or + ## normalize the entire key. + normalize_key: function(key: Measurement::Key): Key &optional; + }; + + ## Value calculated for a data point stream fed into a reducer. + ## Most of the fields are added by plugins. + type ResultVal: record { + ## The time when the first data point was added to this result value. + begin: time; + + ## The time when the last data point was added to this result value. + end: time; + + ## The number of measurements received. + num: count &default=0; + }; + + ## Type to store results for multiple reducers. + type Result: table[string] of ResultVal; + + ## Type to store a table of measurement results indexed by the measurement key. + type ResultTable: table[Key] of Result; + + ## Measurements represent an aggregation of reducers along with + ## mechanisms to handle various situations like the epoch ending + ## or thresholds being crossed. + type Measurement: record { + ## The interval at which this filter should be "broken" and the + ## '$epoch_finished' callback called. The results are also reset + ## at this time so any threshold based detection needs to be set to a + ## number that should be expected to happen within this epoch. + epoch: interval; + + ## The reducers for the measurement indexed by data id. + reducers: set[Reducer]; + + ## Provide a function to calculate a value from the :bro:see:`Result` + ## structure which will be used for thresholding. + threshold_val: function(key: Measurement::Key, result: Measurement::Result): count &optional; + + ## The threshold value for calling the $threshold_crossed callback. + threshold: count &optional; + + ## A series of thresholds for calling the $threshold_crossed callback. + threshold_series: vector of count &optional; + + ## A callback that is called when a threshold is crossed. + threshold_crossed: function(key: Measurement::Key, result: Measurement::Result) &optional; + + ## A callback with the full collection of Results for this filter. + ## It's best to not access any global state outside of the variables + ## given to the callback because there is no assurance provided as to + ## where the callback will be executed on clusters. + epoch_finished: function(rt: Measurement::ResultTable) &optional; + }; + + ## Create a measurement. + global create: function(m: Measurement::Measurement); + + ## Add data into a data point stream. This should be called when + ## a script has measured some point value. + ## + ## id: The stream identifier that the data point represents. + ## + ## key: The measurement key that the value is to be added to. + ## + ## point: The data point to send into the stream. + global add_data: function(id: string, key: Measurement::Key, point: Measurement::DataPoint); + + ## Helper function to represent a :bro:type:`Measurement::Key` value as + ## a simple string. + ## + ## key: The metric key that is to be converted into a string. + ## + ## Returns: A string representation of the metric key. + global key2str: function(key: Measurement::Key): string; + + ## This event is generated for each new measurement that is created. + ## + ## m: The record which describes a measurement. + global new_measurement: event(m: Measurement); +} + +redef record Reducer += { + # Internal use only. Provides a reference back to the related Measurement by it's ID. + mid: string &optional; +}; + +type Thresholding: record { + # Internal use only. Indicates if a simple threshold was already crossed. + is_threshold_crossed: bool &default=F; + + # Internal use only. Current key for threshold series. + threshold_series_index: count &default=0; +}; + +redef record Measurement += { + # Internal use only (mostly for cluster coherency). + id: string &optional; + + # Internal use only. For tracking tresholds per key. + threshold_tracker: table[Key] of Thresholding &optional; +}; + +# Store of measurements indexed on the measurement id. +global measurement_store: table[string] of Measurement = table(); + +# Store of reducers indexed on the data point stream id. +global reducer_store: table[string] of set[Reducer] = table(); + +# Store of results indexed on the measurement id. +global result_store: table[string] of ResultTable = table(); + +# Store of threshold information. +global thresholds_store: table[string, Key] of bool = table(); + +# This is called whenever +# key values are updated and the new val is given as the `val` argument. +# It's only prototyped here because cluster and non-cluster have separate +# implementations. +global data_added: function(m: Measurement, key: Key, result: Result); + +# Prototype the hook point for plugins to do calculations. +global add_to_reducer_hook: hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal); +# Prototype the hook point for plugins to initialize any result values. +global init_resultval_hook: hook(r: Reducer, rv: ResultVal); +# Prototype the hook point for plugins to merge Results. +global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal); + +# Event that is used to "finish" measurements and adapt the measurement +# framework for clustered or non-clustered usage. +global finish_epoch: event(m: Measurement); + +function key2str(key: Key): string + { + local out = ""; + if ( key?$host ) + out = fmt("%shost=%s", out, key$host); + if ( key?$str ) + out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", key$str); + return fmt("measurement_key(%s)", out); + } + +function init_resultval(r: Reducer): ResultVal + { + local rv: ResultVal = [$begin=network_time(), $end=network_time()]; + hook init_resultval_hook(r, rv); + return rv; + } + +function compose_resultvals(rv1: ResultVal, rv2: ResultVal): ResultVal + { + local result: ResultVal; + + # Merge $begin (take the earliest one) + result$begin = (rv1$begin < rv2$begin) ? rv1$begin : rv2$begin; + + # Merge $end (take the latest one) + result$end = (rv1$end > rv2$end) ? rv1$end : rv2$end; + + # Merge $num + result$num = rv1$num + rv2$num; + + hook compose_resultvals_hook(result, rv1, rv2); + + return result; + } + +function compose_results(r1: Result, r2: Result): Result + { + local result: Result = table(); + + if ( |r1| > |r2| ) + { + for ( data_id in r1 ) + { + if ( data_id in r2 ) + result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); + else + result[data_id] = r1[data_id]; + } + } + else + { + for ( data_id in r2 ) + { + if ( data_id in r1 ) + result[data_id] = compose_resultvals(r1[data_id], r2[data_id]); + else + result[data_id] = r2[data_id]; + } + } + + return result; + } + + +function reset(m: Measurement) + { + if ( m$id in result_store ) + delete result_store[m$id]; + + result_store[m$id] = table(); + } + +function create(m: Measurement) + { + if ( (m?$threshold || m?$threshold_series) && ! m?$threshold_val ) + { + Reporter::error("Measurement given a threshold with no $threshold_val function"); + } + + if ( ! m?$id ) + m$id=unique_id(""); + local tmp: table[Key] of Thresholding = table(); + m$threshold_tracker = tmp; + measurement_store[m$id] = m; + + for ( reducer in m$reducers ) + { + reducer$mid = m$id; + if ( reducer$stream !in reducer_store ) + reducer_store[reducer$stream] = set(); + add reducer_store[reducer$stream][reducer]; + } + + reset(m); + schedule m$epoch { Measurement::finish_epoch(m) }; + } + +function add_data(id: string, key: Key, point: DataPoint) + { + # Try to add the data to all of the defined reducers. + if ( id !in reducer_store ) + return; + + for ( r in reducer_store[id] ) + { + # If this reducer has a predicate, run the predicate + # and skip this key if the predicate return false. + if ( r?$pred && ! r$pred(key, point) ) + next; + + if ( r?$normalize_key ) + key = r$normalize_key(copy(key)); + + local m = measurement_store[r$mid]; + local results = result_store[m$id]; + if ( key !in results ) + results[key] = table(); + if ( id !in results[key] ) + results[key][id] = init_resultval(r); + + local result = results[key]; + local result_val = result[id]; + ++result_val$num; + # Continually update the $end field. + result_val$end=network_time(); + + # If a string was given, fall back to 1.0 as the value. + local val = 1.0; + if ( point?$num || point?$dbl ) + val = point?$dbl ? point$dbl : point$num; + + hook add_to_reducer_hook(r, val, point, result_val); + data_added(m, key, result); + } + } + +# This function checks if a threshold has been crossed. It is also used as a method to implement +# mid-break-interval threshold crossing detection for cluster deployments. +function check_thresholds(m: Measurement, key: Key, result: Result, modify_pct: double): bool + { + if ( ! (m?$threshold || m?$threshold_series) ) + return F; + + if ( key !in m$threshold_tracker ) + { + local tmp: Thresholding; + m$threshold_tracker[key] = tmp; + } + + # Add in the extra ResultVals to make threshold_vals easier to write. + if ( |m$reducers| != |result| ) + { + for ( reducer in m$reducers ) + { + if ( reducer$stream !in result ) + result[reducer$stream] = init_resultval(reducer); + } + } + + local watch = m$threshold_val(key, result); + + if ( modify_pct < 1.0 && modify_pct > 0.0 ) + watch = double_to_count(floor(watch/modify_pct)); + + local tt = m$threshold_tracker[key]; + if ( m?$threshold && ! tt$is_threshold_crossed && watch >= m$threshold ) + { + # Value crossed the threshold. + return T; + } + + if ( m?$threshold_series && + |m$threshold_series| >= tt$threshold_series_index && + watch >= m$threshold_series[tt$threshold_series_index] ) + { + # A threshold series was given and the value crossed the next + # value in the series. + return T; + } + + return F; + } + +function threshold_crossed(m: Measurement, key: Key, result: Result) + { + # If there is no callback, there is no point in any of this. + if ( ! m?$threshold_crossed ) + return; + + # Add in the extra ResultVals to make threshold_crossed callbacks easier to write. + if ( |m$reducers| != |result| ) + { + for ( reducer in m$reducers ) + { + if ( reducer$stream !in result ) + result[reducer$stream] = init_resultval(reducer); + } + } + + m$threshold_crossed(key, result); + local tt = m$threshold_tracker[key]; + tt$is_threshold_crossed = T; + + # Bump up to the next threshold series index if a threshold series is being used. + if ( m?$threshold_series ) + ++tt$threshold_series_index; + } + diff --git a/scripts/base/frameworks/measurement/non-cluster.bro b/scripts/base/frameworks/measurement/non-cluster.bro new file mode 100644 index 0000000000..35ff9dc935 --- /dev/null +++ b/scripts/base/frameworks/measurement/non-cluster.bro @@ -0,0 +1,24 @@ +@load ./main + +module Measurement; + +event Measurement::finish_epoch(m: Measurement) + { + if ( m$id in result_store ) + { + local data = result_store[m$id]; + if ( m?$epoch_finished ) + m$epoch_finished(data); + + reset(m); + } + + schedule m$epoch { Measurement::finish_epoch(m) }; + } + + +function data_added(m: Measurement, key: Key, result: Result) + { + if ( check_thresholds(m, key, result, 1.0) ) + threshold_crossed(m, key, result); + } diff --git a/scripts/base/frameworks/measurement/plugins/__load__.bro b/scripts/base/frameworks/measurement/plugins/__load__.bro new file mode 100644 index 0000000000..0d4c2ed302 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/__load__.bro @@ -0,0 +1,8 @@ +@load ./average +@load ./max +@load ./min +@load ./sample +@load ./std-dev +@load ./sum +@load ./unique +@load ./variance \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/plugins/average.bro b/scripts/base/frameworks/measurement/plugins/average.bro new file mode 100644 index 0000000000..172e8c788d --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/average.bro @@ -0,0 +1,35 @@ + +module Measurement; + +export { + redef enum Calculation += { + ## Calculate the average of the values. + AVERAGE + }; + + redef record ResultVal += { + ## For numeric data, this calculates the average of all values. + average: double &optional; + }; +} + +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) + { + if ( AVERAGE in r$apply ) + { + if ( ! rv?$average ) + rv$average = val; + else + rv$average += (val - rv$average) / rv$num; + } + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + if ( rv1?$average && rv2?$average ) + result$average = ((rv1$average*rv1$num) + (rv2$average*rv2$num))/(rv1$num+rv2$num); + else if ( rv1?$average ) + result$average = rv1$average; + else if ( rv2?$average ) + result$average = rv2$average; + } \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/plugins/max.bro b/scripts/base/frameworks/measurement/plugins/max.bro new file mode 100644 index 0000000000..02b536f849 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/max.bro @@ -0,0 +1,37 @@ + +module Measurement; + +export { + redef enum Calculation += { + ## Find the maximum value. + MAX + }; + + redef record ResultVal += { + ## For numeric data, this tracks the maximum value given. + max: double &optional; + }; +} + +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) + { + if ( MAX in r$apply ) + { + if ( ! rv?$max ) + rv$max = val; + else if ( val > rv$max ) + rv$max = val; + } + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + if ( rv1?$max && rv2?$max ) + result$max = (rv1$max > rv2$max) ? rv1$max : rv2$max; + else if ( rv1?$max ) + result$max = rv1$max; + else if ( rv2?$max ) + result$max = rv2$max; + } + + diff --git a/scripts/base/frameworks/measurement/plugins/min.bro b/scripts/base/frameworks/measurement/plugins/min.bro new file mode 100644 index 0000000000..944ee9fcb4 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/min.bro @@ -0,0 +1,35 @@ + +module Measurement; + +export { + redef enum Calculation += { + ## Find the minimum value. + MIN + }; + + redef record ResultVal += { + ## For numeric data, this tracks the minimum value given. + min: double &optional; + }; +} + +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) + { + if ( MIN in r$apply ) + { + if ( ! rv?$min ) + rv$min = val; + else if ( val < rv$min ) + rv$min = val; + } + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + if ( rv1?$min && rv2?$min ) + result$min = (rv1$min < rv2$min) ? rv1$min : rv2$min; + else if ( rv1?$min ) + result$min = rv1$min; + else if ( rv2?$min ) + result$min = rv2$min; + } \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/plugins/sample.bro b/scripts/base/frameworks/measurement/plugins/sample.bro new file mode 100644 index 0000000000..018b7c9652 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/sample.bro @@ -0,0 +1,49 @@ +@load base/utils/queue + +module Measurement; + +export { + + redef record Reducer += { + ## A number of sample DataPoints to collect. + samples: count &default=0; + }; + + redef record ResultVal += { + ## A sample of something being measured. This is helpful in + ## some cases for collecting information to do further detection + ## or better logging for forensic purposes. + samples: vector of Measurement::DataPoint &optional; + }; +} + +redef record ResultVal += { + # Internal use only. This is the queue where samples + # are maintained since the queue is self managing for + # the number of samples requested. + sample_queue: Queue::Queue &optional; +}; + +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) + { + if ( r$samples > 0 ) + { + if ( ! rv?$sample_queue ) + rv$sample_queue = Queue::init([$max_len=r$samples]); + if ( ! rv?$samples ) + rv$samples = vector(); + Queue::put(rv$sample_queue, data); + Queue::get_vector(rv$sample_queue, rv$samples); + } + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + # Merge $sample_queue + if ( rv1?$sample_queue && rv2?$sample_queue ) + result$sample_queue = Queue::merge(rv1$sample_queue, rv2$sample_queue); + else if ( rv1?$sample_queue ) + result$sample_queue = rv1$sample_queue; + else if ( rv2?$sample_queue ) + result$sample_queue = rv2$sample_queue; + } \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/plugins/std-dev.bro b/scripts/base/frameworks/measurement/plugins/std-dev.bro new file mode 100644 index 0000000000..bcf2cdcb00 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/std-dev.bro @@ -0,0 +1,39 @@ +@load ./sum +@load ./variance + +module Measurement; + +export { + redef enum Calculation += { + ## Find the standard deviation of the values. + STD_DEV + }; + + redef record ResultVal += { + ## For numeric data, this calculates the standard deviation. + std_dev: double &optional; + }; +} + +function calc_std_dev(rv: ResultVal) + { + if ( rv?$variance ) + rv$std_dev = sqrt(rv$variance); + } + +# This depends on the variance plugin which uses priority -5 +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) &priority=-10 + { + if ( STD_DEV in r$apply ) + { + if ( rv?$variance ) + calc_std_dev(rv); + else + rv$std_dev = 0.0; + } + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10 + { + calc_std_dev(result); + } diff --git a/scripts/base/frameworks/measurement/plugins/sum.bro b/scripts/base/frameworks/measurement/plugins/sum.bro new file mode 100644 index 0000000000..5a25573870 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/sum.bro @@ -0,0 +1,50 @@ + +module Measurement; + +export { + redef enum Calculation += { + ## Sums the values given. For string values, + ## this will be the number of strings given. + SUM + }; + + redef record ResultVal += { + ## For numeric data, this tracks the sum of all values. + sum: double &default=0.0; + }; + + type threshold_function: function(key: Measurement::Key, result: Measurement::Result): count; + global sum_threshold: function(data_id: string): threshold_function; +} + +function sum_threshold(data_id: string): threshold_function + { + return function(key: Measurement::Key, result: Measurement::Result): count + { + print fmt("data_id: %s", data_id); + print result; + return double_to_count(result[data_id]$sum); + }; + } + +hook init_resultval_hook(r: Reducer, rv: ResultVal) + { + if ( SUM in r$apply && ! rv?$sum ) + rv$sum = 0; + } + +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) + { + if ( SUM in r$apply ) + rv$sum += val; + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + if ( rv1?$sum || rv2?$sum ) + { + result$sum = rv1?$sum ? rv1$sum : 0; + if ( rv2?$sum ) + result$sum += rv2$sum; + } + } \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/plugins/unique.bro b/scripts/base/frameworks/measurement/plugins/unique.bro new file mode 100644 index 0000000000..7664663d29 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/unique.bro @@ -0,0 +1,52 @@ + +module Measurement; + +export { + redef enum Calculation += { + ## Calculate the number of unique values. + UNIQUE + }; + + redef record ResultVal += { + ## If cardinality is being tracked, the number of unique + ## items is tracked here. + unique: count &default=0; + }; +} + +redef record ResultVal += { + # Internal use only. This is not meant to be publically available + # because we don't want to trust that we can inspect the values + # since we will like move to a probalistic data structure in the future. + # TODO: in the future this will optionally be a hyperloglog structure + unique_vals: set[DataPoint] &optional; +}; + +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) + { + if ( UNIQUE in r$apply ) + { + if ( ! rv?$unique_vals ) + rv$unique_vals=set(); + add rv$unique_vals[data]; + rv$unique = |rv$unique_vals|; + } + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + if ( rv1?$unique_vals || rv2?$unique_vals ) + { + if ( rv1?$unique_vals ) + result$unique_vals = rv1$unique_vals; + + if ( rv2?$unique_vals ) + if ( ! result?$unique_vals ) + result$unique_vals = rv2$unique_vals; + else + for ( val2 in rv2$unique_vals ) + add result$unique_vals[val2]; + + result$unique = |result$unique_vals|; + } + } \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/plugins/variance.bro b/scripts/base/frameworks/measurement/plugins/variance.bro new file mode 100644 index 0000000000..dc94f39840 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/variance.bro @@ -0,0 +1,67 @@ +@load ./average + +module Measurement; + +export { + redef enum Calculation += { + ## Find the variance of the values. + VARIANCE + }; + + redef record ResultVal += { + ## For numeric data, this calculates the variance. + variance: double &optional; + }; +} + +redef record ResultVal += { + # Internal use only. Used for incrementally calculating variance. + prev_avg: double &optional; + + # Internal use only. For calculating incremental variance. + var_s: double &default=0.0; +}; + +function calc_variance(rv: ResultVal) + { + rv$variance = (rv$num > 1) ? rv$var_s/(rv$num-1) : 0.0; + } + +# Reduced priority since this depends on the average +hook add_to_reducer_hook(r: Reducer, val: double, data: DataPoint, rv: ResultVal) &priority=-5 + { + if ( VARIANCE in r$apply ) + { + if ( rv$num > 1 ) + rv$var_s += ((val - rv$prev_avg) * (val - rv$average)); + + calc_variance(rv); + rv$prev_avg = rv$average; + } + } + +# Reduced priority since this depends on the average +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-5 + { + if ( rv1?$var_s && rv2?$var_s ) + { + local rv1_avg_sq = (rv1$average - result$average); + rv1_avg_sq = rv1_avg_sq*rv1_avg_sq; + local rv2_avg_sq = (rv2$average - result$average); + rv2_avg_sq = rv2_avg_sq*rv2_avg_sq; + result$var_s = rv1$num*(rv1$var_s/rv1$num + rv1_avg_sq) + rv2$num*(rv2$var_s/rv2$num + rv2_avg_sq); + } + else if ( rv1?$var_s ) + result$var_s = rv1$var_s; + else if ( rv2?$var_s ) + result$var_s = rv2$var_s; + + if ( rv1?$prev_avg && rv2?$prev_avg ) + result$prev_avg = ((rv1$prev_avg*rv1$num) + (rv2$prev_avg*rv2$num))/(rv1$num+rv2$num); + else if ( rv1?$prev_avg ) + result$prev_avg = rv1$prev_avg; + else if ( rv2?$prev_avg ) + result$prev_avg = rv2$prev_avg; + + calc_variance(result); + } \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/simple.bro b/scripts/base/frameworks/measurement/simple.bro new file mode 100644 index 0000000000..51bf7e8c44 --- /dev/null +++ b/scripts/base/frameworks/measurement/simple.bro @@ -0,0 +1,6 @@ + +module Metrics; + +export { + +} \ No newline at end of file diff --git a/scripts/base/frameworks/metrics/cluster.bro b/scripts/base/frameworks/metrics/cluster.bro deleted file mode 100644 index 4804bc5005..0000000000 --- a/scripts/base/frameworks/metrics/cluster.bro +++ /dev/null @@ -1,264 +0,0 @@ -##! This implements transparent cluster support for the metrics framework. -##! Do not load this file directly. It's only meant to be loaded automatically -##! and will be depending on if the cluster framework has been enabled. -##! The goal of this script is to make metric calculation completely and -##! transparently automated when running on a cluster. -##! -##! Events defined here are not exported deliberately because they are meant -##! to be an internal implementation detail. - -@load base/frameworks/cluster -@load ./main - -module Metrics; - -export { - ## Allows a user to decide how large of result groups the - ## workers should transmit values for cluster metric aggregation. - const cluster_send_in_groups_of = 50 &redef; - - ## The percent of the full threshold value that needs to be met - ## on a single worker for that worker to send the value to its manager in - ## order for it to request a global view for that value. There is no - ## requirement that the manager requests a global view for the index - ## since it may opt not to if it requested a global view for the index - ## recently. - const cluster_request_global_view_percent = 0.1 &redef; - - ## Event sent by the manager in a cluster to initiate the - ## collection of metrics values for a filter. - global cluster_filter_request: event(uid: string, id: ID, filter_name: string); - - ## Event sent by nodes that are collecting metrics after receiving - ## a request for the metric filter from the manager. - global cluster_filter_response: event(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool); - - ## This event is sent by the manager in a cluster to initiate the - ## collection of a single index value from a filter. It's typically - ## used to get intermediate updates before the break interval triggers - ## to speed detection of a value crossing a threshold. - global cluster_index_request: event(uid: string, id: ID, filter_name: string, index: Index); - - ## This event is sent by nodes in response to a - ## :bro:id:`Metrics::cluster_index_request` event. - global cluster_index_response: event(uid: string, id: ID, filter_name: string, index: Index, val: count); - - ## This is sent by workers to indicate that they crossed the percent of the - ## current threshold by the percentage defined globally in - ## :bro:id:`Metrics::cluster_request_global_view_percent` - global cluster_index_intermediate_response: event(id: Metrics::ID, filter_name: string, index: Metrics::Index, val: count); - - ## This event is scheduled internally on workers to send result chunks. - global send_data: event(uid: string, id: ID, filter_name: string, data: MetricTable); - -} - - -# This is maintained by managers so they can know what data they requested and -# when they requested it. -global requested_results: table[string] of time = table() &create_expire=5mins; - -# TODO: The next 4 variables make the assumption that a value never -# takes longer than 5 minutes to transmit from workers to manager. This needs to -# be tunable or self-tuning. These should also be restructured to be -# maintained within a single variable. - -# This variable is maintained by manager nodes as they collect and aggregate -# results. -global filter_results: table[string, ID, string] of MetricTable &create_expire=5mins; - -# This variable is maintained by manager nodes to track how many "dones" they -# collected per collection unique id. Once the number of results for a uid -# matches the number of peer nodes that results should be coming from, the -# result is written out and deleted from here. -# TODO: add an &expire_func in case not all results are received. -global done_with: table[string] of count &create_expire=5mins &default=0; - -# This variable is maintained by managers to track intermediate responses as -# they are getting a global view for a certain index. -global index_requests: table[string, ID, string, Index] of count &create_expire=5mins &default=0; - -# This variable is maintained by all hosts for different purposes. Non-managers -# maintain it to know what indexes they have recently sent as intermediate -# updates so they don't overwhelm their manager. Managers maintain it so they -# don't overwhelm workers with intermediate index requests. The count that is -# yielded is the number of times the percentage threshold has been crossed and -# an intermediate result has been received. The manager may optionally request -# the index again before data expires from here if too many workers are crossing -# the percentage threshold (not implemented yet!). -global recent_global_view_indexes: table[ID, string, Index] of count &create_expire=5mins &default=0; - -# Add events to the cluster framework to make this work. -redef Cluster::manager2worker_events += /Metrics::cluster_(filter_request|index_request)/; -redef Cluster::worker2manager_events += /Metrics::cluster_(filter_response|index_response|index_intermediate_response)/; - -@if ( Cluster::local_node_type() != Cluster::MANAGER ) -# This is done on all non-manager node types in the event that a metric is -# being collected somewhere other than a worker. -function data_added(filter: Filter, index: Index, val: count) - { - # If an intermediate update for this value was sent recently, don't send - # it again. - if ( [filter$id, filter$name, index] in recent_global_view_indexes ) - return; - - # If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that - # crosses the full threshold then it's a candidate to send as an - # intermediate update. - local pct_val = double_to_count(val / cluster_request_global_view_percent); - - if ( check_notice(filter, index, pct_val) ) - { - # kick off intermediate update - event Metrics::cluster_index_intermediate_response(filter$id, filter$name, index, val); - - ++recent_global_view_indexes[filter$id, filter$name, index]; - } - } - -event Metrics::send_data(uid: string, id: ID, filter_name: string, data: MetricTable) - { - #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); - - local local_data: MetricTable; - local num_added = 0; - for ( index in data ) - { - local_data[index] = data[index]; - delete data[index]; - - # Only send cluster_send_in_groups_of at a time. Queue another - # event to send the next group. - if ( cluster_send_in_groups_of == ++num_added ) - break; - } - - local done = F; - # If data is empty, this metric is done. - if ( |data| == 0 ) - done = T; - - event Metrics::cluster_filter_response(uid, id, filter_name, local_data, done); - if ( ! done ) - event Metrics::send_data(uid, id, filter_name, data); - } - -event Metrics::cluster_filter_request(uid: string, id: ID, filter_name: string) - { - #print fmt("WORKER %s: received the cluster_filter_request event.", Cluster::node); - - # Initiate sending all of the data for the requested filter. - event Metrics::send_data(uid, id, filter_name, store[id, filter_name]); - - # Lookup the actual filter and reset it, the reference to the data - # currently stored will be maintained interally by the send_data event. - reset(filter_store[id, filter_name]); - } - -event Metrics::cluster_index_request(uid: string, id: ID, filter_name: string, index: Index) - { - local val=0; - if ( index in store[id, filter_name] ) - val = store[id, filter_name][index]; - - # fmt("WORKER %s: received the cluster_index_request event for %s=%d.", Cluster::node, index2str(index), val); - event Metrics::cluster_index_response(uid, id, filter_name, index, val); - } - -@endif - - -@if ( Cluster::local_node_type() == Cluster::MANAGER ) - -# Manager's handle logging. -event Metrics::log_it(filter: Filter) - { - #print fmt("%.6f MANAGER: breaking %s filter for %s metric", network_time(), filter$name, filter$id); - - local uid = unique_id(""); - - # Set some tracking variables. - requested_results[uid] = network_time(); - filter_results[uid, filter$id, filter$name] = table(); - - # Request data from peers. - event Metrics::cluster_filter_request(uid, filter$id, filter$name); - # Schedule the log_it event for the next break period. - schedule filter$break_interval { Metrics::log_it(filter) }; - } - -# This is unlikely to be called often, but it's here in case there are metrics -# being collected by managers. -function data_added(filter: Filter, index: Index, val: count) - { - if ( check_notice(filter, index, val) ) - do_notice(filter, index, val); - } - -event Metrics::cluster_index_response(uid: string, id: ID, filter_name: string, index: Index, val: count) - { - #print fmt("%0.6f MANAGER: receiving index data from %s", network_time(), get_event_peer()$descr); - - if ( [uid, id, filter_name, index] !in index_requests ) - index_requests[uid, id, filter_name, index] = 0; - - index_requests[uid, id, filter_name, index] += val; - local ir = index_requests[uid, id, filter_name, index]; - - ++done_with[uid]; - if ( Cluster::worker_count == done_with[uid] ) - { - if ( check_notice(filter_store[id, filter_name], index, ir) ) - do_notice(filter_store[id, filter_name], index, ir); - delete done_with[uid]; - delete index_requests[uid, id, filter_name, index]; - } - } - -# Managers handle intermediate updates here. -event Metrics::cluster_index_intermediate_response(id: ID, filter_name: string, index: Index, val: count) - { - #print fmt("MANAGER: receiving intermediate index data from %s", get_event_peer()$descr); - #print fmt("MANAGER: requesting index data for %s", index2str(index)); - - local uid = unique_id(""); - event Metrics::cluster_index_request(uid, id, filter_name, index); - ++recent_global_view_indexes[id, filter_name, index]; - } - -event Metrics::cluster_filter_response(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool) - { - #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); - - local local_data = filter_results[uid, id, filter_name]; - for ( index in data ) - { - if ( index !in local_data ) - local_data[index] = 0; - local_data[index] += data[index]; - } - - # Mark another worker as being "done" for this uid. - if ( done ) - ++done_with[uid]; - - # If the data has been collected from all peers, we are done and ready to log. - if ( Cluster::worker_count == done_with[uid] ) - { - local ts = network_time(); - # Log the time this was initially requested if it's available. - if ( uid in requested_results ) - { - ts = requested_results[uid]; - delete requested_results[uid]; - } - - write_log(ts, filter_store[id, filter_name], local_data); - - # Clean up - delete filter_results[uid, id, filter_name]; - delete done_with[uid]; - } - } - -@endif diff --git a/scripts/base/frameworks/metrics/main.bro b/scripts/base/frameworks/metrics/main.bro deleted file mode 100644 index d322d128fe..0000000000 --- a/scripts/base/frameworks/metrics/main.bro +++ /dev/null @@ -1,320 +0,0 @@ -##! The metrics framework provides a way to count and measure data. - -@load base/frameworks/notice - -module Metrics; - -export { - ## The metrics logging stream identifier. - redef enum Log::ID += { LOG }; - - ## Identifiers for metrics to collect. - type ID: enum { - ## Blank placeholder value. - NOTHING, - }; - - ## The default interval used for "breaking" metrics and writing the - ## current value to the logging stream. - const default_break_interval = 15mins &redef; - - ## This is the interval for how often threshold based notices will happen - ## after they have already fired. - const renotice_interval = 1hr &redef; - - ## Represents a thing which is having metrics collected for it. An instance - ## of this record type and a :bro:type:`Metrics::ID` together represent a - ## single measurement. - type Index: record { - ## Host is the value to which this metric applies. - host: addr &optional; - - ## A non-address related metric or a sub-key for an address based metric. - ## An example might be successful SSH connections by client IP address - ## where the client string would be the index value. - ## Another example might be number of HTTP requests to a particular - ## value in a Host header. This is an example of a non-host based - ## metric since multiple IP addresses could respond for the same Host - ## header value. - str: string &optional; - - ## The CIDR block that this metric applies to. This is typically - ## only used internally for host based aggregation. - network: subnet &optional; - } &log; - - ## The record type that is used for logging metrics. - type Info: record { - ## Timestamp at which the metric was "broken". - ts: time &log; - ## What measurement the metric represents. - metric_id: ID &log; - ## The name of the filter being logged. :bro:type:`Metrics::ID` values - ## can have multiple filters which represent different perspectives on - ## the data so this is necessary to understand the value. - filter_name: string &log; - ## What the metric value applies to. - index: Index &log; - ## The simple numeric value of the metric. - value: count &log; - }; - - # TODO: configure a metrics filter logging stream to log the current - # metrics configuration in case someone is looking through - # old logs and the configuration has changed since then. - - ## Filters define how the data from a metric is aggregated and handled. - ## Filters can be used to set how often the measurements are cut or "broken" - ## and logged or how the data within them is aggregated. It's also - ## possible to disable logging and use filters for thresholding. - type Filter: record { - ## The :bro:type:`Metrics::ID` that this filter applies to. - id: ID &optional; - ## The name for this filter so that multiple filters can be - ## applied to a single metrics to get a different view of the same - ## metric data being collected (different aggregation, break, etc). - name: string &default="default"; - ## A predicate so that you can decide per index if you would like - ## to accept the data being inserted. - pred: function(index: Index): bool &optional; - ## Global mask by which you'd like to aggregate traffic. - aggregation_mask: count &optional; - ## This is essentially a mapping table between addresses and subnets. - aggregation_table: table[subnet] of subnet &optional; - ## The interval at which this filter should be "broken" and written - ## to the logging stream. The counters are also reset to zero at - ## this time so any threshold based detection needs to be set to a - ## number that should be expected to happen within this period. - break_interval: interval &default=default_break_interval; - ## This determines if the result of this filter is sent to the metrics - ## logging stream. One use for the logging framework is as an internal - ## thresholding and statistics gathering utility that is meant to - ## never log but rather to generate notices and derive data. - log: bool &default=T; - ## If this and a $notice_threshold value are set, this notice type - ## will be generated by the metrics framework. - note: Notice::Type &optional; - ## A straight threshold for generating a notice. - notice_threshold: count &optional; - ## A series of thresholds at which to generate notices. - notice_thresholds: vector of count &optional; - ## How often this notice should be raised for this filter. It - ## will be generated everytime it crosses a threshold, but if the - ## $break_interval is set to 5mins and this is set to 1hr the notice - ## only be generated once per hour even if something crosses the - ## threshold in every break interval. - notice_freq: interval &optional; - }; - - ## Function to associate a metric filter with a metric ID. - ## - ## id: The metric ID that the filter should be associated with. - ## - ## filter: The record representing the filter configuration. - global add_filter: function(id: ID, filter: Filter); - - ## Add data into a :bro:type:`Metrics::ID`. This should be called when - ## a script has measured some point value and is ready to increment the - ## counters. - ## - ## id: The metric ID that the data represents. - ## - ## index: The metric index that the value is to be added to. - ## - ## increment: How much to increment the counter by. - global add_data: function(id: ID, index: Index, increment: count); - - ## Helper function to represent a :bro:type:`Metrics::Index` value as - ## a simple string - ## - ## index: The metric index that is to be converted into a string. - ## - ## Returns: A string reprentation of the metric index. - global index2str: function(index: Index): string; - - ## Event that is used to "finish" metrics and adapt the metrics - ## framework for clustered or non-clustered usage. - ## - ## ..note: This is primarily intended for internal use. - global log_it: event(filter: Filter); - - ## Event to access metrics records as they are passed to the logging framework. - global log_metrics: event(rec: Info); - - ## Type to store a table of metrics values. Interal use only! - type MetricTable: table[Index] of count &default=0; -} - -redef record Notice::Info += { - metric_index: Index &log &optional; -}; - -global metric_filters: table[ID] of vector of Filter = table(); -global filter_store: table[ID, string] of Filter = table(); - -# This is indexed by metric ID and stream filter name. -global store: table[ID, string] of MetricTable = table() &default=table(); - -# This function checks if a threshold has been crossed and generates a -# notice if it has. It is also used as a method to implement -# mid-break-interval threshold crossing detection for cluster deployments. -global check_notice: function(filter: Filter, index: Index, val: count): bool; - -# This is hook for watching thresholds being crossed. It is called whenever -# index values are updated and the new val is given as the `val` argument. -global data_added: function(filter: Filter, index: Index, val: count); - -# This stores the current threshold index for filters using the -# $notice_threshold and $notice_thresholds elements. -global thresholds: table[ID, string, Index] of count = {} &create_expire=renotice_interval &default=0; - -event bro_init() &priority=5 - { - Log::create_stream(Metrics::LOG, [$columns=Info, $ev=log_metrics]); - } - -function index2str(index: Index): string - { - local out = ""; - if ( index?$host ) - out = fmt("%shost=%s", out, index$host); - if ( index?$network ) - out = fmt("%s%snetwork=%s", out, |out|==0 ? "" : ", ", index$network); - if ( index?$str ) - out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", index$str); - return fmt("metric_index(%s)", out); - } - -function write_log(ts: time, filter: Filter, data: MetricTable) - { - for ( index in data ) - { - local val = data[index]; - local m: Info = [$ts=ts, - $metric_id=filter$id, - $filter_name=filter$name, - $index=index, - $value=val]; - - if ( filter$log ) - Log::write(Metrics::LOG, m); - } - } - - -function reset(filter: Filter) - { - store[filter$id, filter$name] = table(); - } - -function add_filter(id: ID, filter: Filter) - { - if ( filter?$aggregation_table && filter?$aggregation_mask ) - { - print "INVALID Metric filter: Defined $aggregation_table and $aggregation_mask."; - return; - } - if ( [id, filter$name] in store ) - { - print fmt("INVALID Metric filter: Filter with name \"%s\" already exists.", filter$name); - return; - } - if ( filter?$notice_threshold && filter?$notice_thresholds ) - { - print "INVALID Metric filter: Defined both $notice_threshold and $notice_thresholds"; - return; - } - - if ( ! filter?$id ) - filter$id = id; - - if ( id !in metric_filters ) - metric_filters[id] = vector(); - metric_filters[id][|metric_filters[id]|] = filter; - - filter_store[id, filter$name] = filter; - store[id, filter$name] = table(); - - schedule filter$break_interval { Metrics::log_it(filter) }; - } - -function add_data(id: ID, index: Index, increment: count) - { - if ( id !in metric_filters ) - return; - - local filters = metric_filters[id]; - - # Try to add the data to all of the defined filters for the metric. - for ( filter_id in filters ) - { - local filter = filters[filter_id]; - - # If this filter has a predicate, run the predicate and skip this - # index if the predicate return false. - if ( filter?$pred && ! filter$pred(index) ) - next; - - if ( index?$host ) - { - if ( filter?$aggregation_mask ) - { - index$network = mask_addr(index$host, filter$aggregation_mask); - delete index$host; - } - else if ( filter?$aggregation_table ) - { - # Don't add the data if the aggregation table doesn't include - # the given host address. - if ( index$host !in filter$aggregation_table ) - return; - index$network = filter$aggregation_table[index$host]; - delete index$host; - } - } - - local metric_tbl = store[id, filter$name]; - if ( index !in metric_tbl ) - metric_tbl[index] = 0; - metric_tbl[index] += increment; - - data_added(filter, index, metric_tbl[index]); - } - } - -function check_notice(filter: Filter, index: Index, val: count): bool - { - if ( (filter?$notice_threshold && - [filter$id, filter$name, index] !in thresholds && - val >= filter$notice_threshold) || - (filter?$notice_thresholds && - |filter$notice_thresholds| <= thresholds[filter$id, filter$name, index] && - val >= filter$notice_thresholds[thresholds[filter$id, filter$name, index]]) ) - return T; - else - return F; - } - -function do_notice(filter: Filter, index: Index, val: count) - { - # We include $peer_descr here because the a manager count have actually - # generated the notice even though the current remote peer for the event - # calling this could be a worker if this is running as a cluster. - local n: Notice::Info = [$note=filter$note, - $n=val, - $metric_index=index, - $peer_descr=peer_description]; - n$msg = fmt("Threshold crossed by %s %d/%d", index2str(index), val, filter$notice_threshold); - if ( index?$str ) - n$sub = index$str; - if ( index?$host ) - n$src = index$host; - # TODO: not sure where to put the network yet. - - NOTICE(n); - - # This just needs set to some value so that it doesn't refire the - # notice until it expires from the table or it crosses the next - # threshold in the case of vectors of thresholds. - ++thresholds[filter$id, filter$name, index]; - } diff --git a/scripts/base/frameworks/metrics/non-cluster.bro b/scripts/base/frameworks/metrics/non-cluster.bro deleted file mode 100644 index 85c050fb25..0000000000 --- a/scripts/base/frameworks/metrics/non-cluster.bro +++ /dev/null @@ -1,21 +0,0 @@ -@load ./main - -module Metrics; - -event Metrics::log_it(filter: Filter) - { - local id = filter$id; - local name = filter$name; - - write_log(network_time(), filter, store[id, name]); - reset(filter); - - schedule filter$break_interval { Metrics::log_it(filter) }; - } - - -function data_added(filter: Filter, index: Index, val: count) - { - if ( check_notice(filter, index, val) ) - do_notice(filter, index, val); - } diff --git a/scripts/base/init-default.bro b/scripts/base/init-default.bro index 8b36899f10..bb64accaea 100644 --- a/scripts/base/init-default.bro +++ b/scripts/base/init-default.bro @@ -12,8 +12,10 @@ @load base/utils/numbers @load base/utils/paths @load base/utils/patterns +@load base/utils/queue @load base/utils/strings @load base/utils/thresholds +@load base/utils/time @load base/utils/urls # This has some deep interplay between types and BiFs so it's @@ -27,7 +29,7 @@ @load base/frameworks/communication @load base/frameworks/control @load base/frameworks/cluster -@load base/frameworks/metrics +@load base/frameworks/measurement @load base/frameworks/intel @load base/frameworks/reporter @load base/frameworks/tunnels diff --git a/scripts/base/protocols/ssh/main.bro b/scripts/base/protocols/ssh/main.bro index cd20f4e913..d782c4fa37 100644 --- a/scripts/base/protocols/ssh/main.bro +++ b/scripts/base/protocols/ssh/main.bro @@ -17,12 +17,6 @@ export { ## The SSH protocol logging stream identifier. redef enum Log::ID += { LOG }; - redef enum Notice::Type += { - ## Indicates that a heuristically detected "successful" SSH - ## authentication occurred. - Login - }; - type Info: record { ## Time when the SSH connection began. ts: time &log; @@ -30,9 +24,9 @@ export { uid: string &log; ## The connection's 4-tuple of endpoint addresses/ports. id: conn_id &log; - ## Indicates if the login was heuristically guessed to be "success" - ## or "failure". - status: string &log &optional; + ## Indicates if the login was heuristically guessed to be "success", + ## "failure", or "undetermined". + status: string &log &default="undetermined"; ## Direction of the connection. If the client was a local host ## logging into an external host, this would be OUTBOUND. INBOUND ## would be set for the opposite situation. @@ -54,12 +48,12 @@ export { ## The size in bytes of data sent by the server at which the SSH ## connection is presumed to be successful. - const authentication_data_size = 5500 &redef; + const authentication_data_size = 4000 &redef; ## If true, we tell the event engine to not look at further data ## packets after the initial SSH handshake. Helps with performance ## (especially with large file transfers) but precludes some - ## kinds of analyses (e.g., tracking connection size). + ## kinds of analyses. const skip_processing_after_detection = F &redef; ## Event that is generated when the heuristic thinks that a login @@ -104,54 +98,60 @@ function set_session(c: connection) function check_ssh_connection(c: connection, done: bool) { - # If done watching this connection, just return. + # If already done watching this connection, just return. if ( c$ssh$done ) return; - # Make sure conn_size_analyzer is active by checking - # resp$num_bytes_ip. In general it should always be active though. - if ( ! c$resp?$num_bytes_ip ) - return; - - # Remove the IP and TCP header length from the total size. - # TODO: Fix for IPv6. This whole approach also seems to break in some - # cases where there are more header bytes than num_bytes_ip. - local header_bytes = c$resp$num_pkts*32 + c$resp$num_pkts*20; - local server_bytes = c$resp$num_bytes_ip; - if ( server_bytes >= header_bytes ) - server_bytes = server_bytes - header_bytes; - else - server_bytes = c$resp$size; - - # If this is still a live connection and the byte count has not crossed - # the threshold, just return and let the rescheduled check happen later. - if ( ! done && server_bytes < authentication_data_size ) - return; - - # Make sure the server has sent back more than 50 bytes to filter out - # hosts that are just port scanning. Nothing is ever logged if the server - # doesn't send back at least 50 bytes. - if ( server_bytes < 50 ) - return; - - c$ssh$direction = Site::is_local_addr(c$id$orig_h) ? OUTBOUND : INBOUND; - c$ssh$resp_size = server_bytes; - - if ( server_bytes < authentication_data_size ) + if ( done ) { - c$ssh$status = "failure"; - event SSH::heuristic_failed_login(c); + # If this connection is done, then we can look to see if + # this matches the conditions for a failed login. Failed + # logins are only detected at connection state removal. + + if ( # Require originators to have sent at least 50 bytes. + c$orig$size > 50 && + # Responders must be below 4000 bytes. + c$resp$size < 4000 && + # Responder must have sent fewer than 40 packets. + c$resp$num_pkts < 40 && + # If there was a content gap we can't reliably do this heuristic. + c$conn$missed_bytes == 0)# && + # Only "normal" connections can count. + #c$conn?$conn_state && c$conn$conn_state in valid_states ) + { + c$ssh$status = "failure"; + event SSH::heuristic_failed_login(c); + } + + if ( c$resp$size > authentication_data_size ) + { + c$ssh$status = "success"; + event SSH::heuristic_successful_login(c); + } } else - { - # presumed successful login - c$ssh$status = "success"; - event SSH::heuristic_successful_login(c); + { + # If this connection is still being tracked, then it's possible + # to watch for it to be a successful connection. + if ( c$resp$size > authentication_data_size ) + { + c$ssh$status = "success"; + event SSH::heuristic_successful_login(c); + } + else + # This connection must be tracked longer. Let the scheduled + # check happen again. + return; } + + # Set the direction for the log. + c$ssh$direction = Site::is_local_addr(c$id$orig_h) ? OUTBOUND : INBOUND; # Set the "done" flag to prevent the watching event from rescheduling # after detection is done. c$ssh$done=T; + + Log::write(SSH::LOG, c$ssh); if ( skip_processing_after_detection ) { @@ -161,18 +161,6 @@ function check_ssh_connection(c: connection, done: bool) } } -event SSH::heuristic_successful_login(c: connection) &priority=-5 - { - NOTICE([$note=Login, - $msg="Heuristically detected successful SSH login.", - $conn=c]); - - Log::write(SSH::LOG, c$ssh); - } -event SSH::heuristic_failed_login(c: connection) &priority=-5 - { - Log::write(SSH::LOG, c$ssh); - } event connection_state_remove(c: connection) &priority=-5 { diff --git a/scripts/base/utils/queue.bro b/scripts/base/utils/queue.bro new file mode 100644 index 0000000000..1e7a293e17 --- /dev/null +++ b/scripts/base/utils/queue.bro @@ -0,0 +1,143 @@ +##! A FIFO queue. + +module Queue; + +export { + ## Settings for initializing the queue. + type Settings: record { + ## If a maximum length is set for the queue + ## it will maintain itself at that + ## maximum length automatically. + max_len: count &optional; + }; + + ## The internal data structure for the queue. + type Queue: record {}; + + ## Initialize a queue record structure. + ## + ## s: A :bro:record:`Settings` record configuring the queue. + ## + ## Returns: An opaque queue record. + global init: function(s: Settings): Queue; + + ## Push a string onto the top of a queue. + ## + ## q: The queue to put the value into. + ## + ## val: The value to insert into the queue. + global put: function(q: Queue, val: any); + + ## Pop a string from the bottom of a queue. + ## + ## q: The queue to get the string from. + ## + ## Returns: The value gotten from the queue. + global get: function(q: Queue): any; + + ## Merge two queue's together. If any settings are applied + ## to the queues, the settings from q1 are used for the new + ## merged queue. + ## + ## q1: The first queue. Settings are taken from here. + ## + ## q2: The second queue. + ## + ## Returns: A new queue from merging the other two together. + global merge: function(q1: Queue, q2: Queue): Queue; + + ## Get the number of items in a queue. + ## + ## q: The queue. + ## + ## Returns: The length of the queue. + global len: function(q: Queue): count; + + ## Get the contents of the queue as a vector. + ## + ## q: The queue. + ## + ## ret: A vector containing the + ## current contents of q as the type of ret. + global get_vector: function(q: Queue, ret: vector of any); + +} + +redef record Queue += { + # Indicator for if the queue was appropriately initialized. + initialized: bool &default=F; + # The values are stored here. + vals: table[count] of any &optional; + # Settings for the queue. + settings: Settings &optional; + # The top value in the vals table. + top: count &default=0; + # The bottom value in the vals table. + bottom: count &default=0; + # The number of bytes in the queue. + size: count &default=0; +}; + +function init(s: Settings): Queue + { + local q: Queue; + q$vals=table(); + q$settings = copy(s); + q$initialized=T; + return q; + } + +function put(q: Queue, val: any) + { + if ( q$settings?$max_len && len(q) >= q$settings$max_len ) + get(q); + q$vals[q$top] = val; + ++q$top; + } + +function get(q: Queue): any + { + local ret = q$vals[q$bottom]; + delete q$vals[q$bottom]; + ++q$bottom; + return ret; + } + +function merge(q1: Queue, q2: Queue): Queue + { + local ret = init(q1$settings); + local i = q1$bottom; + local j = q2$bottom; + for ( ignored_val in q1$vals ) + { + if ( i in q1$vals ) + put(ret, q1$vals[i]); + if ( j in q2$vals ) + put(ret, q2$vals[j]); + ++i; + ++j; + } + return ret; + } + +function len(q: Queue): count + { + return |q$vals|; + } + +function get_vector(q: Queue, ret: vector of any) + { + local i = q$bottom; + local j = 0; + # Really dumb hack, this is only to provide + # the iteration for the correct number of + # values in q$vals. + for ( ignored_val in q$vals ) + { + if ( i >= q$top ) + break; + + ret[j] = q$vals[i]; + ++j; ++i; + } + } diff --git a/scripts/base/utils/time.bro b/scripts/base/utils/time.bro new file mode 100644 index 0000000000..abae46c144 --- /dev/null +++ b/scripts/base/utils/time.bro @@ -0,0 +1,9 @@ + +## Given an interval, returns a string of the form 3m34s to +## give a minimalized human readable string for the minutes +## and seconds represented by the interval. +function duration_to_mins_secs(dur: interval): string + { + local dur_count = double_to_count(interval_to_double(dur)); + return fmt("%dm%ds", dur_count/60, dur_count%60); + } diff --git a/scripts/policy/frameworks/metrics/conn-example.bro b/scripts/policy/frameworks/metrics/conn-example.bro index 974012963b..3f87ecb283 100644 --- a/scripts/policy/frameworks/metrics/conn-example.bro +++ b/scripts/policy/frameworks/metrics/conn-example.bro @@ -1,25 +1,26 @@ ##! An example of using the metrics framework to collect connection metrics ##! aggregated into /24 CIDR ranges. -@load base/frameworks/metrics +@load base/frameworks/measurement @load base/utils/site -redef enum Metrics::ID += { - CONNS_ORIGINATED, - CONNS_RESPONDED -}; - event bro_init() { - Metrics::add_filter(CONNS_ORIGINATED, [$aggregation_mask=24, $break_interval=1mins]); + #Metrics::add_filter("conns.originated", [$aggregation_mask=24, $break_interval=1mins]); + Metrics::add_filter("conns.originated", [$every=1mins, $measure=set(Metrics::SUM), + $aggregation_table=Site::local_nets_table, + $period_finished=Metrics::write_log]); + # Site::local_nets must be defined in order for this to actually do anything. - Metrics::add_filter(CONNS_RESPONDED, [$aggregation_table=Site::local_nets_table, $break_interval=1mins]); + Metrics::add_filter("conns.responded", [$every=1mins, $measure=set(Metrics::SUM), + $aggregation_table=Site::local_nets_table, + $period_finished=Metrics::write_log]); + } event connection_established(c: connection) { - Metrics::add_data(CONNS_ORIGINATED, [$host=c$id$orig_h], 1); - Metrics::add_data(CONNS_RESPONDED, [$host=c$id$resp_h], 1); + Metrics::add_data("conns.originated", [$host=c$id$orig_h], [$num=1]); + Metrics::add_data("conns.responded", [$host=c$id$resp_h], [$num=1]); } - diff --git a/scripts/policy/frameworks/metrics/http-example.bro b/scripts/policy/frameworks/metrics/http-example.bro index 58ca4e6614..d7aa304754 100644 --- a/scripts/policy/frameworks/metrics/http-example.bro +++ b/scripts/policy/frameworks/metrics/http-example.bro @@ -2,36 +2,28 @@ ##! only local networks. Additionally, the status code for the response from ##! the request is added into the metric. -@load base/frameworks/metrics +@load base/frameworks/measurement @load base/protocols/http @load base/utils/site -redef enum Metrics::ID += { - ## Measures HTTP requests indexed on both the request host and the response - ## code from the server. - HTTP_REQUESTS_BY_STATUS_CODE, - - ## Currently unfinished and not working. - HTTP_REQUESTS_BY_HOST_HEADER, -}; - event bro_init() { - # TODO: these are waiting on a fix with table vals + records before they will work. - #Metrics::add_filter(HTTP_REQUESTS_BY_HOST_HEADER, - # [$pred(index: Metrics::Index) = { return Site::is_local_addr(index$host); }, - # $aggregation_mask=24, - # $break_interval=1min]); + Metrics::add_filter("http.request.by_host_header", + [$every=1min, $measure=set(Metrics::SUM), + $pred(index: Metrics::Index, data: Metrics::DataPoint) = { return T; return Site::is_local_addr(index$host); }, + $aggregation_mask=24, + $period_finished=Metrics::write_log]); # Site::local_nets must be defined in order for this to actually do anything. - Metrics::add_filter(HTTP_REQUESTS_BY_STATUS_CODE, [$aggregation_table=Site::local_nets_table, - $break_interval=1min]); + Metrics::add_filter("http.request.by_status_code", [$every=1min, $measure=set(Metrics::SUM), + $aggregation_table=Site::local_nets_table, + $period_finished=Metrics::write_log]); } event HTTP::log_http(rec: HTTP::Info) { if ( rec?$host ) - Metrics::add_data(HTTP_REQUESTS_BY_HOST_HEADER, [$str=rec$host], 1); + Metrics::add_data("http.request.by_host_header", [$str=rec$host], [$num=1]); if ( rec?$status_code ) - Metrics::add_data(HTTP_REQUESTS_BY_STATUS_CODE, [$host=rec$id$orig_h, $str=fmt("%d", rec$status_code)], 1); + Metrics::add_data("http.request.by_status_code", [$host=rec$id$orig_h, $str=fmt("%d", rec$status_code)], [$num=1]); } diff --git a/scripts/policy/frameworks/metrics/ssl-example.bro b/scripts/policy/frameworks/metrics/ssl-example.bro index 5ec675779a..400373c06c 100644 --- a/scripts/policy/frameworks/metrics/ssl-example.bro +++ b/scripts/policy/frameworks/metrics/ssl-example.bro @@ -3,26 +3,21 @@ ##! establishments. Names ending in google.com are being filtered out as an ##! example of the predicate based filtering in metrics filters. -@load base/frameworks/metrics +@load base/frameworks/measurement @load base/protocols/ssl -redef enum Metrics::ID += { - SSL_SERVERNAME, -}; - event bro_init() { - Metrics::add_filter(SSL_SERVERNAME, + Metrics::add_filter("ssl.by_servername", [$name="no-google-ssl-servers", - $pred(index: Metrics::Index) = { + $every=10secs, $measure=set(Metrics::SUM), + $pred(index: Metrics::Index, data: Metrics::DataPoint) = { return (/google\.com$/ !in index$str); - }, - $break_interval=10secs - ]); + }]); } event SSL::log_ssl(rec: SSL::Info) { if ( rec?$server_name ) - Metrics::add_data(SSL_SERVERNAME, [$str=rec$server_name], 1); + Metrics::add_data("ssl.by_servername", [$str=rec$server_name], [$num=1]); } diff --git a/scripts/policy/misc/app-metrics.bro b/scripts/policy/misc/app-metrics.bro new file mode 100644 index 0000000000..967d5eb88f --- /dev/null +++ b/scripts/policy/misc/app-metrics.bro @@ -0,0 +1,109 @@ +@load base/protocols/http +@load base/protocols/ssl +@load base/frameworks/measurement + +module AppMeasurement; + +export { + redef enum Log::ID += { LOG }; + + type Info: record { + ## Timestamp when the log line was finished and written. + ts: time &log; + ## Time interval that the log line covers. + ts_delta: interval &log; + ## The name of the "app", like "facebook" or "netflix". + app: string &log; + ## The number of unique local hosts using the app. + uniq_hosts: count &log; + ## The number of hits to the app in total. + hits: count &log; + ## The total number of bytes received by users of the app. + bytes: count &log; + }; + + ## The frequency of logging the stats collected by this script. + const break_interval = 15mins &redef; +} + +redef record connection += { + resp_hostname: string &optional; +}; + +event bro_init() &priority=3 + { + Log::create_stream(AppMeasurement::LOG, [$columns=Info]); + + local r1: Measurement::Reducer = [$stream="apps.bytes", $apply=set(Measurement::SUM)]; + local r2: Measurement::Reducer = [$stream="apps.hits", $apply=set(Measurement::UNIQUE)]; + Measurement::create([$epoch=break_interval, + $reducers=set(r1, r2), + $epoch_finished(data: Measurement::ResultTable) = + { + local l: Info; + l$ts = network_time(); + l$ts_delta = break_interval; + for ( key in data ) + { + local result = data[key]; + l$app = key$str; + l$bytes = double_to_count(floor(result["apps.bytes"]$sum)); + l$hits = result["apps.hits"]$num; + l$uniq_hosts = result["apps.hits"]$unique; + Log::write(LOG, l); + } + }]); + } + +function do_measurement(id: conn_id, hostname: string, size: count) + { + if ( /\.youtube\.com$/ in hostname && size > 512*1024 ) + { + Measurement::add_data("apps.bytes", [$str="youtube"], [$num=size]); + Measurement::add_data("apps.hits", [$str="youtube"], [$str=cat(id$orig_h)]); + } + else if ( /(\.facebook\.com|\.fbcdn\.net)$/ in hostname && size > 20 ) + { + Measurement::add_data("apps.bytes", [$str="facebook"], [$num=size]); + Measurement::add_data("apps.hits", [$str="facebook"], [$str=cat(id$orig_h)]); + } + else if ( /\.google\.com$/ in hostname && size > 20 ) + { + Measurement::add_data("apps.bytes", [$str="google"], [$num=size]); + Measurement::add_data("apps.hits", [$str="google"], [$str=cat(id$orig_h)]); + } + else if ( /\.nflximg\.com$/ in hostname && size > 200*1024 ) + { + Measurement::add_data("apps.bytes", [$str="netflix"], [$num=size]); + Measurement::add_data("apps.hits", [$str="netflix"], [$str=cat(id$orig_h)]); + } + else if ( /\.(pandora|p-cdn)\.com$/ in hostname && size > 512*1024 ) + { + Measurement::add_data("apps.bytes", [$str="pandora"], [$num=size]); + Measurement::add_data("apps.hits", [$str="pandora"], [$str=cat(id$orig_h)]); + } + else if ( /\.gmail\.com$/ in hostname && size > 20 ) + { + Measurement::add_data("apps.bytes", [$str="gmail"], [$num=size]); + Measurement::add_data("apps.hits", [$str="gmail"], [$str=cat(id$orig_h)]); + } +} + + +event ssl_established(c: connection) + { + if ( c?$ssl && c$ssl?$server_name ) + c$resp_hostname = c$ssl$server_name; + } + +event connection_finished(c: connection) + { + if ( c?$resp_hostname ) + do_measurement(c$id, c$resp_hostname, c$resp$size); + } + +event HTTP::log_http(rec: HTTP::Info) + { + if( rec?$host ) + do_measurement(rec$id, rec$host, rec$response_body_len); + } diff --git a/scripts/policy/misc/capture-loss.bro b/scripts/policy/misc/capture-loss.bro index b2d23020f8..1f0726299d 100644 --- a/scripts/policy/misc/capture-loss.bro +++ b/scripts/policy/misc/capture-loss.bro @@ -8,7 +8,6 @@ ##! for a sequence number that's above a gap). @load base/frameworks/notice -@load base/frameworks/metrics module CaptureLoss; diff --git a/scripts/policy/misc/detect-traceroute/__load__.bro b/scripts/policy/misc/detect-traceroute/__load__.bro new file mode 100644 index 0000000000..d551be57d3 --- /dev/null +++ b/scripts/policy/misc/detect-traceroute/__load__.bro @@ -0,0 +1 @@ +@load ./main \ No newline at end of file diff --git a/scripts/policy/misc/detect-traceroute/detect-low-ttls.sig b/scripts/policy/misc/detect-traceroute/detect-low-ttls.sig new file mode 100644 index 0000000000..c04a8905f7 --- /dev/null +++ b/scripts/policy/misc/detect-traceroute/detect-low-ttls.sig @@ -0,0 +1,9 @@ +signature traceroute-detector-ipv4 { + header ip[8] < 10 + event "match" +} + +signature traceroute-detector-ipv6 { + header ip6[7] < 10 + event "match" +} diff --git a/scripts/policy/misc/detect-traceroute/main.bro b/scripts/policy/misc/detect-traceroute/main.bro new file mode 100644 index 0000000000..1b9f369ca5 --- /dev/null +++ b/scripts/policy/misc/detect-traceroute/main.bro @@ -0,0 +1,93 @@ +##! This script detects large number of ICMP Time Exceeded messages heading +##! toward hosts that have sent low TTL packets. +##! It generates a notice when the number of ICMP Time Exceeded +##! messages for a source-destination pair exceeds threshold +@load base/frameworks/measurement +@load base/frameworks/signatures +@load-sigs ./detect-low-ttls.sig + +redef Signatures::ignored_ids += /traceroute-detector.*/; + +module Traceroute; + +export { + redef enum Log::ID += { LOG }; + + redef enum Notice::Type += { + ## Indicates that a host was seen running traceroutes. For more + ## detail about specific traceroutes that we run, refer to the + ## traceroute.log. + Detected + }; + + ## By default this script requires that any host detected running traceroutes + ## first send low TTL packets (TTL < 10) to the traceroute destination host. + ## Changing this this setting to `F` will relax the detection a bit by + ## solely relying on ICMP time-exceeded messages to detect traceroute. + const require_low_ttl_packets = T &redef; + + ## Defines the threshold for ICMP Time Exceeded messages for a src-dst pair. + ## This threshold only comes into play after a host is found to be + ## sending low ttl packets. + const icmp_time_exceeded_threshold = 3 &redef; + + ## Interval at which to watch for the + ## :bro:id:`ICMPTimeExceeded::icmp_time_exceeded_threshold` variable to be crossed. + ## At the end of each interval the counter is reset. + const icmp_time_exceeded_interval = 3min &redef; + + ## The log record for the traceroute log. + type Info: record { + ## Timestamp + ts: time &log; + ## Address initiaing the traceroute. + src: addr &log; + ## Destination address of the traceroute. + dst: addr &log; + }; + + global log_traceroute: event(rec: Traceroute::Info); +} + +event bro_init() &priority=5 + { + Log::create_stream(Traceroute::LOG, [$columns=Info, $ev=log_traceroute]); + + local r1: Measurement::Reducer = [$stream="traceroute.time_exceeded", $apply=set(Measurement::UNIQUE)]; + local r2: Measurement::Reducer = [$stream="traceroute.low_ttl_packet", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=icmp_time_exceeded_interval, + $reducers=set(r1, r2), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + # Give a threshold value of zero depending on if the host + # sends a low ttl packet. + if ( require_low_ttl_packets && result["traceroute.low_ttl_packet"]$sum == 0 ) + return 0; + else + return result["traceroute.time_exceeded"]$unique; + }, + $threshold=icmp_time_exceeded_threshold, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local parts = split1(key$str, /-/); + local src = to_addr(parts[1]); + local dst = to_addr(parts[2]); + Log::write(LOG, [$ts=network_time(), $src=src, $dst=dst]); + NOTICE([$note=Traceroute::Detected, + $msg=fmt("%s seems to be running traceroute", src), + $src=src, $dst=dst, + $identifier=cat(src)]); + }]); + } + +# Low TTL packets are detected with a signature. +event signature_match(state: signature_state, msg: string, data: string) + { + if ( state$sig_id == /traceroute-detector.*/ ) + Measurement::add_data("traceroute.low_ttl_packet", [$str=cat(state$conn$id$orig_h,"-",state$conn$id$resp_h)], [$num=1]); + } + +event icmp_time_exceeded(c: connection, icmp: icmp_conn, code: count, context: icmp_context) + { + Measurement::add_data("traceroute.time_exceeded", [$str=cat(context$id$orig_h,"-",context$id$resp_h)], [$str=cat(c$id$orig_h)]); + } diff --git a/scripts/policy/misc/scan.bro b/scripts/policy/misc/scan.bro new file mode 100644 index 0000000000..2ea1e9c0fe --- /dev/null +++ b/scripts/policy/misc/scan.bro @@ -0,0 +1,230 @@ +##! Scan detection +##! +##! ..Authors: Sheharbano Khattak +##! Seth Hall +##! All the authors of the old scan.bro + +@load base/frameworks/notice +@load base/frameworks/measurement + +@load base/utils/time + +module Scan; + +export { + redef enum Notice::Type += { + ## Address scans detect that a host appears to be scanning some number + ## of hosts on a single port. This notice is generated when more than + ## :bro:id:`addr_scan_threshold` unique hosts are seen over the + ## previous :bro:id:`addr_scan_interval` time range. + Address_Scan, + ## Port scans detect that an attacking host appears to be scanning a + ## single victim host on several ports. This notice is generated when + ## an attacking host attempts to connect to :bro:id:`port_scan_threshold` + ## unique ports on a single host over the previous + ## :bro:id:`port_scan_interval` time range. + Port_Scan, + }; + + ## Failed connection attempts are tracked over this time interval for the address + ## scan detection. A higher interval will detect slower scanners, but may + ## also yield more false positives. + const addr_scan_interval = 5min &redef; + ## Failed connection attempts are tracked over this time interval for the port + ## scan detection. A higher interval will detect slower scanners, but may + ## also yield more false positives. + const port_scan_interval = 5min &redef; + + ## The threshold of a unique number of hosts a scanning host has to have failed + ## connections with on a single port. + const addr_scan_threshold = 25 &redef; + ## The threshold of a number of unique ports a scanning host has to have failed + ## connections with on a single victim host. + const port_scan_threshold = 15 &redef; + + ## Custom thresholds based on service for address scan. This is primarily + ## useful for setting reduced thresholds for specific ports. + const addr_scan_custom_thresholds: table[port] of count &redef; + + global Scan::addr_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port); + + global Scan::port_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port); +} + + +#function check_addr_scan_threshold(key: Measurement::Key, val: Measurement::Result): bool +# { +# # We don't need to do this if no custom thresholds are defined. +# if ( |addr_scan_custom_thresholds| == 0 ) +# return F; +# +# local service = to_port(key$str); +# return ( service in addr_scan_custom_thresholds && +# val$sum > addr_scan_custom_thresholds[service] ); +# } + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="scan.addr.fail", $apply=set(Measurement::UNIQUE)]; + Measurement::create([$epoch=addr_scan_interval, + $reducers=set(r1), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["scan.addr.fail"]$unique); + }, + #$threshold_func=check_addr_scan_threshold, + $threshold=addr_scan_threshold, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["scan.addr.fail"]; + local side = Site::is_local_addr(key$host) ? "local" : "remote"; + local dur = duration_to_mins_secs(r$end-r$begin); + local message=fmt("%s scanned at least %d unique hosts on port %s in %s", key$host, r$unique, key$str, dur); + NOTICE([$note=Address_Scan, + $src=key$host, + $p=to_port(key$str), + $sub=side, + $msg=message, + $identifier=cat(key$host)]); + }]); + + # Note: port scans are tracked similar to: table[src_ip, dst_ip] of set(port); + local r2: Measurement::Reducer = [$stream="scan.port.fail", $apply=set(Measurement::UNIQUE)]; + Measurement::create([$epoch=port_scan_interval, + $reducers=set(r2), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["scan.port.fail"]$unique); + }, + $threshold=port_scan_threshold, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["scan.port.fail"]; + local side = Site::is_local_addr(key$host) ? "local" : "remote"; + local dur = duration_to_mins_secs(r$end-r$begin); + local message = fmt("%s scanned at least %d unique ports of host %s in %s", key$host, r$unique, key$str, dur); + NOTICE([$note=Port_Scan, + $src=key$host, + $dst=to_addr(key$str), + $sub=side, + $msg=message, + $identifier=cat(key$host)]); + }]); + } + +function add_metrics(id: conn_id, reverse: bool) + { + local scanner = id$orig_h; + local victim = id$resp_h; + local scanned_port = id$resp_p; + + if ( reverse ) + { + scanner = id$resp_h; + victim = id$orig_h; + scanned_port = id$orig_p; + } + + # Defaults to be implemented with a hook... + #local transport_layer_proto = get_port_transport_proto(service); + #if ( suppress_UDP_scan_checks && (transport_layer_proto == udp) ) + # return F; + #else if ( suppress_TCP_scan_checks && (transport_layer_proto == tcp) ) + # return F; + #else if ( suppress_ICMP_scan_checks && (transport_layer_proto == icmp) ) + # return F; + + # TODO: all of this whitelist/blacklist will be done + # through the upcoming hook mechanism + # Blacklisting/whitelisting services + #if ( |analyze_services| > 0 ) + # { + # if ( service !in analyze_services ) + # return F; + # } + #else if ( service in skip_services ) + # return F; + # + ## Blacklisting/whitelisting subnets + #if ( |analyze_subnets| > 0 && host !in analyze_subnets ) + # return F; + + if ( hook Scan::addr_scan_policy(scanner, victim, scanned_port) ) + Measurement::add_data("scan.addr.fail", [$host=scanner, $str=cat(scanned_port)], [$str=cat(victim)]); + + if ( hook Scan::port_scan_policy(scanner, victim, scanned_port) ) + Measurement::add_data("scan.port.fail", [$host=scanner, $str=cat(victim)], [$str=cat(scanned_port)]); + } + +function is_failed_conn(c: connection): bool + { + # Sr || ( (hR || ShR) && (data not sent in any direction) ) + if ( (c$orig$state == TCP_SYN_SENT && c$resp$state == TCP_RESET) || + (((c$orig$state == TCP_RESET && c$resp$state == TCP_SYN_ACK_SENT) || + (c$orig$state == TCP_RESET && c$resp$state == TCP_ESTABLISHED && "S" in c$history ) + ) && /[Dd]/ !in c$history ) + ) + return T; + return F; + } + +function is_reverse_failed_conn(c: connection): bool + { + # reverse scan i.e. conn dest is the scanner + # sR || ( (Hr || sHr) && (data not sent in any direction) ) + if ( (c$resp$state == TCP_SYN_SENT && c$orig$state == TCP_RESET) || + (((c$resp$state == TCP_RESET && c$orig$state == TCP_SYN_ACK_SENT) || + (c$resp$state == TCP_RESET && c$orig$state == TCP_ESTABLISHED && "s" in c$history ) + ) && /[Dd]/ !in c$history ) + ) + return T; + return F; + } + +## Generated for an unsuccessful connection attempt. This +## event is raised when an originator unsuccessfully attempted +## to establish a connection. “Unsuccessful” is defined as at least +## tcp_attempt_delay seconds having elapsed since the originator +## first sent a connection establishment packet to the destination +## without seeing a reply. +event connection_attempt(c: connection) + { + local is_reverse_scan = F; + if ( "H" in c$history ) + is_reverse_scan = T; + + add_metrics(c$id, is_reverse_scan); + } + +## Generated for a rejected TCP connection. This event +## is raised when an originator attempted to setup a TCP +## connection but the responder replied with a RST packet +## denying it. +event connection_rejected(c: connection) + { + local is_reverse_scan = F; + if ( "s" in c$history ) + is_reverse_scan = T; + + add_metrics(c$id, is_reverse_scan); + } + +## Generated when an endpoint aborted a TCP connection. +## The event is raised when one endpoint of an *established* +## TCP connection aborted by sending a RST packet. +event connection_reset(c: connection) + { + if ( is_failed_conn(c) ) + add_metrics(c$id, F); + else if ( is_reverse_failed_conn(c) ) + add_metrics(c$id, T); + } + +## Generated for each still-open connection when Bro terminates. +event connection_pending(c: connection) + { + if ( is_failed_conn(c) ) + add_metrics(c$id, F); + else if ( is_reverse_failed_conn(c) ) + add_metrics(c$id, T); + } diff --git a/scripts/policy/protocols/conn/conn-stats-per-host.bro b/scripts/policy/protocols/conn/conn-stats-per-host.bro new file mode 100644 index 0000000000..d537d13b72 --- /dev/null +++ b/scripts/policy/protocols/conn/conn-stats-per-host.bro @@ -0,0 +1,27 @@ + +@load base/protocols/conn +@load base/frameworks/measurement + +event bro_init() &priority=5 + { + Metrics::add_filter("conn.orig.data", + [$every=5mins, + $measure=set(Metrics::VARIANCE, Metrics::AVG, Metrics::MAX, Metrics::MIN, Metrics::STD_DEV), + $period_finished=Metrics::write_log]); + Metrics::add_filter("conn.resp.data", + [$every=5mins, + $measure=set(Metrics::VARIANCE, Metrics::AVG, Metrics::MAX, Metrics::MIN, Metrics::STD_DEV), + $period_finished=Metrics::write_log]); + } + + +event connection_state_remove(c: connection) + { + if ( ! (c$conn$conn_state == "SF" && c$conn$proto == tcp) ) + return; + + if ( Site::is_local_addr(c$id$orig_h) ) + Metrics::add_data("conn.orig.data", [$host=c$id$orig_h], [$num=c$orig$size]); + if ( Site::is_local_addr(c$id$resp_h) ) + Metrics::add_data("conn.resp.data", [$host=c$id$resp_h], [$num=c$resp$size]); + } \ No newline at end of file diff --git a/scripts/policy/protocols/conn/metrics.bro b/scripts/policy/protocols/conn/metrics.bro new file mode 100644 index 0000000000..62ca96ea0a --- /dev/null +++ b/scripts/policy/protocols/conn/metrics.bro @@ -0,0 +1,24 @@ +@load base/frameworks/measurement +@load base/utils/site + +event bro_init() &priority=3 + { + Metrics::add_filter("conns.country", [$every=1hr, $measure=set(Metrics::SUM), + $period_finished=Metrics::write_log]); + Metrics::add_filter("hosts.active", [$every=1hr, $measure=set(Metrics::SUM), + $period_finished=Metrics::write_log]); + } + +event connection_established(c: connection) &priority=3 + { + if ( Site::is_local_addr(c$id$orig_h) ) + { + local loc = lookup_location(c$id$resp_h); + if ( loc?$country_code ) + Metrics::add_data("conns.country", [$str=loc$country_code], [$num=1]); + } + + local the_host = Site::is_local_addr(c$id$orig_h) ? c$id$orig_h : c$id$resp_h; + # There is no index for this. + Metrics::add_data("hosts.active", [], [$str=cat(the_host)]); + } diff --git a/scripts/policy/protocols/ftp/detect-bruteforcing.bro b/scripts/policy/protocols/ftp/detect-bruteforcing.bro new file mode 100644 index 0000000000..286cc95979 --- /dev/null +++ b/scripts/policy/protocols/ftp/detect-bruteforcing.bro @@ -0,0 +1,52 @@ + +@load base/protocols/ftp +@load base/frameworks/measurement + +@load base/utils/time + +module FTP; + +export { + redef enum Notice::Type += { + ## Indicates a host bruteforcing FTP logins by watching for too many + ## rejected usernames or failed passwords. + Bruteforcing + }; + + ## How many rejected usernames or passwords are required before being + ## considered to be bruteforcing. + const bruteforce_threshold = 20 &redef; + + ## The time period in which the threshold needs to be crossed before + ## being reset. + const bruteforce_measurement_interval = 15mins &redef; +} + + +event bro_init() + { + Metrics::add_filter("ftp.failed_auth", [$every=bruteforce_measurement_interval, + $measure=set(Metrics::UNIQUE), + $threshold_val_func(val: Metrics::Result) = { return val$num; }, + $threshold=bruteforce_threshold, + $threshold_crossed(index: Metrics::Index, val: Metrics::Result) = + { + local dur = duration_to_mins_secs(val$end-val$begin); + local plural = val$unique>1 ? "s" : ""; + local message = fmt("%s had %d failed logins on %d FTP server%s in %s", index$host, val$num, val$unique, plural, dur); + NOTICE([$note=FTP::Bruteforcing, + $src=index$host, + $msg=message, + $identifier=cat(index$host)]); + }]); + } + +event ftp_reply(c: connection, code: count, msg: string, cont_resp: bool) + { + local cmd = c$ftp$cmdarg$cmd; + if ( cmd == "USER" || cmd == "PASS" ) + { + if ( FTP::parse_ftp_reply_code(code)$x == 5 ) + Metrics::add_data("ftp.failed_auth", [$host=c$id$orig_h], [$str=cat(c$id$resp_h)]); + } + } \ No newline at end of file diff --git a/scripts/policy/protocols/http/detect-sqli.bro b/scripts/policy/protocols/http/detect-sqli.bro index a92565c63a..bb47ec2f47 100644 --- a/scripts/policy/protocols/http/detect-sqli.bro +++ b/scripts/policy/protocols/http/detect-sqli.bro @@ -1,7 +1,7 @@ ##! SQL injection attack detection in HTTP. @load base/frameworks/notice -@load base/frameworks/metrics +@load base/frameworks/measurement @load base/protocols/http module HTTP; @@ -15,13 +15,6 @@ export { SQL_Injection_Victim, }; - redef enum Metrics::ID += { - ## Metric to track SQL injection attackers. - SQLI_ATTACKER, - ## Metrics to track SQL injection victims. - SQLI_VICTIM, - }; - redef enum Tags += { ## Indicator of a URI based SQL injection attack. URI_SQLI, @@ -42,6 +35,11 @@ export { ## At the end of each interval the counter is reset. const sqli_requests_interval = 5min &redef; + ## Collecting samples will add extra data to notice emails + ## by collecting some sample SQL injection url paths. Disable + ## sample collection by setting this value to 0. + const collect_SQLi_samples = 5 &redef; + ## Regular expression is used to match URI based SQL injections. const match_sql_injection_uri = /[\?&][^[:blank:]\x00-\x37\|]+?=[\-[:alnum:]%]+([[:blank:]\x00-\x37]|\/\*.*?\*\/)*['"]?([[:blank:]\x00-\x37]|\/\*.*?\*\/|\)?;)+.*?([hH][aA][vV][iI][nN][gG]|[uU][nN][iI][oO][nN]|[eE][xX][eE][cC]|[sS][eE][lL][eE][cC][tT]|[dD][eE][lL][eE][tT][eE]|[dD][rR][oO][pP]|[dD][eE][cC][lL][aA][rR][eE]|[cC][rR][eE][aA][tT][eE]|[iI][nN][sS][eE][rR][tT])([[:blank:]\x00-\x37]|\/\*.*?\*\/)+/ @@ -52,20 +50,54 @@ export { | /\/\*![[:digit:]]{5}.*?\*\// &redef; } +function format_sqli_samples(samples: vector of Measurement::DataPoint): string + { + local ret = "SQL Injection samples\n---------------------"; + for ( i in samples ) + ret += "\n" + samples[i]$str; + return ret; + } + event bro_init() &priority=3 { # Add filters to the metrics so that the metrics framework knows how to # determine when it looks like an actual attack and how to respond when # thresholds are crossed. - - Metrics::add_filter(SQLI_ATTACKER, [$log=F, - $notice_threshold=sqli_requests_threshold, - $break_interval=sqli_requests_interval, - $note=SQL_Injection_Attacker]); - Metrics::add_filter(SQLI_VICTIM, [$log=F, - $notice_threshold=sqli_requests_threshold, - $break_interval=sqli_requests_interval, - $note=SQL_Injection_Victim]); + local r1: Measurement::Reducer = [$stream="http.sqli.attacker", $apply=set(Measurement::SUM), $samples=collect_SQLi_samples]; + Measurement::create([$epoch=sqli_requests_interval, + $reducers=set(r1), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["http.sqli.attacker"]$sum); + }, + $threshold=sqli_requests_threshold, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["http.sqli.attacker"]; + NOTICE([$note=SQL_Injection_Attacker, + $msg="An SQL injection attacker was discovered!", + $email_body_sections=vector(format_sqli_samples(r$samples)), + $src=key$host, + $identifier=cat(key$host)]); + }]); + + local r2: Measurement::Reducer = [$stream="http.sqli.victim", $apply=set(Measurement::SUM), $samples=collect_SQLi_samples]; + Measurement::create([$epoch=sqli_requests_interval, + $reducers=set(r2), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["http.sqli.victim"]$sum); + }, + $threshold=sqli_requests_threshold, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["http.sqli.victim"]; + NOTICE([$note=SQL_Injection_Victim, + $msg="An SQL injection victim was discovered!", + $email_body_sections=vector(format_sqli_samples(r$samples)), + $src=key$host, + $identifier=cat(key$host)]); + }]); } event http_request(c: connection, method: string, original_URI: string, @@ -75,7 +107,7 @@ event http_request(c: connection, method: string, original_URI: string, { add c$http$tags[URI_SQLI]; - Metrics::add_data(SQLI_ATTACKER, [$host=c$id$orig_h], 1); - Metrics::add_data(SQLI_VICTIM, [$host=c$id$resp_h], 1); + Measurement::add_data("http.sqli.attacker", [$host=c$id$orig_h], [$str=original_URI]); + Measurement::add_data("http.sqli.victim", [$host=c$id$resp_h], [$str=original_URI]); } } diff --git a/scripts/policy/protocols/smtp/metrics.bro b/scripts/policy/protocols/smtp/metrics.bro new file mode 100644 index 0000000000..19a3220805 --- /dev/null +++ b/scripts/policy/protocols/smtp/metrics.bro @@ -0,0 +1,37 @@ +##! This script is meant to answer the following questions... +##! "How many unique 'MAIL FROM' addresses are being used by local mail servers per hour?" +##! "How much mail is being sent from each local mail server per hour?" + +@load base/protocols/smtp +@load base/frameworks/measurement +@load base/utils/site +@load base/utils/directions-and-hosts + +module SMTPMetrics; + +export { + ## Define the break intervals for all of the metrics collected and logged by this script. + const breaks=1hr &redef; +} + +event bro_init() &priority=5 + { + Metrics::add_filter("smtp.mailfrom", [$every=breaks, + $measure=set(Metrics::SUM), + $pred(index: Metrics::Index, data: Metrics::DataPoint) = { + return addr_matches_host(index$host, LOCAL_HOSTS); + }]); + Metrics::add_filter("smtp.messages", [$every=breaks, + $measure=set(Metrics::SUM), + $pred(index: Metrics::Index, data: Metrics::DataPoint) = { + return addr_matches_host(index$host, LOCAL_HOSTS); + }]); + } + +event SMTP::log_smtp(rec: SMTP::Info) + { + Metrics::add_data("smtp.messages", [$host=rec$id$orig_h], [$num=1]); + + if ( rec?$mailfrom ) + Metrics::add_data("smtp.mailfrom", [$host=rec$id$orig_h], [$str=rec$mailfrom]); + } diff --git a/scripts/policy/protocols/ssh/detect-bruteforcing.bro b/scripts/policy/protocols/ssh/detect-bruteforcing.bro index aa6e920c12..cf2d4030fd 100644 --- a/scripts/policy/protocols/ssh/detect-bruteforcing.bro +++ b/scripts/policy/protocols/ssh/detect-bruteforcing.bro @@ -2,7 +2,7 @@ ##! bruteforcing over SSH. @load base/protocols/ssh -@load base/frameworks/metrics +@load base/frameworks/measurement @load base/frameworks/notice @load base/frameworks/intel @@ -19,12 +19,12 @@ export { ## currently implemented. Login_By_Password_Guesser, }; - - redef enum Metrics::ID += { - ## Metric is to measure failed logins. - FAILED_LOGIN, - }; + redef enum Intel::Where += { + ## An indicator of the login for the intel framework. + SSH::SUCCESSFUL_LOGIN, + }; + ## The number of failed SSH connections before a host is designated as ## guessing passwords. const password_guesses_limit = 30 &redef; @@ -38,33 +38,40 @@ export { ## heuristic fails and this acts as the whitelist. The index represents ## client subnets and the yield value represents server subnets. const ignore_guessers: table[subnet] of subnet &redef; - - ## Tracks hosts identified as guessing passwords. - global password_guessers: set[addr] - &read_expire=guessing_timeout+1hr &synchronized &redef; } event bro_init() { - Metrics::add_filter(FAILED_LOGIN, [$name="detect-bruteforcing", $log=F, - $note=Password_Guessing, - $notice_threshold=password_guesses_limit, - $notice_freq=1hr, - $break_interval=guessing_timeout]); + local r1: Measurement::Reducer = [$stream="ssh.login.failure", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=guessing_timeout, + $reducers=set(r1), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["ssh.login.failure"]$sum); + }, + $threshold=password_guesses_limit, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["ssh.login.failure"]; + # Generate the notice. + NOTICE([$note=Password_Guessing, + $msg=fmt("%s appears to be guessing SSH passwords (seen in %d connections).", key$host, r$num), + $src=key$host, + $identifier=cat(key$host)]); + # Insert the guesser into the intel framework. + Intel::insert([$host=key$host, + $meta=[$source="local", + $desc=fmt("Bro observed %d apparently failed SSH connections.", r$num)]]); + }]); } event SSH::heuristic_successful_login(c: connection) { local id = c$id; - # TODO: This is out for the moment pending some more additions to the - # metrics framework. - #if ( id$orig_h in password_guessers ) - # { - # NOTICE([$note=Login_By_Password_Guesser, - # $conn=c, - # $msg=fmt("Successful SSH login by password guesser %s", id$orig_h)]); - # } + Intel::seen([$host=id$orig_h, + $conn=c, + $where=SSH::SUCCESSFUL_LOGIN]); } event SSH::heuristic_failed_login(c: connection) @@ -75,5 +82,5 @@ event SSH::heuristic_failed_login(c: connection) # be ignored. if ( ! (id$orig_h in ignore_guessers && id$resp_h in ignore_guessers[id$orig_h]) ) - Metrics::add_data(FAILED_LOGIN, [$host=id$orig_h], 1); + Measurement::add_data("ssh.login.failure", [$host=id$orig_h], [$num=1]); } diff --git a/scripts/site/local.bro b/scripts/site/local.bro index 6dff7881f5..dfebd9923a 100644 --- a/scripts/site/local.bro +++ b/scripts/site/local.bro @@ -8,6 +8,9 @@ # Apply the default tuning scripts for common tuning settings. @load tuning/defaults +# Load the scan detection script. +@load misc/scan + # Generate notices when vulnerable versions of software are discovered. # The default is to only monitor software found in the address space defined # as "local". Refer to the software framework's documentation for more diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index d9e8ee0703..097fc1f2ca 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -38,6 +38,7 @@ scripts/base/init-default.bro scripts/base/utils/files.bro scripts/base/utils/numbers.bro scripts/base/utils/paths.bro + scripts/base/utils/queue.bro scripts/base/utils/strings.bro scripts/base/utils/thresholds.bro scripts/base/utils/urls.bro diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.basic-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic-cluster/manager-1..stdout new file mode 100644 index 0000000000..ea8904d2e6 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic-cluster/manager-1..stdout @@ -0,0 +1,4 @@ +Host: 6.5.4.3 - num:2 - sum:6.0 - avg:3.0 - max:5.0 - min:1.0 - var:8.0 - std_dev:2.8 - unique:2 +Host: 10.10.10.10 - num:1 - sum:5.0 - avg:5.0 - max:5.0 - min:5.0 - var:0.0 - std_dev:0.0 - unique:1 +Host: 1.2.3.4 - num:9 - sum:437.0 - avg:48.6 - max:95.0 - min:3.0 - var:758.8 - std_dev:27.5 - unique:8 +Host: 7.2.1.5 - num:2 - sum:145.0 - avg:72.5 - max:91.0 - min:54.0 - var:684.5 - std_dev:26.2 - unique:2 diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.basic/.stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic/.stdout new file mode 100644 index 0000000000..208b6103b7 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.basic/.stdout @@ -0,0 +1,3 @@ +Host: 6.5.4.3 - num:1 - sum:2.0 - var:0.0 - avg:2.0 - max:2.0 - min:2.0 - std_dev:0.0 - unique:1 +Host: 1.2.3.4 - num:5 - sum:221.0 - var:1144.2 - avg:44.2 - max:94.0 - min:5.0 - std_dev:33.8 - unique:4 +Host: 7.2.1.5 - num:1 - sum:1.0 - var:0.0 - avg:1.0 - max:1.0 - min:1.0 - std_dev:0.0 - unique:1 diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.cluster-intermediate-update/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.cluster-intermediate-update/manager-1..stdout new file mode 100644 index 0000000000..2a53389dc3 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.cluster-intermediate-update/manager-1..stdout @@ -0,0 +1 @@ +A test metric threshold was crossed with a value of: 100.0 diff --git a/testing/btest/Baseline/scripts.base.frameworks.measurement.thresholding/.stdout b/testing/btest/Baseline/scripts.base.frameworks.measurement.thresholding/.stdout new file mode 100644 index 0000000000..09c65c3864 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.measurement.thresholding/.stdout @@ -0,0 +1,6 @@ +THRESHOLD_SERIES: hit a threshold series value at 3 for measurement_key(host=1.2.3.4) +THRESHOLD: hit a threshold value at 6 for measurement_key(host=1.2.3.4) +THRESHOLD_SERIES: hit a threshold series value at 6 for measurement_key(host=1.2.3.4) +THRESHOLD: hit a threshold value at 1001 for measurement_key(host=7.2.1.5) +THRESHOLD_SERIES: hit a threshold series value at 1001 for measurement_key(host=7.2.1.5) +THRESHOLD WITH RATIO BETWEEN REDUCERS: hit a threshold value at 55x for measurement_key(host=7.2.1.5) diff --git a/testing/btest/Baseline/scripts.base.frameworks.metrics.basic-cluster/manager-1.metrics.log b/testing/btest/Baseline/scripts.base.frameworks.metrics.basic-cluster/manager-1.metrics.log index cb1bd5af01..bdc86c68bb 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.metrics.basic-cluster/manager-1.metrics.log +++ b/testing/btest/Baseline/scripts.base.frameworks.metrics.basic-cluster/manager-1.metrics.log @@ -3,10 +3,10 @@ #empty_field (empty) #unset_field - #path metrics -#open 2012-07-20-01-50-41 -#fields ts metric_id filter_name index.host index.str index.network value -#types time enum string addr string subnet count -1342749041.601712 TEST_METRIC foo-bar 6.5.4.3 - - 4 -1342749041.601712 TEST_METRIC foo-bar 7.2.1.5 - - 2 -1342749041.601712 TEST_METRIC foo-bar 1.2.3.4 - - 6 -#close 2012-07-20-01-50-49 +#open 2012-12-17-18-43-15 +#fields ts ts_delta metric index.str index.host index.network result.begin result.end result.num result.sum result.min result.max result.avg result.variance result.std_dev result.unique +#types time interval string string addr subnet time time count double double double double double double count +1355769795.365325 3.000000 test.metric - 6.5.4.3 - 1355769793.449322 1355769793.458467 2 6.0 1.0 5.0 3.0 4.0 2.0 2 +1355769795.365325 3.000000 test.metric - 1.2.3.4 - 1355769793.449322 1355769793.458467 9 437.0 3.0 95.0 48.555556 674.469136 25.970544 8 +1355769795.365325 3.000000 test.metric - 7.2.1.5 - 1355769793.449322 1355769793.458467 2 145.0 54.0 91.0 72.5 342.25 18.5 2 +#close 2012-12-17-18-43-21 diff --git a/testing/btest/Baseline/scripts.base.frameworks.metrics.basic/metrics.log b/testing/btest/Baseline/scripts.base.frameworks.metrics.basic/metrics.log index fb6476ee88..51d892e8d5 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.metrics.basic/metrics.log +++ b/testing/btest/Baseline/scripts.base.frameworks.metrics.basic/metrics.log @@ -3,10 +3,10 @@ #empty_field (empty) #unset_field - #path metrics -#open 2012-07-20-01-49-22 -#fields ts metric_id filter_name index.host index.str index.network value -#types time enum string addr string subnet count -1342748962.841548 TEST_METRIC foo-bar 6.5.4.3 - - 2 -1342748962.841548 TEST_METRIC foo-bar 7.2.1.5 - - 1 -1342748962.841548 TEST_METRIC foo-bar 1.2.3.4 - - 3 -#close 2012-07-20-01-49-22 +#open 2012-12-17-18-43-45 +#fields ts ts_delta metric index.str index.host index.network result.begin result.end result.num result.sum result.min result.max result.avg result.variance result.std_dev result.unique +#types time interval string string addr subnet time time count double double double double double double count +1355769825.947161 3.000000 test.metric - 6.5.4.3 - 1355769825.947161 1355769825.947161 1 2.0 2.0 2.0 2.0 0.0 0.0 - +1355769825.947161 3.000000 test.metric - 1.2.3.4 - 1355769825.947161 1355769825.947161 5 221.0 5.0 94.0 44.2 915.36 30.254917 - +1355769825.947161 3.000000 test.metric - 7.2.1.5 - 1355769825.947161 1355769825.947161 1 1.0 1.0 1.0 1.0 0.0 0.0 - +#close 2012-12-17-18-43-45 diff --git a/testing/btest/Baseline/scripts.base.frameworks.metrics.cluster-intermediate-update/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.metrics.cluster-intermediate-update/manager-1..stdout new file mode 100644 index 0000000000..2d0750ca18 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.metrics.cluster-intermediate-update/manager-1..stdout @@ -0,0 +1 @@ +A test metric threshold was crossed! diff --git a/testing/btest/Baseline/scripts.base.frameworks.metrics.thresholding/.stdout b/testing/btest/Baseline/scripts.base.frameworks.metrics.thresholding/.stdout new file mode 100644 index 0000000000..da692f2fe2 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.metrics.thresholding/.stdout @@ -0,0 +1,8 @@ +THRESHOLD_SERIES: hit a threshold series value at 3 for metric_index(host=1.2.3.4) +THRESHOLD_FUNC: hit a threshold function value at 3 for metric_index(host=1.2.3.4) +THRESHOLD_FUNC: hit a threshold function value at 2 for metric_index(host=6.5.4.3) +THRESHOLD_FUNC: hit a threshold function value at 1 for metric_index(host=7.2.1.5) +THRESHOLD: hit a threshold value at 6 for metric_index(host=1.2.3.4) +THRESHOLD_SERIES: hit a threshold series value at 6 for metric_index(host=1.2.3.4) +THRESHOLD: hit a threshold value at 1001 for metric_index(host=7.2.1.5) +THRESHOLD_SERIES: hit a threshold series value at 1001 for metric_index(host=7.2.1.5) diff --git a/testing/btest/Baseline/scripts.base.utils.queue/output b/testing/btest/Baseline/scripts.base.utils.queue/output new file mode 100644 index 0000000000..b878006310 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.utils.queue/output @@ -0,0 +1,11 @@ +This is a get_cnt_vector test: 3 +This is a get_cnt_vector test: 4 +This is a get_str_vector test: 3 +This is a get_str_vector test: 4 +Testing pop: 3 +Length after pop: 1 +Size of q2: 4 +String queue value: test 1 +String queue value: test 2 +String queue value: test 2 +String queue value: test 1 diff --git a/testing/btest/scripts/base/frameworks/measurement/basic-cluster.bro b/testing/btest/scripts/base/frameworks/measurement/basic-cluster.bro new file mode 100644 index 0000000000..e2f5e4e7d5 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/measurement/basic-cluster.bro @@ -0,0 +1,83 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 15 + +# @TEST-EXEC: btest-diff manager-1/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +global n = 0; + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM, Measurement::MIN, Measurement::MAX, Measurement::AVERAGE, Measurement::STD_DEV, Measurement::VARIANCE, Measurement::UNIQUE)]; + Measurement::create([$epoch=5secs, + $reducers=set(r1), + $epoch_finished(rt: Measurement::ResultTable) = + { + for ( key in rt ) + { + local r = rt[key]["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); + } + + terminate(); + } + ]); + } + +event remote_connection_closed(p: event_peer) + { + terminate(); + } + +global ready_for_data: event(); +redef Cluster::manager2worker_events += /^ready_for_data$/; + +event ready_for_data() + { + if ( Cluster::node == "worker-1" ) + { + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=34]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=30]); + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=1]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=54]); + } + if ( Cluster::node == "worker-2" ) + { + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=75]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=30]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=3]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=57]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=52]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=61]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=95]); + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=5]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=91]); + Measurement::add_data("test.metric", [$host=10.10.10.10], [$num=5]); + } + } + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +global peer_count = 0; +event remote_connection_handshake_done(p: event_peer) &priority=-5 + { + ++peer_count; + if ( peer_count == 2 ) + event ready_for_data(); + } + +@endif diff --git a/testing/btest/scripts/base/frameworks/measurement/basic.bro b/testing/btest/scripts/base/frameworks/measurement/basic.bro new file mode 100644 index 0000000000..e9dd21e0ef --- /dev/null +++ b/testing/btest/scripts/base/frameworks/measurement/basic.bro @@ -0,0 +1,34 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="test.metric", + $apply=set(Measurement::SUM, + Measurement::VARIANCE, + Measurement::AVERAGE, + Measurement::MAX, + Measurement::MIN, + Measurement::STD_DEV, + Measurement::UNIQUE)]; + Measurement::create([$epoch=3secs, + $reducers=set(r1), + $epoch_finished(data: Measurement::ResultTable) = + { + for ( key in data ) + { + local r = data[key]["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); + } + } + ]); + + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=5]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=22]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=94]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=50]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=50]); + + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=2]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=1]); + } diff --git a/testing/btest/scripts/base/frameworks/measurement/cluster-intermediate-update.bro b/testing/btest/scripts/base/frameworks/measurement/cluster-intermediate-update.bro new file mode 100644 index 0000000000..56f44db2eb --- /dev/null +++ b/testing/btest/scripts/base/frameworks/measurement/cluster-intermediate-update.bro @@ -0,0 +1,60 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 3 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff manager-1/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=1hr, + $reducers=set(r1), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["test.metric"]$sum); + }, + $threshold=100, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum); + terminate(); + } + ]); + } + +event remote_connection_closed(p: event_peer) + { + terminate(); + } + +event do_metrics(i: count) + { + # Worker-1 will trigger an intermediate update and then if everything + # works correctly, the data from worker-2 will hit the threshold and + # should trigger the notice. + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=i]); + } + +event remote_connection_handshake_done(p: event_peer) + { + if ( p$descr == "manager-1" ) + { + if ( Cluster::node == "worker-1" ) + schedule 0.1sec { do_metrics(1) }; + if ( Cluster::node == "worker-2" ) + schedule 0.5sec { do_metrics(99) }; + } + } diff --git a/testing/btest/scripts/base/frameworks/measurement/thresholding.bro b/testing/btest/scripts/base/frameworks/measurement/thresholding.bro new file mode 100644 index 0000000000..d25350930e --- /dev/null +++ b/testing/btest/scripts/base/frameworks/measurement/thresholding.bro @@ -0,0 +1,73 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +redef enum Notice::Type += { + Test_Notice, +}; + +event bro_init() &priority=5 + { + local r1: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=3secs, + $reducers=set(r1), + #$threshold_val = Measurement::sum_threshold("test.metric"), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["test.metric"]$sum); + }, + $threshold=5, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["test.metric"]; + print fmt("THRESHOLD: hit a threshold value at %.0f for %s", r$sum, Measurement::key2str(key)); + } + ]); + + local r2: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=3secs, + $reducers=set(r2), + #$threshold_val = Measurement::sum_threshold("test.metric"), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + return double_to_count(result["test.metric"]$sum); + }, + $threshold_series=vector(3,6,800), + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local r = result["test.metric"]; + print fmt("THRESHOLD_SERIES: hit a threshold series value at %.0f for %s", r$sum, Measurement::key2str(key)); + } + ]); + + local r3: Measurement::Reducer = [$stream="test.metric", $apply=set(Measurement::SUM)]; + local r4: Measurement::Reducer = [$stream="test.metric2", $apply=set(Measurement::SUM)]; + Measurement::create([$epoch=3secs, + $reducers=set(r3, r4), + $threshold_val(key: Measurement::Key, result: Measurement::Result) = + { + # Calculate a ratio between sums of two reducers. + if ( "test.metric2" in result && "test.metric" in result && + result["test.metric"]$sum > 0 ) + return double_to_count(result["test.metric2"]$sum / result["test.metric"]$sum); + else + return 0; + }, + # Looking for metric2 sum to be 5 times the sum of metric + $threshold=5, + $threshold_crossed(key: Measurement::Key, result: Measurement::Result) = + { + local thold = result["test.metric2"]$sum / result["test.metric"]$sum; + print fmt("THRESHOLD WITH RATIO BETWEEN REDUCERS: hit a threshold value at %.0fx for %s", thold, Measurement::key2str(key)); + } + ]); + + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=3]); + Measurement::add_data("test.metric", [$host=6.5.4.3], [$num=2]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=1]); + Measurement::add_data("test.metric", [$host=1.2.3.4], [$num=3]); + Measurement::add_data("test.metric", [$host=7.2.1.5], [$num=1000]); + Measurement::add_data("test.metric2", [$host=7.2.1.5], [$num=10]); + Measurement::add_data("test.metric2", [$host=7.2.1.5], [$num=1000]); + Measurement::add_data("test.metric2", [$host=7.2.1.5], [$num=54321]); + + } diff --git a/testing/btest/scripts/base/frameworks/metrics/basic-cluster.bro b/testing/btest/scripts/base/frameworks/metrics/basic-cluster.bro deleted file mode 100644 index 89ae5bf79f..0000000000 --- a/testing/btest/scripts/base/frameworks/metrics/basic-cluster.bro +++ /dev/null @@ -1,78 +0,0 @@ -# @TEST-SERIALIZE: comm -# -# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT -# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT -# @TEST-EXEC: sleep 1 -# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT -# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT -# @TEST-EXEC: btest-bg-wait 30 -# @TEST-EXEC: btest-diff manager-1/metrics.log - -@TEST-START-FILE cluster-layout.bro -redef Cluster::nodes = { - ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], - ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1", $workers=set("worker-1", "worker-2")], - ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth0"], - ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth1"], -}; -@TEST-END-FILE - -redef Log::default_rotation_interval = 0secs; - -redef enum Metrics::ID += { - TEST_METRIC, -}; - -event bro_init() &priority=5 - { - Metrics::add_filter(TEST_METRIC, - [$name="foo-bar", - $break_interval=3secs]); - } - -event remote_connection_closed(p: event_peer) - { - terminate(); - } - -global ready_for_data: event(); - -redef Cluster::manager2worker_events += /ready_for_data/; - -@if ( Cluster::local_node_type() == Cluster::WORKER ) - -event ready_for_data() - { - Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], 3); - Metrics::add_data(TEST_METRIC, [$host=6.5.4.3], 2); - Metrics::add_data(TEST_METRIC, [$host=7.2.1.5], 1); - } - -@endif - -@if ( Cluster::local_node_type() == Cluster::MANAGER ) - -global n = 0; -global peer_count = 0; - -event Metrics::log_metrics(rec: Metrics::Info) - { - n = n + 1; - if ( n == 3 ) - { - terminate_communication(); - terminate(); - } - } - -event remote_connection_handshake_done(p: event_peer) - { - print p; - peer_count = peer_count + 1; - if ( peer_count == 3 ) - { - event ready_for_data(); - } - } - -@endif diff --git a/testing/btest/scripts/base/frameworks/metrics/basic.bro b/testing/btest/scripts/base/frameworks/metrics/basic.bro deleted file mode 100644 index 43e7ac28ef..0000000000 --- a/testing/btest/scripts/base/frameworks/metrics/basic.bro +++ /dev/null @@ -1,16 +0,0 @@ -# @TEST-EXEC: bro %INPUT -# @TEST-EXEC: btest-diff metrics.log - -redef enum Metrics::ID += { - TEST_METRIC, -}; - -event bro_init() &priority=5 - { - Metrics::add_filter(TEST_METRIC, - [$name="foo-bar", - $break_interval=3secs]); - Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], 3); - Metrics::add_data(TEST_METRIC, [$host=6.5.4.3], 2); - Metrics::add_data(TEST_METRIC, [$host=7.2.1.5], 1); - } diff --git a/testing/btest/scripts/base/frameworks/metrics/cluster-intermediate-update.bro b/testing/btest/scripts/base/frameworks/metrics/cluster-intermediate-update.bro deleted file mode 100644 index db2c7e9f5d..0000000000 --- a/testing/btest/scripts/base/frameworks/metrics/cluster-intermediate-update.bro +++ /dev/null @@ -1,73 +0,0 @@ -# @TEST-SERIALIZE: comm -# -# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT -# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT -# @TEST-EXEC: sleep 1 -# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT -# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT -# @TEST-EXEC: btest-bg-wait 20 -# @TEST-EXEC: btest-diff manager-1/notice.log - -@TEST-START-FILE cluster-layout.bro -redef Cluster::nodes = { - ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1")], - ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1", $workers=set("worker-1")], - ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth0"], - ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth1"], -}; -@TEST-END-FILE - -redef Log::default_rotation_interval = 0secs; - -redef enum Notice::Type += { - Test_Notice, -}; - -redef enum Metrics::ID += { - TEST_METRIC, -}; - -event bro_init() &priority=5 - { - Metrics::add_filter(TEST_METRIC, - [$name="foo-bar", - $break_interval=1hr, - $note=Test_Notice, - $notice_threshold=100, - $log=T]); - } - -event remote_connection_closed(p: event_peer) - { - terminate(); - } - -@if ( Cluster::local_node_type() == Cluster::MANAGER ) - -event Notice::log_notice(rec: Notice::Info) - { - terminate_communication(); - terminate(); - } - -@endif - -@if ( Cluster::local_node_type() == Cluster::WORKER ) - -event do_metrics(i: count) - { - # Worker-1 will trigger an intermediate update and then if everything - # works correctly, the data from worker-2 will hit the threshold and - # should trigger the notice. - Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], i); - } - -event bro_init() - { - if ( Cluster::node == "worker-1" ) - schedule 2sec { do_metrics(99) }; - if ( Cluster::node == "worker-2" ) - event do_metrics(1); - } - -@endif diff --git a/testing/btest/scripts/base/utils/queue.test b/testing/btest/scripts/base/utils/queue.test new file mode 100644 index 0000000000..50f541a25f --- /dev/null +++ b/testing/btest/scripts/base/utils/queue.test @@ -0,0 +1,35 @@ +# @TEST-EXEC: bro -b %INPUT > output +# @TEST-EXEC: btest-diff output + +# This is loaded by default +@load base/utils/queue + +event bro_init() + { + local q = Queue::init([$max_len=2]); + Queue::push(q, 1); + Queue::push(q, 2); + Queue::push(q, 3); + Queue::push(q, 4); + local test1 = Queue::get_cnt_vector(q); + for ( i in test1 ) + print fmt("This is a get_cnt_vector test: %d", test1[i]); + + local test2 = Queue::get_str_vector(q); + for ( i in test2 ) + print fmt("This is a get_str_vector test: %s", test2[i]); + + local test_val = Queue::pop(q); + print fmt("Testing pop: %s", test_val); + print fmt("Length after pop: %d", Queue::len(q)); + + local q2 = Queue::init([]); + Queue::push(q2, "test 1"); + Queue::push(q2, "test 2"); + Queue::push(q2, "test 2"); + Queue::push(q2, "test 1"); + print fmt("Size of q2: %d", Queue::len(q2)); + local test3: vector of string = Queue::get_str_vector(q2); + for ( i in test3 ) + print fmt("String queue value: %s", test3[i]); + } \ No newline at end of file