duct-tape fix of values not propagating after intermediate check in cluster environments.

This commit is contained in:
Bernhard Amann 2013-05-02 11:34:33 -07:00
parent 9ea5a470e6
commit d984243a77
3 changed files with 21 additions and 8 deletions

View file

@ -97,7 +97,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
check_thresholds(ss, key, result, cluster_request_global_view_percent) ) check_thresholds(ss, key, result, cluster_request_global_view_percent) )
{ {
# kick off intermediate update # kick off intermediate update
event SumStats::cluster_key_intermediate_response(ss$id, key); event SumStats::cluster_key_intermediate_response(ss$id, copy(key));
++recent_global_view_keys[ss$id, key]; ++recent_global_view_keys[ss$id, key];
} }
} }
@ -124,7 +124,7 @@ event SumStats::send_data(uid: string, ssid: string, data: ResultTable)
if ( |data| == 0 ) if ( |data| == 0 )
done = T; done = T;
event SumStats::cluster_ss_response(uid, ssid, local_data, done); event SumStats::cluster_ss_response(uid, ssid, copy(local_data), done);
if ( ! done ) if ( ! done )
schedule 0.01 sec { SumStats::send_data(uid, ssid, data) }; schedule 0.01 sec { SumStats::send_data(uid, ssid, data) };
} }
@ -150,7 +150,7 @@ event SumStats::cluster_key_request(uid: string, ssid: string, key: Key)
if ( ssid in result_store && key in result_store[ssid] ) if ( ssid in result_store && key in result_store[ssid] )
{ {
#print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data); #print fmt("WORKER %s: received the cluster_key_request event for %s=%s.", Cluster::node, key2str(key), data);
event SumStats::cluster_key_response(uid, ssid, key, result_store[ssid][key]); event SumStats::cluster_key_response(uid, ssid, key, copy(result_store[ssid][key]));
} }
else else
{ {

View file

@ -1 +1,3 @@
A test metric threshold was crossed with a value of: 100.0 A test metric threshold was crossed with a value of: 101.0
End of epoch handler was called
101.0

View file

@ -4,7 +4,7 @@
# @TEST-EXEC: sleep 3 # @TEST-EXEC: sleep 3
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT # @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT # @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
# @TEST-EXEC: btest-bg-wait 10 # @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: btest-diff manager-1/.stdout # @TEST-EXEC: btest-diff manager-1/.stdout
@TEST-START-FILE cluster-layout.bro @TEST-START-FILE cluster-layout.bro
@ -20,8 +20,15 @@ redef Log::default_rotation_interval = 0secs;
event bro_init() &priority=5 event bro_init() &priority=5
{ {
local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)]; local r1: SumStats::Reducer = [$stream="test.metric", $apply=set(SumStats::SUM)];
SumStats::create([$epoch=1hr, SumStats::create([$epoch=10secs,
$reducers=set(r1), $reducers=set(r1),
$epoch_finished(data: SumStats::ResultTable) =
{
print "End of epoch handler was called";
for ( res in data )
print data[res]["test.metric"]$sum;
terminate();
},
$threshold_val(key: SumStats::Key, result: SumStats::Result) = $threshold_val(key: SumStats::Key, result: SumStats::Result) =
{ {
return double_to_count(result["test.metric"]$sum); return double_to_count(result["test.metric"]$sum);
@ -30,7 +37,6 @@ event bro_init() &priority=5
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) = $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
{ {
print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum); print fmt("A test metric threshold was crossed with a value of: %.1f", result["test.metric"]$sum);
terminate();
}]); }]);
} }
@ -52,8 +58,13 @@ event remote_connection_handshake_done(p: event_peer)
if ( p$descr == "manager-1" ) if ( p$descr == "manager-1" )
{ {
if ( Cluster::node == "worker-1" ) if ( Cluster::node == "worker-1" )
{
schedule 0.1sec { do_stats(1) }; schedule 0.1sec { do_stats(1) };
schedule 5secs { do_stats(60) };
}
if ( Cluster::node == "worker-2" ) if ( Cluster::node == "worker-2" )
schedule 0.5sec { do_stats(99) }; schedule 0.5sec { do_stats(40) };
} }
} }