Merge branch 'master' into topic/jsiwek/reorg-followup

Conflicts:
	scripts/base/frameworks/cluster/setup-connections.bro
	scripts/base/frameworks/metrics/main.bro
	scripts/base/frameworks/notice/actions/email_admin.bro
	scripts/base/frameworks/notice/weird.bro
	scripts/base/protocols/mime/file-hash.bro
	scripts/base/protocols/mime/file-ident.bro
	scripts/policy/frameworks/communication/listen-clear.bro
	scripts/policy/frameworks/communication/listen-ssl.bro
	scripts/policy/frameworks/control/controller.bro
	scripts/policy/frameworks/metrics/http-example.bro
	scripts/policy/frameworks/metrics/ssl-example.bro
	scripts/policy/protocols/conn/scan.bro
	src/CMakeLists.txt
This commit is contained in:
Jon Siwek 2011-08-15 15:34:25 -05:00
commit 41dd0b98e9
79 changed files with 855 additions and 311 deletions

View file

@ -19,7 +19,7 @@ redef peer_description = Cluster::node;
@load ./setup-connections
# Don't start the listening process until we're a bit more sure that the
# Don't load the listening script until we're a bit more sure that the
# cluster framework is actually being enabled.
@load frameworks/communication/listen-clear

View file

@ -48,6 +48,25 @@ export {
time_machine: string &optional;
};
## This function can be called at any time to determine if the cluster
## framework is being enabled for this run.
global is_enabled: function(): bool;
## This function can be called at any time to determine what type of
## cluster node the current Bro instance is going to be acting as.
## :bro:id:`is_enabled` should be called first to find out if this is
## actually going to be a cluster node.
global local_node_type: function(): NodeType;
## This gives the value for the number of workers currently connected to,
## and it's maintained internally by the cluster framework. It's
## primarily intended for use by managers to find out how many workers
## should be responding to requests.
global worker_count: count = 0;
## The cluster layout definition. This should be placed into a filter
## named cluster-layout.bro somewhere in the BROPATH. It will be
## automatically loaded if the CLUSTER_NODE environment variable is set.
const nodes: table[string] of Node = {} &redef;
## This is usually supplied on the command line for each instance
@ -55,7 +74,29 @@ export {
const node = getenv("CLUSTER_NODE") &redef;
}
event bro_init()
function is_enabled(): bool
{
return (node != "");
}
function local_node_type(): NodeType
{
return nodes[node]$node_type;
}
event remote_connection_handshake_done(p: event_peer)
{
if ( nodes[p$descr]$node_type == WORKER )
++worker_count;
}
event remote_connection_closed(p: event_peer)
{
if ( nodes[p$descr]$node_type == WORKER )
--worker_count;
}
event bro_init() &priority=5
{
# If a node is given, but it's an unknown name we need to fail.
if ( node != "" && node !in nodes )

View file

@ -12,7 +12,7 @@
@prefixes += cluster-manager
## Load the script for local site configuration for the manager node.
# Load the script for local site configuration for the manager node.
@load site/local-manager
## Turn off remote logging since this is the manager and should only log here.

View file

@ -1,7 +1,7 @@
@prefixes += cluster-proxy
## Load the script for local site configuration for proxy nodes.
# Load the script for local site configuration for proxy nodes.
@load site/local-proxy
## The proxy only syncs state; does not forward events.

View file

@ -2,7 +2,7 @@
@prefixes += cluster-worker
## Load the script for local site configuration for the worker nodes.
# Load the script for local site configuration for the worker nodes.
@load site/local-worker
## Don't do any local logging.

View file

@ -62,13 +62,12 @@ event bro_init() &priority=9
$connect=T, $retry=1mins,
$class=node];
}
else if ( me$node_type == WORKER )
{
if ( n$node_type == MANAGER && me$manager == i )
Communication::nodes["manager"] = [$host=nodes[i]$ip, $p=nodes[i]$p,
$connect=T, $retry=1mins,
$class=node];
$class=node, $events=manager_events];
if ( n$node_type == PROXY && me$proxy == i )
Communication::nodes["proxy"] = [$host=nodes[i]$ip, $p=nodes[i]$p,

View file

@ -1 +1 @@
@load ./main
@load ./main

View file

@ -33,10 +33,12 @@ export {
##
## id: The log stream.
## path: A suggested path value, which may be either the filter's ``path``
## if defined or a fall-back generated internally.
## if defined or a fall-back generated internally.
## rec: An instance of the streams's ``columns`` type with its
## fields set to the values to logged.
##
## Returns: The path to be used for the filter.
global default_path_func: function(id: ID, path: string) : string &redef;
global default_path_func: function(id: ID, path: string, rec: any) : string &redef;
## Filter customizing logging.
type Filter: record {
@ -71,7 +73,15 @@ export {
## different strings for separate calls, but be careful: it's
## easy to flood the disk by returning a new string for each
## connection ...
path_func: function(id: ID, path: string): string &optional;
##
## id: The log stream.
## path: A suggested path value, which may be either the filter's ``path``
## if defined or a fall-back generated internally.
## rec: An instance of the streams's ``columns`` type with its
## fields set to the values to logged.
##
## Returns: The path to be used for the filter.
path_func: function(id: ID, path: string, rec: any): string &optional;
## Subset of column names to record. If not given, all
## columns are recorded.
@ -160,7 +170,7 @@ function __default_rotation_postprocessor(info: RotationInfo) : bool
return default_rotation_postprocessors[info$writer](info);
}
function default_path_func(id: ID, path: string) : string
function default_path_func(id: ID, path: string, rec: any) : string
{
# TODO for Seth: Do what you want. :)
return path;

View file

@ -1 +1,11 @@
@load ./main
# The cluster framework must be loaded first.
@load base/frameworks/cluster
# Load either the cluster support script or the non-cluster support script.
@if ( Cluster::is_enabled() )
@load ./cluster
@else
@load ./non-cluster
@endif

View file

@ -0,0 +1,146 @@
##! This implements transparent cluster support for the metrics framework.
##! Do not load this file directly. It's only meant to be loaded automatically
##! and will be depending on if the cluster framework has been enabled.
##! The goal of this script is to make metric calculation completely and
##! transparently automated when running on a cluster.
@load base/frameworks/cluster
module Metrics;
export {
## This event is sent by the manager in a cluster to initiate the 3
## collection of metrics values
global cluster_collect: event(uid: string, id: ID, filter_name: string);
## This event is sent by nodes that are collecting metrics after receiving
## a request for the metric filter from the manager.
global cluster_results: event(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool);
## This event is used internally by workers to send result chunks.
global send_data: event(uid: string, id: ID, filter_name: string, data: MetricTable);
## This value allows a user to decide how large of result groups the
## workers should transmit values.
const cluster_send_in_groups_of = 50 &redef;
}
# This is maintained by managers so they can know what data they requested and
# when they requested it.
global requested_results: table[string] of time = table() &create_expire=5mins;
# TODO: Both of the next variables make the assumption that a value never
# takes longer than 5 minutes to transmit from workers to manager. This needs to
# be tunable or self-tuning. These should also be restructured to be
# maintained within a single variable.
# This variable is maintained by manager nodes as they collect and aggregate
# results.
global collecting_results: table[string, ID, string] of MetricTable &create_expire=5mins;
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
# matches the number of peer nodes that results should be coming from, the
# result is written out and deleted from here.
# TODO: add an &expire_func in case not all results are received.
global done_with: table[string] of count &create_expire=5mins &default=0;
# Add events to the cluster framework to make this work.
redef Cluster::manager_events += /Metrics::cluster_collect/;
redef Cluster::worker_events += /Metrics::cluster_results/;
# The metrics collection process can only be done by a manager.
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event Metrics::log_it(filter: Filter)
{
local uid = unique_id("");
# Set some tracking variables.
requested_results[uid] = network_time();
collecting_results[uid, filter$id, filter$name] = table();
# Request data from peers.
event Metrics::cluster_collect(uid, filter$id, filter$name);
# Schedule the log_it event for the next break period.
schedule filter$break_interval { Metrics::log_it(filter) };
}
@endif
@if ( Cluster::local_node_type() == Cluster::WORKER )
event Metrics::send_data(uid: string, id: ID, filter_name: string, data: MetricTable)
{
#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
local local_data: MetricTable;
local num_added = 0;
for ( index in data )
{
local_data[index] = data[index];
delete data[index];
# Only send cluster_send_in_groups_of at a time. Queue another
# event to send the next group.
if ( cluster_send_in_groups_of == ++num_added )
break;
}
local done = F;
# If data is empty, this metric is done.
if ( |data| == 0 )
done = T;
event Metrics::cluster_results(uid, id, filter_name, local_data, done);
if ( ! done )
event Metrics::send_data(uid, id, filter_name, data);
}
event Metrics::cluster_collect(uid: string, id: ID, filter_name: string)
{
#print fmt("WORKER %s: received the cluster_collect event.", Cluster::node);
event Metrics::send_data(uid, id, filter_name, store[id, filter_name]);
# Lookup the actual filter and reset it, the reference to the data
# currently stored will be maintained interally by the send_data event.
reset(filter_store[id, filter_name]);
}
@endif
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event Metrics::cluster_results(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool)
{
#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
local local_data = collecting_results[uid, id, filter_name];
for ( index in data )
{
if ( index !in local_data )
local_data[index] = 0;
local_data[index] += data[index];
}
# Mark another worker as being "done" for this uid.
if ( done )
++done_with[uid];
# If the data has been collected from all peers, we are done and ready to log.
if ( Cluster::worker_count == done_with[uid] )
{
local ts = network_time();
# Log the time this was initially requested if it's available.
if ( uid in requested_results )
ts = requested_results[uid];
write_log(ts, filter_store[id, filter_name], local_data);
if ( [uid, id, filter_name] in collecting_results )
delete collecting_results[uid, id, filter_name];
if ( uid in done_with )
delete done_with[uid];
if ( uid in requested_results )
delete requested_results[uid];
}
}
@endif

View file

@ -15,16 +15,7 @@ export {
## current value to the logging stream.
const default_break_interval = 15mins &redef;
type Info: record {
ts: time &log;
metric_id: ID &log;
filter_name: string &log;
agg_subnet: string &log &optional;
index: string &log &optional;
value: count &log;
};
type Entry: record {
type Index: record {
## Host is the value to which this metric applies.
host: addr &optional;
@ -35,11 +26,19 @@ export {
## value in a Host header. This is an example of a non-host based
## metric since multiple IP addresses could respond for the same Host
## header value.
index: string &default="";
str: string &optional;
## The value by which the counter should be increased in each filter
## where this entry is accepted.
increment: count &default=1;
## The CIDR block that this metric applies to. This is typically
## only used internally for host based aggregation.
network: subnet &optional;
} &log;
type Info: record {
ts: time &log;
metric_id: ID &log;
filter_name: string &log;
index: Index &log;
value: count &log;
};
# TODO: configure a metrics filter logging stream to log the current
@ -54,11 +53,11 @@ export {
name: string &default="default";
## A predicate so that you can decide per index if you would like
## to accept the data being inserted.
pred: function(entry: Entry): bool &optional;
pred: function(index: Index): bool &optional;
## Global mask by which you'd like to aggregate traffic.
aggregation_mask: count &optional;
## This is essentially applying names to various subnets.
aggregation_table: table[subnet] of string &optional;
aggregation_table: table[subnet] of subnet &optional;
## The interval at which the metric should be "broken" and written
## to the logging stream.
break_interval: interval &default=default_break_interval;
@ -70,6 +69,7 @@ export {
## A straight threshold for generating a notice.
notice_threshold: count &optional;
## A series of thresholds at which to generate notices.
## TODO: This is not implemented yet!
notice_thresholds: vector of count &optional;
## If this and a $notice_threshold value are set, this notice type
## will be generated by the metrics framework.
@ -77,15 +77,23 @@ export {
};
global add_filter: function(id: ID, filter: Filter);
global add_data: function(id: ID, entry: Entry);
global add_data: function(id: ID, index: Index, increment: count);
# This is the event that is used to "finish" metrics and adapt the metrics
# framework for clustered or non-clustered usage.
global log_it: event(filter: Filter);
global log_metrics: event(rec: Info);
}
global metric_filters: table[ID] of vector of Filter = table();
redef record Notice::Info += {
metric_index: Index &log &optional;
};
type MetricIndex: table[string] of count &default=0;
type MetricTable: table[string] of MetricIndex;
global metric_filters: table[ID] of vector of Filter = table();
global filter_store: table[ID, string] of Filter = table();
type MetricTable: table[Index] of count &default=0;
# This is indexed by metric ID and stream filter name.
global store: table[ID, string] of MetricTable = table();
@ -97,63 +105,45 @@ event bro_init() &priority=5
{
Log::create_stream(METRICS, [$columns=Info, $ev=log_metrics]);
}
function write_log(ts: time, filter: Filter, data: MetricTable)
{
for ( index in data )
{
local val = data[index];
local m: Info = [$ts=ts,
$metric_id=filter$id,
$filter_name=filter$name,
$index=index,
$value=val];
if ( m$index?$host &&
filter?$notice_threshold &&
m$value >= filter$notice_threshold )
{
NOTICE([$note=filter$note,
$msg=fmt("Metrics threshold crossed by %s %d/%d", index$host, m$value, filter$notice_threshold),
$src=m$index$host, $n=m$value,
$metric_index=index]);
}
else if ( filter?$notice_thresholds &&
m$value >= filter$notice_thresholds[thresholds[cat(filter$id,filter$name)]] )
{
# TODO: implement this
}
if ( filter$log )
Log::write(METRICS, m);
}
}
function reset(filter: Filter)
{
store[filter$id, filter$name] = table();
}
event log_it(filter: Filter)
{
# If this node is the manager in a cluster, this needs to request values
# for this metric from all of the workers.
local id = filter$id;
local name = filter$name;
for ( agg_subnet in store[id, name] )
{
local metric_values = store[id, name][agg_subnet];
for ( index in metric_values )
{
local val = metric_values[index];
local m: Info = [$ts=network_time(),
$metric_id=id,
$filter_name=name,
$agg_subnet=fmt("%s", agg_subnet),
$index=index,
$value=val];
if ( filter?$notice_threshold &&
m$value >= filter$notice_threshold )
{
print m;
NOTICE([$note=filter$note,
$msg=fmt("Metrics threshold crossed by %s %d/%d", m$agg_subnet, m$value, filter$notice_threshold),
$n=m$value]);
}
else if ( filter?$notice_thresholds &&
m$value >= filter$notice_thresholds[thresholds[cat(id,name)]] )
{
# TODO: implement this
}
# If there wasn't an index, remove the field.
if ( index == "" )
delete m$index;
# If there wasn't an aggregation subnet, remove the field.
if ( agg_subnet == "" )
delete m$agg_subnet;
Log::write(METRICS, m);
}
}
reset(filter);
schedule filter$break_interval { log_it(filter) };
}
function add_filter(id: ID, filter: Filter)
{
if ( filter?$aggregation_table && filter?$aggregation_mask )
@ -179,13 +169,13 @@ function add_filter(id: ID, filter: Filter)
metric_filters[id] = vector();
metric_filters[id][|metric_filters[id]|] = filter;
filter_store[id, filter$name] = filter;
store[id, filter$name] = table();
# Only do this on the manager if in a cluster.
schedule filter$break_interval { log_it(filter) };
schedule filter$break_interval { Metrics::log_it(filter) };
}
function add_data(id: ID, entry: Entry)
function add_data(id: ID, index: Index, increment: count)
{
if ( id !in metric_filters )
return;
@ -198,38 +188,28 @@ function add_data(id: ID, entry: Entry)
local filter = filters[filter_id];
# If this filter has a predicate, run the predicate and skip this
# entry if the predicate return false.
# index if the predicate return false.
if ( filter?$pred &&
! filter$pred(entry) )
! filter$pred(index) )
next;
local agg_subnet = "";
local filt_store = store[id, filter$name];
if ( entry?$host )
if ( index?$host )
{
if ( filter?$aggregation_mask )
{
local agg_mask = filter$aggregation_mask;
agg_subnet = fmt("%s", mask_addr(entry$host, agg_mask));
index$network = mask_addr(index$host, filter$aggregation_mask);
delete index$host;
}
else if ( filter?$aggregation_table )
{
agg_subnet = fmt("%s", filter$aggregation_table[entry$host]);
# if an aggregation table is being used and the value isn't
# in the table, that means we aren't interested in it.
if ( agg_subnet == "" )
next;
index$network = filter$aggregation_table[index$host];
delete index$host;
}
else
agg_subnet = fmt("%s", entry$host);
}
if ( agg_subnet !in filt_store )
filt_store[agg_subnet] = table();
local fs = filt_store[agg_subnet];
if ( entry$index !in fs )
fs[entry$index] = 0;
fs[entry$index] = fs[entry$index] + entry$increment;
if ( index !in filt_store )
filt_store[index] = 0;
filt_store[index] += increment;
}
}

View file

@ -0,0 +1,17 @@
module Metrics;
export {
}
event Metrics::log_it(filter: Filter)
{
local id = filter$id;
local name = filter$name;
write_log(network_time(), filter, store[id, name]);
reset(filter);
schedule filter$break_interval { Metrics::log_it(filter) };
}

View file

@ -6,7 +6,8 @@
@load ./actions/drop
@load ./actions/email_admin
@load ./actions/page
@load ./actions/add-geodata
# Load the script to add hostnames to emails by default.
# NOTE: this exposes a memleak in async DNS lookups.
#@load ./extend-email/hostnames
# There shouldn't be any defaul toverhead from loading these since they
# *should* only do anything when notices have the ACTION_EMAIL action applied.
@load ./extend-email/hostnames

View file

@ -0,0 +1,47 @@
##! This script adds geographic location data to notices for the "remote"
##! host in a connection. It does make the assumption that one of the
##! addresses in a connection is "local" and one is "remote" which is
##! probably a safe assumption to make in most cases. If both addresses
##! are remote, it will use the $src address.
module Notice;
export {
redef enum Action += {
## Indicates that the notice should have geodata added for the
## "remote" host. :bro:id:`Site::local_nets` must be defined
## in order for this to work.
ACTION_ADD_GEODATA
};
redef record Info += {
## If libGeoIP support is built in, notices can have geographic
## information attached to them.
remote_location: geo_location &log &optional;
};
## Notice types which should have the "remote" location looked up.
## If GeoIP support is not built in, this does nothing.
const lookup_location_types: set[Notice::Type] = {} &redef;
## Add a helper to the notice policy for looking up GeoIP data.
redef Notice::policy += {
[$pred(n: Notice::Info) = { return (n$note in Notice::lookup_location_types); },
$priority = 10],
};
}
# This is handled at a high priority in case other notice handlers
# want to use the data.
event notice(n: Notice::Info) &priority=10
{
if ( ACTION_ADD_GEODATA in n$actions &&
|Site::local_nets| > 0 &&
! n?$remote_location )
{
if ( n?$src && ! Site::is_local_addr(n$src) )
n$remote_location = lookup_location(n$src);
else if ( n?$dst && ! Site::is_local_addr(n$dst) )
n$remote_location = lookup_location(n$dst);
}
}

View file

@ -23,11 +23,11 @@
@load base/frameworks/signatures
@load base/frameworks/packet-filter
@load base/frameworks/software
@load base/frameworks/intel
@load base/frameworks/metrics
@load base/frameworks/communication
@load base/frameworks/control
@load base/frameworks/cluster
@load base/frameworks/metrics
@load base/frameworks/intel
@load base/frameworks/reporter
@load base/protocols/conn

View file

@ -17,9 +17,11 @@ export {
## Networks that are considered "local".
const local_nets: set[subnet] &redef;
## This is used for mapping between local networks and string
## values for the CIDRs represented.
global local_nets_table: table[subnet] of string = {};
## This is used for retrieving the subnet when you multiple
## :bro:id:`local_nets`. A membership query can be done with an
## :bro:type:`addr` and the table will yield the subnet it was found
## within.
global local_nets_table: table[subnet] of subnet = {};
## Networks that are considered "neighbors".
const neighbor_nets: set[subnet] &redef;
@ -145,6 +147,6 @@ event bro_init() &priority=10
# Create the local_nets mapping table.
for ( cidr in Site::local_nets )
local_nets_table[cidr] = fmt("%s", cidr);
local_nets_table[cidr] = cidr;
}