diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index bff45bf29e..36a19885d6 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -327,11 +327,62 @@ export { ## The arguments for the event. args: vector of any; }; + + ## The TLS options for a WebSocket server. + ## + ## If cert_file and key_file are set, TLS is enabled. If both + ## are unset, TLS is disabled. Any other combination is an error. + type WebSocketTLSOptions: record { + ## The cert file to use. + cert_file: string &optional; + ## The key file to use. + key_file: string &optional; + ## Expect peers to send client certificates. + enable_peer_verification: bool &default=F; + ## The CA certificate or CA bundle used for peer verification. + ## Empty will use the implementations's default when + ## ``enable_peer_verification`` is T. + ca_file: string &default=""; + ## The ciphers to use. Empty will use the implementation's defaults. + ciphers: string &default=""; + }; + + ## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`. + type WebSocketServerOptions: record { + ## The host address to listen on. + listen_host: string; + ## The port the WebSocket server is supposed to listen on. + listen_port: port; + ## The TLS options used for this WebSocket server. By default, + ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. + tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); + }; + + ## Start listening on a WebSocket address. + ## + ## options: The server :zeek:see:`Cluster::WebSocketServerOptions` to use. + ## + ## Returns: T on success, else F. + global listen_websocket: function(options: WebSocketServerOptions): bool; + + ## Network information of an endpoint. + type NetworkInfo: record { + ## The IP address or hostname where the endpoint listens. + address: string; + ## The port where the endpoint is bound to. + bound_port: port; + }; + + ## Information about a WebSocket endpoint. + type EndpointInfo: record { + id: string; + network: NetworkInfo; + }; } # Needs declaration of Cluster::Event type. @load base/bif/cluster.bif - +@load base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek # Track active nodes per type. global active_node_ids: table[NodeType] of set[string]; @@ -597,3 +648,22 @@ function unsubscribe(topic: string): bool { return Cluster::__unsubscribe(topic); } + +function listen_websocket(options: WebSocketServerOptions): bool + { + return Cluster::__listen_websocket(options); + } + +event websocket_client_added(endpoint: EndpointInfo, subscriptions: string_vec) + { + local msg = fmt("WebSocket client '%s' (%s:%d) subscribed to %s", + endpoint$id, endpoint$network$address, endpoint$network$bound_port, subscriptions); + Cluster::log(msg); + } + +event websocket_client_lost(endpoint: EndpointInfo) + { + local msg = fmt("WebSocket client '%s' (%s:%d) gone", + endpoint$id, endpoint$network$address, endpoint$network$bound_port); + Cluster::log(msg); + } diff --git a/src/cluster/BifSupport.cc b/src/cluster/BifSupport.cc index 9d3b0ac5c4..5eb5db72eb 100644 --- a/src/cluster/BifSupport.cc +++ b/src/cluster/BifSupport.cc @@ -65,7 +65,7 @@ zeek::RecordValPtr make_event(zeek::ArgsSpan args) { } const auto func = zeek::FuncValPtr{zeek::NewRef{}, maybe_func_val->AsFuncVal()}; - auto checked_args = cluster::detail::check_args(func, args.subspan(1)); + auto checked_args = zeek::cluster::detail::check_args(func, args.subspan(1)); if ( ! checked_args ) return rec; @@ -109,7 +109,7 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { } else if ( args[0]->GetType()->Tag() == zeek::TYPE_RECORD ) { if ( args[0]->GetType() == cluster_event_type ) { // Handling Cluster::Event record type - auto ev = to_cluster_event(cast_intrusive(args[0])); + auto ev = to_cluster_event(zeek::cast_intrusive(args[0])); if ( ! ev ) return zeek::val_mgr->False(); @@ -148,4 +148,32 @@ bool is_cluster_pool(const zeek::Val* pool) { return pool->GetType() == pool_type; } + +zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port, + TransportProto proto) { + static const auto ep_info_type = zeek::id::find_type("Cluster::EndpointInfo"); + static const auto net_info_type = zeek::id::find_type("Cluster::NetworkInfo"); + + auto net_rec = zeek::make_intrusive(net_info_type); + net_rec->Assign(0, address); + net_rec->Assign(1, zeek::val_mgr->Port(port, proto)); + + auto ep_rec = zeek::make_intrusive(ep_info_type); + ep_rec->Assign(0, id); + ep_rec->Assign(1, net_rec); + + return ep_rec; +} + +zeek::VectorValPtr make_string_vec(zeek::Span strings) { + static const auto string_vec_type = zeek::id::find_type("string_vec"); + auto vec = zeek::make_intrusive(string_vec_type); + vec->Reserve(strings.size()); + + for ( const auto& s : strings ) + vec->Append(zeek::make_intrusive(s)); + + return vec; +} + } // namespace zeek::cluster::detail::bif diff --git a/src/cluster/BifSupport.h b/src/cluster/BifSupport.h index b02fc97bd0..938962ec58 100644 --- a/src/cluster/BifSupport.h +++ b/src/cluster/BifSupport.h @@ -4,6 +4,7 @@ #include "zeek/IntrusivePtr.h" #include "zeek/Span.h" +#include "zeek/net_util.h" // Helpers for cluster.bif @@ -15,6 +16,8 @@ class Frame; class RecordVal; using RecordValPtr = IntrusivePtr; +class VectorVal; +using VectorValPtr = IntrusivePtr; class Val; using ValPtr = IntrusivePtr; @@ -46,6 +49,28 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args); bool is_cluster_pool(const zeek::Val* pool); +/** + * Create a Cluster::EndpointInfo record with a nested Cluster::NetworkInfo record. + * + * @param id The string to use as id in the record. + * @param address The string to use as address in the network record. + * @param port The port to use in the network record. + * @param proto The proto for the given port value. + * + * @returns A record value of type Cluster::EndpointInfo filled with the provided info. + */ +zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port, + TransportProto proto); + +/** + * Helper to go from a vector or array of std::strings to a zeek::VectorVal. + * + * @param strings The std::string instances. + * + * @return a VectorVal instance of type string_vec filled with strings. + */ +zeek::VectorValPtr make_string_vec(zeek::Span strings); + } // namespace cluster::detail::bif } // namespace zeek diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index bb421c6c01..fa2093be63 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -13,3 +13,4 @@ zeek_add_subdir_library( add_subdirectory(backend) add_subdirectory(serializer) +add_subdirectory(websocket) diff --git a/src/cluster/Manager.cc b/src/cluster/Manager.cc index ffac82c396..28b1055f59 100644 --- a/src/cluster/Manager.cc +++ b/src/cluster/Manager.cc @@ -2,7 +2,10 @@ #include "zeek/cluster/Manager.h" +#include "zeek/Func.h" #include "zeek/cluster/Serializer.h" +#include "zeek/cluster/websocket/WebSocket.h" +#include "zeek/util.h" using namespace zeek::cluster; @@ -11,6 +14,15 @@ Manager::Manager() event_serializers(plugin::ComponentManager("Cluster", "EventSerializerTag")), log_serializers(plugin::ComponentManager("Cluster", "LogSerializerTag")) {} +// Force destructor definition into compilation unit to avoid needing the +// full websocket::Server declaration in cluster/Manager.h. +Manager::~Manager() = default; + +void Manager::Terminate() { + for ( const auto& [_, entry] : websocket_servers ) + entry.server->Terminate(); +} + std::unique_ptr Manager::InstantiateBackend( const zeek::EnumValPtr& tag, std::unique_ptr event_serializer, std::unique_ptr log_serializer, @@ -30,3 +42,25 @@ std::unique_ptr Manager::InstantiateLogSerializer(const zeek::Enu const LogSerializerComponent* c = LogSerializers().Lookup(tag); return c ? c->Factory()() : nullptr; } + +bool Manager::ListenWebSocket(const websocket::detail::ServerOptions& options) { + WebSocketServerKey key{options.host, options.port}; + + if ( websocket_servers.count(key) != 0 ) { + const auto& entry = websocket_servers[key]; + if ( entry.options == options ) + return true; + + zeek::emit_builtin_error(zeek::util::fmt("Already listening on %s:%d", options.host.c_str(), options.port)); + return false; + } + + auto server = + websocket::detail::StartServer(std::make_unique(), options); + + if ( ! server ) + return false; + + websocket_servers.insert({key, WebSocketServerEntry{options, std::move(server)}}); + return true; +} diff --git a/src/cluster/Manager.h b/src/cluster/Manager.h index 3e64924039..d1cc8628fb 100644 --- a/src/cluster/Manager.h +++ b/src/cluster/Manager.h @@ -2,10 +2,12 @@ #pragma once +#include #include #include "zeek/cluster/Component.h" #include "zeek/cluster/Serializer.h" +#include "zeek/cluster/websocket/WebSocket.h" #include "zeek/plugin/ComponentManager.h" namespace zeek::cluster { @@ -19,6 +21,15 @@ namespace zeek::cluster { class Manager { public: Manager(); + ~Manager(); + + /** + * Terminate the cluster manager. + * + * This shuts down any WebSocket servers that were started + * at termination time. + */ + void Terminate(); /** * Instantiate a cluster backend with the given enum value and @@ -69,10 +80,26 @@ public: */ plugin::ComponentManager& LogSerializers() { return log_serializers; }; + /** + * Start a WebSocket server for the given address and port pair. + * + * @param options The options to use for the WebSocket server. + * + * @return True on success, else false. + */ + bool ListenWebSocket(const websocket::detail::ServerOptions& options); + private: plugin::ComponentManager backends; plugin::ComponentManager event_serializers; plugin::ComponentManager log_serializers; + + using WebSocketServerKey = std::pair; + struct WebSocketServerEntry { + websocket::detail::ServerOptions options; + std::unique_ptr server; + }; + std::map websocket_servers; }; // This manager instance only exists for plugins to register components, diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif index ec2c8f115e..cd39cde68b 100644 --- a/src/cluster/cluster.bif +++ b/src/cluster/cluster.bif @@ -3,6 +3,8 @@ #include "zeek/cluster/Backend.h" #include "zeek/cluster/BifSupport.h" +#include "zeek/cluster/Manager.h" +#include "zeek/cluster/websocket/WebSocket.h" using namespace zeek::cluster::detail::bif; @@ -11,6 +13,7 @@ using namespace zeek::cluster::detail::bif; module Cluster; type Cluster::Event: record; +type Cluster::WebSocketTLSOptions: record; ## Publishes an event to a given topic. ## @@ -147,3 +150,54 @@ function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool ScriptLocationScope scope{frame}; return publish_event(topic, args); %} + +function Cluster::__listen_websocket%(options: WebSocketServerOptions%): bool + %{ + using namespace zeek::cluster::websocket::detail; + + const auto& server_options_type = zeek::id::find_type("Cluster::WebSocketServerOptions"); + const auto& tls_options_type = zeek::id::find_type("Cluster::WebSocketTLSOptions"); + + if ( options->GetType() != server_options_type ) { + zeek::emit_builtin_error("expected type Cluster::WebSocketServerOptions for options"); + return zeek::val_mgr->False(); + } + + auto options_rec = zeek::IntrusivePtr{zeek::NewRef{}, options->AsRecordVal()}; + auto tls_options_rec = options_rec->GetFieldOrDefault("tls_options"); + + if ( tls_options_rec->GetType() != tls_options_type ) { + zeek::emit_builtin_error("expected type Cluster::WebSocketTLSOptions for tls_options"); + return zeek::val_mgr->False(); + } + + bool have_cert = tls_options_rec->HasField("cert_file"); + bool have_key = tls_options_rec->HasField("key_file"); + + if ( (have_cert || have_key) && ! (have_cert && have_key) ) { + std::string error = "Invalid tls_options: "; + if ( have_cert ) + error += "No key_file field"; + else + error += "No cert_file field"; + zeek::emit_builtin_error(error.c_str()); + return zeek::val_mgr->False(); + } + + struct TLSOptions tls_options = { + have_cert ? std::optional{tls_options_rec->GetField("cert_file")->ToStdString()} : std::nullopt, + have_key ? std::optional{tls_options_rec->GetField("key_file")->ToStdString()} : std::nullopt, + tls_options_rec->GetFieldOrDefault("enable_peer_verification")->Get(), + tls_options_rec->GetFieldOrDefault("ca_file")->ToStdString(), + tls_options_rec->GetFieldOrDefault("ciphers")->ToStdString(), + }; + + struct ServerOptions server_options { + options_rec->GetField("listen_host")->ToStdString(), + static_cast(options_rec->GetField("listen_port")->Port()), + }; + server_options.tls_options = std::move(tls_options); + + auto result = zeek::cluster::manager->ListenWebSocket(server_options); + return zeek::val_mgr->Bool(result); + %} diff --git a/src/cluster/websocket/CMakeLists.txt b/src/cluster/websocket/CMakeLists.txt new file mode 100644 index 0000000000..1c41cf14b8 --- /dev/null +++ b/src/cluster/websocket/CMakeLists.txt @@ -0,0 +1,8 @@ +add_subdirectory(auxil) + +zeek_add_plugin( + Zeek Cluster_WebSocket + INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} + DEPENDENCIES ixwebsocket::ixwebsocket + SOURCES Plugin.cc WebSocket.cc WebSocket-IXWebSocket.cc + BIFS events.bif) diff --git a/src/cluster/websocket/Plugin.cc b/src/cluster/websocket/Plugin.cc new file mode 100644 index 0000000000..89a6e9d21f --- /dev/null +++ b/src/cluster/websocket/Plugin.cc @@ -0,0 +1,19 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "zeek/cluster/websocket/Plugin.h" + +namespace zeek::plugin::Cluster_WebSocket { +// Definition of plugin. +Plugin plugin; +}; // namespace zeek::plugin::Cluster_WebSocket + +namespace zeek::plugin::Cluster_WebSocket { + +zeek::plugin::Configuration Plugin::Configure() { + zeek::plugin::Configuration config; + config.name = "Zeek::Cluster_WebSocket"; + config.description = "Provides WebSocket access to a Zeek cluster"; + return config; +} + +} // namespace zeek::plugin::Cluster_WebSocket diff --git a/src/cluster/websocket/Plugin.h b/src/cluster/websocket/Plugin.h new file mode 100644 index 0000000000..97e2a15db8 --- /dev/null +++ b/src/cluster/websocket/Plugin.h @@ -0,0 +1,16 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/plugin/Plugin.h" + +namespace zeek::plugin::Cluster_WebSocket { + +class Plugin : public zeek::plugin::Plugin { +public: + zeek::plugin::Configuration Configure() override; +}; + +extern Plugin plugin; + +} // namespace zeek::plugin::Cluster_WebSocket diff --git a/src/cluster/websocket/README b/src/cluster/websocket/README new file mode 100644 index 0000000000..5f80cc6a64 --- /dev/null +++ b/src/cluster/websocket/README @@ -0,0 +1,5 @@ +A Zeek Cluster's WebSocket interface +==================================== + +This directory contains code that allows external applications +to connect to Zeek using WebSocket connections. diff --git a/src/cluster/websocket/WebSocket-IXWebSocket.cc b/src/cluster/websocket/WebSocket-IXWebSocket.cc new file mode 100644 index 0000000000..d2b8cb289b --- /dev/null +++ b/src/cluster/websocket/WebSocket-IXWebSocket.cc @@ -0,0 +1,170 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// Implementation of a WebSocket server and clients using the IXWebSocket client library. +#include "zeek/cluster/websocket/WebSocket.h" + +#include +#include + +#include "zeek/Reporter.h" + +#include "ixwebsocket/IXConnectionState.h" +#include "ixwebsocket/IXSocketTLSOptions.h" +#include "ixwebsocket/IXWebSocket.h" +#include "ixwebsocket/IXWebSocketSendData.h" +#include "ixwebsocket/IXWebSocketServer.h" + +namespace zeek::cluster::websocket::detail::ixwebsocket { + +/** + * Implementation of WebSocketClient for the IXWebsocket library. + */ +class IxWebSocketClient : public WebSocketClient { +public: + IxWebSocketClient(std::shared_ptr cs, std::shared_ptr ws) + : cs(std::move(cs)), ws(std::move(ws)) { + if ( ! this->cs || ! this->ws ) + throw std::invalid_argument("expected ws and cs to be set"); + } + + bool IsTerminated() const override { + if ( cs->isTerminated() ) + return true; + + auto rs = ws->getReadyState(); + return rs == ix::ReadyState::Closing || rs == ix::ReadyState::Closed; + } + + void Close(uint16_t code, const std::string& reason) override { ws->close(code, reason); } + + SendInfo SendText(std::string_view sv) override { + if ( cs->isTerminated() ) + return {true}; // small lie + + auto send_info = ws->sendUtf8Text(ix::IXWebSocketSendData{sv.data(), sv.size()}); + return SendInfo{send_info.success}; + } + + const std::string& getId() override { return cs->getId(); } + const std::string& getRemoteIp() override { return cs->getRemoteIp(); } + int getRemotePort() override { return cs->getRemotePort(); } + +private: + std::shared_ptr cs; + std::shared_ptr ws; +}; + +/** + * Implementation of WebSocketServer using the IXWebsocket library. + */ +class IXWebSocketServer : public WebSocketServer { +public: + IXWebSocketServer(std::unique_ptr dispatcher, std::unique_ptr server) + : WebSocketServer(std::move(dispatcher)), server(std::move(server)) {} + +private: + void DoTerminate() override { + // Stop the server. + server->stop(); + } + + std::unique_ptr server; +}; + +std::unique_ptr StartServer(std::unique_ptr dispatcher, + const ServerOptions& options) { + auto server = + std::make_unique(options.port, options.host, ix::SocketServer::kDefaultTcpBacklog, + options.max_connections, + ix::WebSocketServer::kDefaultHandShakeTimeoutSecs, + ix::SocketServer::kDefaultAddressFamily, options.ping_interval_seconds); + + if ( ! options.per_message_deflate ) + server->disablePerMessageDeflate(); + + const auto& tls_options = options.tls_options; + if ( tls_options.TlsEnabled() ) { + ix::SocketTLSOptions ix_tls_options{}; + ix_tls_options.tls = true; + ix_tls_options.certFile = tls_options.cert_file.value(); + ix_tls_options.keyFile = tls_options.key_file.value(); + + if ( tls_options.enable_peer_verification ) { + if ( ! tls_options.ca_file.empty() ) + ix_tls_options.caFile = tls_options.ca_file; + } + else { + // This is the IXWebSocket library's way of + // disabling peer verification. + ix_tls_options.caFile = "NONE"; + } + + if ( ! tls_options.ciphers.empty() ) + ix_tls_options.ciphers = tls_options.ciphers; + + server->setTLSOptions(ix_tls_options); + } + + // Using the legacy IXWebsocketAPI API to acquire a shared_ptr to the ix::WebSocket instance. + ix::WebSocketServer::OnConnectionCallback connection_callback = + [dispatcher = dispatcher.get()](std::weak_ptr websocket, + std::shared_ptr cs) -> void { + // Hold a shared_ptr to the WebSocket object until we see the close. + std::shared_ptr ws = websocket.lock(); + + // Client already gone or terminated? Weird... + if ( ! ws || cs->isTerminated() ) + return; + + auto id = cs->getId(); + int remotePort = cs->getRemotePort(); + std::string remoteIp = cs->getRemoteIp(); + + auto ixws = std::make_shared(std::move(cs), ws); + + // These callbacks run in per client threads. The actual processing happens + // on the main thread via a single WebSocketDemux instance. + ix::OnMessageCallback message_callback = [dispatcher, id, remotePort, remoteIp, + ixws](const ix::WebSocketMessagePtr& msg) mutable { + if ( msg->type == ix::WebSocketMessageType::Open ) { + dispatcher->QueueForProcessing( + WebSocketOpen{id, msg->openInfo.uri, msg->openInfo.protocol, std::move(ixws)}); + } + else if ( msg->type == ix::WebSocketMessageType::Message ) { + dispatcher->QueueForProcessing(WebSocketMessage{id, msg->str}); + } + else if ( msg->type == ix::WebSocketMessageType::Close ) { + dispatcher->QueueForProcessing(WebSocketClose{id}); + } + else if ( msg->type == ix::WebSocketMessageType::Error ) { + dispatcher->QueueForProcessing(WebSocketClose{id}); + } + }; + + ws->setOnMessageCallback(message_callback); + }; + + server->setOnConnectionCallback(connection_callback); + + const auto [success, reason] = server->listen(); + if ( ! success ) { + zeek::reporter->Error("WebSocket: Unable to listen on %s:%d: %s", options.host.c_str(), options.port, + reason.c_str()); + return nullptr; + } + + server->start(); + + return std::make_unique(std::move(dispatcher), std::move(server)); +} + + +} // namespace zeek::cluster::websocket::detail::ixwebsocket + +using namespace zeek::cluster::websocket::detail; + +std::unique_ptr zeek::cluster::websocket::detail::StartServer( + std::unique_ptr dispatcher, const ServerOptions& options) { + // Just delegate to the above IXWebSocket specific implementation. + return ixwebsocket::StartServer(std::move(dispatcher), options); +} diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc new file mode 100644 index 0000000000..115db9aec8 --- /dev/null +++ b/src/cluster/websocket/WebSocket.cc @@ -0,0 +1,504 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// Implement Broker's WebSocket client handling in Zeek. + +#include "zeek/cluster/websocket/WebSocket.h" + +#include +#include +#include + +#include "zeek/Reporter.h" +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/BifSupport.h" +#include "zeek/cluster/Manager.h" +#include "zeek/cluster/OnLoop.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/cluster/serializer/broker/Serializer.h" +#include "zeek/cluster/websocket/Plugin.h" +#include "zeek/cluster/websocket/events.bif.h" +#include "zeek/net_util.h" +#include "zeek/threading/MsgThread.h" + +#include "broker/data.bif.h" +#include "broker/data_envelope.hh" +#include "broker/error.hh" +#include "broker/format/json.hh" +#include "broker/zeek.hh" +#include "rapidjson/document.h" +#include "rapidjson/rapidjson.h" + + +#define WS_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Cluster_WebSocket::plugin, __VA_ARGS__) + +namespace zeek { +const char* zeek_version(); +} + +using namespace zeek::cluster::websocket::detail; + +namespace { + +class WebSocketEventHandlingStrategy : public zeek::cluster::detail::EventHandlingStrategy { +public: + WebSocketEventHandlingStrategy(std::shared_ptr ws, WebSocketEventDispatcher* dispatcher) + : wsc(std::move(ws)), dispatcher(dispatcher) {} + +private: + /** + * Any received remote event is encoded into Broker's JSON v1 format and + * send over to the WebSocket client. + * + * We leverage low-level Broker encoding functions here directly. This + * will need some abstractions if client's can opt to use different encodings + * of events in the future. + */ + bool DoHandleRemoteEvent(std::string_view topic, zeek::cluster::detail::Event e) override { + // If the client has left, no point in sending it any pending event. + if ( wsc->IsTerminated() ) + return true; + + + // Any events received from the backend before an Ack was sent + // are discarded. + if ( ! wsc->IsAcked() ) + return true; + + // XXX The serialization is somewhat slow, it would be good to offload + // it to a thread, or try to go from Val's directly to JSON and see + // if that's faster. + auto ev = zeek::cluster::detail::to_broker_event(e); + if ( ! ev ) { + fprintf(stderr, "[ERROR] Unable to go from detail::Event to broker::event\n"); + return false; + } + + buffer.clear(); + auto envelope = broker::data_envelope::make(topic, ev->as_data()); + broker::format::json::v1::encode(envelope, std::back_inserter(buffer)); + + dispatcher->QueueReply(WebSocketSendReply{wsc, buffer}); + return true; + } + + /** + * Events from backends aren't enqueued into the event loop when + * running for WebSocket clients. + */ + void DoEnqueueLocalEvent(zeek::EventHandlerPtr h, zeek::Args args) override {} + + std::string buffer; + std::shared_ptr wsc; + WebSocketEventDispatcher* dispatcher; +}; + +class ReplyInputMessage : public zeek::threading::BasicInputMessage { +public: + ReplyInputMessage(WebSocketReply work) : zeek::threading::BasicInputMessage("ReplyInput"), work(std::move(work)) {}; + bool Process() override { + return std::visit([this](auto& item) -> bool { return Process(item); }, work); + }; + +private: + bool Process(const WebSocketSendReply& sr) { + const auto& wsc = sr.wsc; + if ( wsc->IsTerminated() ) + return true; + + auto send_info = wsc->SendText(sr.msg); + if ( ! send_info.success && ! wsc->IsTerminated() ) + fprintf(stderr, "[ERROR] Failed to send reply to WebSocket client %s (%s:%d)\n", wsc->getId().c_str(), + wsc->getRemoteIp().c_str(), wsc->getRemotePort()); + + return true; + } + + bool Process(const WebSocketCloseReply& cr) { + const auto& wsc = cr.wsc; + if ( ! wsc->IsTerminated() ) + wsc->Close(cr.code, cr.reason); + + return true; + } + + WebSocketReply work; +}; + +} // namespace + + +// Inspired by broker/internal/json_client.cc +WebSocketClient::SendInfo WebSocketClient::SendError(std::string_view code, std::string_view message) { + std::string buf; + buf.reserve(code.size() + message.size() + 32); + auto out = std::back_inserter(buf); + *out++ = '{'; + broker::format::json::v1::append_field("type", "error", out); + *out++ = ','; + broker::format::json::v1::append_field("code", code, out); + *out++ = ','; + broker::format::json::v1::append_field("message", message, out); + *out++ = '}'; + return SendText(buf); +} + +// Inspired by broker/internal/json_client.cc +WebSocketClient::SendInfo WebSocketClient::SendAck(std::string_view endpoint, std::string_view version) { + std::string buf; + buf.reserve(endpoint.size() + version.size() + 32); + auto out = std::back_inserter(buf); + *out++ = '{'; + broker::format::json::v1::append_field("type", "ack", out); + *out++ = ','; + broker::format::json::v1::append_field("endpoint", endpoint, out); + *out++ = ','; + broker::format::json::v1::append_field("version", version, out); + *out++ = '}'; + auto r = SendText(buf); + acked = true; + return r; +} + +void WebSocketClient::SetSubscriptions(const std::vector& topic_prefixes) { + for ( const auto& topic_prefix : topic_prefixes ) + subscriptions_state[topic_prefix] = false; +} + +void WebSocketClient::SetSubscriptionActive(const std::string& topic_prefix) { + if ( subscriptions_state.count(topic_prefix) == 0 ) { + zeek::reporter->InternalWarning("Unknown topic_prefix for WebSocket client %s!", topic_prefix.c_str()); + return; + } + + subscriptions_state[topic_prefix] = true; +} + +bool WebSocketClient::AllSubscriptionsActive() const { + for ( const auto& [_, status] : subscriptions_state ) { + if ( ! status ) + return false; + } + + return true; +} + +const std::vector WebSocketClient::GetSubscriptions() const { + std::vector subs; + subs.reserve(subscriptions_state.size()); + + for ( const auto& [topic, _] : subscriptions_state ) + subs.emplace_back(topic); + + return subs; +} + +class zeek::cluster::websocket::detail::ReplyMsgThread : public zeek::threading::MsgThread { +public: + ReplyMsgThread() : zeek::threading::MsgThread() { SetName("ws-reply-thread"); } + + void Run() override { + zeek::util::detail::set_thread_name("zk/ws-reply-thread"); + MsgThread::Run(); + } + + bool OnHeartbeat(double network_time, double current_time) override { return true; } + + bool OnFinish(double network_time) override { return true; } +}; + +WebSocketEventDispatcher::WebSocketEventDispatcher() { + onloop = + new zeek::detail::OnLoopProcess(this, "WebSocketEventDispatcher"); + // Register the onloop instance the IO loop. Lifetime will be managed by the loop. + onloop->Register(false); + + reply_msg_thread = new ReplyMsgThread(); + reply_msg_thread->Start(); +} + +WebSocketEventDispatcher::~WebSocketEventDispatcher() { + // Freed by threading manager. + reply_msg_thread = nullptr; +} + +void WebSocketEventDispatcher::Terminate() { + WS_DEBUG("Terminating WebSocketEventDispatcher"); + + for ( auto& [_, client] : clients ) { + const auto& wsc = client.wsc; + const auto& backend = client.backend; + WS_DEBUG("Sending close to WebSocket client %s (%s:%d)", wsc->getId().c_str(), wsc->getRemoteIp().c_str(), + wsc->getRemotePort()); + + QueueReply(WebSocketCloseReply{wsc, 1001, "Terminating"}); + backend->Terminate(); + } + + clients.clear(); +} + +void WebSocketEventDispatcher::QueueForProcessing(WebSocketEvent&& event) { + // Just delegate to onloop. The work will be done in Process() + onloop->QueueForProcessing(std::move(event)); +} + +void WebSocketEventDispatcher::QueueReply(WebSocketReply&& reply) { + // Delegate to the reply thread. + reply_msg_thread->SendIn(new ReplyInputMessage(std::move(reply))); +} + +// WebSocketDemux::Process() runs in the main thread. +// +// XXX: How is this going to work with class broker? With +// ZeroMQ, each WebSocket client has its own XPUB/XSUB +// connectivity to a central broker and similarly with NATS. +// But with broker we need to do something different. +// Maybe connect to the local endpoint. +// +// We cannot actually instantiate a Broker backend :-( +// +// We could also have InitPostScript() recognize Broker +// and start its internal server instead. +void WebSocketEventDispatcher::Process(const WebSocketEvent& event) { + std::visit([this](auto&& arg) { Process(arg); }, event); +} + +void WebSocketEventDispatcher::Process(const WebSocketOpen& open) { + const auto& wsc = open.wsc; + const auto& id = open.id; + const auto& it = clients.find(id); + if ( it != clients.end() ) { + // This shouldn't happen! + reporter->Error("Open for existing WebSocket client with id %s!", id.c_str()); + QueueReply(WebSocketCloseReply{wsc, 1001, "Internal error"}); + return; + } + + // As of now, terminate clients coming to anything other than /v1/messages/json. + if ( open.uri != "/v1/messages/json" ) { + open.wsc->SendError("invalid_uri", "Invalid URI - use /v1/messages/json"); + open.wsc->Close(1008, "Invalid URI - use /v1/messages/json"); + + // Still create an entry as we might see messages and close events coming in. + clients[id] = WebSocketClientEntry{id, wsc, nullptr}; + return; + } + + // Generate an ID for this client. + auto ws_id = cluster::backend->NodeId() + "-websocket-" + id; + + const auto& event_serializer_val = id::find_val("Cluster::event_serializer"); + auto event_serializer = cluster::manager->InstantiateEventSerializer(event_serializer_val); + const auto& cluster_backend_val = id::find_val("Cluster::backend"); + auto event_handling_strategy = std::make_unique(wsc, this); + auto backend = zeek::cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer), nullptr, + std::move(event_handling_strategy)); + + WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(), + wsc->getRemotePort(), ws_id.c_str(), backend.get()); + + // XXX: We call InitPostScript to populate member vars required for connectivity. + backend->InitPostScript(); + backend->Init(std::move(ws_id)); + + clients[id] = WebSocketClientEntry{id, wsc, std::move(backend)}; +} + +void WebSocketEventDispatcher::Process(const WebSocketClose& close) { + const auto& id = close.id; + const auto& it = clients.find(id); + + if ( it == clients.end() ) { + reporter->Error("Close from non-existing WebSocket client with id %s!", id.c_str()); + return; + } + + auto& wsc = it->second.wsc; + auto& backend = it->second.backend; + + WS_DEBUG("Close from client %s (%s:%d) backend=%p", wsc->getId().c_str(), wsc->getRemoteIp().c_str(), + wsc->getRemotePort(), backend.get()); + + // If the client doesn't have a backend, it wasn't ever properly instantiated. + if ( backend ) { + auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(), + wsc->getRemotePort(), TRANSPORT_TCP); + zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec)); + + backend->Terminate(); + } + + clients.erase(it); +} + +// SubscribeFinished is produced internally. +void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { + const auto& it = clients.find(fin.id); + if ( it == clients.end() ) { + reporter->Error("Subscribe finished from non-existing WebSocket client with id %s!", fin.id.c_str()); + return; + } + + auto& entry = it->second; + auto& wsc = entry.wsc; + + entry.wsc->SetSubscriptionActive(fin.topic_prefix); + + if ( ! entry.wsc->AllSubscriptionsActive() ) { + // More subscriptions to come. + return; + } + + auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(), + wsc->getRemotePort(), TRANSPORT_TCP); + auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions()); + zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec)); + + entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version()); + + WS_DEBUG("Sent Ack to %s %s\n", fin.id.c_str(), entry.backend->NodeId().c_str()); + + // Process any queued messages now. + for ( auto& msg : entry.queue ) { + assert(entry.msg_count > 1); + Process(msg); + } +} + +void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) { + rapidjson::Document doc; + doc.Parse(buf.data(), buf.size()); + if ( ! doc.IsArray() ) { + entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "subscriptions not an array"); + return; + } + + std::vector subscriptions; + + for ( rapidjson::SizeType i = 0; i < doc.Size(); i++ ) { + if ( ! doc[i].IsString() ) { + entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), + "individual subscription not a string"); + return; + } + + subscriptions.emplace_back(doc[i].GetString()); + } + + entry.wsc->SetSubscriptions(subscriptions); + + auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, + const Backend::SubscriptionCallbackInfo& info) { + if ( info.status == Backend::CallbackStatus::Error ) { + zeek::reporter->Error("Subscribe for WebSocket client failed!"); + + // Is this going to work out? + QueueReply(WebSocketCloseReply{wsc, 1011, "Could not subscribe. Something bad happened!"}); + } + else { + Process(WebSocketSubscribeFinished{id, topic}); + } + }; + + for ( const auto& subscription : subscriptions ) { + if ( ! entry.backend->Subscribe(subscription, cb) ) { + zeek::reporter->Error("Subscribe for WebSocket client failed!"); + QueueReply(WebSocketCloseReply{entry.wsc, 1011, "Could not subscribe. Something bad happened!"}); + } + } +} + +void WebSocketEventDispatcher::HandleEvent(WebSocketClientEntry& entry, std::string_view buf) { + // Unserialize the message as an event. + broker::variant res; + auto err = broker::format::json::v1::decode(buf, res); + if ( err ) { + entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "failed to decode JSON object"); + return; + } + + std::string topic = std::string(res->shared_envelope()->topic()); + + if ( topic == broker::topic::reserved ) { + entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "no topic in top-level JSON object"); + return; + } + + broker::zeek::Event broker_ev(std::move(res)); + + // This is not guaranteed to work! If the node running the WebSocket + // API does not have the declaration of the event that another node + // is sending, it cannot instantiate the zeek::cluster::Event for + // re-publishing to a cluster backend. + // + // Does that make conceptional sense? Basically the WebSocket API + // has Zeek-script awareness. + // + // It works with Broker today because Broker treats messages opaquely. + // It knows how to convert from JSON into Broker binary format as these + // are compatible. + // + // However, the broker format is under specified (vectors are used for various + // types without being tagged explicitly), so it's not possible to determine + // the final Zeek type without having access to the script-layer. + // + // I'm not sure this is a real problem, other than it being unfortunate that + // the Zeek process running the WebSocket API requires access to all declarations + // of events being transmitted via WebSockets. Though this might be a given anyhow. + // + // See broker/Data.cc for broker::vector conversion to see the collisions: + // vector, list, func, record, pattern, opaque are all encoded using + // broker::vector rather than dedicated types. + // + // Switching to a JSON v2 format that ensures all Zeek types are represented + // explicitly would help. + const auto& zeek_ev = cluster::detail::to_zeek_event(broker_ev); + if ( ! zeek_ev ) { + entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "failed to create Zeek event"); + return; + } + + WS_DEBUG("Publishing event %s to topic '%s'", std::string(zeek_ev->HandlerName()).c_str(), topic.c_str()); + entry.backend->PublishEvent(topic, *zeek_ev); +} + +// Process a WebSocket message from a client. +// +// If it's the first message, the code is expecting a subscriptions +// array, otherwise it'll be a remote event. +void WebSocketEventDispatcher::Process(const WebSocketMessage& msg) { + const auto& id = msg.id; + + const auto& it = clients.find(id); + if ( it == clients.end() ) { + reporter->Error("WebSocket message from non-existing WebSocket client %s", id.c_str()); + return; + } + + // Client without backend wasn't accepted, just discard its message. + if ( ! it->second.backend ) + return; + + auto& entry = it->second; + const auto& wsc = entry.wsc; + entry.msg_count++; + + WS_DEBUG("Message %" PRIu64 " size=%zu from %s (%s:%d) backend=%p", entry.msg_count, msg.msg.size(), + wsc->getId().c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), entry.backend.get()); + + // First message is the subscription message. + if ( entry.msg_count == 1 ) { + WS_DEBUG("Subscriptions from client: %s: (%s:%d)\n", id.c_str(), wsc->getRemoteIp().c_str(), + wsc->getRemotePort()); + HandleSubscriptions(entry, msg.msg); + } + else { + if ( ! wsc->IsAcked() ) { + WS_DEBUG("Client sending messages before receiving ack!"); + entry.queue.push_back(msg); + return; + } + + HandleEvent(entry, msg.msg); + } +} diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h new file mode 100644 index 0000000000..8d8e51ceee --- /dev/null +++ b/src/cluster/websocket/WebSocket.h @@ -0,0 +1,321 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace zeek { + +namespace detail { + +template +class OnLoopProcess; +} + +namespace cluster { + +class Backend; + +namespace websocket::detail { + + +/** + * Library independent interface for a WebSocket client. + * + * All methods should be safe to be called from Zeek's + * main thread, though some may fail if the client has vanished + * or vanishes during an operation. + */ +class WebSocketClient { +public: + virtual ~WebSocketClient() = default; + + /** + * @returns true if the WebSocket client has terminated + */ + virtual bool IsTerminated() const = 0; + + /** + * Close the WebSocket connection with the given code/reason. + */ + virtual void Close(uint16_t code = 1000, const std::string& reason = "Normal closure") = 0; + + /** + * Information about the send operation. + */ + struct SendInfo { + bool success; + }; + + /** + * Thread safe sending. + * + * This might be called from Zeek's main thread and + * must be safe to be called whether or not the connection + * with the client is still alive. + * + * @param sv The buffer to send as a WebSocket message. + */ + virtual SendInfo SendText(std::string_view sv) = 0; + + /** + * Send an error in Broker JSON/v1 format to the client. + */ + SendInfo SendError(std::string_view code, std::string_view ctx); + + /** + * Send an ACK message Broker JSON/v1 format to the client. + */ + SendInfo SendAck(std::string_view endpoint, std::string_view version); + + /** + * @return - has an ACK been sent to the client? + */ + bool IsAcked() const { return acked; } + + /** + * @return The WebSocket client's identifier. + */ + virtual const std::string& getId() = 0; + + /** + * @return The WebSocket client's remote IP address. + */ + virtual const std::string& getRemoteIp() = 0; + + /** + * @return The WebSocket client's remote port. + */ + virtual int getRemotePort() = 0; + + /** + * Store the client's subscriptions as "not active". + */ + void SetSubscriptions(const std::vector& topic_prefixes); + + /** + * @return The client's subscriptions. + */ + const std::vector GetSubscriptions() const; + + /** + * Store the client's subscriptions as "not active". + */ + void SetSubscriptionActive(const std::string& topic_prefix); + + /** + * @return true if all subscriptions have an active status. + */ + bool AllSubscriptionsActive() const; + +private: + bool acked = false; + std::map subscriptions_state; +}; + +// An new WebSocket client connected. Client is locally identified by `id`. +struct WebSocketOpen { + std::string id; + std::string uri; + std::string protocol; + std::shared_ptr wsc; +}; + +// A WebSocket client disconnected. +struct WebSocketClose { + std::string id; +}; + +// A WebSocket client send a message. +struct WebSocketMessage { + std::string id; + std::string msg; +}; + +// Produced internally when a WebSocket client's +// subscription has completed. +struct WebSocketSubscribeFinished { + std::string id; + std::string topic_prefix; +}; + +using WebSocketEvent = std::variant; + +struct WebSocketSendReply { + std::shared_ptr wsc; + std::string msg; +}; + +struct WebSocketCloseReply { + std::shared_ptr wsc; + uint16_t code = 1000; + std::string reason = "Normal closure"; +}; + +using WebSocketReply = std::variant; + + +class ReplyMsgThread; + +/** + * Handle events produced by WebSocket clients. + * + * Any thread may call QueueForProcessing(). Process() runs + * on Zeek's main thread. + */ +class WebSocketEventDispatcher { +public: + WebSocketEventDispatcher(); + + ~WebSocketEventDispatcher(); + + /** + * Called shutting down a WebSocket server. + */ + void Terminate(); + + /** + * Queue the given WebSocket event to be processed on Zeek's main loop. + * + * @param work The WebSocket event to process. + */ + void QueueForProcessing(WebSocketEvent&& event); + + /** + * Send a reply to the given websocket client. + * + * The dispatcher has an internal thread for serializing + * and sending out the event. + */ + void QueueReply(WebSocketReply&& reply); + +private: + /** + * Main processing function of the dispatcher. + * + * This runs on Zeek's main thread. + */ + void Process(const WebSocketEvent& event); + + void Process(const WebSocketOpen& open); + void Process(const WebSocketSubscribeFinished& fin); + void Process(const WebSocketMessage& msg); + void Process(const WebSocketClose& close); + + + /** + * Data structure for tracking WebSocket clients. + */ + struct WebSocketClientEntry { + std::string id; + std::shared_ptr wsc; + std::shared_ptr backend; + uint64_t msg_count = 0; + std::list queue; + }; + + + void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf); + void HandleEvent(WebSocketClientEntry& entry, std::string_view buf); + + // Allow access to Process(WebSocketEvent) + friend zeek::detail::OnLoopProcess; + + // Clients that this dispatcher is tracking. + std::map clients; + + // Connector to the IO loop. + zeek::detail::OnLoopProcess* onloop = nullptr; + + // Thread replying to clients. Zeek's threading manager takes ownership. + ReplyMsgThread* reply_msg_thread = nullptr; +}; + +/** + * An abstract WebSocket server. + */ +class WebSocketServer { +public: + WebSocketServer(std::unique_ptr demux) : dispatcher(std::move(demux)) {} + virtual ~WebSocketServer() = default; + + /** + * Stop this server. + */ + void Terminate() { + dispatcher->Terminate(); + + DoTerminate(); + } + + WebSocketEventDispatcher& Dispatcher() { return *dispatcher; } + +private: + /** + * Hook to be implemented when a server is terminated. + */ + virtual void DoTerminate() = 0; + + std::unique_ptr dispatcher; +}; + +/** + * TLS configuration for a WebSocket server. + */ +struct TLSOptions { + std::optional cert_file; + std::optional key_file; + bool enable_peer_verification = false; + std::string ca_file; + std::string ciphers; + + /** + * Is TLS enabled? + */ + bool TlsEnabled() const { return cert_file.has_value() && key_file.has_value(); } + + bool operator==(const TLSOptions& o) const { + return cert_file == o.cert_file && key_file == o.key_file && + enable_peer_verification == o.enable_peer_verification && ca_file == o.ca_file && ciphers == o.ciphers; + } +}; + +/** + * Options for a WebSocket server. + */ +struct ServerOptions { + std::string host; + uint16_t port = 0; + int ping_interval_seconds = 5; + int max_connections = 100; + bool per_message_deflate = false; + struct TLSOptions tls_options; + + bool operator==(const ServerOptions& o) const { + return host == o.host && port == o.port && ping_interval_seconds == o.ping_interval_seconds && + max_connections == o.max_connections && per_message_deflate == o.per_message_deflate && + tls_options == o.tls_options; + } +}; + + +/** + * Start a WebSocket server. + * + * @param dispatcher The dispatcher to use for the server. + * @param options Options for the server. + * + * @return Pointer to a new WebSocketServer instance or nullptr on error. + */ +std::unique_ptr StartServer(std::unique_ptr dispatcher, + const ServerOptions& options); + +} // namespace websocket::detail +} // namespace cluster +} // namespace zeek diff --git a/src/cluster/websocket/auxil/CMakeLists.txt b/src/cluster/websocket/auxil/CMakeLists.txt new file mode 100644 index 0000000000..5131dbf0ac --- /dev/null +++ b/src/cluster/websocket/auxil/CMakeLists.txt @@ -0,0 +1,7 @@ +option(IXWEBSOCKET_INSTALL "Install IXWebSocket" OFF) + +set(BUILD_SHARED_LIBS OFF) +set(USE_TLS ON) +set(USE_OPEN_SSL ON) + +add_subdirectory(IXWebSocket) diff --git a/src/cluster/websocket/events.bif b/src/cluster/websocket/events.bif new file mode 100644 index 0000000000..d730b42419 --- /dev/null +++ b/src/cluster/websocket/events.bif @@ -0,0 +1,13 @@ +module Cluster; + +## Generated when a new WebSocket client has connected. +## +## endpoint: Various information about the WebSocket client. +## +## subscriptions: The WebSocket client's subscriptions as provided in the handshake. +event websocket_client_added%(endpoint: EndpointInfo, subscriptions: string_vec%); + +## Generated when a WebSocket client was lost. +## +## endpoint: Various information about the WebSocket client. +event websocket_client_lost%(endpoint: EndpointInfo%); diff --git a/src/script_opt/FuncInfo.cc b/src/script_opt/FuncInfo.cc index 8b7a19b81f..8d836facb3 100644 --- a/src/script_opt/FuncInfo.cc +++ b/src/script_opt/FuncInfo.cc @@ -74,6 +74,7 @@ static std::unordered_map func_attrs = { {"Analyzer::__tag", ATTR_FOLDABLE}, {"Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread", ATTR_NO_SCRIPT_SIDE_EFFECTS}, {"Cluster::Backend::__init", ATTR_NO_SCRIPT_SIDE_EFFECTS}, + {"Cluster::__listen_websocket", ATTR_NO_SCRIPT_SIDE_EFFECTS}, {"Cluster::__subscribe", ATTR_NO_SCRIPT_SIDE_EFFECTS}, {"Cluster::__unsubscribe", ATTR_NO_SCRIPT_SIDE_EFFECTS}, {"Cluster::make_event", ATTR_NO_SCRIPT_SIDE_EFFECTS}, diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 5af17f0d8f..05e4e0fb52 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -378,6 +378,7 @@ static void terminate_zeek() { notifier::detail::registry.Terminate(); log_mgr->Terminate(); input_mgr->Terminate(); + cluster::manager->Terminate(); thread_mgr->Terminate(); broker_mgr->Terminate(); diff --git a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr new file mode 100644 index 0000000000..889b895763 --- /dev/null +++ b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/main.zeek, line 654: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) +error in <...>/main.zeek, line 654: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) +received termination signal diff --git a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr new file mode 100644 index 0000000000..0e09ca4de9 --- /dev/null +++ b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/main.zeek, line 654: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) +error in <...>/main.zeek, line 654: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) diff --git a/testing/btest/Baseline/cluster.websocket.bad-event-args/..client..stderr b/testing/btest/Baseline/cluster.websocket.bad-event-args/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-event-args/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.bad-event-args/..client.out b/testing/btest/Baseline/cluster.websocket.bad-event-args/..client.out new file mode 100644 index 0000000000..f9e01b1fea --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-event-args/..client.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +err1 {'type': 'error', 'code': 'deserialization_failed', 'message': 'failed to decode JSON object'} +err2 {'type': 'error', 'code': 'deserialization_failed', 'message': 'failed to create Zeek event'} +err3 {'type': 'error', 'code': 'deserialization_failed', 'message': 'failed to create Zeek event'} +pong {'@data-type': 'string', 'data': 'pong'} {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'Hello'}, {'@data-type': 'count', 'data': 42}]} +err4 {'type': 'error', 'code': 'deserialization_failed', 'message': 'failed to decode JSON object'} diff --git a/testing/btest/Baseline/cluster.websocket.bad-event-args/..manager..stderr b/testing/btest/Baseline/cluster.websocket.bad-event-args/..manager..stderr new file mode 100644 index 0000000000..2141fb2cd1 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-event-args/..manager..stderr @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error: Unserialize error 'ping' arg_types.size()=2 and args.size()=1 +error: Unserialize error for event 'ping': broker value '42' type 'count' to Zeek type 'string string' failed +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.bad-event-args/..manager.out b/testing/btest/Baseline/cluster.websocket.bad-event-args/..manager.out new file mode 100644 index 0000000000..78d2341e0b --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-event-args/..manager.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [/zeek/event/my_topic] +got ping: Hello, 42 +Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..client..stderr b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..client.out b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..client.out new file mode 100644 index 0000000000..228c6687c9 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..client.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +broken array response {'type': 'error', 'code': 'deserialization_failed', 'message': 'subscriptions not an array'} +non string error {'type': 'error', 'code': 'deserialization_failed', 'message': 'individual subscription not a string'} +mix error {'type': 'error', 'code': 'deserialization_failed', 'message': 'individual subscription not a string'} +ack True diff --git a/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..manager..stderr b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..manager.out b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..manager.out new file mode 100644 index 0000000000..29ee473cda --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-subscriptions/..manager.out @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_lost, 1 +Cluster::websocket_client_lost, 2 +Cluster::websocket_client_lost, 3 +Cluster::websocket_client_added, 1, [/duplicate, /is/okay, /topic/good] +Cluster::websocket_client_lost, 4 diff --git a/testing/btest/Baseline/cluster.websocket.bad-url/..client..stderr b/testing/btest/Baseline/cluster.websocket.bad-url/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-url/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.bad-url/..client.out b/testing/btest/Baseline/cluster.websocket.bad-url/..client.out new file mode 100644 index 0000000000..360a9982a2 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-url/..client.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected ws_good! +Connected ws_bad! +Error for ws_bad {'type': 'error', 'code': 'invalid_uri', 'message': 'Invalid URI - use /v1/messages/json'} diff --git a/testing/btest/Baseline/cluster.websocket.bad-url/..manager..stderr b/testing/btest/Baseline/cluster.websocket.bad-url/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-url/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.bad-url/..manager.out b/testing/btest/Baseline/cluster.websocket.bad-url/..manager.out new file mode 100644 index 0000000000..725d3a7f80 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.bad-url/..manager.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [hello-good] +Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/cluster.websocket.cluster-log/..client..stderr b/testing/btest/Baseline/cluster.websocket.cluster-log/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.cluster-log/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.cluster-log/..client.out b/testing/btest/Baseline/cluster.websocket.cluster-log/..client.out new file mode 100644 index 0000000000..bc1ccc095a --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.cluster-log/..client.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +unique ids 3 diff --git a/testing/btest/Baseline/cluster.websocket.cluster-log/..manager..stderr b/testing/btest/Baseline/cluster.websocket.cluster-log/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.cluster-log/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.cluster-log/..manager.cluster.log.cannonified b/testing/btest/Baseline/cluster.websocket.cluster-log/..manager.cluster.log.cannonified new file mode 100644 index 0000000000..bc1ec612fb --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.cluster-log/..manager.cluster.log.cannonified @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +manager WebSocket client (127.0.0.1:) subscribed to [/topic/ws/1, /topic/ws/all] +manager WebSocket client (127.0.0.1:) subscribed to [/topic/ws/2, /topic/ws/all] +manager WebSocket client (127.0.0.1:) subscribed to [/topic/ws/3, /topic/ws/all] +manager WebSocket client (127.0.0.1:) gone +manager WebSocket client (127.0.0.1:) gone +manager WebSocket client (127.0.0.1:) gone diff --git a/testing/btest/Baseline/cluster.websocket.cluster-log/..manager.out b/testing/btest/Baseline/cluster.websocket.cluster-log/..manager.out new file mode 100644 index 0000000000..958fdd87a3 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.cluster-log/..manager.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, 1, [/topic/ws/1, /topic/ws/all] +Cluster::websocket_client_added, 2, [/topic/ws/2, /topic/ws/all] +Cluster::websocket_client_added, 3, [/topic/ws/3, /topic/ws/all] +Cluster::websocket_client_lost, 1 +Cluster::websocket_client_lost, 2 +Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr new file mode 100644 index 0000000000..e2bea8f00f --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/listen-idempotent.zeek, line 43: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_x)) +error in <...>/listen-idempotent.zeek, line 47: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_wss_port)) +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.one-pipelining/..client..stderr b/testing/btest/Baseline/cluster.websocket.one-pipelining/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one-pipelining/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.one-pipelining/..client.out b/testing/btest/Baseline/cluster.websocket.one-pipelining/..client.out new file mode 100644 index 0000000000..b8dcb923f8 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one-pipelining/..client.out @@ -0,0 +1,17 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +Sending ping 0 +Sending ping 1 +Sending ping 2 +Sending ping 3 +Sending ping 4 +Receiving pong 0 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Receiving pong 1 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Receiving pong 2 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Receiving pong 3 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Receiving pong 4 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] diff --git a/testing/btest/Baseline/cluster.websocket.one-pipelining/..manager..stderr b/testing/btest/Baseline/cluster.websocket.one-pipelining/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one-pipelining/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.one-pipelining/..manager.out b/testing/btest/Baseline/cluster.websocket.one-pipelining/..manager.out new file mode 100644 index 0000000000..88a473b3b8 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one-pipelining/..manager.out @@ -0,0 +1,8 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [/zeek/event/my_topic] +got ping: python-websocket-client, 0 +got ping: python-websocket-client, 1 +got ping: python-websocket-client, 2 +got ping: python-websocket-client, 3 +got ping: python-websocket-client, 4 +Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/cluster.websocket.one/..client..stderr b/testing/btest/Baseline/cluster.websocket.one/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.one/..client.out b/testing/btest/Baseline/cluster.websocket.one/..client.out new file mode 100644 index 0000000000..f637d596f8 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one/..client.out @@ -0,0 +1,17 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +Sending ping 0 +Receiving pong 0 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Sending ping 1 +Receiving pong 1 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Sending ping 2 +Receiving pong 2 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Sending ping 3 +Receiving pong 3 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Sending ping 4 +Receiving pong 4 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] diff --git a/testing/btest/Baseline/cluster.websocket.one/..manager..stderr b/testing/btest/Baseline/cluster.websocket.one/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.one/..manager.out b/testing/btest/Baseline/cluster.websocket.one/..manager.out new file mode 100644 index 0000000000..88a473b3b8 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.one/..manager.out @@ -0,0 +1,8 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [/zeek/event/my_topic] +got ping: python-websocket-client, 0 +got ping: python-websocket-client, 1 +got ping: python-websocket-client, 2 +got ping: python-websocket-client, 3 +got ping: python-websocket-client, 4 +Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/cluster.websocket.three/..client..stderr b/testing/btest/Baseline/cluster.websocket.three/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.three/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.three/..client.out b/testing/btest/Baseline/cluster.websocket.three/..client.out new file mode 100644 index 0000000000..ab54483ded --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.three/..client.out @@ -0,0 +1,83 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +unique ids 3 +ws1 sending ping 0 +receiving pong 0 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 1}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 1}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 1}] +ws2 sending ping 1 +receiving pong 1 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 2}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 2}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 2}] +ws3 sending ping 2 +receiving pong 2 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 3}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 3}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 3}] +ws1 sending ping 3 +receiving pong 3 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 4}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 4}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 4}] +ws2 sending ping 4 +receiving pong 4 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 5}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 5}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 5}] +ws3 sending ping 5 +receiving pong 5 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 6}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 6}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 6}] +ws1 sending ping 6 +receiving pong 6 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 7}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 7}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 7}] +ws2 sending ping 7 +receiving pong 7 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 8}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 8}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 8}] +ws3 sending ping 8 +receiving pong 8 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 9}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 9}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 9}] +ws1 sending ping 9 +receiving pong 9 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 10}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 10}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 10}] +ws2 sending ping 10 +receiving pong 10 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 11}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 11}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 11}] +ws3 sending ping 11 +receiving pong 11 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 12}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 12}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 12}] +ws1 sending ping 12 +receiving pong 12 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 13}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 13}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 13}] +ws2 sending ping 13 +receiving pong 13 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 14}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 14}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws2'}, {'@data-type': 'count', 'data': 14}] +ws3 sending ping 14 +receiving pong 14 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 15}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 15}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws3'}, {'@data-type': 'count', 'data': 15}] +ws1 sending ping 15 +receiving pong 15 +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 16}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 16}] +ev: topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'orig_msg=ws1'}, {'@data-type': 'count', 'data': 16}] diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager..stderr b/testing/btest/Baseline/cluster.websocket.three/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.three/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager.out b/testing/btest/Baseline/cluster.websocket.three/..manager.out new file mode 100644 index 0000000000..30dca05f31 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.three/..manager.out @@ -0,0 +1,23 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, 1, [/test/clients] +Cluster::websocket_client_added, 2, [/test/clients] +Cluster::websocket_client_added, 3, [/test/clients] +got ping: ws1, 0 +got ping: ws2, 1 +got ping: ws3, 2 +got ping: ws1, 3 +got ping: ws2, 4 +got ping: ws3, 5 +got ping: ws1, 6 +got ping: ws2, 7 +got ping: ws3, 8 +got ping: ws1, 9 +got ping: ws2, 10 +got ping: ws3, 11 +got ping: ws1, 12 +got ping: ws2, 13 +got ping: ws3, 14 +got ping: ws1, 15 +Cluster::websocket_client_lost, 1 +Cluster::websocket_client_lost, 2 +Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.tls-usage-error/.stderr b/testing/btest/Baseline/cluster.websocket.tls-usage-error/.stderr new file mode 100644 index 0000000000..46b508db58 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.tls-usage-error/.stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/tls-usage-error.zeek, line 17: Invalid tls_options: No key_file field (Cluster::listen_websocket((coerce [$listen_host=127.0.0.1, $listen_port=1234/tcp, $tls_options=tls_options_no_key] to Cluster::WebSocketServerOptions))) +error in <...>/tls-usage-error.zeek, line 18: Invalid tls_options: No cert_file field (Cluster::listen_websocket((coerce [$listen_host=127.0.0.1, $listen_port=1234/tcp, $tls_options=tls_options_no_cert] to Cluster::WebSocketServerOptions))) diff --git a/testing/btest/Baseline/cluster.websocket.tls/..client..stderr b/testing/btest/Baseline/cluster.websocket.tls/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.tls/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.tls/..client.out b/testing/btest/Baseline/cluster.websocket.tls/..client.out new file mode 100644 index 0000000000..f637d596f8 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.tls/..client.out @@ -0,0 +1,17 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +Sending ping 0 +Receiving pong 0 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Sending ping 1 +Receiving pong 1 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Sending ping 2 +Receiving pong 2 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Sending ping 3 +Receiving pong 3 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Sending ping 4 +Receiving pong 4 +topic /zeek/event/my_topic event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] diff --git a/testing/btest/Baseline/cluster.websocket.tls/..manager..stderr b/testing/btest/Baseline/cluster.websocket.tls/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.tls/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.tls/..manager.out b/testing/btest/Baseline/cluster.websocket.tls/..manager.out new file mode 100644 index 0000000000..88a473b3b8 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.tls/..manager.out @@ -0,0 +1,8 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [/zeek/event/my_topic] +got ping: python-websocket-client, 0 +got ping: python-websocket-client, 1 +got ping: python-websocket-client, 2 +got ping: python-websocket-client, 3 +got ping: python-websocket-client, 4 +Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client..stderr b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out new file mode 100644 index 0000000000..616a268fed --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out @@ -0,0 +1,54 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +Sending ping 0 - ws1 +Sending ping 0 - ws2 +Sending ping 1 - ws1 +Sending ping 1 - ws2 +Sending ping 2 - ws1 +Sending ping 2 - ws2 +Sending ping 3 - ws1 +Sending ping 3 - ws2 +Sending ping 4 - ws1 +Sending ping 4 - ws2 +Receiving ack - ws1 +Receiving ack - ws2 +Receiving pong 0 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Receiving pong 0 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Receiving pong 1 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Receiving pong 1 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Receiving pong 2 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Receiving pong 2 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Receiving pong 3 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Receiving pong 3 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Receiving pong 4 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +Receiving pong 4 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +Receiving pong 5 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +Receiving pong 5 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +Receiving pong 6 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +Receiving pong 6 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +Receiving pong 7 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +Receiving pong 7 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +Receiving pong 8 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +Receiving pong 8 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +Receiving pong 9 - ws0 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +Receiving pong 9 - ws1 +topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager..stderr b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted new file mode 100644 index 0000000000..ae8f79c02d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted @@ -0,0 +1,15 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A Cluster::websocket_client_added, 1, [/zeek/event/to_client] +A Cluster::websocket_client_added, 2, [/zeek/event/to_client] +B got ping: ws1, 0 +B got ping: ws1, 1 +B got ping: ws1, 2 +B got ping: ws1, 3 +B got ping: ws1, 4 +B got ping: ws2, 0 +B got ping: ws2, 1 +B got ping: ws2, 2 +B got ping: ws2, 3 +B got ping: ws2, 4 +C Cluster::websocket_client_lost +C Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 0afe9db25e..d99e27e5ec 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -137,6 +137,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek build/scripts/base/bif/cluster.bif.zeek + build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek scripts/base/frameworks/config/__load__.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 9300733959..4e61a3a752 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -137,6 +137,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek build/scripts/base/bif/cluster.bif.zeek + build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek scripts/base/frameworks/config/__load__.zeek diff --git a/testing/btest/Baseline/opt.ZAM-bif-tracking/output b/testing/btest/Baseline/opt.ZAM-bif-tracking/output index 7434167583..0731586167 100644 --- a/testing/btest/Baseline/opt.ZAM-bif-tracking/output +++ b/testing/btest/Baseline/opt.ZAM-bif-tracking/output @@ -1,2 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -545 seen BiFs, 0 unseen BiFs (), 0 new BiFs () +546 seen BiFs, 0 unseen BiFs (), 0 new BiFs () diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 32330c2da9..9174cce539 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -339,6 +339,7 @@ 0.000000 MetaHookPost LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> -1 @@ -533,6 +534,7 @@ 0.000000 MetaHookPost LoadFile(0, base/init-frameworks-and-bifs.zeek, <...>/init-frameworks-and-bifs.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base/packet-protocols, <...>/packet-protocols) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/CPP-load.bif, <...>/CPP-load.bif.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, base<...>/Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/Zeek_GTPv1.events.bif, <...>/Zeek_GTPv1.events.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/Zeek_GTPv1.functions.bif, <...>/Zeek_GTPv1.functions.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/Zeek_KRB.types.bif, <...>/Zeek_KRB.types.bif.zeek) -> -1 @@ -647,6 +649,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> (-1, ) @@ -841,6 +844,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, base/init-frameworks-and-bifs.zeek, <...>/init-frameworks-and-bifs.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base/packet-protocols, <...>/packet-protocols) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/CPP-load.bif, <...>/CPP-load.bif.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, base<...>/Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/Zeek_GTPv1.events.bif, <...>/Zeek_GTPv1.events.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/Zeek_GTPv1.functions.bif, <...>/Zeek_GTPv1.functions.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/Zeek_KRB.types.bif, <...>/Zeek_KRB.types.bif.zeek) -> (-1, ) @@ -1288,6 +1292,7 @@ 0.000000 MetaHookPre LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) +0.000000 MetaHookPre LoadFile(0, ./Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) @@ -1482,6 +1487,7 @@ 0.000000 MetaHookPre LoadFile(0, base/init-frameworks-and-bifs.zeek, <...>/init-frameworks-and-bifs.zeek) 0.000000 MetaHookPre LoadFile(0, base/packet-protocols, <...>/packet-protocols) 0.000000 MetaHookPre LoadFile(0, base<...>/CPP-load.bif, <...>/CPP-load.bif.zeek) +0.000000 MetaHookPre LoadFile(0, base<...>/Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/Zeek_GTPv1.events.bif, <...>/Zeek_GTPv1.events.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/Zeek_GTPv1.functions.bif, <...>/Zeek_GTPv1.functions.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/Zeek_KRB.types.bif, <...>/Zeek_KRB.types.bif.zeek) @@ -1596,6 +1602,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) +0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) @@ -1790,6 +1797,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, base/init-frameworks-and-bifs.zeek, <...>/init-frameworks-and-bifs.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base/packet-protocols, <...>/packet-protocols) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/CPP-load.bif, <...>/CPP-load.bif.zeek) +0.000000 MetaHookPre LoadFileExtended(0, base<...>/Zeek_Cluster_WebSocket.events.bif.zeek, <...>/Zeek_Cluster_WebSocket.events.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/Zeek_GTPv1.events.bif, <...>/Zeek_GTPv1.events.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/Zeek_GTPv1.functions.bif, <...>/Zeek_GTPv1.functions.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/Zeek_KRB.types.bif, <...>/Zeek_KRB.types.bif.zeek) @@ -2236,6 +2244,7 @@ 0.000000 | HookLoadFile ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek 0.000000 | HookLoadFile ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek 0.000000 | HookLoadFile ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek +0.000000 | HookLoadFile ./Zeek_Cluster_WebSocket.events.bif.zeek <...>/Zeek_Cluster_WebSocket.events.bif.zeek 0.000000 | HookLoadFile ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek 0.000000 | HookLoadFile ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek 0.000000 | HookLoadFile ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek @@ -2443,6 +2452,7 @@ 0.000000 | HookLoadFile base/init-frameworks-and-bifs.zeek <...>/init-frameworks-and-bifs.zeek 0.000000 | HookLoadFile base/packet-protocols <...>/packet-protocols 0.000000 | HookLoadFile base<...>/CPP-load.bif <...>/CPP-load.bif.zeek +0.000000 | HookLoadFile base<...>/Zeek_Cluster_WebSocket.events.bif.zeek <...>/Zeek_Cluster_WebSocket.events.bif.zeek 0.000000 | HookLoadFile base<...>/Zeek_GTPv1.events.bif <...>/Zeek_GTPv1.events.bif.zeek 0.000000 | HookLoadFile base<...>/Zeek_GTPv1.functions.bif <...>/Zeek_GTPv1.functions.bif.zeek 0.000000 | HookLoadFile base<...>/Zeek_KRB.types.bif <...>/Zeek_KRB.types.bif.zeek @@ -2544,6 +2554,7 @@ 0.000000 | HookLoadFileExtended ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek +0.000000 | HookLoadFileExtended ./Zeek_Cluster_WebSocket.events.bif.zeek <...>/Zeek_Cluster_WebSocket.events.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek @@ -2751,6 +2762,7 @@ 0.000000 | HookLoadFileExtended base/init-frameworks-and-bifs.zeek <...>/init-frameworks-and-bifs.zeek 0.000000 | HookLoadFileExtended base/packet-protocols <...>/packet-protocols 0.000000 | HookLoadFileExtended base<...>/CPP-load.bif <...>/CPP-load.bif.zeek +0.000000 | HookLoadFileExtended base<...>/Zeek_Cluster_WebSocket.events.bif.zeek <...>/Zeek_Cluster_WebSocket.events.bif.zeek 0.000000 | HookLoadFileExtended base<...>/Zeek_GTPv1.events.bif <...>/Zeek_GTPv1.events.bif.zeek 0.000000 | HookLoadFileExtended base<...>/Zeek_GTPv1.functions.bif <...>/Zeek_GTPv1.functions.bif.zeek 0.000000 | HookLoadFileExtended base<...>/Zeek_KRB.types.bif <...>/Zeek_KRB.types.bif.zeek diff --git a/testing/btest/cluster/websocket/bad-event-args.zeek b/testing/btest/cluster/websocket/bad-event-args.zeek new file mode 100644 index 0000000000..01aabb7808 --- /dev/null +++ b/testing/btest/cluster/websocket/bad-event-args.zeek @@ -0,0 +1,126 @@ +# @TEST-DOC: A WebSocket client sending invalid data for an event. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/zeek/event/my_topic"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, msg, n); + Cluster::publish("/zeek/event/my_topic", e); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' +topic = '/zeek/event/my_topic' + +def make_ping(event_args): + return { + "type": "data-message", + "topic": topic, + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": event_args }, + ], }, + ], + } + +def run(ws_url): + with connect(ws_url) as ws: + print("Connected!") + # Send subscriptions + ws.send(json.dumps([topic])) + ack = json.loads(ws.recv()) + assert "type" in ack + assert ack["type"] == "ack" + assert "endpoint" in ack + assert "version" in ack + + ws.send(json.dumps(make_ping(42))) + err1 = json.loads(ws.recv()) + print("err1", err1) + ws.send(json.dumps(make_ping([{"@data-type": "string", "data": "Hello"}]))) + err2 = json.loads(ws.recv()) + print("err2", err2) + ws.send(json.dumps(make_ping([{"@data-type": "count", "data": 42}, {"@data-type": "string", "data": "Hello"}]))) + err3 = json.loads(ws.recv()) + print("err3", err3) + + # This should be good ping(string, count) + ws.send(json.dumps(make_ping([{"@data-type": "string", "data": "Hello"}, {"@data-type": "count", "data": 42}]))) + pong = json.loads(ws.recv()) + name, args, _ = pong["data"][2]["data"] + print("pong", name, args) + + # This one fails again + ws.send(json.dumps(make_ping([{"@data-type": "money", "data": 0}]))) + err4 = json.loads(ws.recv()) + print("err4", err4) + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/bad-subscriptions.zeek b/testing/btest/cluster/websocket/bad-subscriptions.zeek new file mode 100644 index 0000000000..f6acb33aab --- /dev/null +++ b/testing/btest/cluster/websocket/bad-subscriptions.zeek @@ -0,0 +1,102 @@ +# @TEST-DOC: Clients sends broken subscription arrays +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global event_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/zeek/event/my_topic"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global added = 0; +global lost = 0; + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + ++added; + print "Cluster::websocket_client_added", added, subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + ++lost; + print "Cluster::websocket_client_lost", lost; + if ( lost == 4 ) + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' +topic = '/zeek/event/my_topic' + +def run(ws_url): + with connect(ws_url) as ws: + ws.send('["broken", "brrr') + err = json.loads(ws.recv()) + print("broken array response", err) + + with connect(ws_url) as ws: + ws.send('[1, 2]') + err = json.loads(ws.recv()) + print("non string error", err) + + with connect(ws_url) as ws: + ws.send('[1, "/my_topic"]') + err = json.loads(ws.recv()) + print("mix error", err) + + # This should work - maybe duplicate isn't great, but good for testing. + with connect(ws_url) as ws: + ws.send('["/topic/good", "/duplicate", "/duplicate", "/is/okay"]') + ack = json.loads(ws.recv()) + print("ack", ack["type"] == "ack") + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/bad-url.zeek b/testing/btest/cluster/websocket/bad-url.zeek new file mode 100644 index 0000000000..6bc6f9b185 --- /dev/null +++ b/testing/btest/cluster/websocket/bad-url.zeek @@ -0,0 +1,101 @@ +# @TEST-DOC: Run a single node cluster (manager) with a websocket server and have a single client connect. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 5 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/zeek/event/my_topic"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, "my-message", ping_count); + Cluster::publish("/zeek/event/my_topic", e); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +import websockets.exceptions +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_prefix = f'ws://127.0.0.1:{ws_port}' +topic = '/zeek/event/my_topic' + + +def run(ws_prefix): + with connect(ws_prefix + '/v1/messages/json') as ws_good: + print('Connected ws_good!') + with connect(ws_prefix + '/v0/messages/json') as ws_bad: + print('Connected ws_bad!') + try: + err = json.loads(ws_bad.recv()) + except websockets.exceptions.ConnectionClosedError as e: + pass + + print('Error for ws_bad', err) + + ws_good.send(json.dumps(['hello-good'])) + ack = json.loads(ws_good.recv()) + assert 'type' in ack + assert ack['type'] == 'ack' + +def main(): + for _ in range(100): + try: + run(ws_prefix) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == '__main__': + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/cluster-log.zeek b/testing/btest/cluster/websocket/cluster-log.zeek new file mode 100644 index 0000000000..e8e6c6f001 --- /dev/null +++ b/testing/btest/cluster/websocket/cluster-log.zeek @@ -0,0 +1,111 @@ +# @TEST-DOC: Test websocket clients appearing in cluster.log +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: zeek-cut node message < ./manager/cluster.log | sed -r "s/client '.+' /client /g" | sed -r "s/:[0-9]+/:/g" > ./manager/cluster.log.cannonified +# @TEST-EXEC: btest-diff ./manager/cluster.log.cannonified +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +# Have the manager create cluster.log +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; + +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/test/manager"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); + Cluster::publish("/test/clients", e); + } + +global added = 0; +global lost = 0; + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + ++added; + print "Cluster::websocket_client_added", added, subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + ++lost; + print "Cluster::websocket_client_lost", lost; + if ( lost == 3 ) + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' + +def run(ws_url): + with connect(ws_url) as ws1: + with connect(ws_url) as ws2: + with connect(ws_url) as ws3: + clients = [ws1, ws2, ws3] + print("Connected!") + ids = set() + for i, c in enumerate(clients, 1): + c.send(json.dumps([f"/topic/ws/{i}", "/topic/ws/all"])) + ack = json.loads(c.recv()) + assert "type" in ack, repr(ack) + assert ack["type"] == "ack" + assert "endpoint" in ack, repr(ack) + assert "version" in ack + ids.add(ack["endpoint"]) + + print("unique ids", len(ids)) + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/listen-idempotent.zeek b/testing/btest/cluster/websocket/listen-idempotent.zeek new file mode 100644 index 0000000000..448ab5e1ac --- /dev/null +++ b/testing/btest/cluster/websocket/listen-idempotent.zeek @@ -0,0 +1,50 @@ +# @TEST-DOC: Allow listening with the same tls_options on the same port, but fail for disagreeing tls_options. +# +# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER='sed -E "s/127.0.0.1:[0-9]+/127.0.0.1:/g" | $SCRIPTS/diff-remove-abspath' btest-diff .stderr +# +# @TEST-PORT: WEBSOCKET_PORT +# @TEST-PORT: WEBSOCKET_SECURE_PORT + +event zeek_init() + { + local tls_options = Cluster::WebSocketTLSOptions( + $cert_file="../localhost.crt", + $key_file="../localhost.key", + ); + + local tls_options_2 = Cluster::WebSocketTLSOptions( + $cert_file="../localhost.crt", + $key_file="../localhost.key", + ); + local ws_port = to_port(getenv("WEBSOCKET_PORT")); + local wss_port = to_port(getenv("WEBSOCKET_SECURE_PORT")); + + local ws_opts = Cluster::WebSocketServerOptions($listen_host="127.0.0.1", $listen_port=ws_port); + local ws_opts_x = copy(ws_opts); + ws_opts_x$tls_options = tls_options; + + local ws_opts_wss_port = Cluster::WebSocketServerOptions($listen_host="127.0.0.1", $listen_port=wss_port); + + local ws_tls_opts = Cluster::WebSocketServerOptions( + $listen_host="127.0.0.1", + $listen_port=wss_port, + $tls_options=tls_options, + ); + # Same as ws_tls_opts + local ws_tls_opts_copy = Cluster::WebSocketServerOptions( + $listen_host="127.0.0.1", + $listen_port=wss_port, + $tls_options=tls_options_2, + ); + + assert Cluster::listen_websocket(ws_opts); + assert Cluster::listen_websocket(ws_opts); + assert ! Cluster::listen_websocket(ws_opts_x); + assert Cluster::listen_websocket(ws_tls_opts); + assert Cluster::listen_websocket(ws_tls_opts); + assert Cluster::listen_websocket(ws_tls_opts_copy); + assert ! Cluster::listen_websocket(ws_opts_wss_port); + + terminate(); + } diff --git a/testing/btest/cluster/websocket/one-pipelining.zeek b/testing/btest/cluster/websocket/one-pipelining.zeek new file mode 100644 index 0000000000..3bf6096ee5 --- /dev/null +++ b/testing/btest/cluster/websocket/one-pipelining.zeek @@ -0,0 +1,123 @@ +# @TEST-DOC: Send subscriptions and events without waiting for pong, should be okay, the websocket server will queue this a bit. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/zeek/event/my_topic"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, "my-message", ping_count); + Cluster::publish("/zeek/event/my_topic", e); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' +topic = '/zeek/event/my_topic' + +def make_ping(c): + return { + "type": "data-message", + "topic": topic, + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": [ # event args + {"@data-type": "string", "data": f"python-websocket-client"}, + {"@data-type": "count", "data": c}, + ], }, + ], }, + ], + } + +def run(ws_url): + with connect(ws_url) as ws: + print("Connected!") + # Send subscriptions + ws.send(json.dumps([topic])) + + for i in range(5): + print("Sending ping", i) + ws.send(json.dumps(make_ping(i))) + + ack = json.loads(ws.recv()) + assert "type" in ack + assert ack["type"] == "ack" + assert "endpoint" in ack + assert "version" in ack + + for i in range(5): + print("Receiving pong", i) + pong = json.loads(ws.recv()) + assert pong["@data-type"] == "vector" + ev = pong["data"][2]["data"] + print("topic", pong["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/one.zeek b/testing/btest/cluster/websocket/one.zeek new file mode 100644 index 0000000000..e3ecf77d4f --- /dev/null +++ b/testing/btest/cluster/websocket/one.zeek @@ -0,0 +1,120 @@ +# @TEST-DOC: Run a single node cluster (manager) with a websocket server and have a single client connect. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/zeek/event/my_topic"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, "my-message", ping_count); + Cluster::publish("/zeek/event/my_topic", e); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' +topic = '/zeek/event/my_topic' + +def make_ping(c): + return { + "type": "data-message", + "topic": topic, + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": [ # event args + {"@data-type": "string", "data": f"python-websocket-client"}, + {"@data-type": "count", "data": c}, + ], }, + ], }, + ], + } + +def run(ws_url): + with connect(ws_url) as ws: + print("Connected!") + # Send subscriptions + ws.send(json.dumps([topic])) + ack = json.loads(ws.recv()) + assert "type" in ack + assert ack["type"] == "ack" + assert "endpoint" in ack + assert "version" in ack + + for i in range(5): + print("Sending ping", i) + ws.send(json.dumps(make_ping(i))) + print("Receiving pong", i) + pong = json.loads(ws.recv()) + assert pong["@data-type"] == "vector" + ev = pong["data"][2]["data"] + print("topic", pong["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/three.zeek b/testing/btest/cluster/websocket/three.zeek new file mode 100644 index 0000000000..3810c04e3c --- /dev/null +++ b/testing/btest/cluster/websocket/three.zeek @@ -0,0 +1,137 @@ +# @TEST-DOC: Run a single node cluster (manager) with a websocket server, have three clients connect. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/test/manager"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); + Cluster::publish("/test/clients", e); + } + +global added = 0; +global lost = 0; + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + ++added; + print "Cluster::websocket_client_added", added, subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + ++lost; + print "Cluster::websocket_client_lost", lost; + if ( lost == 3 ) + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' +topic = '/test/clients' + +def make_ping(c, who): + return { + "type": "data-message", + "topic": "/test/manager", + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": [ # event args + {"@data-type": "string", "data": who}, + {"@data-type": "count", "data": c}, + ], }, + ], }, + ], + } + +def run(ws_url): + with connect(ws_url) as ws1: + with connect(ws_url) as ws2: + with connect(ws_url) as ws3: + clients = [ws1, ws2, ws3] + print("Connected!") + ids = set() + for c in clients: + c.send(json.dumps([topic])) + for c in clients: + ack = json.loads(c.recv()) + assert "type" in ack, repr(ack) + assert ack["type"] == "ack" + assert "endpoint" in ack, repr(ack) + assert "version" in ack + ids.add(ack["endpoint"]) + + print("unique ids", len(ids)) + + for i in range(16): + c = clients[i % len(clients)] + name = f"ws{(i % len(clients)) + 1}" + print(name, "sending ping", i) + c.send(json.dumps(make_ping(i, name))) + + print("receiving pong", i) + for c in clients: + pong = json.loads(c.recv()) + ev = pong["data"][2]["data"] + print("ev: topic", pong["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/tls-usage-error.zeek b/testing/btest/cluster/websocket/tls-usage-error.zeek new file mode 100644 index 0000000000..d6357324dd --- /dev/null +++ b/testing/btest/cluster/websocket/tls-usage-error.zeek @@ -0,0 +1,19 @@ +# @TEST-DOC: Calling listen_websocket() with badly configured WebSocketTLSOptions. +# +# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr + + +event zeek_init() + { + local tls_options_no_key = Cluster::WebSocketTLSOptions( + $cert_file="../localhost.crt", + ); + + local tls_options_no_cert = Cluster::WebSocketTLSOptions( + $key_file="../localhost.key", + ); + + assert ! Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=1234/tcp, $tls_options=tls_options_no_key]); + assert ! Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=1234/tcp, $tls_options=tls_options_no_cert]); + } diff --git a/testing/btest/cluster/websocket/tls.zeek b/testing/btest/cluster/websocket/tls.zeek new file mode 100644 index 0000000000..171dc6f5c2 --- /dev/null +++ b/testing/btest/cluster/websocket/tls.zeek @@ -0,0 +1,151 @@ +# @TEST-DOC: Run a single node cluster (manager) with a websocket server that has TLS enabled. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.asyncio' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: chmod +x gen-localhost-certs.sh +# @TEST-EXEC: ./gen-localhost-certs.sh +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/zeek/event/my_topic"); + + local tls_options = Cluster::WebSocketTLSOptions( + $cert_file="../localhost.crt", + $key_file="../localhost.key", + ); + + local ws_server_options = Cluster::WebSocketServerOptions( + $listen_host="127.0.0.1", + $listen_port=to_port(getenv("WEBSOCKET_PORT")), + $tls_options=tls_options, + ); + + Cluster::listen_websocket(ws_server_options); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, "my-message", ping_count); + Cluster::publish("/zeek/event/my_topic", e); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import asyncio, json, os, socket, time +from websockets.asyncio.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'wss://localhost:{ws_port}/v1/messages/json' +topic = '/zeek/event/my_topic' + +# Make the websockets library use the custom server cert. +# https://stackoverflow.com/a/55856969 +os.environ["SSL_CERT_FILE"] = "../localhost.crt" + +def make_ping(c): + return { + "type": "data-message", + "topic": topic, + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": [ # event args + {"@data-type": "string", "data": f"python-websocket-client"}, + {"@data-type": "count", "data": c}, + ], }, + ], }, + ], + } + +async def run(): + async with connect(ws_url, family=socket.AF_INET) as ws: + print("Connected!") + # Send subscriptions + await ws.send(json.dumps([topic])) + ack = json.loads(await ws.recv()) + assert "type" in ack + assert ack["type"] == "ack" + assert "endpoint" in ack + assert "version" in ack + + for i in range(5): + print("Sending ping", i) + await ws.send(json.dumps(make_ping(i))) + print("Receiving pong", i) + pong = json.loads(await ws.recv()) + assert pong["@data-type"] == "vector" + ev = pong["data"][2]["data"] + print("topic", pong["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) + +def main(): + for _ in range(100): + try: + asyncio.run(run()) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE + +# The cert and key were generated with OpenSSL using the following command, +# taken from https://letsencrypt.org/docs/certificates-for-localhost/ +# +# The test will generate the script, but the certificate is valid +# for 10 years. +@TEST-START-FILE gen-localhost-certs.sh +#!/usr/bin/env bash +openssl req -x509 -out localhost.crt -keyout localhost.key \ + -newkey rsa:2048 -nodes -sha256 -days 3650 \ + -subj '/CN=localhost' -extensions EXT -config <( \ + printf "[dn]\nCN=localhost\n[req]\ndistinguished_name = dn\n[EXT]\nsubjectAltName=DNS:localhost\nbasicConstraints=CA:TRUE") +@TEST-END-FILE diff --git a/testing/btest/cluster/websocket/two-pipelining.zeek b/testing/btest/cluster/websocket/two-pipelining.zeek new file mode 100644 index 0000000000..cfef86522f --- /dev/null +++ b/testing/btest/cluster/websocket/two-pipelining.zeek @@ -0,0 +1,158 @@ +# @TEST-DOC: Send subscriptions and events without waiting for pong, should be okay, the websocket server will queue this a bit. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ping_count = 0; + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +event zeek_init() + { + Cluster::subscribe("/zeek/event/to_manager"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global added = 0; +global lost = 0; + +type Item: record { + msg: string; + n: count; +}; + +global queue: vector of Item; + +event ping(msg: string, n: count) &is_used + { + # Queue the pings if we haven't seen both clients yet. + if ( added < 2 ) + { + queue += Item($msg=msg, $n=n); + return; + } + + ++ping_count; + print fmt("B got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, "my-message", ping_count); + Cluster::publish("/zeek/event/to_client", e); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + ++added; + print "A Cluster::websocket_client_added", added, subscriptions; + + if ( added == 2 ) + { + # Anything in the queue? + for ( _, item in queue ) + event ping(item$msg, item$n); + } + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + ++lost; + print "C Cluster::websocket_client_lost"; + if ( lost == 2 ) + terminate(); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' +topic = '/zeek/event/to_client' + +def make_ping(c, who): + return { + "type": "data-message", + "topic": "/zeek/event/to_manager", + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": [ # event args + {"@data-type": "string", "data": who}, + {"@data-type": "count", "data": c}, + ], }, + ], }, + ], + } + +def run(ws_url): + with connect(ws_url) as ws1: + with connect(ws_url) as ws2: + clients = [ws1, ws2] + print("Connected!") + # Send subscriptions + for ws in clients: + ws.send(json.dumps([topic])) + + for i in range(5): + for c, ws in enumerate(clients, 1): + print(f"Sending ping {i} - ws{c}") + ws.send(json.dumps(make_ping(i, f"ws{c}"))) + + for c, ws in enumerate(clients, 1): + print(f"Receiving ack - ws{c}") + ack = json.loads(ws.recv()) + assert "type" in ack + assert ack["type"] == "ack" + assert "endpoint" in ack + assert "version" in ack + + for i in range(10): + for c, ws in enumerate(clients): + print(f"Receiving pong {i} - ws{c}") + pong = json.loads(ws.recv()) + assert pong["@data-type"] == "vector" + ev = pong["data"][2]["data"] + print("topic", pong["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/opt/ZAM-bif-tracking.zeek b/testing/btest/opt/ZAM-bif-tracking.zeek index 33d387f408..1668c4af35 100644 --- a/testing/btest/opt/ZAM-bif-tracking.zeek +++ b/testing/btest/opt/ZAM-bif-tracking.zeek @@ -98,6 +98,7 @@ global known_BiFs = set( "Broker::make_event", "Broker::publish", "Cluster::Backend::__init", + "Cluster::__listen_websocket", "Cluster::__subscribe", "Cluster::__unsubscribe", "Cluster::make_event",