diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 45763fab54..76373b3c64 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -91,7 +91,10 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_pt "Number of published events dropped due to XPUB socket HWM.")), total_onloop_drops( zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_onloop_drops", {}, - "Number of received events dropped due to OnLoop queue full.")) {} + "Number of received events dropped due to OnLoop queue full.")), + total_msg_errors( + zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {}, + "Number of events with the wrong number of message parts.")) {} ZeroMQBackend::~ZeroMQBackend() { try { @@ -513,6 +516,7 @@ void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { } else { ZEROMQ_THREAD_PRINTF("inproc: error: expected 2 or 4 parts, have %zu!\n", msg.size()); + total_msg_errors->Inc(); } } } @@ -522,6 +526,7 @@ void ZeroMQBackend::HandleLogMessages(const std::vector& msgs) // sender, format, type, payload if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); + total_msg_errors->Inc(); continue; } @@ -540,6 +545,7 @@ void ZeroMQBackend::HandleXPubMessages(const std::vector& msgs for ( const auto& msg : msgs ) { if ( msg.size() != 1 ) { ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); + total_msg_errors->Inc(); continue; } @@ -576,6 +582,7 @@ void ZeroMQBackend::HandleXSubMessages(const std::vector& msgs for ( const auto& msg : msgs ) { if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); + total_msg_errors->Inc(); continue; } diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 11e905d2d7..952c978dd3 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -140,6 +140,7 @@ private: zeek::telemetry::CounterPtr total_xpub_drops; // events dropped due to XPUB socket hwm reached zeek::telemetry::CounterPtr total_onloop_drops; // events dropped due to onloop queue full + zeek::telemetry::CounterPtr total_msg_errors; // messages with the wrong number of parts // Could rework to log-once-every X seconds if needed. double xpub_drop_last_warn_at = 0.0;