From 3d3b7a07594243b404a4640864d1f396a42128aa Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 10 Apr 2025 16:15:54 +0200 Subject: [PATCH] cluster/Backend: Add ProcessError() Allow backends to pass errors to a strategy. Locally, these raise Cluster::Backend::error() events that are logged to the reporter as errors. --- scripts/base/frameworks/cluster/main.zeek | 7 +++++ src/cluster/Backend.cc | 12 +++++++++ src/cluster/Backend.h | 31 ++++++++++++++++++++++- src/cluster/cluster.bif | 9 +++++++ src/cluster/websocket/WebSocket.cc | 14 +++++++--- 5 files changed, 69 insertions(+), 4 deletions(-) diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 36a19885d6..0520a262c6 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -667,3 +667,10 @@ event websocket_client_lost(endpoint: EndpointInfo) endpoint$id, endpoint$network$address, endpoint$network$bound_port); Cluster::log(msg); } + +# If a backend reports an error, propagate it via a reporter error message. +event Cluster::Backend::error(tag: string, message: string) + { + local msg = fmt("Cluster::Backend::error: %s (%s)", tag, message); + Reporter::error(msg); + } diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index c6e83f345d..c89fd74f8e 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -13,6 +13,7 @@ #include "zeek/cluster/Manager.h" #include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" +#include "zeek/cluster/cluster.bif.h" #include "zeek/logging/Manager.h" #include "zeek/util.h" @@ -28,6 +29,13 @@ void detail::LocalEventHandlingStrategy::DoProcessLocalEvent(EventHandlerPtr h, zeek::event_mgr.Enqueue(h, std::move(args)); } +// Backend errors are raised via a generic Cluster::Backend::error(tag, message) event. +void detail::LocalEventHandlingStrategy::DoProcessError(std::string_view tag, std::string_view message) { + if ( Cluster::Backend::error ) + zeek::event_mgr.Enqueue(Cluster::Backend::error, zeek::make_intrusive(tag), + zeek::make_intrusive(message)); +} + std::optional detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) { const auto& func_type = handler->GetType(); @@ -127,6 +135,10 @@ bool Backend::ProcessEvent(std::string_view topic, detail::Event e) { return event_handling_strategy->ProcessEvent(topic, std::move(e)); } +void Backend::ProcessError(std::string_view tag, std::string_view message) { + return event_handling_strategy->ProcessError(tag, message); +} + bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format, byte_buffer_span payload) { if ( format != event_serializer->Name() ) { zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(), diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 0f6d1e87dc..3b0d956448 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -112,12 +112,20 @@ public: * When the backend is instantiated for a WebSocket client, * local scripting layer should not raise events for the * WebSocket client. - + * * @param h The event handler to use. * @param args The event arguments. */ void ProcessLocalEvent(EventHandlerPtr h, zeek::Args args) { DoProcessLocalEvent(h, std::move(args)); } + /** + * Process an error. + * + * @param tag A stringified structured error tag not further specified. + * @param message A free form message with more context. + */ + void ProcessError(std::string_view tag, std::string_view message) { return DoProcessError(tag, message); }; + private: /** * Hook method for implementing ProcessEvent(). @@ -136,6 +144,14 @@ private: * @param args The event arguments. */ virtual void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) = 0; + + /** + * Hook method for implementing ProcessError(). + * + * @param tag A stringified structured error tag not further specified. + * @param message A free form message with more context. + */ + virtual void DoProcessError(std::string_view tag, std::string_view message) = 0; }; /** @@ -145,6 +161,7 @@ class LocalEventHandlingStrategy : public EventHandlingStrategy { private: bool DoProcessEvent(std::string_view topic, Event e) override; void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) override; + void DoProcessError(std::string_view tag, std::string_view message) override; }; /** @@ -322,6 +339,18 @@ protected: */ bool ProcessEvent(std::string_view topic, detail::Event e); + /** + * An error happened, pass it to the event handling strategy. + * + * Errors are not necessarily in response to a publish operation, but + * can also be raised when receiving messages. E.g. if received data + * couldn't be properly parsed. + * + * @param tag A stringified structured error tag not further specified. + * @param message A free form message with more context. + */ + void ProcessError(std::string_view tag, std::string_view message); + /** * Process an incoming event message. */ diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif index cd39cde68b..08557ff765 100644 --- a/src/cluster/cluster.bif +++ b/src/cluster/cluster.bif @@ -201,3 +201,12 @@ function Cluster::__listen_websocket%(options: WebSocketServerOptions%): bool auto result = zeek::cluster::manager->ListenWebSocket(server_options); return zeek::val_mgr->Bool(result); %} + +module Cluster::Backend; + +## Generated on cluster backend error. +## +## tag: A structured tag, not further specified. +## +## message: A free form message with more details about the error. +event error%(tag: string, message: string%); diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 46af5b7b2b..0369e09446 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -87,6 +87,14 @@ private: */ void DoProcessLocalEvent(zeek::EventHandlerPtr h, zeek::Args args) override {} + /** + * Send errors directly to the client. + */ + void DoProcessError(std::string_view tag, std::string_view message) override { + // Just send out the error. + wsc->SendError(tag, message); + } + std::string buffer; std::shared_ptr wsc; WebSocketEventDispatcher* dispatcher; @@ -128,14 +136,14 @@ private: // Inspired by broker/internal/json_client.cc -WebSocketClient::SendInfo WebSocketClient::SendError(std::string_view code, std::string_view message) { +WebSocketClient::SendInfo WebSocketClient::SendError(std::string_view tag, std::string_view message) { std::string buf; - buf.reserve(code.size() + message.size() + 32); + buf.reserve(tag.size() + message.size() + 32); auto out = std::back_inserter(buf); *out++ = '{'; broker::format::json::v1::append_field("type", "error", out); *out++ = ','; - broker::format::json::v1::append_field("code", code, out); + broker::format::json::v1::append_field("code", tag, out); *out++ = ','; broker::format::json::v1::append_field("message", message, out); *out++ = '}';