From 16c745cee433f81ab9b7b2062a829dea34e7127d Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 5 Feb 2025 13:43:02 +0100 Subject: [PATCH 1/3] cluster/zeromq: Do not call util::fmt() from thread ...util::fmt() uses a static buffer, so this is problematic. I've dabbled a bit replacing std::thread with using threading::BasicThread which would offer Fmt(), but this makes things more complicated. Primarily as BasicThread is registered with the thread manager and the shutdown interactions become entangled. The thread might be terminated before the backend, or vice-versa. Seems nicer for the thread to be owned by the backend. --- src/cluster/backend/zeromq/ZeroMQ.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index ef117749ac..66e0b2fd9e 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -356,7 +356,9 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he } void ZeroMQBackend::Run() { - util::detail::set_thread_name(zeek::util::fmt("zmq-%p", this)); + char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> + snprintf(name, sizeof(name), "zmq-%p", this); + util::detail::set_thread_name(name); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); using MultipartMessage = std::vector; From 2c6d934ef44159853562cf0b0d245c0d56105dc6 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 5 Feb 2025 13:49:06 +0100 Subject: [PATCH 2/3] cluster/zeromq: Use lambda for thread trampoline --- src/cluster/backend/zeromq/ZeroMQ.cc | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 66e0b2fd9e..ee1ec4df89 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -59,15 +59,6 @@ constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { } \ } while ( 0 ) -namespace { -void self_thread_fun(void* arg) { - auto* self = static_cast(arg); - self->Run(); -} - -} // namespace - - ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { @@ -227,7 +218,9 @@ bool ZeroMQBackend::DoInit() { // Setup connectivity between main and child thread. main_inproc.bind("inproc://inproc-bridge"); child_inproc.connect("inproc://inproc-bridge"); - self_thread = std::thread(self_thread_fun, this); + + // Thread is joined in backend->DoTerminate(), backend outlives it. + self_thread = std::thread([](auto* backend) { backend->Run(); }, this); // After connecting, call ThreadedBackend::DoInit() to register // the IO source with the loop. From 6008e67008103dc1f23ed0015b1f0930def7ce99 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 5 Feb 2025 15:25:08 +0100 Subject: [PATCH 3/3] cluster/zeromq: Call DoTerminate() in destructor Normal life-cycle is that Terminate() / DoTerminate() is called by zeek-setup code. If that doesn't happen, shutdown and join threads during destructor. try { } catch (...) suggested by Benjamin. --- src/cluster/backend/zeromq/ZeroMQ.cc | 11 +++++++++++ src/cluster/backend/zeromq/ZeroMQ.h | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index ee1ec4df89..99207e82f5 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -66,6 +66,16 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_pt main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); } +ZeroMQBackend::~ZeroMQBackend() { + try { + // DoTerminate is idempotent. + DoTerminate(); + } catch ( ... ) { + // This should never happen. + abort(); + } +} + void ZeroMQBackend::DoInitPostScript() { ThreadedBackend::DoInitPostScript(); @@ -109,6 +119,7 @@ void ZeroMQBackend::DoTerminate() { if ( proxy_thread ) { ZEROMQ_DEBUG("Shutting down proxy thread"); proxy_thread->Shutdown(); + proxy_thread.reset(); } ZEROMQ_DEBUG("Terminated"); diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 6959fcb59e..3a6783ff5c 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -20,6 +20,11 @@ public: ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs); + /** + * Destructor. + */ + ~ZeroMQBackend(); + /** * Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen * sockets. Only one node in a cluster should do this.