diff --git a/scripts/base/frameworks/measurement/__load__.bro b/scripts/base/frameworks/measurement/__load__.bro index fc784e1632..c2b77e706a 100644 --- a/scripts/base/frameworks/measurement/__load__.bro +++ b/scripts/base/frameworks/measurement/__load__.bro @@ -1,5 +1,4 @@ @load ./main - @load ./plugins # The cluster framework must be loaded first. diff --git a/scripts/base/frameworks/measurement/cluster.bro b/scripts/base/frameworks/measurement/cluster.bro index 6ccf5bb2f9..9c35b85b32 100644 --- a/scripts/base/frameworks/measurement/cluster.bro +++ b/scripts/base/frameworks/measurement/cluster.bro @@ -17,8 +17,8 @@ export { ## The percent of the full threshold value that needs to be met ## on a single worker for that worker to send the value to its manager in ## order for it to request a global view for that value. There is no - ## requirement that the manager requests a global view for the index - ## since it may opt not to if it requested a global view for the index + ## requirement that the manager request a global view for the key, + ## since it may opt not to if it requested a global view for the key ## recently. const cluster_request_global_view_percent = 0.2 &redef; @@ -34,75 +34,74 @@ export { const enable_intermediate_updates = T &redef; # Event sent by the manager in a cluster to initiate the - # collection of metrics values for a filter. - global cluster_filter_request: event(uid: string, id: string, filter_name: string); + # collection of values for a measurement. + global cluster_measurement_request: event(uid: string, mid: string); # Event sent by nodes that are collecting metrics after receiving - # a request for the metric filter from the manager. - global cluster_filter_response: event(uid: string, id: string, filter_name: string, data: MetricTable, done: bool); + # a request for the measurement from the manager. + global cluster_measurement_response: event(uid: string, mid: string, data: ResultTable, done: bool); # This event is sent by the manager in a cluster to initiate the - # collection of a single index value from a filter. It's typically + # collection of a single key value from a measurement. It's typically # used to get intermediate updates before the break interval triggers # to speed detection of a value crossing a threshold. - global cluster_index_request: event(uid: string, id: string, filter_name: string, index: Index); + global cluster_key_request: event(uid: string, mid: string, key: Key); # This event is sent by nodes in response to a - # :bro:id:`Measurement::cluster_index_request` event. - global cluster_index_response: event(uid: string, id: string, filter_name: string, index: Index, val: ResultVal); + # :bro:id:`Measurement::cluster_key_request` event. + global cluster_key_response: event(uid: string, mid: string, key: Key, result: Result); # This is sent by workers to indicate that they crossed the percent of the # current threshold by the percentage defined globally in # :bro:id:`Measurement::cluster_request_global_view_percent` - global cluster_index_intermediate_response: event(id: string, filter_name: string, index: Measurement::Index); + global cluster_key_intermediate_response: event(mid: string, key: Measurement::Key); # This event is scheduled internally on workers to send result chunks.
- global send_data: event(uid: string, id: string, filter_name: string, data: MetricTable); + global send_data: event(uid: string, mid: string, data: ResultTable); } - # Add events to the cluster framework to make this work. -redef Cluster::manager2worker_events += /Measurement::cluster_(filter_request|index_request)/; -redef Cluster::worker2manager_events += /Measurement::cluster_(filter_response|index_response|index_intermediate_response)/; +redef Cluster::manager2worker_events += /Measurement::cluster_(measurement_request|key_request)/; +redef Cluster::worker2manager_events += /Measurement::cluster_(measurement_response|key_response|key_intermediate_response)/; @if ( Cluster::local_node_type() != Cluster::MANAGER ) -# This variable is maintained to know what indexes they have recently sent as +# This variable is maintained to know what keys they have recently sent as # intermediate updates so they don't overwhelm their manager. The count that is # yielded is the number of times the percentage threshold has been crossed and # an intermediate result has been received. -global recent_global_view_indexes: table[string, string, Index] of count &create_expire=1min &default=0; +global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0; # This is done on all non-manager node types in the event that a metric is # being collected somewhere other than a worker. -function data_added(filter: Filter, index: Index, val: ResultVal) +function data_added(m: Measurement, key: Key, result: Result) { # If an intermediate update for this value was sent recently, don't send # it again. - if ( [filter$id, filter$name, index] in recent_global_view_indexes ) + if ( [m$id, key] in recent_global_view_keys ) return; # If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that # crosses the full threshold then it's a candidate to send as an # intermediate update. if ( enable_intermediate_updates && - check_thresholds(filter, index, val, cluster_request_global_view_percent) ) + check_thresholds(m, key, result, cluster_request_global_view_percent) ) { # kick off intermediate update - event Measurement::cluster_index_intermediate_response(filter$id, filter$name, index); - ++recent_global_view_indexes[filter$id, filter$name, index]; + event Measurement::cluster_key_intermediate_response(m$id, key); + ++recent_global_view_keys[m$id, key]; } } -event Measurement::send_data(uid: string, id: string, filter_name: string, data: MetricTable) +event Measurement::send_data(uid: string, mid: string, data: ResultTable) { #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid); - local local_data: MetricTable; + local local_data: ResultTable; local num_added = 0; - for ( index in data ) + for ( key in data ) { - local_data[index] = data[index]; - delete data[index]; + local_data[key] = data[key]; + delete data[key]; # Only send cluster_send_in_groups_of at a time. Queue another # event to send the next group. @@ -115,35 +114,35 @@ event Measurement::send_data(uid: string, id: string, filter_name: string, data: if ( |data| == 0 ) done = T; - event Measurement::cluster_filter_response(uid, id, filter_name, local_data, done); + event Measurement::cluster_measurement_response(uid, mid, local_data, done); if ( !
done ) - event Measurement::send_data(uid, id, filter_name, data); + event Measurement::send_data(uid, mid, data); } -event Measurement::cluster_filter_request(uid: string, id: string, filter_name: string) +event Measurement::cluster_measurement_request(uid: string, mid: string) { - #print fmt("WORKER %s: received the cluster_filter_request event for %s.", Cluster::node, id); + #print fmt("WORKER %s: received the cluster_measurement_request event for %s.", Cluster::node, mid); - # Initiate sending all of the data for the requested filter. - event Measurement::send_data(uid, id, filter_name, store[id, filter_name]); + # Initiate sending all of the data for the requested measurement. + event Measurement::send_data(uid, mid, result_store[mid]); - # Lookup the actual filter and reset it, the reference to the data + # Lookup the actual measurement and reset it, the reference to the data # currently stored will be maintained internally by the send_data event. - reset(filter_store[id, filter_name]); + reset(measurement_store[mid]); } -event Measurement::cluster_index_request(uid: string, id: string, filter_name: string, index: Index) +event Measurement::cluster_key_request(uid: string, mid: string, key: Key) { - if ( [id, filter_name] in store && index in store[id, filter_name] ) + if ( mid in result_store && key in result_store[mid] ) { - #print fmt("WORKER %s: received the cluster_index_request event for %s=%s.", Cluster::node, index2str(index), data); - event Measurement::cluster_index_response(uid, id, filter_name, index, store[id, filter_name][index]); + #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), result_store[mid][key]); + event Measurement::cluster_key_response(uid, mid, key, result_store[mid][key]); } else { # We need to send an empty response if we don't have the data so that the manager # can know that it heard back from all of the workers. - event Measurement::cluster_index_response(uid, id, filter_name, index, [$begin=network_time(), $end=network_time()]); + event Measurement::cluster_key_response(uid, mid, key, [$begin=network_time(), $end=network_time()]); } } @@ -153,12 +152,8 @@ event Measurement::cluster_index_request(uid: string, id: string, filter_name: s @if ( Cluster::local_node_type() == Cluster::MANAGER ) # This variable is maintained by manager nodes as they collect and aggregate -# results. -global filter_results: table[string, string, string] of MetricTable &read_expire=1min; - -# This is maintained by managers so they can know what data they requested and -# when they requested it. -global requested_results: table[string] of time = table() &create_expire=5mins; +# results. It's indexed on a uid. +global measurement_results: table[string] of ResultTable &read_expire=1min; # This variable is maintained by manager nodes to track how many "dones" they # collected per collection unique id. Once the number of results for a uid @@ -168,50 +163,49 @@ global requested_results: table[string] of time = table() &create_expire=5mins; global done_with: table[string] of count &read_expire=1min &default=0; # This variable is maintained by managers to track intermediate responses as -# they are getting a global view for a certain index. -global index_requests: table[string, string, string, Index] of ResultVal &read_expire=1min; +# they are getting a global view for a certain key. Indexed on a uid.
+global key_requests: table[string] of Result &read_expire=1min; # This variable is maintained by managers to prevent overwhelming communication due -# to too many intermediate updates. Each metric filter is tracked separately so that -# one metric won't overwhelm and degrade other quieter metrics. -global outstanding_global_views: table[string, string] of count &default=0; +# to too many intermediate updates. Each measurement is tracked separately so that +# one measurement won't overwhelm and degrade other quieter measurements. Indexed on a +# measurement id. +global outstanding_global_views: table[string] of count &default=0; # Managers handle logging. -event Measurement::finish_period(filter: Filter) +event Measurement::finish_period(m: Measurement) { - #print fmt("%.6f MANAGER: breaking %s filter for %s metric", network_time(), filter$name, filter$id); + #print fmt("%.6f MANAGER: breaking measurement %s", network_time(), m$id); local uid = unique_id(""); - # Set some tracking variables. - requested_results[uid] = network_time(); - if ( [uid, filter$id, filter$name] in filter_results ) - delete filter_results[uid, filter$id, filter$name]; - filter_results[uid, filter$id, filter$name] = table(); + if ( uid in measurement_results ) + delete measurement_results[uid]; + measurement_results[uid] = table(); # Request data from peers. - event Measurement::cluster_filter_request(uid, filter$id, filter$name); + event Measurement::cluster_measurement_request(uid, m$id); # Schedule the next finish_period event. - schedule filter$every { Measurement::finish_period(filter) }; + schedule m$epoch { Measurement::finish_period(m) }; } -# This is unlikely to be called often, but it's here in case there are metrics +# This is unlikely to be called often, but it's here in case there are measurements # being collected by managers. -function data_added(filter: Filter, index: Index, val: ResultVal) +function data_added(m: Measurement, key: Key, result: Result) { - if ( check_thresholds(filter, index, val, 1.0) ) - threshold_crossed(filter, index, val); + #if ( check_thresholds(m, key, result, 1.0) ) + # threshold_crossed(m, key, result); } -event Measurement::cluster_index_response(uid: string, id: string, filter_name: string, index: Index, val: ResultVal) +event Measurement::cluster_key_response(uid: string, mid: string, key: Key, result: Result) { - #print fmt("%0.6f MANAGER: receiving index data from %s - %s=%s", network_time(), get_event_peer()$descr, index2str(index), val); + #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result); # We only want to try and do a value merge if there are actually measured datapoints - # in the ResultVal. - if ( val$num > 0 && [uid, id, filter_name, index] in index_requests ) - index_requests[uid, id, filter_name, index] = merge_result_vals(index_requests[uid, id, filter_name, index], val); + # in the Result. + if ( result$num > 0 && uid in key_requests ) + key_requests[uid] = compose_resultvals(key_requests[uid], result); else - index_requests[uid, id, filter_name, index] = val; + key_requests[uid] = result; # Mark that this worker is done.
++done_with[uid]; @@ -219,27 +213,27 @@ event Measurement::cluster_index_response(uid: string, id: string, filter_name: #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]); if ( Cluster::worker_count == done_with[uid] ) { - local ir = index_requests[uid, id, filter_name, index]; - if ( check_thresholds(filter_store[id, filter_name], index, ir, 1.0) ) - { - threshold_crossed(filter_store[id, filter_name], index, ir); - } + local m = measurement_store[mid]; + local ir = key_requests[uid]; + if ( check_thresholds(m, key, ir, 1.0) ) + threshold_crossed(m, key, ir); + delete done_with[uid]; - delete index_requests[uid, id, filter_name, index]; + delete key_requests[uid]; # Check that there is an outstanding view before subtracting. - if ( outstanding_global_views[id, filter_name] > 0 ) - --outstanding_global_views[id, filter_name]; + if ( outstanding_global_views[mid] > 0 ) + --outstanding_global_views[mid]; } } # Managers handle intermediate updates here. -event Measurement::cluster_index_intermediate_response(id: string, filter_name: string, index: Index) +event Measurement::cluster_key_intermediate_response(mid: string, key: Key) { - #print fmt("MANAGER: receiving intermediate index data from %s", get_event_peer()$descr); - #print fmt("MANAGER: requesting index data for %s", index2str(index)); + #print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr); + #print fmt("MANAGER: requesting key data for %s", key2str(key)); - if ( [id, filter_name] in outstanding_global_views && - |outstanding_global_views[id, filter_name]| > max_outstanding_global_views ) + if ( mid in outstanding_global_views && + outstanding_global_views[mid] > max_outstanding_global_views ) { # Don't do this intermediate update. Perhaps at some point in the future # we will queue and randomly select from these ignored intermediate @@ -247,38 +241,38 @@ event Measurement::cluster_index_intermediate_response(id: string, filter_name: return; } - ++outstanding_global_views[id, filter_name]; + ++outstanding_global_views[mid]; local uid = unique_id(""); - event Measurement::cluster_index_request(uid, id, filter_name, index); + event Measurement::cluster_key_request(uid, mid, key); } -event Measurement::cluster_filter_response(uid: string, id: string, filter_name: string, data: MetricTable, done: bool) +event Measurement::cluster_measurement_response(uid: string, mid: string, data: ResultTable, done: bool) { #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); - + # Mark another worker as being "done" for this uid. if ( done ) ++done_with[uid]; - local local_data = filter_results[uid, id, filter_name]; - local filter = filter_store[id, filter_name]; + local local_data = measurement_results[uid]; + local m = measurement_store[mid]; - for ( index in data ) + for ( key in data ) { - if ( index in local_data ) - local_data[index] = merge_result_vals(local_data[index], data[index]); + if ( key in local_data ) + local_data[key] = compose_resultvals(local_data[key], data[key]); else - local_data[index] = data[index]; + local_data[key] = data[key]; - # If a filter is done being collected, thresholds for each index + # If a measurement is done being collected, thresholds for each key # need to be checked so we're doing it here to avoid doubly iterating - # over each index. + # over each key.
if ( Cluster::worker_count == done_with[uid] ) { - if ( check_thresholds(filter, index, local_data[index], 1.0) ) + if ( check_thresholds(m, key, local_data[key], 1.0) ) { - threshold_crossed(filter, index, local_data[index]); + threshold_crossed(m, key, local_data[key]); } } } @@ -286,22 +280,14 @@ event Measurement::cluster_filter_response(uid: string, id: string, filter_name: # If the data has been collected from all peers, we are done and ready to finish. if ( Cluster::worker_count == done_with[uid] ) { - local ts = network_time(); - # Log the time this was initially requested if it's available. - if ( uid in requested_results ) - { - ts = requested_results[uid]; - delete requested_results[uid]; - } - - if ( filter?$period_finished ) - filter$period_finished(ts, filter$id, filter$name, local_data); + if ( m?$period_finished ) + m$period_finished(local_data); # Clean up - delete filter_results[uid, id, filter_name]; + delete measurement_results[uid]; delete done_with[uid]; - # Not sure I need to reset the filter on the manager. - reset(filter); + # Not sure I need to reset the measurement on the manager. + reset(m); } } diff --git a/scripts/base/frameworks/measurement/main.bro b/scripts/base/frameworks/measurement/main.bro index 3809fb16cc..a7f22ed3b7 100644 --- a/scripts/base/frameworks/measurement/main.bro +++ b/scripts/base/frameworks/measurement/main.bro @@ -5,24 +5,16 @@ module Measurement; export { - ## The metrics logging stream identifier. - redef enum Log::ID += { LOG }; - - ## This is the interval for how often threshold based notices will happen - ## after they have already fired. - const threshold_crossed_restart_interval = 1hr &redef; - ## The various calculations are all defined as plugins. type Calculation: enum { PLACEHOLDER }; - ## Represents a thing which is having metrics collected for it. An instance - ## of this record type and an id together represent a single measurement. - type Index: record { + ## Represents a thing for which measurement results are being collected. + type Key: record { ## A non-address related metric or a sub-key for an address based metric. ## An example might be successful SSH connections by client IP address - ## where the client string would be the index value. + ## where the client string would be the key value. ## Another example might be number of HTTP requests to a particular ## value in a Host header. This is an example of a non-host based ## metric since multiple IP addresses could respond for the same Host @@ -44,176 +36,152 @@ export { str: string &optional; }; - ## Value supplied when a metric is finished. It contains all - ## of the measurements collected for the metric. Most of the - ## fields are added by calculation plugins. - type ResultVal: record { - ## The time when this result was first started. + type Reducer: record { + ## Data stream identifier for the reducer to attach to. + stream: string; + + ## The calculations to perform on the data points. + apply: set[Calculation]; + + ## A predicate so that you can decide per key if you would like + ## to accept the data being inserted. + pred: function(key: Measurement::Key, data: Measurement::DataPoint): bool &optional; + + ## A function to normalize the key. This can be used to aggregate or + ## normalize the entire key. + normalize_key: function(key: Measurement::Key): Key &optional; + }; + + ## Value calculated for a data point stream fed into a reducer. + ## Most of the fields are added by plugins.
+ type Result: record { + ## The time when the first data point was added to this result value. begin: time &log; - ## The time when the last value was added to this result. + ## The time when the last data point was added to this result value. end: time &log; ## The number of measurements received. num: count &log &default=0; - - ## A sample of something being measured. This is helpful in - ## some cases for collecting information to do further detection - ## or better logging for forensic purposes. - samples: vector of string &optional; - }; - - type Measurement: record { - ## The calculations to perform on the data. - apply: set[Calculation]; - - ## A predicate so that you can decide per index if you would like - ## to accept the data being inserted. - pred: function(index: Measurement::Index, data: Measurement::DataPoint): bool &optional; - - ## A function to normalize the index. This can be used to aggregate or - ## normalize the entire index. - normalize_func: function(index: Measurement::Index): Index &optional; - - ## A number of sample DataPoints to collect. - samples: count &optional; }; - - type Results: record { begin: time; end: time; result - }; - - ## Type to store a table of metrics result values. - type ResultTable: table[Index] of Results; + ## Type to store a table of measurement results. The outer table is + ## indexed on the measurement Key and the inner table is indexed on + ## the data id that the Key was relevant for. + type ResultTable: table[Key] of table[string] of Result; ## Filters define how the data from a metric is aggregated and handled. ## Filters can be used to set how often the measurements are cut ## and logged or how the data within them is aggregated. - type Filter: record { - ## A name for the filter in case multiple filters are being - ## applied to the same metric. In most cases the default - ## filter name is fine and this field does not need to be set. - id: string; - - ## The interval at which this filter should be "broken" and written - ## to the logging stream. The counters are also reset to zero at + type Measurement: record { + ## The interval at which this measurement should be "broken" and the + ## callback called. The counters are also reset to zero at ## this time so any threshold based detection needs to be set to a ## number that should be expected to happen within this period. - every: interval; + epoch: interval; - ## Optionally provide a function to calculate a value from the ResultVal - ## structure which will be used for thresholding. If no function is - ## provided, then in the following order of preference either the - ## $unique or the $sum fields will be used. - threshold_val_func: function(val: Measurement::ResultVal): count &optional; + ## The reducers for this measurement. + reducers: set[Reducer]; + + ## Optionally provide a function to calculate a value from the Result + ## structure which will be used for thresholding. + threshold_val: function(result: Measurement::Result): count &optional; ## The threshold value for calling the $threshold_crossed callback. threshold: count &optional; ## A series of thresholds for calling the $threshold_crossed callback. threshold_series: vector of count &optional; + + ## A callback that is called when a threshold is crossed. + threshold_crossed: function(key: Measurement::Key, result: Measurement::Result) &optional; - ## A callback with the full collection of ResultVals for this filter. + ## A callback with the full collection of Results for this measurement.
## It's best to not access any global state outside of the variables ## given to the callback because there is no assurance provided as to ## where the callback will be executed on clusters. period_finished: function(data: Measurement::ResultTable) &optional; - - ## A callback that is called when a threshold is crossed. - threshold_crossed: function(index: Measurement::Index, val: Measurement::ResultVal) &optional; }; - ## Function to associate a metric filter with a metric ID. - ## - ## id: The metric ID that the filter should be associated with. - ## - ## filter: The record representing the filter configuration. - global add_filter: function(id: string, filter: Measurement::Filter); - + ## Create a measurement. + global create: function(m: Measurement::Measurement); + ## Add data into a metric. This should be called when ## a script has measured some point value and is ready to increment the ## counters. ## ## id: The metric identifier that the data represents. ## - ## index: The metric index that the value is to be added to. + ## key: The metric key that the value is to be added to. ## - ## increment: How much to increment the counter by. - global add_data: function(id: string, index: Measurement::Index, data: Measurement::DataPoint); + ## data: The data point to send into the stream. + global add_data: function(id: string, key: Measurement::Key, data: Measurement::DataPoint); - ## Helper function to represent a :bro:type:`Measurement::Index` value as + ## Helper function to represent a :bro:type:`Measurement::Key` value as ## a simple string. ## - ## index: The metric index that is to be converted into a string. + ## key: The metric key that is to be converted into a string. ## - ## Returns: A string reprentation of the metric index. - global index2str: function(index: Measurement::Index): string; - - ## Event to access metrics records as they are passed to the logging framework. - global log_metrics: event(rec: Measurement::Info); + ## Returns: A string representation of the metric key. + global key2str: function(key: Measurement::Key): string; } -redef record Filter += { - # Internal use only. The metric that this filter applies to. The value is automatically set. - id: string &optional; +redef record Reducer += { + # Internal use only. Measurement ID. + mid: string &optional; }; -redef record ResultVal += { - # Internal use only. This is the queue where samples - # are maintained since the queue is self managing for - # the number of samples requested. - sample_queue: Queue::Queue &optional; - +redef record Result += { # Internal use only. Indicates if a simple threshold was already crossed. is_threshold_crossed: bool &default=F; - # Internal use only. Current index for threshold series. + # Internal use only. Current index into the threshold series. threshold_series_index: count &default=0; }; -# Store the filters indexed on the metric identifier and filter name. -global filter_store: table[string, string] of Filter = table(); +redef record Measurement += { + # Internal use only (mostly for cluster coherency). + id: string &optional; +}; -# This is indexed by metric id and filter name. -global store: table[string, string] of ResultTable = table(); +# Store of reducers indexed on the data id. +global reducer_store: table[string] of set[Reducer] = table(); -# This is a hook for watching thresholds being crossed. It is called whenever -# index values are updated and the new val is given as the `val` argument. +# Store of results indexed on the measurement id.
+global result_store: table[string] of ResultTable = table(); + +# Store of measurements indexed on the measurement id. +global measurement_store: table[string] of Measurement = table(); + +# This function is called whenever key values are updated, with the new +# result given as the `result` argument. # It's only prototyped here because cluster and non-cluster have separate # implementations. -global data_added: function(filter: Filter, index: Index, val: ResultVal); +global data_added: function(m: Measurement, key: Key, result: Result); # Prototype the hook point for plugins to do calculations. -global add_to_calculation: hook(filter: Filter, val: double, data: DataPoint, result: ResultVal); -# Prototype the hook point for plugins to merge Measurements. -global plugin_merge_measurements: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal); +global add_to_reducer: hook(r: Reducer, val: double, data: DataPoint, result: Result); +# Prototype the hook point for plugins to merge Results. +global compose_resultvals_hook: hook(result: Result, rv1: Result, rv2: Result); -# Event that is used to "finish" metrics and adapt the metrics +# Event that is used to "finish" measurements and adapt the measurement # framework for clustered or non-clustered usage. -global finish_period: event(filter: Measurement::Filter); +global finish_period: event(m: Measurement); -event bro_init() &priority=5 - { - Log::create_stream(Measurement::LOG, [$columns=Info, $ev=log_metrics]); - } - -function index2str(index: Index): string +function key2str(key: Key): string { local out = ""; - if ( index?$host ) - out = fmt("%shost=%s", out, index$host); - if ( index?$str ) - out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", index$str); - return fmt("metric_index(%s)", out); + if ( key?$host ) + out = fmt("%shost=%s", out, key$host); + if ( key?$str ) + out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", key$str); + return fmt("metric_key(%s)", out); } -function merge_result_vals(rv1: ResultVal, rv2: ResultVal): ResultVal +function compose_resultvals(rv1: Result, rv2: Result): Result { - local result: ResultVal; + local result: Result; # Merge $begin (take the earliest one) result$begin = (rv1$begin < rv2$begin) ? rv1$begin : rv2$begin; @@ -224,16 +192,6 @@ function merge_result_vals(rv1: ResultVal, rv2: ResultVal): ResultVal # Merge $num result$num = rv1$num + rv2$num; - hook plugin_merge_measurements(result, rv1, rv2); - - # Merge $sample_queue - if ( rv1?$sample_queue && rv2?$sample_queue ) - result$sample_queue = Queue::merge(rv1$sample_queue, rv2$sample_queue); - else if ( rv1?$sample_queue ) - result$sample_queue = rv1$sample_queue; - else if ( rv2?$sample_queue ) - result$sample_queue = rv2$sample_queue; - # Merge $threshold_series_index result$threshold_series_index = (rv1$threshold_series_index > rv2$threshold_series_index) ?
rv1$threshold_series_index : rv2$threshold_series_index; @@ -241,105 +199,103 @@ function merge_result_vals(rv1: ResultVal, rv2: ResultVal): ResultVal if ( rv1$is_threshold_crossed || rv2$is_threshold_crossed ) result$is_threshold_crossed = T; + hook compose_resultvals_hook(result, rv1, rv2); + return result; } -function reset(filter: Filter) +function reset(m: Measurement) { - if ( [filter$id, filter$name] in store ) - delete store[filter$id, filter$name]; + if ( m$id in result_store ) + delete result_store[m$id]; - store[filter$id, filter$name] = table(); + result_store[m$id] = table(); } -function add_filter(id: string, filter: Filter) +function create(m: Measurement) { - if ( [id, filter$name] in store ) + m$id = unique_id(""); + measurement_store[m$id] = m; + + for ( reducer in m$reducers ) { - Reporter::warning(fmt("invalid Metric filter (%s): Filter with same name already exists.", filter$name)); - return; + reducer$mid = m$id; + if ( reducer$stream !in reducer_store ) + reducer_store[reducer$stream] = set(); + add reducer_store[reducer$stream][reducer]; } - if ( ! filter?$id ) - filter$id = id; - - filter_store[id, filter$name] = filter; - store[id, filter$name] = table(); - - schedule filter$every { Measurement::finish_period(filter) }; + reset(m); + schedule m$epoch { Measurement::finish_period(m) }; } -function add_data(id: string, index: Index, data: DataPoint) +function add_data(data_id: string, key: Key, data: DataPoint) { - # Try to add the data to all of the defined filters for the metric. - for ( [metric_id, filter_id] in filter_store ) + # Try to add the data to all of the defined reducers. + if ( data_id !in reducer_store ) + return; + + for ( r in reducer_store[data_id] ) { - local filter = filter_store[metric_id, filter_id]; - - # If this filter has a predicate, run the predicate and skip this - # index if the predicate return false. - if ( filter?$pred && ! filter$pred(index, data) ) + # If this reducer has a predicate, run the predicate + # and skip this key if the predicate returns false. + if ( r?$pred && ! r$pred(key, data) ) next; - #if ( filter?$normalize_func ) - # index = filter$normalize_func(copy(index)); + if ( r?$normalize_key ) + key = r$normalize_key(copy(key)); - local metric_tbl = store[id, filter$name]; - if ( index !in metric_tbl ) - metric_tbl[index] = [$begin=network_time(), $end=network_time()]; + local m = measurement_store[r$mid]; + local results = result_store[m$id]; + if ( key !in results ) + results[key] = table(); + if ( data_id !in results[key] ) + results[key][data_id] = [$begin=network_time(), $end=network_time()]; - local result = metric_tbl[index]; + local result = results[key][data_id]; + ++result$num; + # Continually update the $end field. + result$end=network_time(); # If a string was given, fall back to 1.0 as the value. local val = 1.0; if ( data?$num || data?$dbl ) val = data?$dbl ? data$dbl : data$num; - ++result$num; - # Continually update the $end field. - result$end=network_time(); - - #if ( filter?$samples && filter$samples > 0 && data?$str ) - # { - # if ( ! result?$sample_queue ) - # result$sample_queue = Queue::init([$max_len=filter$samples]); - # Queue::push(result$sample_queue, data$str); - # } - - hook add_to_calculation(filter, val, data, result); - data_added(filter, index, result); + hook add_to_reducer(r, val, data, result); + data_added(m, key, result); } } # This function checks if a threshold has been crossed.
It is also used as a method to implement # mid-break-interval threshold crossing detection for cluster deployments. -function check_thresholds(filter: Filter, index: Index, val: ResultVal, modify_pct: double): bool +function check_thresholds(m: Measurement, key: Key, result: Result, modify_pct: double): bool { - if ( ! (filter?$threshold || filter?$threshold_series) ) - return; + if ( ! (m?$threshold || m?$threshold_series) ) + return F; local watch = 0.0; - if ( val?$unique ) - watch = val$unique; - else if ( val?$sum ) - watch = val$sum; + #if ( result?$unique ) + # watch = result$unique; + #else if ( result?$sum ) + # watch = result$sum; - if ( filter?$threshold_val_func ) - watch = filter$threshold_val_func(val); + if ( m?$threshold_val ) + watch = m$threshold_val(result); if ( modify_pct < 1.0 && modify_pct > 0.0 ) watch = watch/modify_pct; - if ( ! val$is_threshold_crossed && - filter?$threshold && watch >= filter$threshold ) + if ( ! result$is_threshold_crossed && + m?$threshold && watch >= m$threshold ) { # A default threshold was given and the value crossed it. return T; } - if ( filter?$threshold_series && - |filter$threshold_series| >= val$threshold_series_index && - watch >= filter$threshold_series[val$threshold_series_index] ) + if ( m?$threshold_series && + |m$threshold_series| > result$threshold_series_index && + watch >= m$threshold_series[result$threshold_series_index] ) { # A threshold series was given and the value crossed the next # value in the series. @@ -349,19 +305,19 @@ function check_thresholds(filter: Filter, index: Index, val: ResultVal, modify_p return F; } -function threshold_crossed(filter: Filter, index: Index, val: ResultVal) +function threshold_crossed(m: Measurement, key: Key, result: Result) { - if ( ! filter?$threshold_crossed ) + if ( ! m?$threshold_crossed ) return; - if ( val?$sample_queue ) - val$samples = Queue::get_str_vector(val$sample_queue); + #if ( result?$sample_queue ) + # result$samples = Queue::get_str_vector(result$sample_queue); - filter$threshold_crossed(index, val); - val$is_threshold_crossed = T; + m$threshold_crossed(key, result); + result$is_threshold_crossed = T; # Bump up to the next threshold series index if a threshold series is being used.
- if ( filter?$threshold_series ) - ++val$threshold_series_index; + if ( m?$threshold_series ) + ++result$threshold_series_index; } diff --git a/scripts/base/frameworks/measurement/non-cluster.bro b/scripts/base/frameworks/measurement/non-cluster.bro index 11bb7f16dc..7a0a2a2c3e 100644 --- a/scripts/base/frameworks/measurement/non-cluster.bro +++ b/scripts/base/frameworks/measurement/non-cluster.bro @@ -2,20 +2,23 @@ module Measurement; -event Measurement::finish_period(filter: Filter) +event Measurement::finish_period(m: Measurement) { - local data = store[filter$id, filter$name]; - if ( filter?$period_finished ) - filter$period_finished(network_time(), filter$id, filter$name, data); + if ( m$id in result_store ) + { + local data = result_store[m$id]; + if ( m?$period_finished ) + m$period_finished(data); - reset(filter); + reset(m); + } - schedule filter$every { Measurement::finish_period(filter) }; + schedule m$epoch { Measurement::finish_period(m) }; } -function data_added(filter: Filter, index: Index, val: ResultVal) +function data_added(m: Measurement, key: Key, result: Result) { - if ( check_thresholds(filter, index, val, 1.0) ) - threshold_crossed(filter, index, val); + if ( check_thresholds(m, key, result, 1.0) ) + threshold_crossed(m, key, result); } diff --git a/scripts/base/frameworks/measurement/plugins/__load__.bro b/scripts/base/frameworks/measurement/plugins/__load__.bro index b708f917d1..0d4c2ed302 100644 --- a/scripts/base/frameworks/measurement/plugins/__load__.bro +++ b/scripts/base/frameworks/measurement/plugins/__load__.bro @@ -1,6 +1,7 @@ @load ./average @load ./max @load ./min +@load ./sample @load ./std-dev @load ./sum @load ./unique diff --git a/scripts/base/frameworks/measurement/plugins/average.bro b/scripts/base/frameworks/measurement/plugins/average.bro index d3e1bef4d5..629cb2fc7b 100644 --- a/scripts/base/frameworks/measurement/plugins/average.bro +++ b/scripts/base/frameworks/measurement/plugins/average.bro @@ -1,5 +1,5 @@ -module Metrics; +module Measurement; export { redef enum Calculation += { @@ -7,15 +7,15 @@ export { AVERAGE }; - redef record ResultVal += { + redef record Result += { ## For numeric data, this calculates the average of all values. average: double &log &optional; }; } -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) { - if ( AVERAGE in filter$measure ) + if ( AVERAGE in r$apply ) { if ( ! result?$average ) result$average = val; @@ -24,7 +24,7 @@ hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: Re } } -hook plugin_merge_measurements(result: ResultVal, rv1: ResultVal, rv2: ResultVal) +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) { if ( rv1?$average && rv2?$average ) result$average = ((rv1$average*rv1$num) + (rv2$average*rv2$num))/(rv1$num+rv2$num); diff --git a/scripts/base/frameworks/measurement/plugins/max.bro b/scripts/base/frameworks/measurement/plugins/max.bro index 806713dbd4..5138e3f684 100644 --- a/scripts/base/frameworks/measurement/plugins/max.bro +++ b/scripts/base/frameworks/measurement/plugins/max.bro @@ -1,5 +1,5 @@ -module Metrics; +module Measurement; export { redef enum Calculation += { @@ -7,15 +7,15 @@ export { MAX }; - redef record ResultVal += { + redef record Result += { ## For numeric data, this tracks the maximum value given. 
max: double &log &optional; }; } -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) { - if ( MAX in filter$measure ) + if ( MAX in r$apply ) { if ( ! result?$max ) result$max = val; @@ -24,7 +24,7 @@ hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: Re } } -hook plugin_merge_measurements(result: ResultVal, rv1: ResultVal, rv2: ResultVal) +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) { if ( rv1?$max && rv2?$max ) result$max = (rv1$max > rv2$max) ? rv1$max : rv2$max; diff --git a/scripts/base/frameworks/measurement/plugins/min.bro b/scripts/base/frameworks/measurement/plugins/min.bro index e0d4003b31..ebdbb39373 100644 --- a/scripts/base/frameworks/measurement/plugins/min.bro +++ b/scripts/base/frameworks/measurement/plugins/min.bro @@ -1,5 +1,5 @@ -module Metrics; +module Measurement; export { redef enum Calculation += { @@ -7,15 +7,15 @@ export { MIN }; - redef record ResultVal += { + redef record Result += { ## For numeric data, this tracks the minimum value given. min: double &log &optional; }; } -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) { - if ( MIN in filter$measure ) + if ( MIN in r$apply ) { if ( ! result?$min ) result$min = val; @@ -24,7 +24,7 @@ hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: Re } } -hook plugin_merge_measurements(result: ResultVal, rv1: ResultVal, rv2: ResultVal) +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) { if ( rv1?$min && rv2?$min ) result$min = (rv1$min < rv2$min) ? rv1$min : rv2$min; diff --git a/scripts/base/frameworks/measurement/plugins/sample.bro b/scripts/base/frameworks/measurement/plugins/sample.bro new file mode 100644 index 0000000000..3edd92ce13 --- /dev/null +++ b/scripts/base/frameworks/measurement/plugins/sample.bro @@ -0,0 +1,45 @@ + +module Measurement; + +export { + + redef record Reducer += { + ## A number of sample DataPoints to collect. + samples: count &default=0; + }; + + redef record Result += { + ## A sample of something being measured. This is helpful in + ## some cases for collecting information to do further detection + ## or better logging for forensic purposes. + samples: vector of Measurement::DataPoint &optional; + }; +} + +redef record Result += { + # Internal use only. This is the queue where samples + # are maintained since the queue is self managing for + # the number of samples requested. + sample_queue: Queue::Queue &optional; +}; + +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) + { + if ( r$samples > 0 && data?$str ) + { + if ( !
result?$sample_queue ) + result$sample_queue = Queue::init([$max_len=r$samples]); + Queue::push(result$sample_queue, data$str); + } + } + +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) + { + # Merge $sample_queue + if ( rv1?$sample_queue && rv2?$sample_queue ) + result$sample_queue = Queue::merge(rv1$sample_queue, rv2$sample_queue); + else if ( rv1?$sample_queue ) + result$sample_queue = rv1$sample_queue; + else if ( rv2?$sample_queue ) + result$sample_queue = rv2$sample_queue; + } \ No newline at end of file diff --git a/scripts/base/frameworks/measurement/plugins/std-dev.bro b/scripts/base/frameworks/measurement/plugins/std-dev.bro index cbd0db3416..6d13d7fc51 100644 --- a/scripts/base/frameworks/measurement/plugins/std-dev.bro +++ b/scripts/base/frameworks/measurement/plugins/std-dev.bro @@ -1,7 +1,7 @@ @load ./sum @load ./variance -module Metrics; +module Measurement; export { redef enum Calculation += { @@ -9,23 +9,23 @@ export { STD_DEV }; - redef record ResultVal += { + redef record Result += { ## For numeric data, this calculates the standard deviation. std_dev: double &log &optional; }; } # This depends on the variance plugin which uses priority -5 -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) &priority=-10 +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) &priority=-10 { - if ( STD_DEV in filter$measure ) + if ( STD_DEV in r$apply ) { if ( result?$variance ) result$std_dev = sqrt(result$variance); } } -hook plugin_merge_measurements(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10 +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) &priority=-10 { if ( rv1?$sum || rv2?$sum ) { diff --git a/scripts/base/frameworks/measurement/plugins/sum.bro b/scripts/base/frameworks/measurement/plugins/sum.bro index 2f615ffb6c..7e8b6ff692 100644 --- a/scripts/base/frameworks/measurement/plugins/sum.bro +++ b/scripts/base/frameworks/measurement/plugins/sum.bro @@ -1,5 +1,5 @@ -module Metrics; +module Measurement; export { redef enum Calculation += { @@ -8,15 +8,15 @@ export { SUM }; - redef record ResultVal += { + redef record Result += { ## For numeric data, this tracks the sum of all values. sum: double &log &optional; }; } -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) { - if ( SUM in filter$measure ) + if ( SUM in r$apply ) { if ( ! result?$sum ) result$sum = 0; @@ -24,7 +24,7 @@ hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: Re } } -hook plugin_merge_measurements(result: ResultVal, rv1: ResultVal, rv2: ResultVal) +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) { if ( rv1?$sum || rv2?$sum ) { diff --git a/scripts/base/frameworks/measurement/plugins/unique.bro b/scripts/base/frameworks/measurement/plugins/unique.bro index 66cab47897..4f30206a4e 100644 --- a/scripts/base/frameworks/measurement/plugins/unique.bro +++ b/scripts/base/frameworks/measurement/plugins/unique.bro @@ -1,5 +1,5 @@ -module Metrics; +module Measurement; export { redef enum Calculation += { @@ -7,14 +7,14 @@ export { UNIQUE }; - redef record ResultVal += { + redef record Result += { ## If cardinality is being tracked, the number of unique ## items is tracked here. unique: count &log &optional; }; } -redef record ResultVal += { +redef record Result += { # Internal use only.
This is not meant to be publicly available # because we don't want to trust that we can inspect the values # since we will likely move to a probabilistic data structure in the future. @@ -22,17 +22,18 @@ redef record ResultVal += { unique_vals: set[DataPoint] &optional; }; -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) { - if ( UNIQUE in filter$measure ) + if ( UNIQUE in r$apply ) { if ( ! result?$unique_vals ) result$unique_vals=set(); add result$unique_vals[data]; + result$unique = |result$unique_vals|; } } -hook plugin_merge_measurements(result: ResultVal, rv1: ResultVal, rv2: ResultVal) +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) { if ( rv1?$unique_vals || rv2?$unique_vals ) { diff --git a/scripts/base/frameworks/measurement/plugins/variance.bro b/scripts/base/frameworks/measurement/plugins/variance.bro index df83361c35..07a7293539 100644 --- a/scripts/base/frameworks/measurement/plugins/variance.bro +++ b/scripts/base/frameworks/measurement/plugins/variance.bro @@ -1,6 +1,6 @@ @load ./average -module Metrics; +module Measurement; export { redef enum Calculation += { @@ -8,13 +8,13 @@ export { VARIANCE }; - redef record ResultVal += { + redef record Result += { ## For numeric data, this calculates the variance. variance: double &log &optional; }; } -redef record ResultVal += { +redef record Result += { # Internal use only. Used for incrementally calculating variance. prev_avg: double &optional; @@ -22,16 +22,16 @@ redef record ResultVal += { var_s: double &optional; }; -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) &priority=5 +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) &priority=5 { - if ( VARIANCE in filter$measure ) + if ( VARIANCE in r$apply ) result$prev_avg = result$average; } # Reduced priority since this depends on the average -hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: ResultVal) &priority=-5 +hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result) &priority=-5 { - if ( VARIANCE in filter$measure ) + if ( VARIANCE in r$apply ) { if ( ! result?$var_s ) result$var_s = 0.0; @@ -41,7 +41,7 @@ hook add_to_calculation(filter: Filter, val: double, data: DataPoint, result: Re } # Reduced priority since this depends on the average -hook plugin_merge_measurements(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-5 +hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result) &priority=-5 { if ( rv1?$var_s && rv2?$var_s ) { diff --git a/scripts/policy/misc/app-metrics.bro b/scripts/policy/misc/app-metrics.bro index 68deddaa29..d76511fe98 100644 --- a/scripts/policy/misc/app-metrics.bro +++ b/scripts/policy/misc/app-metrics.bro @@ -16,51 +16,33 @@ export { }; ## The frequency of logging the stats collected by this script.
- const break_interval = 15mins &redef; + const break_interval = 1min &redef; } redef record connection += { resp_hostname: string &optional; }; -function app_metrics_rollup(index: Measurement::Index, vals: table[string, string] of Measurement::ResultVal) - { - local l: Info; - l$ts = network_time(); - for ( [metric_name, filter_name] in vals ) - { - local val = vals[metric_name, filter_name]; - l$app = index$str; - if ( metric_name == "apps.bytes" ) - l$bytes = double_to_count(floor(val$sum)); - else if ( metric_name == "apps.hits" ) - { - l$hits = val$num; - l$uniq_hosts = val$unique; - } - } - } event bro_init() &priority=3 { Log::create_stream(AppMeasurement::LOG, [$columns=Info]); - #Measurement::create_index_rollup("AppMeasurement", app_metrics_rollup); - #Measurement::add_filter("apps.bytes", [$every=break_interval, $measure=set(Measurement::SUM), $rollup="AppMeasurement"]); - #Measurement::add_filter("apps.hits", [$every=break_interval, $measure=set(Measurement::UNIQUE), $rollup="AppMeasurement"]); - + local r1: Measurement::Reducer = [$stream="apps.bytes", $apply=set(Measurement::SUM)]; + local r2: Measurement::Reducer = [$stream="apps.hits", $apply=set(Measurement::UNIQUE)]; Measurement::create([$epoch=break_interval, - $measurements=table(["apps.bytes"] = [$apply=set(Measurement::SUM)], - ["apps.hits"] = [$apply=set(Measurement::UNIQUE)]), - $period_finished(result: Measurement::Results) = + $reducers=set(r1, r2), + $period_finished(data: Measurement::ResultTable) = { local l: Info; l$ts = network_time(); - for ( index in result ) + for ( key in data ) { - l$bytes = double_to_count(floor(result[index]["apps.bytes"]$sum)); - l$hits = result[index]["apps.hits"]$num; - l$uniq_hosts = result[index]["apps.hits"]$unique; + local result = data[key]; + l$app = key$str; + l$bytes = double_to_count(floor(result["apps.bytes"]$sum)); + l$hits = result["apps.hits"]$num; + l$uniq_hosts = result["apps.hits"]$unique; Log::write(LOG, l); } }]);
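Usage sketch (not part of the patch): the app-metrics.bro hunk above is the only in-tree caller of the new API, so here is a second, minimal example of how a script might drive it, assuming the definitions from main.bro in this diff. The stream name "ssh.failed_logins", the hypothetical ssh_login_failure event, the key values, and the threshold numbers are all invented for illustration:

    local r1: Measurement::Reducer = [$stream="ssh.failed_logins",
                                      $apply=set(Measurement::SUM)];
    Measurement::create([$epoch=5mins,
                         $reducers=set(r1),
                         # With the old $unique/$sum fallback commented out in
                         # check_thresholds, $threshold_val must be supplied for
                         # thresholding to see a non-zero value.
                         $threshold_val(result: Measurement::Result) =
                             {
                             return result?$sum ? double_to_count(floor(result$sum)) : 0;
                             },
                         $threshold=5,
                         $threshold_crossed(key: Measurement::Key, result: Measurement::Result) =
                             {
                             # Keys in this example always carry $host.
                             print fmt("%s crossed the threshold: %.0f", key$host, result$sum);
                             }]);

    # Every data point is handed to all reducers attached to the stream.
    event ssh_login_failure(c: connection)  # hypothetical event for illustration
        {
        Measurement::add_data("ssh.failed_logins", [$host=c$id$orig_h], [$num=1]);
        }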
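A note on the worker-side trigger arithmetic in check_thresholds: the modify_pct argument scales the locally observed value up rather than scaling the threshold down. Worked example using the default cluster_request_global_view_percent of 0.2 from cluster.bro and an assumed global threshold of 100:

    # On a worker, data_added calls:
    #   check_thresholds(m, key, result, 0.2)
    # Inside check_thresholds:
    #   watch = m$threshold_val(result)    # say the local sum is 25
    #   watch = watch / 0.2                # -> 125
    #   125 >= m$threshold (100)           # -> T
    # So any worker holding at least 20% of the full threshold asks the
    # manager for a global view of that key; the manager then re-checks
    # the merged Result with modify_pct = 1.0.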
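For plugin authors, the rename boils down to implementing the same two hook points under their new names, add_to_reducer and compose_resultvals_hook. A hypothetical PRODUCT calculation, sketched against the hook signatures in main.bro above and mirroring the sum.bro pattern (the PRODUCT enum value and $product field are invented, not part of this patch):

    module Measurement;

    export {
        redef enum Calculation += { PRODUCT };

        redef record Result += {
            ## Hypothetical: the running product of all values.
            product: double &log &optional;
        };
    }

    hook add_to_reducer(r: Reducer, val: double, data: DataPoint, result: Result)
        {
        if ( PRODUCT in r$apply )
            result$product = result?$product ? result$product * val : val;
        }

    # Merging two intermediate Results must be defined for cluster use.
    hook compose_resultvals_hook(result: Result, rv1: Result, rv2: Result)
        {
        if ( rv1?$product && rv2?$product )
            result$product = rv1$product * rv2$product;
        else if ( rv1?$product )
            result$product = rv1$product;
        else if ( rv2?$product )
            result$product = rv2$product;
        }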