##! This is the implementation of the metrics framework.

module Metrics;

export {
	redef enum Log::ID += { METRICS };

	type ID: enum {
		NOTHING,
	};

	## The default interval used for "breaking" metrics and writing the
	## current value to the logging stream.
	const default_break_interval = 15mins &redef;

	## The record written to the metrics logging stream, one per
	## (aggregation subnet, index) pair each time a filter is broken.
	type Info: record {
		## Network time at which the metric was broken.
		ts:           time   &log;
		## The metric the value belongs to.
		metric_id:    ID     &log;
		## Name of the filter that produced the value.
		filter_name:  string &log;
		## Aggregation key; removed from the record when empty.
		agg_subnet:   string &log &optional;
		## Index (sub-key); removed from the record when empty.
		index:        string &log &optional;
		## The accumulated counter value.
		value:        count  &log;
	};

	type Entry: record {
		## Host is the value to which this metric applies.
		host:         addr &optional;

		## A non-address related metric or a sub-key for an address based metric.
		## An example might be successful SSH connections by client IP address
		## where the client string would be the index value.
		## Another example might be number of HTTP requests to a particular
		## value in a Host header.  This is an example of a non-host based
		## metric since multiple IP addresses could respond for the same Host
		## header value.
		index:        string &default="";

		## The value by which the counter should be increased in each filter
		## where this entry is accepted.
		increment:    count &default=1;
	};

	# TODO: configure a metrics filter logging stream to log the current
	#       metrics configuration in case someone is looking through
	#       old logs and the configuration has changed since then.
	type Filter: record {
		## The :bro:type:`Metrics::ID` that this filter applies to.
		id:                ID &optional;
		## The name for this filter so that multiple filters can be
		## applied to a single metric to get a different view of the same
		## metric data being collected (different aggregation, break, etc).
		name:              string &default="default";
		## A predicate so that you can decide per index if you would like
		## to accept the data being inserted.
		pred:              function(entry: Entry): bool &optional;
		## Global mask by which you'd like to aggregate traffic.
		aggregation_mask:  count &optional;
		## This is essentially applying names to various subnets.
		aggregation_table: table[subnet] of string &optional;
		## The interval at which the metric should be "broken" and written
		## to the logging stream.
		break_interval:    interval &default=default_break_interval;
		## This determines if the result of this filter is sent to the metrics
		## logging stream.  One use for the metrics framework is as an internal
		## thresholding and statistics gathering utility that is meant to
		## never log but rather to generate notices and derive data.
		log:               bool &default=T;
		## A straight threshold for generating a notice.
		notice_threshold:  count &optional;
		## A series of thresholds at which to generate notices.
		notice_thresholds: vector of count &optional;
		## If this and a $notice_threshold value are set, this notice type
		## will be generated by the metrics framework.
		note:              Notice::Type &optional;
	};

	global add_filter: function(id: ID, filter: Filter);
	global add_data: function(id: ID, entry: Entry);

	global log_metrics: event(rec: Info);
}

# All filters registered for each metric ID.
global metric_filters: table[ID] of vector of Filter = table();

# Counter values keyed by index string.
type MetricIndex: table[string] of count &default=0;
# Per-index counters keyed by aggregation key (subnet/host/name as a string).
type MetricTable: table[string] of MetricIndex;
# This is indexed by metric ID and stream filter name.
global store: table[ID, string] of MetricTable = table();

# This stores the current threshold index for filters using the
# $notice_thresholds element.
global thresholds: table[string] of count = {} &default=0;

event bro_init() &priority=5
	{
	Log::create_stream(METRICS, [$columns=Info, $ev=log_metrics]);
	}

# Discards everything collected so far for the given filter.
function reset(filter: Filter)
	{
	store[filter$id, filter$name] = table();
	}

# Breaks a filter: walks every collected value, raises threshold notices,
# writes the values to the METRICS stream (when the filter asks for logging),
# clears the filter's store and reschedules itself after $break_interval.
event log_it(filter: Filter)
	{
	# If this node is the manager in a cluster, this needs to request values
	# for this metric from all of the workers.

	local id = filter$id;
	local name = filter$name;

	for ( agg_subnet in store[id, name] )
		{
		local metric_values = store[id, name][agg_subnet];
		for ( index in metric_values )
			{
			local val = metric_values[index];
			local m: Info = [$ts=network_time(),
			                 $metric_id=id,
			                 $filter_name=name,
			                 $agg_subnet=agg_subnet,
			                 $index=index,
			                 $value=val];

			if ( filter?$notice_threshold &&
			     m$value >= filter$notice_threshold )
				{
				# $note is optional; a notice can only be raised when the
				# filter actually names a notice type.
				if ( filter?$note )
					NOTICE([$note=filter$note,
					        $msg=fmt("Metrics threshold crossed by %s %d/%d", m$agg_subnet, m$value, filter$notice_threshold),
					        $n=m$value]);
				}

			# The bounds check keeps an empty (or exhausted) threshold
			# vector from being indexed out of range.
			else if ( filter?$notice_thresholds &&
			          thresholds[cat(id,name)] < |filter$notice_thresholds| &&
			          m$value >= filter$notice_thresholds[thresholds[cat(id,name)]] )
				{
				# TODO: implement this
				}

			# If there wasn't an index, remove the field.
			if ( index == "" )
				delete m$index;

			# If there wasn't an aggregation subnet, remove the field.
			if ( agg_subnet == "" )
				delete m$agg_subnet;

			# Filters with $log=F are used only for notices/derived data.
			if ( filter$log )
				Log::write(METRICS, m);
			}
		}

	reset(filter);

	schedule filter$break_interval { log_it(filter) };
	}

# Registers a filter for a metric after validating it.  Invalid filters are
# reported with a printed message and ignored.
function add_filter(id: ID, filter: Filter)
	{
	if ( filter?$aggregation_table && filter?$aggregation_mask )
		{
		print "INVALID Metric filter: Defined $aggregation_table and $aggregation_mask.";
		return;
		}
	if ( [id, filter$name] in store )
		{
		print fmt("INVALID Metric filter: Filter with name \"%s\" already exists.", filter$name);
		return;
		}
	if ( filter?$notice_threshold && filter?$notice_thresholds )
		{
		print "INVALID Metric filter: Defined both $notice_threshold and $notice_thresholds";
		return;
		}

	# The store below is keyed by the id argument while log_it() and reset()
	# look it up through filter$id, so the two must always agree.
	filter$id = id;

	if ( id !in metric_filters )
		metric_filters[id] = vector();
	metric_filters[id][|metric_filters[id]|] = filter;

	store[id, filter$name] = table();

	# Only do this on the manager if in a cluster.
	schedule filter$break_interval { log_it(filter) };
	}

# Feeds one observation into every filter registered for the metric.
# Observations for metrics without filters are dropped.
function add_data(id: ID, entry: Entry)
	{
	if ( id !in metric_filters )
		return;

	local filters = metric_filters[id];

	# Add the data to any of the defined filters.
	for ( filter_id in filters )
		{
		local filter = filters[filter_id];

		# If this filter has a predicate, run the predicate and skip this
		# entry if the predicate return false.
		if ( filter?$pred && ! filter$pred(entry) )
			next;

		local agg_subnet = "";
		local filt_store = store[id, filter$name];
		if ( entry?$host )
			{
			if ( filter?$aggregation_mask )
				{
				agg_subnet = fmt("%s", mask_addr(entry$host, filter$aggregation_mask));
				}
			else if ( filter?$aggregation_table )
				{
				# If an aggregation table is being used and the host isn't
				# covered by it, that means we aren't interested in it.
				# Test membership first: indexing a missing host is a
				# runtime error, not an empty string.
				if ( entry$host !in filter$aggregation_table )
					next;

				agg_subnet = filter$aggregation_table[entry$host];

				# An empty name is treated as "not interested" as well.
				if ( agg_subnet == "" )
					next;
				}
			else
				agg_subnet = fmt("%s", entry$host);
			}

		if ( agg_subnet !in filt_store )
			filt_store[agg_subnet] = table();

		local fs = filt_store[agg_subnet];
		if ( entry$index !in fs )
			fs[entry$index] = 0;
		fs[entry$index] = fs[entry$index] + entry$increment;
		}
	}