Merge remote-tracking branch 'origin/topic/bernhard/metrics-samples'

Closes #1003.

* origin/topic/bernhard/metrics-samples:
  finishing touches, make test more robust, rename function in last again
  change names of data structures after talking with seth
  make last plugin nicer and samplify sqli detector
  add tests for sampler
  reservoir sampler. untested.
This commit is contained in:
Robin Sommer 2013-05-15 16:09:31 -07:00
commit f76446fb4e
10 changed files with 347 additions and 29 deletions

View file

@ -1,4 +1,9 @@
2.1-626 | 2013-05-15 16:09:31 -0700
* Add "reservoir" sampler for SumStats framework. This maintains
a set of N uniformly distributed random samples. (Bernhard Amann)
2.1-619 | 2013-05-15 16:01:42 -0700
* SQLite reader and writer combo. This allows to read/write

View file

@ -1 +1 @@
2.1-619
2.1-626

View file

@ -1,4 +1,5 @@
@load ./average
@load ./last
@load ./max
@load ./min
@load ./sample

View file

@ -0,0 +1,54 @@
@load base/frameworks/sumstats
@load base/utils/queue
module SumStats;
export {
redef enum Calculation += {
## Keep the last X observations in a queue.
LAST
};
redef record Reducer += {
## Number of most recent observations to keep. With the default
## of 0 nothing is collected.
num_last_elements: count &default=0;
};
redef record ResultVal += {
## This is the queue where elements are maintained. Use the
## :bro:see:`SumStats::get_last` function to get a vector of the current element values.
last_elements: Queue::Queue &optional;
};
## Get a vector of element values from a ResultVal.
##
## rv: The ResultVal to read the elements from.
##
## Returns: The elements held in rv$last_elements, or an empty
##          vector if that field is not set.
global get_last: function(rv: ResultVal): vector of Observation;
}
# Returns the observations stored in rv$last_elements as a vector.
# An empty vector is returned when the queue has not been created.
function get_last(rv: ResultVal): vector of Observation
	{
	local elements: vector of Observation = vector();

	if ( ! rv?$last_elements )
		return elements;

	Queue::get_vector(rv$last_elements, elements);
	return elements;
	}
# Observation hook for the LAST calculation. Appends each observation to
# the per-result queue, creating that queue on first use with a maximum
# length of r$num_last_elements. Does nothing unless LAST is applied and
# num_last_elements is non-zero.
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
	{
	if ( LAST !in r$apply || r$num_last_elements == 0 )
		return;

	if ( ! rv?$last_elements )
		rv$last_elements = Queue::init([$max_len=r$num_last_elements]);

	Queue::put(rv$last_elements, obs);
	}
# Combines the $last_elements queues of two partial results. When both
# sides carry a queue they are merged with Queue::merge; when only one
# side has a queue, that queue is assigned to the result as-is.
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
	{
	local have_first = rv1?$last_elements;
	local have_second = rv2?$last_elements;

	if ( have_first && have_second )
		result$last_elements = Queue::merge(rv1$last_elements, rv2$last_elements);
	else if ( have_first )
		result$last_elements = rv1$last_elements;
	else if ( have_second )
		result$last_elements = rv2$last_elements;
	}

View file

@ -1,49 +1,120 @@
@load base/frameworks/sumstats/main
@load base/utils/queue
module SumStats;
export {
redef enum Calculation += {
## Get uniformly distributed random samples from the observation stream.
SAMPLE
};
redef record Reducer += {
## A number of sample Observations to collect.
samples: count &default=0;
num_samples: count &default=0;
};
redef record ResultVal += {
## This is the queue where samples are maintained. Use the
## :bro:see:`SumStats::get_samples` function to get a vector of the samples.
samples: Queue::Queue &optional;
};
## This is the vector in which the samples are maintained.
samples: vector of Observation &default=vector();
## Get a vector of sample Observation values from a ResultVal.
global get_samples: function(rv: ResultVal): vector of Observation;
## Number of total observed elements.
sample_elements: count &default=0;
};
}
function get_samples(rv: ResultVal): vector of Observation
redef record ResultVal += {
# Internal use only. This is not meant to be publicly available;
# it is just a copy of num_samples from the Reducer, needed so that
# the value is available in the compose hook.
num_samples: count &default=0;
};
# When the reducer applies SAMPLE, copy the reducer's num_samples onto the
# ResultVal so that the size is available where only ResultVals are at hand.
hook init_resultval_hook(r: Reducer, rv: ResultVal)
{
# NOTE(review): the next four lines look like removed lines of the former
# get_samples() body that the diff view shows interleaved here -- confirm.
local s: vector of Observation = vector();
if ( rv?$samples )
Queue::get_vector(rv$samples, s);
return s;
if ( SAMPLE in r$apply )
rv$num_samples = r$num_samples;
}
# Offers one observation to the sample set kept in rv.
#
# The count of observed elements is always incremented. While fewer than
# rv$num_samples samples are stored, the observation is appended. After
# that, a random index below the number of elements seen so far is drawn,
# and the observation overwrites the sample at that index only if the
# index falls inside the sample vector.
function sample_add_sample(obs:Observation, rv: ResultVal)
	{
	++rv$sample_elements;

	if ( |rv$samples| < rv$num_samples )
		{
		# Still filling up: keep every observation.
		rv$samples[|rv$samples|] = obs;
		return;
		}

	local slot = rand(rv$sample_elements);

	if ( slot < rv$num_samples )
		rv$samples[slot] = obs;
	}
# Observation hook for the SAMPLE calculation: when the reducer applies
# SAMPLE, each observation is handed to sample_add_sample().
# NOTE(review): the r$samples / Queue lines below look like removed lines
# of the former queue-based implementation shown by the diff view -- confirm.
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
{
if ( r$samples > 0 )
if ( SAMPLE in r$apply )
{
if ( ! rv?$samples )
rv$samples = Queue::init([$max_len=r$samples]);
Queue::put(rv$samples, obs);
sample_add_sample(obs, rv);
}
}
# Merges the sample sets of rv1 and rv2 into result.
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
{
# Merge $samples
# NOTE(review): the Queue::merge-based lines directly below look like removed
# lines of the former implementation shown by the diff view -- confirm.
if ( rv1?$samples && rv2?$samples )
result$samples = Queue::merge(rv1$samples, rv2$samples);
else if ( rv1?$samples )
result$samples = rv1$samples;
else if ( rv2?$samples )
result$samples = rv2$samples;
# Both inputs must have been collected with the same sample-set size.
if ( rv1$num_samples != rv2$num_samples )
{
Reporter::error("Merging sample sets with differing sizes is not supported");
return;
}
local num_samples = rv1$num_samples;
result$num_samples = num_samples;
# Neither input may hold more samples than the configured size.
if ( |rv1$samples| > num_samples || |rv2$samples| > num_samples )
{
Reporter::error("Sample vector with too many elements. Aborting.");
return;
}
# Case 1: neither input holds a full set of num_samples samples (larger
# vectors were rejected above).
if ( |rv1$samples| != num_samples && |rv2$samples| < num_samples )
{
# sample_add_sample() appends every observation until the vector is full,
# so a non-full vector has to contain exactly the elements observed.
if ( |rv1$samples| != rv1$sample_elements || |rv2$samples| < rv2$sample_elements )
{
Reporter::error("Mismatch in sample element size and tracking. Aborting merge");
return;
}
# Replay all stored samples into result; sample_add_sample() also keeps
# result$sample_elements up to date.
for ( i in rv1$samples )
sample_add_sample(rv1$samples[i], result);
for ( i in rv2$samples)
sample_add_sample(rv2$samples[i], result);
}
else
{
# Case 2: at least one input is full. Start from a copy of the samples of
# the input that observed more elements and randomly replace individual
# slots with the sample at the same index of the other input.
local other_vector: vector of Observation;
local othercount: count;
if ( rv1$sample_elements > rv2$sample_elements )
{
result$samples = copy(rv1$samples);
other_vector = rv2$samples;
othercount = rv2$sample_elements;
}
else
{
result$samples = copy(rv2$samples);
other_vector = rv1$samples;
othercount = rv1$sample_elements;
}
local totalcount = rv1$sample_elements + rv2$sample_elements;
result$sample_elements = totalcount;
for ( i in other_vector )
{
# NOTE(review): rand(totalcount) presumably yields values in
# [0, totalcount), in which case `<=` replaces a slot with probability
# (othercount+1)/totalcount rather than othercount/totalcount; `<` may
# have been intended -- confirm against the rand() documentation.
if ( rand(totalcount) <= othercount )
result$samples[i] = other_vector[i];
}
}
}

View file

@ -63,7 +63,7 @@ event bro_init() &priority=3
# Add filters to the metrics so that the metrics framework knows how to
# determine when it looks like an actual attack and how to respond when
# thresholds are crossed.
local r1: SumStats::Reducer = [$stream="http.sqli.attacker", $apply=set(SumStats::SUM), $samples=collect_SQLi_samples];
local r1: SumStats::Reducer = [$stream="http.sqli.attacker", $apply=set(SumStats::SUM, SumStats::SAMPLE), $num_samples=collect_SQLi_samples];
SumStats::create([$epoch=sqli_requests_interval,
$reducers=set(r1),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
@ -76,12 +76,12 @@ event bro_init() &priority=3
local r = result["http.sqli.attacker"];
NOTICE([$note=SQL_Injection_Attacker,
$msg="An SQL injection attacker was discovered!",
$email_body_sections=vector(format_sqli_samples(SumStats::get_samples(r))),
$email_body_sections=vector(format_sqli_samples(r$samples)),
$src=key$host,
$identifier=cat(key$host)]);
}]);
local r2: SumStats::Reducer = [$stream="http.sqli.victim", $apply=set(SumStats::SUM), $samples=collect_SQLi_samples];
local r2: SumStats::Reducer = [$stream="http.sqli.victim", $apply=set(SumStats::SUM, SumStats::SAMPLE), $num_samples=collect_SQLi_samples];
SumStats::create([$epoch=sqli_requests_interval,
$reducers=set(r2),
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
@ -94,7 +94,7 @@ event bro_init() &priority=3
local r = result["http.sqli.victim"];
NOTICE([$note=SQL_Injection_Victim,
$msg="An SQL injection victim was discovered!",
$email_body_sections=vector(format_sqli_samples(SumStats::get_samples(r))),
$email_body_sections=vector(format_sqli_samples(r$samples)),
$src=key$host,
$identifier=cat(key$host)]);
}]);

View file

@ -0,0 +1,18 @@
1
1.2.3.4
10.10.10.10
2
2
34
6.5.4.3
7.2.1.5
[num=1, dbl=<uninitialized>, str=<uninitialized>]
[num=2, dbl=<uninitialized>, str=<uninitialized>]
[num=22, dbl=<uninitialized>, str=<uninitialized>]
[num=5, dbl=<uninitialized>, str=<uninitialized>]
[num=5, dbl=<uninitialized>, str=<uninitialized>]
[num=5, dbl=<uninitialized>, str=<uninitialized>]
[num=52, dbl=<uninitialized>, str=<uninitialized>]
[num=91, dbl=<uninitialized>, str=<uninitialized>]
[num=91, dbl=<uninitialized>, str=<uninitialized>]
[num=94, dbl=<uninitialized>, str=<uninitialized>]

View file

@ -0,0 +1,9 @@
6.5.4.3
[[num=2, dbl=<uninitialized>, str=<uninitialized>]]
1
1.2.3.4
[[num=5, dbl=<uninitialized>, str=<uninitialized>], [num=51, dbl=<uninitialized>, str=<uninitialized>]]
20
7.2.1.5
[[num=1, dbl=<uninitialized>, str=<uninitialized>]]
1

View file

@ -0,0 +1,113 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
# @TEST-EXEC: sleep 1
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
# @TEST-EXEC: btest-bg-wait 15
# @TEST-EXEC: cat manager-1/.stdout | sort > out
# @TEST-EXEC: btest-diff out
@TEST-START-FILE cluster-layout.bro
# Local three-node cluster for this test: one manager and two workers,
# all on 127.0.0.1 with distinct TCP ports.
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"],
};
@TEST-END-FILE
redef Log::default_rotation_interval = 0secs;
# NOTE(review): n does not appear to be referenced anywhere else in the
# visible script -- confirm whether it is still needed.
global n = 0;
# Creates a SumStats instance that applies SAMPLE with num_samples=5 to the
# "test" stream. When a 5-second epoch finishes, it prints for every key the
# host, each collected sample and the number of observed elements, and then
# terminates.
event bro_init() &priority=5
	{
	local sample_reducer: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SAMPLE), $num_samples=5];

	SumStats::create([$epoch=5secs,
	                  $reducers=set(sample_reducer),
	                  $epoch_finished(rt: SumStats::ResultTable) =
	                  	{
	                  	for ( key in rt )
	                  		{
	                  		print key$host;

	                  		local res = rt[key]["test"];

	                  		for ( idx in res$samples )
	                  			print res$samples[idx];

	                  		print res$sample_elements;
	                  		}

	                  	terminate();
	                  	}]);
	}
# Shut this node down as soon as a remote peer connection is closed.
event remote_connection_closed(p: event_peer)
{
terminate();
}
# Event raised by the manager once its peers are connected (see the
# manager-only handshake handler); the redef below adds it to the events
# the manager forwards to workers.
global ready_for_data: event();
redef Cluster::manager2worker_events += /^ready_for_data$/;
# Feeds a fixed sequence of observations into the "test" stream. Which
# sequence is used depends on the name of the node handling the event.
# NOTE(review): the expected baseline presumably depends on the exact order
# of these calls together with the random seed -- do not reorder without
# confirming.
event ready_for_data()
{
if ( Cluster::node == "worker-1" )
{
SumStats::observe("test", [$host=1.2.3.4], [$num=5]);
SumStats::observe("test", [$host=1.2.3.4], [$num=22]);
SumStats::observe("test", [$host=1.2.3.4], [$num=94]);
SumStats::observe("test", [$host=1.2.3.4], [$num=50]);
# I checked the random numbers. seems legit.
SumStats::observe("test", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test", [$host=1.2.3.4], [$num=61]);
SumStats::observe("test", [$host=1.2.3.4], [$num=61]);
SumStats::observe("test", [$host=1.2.3.4], [$num=71]);
SumStats::observe("test", [$host=1.2.3.4], [$num=81]);
SumStats::observe("test", [$host=1.2.3.4], [$num=91]);
SumStats::observe("test", [$host=1.2.3.4], [$num=101]);
SumStats::observe("test", [$host=1.2.3.4], [$num=111]);
SumStats::observe("test", [$host=1.2.3.4], [$num=121]);
SumStats::observe("test", [$host=1.2.3.4], [$num=131]);
SumStats::observe("test", [$host=1.2.3.4], [$num=141]);
SumStats::observe("test", [$host=1.2.3.4], [$num=151]);
SumStats::observe("test", [$host=1.2.3.4], [$num=161]);
SumStats::observe("test", [$host=1.2.3.4], [$num=171]);
SumStats::observe("test", [$host=1.2.3.4], [$num=181]);
SumStats::observe("test", [$host=1.2.3.4], [$num=191]);
# Keys with a single observation on this worker.
SumStats::observe("test", [$host=6.5.4.3], [$num=2]);
SumStats::observe("test", [$host=7.2.1.5], [$num=1]);
}
if ( Cluster::node == "worker-2" )
{
SumStats::observe("test", [$host=1.2.3.4], [$num=75]);
SumStats::observe("test", [$host=1.2.3.4], [$num=30]);
SumStats::observe("test", [$host=1.2.3.4], [$num=3]);
SumStats::observe("test", [$host=1.2.3.4], [$num=57]);
SumStats::observe("test", [$host=1.2.3.4], [$num=52]);
SumStats::observe("test", [$host=1.2.3.4], [$num=61]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
SumStats::observe("test", [$host=1.2.3.4], [$num=95]);
# Keys shared with worker-1, plus one key seen only on this worker.
SumStats::observe("test", [$host=6.5.4.3], [$num=5]);
SumStats::observe("test", [$host=7.2.1.5], [$num=91]);
SumStats::observe("test", [$host=10.10.10.10], [$num=5]);
}
}
@if ( Cluster::local_node_type() == Cluster::MANAGER )
# Number of peers whose connection handshake has completed so far.
global peer_count = 0;

# Counts completed handshakes and raises ready_for_data exactly when the
# second peer has finished its handshake.
event remote_connection_handshake_done(p: event_peer) &priority=-5
	{
	++peer_count;

	if ( peer_count != 2 )
		return;

	event ready_for_data();
	}
@endif

View file

@ -0,0 +1,47 @@
# @TEST-EXEC: bro %INPUT
# @TEST-EXEC: btest-diff .stdout
# Standalone sampler test: creates a SumStats instance that applies SAMPLE
# with num_samples=2 to the "test.metric" stream and then feeds it a fixed
# sequence of observations. When the 3-second epoch finishes, the host, the
# sample vector and the number of observed elements are printed per key.
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test.metric",
$apply=set(SumStats::SAMPLE), $num_samples=2];
SumStats::create([$epoch=3secs,
$reducers=set(r1),
$epoch_finished(data: SumStats::ResultTable) =
{
for ( key in data )
{
print key$host;
local r = data[key]["test.metric"];
print r$samples;
print r$sample_elements;
}
}
]);
# Twenty observations for 1.2.3.4, i.e. more than num_samples.
# NOTE(review): the expected output presumably depends on the order of these
# calls together with the random seed -- do not reorder without confirming.
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=22]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=94]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=50]);
# I checked the random numbers. seems legit.
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=51]);
# Keys with a single observation each.
SumStats::observe("test.metric", [$host=6.5.4.3], [$num=2]);
SumStats::observe("test.metric", [$host=7.2.1.5], [$num=1]);
}