Metrics framework update. Mostly to make metrics work on clusters.
- Metrics now work on cluster deployments with no caveats; the clustering should be completely transparent to scripts. Intermediate updates to speed up some detections will come later. (A usage sketch of the reworked API follows the changed-files summary below.)
This commit is contained in:
parent: 2af9d9bc20
commit: 3919a35b9b
18 changed files with 407 additions and 117 deletions
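
For orientation, here is a minimal usage sketch of the reworked script API; it is not part of the commit. Scripts now hand Metrics::add_data() an Index record plus an increment instead of the old Entry record, while filters are still registered with Metrics::add_filter(). The HTTP_REQUESTS identifier, the redef of Metrics::ID, and the http_request handler are illustrative assumptions; only add_filter(), add_data(), Index, and the Filter fields shown in the diff are taken from this change.

@load base/frameworks/metrics

# Hypothetical metric identifier; assumes Metrics::ID is a redef-able enum.
redef enum Metrics::ID += { HTTP_REQUESTS };

event bro_init()
	{
	# Count per /24 via $aggregation_mask and flush the counters to the
	# metrics log every 5 minutes.
	Metrics::add_filter(HTTP_REQUESTS,
	                    [$name="by-subnet",
	                     $aggregation_mask=24,
	                     $break_interval=5mins]);
	}

event http_request(c: connection, method: string, original_URI: string,
                   unescaped_URI: string, version: string)
	{
	# Each request increments the counter for the originating host by 1.
	Metrics::add_data(HTTP_REQUESTS, [$host=c$id$orig_h], 1);
	}

With $aggregation_mask set, add_data() now stores the masked network in index$network (and drops index$host) rather than formatting the subnet into a string, so all counts for a filter live in a single table keyed by Index.
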
@@ -1,5 +1,7 @@
##! This is the implementation of the metrics framework.

@load base/frameworks/notice

module Metrics;

export {

@@ -13,16 +15,7 @@ export {
	## current value to the logging stream.
	const default_break_interval = 15mins &redef;

	type Info: record {
		ts: time &log;
		metric_id: ID &log;
		filter_name: string &log;
		agg_subnet: string &log &optional;
		index: string &log &optional;
		value: count &log;
	};

	type Entry: record {
	type Index: record {
		## Host is the value to which this metric applies.
		host: addr &optional;

@@ -33,11 +26,19 @@ export {
		## value in a Host header. This is an example of a non-host based
		## metric since multiple IP addresses could respond for the same Host
		## header value.
		index: string &default="";
		str: string &optional;

		## The value by which the counter should be increased in each filter
		## where this entry is accepted.
		increment: count &default=1;
		## The CIDR block that this metric applies to. This is typically
		## only used internally for host based aggregation.
		network: subnet &optional;
	} &log;

	type Info: record {
		ts: time &log;
		metric_id: ID &log;
		filter_name: string &log;
		index: Index &log;
		value: count &log;
	};

	# TODO: configure a metrics filter logging stream to log the current

@@ -52,11 +53,11 @@ export {
		name: string &default="default";
		## A predicate so that you can decide per index if you would like
		## to accept the data being inserted.
		pred: function(entry: Entry): bool &optional;
		pred: function(index: Index): bool &optional;
		## Global mask by which you'd like to aggregate traffic.
		aggregation_mask: count &optional;
		## This is essentially applying names to various subnets.
		aggregation_table: table[subnet] of string &optional;
		aggregation_table: table[subnet] of subnet &optional;
		## The interval at which the metric should be "broken" and written
		## to the logging stream.
		break_interval: interval &default=default_break_interval;

@@ -68,6 +69,7 @@ export {
		## A straight threshold for generating a notice.
		notice_threshold: count &optional;
		## A series of thresholds at which to generate notices.
		## TODO: This is not implemented yet!
		notice_thresholds: vector of count &optional;
		## If this and a $notice_threshold value are set, this notice type
		## will be generated by the metrics framework.

@@ -75,15 +77,23 @@ export {
	};

	global add_filter: function(id: ID, filter: Filter);
	global add_data: function(id: ID, entry: Entry);
	global add_data: function(id: ID, index: Index, increment: count);

	# This is the event that is used to "finish" metrics and adapt the metrics
	# framework for clustered or non-clustered usage.
	global log_it: event(filter: Filter);

	global log_metrics: event(rec: Info);
}

global metric_filters: table[ID] of vector of Filter = table();
redef record Notice::Info += {
	metric_index: Index &log &optional;
};

type MetricIndex: table[string] of count &default=0;
type MetricTable: table[string] of MetricIndex;
global metric_filters: table[ID] of vector of Filter = table();
global filter_store: table[ID, string] of Filter = table();

type MetricTable: table[Index] of count &default=0;
# This is indexed by metric ID and stream filter name.
global store: table[ID, string] of MetricTable = table();

@@ -95,63 +105,45 @@ event bro_init() &priority=5
	{
	Log::create_stream(METRICS, [$columns=Info, $ev=log_metrics]);
	}


function write_log(ts: time, filter: Filter, data: MetricTable)
	{
	for ( index in data )
		{
		local val = data[index];
		local m: Info = [$ts=ts,
		                 $metric_id=filter$id,
		                 $filter_name=filter$name,
		                 $index=index,
		                 $value=val];

		if ( m$index?$host &&
		     filter?$notice_threshold &&
		     m$value >= filter$notice_threshold )
			{
			NOTICE([$note=filter$note,
			        $msg=fmt("Metrics threshold crossed by %s %d/%d", index$host, m$value, filter$notice_threshold),
			        $src=m$index$host, $n=m$value,
			        $metric_index=index]);
			}

		else if ( filter?$notice_thresholds &&
		          m$value >= filter$notice_thresholds[thresholds[cat(filter$id,filter$name)]] )
			{
			# TODO: implement this
			}

		if ( filter$log )
			Log::write(METRICS, m);
		}
	}


function reset(filter: Filter)
	{
	store[filter$id, filter$name] = table();
	}

event log_it(filter: Filter)
	{
	# If this node is the manager in a cluster, this needs to request values
	# for this metric from all of the workers.

	local id = filter$id;
	local name = filter$name;
	for ( agg_subnet in store[id, name] )
		{
		local metric_values = store[id, name][agg_subnet];
		for ( index in metric_values )
			{
			local val = metric_values[index];
			local m: Info = [$ts=network_time(),
			                 $metric_id=id,
			                 $filter_name=name,
			                 $agg_subnet=fmt("%s", agg_subnet),
			                 $index=index,
			                 $value=val];

			if ( filter?$notice_threshold &&
			     m$value >= filter$notice_threshold )
				{
				print m;
				NOTICE([$note=filter$note,
				        $msg=fmt("Metrics threshold crossed by %s %d/%d", m$agg_subnet, m$value, filter$notice_threshold),
				        $n=m$value]);
				}

			else if ( filter?$notice_thresholds &&
			          m$value >= filter$notice_thresholds[thresholds[cat(id,name)]] )
				{
				# TODO: implement this
				}

			# If there wasn't an index, remove the field.
			if ( index == "" )
				delete m$index;

			# If there wasn't an aggregation subnet, remove the field.
			if ( agg_subnet == "" )
				delete m$agg_subnet;

			Log::write(METRICS, m);
			}
		}
	reset(filter);

	schedule filter$break_interval { log_it(filter) };
	}

function add_filter(id: ID, filter: Filter)
	{
	if ( filter?$aggregation_table && filter?$aggregation_mask )

@@ -177,13 +169,13 @@ function add_filter(id: ID, filter: Filter)
	metric_filters[id] = vector();
	metric_filters[id][|metric_filters[id]|] = filter;

	filter_store[id, filter$name] = filter;
	store[id, filter$name] = table();

	# Only do this on the manager if in a cluster.
	schedule filter$break_interval { log_it(filter) };
	schedule filter$break_interval { Metrics::log_it(filter) };
	}

function add_data(id: ID, entry: Entry)
function add_data(id: ID, index: Index, increment: count)
	{
	if ( id !in metric_filters )
		return;

@@ -196,38 +188,28 @@ function add_data(id: ID, entry: Entry)
		local filter = filters[filter_id];

		# If this filter has a predicate, run the predicate and skip this
		# entry if the predicate return false.
		# index if the predicate return false.
		if ( filter?$pred &&
		     ! filter$pred(entry) )
		     ! filter$pred(index) )
			next;

		local agg_subnet = "";
		local filt_store = store[id, filter$name];
		if ( entry?$host )
		if ( index?$host )
			{
			if ( filter?$aggregation_mask )
				{
				local agg_mask = filter$aggregation_mask;
				agg_subnet = fmt("%s", mask_addr(entry$host, agg_mask));
				index$network = mask_addr(index$host, filter$aggregation_mask);
				delete index$host;
				}
			else if ( filter?$aggregation_table )
				{
				agg_subnet = fmt("%s", filter$aggregation_table[entry$host]);
				# if an aggregation table is being used and the value isn't
				# in the table, that means we aren't interested in it.
				if ( agg_subnet == "" )
					next;
				index$network = filter$aggregation_table[index$host];
				delete index$host;
				}
			else
				agg_subnet = fmt("%s", entry$host);
			}

		if ( agg_subnet !in filt_store )
			filt_store[agg_subnet] = table();

		local fs = filt_store[agg_subnet];
		if ( entry$index !in fs )
			fs[entry$index] = 0;
		fs[entry$index] = fs[entry$index] + entry$increment;
		if ( index !in filt_store )
			filt_store[index] = 0;
		filt_store[index] += increment;
		}
	}
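
A second hedged sketch, illustrating the notice path that write_log() in the diff above exercises: when a host-based index reaches $notice_threshold within one break interval, a notice of type $note is raised with $src set to that host and the full Index attached via the new Notice::Info$metric_index field. The SSH_LOGINS metric and the Watched_Host_Activity notice type are hypothetical; the filter fields come from the diff.

redef enum Notice::Type += { Watched_Host_Activity };
redef enum Metrics::ID += { SSH_LOGINS };   # hypothetical metric identifier

event bro_init()
	{
	# Raise Watched_Host_Activity once any single host accumulates 10 or
	# more increments for SSH_LOGINS within a 10 minute break interval.
	Metrics::add_filter(SSH_LOGINS,
	                    [$notice_threshold=10,
	                     $note=Watched_Host_Activity,
	                     $break_interval=10mins]);
	}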