Merge branch 'topic/christian/gh4318-track-broker-peerings'

* topic/christian/gh4318-track-broker-peerings:
  Use Broker peering directionality when re-peering after backpressure overflows
  Expand Broker APIs to allow tracking directionality of peering establishment
This commit is contained in:
Christian Kreibich 2025-04-21 17:14:44 -07:00
commit ed161692dd
13 changed files with 147 additions and 16 deletions

View file

@ -1,3 +1,9 @@
7.2.0-dev.569 | 2025-04-21 17:14:44 -0700
* Use Broker peering directionality when re-peering after backpressure overflows (Christian Kreibich, Corelight)
* Expand Broker APIs to allow tracking directionality of peering establishment (Christian Kreibich, Corelight)
7.2.0-dev.565 | 2025-04-18 11:36:17 -0700
* Use longer path when including krb5.h to match the cmake lookup (Tim Wojtulewicz, Corelight)

7
NEWS
View file

@ -124,6 +124,13 @@ Changed Functionality
looking up the "Conn::LOG" identifier allows to directly query the ``EnumVal``
using ``ID::GetVal()``.
- When the send buffer to a Broker peer overflows and the "disconnect" overflow
policy is in use, Zeek now only attempts to re-establish peerings when the
node observing the overflow originally established the peering. That is,
re-peering is now only attempted in consistency with the underlying Broker
peering topology. This avoids pointless connection attempts to ephemeral TCP
client-side ports, which clould clutter the Broker logs.
Removed Functionality
---------------------

View file

@ -1 +1 @@
7.2.0-dev.565
7.2.0-dev.569

View file

@ -10,26 +10,22 @@
##! - In cluster.log, with a higher-level message indicating the node names involved.
##! - Via telemetry, using a labeled counter.
event Broker::peer_removed(endpoint: Broker::EndpointInfo, msg: string)
event Broker::peer_removed(ep: Broker::EndpointInfo, msg: string)
{
if ( "caf::sec::backpressure_overflow" !in msg ) {
return;
}
if ( ! endpoint?$network ) {
Reporter::error(fmt("Missing network info to re-peer with %s", endpoint$id));
if ( ! ep?$network ) {
Reporter::error(fmt("Missing network info to re-peer with %s", ep$id));
return;
}
# Re-establish the peering so Broker's reconnect behavior kicks in once
# the other endpoint catches up. Broker will periodically re-try
# connecting as necessary. If the other endpoint originally connected to
# us, our attempt will fail (since we attempt to connect to the peer's
# ephemeral port), but in that case the peer will reconnect with us once
# it recovers.
#
# We could do this more cleanly by leveraging information from the
# cluster framework (since it knows who connects to whom), but that
# would further entangle Broker into it.
Broker::peer(endpoint$network$address, endpoint$network$bound_port);
# Re-establish the peering. Broker will periodically re-try connecting
# as necessary. Do this only if the local node originally established
# the peering, otherwise we would connect to an ephemeral client-side
# TCP port that doesn't listen. If we didn't originally establish the
# peering, the other side will retry anyway.
if ( Broker::is_outbound_peering(ep$network$address, ep$network$bound_port) )
Broker::peer(ep$network$address, ep$network$bound_port);
}

View file

@ -264,6 +264,10 @@ export {
type PeerInfo: record {
peer: EndpointInfo;
status: PeerStatus;
## Whether the local node created the peering, as opposed to a
## remote establishing it by connecting to us.
is_outbound: bool;
};
type PeerInfos: vector of PeerInfo;
@ -367,6 +371,16 @@ export {
## TODO: We do not have a function yet to terminate a connection.
global unpeer: function(a: string, p: port): bool;
## Whether the local node originally initiated the peering with the
## given endpoint.
##
## a: the address used in previous successful call to :zeek:see:`Broker::peer`.
##
## p: the port used in previous successful call to :zeek:see:`Broker::peer`.
##
## Returns:: True if this node initiated the peering.
global is_outbound_peering: function(a: string, p: port): bool;
## Get a list of all peer connections.
##
## Returns: a list of all peer connections.
@ -522,6 +536,11 @@ function unpeer(a: string, p: port): bool
return __unpeer(a, p);
}
function is_outbound_peering(a: string, p: port): bool
{
return __is_outbound_peering(a, p);
}
function peers(): vector of PeerInfo
{
return __peers();

View file

@ -233,6 +233,7 @@ public:
LoggerQueuePtr loggerQueue;
SeverityLevel logSeverity = SeverityLevel::critical;
SeverityLevel stderrSeverity = SeverityLevel::critical;
std::unordered_set<broker::network_info> outbound_peerings;
};
const broker::endpoint_info Manager::NoPeer{{}, {}};
@ -582,6 +583,7 @@ void Manager::Peer(const string& addr, uint16_t port, double retry) {
auto secs = broker::timeout::seconds(static_cast<uint64_t>(retry));
bstate->endpoint.peer_nosync(addr, port, secs);
bstate->outbound_peerings.emplace(broker::network_info(addr, port));
auto counts_as_iosource = get_option("Broker::peer_counts_as_iosource")->AsBool();
@ -613,6 +615,15 @@ void Manager::Unpeer(const string& addr, uint16_t port) {
FlushLogBuffers();
bstate->endpoint.unpeer_nosync(addr, port);
bstate->outbound_peerings.erase(broker::network_info(addr, port));
}
bool Manager::IsOutboundPeering(const string& addr, uint16_t port) const {
return bstate->outbound_peerings.find(broker::network_info(addr, port)) != bstate->outbound_peerings.end();
}
bool Manager::IsOutboundPeering(const broker::network_info& ni) const {
return bstate->outbound_peerings.find(ni) != bstate->outbound_peerings.end();
}
std::vector<broker::peer_info> Manager::Peers() const {

View file

@ -154,6 +154,21 @@ public:
*/
void Unpeer(const std::string& addr, uint16_t port);
/**
* Whether the local node originally initiated the peering with the
* given endpoint.
* @param addr the address used in zeek::Broker::Manager::Peer().
* @param port the port used in zeek::Broker::Manager::Peer().
*/
bool IsOutboundPeering(const std::string& addr, uint16_t port) const;
/**
* Whether the local node originally initiated the peering with the
* given endpoint.
* @param ni the address and port used in zeek::Broker::Manager::Peer().
*/
bool IsOutboundPeering(const broker::network_info& ni) const;
/**
* @return a list of peer endpoints.
*/

View file

@ -203,6 +203,12 @@ function Broker::__unpeer%(a: string, p: port%): bool
return zeek::val_mgr->True();
%}
function Broker::__is_outbound_peering%(a: string, p: port%): bool
%{
zeek::Broker::Manager::ScriptScopeGuard ssg;
return zeek::val_mgr->Bool(broker_mgr->IsOutboundPeering(a->CheckString(), p->Port()));
%}
function Broker::__peers%(%): PeerInfos
%{
zeek::Broker::Manager::ScriptScopeGuard ssg;
@ -237,6 +243,15 @@ function Broker::__peers%(%): PeerInfos
peer_info->Assign(0, std::move(endpoint_info));
peer_info->Assign(1, zeek::BifType::Enum::Broker::PeerStatus->GetEnumVal(ps));
// Broker has an existing concept of peer flags, see the broker::peer_info
// and broker::peer_flags structs. They currently aren't currently, but
// we can update the following logic once they are.
if ( p.peer.network.has_value() )
peer_info->Assign(2, zeek::val_mgr->Bool(broker_mgr->IsOutboundPeering(p.peer.network.value())));
else
peer_info->Assign(2, zeek::val_mgr->False());
rval->Assign(i, std::move(peer_info));
++i;
}

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
peered, this is the outgoing peering: T
via Broker::peers(): T
after unpeering: F

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
peered, this is the outgoing peering: F
via Broker::peers(): F

View file

@ -1,2 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
557 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
558 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()

View file

@ -0,0 +1,54 @@
# This tests whether the script-layer can correctly query if a given Broker
# peering originated from the local node or from another node that peered with it.
#
# @TEST-GROUP: broker
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run client "zeek -b ../client.zeek >out"
# @TEST-EXEC: btest-bg-run server "zeek -b ../server.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 15
# @TEST-EXEC: btest-diff client/out
# @TEST-EXEC: btest-diff server/out
# @TEST-START-FILE client.zeek
redef exit_only_after_terminate = T;
event zeek_init()
{
Broker::subscribe("zeek/event/my_topic");
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
{
print fmt("peered, this is the outgoing peering: %s",
Broker::is_outbound_peering(ep$network$address, ep$network$bound_port));
print fmt("via Broker::peers(): %s", Broker::peers()[0]$is_outbound);
Broker::unpeer("127.0.0.1", to_port(getenv("BROKER_PORT")));
print fmt("after unpeering: %s",
Broker::is_outbound_peering(ep$network$address, ep$network$bound_port));
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE server.zeek
redef exit_only_after_terminate = T;
event zeek_init()
{
Broker::subscribe("zeek/event/my_topic");
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
{
print fmt("peered, this is the outgoing peering: %s",
Broker::is_outbound_peering(ep$network$address, ep$network$bound_port));
print fmt("via Broker::peers(): %s", Broker::peers()[0]$is_outbound);
terminate();
}
# @TEST-END-FILE

View file

@ -38,6 +38,7 @@ global known_BiFs = set(
"Broker::__insert_into_set",
"Broker::__insert_into_table",
"Broker::__is_closed",
"Broker::__is_outbound_peering",
"Broker::__keys",
"Broker::__listen",
"Broker::__node_id",