Merge remote-tracking branch 'origin/topic/awelzel/zeromq-fix-fmt-call-thanks-tsan'

* origin/topic/awelzel/zeromq-fix-fmt-call-thanks-tsan:
  cluster/zeromq: Call DoTerminate() in destructor
  cluster/zeromq: Use lambda for thread trampoline
  cluster/zeromq: Do not call util::fmt() from thread
This commit is contained in:
Arne Welzel 2025-02-05 16:40:41 +01:00
commit fbdefd1451
4 changed files with 45 additions and 12 deletions

22
CHANGES
View file

@ -1,3 +1,25 @@
7.2.0-dev.173 | 2025-02-05 16:40:41 +0100
* cluster/zeromq: Call DoTerminate() in destructor (Arne Welzel, Corelight)
The normal life-cycle is that Terminate() / DoTerminate() is called
by the zeek-setup code. If that doesn't happen, shut down and join
the threads in the destructor.
try { } catch (...) suggested by Benjamin.
* cluster/zeromq: Use lambda for thread trampoline (Arne Welzel, Corelight)
* cluster/zeromq: Do not call util::fmt() from thread (Arne Welzel, Corelight)
...util::fmt() uses a static buffer, so this is problematic.
I've dabbled a bit with replacing std::thread with threading::BasicThread,
which would offer Fmt(), but this makes things more complicated, primarily
because BasicThread is registered with the thread manager and the shutdown
interactions become entangled: the thread might be terminated before the
backend, or vice versa. It seems nicer for the thread to be owned by the backend.
7.2.0-dev.169 | 2025-02-05 11:10:21 +0100 7.2.0-dev.169 | 2025-02-05 11:10:21 +0100
* cluster/zeromq: Fix Unsubscribe() bug caused by \x00 prefix (Arne Welzel, Corelight) * cluster/zeromq: Fix Unsubscribe() bug caused by \x00 prefix (Arne Welzel, Corelight)

View file

@ -1 +1 @@
7.2.0-dev.169 7.2.0-dev.173

View file

@ -59,15 +59,6 @@ constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) {
} \ } \
} while ( 0 ) } while ( 0 )
namespace {
void self_thread_fun(void* arg) {
auto* self = static_cast<ZeroMQBackend*>(arg);
self->Run();
}
} // namespace
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls, ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs) std::unique_ptr<detail::EventHandlingStrategy> ehs)
: ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) {
@ -75,6 +66,16 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_pt
main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
} }
// Destructor: runs DoTerminate() in case it was not already invoked through
// the regular termination path, so the backend is shut down before the
// object goes away.
ZeroMQBackend::~ZeroMQBackend() {
    try {
        // Safe to call again even if termination already ran: DoTerminate is idempotent.
        DoTerminate();
    } catch ( ... ) {
        // Not expected to ever be reached; never let an exception escape
        // the destructor, bail out hard instead.
        abort();
    }
}
void ZeroMQBackend::DoInitPostScript() { void ZeroMQBackend::DoInitPostScript() {
ThreadedBackend::DoInitPostScript(); ThreadedBackend::DoInitPostScript();
@ -118,6 +119,7 @@ void ZeroMQBackend::DoTerminate() {
if ( proxy_thread ) { if ( proxy_thread ) {
ZEROMQ_DEBUG("Shutting down proxy thread"); ZEROMQ_DEBUG("Shutting down proxy thread");
proxy_thread->Shutdown(); proxy_thread->Shutdown();
proxy_thread.reset();
} }
ZEROMQ_DEBUG("Terminated"); ZEROMQ_DEBUG("Terminated");
@ -227,7 +229,9 @@ bool ZeroMQBackend::DoInit() {
// Setup connectivity between main and child thread. // Setup connectivity between main and child thread.
main_inproc.bind("inproc://inproc-bridge"); main_inproc.bind("inproc://inproc-bridge");
child_inproc.connect("inproc://inproc-bridge"); child_inproc.connect("inproc://inproc-bridge");
self_thread = std::thread(self_thread_fun, this);
// Thread is joined in backend->DoTerminate(), backend outlives it.
self_thread = std::thread([](auto* backend) { backend->Run(); }, this);
// After connecting, call ThreadedBackend::DoInit() to register // After connecting, call ThreadedBackend::DoInit() to register
// the IO source with the loop. // the IO source with the loop.
@ -356,7 +360,9 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he
} }
void ZeroMQBackend::Run() { void ZeroMQBackend::Run() {
util::detail::set_thread_name(zeek::util::fmt("zmq-%p", this)); char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex><nul>
snprintf(name, sizeof(name), "zmq-%p", this);
util::detail::set_thread_name(name);
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this);
using MultipartMessage = std::vector<zmq::message_t>; using MultipartMessage = std::vector<zmq::message_t>;

View file

@ -20,6 +20,11 @@ public:
ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls, ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs); std::unique_ptr<detail::EventHandlingStrategy> ehs);
/**
* Destructor.
*/
~ZeroMQBackend();
/** /**
* Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen * Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen
* sockets. Only one node in a cluster should do this. * sockets. Only one node in a cluster should do this.