mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00

- Log path's are generated in the scripting land now. The default Log stream ID to path string mapping works like this: - Notice::LOG -> "notice" - Notice::POLICY_LOG -> "notice_policy" - TestModule::LOG -> "test_module" - Logging streams updated across all of the shipped scripts to be more user friendly. Instead of the logging stream ID HTTP::HTTP, we now have HTTP::LOG, etc. - The priorities on some bro_init handlers have been adjusted to make the process of applying filters or disabling streams easier for users.
270 lines
9.2 KiB
Text
270 lines
9.2 KiB
Text
##! This is the implementation of the metrics framework.
|
|
|
|
@load base/frameworks/notice
|
|
|
|
module Metrics;
|
|
|
|
export {
|
|
redef enum Log::ID += { LOG };
|
|
|
|
type ID: enum {
|
|
NOTHING,
|
|
};
|
|
|
|
## The default interval used for "breaking" metrics and writing the
|
|
## current value to the logging stream.
|
|
const default_break_interval = 15mins &redef;
|
|
|
|
## This is the interval for how often notices will happen after they have
|
|
## already fired.
|
|
const renotice_interval = 1hr &redef;
|
|
|
|
type Index: record {
|
|
## Host is the value to which this metric applies.
|
|
host: addr &optional;
|
|
|
|
## A non-address related metric or a sub-key for an address based metric.
|
|
## An example might be successful SSH connections by client IP address
|
|
## where the client string would be the index value.
|
|
## Another example might be number of HTTP requests to a particular
|
|
## value in a Host header. This is an example of a non-host based
|
|
## metric since multiple IP addresses could respond for the same Host
|
|
## header value.
|
|
str: string &optional;
|
|
|
|
## The CIDR block that this metric applies to. This is typically
|
|
## only used internally for host based aggregation.
|
|
network: subnet &optional;
|
|
} &log;
|
|
|
|
type Info: record {
|
|
ts: time &log;
|
|
metric_id: ID &log;
|
|
filter_name: string &log;
|
|
index: Index &log;
|
|
value: count &log;
|
|
};
|
|
|
|
# TODO: configure a metrics filter logging stream to log the current
|
|
# metrics configuration in case someone is looking through
|
|
# old logs and the configuration has changed since then.
|
|
type Filter: record {
|
|
## The :bro:type:`Metrics::ID` that this filter applies to.
|
|
id: ID &optional;
|
|
## The name for this filter so that multiple filters can be
|
|
## applied to a single metrics to get a different view of the same
|
|
## metric data being collected (different aggregation, break, etc).
|
|
name: string &default="default";
|
|
## A predicate so that you can decide per index if you would like
|
|
## to accept the data being inserted.
|
|
pred: function(index: Index): bool &optional;
|
|
## Global mask by which you'd like to aggregate traffic.
|
|
aggregation_mask: count &optional;
|
|
## This is essentially a mapping table between addresses and subnets.
|
|
aggregation_table: table[subnet] of subnet &optional;
|
|
## The interval at which the metric should be "broken" and written
|
|
## to the logging stream. The counters are also reset to zero at
|
|
## this time so any threshold based detection needs to be set to a
|
|
## number that should be expected to happen within this period.
|
|
break_interval: interval &default=default_break_interval;
|
|
## This determines if the result of this filter is sent to the metrics
|
|
## logging stream. One use for the logging framework is as an internal
|
|
## thresholding and statistics gathering utility that is meant to
|
|
## never log but rather to generate notices and derive data.
|
|
log: bool &default=T;
|
|
## If this and a $notice_threshold value are set, this notice type
|
|
## will be generated by the metrics framework.
|
|
note: Notice::Type &optional;
|
|
## A straight threshold for generating a notice.
|
|
notice_threshold: count &optional;
|
|
## A series of thresholds at which to generate notices.
|
|
notice_thresholds: vector of count &optional;
|
|
## How often this notice should be raised for this metric index. It
|
|
## will be generated everytime it crosses a threshold, but if the
|
|
## $break_interval is set to 5mins and this is set to 1hr the notice
|
|
## only be generated once per hour even if something crosses the
|
|
## threshold in every break interval.
|
|
notice_freq: interval &optional;
|
|
};
|
|
|
|
global add_filter: function(id: ID, filter: Filter);
|
|
global add_data: function(id: ID, index: Index, increment: count);
|
|
global index2str: function(index: Index): string;
|
|
|
|
# This is the event that is used to "finish" metrics and adapt the metrics
|
|
# framework for clustered or non-clustered usage.
|
|
global log_it: event(filter: Filter);
|
|
|
|
global log_metrics: event(rec: Info);
|
|
}
|
|
|
|
redef record Notice::Info += {
|
|
metric_index: Index &log &optional;
|
|
};
|
|
|
|
global metric_filters: table[ID] of vector of Filter = table();
|
|
global filter_store: table[ID, string] of Filter = table();
|
|
|
|
type MetricTable: table[Index] of count &default=0;
|
|
# This is indexed by metric ID and stream filter name.
|
|
global store: table[ID, string] of MetricTable = table() &default=table();
|
|
|
|
# This function checks if a threshold has been crossed and generates a
|
|
# notice if it has. It is also used as a method to implement
|
|
# mid-break-interval threshold crossing detection for cluster deployments.
|
|
global check_notice: function(filter: Filter, index: Index, val: count): bool;
|
|
|
|
# This is hook for watching thresholds being crossed. It is called whenever
|
|
# index values are updated and the new val is given as the `val` argument.
|
|
global data_added: function(filter: Filter, index: Index, val: count);
|
|
|
|
# This stores the current threshold index for filters using the
|
|
# $notice_threshold and $notice_thresholds elements.
|
|
global thresholds: table[ID, string, Index] of count = {} &create_expire=renotice_interval &default=0;
|
|
|
|
event bro_init() &priority=5
|
|
{
|
|
Log::create_stream(Metrics::LOG, [$columns=Info, $ev=log_metrics]);
|
|
}
|
|
|
|
function index2str(index: Index): string
|
|
{
|
|
local out = "";
|
|
if ( index?$host )
|
|
out = fmt("%shost=%s", out, index$host);
|
|
if ( index?$network )
|
|
out = fmt("%s%snetwork=%s", out, |out|==0 ? "" : ", ", index$network);
|
|
if ( index?$str )
|
|
out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", index$str);
|
|
return fmt("metric_index(%s)", out);
|
|
}
|
|
|
|
function write_log(ts: time, filter: Filter, data: MetricTable)
|
|
{
|
|
for ( index in data )
|
|
{
|
|
local val = data[index];
|
|
local m: Info = [$ts=ts,
|
|
$metric_id=filter$id,
|
|
$filter_name=filter$name,
|
|
$index=index,
|
|
$value=val];
|
|
|
|
if ( filter$log )
|
|
Log::write(Metrics::LOG, m);
|
|
}
|
|
}
|
|
|
|
|
|
function reset(filter: Filter)
|
|
{
|
|
store[filter$id, filter$name] = table();
|
|
}
|
|
|
|
function add_filter(id: ID, filter: Filter)
|
|
{
|
|
if ( filter?$aggregation_table && filter?$aggregation_mask )
|
|
{
|
|
print "INVALID Metric filter: Defined $aggregation_table and $aggregation_mask.";
|
|
return;
|
|
}
|
|
if ( [id, filter$name] in store )
|
|
{
|
|
print fmt("INVALID Metric filter: Filter with name \"%s\" already exists.", filter$name);
|
|
return;
|
|
}
|
|
if ( filter?$notice_threshold && filter?$notice_thresholds )
|
|
{
|
|
print "INVALID Metric filter: Defined both $notice_threshold and $notice_thresholds";
|
|
return;
|
|
}
|
|
|
|
if ( ! filter?$id )
|
|
filter$id = id;
|
|
|
|
if ( id !in metric_filters )
|
|
metric_filters[id] = vector();
|
|
metric_filters[id][|metric_filters[id]|] = filter;
|
|
|
|
filter_store[id, filter$name] = filter;
|
|
store[id, filter$name] = table();
|
|
|
|
schedule filter$break_interval { Metrics::log_it(filter) };
|
|
}
|
|
|
|
function add_data(id: ID, index: Index, increment: count)
|
|
{
|
|
if ( id !in metric_filters )
|
|
return;
|
|
|
|
local filters = metric_filters[id];
|
|
|
|
# Try to add the data to all of the defined filters for the metric.
|
|
for ( filter_id in filters )
|
|
{
|
|
local filter = filters[filter_id];
|
|
|
|
# If this filter has a predicate, run the predicate and skip this
|
|
# index if the predicate return false.
|
|
if ( filter?$pred && ! filter$pred(index) )
|
|
next;
|
|
|
|
if ( index?$host )
|
|
{
|
|
if ( filter?$aggregation_mask )
|
|
{
|
|
index$network = mask_addr(index$host, filter$aggregation_mask);
|
|
delete index$host;
|
|
}
|
|
else if ( filter?$aggregation_table )
|
|
{
|
|
index$network = filter$aggregation_table[index$host];
|
|
delete index$host;
|
|
}
|
|
}
|
|
|
|
local metric_tbl = store[id, filter$name];
|
|
if ( index !in metric_tbl )
|
|
metric_tbl[index] = 0;
|
|
metric_tbl[index] += increment;
|
|
|
|
data_added(filter, index, metric_tbl[index]);
|
|
}
|
|
}
|
|
|
|
function check_notice(filter: Filter, index: Index, val: count): bool
|
|
{
|
|
if ( (filter?$notice_threshold &&
|
|
[filter$id, filter$name, index] !in thresholds &&
|
|
val >= filter$notice_threshold) ||
|
|
(filter?$notice_thresholds &&
|
|
|filter$notice_thresholds| <= thresholds[filter$id, filter$name, index] &&
|
|
val >= filter$notice_thresholds[thresholds[filter$id, filter$name, index]]) )
|
|
return T;
|
|
else
|
|
return F;
|
|
}
|
|
|
|
function do_notice(filter: Filter, index: Index, val: count)
|
|
{
|
|
# We include $peer_descr here because the a manager count have actually
|
|
# generated the notice even though the current remote peer for the event
|
|
# calling this could be a worker if this is running as a cluster.
|
|
local n: Notice::Info = [$note=filter$note,
|
|
$n=val,
|
|
$metric_index=index,
|
|
$peer_descr=peer_description];
|
|
n$msg = fmt("Threshold crossed by %s %d/%d", index2str(index), val, filter$notice_threshold);
|
|
if ( index?$str )
|
|
n$sub = index$str;
|
|
if ( index?$host )
|
|
n$src = index$host;
|
|
# TODO: not sure where to put the network yet.
|
|
|
|
NOTICE(n);
|
|
|
|
# This just needs set to some value so that it doesn't refire the
|
|
# notice until it expires from the table or it crosses the next
|
|
# threshold in the case of vectors of thresholds.
|
|
++thresholds[filter$id, filter$name, index];
|
|
}
|