intel: Switch to Cluster::publish()

This isn't quite making things a lot nicer, but more explicit.
This commit is contained in:
Arne Welzel 2024-11-05 16:26:01 +01:00
parent 91c03cd988
commit 93478a246e

View file

@ -11,6 +11,9 @@ module Intel;
global insert_item: event(item: Item) &is_used;
global insert_indicator: event(item: Item) &is_used;
# Event to transfer the min_data_store to connecting nodes.
global new_min_data_store: event(store: MinDataStore) &is_used;
# By default the manager sends its current min_data_store to connecting workers.
# During testing it's handy to suppress this, since receipt of the store
# introduces nondeterminism when mixed with explicit data insertions.
@ -22,9 +25,10 @@ redef have_full_data = F;
@endif
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event zeek_init()
# The manager propagates remove_indicator() to workers.
event remove_indicator(item: Item)
{
Broker::auto_publish(Cluster::worker_topic, remove_indicator);
Broker::publish(Cluster::worker_topic, remove_indicator, item);
}
# Handling of new worker nodes.
@ -35,7 +39,7 @@ event Cluster::node_up(name: string, id: string)
# this by the insert_indicator event.
if ( send_store_on_node_up && name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER )
{
Broker::publish_id(Cluster::node_topic(name), "Intel::min_data_store");
Broker::publish(Cluster::node_topic(name), new_min_data_store, min_data_store);
}
}
@ -43,6 +47,9 @@ event Cluster::node_up(name: string, id: string)
# has to be distributed.
event Intel::new_item(item: Item) &priority=5
{
# This shouldn't be required, pushing directly from
# the manager is more efficient and has less round
# trips for non-broker backends.
local pt = Cluster::rr_topic(Cluster::proxy_pool, "intel_insert_rr_key");
if ( pt == "" )
@ -73,11 +80,16 @@ event Intel::match_remote(s: Seen) &priority=5
}
@endif
@if ( Cluster::local_node_type() == Cluster::WORKER )
event zeek_init()
event match_remote(s: Seen)
{
Broker::auto_publish(Cluster::manager_topic, match_remote);
Broker::auto_publish(Cluster::manager_topic, remove_item);
Broker::publish(Cluster::manager_topic, match_remote, s);
}
event remove_item(item: Item, purge_indicator: bool)
{
Broker::publish(Cluster::manager_topic, remove_item, item, purge_indicator);
}
# On a worker, the new_item event requires to trigger the insertion
@ -92,6 +104,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5
{
Intel::_insert(item, F);
}
# Handling of a complete MinDataStore snapshot
event new_min_data_store(store: MinDataStore)
{
min_data_store = store;
}
@endif
@if ( Cluster::local_node_type() == Cluster::PROXY )