Merge remote branch 'origin/master' into topic/bernhard/hyperloglog

Conflicts:
	src/3rdparty
commit 74f96d22ef
Bernhard Amann, 2013-08-26 12:53:13 -07:00
232 changed files with 9163 additions and 148274 deletions

@@ -27,6 +27,9 @@ export {
## File hash which is non-hash type specific. It's up to the user to query
## for any relevant hash types.
FILE_HASH,
## File names. Typically with protocols with definite indications
## of a file name.
FILE_NAME,
## Certificate SHA-1 hash.
CERT_HASH,
};
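
The hunk above adds a FILE_NAME indicator type alongside FILE_HASH and CERT_HASH. As a hedged sketch of how such an indicator would be loaded (the item values are invented, and the Item/MetaData layout is assumed from the 2.2-era intel framework):

    event bro_init()
        {
        # Hypothetical indicator and source; Intel::insert() registers
        # the item with the framework.
        Intel::insert([$indicator="mimikatz.exe",
                       $indicator_type=Intel::FILE_NAME,
                       $meta=[$source="local-blacklist"]]);
        }
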
@@ -80,6 +83,10 @@ export {
## If the data was discovered within a connection, the
## connection record should go here to give context to the data.
conn: connection &optional;
## If the data was discovered within a file, the file record
## should go here to provide context to the data.
f: fa_file &optional;
};
## Record used for the logging framework representing a positive
@@ -95,6 +102,16 @@ export {
## this is the conn_id for the connection.
id: conn_id &log &optional;
## If a file was associated with this intelligence hit,
## this is the uid for the file.
fuid: string &log &optional;
## A mime type if the intelligence hit is related to a file.
## If the $f field is provided, this will be automatically filled out.
file_mime_type: string &log &optional;
## Frequently files can be "described" to give a bit more context.
## If the $f field is provided, this field will be automatically filled out.
file_desc: string &log &optional;
## Where the data was seen.
seen: Seen &log;
## Sources which supplied data that resulted in this match.
@@ -248,7 +265,25 @@ function has_meta(check: MetaData, metas: set[MetaData]): bool
event Intel::match(s: Seen, items: set[Item]) &priority=5
{
local info: Info = [$ts=network_time(), $seen=s];
local info = Info($ts=network_time(), $seen=s);
if ( s?$f )
{
if ( s$f?$conns && |s$f$conns| == 1 )
{
for ( cid in s$f$conns )
s$conn = s$f$conns[cid];
}
if ( ! info?$fuid )
info$fuid = s$f$id;
if ( ! info?$file_mime_type && s$f?$mime_type )
info$file_mime_type = s$f$mime_type;
if ( ! info?$file_desc )
info$file_desc = Files::describe(s$f);
}
if ( s?$conn )
{

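The Intel::match changes above are the consumer side of the new file support: when a Seen carries a file, the hit's fuid, mime type, and description are filled from the fa_file record, and if the file maps to exactly one connection, that connection is promoted into the Seen for context. On the producer side, a minimal sketch of feeding a file-derived indicator into the framework (the handler wiring is illustrative, and the Seen field names are assumed from the 2.2-era API):

    event file_hash(f: fa_file, kind: string, hash: string)
        {
        # Attaching $f lets Intel::match fill fuid, file_mime_type,
        # and file_desc automatically on a hit.
        Intel::seen([$indicator=hash,
                     $indicator_type=Intel::FILE_HASH,
                     $f=f,
                     $where=Intel::IN_ANYWHERE]);
        }
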
@@ -60,7 +60,7 @@ export {
# Add events to the cluster framework to make this work.
redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|get_result|threshold_crossed)/;
redef Cluster::manager2worker_events += /SumStats::(thresholds_reset|get_a_key)/;
redef Cluster::manager2worker_events += /SumStats::(get_a_key)/;
redef Cluster::worker2manager_events += /SumStats::cluster_(ss_response|send_result|key_intermediate_response)/;
redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/;
@@ -95,37 +95,6 @@ function data_added(ss: SumStat, key: Key, result: Result)
}
}
#event SumStats::send_data(uid: string, ss_name: string, cleanup: bool)
# {
# #print fmt("WORKER %s: sending data for uid %s...", Cluster::node, uid);
#
# local local_data: ResultTable = table();
# local incoming_data: ResultTable = cleanup ? data : copy(data);
#
# local num_added = 0;
# for ( key in incoming_data )
# {
# local_data[key] = incoming_data[key];
# delete incoming_data[key];
#
# # Only send cluster_send_in_groups_of at a time. Queue another
# # event to send the next group.
# if ( cluster_send_in_groups_of == ++num_added )
# break;
# }
#
# local done = F;
# # If data is empty, this sumstat is done.
# if ( |incoming_data| == 0 )
# done = T;
#
# # Note: copy is needed to compensate serialization caching issue. This should be
# # changed to something else later.
# event SumStats::cluster_ss_response(uid, ss_name, copy(local_data), done, cleanup);
# if ( ! done )
# schedule 0.01 sec { SumStats::send_data(uid, T) };
# }
event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{
if ( uid in sending_results )
@@ -204,6 +173,8 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{
if ( ss_name in result_store && key in result_store[ss_name] )
{
# Note: the copy is needed to compensate for a serialization caching issue.
# This should be changed to something else later.
event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
}
else
@@ -223,11 +194,6 @@ event SumStats::cluster_threshold_crossed(ss_name: string, key: SumStats::Key, t
threshold_tracker[ss_name][key] = thold_index;
}
event SumStats::thresholds_reset(ss_name: string)
{
delete threshold_tracker[ss_name];
}
@endif
@@ -236,7 +202,12 @@ event SumStats::thresholds_reset(ss_name: string)
# This variable is maintained by manager nodes as they collect and aggregate
# results.
# Index on a uid.
global stats_keys: table[string] of set[Key] &create_expire=1min;
global stats_keys: table[string] of set[Key] &create_expire=1min
&expire_func=function(s: table[string] of set[Key], idx: string): interval
{
Reporter::warning(fmt("SumStat key request for the %s SumStat uid took longer than 1 minute and was automatically cancelled.", idx));
return 0secs;
};
# This variable is maintained by manager nodes to track how many "dones" they
# collected per collection unique id. Once the number of results for a uid
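
The stats_keys change above (the old declaration on one line, the new one with the &expire_func below it) turns silently evicted key-request state into a logged warning. The general pattern, as a minimal sketch with a hypothetical table:

    # Entries are evicted 1 minute after creation; &expire_func runs at
    # eviction time. Returning 0secs lets the entry go, while returning a
    # positive interval would keep it alive that much longer.
    global pending: table[string] of count &create_expire=1min
        &expire_func=function(t: table[string] of count, idx: string): interval
            {
            Reporter::warning(fmt("request %s expired before it completed", idx));
            return 0secs;
            };
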
@@ -251,11 +222,15 @@ global done_with: table[string] of count &create_expire=1min &default=0;
# Indexed on a uid.
global key_requests: table[string] of Result &create_expire=1min;
# Store uids for dynamic requests here to avoid cleanup on the uid.
# (This needs to be done differently!)
global dynamic_requests: set[string] &create_expire=1min;
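
done_with implements a simple rendezvous: each worker's reply bumps a per-uid counter, and the manager finalizes once the counter reaches Cluster::worker_count. A hypothetical, self-contained sketch of that pattern (names invented; assumes the cluster framework is loaded):

    global answers: table[string] of count &default=0;
    global worker_replied: event(uid: string);

    event worker_replied(uid: string)
        {
        ++answers[uid];
        if ( Cluster::worker_count == answers[uid] )
            print fmt("all %d workers have answered request %s",
                      Cluster::worker_count, uid);
        }
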
# This variable is maintained by managers to prevent overwhelming communication due
# to too many intermediate updates. Each sumstat is tracked separately so that
# one won't overwhelm and degrade other quieter sumstats.
# Indexed on a sumstat id.
global outstanding_global_views: table[string] of count &create_expire=1min &default=0;
global outstanding_global_views: table[string] of count &read_expire=1min &default=0;
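
outstanding_global_views, the intermediate-update throttle, also switches from &create_expire to &read_expire (old declaration above, new one below it). The difference, sketched with hypothetical tables: a &create_expire entry is dropped a fixed interval after insertion no matter how often it is used, while a &read_expire entry has its timer restarted on every read, so a counter that is still being consulted survives.

    global by_creation: table[string] of count &create_expire=1min &default=0;
    global by_last_read: table[string] of count &read_expire=1min &default=0;
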
const zero_time = double_to_time(0.0);
# Managers handle logging.
@@ -274,6 +249,7 @@ event SumStats::finish_epoch(ss: SumStat)
event SumStats::cluster_ss_request(uid, ss$name, T);
done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid);
event SumStats::get_a_key(uid, ss$name, T);
}
@@ -295,6 +271,12 @@ function data_added(ss: SumStat, key: Key, result: Result)
function handle_end_of_result_collection(uid: string, ss_name: string, key: Key, cleanup: bool)
{
if ( uid !in key_requests )
{
Reporter::warning(fmt("Tried to handle end of result collection with missing uid in key_request sumstat:%s, key:%s.", ss_name, key));
return;
}
#print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
local ss = stats_store[ss_name];
local ir = key_requests[uid];
@@ -335,12 +317,6 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
{
done_with[uid] = 0;
event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
#print "done getting result";
handle_end_of_result_collection(uid, ss_name, key, cleanup);
request_all_current_keys(uid, ss_name, cleanup);
}
delete stats_keys[uid][key];
break; # only a single key
}
@@ -357,12 +333,16 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
event SumStats::send_no_key(uid: string, ss_name: string)
{
#print "send_no_key";
if ( uid !in done_with )
done_with[uid] = 0;
++done_with[uid];
if ( Cluster::worker_count == done_with[uid] )
{
delete done_with[uid];
if ( |stats_keys[uid]| > 0 )
if ( uid in stats_keys && |stats_keys[uid]| > 0 )
{
#print "we need more keys!";
# Now that we have a key from each worker, let's
@@ -375,6 +355,9 @@ event SumStats::send_no_key(uid: string, ss_name: string)
local ss = stats_store[ss_name];
if ( ss?$epoch_finished )
ss$epoch_finished(network_time());
delete stats_keys[uid];
reset(ss);
}
}
}
@@ -384,7 +367,7 @@ event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
#print fmt("send_a_key %s", key);
if ( uid !in stats_keys )
{
# no clue what happened here
Reporter::warning(fmt("Manager received a uid for an unknown request. SumStat: %s, Key: %s", ss_name, key));
return;
}
@@ -409,6 +392,8 @@ event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
local ss = stats_store[ss_name];
if ( ss?$epoch_finished )
ss$epoch_finished(network_time());
reset(ss);
}
}
}
@@ -426,20 +411,27 @@ event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, resu
key_requests[uid] = compose_results(key_requests[uid], result);
# Mark that a worker is done.
if ( uid !in done_with )
done_with[uid] = 0;
#print fmt("MANAGER: got a result for %s %s from %s", uid, key, get_event_peer()$descr);
++done_with[uid];
#if ( Cluster::worker_count == done_with[uid] )
# {
# print "done";
# handle_end_of_result_collection(uid, ss_name, key, cleanup);
# }
if ( uid !in dynamic_requests &&
uid in done_with && Cluster::worker_count == done_with[uid] )
{
handle_end_of_result_collection(uid, ss_name, key, cleanup);
if ( cleanup )
request_all_current_keys(uid, ss_name, cleanup);
}
}
# Managers handle intermediate updates here.
event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
{
#print fmt("MANAGER: receiving intermediate key data from %s", get_event_peer()$descr);
#print fmt("MANAGER: requesting key data for %s", key2str(key));
#print fmt("MANAGER: requesting key data for %s", key);
if ( ss_name in outstanding_global_views &&
|outstanding_global_views[ss_name]| > max_outstanding_global_views )
@@ -454,110 +446,16 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
local uid = unique_id("");
done_with[uid] = 0;
#print fmt("requesting results for: %s", uid);
event SumStats::cluster_get_result(uid, ss_name, key, F);
when ( uid in done_with && Cluster::worker_count == done_with[uid] )
{
handle_end_of_result_collection(uid, ss_name, key, F);
}
timeout 1.1min
{
Reporter::warning(fmt("Dynamic SumStat intermediate key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
}
}
#event SumStats::cluster_ss_response(uid: string, ss_name: string, data: ResultTable, done: bool, cleanup: bool)
# {
# #print fmt("MANAGER: receiving results from %s", get_event_peer()$descr);
#
# # Mark another worker as being "done" for this uid.
# if ( done )
# ++done_with[uid];
#
# # We had better only be getting requests for stuff that exists.
# if ( ss_name !in stats_store )
# return;
#
# if ( uid !in stats_keys )
# stats_keys[uid] = table();
#
# local local_data = stats_keys[uid];
# local ss = stats_store[ss_name];
#
# for ( key in data )
# {
# if ( key in local_data )
# local_data[key] = compose_results(local_data[key], data[key]);
# else
# local_data[key] = data[key];
#
# # If a stat is done being collected, thresholds for each key
# # need to be checked so we're doing it here to avoid doubly
# # iterating over each key.
# if ( Cluster::worker_count == done_with[uid] )
# {
# if ( check_thresholds(ss, key, local_data[key], 1.0) )
# {
# threshold_crossed(ss, key, local_data[key]);
# event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
# }
# }
# }
#
# # If the data has been collected from all peers, we are done and ready to finish.
# if ( cleanup && Cluster::worker_count == done_with[uid] )
# {
# local now = network_time();
# if ( ss?$epoch_result )
# {
# for ( key in local_data )
# ss$epoch_result(now, key, local_data[key]);
# }
#
# if ( ss?$epoch_finished )
# ss$epoch_finished(now);
#
# # Clean up
# delete stats_keys[uid];
# delete done_with[uid];
# reset(ss);
# }
# }
#function request(ss_name: string): ResultTable
# {
# # This only needs to be implemented this way for cluster compatibility.
# local uid = unique_id("dyn-");
# stats_keys[uid] = table();
# done_with[uid] = 0;
# event SumStats::cluster_ss_request(uid, ss_name, F);
#
# return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
# {
# if ( uid in stats_keys )
# {
# local ss_result = stats_keys[uid];
# # Clean up
# delete stats_keys[uid];
# delete done_with[uid];
# reset(stats_store[ss_name]);
# return ss_result;
# }
# else
# return table();
# }
# timeout 1.1min
# {
# Reporter::warning(fmt("Dynamic SumStat request for %s took longer than 1 minute and was automatically cancelled.", ss_name));
# return table();
# }
# }
function request_key(ss_name: string, key: Key): Result
{
local uid = unique_id("");
done_with[uid] = 0;
key_requests[uid] = table();
add dynamic_requests[uid];
event SumStats::cluster_get_result(uid, ss_name, key, F);
return when ( uid in done_with && Cluster::worker_count == done_with[uid] )
@@ -567,13 +465,14 @@ function request_key(ss_name: string, key: Key): Result
# Clean up
delete key_requests[uid];
delete done_with[uid];
delete dynamic_requests[uid];
return result;
}
timeout 1.1min
{
Reporter::warning(fmt("Dynamic SumStat key request for %s (%s) took longer than 1 minute and was automatically cancelled.", ss_name, key));
return table();
Reporter::warning(fmt("Dynamic SumStat key request for %s in SumStat %s took longer than 1 minute and was automatically cancelled.", key, ss_name));
return Result();
}
}
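
request_key() now stores per-request state in dynamic_requests so normal result collection won't clean it up, and on timeout it returns an empty Result() instead of a bare table. Because the function only returns once every worker has answered, callers must invoke it asynchronously. A hedged usage sketch (the sumstat name and key are invented):

    event bro_init()
        {
        local k = SumStats::Key($host=10.0.0.1);
        when ( local r = SumStats::request_key("conn.attempts", k) )
            {
            print fmt("result for %s: %s", SumStats::key2str(k), r);
            }
        timeout 2min
            {
            Reporter::warning("dynamic SumStat key request timed out");
            }
        }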

@@ -153,11 +153,6 @@ export {
## Returns: The result for the requested sumstat key.
global request_key: function(ss_name: string, key: Key): Result;
## This event is generated when thresholds are reset for a SumStat.
##
## name: SumStats name that thresholds were reset for.
global thresholds_reset: event(name: string);
## Helper function to represent a :bro:type:`SumStats::Key` value as
## a simple string.
##
@@ -321,7 +316,6 @@ function reset(ss: SumStat)
{
delete threshold_tracker[ss$name];
threshold_tracker[ss$name] = table();
event SumStats::thresholds_reset(ss$name);
}
}
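
With the thresholds_reset event gone, reset() is now a purely local operation that clears a sumstat's threshold tracking between epochs, and on a cluster epoch_finished fires on the manager only after every worker has been drained of keys. From a user's perspective the callbacks are unchanged; a minimal sketch (stream and sumstat names invented, API as in the 2.2-era framework):

    event bro_init()
        {
        local r1: SumStats::Reducer = [$stream="conn.attempt",
                                       $apply=set(SumStats::SUM)];
        SumStats::create([$name="conn-attempts",
                          $epoch=5min,
                          $reducers=set(r1),
                          # Called once per key as the epoch's results arrive.
                          $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
                              {
                              # Assumes observations were keyed on $host.
                              print fmt("%s: %.0f attempts", key$host,
                                        result["conn.attempt"]$sum);
                              },
                          # Called once per epoch, after all keys are handled.
                          $epoch_finished(ts: time) =
                              {
                              print fmt("epoch ending at %s is complete", ts);
                              }]);
        }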