diff --git a/scripts/base/frameworks/intel/cluster.zeek b/scripts/base/frameworks/intel/cluster.zeek index d890e455f9..016b49f35c 100644 --- a/scripts/base/frameworks/intel/cluster.zeek +++ b/scripts/base/frameworks/intel/cluster.zeek @@ -11,6 +11,9 @@ module Intel; global insert_item: event(item: Item) &is_used; global insert_indicator: event(item: Item) &is_used; +# Event to transfer the min_data_store to connecting nodes. +global new_min_data_store: event(store: MinDataStore) &is_used; + # By default the manager sends its current min_data_store to connecting workers. # During testing it's handy to suppress this, since receipt of the store # introduces nondeterminism when mixed with explicit data insertions. @@ -22,9 +25,10 @@ redef have_full_data = F; @endif @if ( Cluster::local_node_type() == Cluster::MANAGER ) -event zeek_init() +# The manager propagates remove_indicator() to workers. +event remove_indicator(item: Item) { - Broker::auto_publish(Cluster::worker_topic, remove_indicator); + Broker::publish(Cluster::worker_topic, remove_indicator, item); } # Handling of new worker nodes. @@ -35,7 +39,7 @@ event Cluster::node_up(name: string, id: string) # this by the insert_indicator event. if ( send_store_on_node_up && name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER ) { - Broker::publish_id(Cluster::node_topic(name), "Intel::min_data_store"); + Broker::publish(Cluster::node_topic(name), new_min_data_store, min_data_store); } } @@ -43,6 +47,9 @@ event Cluster::node_up(name: string, id: string) # has to be distributed. event Intel::new_item(item: Item) &priority=5 { + # This shouldn't be required, pushing directly from + # the manager is more efficient and has less round + # trips for non-broker backends. local pt = Cluster::rr_topic(Cluster::proxy_pool, "intel_insert_rr_key"); if ( pt == "" ) @@ -73,11 +80,16 @@ event Intel::match_remote(s: Seen) &priority=5 } @endif + @if ( Cluster::local_node_type() == Cluster::WORKER ) -event zeek_init() +event match_remote(s: Seen) { - Broker::auto_publish(Cluster::manager_topic, match_remote); - Broker::auto_publish(Cluster::manager_topic, remove_item); + Broker::publish(Cluster::manager_topic, match_remote, s); + } + +event remove_item(item: Item, purge_indicator: bool) + { + Broker::publish(Cluster::manager_topic, remove_item, item, purge_indicator); } # On a worker, the new_item event requires to trigger the insertion @@ -92,6 +104,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5 { Intel::_insert(item, F); } + +# Handling of a complete MinDataStore snapshot +event new_min_data_store(store: MinDataStore) + { + min_data_store = store; + } @endif @if ( Cluster::local_node_type() == Cluster::PROXY )