Large update for the SumStats framework.

- On-demand access to sumstats results through "return from"
   functions named SumStats::request and SumStats::request_key.
   Both functions are tested in standalone and clustered modes.

 - $name field has returned to SumStats which simplifies cluster
   code and makes the on-demand access stuff possible.

 - Clustered results can only be collected for 1 minute from their
   time of creation now instead of time of last read.

 - Thresholds use doubles instead of counts everywhere now.

 - Calculation dependency resolution occurs at start up time now
   instead of doing it at observation time, which provides a minor
   CPU performance improvement.  A new plugin registration mechanism
   was created to support this change.

 - AppStats now has a minimal doc string and is broken into hook-based
   plugins.

 - AppStats and traceroute detection added to local.bro
This commit is contained in:
Seth Hall 2013-05-21 15:52:59 -04:00
parent 7d7d30e1f7
commit bec965b66f
34 changed files with 687 additions and 277 deletions

View file

@ -27,39 +27,34 @@ export {
## performed. In practice this should hopefully have a minimal effect.
const max_outstanding_global_views = 10 &redef;
## Intermediate updates can cause overload situations on very large clusters. This
## option may help reduce load and correct intermittent problems. The goal for this
## option is also meant to be temporary.
const enable_intermediate_updates = T &redef;
## Event sent by the manager in a cluster to initiate the collection of values for
## a sumstat.
global cluster_ss_request: event(uid: string, ssid: string);
global cluster_ss_request: event(uid: string, ss_name: string, cleanup: bool);
## Event sent by nodes that are collecting sumstats after receiving a request for
## the sumstat from the manager.
global cluster_ss_response: event(uid: string, ssid: string, data: ResultTable, done: bool);
global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool);
## This event is sent by the manager in a cluster to initiate the collection of
## a single key value from a sumstat. It's typically used to get intermediate
## updates before the break interval triggers to speed detection of a value
## crossing a threshold.
global cluster_key_request: event(uid: string, ssid: string, key: Key);
global cluster_key_request: event(uid: string, ss_name: string, key: Key, cleanup: bool);
## This event is sent by nodes in response to a
## :bro:id:`SumStats::cluster_key_request` event.
global cluster_key_response: event(uid: string, ssid: string, key: Key, result: Result);
global cluster_key_response: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool);
## This is sent by workers to indicate that they crossed the percent
## of the current threshold by the percentage defined globally in
## :bro:id:`SumStats::cluster_request_global_view_percent`
global cluster_key_intermediate_response: event(ssid: string, key: SumStats::Key);
global cluster_key_intermediate_response: event(ss_name: string, key: SumStats::Key);
## This event is scheduled internally on workers to send result chunks.
global send_data: event(uid: string, ssid: string, data: ResultTable);
global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool);
## This event is generated when a threshold is crossed.
global cluster_threshold_crossed: event(ssid: string, key: SumStats::Key, thold: Thresholding);
global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold: Thresholding);
}
# Add events to the cluster framework to make this work.
@ -74,44 +69,38 @@ redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_resp
# an intermediate result has been received.
global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0;
event bro_init() &priority=-100
{
# The manager is the only host allowed to track these.
stats_store = table();
reducer_store = table();
}
# This is done on all non-manager node types in the event that a sumstat is
# being collected somewhere other than a worker.
function data_added(ss: SumStat, key: Key, result: Result)
{
# If an intermediate update for this value was sent recently, don't send
# it again.
if ( [ss$id, key] in recent_global_view_keys )
if ( [ss$name, key] in recent_global_view_keys )
return;
# If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that
# crosses the full threshold then it's a candidate to send as an
# intermediate update.
if ( enable_intermediate_updates &&
check_thresholds(ss, key, result, cluster_request_global_view_percent) )
if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
{
# kick off intermediate update
event SumStats::cluster_key_intermediate_response(ss$id, key);
++recent_global_view_keys[ss$id, key];
event SumStats::cluster_key_intermediate_response(ss$name, key);
++recent_global_view_keys[ss$name, key];
}
}
event SumStats::send_data(uid: string, ssid: string, data: ResultTable)
event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool)
{
#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
local local_data: ResultTable = table();
local incoming_data: ResultTable = cleanup ? data : copy(data);
local num_added = 0;
for ( key in data )
for ( key in incoming_data )
{
local_data[key] = data[key];
delete data[key];
local_data[key] = incoming_data[key];
delete incoming_data[key];
# Only send cluster_send_in_groups_of at a time. Queue another
# event to send the next group.
@ -121,56 +110,56 @@ event SumStats::send_data(uid: string, ssid: string, data: ResultTable)
local done = F;
# If data is empty, this sumstat is done.
if ( |data| == 0 )
if ( |incoming_data| == 0 )
done = T;
event SumStats::cluster_ss_response(uid, ssid, local_data, done);
event SumStats::cluster_ss_response(uid, ss_name, local_data, done);
if ( ! done )
schedule 0.01 sec { SumStats::send_data(uid, ssid, data) };
schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) };
}
event SumStats::cluster_ss_request(uid: string, ssid: string)
event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool)
{
#print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
# Initiate sending all of the data for the requested stats.
if ( ssid in result_store )
event SumStats::send_data(uid, ssid, result_store[ssid]);
if ( ss_name in result_store )
event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup);
else
event SumStats::send_data(uid, ssid, table());
event SumStats::send_data(uid, ss_name, table(), cleanup);
# Lookup the actual sumstats and reset it, the reference to the data
# currently stored will be maintained internally by the send_data event.
if ( ssid in stats_store )
reset(stats_store[ssid]);
if ( ss_name in stats_store && cleanup )
reset(stats_store[ss_name]);
}
event SumStats::cluster_key_request(uid: string, ssid: string, key: Key)
event SumStats::cluster_key_request(uid: string, ss_name: string, key: Key, cleanup: bool)
{
if ( ssid in result_store && key in result_store[ssid] )
if ( ss_name in result_store && key in result_store[ss_name] )
{
#print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data);
event SumStats::cluster_key_response(uid, ssid, key, result_store[ssid][key]);
event SumStats::cluster_key_response(uid, ss_name, key, result_store[ss_name][key], cleanup);
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_key_response(uid, ssid, key, table());
event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup);
}
}
event SumStats::cluster_threshold_crossed(ssid: string, key: SumStats::Key, thold: Thresholding)
event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, thold: Thresholding)
{
if ( ssid !in threshold_tracker )
threshold_tracker[ssid] = table();
if ( ss_name !in threshold_tracker )
threshold_tracker[ss_name] = table();
threshold_tracker[ssid][key] = thold;
threshold_tracker[ss_name][key] = thold;
}
event SumStats::thresholds_reset(ssid: string)
event SumStats::thresholds_reset(ss_name: string)
{
threshold_tracker[ssid] = table();
threshold_tracker[ss_name] = table();
}
@endif
@ -181,7 +170,7 @@ event SumStats::thresholds_reset(ssid: string)
# This variable is maintained by manager nodes as they collect and aggregate
# results.
# Index on a uid.
global stats_results: table[string] of ResultTable &read_expire=1min;
global stats_results: table[string] of ResultTable &create_expire=1min &default=table();
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
@ -189,18 +178,18 @@ global stats_results: table[string] of ResultTable &read_expire=1min;
# result is written out and deleted from here.
# Indexed on a uid.
# TODO: add an &expire_func in case not all results are received.
global done_with: table[string] of count &read_expire=1min &default=0;
global done_with: table[string] of count &create_expire=1min &default=0;
# This variable is maintained by managers to track intermediate responses as
# they are getting a global view for a certain key.
# Indexed on a uid.
global key_requests: table[string] of Result &read_expire=1min;
global key_requests: table[string] of Result &create_expire=1min;
# This variable is maintained by managers to prevent overwhelming communication due
# to too many intermediate updates. Each sumstat is tracked separately so that
# one won't overwhelm and degrade other quieter sumstats.
# Indexed on a sumstat id.
global outstanding_global_views: table[string] of count &default=0;
global outstanding_global_views: table[string] of count &create_expire=1min &default=0;
const zero_time = double_to_time(0.0);
# Managers handle logging.
@ -216,7 +205,7 @@ event SumStats::finish_epoch(ss: SumStat)
stats_results[uid] = table();
# Request data from peers.
event SumStats::cluster_ss_request(uid, ss$id);
event SumStats::cluster_ss_request(uid, ss$name, T);
}
# Schedule the next finish_epoch event.
@ -230,20 +219,20 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, 1.0) )
{
threshold_crossed(ss, key, result);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
}
}
event SumStats::cluster_key_response(uid: string, ssid: string, key: Key, result: Result)
event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool)
{
#print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result);
# We only want to try and do a value merge if there are actually measured datapoints
# in the Result.
if ( uid in key_requests )
key_requests[uid] = compose_results(key_requests[uid], result);
else
if ( uid !in key_requests || |key_requests[uid]| == 0 )
key_requests[uid] = result;
else
key_requests[uid] = compose_results(key_requests[uid], result);
# Mark that a worker is done.
++done_with[uid];
@ -251,30 +240,39 @@ event SumStats::cluster_key_response(uid: string, ssid: string, key: Key, result
#print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
if ( Cluster::worker_count == done_with[uid] )
{
local ss = stats_store[ssid];
local ss = stats_store[ss_name];
local ir = key_requests[uid];
if ( check_thresholds(ss, key, ir, 1.0) )
{
threshold_crossed(ss, key, ir);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
}
delete done_with[uid];
delete key_requests[uid];
# Check that there is an outstanding view before subtracting.
if ( outstanding_global_views[ssid] > 0 )
--outstanding_global_views[ssid];
if ( cleanup )
{
# We only want to delete the data if this is a non dynamic
# request because the dynamic requests use when statements
# and the data needs to remain available.
delete key_requests[uid];
delete done_with[uid];
# Check that there is an outstanding view before subtracting.
# Global views only apply to non-dynamic requests. Dynamic
# requests must be serviced.
if ( outstanding_global_views[ss_name] > 0 )
--outstanding_global_views[ss_name];
}
}
}
# Managers handle intermediate updates here.
event SumStats::cluster_key_intermediate_response(ssid: string, key: Key)
event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
{
#print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr);
#print fmt("MANAGER: requesting key data for %s", key2str(key));
if ( ssid in outstanding_global_views &&
|outstanding_global_views[ssid]| > max_outstanding_global_views )
if ( ss_name in outstanding_global_views &&
|outstanding_global_views[ss_name]| > max_outstanding_global_views )
{
# Don't do this intermediate update. Perhaps at some point in the future
# we will queue and randomly select from these ignored intermediate
@ -282,13 +280,14 @@ event SumStats::cluster_key_intermediate_response(ssid: string, key: Key)
return;
}
++outstanding_global_views[ssid];
++outstanding_global_views[ss_name];
local uid = unique_id("");
event SumStats::cluster_key_request(uid, ssid, key);
done_with[uid] = 0;
event SumStats::cluster_key_request(uid, ss_name, key, T);
}
event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable, done: bool)
event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool)
{
#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
@ -297,7 +296,7 @@ event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable
++done_with[uid];
local local_data = stats_results[uid];
local ss = stats_store[ssid];
local ss = stats_store[ss_name];
for ( key in data )
{
@ -314,13 +313,14 @@ event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable
if ( check_thresholds(ss, key, local_data[key], 1.0) )
{
threshold_crossed(ss, key, local_data[key]);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
}
}
}
# If the data has been collected from all peers, we are done and ready to finish.
if ( Cluster::worker_count == done_with[uid] )
if ( Cluster::worker_count == done_with[uid] &&
/^dyn-/ !in uid )
{
if ( ss?$epoch_finished )
ss$epoch_finished(local_data);
@ -328,14 +328,60 @@ event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable
# Clean up
delete stats_results[uid];
delete done_with[uid];
# Not sure I need to reset the sumstat on the manager.
reset(ss);
}
}
event remote_connection_handshake_done(p: event_peer) &priority=5
function request(ss_name: string): ResultTable
{
send_id(p, "SumStats::stats_store");
send_id(p, "SumStats::reducer_store");
# This only needs to be implemented this way for cluster compatibility.
local uid = unique_id("dyn-");
stats_results[uid] = table();
done_with[uid] = 0;
event SumStats::cluster_ss_request(uid, ss_name, F);
return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
if ( uid in stats_results )
{
local ss_result = stats_results[uid];
# Clean up
delete stats_results[uid];
delete done_with[uid];
reset(stats_store[ss_name]);
return ss_result;
}
else
return table();
}
timeout 1.1min
{
Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name));
return table();
}
}
# Manager-side implementation of the on-demand single-key request.
# Broadcasts a cluster_key_request with cleanup=F so workers keep their
# stored data (dynamic requests must not reset state), then asynchronously
# waits until every worker has answered before returning the merged result.
# Only usable from within a "when" statement.
function request_key(ss_name: string, key: Key): Result
{
# The "dyn-" uid prefix marks this as a dynamic request so other
# handlers can distinguish it from epoch-driven collections.
local uid = unique_id("dyn-");
done_with[uid] = 0;
key_requests[uid] = table();
# cleanup=F: workers answer but retain their local results.
event SumStats::cluster_key_request(uid, ss_name, key, F);
# cluster_key_response increments done_with[uid] once per worker and
# merges each partial result into key_requests[uid].
return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
local result = key_requests[uid];
# Clean up
delete key_requests[uid];
delete done_with[uid];
return result;
}
# Slightly longer than the 1min &create_expire on the manager's tracking
# tables, so the state is gone by the time we give up — TODO confirm.
timeout 1.1min
{
Reporter::warning(fmt("Dynamic SumStat key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
return table();
}
}
@endif

View file

@ -87,6 +87,10 @@ export {
## is no assurance provided as to where the callbacks
## will be executed on clusters.
type SumStat: record {
## An arbitrary name for the sumstat so that it can
## be referred to later.
name: string;
## The interval at which this filter should be "broken"
## and the '$epoch_finished' callback called. The
## results are also reset at this time so any threshold
@ -102,22 +106,22 @@ export {
## :bro:see:`Result` structure which will be used
## for thresholding.
## This is required if a $threshold value is given.
threshold_val: function(key: SumStats::Key, result: SumStats::Result): count &optional;
threshold_val: function(key: SumStats::Key, result: SumStats::Result): double &optional;
## The threshold value for calling the
## $threshold_crossed callback.
threshold: count &optional;
threshold: double &optional;
## A series of thresholds for calling the
## $threshold_crossed callback.
threshold_series: vector of count &optional;
threshold_series: vector of double &optional;
## A callback that is called when a threshold is crossed.
threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional;
## A callback with the full collection of Results for
## this SumStat.
epoch_finished: function(rt: SumStats::ResultTable) &optional;
epoch_finished: function(rt: SumStats::ResultTable) &optional;
};
## Create a summary statistic.
@ -134,19 +138,37 @@ export {
## obs: The data point to send into the stream.
global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation);
## This record is primarily used for internal threshold tracking.
type Thresholding: record {
# Internal use only. Indicates if a simple threshold was already crossed.
is_threshold_crossed: bool &default=F;
## Dynamically request a sumstat. This function should be
## used sparingly and not as a replacement for the callbacks
## from the :bro:see:`SumStat` record. The function is only
## available for use within "when" statements as an asynchronous
## function.
##
## ss_name: SumState name.
##
## Returns: The result table for the requested sumstat.
global request: function(ss_name: string): ResultTable;
# Internal use only. Current key for threshold series.
threshold_series_index: count &default=0;
};
## Dynamically request a sumstat key. This function should be
## used sparingly and not as a replacement for the callbacks
## from the :bro:see:`SumStat` record. The function is only
## available for use within "when" statements as an asynchronous
## function.
##
## ss_name: SumStat name.
##
## key: The SumStat key being requested.
##
## Returns: The result for the requested sumstat key.
global request_key: function(ss_name: string, key: Key): Result;
## This record is primarily used for internal threshold tracking.
type Thresholding: record {};
## This event is generated when thresholds are reset for a SumStat.
##
## ssid: SumStats ID that thresholds were reset for.
global thresholds_reset: event(ssid: string);
## name: SumStats name that thresholds were reset for.
global thresholds_reset: event(name: string);
## Helper function to represent a :bro:type:`SumStats::Key` value as
## a simple string.
@ -157,19 +179,43 @@ export {
global key2str: function(key: SumStats::Key): string;
}
# The function prototype for plugins to do calculations.
type ObserveFunc: function(r: Reducer, val: double, data: Observation, rv: ResultVal);
redef record Reducer += {
# Internal use only. Provides a reference back to the related SumStats by it's ID.
sid: string &optional;
# Internal use only. Provides a reference back to the related SumStats by its name.
ssname: string &optional;
calc_funcs: vector of Calculation &optional;
};
redef record Thresholding += {
# Internal use only. Indicates if a simple threshold was already crossed.
is_threshold_crossed: bool &default=F;
# Internal use only. Current key for threshold series.
threshold_series_index: count &default=0;
};
# Internal use only. For tracking thresholds per sumstat and key.
global threshold_tracker: table[string] of table[Key] of Thresholding &optional;
redef record SumStat += {
# Internal use only.
ssname: string &optional;
# Internal use only (mostly for cluster coherency).
id: string &optional;
};
# Prototype the hook point for plugins to initialize any result values.
global init_resultval_hook: hook(r: Reducer, rv: ResultVal);
# Prototype the hook point for plugins to merge Results.
global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal);
# Store of sumstats indexed on the sumstat id.
global stats_store: table[string] of SumStat = table();
@ -182,20 +228,20 @@ global result_store: table[string] of ResultTable = table();
# Store of threshold information.
global thresholds_store: table[string, Key] of bool = table();
# Store the calculations.
global calc_store: table[Calculation] of ObserveFunc = table();
# Store the dependencies for Calculations.
global calc_deps: table[Calculation] of vector of Calculation = table();
# Hook for registering observation calculation plugins.
global register_observe_plugins: hook();
# This is called whenever key values are updated and the new val is given as the
# `val` argument. It's only prototyped here because cluster and non-cluster have
# separate implementations.
global data_added: function(ss: SumStat, key: Key, result: Result);
# Prototype the hook point for plugins to do calculations.
global observe_hook: hook(r: Reducer, val: double, data: Observation, rv: ResultVal);
# Prototype the hook point for plugins to initialize any result values.
global init_resultval_hook: hook(r: Reducer, rv: ResultVal);
# Prototype the hook point for plugins to merge Results.
global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal);
# Event that is used to "finish" measurements and adapt the measurement
# framework for clustered or non-clustered usage.
global finish_epoch: event(ss: SumStat);
@ -210,6 +256,24 @@ function key2str(key: Key): string
return fmt("sumstats_key(%s)", out);
}
# Register the observation callback for a calculation type.  Plugins call
# this from the register_observe_plugins hook so that per-calculation
# dispatch is resolved once at startup rather than at observation time.
#
# calc: The Calculation enum value the function implements.
#
# func: The callback invoked for each observation of that calculation.
function register_observe_plugin(calc: Calculation, func: ObserveFunc)
{
calc_store[calc] = func;
}
# Declare that calculation `calc` depends on `depends_on` being computed
# first.  Dependencies are recorded here and expanded recursively when a
# SumStat is created, so observation-time processing needs no resolution.
#
# calc: The dependent calculation.
#
# depends_on: The calculation that must run before `calc`.
function add_observe_plugin_dependency(calc: Calculation, depends_on: Calculation)
{
if ( calc !in calc_deps )
calc_deps[calc] = vector();
# Append to the end of this calculation's dependency vector.
calc_deps[calc][|calc_deps[calc]|] = depends_on;
}
# Run plugin registration very early at startup (&priority=100000) so that
# calc_store and calc_deps are fully populated before any SumStat or
# reducer is created.
event bro_init() &priority=100000
{
# Call all of the plugin registration hooks
hook register_observe_plugins();
}
function init_resultval(r: Reducer): ResultVal
{
local rv: ResultVal = [$begin=network_time(), $end=network_time()];
@ -234,25 +298,17 @@ function compose_results(r1: Result, r2: Result): Result
{
local result: Result = table();
if ( |r1| > |r2| )
for ( id in r1 )
{
for ( data_id in r1 )
{
if ( data_id in r2 )
result[data_id] = compose_resultvals(r1[data_id], r2[data_id]);
else
result[data_id] = r1[data_id];
}
result[id] = r1[id];
}
else
for ( id in r2 )
{
for ( data_id in r2 )
{
if ( data_id in r1 )
result[data_id] = compose_resultvals(r1[data_id], r2[data_id]);
else
result[data_id] = r2[data_id];
}
if ( id in r1 )
result[id] = compose_resultvals(r1[id], r2[id]);
else
result[id] = r2[id];
}
return result;
@ -261,18 +317,42 @@ function compose_results(r1: Result, r2: Result): Result
function reset(ss: SumStat)
{
if ( ss$id in result_store )
delete result_store[ss$id];
if ( ss$name in result_store )
delete result_store[ss$name];
result_store[ss$id] = table();
result_store[ss$name] = table();
if ( ss?$threshold || ss?$threshold_series )
{
threshold_tracker[ss$id] = table();
event SumStats::thresholds_reset(ss$id);
threshold_tracker[ss$name] = table();
event SumStats::thresholds_reset(ss$name);
}
}
# Recursively expand the dependency list of calculation `c` into `calcs`,
# skipping any calculation that is already present.  Transitive
# dependencies are added before the dependency itself so they execute
# first.  Precondition: `c` has an entry in calc_deps (callers check).
# This could potentially recurse forever, but plugin authors
# should be making sure they aren't causing reflexive dependencies.
function add_calc_deps(calcs: vector of Calculation, c: Calculation)
	{
	#print fmt("Checking for deps for %s", c);
	for ( i in calc_deps[c] )
		{
		local skip_calc=F;
		for ( j in calcs )
			{
			if ( calcs[j] == calc_deps[c][i] )
				skip_calc=T;
			}
		if ( ! skip_calc )
			{
			# Expand this dependency's own dependencies first.
			if ( calc_deps[c][i] in calc_deps )
				add_calc_deps(calcs, calc_deps[c][i]);
			# Append at the vector's current length.  The original
			# indexed with |c| (the size of the enum value), which does
			# not append and could overwrite earlier entries.
			calcs[|calcs|] = calc_deps[c][i];
			#print fmt("add dep for %s [%s] ", c, calc_deps[c][i]);
			}
		}
	}
function create(ss: SumStat)
{
if ( (ss?$threshold || ss?$threshold_series) && ! ss?$threshold_val )
@ -280,14 +360,32 @@ function create(ss: SumStat)
Reporter::error("SumStats given a threshold with no $threshold_val function");
}
if ( ! ss?$id )
ss$id=unique_id("");
threshold_tracker[ss$id] = table();
stats_store[ss$id] = ss;
threshold_tracker[ss$name] = table();
stats_store[ss$name] = ss;
for ( reducer in ss$reducers )
{
reducer$sid = ss$id;
reducer$ssname = ss$name;
reducer$calc_funcs = vector();
for ( calc in reducer$apply )
{
# Add in dependencies recursively.
if ( calc in calc_deps )
add_calc_deps(reducer$calc_funcs, calc);
# Don't add this calculation to the vector if
# it was already added by something else as a
# dependency.
local skip_calc=F;
for ( j in reducer$calc_funcs )
{
if ( calc == reducer$calc_funcs[j] )
skip_calc=T;
}
if ( ! skip_calc )
reducer$calc_funcs[|reducer$calc_funcs|] = calc;
}
if ( reducer$stream !in reducer_store )
reducer_store[reducer$stream] = set();
add reducer_store[reducer$stream][reducer];
@ -313,7 +411,7 @@ function observe(id: string, key: Key, obs: Observation)
if ( r?$pred && ! r$pred(key, obs) )
next;
local ss = stats_store[r$sid];
local ss = stats_store[r$ssname];
# If there is a threshold and no epoch_finished callback
# we don't need to continue counting since the data will
@ -324,17 +422,21 @@ function observe(id: string, key: Key, obs: Observation)
# future if on demand access is provided to the
# SumStats results.
if ( ! ss?$epoch_finished &&
r$sid in threshold_tracker &&
key in threshold_tracker[r$sid] &&
( ss?$threshold &&
threshold_tracker[r$sid][key]$is_threshold_crossed ) ||
r$ssname in threshold_tracker &&
key in threshold_tracker[r$ssname] &&
threshold_tracker[r$ssname][key]$is_threshold_crossed ) ||
( ss?$threshold_series &&
threshold_tracker[r$sid][key]$threshold_series_index+1 == |ss$threshold_series| ) )
r$ssname in threshold_tracker &&
key in threshold_tracker[r$ssname] &&
threshold_tracker[r$ssname][key]$threshold_series_index == |ss$threshold_series| ) )
{
next;
}
if ( r$sid !in result_store )
result_store[ss$id] = table();
local results = result_store[r$sid];
if ( r$ssname !in result_store )
result_store[r$ssname] = table();
local results = result_store[r$ssname];
if ( key !in results )
results[key] = table();
@ -350,10 +452,13 @@ function observe(id: string, key: Key, obs: Observation)
# If a string was given, fall back to 1.0 as the value.
local val = 1.0;
if ( obs?$num || obs?$dbl )
val = obs?$dbl ? obs$dbl : obs$num;
if ( obs?$num )
val = obs$num;
else if ( obs?$dbl )
val = obs$dbl;
hook observe_hook(r, val, obs, result_val);
for ( i in r$calc_funcs )
calc_store[r$calc_funcs[i]](r, val, obs, result_val);
data_added(ss, key, result);
}
}
@ -366,6 +471,8 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou
return F;
# Add in the extra ResultVals to make threshold_vals easier to write.
# This length comparison should work because we just need to make
# sure that we have the same number of reducers and results.
if ( |ss$reducers| != |result| )
{
for ( reducer in ss$reducers )
@ -378,11 +485,11 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou
local watch = ss$threshold_val(key, result);
if ( modify_pct < 1.0 && modify_pct > 0.0 )
watch = double_to_count(floor(watch/modify_pct));
watch = watch/modify_pct;
if ( ss$id !in threshold_tracker )
threshold_tracker[ss$id] = table();
local t_tracker = threshold_tracker[ss$id];
if ( ss$name !in threshold_tracker )
threshold_tracker[ss$name] = table();
local t_tracker = threshold_tracker[ss$name];
if ( key !in t_tracker )
{
@ -398,7 +505,7 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou
}
if ( ss?$threshold_series &&
|ss$threshold_series| >= tt$threshold_series_index &&
|ss$threshold_series| > tt$threshold_series_index &&
watch >= ss$threshold_series[tt$threshold_series_index] )
{
# A threshold series was given and the value crossed the next
@ -426,7 +533,7 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result)
}
ss$threshold_crossed(key, result);
local tt = threshold_tracker[ss$id][key];
local tt = threshold_tracker[ss$name][key];
tt$is_threshold_crossed = T;
# Bump up to the next threshold series index if a threshold series is being used.

View file

@ -4,9 +4,9 @@ module SumStats;
event SumStats::finish_epoch(ss: SumStat)
{
if ( ss$id in result_store )
if ( ss$name in result_store )
{
local data = result_store[ss$id];
local data = result_store[ss$name];
if ( ss?$epoch_finished )
ss$epoch_finished(data);
@ -16,9 +16,32 @@ event SumStats::finish_epoch(ss: SumStat)
schedule ss$epoch { SumStats::finish_epoch(ss) };
}
# Standalone (non-cluster) hook called whenever a key's result is updated.
# With no peers to consult, thresholds are checked immediately against the
# full local result (modify_pct of 1.0 means no scaling).
function data_added(ss: SumStat, key: Key, result: Result)
{
if ( check_thresholds(ss, key, result, 1.0) )
threshold_crossed(ss, key, result);
}
# Standalone (non-cluster) on-demand access to a sumstat's full result
# table.  Wrapped in a trivially-true "when" only so the signature matches
# the asynchronous cluster implementation.
#
# ss_name: SumStat name.
#
# Returns: The current ResultTable for the named sumstat, or an empty
#          table if none exists.  NOTE(review): this appears to return the
#          live stored table rather than a copy — confirm callers do not
#          mutate it.
function request(ss_name: string): ResultTable
{
# This only needs to be implemented this way for cluster compatibility.
return when ( T )
{
if ( ss_name in result_store )
return result_store[ss_name];
else
return table();
}
}
# Standalone (non-cluster) on-demand access to a single key's result.
# Wrapped in a trivially-true "when" only so the signature matches the
# asynchronous cluster implementation.
#
# ss_name: SumStat name.
#
# key: The key whose result is requested.
#
# Returns: The stored Result for the key, or an empty table if the
#          sumstat or key is unknown.
function request_key(ss_name: string, key: Key): Result
{
# This only needs to be implemented this way for cluster compatibility.
return when ( T )
{
if ( ss_name in result_store && key in result_store[ss_name] )
return result_store[ss_name][key];
else
return table();
}
}

View file

@ -1,4 +1,4 @@
@load base/frameworks/sumstats
@load ../main
module SumStats;
@ -14,17 +14,18 @@ export {
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( AVERAGE in r$apply )
register_observe_plugin(AVERAGE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$average )
rv$average = val;
else
rv$average += (val - rv$average) / rv$num;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$average && rv2?$average )

View file

@ -1,4 +1,4 @@
@load base/frameworks/sumstats
@load ../main
module SumStats;
@ -14,15 +14,15 @@ export {
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( MAX in r$apply )
register_observe_plugin(MAX, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$max )
rv$max = val;
else if ( val > rv$max )
rv$max = val;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)

View file

@ -1,4 +1,4 @@
@load base/frameworks/sumstats
@load ../main
module SumStats;
@ -14,17 +14,18 @@ export {
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( MIN in r$apply )
register_observe_plugin(MIN, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$min )
rv$min = val;
else if ( val < rv$min )
rv$min = val;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$min && rv2?$min )

View file

@ -1,9 +1,14 @@
@load base/frameworks/sumstats
@load base/utils/queue
@load ../main
module SumStats;
export {
redef enum Calculation += {
## Collect a sample of the last few observations.
SAMPLE
};
redef record Reducer += {
## A number of sample Observations to collect.
samples: count &default=0;
@ -27,14 +32,14 @@ function get_samples(rv: ResultVal): vector of Observation
return s;
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( r$samples > 0 )
register_observe_plugin(SAMPLE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$samples )
rv$samples = Queue::init([$max_len=r$samples]);
Queue::put(rv$samples, obs);
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)

View file

@ -1,5 +1,5 @@
@load ./variance
@load base/frameworks/sumstats
@load ../main
module SumStats;
@ -21,11 +21,18 @@ function calc_std_dev(rv: ResultVal)
rv$std_dev = sqrt(rv$variance);
}
# This depends on the variance plugin which uses priority -5
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-10
hook std_dev_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( STD_DEV in r$apply )
calc_std_dev(rv);
}
hook register_observe_plugins() &priority=-10
{
register_observe_plugin(STD_DEV, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
calc_std_dev(rv);
});
add_observe_plugin_dependency(STD_DEV, VARIANCE);
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10

View file

@ -1,4 +1,4 @@
@load base/frameworks/sumstats
@load ../main
module SumStats;
@ -14,19 +14,19 @@ export {
sum: double &default=0.0;
};
type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count;
global sum_threshold: function(data_id: string): threshold_function;
#type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count;
#global sum_threshold: function(data_id: string): threshold_function;
}
function sum_threshold(data_id: string): threshold_function
{
return function(key: SumStats::Key, result: SumStats::Result): count
{
print fmt("data_id: %s", data_id);
print result;
return double_to_count(result[data_id]$sum);
};
}
#function sum_threshold(data_id: string): threshold_function
# {
# return function(key: SumStats::Key, result: SumStats::Result): count
# {
# print fmt("data_id: %s", data_id);
# print result;
# return double_to_count(result[data_id]$sum);
# };
# }
hook init_resultval_hook(r: Reducer, rv: ResultVal)
{
@ -34,10 +34,12 @@ hook init_resultval_hook(r: Reducer, rv: ResultVal)
rv$sum = 0;
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( SUM in r$apply )
register_observe_plugin(SUM, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
rv$sum += val;
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)

View file

@ -1,4 +1,4 @@
@load base/frameworks/sumstats
@load ../main
module SumStats;
@ -23,15 +23,15 @@ redef record ResultVal += {
unique_vals: set[Observation] &optional;
};
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( UNIQUE in r$apply )
register_observe_plugin(UNIQUE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$unique_vals )
rv$unique_vals=set();
add rv$unique_vals[obs];
rv$unique = |rv$unique_vals|;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)

View file

@ -1,5 +1,5 @@
@load ./average
@load base/frameworks/sumstats
@load ../main
module SumStats;
@ -28,17 +28,17 @@ function calc_variance(rv: ResultVal)
rv$variance = (rv$num > 1) ? rv$var_s/(rv$num-1) : 0.0;
}
# Reduced priority since this depends on the average
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-5
hook register_observe_plugins() &priority=-5
{
if ( VARIANCE in r$apply )
register_observe_plugin(VARIANCE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( rv$num > 1 )
rv$var_s += ((val - rv$prev_avg) * (val - rv$average));
calc_variance(rv);
rv$prev_avg = rv$average;
}
});
add_observe_plugin_dependency(VARIANCE, AVERAGE);
}
# Reduced priority since this depends on the average

View file

@ -0,0 +1,2 @@
@load ./main
@load ./plugins

View file

@ -1,3 +1,6 @@
#! AppStats collects information about web applications in use
#! on the network.
@load base/protocols/http
@load base/protocols/ssl
@load base/frameworks/sumstats
@ -30,13 +33,17 @@ redef record connection += {
resp_hostname: string &optional;
};
global add_sumstats: hook(id: conn_id, hostname: string, size: count);
event bro_init() &priority=3
{
Log::create_stream(AppStats::LOG, [$columns=Info]);
local r1: SumStats::Reducer = [$stream="apps.bytes", $apply=set(SumStats::SUM)];
local r2: SumStats::Reducer = [$stream="apps.hits", $apply=set(SumStats::UNIQUE)];
SumStats::create([$epoch=break_interval,
SumStats::create([$name="app-metrics",
$epoch=break_interval,
$reducers=set(r1, r2),
$epoch_finished(data: SumStats::ResultTable) =
{
@ -55,41 +62,6 @@ event bro_init() &priority=3
}]);
}
function add_sumstats(id: conn_id, hostname: string, size: count)
{
if ( /\.youtube\.com$/ in hostname && size > 512*1024 )
{
SumStats::observe("apps.bytes", [$str="youtube"], [$num=size]);
SumStats::observe("apps.hits", [$str="youtube"], [$str=cat(id$orig_h)]);
}
else if ( /(\.facebook\.com|\.fbcdn\.net)$/ in hostname && size > 20 )
{
SumStats::observe("apps.bytes", [$str="facebook"], [$num=size]);
SumStats::observe("apps.hits", [$str="facebook"], [$str=cat(id$orig_h)]);
}
else if ( /\.google\.com$/ in hostname && size > 20 )
{
SumStats::observe("apps.bytes", [$str="google"], [$num=size]);
SumStats::observe("apps.hits", [$str="google"], [$str=cat(id$orig_h)]);
}
else if ( /\.nflximg\.com$/ in hostname && size > 200*1024 )
{
SumStats::observe("apps.bytes", [$str="netflix"], [$num=size]);
SumStats::observe("apps.hits", [$str="netflix"], [$str=cat(id$orig_h)]);
}
else if ( /\.(pandora|p-cdn)\.com$/ in hostname && size > 512*1024 )
{
SumStats::observe("apps.bytes", [$str="pandora"], [$num=size]);
SumStats::observe("apps.hits", [$str="pandora"], [$str=cat(id$orig_h)]);
}
else if ( /\.gmail\.com$/ in hostname && size > 20 )
{
SumStats::observe("apps.bytes", [$str="gmail"], [$num=size]);
SumStats::observe("apps.hits", [$str="gmail"], [$str=cat(id$orig_h)]);
}
}
event ssl_established(c: connection)
{
if ( c?$ssl && c$ssl?$server_name )
@ -99,11 +71,11 @@ event ssl_established(c: connection)
event connection_finished(c: connection)
{
if ( c?$resp_hostname )
add_sumstats(c$id, c$resp_hostname, c$resp$size);
hook add_sumstats(c$id, c$resp_hostname, c$resp$size);
}
event HTTP::log_http(rec: HTTP::Info)
{
if( rec?$host )
add_sumstats(rec$id, rec$host, rec$response_body_len);
hook add_sumstats(rec$id, rec$host, rec$response_body_len);
}

View file

@ -0,0 +1,6 @@
@load ./facebook
@load ./gmail
@load ./google
@load ./netflix
@load ./pandora
@load ./youtube

View file

@ -0,0 +1,12 @@
@load ../main
module AppStats;
hook add_sumstats(id: conn_id, hostname: string, size: count)
{
if ( /\.(facebook\.com|fbcdn\.net)$/ in hostname && size > 20 )
{
SumStats::observe("apps.bytes", [$str="facebook"], [$num=size]);
SumStats::observe("apps.hits", [$str="facebook"], [$str=cat(id$orig_h)]);
}
}

View file

@ -0,0 +1,12 @@
@load ../main
module AppStats;
hook add_sumstats(id: conn_id, hostname: string, size: count)
{
if ( /\.gmail\.com$/ in hostname && size > 20 )
{
SumStats::observe("apps.bytes", [$str="gmail"], [$num=size]);
SumStats::observe("apps.hits", [$str="gmail"], [$str=cat(id$orig_h)]);
}
}

View file

@ -0,0 +1,12 @@
@load ../main
module AppStats;
hook add_sumstats(id: conn_id, hostname: string, size: count)
{
if ( /\.google\.com$/ in hostname && size > 20 )
{
SumStats::observe("apps.bytes", [$str="google"], [$num=size]);
SumStats::observe("apps.hits", [$str="google"], [$str=cat(id$orig_h)]);
}
}

View file

@ -0,0 +1,12 @@
@load ../main
module AppStats;
hook add_sumstats(id: conn_id, hostname: string, size: count)
{
if ( /\.nflximg\.com$/ in hostname && size > 200*1024 )
{
SumStats::observe("apps.bytes", [$str="netflix"], [$num=size]);
SumStats::observe("apps.hits", [$str="netflix"], [$str=cat(id$orig_h)]);
}
}

View file

@ -0,0 +1,12 @@
@load ../main
module AppStats;
hook add_sumstats(id: conn_id, hostname: string, size: count)
{
if ( /\.(pandora|p-cdn)\.com$/ in hostname && size > 512*1024 )
{
SumStats::observe("apps.bytes", [$str="pandora"], [$num=size]);
SumStats::observe("apps.hits", [$str="pandora"], [$str=cat(id$orig_h)]);
}
}

View file

@ -0,0 +1,12 @@
@load ../main
module AppStats;
hook add_sumstats(id: conn_id, hostname: string, size: count)
{
if ( /\.youtube\.com$/ in hostname && size > 512*1024 )
{
SumStats::observe("apps.bytes", [$str="youtube"], [$num=size]);
SumStats::observe("apps.hits", [$str="youtube"], [$str=cat(id$orig_h)]);
}
}

View file

@ -29,7 +29,7 @@ export {
## Defines the threshold for ICMP Time Exceeded messages for a src-dst pair.
## This threshold only comes into play after a host is found to be
## sending low ttl packets.
const icmp_time_exceeded_threshold = 3 &redef;
const icmp_time_exceeded_threshold: double = 3 &redef;
## Interval at which to watch for the
## :bro:id:`ICMPTimeExceeded::icmp_time_exceeded_threshold` variable to be crossed.
@ -57,16 +57,17 @@ event bro_init() &priority=5
local r1: SumStats::Reducer = [$stream="traceroute.time_exceeded", $apply=set(SumStats::UNIQUE)];
local r2: SumStats::Reducer = [$stream="traceroute.low_ttl_packet", $apply=set(SumStats::SUM)];
SumStats::create([$epoch=icmp_time_exceeded_interval,
SumStats::create([$name="traceroute-detection",
$epoch=icmp_time_exceeded_interval,
$reducers=set(r1, r2),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
# Give a threshold value of zero depending on if the host
# sends a low ttl packet.
if ( require_low_ttl_packets && result["traceroute.low_ttl_packet"]$sum == 0 )
return 0;
return 0.0;
else
return result["traceroute.time_exceeded"]$unique;
return result["traceroute.time_exceeded"]$unique+0;
},
$threshold=icmp_time_exceeded_threshold,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =

View file

@ -39,15 +39,11 @@ export {
## The threshold of a unique number of hosts a scanning host has to have failed
## connections with on a single port.
const addr_scan_threshold = 25 &redef;
const addr_scan_threshold = 25.0 &redef;
## The threshold of a number of unique ports a scanning host has to have failed
## connections with on a single victim host.
const port_scan_threshold = 15 &redef;
## Custom thresholds based on service for address scan. This is primarily
## useful for setting reduced thresholds for specific ports.
const addr_scan_custom_thresholds: table[port] of count &redef;
const port_scan_threshold = 15.0 &redef;
global Scan::addr_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port);
global Scan::port_scan_policy: hook(scanner: addr, victim: addr, scanned_port: port);
@ -56,11 +52,12 @@ export {
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="scan.addr.fail", $apply=set(SumStats::UNIQUE)];
SumStats::create([$epoch=addr_scan_interval,
SumStats::create([$name="addr-scan",
$epoch=addr_scan_interval,
$reducers=set(r1),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["scan.addr.fail"]$unique);
return result["scan.addr.fail"]$unique+0.0;
},
#$threshold_func=check_addr_scan_threshold,
$threshold=addr_scan_threshold,
@ -80,11 +77,12 @@ event bro_init() &priority=5
# Note: port scans are tracked similar to: table[src_ip, dst_ip] of set(port);
local r2: SumStats::Reducer = [$stream="scan.port.fail", $apply=set(SumStats::UNIQUE)];
SumStats::create([$epoch=port_scan_interval,
SumStats::create([$name="port-scan",
$epoch=port_scan_interval,
$reducers=set(r2),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["scan.port.fail"]$unique);
return result["scan.port.fail"]$unique+0.0;
},
$threshold=port_scan_threshold,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =

View file

@ -17,7 +17,7 @@ export {
## How many rejected usernames or passwords are required before being
## considered to be bruteforcing.
const bruteforce_threshold = 20 &redef;
const bruteforce_threshold: double = 20 &redef;
## The time period in which the threshold needs to be crossed before
## being reset.
@ -28,11 +28,12 @@ export {
event bro_init()
{
local r1: SumStats::Reducer = [$stream="ftp.failed_auth", $apply=set(SumStats::UNIQUE)];
SumStats::create([$epoch=bruteforce_measurement_interval,
SumStats::create([$name="ftp-detect-bruteforcing",
$epoch=bruteforce_measurement_interval,
$reducers=set(r1),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return result["ftp.failed_auth"]$num;
return result["ftp.failed_auth"]$num+0.0;
},
$threshold=bruteforce_threshold,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =

View file

@ -28,7 +28,7 @@ export {
## Defines the threshold that determines if an SQL injection attack
## is ongoing based on the number of requests that appear to be SQL
## injection attacks.
const sqli_requests_threshold = 50 &redef;
const sqli_requests_threshold: double = 50.0 &redef;
## Interval at which to watch for the
## :bro:id:`HTTP::sqli_requests_threshold` variable to be crossed.
@ -64,11 +64,12 @@ event bro_init() &priority=3
# determine when it looks like an actual attack and how to respond when
# thresholds are crossed.
local r1: SumStats::Reducer = [$stream="http.sqli.attacker", $apply=set(SumStats::SUM), $samples=collect_SQLi_samples];
SumStats::create([$epoch=sqli_requests_interval,
SumStats::create([$name="detect-sqli-attackers",
$epoch=sqli_requests_interval,
$reducers=set(r1),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["http.sqli.attacker"]$sum);
return result["http.sqli.attacker"]$sum;
},
$threshold=sqli_requests_threshold,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
@ -82,11 +83,12 @@ event bro_init() &priority=3
}]);
local r2: SumStats::Reducer = [$stream="http.sqli.victim", $apply=set(SumStats::SUM), $samples=collect_SQLi_samples];
SumStats::create([$epoch=sqli_requests_interval,
SumStats::create([$name="detect-sqli-victims",
$epoch=sqli_requests_interval,
$reducers=set(r2),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["http.sqli.victim"]$sum);
return result["http.sqli.victim"]$sum;
},
$threshold=sqli_requests_threshold,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =

View file

@ -27,7 +27,7 @@ export {
## The number of failed SSH connections before a host is designated as
## guessing passwords.
const password_guesses_limit = 30 &redef;
const password_guesses_limit: double = 30 &redef;
## The amount of time to remember presumed non-successful logins to build
## model of a password guesser.
@ -43,11 +43,12 @@ export {
event bro_init()
{
local r1: SumStats::Reducer = [$stream="ssh.login.failure", $apply=set(SumStats::SUM)];
SumStats::create([$epoch=guessing_timeout,
SumStats::create([$name="detect-ssh-bruteforcing",
$epoch=guessing_timeout,
$reducers=set(r1),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["ssh.login.failure"]$sum);
return result["ssh.login.failure"]$sum;
},
$threshold=password_guesses_limit,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =

View file

@ -11,6 +11,13 @@
# Load the scan detection script.
@load misc/scan
# Log some information about web applications being used by users
# on your network.
@load misc/app-metrics
# Detect traceroute being run on the network.
@load misc/detect-traceroute
# Generate notices when vulnerable versions of software are discovered.
# The default is to only monitor software found in the address space defined
# as "local". Refer to the software framework's documentation for more

View file

@ -0,0 +1,7 @@
Complete SumStat request
Host: 6.5.4.3 -> 1
Host: 10.10.10.10 -> 5
Host: 1.2.3.4 -> 169
Host: 7.2.1.5 -> 145
SumStat key request
Host: 7.2.1.5 -> 145

View file

@ -0,0 +1,5 @@
Complete SumStat request
Host: 1.2.3.4 -> 42
Host: 4.3.2.1 -> 7
Key request for 1.2.3.4
Host: 1.2.3.4 -> 42

View file

@ -23,7 +23,8 @@ global n = 0;
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)];
SumStats::create([$epoch=5secs,
SumStats::create([$name="test",
$epoch=5secs,
$reducers=set(r1),
$epoch_finished(rt: SumStats::ResultTable) =
{

View file

@ -11,16 +11,17 @@ event bro_init() &priority=5
SumStats::MIN,
SumStats::STD_DEV,
SumStats::UNIQUE)];
SumStats::create([$epoch=3secs,
$reducers=set(r1),
$epoch_finished(data: SumStats::ResultTable) =
{
for ( key in data )
{
local r = data[key]["test.metric"];
print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique);
}
}
SumStats::create([$name="test",
$epoch=3secs,
$reducers=set(r1),
$epoch_finished(data: SumStats::ResultTable) =
{
for ( key in data )
{
local r = data[key]["test.metric"];
print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique);
}
}
]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]);

View file

@ -20,13 +20,14 @@ redef Log::default_rotation_interval = 0secs;
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
SumStats::create([$epoch=1hr,
SumStats::create([$name="test",
$epoch=1hr,
$reducers=set(r1),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["test.metric"]$sum);
return result["test.metric"]$sum;
},
$threshold=100,
$threshold=100.0,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
{
print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum);

View file

@ -0,0 +1,93 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
# @TEST-EXEC: sleep 1
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
# @TEST-EXEC: btest-bg-wait 15
# @TEST-EXEC: btest-diff manager-1/.stdout
@TEST-START-FILE cluster-layout.bro
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"],
};
@TEST-END-FILE
redef Log::default_rotation_interval = 0secs;
global n = 0;
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)];
SumStats::create([$name="test sumstat",
$epoch=1hr,
$reducers=set(r1)]);
}
event remote_connection_closed(p: event_peer)
{
terminate();
}
global ready_for_data: event();
redef Cluster::manager2worker_events += /^ready_for_data$/;
event ready_for_data()
{
if ( Cluster::node == "worker-1" )
{
SumStats::observe("test", [$host=1.2.3.4], [$num=34]);
SumStats::observe("test", [$host=1.2.3.4], [$num=30]);
SumStats::observe("test", [$host=6.5.4.3], [$num=1]);
SumStats::observe("test", [$host=7.2.1.5], [$num=54]);
}
if ( Cluster::node == "worker-2" )
{
SumStats::observe("test", [$host=1.2.3.4], [$num=75]);
SumStats::observe("test", [$host=1.2.3.4], [$num=30]);
SumStats::observe("test", [$host=7.2.1.5], [$num=91]);
SumStats::observe("test", [$host=10.10.10.10], [$num=5]);
}
}
event on_demand2()
{
local host = 7.2.1.5;
when ( local result = SumStats::request_key("test sumstat", [$host=host]) )
{
print "SumStat key request";
print fmt(" Host: %s -> %.0f", host, result["test"]$sum);
terminate();
}
}
event on_demand()
{
when ( local results = SumStats::request("test sumstat") )
{
print "Complete SumStat request";
for ( key in results )
print fmt(" Host: %s -> %.0f", key$host, results[key]["test"]$sum);
event on_demand2();
}
}
global peer_count = 0;
event remote_connection_handshake_done(p: event_peer) &priority=-5
{
++peer_count;
if ( peer_count == 2 )
{
if ( Cluster::local_node_type() == Cluster::MANAGER )
event ready_for_data();
schedule 1sec { on_demand() };
}
}

View file

@ -0,0 +1,45 @@
# @TEST-EXEC: bro %INPUT
# @TEST-EXEC: btest-diff .stdout
redef exit_only_after_terminate=T;
event on_demand()
{
when ( local results = SumStats::request("test") )
{
print "Complete SumStat request";
for ( key in results )
{
print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum);
}
}
}
event on_demand_key()
{
local host = 1.2.3.4;
when ( local result = SumStats::request_key("test", [$host=host]) )
{
print fmt("Key request for %s", host);
print fmt(" Host: %s -> %.0f", host, result["test.reducer"]$sum);
terminate();
}
}
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test.reducer",
$apply=set(SumStats::SUM)];
SumStats::create([$name="test",
$epoch=1hr,
$reducers=set(r1)]);
# Seed some data but notice there are no callbacks defined in the sumstat!
SumStats::observe("test.reducer", [$host=1.2.3.4], [$num=42]);
SumStats::observe("test.reducer", [$host=4.3.2.1], [$num=7]);
schedule 0.1 secs { on_demand() };
schedule 1 secs { on_demand_key() };
}

View file

@ -8,14 +8,15 @@ redef enum Notice::Type += {
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
SumStats::create([$epoch=3secs,
SumStats::create([$name="test1",
$epoch=3secs,
$reducers=set(r1),
#$threshold_val = SumStats::sum_threshold("test.metric"),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["test.metric"]$sum);
return result["test.metric"]$sum;
},
$threshold=5,
$threshold=5.0,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
{
local r = result["test.metric"];
@ -24,14 +25,15 @@ event bro_init() &priority=5
]);
local r2: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
SumStats::create([$epoch=3secs,
SumStats::create([$name="test2",
$epoch=3secs,
$reducers=set(r2),
#$threshold_val = SumStats::sum_threshold("test.metric"),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return double_to_count(result["test.metric"]$sum);
return result["test.metric"]$sum;
},
$threshold_series=vector(3,6,800),
$threshold_series=vector(3.0,6.0,800.0),
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
{
local r = result["test.metric"];
@ -41,19 +43,20 @@ event bro_init() &priority=5
local r3: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
local r4: SumStats::Reducer = [$stream="test.metric2", $apply=set(SumStats::SUM)];
SumStats::create([$epoch=3secs,
SumStats::create([$name="test3",
$epoch=3secs,
$reducers=set(r3, r4),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
# Calculate a ratio between sums of two reducers.
if ( "test.metric2" in result && "test.metric" in result &&
result["test.metric"]$sum > 0 )
return double_to_count(result["test.metric2"]$sum / result["test.metric"]$sum);
return result["test.metric2"]$sum / result["test.metric"]$sum;
else
return 0;
return 0.0;
},
# Looking for metric2 sum to be 5 times the sum of metric
$threshold=5,
$threshold=5.0,
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
{
local thold = result["test.metric2"]$sum / result["test.metric"]$sum;