mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Fix SumStats "last" plugin in cluster mode
This commit is contained in:
parent
e3612960ee
commit
01e17b5ea0
5 changed files with 97 additions and 4 deletions
4
CHANGES
4
CHANGES
|
@ -1,4 +1,8 @@
|
|||
|
||||
2.6-beta2-83 | 2018-11-08 12:25:21 -0600
|
||||
|
||||
* Fix SumStats "last" plugin in cluster mode (Jon Siwek, Corelight)
|
||||
|
||||
2.6-beta2-82 | 2018-11-08 09:38:52 -0600
|
||||
|
||||
* Remove unnecessary Bloom filter empty check (Matthias Vallentin)
|
||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
|||
2.6-beta2-82
|
||||
2.6-beta2-83
|
||||
|
|
|
@ -17,7 +17,8 @@ export {
|
|||
};
|
||||
|
||||
redef record ResultVal += {
|
||||
## This is the queue where elements are maintained. Use the
|
||||
## This is the queue where elements are maintained.
|
||||
## Don't access this value directly, instead use the
|
||||
## :bro:see:`SumStats::get_last` function to get a vector of
|
||||
## the current element values.
|
||||
last_elements: Queue::Queue &optional;
|
||||
|
@ -29,10 +30,21 @@ export {
|
|||
|
||||
function get_last(rv: ResultVal): vector of Observation
|
||||
{
|
||||
local s: vector of Observation = vector();
|
||||
local s: vector of any = vector();
|
||||
|
||||
if ( rv?$last_elements )
|
||||
Queue::get_vector(rv$last_elements, s);
|
||||
return s;
|
||||
|
||||
local rval: vector of Observation = vector();
|
||||
|
||||
for ( i in s )
|
||||
# When using the cluster-ized version of SumStats, Queue's
|
||||
# internal table storage uses "any" type for values, so we need
|
||||
# to cast them here or else they may be left as Broker::Data from
|
||||
# the unserialization process.
|
||||
rval += s[i] as Observation;
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
hook register_observe_plugins()
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
node up, worker-1
|
||||
test thresh crossed, [[num=0, dbl=<uninitialized>, str=<uninitialized>]]
|
||||
test thresh crossed, [[num=1, dbl=<uninitialized>, str=<uninitialized>]]
|
||||
test thresh crossed, [[num=2, dbl=<uninitialized>, str=<uninitialized>]]
|
||||
test thresh crossed, [[num=3, dbl=<uninitialized>, str=<uninitialized>]]
|
||||
test thresh crossed, [[num=4, dbl=<uninitialized>, str=<uninitialized>]]
|
||||
test thresh crossed, [[num=5, dbl=<uninitialized>, str=<uninitialized>]]
|
||||
test thresh crossed, [[num=6, dbl=<uninitialized>, str=<uninitialized>]]
|
||||
test thresh crossed, [[num=7, dbl=<uninitialized>, str=<uninitialized>]]
|
|
@ -0,0 +1,68 @@
|
|||
# @TEST-SERIALIZE: comm
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
|
||||
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
|
||||
# @TEST-EXEC: btest-bg-wait 25
|
||||
|
||||
# @TEST-EXEC: btest-diff manager-1/.stdout
|
||||
#
|
||||
@TEST-START-FILE cluster-layout.bro
|
||||
redef Cluster::nodes = {
|
||||
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp],
|
||||
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"],
|
||||
};
|
||||
@TEST-END-FILE
|
||||
|
||||
global c = 0;
|
||||
|
||||
event do_observe()
|
||||
{
|
||||
print "do observe", c;
|
||||
SumStats::observe("test",
|
||||
[$str=cat(c)],
|
||||
[$num=c]
|
||||
);
|
||||
++c;
|
||||
schedule 0.1secs { do_observe() };
|
||||
}
|
||||
|
||||
event bro_init()
|
||||
{
|
||||
local r1 = SumStats::Reducer($stream="test",
|
||||
$apply=set(SumStats::LAST),
|
||||
$num_last_elements=1
|
||||
);
|
||||
|
||||
SumStats::create([$name="test",
|
||||
$epoch=10secs,
|
||||
$reducers=set(r1),
|
||||
$threshold_val(key: SumStats::Key, result: SumStats::Result): double = { return 2.0; },
|
||||
$threshold = 1.0,
|
||||
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
|
||||
{
|
||||
local l = SumStats::get_last(result["test"]);
|
||||
print "test thresh crossed", l;
|
||||
|
||||
if ( l[0]$num == 7 )
|
||||
terminate();
|
||||
}
|
||||
]);
|
||||
}
|
||||
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
print "node up", name;
|
||||
|
||||
if ( Cluster::node == "worker-1" && name == "manager-1" )
|
||||
schedule 0.1secs { do_observe() };
|
||||
}
|
||||
|
||||
event Cluster::node_down(name: string, id: string)
|
||||
{
|
||||
print "node down", name;
|
||||
}
|
||||
|
||||
event Broker::peer_lost(endpoint: Broker::EndpointInfo, id: string)
|
||||
{
|
||||
terminate();
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue