diff --git a/scripts/base/frameworks/sumstats/cluster.zeek b/scripts/base/frameworks/sumstats/cluster.zeek index 86125884a5..2296a4e38c 100644 --- a/scripts/base/frameworks/sumstats/cluster.zeek +++ b/scripts/base/frameworks/sumstats/cluster.zeek @@ -272,7 +272,8 @@ event SumStats::finish_epoch(ss: SumStat) } # Schedule the next finish_epoch event. - schedule ss$epoch { SumStats::finish_epoch(ss) }; + if ( ss$epoch != 0secs ) + schedule ss$epoch { SumStats::finish_epoch(ss) }; } # This is unlikely to be called often, but it's here in diff --git a/scripts/base/frameworks/sumstats/main.zeek b/scripts/base/frameworks/sumstats/main.zeek index 3f73d278e5..9e5e3cb4d1 100644 --- a/scripts/base/frameworks/sumstats/main.zeek +++ b/scripts/base/frameworks/sumstats/main.zeek @@ -89,16 +89,20 @@ export { ## is no assurance provided as to where the callbacks ## will be executed on clusters. type SumStat: record { - ## An arbitrary name for the sumstat so that it can + ## An arbitrary name for the sumstat so that it can ## be referred to later. name: string; - - ## The interval at which this filter should be "broken" - ## and the *epoch_result* callback called. The + + ## The interval at which this sumstat should be "broken" + ## and the *epoch_result* callback called. The ## results are also reset at this time so any threshold ## based detection needs to be set to a ## value that should be expected to happen within ## this epoch. + ## + ## Passing an epoch of zero (e.g. ``0 secs``) causes this + ## sumstat to be set to manual epochs. You will have to manually + ## end the epoch by calling :zeek:see:`SumStats::next_epoch`. epoch: interval; ## The reducers for the SumStat. @@ -129,12 +133,12 @@ export { threshold_crossed: function(key: SumStats::Key, result: SumStats::Result) &optional; ## A callback that receives each of the results at the - ## end of the analysis epoch. The function will be + ## end of the analysis epoch. The function will be ## called once for each key. 
epoch_result: function(ts: time, key: SumStats::Key, result: SumStats::Result) &optional; - - ## A callback that will be called when a single collection - ## interval is completed. The *ts* value will be the time of + + ## A callback that will be called when a single collection + ## interval is completed. The *ts* value will be the time of ## when the collection started. epoch_finished: function(ts:time) &optional; }; @@ -156,8 +160,8 @@ export { global observe: function(id: string, key: SumStats::Key, obs: SumStats::Observation); ## Dynamically request a sumstat key. This function should be - ## used sparingly and not as a replacement for the callbacks - ## from the :zeek:see:`SumStats::SumStat` record. The function is only + ## used sparingly and not as a replacement for the callbacks + ## from the :zeek:see:`SumStats::SumStat` record. The function is only ## available for use within "when" statements as an asynchronous ## function. ## @@ -175,6 +179,23 @@ export { ## ## Returns: A string representation of the metric key. global key2str: function(key: SumStats::Key): string; + + ## Manually end the current epoch for a sumstat. Calling this function will + ## cause the end of the epoch processing of sumstats to start. Note that the + ## epoch will not end immediately - especially in a cluster setting, a number + ## of messages need to be exchanged between the cluster nodes. + ## + ## Note that this function can only be called if the sumstat was created with + ## an epoch time of zero (manual epochs). + ## + ## In a cluster, this function must be called on the manager; it will not have + ## any effect when called on workers. + ## + ## ss_name: SumStat name. + ## + ## Returns: true on success, false on failure. Failures can be: sumstat not found, + ## or sumstat not created for manual epochs. + global next_epoch: function(ss_name: string): bool; } # The function prototype for plugins to do calculations. 
@@ -248,6 +269,19 @@ global data_added: function(ss: SumStat, key: Key, result: Result); # framework for clustered or non-clustered usage. global finish_epoch: event(ss: SumStat); +function next_epoch(ss_name: string): bool + { + if ( ss_name !in stats_store ) + return F; + + local ss = stats_store[ss_name]; + if ( ss$epoch != 0secs ) + return F; + + event SumStats::finish_epoch(ss); + return T; + } + function key2str(key: Key): string { local out = ""; @@ -331,7 +365,7 @@ function reset(ss: SumStat) } } -# This could potentially recurse forever, but plugin authors +# This could potentially recurse forever, but plugin authors # should be making sure they aren't causing reflexive dependencies. function add_calc_deps(calcs: vector of Calculation, c: Calculation) { @@ -377,8 +411,8 @@ function create(ss: SumStat) if ( calc in calc_deps ) add_calc_deps(reducer$calc_funcs, calc); - # Don't add this calculation to the vector if - # it was already added by something else as a + # Don't add this calculation to the vector if + # it was already added by something else as a # dependency. local skip_calc=F; for ( j in reducer$calc_funcs ) @@ -396,7 +430,10 @@ function create(ss: SumStat) } reset(ss); - schedule ss$epoch { SumStats::finish_epoch(ss) }; + + # Do not schedule the next epoch if this sumstat uses manual epochs. + if ( ss$epoch != 0secs ) + schedule ss$epoch { SumStats::finish_epoch(ss) }; } function observe(id: string, orig_key: Key, obs: Observation) diff --git a/scripts/base/frameworks/sumstats/non-cluster.zeek b/scripts/base/frameworks/sumstats/non-cluster.zeek index 8146dc50bf..c905d56e37 100644 --- a/scripts/base/frameworks/sumstats/non-cluster.zeek +++ b/scripts/base/frameworks/sumstats/non-cluster.zeek @@ -54,14 +54,15 @@ event SumStats::finish_epoch(ss: SumStat) } } } - + # We can reset here because we know that the reference # to the data will be maintained by the process_epoch_result # event. 
reset(ss); } - schedule ss$epoch { SumStats::finish_epoch(ss) }; + if ( ss$epoch != 0secs ) + schedule ss$epoch { SumStats::finish_epoch(ss) }; } function data_added(ss: SumStat, key: Key, result: Result) diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.manual-epoch-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.manual-epoch-cluster/manager-1..stdout new file mode 100644 index 0000000000..24adb52783 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.manual-epoch-cluster/manager-1..stdout @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +Host: 1.2.3.4 - num:9 - sum:437.0 - avg:48.6 - max:95.0 - min:3.0 - var:758.8 - std_dev:27.5 - unique:8 - hllunique:8 +Host: 10.10.10.10 - num:1 - sum:5.0 - avg:5.0 - max:5.0 - min:5.0 - var:0.0 - std_dev:0.0 - unique:1 - hllunique:1 +Host: 6.5.4.3 - num:2 - sum:6.0 - avg:3.0 - max:5.0 - min:1.0 - var:8.0 - std_dev:2.8 - unique:2 - hllunique:2 +Host: 7.2.1.5 - num:2 - sum:145.0 - avg:72.5 - max:91.0 - min:54.0 - var:684.5 - std_dev:26.2 - unique:2 - hllunique:2 +Performing first epoch, no observations +Performing second epoch with overvations +Sending ready for data +epoch finished, F +epoch finished, T diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.manual-epoch/standalone..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.manual-epoch/standalone..stdout new file mode 100644 index 0000000000..67c235c609 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.manual-epoch/standalone..stdout @@ -0,0 +1,8 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+Performing first epoch, no observations +epoch_finished +Performing second epoch with overvations +Host: 1.2.3.4 - num:5 - sum:221.0 - var:1144.2 - avg:44.2 - max:94.0 - min:5.0 - std_dev:33.8 - unique:4 - hllunique:4 +Host: 6.5.4.3 - num:1 - sum:2.0 - var:0.0 - avg:2.0 - max:2.0 - min:2.0 - std_dev:0.0 - unique:1 - hllunique:1 +Host: 7.2.1.5 - num:1 - sum:1.0 - var:0.0 - avg:1.0 - max:1.0 - min:1.0 - std_dev:0.0 - unique:1 - hllunique:1 +epoch_finished diff --git a/testing/btest/scripts/base/frameworks/sumstats/manual-epoch-cluster.zeek b/testing/btest/scripts/base/frameworks/sumstats/manual-epoch-cluster.zeek new file mode 100644 index 0000000000..92db51858b --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/manual-epoch-cluster.zeek @@ -0,0 +1,123 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 +# +# @TEST-EXEC: btest-bg-run manager-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -b %INPUT +# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT +# @TEST-EXEC: btest-bg-run worker-2 ZEEKPATH=$ZEEKPATH:.. 
CLUSTER_NODE=worker-2 zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 + +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff manager-1/.stdout + +@load base/frameworks/sumstats +@load base/frameworks/cluster + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +global n = 0; +global did_data = F; + +event zeek_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE, SumStats::HLL_UNIQUE)]; + SumStats::create([$name="test", + $epoch=0secs, + $reducers=set(r1), + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + if ( ! 
did_data ) return; + local r = result["test"]; + print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d - hllunique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique, r$hll_unique); + }, + $epoch_finished(ts: time) = + { + print "epoch finished", did_data; + if ( did_data ) + terminate(); + }]); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +global ready_for_data: event(); + +event ready_for_data() + { + if ( Cluster::node == "worker-1" ) + { + SumStats::observe("test", [$host=1.2.3.4], [$num=34]); + SumStats::observe("test", [$host=1.2.3.4], [$num=30]); + SumStats::observe("test", [$host=6.5.4.3], [$num=1]); + SumStats::observe("test", [$host=7.2.1.5], [$num=54]); + } + if ( Cluster::node == "worker-2" ) + { + SumStats::observe("test", [$host=1.2.3.4], [$num=75]); + SumStats::observe("test", [$host=1.2.3.4], [$num=30]); + SumStats::observe("test", [$host=1.2.3.4], [$num=3]); + SumStats::observe("test", [$host=1.2.3.4], [$num=57]); + SumStats::observe("test", [$host=1.2.3.4], [$num=52]); + SumStats::observe("test", [$host=1.2.3.4], [$num=61]); + SumStats::observe("test", [$host=1.2.3.4], [$num=95]); + SumStats::observe("test", [$host=6.5.4.3], [$num=5]); + SumStats::observe("test", [$host=7.2.1.5], [$num=91]); + SumStats::observe("test", [$host=10.10.10.10], [$num=5]); + } + + did_data = T; + } + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +event second_test() + { + print "Performing second epoch with overvations"; + local ret = SumStats::next_epoch("test"); + if ( ! ret ) + print "Return value false"; + } + +event send_ready_for_data() + { + print "Sending ready for data"; + event ready_for_data(); + } + + +event cont_test() + { + print "Performing first epoch, no observations"; + local ret = SumStats::next_epoch("test"); + if ( ! 
ret ) + print "Return value false"; + schedule 5secs { send_ready_for_data() }; + schedule 10secs { second_test() }; + } + +event zeek_init() &priority=100 + { + Broker::auto_publish(Cluster::worker_topic, ready_for_data); + } + +global peer_count = 0; + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + ++peer_count; + + if ( peer_count == 2 ) + event cont_test(); + } + +@endif diff --git a/testing/btest/scripts/base/frameworks/sumstats/manual-epoch.zeek b/testing/btest/scripts/base/frameworks/sumstats/manual-epoch.zeek new file mode 100644 index 0000000000..b99dbc63f6 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/manual-epoch.zeek @@ -0,0 +1,60 @@ +# @TEST-EXEC: btest-bg-run standalone zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 15 +# @TEST-EXEC: btest-diff standalone/.stdout + +@load base/frameworks/sumstats + +redef exit_only_after_terminate=T; + +event second_test() + { + SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]); + SumStats::observe("test.metric", [$host=1.2.3.4], [$num=22]); + SumStats::observe("test.metric", [$host=1.2.3.4], [$num=94]); + SumStats::observe("test.metric", [$host=1.2.3.4], [$num=50]); + SumStats::observe("test.metric", [$host=1.2.3.4], [$num=50]); + + SumStats::observe("test.metric", [$host=6.5.4.3], [$num=2]); + SumStats::observe("test.metric", [$host=7.2.1.5], [$num=1]); + print "Performing second epoch with overvations"; + local ret = SumStats::next_epoch("test"); + if ( ! ret ) + print "Return value false"; + } + +event cont_test() + { + print "Performing first epoch, no observations"; + local ret = SumStats::next_epoch("test"); + if ( ! 
ret ) + print "Return value false"; + schedule 2secs { second_test() }; + } + +event zeek_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test.metric", + $apply=set(SumStats::SUM, + SumStats::VARIANCE, + SumStats::AVERAGE, + SumStats::MAX, + SumStats::MIN, + SumStats::STD_DEV, + SumStats::UNIQUE, + SumStats::HLL_UNIQUE)]; + SumStats::create([$name="test", + $epoch=0secs, + $reducers=set(r1), + $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = + { + local r = result["test.metric"]; + print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d - hllunique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique, r$hll_unique); + terminate(); + }, + $epoch_finished(ts: time) = + { + print "epoch_finished"; + }]); + + schedule 1secs { cont_test() }; + }