diff --git a/scripts/base/frameworks/metrics/cluster.bro b/scripts/base/frameworks/metrics/cluster.bro index 94281eb883..91efa98996 100644 --- a/scripts/base/frameworks/metrics/cluster.bro +++ b/scripts/base/frameworks/metrics/cluster.bro @@ -3,39 +3,66 @@ ##! and will be depending on if the cluster framework has been enabled. ##! The goal of this script is to make metric calculation completely and ##! transparently automated when running on a cluster. +##! +##! Events defined here are not exported deliberately because they are meant +##! to be an internal implementation detail. @load base/frameworks/cluster module Metrics; export { - ## This event is sent by the manager in a cluster to initiate the 3 - ## collection of metrics values - global cluster_collect: event(uid: string, id: ID, filter_name: string); - - ## This event is sent by nodes that are collecting metrics after receiving - ## a request for the metric filter from the manager. - global cluster_results: event(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool); - - ## This event is used internally by workers to send result chunks. - global send_data: event(uid: string, id: ID, filter_name: string, data: MetricTable); - ## This value allows a user to decide how large of result groups the ## workers should transmit values. const cluster_send_in_groups_of = 50 &redef; + + ## This is the percent of the full threshold value that needs to be met + ## on a single worker for that worker to send the value to its manager in + ## order for it to request a global view for that value. There is no + ## requirement that the manager requests a global view for the index + ## since it may opt not to if it requested a global view for the index + ## recently. + const cluster_request_global_view_percent = 0.1 &redef; } +## This event is sent by the manager in a cluster to initiate the +## collection of metrics values for a filter. 
+global cluster_filter_request: event(uid: string, id: ID, filter_name: string); + +## This event is sent by nodes that are collecting metrics after receiving +## a request for the metric filter from the manager. +global cluster_filter_response: event(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool); + +## This event is sent by the manager in a cluster to initiate the +## collection of a single index value from a filter. It's typically +## used to get intermediate updates before the break interval triggers +## to speed detection of a value crossing a threshold. +global cluster_index_request: event(uid: string, id: ID, filter_name: string, index: Index); + +## This event is sent by nodes in response to a +## :bro:id:`cluster_index_request` event. +global cluster_index_response: event(uid: string, id: ID, filter_name: string, index: Index, val: count); + +## This is sent by workers to indicate that they crossed the percent of the +## current threshold by the percentage defined globally in +## :bro:id:`cluster_request_global_view_percent` +global cluster_index_intermediate_response: event(id: Metrics::ID, filter_name: string, index: Metrics::Index, val: count); + +## This event is scheduled internally on workers to send result chunks. +global send_data: event(uid: string, id: ID, filter_name: string, data: MetricTable); + # This is maintained by managers so they can know what data they requested and # when they requested it. global requested_results: table[string] of time = table() &create_expire=5mins; -# TODO: Both of the next variables make the assumption that a value never +# TODO: The next 4 variables make the assumption that a value never # takes longer than 5 minutes to transmit from workers to manager. This needs to # be tunable or self-tuning. These should also be restructured to be # maintained within a single variable. + # This variable is maintained by manager nodes as they collect and aggregate # results. 
-global collecting_results: table[string, ID, string] of MetricTable &create_expire=5mins; +global filter_results: table[string, ID, string] of MetricTable &create_expire=5mins; # This variable is maintained by manager nodes to track how many "dones" they # collected per collection unique id. Once the number of results for a uid @@ -44,28 +71,47 @@ global collecting_results: table[string, ID, string] of MetricTable &create_expi # TODO: add an &expire_func in case not all results are received. global done_with: table[string] of count &create_expire=5mins &default=0; +# This variable is maintained by managers to track intermediate responses as +# they are getting a global view for a certain index. +global index_requests: table[string, ID, string, Index] of count &create_expire=5mins &default=0; + +# This variable is maintained by all hosts for different purposes. Non-managers +# maintain it to know what indexes they have recently sent as intermediate +# updates so they don't overwhelm their manager. Managers maintain it so they +# don't overwhelm workers with intermediate index requests. The count that is +# yielded is the number of times the percentage threshold has been crossed and +# an intermediate result has been received. The manager may optionally request +# the index again before data expires from here if too many workers are crossing +# the percentage threshold (not implemented yet!). +global recent_global_view_indexes: table[ID, string, Index] of count &create_expire=5mins &default=0; + # Add events to the cluster framework to make this work. -redef Cluster::manager_events += /Metrics::cluster_collect/; -redef Cluster::worker_events += /Metrics::cluster_results/; +redef Cluster::manager_events += /Metrics::cluster_(filter_request|index_request)/; +redef Cluster::worker_events += /Metrics::cluster_(filter_response|index_response|index_intermediate_response)/; -# The metrics collection process can only be done by a manager. 
-@if ( Cluster::local_node_type() == Cluster::MANAGER ) -event Metrics::log_it(filter: Filter) +@if ( Cluster::local_node_type() != Cluster::MANAGER ) +# This is done on all non-manager node types in the event that a metric is +# being collected somewhere other than a worker. +function data_added(filter: Filter, index: Index, val: count) { - local uid = unique_id(""); + # If an intermediate update for this value was sent recently, don't send + # it again. + if ( [filter$id, filter$name, index] in recent_global_view_indexes ) + return; + + # If val is 5 and global view % is 0.1 (10%), pct_val will be 50. If that + # crosses the full threshold then it's a candidate to send as an + # intermediate update. + local pct_val = double_to_count(val / cluster_request_global_view_percent); - # Set some tracking variables. - requested_results[uid] = network_time(); - collecting_results[uid, filter$id, filter$name] = table(); - - # Request data from peers. - event Metrics::cluster_collect(uid, filter$id, filter$name); - # Schedule the log_it event for the next break period. - schedule filter$break_interval { Metrics::log_it(filter) }; + if ( check_notice(filter, index, pct_val) ) + { + # kick off intermediate update + event Metrics::cluster_index_intermediate_response(filter$id, filter$name, index, val); + + ++recent_global_view_indexes[filter$id, filter$name, index]; + } } -@endif - -@if ( Cluster::local_node_type() == Cluster::WORKER ) event Metrics::send_data(uid: string, id: ID, filter_name: string, data: MetricTable) { @@ -89,31 +135,99 @@ event Metrics::send_data(uid: string, id: ID, filter_name: string, data: MetricT if ( |data| == 0 ) done = T; - event Metrics::cluster_results(uid, id, filter_name, local_data, done); + event Metrics::cluster_filter_response(uid, id, filter_name, local_data, done); if ( ! 
done ) event Metrics::send_data(uid, id, filter_name, data); } -event Metrics::cluster_collect(uid: string, id: ID, filter_name: string) +event Metrics::cluster_filter_request(uid: string, id: ID, filter_name: string) { - #print fmt("WORKER %s: received the cluster_collect event.", Cluster::node); + #print fmt("WORKER %s: received the cluster_filter_request event.", Cluster::node); + # Initiate sending all of the data for the requested filter. event Metrics::send_data(uid, id, filter_name, store[id, filter_name]); - + # Lookup the actual filter and reset it, the reference to the data # currently stored will be maintained internally by the send_data event. reset(filter_store[id, filter_name]); } + +event Metrics::cluster_index_request(uid: string, id: ID, filter_name: string, index: Index) + { + local val=0; + if ( index in store[id, filter_name] ) + val = store[id, filter_name][index]; + + # fmt("WORKER %s: received the cluster_index_request event for %s=%d.", Cluster::node, index2str(index), val); + event Metrics::cluster_index_response(uid, id, filter_name, index, val); + } + @endif @if ( Cluster::local_node_type() == Cluster::MANAGER ) -event Metrics::cluster_results(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool) +# Managers handle logging. +event Metrics::log_it(filter: Filter) + { + #print fmt("%.6f MANAGER: breaking %s filter for %s metric", network_time(), filter$name, filter$id); + + local uid = unique_id(""); + + # Set some tracking variables. + requested_results[uid] = network_time(); + filter_results[uid, filter$id, filter$name] = table(); + + # Request data from peers. + event Metrics::cluster_filter_request(uid, filter$id, filter$name); + # Schedule the log_it event for the next break period. + schedule filter$break_interval { Metrics::log_it(filter) }; + } + +# This is unlikely to be called often, but it's here in case there are metrics +# being collected by managers. 
+function data_added(filter: Filter, index: Index, val: count) + { + if ( check_notice(filter, index, val) ) + do_notice(filter, index, val); + } + +event Metrics::cluster_index_response(uid: string, id: ID, filter_name: string, index: Index, val: count) + { + #print fmt("%0.6f MANAGER: receiving index data from %s", network_time(), get_event_peer()$descr); + + if ( [uid, id, filter_name, index] !in index_requests ) + index_requests[uid, id, filter_name, index] = 0; + + index_requests[uid, id, filter_name, index] += val; + local ir = index_requests[uid, id, filter_name, index]; + + ++done_with[uid]; + if ( Cluster::worker_count == done_with[uid] ) + { + if ( check_notice(filter_store[id, filter_name], index, ir) ) + do_notice(filter_store[id, filter_name], index, ir); + delete done_with[uid]; + delete index_requests[uid, id, filter_name, index]; + } + } + +# Managers handle intermediate updates here. +event Metrics::cluster_index_intermediate_response(id: ID, filter_name: string, index: Index, val: count) + { + #print fmt("MANAGER: receiving intermediate index data from %s", get_event_peer()$descr); + #print fmt("MANAGER: requesting index data for %s", index2str(index)); + + local uid = unique_id(""); + event Metrics::cluster_index_request(uid, id, filter_name, index); + ++recent_global_view_indexes[id, filter_name, index]; + } + +event Metrics::cluster_filter_response(uid: string, id: ID, filter_name: string, data: MetricTable, done: bool) { #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr); - local local_data = collecting_results[uid, id, filter_name]; + local local_data = filter_results[uid, id, filter_name]; for ( index in data ) { if ( index !in local_data ) @@ -131,15 +245,16 @@ event Metrics::cluster_results(uid: string, id: ID, filter_name: string, data: M local ts = network_time(); # Log the time this was initially requested if it's available. 
if ( uid in requested_results ) + { ts = requested_results[uid]; - - write_log(ts, filter_store[id, filter_name], local_data); - if ( [uid, id, filter_name] in collecting_results ) - delete collecting_results[uid, id, filter_name]; - if ( uid in done_with ) - delete done_with[uid]; - if ( uid in requested_results ) delete requested_results[uid]; + } + + write_log(ts, filter_store[id, filter_name], local_data); + + # Clean up + delete filter_results[uid, id, filter_name]; + delete done_with[uid]; } } diff --git a/scripts/base/frameworks/metrics/main.bro b/scripts/base/frameworks/metrics/main.bro index e488877bc1..2dd9e19b03 100644 --- a/scripts/base/frameworks/metrics/main.bro +++ b/scripts/base/frameworks/metrics/main.bro @@ -63,20 +63,28 @@ export { ## This is essentially a mapping table between addresses and subnets. aggregation_table: table[subnet] of subnet &optional; ## The interval at which the metric should be "broken" and written - ## to the logging stream. + ## to the logging stream. The counters are also reset to zero at + ## this time so any threshold based detection needs to be set to a + ## number that should be expected to happen within this period. break_interval: interval &default=default_break_interval; ## This determines if the result of this filter is sent to the metrics ## logging stream. One use for the logging framework is as an internal ## thresholding and statistics gathering utility that is meant to ## never log but rather to generate notices and derive data. log: bool &default=T; + ## If this and a $notice_threshold value are set, this notice type + ## will be generated by the metrics framework. + note: Notice::Type &optional; ## A straight threshold for generating a notice. notice_threshold: count &optional; ## A series of thresholds at which to generate notices. notice_thresholds: vector of count &optional; - ## If this and a $notice_threshold value are set, this notice type - ## will be generated by the metrics framework. 
- note: Notice::Type &optional; + ## How often this notice should be raised for this metric index. It + ## will be generated every time it crosses a threshold, but if the + ## $break_interval is set to 5mins and this is set to 1hr the notice + ## will only be generated once per hour even if something crosses the + ## threshold in every break interval. + notice_freq: interval &optional; }; global add_filter: function(id: ID, filter: Filter); @@ -99,7 +107,16 @@ global filter_store: table[ID, string] of Filter = table(); type MetricTable: table[Index] of count &default=0; # This is indexed by metric ID and stream filter name. -global store: table[ID, string] of MetricTable = table(); +global store: table[ID, string] of MetricTable = table() &default=table(); + +# This function checks if a threshold has been crossed and generates a +# notice if it has. It is also used as a method to implement +# mid-break-interval threshold crossing detection for cluster deployments. +global check_notice: function(filter: Filter, index: Index, val: count): bool; + +# This is a hook for watching thresholds being crossed. It is called whenever +# index values are updated and the new val is given as the `val` argument. +global data_added: function(filter: Filter, index: Index, val: count); # This stores the current threshold index for filters using the # $notice_threshold and $notice_thresholds elements. @@ -121,7 +138,7 @@ function index2str(index: Index): string out = fmt("%s%sstr=%s", out, |out|==0 ? 
"" : ", ", index$str); return fmt("metric_index(%s)", out); } - + function write_log(ts: time, filter: Filter, data: MetricTable) { for ( index in data ) @@ -132,29 +149,6 @@ function write_log(ts: time, filter: Filter, data: MetricTable) $filter_name=filter$name, $index=index, $value=val]; - - if ( (filter?$notice_threshold && - m$value >= filter$notice_threshold && - [filter$id, filter$name, index] !in thresholds) || - (filter?$notice_thresholds && - |filter$notice_thresholds| <= thresholds[filter$id, filter$name, index] && - m$value >= filter$notice_thresholds[thresholds[filter$id, filter$name, index]]) ) - { - local n: Notice::Info = [$note=filter$note, $n=m$value, $metric_index=index]; - n$msg = fmt("Metrics threshold crossed by %s %d/%d", index2str(index), m$value, filter$notice_threshold); - if ( m$index?$str ) - n$sub = m$index$str; - if ( m$index?$host ) - n$src = m$index$host; - # TODO: not sure where to put the network yet. - - NOTICE(n); - - # This just needs set to some value so that it doesn't refire the - # notice until it expires from the table or it cross the next - # threshold in the case of vectors of thesholds. - ++thresholds[filter$id, filter$name, index]; - } if ( filter$log ) Log::write(METRICS, m); @@ -205,15 +199,14 @@ function add_data(id: ID, index: Index, increment: count) local filters = metric_filters[id]; - # Add the data to any of the defined filters. + # Try to add the data to all of the defined filters for the metric. for ( filter_id in filters ) { local filter = filters[filter_id]; # If this filter has a predicate, run the predicate and skip this # index if the predicate return false. - if ( filter?$pred && - ! filter$pred(index) ) + if ( filter?$pred && ! 
filter$pred(index) ) next; if ( index?$host ) @@ -229,10 +222,49 @@ function add_data(id: ID, index: Index, increment: count) delete index$host; } } - + local metric_tbl = store[id, filter$name]; if ( index !in metric_tbl ) metric_tbl[index] = 0; metric_tbl[index] += increment; + + data_added(filter, index, metric_tbl[index]); } } + +function check_notice(filter: Filter, index: Index, val: count): bool + { + if ( (filter?$notice_threshold && + [filter$id, filter$name, index] !in thresholds && + val >= filter$notice_threshold) || + (filter?$notice_thresholds && + |filter$notice_thresholds| <= thresholds[filter$id, filter$name, index] && + val >= filter$notice_thresholds[thresholds[filter$id, filter$name, index]]) ) + return T; + else + return F; + } + +function do_notice(filter: Filter, index: Index, val: count) + { + # We include $peer_descr here because a manager could have actually + # generated the notice even though the current remote peer for the event + # calling this could be a worker if this is running as a cluster. + local n: Notice::Info = [$note=filter$note, + $n=val, + $metric_index=index, + $peer_descr=peer_description]; + n$msg = fmt("Threshold crossed by %s %d/%d", index2str(index), val, filter$notice_threshold); + if ( index?$str ) + n$sub = index$str; + if ( index?$host ) + n$src = index$host; + # TODO: not sure where to put the network yet. + + NOTICE(n); + + # This just needs to be set to some value so that it doesn't refire the + # notice until it expires from the table or it crosses the next + # threshold in the case of vectors of thresholds. 
+ ++thresholds[filter$id, filter$name, index]; + } diff --git a/scripts/base/frameworks/metrics/non-cluster.bro b/scripts/base/frameworks/metrics/non-cluster.bro index 4e6d1e3d65..a467ebf714 100644 --- a/scripts/base/frameworks/metrics/non-cluster.bro +++ b/scripts/base/frameworks/metrics/non-cluster.bro @@ -11,3 +11,10 @@ event Metrics::log_it(filter: Filter) schedule filter$break_interval { Metrics::log_it(filter) }; } + + +function data_added(filter: Filter, index: Index, val: count) + { + if ( check_notice(filter, index, val) ) + do_notice(filter, index, val); + } \ No newline at end of file diff --git a/scripts/base/frameworks/notice/main.bro b/scripts/base/frameworks/notice/main.bro index 595851b7c5..ea7a472031 100644 --- a/scripts/base/frameworks/notice/main.bro +++ b/scripts/base/frameworks/notice/main.bro @@ -308,7 +308,9 @@ function apply_policy(n: Notice::Info) if ( ! n?$src_peer ) n$src_peer = get_event_peer(); - n$peer_descr = n$src_peer?$descr ? n$src_peer$descr : fmt("%s", n$src_peer$host); + if ( ! n?$peer_descr ) + n$peer_descr = n$src_peer?$descr ? + n$src_peer$descr : fmt("%s", n$src_peer$host); if ( ! n?$actions ) n$actions = set(); @@ -340,7 +342,7 @@ function apply_policy(n: Notice::Info) # Create the ordered notice policy automatically which will be used at runtime # for prioritized matching of the notice policy. -event bro_init() +event bro_init() &priority=10 { local tmp: table[count] of set[PolicyItem] = table(); for ( pi in policy ) diff --git a/scripts/policy/protocols/ssh/detect-bruteforcing.bro b/scripts/policy/protocols/ssh/detect-bruteforcing.bro index e38f63ad8e..fb1c075d86 100644 --- a/scripts/policy/protocols/ssh/detect-bruteforcing.bro +++ b/scripts/policy/protocols/ssh/detect-bruteforcing.bro @@ -32,11 +32,6 @@ export { ## client subnets and the yield value represents server subnets. const ignore_guessers: table[subnet] of subnet &redef; - ## Keeps count of how many rejections a host has had. 
- global password_rejections: table[addr] of TrackCount - &write_expire=guessing_timeout - &synchronized; - ## Keeps track of hosts identified as guessing passwords. global password_guessers: set[addr] &read_expire=guessing_timeout+1hr &synchronized; } @@ -46,6 +41,7 @@ event bro_init() Metrics::add_filter(FAILED_LOGIN, [$name="detect-bruteforcing", $log=F, $note=Password_Guessing, $notice_threshold=password_guesses_limit, + $notice_freq=1hr, $break_interval=guessing_timeout]); } @@ -59,9 +55,7 @@ event SSH::heuristic_successful_login(c: connection) # { # NOTICE([$note=Login_By_Password_Guesser, # $conn=c, - # $n=password_rejections[id$orig_h]$n, - # $msg=fmt("Successful SSH login by password guesser %s", id$orig_h), - # $sub=fmt("%d failed logins", password_rejections[id$orig_h]$n)]); + # $msg=fmt("Successful SSH login by password guesser %s", id$orig_h)]); # } } @@ -74,14 +68,4 @@ event SSH::heuristic_failed_login(c: connection) if ( ! (id$orig_h in ignore_guessers && id$resp_h in ignore_guessers[id$orig_h]) ) Metrics::add_data(FAILED_LOGIN, [$host=id$orig_h], 1); - - #if ( default_check_threshold(password_rejections[id$orig_h]) ) - # { - # add password_guessers[id$orig_h]; - # NOTICE([$note=Password_Guessing, - # $conn=c, - # $msg=fmt("SSH password guessing by %s", id$orig_h), - # $sub=fmt("%d apparently failed logins", password_rejections[id$orig_h]$n), - # $n=password_rejections[id$orig_h]$n]); - # } } \ No newline at end of file diff --git a/testing/btest/Baseline/policy.frameworks.metrics.cluster-intermediate-update/manager-1.notice.log b/testing/btest/Baseline/policy.frameworks.metrics.cluster-intermediate-update/manager-1.notice.log new file mode 100644 index 0000000000..48c74fe7c4 --- /dev/null +++ b/testing/btest/Baseline/policy.frameworks.metrics.cluster-intermediate-update/manager-1.notice.log @@ -0,0 +1,2 @@ +# ts uid id.orig_h id.orig_p id.resp_h id.resp_p note msg sub src dst p n peer_descr actions policy_items dropped remote_location.country_code 
remote_location.region remote_location.city remote_location.latitude remote_location.longitude metric_index.host metric_index.str metric_index.network +1313897486.017657 - - - - - Test_Notice Threshold crossed by metric_index(host=1.2.3.4) 100/100 - 1.2.3.4 - - 100 manager-1 Notice::ACTION_LOG 4 - - - - - - 1.2.3.4 - - diff --git a/testing/btest/Baseline/policy.frameworks.metrics.notice/notice.log b/testing/btest/Baseline/policy.frameworks.metrics.notice/notice.log index 282e3c7b7b..1e0e6a572b 100644 --- a/testing/btest/Baseline/policy.frameworks.metrics.notice/notice.log +++ b/testing/btest/Baseline/policy.frameworks.metrics.notice/notice.log @@ -1,4 +1,3 @@ # ts uid id.orig_h id.orig_p id.resp_h id.resp_p note msg sub src dst p n peer_descr actions policy_items dropped remote_location.country_code remote_location.region remote_location.city remote_location.latitude remote_location.longitude metric_index.host metric_index.str metric_index.network -1313508844.321207 - - - - - Test_Notice Metrics threshold crossed by metric_index(host=6.5.4.3) 2/1 - 6.5.4.3 - - 2 bro Notice::ACTION_LOG 4 - - - - - - 6.5.4.3 - - -1313508844.321207 - - - - - Test_Notice Metrics threshold crossed by metric_index(host=1.2.3.4) 3/1 - 1.2.3.4 - - 3 bro Notice::ACTION_LOG 4 - - - - - - 1.2.3.4 - - -1313508844.321207 - - - - - Test_Notice Metrics threshold crossed by metric_index(host=7.2.1.5) 1/1 - 7.2.1.5 - - 1 bro Notice::ACTION_LOG 4 - - - - - - 7.2.1.5 - - +1313685819.326521 - - - - - Test_Notice Threshold crossed by metric_index(host=1.2.3.4) 3/2 - 1.2.3.4 - - 3 bro Notice::ACTION_LOG 4 - - - - - - 1.2.3.4 - - +1313685819.326521 - - - - - Test_Notice Threshold crossed by metric_index(host=6.5.4.3) 2/2 - 6.5.4.3 - - 2 bro Notice::ACTION_LOG 4 - - - - - - 6.5.4.3 - - diff --git a/testing/btest/policy/frameworks/metrics/basic-cluster.bro b/testing/btest/policy/frameworks/metrics/basic-cluster.bro index eda41c3759..75dd7b762d 100644 --- 
a/testing/btest/policy/frameworks/metrics/basic-cluster.bro +++ b/testing/btest/policy/frameworks/metrics/basic-cluster.bro @@ -24,15 +24,11 @@ event bro_init() &priority=5 Metrics::add_filter(TEST_METRIC, [$name="foo-bar", $break_interval=3secs]); + + if ( Cluster::local_node_type() == Cluster::WORKER ) + { + Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], 3); + Metrics::add_data(TEST_METRIC, [$host=6.5.4.3], 2); + Metrics::add_data(TEST_METRIC, [$host=7.2.1.5], 1); + } } - -@if ( Cluster::local_node_type() == Cluster::WORKER ) - -event bro_init() - { - Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], 3); - Metrics::add_data(TEST_METRIC, [$host=6.5.4.3], 2); - Metrics::add_data(TEST_METRIC, [$host=7.2.1.5], 1); - } - -@endif \ No newline at end of file diff --git a/testing/btest/policy/frameworks/metrics/cluster-intermediate-update.bro b/testing/btest/policy/frameworks/metrics/cluster-intermediate-update.bro new file mode 100644 index 0000000000..519de35805 --- /dev/null +++ b/testing/btest/policy/frameworks/metrics/cluster-intermediate-update.bro @@ -0,0 +1,54 @@ +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. 
CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait -k 5 +# @TEST-EXEC: btest-diff manager-1/notice.log + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1")], + ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1", $workers=set("worker-1")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef enum Notice::Type += { + Test_Notice, +}; + +redef enum Metrics::ID += { + TEST_METRIC, +}; + +event bro_init() &priority=5 + { + Metrics::add_filter(TEST_METRIC, + [$name="foo-bar", + $break_interval=1hr, + $note=Test_Notice, + $notice_threshold=100, + $log=T]); + } + +@if ( Cluster::local_node_type() == Cluster::WORKER ) + +event do_metrics(i: count) + { + # Worker-1 will trigger an intermediate update and then if everything + # works correctly, the data from worker-2 will hit the threshold and + # should trigger the notice. 
+ Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], i); + } + +event bro_init() + { + if ( Cluster::node == "worker-1" ) + schedule 2sec { do_metrics(99) }; + if ( Cluster::node == "worker-2" ) + event do_metrics(1); + } + +@endif \ No newline at end of file diff --git a/testing/btest/policy/frameworks/metrics/notice.bro b/testing/btest/policy/frameworks/metrics/notice.bro index 3451af18f4..0ac9faa956 100644 --- a/testing/btest/policy/frameworks/metrics/notice.bro +++ b/testing/btest/policy/frameworks/metrics/notice.bro @@ -1,6 +1,7 @@ # @TEST-EXEC: bro %INPUT # @TEST-EXEC: btest-diff notice.log + redef enum Notice::Type += { Test_Notice, }; @@ -15,7 +16,7 @@ event bro_init() &priority=5 [$name="foo-bar", $break_interval=3secs, $note=Test_Notice, - $notice_threshold=1, + $notice_threshold=2, $log=F]); Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], 3); Metrics::add_data(TEST_METRIC, [$host=6.5.4.3], 2);