Updates to the SumStats API to deal with high-memory stats.

- The code is a mess and will need to be cleaned up, but the tests do pass.
Seth Hall 2013-08-02 12:44:33 -04:00
parent 7db531e162
commit 4f8100774c
16 changed files with 391 additions and 230 deletions
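For orientation, a sketch of the collection flow this commit moves to, inferred from the handlers added in the cluster script below (not text from the commit itself): the manager still broadcasts cluster_ss_request at the end of an epoch, but instead of every worker streaming its whole ResultTable back at once, the manager now pulls results one key at a time, which keeps per-epoch memory on the manager and workers bounded.

# Rough sequence of the new per-key collection protocol (manager <-> workers),
# using the event names introduced in this diff:
#
#   SumStats::finish_epoch  -> cluster_ss_request(uid, ss)    # workers snapshot results into sending_results[uid]
#                           -> get_a_key(uid, ss)             # ask each node for one key it still holds
#   workers reply           <- send_a_key(uid, ss, key)  or  send_no_key(uid, ss)
#   per collected key       -> cluster_get_result(uid, ss, key, cleanup)
#                           <- cluster_send_result(uid, ss, key, result, cleanup)
#   manager                 handle_end_of_result_collection(): check thresholds and fire
#                           epoch_result() for that key; when no keys remain, fire epoch_finished().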


@@ -203,7 +203,13 @@ rest_target(${psd} policy/frameworks/software/vulnerable.bro)
 rest_target(${psd} policy/integration/barnyard2/main.bro)
 rest_target(${psd} policy/integration/barnyard2/types.bro)
 rest_target(${psd} policy/integration/collective-intel/main.bro)
-rest_target(${psd} policy/misc/app-metrics.bro)
+rest_target(${psd} policy/misc/app-stats/main.bro)
+rest_target(${psd} policy/misc/app-stats/plugins/facebook.bro)
+rest_target(${psd} policy/misc/app-stats/plugins/gmail.bro)
+rest_target(${psd} policy/misc/app-stats/plugins/google.bro)
+rest_target(${psd} policy/misc/app-stats/plugins/netflix.bro)
+rest_target(${psd} policy/misc/app-stats/plugins/pandora.bro)
+rest_target(${psd} policy/misc/app-stats/plugins/youtube.bro)
 rest_target(${psd} policy/misc/capture-loss.bro)
 rest_target(${psd} policy/misc/detect-traceroute/main.bro)
 rest_target(${psd} policy/misc/load-balancing.bro)


@@ -33,17 +33,17 @@ export {
     ## Event sent by nodes that are collecting sumstats after receiving a request for
     ## the sumstat from the manager.
-    global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool);
+    #global cluster_ss_response: event(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool);

     ## This event is sent by the manager in a cluster to initiate the collection of
     ## a single key value from a sumstat. It's typically used to get intermediate
     ## updates before the break interval triggers to speed detection of a value
     ## crossing a threshold.
-    global cluster_keys_request: event(uid: string, ss_name: string, key: set[Key], cleanup: bool);
+    global cluster_get_result: event(uid: string, ss_name: string, key: Key, cleanup: bool);

     ## This event is sent by nodes in response to a
-    ## :bro:id:`SumStats::cluster_keys_request` event.
-    global cluster_key_response: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool);
+    ## :bro:id:`SumStats::cluster_get_result` event.
+    global cluster_send_result: event(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool);

     ## This is sent by workers to indicate that they crossed the percent
     ## of the current threshold by the percentage defined globally in
@@ -53,14 +53,20 @@ export {
     ## This event is scheduled internally on workers to send result chunks.
     global send_data: event(uid: string, ss_name: string, data: ResultTable, cleanup: bool);

+    global get_a_key: event(uid: string, ss_name: string);
+    global send_a_key: event(uid: string, ss_name: string, key: Key);
+    global send_no_key: event(uid: string, ss_name: string);
+
     ## This event is generated when a threshold is crossed.
     global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold_index: count);
 }

 # Add events to the cluster framework to make this work.
-redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|key_request|threshold_crossed)/;
-redef Cluster::manager2worker_events += /SumStats::thresholds_reset/;
-redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_response|key_intermediate_response)/;
+redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|get_result|threshold_crossed)/;
+redef Cluster::manager2worker_events += /SumStats::(thresholds_reset|get_a_key)/;
+redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|send_result|key_intermediate_response)/;
+redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/;

 @if ( Cluster::local_node_type() != Cluster::MANAGER )
 # This variable is maintained to know what keys have recently sent as
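The three new coordination events are added without doc comments; their roles, as suggested comments inferred from the worker and manager handlers later in this diff (not part of the commit itself), are roughly:

## Sent by the manager to ask each node for a single key from the results it is
## currently holding for this collection uid.
global get_a_key: event(uid: string, ss_name: string);

## A node's reply carrying one key it still has pending for this uid.
global send_a_key: event(uid: string, ss_name: string, key: Key);

## A node's reply indicating it has no keys left for this uid.
global send_no_key: event(uid: string, ss_name: string);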
@@ -69,6 +75,10 @@ redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|key_resp
 # an intermediate result has been received.
 global recent_global_view_keys: table[string, Key] of count &create_expire=1min &default=0;

+# Result tables indexed on a uid that are currently being sent to the
+# manager.
+global sending_results: table[string] of ResultTable = table();
+
 # This is done on all non-manager node types in the event that a sumstat is
 # being collected somewhere other than a worker.
 function data_added(ss: SumStat, key: Key, result: Result)
@@ -89,7 +99,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
         }
     }

-#event SumStats::send_data(uid: string, ss_name: string, data: ResultTable, cleanup: bool)
+#event SumStats::send_data(uid: string, ss_name: string, cleanup: bool)
 #    {
 #    #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
 #
@@ -117,42 +127,86 @@ function data_added(ss: SumStat, key: Key, result: Result)
 #    # changed to something else later.
 #    event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
 #    if ( ! done )
-#        schedule 0.01 sec { SumStats::send_data(uid, ss_name, incoming_data, T) };
+#        schedule 0.01 sec { SumStats::send_data(uid, T) };
 #    }

-#event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool)
-#    {
-#    #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
-#
-#    # Initiate sending all of the data for the requested stats.
-#    if ( ss_name in result_store )
-#        event SumStats::send_data(uid, ss_name, result_store[ss_name], cleanup);
-#    else
-#        event SumStats::send_data(uid, ss_name, table(), cleanup);
-#
-#    # Lookup the actual sumstats and reset it, the reference to the data
-#    # currently stored will be maintained internally by the send_data event.
-#    if ( ss_name in stats_store && cleanup )
-#        reset(stats_store[ss_name]);
-#    }
-
-event SumStats::cluster_keys_request(uid: string, ss_name: string, keys: set[Key], cleanup: bool)
-    {
-    for ( key in keys )
-        {
-        if ( ss_name in result_store && key in result_store[ss_name] )
-            {
-            #print fmt("WORKER %s: received the cluster_keys_request event for %s=%s.", Cluster::node, key2str(key), data);
-
-            # Note: copy is needed to compensate serialization caching issue. This should be
-            # changed to something else later.
-            event SumStats::cluster_key_response(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
-            }
-        else
-            {
-            # We need to send an empty response if we don't have the data so that the manager
-            # can know that it heard back from all of the workers.
-            event SumStats::cluster_key_response(uid, ss_name, key, table(), cleanup);
-            }
-        }
-    }
+event SumStats::get_a_key(uid: string, ss_name: string)
+    {
+    if ( uid in sending_results )
+        {
+        if ( |sending_results[uid]| == 0 )
+            event SumStats::send_no_key(uid, ss_name);
+
+        for ( key in sending_results[uid] )
+            {
+            event SumStats::send_a_key(uid, ss_name, key);
+            # break to only send one.
+            break;
+            }
+        }
+    else if ( ss_name in result_store && |result_store[ss_name]| > 0 )
+        {
+        if ( |result_store[ss_name]| == 0 )
+            event SumStats::send_no_key(uid, ss_name);
+
+        for ( key in result_store[ss_name] )
+            {
+            event SumStats::send_a_key(uid, ss_name, key);
+            # break to only send one.
+            break;
+            }
+        }
+    else
+        {
+        event SumStats::send_no_key(uid, ss_name);
+        }
+    }
+
+event SumStats::cluster_ss_request(uid: string, ss_name: string, cleanup: bool)
+    {
+    #print fmt("WORKER %s: received the cluster_ss_request event for %s.", Cluster::node, id);
+
+    # Create a back store for the result
+    sending_results[uid] = (ss_name in result_store) ? copy(result_store[ss_name]) : table();
+
+    # Lookup the actual sumstats and reset it, the reference to the data
+    # currently stored will be maintained internally from the
+    # sending_results table.
+    if ( cleanup && ss_name in stats_store )
+        reset(stats_store[ss_name]);
+    }
+
+event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, cleanup: bool)
+    {
+    #print fmt("WORKER %s: received the cluster_get_result event for %s=%s.", Cluster::node, key2str(key), data);
+
+    if ( cleanup ) # data will implicitly be in sending_results (i know this isn't great)
+        {
+        if ( uid in sending_results && key in sending_results[uid] )
+            {
+            # Note: copy is needed to compensate serialization caching issue. This should be
+            # changed to something else later.
+            event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup);
+            delete sending_results[uid][key];
+            }
+        else
+            {
+            # We need to send an empty response if we don't have the data so that the manager
+            # can know that it heard back from all of the workers.
+            event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
+            }
+        }
+    else
+        {
+        if ( ss_name in result_store && key in result_store[ss_name] )
+            {
+            event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
+            }
+        else
+            {
+            # We need to send an empty response if we don't have the data so that the manager
+            # can know that it heard back from all of the workers.
+            event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
+            }
+        }
+    }
@@ -205,7 +259,7 @@ event SumStats::finish_epoch(ss: SumStat)
     {
     if ( network_time() > zero_time )
         {
-        #print fmt("%.6f MANAGER: breaking %s sumstat for %s sumstat", network_time(), ss$name, ss$id);
+        #print fmt("%.6f MANAGER: breaking %s sumstat", network_time(), ss$name);
         local uid = unique_id("");

         if ( uid in stats_results )
@@ -214,6 +268,9 @@ event SumStats::finish_epoch(ss: SumStat)
         # Request data from peers.
         event SumStats::cluster_ss_request(uid, ss$name, T);
+
+        done_with[uid] = 0;
+        event SumStats::get_a_key(uid, ss$name);
         }

     # Schedule the next finish_epoch event.
@@ -231,8 +288,129 @@ function data_added(ss: SumStat, key: Key, result: Result)
         }
     }

-event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool)
+function handle_end_of_result_collection(uid: string, ss_name: string, key: Key, cleanup: bool)
     {
+    #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
+    local ss = stats_store[ss_name];
+    local ir = key_requests[uid];
+    if ( check_thresholds(ss, key, ir, 1.0) )
+        {
+        threshold_crossed(ss, key, ir);
+        event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]);
+        }
+
+    delete key_requests[uid];
+    delete done_with[uid];
+
+    if ( cleanup )
+        {
+        # This is done here because "cleanup" implicitly means
+        # it's the end of an epoch.
+        if ( ss?$epoch_result && |ir| > 0 )
+            {
+            local now = network_time();
+            ss$epoch_result(now, key, ir);
+            }
+
+        # Check that there is an outstanding view before subtracting.
+        # Global views only apply to non-dynamic requests. Dynamic
+        # requests must be serviced.
+        if ( outstanding_global_views[ss_name] > 0 )
+            --outstanding_global_views[ss_name];
+        }
+
+    if ( uid in stats_results )
+        delete stats_results[uid][key];
+    }
+
+function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
+    {
+    #print "request_all_current_keys";
+    if ( uid in stats_results && |stats_results[uid]| > 0 )
+        {
+        #print fmt(" -- %d remaining keys here", |stats_results[uid]|);
+        for ( key in stats_results[uid] )
+            {
+            done_with[uid] = 0;
+            event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
+            when ( uid in done_with && Cluster::worker_count == done_with[uid] )
+                {
+                handle_end_of_result_collection(uid, ss_name, key, cleanup);
+                request_all_current_keys(uid, ss_name, cleanup);
+                }
+            break; # only a single key
+            }
+        }
+    else
+        {
+        # Get more keys! And this breaks us out of the evented loop.
+        done_with[uid] = 0;
+        event SumStats::get_a_key(uid, ss_name);
+        }
+    }
+
+event SumStats::send_no_key(uid: string, ss_name: string)
+    {
+    #print "send_no_key";
+    ++done_with[uid];
+
+    if ( Cluster::worker_count == done_with[uid] )
+        {
+        delete done_with[uid];
+
+        if ( |stats_results[uid]| > 0 )
+            {
+            #print "we need more keys!";
+            # Now that we have a key from each worker, lets
+            # grab all of the results.
+            request_all_current_keys(uid, ss_name, T);
+            }
+        else
+            {
+            #print "we're out of keys!";
+            local ss = stats_store[ss_name];
+            if ( ss?$epoch_finished )
+                ss$epoch_finished(network_time());
+            }
+        }
+    }
+
+event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
+    {
+    #print fmt("send_a_key %s", key);
+    if ( uid !in stats_results )
+        {
+        # no clue what happened here
+        return;
+        }
+
+    if ( key !in stats_results[uid] )
+        stats_results[uid][key] = table();
+
+    ++done_with[uid];
+
+    if ( Cluster::worker_count == done_with[uid] )
+        {
+        delete done_with[uid];
+
+        if ( |stats_results[uid]| > 0 )
+            {
+            #print "we need more keys!";
+            # Now that we have a key from each worker, lets
+            # grab all of the results.
+            request_all_current_keys(uid, ss_name, T);
+            }
+        else
+            {
+            #print "we're out of keys!";
+            local ss = stats_store[ss_name];
+            if ( ss?$epoch_finished )
+                ss$epoch_finished(network_time());
+            }
+        }
+    }
+
+event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, result: Result, cleanup: bool)
+    {
+    #print "cluster_send_result";
     #print fmt("%0.6f MANAGER: receiving key data from %s - %s=%s", network_time(), get_event_peer()$descr, key2str(key), result);

     # We only want to try and do a value merge if there are actually measured datapoints
@@ -244,37 +422,6 @@ event SumStats::cluster_key_response(uid: string, ss_name: string, key: Key, res
     # Mark that a worker is done.
     ++done_with[uid];
-    #print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
-    if ( Cluster::worker_count == done_with[uid] )
-        {
-        local ss = stats_store[ss_name];
-        local ir = key_requests[uid];
-        if ( check_thresholds(ss, key, ir, 1.0) )
-            {
-            threshold_crossed(ss, key, ir);
-            event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
-            }
-
-        if ()
-            {
-            }
-
-        if ( cleanup )
-            {
-            # We only want to delete the data if this is a non dynamic
-            # request because the dynamic requests use when statements
-            # and the data needs to remain available.
-            delete key_requests[uid];
-            delete done_with[uid];
-
-            # Check that there is an outstanding view before subtracting.
-            # Global views only apply to non-dynamic requests. Dynamic
-            # requests must be serviced.
-            if ( outstanding_global_views[ss_name] > 0 )
-                --outstanding_global_views[ss_name];
-            }
-        }
     }

 # Managers handle intermediate updates here.
@@ -296,96 +443,103 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
     local uid = unique_id("");
     done_with[uid] = 0;
-    event SumStats::cluster_keys_request(uid, ss_name, set(key), T);
+    event SumStats::cluster_get_result(uid, ss_name, key, T);
     }

-event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
-    {
-    #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
-
-    # Mark another worker as being "done" for this uid.
-    if ( done )
-        ++done_with[uid];
-
-    # We had better only be getting requests for stuff that exists.
-    if ( ss_name !in stats_store )
-        return;
-
-    if ( uid !in stats_results )
-        stats_results[uid] = table();
-
-    local local_data = stats_results[uid];
-    local ss = stats_store[ss_name];
-
-    for ( key in data )
-        {
-        if ( key in local_data )
-            local_data[key] = compose_results(local_data[key], data[key]);
-        else
-            local_data[key] = data[key];
-
-        # If a stat is done being collected, thresholds for each key
-        # need to be checked so we're doing it here to avoid doubly
-        # iterating over each key.
-        if ( Cluster::worker_count == done_with[uid] )
-            {
-            if ( check_thresholds(ss, key, local_data[key], 1.0) )
-                {
-                threshold_crossed(ss, key, local_data[key]);
-                event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
-                }
-            }
-        }
-
-    # If the data has been collected from all peers, we are done and ready to finish.
-    if ( cleanup && Cluster::worker_count == done_with[uid] )
-        {
-        if ( ss?$epoch_finished )
-            ss$epoch_finished(local_data);
-
-        # Clean up
-        delete stats_results[uid];
-        delete done_with[uid];
-        reset(ss);
-        }
-    }
-
-function request(ss_name: string): ResultTable
-    {
-    # This only needs to be implemented this way for cluster compatibility.
-    local uid = unique_id("dyn-");
-    stats_results[uid] = table();
-    done_with[uid] = 0;
-    event SumStats::cluster_ss_request(uid, ss_name, F);
-
-    return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
-        {
-        if ( uid in stats_results )
-            {
-            local ss_result = stats_results[uid];
-            # Clean up
-            delete stats_results[uid];
-            delete done_with[uid];
-            reset(stats_store[ss_name]);
-            return ss_result;
-            }
-        else
-            return table();
-        }
-    timeout 1.1min
-        {
-        Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name));
-        return table();
-        }
-    }
+#event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
+#    {
+#    #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
+#
+#    # Mark another worker as being "done" for this uid.
+#    if ( done )
+#        ++done_with[uid];
+#
+#    # We had better only be getting requests for stuff that exists.
+#    if ( ss_name !in stats_store )
+#        return;
+#
+#    if ( uid !in stats_results )
+#        stats_results[uid] = table();
+#
+#    local local_data = stats_results[uid];
+#    local ss = stats_store[ss_name];
+#
+#    for ( key in data )
+#        {
+#        if ( key in local_data )
+#            local_data[key] = compose_results(local_data[key], data[key]);
+#        else
+#            local_data[key] = data[key];
+#
+#        # If a stat is done being collected, thresholds for each key
+#        # need to be checked so we're doing it here to avoid doubly
+#        # iterating over each key.
+#        if ( Cluster::worker_count == done_with[uid] )
+#            {
+#            if ( check_thresholds(ss, key, local_data[key], 1.0) )
+#                {
+#                threshold_crossed(ss, key, local_data[key]);
+#                event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
+#                }
+#            }
+#        }
+#
+#    # If the data has been collected from all peers, we are done and ready to finish.
+#    if ( cleanup && Cluster::worker_count == done_with[uid] )
+#        {
+#        local now = network_time();
+#        if ( ss?$epoch_result )
+#            {
+#            for ( key in local_data )
+#                ss$epoch_result(now, key, local_data[key]);
+#            }
+#
+#        if ( ss?$epoch_finished )
+#            ss$epoch_finished(now);
+#
+#        # Clean up
+#        delete stats_results[uid];
+#        delete done_with[uid];
+#        reset(ss);
+#        }
+#    }
+
+#function request(ss_name: string): ResultTable
+#    {
+#    # This only needs to be implemented this way for cluster compatibility.
+#    local uid = unique_id("dyn-");
+#    stats_results[uid] = table();
+#    done_with[uid] = 0;
+#    event SumStats::cluster_ss_request(uid, ss_name, F);
+#
+#    return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
+#        {
+#        if ( uid in stats_results )
+#            {
+#            local ss_result = stats_results[uid];
+#            # Clean up
+#            delete stats_results[uid];
+#            delete done_with[uid];
+#            reset(stats_store[ss_name]);
+#            return ss_result;
+#            }
+#        else
+#            return table();
+#        }
+#    timeout 1.1min
+#        {
+#        Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name));
+#        return table();
+#        }
+#    }

 function request_key(ss_name: string, key: Key): Result
     {
-    local uid = unique_id("dyn-");
+    local uid = unique_id("");
     done_with[uid] = 0;
     key_requests[uid] = table();

-    event SumStats::cluster_keys_request(uid, ss_name, set(key), F);
+    event SumStats::cluster_get_result(uid, ss_name, key, F);
     return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
         {
         local result = key_requests[uid];
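On the manager, request_key() keeps its when()-based calling convention but now rides on cluster_get_result/cluster_send_result. A minimal usage sketch, mirroring the updated on-demand test later in this commit (the check_host event and the "test sumstat"/"test" names are illustrative):

event check_host(host: addr)
    {
    when ( local result = SumStats::request_key("test sumstat", [$host=host]) )
        {
        # A reducer's stream can be absent from the Result if no node observed this key.
        if ( "test" in result )
            print fmt("Host: %s -> %.0f", host, result["test"]$sum);
        }
    }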


@@ -118,7 +118,12 @@ export {
         ## A callback that receives each of the results at the
         ## end of the analysis epoch. The function will be
         ## called once for each key.
-        epoch_result: function(ts: time, key::SumStats::Key, result: SumStats::Result) &optional;
+        epoch_result: function(ts: time, key: SumStats::Key, result: SumStats::Result) &optional;
+
+        ## A callback that will be called when a single collection
+        ## interval is completed. The ts value will be the time of
+        ## when the collection started.
+        epoch_finished: function(ts:time) &optional;
     };

     ## Create a summary statistic.
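A minimal sketch of the two callbacks working together, following the pattern the updated policy script and tests in this commit use (the "example" stream, stat name, and observed values are illustrative):

event bro_init()
    {
    local r1 = SumStats::Reducer($stream="example", $apply=set(SumStats::SUM));
    SumStats::create([$name="example-stat",
                      $epoch=1min,
                      $reducers=set(r1),
                      # Called once per key, so a full ResultTable no longer has to be
                      # materialized when the epoch ends.
                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
                          {
                          print fmt("%s -> %.0f", key$host, result["example"]$sum);
                          },
                      # Called once after the last key of the epoch has been handled.
                      $epoch_finished(ts: time) =
                          {
                          print fmt("%.6f epoch finished", ts);
                          }]);

    SumStats::observe("example", [$host=1.2.3.4], [$num=5]);
    }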


@@ -6,13 +6,19 @@ event SumStats::finish_epoch(ss: SumStat)
     {
     if ( ss$name in result_store )
         {
+        local now = network_time();
+
         if ( ss?$epoch_result )
             {
             local data = result_store[ss$name];
             # TODO: don't block here.
             for ( key in data )
-                ss$epoch_result(network_time(), key, data[key]);
+                ss$epoch_result(now, key, data[key]);
             }

+        if ( ss?$epoch_finished )
+            ss$epoch_finished(now);
+
         reset(ss);
         }


@@ -45,20 +45,16 @@ event bro_init() &priority=3
     SumStats::create([$name="app-metrics",
                       $epoch=break_interval,
                       $reducers=set(r1, r2),
-                      $epoch_finished(data: SumStats::ResultTable) =
+                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
                           {
                           local l: Info;
                           l$ts = network_time();
                           l$ts_delta = break_interval;
-                          for ( key in data )
-                              {
-                              local result = data[key];
-                              l$app = key$str;
-                              l$bytes = double_to_count(floor(result["apps.bytes"]$sum));
-                              l$hits = result["apps.hits"]$num;
-                              l$uniq_hosts = result["apps.hits"]$unique;
-                              Log::write(LOG, l);
-                              }
+                          l$app = key$str;
+                          l$bytes = double_to_count(floor(result["apps.bytes"]$sum));
+                          l$hits = result["apps.hits"]$num;
+                          l$uniq_hosts = result["apps.hits"]$unique;
+                          Log::write(LOG, l);
                           }]);
     }


@@ -35,7 +35,15 @@
 @load integration/barnyard2/types.bro
 @load integration/collective-intel/__load__.bro
 @load integration/collective-intel/main.bro
-@load misc/app-metrics.bro
+@load misc/app-stats/__load__.bro
+@load misc/app-stats/main.bro
+@load misc/app-stats/plugins/__load__.bro
+@load misc/app-stats/plugins/facebook.bro
+@load misc/app-stats/plugins/gmail.bro
+@load misc/app-stats/plugins/google.bro
+@load misc/app-stats/plugins/netflix.bro
+@load misc/app-stats/plugins/pandora.bro
+@load misc/app-stats/plugins/youtube.bro
 @load misc/capture-loss.bro
 @load misc/detect-traceroute/__load__.bro
 @load misc/detect-traceroute/main.bro


@@ -1,3 +1,3 @@
 A test metric threshold was crossed with a value of: 101.0
-End of epoch handler was called
 101.0
+End of epoch handler was called


@@ -1,7 +1,2 @@
-Complete SumStat request
-Host: 6.5.4.3 -> 1
-Host: 10.10.10.10 -> 5
-Host: 1.2.3.4 -> 169
-Host: 7.2.1.5 -> 145
 SumStat key request
 Host: 7.2.1.5 -> 145


@@ -1,5 +1,2 @@
-Complete SumStat request
-Host: 1.2.3.4 -> 42
-Host: 4.3.2.1 -> 7
 Key request for 1.2.3.4
 Host: 1.2.3.4 -> 42


@@ -26,14 +26,13 @@ event bro_init() &priority=5
     SumStats::create([$name="test",
                       $epoch=5secs,
                       $reducers=set(r1),
-                      $epoch_finished(rt: SumStats::ResultTable) =
+                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
+                          {
+                          local r = result["test"];
+                          print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique);
+                          },
+                      $epoch_finished(ts: time) =
                           {
-                          for ( key in rt )
-                              {
-                              local r = rt[key]["test"];
-                              print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique);
-                              }
                           terminate();
                           }]);
     }


@@ -14,13 +14,10 @@ event bro_init() &priority=5
     SumStats::create([$name="test",
                       $epoch=3secs,
                       $reducers=set(r1),
-                      $epoch_finished(data: SumStats::ResultTable) =
+                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
                           {
-                          for ( key in data )
-                              {
-                              local r = data[key]["test.metric"];
-                              print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique);
-                              }
+                          local r = result["test.metric"];
+                          print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique);
                           }
                       ]);


@@ -23,11 +23,13 @@ event bro_init() &priority=5
     SumStats::create([$name="test",
                       $epoch=10secs,
                       $reducers=set(r1),
-                      $epoch_finished(data: SumStats::ResultTable) =
+                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
+                          {
+                          print result["test.metric"]$sum;
+                          },
+                      $epoch_finished(ts: time) =
                           {
                           print "End of epoch handler was called";
-                          for ( res in data )
-                              print data[res]["test.metric"]$sum;
                           terminate();
                           },
                       $threshold_val(key: SumStats::Key, result: SumStats::Result) =


@@ -22,7 +22,7 @@ global n = 0;
 event bro_init() &priority=5
     {
-    local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)];
+    local r1 = SumStats::Reducer($stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE));
     SumStats::create([$name="test sumstat",
                       $epoch=1hr,
                       $reducers=set(r1)]);
@@ -61,23 +61,24 @@ event on_demand2()
     when ( local result = SumStats::request_key("test sumstat", [$host=host]) )
         {
         print "SumStat key request";
-        print fmt(" Host: %s -> %.0f", host, result["test"]$sum);
+        if ( "test" in result )
+            print fmt(" Host: %s -> %.0f", host, result["test"]$sum);
         terminate();
         }
     }

 event on_demand()
     {
-    when ( local results = SumStats::request("test sumstat") )
-        {
-        print "Complete SumStat request";
-        print fmt(" Host: %s -> %.0f", 6.5.4.3, results[[$host=6.5.4.3]]["test"]$sum);
-        print fmt(" Host: %s -> %.0f", 10.10.10.10, results[[$host=10.10.10.10]]["test"]$sum);
-        print fmt(" Host: %s -> %.0f", 1.2.3.4, results[[$host=1.2.3.4]]["test"]$sum);
-        print fmt(" Host: %s -> %.0f", 7.2.1.5, results[[$host=7.2.1.5]]["test"]$sum);
+    #when ( local results = SumStats::request("test sumstat") )
+    #    {
+    #    print "Complete SumStat request";
+    #    print fmt(" Host: %s -> %.0f", 6.5.4.3, results[[$host=6.5.4.3]]["test"]$sum);
+    #    print fmt(" Host: %s -> %.0f", 10.10.10.10, results[[$host=10.10.10.10]]["test"]$sum);
+    #    print fmt(" Host: %s -> %.0f", 1.2.3.4, results[[$host=1.2.3.4]]["test"]$sum);
+    #    print fmt(" Host: %s -> %.0f", 7.2.1.5, results[[$host=7.2.1.5]]["test"]$sum);
     event on_demand2();
-        }
+    #    }
     }

 global peer_count = 0;


@@ -4,17 +4,18 @@
 redef exit_only_after_terminate=T;

-event on_demand()
-    {
-    when ( local results = SumStats::request("test") )
-        {
-        print "Complete SumStat request";
-        for ( key in results )
-            {
-            print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum);
-            }
-        }
-    }
+## Requesting a full sumstats resulttable is not supported yet.
+#event on_demand()
+#    {
+#    when ( local results = SumStats::request("test") )
+#        {
+#        print "Complete SumStat request";
+#        for ( key in results )
+#            {
+#            print fmt(" Host: %s -> %.0f", key$host, results[key]["test.reducer"]$sum);
+#            }
+#        }
+#    }

 event on_demand_key()
     {
@@ -39,7 +40,7 @@ event bro_init() &priority=5
     SumStats::observe("test.reducer", [$host=1.2.3.4], [$num=42]);
     SumStats::observe("test.reducer", [$host=4.3.2.1], [$num=7]);

-    schedule 0.1 secs { on_demand() };
+    #schedule 0.1 secs { on_demand() };
     schedule 1 secs { on_demand_key() };
     }


@@ -23,21 +23,18 @@ event bro_init() &priority=5
     SumStats::create([$name="test",
                       $epoch=5secs,
                       $reducers=set(r1),
-                      $epoch_finished(rt: SumStats::ResultTable) =
+                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
                           {
-                          local hosts: vector of addr = vector(6.5.4.3, 10.10.10.10, 1.2.3.4, 7.2.1.5);
-                          for ( i in hosts )
-                              {
-                              local key = [$host=hosts[i]];
-                              local r = rt[key]["test"];
-                              print fmt("Host: %s Sampled observations: %d", key$host, r$sample_elements);
-                              local sample_nums: vector of count = vector();
-                              for ( sample in r$samples )
-                                  sample_nums[|sample_nums|] =r$samples[sample]$num;
-                              print fmt(" %s", sort(sample_nums));
-                              }
+                          local r = result["test"];
+                          print fmt("Host: %s Sampled observations: %d", key$host, r$sample_elements);
+                          local sample_nums: vector of count = vector();
+                          for ( sample in r$samples )
+                              sample_nums[|sample_nums|] =r$samples[sample]$num;
+                          print fmt(" %s", sort(sample_nums));
+                          },
+                      $epoch_finished(ts: time) =
+                          {
                           terminate();
                           }]);
     }


@@ -8,15 +8,12 @@ event bro_init() &priority=5
     SumStats::create([$name="test",
                       $epoch=3secs,
                       $reducers=set(r1),
-                      $epoch_finished(data: SumStats::ResultTable) =
+                      $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
                           {
-                          for ( key in data )
-                              {
-                              print key$host;
-                              local r = data[key]["test.metric"];
-                              print r$samples;
-                              print r$sample_elements;
-                              }
+                          print key$host;
+                          local r = result["test.metric"];
+                          print r$samples;
+                          print r$sample_elements;
                           }]);

     SumStats::observe("test.metric", [$host=1.2.3.4], [$num=5]);