cluster/Backend: Add ProcessError()

Allow backends to pass errors to a strategy. Locally, these raise
Cluster::Backend::error() events that are logged to the reporter
as errors.
This commit is contained in:
Arne Welzel 2025-04-10 16:15:54 +02:00
parent fcc0f45c57
commit 3d3b7a0759
5 changed files with 69 additions and 4 deletions

View file

@ -667,3 +667,10 @@ event websocket_client_lost(endpoint: EndpointInfo)
endpoint$id, endpoint$network$address, endpoint$network$bound_port); endpoint$id, endpoint$network$address, endpoint$network$bound_port);
Cluster::log(msg); Cluster::log(msg);
} }
# If a backend reports an error, propagate it via a reporter error message.
event Cluster::Backend::error(tag: string, message: string)
{
local msg = fmt("Cluster::Backend::error: %s (%s)", tag, message);
Reporter::error(msg);
}

View file

@ -13,6 +13,7 @@
#include "zeek/cluster/Manager.h" #include "zeek/cluster/Manager.h"
#include "zeek/cluster/OnLoop.h" #include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/cluster/cluster.bif.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/util.h" #include "zeek/util.h"
@ -28,6 +29,13 @@ void detail::LocalEventHandlingStrategy::DoProcessLocalEvent(EventHandlerPtr h,
zeek::event_mgr.Enqueue(h, std::move(args)); zeek::event_mgr.Enqueue(h, std::move(args));
} }
// Backend errors are raised via a generic Cluster::Backend::error(tag, message) event.
void detail::LocalEventHandlingStrategy::DoProcessError(std::string_view tag, std::string_view message) {
if ( Cluster::Backend::error )
zeek::event_mgr.Enqueue(Cluster::Backend::error, zeek::make_intrusive<zeek::StringVal>(tag),
zeek::make_intrusive<zeek::StringVal>(message));
}
std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) { std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) {
const auto& func_type = handler->GetType<zeek::FuncType>(); const auto& func_type = handler->GetType<zeek::FuncType>();
@ -127,6 +135,10 @@ bool Backend::ProcessEvent(std::string_view topic, detail::Event e) {
return event_handling_strategy->ProcessEvent(topic, std::move(e)); return event_handling_strategy->ProcessEvent(topic, std::move(e));
} }
void Backend::ProcessError(std::string_view tag, std::string_view message) {
return event_handling_strategy->ProcessError(tag, message);
}
bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format, byte_buffer_span payload) { bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format, byte_buffer_span payload) {
if ( format != event_serializer->Name() ) { if ( format != event_serializer->Name() ) {
zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(), zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(),

View file

@ -112,12 +112,20 @@ public:
* When the backend is instantiated for a WebSocket client, * When the backend is instantiated for a WebSocket client,
* local scripting layer should not raise events for the * local scripting layer should not raise events for the
* WebSocket client. * WebSocket client.
*
* @param h The event handler to use. * @param h The event handler to use.
* @param args The event arguments. * @param args The event arguments.
*/ */
void ProcessLocalEvent(EventHandlerPtr h, zeek::Args args) { DoProcessLocalEvent(h, std::move(args)); } void ProcessLocalEvent(EventHandlerPtr h, zeek::Args args) { DoProcessLocalEvent(h, std::move(args)); }
/**
* Process an error.
*
* @param tag A stringified structured error tag not further specified.
* @param message A free form message with more context.
*/
void ProcessError(std::string_view tag, std::string_view message) { return DoProcessError(tag, message); };
private: private:
/** /**
* Hook method for implementing ProcessEvent(). * Hook method for implementing ProcessEvent().
@ -136,6 +144,14 @@ private:
* @param args The event arguments. * @param args The event arguments.
*/ */
virtual void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) = 0; virtual void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) = 0;
/**
* Hook method for implementing ProcessError().
*
* @param tag A stringified structured error tag not further specified.
* @param message A free form message with more context.
*/
virtual void DoProcessError(std::string_view tag, std::string_view message) = 0;
}; };
/** /**
@ -145,6 +161,7 @@ class LocalEventHandlingStrategy : public EventHandlingStrategy {
private: private:
bool DoProcessEvent(std::string_view topic, Event e) override; bool DoProcessEvent(std::string_view topic, Event e) override;
void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) override; void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) override;
void DoProcessError(std::string_view tag, std::string_view message) override;
}; };
/** /**
@ -322,6 +339,18 @@ protected:
*/ */
bool ProcessEvent(std::string_view topic, detail::Event e); bool ProcessEvent(std::string_view topic, detail::Event e);
/**
* An error happened, pass it to the event handling strategy.
*
* Errors are not necessarily in response to a publish operation, but
* can also be raised when receiving messages. E.g. if received data
* couldn't be properly parsed.
*
* @param tag A stringified structured error tag not further specified.
* @param message A free form message with more context.
*/
void ProcessError(std::string_view tag, std::string_view message);
/** /**
* Process an incoming event message. * Process an incoming event message.
*/ */

View file

@ -201,3 +201,12 @@ function Cluster::__listen_websocket%(options: WebSocketServerOptions%): bool
auto result = zeek::cluster::manager->ListenWebSocket(server_options); auto result = zeek::cluster::manager->ListenWebSocket(server_options);
return zeek::val_mgr->Bool(result); return zeek::val_mgr->Bool(result);
%} %}
module Cluster::Backend;
## Generated on cluster backend error.
##
## tag: A structured tag, not further specified.
##
## message: A free form message with more details about the error.
event error%(tag: string, message: string%);

View file

@ -87,6 +87,14 @@ private:
*/ */
void DoProcessLocalEvent(zeek::EventHandlerPtr h, zeek::Args args) override {} void DoProcessLocalEvent(zeek::EventHandlerPtr h, zeek::Args args) override {}
/**
* Send errors directly to the client.
*/
void DoProcessError(std::string_view tag, std::string_view message) override {
// Just send out the error.
wsc->SendError(tag, message);
}
std::string buffer; std::string buffer;
std::shared_ptr<WebSocketClient> wsc; std::shared_ptr<WebSocketClient> wsc;
WebSocketEventDispatcher* dispatcher; WebSocketEventDispatcher* dispatcher;
@ -128,14 +136,14 @@ private:
// Inspired by broker/internal/json_client.cc // Inspired by broker/internal/json_client.cc
WebSocketClient::SendInfo WebSocketClient::SendError(std::string_view code, std::string_view message) { WebSocketClient::SendInfo WebSocketClient::SendError(std::string_view tag, std::string_view message) {
std::string buf; std::string buf;
buf.reserve(code.size() + message.size() + 32); buf.reserve(tag.size() + message.size() + 32);
auto out = std::back_inserter(buf); auto out = std::back_inserter(buf);
*out++ = '{'; *out++ = '{';
broker::format::json::v1::append_field("type", "error", out); broker::format::json::v1::append_field("type", "error", out);
*out++ = ','; *out++ = ',';
broker::format::json::v1::append_field("code", code, out); broker::format::json::v1::append_field("code", tag, out);
*out++ = ','; *out++ = ',';
broker::format::json::v1::append_field("message", message, out); broker::format::json::v1::append_field("message", message, out);
*out++ = '}'; *out++ = '}';