mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Generalize Cluster::worker_count.
This commit is contained in:
parent
379624404c
commit
3db8bb4a44
6 changed files with 62 additions and 47 deletions
|
@ -202,7 +202,7 @@ export {
|
|||
## and it's maintained internally by the cluster framework. It's
|
||||
## primarily intended for use by managers to find out how many workers
|
||||
## should be responding to requests.
|
||||
global worker_count: count = 0;
|
||||
global worker_count: count = 0 &deprecated="Remove in v6.1. Active worker count can be obtained via get_active_node_count(Cluster::WORKER)";
|
||||
|
||||
## The cluster layout definition. This should be placed into a filter
|
||||
## named cluster-layout.zeek somewhere in the ZEEKPATH. It will be
|
||||
|
@ -212,6 +212,15 @@ export {
|
|||
## or "worker-1").
|
||||
const nodes: table[string] of Node = {} &redef;
|
||||
|
||||
## Returns the number of nodes defined in the cluster layout for a given
|
||||
## node type.
|
||||
global get_node_count: function(node_type: NodeType): count;
|
||||
|
||||
## Returns the number of nodes per type, the calling node is currently
|
||||
## connected to. This is primarily intended for use by the manager to find
|
||||
## out how many nodes should be responding to requests.
|
||||
global get_active_node_count: function(node_type: NodeType): count;
|
||||
|
||||
## Indicates whether or not the manager will act as the logger and receive
|
||||
## logs. This value should be set in the cluster-layout.zeek script (the
|
||||
## value should be true only if no logger is specified in Cluster::nodes).
|
||||
|
@ -262,7 +271,8 @@ export {
|
|||
global nodeid_topic: function(id: string): string;
|
||||
}
|
||||
|
||||
global active_worker_ids: set[string] = set();
|
||||
# Track active nodes per type.
|
||||
global active_node_ids: table[NodeType] of set[string];
|
||||
|
||||
type NamedNode: record {
|
||||
name: string;
|
||||
|
@ -272,25 +282,35 @@ type NamedNode: record {
|
|||
function nodes_with_type(node_type: NodeType): vector of NamedNode
|
||||
{
|
||||
local rval: vector of NamedNode = vector();
|
||||
local names: vector of string = vector();
|
||||
|
||||
for ( name in Cluster::nodes )
|
||||
names += name;
|
||||
|
||||
names = sort(names, strcmp);
|
||||
|
||||
for ( i in names )
|
||||
for ( name, n in Cluster::nodes )
|
||||
{
|
||||
name = names[i];
|
||||
local n = Cluster::nodes[name];
|
||||
|
||||
if ( n$node_type != node_type )
|
||||
next;
|
||||
|
||||
rval += NamedNode($name=name, $node=n);
|
||||
}
|
||||
|
||||
return rval;
|
||||
return sort(rval, function(n1: NamedNode, n2: NamedNode): int
|
||||
{ return strcmp(n1$name, n2$name); });
|
||||
}
|
||||
|
||||
function Cluster::get_node_count(node_type: NodeType): count
|
||||
{
|
||||
local cnt = 0;
|
||||
|
||||
for ( _, n in nodes )
|
||||
{
|
||||
if ( n$node_type == node_type )
|
||||
++cnt;
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
function Cluster::get_active_node_count(node_type: NodeType): count
|
||||
{
|
||||
return |active_node_ids[node_type]|;
|
||||
}
|
||||
|
||||
function is_enabled(): bool
|
||||
|
@ -319,6 +339,8 @@ function nodeid_topic(id: string): string
|
|||
return nodeid_topic_prefix + id + "/";
|
||||
}
|
||||
|
||||
@if ( Cluster::is_enabled() )
|
||||
|
||||
event Cluster::hello(name: string, id: string) &priority=10
|
||||
{
|
||||
if ( name !in nodes )
|
||||
|
@ -341,11 +363,14 @@ event Cluster::hello(name: string, id: string) &priority=10
|
|||
n$id = id;
|
||||
Cluster::log(fmt("got hello from %s (%s)", name, id));
|
||||
|
||||
if ( n$node_type !in active_node_ids )
|
||||
active_node_ids[n$node_type] = set();
|
||||
add active_node_ids[n$node_type][id];
|
||||
|
||||
@pragma push ignore-deprecations
|
||||
if ( n$node_type == WORKER )
|
||||
{
|
||||
add active_worker_ids[id];
|
||||
worker_count = |active_worker_ids|;
|
||||
}
|
||||
worker_count = |active_node_ids[WORKER]|;
|
||||
@pragma pop ignore-deprecations
|
||||
}
|
||||
|
||||
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10
|
||||
|
@ -365,12 +390,12 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=1
|
|||
{
|
||||
Cluster::log(fmt("node down: %s", node_name));
|
||||
delete n$id;
|
||||
delete active_node_ids[n$node_type][endpoint$id];
|
||||
|
||||
@pragma push ignore-deprecations
|
||||
if ( n$node_type == WORKER )
|
||||
{
|
||||
delete active_worker_ids[endpoint$id];
|
||||
worker_count = |active_worker_ids|;
|
||||
}
|
||||
worker_count = |active_node_ids[WORKER]|;
|
||||
@pragma pop ignore-deprecations
|
||||
|
||||
event Cluster::node_down(node_name, endpoint$id);
|
||||
break;
|
||||
|
@ -390,6 +415,8 @@ event zeek_init() &priority=5
|
|||
Log::create_stream(Cluster::LOG, [$columns=Info, $path="cluster", $policy=log_policy]);
|
||||
}
|
||||
|
||||
@endif
|
||||
|
||||
function create_store(name: string, persistent: bool &default=F): Cluster::StoreInfo
|
||||
{
|
||||
local info = stores[name];
|
||||
|
|
|
@ -295,7 +295,6 @@ function handle_end_of_result_collection(uid: string, ss_name: string, key: Key,
|
|||
return;
|
||||
}
|
||||
|
||||
#print fmt("worker_count:%d :: done_with:%d", Cluster::worker_count, done_with[uid]);
|
||||
local ss = stats_store[ss_name];
|
||||
local ir = key_requests[uid];
|
||||
if ( check_thresholds(ss, key, ir, 1.0) )
|
||||
|
@ -357,7 +356,7 @@ event SumStats::send_no_key(uid: string, ss_name: string)
|
|||
done_with[uid] = 0;
|
||||
|
||||
++done_with[uid];
|
||||
if ( Cluster::worker_count == done_with[uid] )
|
||||
if ( Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
|
||||
{
|
||||
delete done_with[uid];
|
||||
|
||||
|
@ -394,7 +393,7 @@ event SumStats::send_a_key(uid: string, ss_name: string, key: Key)
|
|||
add stats_keys[uid][key];
|
||||
|
||||
++done_with[uid];
|
||||
if ( Cluster::worker_count == done_with[uid] )
|
||||
if ( Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
|
||||
{
|
||||
delete done_with[uid];
|
||||
|
||||
|
@ -437,7 +436,7 @@ event SumStats::cluster_send_result(uid: string, ss_name: string, key: Key, resu
|
|||
++done_with[uid];
|
||||
|
||||
if ( uid !in dynamic_requests &&
|
||||
uid in done_with && Cluster::worker_count == done_with[uid] )
|
||||
uid in done_with && Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
|
||||
{
|
||||
handle_end_of_result_collection(uid, ss_name, key, cleanup);
|
||||
|
||||
|
@ -481,7 +480,8 @@ function request_key(ss_name: string, key: Key): Result
|
|||
add dynamic_requests[uid];
|
||||
|
||||
event SumStats::cluster_get_result(uid, ss_name, key, F);
|
||||
return when [uid, ss_name, key] ( uid in done_with && Cluster::worker_count == done_with[uid] )
|
||||
return when [uid, ss_name, key] ( uid in done_with &&
|
||||
Cluster::get_active_node_count(Cluster::WORKER) == done_with[uid] )
|
||||
{
|
||||
#print "done with request_key";
|
||||
local result = key_requests[uid];
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue