Merge remote-tracking branch 'origin/master' into topic/seth/metrics-updates

Conflicts:
	testing/btest/Baseline/scripts.base.frameworks.metrics.basic-cluster/manager-1.metrics.log
	testing/btest/Baseline/scripts.base.frameworks.metrics.basic/metrics.log
	testing/btest/Baseline/scripts.base.frameworks.metrics.cluster-intermediate-update/manager-1.notice.log
	testing/btest/Baseline/scripts.base.frameworks.metrics.notice/notice.log
	testing/btest/scripts/base/frameworks/metrics/basic-cluster.bro
Seth Hall 2012-10-01 16:23:06 -04:00
commit 6750b0f7b9
911 changed files with 36856 additions and 5211 deletions

View file

@@ -77,6 +77,9 @@ export {
node_type: NodeType;
## The IP address of the cluster node.
ip: addr;
## If the *ip* field is a non-global IPv6 address, this field
## can specify a particular :rfc:`4007` ``zone_id``.
zone_id: string &default="";
## The port to which this local node can connect when
## establishing communication.
p: port;
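
For illustration (editor's sketch, not part of the commit): a cluster-layout entry for a worker whose address is a link-local IPv6 address, using the new *zone_id* field documented above. The node name, addresses, and interface are hypothetical.

# Hypothetical sketch: the worker's IP is link-local, so RFC 4007 requires
# naming the interface via zone_id.
redef Cluster::nodes += {
	["worker-1"] = [$node_type=Cluster::WORKER, $ip=[fe80::10], $zone_id="eth0",
	                $p=47760/tcp, $interface="eth0", $manager="manager"]
};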

View file

@@ -19,23 +19,26 @@ event bro_init() &priority=9
# Connections from the control node for runtime control and update events.
# Every node in a cluster is eligible for control from this host.
if ( n$node_type == CONTROL )
Communication::nodes["control"] = [$host=n$ip, $connect=F,
$class="control", $events=control_events];
Communication::nodes["control"] = [$host=n$ip, $zone_id=n$zone_id,
$connect=F, $class="control",
$events=control_events];
if ( me$node_type == MANAGER )
{
if ( n$node_type == WORKER && n$manager == node )
Communication::nodes[i] =
[$host=n$ip, $connect=F,
[$host=n$ip, $zone_id=n$zone_id, $connect=F,
$class=i, $events=worker2manager_events, $request_logs=T];
if ( n$node_type == PROXY && n$manager == node )
Communication::nodes[i] =
[$host=n$ip, $connect=F,
[$host=n$ip, $zone_id=n$zone_id, $connect=F,
$class=i, $events=proxy2manager_events, $request_logs=T];
if ( n$node_type == TIME_MACHINE && me?$time_machine && me$time_machine == i )
Communication::nodes["time-machine"] = [$host=nodes[i]$ip, $p=nodes[i]$p,
Communication::nodes["time-machine"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=1min,
$events=tm2manager_events];
}
@@ -44,7 +47,8 @@ event bro_init() &priority=9
{
if ( n$node_type == WORKER && n$proxy == node )
Communication::nodes[i] =
[$host=n$ip, $connect=F, $class=i, $sync=T, $auth=T, $events=worker2proxy_events];
[$host=n$ip, $zone_id=n$zone_id, $connect=F, $class=i,
$sync=T, $auth=T, $events=worker2proxy_events];
# accepts connections from the previous one.
# (This is not ideal for setups with many proxies)
@@ -53,16 +57,18 @@ event bro_init() &priority=9
{
if ( n?$proxy )
Communication::nodes[i]
= [$host=n$ip, $p=n$p,
= [$host=n$ip, $zone_id=n$zone_id, $p=n$p,
$connect=T, $auth=F, $sync=T, $retry=1mins];
else if ( me?$proxy && me$proxy == i )
Communication::nodes[me$proxy]
= [$host=nodes[i]$ip, $connect=F, $auth=T, $sync=T];
= [$host=nodes[i]$ip, $zone_id=nodes[i]$zone_id,
$connect=F, $auth=T, $sync=T];
}
# Finally the manager, to send it status updates.
if ( n$node_type == MANAGER && me$manager == i )
Communication::nodes["manager"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=1mins,
$class=node,
@@ -72,6 +78,7 @@ event bro_init() &priority=9
{
if ( n$node_type == MANAGER && me$manager == i )
Communication::nodes["manager"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=1mins,
$class=node,
@@ -79,6 +86,7 @@ event bro_init() &priority=9
if ( n$node_type == PROXY && me$proxy == i )
Communication::nodes["proxy"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=1mins,
$sync=T, $class=node,
@@ -87,6 +95,7 @@ event bro_init() &priority=9
if ( n$node_type == TIME_MACHINE &&
me?$time_machine && me$time_machine == i )
Communication::nodes["time-machine"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T,
$retry=1min,

View file

@@ -2,6 +2,7 @@
##! and/or transfer events.
@load base/frameworks/packet-filter
@load base/utils/addrs
module Communication;
@@ -9,17 +10,31 @@ export {
## The communication logging stream identifier.
redef enum Log::ID += { LOG };
## Which interface to listen on (0.0.0.0 for any interface).
## Which interface to listen on. The addresses ``0.0.0.0`` and ``[::]``
## are wildcards.
const listen_interface = 0.0.0.0 &redef;
## Which port to listen on.
const listen_port = 47757/tcp &redef;
## This defines if a listening socket should use SSL.
const listen_ssl = F &redef;
## Default compression level. Compression level is 0-9, with 0 = no
## Defines if a listening socket can bind to IPv6 addresses.
const listen_ipv6 = F &redef;
## If :bro:id:`Communication::listen_interface` is a non-global
## IPv6 address and requires a specific :rfc:`4007` ``zone_id``,
## it can be specified here.
const listen_ipv6_zone_id = "" &redef;
## Defines the interval at which to retry binding to
## :bro:id:`Communication::listen_interface` on
## :bro:id:`Communication::listen_port` if it's already in use.
const listen_retry = 30 secs &redef;
## Default compression level. Compression level is 0-9, with 0 = no
## compression.
global compression_level = 0 &redef;
@@ -27,7 +42,7 @@ export {
type Info: record {
## The network time at which a communication event occurred.
ts: time &log;
## The peer name (if any) for which a communication event is concerned.
## The peer name (if any) with which a communication event is concerned.
peer: string &log &optional;
## Where the communication event message originated from, that is,
## either from the scripting layer or inside the Bro process.
@@ -51,7 +66,11 @@ export {
type Node: record {
## Remote address.
host: addr;
## If the *host* field is a non-global IPv6 address, this field
## can specify a particular :rfc:`4007` ``zone_id``.
zone_id: string &optional;
## Port of the remote Bro communication endpoint if we are initiating
## the connection based on the :bro:id:`connect` field.
p: port &optional;
@@ -101,7 +120,7 @@ export {
## The remote peer.
peer: event_peer &optional;
## Indicates the status of the node.
connected: bool &default = F;
};
@@ -144,7 +163,7 @@ event bro_init() &priority=5
function do_script_log_common(level: count, src: count, msg: string)
{
Log::write(Communication::LOG, [$ts = network_time(),
Log::write(Communication::LOG, [$ts = network_time(),
$level = (level == REMOTE_LOG_INFO ? "info" : "error"),
$src_name = src_names[src],
$peer = get_event_peer()$descr,
@@ -160,7 +179,7 @@ event remote_log(level: count, src: count, msg: string)
# This is a core generated event.
event remote_log_peer(p: event_peer, level: count, src: count, msg: string)
{
local rmsg = fmt("[#%d/%s:%d] %s", p$id, p$host, p$p, msg);
local rmsg = fmt("[#%d/%s:%d] %s", p$id, addr_to_uri(p$host), p$p, msg);
do_script_log_common(level, src, rmsg);
}
@@ -178,10 +197,11 @@ function connect_peer(peer: string)
p = node$p;
local class = node?$class ? node$class : "";
local id = connect(node$host, p, class, node$retry, node$ssl);
local zone_id = node?$zone_id ? node$zone_id : "";
local id = connect(node$host, zone_id, p, class, node$retry, node$ssl);
if ( id == PEER_ID_NONE )
Log::write(Communication::LOG, [$ts = network_time(),
Log::write(Communication::LOG, [$ts = network_time(),
$peer = get_event_peer()$descr,
$message = "can't trigger connect"]);
pending_peers[id] = node;
@@ -320,7 +340,7 @@ event bro_init() &priority = -10 # let others modify nodes
{
if ( |nodes| > 0 )
enable_communication();
for ( tag in nodes )
{
if ( ! nodes[tag]$connect )
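
To illustrate the new IPv6 listening options and the *zone_id* field of Communication::Node shown above (editor's sketch, not part of the commit; the addresses, interface, and peer name are hypothetical):

# Hypothetical sketch: listen on a link-local IPv6 address and register a peer
# that also needs an RFC 4007 zone_id.
redef Communication::listen_ipv6 = T;
redef Communication::listen_interface = [fe80::1];
redef Communication::listen_ipv6_zone_id = "eth0";

redef Communication::nodes += {
	["peer-1"] = [$host=[fe80::2], $zone_id="eth0", $p=47757/tcp,
	              $connect=T, $retry=1min]
};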

View file

@@ -11,6 +11,10 @@ export {
## The port of the host that will be controlled.
const host_port = 0/tcp &redef;
## If :bro:id:`Control::host` is a non-global IPv6 address and
## requires a specific :rfc:`4007` ``zone_id``, it can be set here.
const zone_id = "" &redef;
## The command that is being done. It's typically set on the
## command line.
const cmd = "" &redef;
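
A sketch of how the new Control::zone_id option could be used (hypothetical values, not part of the commit). In practice these options are usually passed on the command line together with the frameworks/control/controller script.

# Hypothetical sketch: control a Bro instance listening on a link-local address.
redef Control::host = [fe80::1];
redef Control::zone_id = "eth0";
redef Control::host_port = 47760/tcp;
redef Control::cmd = "peer_status";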

View file

@@ -149,3 +149,64 @@ signature dpd_ssl_client {
payload /^(\x16\x03[\x00\x01\x02]..\x01...\x03[\x00\x01\x02]|...?\x01[\x00\x01\x02][\x02\x03]).*/
tcp-state originator
}
signature dpd_ayiya {
ip-proto = udp
payload /^..\x11\x29/
enable "ayiya"
}
signature dpd_teredo {
ip-proto = udp
payload /^(\x00\x00)|(\x00\x01)|([\x60-\x6f])/
enable "teredo"
}
signature dpd_socks4_client {
ip-proto == tcp
# '32' is a rather arbitrary max length for the user name.
payload /^\x04[\x01\x02].{0,32}\x00/
tcp-state originator
}
signature dpd_socks4_server {
ip-proto == tcp
requires-reverse-signature dpd_socks4_client
payload /^\x00[\x5a\x5b\x5c\x5d]/
tcp-state responder
enable "socks"
}
signature dpd_socks4_reverse_client {
ip-proto == tcp
# '32' is a rather arbitrary max length for the user name.
payload /^\x04[\x01\x02].{0,32}\x00/
tcp-state responder
}
signature dpd_socks4_reverse_server {
ip-proto == tcp
requires-reverse-signature dpd_socks4_reverse_client
payload /^\x00[\x5a\x5b\x5c\x5d]/
tcp-state originator
enable "socks"
}
signature dpd_socks5_client {
ip-proto == tcp
# Watch for a few authentication methods to reduce false positives.
payload /^\x05.[\x00\x01\x02]/
tcp-state originator
}
signature dpd_socks5_server {
ip-proto == tcp
requires-reverse-signature dpd_socks5_client
# Watch for a single authentication method to be chosen by the server or
# the server to indicate that no authentication is required.
payload /^\x05(\x00|\x01[\x00\x01\x02])/
tcp-state responder
enable "socks"
}

View file

@@ -3,8 +3,7 @@
module DPD;
## Add the DPD signatures to the signature framework.
redef signature_files += "base/frameworks/dpd/dpd.sig";
@load-sigs ./dpd.sig
export {
## Add the DPD logging stream identifier.
@@ -105,5 +104,8 @@ event protocol_violation(c: connection, atype: count, aid: count,
reason: string) &priority=-5
{
if ( c?$dpd )
{
Log::write(DPD::LOG, c$dpd);
delete c$dpd;
}
}

View file

@@ -0,0 +1,5 @@
@load ./main
@load ./readers/ascii
@load ./readers/raw
@load ./readers/benchmark

View file

@@ -0,0 +1,156 @@
##! The input framework provides a way to read previously stored data either
##! as an event stream or into a bro table.
module Input;
export {
## The default input reader used. Defaults to `READER_ASCII`.
const default_reader = READER_ASCII &redef;
## The default reader mode used. Defaults to `MANUAL`.
const default_mode = MANUAL &redef;
## Flag that controls if the input framework accepts records
## that contain types that are not supported (at the moment
## file and function). If true, the input framework will
## warn in these cases, but continue. If false, it will
## abort. Defaults to false (abort)
const accept_unsupported_types = F &redef;
## TableFilter description type used for the `table` method.
type TableDescription: record {
## Common definitions for tables and events
## String that allows the reader to find the source.
## For `READER_ASCII`, this is the filename.
source: string;
## Reader to use for this stream
reader: Reader &default=default_reader;
## Read mode to use for this stream
mode: Mode &default=default_mode;
## Descriptive name. Used to remove a stream at a later time
name: string;
# Special definitions for tables
## Table which will receive the data read by the input framework
destination: any;
## Record that defines the values used as the index of the table
idx: any;
## Record that defines the values used as the elements of the table
## If val is undefined, destination has to be a set.
val: any &optional;
## Defines if the value of the table is a record (default), or a single value. Val
## can only contain one element when this is set to false.
want_record: bool &default=T;
## The event that is raised each time a value is added to, changed in or removed
## from the table. The event will receive an Input::Event enum as the first
## argument, the idx record as the second argument and the value (record) as the
## third argument.
ev: any &optional; # event containing idx, val as values.
## Predicate function that can decide if an insertion, update or removal should
## really be executed. Parameters are the same as for the event. If true is
## returned, the update is performed. If false is returned, it is skipped.
pred: function(typ: Input::Event, left: any, right: any): bool &optional;
## A key/value table that will be passed on to the reader.
## Interpretation of the values is left to the reader, but
## usually they will be used for configuration purposes.
config: table[string] of string &default=table();
};
## EventFilter description type used for the `event` method.
type EventDescription: record {
## Common definitions for tables and events
## String that allows the reader to find the source.
## For `READER_ASCII`, this is the filename.
source: string;
## Reader to use for this stream
reader: Reader &default=default_reader;
## Read mode to use for this stream
mode: Mode &default=default_mode;
## Descriptive name. Used to remove a stream at a later time
name: string;
# Special definitions for events
## Record describing the fields to be retrieved from the source input.
fields: any;
## If want_record is false, the event receives each value in fields as a separate argument.
## If it is set to true (default), the event receives all fields in a single record value.
want_record: bool &default=T;
## The event that is raised each time a new line is received from the reader.
## The event will receive an Input::Event enum as the first element, and the fields as the following arguments.
ev: any;
## A key/value table that will be passed on to the reader.
## Interpretation of the values is left to the reader, but
## usually they will be used for configuration purposes.
config: table[string] of string &default=table();
};
## Create a new table input from a given source. Returns true on success.
##
## description: `TableDescription` record describing the source.
global add_table: function(description: Input::TableDescription) : bool;
## Create a new event input from a given source. Returns true on success.
##
## description: `EventDescription` record describing the source.
global add_event: function(description: Input::EventDescription) : bool;
## Remove an input stream. Returns true on success and false if the named stream was not found.
##
## id: string value identifying the stream to be removed
global remove: function(id: string) : bool;
## Forces the current input to be checked for changes.
## Returns true on success and false if the named stream was not found
##
## id: string value identifying the stream
global force_update: function(id: string) : bool;
## Event that is called when the update of a specific source is finished.
global update_finished: event(name: string, source:string);
}
@load base/input.bif
module Input;
function add_table(description: Input::TableDescription) : bool
{
return __create_table_stream(description);
}
function add_event(description: Input::EventDescription) : bool
{
return __create_event_stream(description);
}
function remove(id: string) : bool
{
return __remove_stream(id);
}
function force_update(id: string) : bool
{
return __force_update(id);
}
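
As a usage illustration for the API above (editor's sketch, not part of the commit; the file name and record types are hypothetical), reading a tab-separated file with a Bro-style #fields header into a table:

# Hypothetical sketch: read "blacklist.file" into a table keyed by address.
module Example;

type Idx: record {
	ip: addr;
};

type Val: record {
	timestamp: time;
	reason: string;
};

global blacklist: table[addr] of Val = table();

event bro_init()
	{
	Input::add_table([$source="blacklist.file", $name="blacklist",
	                  $idx=Idx, $val=Val, $destination=blacklist]);
	}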

View file

@@ -0,0 +1,21 @@
##! Interface for the ascii input reader.
##!
##! The defaults are set to match Bro's ASCII output.
module InputAscii;
export {
## Separator between fields.
## Please note that the separator has to be exactly one character long
const separator = "\t" &redef;
## Separator between set elements.
## Please note that the separator has to be exactly one character long
const set_separator = "," &redef;
## String to use for empty fields.
const empty_field = "(empty)" &redef;
## String to use for an unset &optional field.
const unset_field = "-" &redef;
}

View file

@@ -0,0 +1,23 @@
##! Interface for the benchmark input reader.
module InputBenchmark;
export {
## multiplication factor for each second
const factor = 1.0 &redef;
## spread factor between lines
const spread = 0 &redef;
## spreading where usleep = 1000000 / autospread * num_lines
const autospread = 0.0 &redef;
## addition factor for each heartbeat
const addfactor = 0 &redef;
## stop spreading at x lines per heartbeat
const stopspreadat = 0 &redef;
## 1 -> enable timed spreading
const timedspread = 0.0 &redef;
}

View file

@@ -0,0 +1,9 @@
##! Interface for the raw input reader.
module InputRaw;
export {
## Separator between input records.
## Please note that the separator has to be exactly one character long
const record_separator = "\n" &redef;
}

View file

@@ -1,3 +1,6 @@
@load ./main
@load ./postprocessors
@load ./writers/ascii
@load ./writers/dataseries
@load ./writers/elasticsearch
@load ./writers/none

View file

@@ -96,6 +96,12 @@ export {
## file name. Generally, filenames are expected to be given
## without any extensions; writers will add appropriate
## extensions automatically.
##
## If this path is found to conflict with another filter's
## for the same writer type, it is automatically corrected
## by appending "-N", where N is the smallest integer greater than
## or equal to 2 that allows the corrected path name to not
## conflict with another filter's.
path: string &optional;
## A function returning the output path for recording entries
@@ -115,7 +121,10 @@ export {
## rec: An instance of the stream's ``columns`` type with its
## fields set to the values to be logged.
##
## Returns: The path to be used for the filter.
## Returns: The path to be used for the filter, which will be subject
## to the same automatic correction rules as the *path*
## field of :bro:type:`Log::Filter` in the case of conflicts
## with other filters trying to use the same writer/path pair.
path_func: function(id: ID, path: string, rec: any): string &optional;
## Subset of column names to record. If not given, all
@@ -138,6 +147,11 @@ export {
## Callback function to trigger for rotated files. If not set, the
## default comes out of :bro:id:`Log::default_rotation_postprocessors`.
postprocessor: function(info: RotationInfo) : bool &optional;
## A key/value table that will be passed on to the writer.
## Interpretation of the values is left to the writer, but
## usually they will be used for configuration purposes.
config: table[string] of string &default=table();
};
## Sentinel value for indicating that a filter was not found when looked up.
@@ -313,6 +327,11 @@ export {
## Log::default_rotation_postprocessor_cmd
## Log::default_rotation_postprocessors
global run_rotation_postprocessor_cmd: function(info: RotationInfo, npath: string) : bool;
## The streams which are currently active and not disabled.
## This table is not meant to be modified by users! Only use it for
## examining which streams are active.
global active_streams: table[ID] of Stream = table();
}
# We keep a script-level copy of all filters so that we can manipulate them.
@@ -327,20 +346,23 @@ function __default_rotation_postprocessor(info: RotationInfo) : bool
{
if ( info$writer in default_rotation_postprocessors )
return default_rotation_postprocessors[info$writer](info);
else
# Return T by default so that postprocessor-less writers don't shut down.
return T;
}
function default_path_func(id: ID, path: string, rec: any) : string
{
# The suggested path value is a previous result of this function
# or a filter path explicitly set by the user, so continue using it.
if ( path != "" )
return path;
local id_str = fmt("%s", id);
local parts = split1(id_str, /::/);
if ( |parts| == 2 )
{
# The suggested path value is a previous result of this function
# or a filter path explicitly set by the user, so continue using it.
if ( path != "" )
return path;
# Example: Notice::LOG -> "notice"
if ( parts[2] == "LOG" )
{
@@ -356,11 +378,11 @@ function default_path_func(id: ID, path: string, rec: any) : string
output = cat(output, sub_bytes(module_parts[4],1,1), "_", sub_bytes(module_parts[4], 2, |module_parts[4]|));
return to_lower(output);
}
# Example: Notice::POLICY_LOG -> "notice_policy"
if ( /_LOG$/ in parts[2] )
parts[2] = sub(parts[2], /_LOG$/, "");
return cat(to_lower(parts[1]),"_",to_lower(parts[2]));
}
else
@@ -376,13 +398,16 @@ function run_rotation_postprocessor_cmd(info: RotationInfo, npath: string) : bool
if ( pp_cmd == "" )
return T;
# Turn, e.g., Log::WRITER_ASCII into "ascii".
local writer = subst_string(to_lower(fmt("%s", info$writer)), "log::writer_", "");
# The date format is hard-coded here to provide a standardized
# script interface.
system(fmt("%s %s %s %s %s %d",
system(fmt("%s %s %s %s %s %d %s",
pp_cmd, npath, info$path,
strftime("%y-%m-%d_%H.%M.%S", info$open),
strftime("%y-%m-%d_%H.%M.%S", info$close),
info$terminating));
info$terminating, writer));
return T;
}
@@ -392,11 +417,15 @@ function create_stream(id: ID, stream: Stream) : bool
if ( ! __create_stream(id, stream) )
return F;
active_streams[id] = stream;
return add_default_filter(id);
}
function disable_stream(id: ID) : bool
{
delete active_streams[id];
return __disable_stream(id);
}
@@ -407,7 +436,7 @@ function add_filter(id: ID, filter: Filter) : bool
# definition.
if ( ! filter?$path_func )
filter$path_func = default_path_func;
filters[id, filter$name] = filter;
return __add_filter(id, filter);
}
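
To illustrate the path handling described above (editor's sketch, not part of the commit; the filter name and split criterion are hypothetical), a filter whose path_func routes port-80 connections to a separate log path:

# Hypothetical sketch: a path_func that sends HTTP-port connections to "conn-http".
function split_http(id: Log::ID, path: string, rec: Conn::Info) : string
	{
	# A returned path that conflicts with another filter's is auto-corrected
	# by appending "-N", as documented above.
	return rec$id$resp_p == 80/tcp ? "conn-http" : path;
	}

event bro_init()
	{
	Log::add_filter(Conn::LOG, [$name="http-split", $path_func=split_http]);
	}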

View file

@@ -8,12 +8,13 @@ export {
## into files. This is primarily for debugging purposes.
const output_to_stdout = F &redef;
## If true, include a header line with column names and description
## of the other ASCII logging options that were used.
const include_header = T &redef;
## If true, include lines with log meta information such as column names with
## types, the values of ASCII logging options that are in use, and the time when the
## file was opened and closed (the latter at the end).
const include_meta = T &redef;
## Prefix for the header line if included.
const header_prefix = "#" &redef;
## Prefix for lines with meta information.
const meta_prefix = "#" &redef;
## Separator between fields.
const separator = "\t" &redef;
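
For example (editor's sketch, not part of the commit), the new meta option can be combined with the existing debugging option:

# Hypothetical sketch: print logs to stdout for debugging, without meta lines.
redef LogAscii::output_to_stdout = T;
redef LogAscii::include_meta = F;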

View file

@@ -0,0 +1,60 @@
##! Interface for the DataSeries log writer.
module LogDataSeries;
export {
## Compression to use with the DS output file. Options are:
##
## 'none' -- No compression.
## 'lzf' -- LZF compression. Very quick, but leads to larger output files.
## 'lzo' -- LZO compression. Very fast decompression times.
## 'gz' -- GZIP compression. Slower than LZF, but also produces smaller output.
## 'bz2' -- BZIP2 compression. Slower than GZIP, but also produces smaller output.
const compression = "gz" &redef;
## The extent buffer size.
## Larger values here lead to better compression and more efficient writes, but
## also increase the lag between the time events are received and the time they
## are actually written to disk.
const extent_size = 65536 &redef;
## Should we dump the XML schema we use for this DS file to disk?
## If yes, the XML schema shares the name of the logfile, but has
## an XML ending.
const dump_schema = F &redef;
## How many threads should DataSeries spawn to perform compression?
## Note that this dictates the number of threads per log stream. If
## you're using a lot of streams, you may want to keep this number
## relatively small.
##
## Default value is 1, which will spawn one thread / stream.
##
## Maximum is 128, minimum is 1.
const num_threads = 1 &redef;
## Should time be stored as an integer or a double?
## Storing time as a double leads to possible precision issues and
## can (significantly) increase the size of the resulting DS log.
## That said, timestamps stored in double form are consistent
## with the rest of Bro, including the standard ASCII log. Hence, we
## use them by default.
const use_integer_for_time = F &redef;
}
# Default function to postprocess a rotated DataSeries log file. It moves the
# rotated file to a new name that includes a timestamp with the opening time, and
# then runs the writer's default postprocessor command on it.
function default_rotation_postprocessor_func(info: Log::RotationInfo) : bool
{
# Move file to name including both opening and closing time.
local dst = fmt("%s.%s.ds", info$path,
strftime(Log::default_rotation_date_format, info$open));
system(fmt("/bin/mv %s %s", info$fname, dst));
# Run default postprocessor.
return Log::run_rotation_postprocessor_cmd(info, dst);
}
redef Log::default_rotation_postprocessors += { [Log::WRITER_DATASERIES] = default_rotation_postprocessor_func };
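
A sketch of selecting and tuning this writer (editor's illustration, not part of the commit; requires a Bro build with DataSeries support):

# Hypothetical sketch: make DataSeries the default writer and tune it.
redef Log::default_writer = Log::WRITER_DATASERIES;
redef LogDataSeries::compression = "lzf";
redef LogDataSeries::dump_schema = T;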

View file

@@ -0,0 +1,46 @@
##! Log writer for sending logs to an ElasticSearch server.
##!
##! Note: This module is in testing and is not yet considered stable!
##!
##! There is one known memory issue. If your elasticsearch server is
##! running slowly and taking too long to return from bulk insert
##! requests, the message queue to the writer thread will continue
##! growing larger and larger giving the appearance of a memory leak.
module LogElasticSearch;
export {
## Name of the ES cluster
const cluster_name = "elasticsearch" &redef;
## ES Server
const server_host = "127.0.0.1" &redef;
## ES Port
const server_port = 9200 &redef;
## Name of the ES index
const index_prefix = "bro" &redef;
## The ES type prefix comes before the name of the related log.
## e.g. prefix = "bro\_" would create types of bro_dns, bro_software, etc.
const type_prefix = "" &redef;
## The time before an ElasticSearch transfer will timeout.
## This is not working!
const transfer_timeout = 2secs;
## The batch size is the number of messages that will be queued up before
## they are sent to be bulk indexed.
const max_batch_size = 1000 &redef;
## The maximum amount of wall-clock time that is allowed to pass without
## finishing a bulk log send. This represents the maximum delay you
## would like to have with your logs before they are sent to ElasticSearch.
const max_batch_interval = 1min &redef;
## The maximum byte size for a buffered JSON string to send to the bulk
## insert API.
const max_byte_size = 1024 * 1024 &redef;
}
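
A sketch of sending one stream to this writer through an extra filter (editor's illustration, not part of the commit; requires a build with ElasticSearch support, and the stream choice and server address are arbitrary):

# Hypothetical sketch: duplicate HTTP logs to an ElasticSearch server.
redef LogElasticSearch::server_host = "10.0.0.5";

event bro_init()
	{
	Log::add_filter(HTTP::LOG, [$name="es", $writer=Log::WRITER_ELASTICSEARCH]);
	}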

View file

@@ -0,0 +1,17 @@
##! Interface for the None log writer. This writer is mainly for debugging.
module LogNone;
export {
## If true, output debugging output that can be useful for unit
## testing the logging framework.
const debug = F &redef;
}
function default_rotation_postprocessor_func(info: Log::RotationInfo) : bool
{
return T;
}
redef Log::default_rotation_postprocessors += { [Log::WRITER_NONE] = default_rotation_postprocessor_func };

View file

@@ -23,7 +23,10 @@ redef Cluster::worker2manager_events += /Notice::cluster_notice/;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
# The notice policy is completely handled by the manager and shouldn't be
# done by workers or proxies to save time for packet processing.
redef policy = {};
event bro_init() &priority=-11
{
Notice::policy = table();
}
event Notice::begin_suppression(n: Notice::Info)
{

View file

@@ -1,5 +1,5 @@
##! This framework is intended to create an output and filtering path for
##! internal messages/warnings/errors. It should typically be loaded to
##! This framework is intended to create an output and filtering path for
##! internal messages/warnings/errors. It should typically be loaded to
##! avoid Bro spewing internal messages to standard error and instead log
##! them to a file in a standard way. Note that this framework deals with
##! the handling of internally-generated reporter messages, for the
@@ -13,11 +13,11 @@ export {
redef enum Log::ID += { LOG };
## An indicator of reporter message severity.
type Level: enum {
type Level: enum {
## Informational, not needing specific attention.
INFO,
INFO,
## Warning of a potential problem.
WARNING,
WARNING,
## A non-fatal error that should be addressed, but doesn't
## terminate program execution.
ERROR
@@ -36,24 +36,55 @@ export {
## Not all reporter messages will have locations in them though.
location: string &log &optional;
};
## Tunable for sending reporter warning messages to STDERR. The option to
## turn it off is presented here in case Bro is being run by some
## external harness and shouldn't output anything to the console.
const warnings_to_stderr = T &redef;
## Tunable for sending reporter error messages to STDERR. The option to
## turn it off is presented here in case Bro is being run by some
## external harness and shouldn't output anything to the console.
const errors_to_stderr = T &redef;
}
global stderr: file;
event bro_init() &priority=5
{
Log::create_stream(Reporter::LOG, [$columns=Info]);
if ( errors_to_stderr || warnings_to_stderr )
stderr = open("/dev/stderr");
}
event reporter_info(t: time, msg: string, location: string)
event reporter_info(t: time, msg: string, location: string) &priority=-5
{
Log::write(Reporter::LOG, [$ts=t, $level=INFO, $message=msg, $location=location]);
}
event reporter_warning(t: time, msg: string, location: string)
event reporter_warning(t: time, msg: string, location: string) &priority=-5
{
if ( warnings_to_stderr )
{
if ( t > double_to_time(0.0) )
print stderr, fmt("WARNING: %.6f %s (%s)", t, msg, location);
else
print stderr, fmt("WARNING: %s (%s)", msg, location);
}
Log::write(Reporter::LOG, [$ts=t, $level=WARNING, $message=msg, $location=location]);
}
event reporter_error(t: time, msg: string, location: string)
event reporter_error(t: time, msg: string, location: string) &priority=-5
{
if ( errors_to_stderr )
{
if ( t > double_to_time(0.0) )
print stderr, fmt("ERROR: %.6f %s (%s)", t, msg, location);
else
print stderr, fmt("ERROR: %s (%s)", msg, location);
}
Log::write(Reporter::LOG, [$ts=t, $level=ERROR, $message=msg, $location=location]);
}
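
For example (editor's sketch, not part of the commit), an external harness can silence the new stderr output introduced here:

# Hypothetical sketch: keep reporter messages in reporter.log only.
redef Reporter::warnings_to_stderr = F;
redef Reporter::errors_to_stderr = F;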

View file

@@ -0,0 +1 @@
@load ./main

View file

@@ -0,0 +1,149 @@
##! This script handles the tracking/logging of tunnels (e.g. Teredo,
##! AYIYA, or IP-in-IP such as 6to4 where "IP" is either IPv4 or IPv6).
##!
##! For any connection that occurs over a tunnel, information about its
##! encapsulating tunnels is also found in the *tunnel* field of
##! :bro:type:`connection`.
module Tunnel;
export {
## The tunnel logging stream identifier.
redef enum Log::ID += { LOG };
## Types of interesting activity that can occur with a tunnel.
type Action: enum {
## A new tunnel (encapsulating "connection") has been seen.
DISCOVER,
## A tunnel connection has closed.
CLOSE,
## No new connections over a tunnel happened in the amount of
## time indicated by :bro:see:`Tunnel::expiration_interval`.
EXPIRE,
};
## The record type which contains column fields of the tunnel log.
type Info: record {
## Time at which some tunnel activity occurred.
ts: time &log;
## The unique identifier for the tunnel, which may correspond
## to a :bro:type:`connection`'s *uid* field for non-IP-in-IP tunnels.
## This is optional because there could be numerous connections
## for payload proxies like SOCKS but we should treat it as a single
## tunnel.
uid: string &log &optional;
## The tunnel "connection" 4-tuple of endpoint addresses/ports.
## For an IP tunnel, the ports will be 0.
id: conn_id &log;
## The type of tunnel.
tunnel_type: Tunnel::Type &log;
## The type of activity that occurred.
action: Action &log;
};
## Logs all tunnels in an encapsulation chain with action
## :bro:see:`Tunnel::DISCOVER` that aren't already in the
## :bro:id:`Tunnel::active` table and adds them if not.
global register_all: function(ecv: EncapsulatingConnVector);
## Logs a single tunnel "connection" with action
## :bro:see:`Tunnel::DISCOVER` if it's not already in the
## :bro:id:`Tunnel::active` table and adds it if not.
global register: function(ec: EncapsulatingConn);
## Logs a single tunnel "connection" with action
## :bro:see:`Tunnel::EXPIRE` and removes it from the
## :bro:id:`Tunnel::active` table.
##
## t: A table of tunnels.
##
## idx: The index of the tunnel table corresponding to the tunnel to expire.
##
## Returns: 0secs, which when this function is used as an
## :bro:attr:`&expire_func`, indicates to remove the element at
## *idx* immediately.
global expire: function(t: table[conn_id] of Info, idx: conn_id): interval;
## Removes a single tunnel from the :bro:id:`Tunnel::active` table
## and logs the closing/expiration of the tunnel.
##
## tunnel: The tunnel which has closed or expired.
##
## action: The specific reason for the tunnel ending.
global close: function(tunnel: Info, action: Action);
## The amount of time a tunnel is not used in establishment of new
## connections before it is considered inactive/expired.
const expiration_interval = 1hrs &redef;
## Currently active tunnels. That is, tunnels for which new, encapsulated
## connections have been seen in the interval indicated by
## :bro:see:`Tunnel::expiration_interval`.
global active: table[conn_id] of Info = table() &read_expire=expiration_interval &expire_func=expire;
}
const ayiya_ports = { 5072/udp };
redef dpd_config += { [ANALYZER_AYIYA] = [$ports = ayiya_ports] };
const teredo_ports = { 3544/udp };
redef dpd_config += { [ANALYZER_TEREDO] = [$ports = teredo_ports] };
redef likely_server_ports += { ayiya_ports, teredo_ports };
event bro_init() &priority=5
{
Log::create_stream(Tunnel::LOG, [$columns=Info]);
}
function register_all(ecv: EncapsulatingConnVector)
{
for ( i in ecv )
register(ecv[i]);
}
function register(ec: EncapsulatingConn)
{
if ( ec$cid !in active )
{
local tunnel: Info;
tunnel$ts = network_time();
if ( ec?$uid )
tunnel$uid = ec$uid;
tunnel$id = ec$cid;
tunnel$action = DISCOVER;
tunnel$tunnel_type = ec$tunnel_type;
active[ec$cid] = tunnel;
Log::write(LOG, tunnel);
}
}
function close(tunnel: Info, action: Action)
{
tunnel$action = action;
tunnel$ts = network_time();
Log::write(LOG, tunnel);
delete active[tunnel$id];
}
function expire(t: table[conn_id] of Info, idx: conn_id): interval
{
close(t[idx], EXPIRE);
return 0secs;
}
event new_connection(c: connection) &priority=5
{
if ( c?$tunnel )
register_all(c$tunnel);
}
event tunnel_changed(c: connection, e: EncapsulatingConnVector) &priority=5
{
register_all(e);
}
event connection_state_remove(c: connection) &priority=-5
{
if ( c$id in active )
close(active[c$id], CLOSE);
}
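
As an illustration of the *tunnel* field mentioned in the file header above (editor's sketch, not part of the commit), a user script can check whether a connection arrived over any encapsulation:

# Hypothetical sketch: report connections that were seen inside a tunnel.
event connection_established(c: connection)
	{
	if ( c?$tunnel )
		print fmt("%s arrived over %d layer(s) of encapsulation",
		          c$uid, |c$tunnel|);
	}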