Merge remote-tracking branch 'origin/topic/awelzel/move-broker-to-cluster-publish'

* origin/topic/awelzel/move-broker-to-cluster-publish:
  netcontrol: Move to Cluster::publish()
  openflow: Move to Cluster::publish()
  netcontrol/catch-and-release: Move to Cluster::publish()
  config: Move to Cluster::publish()
  ssl/validate-certs: Move to Cluster::publish()
  irc: Move to Cluster::publish()
  ftp: Move to Cluster::publish()
  dhcp: Move to cluster publish
  notice: Move to Cluster::publish()
  intel: Move to Cluster::publish()
  sumstats: Move to Cluster::publish()
This commit is contained in:
Tim Wojtulewicz 2024-12-12 13:17:51 -07:00
commit 1158757b2b
13 changed files with 109 additions and 85 deletions

24
CHANGES
View file

@ -1,3 +1,27 @@
7.1.0-dev.808 | 2024-12-12 13:17:51 -0700
* netcontrol: Move to Cluster::publish() (Arne Welzel, Corelight)
* openflow: Move to Cluster::publish() (Arne Welzel, Corelight)
* netcontrol/catch-and-release: Move to Cluster::publish() (Arne Welzel, Corelight)
* config: Move to Cluster::publish() (Arne Welzel, Corelight)
* ssl/validate-certs: Move to Cluster::publish() (Arne Welzel, Corelight)
* irc: Move to Cluster::publish() (Arne Welzel, Corelight)
* ftp: Move to Cluster::publish() (Arne Welzel, Corelight)
* dhcp: Move to cluster publish (Arne Welzel, Corelight)
* notice: Move to Cluster::publish() (Arne Welzel, Corelight)
* intel: Move to Cluster::publish() (Arne Welzel, Corelight)
* sumstats: Move to Cluster::publish() (Arne Welzel, Corelight)
7.1.0-dev.796 | 2024-12-12 13:16:57 -0700 7.1.0-dev.796 | 2024-12-12 13:16:57 -0700
* cluster/Backend: Handle unspecified table/set (Arne Welzel, Corelight) * cluster/Backend: Handle unspecified table/set (Arne Welzel, Corelight)

View file

@ -1 +1 @@
7.1.0-dev.796 7.1.0-dev.808

View file

@ -60,7 +60,7 @@ global Config::cluster_set_option: event(ID: string, val: any, location: string)
function broadcast_option(ID: string, val: any, location: string) &is_used function broadcast_option(ID: string, val: any, location: string) &is_used
{ {
for ( topic in Cluster::broadcast_topics ) for ( topic in Cluster::broadcast_topics )
Broker::publish(topic, Config::cluster_set_option, ID, val, location); Cluster::publish(topic, Config::cluster_set_option, ID, val, location);
} }
event Config::cluster_set_option(ID: string, val: any, location: string) event Config::cluster_set_option(ID: string, val: any, location: string)
@ -89,7 +89,7 @@ function set_value(ID: string, val: any, location: string &default = ""): bool
option_cache[ID] = OptionCacheValue($val=val, $location=location); option_cache[ID] = OptionCacheValue($val=val, $location=location);
broadcast_option(ID, val, location); broadcast_option(ID, val, location);
@else @else
Broker::publish(Cluster::manager_topic, Config::cluster_set_option, Cluster::publish(Cluster::manager_topic, Config::cluster_set_option,
ID, val, location); ID, val, location);
@endif @endif
@ -109,7 +109,7 @@ event Cluster::node_up(name: string, id: string) &priority=-10
# When a node connects, send it all current Option values. # When a node connects, send it all current Option values.
if ( name in Cluster::nodes ) if ( name in Cluster::nodes )
for ( ID in option_cache ) for ( ID in option_cache )
Broker::publish(Cluster::node_topic(name), Config::cluster_set_option, ID, option_cache[ID]$val, option_cache[ID]$location); Cluster::publish(Cluster::node_topic(name), Config::cluster_set_option, ID, option_cache[ID]$val, option_cache[ID]$location);
} }
@endif @endif

View file

@ -28,7 +28,7 @@ redef have_full_data = F;
# The manager propagates remove_indicator() to workers. # The manager propagates remove_indicator() to workers.
event remove_indicator(item: Item) event remove_indicator(item: Item)
{ {
Broker::publish(Cluster::worker_topic, remove_indicator, item); Cluster::publish(Cluster::worker_topic, remove_indicator, item);
} }
# Handling of new worker nodes. # Handling of new worker nodes.
@ -39,7 +39,7 @@ event Cluster::node_up(name: string, id: string)
# this by the insert_indicator event. # this by the insert_indicator event.
if ( send_store_on_node_up && name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER ) if ( send_store_on_node_up && name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER )
{ {
Broker::publish(Cluster::node_topic(name), new_min_data_store, min_data_store); Cluster::publish(Cluster::node_topic(name), new_min_data_store, min_data_store);
} }
} }
@ -57,7 +57,7 @@ event Intel::new_item(item: Item) &priority=5
# relaying via a proxy. # relaying via a proxy.
pt = Cluster::worker_topic; pt = Cluster::worker_topic;
Broker::publish(pt, Intel::insert_indicator, item); Cluster::publish(pt, Intel::insert_indicator, item);
} }
# Handling of item insertion triggered by remote node. # Handling of item insertion triggered by remote node.
@ -84,19 +84,19 @@ event Intel::match_remote(s: Seen) &priority=5
@if ( Cluster::local_node_type() == Cluster::WORKER ) @if ( Cluster::local_node_type() == Cluster::WORKER )
event match_remote(s: Seen) event match_remote(s: Seen)
{ {
Broker::publish(Cluster::manager_topic, match_remote, s); Cluster::publish(Cluster::manager_topic, match_remote, s);
} }
event remove_item(item: Item, purge_indicator: bool) event remove_item(item: Item, purge_indicator: bool)
{ {
Broker::publish(Cluster::manager_topic, remove_item, item, purge_indicator); Cluster::publish(Cluster::manager_topic, remove_item, item, purge_indicator);
} }
# On a worker, the new_item event requires to trigger the insertion # On a worker, the new_item event requires to trigger the insertion
# on the manager to update the back-end data store. # on the manager to update the back-end data store.
event Intel::new_item(item: Intel::Item) &priority=5 event Intel::new_item(item: Intel::Item) &priority=5
{ {
Broker::publish(Cluster::manager_topic, Intel::insert_item, item); Cluster::publish(Cluster::manager_topic, Intel::insert_item, item);
} }
# Handling of new indicators published by the manager. # Handling of new indicators published by the manager.
@ -116,7 +116,7 @@ event new_min_data_store(store: MinDataStore)
event Intel::insert_indicator(item: Intel::Item) &priority=5 event Intel::insert_indicator(item: Intel::Item) &priority=5
{ {
# Just forwarding from manager to workers. # Just forwarding from manager to workers.
Broker::publish(Cluster::worker_topic, Intel::insert_indicator, item); Cluster::publish(Cluster::worker_topic, Intel::insert_indicator, item);
} }
@endif @endif

View file

@ -46,7 +46,7 @@ function add_rule(r: Rule) : string
if ( r$id == "" ) if ( r$id == "" )
r$id = cat(Cluster::node, ":", ++local_rule_count); r$id = cat(Cluster::node, ":", ++local_rule_count);
Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_add_rule, r); Cluster::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_add_rule, r);
return r$id; return r$id;
} }
} }
@ -57,7 +57,7 @@ function delete_rule(id: string, reason: string &default="") : bool
return delete_rule_impl(id, reason); return delete_rule_impl(id, reason);
else else
{ {
Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_delete_rule, id, reason); Cluster::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_delete_rule, id, reason);
return T; # well, we can't know here. So - just hope... return T; # well, we can't know here. So - just hope...
} }
} }
@ -68,7 +68,7 @@ function remove_rule(id: string, reason: string &default="") : bool
return remove_rule_impl(id, reason); return remove_rule_impl(id, reason);
else else
{ {
Broker::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_remove_rule, id, reason); Cluster::publish(Cluster::manager_topic, NetControl::cluster_netcontrol_remove_rule, id, reason);
return T; # well, we can't know here. So - just hope... return T; # well, we can't know here. So - just hope...
} }
} }
@ -101,7 +101,7 @@ event rule_exists(r: Rule, p: PluginState, msg: string) &priority=5
if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire ) if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire )
schedule r$expire { rule_expire(r, p) }; schedule r$expire { rule_expire(r, p) };
Broker::publish(Cluster::worker_topic, rule_exists, r, p, msg); Cluster::publish(Cluster::worker_topic, rule_exists, r, p, msg);
} }
event rule_added(r: Rule, p: PluginState, msg: string) &priority=5 event rule_added(r: Rule, p: PluginState, msg: string) &priority=5
@ -111,38 +111,38 @@ event rule_added(r: Rule, p: PluginState, msg: string) &priority=5
if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire ) if ( r?$expire && r$expire > 0secs && ! p$plugin$can_expire )
schedule r$expire { rule_expire(r, p) }; schedule r$expire { rule_expire(r, p) };
Broker::publish(Cluster::worker_topic, rule_added, r, p, msg); Cluster::publish(Cluster::worker_topic, rule_added, r, p, msg);
} }
event rule_removed(r: Rule, p: PluginState, msg: string) &priority=-5 event rule_removed(r: Rule, p: PluginState, msg: string) &priority=-5
{ {
rule_removed_impl(r, p, msg); rule_removed_impl(r, p, msg);
Broker::publish(Cluster::worker_topic, rule_removed, r, p, msg); Cluster::publish(Cluster::worker_topic, rule_removed, r, p, msg);
} }
event rule_timeout(r: Rule, i: FlowInfo, p: PluginState) &priority=-5 event rule_timeout(r: Rule, i: FlowInfo, p: PluginState) &priority=-5
{ {
rule_timeout_impl(r, i, p); rule_timeout_impl(r, i, p);
Broker::publish(Cluster::worker_topic, rule_timeout, r, i, p); Cluster::publish(Cluster::worker_topic, rule_timeout, r, i, p);
} }
event rule_error(r: Rule, p: PluginState, msg: string) &priority=-5 event rule_error(r: Rule, p: PluginState, msg: string) &priority=-5
{ {
rule_error_impl(r, p, msg); rule_error_impl(r, p, msg);
Broker::publish(Cluster::worker_topic, rule_error, r, msg); Cluster::publish(Cluster::worker_topic, rule_error, r, msg);
} }
event rule_new(r: Rule) event rule_new(r: Rule)
{ {
Broker::publish(Cluster::worker_topic, rule_new, r); Cluster::publish(Cluster::worker_topic, rule_new, r);
} }
event rule_destroyed(r: Rule) event rule_destroyed(r: Rule)
{ {
Broker::publish(Cluster::worker_topic, rule_destroyed, r); Cluster::publish(Cluster::worker_topic, rule_destroyed, r);
} }
@endif @endif

View file

@ -529,7 +529,7 @@ hook Notice::notice(n: Notice::Info) &priority=-5
# Once we have global pub/sub, we could also unconditionally # Once we have global pub/sub, we could also unconditionally
# send to a notice specific topic for communicating # send to a notice specific topic for communicating
# suppressions directly to all nodes. # suppressions directly to all nodes.
Broker::publish(Cluster::manager_topic, Notice::begin_suppression, Cluster::publish(Cluster::manager_topic, Notice::begin_suppression,
n$ts, n$suppress_for, n$note, n$identifier); n$ts, n$suppress_for, n$note, n$identifier);
@endif @endif
} }
@ -540,9 +540,9 @@ hook Notice::notice(n: Notice::Info) &priority=-5
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, identifier: string) event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, identifier: string)
{ {
local e = Broker::make_event(Notice::begin_suppression, ts, suppress_for, note, identifier); local e = Cluster::make_event(Notice::begin_suppression, ts, suppress_for, note, identifier);
Broker::publish(Cluster::worker_topic, e); Cluster::publish(Cluster::worker_topic, e);
Broker::publish(Cluster::proxy_topic, e); Cluster::publish(Cluster::proxy_topic, e);
} }
@endif @endif

View file

@ -22,7 +22,7 @@ function flow_mod(controller: Controller, match: ofp_match, flow_mod: ofp_flow_m
if ( Cluster::local_node_type() == Cluster::MANAGER ) if ( Cluster::local_node_type() == Cluster::MANAGER )
return controller$flow_mod(controller$state, match, flow_mod); return controller$flow_mod(controller$state, match, flow_mod);
else else
Broker::publish(Cluster::manager_topic, OpenFlow::cluster_flow_mod, controller$state$_name, match, flow_mod); Cluster::publish(Cluster::manager_topic, OpenFlow::cluster_flow_mod, controller$state$_name, match, flow_mod);
return T; return T;
} }
@ -35,7 +35,7 @@ function flow_clear(controller: Controller): bool
if ( Cluster::local_node_type() == Cluster::MANAGER ) if ( Cluster::local_node_type() == Cluster::MANAGER )
return controller$flow_clear(controller$state); return controller$flow_clear(controller$state);
else else
Broker::publish(Cluster::manager_topic, OpenFlow::cluster_flow_clear, controller$state$_name); Cluster::publish(Cluster::manager_topic, OpenFlow::cluster_flow_clear, controller$state$_name);
return T; return T;
} }

View file

@ -79,7 +79,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) ) if ( check_thresholds(ss, key, result, cluster_request_global_view_percent) )
{ {
# kick off intermediate update # kick off intermediate update
Broker::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response, Cluster::publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response,
ss$name, key); ss$name, key);
add recent_global_view_keys[ss$name, key]; add recent_global_view_keys[ss$name, key];
} }
@ -91,14 +91,14 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{ {
if ( |sending_results[uid]| == 0 ) if ( |sending_results[uid]| == 0 )
{ {
Broker::publish(Cluster::manager_topic, SumStats::send_no_key, Cluster::publish(Cluster::manager_topic, SumStats::send_no_key,
uid, ss_name); uid, ss_name);
} }
else else
{ {
for ( key in sending_results[uid] ) for ( key in sending_results[uid] )
{ {
Broker::publish(Cluster::manager_topic, SumStats::send_a_key, Cluster::publish(Cluster::manager_topic, SumStats::send_a_key,
uid, ss_name, key); uid, ss_name, key);
# break to only send one. # break to only send one.
break; break;
@ -115,7 +115,7 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
{ {
for ( key in result_store[ss_name] ) for ( key in result_store[ss_name] )
{ {
Broker::publish(Cluster::manager_topic, SumStats::send_a_key, Cluster::publish(Cluster::manager_topic, SumStats::send_a_key,
uid, ss_name, key); uid, ss_name, key);
# break to only send one. # break to only send one.
break; break;
@ -124,7 +124,7 @@ event SumStats::get_a_key(uid: string, ss_name: string, cleanup: bool)
} }
else else
{ {
Broker::publish(Cluster::manager_topic, SumStats::send_no_key, Cluster::publish(Cluster::manager_topic, SumStats::send_no_key,
uid, ss_name); uid, ss_name);
} }
} }
@ -151,7 +151,7 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{ {
if ( uid in sending_results && key in sending_results[uid] ) if ( uid in sending_results && key in sending_results[uid] )
{ {
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, sending_results[uid][key], cleanup); uid, ss_name, key, sending_results[uid][key], cleanup);
delete sending_results[uid][key]; delete sending_results[uid][key];
} }
@ -159,7 +159,7 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{ {
# We need to send an empty response if we don't have the data so that the manager # We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers. # can know that it heard back from all of the workers.
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, table(), cleanup); uid, ss_name, key, table(), cleanup);
} }
} }
@ -167,14 +167,14 @@ event SumStats::cluster_get_result(uid: string, ss_name: string, key: Key, clean
{ {
if ( ss_name in result_store && key in result_store[ss_name] ) if ( ss_name in result_store && key in result_store[ss_name] )
{ {
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, result_store[ss_name][key], cleanup); uid, ss_name, key, result_store[ss_name][key], cleanup);
} }
else else
{ {
# We need to send an empty response if we don't have the data so that the manager # We need to send an empty response if we don't have the data so that the manager
# can know that it heard back from all of the workers. # can know that it heard back from all of the workers.
Broker::publish(Cluster::manager_topic, SumStats::cluster_send_result, Cluster::publish(Cluster::manager_topic, SumStats::cluster_send_result,
uid, ss_name, key, table(), cleanup); uid, ss_name, key, table(), cleanup);
} }
} }
@ -252,13 +252,13 @@ event SumStats::finish_epoch(ss: SumStat)
stats_keys[uid] = set(); stats_keys[uid] = set();
# Request data from peers. # Request data from peers.
Broker::publish(Cluster::worker_topic, SumStats::cluster_ss_request, Cluster::publish(Cluster::worker_topic, SumStats::cluster_ss_request,
uid, ss$name, T); uid, ss$name, T);
done_with[uid] = 0; done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid); #print fmt("get_key by uid: %s", uid);
Broker::publish(Cluster::worker_topic, SumStats::get_a_key, Cluster::publish(Cluster::worker_topic, SumStats::get_a_key,
uid, ss$name, T); uid, ss$name, T);
} }
@ -274,7 +274,7 @@ function data_added(ss: SumStat, key: Key, result: Result)
if ( check_thresholds(ss, key, result, 1.0) ) if ( check_thresholds(ss, key, result, 1.0) )
{ {
threshold_crossed(ss, key, result); threshold_crossed(ss, key, result);
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, Cluster::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
ss$name, key, threshold_tracker[ss$name][key]); ss$name, key, threshold_tracker[ss$name][key]);
} }
} }
@ -292,7 +292,7 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
if ( check_thresholds(ss, key, ir, 1.0) ) if ( check_thresholds(ss, key, ir, 1.0) )
{ {
threshold_crossed(ss, key, ir); threshold_crossed(ss, key, ir);
Broker::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed, Cluster::publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed,
ss_name, key, threshold_tracker[ss_name][key]); ss_name, key, threshold_tracker[ss_name][key]);
} }
@ -329,7 +329,7 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
} }
done_with[uid] = 0; done_with[uid] = 0;
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, cleanup); uid, ss_name, key, cleanup);
delete stats_keys[uid][key]; delete stats_keys[uid][key];
} }
@ -338,7 +338,7 @@ function request_all_current_keys(uid: string, ss_name: string, cleanup: bool)
# Get more keys! And this breaks us out of the evented loop. # Get more keys! And this breaks us out of the evented loop.
done_with[uid] = 0; done_with[uid] = 0;
#print fmt("get_key by uid: %s", uid); #print fmt("get_key by uid: %s", uid);
Broker::publish(Cluster::worker_topic, SumStats::get_a_key, Cluster::publish(Cluster::worker_topic, SumStats::get_a_key,
uid, ss_name, cleanup); uid, ss_name, cleanup);
} }
} }
@ -464,7 +464,7 @@ event SumStats::cluster_key_intermediate_response(ss_name: string, key: Key)
add outstanding_global_views[ss_name][uid]; add outstanding_global_views[ss_name][uid];
done_with[uid] = 0; done_with[uid] = 0;
#print fmt("requesting results for: %s", uid); #print fmt("requesting results for: %s", uid);
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, F); uid, ss_name, key, F);
} }
@ -475,7 +475,7 @@ function request_key(ss_name: string, key: Key): Result
key_requests[uid] = table(); key_requests[uid] = table();
add dynamic_requests[uid]; add dynamic_requests[uid];
Broker::publish(Cluster::worker_topic, SumStats::cluster_get_result, Cluster::publish(Cluster::worker_topic, SumStats::cluster_get_result,
uid, ss_name, key, F); uid, ss_name, key, F);
return when [uid, ss_name, key] ( uid in done_with && return when [uid, ss_name, key] ( uid in done_with &&
Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] ) Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )

View file

@ -301,7 +301,7 @@ event DHCP::aggregate_msgs(ts: time, id: conn_id, uid: string, is_orig: bool, ms
event dhcp_message(c: connection, is_orig: bool, msg: DHCP::Msg, options: DHCP::Options) &priority=-5 event dhcp_message(c: connection, is_orig: bool, msg: DHCP::Msg, options: DHCP::Options) &priority=-5
{ {
if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
Broker::publish(Cluster::manager_topic, DHCP::aggregate_msgs, Cluster::publish(Cluster::manager_topic, DHCP::aggregate_msgs,
network_time(), c$id, c$uid, is_orig, msg, options); network_time(), c$id, c$uid, is_orig, msg, options);
else else
event DHCP::aggregate_msgs(network_time(), c$id, c$uid, is_orig, msg, options); event DHCP::aggregate_msgs(network_time(), c$id, c$uid, is_orig, msg, options);

View file

@ -225,7 +225,7 @@ event sync_add_expected_data(s: Info, chan: ExpectedDataChannel) &is_used
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY || @if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER ) Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, sync_add_expected_data, minimize_info(s), chan); Cluster::publish(Cluster::worker_topic, sync_add_expected_data, minimize_info(s), chan);
@else @else
ftp_data_expected[chan$resp_h, chan$resp_p] = s; ftp_data_expected[chan$resp_h, chan$resp_p] = s;
Analyzer::schedule_analyzer(chan$orig_h, chan$resp_h, chan$resp_p, Analyzer::schedule_analyzer(chan$orig_h, chan$resp_h, chan$resp_p,
@ -238,7 +238,7 @@ event sync_remove_expected_data(resp_h: addr, resp_p: port) &is_used
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY || @if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER ) Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, sync_remove_expected_data, resp_h, resp_p); Cluster::publish(Cluster::worker_topic, sync_remove_expected_data, resp_h, resp_p);
@else @else
delete ftp_data_expected[resp_h, resp_p]; delete ftp_data_expected[resp_h, resp_p];
@endif @endif
@ -253,7 +253,7 @@ function add_expected_data_channel(s: Info, chan: ExpectedDataChannel)
Analyzer::ANALYZER_FTP_DATA, Analyzer::ANALYZER_FTP_DATA,
5mins); 5mins);
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(ftp_relay_topic(), sync_add_expected_data, minimize_info(s), chan); Cluster::publish(ftp_relay_topic(), sync_add_expected_data, minimize_info(s), chan);
@endif @endif
} }
@ -470,7 +470,7 @@ hook finalize_ftp_data(c: connection)
{ {
delete ftp_data_expected[c$id$resp_h, c$id$resp_p]; delete ftp_data_expected[c$id$resp_h, c$id$resp_p];
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(ftp_relay_topic(), sync_remove_expected_data, c$id$resp_h, c$id$resp_p); Cluster::publish(ftp_relay_topic(), sync_remove_expected_data, c$id$resp_h, c$id$resp_p);
@endif @endif
} }
} }

View file

@ -48,7 +48,7 @@ event dcc_transfer_add(host: addr, p: port, info: Info) &is_used
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY || @if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER ) Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, dcc_transfer_add, host, p, info); Cluster::publish(Cluster::worker_topic, dcc_transfer_add, host, p, info);
@else @else
dcc_expected_transfers[host, p] = info; dcc_expected_transfers[host, p] = info;
Analyzer::schedule_analyzer(0.0.0.0, host, p, Analyzer::schedule_analyzer(0.0.0.0, host, p,
@ -60,7 +60,7 @@ event dcc_transfer_remove(host: addr, p: port) &is_used
{ {
@if ( Cluster::local_node_type() == Cluster::PROXY || @if ( Cluster::local_node_type() == Cluster::PROXY ||
Cluster::local_node_type() == Cluster::MANAGER ) Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, dcc_transfer_remove, host, p); Cluster::publish(Cluster::worker_topic, dcc_transfer_remove, host, p);
@else @else
delete dcc_expected_transfers[host, p]; delete dcc_expected_transfers[host, p];
@endif @endif
@ -90,7 +90,7 @@ function log_dcc(f: fa_file)
delete dcc_expected_transfers[cid$resp_h, cid$resp_p]; delete dcc_expected_transfers[cid$resp_h, cid$resp_p];
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(dcc_relay_topic(), dcc_transfer_remove, Cluster::publish(dcc_relay_topic(), dcc_transfer_remove,
cid$resp_h, cid$resp_p); cid$resp_h, cid$resp_p);
@endif @endif
return; return;
@ -118,7 +118,7 @@ event irc_dcc_message(c: connection, is_orig: bool,
dcc_expected_transfers[address, p] = c$irc; dcc_expected_transfers[address, p] = c$irc;
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(dcc_relay_topic(), dcc_transfer_add, address, p, c$irc); Cluster::publish(dcc_relay_topic(), dcc_transfer_add, address, p, c$irc);
@endif @endif
} }
@ -139,7 +139,7 @@ hook finalize_irc_data(c: connection)
delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p]; delete dcc_expected_transfers[c$id$resp_h, c$id$resp_p];
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(dcc_relay_topic(), dcc_transfer_remove, Cluster::publish(dcc_relay_topic(), dcc_transfer_remove,
c$id$resp_h, c$id$resp_p); c$id$resp_h, c$id$resp_p);
@endif @endif
} }

View file

@ -378,12 +378,12 @@ function drop_address_catch_release(a: addr, location: string &default=""): Bloc
Log::write(CATCH_RELEASE, log); Log::write(CATCH_RELEASE, log);
blocks[a] = bi; blocks[a] = bi;
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi); Cluster::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi);
@endif @endif
@endif @endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
Broker::publish(Cluster::manager_topic, NetControl::catch_release_add, a, location); Cluster::publish(Cluster::manager_topic, NetControl::catch_release_add, a, location);
@endif @endif
return bi; return bi;
} }
@ -401,7 +401,7 @@ function drop_address_catch_release(a: addr, location: string &default=""): Bloc
bi$location = location; bi$location = location;
blocks[a] = bi; blocks[a] = bi;
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi); Cluster::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi);
@endif @endif
log = populate_log_record(a, bi, DROP_REQUESTED); log = populate_log_record(a, bi, DROP_REQUESTED);
Log::write(CATCH_RELEASE, log); Log::write(CATCH_RELEASE, log);
@ -413,7 +413,7 @@ function drop_address_catch_release(a: addr, location: string &default=""): Bloc
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
bi = BlockInfo($watch_until=network_time()+catch_release_intervals[1], $block_until=network_time()+block_interval, $current_interval=0, $current_block_id=""); bi = BlockInfo($watch_until=network_time()+catch_release_intervals[1], $block_until=network_time()+block_interval, $current_interval=0, $current_block_id="");
Broker::publish(Cluster::manager_topic, NetControl::catch_release_add, a, location); Cluster::publish(Cluster::manager_topic, NetControl::catch_release_add, a, location);
return bi; return bi;
@endif @endif
@ -435,10 +435,10 @@ function unblock_address_catch_release(a: addr, reason: string &default=""): boo
remove_rule(bi$current_block_id, reason); remove_rule(bi$current_block_id, reason);
@endif @endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_delete, a); Cluster::publish(Cluster::worker_topic, NetControl::catch_release_block_delete, a);
@endif @endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
Broker::publish(Cluster::manager_topic, NetControl::catch_release_delete, a, reason); Cluster::publish(Cluster::manager_topic, NetControl::catch_release_delete, a, reason);
@endif @endif
return T; return T;
@ -494,13 +494,13 @@ function catch_release_seen(a: addr)
Log::write(CATCH_RELEASE, log); Log::write(CATCH_RELEASE, log);
@endif @endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
Broker::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi); Cluster::publish(Cluster::worker_topic, NetControl::catch_release_block_new, a, bi);
@endif @endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
if ( a in catch_release_recently_notified ) if ( a in catch_release_recently_notified )
return; return;
Broker::publish(Cluster::manager_topic, NetControl::catch_release_encountered, a); Cluster::publish(Cluster::manager_topic, NetControl::catch_release_encountered, a);
add catch_release_recently_notified[a]; add catch_release_recently_notified[a];
@endif @endif

View file

@ -65,7 +65,7 @@ function add_to_cache(key: string, value: vector of opaque of x509)
{ {
intermediate_cache[key] = value; intermediate_cache[key] = value;
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
Broker::publish(Cluster::manager_topic, SSL::new_intermediate, key, value); Cluster::publish(Cluster::manager_topic, SSL::new_intermediate, key, value);
@endif @endif
} }
@ -80,7 +80,7 @@ event SSL::new_intermediate(key: string, value: vector of opaque of x509)
return; return;
intermediate_cache[key] = value; intermediate_cache[key] = value;
Broker::publish(Cluster::worker_topic, SSL::intermediate_add, key, value); Cluster::publish(Cluster::worker_topic, SSL::intermediate_add, key, value);
} }
function cache_validate(chain: vector of opaque of x509): X509::Result function cache_validate(chain: vector of opaque of x509): X509::Result