diff --git a/CHANGES b/CHANGES index 6f765443f9..b9458bc634 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,25 @@ +7.2.0-dev.173 | 2025-02-05 16:40:41 +0100 + + * cluster/zeromq: Call DoTerminate() in destructor (Arne Welzel, Corelight) + + Normal life-cycle is that Terminate() / DoTerminate() is called + by zeek-setup code. If that doesn't happen, shutdown and join + threads during destructor. + + try { } catch (...) suggested by Benjamin. + + * cluster/zeromq: Use lambda for thread trampoline (Arne Welzel, Corelight) + + * cluster/zeromq: Do not call util::fmt() from thread (Arne Welzel, Corelight) + + ...util::fmt() uses a static buffer, so this is problematic. + + I've dabbled a bit replacing std::thread with using threading::BasicThread + which would offer Fmt(), but this makes things more complicated. Primarily + as BasicThread is registered with the thread manager and the shutdown + interactions become entangled. The thread might be terminated before the + backend, or vice-versa. Seems nicer for the thread to be owned by the backend. + 7.2.0-dev.169 | 2025-02-05 11:10:21 +0100 * cluster/zeromq: Fix Unsubscribe() bug caused by \x00 prefix (Arne Welzel, Corelight) diff --git a/VERSION b/VERSION index aa26564136..48bb374b1d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.169 +7.2.0-dev.173 diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index ef117749ac..99207e82f5 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -59,15 +59,6 @@ constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { } \ } while ( 0 ) -namespace { -void self_thread_fun(void* arg) { - auto* self = static_cast(arg); - self->Run(); -} - -} // namespace - - ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { @@ -75,6 +66,16 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_pt main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); } +ZeroMQBackend::~ZeroMQBackend() { + try { + // DoTerminate is idempotent. + DoTerminate(); + } catch ( ... ) { + // This should never happen. + abort(); + } +} + void ZeroMQBackend::DoInitPostScript() { ThreadedBackend::DoInitPostScript(); @@ -118,6 +119,7 @@ void ZeroMQBackend::DoTerminate() { if ( proxy_thread ) { ZEROMQ_DEBUG("Shutting down proxy thread"); proxy_thread->Shutdown(); + proxy_thread.reset(); } ZEROMQ_DEBUG("Terminated"); @@ -227,7 +229,9 @@ bool ZeroMQBackend::DoInit() { // Setup connectivity between main and child thread. main_inproc.bind("inproc://inproc-bridge"); child_inproc.connect("inproc://inproc-bridge"); - self_thread = std::thread(self_thread_fun, this); + + // Thread is joined in backend->DoTerminate(), backend outlives it. + self_thread = std::thread([](auto* backend) { backend->Run(); }, this); // After connecting, call ThreadedBackend::DoInit() to register // the IO source with the loop. @@ -356,7 +360,9 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he } void ZeroMQBackend::Run() { - util::detail::set_thread_name(zeek::util::fmt("zmq-%p", this)); + char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> + snprintf(name, sizeof(name), "zmq-%p", this); + util::detail::set_thread_name(name); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); using MultipartMessage = std::vector; diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 6959fcb59e..3a6783ff5c 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -20,6 +20,11 @@ public: ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs); + /** + * Destructor. + */ + ~ZeroMQBackend(); + /** * Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen * sockets. Only one node in a cluster should do this.