diff --git a/scripts/base/frameworks/config/main.bro b/scripts/base/frameworks/config/main.bro index c93271efa1..3503e5e6bd 100644 --- a/scripts/base/frameworks/config/main.bro +++ b/scripts/base/frameworks/config/main.bro @@ -50,25 +50,43 @@ export { } @if ( Cluster::is_enabled() ) +type OptionCacheValue: record { + val: any; + location: string; +}; + +global option_cache: table[string] of OptionCacheValue; + event bro_init() { Broker::subscribe(change_topic); } + event Config::cluster_set_option(ID: string, val: any, location: string) { +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + option_cache[ID] = OptionCacheValue($val=val, $location=location); +@endif Option::set(ID, val, location); } function set_value(ID: string, val: any, location: string &default = "" &optional): bool { + local cache_val: any; + # first cache value in case setting it succeeds and we have to store it. + if ( Cluster::local_node_type() == Cluster::MANAGER ) + cache_val = copy(val); # First try setting it locally - abort if not possible. if ( ! Option::set(ID, val, location) ) - { return F; - } + # If setting worked, copy the new value into the cache on the manager + if ( Cluster::local_node_type() == Cluster::MANAGER ) + option_cache[ID] = OptionCacheValue($val=cache_val, $location=location); + # If it turns out that it is possible - send it to everyone else to apply. Broker::publish(change_topic, Config::cluster_set_option, ID, val, location); + if ( Cluster::local_node_type() != Cluster::MANAGER ) { Broker::relay(change_topic, change_topic, Config::cluster_set_option, ID, val, location); @@ -76,11 +94,27 @@ function set_value(ID: string, val: any, location: string &default = "" &optiona return T; } @else - # Standalone implementation - function set_value(ID: string, val: any, location: string &default = "" &optional): bool +# Standalone implementation +function set_value(ID: string, val: any, location: string &default = "" &optional): bool + { + return Option::set(ID, val, location); + } +@endif + +@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) +# Handling of new worker nodes. +event Cluster::node_up(name: string, id: string) + { + # When a node connects, send it all current Option values. + if ( name in Cluster::nodes ) { - return Option::set(ID, val, location); + print option_cache; + for ( ID in option_cache ) + { + Broker::publish(Cluster::node_topic(name), Config::cluster_set_option, ID, option_cache[ID]$val, option_cache[ID]$location); + } } + } @endif diff --git a/testing/btest/Baseline/scripts.base.frameworks.config.basic_cluster/manager-1.config.log b/testing/btest/Baseline/scripts.base.frameworks.config.basic_cluster/manager-1.config.log new file mode 100644 index 0000000000..2953e3d78d --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.config.basic_cluster/manager-1.config.log @@ -0,0 +1,11 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path config +#open 2018-06-22-18-27-45 +#fields ts id old_value new_value location +#types time string string string string +1529692065.525489 testport 42/tcp 44/tcp - +1529692065.562594 teststring a b comment +#close 2018-06-22-18-27-50 diff --git a/testing/btest/scripts/base/frameworks/config/basic_cluster.bro b/testing/btest/scripts/base/frameworks/config/basic_cluster.bro index 1703bee8e5..4dc743a0cf 100644 --- a/testing/btest/scripts/base/frameworks/config/basic_cluster.bro +++ b/testing/btest/scripts/base/frameworks/config/basic_cluster.bro @@ -6,6 +6,7 @@ # @TEST-EXEC: btest-diff manager-1/.stdout # @TEST-EXEC: btest-diff worker-1/.stdout # @TEST-EXEC: btest-diff worker-2/.stdout +# @TEST-EXEC: btest-diff manager-1/config.log @load base/frameworks/config @@ -45,7 +46,6 @@ event ready_for_data() Config::set_value("testport", 44/tcp); Config::set_value("teststring", "b", "comment"); } - @endif event die() diff --git a/testing/btest/scripts/base/frameworks/config/cluster_resend.bro b/testing/btest/scripts/base/frameworks/config/cluster_resend.bro new file mode 100644 index 0000000000..450c03cbdf --- /dev/null +++ b/testing/btest/scripts/base/frameworks/config/cluster_resend.bro @@ -0,0 +1,102 @@ +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: sleep 15 +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 15 +# @TEST-EXEC: btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff worker-1/.stdout +# @TEST-EXEC: btest-diff worker-2/.stdout +# @TEST-EXEC: btest-diff manager-1/config.log + +# In this test we check if values get updated on a worker, even if they were set before the +# worker is present. + +@load base/frameworks/config + + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +export { + option testport = 42/tcp; + option teststring = "a"; +} + +global n = 0; + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +global ready_for_data: event(); + +event bro_init() + { + Broker::auto_publish(Cluster::worker_topic, ready_for_data); + } + +@if ( Cluster::node == "worker-1" ) +event ready_for_data() + { + Config::set_value("testport", 44/tcp); + Config::set_value("teststring", "b", "comment"); + } +@endif + +@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) +event Cluster::node_up(name: string, id: string) + { + print "Node up", name; + } +@endif + +event die() + { + terminate(); + } + +function option_changed(ID: string, new_value: any, location: string): any + { + print "option changed", ID, new_value, location; + #schedule 5sec { die() }; + return new_value; + } + +event bro_init() &priority=5 + { + Option::set_change_handler("testport", option_changed, -100); + Option::set_change_handler("teststring", option_changed, -100); + } + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +global peer_count = 0; +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + ++peer_count; + if ( peer_count == 1 ) + event ready_for_data(); + } + +@endif + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +module Config; + +event Config::cluster_set_option(ID: string, val: any, location: string) + { + print "cluster_set_option for ", ID; + }