diff --git a/scripts/base/frameworks/broker/__load__.zeek b/scripts/base/frameworks/broker/__load__.zeek index 77dd69d554..a30468a776 100644 --- a/scripts/base/frameworks/broker/__load__.zeek +++ b/scripts/base/frameworks/broker/__load__.zeek @@ -1,3 +1,4 @@ @load ./main @load ./store @load ./log +@load ./backpressure diff --git a/scripts/base/frameworks/broker/backpressure.zeek b/scripts/base/frameworks/broker/backpressure.zeek new file mode 100644 index 0000000000..652935eed9 --- /dev/null +++ b/scripts/base/frameworks/broker/backpressure.zeek @@ -0,0 +1,35 @@ +##! This handles Broker peers that fall so far behind in handling messages that +##! this node sends it that the local Broker endpoint decides to unpeer them. +##! Zeek captures this as follows: +##! +##! - In broker.log, with a regular "peer-removed" entry indicating CAF's reason. +##! - Via eventing through :zeek:see:`Broker::peer_removed` as done in this script. +##! +##! The cluster framework additionally captures the unpeering as follows: +##! +##! - In cluster.log, with a higher-level message indicating the node names involved. +##! - Via telemetry, using a labeled counter. + +event Broker::peer_removed(endpoint: Broker::EndpointInfo, msg: string) + { + if ( "caf::sec::backpressure_overflow" !in msg ) { + return; + } + + if ( ! endpoint?$network ) { + Reporter::error(fmt("Missing network info to re-peer with %s", endpoint$id)); + return; + } + + # Re-establish the peering so Broker's reconnect behavior kicks in once + # the other endpoint catches up. Broker will periodically re-try + # connecting as necessary. If the other endpoint originally connected to + # us, our attempt will fail (since we attempt to connect to the peer's + # ephemeral port), but in that case the peer will reconnect with us once + # it recovers. + # + # We could do this more cleanly by leveraging information from the + # cluster framework (since it knows who connects to whom), but that + # would further entangle Broker into it. + Broker::peer(endpoint$network$address, endpoint$network$bound_port); +} diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index b66d68514f..4ff5710865 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -116,6 +116,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/data.bif.zeek build/scripts/base/bif/store.bif.zeek scripts/base/frameworks/broker/log.zeek + scripts/base/frameworks/broker/backpressure.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index a96ef95891..3b3cd83597 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -116,6 +116,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/data.bif.zeek build/scripts/base/bif/store.bif.zeek scripts/base/frameworks/broker/log.zeek + scripts/base/frameworks/broker/backpressure.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index f36305197b..2fbef0f95f 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -461,6 +461,7 @@ 0.000000 MetaHookPost LoadFile(0, ./addrs, <...>/addrs.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./api, <...>/api.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./backpressure, <...>/backpressure.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> -1 @@ -760,6 +761,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./addrs, <...>/addrs.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./api, <...>/api.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./backpressure, <...>/backpressure.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> (-1, ) @@ -1392,6 +1394,7 @@ 0.000000 MetaHookPre LoadFile(0, ./addrs, <...>/addrs.zeek) 0.000000 MetaHookPre LoadFile(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./api, <...>/api.zeek) +0.000000 MetaHookPre LoadFile(0, ./backpressure, <...>/backpressure.zeek) 0.000000 MetaHookPre LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) @@ -1691,6 +1694,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./addrs, <...>/addrs.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./analyzer.bif.zeek, <...>/analyzer.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./api, <...>/api.zeek) +0.000000 MetaHookPre LoadFileExtended(0, ./backpressure, <...>/backpressure.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) @@ -2324,6 +2328,7 @@ 0.000000 | HookLoadFile ./api <...>/api.zeek 0.000000 | HookLoadFile ./archive <...>/archive.sig 0.000000 | HookLoadFile ./audio <...>/audio.sig +0.000000 | HookLoadFile ./backpressure <...>/backpressure.zeek 0.000000 | HookLoadFile ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFile ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek 0.000000 | HookLoadFile ./comm.bif.zeek <...>/comm.bif.zeek @@ -2623,6 +2628,7 @@ 0.000000 | HookLoadFileExtended ./api <...>/api.zeek 0.000000 | HookLoadFileExtended ./archive <...>/archive.sig 0.000000 | HookLoadFileExtended ./audio <...>/audio.sig +0.000000 | HookLoadFileExtended ./backpressure <...>/backpressure.zeek 0.000000 | HookLoadFileExtended ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFileExtended ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek 0.000000 | HookLoadFileExtended ./comm.bif.zeek <...>/comm.bif.zeek