mirror of
https://github.com/zeek/zeek.git
synced 2025-10-14 12:38:20 +00:00
Reworked cluster intelligence data distribution mechanism and fixed tests.
- Intel data distribution on clusters is now pushed in whole by the manager when a worker connects. Additions after that point are handled by the normal single-item distribution mechanism already built into the intelligence framework.
- The manager maintains the complete "minimal" data store that the workers use for their matching, so distributing the full "minimal" data set is very easy.
- Tests have been cleaned up and now work.
This commit is contained in:
parent
38468f9daa
commit
bf9651b323
16 changed files with 84 additions and 177 deletions
|
@ -6,29 +6,23 @@
|
|||
|
||||
module Intel;
|
||||
|
||||
redef record Item += {
|
||||
## This field is used internally for cluster transparency to avoid
|
||||
## re-dispatching intelligence items over and over from workers.
|
||||
first_dispatch: bool &default=T;
|
||||
};
|
||||
|
||||
# If this process is not a manager process, we don't want the full metadata
|
||||
@if ( Cluster::local_node_type() != Cluster::MANAGER )
|
||||
redef have_full_data = F;
|
||||
@endif
|
||||
|
||||
global cluster_new_item: event(item: Item);
|
||||
global cluster_updated_item: event(item: Item);
|
||||
|
||||
redef record Item += {
|
||||
## This field is solely used internally for cluster transparency with
|
||||
## the intelligence framework to avoid storms of intelligence data
|
||||
## swirling forever. It allows data to propagate only a single time.
|
||||
first_dispatch: bool &default=T;
|
||||
};
|
||||
|
||||
# Primary intelligence distribution comes from manager.
|
||||
redef Cluster::manager2worker_events += /^Intel::cluster_.*$/;
|
||||
redef Cluster::manager2worker_events += /^Intel::(cluster_new_item)$/;
|
||||
# If a worker finds intelligence and adds it, it should share it back to the manager.
|
||||
redef Cluster::worker2manager_events += /^Intel::(cluster_.*|match_no_items)$/;
|
||||
|
||||
@if ( Cluster::local_node_type() != Cluster::MANAGER )
|
||||
redef Intel::data_store &synchronized;
|
||||
@endif
|
||||
redef Cluster::worker2manager_events += /^Intel::(cluster_new_item|match_no_items)$/;
|
||||
|
||||
@if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||
event Intel::match_no_items(s: Seen) &priority=5
|
||||
|
@ -36,19 +30,13 @@ event Intel::match_no_items(s: Seen) &priority=5
|
|||
event Intel::match(s, Intel::get_items(s));
|
||||
}
|
||||
|
||||
global initial_sync = F;
|
||||
event remote_connection_handshake_done(p: event_peer)
|
||||
{
|
||||
# Insert the data once something is connected.
|
||||
# This should only push the data to a single host where the
|
||||
# normal Bro synchronization should take over.
|
||||
if ( ! initial_sync )
|
||||
# When a worker connects, send it the complete minimal data store.
|
||||
# It will be kept up to date after this by the cluster_new_item event.
|
||||
if ( Cluster::nodes[p$descr]$node_type == Cluster::WORKER )
|
||||
{
|
||||
initial_sync = T;
|
||||
for ( net in data_store$net_data )
|
||||
event Intel::cluster_new_item([$net=net, $meta=[$source=""]]);
|
||||
for ( [str, str_type] in data_store$string_data )
|
||||
event Intel::cluster_new_item([$str=str, $str_type=str_type, $meta=[$source=""]]);
|
||||
send_id(p, "min_data_store");
|
||||
}
|
||||
}
|
||||
@endif
|
||||
|
@ -60,34 +48,14 @@ event Intel::cluster_new_item(item: Intel::Item) &priority=5
|
|||
Intel::insert(item);
|
||||
}
|
||||
|
||||
event Intel::cluster_updated_item(item: Intel::Item) &priority=5
|
||||
{
|
||||
# Ignore locally generated events to avoid event storms.
|
||||
if ( is_remote_event() )
|
||||
Intel::insert(item);
|
||||
}
|
||||
|
||||
event Intel::new_item(item: Intel::Item) &priority=5
|
||||
{
|
||||
# The cluster manager always rebroadcasts intelligence.
|
||||
# Workers redistribute it if it was locally generated on
|
||||
# the worker.
|
||||
# Workers redistribute it if it was locally generated.
|
||||
if ( Cluster::local_node_type() == Cluster::MANAGER ||
|
||||
item$first_dispatch )
|
||||
{
|
||||
item$first_dispatch = F;
|
||||
item$first_dispatch=F;
|
||||
event Intel::cluster_new_item(item);
|
||||
}
|
||||
}
|
||||
|
||||
event Intel::updated_item(item: Intel::Item) &priority=5
|
||||
{
|
||||
# If this is the first time this item has been dispatched or this
|
||||
# is a manager, send it over the cluster.
|
||||
if ( Cluster::local_node_type() == Cluster::MANAGER ||
|
||||
item$first_dispatch )
|
||||
{
|
||||
item$first_dispatch = F;
|
||||
event Intel::cluster_updated_item(item);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,3 +26,4 @@ event bro_init() &priority=5
|
|||
$ev=Intel::read_entry]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -129,6 +129,16 @@ type DataStore: record {
|
|||
};
|
||||
global data_store: DataStore &redef;
|
||||
|
||||
# The in-memory data structure for holding the barest matchable intelligence.
|
||||
# This is primarily for workers to do the initial quick matches and store
|
||||
# a minimal amount of data for the full match to happen on the manager.
|
||||
type MinDataStore: record {
|
||||
net_data: set[subnet];
|
||||
string_data: set[string, StrType];
|
||||
};
|
||||
global min_data_store: MinDataStore &redef;
|
||||
|
||||
|
||||
event bro_init() &priority=5
|
||||
{
|
||||
Log::create_stream(LOG, [$columns=Info, $ev=log_intel]);
|
||||
|
@ -137,12 +147,14 @@ event bro_init() &priority=5
|
|||
function find(s: Seen): bool
|
||||
{
|
||||
if ( s?$host &&
|
||||
s$host in data_store$net_data )
|
||||
((have_full_data && s$host in data_store$net_data) ||
|
||||
(s$host in min_data_store$net_data)))
|
||||
{
|
||||
return T;
|
||||
}
|
||||
else if ( s?$str && s?$str_type &&
|
||||
[s$str, s$str_type] in data_store$string_data )
|
||||
((have_full_data && [s$str, s$str_type] in data_store$string_data) ||
|
||||
([s$str, s$str_type] in min_data_store$string_data)))
|
||||
{
|
||||
return T;
|
||||
}
|
||||
|
@ -232,7 +244,7 @@ function has_meta(check: MetaData, metas: set[MetaData]): bool
|
|||
return F;
|
||||
}
|
||||
|
||||
event Intel::match(s: Seen, items: set[Item])
|
||||
event Intel::match(s: Seen, items: set[Item]) &priority=5
|
||||
{
|
||||
local empty_set: set[string] = set();
|
||||
local info: Info = [$ts=network_time(), $seen=s, $sources=empty_set];
|
||||
|
@ -264,24 +276,39 @@ function insert(item: Item)
|
|||
if ( item?$host )
|
||||
{
|
||||
local host = mask_addr(item$host, is_v4_addr(item$host) ? 32 : 128);
|
||||
if ( host !in data_store$net_data )
|
||||
data_store$net_data[host] = set();
|
||||
|
||||
metas = data_store$net_data[host];
|
||||
if ( have_full_data )
|
||||
{
|
||||
if ( host !in data_store$net_data )
|
||||
data_store$net_data[host] = set();
|
||||
|
||||
metas = data_store$net_data[host];
|
||||
}
|
||||
|
||||
add min_data_store$net_data[host];
|
||||
}
|
||||
else if ( item?$net )
|
||||
{
|
||||
if ( item$net !in data_store$net_data )
|
||||
data_store$net_data[item$net] = set();
|
||||
if ( have_full_data )
|
||||
{
|
||||
if ( item$net !in data_store$net_data )
|
||||
data_store$net_data[item$net] = set();
|
||||
|
||||
metas = data_store$net_data[item$net];
|
||||
metas = data_store$net_data[item$net];
|
||||
}
|
||||
|
||||
add min_data_store$net_data[item$net];
|
||||
}
|
||||
else if ( item?$str )
|
||||
{
|
||||
if ( [item$str, item$str_type] !in data_store$string_data )
|
||||
data_store$string_data[item$str, item$str_type] = set();
|
||||
if ( have_full_data )
|
||||
{
|
||||
if ( [item$str, item$str_type] !in data_store$string_data )
|
||||
data_store$string_data[item$str, item$str_type] = set();
|
||||
|
||||
metas = data_store$string_data[item$str, item$str_type];
|
||||
metas = data_store$string_data[item$str, item$str_type];
|
||||
}
|
||||
|
||||
add min_data_store$string_data[item$str, item$str_type];
|
||||
}
|
||||
|
||||
local updated = F;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue