Add sending of values to nodes that dropped out.

The only node that cannot be recovered is the manager - and the manager
should just re-read its own configuration and be ok :)
This commit is contained in:
Johanna Amann 2018-06-29 13:10:00 -07:00
parent 5f07673e25
commit c28f1ae0ce
8 changed files with 48 additions and 21 deletions

View file

@ -62,7 +62,6 @@ event bro_init()
Broker::subscribe(change_topic); Broker::subscribe(change_topic);
} }
event Config::cluster_set_option(ID: string, val: any, location: string) event Config::cluster_set_option(ID: string, val: any, location: string)
{ {
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
@ -103,18 +102,13 @@ function set_value(ID: string, val: any, location: string &default = "" &optiona
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
# Handling of new worker nodes. # Handling of new worker nodes.
event Cluster::node_up(name: string, id: string) event Cluster::node_up(name: string, id: string) &priority=-10
{ {
# When a node connects, send it all current Option values. # When a node connects, send it all current Option values.
if ( name in Cluster::nodes ) if ( name in Cluster::nodes )
{
print option_cache;
for ( ID in option_cache ) for ( ID in option_cache )
{
Broker::publish(Cluster::node_topic(name), Config::cluster_set_option, ID, option_cache[ID]$val, option_cache[ID]$location); Broker::publish(Cluster::node_topic(name), Config::cluster_set_option, ID, option_cache[ID]$val, option_cache[ID]$location);
} }
}
}
@endif @endif

View file

@ -48,7 +48,12 @@ static bool call_option_handlers_and_set_value(StringVal* name, ID* i, Val* val,
## ##
## Returns: true on success, false when an error occurred. ## Returns: true on success, false when an error occurred.
## ##
## .. bro:see:: Option::set_change_handler ## .. bro:see:: Option::set_change_handler Config::set_value
##
## .. note:: :bro:id:`Option::set` only works on one node and does not distribute
## new values across a cluster. The higher-level :bro:id:`Config::set_value`
## supports clusterization and should typically be used instead of this
## lower-level function.
function Option::set%(ID: string, val: any, location: string &default=""%): bool function Option::set%(ID: string, val: any, location: string &default=""%): bool
%{ %{
auto i = global_scope()->Lookup(ID->CheckString()); auto i = global_scope()->Lookup(ID->CheckString());

View file

@ -0,0 +1,5 @@
Node up, worker-1
option changed, testcount, 1,
option changed, testport, 44/tcp,
option changed, teststring, b, comment
Node up, worker-2

View file

@ -0,0 +1,12 @@
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path config
#open 2018-06-29-19-46-30
#fields ts id old_value new_value location
#types time string string string string
1530301590.505311 testcount 0 1 -
1530301590.605012 testport 42/tcp 44/tcp -
1530301590.605012 teststring a b comment
#close 2018-06-29-19-46-51

View file

@ -0,0 +1,5 @@
option changed, testport, 44/tcp,
option changed, teststring, b, comment
option changed, testcount, 1,
option changed, testport, 44/tcp,
option changed, teststring, b, comment

View file

@ -0,0 +1,3 @@
option changed, testport, 44/tcp,
option changed, testcount, 1,
option changed, teststring, b, comment

View file

@ -69,7 +69,7 @@ event bro_init() &priority=5
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
global peer_count = 0; global peer_count = 0;
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) event Cluster::node_up(name: string, id: string)
{ {
++peer_count; ++peer_count;
if ( peer_count == 2 ) if ( peer_count == 2 )

View file

@ -28,6 +28,7 @@ redef Log::default_rotation_interval = 0secs;
export { export {
option testport = 42/tcp; option testport = 42/tcp;
option teststring = "a"; option teststring = "a";
option testcount: count = 0;
} }
global n = 0; global n = 0;
@ -52,10 +53,10 @@ event ready_for_data()
} }
@endif @endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::node == "manager-1" )
event Cluster::node_up(name: string, id: string) event ready_for_data()
{ {
print "Node up", name; Config::set_value("testcount", 1);
} }
@endif @endif
@ -64,10 +65,18 @@ event die()
terminate(); terminate();
} }
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
event Cluster::node_up(name: string, id: string)
{
print "Node up", name;
if ( name == "worker-2" )
schedule 5sec { die() };
}
@endif
function option_changed(ID: string, new_value: any, location: string): any function option_changed(ID: string, new_value: any, location: string): any
{ {
print "option changed", ID, new_value, location; print "option changed", ID, new_value, location;
#schedule 5sec { die() };
return new_value; return new_value;
} }
@ -75,12 +84,13 @@ event bro_init() &priority=5
{ {
Option::set_change_handler("testport", option_changed, -100); Option::set_change_handler("testport", option_changed, -100);
Option::set_change_handler("teststring", option_changed, -100); Option::set_change_handler("teststring", option_changed, -100);
Option::set_change_handler("testcount", option_changed, -100);
} }
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
global peer_count = 0; global peer_count = 0;
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) event Cluster::node_up(name: string, id: string) &priority=-5
{ {
++peer_count; ++peer_count;
if ( peer_count == 1 ) if ( peer_count == 1 )
@ -93,10 +103,3 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{ {
terminate(); terminate();
} }
module Config;
event Config::cluster_set_option(ID: string, val: any, location: string)
{
print "cluster_set_option for ", ID;
}