mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Continue work on config framework clusterization.
This does not currently work.
This commit is contained in:
parent
948bb4b9ec
commit
d6990119db
4 changed files with 153 additions and 6 deletions
|
@ -50,25 +50,43 @@ export {
|
|||
}
|
||||
|
||||
@if ( Cluster::is_enabled() )
|
||||
type OptionCacheValue: record {
|
||||
val: any;
|
||||
location: string;
|
||||
};
|
||||
|
||||
global option_cache: table[string] of OptionCacheValue;
|
||||
|
||||
event bro_init()
|
||||
{
|
||||
Broker::subscribe(change_topic);
|
||||
}
|
||||
|
||||
|
||||
event Config::cluster_set_option(ID: string, val: any, location: string)
|
||||
{
|
||||
@if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||
option_cache[ID] = OptionCacheValue($val=val, $location=location);
|
||||
@endif
|
||||
Option::set(ID, val, location);
|
||||
}
|
||||
|
||||
function set_value(ID: string, val: any, location: string &default = "" &optional): bool
|
||||
{
|
||||
local cache_val: any;
|
||||
# first cache value in case setting it succeeds and we have to store it.
|
||||
if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||
cache_val = copy(val);
|
||||
# First try setting it locally - abort if not possible.
|
||||
if ( ! Option::set(ID, val, location) )
|
||||
{
|
||||
return F;
|
||||
}
|
||||
# If setting worked, copy the new value into the cache on the manager
|
||||
if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||
option_cache[ID] = OptionCacheValue($val=cache_val, $location=location);
|
||||
|
||||
# If it turns out that it is possible - send it to everyone else to apply.
|
||||
Broker::publish(change_topic, Config::cluster_set_option, ID, val, location);
|
||||
|
||||
if ( Cluster::local_node_type() != Cluster::MANAGER )
|
||||
{
|
||||
Broker::relay(change_topic, change_topic, Config::cluster_set_option, ID, val, location);
|
||||
|
@ -76,11 +94,27 @@ function set_value(ID: string, val: any, location: string &default = "" &optiona
|
|||
return T;
|
||||
}
|
||||
@else
|
||||
# Standalone implementation
|
||||
function set_value(ID: string, val: any, location: string &default = "" &optional): bool
|
||||
# Standalone implementation
|
||||
function set_value(ID: string, val: any, location: string &default = "" &optional): bool
|
||||
{
|
||||
return Option::set(ID, val, location);
|
||||
}
|
||||
@endif
|
||||
|
||||
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
|
||||
# Handling of new worker nodes.
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
# When a node connects, send it all current Option values.
|
||||
if ( name in Cluster::nodes )
|
||||
{
|
||||
return Option::set(ID, val, location);
|
||||
print option_cache;
|
||||
for ( ID in option_cache )
|
||||
{
|
||||
Broker::publish(Cluster::node_topic(name), Config::cluster_set_option, ID, option_cache[ID]$val, option_cache[ID]$location);
|
||||
}
|
||||
}
|
||||
}
|
||||
@endif
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
#separator \x09
|
||||
#set_separator ,
|
||||
#empty_field (empty)
|
||||
#unset_field -
|
||||
#path config
|
||||
#open 2018-06-22-18-27-45
|
||||
#fields ts id old_value new_value location
|
||||
#types time string string string string
|
||||
1529692065.525489 testport 42/tcp 44/tcp -
|
||||
1529692065.562594 teststring a b comment
|
||||
#close 2018-06-22-18-27-50
|
|
@ -6,6 +6,7 @@
|
|||
# @TEST-EXEC: btest-diff manager-1/.stdout
|
||||
# @TEST-EXEC: btest-diff worker-1/.stdout
|
||||
# @TEST-EXEC: btest-diff worker-2/.stdout
|
||||
# @TEST-EXEC: btest-diff manager-1/config.log
|
||||
|
||||
@load base/frameworks/config
|
||||
|
||||
|
@ -45,7 +46,6 @@ event ready_for_data()
|
|||
Config::set_value("testport", 44/tcp);
|
||||
Config::set_value("teststring", "b", "comment");
|
||||
}
|
||||
|
||||
@endif
|
||||
|
||||
event die()
|
||||
|
|
102
testing/btest/scripts/base/frameworks/config/cluster_resend.bro
Normal file
102
testing/btest/scripts/base/frameworks/config/cluster_resend.bro
Normal file
|
@ -0,0 +1,102 @@
|
|||
# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
|
||||
# @TEST-EXEC: sleep 1
|
||||
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
|
||||
# @TEST-EXEC: sleep 15
|
||||
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
|
||||
# @TEST-EXEC: btest-bg-wait 15
|
||||
# @TEST-EXEC: btest-diff manager-1/.stdout
|
||||
# @TEST-EXEC: btest-diff worker-1/.stdout
|
||||
# @TEST-EXEC: btest-diff worker-2/.stdout
|
||||
# @TEST-EXEC: btest-diff manager-1/config.log
|
||||
|
||||
# In this test we check if values get updated on a worker, even if they were set before the
|
||||
# worker is present.
|
||||
|
||||
@load base/frameworks/config
|
||||
|
||||
|
||||
@TEST-START-FILE cluster-layout.bro
|
||||
redef Cluster::nodes = {
|
||||
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp],
|
||||
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"],
|
||||
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"],
|
||||
};
|
||||
@TEST-END-FILE
|
||||
|
||||
redef Log::default_rotation_interval = 0secs;
|
||||
|
||||
export {
|
||||
option testport = 42/tcp;
|
||||
option teststring = "a";
|
||||
}
|
||||
|
||||
global n = 0;
|
||||
|
||||
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
|
||||
global ready_for_data: event();
|
||||
|
||||
event bro_init()
|
||||
{
|
||||
Broker::auto_publish(Cluster::worker_topic, ready_for_data);
|
||||
}
|
||||
|
||||
@if ( Cluster::node == "worker-1" )
|
||||
event ready_for_data()
|
||||
{
|
||||
Config::set_value("testport", 44/tcp);
|
||||
Config::set_value("teststring", "b", "comment");
|
||||
}
|
||||
@endif
|
||||
|
||||
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
print "Node up", name;
|
||||
}
|
||||
@endif
|
||||
|
||||
event die()
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
|
||||
function option_changed(ID: string, new_value: any, location: string): any
|
||||
{
|
||||
print "option changed", ID, new_value, location;
|
||||
#schedule 5sec { die() };
|
||||
return new_value;
|
||||
}
|
||||
|
||||
event bro_init() &priority=5
|
||||
{
|
||||
Option::set_change_handler("testport", option_changed, -100);
|
||||
Option::set_change_handler("teststring", option_changed, -100);
|
||||
}
|
||||
|
||||
@if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||
|
||||
global peer_count = 0;
|
||||
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
|
||||
{
|
||||
++peer_count;
|
||||
if ( peer_count == 1 )
|
||||
event ready_for_data();
|
||||
}
|
||||
|
||||
@endif
|
||||
|
||||
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
|
||||
module Config;
|
||||
|
||||
event Config::cluster_set_option(ID: string, val: any, location: string)
|
||||
{
|
||||
print "cluster_set_option for ", ID;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue