##! The metrics framework provides a way to count and measure data.

@load base/frameworks/notice

module Metrics;

export {
	## The metrics logging stream identifier.
	redef enum Log::ID += { LOG };

	## Identifiers for metrics to collect.
	type ID: enum {
		## Blank placeholder value.
		NOTHING,
	};

	## The default interval used for "breaking" metrics and writing the
	## current value to the logging stream.
	const default_break_interval = 15mins &redef;

	## This is the interval for how often threshold based notices will happen
	## after they have already fired.
	const renotice_interval = 1hr &redef;

	## Represents a thing which is having metrics collected for it.  An instance
	## of this record type and a :bro:type:`Metrics::ID` together represent a
	## single measurement.
	type Index: record {
		## Host is the value to which this metric applies.
		host:         addr &optional;

		## A non-address related metric or a sub-key for an address based metric.
		## An example might be successful SSH connections by client IP address
		## where the client string would be the index value.
		## Another example might be number of HTTP requests to a particular
		## value in a Host header.  This is an example of a non-host based
		## metric since multiple IP addresses could respond for the same Host
		## header value.
		str:          string &optional;

		## The CIDR block that this metric applies to.  This is typically
		## only used internally for host based aggregation.
		network:      subnet &optional;
	} &log;

	## The record type that is used for logging metrics.
	type Info: record {
		## Timestamp at which the metric was "broken".
		ts:           time   &log;
		## What measurement the metric represents.
		metric_id:    ID     &log;
		## The name of the filter being logged.  :bro:type:`Metrics::ID` values
		## can have multiple filters which represent different perspectives on
		## the data so this is necessary to understand the value.
		filter_name:  string &log;
		## What the metric value applies to.
		index:        Index  &log;
		## The simple numeric value of the metric.
		value:        count  &log;
	};

	# TODO: configure a metrics filter logging stream to log the current
	#       metrics configuration in case someone is looking through
	#       old logs and the configuration has changed since then.

	## Filters define how the data from a metric is aggregated and handled.
	## Filters can be used to set how often the measurements are cut or "broken"
	## and logged or how the data within them is aggregated.  It's also
	## possible to disable logging and use filters for thresholding.
	type Filter: record {
		## The :bro:type:`Metrics::ID` that this filter applies to.
		id:                ID                      &optional;
		## The name for this filter so that multiple filters can be
		## applied to a single metric to get a different view of the same
		## metric data being collected (different aggregation, break, etc).
		name:              string                  &default="default";
		## A predicate so that you can decide per index if you would like
		## to accept the data being inserted.
		pred:              function(index: Index): bool &optional;
		## Global mask by which you'd like to aggregate traffic.
		aggregation_mask:  count                   &optional;
		## This is essentially a mapping table between addresses and subnets.
		aggregation_table: table[subnet] of subnet &optional;
		## The interval at which this filter should be "broken" and written
		## to the logging stream.  The counters are also reset to zero at
		## this time so any threshold based detection needs to be set to a
		## number that should be expected to happen within this period.
		break_interval:    interval                &default=default_break_interval;
		## This determines if the result of this filter is sent to the metrics
		## logging stream.  One use for the metrics framework is as an internal
		## thresholding and statistics gathering utility that is meant to
		## never log but rather to generate notices and derive data.
		log:               bool                    &default=T;
		## If this and a $notice_threshold value are set, this notice type
		## will be generated by the metrics framework.
		note:              Notice::Type            &optional;
		## A straight threshold for generating a notice.
		notice_threshold:  count                   &optional;
		## A series of thresholds at which to generate notices.
		notice_thresholds: vector of count         &optional;
		## How often this notice should be raised for this filter.  It
		## will be generated every time it crosses a threshold, but if the
		## $break_interval is set to 5mins and this is set to 1hr the notice
		## will only be generated once per hour even if something crosses the
		## threshold in every break interval.
		notice_freq:       interval                &optional;
	};

	## Function to associate a metric filter with a metric ID.
	##
	## id: The metric ID that the filter should be associated with.
	##
	## filter: The record representing the filter configuration.
	global add_filter: function(id: ID, filter: Filter);

	## Add data into a :bro:type:`Metrics::ID`.  This should be called when
	## a script has measured some point value and is ready to increment the
	## counters.
	##
	## id: The metric ID that the data represents.
	##
	## index: The metric index that the value is to be added to.
	##
	## increment: How much to increment the counter by.
	global add_data: function(id: ID, index: Index, increment: count);

	## Helper function to represent a :bro:type:`Metrics::Index` value as
	## a simple string.
	##
	## index: The metric index that is to be converted into a string.
	##
	## Returns: A string representation of the metric index.
	global index2str: function(index: Index): string;

	## Event that is used to "finish" metrics and adapt the metrics
	## framework for clustered or non-clustered usage.
	##
	## .. note:: This is primarily intended for internal use.
	global log_it: event(filter: Filter);

	## Event to access metrics records as they are passed to the logging framework.
	global log_metrics: event(rec: Info);

	## Type to store a table of metrics values.  Internal use only!
	type MetricTable: table[Index] of count &default=0;
}

redef record Notice::Info += {
	metric_index: Index &log &optional;
};

# All filters registered for a metric ID, in registration order.
global metric_filters: table[ID] of vector of Filter = table();
# Filters looked up by metric ID and filter name.
global filter_store: table[ID, string] of Filter = table();

# This is indexed by metric ID and stream filter name.
global store: table[ID, string] of MetricTable = table() &default=table();

# This function checks if a threshold has been crossed and generates a
# notice if it has.  It is also used as a method to implement
# mid-break-interval threshold crossing detection for cluster deployments.
global check_notice: function(filter: Filter, index: Index, val: count): bool;

# This is a hook for watching thresholds being crossed.  It is called whenever
# index values are updated and the new val is given as the `val` argument.
# Only the declaration lives in this file; the body is provided elsewhere.
global data_added: function(filter: Filter, index: Index, val: count);

# This stores the current threshold index for filters using the
# $notice_threshold and $notice_thresholds elements.
global thresholds: table[ID, string, Index] of count = {} &create_expire=renotice_interval &default=0;

event bro_init() &priority=5
	{
	Log::create_stream(Metrics::LOG, [$columns=Info, $ev=log_metrics]);
	}

# Renders the fields that are set on the index as a comma separated
# "key=value" list wrapped in "metric_index(...)".
function index2str(index: Index): string
	{
	local out = "";
	if ( index?$host )
		out = fmt("%shost=%s", out, index$host);
	if ( index?$network )
		out = fmt("%s%snetwork=%s", out, |out|==0 ? "" : ", ", index$network);
	if ( index?$str )
		out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", index$str);
	return fmt("metric_index(%s)", out);
	}

# Writes one Info record per index in `data` to the metrics logging stream,
# stamped with `ts`.  Nothing is written when the filter has logging disabled.
function write_log(ts: time, filter: Filter, data: MetricTable)
	{
	# The logging decision is per filter, so make it once instead of
	# building a record for every index only to throw it away.
	if ( ! filter$log )
		return;

	for ( index in data )
		{
		local m: Info = [$ts=ts,
		                 $metric_id=filter$id,
		                 $filter_name=filter$name,
		                 $index=index,
		                 $value=data[index]];
		Log::write(Metrics::LOG, m);
		}
	}

# Replaces the collected values of the given filter with an empty table.
function reset(filter: Filter)
	{
	store[filter$id, filter$name] = table();
	}

# Validates the filter configuration, registers it for the metric ID and
# schedules the first Metrics::log_it event after $break_interval.
# Invalid configurations are reported with print and are not registered.
function add_filter(id: ID, filter: Filter)
	{
	if ( filter?$aggregation_table && filter?$aggregation_mask )
		{
		print "INVALID Metric filter: Defined $aggregation_table and $aggregation_mask.";
		return;
		}
	if ( [id, filter$name] in store )
		{
		print fmt("INVALID Metric filter: Filter with name \"%s\" already exists.", filter$name);
		return;
		}
	if ( filter?$notice_threshold && filter?$notice_thresholds )
		{
		print "INVALID Metric filter: Defined both $notice_threshold and $notice_thresholds";
		return;
		}

	# Callers may leave $id unset; fill it in from the argument.
	if ( ! filter?$id )
		filter$id = id;

	if ( id !in metric_filters )
		metric_filters[id] = vector();
	# Indexing at the current size appends to the vector.
	metric_filters[id][|metric_filters[id]|] = filter;

	filter_store[id, filter$name] = filter;
	store[id, filter$name] = table();

	schedule filter$break_interval { Metrics::log_it(filter) };
	}

# Adds `increment` to the counter for `index` in every filter registered for
# the metric ID and hands the new value to data_added().
function add_data(id: ID, index: Index, increment: count)
	{
	if ( id !in metric_filters )
		return;

	local filters = metric_filters[id];

	# Try to add the data to all of the defined filters for the metric.
	for ( filter_id in filters )
		{
		local filter = filters[filter_id];

		# If this filter has a predicate, run the predicate and skip this
		# index if the predicate returns false.
		if ( filter?$pred && ! filter$pred(index) )
			next;

		# NOTE(review): the aggregation below rewrites `index` in place, so
		# filters handled later in this loop see the already aggregated index
		# (no $host).  Presumably the caller's record is affected as well --
		# confirm whether a per-filter copy is wanted.
		if ( index?$host )
			{
			if ( filter?$aggregation_mask )
				{
				index$network = mask_addr(index$host, filter$aggregation_mask);
				delete index$host;
				}
			else if ( filter?$aggregation_table )
				{
				# Don't add the data to this filter if its aggregation table
				# doesn't include the given host address.  Only this filter
				# is skipped; the remaining filters still get the data.
				if ( index$host !in filter$aggregation_table )
					next;

				index$network = filter$aggregation_table[index$host];
				delete index$host;
				}
			}

		local metric_tbl = store[id, filter$name];
		if ( index !in metric_tbl )
			metric_tbl[index] = 0;
		metric_tbl[index] += increment;

		data_added(filter, index, metric_tbl[index]);
		}
	}

# Returns T if `val` crosses a threshold of the filter that has not been
# noticed yet for this index, F otherwise.
function check_notice(filter: Filter, index: Index, val: count): bool
	{
	# Single threshold: fire once, until the entry expires from `thresholds`.
	if ( filter?$notice_threshold &&
	     [filter$id, filter$name, index] !in thresholds &&
	     val >= filter$notice_threshold )
		return T;

	# Series of thresholds: `thresholds` holds the position of the next
	# threshold that has not been crossed yet.  It must lie inside the
	# vector before it can be used as an index.
	if ( filter?$notice_thresholds )
		{
		local next_idx = thresholds[filter$id, filter$name, index];
		if ( next_idx < |filter$notice_thresholds| &&
		     val >= filter$notice_thresholds[next_idx] )
			return T;
		}

	return F;
	}

# Raises the filter's notice for the index and records that the current
# threshold has been crossed.
function do_notice(filter: Filter, index: Index, val: count)
	{
	# Work out which threshold was crossed for the message.  Only one of
	# $notice_threshold and $notice_thresholds is set on a valid filter, so
	# the optional field must not be read unconditionally.
	local threshold: count = 0;
	if ( filter?$notice_threshold )
		threshold = filter$notice_threshold;
	else if ( filter?$notice_thresholds )
		{
		local crossed_idx = thresholds[filter$id, filter$name, index];
		if ( crossed_idx < |filter$notice_thresholds| )
			threshold = filter$notice_thresholds[crossed_idx];
		}

	# We include $peer_descr here because a manager could have actually
	# generated the notice even though the current remote peer for the event
	# calling this could be a worker if this is running as a cluster.
	local n: Notice::Info = [$note=filter$note,
	                         $n=val,
	                         $metric_index=index,
	                         $peer_descr=peer_description];
	n$msg = fmt("Threshold crossed by %s %d/%d", index2str(index), val, threshold);
	if ( index?$str )
		n$sub = index$str;
	if ( index?$host )
		n$src = index$host;
	# TODO: not sure where to put the network yet.

	NOTICE(n);

	# This just needs to be set to some value so that it doesn't refire the
	# notice until it expires from the table or it crosses the next
	# threshold in the case of vectors of thresholds.
	++thresholds[filter$id, filter$name, index];
	}