Merge remote-tracking branch 'origin/master' into topic/seth/analyzer-framework

Conflicts:
	scripts/base/init-default.bro
	scripts/base/protocols/dns/main.bro
	scripts/base/protocols/ftp/main.bro
	scripts/base/protocols/http/main.bro
	scripts/base/protocols/irc/main.bro
	scripts/base/protocols/smtp/main.bro
	scripts/base/protocols/ssh/main.bro
	scripts/base/protocols/ssl/main.bro
	scripts/base/protocols/syslog/main.bro
	src/main.cc
	testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
Seth Hall 2013-07-04 23:07:52 -04:00
commit 5f8ee93ef0
1249 changed files with 267087 additions and 176962 deletions

View file

@ -0,0 +1 @@
@load ./main

View file

@ -0,0 +1,179 @@
##! Framework for managing Bro's protocol analyzers.
##!
##! The analyzer framework allows one to dynamically enable or disable analyzers, as
##! well as to manage the well-known ports which automatically activate a
##! particular analyzer for new connections.
##!
##! Protocol analyzers are identified by unique tags of type
##! :bro:type:`Analyzer::Tag`, such as
##! :bro:enum:`Analyzer::ANALYZER_HTTP`. These tags are defined internally by
##! the analyzers themselves, and documented in their analyzer-specific
##! description along with the events that they generate.
module Analyzer;
export {
## If true, all available analyzers are initially disabled at startup. One
## can then selectively enable them with
## :bro:id:`Analyzer::enable_analyzer`.
global disable_all = F &redef;
## Enables an analyzer. Once enabled, the analyzer may be used for analysis
## of future connections as decided by Bro's dynamic protocol detection.
##
## tag: The tag of the analyzer to enable.
##
## Returns: True if the analyzer was successfully enabled.
global enable_analyzer: function(tag: Analyzer::Tag) : bool;
## Disables an analyzer. Once disabled, the analyzer will not be used
## further for analysis of future connections.
##
## tag: The tag of the analyzer to disable.
##
## Returns: True if the analyzer was successfully disabled.
global disable_analyzer: function(tag: Analyzer::Tag) : bool;
## Registers a set of well-known ports for an analyzer. If a future
## connection on one of these ports is seen, the analyzer will be
## automatically assigned to parsing it. The function *adds* to all ports
## already registered; it doesn't replace them.
##
## tag: The tag of the analyzer.
##
## ports: The set of well-known ports to associate with the analyzer.
##
## Returns: True if the ports were successfully registered.
global register_for_ports: function(tag: Analyzer::Tag, ports: set[port]) : bool;
## Registers an individual well-known port for an analyzer. If a future
## connection on this port is seen, the analyzer will be automatically
## assigned to parsing it. The function *adds* to all ports already
## registered; it doesn't replace them.
##
## tag: The tag of the analyzer.
##
## p: The well-known port to associate with the analyzer.
##
## Returns: True if the port was successfully registered.
global register_for_port: function(tag: Analyzer::Tag, p: port) : bool;
## Returns a set of all well-known ports currently registered for a
## specific analyzer.
##
## tag: The tag of the analyzer.
##
## Returns: The set of ports.
global registered_ports: function(tag: Analyzer::Tag) : set[port];
## Returns a table of all ports-to-analyzer mappings currently registered.
##
## Returns: A table mapping each analyzer to the set of ports
## registered for it.
global all_registered_ports: function() : table[Analyzer::Tag] of set[port];
## Translates an analyzer type to a string with the analyzer's name.
##
## tag: The analyzer tag.
##
## Returns: The analyzer name corresponding to the tag.
global name: function(tag: Analyzer::Tag) : string;
## Schedules an analyzer for a future connection originating from a given IP
## address and port.
##
## orig: The IP address originating a connection in the future.
## 0.0.0.0 can be used as a wildcard to match any originator address.
##
## resp: The IP address responding to a connection from *orig*.
##
## resp_p: The destination port at *resp*.
##
## analyzer: The analyzer ID.
##
## tout: A timeout interval after which the scheduling request will be
## discarded if the connection has not yet been seen.
##
## Returns: True if successful.
global schedule_analyzer: function(orig: addr, resp: addr, resp_p: port,
analyzer: Analyzer::Tag, tout: interval) : bool;
## A set of analyzers to disable by default at startup. The default set
## contains legacy analyzers that are no longer supported.
global disabled_analyzers: set[Analyzer::Tag] = {
ANALYZER_INTERCONN,
ANALYZER_STEPPINGSTONE,
ANALYZER_BACKDOOR,
ANALYZER_TCPSTATS,
} &redef;
}
@load base/bif/analyzer.bif
global ports: table[Analyzer::Tag] of set[port];
event bro_init() &priority=5
{
if ( disable_all )
__disable_all_analyzers();
for ( a in disabled_analyzers )
disable_analyzer(a);
}
function enable_analyzer(tag: Analyzer::Tag) : bool
{
return __enable_analyzer(tag);
}
function disable_analyzer(tag: Analyzer::Tag) : bool
{
return __disable_analyzer(tag);
}
function register_for_ports(tag: Analyzer::Tag, ports: set[port]) : bool
{
local rc = T;
for ( p in ports )
{
if ( ! register_for_port(tag, p) )
rc = F;
}
return rc;
}
function register_for_port(tag: Analyzer::Tag, p: port) : bool
{
if ( ! __register_for_port(tag, p) )
return F;
if ( tag !in ports )
ports[tag] = set();
add ports[tag][p];
return T;
}
function registered_ports(tag: Analyzer::Tag) : set[port]
{
return tag in ports ? ports[tag] : set();
}
function all_registered_ports(): table[Analyzer::Tag] of set[port]
{
return ports;
}
function name(atype: Analyzer::Tag) : string
{
return __name(atype);
}
function schedule_analyzer(orig: addr, resp: addr, resp_p: port,
analyzer: Analyzer::Tag, tout: interval) : bool
{
return __schedule_analyzer(orig, resp, resp_p, analyzer, tout);
}
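
As a usage sketch (not part of the file above), the following shows how a site script might drive this API; the extra port, the responder address, and the choice of :bro:enum:`Analyzer::ANALYZER_HTTP` are illustrative assumptions only:

event bro_init()
	{
	# If Analyzer::disable_all is redef'd to T, selectively turn HTTP back on.
	Analyzer::enable_analyzer(Analyzer::ANALYZER_HTTP);

	# Treat traffic on an additional (assumed) port as HTTP.
	Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 8080/tcp);

	# Expect an HTTP connection to a specific responder within 5 minutes;
	# 0.0.0.0 acts as a wildcard for the originator.
	Analyzer::schedule_analyzer(0.0.0.0, 192.168.1.10, 8080/tcp,
	                            Analyzer::ANALYZER_HTTP, 5mins);
	}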

View file

@ -39,7 +39,7 @@ export {
## The node type doing all the actual traffic analysis.
WORKER,
## A node acting as a traffic recorder using the
## `Time Machine <http://tracker.bro-ids.org/time-machine>`_ software.
## `Time Machine <http://tracker.bro.org/time-machine>`_ software.
TIME_MACHINE,
};

View file

@ -23,12 +23,12 @@ export {
analyzer: string &log;
## The textual reason for the analysis failure.
failure_reason: string &log;
## Disabled analyzer IDs. This is only for internal tracking
## Disabled analyzer IDs. This is only for internal tracking
## so as to not attempt to disable analyzers multiple times.
disabled_aids: set[count];
};
## Ignore violations which go this many bytes into the connection.
## Set to 0 to never ignore protocol violations.
const ignore_violations_after = 10 * 1024 &redef;
@ -41,41 +41,30 @@ redef record connection += {
event bro_init() &priority=5
{
Log::create_stream(DPD::LOG, [$columns=Info]);
# Populate the internal DPD analysis variable.
for ( a in dpd_config )
{
for ( p in dpd_config[a]$ports )
{
if ( p !in dpd_analyzer_ports )
dpd_analyzer_ports[p] = set();
add dpd_analyzer_ports[p][a];
}
}
}
event protocol_confirmation(c: connection, atype: count, aid: count) &priority=10
event protocol_confirmation(c: connection, atype: Analyzer::Tag, aid: count) &priority=10
{
local analyzer = analyzer_name(atype);
local analyzer = Analyzer::name(atype);
if ( fmt("-%s",analyzer) in c$service )
delete c$service[fmt("-%s", analyzer)];
add c$service[analyzer];
}
event protocol_violation(c: connection, atype: count, aid: count,
event protocol_violation(c: connection, atype: Analyzer::Tag, aid: count,
reason: string) &priority=10
{
local analyzer = analyzer_name(atype);
local analyzer = Analyzer::name(atype);
# If the service hasn't been confirmed yet, don't generate a log message
# for the protocol violation.
if ( analyzer !in c$service )
return;
delete c$service[analyzer];
add c$service[fmt("-%s", analyzer)];
local info: Info;
info$ts=network_time();
info$uid=c$uid;
@ -86,7 +75,7 @@ event protocol_violation(c: connection, atype: count, aid: count,
c$dpd = info;
}
event protocol_violation(c: connection, atype: count, aid: count, reason: string) &priority=5
event protocol_violation(c: connection, atype: Analyzer::Tag, aid: count, reason: string) &priority=5
{
if ( !c?$dpd || aid in c$dpd$disabled_aids )
return;
@ -94,13 +83,13 @@ event protocol_violation(c: connection, atype: count, aid: count, reason: string
local size = c$orig$size + c$resp$size;
if ( ignore_violations_after > 0 && size > ignore_violations_after )
return;
# Disable the analyzer that raised the last core-generated event.
disable_analyzer(c$id, aid);
add c$dpd$disabled_aids[aid];
}
event protocol_violation(c: connection, atype: count, aid: count,
event protocol_violation(c: connection, atype: Analyzer::Tag, aid: count,
reason: string) &priority=-5
{
if ( c?$dpd )

View file

@ -0,0 +1 @@
@load ./main.bro

View file

@ -0,0 +1,261 @@
##! An interface for driving the analysis of files, possibly independent of
##! any network protocol over which they're transported.
@load base/bif/file_analysis.bif
@load base/frameworks/logging
module FileAnalysis;
export {
redef enum Log::ID += {
## Logging stream for file analysis.
LOG
};
## A structure which represents a desired type of file analysis.
type AnalyzerArgs: record {
## The type of analysis.
tag: FileAnalysis::Tag;
## The local filename to which to write an extracted file. Must be
## set when *tag* is :bro:see:`FileAnalysis::ANALYZER_EXTRACT`.
extract_filename: string &optional;
## An event which will be generated for all new file contents,
## chunk-wise. Used when *tag* is
## :bro:see:`FileAnalysis::ANALYZER_DATA_EVENT`.
chunk_event: event(f: fa_file, data: string, off: count) &optional;
## An event which will be generated for all new file contents,
## stream-wise. Used when *tag* is
## :bro:see:`FileAnalysis::ANALYZER_DATA_EVENT`.
stream_event: event(f: fa_file, data: string) &optional;
} &redef;
## Contains all metadata related to the analysis of a given file.
## For the most part, fields here are derived from ones of the same name
## in :bro:see:`fa_file`.
type Info: record {
## An identifier associated with a single file.
id: string &log;
## Identifier associated with a container file from which this one was
## extracted as part of the file analysis.
parent_id: string &log &optional;
## An identification of the source of the file data. E.g. it may be
## a network protocol over which it was transferred, or a local file
## path which was read, or some other input source.
source: string &log &optional;
## If the source of this file is a network connection, this field
## may be set to indicate the directionality.
is_orig: bool &log &optional;
## The time at which the last activity for the file was seen.
last_active: time &log;
## Number of bytes provided to the file analysis engine for the file.
seen_bytes: count &log &default=0;
## Total number of bytes that are supposed to comprise the full file.
total_bytes: count &log &optional;
## The number of bytes in the file stream that were completely missed
## during the process of analysis, e.g. due to dropped packets.
missing_bytes: count &log &default=0;
## The number of bytes in the file stream that were delivered to file
## analyzers out of sequence due to reassembly buffer overflow.
overflow_bytes: count &log &default=0;
## The amount of time between receiving new data for this file that
## the analysis engine will wait before giving up on it.
timeout_interval: interval &log &optional;
## The number of bytes at the beginning of a file to save for later
## inspection in *bof_buffer* field.
bof_buffer_size: count &log &optional;
## A mime type provided by libmagic against the *bof_buffer*, or
## in the cases where no buffering of the beginning of file occurs,
## an initial guess of the mime type based on the first data seen.
mime_type: string &log &optional;
## Whether the file analysis timed out at least once for the file.
timedout: bool &log &default=F;
## Connection UIDs over which the file was transferred.
conn_uids: set[string] &log;
## A set of analysis types done during the file analysis.
analyzers: set[FileAnalysis::Tag];
## Local filenames of extracted files.
extracted_files: set[string] &log;
## An MD5 digest of the file contents.
md5: string &log &optional;
## A SHA1 digest of the file contents.
sha1: string &log &optional;
## A SHA256 digest of the file contents.
sha256: string &log &optional;
} &redef;
## A table that can be used to disable file analysis completely for
## any files transferred over given network protocol analyzers.
const disable: table[Analyzer::Tag] of bool = table() &redef;
## Event that can be handled to access the Info record as it is sent on
## to the logging framework.
global log_file_analysis: event(rec: Info);
## The salt concatenated to unique file handle strings generated by
## :bro:see:`get_file_handle` before hashing them into a file id
## (the *id* field of :bro:see:`fa_file`).
## Provided to help mitigate the possibility of manipulating parts of
## network connections that factor into the file handle in order to
## generate two handles that would hash to the same file id.
const salt = "I recommend changing this." &redef;
## Sets the *timeout_interval* field of :bro:see:`fa_file`, which is
## used to determine the length of inactivity that is allowed for a file
## before internal state related to it is cleaned up. When used within a
## :bro:see:`file_timeout` handler, the analysis will delay timing out
## again for the period specified by *t*.
##
## f: the file.
##
## t: the amount of time the file can remain inactive before discarding.
##
## Returns: true if the timeout interval was set, or false if analysis
## for the *id* isn't currently active.
global set_timeout_interval: function(f: fa_file, t: interval): bool;
## Adds an analyzer to the analysis of a given file.
##
## f: the file.
##
## args: the analyzer type to add along with any arguments it takes.
##
## Returns: true if the analyzer will be added, or false if analysis
## for the *id* isn't currently active or the *args*
## were invalid for the analyzer type.
global add_analyzer: function(f: fa_file, args: AnalyzerArgs): bool;
## Removes an analyzer from the analysis of a given file.
##
## f: the file.
##
## args: the analyzer (type and args) to remove.
##
## Returns: true if the analyzer will be removed, or false if analysis
## for the *id* isn't currently active.
global remove_analyzer: function(f: fa_file, args: AnalyzerArgs): bool;
## Stops/ignores any further analysis of a given file.
##
## f: the file.
##
## Returns: true if analysis for the given file will be ignored for the
## rest of its contents, or false if analysis for the *id*
## isn't currently active.
global stop: function(f: fa_file): bool;
}
redef record fa_file += {
info: Info &optional;
};
function set_info(f: fa_file)
{
if ( ! f?$info )
{
local tmp: Info;
f$info = tmp;
}
f$info$id = f$id;
if ( f?$parent_id ) f$info$parent_id = f$parent_id;
if ( f?$source ) f$info$source = f$source;
if ( f?$is_orig ) f$info$is_orig = f$is_orig;
f$info$last_active = f$last_active;
f$info$seen_bytes = f$seen_bytes;
if ( f?$total_bytes ) f$info$total_bytes = f$total_bytes;
f$info$missing_bytes = f$missing_bytes;
f$info$overflow_bytes = f$overflow_bytes;
f$info$timeout_interval = f$timeout_interval;
f$info$bof_buffer_size = f$bof_buffer_size;
if ( f?$mime_type ) f$info$mime_type = f$mime_type;
if ( f?$conns )
for ( cid in f$conns )
add f$info$conn_uids[f$conns[cid]$uid];
}
function set_timeout_interval(f: fa_file, t: interval): bool
{
return __set_timeout_interval(f$id, t);
}
function add_analyzer(f: fa_file, args: AnalyzerArgs): bool
{
if ( ! __add_analyzer(f$id, args) ) return F;
set_info(f);
add f$info$analyzers[args$tag];
if ( args$tag == FileAnalysis::ANALYZER_EXTRACT )
add f$info$extracted_files[args$extract_filename];
return T;
}
function remove_analyzer(f: fa_file, args: AnalyzerArgs): bool
{
return __remove_analyzer(f$id, args);
}
function stop(f: fa_file): bool
{
return __stop(f$id);
}
event bro_init() &priority=5
{
Log::create_stream(FileAnalysis::LOG,
[$columns=Info, $ev=log_file_analysis]);
}
event file_timeout(f: fa_file) &priority=5
{
set_info(f);
f$info$timedout = T;
}
event file_hash(f: fa_file, kind: string, hash: string) &priority=5
{
set_info(f);
switch ( kind ) {
case "md5":
f$info$md5 = hash;
break;
case "sha1":
f$info$sha1 = hash;
break;
case "sha256":
f$info$sha256 = hash;
break;
}
}
event file_state_remove(f: fa_file) &priority=5
{
set_info(f);
}
event file_state_remove(f: fa_file) &priority=-5
{
Log::write(FileAnalysis::LOG, f$info);
}
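
A usage sketch for the API above, assuming the core generates a file_new event for newly seen files and using an arbitrary extraction filename:

event file_new(f: fa_file)
	{
	# Write the reassembled file contents out locally (filename assumed).
	FileAnalysis::add_analyzer(f, [$tag=FileAnalysis::ANALYZER_EXTRACT,
	                               $extract_filename="extract.dat"]);

	# Allow the file a bit more inactivity before its state is cleaned up.
	FileAnalysis::set_timeout_interval(f, 2mins);
	}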

View file

@ -2,4 +2,5 @@
@load ./readers/ascii
@load ./readers/raw
@load ./readers/benchmark
@load ./readers/binary
@load ./readers/sqlite

View file

@ -11,6 +11,24 @@ export {
## The default reader mode used. Defaults to `MANUAL`.
const default_mode = MANUAL &redef;
## Separator between fields.
## Please note that the separator has to be exactly one character long.
## Can be overwritten by individual writers.
const separator = "\t" &redef;
## Separator between set elements.
## Please note that the separator has to be exactly one character long.
## Can be overwritten by individual writers.
const set_separator = "," &redef;
## String to use for empty fields.
## Can be overwritten by individual writers.
const empty_field = "(empty)" &redef;
## String to use for an unset &optional field.
## Can be overwritten by individual writers.
const unset_field = "-" &redef;
## Flag that controls if the input framework accepts records
## that contain types that are not supported (at the moment
## file and function). If true, the input framework will
@ -104,6 +122,34 @@ export {
config: table[string] of string &default=table();
};
## A file analysis input stream type used to forward input data to the
## file analysis framework.
type AnalysisDescription: record {
## String that allows the reader to find the source.
## For `READER_ASCII`, this is the filename.
source: string;
## Reader to use for this stream. Compatible readers must be
## able to accept a filter of a single string type (i.e.
## they read a byte stream).
reader: Reader &default=Input::READER_BINARY;
## Read mode to use for this stream
mode: Mode &default=default_mode;
## Descriptive name that uniquely identifies the input source.
## Can be used to remove a stream at a later time.
## This will also be used for the unique *source* field of
## :bro:see:`fa_file`. Most of the time, the best choice for this
## field will be the same value as the *source* field.
name: string;
## A key/value table that will be passed on to the reader.
## Interpretation of the values is left to the reader, but
## usually they will be used for configuration purposes.
config: table[string] of string &default=table();
};
## Create a new table input from a given source. Returns true on success.
##
## description: `TableDescription` record describing the source.
@ -114,8 +160,16 @@ export {
## description: `TableDescription` record describing the source.
global add_event: function(description: Input::EventDescription) : bool;
## Create a new file analysis input from a given source. Data read from
## the source is automatically forwarded to the file analysis framework.
##
## description: A record describing the source
##
## Returns: true on success.
global add_analysis: function(description: Input::AnalysisDescription) : bool;
## Remove a input stream. Returns true on success and false if the named stream was
## not found.
## not found.
##
## id: string value identifying the stream to be removed
global remove: function(id: string) : bool;
@ -131,7 +185,7 @@ export {
global end_of_data: event(name: string, source:string);
}
@load base/input.bif
@load base/bif/input.bif
module Input;
@ -146,6 +200,11 @@ function add_event(description: Input::EventDescription) : bool
return __create_event_stream(description);
}
function add_analysis(description: Input::AnalysisDescription) : bool
{
return __create_analysis_stream(description);
}
function remove(id: string) : bool
{
return __remove_stream(id);
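
A sketch of feeding a local file into the file analysis framework through the new add_analysis entry point; the path is an assumption, and the READER_BINARY and mode defaults declared above are relied upon:

event bro_init()
	{
	Input::add_analysis([$source="/tmp/sample.bin",
	                     $name="/tmp/sample.bin"]);
	}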

View file

@ -7,15 +7,15 @@ module InputAscii;
export {
## Separator between fields.
## Please note that the separator has to be exactly one character long
const separator = "\t" &redef;
const separator = Input::separator &redef;
## Separator between set elements.
## Please note that the separator has to be exactly one character long
const set_separator = "," &redef;
const set_separator = Input::set_separator &redef;
## String to use for empty fields.
const empty_field = "(empty)" &redef;
const empty_field = Input::empty_field &redef;
## String to use for an unset &optional field.
const unset_field = "-" &redef;
const unset_field = Input::unset_field &redef;
}

View file

@ -0,0 +1,8 @@
##! Interface for the binary input reader.
module InputBinary;
export {
## Size of data chunks to read from the input file at a time.
const chunk_size = 1024 &redef;
}
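
Since chunk_size is &redef, a site can tune the read granularity, e.g. (value arbitrary):

redef InputBinary::chunk_size = 4096;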

View file

@ -6,4 +6,12 @@ export {
## Separator between input records.
## Please note that the separator has to be exactly one character long
const record_separator = "\n" &redef;
## Event that is called when a process created by the raw reader exits.
##
## name: name of the input stream
## source: source of the input stream
## exit_code: exit code of the program, or number of the signal that forced the program to exit
## signal_exit: false when program exited normally, true when program was forced to exit by a signal
global process_finished: event(name: string, source:string, exit_code:count, signal_exit:bool);
}
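
A sketch of handling this event, assuming the reader's module is named InputRaw (the module line is outside this hunk):

event InputRaw::process_finished(name: string, source: string,
                                 exit_code: count, signal_exit: bool)
	{
	if ( signal_exit )
		print fmt("raw input %s (%s) was killed by signal %d", name, source, exit_code);
	else
		print fmt("raw input %s (%s) exited with code %d", name, source, exit_code);
	}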

View file

@ -0,0 +1,17 @@
##! Interface for the SQLite input reader.
##!
##! The defaults are set to match Bro's ASCII output.
module InputSQLite;
export {
## Separator between set elements.
## Please note that the separator has to be exactly one character long.
const set_separator = Input::set_separator &redef;
## String to use for an unset &optional field.
const unset_field = Input::unset_field &redef;
## String to use for empty fields.
const empty_field = Input::empty_field &redef;
}

View file

@ -59,7 +59,7 @@ export {
};
## Enum to represent where data came from when it was discovered.
## The convenction is to prefix the name with "IN_".
## The convention is to prefix the name with ``IN_``.
type Where: enum {
## A catchall value to represent data of unknown provenance.
IN_ANYWHERE,
@ -342,4 +342,4 @@ function insert(item: Item)
else
event Intel::new_item(item);
}

View file

@ -2,5 +2,6 @@
@load ./postprocessors
@load ./writers/ascii
@load ./writers/dataseries
@load ./writers/sqlite
@load ./writers/elasticsearch
@load ./writers/none

View file

@ -17,6 +17,23 @@ export {
## anything else.
const default_writer = WRITER_ASCII &redef;
## Default separator between fields for logwriters.
## Can be overwritten by individual writers.
const separator = "\t" &redef;
## Separator between set elements.
## Can be overwritten by individual writers.
const set_separator = "," &redef;
## String to use for empty fields. This should be different from
## *unset_field* to make the output unambiguous.
## Can be overwritten by individual writers.
const empty_field = "(empty)" &redef;
## String to use for an unset &optional field.
## Can be overwritten by individual writers.
const unset_field = "-" &redef;
## Type defining the content of a logging stream.
type Stream: record {
## A record type defining the log's columns.
@ -172,6 +189,15 @@ export {
## .. bro:see:: Log::add_default_filter Log::remove_default_filter
global create_stream: function(id: ID, stream: Stream) : bool;
## Removes a logging stream completely, stopping all the threads.
##
## id: The ID associated with the logging stream to remove.
##
## Returns: True if the stream was successfully removed.
##
## .. bro:see:: Log::create_stream
global remove_stream: function(id: ID) : bool;
## Enables a previously disabled logging stream. Disabled streams
## will not be written to until they are enabled again. New streams
## are enabled by default.
@ -340,7 +366,7 @@ export {
# We keep a script-level copy of all filters so that we can manipulate them.
global filters: table[ID, string] of Filter;
@load base/logging.bif # Needs Filter and Stream defined.
@load base/bif/logging.bif # Needs Filter and Stream defined.
module Log;
@ -425,6 +451,12 @@ function create_stream(id: ID, stream: Stream) : bool
return add_default_filter(id);
}
function remove_stream(id: ID) : bool
{
delete active_streams[id];
return __remove_stream(id);
}
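
A sketch of calling the new remove_stream function, using the DPD stream from this change as an example target; the negative priority simply ensures it runs after streams are created in bro_init:

event bro_init() &priority=-10
	{
	Log::remove_stream(DPD::LOG);
	}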
function disable_stream(id: ID) : bool
{
delete active_streams[id];

View file

@ -1,5 +1,16 @@
##! Interface for the ASCII log writer. Redefinable options are available
##! to tweak the output format of ASCII logs.
##!
##! The ASCII writer currently supports one writer-specific filter option via
##! ``config``: setting ``tsv`` to the string ``T`` switches the output into
##! "tab-separated-value" mode, where only a single header row with the column
##! names is printed out as meta information, with no "# fields" prepended; no
##! other metadata is included in that mode.
##!
##! Example filter using this::
##!
##! local my_filter: Log::Filter = [$name = "my-filter", $writer = Log::WRITER_ASCII, $config = table(["tsv"] = "T")];
##!
module LogAscii;
@ -17,17 +28,17 @@ export {
const meta_prefix = "#" &redef;
## Separator between fields.
const separator = "\t" &redef;
const separator = Log::separator &redef;
## Separator between set elements.
const set_separator = "," &redef;
const set_separator = Log::set_separator &redef;
## String to use for empty fields. This should be different from
## *unset_field* to make the output unambiguous.
const empty_field = "(empty)" &redef;
const empty_field = Log::empty_field &redef;
## String to use for an unset &optional field.
const unset_field = "-" &redef;
const unset_field = Log::unset_field &redef;
}
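
To complement the ``tsv`` example in the header comment, a sketch of attaching such a filter to a stream (here the DPD stream from this change) via the logging framework's standard Log::add_filter call, which is assumed to be available as in the rest of the framework:

event bro_init()
	{
	local my_filter: Log::Filter = [$name = "dpd-tsv",
	                                $writer = Log::WRITER_ASCII,
	                                $config = table(["tsv"] = "T")];
	Log::add_filter(DPD::LOG, my_filter);
	}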
# Default function to postprocess a rotated ASCII log file. It moves the rotated

View file

@ -0,0 +1,17 @@
##! Interface for the SQLite log writer. Redefinable options are available
##! to tweak the output format of the SQLite writer.
module LogSQLite;
export {
## Separator between set elements.
const set_separator = Log::set_separator &redef;
## String to use for an unset &optional field.
const unset_field = Log::unset_field &redef;
## String to use for empty fields. This should be different from
## *unset_field* to make the output unambiguous.
const empty_field = Log::empty_field &redef;
}

View file

@ -1,264 +0,0 @@
##! This implements transparent cluster support for the metrics framework.
##! Do not load this file directly. It's only meant to be loaded automatically
##! and will be depending on if the cluster framework has been enabled.
##! The goal of this script is to make metric calculation completely and
##! transparently automated when running on a cluster.
##!
##! Events defined here are not exported deliberately because they are meant
##! to be an internal implementation detail.
@load base/frameworks/cluster
@load ./main
module Metrics;
export {
## Allows a user to decide how large a group of results the workers
## should transmit for cluster metric aggregation.
const cluster_send_in_groups_of = 50 &redef;
## The percent of the full threshold value that needs to be met
## on a single worker for that worker to send the value to its manager in
## order for it to request a global view for that value. There is no
## requirement that the manager requests a global view for the index
## since it may opt not to if it requested a global view for the index
## recently.
const cluster_request_global_view_percent = 0.1 &redef;
## Event sent by the manager in a cluster to initiate the
## collection of metrics values for a filter.
global cluster_filter_request: event(uid: string, id: ID, filter_name: string);
## Event sent by nodes that are collecting metrics after receiving
## a request for the metric filter from the manager.
global cluster_filter_response: event(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool);
## This event is sent by the manager in a cluster to initiate the
## collection of a single index value from a filter. It's typically
## used to get intermediate updates before the break interval triggers
## to speed detection of a value crossing a threshold.
global cluster_index_request: event(uid: string, id: ID, filter_name: string, index: Index);
## This event is sent by nodes in response to a
## :bro:id:`Metrics::cluster_index_request` event.
global cluster_index_response: event(uid: string, id: ID, filter_name: string, index: Index, val: count);
## This is sent by workers to indicate that they have crossed the percentage
## of the current threshold defined globally in
## :bro:id:`Metrics::cluster_request_global_view_percent`.
global cluster_index_intermediate_response: event(id: Metrics::ID, filter_name: string, index: Metrics::Index, val: count);
## This event is scheduled internally on workers to send result chunks.
global send_data: event(uid: string, id: ID, filter_name: string, data: MetricTable);
}
# This is maintained by managers so they can know what data they requested and
# when they requested it.
global requested_results: table[string] of time = table() &create_expire=5mins;
# TODO: The next 4 variables make the assumption that a value never
# takes longer than 5 minutes to transmit from workers to manager. This needs to
# be tunable or self-tuning. These should also be restructured to be
# maintained within a single variable.
# This variable is maintained by manager nodes as they collect and aggregate
# results.
global filter_results: table[string, ID, string] of MetricTable &create_expire=5mins;
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
# matches the number of peer nodes that results should be coming from, the
# result is written out and deleted from here.
# TODO: add an &expire_func in case not all results are received.
global done_with: table[string] of count &create_expire=5mins &default=0;
# This variable is maintained by managers to track intermediate responses as
# they are getting a global view for a certain index.
global index_requests: table[string, ID, string, Index] of count &create_expire=5mins &default=0;
# This variable is maintained by all hosts for different purposes. Non-managers
# maintain it to know what indexes they have recently sent as intermediate
# updates so they don't overwhelm their manager. Managers maintain it so they
# don't overwhelm workers with intermediate index requests. The count that is
# yielded is the number of times the percentage threshold has been crossed and
# an intermediate result has been received. The manager may optionally request
# the index again before data expires from here if too many workers are crossing
# the percentage threshold (not implemented yet!).
global recent_global_view_indexes: table[ID, string, Index] of count &create_expire=5mins &default=0;
# Add events to the cluster framework to make this work.
redef Cluster::manager2worker_events += /Metrics::cluster_(filter_request|index_request)/;
redef Cluster::worker2manager_events += /Metrics::cluster_(filter_response|index_response|index_intermediate_response)/;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
# This is done on all non-manager node types in the event that a metric is
# being collected somewhere other than a worker.
function data_added(filter: Filter, index: Index, val: count)
{
# If an intermediate update for this value was sent recently, don't send
# it again.
if ( [filter$id, filter$name, index] in recent_global_view_indexes )
return;
# If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that
# crosses the full threshold then it's a candidate to send as an
# intermediate update.
local pct_val = double_to_count(val / cluster_request_global_view_percent);
if ( check_notice(filter, index, pct_val) )
{
# kick off intermediate update
event Metrics::cluster_index_intermediate_response(filter$id, filter$name, index, val);
++recent_global_view_indexes[filter$id, filter$name, index];
}
}
event Metrics::send_data(uid: string, id: ID, filter_name: string, data: MetricTable)
{
#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
local local_data: MetricTable;
local num_added = 0;
for ( index in data )
{
local_data[index] = data[index];
delete data[index];
# Only send cluster_send_in_groups_of at a time. Queue another
# event to send the next group.
if ( cluster_send_in_groups_of == ++num_added )
break;
}
local done = F;
# If data is empty, this metric is done.
if ( |data| == 0 )
done = T;
event Metrics::cluster_filter_response(uid, id, filter_name, local_data, done);
if ( ! done )
event Metrics::send_data(uid, id, filter_name, data);
}
event Metrics::cluster_filter_request(uid: string, id: ID, filter_name: string)
{
#print fmt("WORKER %s: received the cluster_filter_request event.", Cluster::node);
# Initiate sending all of the data for the requested filter.
event Metrics::send_data(uid, id, filter_name, store[id, filter_name]);
# Lookup the actual filter and reset it; the reference to the data
# currently stored will be maintained internally by the send_data event.
reset(filter_store[id, filter_name]);
}
event Metrics::cluster_index_request(uid: string, id: ID, filter_name: string, index: Index)
{
local val=0;
if ( index in store[id, filter_name] )
val = store[id, filter_name][index];
# fmt("WORKER %s: received the cluster_index_request event for %s=%d.", Cluster::node, index2str(index), val);
event Metrics::cluster_index_response(uid, id, filter_name, index, val);
}
@endif
@if ( Cluster::local_node_type() == Cluster::MANAGER )
# Managers handle logging.
event Metrics::log_it(filter: Filter)
{
#print fmt("%.6f MANAGER: breaking %s filter for %s metric", network_time(), filter$name, filter$id);
local uid = unique_id("");
# Set some tracking variables.
requested_results[uid] = network_time();
filter_results[uid, filter$id, filter$name] = table();
# Request data from peers.
event Metrics::cluster_filter_request(uid, filter$id, filter$name);
# Schedule the log_it event for the next break period.
schedule filter$break_interval { Metrics::log_it(filter) };
}
# This is unlikely to be called often, but it's here in case there are metrics
# being collected by managers.
function data_added(filter: Filter, index: Index, val: count)
{
if ( check_notice(filter, index, val) )
do_notice(filter, index, val);
}
event Metrics::cluster_index_response(uid: string, id: ID, filter_name: string, index: Index, val: count)
{
#print fmt("%0.6f MANAGER: receiving index data from %s", network_time(), get_event_peer()$descr);
if ( [uid, id, filter_name, index] !in index_requests )
index_requests[uid, id, filter_name, index] = 0;
index_requests[uid, id, filter_name, index] += val;
local ir = index_requests[uid, id, filter_name, index];
++done_with[uid];
if ( Cluster::worker_count == done_with[uid] )
{
if ( check_notice(filter_store[id, filter_name], index, ir) )
do_notice(filter_store[id, filter_name], index, ir);
delete done_with[uid];
delete index_requests[uid, id, filter_name, index];
}
}
# Managers handle intermediate updates here.
event Metrics::cluster_index_intermediate_response(id: ID, filter_name: string, index: Index, val: count)
{
#print fmt("MANAGER: receiving intermediate index data from %s", get_event_peer()$descr);
#print fmt("MANAGER: requesting index data for %s", index2str(index));
local uid = unique_id("");
event Metrics::cluster_index_request(uid, id, filter_name, index);
++recent_global_view_indexes[id, filter_name, index];
}
event Metrics::cluster_filter_response(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool)
{
#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
local local_data = filter_results[uid, id, filter_name];
for ( index in data )
{
if ( index !in local_data )
local_data[index] = 0;
local_data[index] += data[index];
}
# Mark another worker as being "done" for this uid.
if ( done )
++done_with[uid];
# If the data has been collected from all peers, we are done and ready to log.
if ( Cluster::worker_count == done_with[uid] )
{
local ts = network_time();
# Log the time this was initially requested if it's available.
if ( uid in requested_results )
{
ts = requested_results[uid];
delete requested_results[uid];
}
write_log(ts, filter_store[id, filter_name], local_data);
# Clean up
delete filter_results[uid, id, filter_name];
delete done_with[uid];
}
}
@endif

View file

@ -1,320 +0,0 @@
##! The metrics framework provides a way to count and measure data.
@load base/frameworks/notice
module Metrics;
export {
## The metrics logging stream identifier.
redef enum Log::ID += { LOG };
## Identifiers for metrics to collect.
type ID: enum {
## Blank placeholder value.
NOTHING,
};
## The default interval used for "breaking" metrics and writing the
## current value to the logging stream.
const default_break_interval = 15mins &redef;
## This is the interval for how often threshold-based notices will be
## raised again after they have already fired.
const renotice_interval = 1hr &redef;
## Represents a thing which is having metrics collected for it. An instance
## of this record type and a :bro:type:`Metrics::ID` together represent a
## single measurement.
type Index: record {
## Host is the value to which this metric applies.
host: addr &optional;
## A non-address related metric or a sub-key for an address based metric.
## An example might be successful SSH connections by client IP address
## where the client string would be the index value.
## Another example might be number of HTTP requests to a particular
## value in a Host header. This is an example of a non-host based
## metric since multiple IP addresses could respond for the same Host
## header value.
str: string &optional;
## The CIDR block that this metric applies to. This is typically
## only used internally for host based aggregation.
network: subnet &optional;
} &log;
## The record type that is used for logging metrics.
type Info: record {
## Timestamp at which the metric was "broken".
ts: time &log;
## What measurement the metric represents.
metric_id: ID &log;
## The name of the filter being logged. :bro:type:`Metrics::ID` values
## can have multiple filters which represent different perspectives on
## the data so this is necessary to understand the value.
filter_name: string &log;
## What the metric value applies to.
index: Index &log;
## The simple numeric value of the metric.
value: count &log;
};
# TODO: configure a metrics filter logging stream to log the current
# metrics configuration in case someone is looking through
# old logs and the configuration has changed since then.
## Filters define how the data from a metric is aggregated and handled.
## Filters can be used to set how often the measurements are cut or "broken"
## and logged or how the data within them is aggregated. It's also
## possible to disable logging and use filters for thresholding.
type Filter: record {
## The :bro:type:`Metrics::ID` that this filter applies to.
id: ID &optional;
## The name for this filter so that multiple filters can be
## applied to a single metrics to get a different view of the same
## metric data being collected (different aggregation, break, etc).
name: string &default="default";
## A predicate so that you can decide per index if you would like
## to accept the data being inserted.
pred: function(index: Index): bool &optional;
## Global mask by which you'd like to aggregate traffic.
aggregation_mask: count &optional;
## This is essentially a mapping table between addresses and subnets.
aggregation_table: table[subnet] of subnet &optional;
## The interval at which this filter should be "broken" and written
## to the logging stream. The counters are also reset to zero at
## this time so any threshold based detection needs to be set to a
## number that should be expected to happen within this period.
break_interval: interval &default=default_break_interval;
## This determines if the result of this filter is sent to the metrics
## logging stream. One use for the metrics framework is as an internal
## thresholding and statistics gathering utility that is meant to
## never log but rather to generate notices and derive data.
log: bool &default=T;
## If this and a $notice_threshold value are set, this notice type
## will be generated by the metrics framework.
note: Notice::Type &optional;
## A straight threshold for generating a notice.
notice_threshold: count &optional;
## A series of thresholds at which to generate notices.
notice_thresholds: vector of count &optional;
## How often this notice should be raised for this filter. It
## will be generated every time it crosses a threshold, but if the
## $break_interval is set to 5mins and this is set to 1hr, the notice
## will only be generated once per hour even if something crosses the
## threshold in every break interval.
notice_freq: interval &optional;
};
## Function to associate a metric filter with a metric ID.
##
## id: The metric ID that the filter should be associated with.
##
## filter: The record representing the filter configuration.
global add_filter: function(id: ID, filter: Filter);
## Add data into a :bro:type:`Metrics::ID`. This should be called when
## a script has measured some point value and is ready to increment the
## counters.
##
## id: The metric ID that the data represents.
##
## index: The metric index that the value is to be added to.
##
## increment: How much to increment the counter by.
global add_data: function(id: ID, index: Index, increment: count);
## Helper function to represent a :bro:type:`Metrics::Index` value as
## a simple string
##
## index: The metric index that is to be converted into a string.
##
## Returns: A string representation of the metric index.
global index2str: function(index: Index): string;
## Event that is used to "finish" metrics and adapt the metrics
## framework for clustered or non-clustered usage.
##
## .. note:: This is primarily intended for internal use.
global log_it: event(filter: Filter);
## Event to access metrics records as they are passed to the logging framework.
global log_metrics: event(rec: Info);
## Type to store a table of metrics values. Internal use only!
type MetricTable: table[Index] of count &default=0;
}
redef record Notice::Info += {
metric_index: Index &log &optional;
};
global metric_filters: table[ID] of vector of Filter = table();
global filter_store: table[ID, string] of Filter = table();
# This is indexed by metric ID and stream filter name.
global store: table[ID, string] of MetricTable = table() &default=table();
# This function checks if a threshold has been crossed and generates a
# notice if it has. It is also used as a method to implement
# mid-break-interval threshold crossing detection for cluster deployments.
global check_notice: function(filter: Filter, index: Index, val: count): bool;
# This is a hook for watching thresholds being crossed. It is called whenever
# index values are updated and the new value is given as the `val` argument.
global data_added: function(filter: Filter, index: Index, val: count);
# This stores the current threshold index for filters using the
# $notice_threshold and $notice_thresholds elements.
global thresholds: table[ID, string, Index] of count = {} &create_expire=renotice_interval &default=0;
event bro_init() &priority=5
{
Log::create_stream(Metrics::LOG, [$columns=Info, $ev=log_metrics]);
}
function index2str(index: Index): string
{
local out = "";
if ( index?$host )
out = fmt("%shost=%s", out, index$host);
if ( index?$network )
out = fmt("%s%snetwork=%s", out, |out|==0 ? "" : ", ", index$network);
if ( index?$str )
out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", index$str);
return fmt("metric_index(%s)", out);
}
function write_log(ts: time, filter: Filter, data: MetricTable)
{
for ( index in data )
{
local val = data[index];
local m: Info = [$ts=ts,
$metric_id=filter$id,
$filter_name=filter$name,
$index=index,
$value=val];
if ( filter$log )
Log::write(Metrics::LOG, m);
}
}
function reset(filter: Filter)
{
store[filter$id, filter$name] = table();
}
function add_filter(id: ID, filter: Filter)
{
if ( filter?$aggregation_table && filter?$aggregation_mask )
{
print "INVALID Metric filter: Defined $aggregation_table and $aggregation_mask.";
return;
}
if ( [id, filter$name] in store )
{
print fmt("INVALID Metric filter: Filter with name \"%s\" already exists.", filter$name);
return;
}
if ( filter?$notice_threshold && filter?$notice_thresholds )
{
print "INVALID Metric filter: Defined both $notice_threshold and $notice_thresholds";
return;
}
if ( ! filter?$id )
filter$id = id;
if ( id !in metric_filters )
metric_filters[id] = vector();
metric_filters[id][|metric_filters[id]|] = filter;
filter_store[id, filter$name] = filter;
store[id, filter$name] = table();
schedule filter$break_interval { Metrics::log_it(filter) };
}
function add_data(id: ID, index: Index, increment: count)
{
if ( id !in metric_filters )
return;
local filters = metric_filters[id];
# Try to add the data to all of the defined filters for the metric.
for ( filter_id in filters )
{
local filter = filters[filter_id];
# If this filter has a predicate, run the predicate and skip this
# index if the predicate returns false.
if ( filter?$pred && ! filter$pred(index) )
next;
if ( index?$host )
{
if ( filter?$aggregation_mask )
{
index$network = mask_addr(index$host, filter$aggregation_mask);
delete index$host;
}
else if ( filter?$aggregation_table )
{
# Don't add the data if the aggregation table doesn't include
# the given host address.
if ( index$host !in filter$aggregation_table )
return;
index$network = filter$aggregation_table[index$host];
delete index$host;
}
}
local metric_tbl = store[id, filter$name];
if ( index !in metric_tbl )
metric_tbl[index] = 0;
metric_tbl[index] += increment;
data_added(filter, index, metric_tbl[index]);
}
}
function check_notice(filter: Filter, index: Index, val: count): bool
{
if ( (filter?$notice_threshold &&
[filter$id, filter$name, index] !in thresholds &&
val >= filter$notice_threshold) ||
(filter?$notice_thresholds &&
|filter$notice_thresholds| <= thresholds[filter$id, filter$name, index] &&
val >= filter$notice_thresholds[thresholds[filter$id, filter$name, index]]) )
return T;
else
return F;
}
function do_notice(filter: Filter, index: Index, val: count)
{
# We include $peer_descr here because a manager could have actually
# generated the notice even though the current remote peer for the event
# calling this could be a worker if this is running as a cluster.
local n: Notice::Info = [$note=filter$note,
$n=val,
$metric_index=index,
$peer_descr=peer_description];
n$msg = fmt("Threshold crossed by %s %d/%d", index2str(index), val, filter$notice_threshold);
if ( index?$str )
n$sub = index$str;
if ( index?$host )
n$src = index$host;
# TODO: not sure where to put the network yet.
NOTICE(n);
# This just needs to be set to some value so that it doesn't refire the
# notice until it expires from the table or it crosses the next
# threshold in the case of vectors of thresholds.
++thresholds[filter$id, filter$name, index];
}
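
A minimal sketch of driving this API, assuming a hypothetical CONN_COUNT metric that counts established connections per originator:

redef enum Metrics::ID += { CONN_COUNT };

event bro_init()
	{
	# Log the per-host counts every five minutes (the default break interval is 15mins).
	Metrics::add_filter(CONN_COUNT, [$name="by-host", $break_interval=5mins]);
	}

event connection_established(c: connection)
	{
	Metrics::add_data(CONN_COUNT, [$host=c$id$orig_h], 1);
	}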

View file

@ -1,21 +0,0 @@
@load ./main
module Metrics;
event Metrics::log_it(filter: Filter)
{
local id = filter$id;
local name = filter$name;
write_log(network_time(), filter, store[id, name]);
reset(filter);
schedule filter$break_interval { Metrics::log_it(filter) };
}
function data_added(filter: Filter, index: Index, val: count)
{
if ( check_notice(filter, index, val) )
do_notice(filter, index, val);
}

View file

@ -17,7 +17,9 @@
@if ( Cluster::is_enabled() )
@load ./cluster
@else
@load ./non-cluster
@endif
# Load here so that it can check whether clustering is enabled.
@load ./actions/pp-alarms
@load ./actions/pp-alarms

View file

@ -27,18 +27,17 @@ export {
## Notice types which should have the "remote" location looked up.
## If GeoIP support is not built in, this does nothing.
const lookup_location_types: set[Notice::Type] = {} &redef;
## Add a helper to the notice policy for looking up GeoIP data.
redef Notice::policy += {
[$pred(n: Notice::Info) = { return (n$note in Notice::lookup_location_types); },
$action = ACTION_ADD_GEODATA,
$priority = 10],
};
}
hook policy(n: Notice::Info) &priority=10
{
if ( n$note in Notice::lookup_location_types )
add n$actions[ACTION_ADD_GEODATA];
}
# This is handled at a high priority in case other notice handlers
# want to use the data.
event notice(n: Notice::Info) &priority=10
hook notice(n: Notice::Info) &priority=10
{
if ( ACTION_ADD_GEODATA in n$actions &&
|Site::local_nets| > 0 &&

View file

@ -17,20 +17,13 @@ export {
};
}
# This is a little awkward because we want to inject drop along with the
# synchronous functions.
event bro_init()
hook notice(n: Notice::Info)
{
local drop_func = function(n: Notice::Info)
if ( ACTION_DROP in n$actions )
{
if ( ACTION_DROP in n$actions )
{
#local drop = React::drop_address(n$src, "");
#local addl = drop?$sub ? fmt(" %s", drop$sub) : "";
#n$dropped = drop$note != Drop::AddressDropIgnored;
#n$msg += fmt(" [%s%s]", drop$note, addl);
}
};
add Notice::sync_functions[drop_func];
#local drop = React::drop_address(n$src, "");
#local addl = drop?$sub ? fmt(" %s", drop$sub) : "";
#n$dropped = drop$note != Drop::AddressDropIgnored;
#n$msg += fmt(" [%s%s]", drop$note, addl);
}
}

View file

@ -18,7 +18,7 @@ export {
};
}
event notice(n: Notice::Info) &priority=-5
hook notice(n: Notice::Info) &priority=-5
{
if ( |Site::local_admins| > 0 &&
ACTION_EMAIL_ADMIN in n$actions )

View file

@ -15,7 +15,7 @@ export {
const mail_page_dest = "" &redef;
}
event notice(n: Notice::Info) &priority=-5
hook notice(n: Notice::Info) &priority=-5
{
if ( ACTION_PAGE in n$actions )
email_notice_to(n, mail_page_dest, F);

View file

@ -105,7 +105,7 @@ event bro_init()
$postprocessor=pp_postprocessor]);
}
event notice(n: Notice::Info) &priority=-5
hook notice(n: Notice::Info) &priority=-5
{
if ( ! want_pp() )
return;

View file

@ -21,32 +21,10 @@ redef Cluster::manager2worker_events += /Notice::begin_suppression/;
redef Cluster::worker2manager_events += /Notice::cluster_notice/;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
# The notice policy is completely handled by the manager and shouldn't be
# done by workers or proxies to save time for packet processing.
event bro_init() &priority=11
{
Notice::policy = table();
}
event Notice::begin_suppression(n: Notice::Info)
{
suppressing[n$note, n$identifier] = n;
}
event Notice::notice(n: Notice::Info)
{
# Send the locally generated notice on to the manager.
event Notice::cluster_notice(n);
}
event bro_init() &priority=-3
{
# Workers and proxies need to disable the notice streams because notice
# events are forwarded directly instead of being logged remotely.
Log::disable_stream(Notice::LOG);
Log::disable_stream(Notice::POLICY_LOG);
Log::disable_stream(Notice::ALARM_LOG);
}
@endif
@if ( Cluster::local_node_type() == Cluster::MANAGER )
@ -56,3 +34,19 @@ event Notice::cluster_notice(n: Notice::Info)
NOTICE(n);
}
@endif
module GLOBAL;
## This is the entry point in the global namespace for the notice framework.
function NOTICE(n: Notice::Info)
{
# Suppress this notice if necessary.
if ( Notice::is_being_suppressed(n) )
return;
if ( Cluster::local_node_type() == Cluster::MANAGER )
Notice::internal_NOTICE(n);
else
# For non-managers, send the notice on to the manager.
event Notice::cluster_notice(n);
}

View file

@ -13,7 +13,7 @@ module Notice;
# reference to the original notice)
global tmp_notice_storage: table[string] of Notice::Info &create_expire=max_email_delay+10secs;
event Notice::notice(n: Notice::Info) &priority=10
hook notice(n: Notice::Info) &priority=10
{
if ( ! n?$src && ! n?$dst )
return;

View file

@ -10,9 +10,6 @@ export {
redef enum Log::ID += {
## This is the primary logging stream for notices.
LOG,
## This is the notice policy auditing log. It records what the current
## notice policy is at Bro init time.
POLICY_LOG,
## This is the alarm stream.
ALARM_LOG,
};
@ -42,9 +39,6 @@ export {
## version of the alarm log is emailed in bulk to the address(es)
## configured in :bro:id:`Notice::mail_dest`.
ACTION_ALARM,
## Indicates that the notice should not be supressed by the normal
## duplicate notice suppression that the notice framework does.
ACTION_NO_SUPPRESS,
};
## The notice framework is able to do automatic notice suppression by
@ -64,7 +58,7 @@ export {
## A connection 4-tuple identifying the endpoints concerned with the
## notice.
id: conn_id &log &optional;
## A shorthand way of giving the uid and id to a notice. The
## reference to the actual connection will be deleted after applying
## the notice policy.
@ -102,10 +96,6 @@ export {
## The actions which have been applied to this notice.
actions: set[Notice::Action] &log &optional;
## These are policy items that returned T and applied their action
## to the notice.
policy_items: set[count] &log &optional;
## By adding chunks of text into this element, other scripts can
## expand on notices that are being emailed. The normal way to add text
## is to extend the vector by handling the :bro:id:`Notice::notice`
@ -142,9 +132,8 @@ export {
identifier: string &optional;
## This field indicates the length of time that this
## unique notice should be suppressed. This field is automatically
## filled out and should not be written to by any other script.
suppress_for: interval &log &optional;
## unique notice should be suppressed.
suppress_for: interval &log &default=default_suppression_interval;
};
## Ignored notice types.
@ -159,58 +148,8 @@ export {
## intervals for entire notice types.
const type_suppression_intervals: table[Notice::Type] of interval = {} &redef;
## This is the record that defines the items that make up the notice policy.
type PolicyItem: record {
## This is the exact positional order in which the
## :bro:type:`Notice::PolicyItem` records are checked.
## This is set internally by the notice framework.
position: count &log &optional;
## Define the priority for this check. Items are checked in order
## from highest value (10) to lowest value (0).
priority: count &log &default=5;
## An action given to the notice if the predicate return true.
action: Notice::Action &log &default=ACTION_NONE;
## The pred (predicate) field is a function that returns a boolean T
## or F value. If the predicate function return true, the action in
## this record is applied to the notice that is given as an argument
## to the predicate function. If no predicate is supplied, it's
## assumed that the PolicyItem always applies.
pred: function(n: Notice::Info): bool &log &optional;
## Indicates this item should terminate policy processing if the
## predicate returns T.
halt: bool &log &default=F;
## This defines the length of time that this particular notice should
## be supressed.
suppress_for: interval &log &optional;
};
## Defines a notice policy that is extensible on a per-site basis.
## All notice processing is done through this variable.
const policy: set[PolicyItem] = {
[$pred(n: Notice::Info) = { return (n$note in Notice::ignored_types); },
$halt=T, $priority = 9],
[$pred(n: Notice::Info) = { return (n$note in Notice::not_suppressed_types); },
$action = ACTION_NO_SUPPRESS,
$priority = 9],
[$pred(n: Notice::Info) = { return (n$note in Notice::alarmed_types); },
$action = ACTION_ALARM,
$priority = 8],
[$pred(n: Notice::Info) = { return (n$note in Notice::emailed_types); },
$action = ACTION_EMAIL,
$priority = 8],
[$pred(n: Notice::Info) = {
if (n$note in Notice::type_suppression_intervals)
{
n$suppress_for=Notice::type_suppression_intervals[n$note];
return T;
}
return F;
},
$action = ACTION_NONE,
$priority = 8],
[$action = ACTION_LOG,
$priority = 0],
} &redef;
## The hook to modify notice handling.
global policy: hook(n: Notice::Info);
## Local system sendmail program.
const sendmail = "/usr/sbin/sendmail" &redef;
@ -240,25 +179,11 @@ export {
## This is the event that is called as the entry point to the
## notice framework by the global :bro:id:`NOTICE` function. By the time
## this event is generated, default values have already been filled out in
## the :bro:type:`Notice::Info` record and synchronous functions in the
## :bro:id:`Notice::sync_functions` have already been called. The notice
## the :bro:type:`Notice::Info` record and the notice
## policy has also been applied.
##
## n: The record containing notice data.
global notice: event(n: Info);
## This is a set of functions that provide a synchronous way for scripts
## extending the notice framework to run before the normal event based
## notice pathway that most of the notice framework takes. This is helpful
## in cases where an action against a notice needs to happen immediately
## and can't wait the short time for the event to bubble up to the top of
## the event queue. An example is the IP address dropping script, which
## can block IP addresses that have notices generated about them and
## therefore needs to operate closer to real time than the event queue
## allows. Normally the event-based extension model using the
## :bro:id:`Notice::notice` event will work fine if there aren't hard
## real-time constraints.
const sync_functions: set[function(n: Notice::Info)] = set() &redef;
global notice: hook(n: Info);
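# Hedged sketch (not part of this file): a site script extends notice
# processing by handling this hook after the defaults have been applied.
hook Notice::notice(n: Notice::Info) &priority=-1
    {
    if ( Notice::ACTION_ALARM in n$actions )
        print fmt("alarmed notice: %s", n$note);
    }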
## This event is generated when a notice begins to be suppressed.
##
@ -266,6 +191,11 @@ export {
## about to be suppressed.
global begin_suppression: event(n: Notice::Info);
## A function to determine if an event is supposed to be suppressed.
##
## n: The record containing the notice in question.
global is_being_suppressed: function(n: Notice::Info): bool;
## This event is generated on each occurrence of an event being suppressed.
##
## n: The record containing notice data regarding the notice type
@ -299,13 +229,13 @@ export {
##
## Returns: a string of mail headers to which an email body can be appended
global email_headers: function(subject_desc: string, dest: string): string;
## This event can be handled to access the :bro:type:`Notice::Info`
## record as it is sent on to the logging framework.
##
## rec: The record containing notice data before it is logged.
global log_notice: event(rec: Info);
## This is an internal wrapper for the global :bro:id:`NOTICE` function;
## disregard.
##
@ -338,10 +268,6 @@ global suppressing: table[Type, string] of Notice::Info = {}
&create_expire=0secs
&expire_func=per_notice_suppression_interval;
# This is an internal variable used to store the notice policy ordered by
# priority.
global ordered_policy: vector of PolicyItem = vector();
function log_mailing_postprocessor(info: Log::RotationInfo): bool
{
if ( ! reading_traces() && mail_dest != "" )
@ -424,9 +350,7 @@ function email_notice_to(n: Notice::Info, dest: string, extend: bool)
}
else
{
event reporter_info(network_time(),
fmt("Notice email delay tokens weren't released in time (%s).", n$email_delay_tokens),
"");
Reporter::info(fmt("Notice email delay tokens weren't released in time (%s).", n$email_delay_tokens));
}
}
}
@ -468,7 +392,26 @@ function email_notice_to(n: Notice::Info, dest: string, extend: bool)
piped_exec(fmt("%s -t -oi", sendmail), email_text);
}
event notice(n: Notice::Info) &priority=-5
hook Notice::policy(n: Notice::Info) &priority=10
{
if ( n$note in Notice::ignored_types )
break;
if ( n$note in Notice::not_suppressed_types )
n$suppress_for=0secs;
if ( n$note in Notice::alarmed_types )
add n$actions[ACTION_ALARM];
if ( n$note in Notice::emailed_types )
add n$actions[ACTION_EMAIL];
if ( n$note in Notice::type_suppression_intervals )
n$suppress_for=Notice::type_suppression_intervals[n$note];
# Logging is a default action. It can be removed in a later hook if desired.
add n$actions[ACTION_LOG];
}
hook Notice::notice(n: Notice::Info) &priority=-5
{
if ( ACTION_EMAIL in n$actions )
email_notice_to(n, mail_dest, T);
@ -480,7 +423,6 @@ event notice(n: Notice::Info) &priority=-5
# Normally suppress further notices like this one unless directed not to.
# n$identifier *must* be specified for suppression to function at all.
if ( n?$identifier &&
ACTION_NO_SUPPRESS !in n$actions &&
[n$note, n$identifier] !in suppressing &&
n$suppress_for != 0secs )
{
@ -488,10 +430,7 @@ event notice(n: Notice::Info) &priority=-5
event Notice::begin_suppression(n);
}
}
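# Hedged sketch of the caller side (not part of this file): a notice that sets
# $identifier (the originator address here) so repeats within $suppress_for are
# collapsed by the suppression logic above. Example_Notice is a hypothetical
# notice type used only for illustration.
redef enum Notice::Type += { Example_Notice };
event connection_established(c: connection)
    {
    NOTICE([$note=Example_Notice,
            $msg=fmt("example notice for %s", c$id$orig_h),
            $conn=c,
            $identifier=cat(c$id$orig_h),
            $suppress_for=30min]);
    }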
## This determines if a notice is being suppressed. It is only used
## internally as part of the mechanics for the global :bro:id:`NOTICE`
## function.
function is_being_suppressed(n: Notice::Info): bool
{
if ( n?$identifier && [n$note, n$identifier] in suppressing )
@ -539,7 +478,7 @@ function apply_policy(n: Notice::Info)
n$p = n$id$resp_p;
}
if ( n?$p )
if ( n?$p )
n$proto = get_port_transport_proto(n$p);
if ( n?$iconn )
@ -565,27 +504,8 @@ function apply_policy(n: Notice::Info)
if ( ! n?$email_delay_tokens )
n$email_delay_tokens = set();
if ( ! n?$policy_items )
n$policy_items = set();
for ( i in ordered_policy )
{
# If there's no predicate or the predicate returns F.
if ( ! ordered_policy[i]?$pred || ordered_policy[i]$pred(n) )
{
add n$actions[ordered_policy[i]$action];
add n$policy_items[int_to_count(i)];
# If the predicate matched and there was a suppression interval,
# apply it to the notice now.
if ( ordered_policy[i]?$suppress_for )
n$suppress_for = ordered_policy[i]$suppress_for;
# If the policy item wants to halt policy processing, do it now!
if ( ordered_policy[i]$halt )
break;
}
}
# Apply the hook based policy.
hook Notice::policy(n);
# Apply the suppression time after applying the policy so that policy
# items can give custom suppression intervals. If there is no
@ -602,61 +522,15 @@ function apply_policy(n: Notice::Info)
delete n$iconn;
}
# Create the ordered notice policy automatically which will be used at runtime
# for prioritized matching of the notice policy.
event bro_init() &priority=10
{
# Create the policy log here because it's only written to in this handler.
Log::create_stream(Notice::POLICY_LOG, [$columns=PolicyItem]);
local tmp: table[count] of set[PolicyItem] = table();
for ( pi in policy )
{
if ( pi$priority < 0 || pi$priority > 10 )
Reporter::fatal("All Notice::PolicyItem priorities must be within 0 and 10");
if ( pi$priority !in tmp )
tmp[pi$priority] = set();
add tmp[pi$priority][pi];
}
local rev_count = vector(10,9,8,7,6,5,4,3,2,1,0);
for ( i in rev_count )
{
local j = rev_count[i];
if ( j in tmp )
{
for ( pi in tmp[j] )
{
pi$position = |ordered_policy|;
ordered_policy[|ordered_policy|] = pi;
Log::write(Notice::POLICY_LOG, pi);
}
}
}
}
function internal_NOTICE(n: Notice::Info)
{
# Suppress this notice if necessary.
if ( is_being_suppressed(n) )
return;
# Fill out fields that might be empty and do the policy processing.
apply_policy(n);
# Run the synchronous functions with the notice.
for ( func in sync_functions )
func(n);
# Generate the notice event with the notice.
event Notice::notice(n);
hook Notice::notice(n);
}
module GLOBAL;
## This is the entry point in the global namespace for the notice framework.
function NOTICE(n: Notice::Info)
{
Notice::internal_NOTICE(n);
}
global NOTICE: function(n: Notice::Info);

View file

@ -0,0 +1,14 @@
@load ./main
module GLOBAL;
## This is the entry point in the global namespace for the notice framework.
function NOTICE(n: Notice::Info)
{
# Suppress this notice if necessary.
if ( Notice::is_being_suppressed(n) )
return;
Notice::internal_NOTICE(n);
}

View file

@ -1,10 +1,15 @@
##! This framework is intended to create an output and filtering path for
##! internal messages/warnings/errors. It should typically be loaded to
##! avoid Bro spewing internal messages to standard error and instead log
##! them to a file in a standard way. Note that this framework deals with
##! the handling of internally-generated reporter messages; for the
##! interface into actually creating reporter messages from the scripting
##! layer, use the built-in functions in :doc:`/scripts/base/reporter.bif`.
##! log such messages to a file in a standard way. For the options to
##! toggle whether messages are additionally written to STDERR, see
##! :bro:see:`Reporter::info_to_stderr`,
##! :bro:see:`Reporter::warnings_to_stderr`, and
##! :bro:see:`Reporter::errors_to_stderr`.
##!
##! Note that this framework deals with the handling of internally generated
##! reporter messages; for the interface into actually creating reporter
##! messages from the scripting layer, use the built-in functions in
##! :doc:`/scripts/base/bif/reporter.bif`.
module Reporter;
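# Hedged configuration sketch (not part of this file): silencing console output
# when Bro is driven by an external harness; the three options are the ones
# referenced in the docstring above.
redef Reporter::info_to_stderr = F;
redef Reporter::warnings_to_stderr = F;
redef Reporter::errors_to_stderr = F;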
@ -36,26 +41,11 @@ export {
## Not all reporter messages will have locations in them though.
location: string &log &optional;
};
## Tunable for sending reporter warning messages to STDERR. The option to
## turn it off is presented here in case Bro is being run by some
## external harness and shouldn't output anything to the console.
const warnings_to_stderr = T &redef;
## Tunable for sending reporter error messages to STDERR. The option to
## turn it off is presented here in case Bro is being run by some
## external harness and shouldn't output anything to the console.
const errors_to_stderr = T &redef;
}
global stderr: file;
event bro_init() &priority=5
{
Log::create_stream(Reporter::LOG, [$columns=Info]);
if ( errors_to_stderr || warnings_to_stderr )
stderr = open("/dev/stderr");
}
event reporter_info(t: time, msg: string, location: string) &priority=-5
@ -65,26 +55,10 @@ event reporter_info(t: time, msg: string, location: string) &priority=-5
event reporter_warning(t: time, msg: string, location: string) &priority=-5
{
if ( warnings_to_stderr )
{
if ( t > double_to_time(0.0) )
print stderr, fmt("WARNING: %.6f %s (%s)", t, msg, location);
else
print stderr, fmt("WARNING: %s (%s)", msg, location);
}
Log::write(Reporter::LOG, [$ts=t, $level=WARNING, $message=msg, $location=location]);
}
event reporter_error(t: time, msg: string, location: string) &priority=-5
{
if ( errors_to_stderr )
{
if ( t > double_to_time(0.0) )
print stderr, fmt("ERROR: %.6f %s (%s)", t, msg, location);
else
print stderr, fmt("ERROR: %s (%s)", msg, location);
}
Log::write(Reporter::LOG, [$ts=t, $level=ERROR, $message=msg, $location=location]);
}

View file

@ -148,7 +148,7 @@ function has_signature_matched(id: string, orig: addr, resp: addr): bool
event sig_summary(orig: addr, id: string, msg: string)
{
NOTICE([$note=Signature_Summary, $src=orig,
$filename=id, $msg=fmt("%s: %s", orig, msg),
$msg=fmt("%s: %s", orig, msg),
$n=count_per_orig[orig,id] ]);
}
@ -161,7 +161,7 @@ event signature_match(state: signature_state, msg: string, data: string)
return;
# Trim the matched data down to something reasonable
if ( byte_len(data) > 140 )
if ( |data| > 140 )
data = fmt("%s...", sub_bytes(data, 0, 140));
local src_addr: addr;
@ -209,7 +209,6 @@ event signature_match(state: signature_state, msg: string, data: string)
{
NOTICE([$note=Count_Signature, $conn=state$conn,
$msg=msg,
$filename=sig_id,
$n=count_per_resp[dst,sig_id],
$sub=fmt("%d matches of signature %s on host %s",
count_per_resp[dst,sig_id],
@ -240,7 +239,7 @@ event signature_match(state: signature_state, msg: string, data: string)
if ( notice )
NOTICE([$note=Sensitive_Signature,
$conn=state$conn, $src=src_addr,
$dst=dst_addr, $filename=sig_id, $msg=fmt("%s: %s", src_addr, msg),
$dst=dst_addr, $msg=fmt("%s: %s", src_addr, msg),
$sub=data]);
if ( action == SIG_FILE_BUT_NO_SCAN || action == SIG_SUMMARY )
@ -260,8 +259,8 @@ event signature_match(state: signature_state, msg: string, data: string)
add vert_table[orig, resp][sig_id];
local hcount = length(horiz_table[orig, sig_id]);
local vcount = length(vert_table[orig, resp]);
local hcount = |horiz_table[orig, sig_id]|;
local vcount = |vert_table[orig, resp]|;
if ( hcount in horiz_scan_thresholds && hcount != last_hthresh[orig] )
{
@ -274,7 +273,7 @@ event signature_match(state: signature_state, msg: string, data: string)
$src_addr=orig, $sig_id=sig_id, $event_msg=msg,
$host_count=hcount, $sub_msg=horz_scan_msg]);
NOTICE([$note=Multiple_Sig_Responders, $src=orig, $filename=sig_id,
NOTICE([$note=Multiple_Sig_Responders, $src=orig,
$msg=msg, $n=hcount, $sub=horz_scan_msg]);
last_hthresh[orig] = hcount;
@ -295,7 +294,6 @@ event signature_match(state: signature_state, msg: string, data: string)
$sub_msg=vert_scan_msg]);
NOTICE([$note=Multiple_Signatures, $src=orig, $dst=resp,
$filename=sig_id,
$msg=fmt("%s different signatures triggered", vcount),
$n=vcount, $sub=vert_scan_msg]);

View file

@ -29,6 +29,8 @@ export {
minor: count &optional;
## Minor subversion number
minor2: count &optional;
## Minor update number
minor3: count &optional;
## Additional version string (e.g. "beta42")
addl: string &optional;
} &log;
@ -146,10 +148,10 @@ function parse(unparsed_version: string): Description
if ( /^[\/\-\._v\(]/ in sv )
sv = strip(sub(version_parts[2], /^\(?[\/\-\._v\(]/, ""));
local version_numbers = split_n(sv, /[\-\._,\[\(\{ ]/, F, 3);
if ( 4 in version_numbers && version_numbers[4] != "" )
v$addl = strip(version_numbers[4]);
if ( 5 in version_numbers && version_numbers[5] != "" )
v$addl = strip(version_numbers[5]);
else if ( 3 in version_parts && version_parts[3] != "" &&
version_parts[3] != ")" )
version_parts[3] != ")" )
{
if ( /^[[:blank:]]*\([a-zA-Z0-9\-\._[:blank:]]*\)/ in version_parts[3] )
{
@ -177,7 +179,9 @@ function parse(unparsed_version: string): Description
}
}
if ( 4 in version_numbers && version_numbers[4] != "" )
v$minor3 = extract_count(version_numbers[4]);
if ( 3 in version_numbers && version_numbers[3] != "" )
v$minor2 = extract_count(version_numbers[3]);
if ( 2 in version_numbers && version_numbers[2] != "" )
@ -332,8 +336,25 @@ function cmp_versions(v1: Version, v2: Version): int
return v1?$minor2 ? 1 : -1;
}
if ( v1?$minor3 && v2?$minor3 )
{
if ( v1$minor3 < v2$minor3 )
return -1;
if ( v1$minor3 > v2$minor3 )
return 1;
}
else
{
if ( !v1?$minor3 && !v2?$minor3 )
{ }
else
return v1?$minor3 ? 1 : -1;
}
if ( v1?$addl && v2?$addl )
{
return strcmp(v1$addl, v2$addl);
}
else
{
if ( !v1?$addl && !v2?$addl )
@ -341,6 +362,9 @@ function cmp_versions(v1: Version, v2: Version): int
else
return v1?$addl ? 1 : -1;
}
# A catcher return that should never be reached...hopefully
return 0;
}
function software_endpoint_name(id: conn_id, host: addr): string
@ -351,10 +375,11 @@ function software_endpoint_name(id: conn_id, host: addr): string
# Convert a version into a string "a.b.c-x".
function software_fmt_version(v: Version): string
{
return fmt("%d.%d.%d%s",
v?$major ? v$major : 0,
v?$minor ? v$minor : 0,
v?$minor2 ? v$minor2 : 0,
return fmt("%s%s%s%s%s",
v?$major ? fmt("%d", v$major) : "0",
v?$minor ? fmt(".%d", v$minor) : "",
v?$minor2 ? fmt(".%d", v$minor2) : "",
v?$minor3 ? fmt(".%d", v$minor3) : "",
v?$addl ? fmt("-%s", v$addl) : "");
}
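# Hedged sketch (illustrative only, in this module's scope) of the new
# formatting behaviour: missing components are omitted instead of being padded
# with zeros.
event bro_init() &priority=-10
    {
    local v: Version = [$major=1, $minor=2, $minor2=3, $minor3=4, $addl="beta1"];
    print software_fmt_version(v);    # prints "1.2.3.4-beta1"
    local v2: Version = [$major=2];
    print software_fmt_version(v2);   # prints "2"
    }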

View file

@ -1,4 +1,5 @@
@load ./main
@load ./plugins
# The cluster framework must be loaded first.
@load base/frameworks/cluster

View file

@ -0,0 +1,346 @@
##! This implements transparent cluster support for the SumStats framework.
##! Do not load this file directly. It's only meant to be loaded automatically,
##! and it will be if the cluster framework has been enabled.
##! The goal of this script is to make sumstats calculation completely and
##! transparently automated when running on a cluster.
@load base/frameworks/cluster
@load ./main
module SumStats;
export {
## Allows a user to decide how many results the workers should transmit
## per group during cluster stats aggregation.
const cluster_send_in_groups_of = 50 &redef;
## The percent of the full threshold value that needs to be met on a single worker
## for that worker to send the value to its manager in order for it to request a
## global view for that value. There is no requirement that the manager requests
## a global view for the key since it may opt not to if it requested a global view
## for the key recently.
const cluster_request_global_view_percent = 0.2 &redef;
## This is to deal with intermediate update overload. A manager will only allow
## this many intermediate update requests to the workers to be in flight at any
## given time. Requests beyond this limit are currently thrown out and not
## performed. In practice this should hopefully have a minimal effect.
const max_outstanding_global_views = 10 &redef;
## Intermediate updates can cause overload situations on very large clusters. This
## option may help reduce load and correct intermittent problems. The option
## itself is only meant to be temporary.
const enable_intermediate_updates = T &redef;
## Event sent by the manager in a cluster to initiate the collection of values for
## a sumstat.
global cluster_ss_request: event(uid: string, ssid: string);
## Event sent by nodes that are collecting sumstats after receiving a request for
## the sumstat from the manager.
global cluster_ss_response: event(uid: string, ssid: string, data: ResultTable, done: bool);
## This event is sent by the manager in a cluster to initiate the collection of
## a single key value from a sumstat. It's typically used to get intermediate
## updates before the break interval triggers to speed detection of a value
## crossing a threshold.
global cluster_key_request: event(uid: string, ssid: string, key: Key);
## This event is sent by nodes in response to a
## :bro:id:`SumStats::cluster_key_request` event.
global cluster_key_response: event(uid: string, ssid: string, key: Key, result: Result);
## This event is sent by workers to indicate that they have crossed the
## percentage of the current threshold defined globally in
## :bro:id:`SumStats::cluster_request_global_view_percent`.
global cluster_key_intermediate_response: event(ssid: string, key: SumStats::Key);
## This event is scheduled internally on workers to send result chunks.
global send_data: event(uid: string, ssid: string, data: ResultTable);
## This event is generated when a threshold is crossed.
global cluster_threshold_crossed: event(ssid: string, key: SumStats::Key, thold: Thresholding);
}
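# Hedged tuning sketch (not part of this file): for a very large cluster one
# might ship results in smaller groups and disable intermediate updates
# entirely; both knobs are defined in the export block above.
redef SumStats::cluster_send_in_groups_of = 25;
redef SumStats::enable_intermediate_updates = F;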
# Add events to the cluster framework to make this work.
redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|key_request|threshold_crossed)/;
redef Cluster::manager2worker_events += /SumStats::thresholds_reset/;
redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_response|key_intermediate_response)/;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
# This variable is maintained to know what keys have recently been sent as
# intermediate updates so they don't overwhelm their manager. The count that is
# yielded is the number of times the percentage threshold has been crossed and
# an intermediate result has been received.
global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0;
event bro_init() &priority=-100
{
# The manager is the only host allowed to track these.
stats_store = table();
reducer_store = table();
}
# This is done on all non-manager node types in the event that a sumstat is
# being collected somewhere other than a worker.
function data_added(ss: SumStat, key: Key, result: Result)
{
# If an intermediate update for this value was sent recently, don't send
# it again.
if ( [ss$id, key] in recent_global_view_keys )
return;
# If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that
# crosses the full threshold then it's a candidate to send as an
# intermediate update.
if ( enable_intermediate_updates &&
check_thresholds(ss, key, result, cluster_request_global_view_percent) )
{
# kick off intermediate update
event SumStats::cluster_key_intermediate_response(ss$id, key);
++recent_global_view_keys[ss$id, key];
}
}
event SumStats::send_data(uid: string, ssid: string, data: ResultTable)
{
#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
local local_data: ResultTable = table();
local num_added = 0;
for ( key in data )
{
local_data[key] = data[key];
delete data[key];
# Only send cluster_send_in_groups_of at a time. Queue another
# event to send the next group.
if ( cluster_send_in_groups_of == ++num_added )
break;
}
local done = F;
# If data is empty, this sumstat is done.
if ( |data| == 0 )
done = T;
# Note: copy is needed to compensate for a serialization caching issue. This should be
# changed to something else later.
event SumStats::cluster_ss_response(uid, ssid, copy(local_data), done);
if ( ! done )
schedule 0.01 sec { SumStats::send_data(uid, ssid, data) };
}
event SumStats::cluster_ss_request(uid: string, ssid: string)
{
#print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
# Initiate sending all of the data for the requested stats.
if ( ssid in result_store )
event SumStats::send_data(uid, ssid, result_store[ssid]);
else
event SumStats::send_data(uid, ssid, table());
# Look up the actual sumstat and reset it; the reference to the data
# currently stored will be maintained internally by the send_data event.
if ( ssid in stats_store )
reset(stats_store[ssid]);
}
event SumStats::cluster_key_request(uid: string, ssid: string, key: Key)
{
if ( ssid in result_store && key in result_store[ssid] )
{
#print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data);
# Note: copy is needed to compensate for a serialization caching issue. This should be
# changed to something else later.
event SumStats::cluster_key_response(uid, ssid, key, copy(result_store[ssid][key]));
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_key_response(uid, ssid, key, table());
}
}
event SumStats::cluster_threshold_crossed(ssid: string, key: SumStats::Key, thold: Thresholding)
{
if ( ssid !in threshold_tracker )
threshold_tracker[ssid] = table();
threshold_tracker[ssid][key] = thold;
}
event SumStats::thresholds_reset(ssid: string)
{
threshold_tracker[ssid] = table();
}
@endif
@if ( Cluster::local_node_type() == Cluster::MANAGER )
# This variable is maintained by manager nodes as they collect and aggregate
# results.
# Index on a uid.
global stats_results: table[string] of ResultTable &read_expire=1min;
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
# matches the number of peer nodes that results should be coming from, the
# result is written out and deleted from here.
# Indexed on a uid.
# TODO: add an &expire_func in case not all results are received.
global done_with: table[string] of count &read_expire=1min &default=0;
# This variable is maintained by managers to track intermediate responses as
# they are getting a global view for a certain key.
# Indexed on a uid.
global key_requests: table[string] of Result &read_expire=1min;
# This variable is maintained by managers to prevent overwhelming communication due
# to too many intermediate updates. Each sumstat is tracked separately so that
# one won't overwhelm and degrade other quieter sumstats.
# Indexed on a sumstat id.
global outstanding_global_views: table[string] of count &default=0;
const zero_time = double_to_time(0.0);
# Managers handle logging.
event SumStats::finish_epoch(ss: SumStat)
{
if ( network_time() > zero_time )
{
#print fmt("%.6f MANAGER: breaking %s sumstat for %s sumstat", network_time(), ss$name, ss$id);
local uid = unique_id("");
if ( uid in stats_results )
delete stats_results[uid];
stats_results[uid] = table();
# Request data from peers.
event SumStats::cluster_ss_request(uid, ss$id);
}
# Schedule the next finish_epoch event.
schedule ss$epoch { SumStats::finish_epoch(ss) };
}
# This is unlikely to be called often, but it's here in
# case there are sumstats being collected by managers.
function data_added(ss: SumStat, key: Key, result: Result)
{
if ( check_thresholds(ss, key, result, 1.0) )
{
threshold_crossed(ss, key, result);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
}
}
event SumStats::cluster_key_response(uid: string, ssid: string, key: Key, result: Result)
{
#print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result);
# We only want to try and do a value merge if there are actually measured datapoints
# in the Result.
if ( uid in key_requests )
key_requests[uid] = compose_results(key_requests[uid], result);
else
key_requests[uid] = result;
# Mark that a worker is done.
++done_with[uid];
#print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
if ( Cluster::worker_count == done_with[uid] )
{
local ss = stats_store[ssid];
local ir = key_requests[uid];
if ( check_thresholds(ss, key, ir, 1.0) )
{
threshold_crossed(ss, key, ir);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
}
delete done_with[uid];
delete key_requests[uid];
# Check that there is an outstanding view before subtracting.
if ( outstanding_global_views[ssid] > 0 )
--outstanding_global_views[ssid];
}
}
# Managers handle intermediate updates here.
event SumStats::cluster_key_intermediate_response(ssid: string, key: Key)
{
#print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr);
#print fmt("MANAGER: requesting key data for %s", key2str(key));
if ( ssid in outstanding_global_views &&
|outstanding_global_views[ssid]| > max_outstanding_global_views )
{
# Don't do this intermediate update. Perhaps at some point in the future
# we will queue and randomly select from these ignored intermediate
# update requests.
return;
}
++outstanding_global_views[ssid];
local uid = unique_id("");
event SumStats::cluster_key_request(uid, ssid, key);
}
event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable, done: bool)
{
#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
# Mark another worker as being "done" for this uid.
if ( done )
++done_with[uid];
local local_data = stats_results[uid];
local ss = stats_store[ssid];
for ( key in data )
{
if ( key in local_data )
local_data[key] = compose_results(local_data[key], data[key]);
else
local_data[key] = data[key];
# If a stat is done being collected, thresholds for each key
# need to be checked so we're doing it here to avoid doubly
# iterating over each key.
if ( Cluster::worker_count == done_with[uid] )
{
if ( check_thresholds(ss, key, local_data[key], 1.0) )
{
threshold_crossed(ss, key, local_data[key]);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
}
}
}
# If the data has been collected from all peers, we are done and ready to finish.
if ( Cluster::worker_count == done_with[uid] )
{
if ( ss?$epoch_finished )
ss$epoch_finished(local_data);
# Clean up
delete stats_results[uid];
delete done_with[uid];
# Not sure I need to reset the sumstat on the manager.
reset(ss);
}
}
event remote_connection_handshake_done(p: event_peer) &priority=5
{
send_id(p, "SumStats::stats_store");
send_id(p, "SumStats::reducer_store");
}
@endif

View file

@ -0,0 +1,436 @@
##! The summary statistics framework provides a way to
##! summarize large streams of data into simple reduced
##! measurements.
module SumStats;
export {
## The various calculations are all defined as plugins.
type Calculation: enum {
PLACEHOLDER
};
## Represents a thing which is having summarization
## results collected for it.
type Key: record {
## A non-address related summarization or a sub-key for
## an address based summarization. An example might be
## successful SSH connections by client IP address
## where the client string would be the key value.
## Another example might be number of HTTP requests to
## a particular value in a Host header. This is an
## example of a non-host based metric since multiple
## IP addresses could respond for the same Host
## header value.
str: string &optional;
## Host is the value to which this metric applies.
host: addr &optional;
};
## Represents data being added for a single observation.
## Only supply a single field at a time!
type Observation: record {
## Count value.
num: count &optional;
## Double value.
dbl: double &optional;
## String value.
str: string &optional;
};
type Reducer: record {
## Observation stream identifier for the reducer
## to attach to.
stream: string;
## The calculations to perform on the data points.
apply: set[Calculation];
## A predicate so that you can decide per key if you
## would like to accept the data being inserted.
pred: function(key: SumStats::Key, obs: SumStats::Observation): bool &optional;
## A function to normalize the key. This can be used to aggregate or
## normalize the entire key.
normalize_key: function(key: SumStats::Key): Key &optional;
};
## Value calculated for an observation stream fed into a reducer.
## Most of the fields are added by plugins.
type ResultVal: record {
## The time when the first observation was added to
## this result value.
begin: time;
## The time when the last observation was added to
## this result value.
end: time;
## The number of observations received.
num: count &default=0;
};
## Type to store results for multiple reducers.
type Result: table[string] of ResultVal;
## Type to store a table of sumstats results indexed
## by keys.
type ResultTable: table[Key] of Result;
## SumStats represent an aggregation of reducers along with
## mechanisms to handle various situations like the epoch ending
## or thresholds being crossed.
##
## It's best to not access any global state outside
## of the variables given to the callbacks because there
## is no assurance provided as to where the callbacks
## will be executed on clusters.
type SumStat: record {
## The interval at which this filter should be "broken"
## and the '$epoch_finished' callback called. The
## results are also reset at this time so any threshold
## based detection needs to be set to a
## value that should be expected to happen within
## this epoch.
epoch: interval;
## The reducers for the SumStat
reducers: set[Reducer];
## Provide a function to calculate a value from the
## :bro:see:`SumStats::Result` structure which will be used
## for thresholding.
## This is required if a $threshold value is given.
threshold_val: function(key: SumStats::Key, result: SumStats::Result): count &optional;
## The threshold value for calling the
## $threshold_crossed callback.
threshold: count &optional;
## A series of thresholds for calling the
## $threshold_crossed callback.
threshold_series: vector of count &optional;
## A callback that is called when a threshold is crossed.
threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional;
## A callback with the full collection of Results for
## this SumStat.
epoch_finished: function(rt: SumStats::ResultTable) &optional;
};
## Create a summary statistic.
global create: function(ss: SumStats::SumStat);
## Add data into an observation stream. This should be
## called when a script has measured some point value.
##
## id: The observation stream identifier that the data
## point represents.
##
## key: The key that the value is related to.
##
## obs: The data point to send into the stream.
global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation);
## This record is primarily used for internal threshold tracking.
type Thresholding: record {
# Internal use only. Indicates if a simple threshold was already crossed.
is_threshold_crossed: bool &default=F;
# Internal use only. Current key for threshold series.
threshold_series_index: count &default=0;
};
## This event is generated when thresholds are reset for a SumStat.
##
## ssid: SumStats ID that thresholds were reset for.
global thresholds_reset: event(ssid: string);
## Helper function to represent a :bro:type:`SumStats::Key` value as
## a simple string.
##
## key: The metric key that is to be converted into a string.
##
## Returns: A string representation of the metric key.
global key2str: function(key: SumStats::Key): string;
}
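# Hedged usage sketch (not part of this file): it assumes the SUM plugin
# shipped with this framework is loaded, providing SumStats::SUM and the $sum
# result field; the stream name "conn.attempted" is made up for the example.
event bro_init()
    {
    local r1: SumStats::Reducer = [$stream="conn.attempted",
                                   $apply=set(SumStats::SUM)];
    SumStats::create([$epoch=1min,
                      $reducers=set(r1),
                      $epoch_finished(rt: SumStats::ResultTable) =
                          {
                          for ( key in rt )
                              print fmt("%s attempted %.0f connections",
                                        key$host, rt[key]["conn.attempted"]$sum);
                          }]);
    }
event connection_attempt(c: connection)
    {
    SumStats::observe("conn.attempted", [$host=c$id$orig_h], [$num=1]);
    }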
redef record Reducer += {
# Internal use only. Provides a reference back to the related SumStat by its ID.
sid: string &optional;
};
# Internal use only. For tracking thresholds per sumstat and key.
global threshold_tracker: table[string] of table[Key] of Thresholding &optional;
redef record SumStat += {
# Internal use only (mostly for cluster coherency).
id: string &optional;
};
# Store of sumstats indexed on the sumstat id.
global stats_store: table[string] of SumStat = table();
# Store of reducers indexed on the data point stream id.
global reducer_store: table[string] of set[Reducer] = table();
# Store of results indexed on the measurement id.
global result_store: table[string] of ResultTable = table();
# Store of threshold information.
global thresholds_store: table[string, Key] of bool = table();
# This is called whenever key values are updated and the new val is given as the
# `val` argument. It's only prototyped here because cluster and non-cluster have
# separate implementations.
global data_added: function(ss: SumStat, key: Key, result: Result);
# Prototype the hook point for plugins to do calculations.
global observe_hook: hook(r: Reducer, val: double, data: Observation, rv: ResultVal);
# Prototype the hook point for plugins to initialize any result values.
global init_resultval_hook: hook(r: Reducer, rv: ResultVal);
# Prototype the hook point for plugins to merge Results.
global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal);
# Event that is used to "finish" measurements and adapt the measurement
# framework for clustered or non-clustered usage.
global finish_epoch: event(ss: SumStat);
function key2str(key: Key): string
{
local out = "";
if ( key?$host )
out = fmt("%shost=%s", out, key$host);
if ( key?$str )
out = fmt("%s%sstr=%s", out, |out|==0 ? "" : ", ", key$str);
return fmt("sumstats_key(%s)", out);
}
function init_resultval(r: Reducer): ResultVal
{
local rv: ResultVal = [$begin=network_time(), $end=network_time()];
hook init_resultval_hook(r, rv);
return rv;
}
function compose_resultvals(rv1: ResultVal, rv2: ResultVal): ResultVal
{
local result: ResultVal;
result$begin = (rv1$begin < rv2$begin) ? rv1$begin : rv2$begin;
result$end = (rv1$end > rv2$end) ? rv1$end : rv2$end;
result$num = rv1$num + rv2$num;
# Run the plugin composition hooks.
hook compose_resultvals_hook(result, rv1, rv2);
return result;
}
function compose_results(r1: Result, r2: Result): Result
{
local result: Result = table();
if ( |r1| > |r2| )
{
for ( data_id in r1 )
{
if ( data_id in r2 )
result[data_id] = compose_resultvals(r1[data_id], r2[data_id]);
else
result[data_id] = r1[data_id];
}
}
else
{
for ( data_id in r2 )
{
if ( data_id in r1 )
result[data_id] = compose_resultvals(r1[data_id], r2[data_id]);
else
result[data_id] = r2[data_id];
}
}
return result;
}
function reset(ss: SumStat)
{
if ( ss$id in result_store )
delete result_store[ss$id];
result_store[ss$id] = table();
if ( ss?$threshold || ss?$threshold_series )
{
threshold_tracker[ss$id] = table();
event SumStats::thresholds_reset(ss$id);
}
}
function create(ss: SumStat)
{
if ( (ss?$threshold || ss?$threshold_series) && ! ss?$threshold_val )
{
Reporter::error("SumStats given a threshold with no $threshold_val function");
}
if ( ! ss?$id )
ss$id=unique_id("");
threshold_tracker[ss$id] = table();
stats_store[ss$id] = ss;
for ( reducer in ss$reducers )
{
reducer$sid = ss$id;
if ( reducer$stream !in reducer_store )
reducer_store[reducer$stream] = set();
add reducer_store[reducer$stream][reducer];
}
reset(ss);
schedule ss$epoch { SumStats::finish_epoch(ss) };
}
function observe(id: string, key: Key, obs: Observation)
{
if ( id !in reducer_store )
return;
# Try to add the data to all of the defined reducers.
for ( r in reducer_store[id] )
{
if ( r?$normalize_key )
key = r$normalize_key(copy(key));
# If this reducer has a predicate, run the predicate
# and skip this key if the predicate returns false.
if ( r?$pred && ! r$pred(key, obs) )
next;
local ss = stats_store[r$sid];
# If there is a threshold and no epoch_finished callback
# we don't need to continue counting since the data will
# never be accessed. This was leading
# to some state management issues when measuring
# uniqueness.
# NOTE: this optimization may need to be removed in the
# future if on-demand access is provided to the
# SumStats results.
if ( ! ss?$epoch_finished &&
r$sid in threshold_tracker &&
key in threshold_tracker[r$sid] &&
( ss?$threshold &&
threshold_tracker[r$sid][key]$is_threshold_crossed ) ||
( ss?$threshold_series &&
threshold_tracker[r$sid][key]$threshold_series_index+1 == |ss$threshold_series| ) )
next;
if ( r$sid !in result_store )
result_store[ss$id] = table();
local results = result_store[r$sid];
if ( key !in results )
results[key] = table();
local result = results[key];
if ( id !in result )
result[id] = init_resultval(r);
local result_val = result[id];
++result_val$num;
# Continually update the $end field.
result_val$end=network_time();
# If a string was given, fall back to 1.0 as the value.
local val = 1.0;
if ( obs?$num || obs?$dbl )
val = obs?$dbl ? obs$dbl : obs$num;
hook observe_hook(r, val, obs, result_val);
data_added(ss, key, result);
}
}
# This function checks if a threshold has been crossed. It is also used as a method to implement
# mid-break-interval threshold crossing detection for cluster deployments.
function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: double): bool
{
if ( ! (ss?$threshold || ss?$threshold_series) )
return F;
# Add in the extra ResultVals to make threshold_vals easier to write.
if ( |ss$reducers| != |result| )
{
for ( reducer in ss$reducers )
{
if ( reducer$stream !in result )
result[reducer$stream] = init_resultval(reducer);
}
}
local watch = ss$threshold_val(key, result);
if ( modify_pct < 1.0 && modify_pct > 0.0 )
watch = double_to_count(floor(watch/modify_pct));
if ( ss$id !in threshold_tracker )
threshold_tracker[ss$id] = table();
local t_tracker = threshold_tracker[ss$id];
if ( key !in t_tracker )
{
local ttmp: Thresholding;
t_tracker[key] = ttmp;
}
local tt = t_tracker[key];
if ( ss?$threshold && ! tt$is_threshold_crossed && watch >= ss$threshold )
{
# Value crossed the threshold.
return T;
}
if ( ss?$threshold_series &&
|ss$threshold_series| >= tt$threshold_series_index &&
watch >= ss$threshold_series[tt$threshold_series_index] )
{
# A threshold series was given and the value crossed the next
# value in the series.
return T;
}
return F;
}
function threshold_crossed(ss: SumStat, key: Key, result: Result)
{
# If there is no callback, there is no point in any of this.
if ( ! ss?$threshold_crossed )
return;
# Add in the extra ResultVals to make threshold_crossed callbacks easier to write.
if ( |ss$reducers| != |result| )
{
for ( reducer in ss$reducers )
{
if ( reducer$stream !in result )
result[reducer$stream] = init_resultval(reducer);
}
}
ss$threshold_crossed(key, result);
local tt = threshold_tracker[ss$id][key];
tt$is_threshold_crossed = T;
# Bump up to the next threshold series index if a threshold series is being used.
if ( ss?$threshold_series )
++tt$threshold_series_index;
}

View file

@ -0,0 +1,24 @@
@load ./main
module SumStats;
event SumStats::finish_epoch(ss: SumStat)
{
if ( ss$id in result_store )
{
local data = result_store[ss$id];
if ( ss?$epoch_finished )
ss$epoch_finished(data);
reset(ss);
}
schedule ss$epoch { SumStats::finish_epoch(ss) };
}
function data_added(ss: SumStat, key: Key, result: Result)
{
if ( check_thresholds(ss, key, result, 1.0) )
threshold_crossed(ss, key, result);
}

View file

@ -0,0 +1,9 @@
@load ./average
@load ./last
@load ./max
@load ./min
@load ./sample
@load ./std-dev
@load ./sum
@load ./unique
@load ./variance

View file

@ -0,0 +1,36 @@
@load base/frameworks/sumstats/main
module SumStats;
export {
redef enum Calculation += {
## Calculate the average of the values.
AVERAGE
};
redef record ResultVal += {
## For numeric data, this calculates the average of all values.
average: double &optional;
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( AVERAGE in r$apply )
{
if ( ! rv?$average )
rv$average = val;
else
rv$average += (val - rv$average) / rv$num;
}
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$average && rv2?$average )
result$average = ((rv1$average*rv1$num) + (rv2$average*rv2$num))/(rv1$num+rv2$num);
else if ( rv1?$average )
result$average = rv1$average;
else if ( rv2?$average )
result$average = rv2$average;
}

View file

@ -0,0 +1,55 @@
@load base/frameworks/sumstats
@load base/utils/queue
module SumStats;
export {
redef enum Calculation += {
## Keep the last X observations in a queue.
LAST
};
redef record Reducer += {
## The number of elements to keep.
num_last_elements: count &default=0;
};
redef record ResultVal += {
## This is the queue where elements are maintained. Use the
## :bro:see:`SumStats::get_last` function to get a vector of
## the current element values.
last_elements: Queue::Queue &optional;
};
## Get a vector of element values from a ResultVal.
global get_last: function(rv: ResultVal): vector of Observation;
}
function get_last(rv: ResultVal): vector of Observation
{
local s: vector of Observation = vector();
if ( rv?$last_elements )
Queue::get_vector(rv$last_elements, s);
return s;
}
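# Hedged sketch (not part of this file): pulling the retained observations out
# of a Result, e.g. inside an $epoch_finished callback. The stream name
# "http.request.uri" and the helper name are made up for the example.
function print_last_uris(result: SumStats::Result)
    {
    if ( "http.request.uri" !in result )
        return;
    local obs = SumStats::get_last(result["http.request.uri"]);
    for ( i in obs )
        if ( obs[i]?$str )
            print obs[i]$str;
    }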
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( LAST in r$apply && r$num_last_elements > 0 )
{
if ( ! rv?$last_elements )
rv$last_elements = Queue::init([$max_len=r$num_last_elements]);
Queue::put(rv$last_elements, obs);
}
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
# Merge $last_elements
if ( rv1?$last_elements && rv2?$last_elements )
result$last_elements = Queue::merge(rv1$last_elements, rv2$last_elements);
else if ( rv1?$last_elements )
result$last_elements = rv1$last_elements;
else if ( rv2?$last_elements )
result$last_elements = rv2$last_elements;
}

View file

@ -0,0 +1,38 @@
@load base/frameworks/sumstats/main
module SumStats;
export {
redef enum Calculation += {
## Find the maximum value.
MAX
};
redef record ResultVal += {
## For numeric data, this tracks the maximum value given.
max: double &optional;
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( MAX in r$apply )
{
if ( ! rv?$max )
rv$max = val;
else if ( val > rv$max )
rv$max = val;
}
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$max && rv2?$max )
result$max = (rv1$max > rv2$max) ? rv1$max : rv2$max;
else if ( rv1?$max )
result$max = rv1$max;
else if ( rv2?$max )
result$max = rv2$max;
}

View file

@ -0,0 +1,36 @@
@load base/frameworks/sumstats/main
module SumStats;
export {
redef enum Calculation += {
## Find the minimum value.
MIN
};
redef record ResultVal += {
## For numeric data, this tracks the minimum value given.
min: double &optional;
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( MIN in r$apply )
{
if ( ! rv?$min )
rv$min = val;
else if ( val < rv$min )
rv$min = val;
}
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$min && rv2?$min )
result$min = (rv1$min < rv2$min) ? rv1$min : rv2$min;
else if ( rv1?$min )
result$min = rv1$min;
else if ( rv2?$min )
result$min = rv2$min;
}

View file

@ -0,0 +1,120 @@
@load base/frameworks/sumstats/main
module SumStats;
export {
redef enum Calculation += {
## Get uniformly distributed random samples from the observation stream.
SAMPLE
};
redef record Reducer += {
## The number of sample Observations to collect.
num_samples: count &default=0;
};
redef record ResultVal += {
## This is the vector in which the samples are maintained.
samples: vector of Observation &default=vector();
## Number of total observed elements.
sample_elements: count &default=0;
};
}
redef record ResultVal += {
# Internal use only. This is not meant to be publicly available;
# it is just a copy of num_samples from the Reducer, needed for
# availability in the compose hook.
num_samples: count &default=0;
};
hook init_resultval_hook(r: Reducer, rv: ResultVal)
{
if ( SAMPLE in r$apply )
rv$num_samples = r$num_samples;
}
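# sample_add_sample() below is a standard reservoir sampler: the first
# num_samples observations are stored directly; afterwards each new observation
# replaces a random existing slot with probability num_samples/sample_elements.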
function sample_add_sample(obs: Observation, rv: ResultVal)
{
++rv$sample_elements;
if ( |rv$samples| < rv$num_samples )
rv$samples[|rv$samples|] = obs;
else
{
local ra = rand(rv$sample_elements);
if ( ra < rv$num_samples )
rv$samples[ra] = obs;
}
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( SAMPLE in r$apply )
{
sample_add_sample(obs, rv);
}
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1$num_samples != rv2$num_samples )
{
Reporter::error("Merging sample sets with differing sizes is not supported");
return;
}
local num_samples = rv1$num_samples;
result$num_samples = num_samples;
if ( |rv1$samples| > num_samples || |rv2$samples| > num_samples )
{
Reporter::error("Sample vector with too many elements. Aborting.");
return;
}
if ( |rv1$samples| != num_samples && |rv2$samples| < num_samples )
{
if ( |rv1$samples| != rv1$sample_elements || |rv2$samples| < rv2$sample_elements )
{
Reporter::error("Mismatch in sample element size and tracking. Aborting merge");
return;
}
for ( i in rv1$samples )
sample_add_sample(rv1$samples[i], result);
for ( i in rv2$samples)
sample_add_sample(rv2$samples[i], result);
}
else
{
local other_vector: vector of Observation;
local othercount: count;
if ( rv1$sample_elements > rv2$sample_elements )
{
result$samples = copy(rv1$samples);
other_vector = rv2$samples;
othercount = rv2$sample_elements;
}
else
{
result$samples = copy(rv2$samples);
other_vector = rv1$samples;
othercount = rv1$sample_elements;
}
local totalcount = rv1$sample_elements + rv2$sample_elements;
result$sample_elements = totalcount;
for ( i in other_vector )
{
if ( rand(totalcount) <= othercount )
result$samples[i] = other_vector[i];
}
}
}

View file

@ -0,0 +1,34 @@
@load base/frameworks/sumstats/main
@load ./variance
module SumStats;
export {
redef enum Calculation += {
## Find the standard deviation of the values.
STD_DEV
};
redef record ResultVal += {
## For numeric data, this calculates the standard deviation.
std_dev: double &default=0.0;
};
}
function calc_std_dev(rv: ResultVal)
{
if ( rv?$variance )
rv$std_dev = sqrt(rv$variance);
}
# This depends on the variance plugin which uses priority -5
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-10
{
if ( STD_DEV in r$apply )
calc_std_dev(rv);
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10
{
calc_std_dev(result);
}

View file

@ -0,0 +1,51 @@
@load base/frameworks/sumstats/main
module SumStats;
export {
redef enum Calculation += {
## Sums the values given. For string values,
## this will be the number of strings given.
SUM
};
redef record ResultVal += {
## For numeric data, this tracks the sum of all values.
sum: double &default=0.0;
};
type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count;
global sum_threshold: function(data_id: string): threshold_function;
}
function sum_threshold(data_id: string): threshold_function
{
return function(key: SumStats::Key, result: SumStats::Result): count
{
print fmt("data_id: %s", data_id);
print result;
return double_to_count(result[data_id]$sum);
};
}
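# Hedged usage sketch (not part of this file): sum_threshold() builds the
# $threshold_val callback so that the SUM of a single stream drives threshold
# crossing. The stream name "ssh.login.failure" and the numbers are made up;
# observations would be fed in elsewhere via SumStats::observe().
event bro_init()
    {
    local r1: SumStats::Reducer = [$stream="ssh.login.failure",
                                   $apply=set(SumStats::SUM)];
    SumStats::create([$epoch=5min,
                      $reducers=set(r1),
                      $threshold=30,
                      $threshold_val=SumStats::sum_threshold("ssh.login.failure"),
                      $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
                          {
                          print fmt("%s crossed the example threshold", key$host);
                          }]);
    }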
hook init_resultval_hook(r: Reducer, rv: ResultVal)
{
if ( SUM in r$apply && ! rv?$sum )
rv$sum = 0;
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( SUM in r$apply )
rv$sum += val;
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$sum || rv2?$sum )
{
result$sum = rv1?$sum ? rv1$sum : 0;
if ( rv2?$sum )
result$sum += rv2$sum;
}
}

View file

@ -0,0 +1,53 @@
@load base/frameworks/sumstats/main
module SumStats;
export {
redef enum Calculation += {
## Calculate the number of unique values.
UNIQUE
};
redef record ResultVal += {
## If cardinality is being tracked, the number of unique
## items is tracked here.
unique: count &default=0;
};
}
redef record ResultVal += {
# Internal use only. This is not meant to be publicly available
# because we don't want to trust that we can inspect the values,
# since we will likely move to a probabilistic data structure in the future.
# TODO: in the future this will optionally be a hyperloglog structure
unique_vals: set[Observation] &optional;
};
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( UNIQUE in r$apply )
{
if ( ! rv?$unique_vals )
rv$unique_vals=set();
add rv$unique_vals[obs];
rv$unique = |rv$unique_vals|;
}
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$unique_vals || rv2?$unique_vals )
{
if ( rv1?$unique_vals )
result$unique_vals = copy(rv1$unique_vals);
if ( rv2?$unique_vals )
if ( ! result?$unique_vals )
result$unique_vals = copy(rv2$unique_vals);
else
for ( val2 in rv2$unique_vals )
add result$unique_vals[copy(val2)];
result$unique = |result$unique_vals|;
}
}

View file

@ -0,0 +1,69 @@
@load base/frameworks/sumstats/main
@load ./average
module SumStats;
export {
redef enum Calculation += {
## Find the variance of the values.
VARIANCE
};
redef record ResultVal += {
## For numeric data, this calculates the variance.
variance: double &optional;
};
}
redef record ResultVal += {
# Internal use only. Used for incrementally calculating variance.
prev_avg: double &optional;
# Internal use only. For calculating incremental variance.
var_s: double &default=0.0;
};
function calc_variance(rv: ResultVal)
{
rv$variance = (rv$num > 1) ? rv$var_s/(rv$num-1) : 0.0;
}
# Reduced priority since this depends on the average
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-5
{
if ( VARIANCE in r$apply )
{
if ( rv$num > 1 )
rv$var_s += ((val - rv$prev_avg) * (val - rv$average));
calc_variance(rv);
rv$prev_avg = rv$average;
}
}
# Reduced priority since this depends on the average
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-5
{
if ( rv1?$var_s && rv1?$average &&
rv2?$var_s && rv2?$average )
{
local rv1_avg_sq = (rv1$average - result$average);
rv1_avg_sq = rv1_avg_sq*rv1_avg_sq;
local rv2_avg_sq = (rv2$average - result$average);
rv2_avg_sq = rv2_avg_sq*rv2_avg_sq;
result$var_s = rv1$num*(rv1$var_s/rv1$num + rv1_avg_sq) + rv2$num*(rv2$var_s/rv2$num + rv2_avg_sq);
}
else if ( rv1?$var_s )
result$var_s = rv1$var_s;
else if ( rv2?$var_s )
result$var_s = rv2$var_s;
if ( rv1?$prev_avg && rv2?$prev_avg )
result$prev_avg = ((rv1$prev_avg*rv1$num) + (rv2$prev_avg*rv2$num))/(rv1$num+rv2$num);
else if ( rv1?$prev_avg )
result$prev_avg = rv1$prev_avg;
else if ( rv2?$prev_avg )
result$prev_avg = rv2$prev_avg;
calc_variance(result);
}

View file

@ -83,16 +83,17 @@ export {
}
const ayiya_ports = { 5072/udp };
redef dpd_config += { [ANALYZER_AYIYA] = [$ports = ayiya_ports] };
const teredo_ports = { 3544/udp };
redef dpd_config += { [ANALYZER_TEREDO] = [$ports = teredo_ports] };
redef likely_server_ports += { ayiya_ports, teredo_ports };
const gtpv1_ports = { 2152/udp, 2123/udp };
redef likely_server_ports += { ayiya_ports, teredo_ports, gtpv1_ports };
event bro_init() &priority=5
{
Log::create_stream(Tunnel::LOG, [$columns=Info]);
Analyzer::register_for_ports(Analyzer::ANALYZER_AYIYA, ayiya_ports);
Analyzer::register_for_ports(Analyzer::ANALYZER_TEREDO, teredo_ports);
Analyzer::register_for_ports(Analyzer::ANALYZER_GTPV1, gtpv1_ports);
}
function register_all(ecv: EncapsulatingConnVector)