Merge remote-tracking branch 'origin/topic/awelzel/deprecate-broker-auto-publish'

* origin/topic/awelzel/deprecate-broker-auto-publish:
  sumstats: Remove copy() for Broker::publish() calls
  broker/Publish: Use event time instead of network time
  broker/Eventhandler: Deprecate Broker::auto_publish() for v8.1
  btest: Remove Broker::auto_publish() usages
  frameworks/control: Remove Broker::auto_publish()
  catch-and-release: Remove Broker::auto_publish()
  ssl/validate-certs: Remove Broker::auto_publish()
  sumstats: Remove Broker::auto_publish()
  cluster_started: No Broker::auto_publish() use
  openflow: Remove Broker::auto_publish()
  dhcp: Remove Broker::auto_publish()
  frameworks/notice: Remove Broker::auto_publish()
  netcontrol: Replace Broker::auto_publish()
  intel: Switch to Cluster::publish()
  broker: Support publish() of unspecified set() / table()
  types: Fix table() resulting in table_type->IsSet() == true
Arne Welzel 2024-11-14 14:15:24 +01:00
commit 18bfdb8a2b
47 changed files with 705 additions and 279 deletions
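The bulk of the 47 touched files apply the same mechanical rewrite. A minimal sketch of the two shapes it takes, using a hypothetical module and event rather than any script from this merge:

    module MyModule;

    export {
        global my_event: event(n: count);
    }

    # Before: register the event once and rely on every local dispatch
    # being forwarded to the topic automatically.
    #
    #   event zeek_init()
    #       {
    #       Broker::auto_publish(Cluster::manager_topic, MyModule::my_event);
    #       }

    # After, shape 1: publish explicitly where the event used to be raised.
    event connection_established(c: connection)
        {
        Broker::publish(Cluster::manager_topic, MyModule::my_event, 42);
        }

    # After, shape 2: keep raising the event locally elsewhere and forward it
    # from a handler (in the real scripts such handlers are guarded by an
    # @if on the node type; the intel changes below use this form).
    event MyModule::my_event(n: count)
        {
        Broker::publish(Cluster::manager_topic, MyModule::my_event, n);
        }

Both shapes appear in the file diffs that follow.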

View file

@@ -395,7 +395,7 @@ export {
## ev: a Zeek event value.
##
## Returns: true if automatic event sending is now enabled.
global auto_publish: function(topic: string, ev: any): bool;
global auto_publish: function(topic: string, ev: any): bool &deprecated="Remove in v8.1. Switch to explicit Broker::publish() calls. Auto-publish won't work with all cluster backends.";
## Stop automatically sending an event to peers upon local dispatch.
##
@@ -405,7 +405,7 @@ export {
##
## Returns: true if automatic events will not occur for the topic/event
## pair.
global auto_unpublish: function(topic: string, ev: any): bool;
global auto_unpublish: function(topic: string, ev: any): bool &deprecated="Remove in v8.1. See Broker::auto_publish()";
}
@load base/bif/comm.bif
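The deprecation text above also points at cluster backends other than Broker. The commit list mentions Cluster::publish() ("intel: Switch to Cluster::publish()"), which takes the same topic-then-event-then-arguments call shape; a one-line sketch with the hypothetical event from the sketch above:

    # Inside any handler: the backend-agnostic form, same call shape as
    # Broker::publish(), routed through whichever cluster backend is configured.
    Cluster::publish(Cluster::manager_topic, MyModule::my_event, 42);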

View file

@@ -11,6 +11,9 @@ module Intel;
global insert_item: event(item: Item) &is_used;
global insert_indicator: event(item: Item) &is_used;
# Event to transfer the min_data_store to connecting nodes.
global new_min_data_store: event(store: MinDataStore) &is_used;
# By default the manager sends its current min_data_store to connecting workers.
# During testing it's handy to suppress this, since receipt of the store
# introduces nondeterminism when mixed with explicit data insertions.
@@ -22,9 +25,10 @@ redef have_full_data = F;
@endif
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event zeek_init()
# The manager propagates remove_indicator() to workers.
event remove_indicator(item: Item)
{
Broker::auto_publish(Cluster::worker_topic, remove_indicator);
Broker::publish(Cluster::worker_topic, remove_indicator, item);
}
# Handling of new worker nodes.
@@ -35,7 +39,7 @@ event Cluster::node_up(name: string, id: string)
# this by the insert_indicator event.
if ( send_store_on_node_up && name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER )
{
Broker::publish_id(Cluster::node_topic(name), "Intel::min_data_store");
Broker::publish(Cluster::node_topic(name), new_min_data_store, min_data_store);
}
}
@@ -43,6 +47,9 @@ event Cluster::node_up(name: string, id: string)
# has to be distributed.
event Intel::new_item(item: Item) &priority=5
{
# This shouldn't be required; pushing directly from
# the manager is more efficient and needs fewer
# round trips for non-Broker backends.
local pt = Cluster::rr_topic(Cluster::proxy_pool, "intel_insert_rr_key");
if ( pt == "" )
@@ -73,11 +80,16 @@ event Intel::match_remote(s: Seen) &priority=5
}
@endif
@if ( Cluster::local_node_type() == Cluster::WORKER )
event zeek_init()
event match_remote(s: Seen)
{
Broker::auto_publish(Cluster::manager_topic, match_remote);
Broker::auto_publish(Cluster::manager_topic, remove_item);
Broker::publish(Cluster::manager_topic, match_remote, s);
}
event remove_item(item: Item, purge_indicator: bool)
{
Broker::publish(Cluster::manager_topic, remove_item, item, purge_indicator);
}
# On a worker, the new_item event requires to trigger the insertion
@@ -92,6 +104,12 @@ event Intel::insert_indicator(item: Intel::Item) &priority=5
{
Intel::_insert(item, F);
}
# Handling of a complete MinDataStore snapshot
event new_min_data_store(store: MinDataStore)
{
min_data_store = store;
}
@endif
@if ( Cluster::local_node_type() == Cluster::PROXY )

View file

@@ -16,26 +16,6 @@ export {
global cluster_netcontrol_delete_rule: event(id: string, reason: string);
}
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event zeek_init()
{
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_added);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_removed);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_timeout);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_error);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_exists);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_new);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_destroyed);
}
@else
event zeek_init()
{
Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_add_rule);
Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_remove_rule);
Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_delete_rule);
}
@endif
function activate(p: PluginState, priority: int)
{
# We only run the activate function on the manager.
@@ -66,7 +46,7 @@ function add_rule(r: Rule) : string
if ( r$id == "" )
r$id = cat(Cluster::node, ":", ++local_rule_count);
event NetControl::cluster_netcontrol_add_rule(r);
Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_add_rule, r);
return r$id;
}
}
@@ -77,7 +57,7 @@ function delete_rule(id: string, reason: string &default="") : bool
return delete_rule_impl(id, reason);
else
{
event NetControl::cluster_netcontrol_delete_rule(id, reason);
Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_delete_rule, id, reason);
return T; # well, we can't know here. So - just hope...
}
}
@@ -88,7 +68,7 @@ function remove_rule(id: string, reason: string &default="") : bool
return remove_rule_impl(id, reason);
else
{
event NetControl::cluster_netcontrol_remove_rule(id, reason);
Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_remove_rule, id, reason);
return T; # well, we can't know here. So - just hope...
}
}
@@ -120,6 +100,8 @@ event rule_exists(r: Rule, p: PluginState, msg: string) &priority=5
if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire )
schedule r$expire { rule_expire(r, p) };
Broker::publish(Cluster::worker_topic, rule_exists, r, p, msg);
}
event rule_added(r: Rule, p: PluginState, msg: string) &priority=5
@@ -128,21 +110,39 @@ event rule_added(r: Rule, p: PluginState, msg: string) &priority=5
if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire )
schedule r$expire { rule_expire(r, p) };
Broker::publish(Cluster::worker_topic, rule_added, r, p, msg);
}
event rule_removed(r: Rule, p: PluginState, msg: string) &priority=-5
{
rule_removed_impl(r, p, msg);
Broker::publish(Cluster::worker_topic, rule_removed, r, p, msg);
}
event rule_timeout(r: Rule, i: FlowInfo, p: PluginState) &priority=-5
{
rule_timeout_impl(r, i, p);
Broker::publish(Cluster::worker_topic, rule_timeout, r, i, p);
}
event rule_error(r: Rule, p: PluginState, msg: string) &priority=-5
{
rule_error_impl(r, p, msg);
Broker::publish(Cluster::worker_topic, rule_error, r, msg);
}
event rule_new(r: Rule)
{
Broker::publish(Cluster::worker_topic, rule_new, r);
}
event rule_destroyed(r: Rule)
{
Broker::publish(Cluster::worker_topic, rule_destroyed, r);
}
@endif

View file

@@ -281,18 +281,6 @@ export {
## identifier: The identifier string of the notice that should be suppressed.
global begin_suppression: event(ts: time, suppress_for: interval, note: Type, identifier: string);
## This is an internal event that is used to broadcast the begin_suppression
## event over a cluster.
##
## ts: time indicating then when the notice to be suppressed occurred.
##
## suppress_for: length of time that this notice should be suppressed.
##
## note: The :zeek:type:`Notice::Type` of the notice.
##
## identifier: The identifier string of the notice that should be suppressed.
global manager_begin_suppression: event(ts: time, suppress_for: interval, note: Type, identifier: string);
## A function to determine if an event is supposed to be suppressed.
##
## n: The record containing the notice in question.
@@ -536,39 +524,34 @@ hook Notice::notice(n: Notice::Info) &priority=-5
event Notice::begin_suppression(n$ts, n$suppress_for, n$note, n$identifier);
suppressing[n$note, n$identifier] = n$ts + n$suppress_for;
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
event Notice::manager_begin_suppression(n$ts, n$suppress_for, n$note, n$identifier);
# Notify the manager about the new suppression; it'll broadcast
# to the other nodes in the cluster.
# Once we have global pub/sub, we could also unconditionally
# send to a notice specific topic for communicating
# suppressions directly to all nodes.
Broker::publish(Cluster::manager_topic, Notice::begin_suppression,
n$ts, n$suppress_for, n$note, n$identifier);
@endif
}
}
event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type,
identifier: string)
# The manager currently re-publishes Notice::begin_suppression to worker
# and proxy nodes.
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, identifier: string)
{
local e = Broker::make_event(Notice::begin_suppression, ts, suppress_for, note, identifier);
Broker::publish(Cluster::worker_topic, e);
Broker::publish(Cluster::proxy_topic, e);
}
@endif
event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, identifier: string)
{
local suppress_until = ts + suppress_for;
suppressing[note, identifier] = suppress_until;
}
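One semantic difference is visible in the Notice::notice hook above: Broker::auto_publish() forwarded an event as a side effect of raising it locally, so a single event statement used to do both jobs, whereas an explicit Broker::publish() only sends to peers and never runs local handlers. That is why the hook now raises Notice::begin_suppression locally and, on non-manager nodes, additionally publishes it to the manager (which re-publishes it to workers and proxies). A minimal sketch of the split, with a hypothetical module and event:

    module MyModule;

    export {
        global my_event: event(n: count);
    }

    event zeek_init()
        {
        # Old style: with Broker::auto_publish() registered for my_event, this
        # single statement ran local handlers and was forwarded to peers.
        event MyModule::my_event(42);

        # New style: remote delivery is its own statement; publish() does not
        # invoke local handlers, so keep the event statement if both are needed.
        Broker::publish(Cluster::manager_topic, MyModule::my_event, 42);
        }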
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
event zeek_init()
{
Broker::auto_publish(Cluster::worker_topic, Notice::begin_suppression);
Broker::auto_publish(Cluster::proxy_topic, Notice::begin_suppression);
}
event Notice::manager_begin_suppression(ts: time, suppress_for: interval, note: Type,
identifier: string)
{
event Notice::begin_suppression(ts, suppress_for, note, identifier);
}
@endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
event zeek_init()
{
Broker::auto_publish(Cluster::manager_topic, Notice::manager_begin_suppression);
}
@endif
function is_being_suppressed(n: Notice::Info): bool
{
if ( n?$identifier && [n$note, n$identifier] in suppressing )

View file

@@ -13,15 +13,6 @@ export {
global cluster_flow_clear: event(name: string);
}
@if ( Cluster::local_node_type() != Cluster::MANAGER )
# Workers need ability to forward commands to manager.
event zeek_init()
{
Broker::auto_publish(Cluster::manager_topic, OpenFlow::cluster_flow_mod);
Broker::auto_publish(Cluster::manager_topic, OpenFlow::cluster_flow_clear);
}
@endif
# the flow_mod function wrapper
function flow_mod(controller: Controller, match: ofp_match, flow_mod: ofp_flow_mod): bool
{
@@ -31,7 +22,7 @@ function flow_mod(controller: Controller, match: ofp_match, flow_mod: ofp_flow_m
if ( Cluster::local_node_type() == Cluster::MANAGER )
return controller$flow_mod(controller$state, match, flow_mod);
else
event OpenFlow::cluster_flow_mod(controller$state$_name, match, flow_mod);
Broker::publish(Cluster::manager_topic, OpenFlow::cluster_flow_mod, controller$state$_name, match, flow_mod);
return T;
}
@@ -44,7 +35,7 @@ function flow_clear(controller: Controller): bool
if ( Cluster::local_node_type() == Cluster::MANAGER )
return controller$flow_clear(controller$state);
else
event OpenFlow::cluster_flow_clear(controller$state$_name);
Broker::publish(Cluster::manager_topic, OpenFlow::cluster_flow_clear, controller$state$_name);
return T;
}

View file

@@ -61,14 +61,6 @@ global recent_global_view_keys: set[string, Key] &create_expire=1min;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
event zeek_init() &priority=100
{
Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_send_result);
Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response);
Broker::auto_publish(Cluster::manager_topic, SumStats::send_a_key);
Broker::auto_publish(Cluster::manager_topic, SumStats::send_no_key);
}
# Result tables indexed on a uid that are currently being sent to the
# manager.
global sending_results: table[string] of ResultTable = table() &read_expire=1min;
@@ -87,7 +79,8 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
{
# kick off intermediate update
event SumStats::cluster_key_intermediate_response(ss$name, key);
Broker::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response,
ss$name, key);
add recent_global_view_keys[ss$name, key];
}
}
@@ -98,13 +91,15 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{
if ( |sending_results[uid]| == 0 )
{
event SumStats::send_no_key(uid, ss_name);
Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
uid, ss_name);
}
else
{
for ( key in sending_results[uid] )
{
event SumStats::send_a_key(uid, ss_name, key);
Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
uid, ss_name, key);
# break to only send one.
break;
}
@@ -120,7 +115,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{
for ( key in result_store[ss_name] )
{
event SumStats::send_a_key(uid, ss_name, key);
Broker::publish(Cluster::manager_topic, SumStats::send_a_key,
uid, ss_name, key);
# break to only send one.
break;
}
@@ -128,7 +124,8 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
}
else
{
event SumStats::send_no_key(uid, ss_name);
Broker::publish(Cluster::manager_topic, SumStats::send_no_key,
uid, ss_name);
}
}
@@ -154,31 +151,31 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{
if ( uid in sending_results && key in sending_results[uid] )
{
# Note: copy is needed to compensate serialization caching issue. This should be
# changed to something else later.
event SumStats::cluster_send_result(uid, ss_name, key, copy(sending_results[uid][key]), cleanup);
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, sending_results[uid][key], cleanup);
delete sending_results[uid][key];
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, table(), cleanup);
}
}
else
{
if ( ss_name in result_store && key in result_store[ss_name] )
{
# Note: copy is needed to compensate serialization caching issue. This should be
# changed to something else later.
event SumStats::cluster_send_result(uid, ss_name, key, copy(result_store[ss_name][key]), cleanup);
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, result_store[ss_name][key], cleanup);
}
else
{
# We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers.
event SumStats::cluster_send_result(uid, ss_name, key, table(), cleanup);
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, table(), cleanup);
}
}
}
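Two details in this hunk map to the first and last commits in the branch list. The defensive copy() calls and the serialization-caching comment are gone; the assumption behind that (hedged here, since this hunk doesn't show the Broker internals) is that Broker::publish() builds the outgoing message from its arguments at call time, so the delete on the following line can no longer change what is sent. The bare table() passed for the empty reply relies on publish() now accepting unspecified set()/table() constructors. The relevant lines, annotated:

    Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
                    uid, ss_name, key, sending_results[uid][key], cleanup);
    delete sending_results[uid][key];   # assumed not to affect the message built above

    # Empty "no data for this key" reply; an unspecified table() is now a
    # valid publish() argument.
    Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result,
                    uid, ss_name, key, table(), cleanup);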
@@ -209,14 +206,6 @@ function request_key(ss_name: string, key: Key): Result
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event zeek_init() &priority=100
{
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_ss_request);
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_get_result);
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed);
Broker::auto_publish(Cluster::worker_topic, SumStats::get_a_key);
}
# This variable is maintained by manager nodes as they collect and aggregate
# results.
# Index on a uid.
@@ -263,12 +252,14 @@ event SumStats::finish_epoch(ss: SumStat)
stats_keys[uid] = set();
# Request data from peers.
event SumStats::cluster_ss_request(uid, ss$name, T);
Broker::publish(Cluster::worker_topic, SumStats::cluster_ss_request,
uid, ss$name, T);
done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid);
event SumStats::get_a_key(uid, ss$name, T);
Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
uid, ss$name, T);
}
# Schedule the next finish_epoch event.
@@ -283,7 +274,8 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, 1.0) )
{
threshold_crossed(ss, key, result);
event SumStats::cluster_threshold_crossed(ss$name, key, threshold_tracker[ss$name][key]);
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
ss$name, key, threshold_tracker[ss$name][key]);
}
}
@@ -300,7 +292,8 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
if ( check_thresholds(ss, key, ir, 1.0) )
{
threshold_crossed(ss, key, ir);
event SumStats::cluster_threshold_crossed(ss_name, key, threshold_tracker[ss_name][key]);
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
ss_name, key, threshold_tracker[ss_name][key]);
}
if ( cleanup )
@@ -336,7 +329,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
}
done_with[uid] = 0;
event SumStats::cluster_get_result(uid, ss_name, key, cleanup);
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, cleanup);
delete stats_keys[uid][key];
}
else
@@ -344,7 +338,8 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
# Get more keys! And this breaks us out of the evented loop.
done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid);
event SumStats::get_a_key(uid, ss_name, cleanup);
Broker::publish(Cluster::worker_topic, SumStats::get_a_key,
uid, ss_name, cleanup);
}
}
@@ -469,7 +464,8 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
add outstanding_global_views[ss_name][uid];
done_with[uid] = 0;
#print fmt("requesting results for: %s", uid);
event SumStats::cluster_get_result(uid, ss_name, key, F);
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, F);
}
function request_key(ss_name: string, key: Key): Result
@@ -479,7 +475,8 @@ function request_key(ss_name: string, key: Key): Result
key_requests[uid] = table();
add dynamic_requests[uid];
event SumStats::cluster_get_result(uid, ss_name, key, F);
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, F);
return when [uid, ss_name, key] ( uid in done_with &&
Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
{

View file

@@ -134,13 +134,6 @@ event zeek_init() &priority=5
Analyzer::register_for_ports(Analyzer::ANALYZER_DHCP, ports);
}
@if ( Cluster::is_enabled() )
event zeek_init()
{
Broker::auto_publish(Cluster::manager_topic, DHCP::aggregate_msgs);
}
@endif
function join_data_expiration(t: table[count] of Info, idx: count): interval
{
local info = t[idx];
@@ -307,7 +300,11 @@ event DHCP::aggregate_msgs(ts: time, id: conn_id, uid: string, is_orig: bool, ms
# Aggregate DHCP messages to the manager.
event dhcp_message(c: connection, is_orig: bool, msg: DHCP::Msg, options: DHCP::Options) &priority=-5
{
event DHCP::aggregate_msgs(network_time(), c$id, c$uid, is_orig, msg, options);
if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
Broker::publish(Cluster::manager_topic, DHCP::aggregate_msgs,
network_time(), c$id, c$uid, is_orig, msg, options);
else
event DHCP::aggregate_msgs(network_time(), c$id, c$uid, is_orig, msg, options);
}
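The runtime guard added here keeps standalone Zeek and the manager itself working: in those cases there is no remote manager to publish to, so the event is still raised and handled locally. The same shape, reduced to a hypothetical aggregation event:

    module MyModule;

    export {
        global aggregate: event(uid: string);
    }

    event MyModule::aggregate(uid: string)
        {
        # ... runs on the manager, or on the local node when standalone ...
        }

    event connection_state_remove(c: connection)
        {
        if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
            # Worker or proxy in a cluster: hand the data to the manager.
            Broker::publish(Cluster::manager_topic, MyModule::aggregate, c$uid);
        else
            # Standalone, or we are the manager: dispatch locally.
            event MyModule::aggregate(c$uid);
        }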
event zeek_done() &priority=-5