Merge branch 'master' into topic/robin/dnp3-merge-v3

Conflicts:
	scripts/base/init-default.bro
Robin Sommer 2013-08-09 17:11:51 -07:00
commit 0e7f51f78c
90 changed files with 1841 additions and 923 deletions


@@ -10,10 +10,6 @@
module SumStats;
export {
## Allows a user to decide how large the result groups that workers transmit
## for cluster stats aggregation should be.
const cluster_send_in_groups_of = 50 &redef;
## The percent of the full threshold value that needs to be met on a single worker
## for that worker to send the value to its manager in order for it to request a
## global view for that value. There is no requirement that the manager requests
@@ -27,45 +23,46 @@ export {
## performed. In practice this should hopefully have a minimal effect.
const max_outstanding_global_views = 10 &redef;
## Intermediate updates can cause overload situations on very large clusters. This
## option may help reduce load and correct intermittent problems. The option
## is also meant to be temporary.
const enable_intermediate_updates = T &redef;
## Event sent by the manager in a cluster to initiate the collection of values for
## a sumstat.
global cluster_ss_request: event(uid: string, ssid: string);
global cluster_ss_request: event(uid: string, ss_name: string, cleanup: bool);
## Event sent by nodes that are collecting sumstats after receiving a request for
## the sumstat from the manager.
global cluster_ss_response: event(uid: string, ssid: string, data: ResultTable, done: bool);
#global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool);
## This event is sent by the manager in a cluster to initiate the collection of
## a single key value from a sumstat. It's typically used to get intermediate
## updates before the break interval triggers to speed detection of a value
## crossing a threshold.
global cluster_key_request: event(uid: string, ssid: string, key: Key);
global cluster_get_result: event(uid: string, ss_name: string, key: Key, cleanup: bool);
## This event is sent by nodes in response to a
## :bro:id:`SumStats::cluster_key_request` event.
global cluster_key_response: event(uid: string, ssid: string, key: Key, result: Result);
## :bro:id:`SumStats::cluster_get_result` event.
global cluster_send_result: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool);
## This is sent by workers to indicate that they crossed the percent
## of the current threshold by the percentage defined globally in
## :bro:id:`SumStats::cluster_request_global_view_percent`
global cluster_key_intermediate_response: event(ssid: string, key: SumStats::Key);
global cluster_key_intermediate_response: event(ss_name: string, key: SumStats::Key);
## This event is scheduled internally on workers to send result chunks.
global send_data: event(uid: string, ssid: string, data: ResultTable);
global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool);
global get_a_key: event(uid: string, ss_name: string, cleanup: bool &default=F);
global send_a_key: event(uid: string, ss_name: string, key: Key);
global send_no_key: event(uid: string, ss_name: string);
## This event is generated when a threshold is crossed.
global cluster_threshold_crossed: event(ssid: string, key: SumStats::Key, thold: Thresholding);
global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold_index: count);
}
# Add events to the cluster framework to make this work.
redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|key_request|threshold_crossed)/;
redef Cluster::manager2worker_events += /SumStats::thresholds_reset/;
redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_response|key_intermediate_response)/;
redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|get_result|threshold_crossed)/;
redef Cluster::manager2worker_events += /SumStats::(thresholds_reset|get_a_key)/;
redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|send_result|key_intermediate_response)/;
redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
# This variable is maintained to know what keys have recently sent as
@@ -74,12 +71,9 @@ redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_resp
# an intermediate result has been received.
global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0;
event bro_init() &priority=-100
{
# The manager is the only host allowed to track these.
stats_store = table();
reducer_store = table();
}
# Result tables indexed on a uid that are currently being sent to the
# manager.
global sending_results: table[string] of ResultTable = table() &create_expire=1min;
# This is done on all non-manager node types in the event that a sumstat is
# being collected somewhere other than a worker.
@@ -87,95 +81,151 @@ function data_added(ss: SumStat, key: Key, result: Result)
{
# If an intermediate update for this value was sent recently, don't send
# it again.
if ( [ss$id, key] in recent_global_view_keys )
if ( [ss$name, key] in recent_global_view_keys )
return;
# If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that
# crosses the full threshold then it's a candidate to send as an
# intermediate update.
if ( enable_intermediate_updates &&
check_thresholds(ss, key, result, cluster_request_global_view_percent) )
if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
{
# kick off intermediate update
event SumStats::cluster_key_intermediate_response(ss$id, key);
++recent_global_view_keys[ss$id, key];
event SumStats::cluster_key_intermediate_response(ss$name, key);
++recent_global_view_keys[ss$name, key];
}
}
event SumStats::send_data(uid: string, ssid: string, data: ResultTable)
#event SumStats::send_data(uid: string, ss_name: string, cleanup: bool)
# {
# #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
#
# local local_data: ResultTable = table();
# local incoming_data: ResultTable = cleanup ? data : copy(data);
#
# local num_added = 0;
# for ( key in incoming_data )
# {
# local_data[key] = incoming_data[key];
# delete incoming_data[key];
#
# # Only send cluster_send_in_groups_of at a time. Queue another
# # event to send the next group.
# if ( cluster_send_in_groups_of == ++num_added )
# break;
# }
#
# local done = F;
# # If data is empty, this sumstat is done.
# if ( |incoming_data| == 0 )
# done = T;
#
# # Note: copy is needed to compensate for a serialization caching issue. This
# # should be changed to something else later.
# event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
# if ( ! done )
# schedule 0.01 sec { SumStats::send_data(uid, T) };
# }
event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{
#print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
local local_data: ResultTable = table();
local num_added = 0;
for ( key in data )
if ( uid in sending_results )
{
local_data[key] = data[key];
delete data[key];
# Only send cluster_send_in_groups_of at a time. Queue another
# event to send the next group.
if ( cluster_send_in_groups_of == ++num_added )
break;
if ( |sending_results[uid]| == 0 )
{
event SumStats::send_no_key(uid, ss_name);
}
else
{
for ( key in sending_results[uid] )
{
event SumStats::send_a_key(uid, ss_name, key);
# break to only send one.
break;
}
}
}
else if ( !cleanup && ss_name in result_store && |result_store[ss_name]| > 0 )
{
if ( |result_store[ss_name]| == 0 )
{
event SumStats::send_no_key(uid, ss_name);
}
else
{
for ( key in result_store[ss_name] )
{
event SumStats::send_a_key(uid, ss_name, key);
# break to only send one.
break;
}
}
}
else
{
event SumStats::send_no_key(uid, ss_name);
}
local done = F;
# If data is empty, this sumstat is done.
if ( |data| == 0 )
done = T;
# Note: copy is needed to compensate for a serialization caching issue. This
# should be changed to something else later.
event SumStats::cluster_ss_response(uid, ssid, copy(local_data), done);
if ( ! done )
schedule 0.01 sec { SumStats::send_data(uid, ssid, data) };
}
event SumStats::cluster_ss_request(uid: string, ssid: string)
event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool)
{
#print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
# Initiate sending all of the data for the requested stats.
if ( ssid in result_store )
event SumStats::send_data(uid, ssid, result_store[ssid]);
else
event SumStats::send_data(uid, ssid, table());
# Create a back store for the result
sending_results[uid] = (ss_name in result_store) ? result_store[ss_name] : table();
# Lookup the actual sumstats and reset it, the reference to the data
# currently stored will be maintained internally by the send_data event.
if ( ssid in stats_store )
reset(stats_store[ssid]);
# currently stored will be maintained internally from the
# sending_results table.
if ( cleanup && ss_name in stats_store )
reset(stats_store[ss_name]);
}
event SumStats::cluster_key_request(uid: string, ssid: string, key: Key)
event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, cleanup: bool)
{
if ( ssid in result_store && key in result_store[ssid] )
{
#print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data);
#print fmt("WORKER %s: received the cluster_get_result event for %s=%s.", Cluster::node, key2str(key), data);
# Note: copy is needed to compensate for a serialization caching issue. This
# should be changed to something else later.
event SumStats::cluster_key_response(uid, ssid, key, copy(result_store[ssid][key]));
}
else
if ( cleanup ) # data will implicitly be in sending_results (I know this isn't great)
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_key_response(uid, ssid, key, table());
if ( uid in sending_results && key in sending_results[uid] )
{
# Note: copy is needed to compensate for a serialization caching issue. This
# should be changed to something else later.
event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup);
delete sending_results[uid][key];
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
}
}
else
{
if ( ss_name in result_store && key in result_store[ss_name] )
{
event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
}
}
}
event SumStats::cluster_threshold_crossed(ssid: string, key: SumStats::Key, thold: Thresholding)
event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, thold_index: count)
{
if ( ssid !in threshold_tracker )
threshold_tracker[ssid] = table();
if ( ss_name !in threshold_tracker )
threshold_tracker[ss_name] = table();
threshold_tracker[ssid][key] = thold;
threshold_tracker[ss_name][key] = thold_index;
}
event SumStats::thresholds_reset(ssid: string)
event SumStats::thresholds_reset(ss_name: string)
{
threshold_tracker[ssid] = table();
delete threshold_tracker[ss_name];
}
@endif
@@ -186,7 +236,7 @@ event SumStats::thresholds_reset(ssid: string)
# This variable is maintained by manager nodes as they collect and aggregate
# results.
# Index on a uid.
global stats_results: table[string] of ResultTable &read_expire=1min;
global stats_keys: table[string] of set[Key] &create_expire=1min;
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
@@ -194,18 +244,18 @@ global stats_results: table[string] of ResultTable &read_expire=1min;
# result is written out and deleted from here.
# Indexed on a uid.
# TODO: add an &expire_func in case not all results are received.
global done_with: table[string] of count &read_expire=1min &default=0;
global done_with: table[string] of count &create_expire=1min &default=0;
# This variable is maintained by managers to track intermediate responses as
# they are getting a global view for a certain key.
# Indexed on a uid.
global key_requests: table[string] of Result &read_expire=1min;
global key_requests: table[string] of Result &create_expire=1min;
# This variable is maintained by managers to prevent overwhelming communication due
# to too many intermediate updates. Each sumstat is tracked separately so that
# one won't overwhelm and degrade other quieter sumstats.
# Indexed on a sumstat id.
global outstanding_global_views: table[string] of count &default=0;
global outstanding_global_views: table[string] of count &create_expire=1min &default=0;
const zero_time = double_to_time(0.0);
# Managers handle logging.
@@ -213,15 +263,19 @@ event SumStats::finish_epoch(ss: SumStat)
{
if ( network_time() > zero_time )
{
#print fmt("%.6f MANAGER: breaking %s sumstat for %s sumstat", network_time(), ss$name, ss$id);
#print fmt("%.6f MANAGER: breaking %s sumstat", network_time(), ss$name);
local uid = unique_id("");
if ( uid in stats_results )
delete stats_results[uid];
stats_results[uid] = table();
if ( uid in stats_keys )
delete stats_keys[uid];
stats_keys[uid] = set();
# Request data from peers.
event SumStats::cluster_ss_request(uid, ss$id);
event SumStats::cluster_ss_request(uid, ss$name, T);
done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid);
event SumStats::get_a_key(uid, ss$name, T);
}
# Schedule the next finish_epoch event.
@@ -235,51 +289,160 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, 1.0) )
{
threshold_crossed(ss, key, result);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
}
}
event SumStats::cluster_key_response(uid: string, ssid: string, key: Key, result: Result)
function handle_end_of_result_collection(uid: string, ss_name: string, key: Key, cleanup: bool)
{
#print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
local ss = stats_store[ss_name];
local ir = key_requests[uid];
if ( check_thresholds(ss, key, ir, 1.0) )
{
threshold_crossed(ss, key, ir);
event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]);
}
if ( cleanup )
{
# This is done here because "cleanup" implicitly means
# it's the end of an epoch.
if ( ss?$epoch_result && |ir| > 0 )
{
local now = network_time();
ss$epoch_result(now, key, ir);
}
# Check that there is an outstanding view before subtracting.
# Global views only apply to non-dynamic requests. Dynamic
# requests must be serviced.
if ( outstanding_global_views[ss_name] > 0 )
--outstanding_global_views[ss_name];
}
delete key_requests[uid];
delete done_with[uid];
}
function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
{
#print "request_all_current_keys";
if ( uid in stats_keys && |stats_keys[uid]| > 0 )
{
#print fmt(" -- %d remaining keys here", |stats_keys[uid]|);
for ( key in stats_keys[uid] )
{
done_with[uid] = 0;
event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
#print "done getting result";
handle_end_of_result_collection(uid, ss_name, key, cleanup);
request_all_current_keys(uid, ss_name, cleanup);
}
delete stats_keys[uid][key];
break; # only a single key
}
}
else
{
# Get more keys! And this breaks us out of the evented loop.
done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid);
event SumStats::get_a_key(uid, ss_name, cleanup);
}
}
event SumStats::send_no_key(uid: string, ss_name: string)
{
#print "send_no_key";
++done_with[uid];
if ( Cluster::worker_count == done_with[uid] )
{
delete done_with[uid];
if ( |stats_keys[uid]| > 0 )
{
#print "we need more keys!";
# Now that we have a key from each worker, let's
# grab all of the results.
request_all_current_keys(uid, ss_name, T);
}
else
{
#print "we're out of keys!";
local ss = stats_store[ss_name];
if ( ss?$epoch_finished )
ss$epoch_finished(network_time());
}
}
}
event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
{
#print fmt("send_a_key %s", key);
if ( uid !in stats_keys )
{
# no clue what happened here
return;
}
if ( key !in stats_keys[uid] )
add stats_keys[uid][key];
++done_with[uid];
if ( Cluster::worker_count == done_with[uid] )
{
delete done_with[uid];
if ( |stats_keys[uid]| > 0 )
{
#print "we need more keys!";
# Now that we have a key from each worker, let's
# grab all of the results.
request_all_current_keys(uid, ss_name, T);
}
else
{
#print "we're out of keys!";
local ss = stats_store[ss_name];
if ( ss?$epoch_finished )
ss$epoch_finished(network_time());
}
}
}
event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool)
{
#print "cluster_send_result";
#print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result);
# We only want to try and do a value merge if there are actually measured datapoints
# in the Result.
if ( uid in key_requests )
key_requests[uid] = compose_results(key_requests[uid], result);
else
if ( uid !in key_requests || |key_requests[uid]| == 0 )
key_requests[uid] = result;
else
key_requests[uid] = compose_results(key_requests[uid], result);
# Mark that a worker is done.
++done_with[uid];
#print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
if ( Cluster::worker_count == done_with[uid] )
{
local ss = stats_store[ssid];
local ir = key_requests[uid];
if ( check_thresholds(ss, key, ir, 1.0) )
{
threshold_crossed(ss, key, ir);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
}
delete done_with[uid];
delete key_requests[uid];
# Check that there is an outstanding view before subtracting.
if ( outstanding_global_views[ssid] > 0 )
--outstanding_global_views[ssid];
}
#if ( Cluster::worker_count == done_with[uid] )
# {
# print "done";
# handle_end_of_result_collection(uid, ss_name, key, cleanup);
# }
}
# Managers handle intermediate updates here.
event SumStats::cluster_key_intermediate_response(ssid: string, key: Key)
event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
{
#print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr);
#print fmt("MANAGER: requesting key data for %s", key2str(key));
if ( ssid in outstanding_global_views &&
|outstanding_global_views[ssid]| > max_outstanding_global_views )
if ( ss_name in outstanding_global_views &&
|outstanding_global_views[ss_name]| > max_outstanding_global_views )
{
# Don't do this intermediate update. Perhaps at some point in the future
# we will queue and randomly select from these ignored intermediate
@@ -287,60 +450,131 @@ event SumStats::cluster_key_intermediate_response(ssid: string, key: Key)
return;
}
++outstanding_global_views[ssid];
++outstanding_global_views[ss_name];
local uid = unique_id("");
event SumStats::cluster_key_request(uid, ssid, key);
}
event SumStats::cluster_ss_response(uid: string, ssid: string, data: ResultTable, done: bool)
{
#print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
# Mark another worker as being "done" for this uid.
if ( done )
++done_with[uid];
local local_data = stats_results[uid];
local ss = stats_store[ssid];
for ( key in data )
done_with[uid] = 0;
event SumStats::cluster_get_result(uid, ss_name, key, F);
when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
if ( key in local_data )
local_data[key] = compose_results(local_data[key], data[key]);
else
local_data[key] = data[key];
# If a stat is done being collected, thresholds for each key
# need to be checked so we're doing it here to avoid doubly
# iterating over each key.
if ( Cluster::worker_count == done_with[uid] )
{
if ( check_thresholds(ss, key, local_data[key], 1.0) )
{
threshold_crossed(ss, key, local_data[key]);
event SumStats::cluster_threshold_crossed(ss$id, key, threshold_tracker[ss$id][key]);
}
}
handle_end_of_result_collection(uid, ss_name, key, F);
}
timeout 1.1min
{
Reporter::warning(fmt("Dynamic SumStat intermediate key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
}
# If the data has been collected from all peers, we are done and ready to finish.
if ( Cluster::worker_count == done_with[uid] )
{
if ( ss?$epoch_finished )
ss$epoch_finished(local_data);
}
#event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
# {
# #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
#
# # Mark another worker as being "done" for this uid.
# if ( done )
# ++done_with[uid];
#
# # We had better only be getting requests for stuff that exists.
# if ( ss_name !in stats_store )
# return;
#
# if ( uid !in stats_keys )
# stats_keys[uid] = table();
#
# local local_data = stats_keys[uid];
# local ss = stats_store[ss_name];
#
# for ( key in data )
# {
# if ( key in local_data )
# local_data[key] = compose_results(local_data[key], data[key]);
# else
# local_data[key] = data[key];
#
# # If a stat is done being collected, thresholds for each key
# # need to be checked so we're doing it here to avoid doubly
# # iterating over each key.
# if ( Cluster::worker_count == done_with[uid] )
# {
# if ( check_thresholds(ss, key, local_data[key], 1.0) )
# {
# threshold_crossed(ss, key, local_data[key]);
# event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
# }
# }
# }
#
# # If the data has been collected from all peers, we are done and ready to finish.
# if ( cleanup && Cluster::worker_count == done_with[uid] )
# {
# local now = network_time();
# if ( ss?$epoch_result )
# {
# for ( key in local_data )
# ss$epoch_result(now, key, local_data[key]);
# }
#
# if ( ss?$epoch_finished )
# ss$epoch_finished(now);
#
# # Clean up
# delete stats_keys[uid];
# delete done_with[uid];
# reset(ss);
# }
# }
#function request(ss_name: string): ResultTable
# {
# # This only needs to be implemented this way for cluster compatibility.
# local uid = unique_id("dyn-");
# stats_keys[uid] = table();
# done_with[uid] = 0;
# event SumStats::cluster_ss_request(uid, ss_name, F);
#
# return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
# {
# if ( uid in stats_keys )
# {
# local ss_result = stats_keys[uid];
# # Clean up
# delete stats_keys[uid];
# delete done_with[uid];
# reset(stats_store[ss_name]);
# return ss_result;
# }
# else
# return table();
# }
# timeout 1.1min
# {
# Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name));
# return table();
# }
# }
function request_key(ss_name: string, key: Key): Result
{
local uid = unique_id("");
done_with[uid] = 0;
key_requests[uid] = table();
event SumStats::cluster_get_result(uid, ss_name, key, F);
return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
#print "done with request_key";
local result = key_requests[uid];
# Clean up
delete stats_results[uid];
delete key_requests[uid];
delete done_with[uid];
# Not sure I need to reset the sumstat on the manager.
reset(ss);
return result;
}
timeout 1.1min
{
Reporter::warning(fmt("Dynamic SumStat key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
return table();
}
}
event remote_connection_handshake_done(p: event_peer) &priority=5
{
send_id(p, "SumStats::stats_store");
send_id(p, "SumStats::reducer_store");
}
@endif
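
For orientation, the manager/worker exchange above boils down to: the manager asks each worker for one key at a time (get_a_key/send_a_key), pulls the merged result for that key (cluster_get_result/cluster_send_result), and fires the user's callbacks as keys complete. A minimal sketch of a SumStat written against the new callback API these events service; the stream and sumstat names are hypothetical:

event bro_init()
    {
    local r1: SumStats::Reducer = [$stream="conn.attempted", $apply=set(SumStats::SUM)];
    SumStats::create([$name="attempt-counter",
                      $epoch=5min,
                      $reducers=set(r1),
                      # New in this change: called once per key as the manager
                      # finishes collecting that key from all workers.
                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
                          {
                          print fmt("%s -> %.0f", SumStats::key2str(key), result["conn.attempted"]$sum);
                          },
                      # Called once after the last key of the epoch.
                      $epoch_finished(ts: time) =
                          {
                          print fmt("epoch done at %s", ts);
                          }]);
    }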


@@ -74,10 +74,6 @@ export {
## Type to store results for multiple reducers.
type Result: table[string] of ResultVal;
## Type to store a table of sumstats results indexed
## by keys.
type ResultTable: table[Key] of Result;
## SumStats represent an aggregation of reducers along with
## mechanisms to handle various situations like the epoch ending
## or thresholds being crossed.
@@ -87,8 +83,12 @@ export {
## is no assurance provided as to where the callbacks
## will be executed on clusters.
type SumStat: record {
## An arbitrary name for the sumstat so that it can
## be referred to later.
name: string;
## The interval at which this filter should be "broken"
## and the '$epoch_finished' callback called. The
## and the '$epoch_result' callback called. The
## results are also reset at this time so any threshold
## based detection needs to be set to a
## value that should be expected to happen within
@@ -102,22 +102,28 @@ export {
## :bro:see:`SumStats::Result` structure which will be used
## for thresholding.
## This is required if a $threshold value is given.
threshold_val: function(key: SumStats::Key, result: SumStats::Result): count &optional;
threshold_val: function(key: SumStats::Key, result: SumStats::Result): double &optional;
## The threshold value for calling the
## $threshold_crossed callback.
threshold: count &optional;
threshold: double &optional;
## A series of thresholds for calling the
## $threshold_crossed callback.
threshold_series: vector of count &optional;
threshold_series: vector of double &optional;
## A callback that is called when a threshold is crossed.
threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional;
## A callback with the full collection of Results for
## this SumStat.
epoch_finished: function(rt: SumStats::ResultTable) &optional;
## A callback that receives each of the results at the
## end of the analysis epoch. The function will be
## called once for each key.
epoch_result: function(ts: time, key: SumStats::Key, result: SumStats::Result) &optional;
## A callback that will be called when a single collection
## interval is completed. The ts value will be the time of
## when the collection started.
epoch_finished: function(ts: time) &optional;
};
## Create a summary statistic.
@@ -134,19 +140,23 @@ export {
## obs: The data point to send into the stream.
global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation);
## This record is primarily used for internal threshold tracking.
type Thresholding: record {
# Internal use only. Indicates if a simple threshold was already crossed.
is_threshold_crossed: bool &default=F;
# Internal use only. Current key for threshold series.
threshold_series_index: count &default=0;
};
## Dynamically request a sumstat key. This function should be
## used sparingly and not as a replacement for the callbacks
## from the :bro:see:`SumStat` record. The function is only
## available for use within "when" statements as an asynchronous
## function.
##
## ss_name: SumStat name.
##
## key: The SumStat key being requested.
##
## Returns: The result for the requested sumstat key.
global request_key: function(ss_name: string, key: Key): Result;
## This event is generated when thresholds are reset for a SumStat.
##
## ssid: SumStats ID that thresholds were reset for.
global thresholds_reset: event(ssid: string);
## name: SumStats name that thresholds were reset for.
global thresholds_reset: event(name: string);
## Helper function to represent a :bro:type:`SumStats::Key` value as
## a simple string.
@@ -157,18 +167,49 @@ export {
global key2str: function(key: SumStats::Key): string;
}
# Type to store a table of sumstats results indexed by keys.
type ResultTable: table[Key] of Result;
# The function prototype for plugins to do calculations.
type ObserveFunc: function(r: Reducer, val: double, data: Observation, rv: ResultVal);
redef record Reducer += {
# Internal use only. Provides a reference back to the related SumStats by its ID.
sid: string &optional;
# Internal use only. Provides a reference back to the related SumStats by its name.
ssname: string &optional;
calc_funcs: vector of Calculation &optional;
};
# Internal use only. For tracking thresholds per sumstat and key.
global threshold_tracker: table[string] of table[Key] of Thresholding &optional;
# In the case of a single threshold, 0 means the threshold isn't crossed.
# In the case of a threshold series, the number tracks the threshold offset.
global threshold_tracker: table[string] of table[Key] of count;
redef record SumStat += {
# Internal use only (mostly for cluster coherency).
id: string &optional;
};
function increment_threshold_tracker(ss_name: string, key: Key)
{
if ( ss_name !in threshold_tracker )
threshold_tracker[ss_name] = table();
if ( key !in threshold_tracker[ss_name] )
threshold_tracker[ss_name][key] = 0;
++threshold_tracker[ss_name][key];
}
function get_threshold_index(ss_name: string, key: Key): count
{
if ( ss_name !in threshold_tracker )
return 0;
if ( key !in threshold_tracker[ss_name] )
return 0;
return threshold_tracker[ss_name][key];
}
# Prototype the hook point for plugins to initialize any result values.
global init_resultval_hook: hook(r: Reducer, rv: ResultVal);
# Prototype the hook point for plugins to merge Results.
global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal);
# Store of sumstats indexed on the sumstat id.
global stats_store: table[string] of SumStat = table();
@@ -182,20 +223,20 @@ global result_store: table[string] of ResultTable = table();
# Store of threshold information.
global thresholds_store: table[string, Key] of bool = table();
# Store the calculations.
global calc_store: table[Calculation] of ObserveFunc = table();
# Store the dependencies for Calculations.
global calc_deps: table[Calculation] of vector of Calculation = table();
# Hook for registering observation calculation plugins.
global register_observe_plugins: hook();
# This is called whenever key values are updated and the new val is given as the
# `val` argument. It's only prototyped here because cluster and non-cluster have
# separate implementations.
global data_added: function(ss: SumStat, key: Key, result: Result);
# Prototype the hook point for plugins to do calculations.
global observe_hook: hook(r: Reducer, val: double, data: Observation, rv: ResultVal);
# Prototype the hook point for plugins to initialize any result values.
global init_resultval_hook: hook(r: Reducer, rv: ResultVal);
# Prototype the hook point for plugins to merge Results.
global compose_resultvals_hook: hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal);
# Event that is used to "finish" measurements and adapt the measurement
# framework for clustered or non-clustered usage.
global finish_epoch: event(ss: SumStat);
@@ -210,6 +251,24 @@ function key2str(key: Key): string
return fmt("sumstats_key(%s)", out);
}
function register_observe_plugin(calc: Calculation, func: ObserveFunc)
{
calc_store[calc] = func;
}
function add_observe_plugin_dependency(calc: Calculation, depends_on: Calculation)
{
if ( calc !in calc_deps )
calc_deps[calc] = vector();
calc_deps[calc][|calc_deps[calc]|] = depends_on;
}
event bro_init() &priority=100000
{
# Call all of the plugin registration hooks
hook register_observe_plugins();
}
function init_resultval(r: Reducer): ResultVal
{
local rv: ResultVal = [$begin=network_time(), $end=network_time()];
@@ -234,25 +293,17 @@ function compose_results(r1: Result, r2: Result): Result
{
local result: Result = table();
if ( |r1| > |r2| )
for ( id in r1 )
{
for ( data_id in r1 )
{
if ( data_id in r2 )
result[data_id] = compose_resultvals(r1[data_id], r2[data_id]);
else
result[data_id] = r1[data_id];
}
result[id] = r1[id];
}
else
for ( id in r2 )
{
for ( data_id in r2 )
{
if ( data_id in r1 )
result[data_id] = compose_resultvals(r1[data_id], r2[data_id]);
else
result[data_id] = r2[data_id];
}
if ( id in r1 )
result[id] = compose_resultvals(r1[id], r2[id]);
else
result[id] = r2[id];
}
return result;
@@ -261,18 +312,43 @@ function compose_results(r1: Result, r2: Result): Result
function reset(ss: SumStat)
{
if ( ss$id in result_store )
delete result_store[ss$id];
if ( ss$name in result_store )
delete result_store[ss$name];
result_store[ss$id] = table();
result_store[ss$name] = table();
if ( ss?$threshold || ss?$threshold_series )
if ( ss$name in threshold_tracker )
{
threshold_tracker[ss$id] = table();
event SumStats::thresholds_reset(ss$id);
delete threshold_tracker[ss$name];
threshold_tracker[ss$name] = table();
event SumStats::thresholds_reset(ss$name);
}
}
# This could potentially recurse forever, but plugin authors
# should be making sure they aren't causing reflexive dependencies.
function add_calc_deps(calcs: vector of Calculation, c: Calculation)
{
#print fmt("Checking for deps for %s", c);
for ( i in calc_deps[c] )
{
local skip_calc=F;
for ( j in calcs )
{
if ( calcs[j] == calc_deps[c][i] )
skip_calc=T;
}
if ( ! skip_calc )
{
if ( calc_deps[c][i] in calc_deps )
add_calc_deps(calcs, calc_deps[c][i]);
calcs[|calcs|] = calc_deps[c][i];
#print fmt("add dep for %s [%s] ", c, calc_deps[c][i]);
}
}
}
function create(ss: SumStat)
{
if ( (ss?$threshold || ss?$threshold_series) && ! ss?$threshold_val )
@@ -280,14 +356,34 @@ function create(ss: SumStat)
Reporter::error("SumStats given a threshold with no $threshold_val function");
}
if ( ! ss?$id )
ss$id=unique_id("");
threshold_tracker[ss$id] = table();
stats_store[ss$id] = ss;
stats_store[ss$name] = ss;
if ( ss?$threshold || ss?$threshold_series )
threshold_tracker[ss$name] = table();
for ( reducer in ss$reducers )
{
reducer$sid = ss$id;
reducer$ssname = ss$name;
reducer$calc_funcs = vector();
for ( calc in reducer$apply )
{
# Add in dependencies recursively.
if ( calc in calc_deps )
add_calc_deps(reducer$calc_funcs, calc);
# Don't add this calculation to the vector if
# it was already added by something else as a
# dependency.
local skip_calc=F;
for ( j in reducer$calc_funcs )
{
if ( calc == reducer$calc_funcs[j] )
skip_calc=T;
}
if ( ! skip_calc )
reducer$calc_funcs[|reducer$calc_funcs|] = calc;
}
if ( reducer$stream !in reducer_store )
reducer_store[reducer$stream] = set();
add reducer_store[reducer$stream][reducer];
@@ -313,9 +409,9 @@ function observe(id: string, key: Key, obs: Observation)
if ( r?$pred && ! r$pred(key, obs) )
next;
local ss = stats_store[r$sid];
local ss = stats_store[r$ssname];
# If there is a threshold and no epoch_finished callback
# If there is a threshold and no epoch_result callback
# we don't need to continue counting since the data will
# never be accessed. This was leading
# to some state management issues when measuring
@@ -323,18 +419,21 @@ function observe(id: string, key: Key, obs: Observation)
# NOTE: this optimization may need to be removed in the
# future if on demand access is provided to the
# SumStats results.
if ( ! ss?$epoch_finished &&
r$sid in threshold_tracker &&
key in threshold_tracker[r$sid] &&
if ( ! ss?$epoch_result &&
r$ssname in threshold_tracker &&
( ss?$threshold &&
threshold_tracker[r$sid][key]$is_threshold_crossed ) ||
key in threshold_tracker[r$ssname] &&
threshold_tracker[r$ssname][key] != 0 ) ||
( ss?$threshold_series &&
threshold_tracker[r$sid][key]$threshold_series_index+1 == |ss$threshold_series| ) )
key in threshold_tracker[r$ssname] &&
threshold_tracker[r$ssname][key] == |ss$threshold_series| ) )
{
next;
}
if ( r$sid !in result_store )
result_store[ss$id] = table();
local results = result_store[r$sid];
if ( r$ssname !in result_store )
result_store[r$ssname] = table();
local results = result_store[r$ssname];
if ( key !in results )
results[key] = table();
@@ -350,10 +449,13 @@ function observe(id: string, key: Key, obs: Observation)
# If a string was given, fall back to 1.0 as the value.
local val = 1.0;
if ( obs?$num || obs?$dbl )
val = obs?$dbl ? obs$dbl : obs$num;
if ( obs?$num )
val = obs$num;
else if ( obs?$dbl )
val = obs$dbl;
hook observe_hook(r, val, obs, result_val);
for ( i in r$calc_funcs )
calc_store[r$calc_funcs[i]](r, val, obs, result_val);
data_added(ss, key, result);
}
}
@@ -362,10 +464,12 @@ function observe(id: string, key: Key, obs: Observation)
# mid-break-interval threshold crossing detection for cluster deployments.
function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: double): bool
{
if ( ! (ss?$threshold || ss?$threshold_series) )
if ( ! (ss?$threshold || ss?$threshold_series || ss?$threshold_crossed) )
return F;
# Add in the extra ResultVals to make threshold_vals easier to write.
# This length comparison should work because we just need to make
# sure that we have the same number of reducers and results.
if ( |ss$reducers| != |result| )
{
for ( reducer in ss$reducers )
@@ -378,28 +482,21 @@ function check_thresholds(ss: SumStat, key: Key, result: Result, modify_pct: dou
local watch = ss$threshold_val(key, result);
if ( modify_pct < 1.0 && modify_pct > 0.0 )
watch = double_to_count(floor(watch/modify_pct));
watch = watch/modify_pct;
if ( ss$id !in threshold_tracker )
threshold_tracker[ss$id] = table();
local t_tracker = threshold_tracker[ss$id];
local t_index = get_threshold_index(ss$name, key);
if ( key !in t_tracker )
{
local ttmp: Thresholding;
t_tracker[key] = ttmp;
}
local tt = t_tracker[key];
if ( ss?$threshold && ! tt$is_threshold_crossed && watch >= ss$threshold )
if ( ss?$threshold &&
t_index == 0 && # Check that the threshold hasn't already been crossed.
watch >= ss$threshold )
{
# Value crossed the threshold.
return T;
}
if ( ss?$threshold_series &&
|ss$threshold_series| >= tt$threshold_series_index &&
watch >= ss$threshold_series[tt$threshold_series_index] )
|ss$threshold_series| > t_index && # Check if there are more thresholds.
watch >= ss$threshold_series[t_index] )
{
# A threshold series was given and the value crossed the next
# value in the series.
@@ -415,6 +512,8 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result)
if ( ! ss?$threshold_crossed )
return;
increment_threshold_tracker(ss$name,key);
# Add in the extra ResultVals to make threshold_crossed callbacks easier to write.
if ( |ss$reducers| != |result| )
{
@@ -426,11 +525,5 @@ function threshold_crossed(ss: SumStat, key: Key, result: Result)
}
ss$threshold_crossed(key, result);
local tt = threshold_tracker[ss$id][key];
tt$is_threshold_crossed = T;
# Bump up to the next threshold series index if a threshold series is being used.
if ( ss?$threshold_series )
++tt$threshold_series_index;
}
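
Since $threshold, $threshold_series, and $threshold_val all moved from count to double in this change, here is a hedged sketch of what a threshold series now looks like (the stream name and values are made up):

event bro_init()
    {
    local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
    SumStats::create([$name="test-thresholding",
                      $epoch=1min,
                      $reducers=set(r1),
                      # Doubles now, not counts.
                      $threshold_series=vector(10.0, 100.0, 1000.0),
                      $threshold_val(key: SumStats::Key, result: SumStats::Result) =
                          {
                          return result["test.metric"]$sum;
                          },
                      $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
                          {
                          # threshold_tracker now just records the series index.
                          print fmt("crossed a threshold at %.1f", result["test.metric"]$sum);
                          }]);
    }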


@@ -4,11 +4,20 @@ module SumStats;
event SumStats::finish_epoch(ss: SumStat)
{
if ( ss$id in result_store )
if ( ss$name in result_store )
{
local data = result_store[ss$id];
local now = network_time();
if ( ss?$epoch_result )
{
local data = result_store[ss$name];
# TODO: don't block here.
for ( key in data )
ss$epoch_result(now, key, data[key]);
}
if ( ss?$epoch_finished )
ss$epoch_finished(data);
ss$epoch_finished(now);
reset(ss);
}
@@ -16,9 +25,32 @@ event SumStats::finish_epoch(ss: SumStat)
schedule ss$epoch { SumStats::finish_epoch(ss) };
}
function data_added(ss: SumStat, key: Key, result: Result)
{
if ( check_thresholds(ss, key, result, 1.0) )
threshold_crossed(ss, key, result);
}
function request(ss_name: string): ResultTable
{
# This only needs to be implemented this way for cluster compatibility.
return when ( T )
{
if ( ss_name in result_store )
return result_store[ss_name];
else
return table();
}
}
function request_key(ss_name: string, key: Key): Result
{
# This only needs to be implemented this way for cluster compatibility.
return when ( T )
{
if ( ss_name in result_store && key in result_store[ss_name] )
return result_store[ss_name][key];
else
return table();
}
}
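
Both the cluster and non-cluster request_key() implementations are asynchronous, so callers must invoke them inside a "when" statement. A small sketch, reusing the hypothetical sumstat from the earlier example:

function check_host(host: addr)
    {
    when ( local result = SumStats::request_key("test-thresholding", [$host=host]) )
        {
        if ( "test.metric" in result )
            print fmt("%s: %.1f", host, result["test.metric"]$sum);
        }
    timeout 30secs
        {
        print fmt("SumStats lookup for %s timed out", host);
        }
    }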


@@ -1,4 +1,4 @@
@load base/frameworks/sumstats/main
@load ../main
module SumStats;
@@ -14,17 +14,18 @@ export {
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( AVERAGE in r$apply )
register_observe_plugin(AVERAGE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$average )
rv$average = val;
else
rv$average += (val - rv$average) / rv$num;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$average && rv2?$average )

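The register_observe_plugins()/register_observe_plugin() pattern shown here is how all the calculation plugins below are restructured, and it extends to out-of-tree calculations too. A hedged sketch of a hypothetical PRODUCT plugin built the same way:

module SumStats;

export {
    redef enum Calculation += { PRODUCT };

    redef record ResultVal += {
        ## Product of all observed values (hypothetical plugin).
        product: double &default=1.0;
    };
}

hook register_observe_plugins()
    {
    register_observe_plugin(PRODUCT, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
        {
        rv$product *= val;
        });
    }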

@@ -33,16 +33,20 @@ function get_last(rv: ResultVal): vector of Observation
return s;
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( LAST in r$apply && r$num_last_elements > 0 )
register_observe_plugin(LAST, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$last_elements )
rv$last_elements = Queue::init([$max_len=r$num_last_elements]);
Queue::put(rv$last_elements, obs);
}
if ( r$num_last_elements > 0 )
{
if ( ! rv?$last_elements )
rv$last_elements = Queue::init([$max_len=r$num_last_elements]);
Queue::put(rv$last_elements, obs);
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
# Merge $samples


@@ -1,4 +1,4 @@
@load base/frameworks/sumstats/main
@load ../main
module SumStats;
@@ -14,15 +14,15 @@ export {
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( MAX in r$apply )
register_observe_plugin(MAX, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$max )
rv$max = val;
else if ( val > rv$max )
rv$max = val;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)


@@ -1,4 +1,4 @@
@load base/frameworks/sumstats/main
@load ../main
module SumStats;
@@ -14,17 +14,18 @@ export {
};
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( MIN in r$apply )
register_observe_plugin(MIN, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$min )
rv$min = val;
else if ( val < rv$min )
rv$min = val;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$min && rv2?$min )


@@ -47,15 +47,14 @@ function sample_add_sample(obs:Observation, rv: ResultVal)
if ( ra < rv$num_samples )
rv$samples[ra] = obs;
}
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( SAMPLE in r$apply )
register_observe_plugin(SAMPLE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
sample_add_sample(obs, rv);
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
@@ -75,7 +74,6 @@ hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
return;
}
if ( |rv1$samples| != num_samples && |rv2$samples| < num_samples )
{
if ( |rv1$samples| != rv1$sample_elements || |rv2$samples| < rv2$sample_elements )


@@ -1,5 +1,5 @@
@load base/frameworks/sumstats/main
@load ./variance
@load ../main
module SumStats;
@@ -21,11 +21,18 @@ function calc_std_dev(rv: ResultVal)
rv$std_dev = sqrt(rv$variance);
}
# This depends on the variance plugin which uses priority -5
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-10
hook std_dev_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( STD_DEV in r$apply )
calc_std_dev(rv);
}
hook register_observe_plugins() &priority=-10
{
register_observe_plugin(STD_DEV, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
calc_std_dev(rv);
});
add_observe_plugin_dependency(STD_DEV, VARIANCE);
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) &priority=-10

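Note how add_observe_plugin_dependency() replaces the old hook-priority trick: dependencies resolve transitively through add_calc_deps() in main.bro, so a reducer that asks only for STD_DEV still runs the whole chain. Roughly (an illustration, not verbatim output):

# reducer$apply      = set(STD_DEV)
# reducer$calc_funcs = vector(AVERAGE, VARIANCE, STD_DEV)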

@@ -1,4 +1,4 @@
@load base/frameworks/sumstats/main
@load ../main
module SumStats;
@@ -14,19 +14,19 @@ export {
sum: double &default=0.0;
};
type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count;
global sum_threshold: function(data_id: string): threshold_function;
#type threshold_function: function(key: SumStats::Key, result: SumStats::Result): count;
#global sum_threshold: function(data_id: string): threshold_function;
}
function sum_threshold(data_id: string): threshold_function
{
return function(key: SumStats::Key, result: SumStats::Result): count
{
print fmt("data_id: %s", data_id);
print result;
return double_to_count(result[data_id]$sum);
};
}
#function sum_threshold(data_id: string): threshold_function
# {
# return function(key: SumStats::Key, result: SumStats::Result): count
# {
# print fmt("data_id: %s", data_id);
# print result;
# return double_to_count(result[data_id]$sum);
# };
# }
hook init_resultval_hook(r: Reducer, rv: ResultVal)
{
@@ -34,10 +34,12 @@ hook init_resultval_hook(r: Reducer, rv: ResultVal)
rv$sum = 0;
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( SUM in r$apply )
register_observe_plugin(SUM, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
rv$sum += val;
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)


@@ -18,18 +18,20 @@ export {
}
hook register_observe_plugins()
{
register_observe_plugin(TOPK, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
topk_add(rv$topk, obs);
});
}
hook init_resultval_hook(r: Reducer, rv: ResultVal)
{
if ( TOPK in r$apply && ! rv?$topk )
rv$topk = topk_init(r$topk_size);
}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( TOPK in r$apply )
topk_add(rv$topk, obs);
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
if ( rv1?$topk )


@@ -1,4 +1,4 @@
@load base/frameworks/sumstats/main
@load ../main
module SumStats;
@@ -23,15 +23,15 @@ redef record ResultVal += {
unique_vals: set[Observation] &optional;
};
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
hook register_observe_plugins()
{
if ( UNIQUE in r$apply )
register_observe_plugin(UNIQUE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( ! rv?$unique_vals )
rv$unique_vals=set();
add rv$unique_vals[obs];
rv$unique = |rv$unique_vals|;
}
});
}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)


@@ -1,5 +1,5 @@
@load base/frameworks/sumstats/main
@load ./average
@load ../main
module SumStats;
@@ -28,17 +28,17 @@ function calc_variance(rv: ResultVal)
rv$variance = (rv$num > 1) ? rv$var_s/(rv$num-1) : 0.0;
}
# Reduced priority since this depends on the average
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) &priority=-5
hook register_observe_plugins() &priority=-5
{
if ( VARIANCE in r$apply )
register_observe_plugin(VARIANCE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( rv$num > 1 )
rv$var_s += ((val - rv$prev_avg) * (val - rv$average));
calc_variance(rv);
rv$prev_avg = rv$average;
}
});
add_observe_plugin_dependency(VARIANCE, AVERAGE);
}
# Reduced priority since this depends on the average


@@ -39,6 +39,7 @@
@load base/frameworks/tunnels
@load base/protocols/conn
@load base/protocols/dhcp
@load base/protocols/dnp3
@load base/protocols/dns
@load base/protocols/ftp


@@ -0,0 +1,4 @@
@load ./consts
@load ./main
@load-sigs ./dpd.sig


@@ -0,0 +1,20 @@
##! Types, errors, and fields for analyzing DHCP data. A helper file
##! for DHCP analysis scripts.
module DHCP;
export {
## Types of DHCP messages. See RFC 1533.
const message_types = {
[1] = "DHCP_DISCOVER",
[2] = "DHCP_OFFER",
[3] = "DHCP_REQUEST",
[4] = "DHCP_DECLINE",
[5] = "DHCP_ACK",
[6] = "DHCP_NAK",
[7] = "DHCP_RELEASE",
[8] = "DHCP_INFORM",
} &default = function(n: count): string { return fmt("unknown-message-type-%d", n); };
}
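
Thanks to the &default function, lookups on this table never fail for unknown codes; a quick sketch of the behavior from any script that loads base/protocols/dhcp:

event bro_init()
    {
    print DHCP::message_types[2];   # "DHCP_OFFER"
    print DHCP::message_types[42];  # "unknown-message-type-42"
    }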


@@ -0,0 +1,5 @@
signature dhcp_cookie {
ip-proto == udp
payload /^.*\x63\x82\x53\x63/
enable "dhcp"
}


@@ -0,0 +1,75 @@
##! Analyzes DHCP traffic in order to log DHCP leases given to clients.
##! This script ignores large swaths of the protocol, since it is rather
##! noisy on most networks, and focuses on the end-result: assigned leases.
##!
##! If you'd like to track known DHCP devices and to log the hostname
##! supplied by the client, see policy/protocols/dhcp/known-devices.bro
@load ./utils.bro
module DHCP;
export {
redef enum Log::ID += { LOG };
## The record type which contains the column fields of the DHCP log.
type Info: record {
## The earliest time at which a DHCP message over the
## associated connection is observed.
ts: time &log;
## A unique identifier of the connection over which DHCP is
## occurring.
uid: string &log;
## The connection's 4-tuple of endpoint addresses/ports.
id: conn_id &log;
## Client's hardware address.
mac: string &log &optional;
## Client's actual assigned IP address.
assigned_ip: addr &log &optional;
## IP address lease interval.
lease_time: interval &log &optional;
## A random number chosen by the client for this transaction.
trans_id: count &log;
};
## Event that can be handled to access the DHCP
## record as it is sent on to the logging framework.
global log_dhcp: event(rec: Info);
}
# Add the dhcp info to the connection record
redef record connection += {
dhcp: Info &optional;
};
# 67/udp is the server's port, 68/udp the client.
const ports = { 67/udp, 68/udp };
redef likely_server_ports += { 67/udp };
event bro_init()
{
Log::create_stream(DHCP::LOG, [$columns=Info, $ev=log_dhcp]);
Analyzer::register_for_ports(Analyzer::ANALYZER_DHCP, ports);
}
event dhcp_ack(c: connection, msg: dhcp_msg, mask: addr, router: dhcp_router_list, lease: interval, serv_addr: addr, host_name: string)
{
local info: Info;
info$ts = network_time();
info$id = c$id;
info$uid = c$uid;
info$lease_time = lease;
info$trans_id = msg$xid;
if ( msg$h_addr != "" )
info$mac = msg$h_addr;
if ( reverse_ip(msg$yiaddr) != 0.0.0.0 )
info$assigned_ip = reverse_ip(msg$yiaddr);
else
info$assigned_ip = c$id$orig_h;
c$dhcp = info;
Log::write(DHCP::LOG, c$dhcp);
}
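
A hedged example of consuming the log_dhcp event declared above, e.g. from a site policy script:

event DHCP::log_dhcp(rec: DHCP::Info)
    {
    # Both fields are &optional, so test before use.
    if ( rec?$assigned_ip && rec?$lease_time )
        print fmt("%s leased %s for %s", rec$uid, rec$assigned_ip, rec$lease_time);
    }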


@@ -0,0 +1,21 @@
##! Utilities specific for DHCP processing.
@load ./main
module DHCP;
export {
## Reverse the octets of an IPv4 IP.
##
## ip: An :bro:type:`addr` IPv4 address.
##
## Returns: A reversed addr.
global reverse_ip: function(ip: addr): addr;
}
function reverse_ip(ip: addr): addr
{
local octets = split(cat(ip), /\./);
return to_addr(cat(octets[4], ".", octets[3], ".", octets[2], ".", octets[1]));
}
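
The helper exists because dhcp_ack (see main.bro above) delivers yiaddr with its octets reversed. A quick sanity check, assuming IPv4 input:

event bro_init()
    {
    print DHCP::reverse_ip(1.2.3.4);  # prints 4.3.2.1
    }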


@@ -137,8 +137,9 @@ function log_record(info: Info)
}
timeout 15secs
{
Reporter::info(fmt("SSL delay tokens not released in time (%s tokens remaining)",
|info$delay_tokens|));
# We are just going to log the record anyway.
delete info$delay_tokens;
log_record(info);
}
}
}


@@ -28,7 +28,7 @@ event Dir::monitor_ev(dir: string, last_files: set[string],
callback: function(fname: string),
poll_interval: interval)
{
when ( local result = Exec::run([$cmd=fmt("ls -i \"%s/\"", str_shell_escape(dir))]) )
when ( local result = Exec::run([$cmd=fmt("ls -i -1 \"%s/\"", str_shell_escape(dir))]) )
{
if ( result$exit_code != 0 )
{

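For reference, a typical call site for the monitor whose polling command is fixed above (adding -1 forces one entry per line); the path and interval here are made up, and the signature is assumed from base/utils/dir.bro:

@load base/utils/dir

event bro_init()
    {
    Dir::monitor("/tmp/incoming", function(fname: string)
        {
        print fmt("new file: %s", fname);
        }, 30secs);
    }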

@@ -163,6 +163,7 @@ function run(cmd: Command): Result
Input::add_event([$name=cmd$uid,
$source=fmt("%s |", cmd$cmd),
$reader=Input::READER_RAW,
$mode=Input::STREAM,
$fields=Exec::OneLine,
$ev=Exec::line,
$want_record=F,
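
The $mode=Input::STREAM addition presumably lets the raw reader keep delivering output from long-running commands as it is produced rather than reading only once. For reference, a sketch of the asynchronous call site (the command is arbitrary):

@load base/utils/exec

event bro_init()
    {
    when ( local res = Exec::run([$cmd="ls /"]) )
        {
        if ( res?$stdout )
            {
            for ( i in res$stdout )
                print res$stdout[i];
            }
        }
    }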