mirror of
https://github.com/zeek/zeek.git
synced 2025-10-15 04:58:21 +00:00
Merge remote-tracking branch 'origin/topic/seth/sumstats-updates'
* origin/topic/seth/sumstats-updates: Still fixing bugs in sumstats updated api cluster support. Hopefully fix the SumStats cluster support. Fix the SumStats top-k plugin and test. Updates for SumStats API to deal with high memory stats. Beginning rework of SumStats API. Tiny fix to account for missing str field (not sure how this happens yet) Add server samples to SSH bruteforce detection. Fix a reporter message in sumstats. SumStats changes to how thresholding works to simplify and reduce memory use. More adjustments to try and correct SumStats memory use. Hopefully fixing a strange error. Large update for the SumStats framework.
This commit is contained in:
commit
56de65461e
48 changed files with 1196 additions and 584 deletions
|
@ -23,16 +23,16 @@ global n = 0;
|
|||
event bro_init() &priority=5
|
||||
{
|
||||
local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)];
|
||||
SumStats::create([$epoch=5secs,
|
||||
SumStats::create([$name="test",
|
||||
$epoch=5secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_finished(rt: SumStats::ResultTable) =
|
||||
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local r = result["test"];
|
||||
print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique);
|
||||
},
|
||||
$epoch_finished(ts: time) =
|
||||
{
|
||||
for ( key in rt )
|
||||
{
|
||||
local r = rt[key]["test"];
|
||||
print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique);
|
||||
}
|
||||
|
||||
terminate();
|
||||
}]);
|
||||
}
|
||||
|
|
|
@ -11,16 +11,14 @@ event bro_init() &priority=5
|
|||
SumStats::MIN,
|
||||
SumStats::STD_DEV,
|
||||
SumStats::UNIQUE)];
|
||||
SumStats::create([$epoch=3secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_finished(data: SumStats::ResultTable) =
|
||||
{
|
||||
for ( key in data )
|
||||
{
|
||||
local r = data[key]["test.metric"];
|
||||
print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique);
|
||||
}
|
||||
}
|
||||
SumStats::create([$name="test",
|
||||
$epoch=3secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local r = result["test.metric"];
|
||||
print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique);
|
||||
}
|
||||
]);
|
||||
|
||||
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]);
|
||||
|
|
|
@ -20,20 +20,23 @@ redef Log::default_rotation_interval = 0secs;
|
|||
event bro_init() &priority=5
|
||||
{
|
||||
local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
|
||||
SumStats::create([$epoch=10secs,
|
||||
SumStats::create([$name="test",
|
||||
$epoch=10secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_finished(data: SumStats::ResultTable) =
|
||||
{
|
||||
print "End of epoch handler was called";
|
||||
for ( res in data )
|
||||
print data[res]["test.metric"]$sum;
|
||||
terminate();
|
||||
},
|
||||
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
print result["test.metric"]$sum;
|
||||
},
|
||||
$epoch_finished(ts: time) =
|
||||
{
|
||||
print "End of epoch handler was called";
|
||||
terminate();
|
||||
},
|
||||
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
return double_to_count(result["test.metric"]$sum);
|
||||
return result["test.metric"]$sum;
|
||||
},
|
||||
$threshold=100,
|
||||
$threshold=100.0,
|
||||
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum);
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
# @TEST-SERIALIZE: comm
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
|
||||
# @TEST-EXEC: sleep 1
|
||||
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
|
||||
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
|
||||
# @TEST-EXEC: btest-bg-wait 15
|
||||
|
||||
# @TEST-EXEC: btest-diff manager-1/.stdout
|
||||
|
||||
@TEST-START-FILE cluster-layout.bro
|
||||
redef Cluster::nodes = {
|
||||
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")],
|
||||
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"],
|
||||
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"],
|
||||
};
|
||||
@TEST-END-FILE
|
||||
|
||||
redef Log::default_rotation_interval = 0secs;
|
||||
|
||||
global n = 0;
|
||||
|
||||
event bro_init() &priority=5
|
||||
{
|
||||
local r1 = SumStats::Reducer($stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE));
|
||||
SumStats::create([$name="test sumstat",
|
||||
$epoch=1hr,
|
||||
$reducers=set(r1)]);
|
||||
}
|
||||
|
||||
event remote_connection_closed(p: event_peer)
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
|
||||
global ready_for_data: event();
|
||||
redef Cluster::manager2worker_events += /^ready_for_data$/;
|
||||
|
||||
event ready_for_data()
|
||||
{
|
||||
if ( Cluster::node == "worker-1" )
|
||||
{
|
||||
SumStats::observe("test", [$host=1.2.3.4], [$num=34]);
|
||||
SumStats::observe("test", [$host=1.2.3.4], [$num=30]);
|
||||
SumStats::observe("test", [$host=6.5.4.3], [$num=1]);
|
||||
SumStats::observe("test", [$host=7.2.1.5], [$num=54]);
|
||||
}
|
||||
if ( Cluster::node == "worker-2" )
|
||||
{
|
||||
SumStats::observe("test", [$host=1.2.3.4], [$num=75]);
|
||||
SumStats::observe("test", [$host=1.2.3.4], [$num=30]);
|
||||
SumStats::observe("test", [$host=7.2.1.5], [$num=91]);
|
||||
SumStats::observe("test", [$host=10.10.10.10], [$num=5]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
event on_demand2()
|
||||
{
|
||||
local host = 7.2.1.5;
|
||||
when ( local result = SumStats::request_key("test sumstat", [$host=host]) )
|
||||
{
|
||||
print "SumStat key request";
|
||||
if ( "test" in result )
|
||||
print fmt(" Host: %s -> %.0f", host, result["test"]$sum);
|
||||
terminate();
|
||||
}
|
||||
}
|
||||
|
||||
event on_demand()
|
||||
{
|
||||
#when ( local results = SumStats::request("test sumstat") )
|
||||
# {
|
||||
# print "Complete SumStat request";
|
||||
# print fmt(" Host: %s -> %.0f", 6.5.4.3, results[[$host=6.5.4.3]]["test"]$sum);
|
||||
# print fmt(" Host: %s -> %.0f", 10.10.10.10, results[[$host=10.10.10.10]]["test"]$sum);
|
||||
# print fmt(" Host: %s -> %.0f", 1.2.3.4, results[[$host=1.2.3.4]]["test"]$sum);
|
||||
# print fmt(" Host: %s -> %.0f", 7.2.1.5, results[[$host=7.2.1.5]]["test"]$sum);
|
||||
|
||||
event on_demand2();
|
||||
# }
|
||||
}
|
||||
|
||||
global peer_count = 0;
|
||||
event remote_connection_handshake_done(p: event_peer) &priority=-5
|
||||
{
|
||||
++peer_count;
|
||||
if ( peer_count == 2 )
|
||||
{
|
||||
if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||
event ready_for_data();
|
||||
|
||||
schedule 1sec { on_demand() };
|
||||
}
|
||||
}
|
||||
|
46
testing/btest/scripts/base/frameworks/sumstats/on-demand.bro
Normal file
46
testing/btest/scripts/base/frameworks/sumstats/on-demand.bro
Normal file
|
@ -0,0 +1,46 @@
|
|||
# @TEST-EXEC: bro %INPUT
|
||||
# @TEST-EXEC: btest-diff .stdout
|
||||
|
||||
redef exit_only_after_terminate=T;
|
||||
|
||||
|
||||
## Requesting a full sumstats resulttable is not supported yet.
|
||||
#event on_demand()
|
||||
# {
|
||||
# when ( local results = SumStats::request("test") )
|
||||
# {
|
||||
# print "Complete SumStat request";
|
||||
# for ( key in results )
|
||||
# {
|
||||
# print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum);
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
|
||||
event on_demand_key()
|
||||
{
|
||||
local host = 1.2.3.4;
|
||||
when ( local result = SumStats::request_key("test", [$host=host]) )
|
||||
{
|
||||
print fmt("Key request for %s", host);
|
||||
print fmt(" Host: %s -> %.0f", host, result["test.reducer"]$sum);
|
||||
terminate();
|
||||
}
|
||||
}
|
||||
|
||||
event bro_init() &priority=5
|
||||
{
|
||||
local r1: SumStats::Reducer = [$stream="test.reducer",
|
||||
$apply=set(SumStats::SUM)];
|
||||
SumStats::create([$name="test",
|
||||
$epoch=1hr,
|
||||
$reducers=set(r1)]);
|
||||
|
||||
# Seed some data but notice there are no callbacks defined in the sumstat!
|
||||
SumStats::observe("test.reducer", [$host=1.2.3.4], [$num=42]);
|
||||
SumStats::observe("test.reducer", [$host=4.3.2.1], [$num=7]);
|
||||
|
||||
#schedule 0.1 secs { on_demand() };
|
||||
schedule 1 secs { on_demand_key() };
|
||||
}
|
||||
|
|
@ -5,8 +5,7 @@
|
|||
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
|
||||
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
|
||||
# @TEST-EXEC: btest-bg-wait 15
|
||||
# @TEST-EXEC: cat manager-1/.stdout | sort > out
|
||||
# @TEST-EXEC: btest-diff out
|
||||
# @TEST-EXEC: btest-diff manager-1/.stdout
|
||||
|
||||
@TEST-START-FILE cluster-layout.bro
|
||||
redef Cluster::nodes = {
|
||||
|
@ -18,25 +17,24 @@ redef Cluster::nodes = {
|
|||
|
||||
redef Log::default_rotation_interval = 0secs;
|
||||
|
||||
global n = 0;
|
||||
|
||||
event bro_init() &priority=5
|
||||
{
|
||||
local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SAMPLE), $num_samples=5];
|
||||
SumStats::create([$epoch=5secs,
|
||||
SumStats::create([$name="test",
|
||||
$epoch=5secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_finished(rt: SumStats::ResultTable) =
|
||||
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
for ( key in rt )
|
||||
{
|
||||
print key$host;
|
||||
local r = rt[key]["test"];
|
||||
for ( sample in r$samples ) {
|
||||
print r$samples[sample];
|
||||
}
|
||||
print r$sample_elements;
|
||||
}
|
||||
local r = result["test"];
|
||||
print fmt("Host: %s Sampled observations: %d", key$host, r$sample_elements);
|
||||
local sample_nums: vector of count = vector();
|
||||
for ( sample in r$samples )
|
||||
sample_nums[|sample_nums|] =r$samples[sample]$num;
|
||||
|
||||
print fmt(" %s", sort(sample_nums));
|
||||
},
|
||||
$epoch_finished(ts: time) =
|
||||
{
|
||||
terminate();
|
||||
}]);
|
||||
}
|
||||
|
|
|
@ -5,19 +5,16 @@ event bro_init() &priority=5
|
|||
{
|
||||
local r1: SumStats::Reducer = [$stream="test.metric",
|
||||
$apply=set(SumStats::SAMPLE), $num_samples=2];
|
||||
SumStats::create([$epoch=3secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_finished(data: SumStats::ResultTable) =
|
||||
{
|
||||
for ( key in data )
|
||||
{
|
||||
print key$host;
|
||||
local r = data[key]["test.metric"];
|
||||
print r$samples;
|
||||
print r$sample_elements;
|
||||
}
|
||||
}
|
||||
]);
|
||||
SumStats::create([$name="test",
|
||||
$epoch=3secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
print key$host;
|
||||
local r = result["test.metric"];
|
||||
print r$samples;
|
||||
print r$sample_elements;
|
||||
}]);
|
||||
|
||||
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]);
|
||||
SumStats::observe("test.metric", [$host=1.2.3.4], [$num=22]);
|
||||
|
|
|
@ -8,14 +8,15 @@ redef enum Notice::Type += {
|
|||
event bro_init() &priority=5
|
||||
{
|
||||
local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
|
||||
SumStats::create([$epoch=3secs,
|
||||
SumStats::create([$name="test1",
|
||||
$epoch=3secs,
|
||||
$reducers=set(r1),
|
||||
#$threshold_val = SumStats::sum_threshold("test.metric"),
|
||||
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
return double_to_count(result["test.metric"]$sum);
|
||||
return result["test.metric"]$sum;
|
||||
},
|
||||
$threshold=5,
|
||||
$threshold=5.0,
|
||||
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local r = result["test.metric"];
|
||||
|
@ -24,14 +25,15 @@ event bro_init() &priority=5
|
|||
]);
|
||||
|
||||
local r2: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
|
||||
SumStats::create([$epoch=3secs,
|
||||
SumStats::create([$name="test2",
|
||||
$epoch=3secs,
|
||||
$reducers=set(r2),
|
||||
#$threshold_val = SumStats::sum_threshold("test.metric"),
|
||||
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
return double_to_count(result["test.metric"]$sum);
|
||||
return result["test.metric"]$sum;
|
||||
},
|
||||
$threshold_series=vector(3,6,800),
|
||||
$threshold_series=vector(3.0,6.0,800.0),
|
||||
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local r = result["test.metric"];
|
||||
|
@ -41,19 +43,20 @@ event bro_init() &priority=5
|
|||
|
||||
local r3: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
|
||||
local r4: SumStats::Reducer = [$stream="test.metric2", $apply=set(SumStats::SUM)];
|
||||
SumStats::create([$epoch=3secs,
|
||||
SumStats::create([$name="test3",
|
||||
$epoch=3secs,
|
||||
$reducers=set(r3, r4),
|
||||
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
# Calculate a ratio between sums of two reducers.
|
||||
if ( "test.metric2" in result && "test.metric" in result &&
|
||||
result["test.metric"]$sum > 0 )
|
||||
return double_to_count(result["test.metric2"]$sum / result["test.metric"]$sum);
|
||||
return result["test.metric2"]$sum / result["test.metric"]$sum;
|
||||
else
|
||||
return 0;
|
||||
return 0.0;
|
||||
},
|
||||
# Looking for metric2 sum to be 5 times the sum of metric
|
||||
$threshold=5,
|
||||
$threshold=5.0,
|
||||
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local thold = result["test.metric2"]$sum / result["test.metric"]$sum;
|
||||
|
|
|
@ -23,27 +23,24 @@ event bro_init() &priority=5
|
|||
{
|
||||
local r1: SumStats::Reducer = [$stream="test.metric",
|
||||
$apply=set(SumStats::TOPK)];
|
||||
SumStats::create([$epoch=5secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_finished(data: SumStats::ResultTable) =
|
||||
{
|
||||
for ( key in data )
|
||||
{
|
||||
local r = data[key]["test.metric"];
|
||||
|
||||
local s: vector of SumStats::Observation;
|
||||
s = topk_get_top(r$topk, 5);
|
||||
|
||||
print fmt("Top entries for key %s", key$str);
|
||||
for ( element in s )
|
||||
{
|
||||
print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element]));
|
||||
}
|
||||
|
||||
terminate();
|
||||
}
|
||||
}
|
||||
]);
|
||||
SumStats::create([$name="topk-test",
|
||||
$epoch=5secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local r = result["test.metric"];
|
||||
local s: vector of SumStats::Observation;
|
||||
s = topk_get_top(r$topk, 5);
|
||||
print fmt("Top entries for key %s", key$str);
|
||||
for ( element in s )
|
||||
{
|
||||
print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element]));
|
||||
}
|
||||
},
|
||||
$epoch_finished(ts: time) =
|
||||
{
|
||||
terminate();
|
||||
}]);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -5,26 +5,21 @@ event bro_init() &priority=5
|
|||
{
|
||||
local r1: SumStats::Reducer = [$stream="test.metric",
|
||||
$apply=set(SumStats::TOPK)];
|
||||
SumStats::create([$epoch=3secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_finished(data: SumStats::ResultTable) =
|
||||
{
|
||||
for ( key in data )
|
||||
{
|
||||
local r = data[key]["test.metric"];
|
||||
|
||||
local s: vector of SumStats::Observation;
|
||||
s = topk_get_top(r$topk, 5);
|
||||
|
||||
print fmt("Top entries for key %s", key$str);
|
||||
for ( element in s )
|
||||
{
|
||||
print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element]));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
]);
|
||||
SumStats::create([$name="topk-test",
|
||||
$epoch=3secs,
|
||||
$reducers=set(r1),
|
||||
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local r = result["test.metric"];
|
||||
local s: vector of SumStats::Observation;
|
||||
s = topk_get_top(r$topk, 5);
|
||||
|
||||
print fmt("Top entries for key %s", key$str);
|
||||
for ( element in s )
|
||||
{
|
||||
print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element]));
|
||||
}
|
||||
}]);
|
||||
|
||||
|
||||
const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue