From 0010e65f6d7b0e3216a771956d31cb9e0747084b Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Fri, 11 Oct 2024 17:12:03 -0700 Subject: [PATCH] Support re-peering with Broker peers that fall behind This adds re-peering at the Broker level for peers that Broker decided to unpeer. We keep this at the Broker level since this behavior is specific to it (as opposed to other cluster backends). Includes baseline updates for btests that pick up on the new script's @load. --- scripts/base/frameworks/broker/__load__.zeek | 1 + .../base/frameworks/broker/backpressure.zeek | 35 +++++++++++++++++++ .../canonified_loaded_scripts.log | 1 + .../canonified_loaded_scripts.log | 1 + testing/btest/Baseline/plugins.hooks/output | 6 ++++ 5 files changed, 44 insertions(+) create mode 100644 scripts/base/frameworks/broker/backpressure.zeek diff --git a/scripts/base/frameworks/broker/__load__.zeek b/scripts/base/frameworks/broker/__load__.zeek index 77dd69d554..a30468a776 100644 --- a/scripts/base/frameworks/broker/__load__.zeek +++ b/scripts/base/frameworks/broker/__load__.zeek @@ -1,3 +1,4 @@ @load ./main @load ./store @load ./log +@load ./backpressure diff --git a/scripts/base/frameworks/broker/backpressure.zeek b/scripts/base/frameworks/broker/backpressure.zeek new file mode 100644 index 0000000000..652935eed9 --- /dev/null +++ b/scripts/base/frameworks/broker/backpressure.zeek @@ -0,0 +1,35 @@ +##! This handles Broker peers that fall so far behind in handling messages that +##! this node sends them that the local Broker endpoint decides to unpeer them. +##! Zeek captures this as follows: +##! +##! - In broker.log, with a regular "peer-removed" entry indicating CAF's reason. +##! - Via eventing through :zeek:see:`Broker::peer_removed` as done in this script. +##! +##! The cluster framework additionally captures the unpeering as follows: +##! +##! - In cluster.log, with a higher-level message indicating the node names involved. +##! - Via telemetry, using a labeled counter. 
+ +event Broker::peer_removed(endpoint: Broker::EndpointInfo, msg: string) + { + if ( "caf::sec::backpressure_overflow" !in msg ) { + return; + } + + if ( ! endpoint?$network ) { + Reporter::error(fmt("Missing network info to re-peer with %s", endpoint$id)); + return; + } + + # Re-establish the peering so Broker's reconnect behavior kicks in once + # the other endpoint catches up. Broker will periodically re-try + # connecting as necessary. If the other endpoint originally connected to + # us, our attempt will fail (since we attempt to connect to the peer's + # ephemeral port), but in that case the peer will reconnect with us once + # it recovers. + # + # We could do this more cleanly by leveraging information from the + # cluster framework (since it knows who connects to whom), but that + # would further entangle Broker into it. + Broker::peer(endpoint$network$address, endpoint$network$bound_port); +} diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 5e88f9d327..443d35b00e 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -119,6 +119,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/data.bif.zeek build/scripts/base/bif/store.bif.zeek scripts/base/frameworks/broker/log.zeek + scripts/base/frameworks/broker/backpressure.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index a3f06f9db9..8a23826c17 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ 
b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -119,6 +119,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/data.bif.zeek build/scripts/base/bif/store.bif.zeek scripts/base/frameworks/broker/log.zeek + scripts/base/frameworks/broker/backpressure.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 48ce2af63b..42b310bcc5 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -461,6 +461,7 @@ 0.000000 MetaHookPost LoadFile(0, ./addrs, <...>/addrs.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./api, <...>/api.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./backpressure, <...>/backpressure.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> -1 @@ -765,6 +766,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./addrs, <...>/addrs.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./api, <...>/api.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./backpressure, <...>/backpressure.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> (-1, ) @@ -1401,6 
+1403,7 @@ 0.000000 MetaHookPre LoadFile(0, ./addrs, <...>/addrs.zeek) 0.000000 MetaHookPre LoadFile(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./api, <...>/api.zeek) +0.000000 MetaHookPre LoadFile(0, ./backpressure, <...>/backpressure.zeek) 0.000000 MetaHookPre LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) @@ -1705,6 +1708,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./addrs, <...>/addrs.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./api, <...>/api.zeek) +0.000000 MetaHookPre LoadFileExtended(0, ./backpressure, <...>/backpressure.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) @@ -2342,6 +2346,7 @@ 0.000000 | HookLoadFile ./api <...>/api.zeek 0.000000 | HookLoadFile ./archive <...>/archive.sig 0.000000 | HookLoadFile ./audio <...>/audio.sig +0.000000 | HookLoadFile ./backpressure <...>/backpressure.zeek 0.000000 | HookLoadFile ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFile ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek 0.000000 | HookLoadFile ./cluster.bif.zeek <...>/cluster.bif.zeek @@ -2646,6 +2651,7 @@ 0.000000 | HookLoadFileExtended ./api <...>/api.zeek 0.000000 | HookLoadFileExtended ./archive <...>/archive.sig 0.000000 | HookLoadFileExtended ./audio <...>/audio.sig +0.000000 | HookLoadFileExtended ./backpressure <...>/backpressure.zeek 0.000000 | HookLoadFileExtended ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | 
HookLoadFileExtended ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek 0.000000 | HookLoadFileExtended ./cluster.bif.zeek <...>/cluster.bif.zeek