diff --git a/CHANGES b/CHANGES index fc9c26e9fa..82b47e55b0 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,9 @@ +7.2.0-dev.569 | 2025-04-21 17:14:44 -0700 + + * Use Broker peering directionality when re-peering after backpressure overflows (Christian Kreibich, Corelight) + + * Expand Broker APIs to allow tracking directionality of peering establishment (Christian Kreibich, Corelight) + 7.2.0-dev.565 | 2025-04-18 11:36:17 -0700 * Use longer path when including krb5.h to match the cmake lookup (Tim Wojtulewicz, Corelight) diff --git a/NEWS b/NEWS index ef76edcb44..5fe7b1db10 100644 --- a/NEWS +++ b/NEWS @@ -124,6 +124,13 @@ Changed Functionality looking up the "Conn::LOG" identifier allows to directly query the ``EnumVal`` using ``ID::GetVal()``. +- When the send buffer to a Broker peer overflows and the "disconnect" overflow + policy is in use, Zeek now only attempts to re-establish peerings when the + node observing the overflow originally established the peering. That is, + re-peering is now only attempted in consistency with the underlying Broker + peering topology. This avoids pointless connection attempts to ephemeral TCP + client-side ports, which could clutter the Broker logs. + Removed Functionality --------------------- diff --git a/VERSION b/VERSION index e9916690ac..947cb69e2a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.565 +7.2.0-dev.569 diff --git a/scripts/base/frameworks/broker/backpressure.zeek b/scripts/base/frameworks/broker/backpressure.zeek index 652935eed9..592c90cf75 100644 --- a/scripts/base/frameworks/broker/backpressure.zeek +++ b/scripts/base/frameworks/broker/backpressure.zeek @@ -10,26 +10,22 @@ ##! - In cluster.log, with a higher-level message indicating the node names involved. ##! - Via telemetry, using a labeled counter. 
-event Broker::peer_removed(endpoint: Broker::EndpointInfo, msg: string) +event Broker::peer_removed(ep: Broker::EndpointInfo, msg: string) { if ( "caf::sec::backpressure_overflow" !in msg ) { return; } - if ( ! endpoint?$network ) { - Reporter::error(fmt("Missing network info to re-peer with %s", endpoint$id)); + if ( ! ep?$network ) { + Reporter::error(fmt("Missing network info to re-peer with %s", ep$id)); return; } - # Re-establish the peering so Broker's reconnect behavior kicks in once - # the other endpoint catches up. Broker will periodically re-try - # connecting as necessary. If the other endpoint originally connected to - # us, our attempt will fail (since we attempt to connect to the peer's - # ephemeral port), but in that case the peer will reconnect with us once - # it recovers. - # - # We could do this more cleanly by leveraging information from the - # cluster framework (since it knows who connects to whom), but that - # would further entangle Broker into it. - Broker::peer(endpoint$network$address, endpoint$network$bound_port); + # Re-establish the peering. Broker will periodically re-try connecting + # as necessary. Do this only if the local node originally established + # the peering, otherwise we would connect to an ephemeral client-side + # TCP port that doesn't listen. If we didn't originally establish the + # peering, the other side will retry anyway. + if ( Broker::is_outbound_peering(ep$network$address, ep$network$bound_port) ) + Broker::peer(ep$network$address, ep$network$bound_port); } diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 1213555dc5..1cd99b3771 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -264,6 +264,10 @@ export { type PeerInfo: record { peer: EndpointInfo; status: PeerStatus; + + ## Whether the local node created the peering, as opposed to a + ## remote establishing it by connecting to us. 
+ is_outbound: bool; }; type PeerInfos: vector of PeerInfo; @@ -367,6 +371,16 @@ export { ## TODO: We do not have a function yet to terminate a connection. global unpeer: function(a: string, p: port): bool; + ## Whether the local node originally initiated the peering with the + ## given endpoint. + ## + ## a: the address used in previous successful call to :zeek:see:`Broker::peer`. + ## + ## p: the port used in previous successful call to :zeek:see:`Broker::peer`. + ## + ## Returns: True if this node initiated the peering. + global is_outbound_peering: function(a: string, p: port): bool; + ## Get a list of all peer connections. ## ## Returns: a list of all peer connections. @@ -522,6 +536,11 @@ function unpeer(a: string, p: port): bool return __unpeer(a, p); } +function is_outbound_peering(a: string, p: port): bool + { + return __is_outbound_peering(a, p); + } + function peers(): vector of PeerInfo { return __peers(); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 3ea29b8b58..a77f050ede 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -233,6 +233,7 @@ public: LoggerQueuePtr loggerQueue; SeverityLevel logSeverity = SeverityLevel::critical; SeverityLevel stderrSeverity = SeverityLevel::critical; + std::unordered_set outbound_peerings; }; const broker::endpoint_info Manager::NoPeer{{}, {}}; @@ -582,6 +583,7 @@ void Manager::Peer(const string& addr, uint16_t port, double retry) { auto secs = broker::timeout::seconds(static_cast(retry)); bstate->endpoint.peer_nosync(addr, port, secs); + bstate->outbound_peerings.emplace(broker::network_info(addr, port)); auto counts_as_iosource = get_option("Broker::peer_counts_as_iosource")->AsBool(); @@ -613,6 +615,15 @@ void Manager::Unpeer(const string& addr, uint16_t port) { FlushLogBuffers(); bstate->endpoint.unpeer_nosync(addr, port); + bstate->outbound_peerings.erase(broker::network_info(addr, port)); +} + +bool Manager::IsOutboundPeering(const string& addr, uint16_t port) const { + return 
bstate->outbound_peerings.find(broker::network_info(addr, port)) != bstate->outbound_peerings.end(); +} + +bool Manager::IsOutboundPeering(const broker::network_info& ni) const { + return bstate->outbound_peerings.find(ni) != bstate->outbound_peerings.end(); } std::vector Manager::Peers() const { diff --git a/src/broker/Manager.h b/src/broker/Manager.h index d2dfbf2261..215e9efdf5 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -154,6 +154,21 @@ public: */ void Unpeer(const std::string& addr, uint16_t port); + /** + * Whether the local node originally initiated the peering with the + * given endpoint. + * @param addr the address used in zeek::Broker::Manager::Peer(). + * @param port the port used in zeek::Broker::Manager::Peer(). + */ + bool IsOutboundPeering(const std::string& addr, uint16_t port) const; + + /** + * Whether the local node originally initiated the peering with the + * given endpoint. + * @param ni the address and port used in zeek::Broker::Manager::Peer(). + */ + bool IsOutboundPeering(const broker::network_info& ni) const; + /** * @return a list of peer endpoints. */ diff --git a/src/broker/comm.bif b/src/broker/comm.bif index 81f1114a22..0fefce871d 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -203,6 +203,12 @@ function Broker::__unpeer%(a: string, p: port%): bool return zeek::val_mgr->True(); %} +function Broker::__is_outbound_peering%(a: string, p: port%): bool + %{ + zeek::Broker::Manager::ScriptScopeGuard ssg; + return zeek::val_mgr->Bool(broker_mgr->IsOutboundPeering(a->CheckString(), p->Port())); + %} + function Broker::__peers%(%): PeerInfos %{ zeek::Broker::Manager::ScriptScopeGuard ssg; @@ -237,6 +243,15 @@ function Broker::__peers%(%): PeerInfos peer_info->Assign(0, std::move(endpoint_info)); peer_info->Assign(1, zeek::BifType::Enum::Broker::PeerStatus->GetEnumVal(ps)); + // Broker has an existing concept of peer flags, see the broker::peer_info + // and broker::peer_flags structs. 
They aren't currently used, but + // we can update the following logic once they are. + + if ( p.peer.network.has_value() ) + peer_info->Assign(2, zeek::val_mgr->Bool(broker_mgr->IsOutboundPeering(p.peer.network.value()))); + else + peer_info->Assign(2, zeek::val_mgr->False()); + rval->Assign(i, std::move(peer_info)); ++i; } diff --git a/testing/btest/Baseline/broker.peering-directionality/client.out b/testing/btest/Baseline/broker.peering-directionality/client.out new file mode 100644 index 0000000000..4dbe099979 --- /dev/null +++ b/testing/btest/Baseline/broker.peering-directionality/client.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +peered, this is the outgoing peering: T +via Broker::peers(): T +after unpeering: F diff --git a/testing/btest/Baseline/broker.peering-directionality/server.out b/testing/btest/Baseline/broker.peering-directionality/server.out new file mode 100644 index 0000000000..e0845c9bf0 --- /dev/null +++ b/testing/btest/Baseline/broker.peering-directionality/server.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +peered, this is the outgoing peering: F +via Broker::peers(): F diff --git a/testing/btest/Baseline/opt.ZAM-bif-tracking/output b/testing/btest/Baseline/opt.ZAM-bif-tracking/output index 3c90b8960c..c20efb9801 100644 --- a/testing/btest/Baseline/opt.ZAM-bif-tracking/output +++ b/testing/btest/Baseline/opt.ZAM-bif-tracking/output @@ -1,2 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
-557 seen BiFs, 0 unseen BiFs (), 0 new BiFs () +558 seen BiFs, 0 unseen BiFs (), 0 new BiFs () diff --git a/testing/btest/broker/peering-directionality.zeek b/testing/btest/broker/peering-directionality.zeek new file mode 100644 index 0000000000..080ee7224b --- /dev/null +++ b/testing/btest/broker/peering-directionality.zeek @@ -0,0 +1,54 @@ +# This tests whether the script-layer can correctly query if a given Broker +# peering originated from the local node or from another node that peered with it. +# +# @TEST-GROUP: broker +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run client "zeek -b ../client.zeek >out" +# @TEST-EXEC: btest-bg-run server "zeek -b ../server.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 15 +# @TEST-EXEC: btest-diff client/out +# @TEST-EXEC: btest-diff server/out + +# @TEST-START-FILE client.zeek +redef exit_only_after_terminate = T; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(ep: Broker::EndpointInfo, msg: string) + { + print fmt("peered, this is the outgoing peering: %s", + Broker::is_outbound_peering(ep$network$address, ep$network$bound_port)); + print fmt("via Broker::peers(): %s", Broker::peers()[0]$is_outbound); + + Broker::unpeer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + print fmt("after unpeering: %s", + Broker::is_outbound_peering(ep$network$address, ep$network$bound_port)); + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE server.zeek +redef exit_only_after_terminate = T; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); +} + +event Broker::peer_added(ep: Broker::EndpointInfo, msg: string) + { + print fmt("peered, this is the outgoing peering: %s", + Broker::is_outbound_peering(ep$network$address, ep$network$bound_port)); + print fmt("via Broker::peers(): %s", Broker::peers()[0]$is_outbound); + terminate(); + } +# 
@TEST-END-FILE diff --git a/testing/btest/opt/ZAM-bif-tracking.zeek b/testing/btest/opt/ZAM-bif-tracking.zeek index b57e55e762..4dbce8e247 100644 --- a/testing/btest/opt/ZAM-bif-tracking.zeek +++ b/testing/btest/opt/ZAM-bif-tracking.zeek @@ -38,6 +38,7 @@ global known_BiFs = set( "Broker::__insert_into_set", "Broker::__insert_into_table", "Broker::__is_closed", + "Broker::__is_outbound_peering", "Broker::__keys", "Broker::__listen", "Broker::__node_id",