cluster/zeromq: Fix node_topic() and nodeid_topic()

Due to prefix matching, worker-1's node_topic() also matched worker-10,
worker-11, etc. Suffix the node topic with a `.`. The original implementation
came from NATS, where subjects are separated by `.`.

Adapt nodeid_topic() for consistency.
This commit is contained in:
Arne Welzel 2025-03-16 14:56:27 +01:00
parent 26441e0c24
commit 2963c49f27
8 changed files with 168 additions and 4 deletions

View file

@ -226,11 +226,11 @@ redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;
redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER; redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER;
function zeromq_node_topic(name: string): string { function zeromq_node_topic(name: string): string {
return node_topic_prefix + "." + name; return node_topic_prefix + "." + name + ".";
} }
function zeromq_nodeid_topic(id: string): string { function zeromq_nodeid_topic(id: string): string {
return nodeid_topic_prefix + "." + id; return nodeid_topic_prefix + "." + id + ".";
} }
# Unique identifier for this node with some debug information. # Unique identifier for this node with some debug information.
@ -345,7 +345,7 @@ event Cluster::Backend::ZeroMQ::subscription(topic: string)
if ( ! starts_with(topic, prefix) ) if ( ! starts_with(topic, prefix) )
return; return;
local nodeid = topic[|prefix|:]; local nodeid = topic[|prefix|:][:-1];
# Do not say hello to ourselves - we won't see it anyhow. # Do not say hello to ourselves - we won't see it anyhow.
if ( nodeid == Cluster::node_id() ) if ( nodeid == Cluster::node_id() )
@ -417,7 +417,7 @@ event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
if ( ! starts_with(topic, prefix) ) if ( ! starts_with(topic, prefix) )
return; return;
local gone_node_id = topic[|prefix|:]; local gone_node_id = topic[|prefix|:][:-1];
local name = ""; local name = "";
for ( node_name, n in Cluster::nodes ) { for ( node_name, n in Cluster::nodes ) {
if ( n?$id && n$id == gone_node_id ) { if ( n?$id && n$id == gone_node_id ) {

View file

@ -0,0 +1,15 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
### NOTE: This file has been sorted with diff-sort.
A, manager
B node_up - sending ping to 'worker-1'
B node_up - sending ping to 'worker-10'
B node_up - sending ping to 'worker-2'
B node_up - sending ping to 'worker-20'
C pong from 'worker-1' to 'manager'
C pong from 'worker-10' to 'manager'
C pong from 'worker-2' to 'manager'
C pong from 'worker-20' to 'manager'
D node_down from 'worker-1'
D node_down from 'worker-10'
D node_down from 'worker-2'
D node_down from 'worker-20'

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A, worker-1
B ping from 'manager' to 'worker-1'
C finish from 'manager' to 'worker-1'

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A, worker-2
B ping from 'manager' to 'worker-2'
C finish from 'manager' to 'worker-2'

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A, worker-10
B ping from 'manager' to 'worker-10'
C finish from 'manager' to 'worker-10'

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A, worker-20
B ping from 'manager' to 'worker-20'
C finish from 'manager' to 'worker-20'

View file

@ -0,0 +1,16 @@
# @TEST-DOC: Save seeds and read and assure the UIDs are the same. Regression test for #4209
#
# @TEST-EXEC: zeek --save-seeds myseeds -r $TRACES/http/get.trace %INPUT
# @TEST-EXEC: mkdir save && mv *log save
# @TEST-EXEC: zeek-cut -m uid history service < save/conn.log >save/conn.log.cut
#
# @TEST-EXEC: zeek --load-seeds myseeds -r $TRACES/http/get.trace %INPUT
# @TEST-EXEC: mkdir load && mv *log load
# @TEST-EXEC: zeek-cut -m uid history service < load/conn.log >load/conn.log.cut
#
# @TEST-EXEC: btest-diff load/conn.log.cut
# @TEST-EXEC: btest-diff save/conn.log.cut
# @TEST-EXEC: diff load/conn.log.cut save/conn.log.cut
@load base/protocols/conn
@load base/protocols/http

View file

@ -0,0 +1,117 @@
# @TEST-DOC: Ensure that worker-1 does not observe messages to worker-20 on its Cluster::node_topic()
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: zeek --parse-only manager.zeek worker.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-10 zeek -b ../worker.zeek >out"
# @TEST-EXEC: btest-bg-run worker-10 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out"
# @TEST-EXEC: btest-bg-run worker-20 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-20 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out
# @TEST-EXEC: btest-diff ./worker-1/out
# @TEST-EXEC: btest-diff ./worker-2/out
# @TEST-EXEC: btest-diff ./worker-10/out
# @TEST-EXEC: btest-diff ./worker-20/out
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap
global ping: event(from: string, to: string);
global pong: event(from: string, to: string);
global finish: event(from: string, to: string);
event zeek_init()
{
print "A", Cluster::node;
}
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
global nodes_down = 0;
global nodes_up = 0;
event send_pings()
{
for ( name, n in Cluster::nodes )
if ( n$node_type == Cluster::WORKER )
Cluster::publish(Cluster::node_topic(name), ping, Cluster::node, name);
}
# If a node comes up, send it a ping
event Cluster::node_up(name: string, id: string)
{
print fmt("B node_up - sending ping to '%s'", name);
++nodes_up;
if ( nodes_up == 4 )
event send_pings();
}
event ping(from: string, to: string)
{
# manager node should never see ping events.
print "XXX FAIL ping", from, to;
}
event pong(from: string, to: string)
{
print fmt("C pong from '%s' to '%s'", from ,to);
Cluster::publish(Cluster::node_topic(from), finish, Cluster::node, from);
}
# If the worker vanishes, finish the test.
event Cluster::node_down(name: string, id: string)
{
print fmt("D node_down from '%s'", name);
++nodes_down;
if ( nodes_down == 4 )
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
# Reply to a ping with a pong.
event ping(from: string, to: string)
{
if ( to != Cluster::node )
print fmt("FAIL: got ping destined to '%s'", to);
print fmt("B ping from '%s' to '%s'", from, to);
Cluster::publish(Cluster::node_topic(from), pong, Cluster::node, from);
}
event finish(from: string, to: string) &is_used
{
print fmt("C finish from '%s' to '%s'", from, to);
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE cluster-layout.zeek
redef Cluster::manager_is_logger = T;
redef Log::default_rotation_interval = 0.0sec;
redef Cluster::nodes = {
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["worker-10"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["worker-20"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
};
# @TEST-END-FILE