This change adds two new hooks to the Intel framework that can be used to intercept added and removed indicators and their type. These hooks are fairly low-level. One immediate use-case is to count the number of indicators loaded per Intel::Type and enable and disable the corresponding event groups of the intel/seen scripts.

I attempted to gauge the overhead and while it's definitely there, loading a file with ~500k DOMAIN entries spends somewhere around ~0.5 seconds in the hooks when populated via the min_data_store mechanism. While that doesn't sound great, it actually takes the manager on my system 2.5 seconds to serialize and Cluster::publish() the min_data_store alone, and it's doing that serially for every active worker. Mostly to say that the bigger overhead in that area is the manager doing redundant work per worker.

Co-authored-by: Mohan Dhawan <mohan@corelight.com>
(cherry picked from commit 3366d81e98ef381d843f6d76628834fdcd622e25)
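
A minimal sketch of that use-case, assuming the hook names and signature used in the script below (Intel::indicator_inserted / Intel::indicator_removed, each receiving the indicator string and its Intel::Type). The module name and the group naming via cat(t) (e.g. "Intel::DOMAIN") are illustrative assumptions; enable_event_group / disable_event_group are the event-group built-ins available since Zeek 5.2.

@load base/frameworks/intel

module IntelSeenToggle;

# Per-type indicator counters, defaulting to zero.
global counts: table[Intel::Type] of count &default=0;

hook Intel::indicator_inserted(v: string, t: Intel::Type)
	{
	++counts[t];

	# First indicator of this type: enable the matching event group.
	# Deriving the group name via cat(t) (e.g. "Intel::DOMAIN") is an
	# assumption for illustration.
	if ( counts[t] == 1 )
		enable_event_group(cat(t));
	}

hook Intel::indicator_removed(v: string, t: Intel::Type)
	{
	if ( counts[t] > 0 )
		--counts[t];

	# Last indicator of this type gone: disable the group again.
	if ( counts[t] == 0 )
		disable_event_group(cat(t));
	}
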
##! Cluster transparency support for the intelligence framework. This is mostly
##! oriented toward distributing intelligence information across clusters.

@load ./main
@load base/frameworks/cluster

module Intel;

# Internal events for cluster data distribution. Marked as &is_used since
# they're communicated via Broker.
global insert_item: event(item: Item) &is_used;
global insert_indicator: event(item: Item) &is_used;

# Event to transfer the min_data_store to connecting nodes.
global new_min_data_store: event(store: MinDataStore) &is_used;

# By default the manager sends its current min_data_store to connecting workers.
# During testing it's handy to suppress this, since receipt of the store
# introduces nondeterminism when mixed with explicit data insertions.
const send_store_on_node_up = T &redef;

# If this process is not a manager process, we don't want the full metadata.
@if ( Cluster::local_node_type() != Cluster::MANAGER )
redef have_full_data = F;
@endif
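
# Note (illustrative): since send_store_on_node_up is &redef, a test can
# suppress the snapshot transfer from within module Intel:
#   redef send_store_on_node_up = F;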

@if ( Cluster::local_node_type() == Cluster::MANAGER )
event zeek_init()
	{
	Broker::auto_publish(Cluster::worker_topic, remove_indicator);
	}

# Handling of new worker nodes.
event Cluster::node_up(name: string, id: string)
	{
	# When a worker connects, send it the complete minimal data store unless
	# we turned off that feature. The store will be kept up to date after
	# this by the insert_indicator event.
	if ( send_store_on_node_up && name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER )
		{
		Broker::publish(Cluster::node_topic(name), new_min_data_store, min_data_store);
		}
	}

# On the manager, the new_item event indicates a new indicator that
# has to be distributed.
event Intel::new_item(item: Item) &priority=5
	{
	local pt = Cluster::rr_topic(Cluster::proxy_pool, "intel_insert_rr_key");

	if ( pt == "" )
		# No proxies alive; publish to all workers ourselves instead of
		# relaying via a proxy.
		pt = Cluster::worker_topic;

	Broker::publish(pt, Intel::insert_indicator, item);
	}

# Handling of item insertion triggered by a remote node.
event Intel::insert_item(item: Intel::Item) &priority=5
	{
	Intel::_insert(item, T);
	}

# Handling of item removal triggered by a remote node.
event Intel::remove_item(item: Item, purge_indicator: bool) &priority=5
	{
	remove(item, purge_indicator);
	}

# Handling of a match triggered by a remote node.
event Intel::match_remote(s: Seen) &priority=5
	{
	if ( Intel::find(s) )
		event Intel::match(s, Intel::get_items(s));
	}
@endif

@if ( Cluster::local_node_type() == Cluster::WORKER )
event zeek_init()
	{
	Broker::auto_publish(Cluster::manager_topic, match_remote);
	Broker::auto_publish(Cluster::manager_topic, remove_item);
	}

# On a worker, the new_item event needs to trigger the insertion
# on the manager in order to update the back-end data store.
event Intel::new_item(item: Intel::Item) &priority=5
	{
	Broker::publish(Cluster::manager_topic, Intel::insert_item, item);
	}

# Handling of new indicators published by the manager.
event Intel::insert_indicator(item: Intel::Item) &priority=5
	{
	Intel::_insert(item, F);
	}

# Helper that invokes the given hook once per indicator in the given
# MinDataStore, passing the indicator's string representation and type.
function invoke_indicator_hook(store: MinDataStore, h: hook(v: string, t: Intel::Type))
	{
	for ( a in store$host_data )
		hook h(cat(a), Intel::ADDR);

	for ( sn in store$subnet_data )
		hook h(cat(sn), Intel::SUBNET);

	for ( [indicator_value, indicator_type] in store$string_data )
		hook h(indicator_value, indicator_type);
	}
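
# For reference: the loops above cover the three MinDataStore fields,
# host_data (addr indicators), subnet_data (subnet indicators) and
# string_data (pairs of indicator string and Intel::Type).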

# Handling of a complete MinDataStore snapshot.
#
# Invoke the removed and inserted hooks using the old and new min data store
# instances, respectively. The way this event is used, the original
# min_data_store should essentially be empty.
event new_min_data_store(store: MinDataStore)
	{
	invoke_indicator_hook(min_data_store, Intel::indicator_removed);

	min_data_store = store;

	invoke_indicator_hook(min_data_store, Intel::indicator_inserted);
	}
@endif

@if ( Cluster::local_node_type() == Cluster::PROXY )
event Intel::insert_indicator(item: Intel::Item) &priority=5
	{
	# Just forwarding from manager to workers.
	Broker::publish(Cluster::worker_topic, Intel::insert_indicator, item);
	}
@endif