mirror of
https://github.com/zeek/zeek.git
synced 2025-10-17 14:08:20 +00:00
cluster/websocket: Implement WebSocket server
This commit is contained in:
parent
1e757b2b59
commit
6032741868
75 changed files with 2860 additions and 4 deletions
8
src/cluster/websocket/CMakeLists.txt
Normal file
8
src/cluster/websocket/CMakeLists.txt
Normal file
|
@ -0,0 +1,8 @@
|
|||
add_subdirectory(auxil)
|
||||
|
||||
zeek_add_plugin(
|
||||
Zeek Cluster_WebSocket
|
||||
INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}
|
||||
DEPENDENCIES ixwebsocket::ixwebsocket
|
||||
SOURCES Plugin.cc WebSocket.cc WebSocket-IXWebSocket.cc
|
||||
BIFS events.bif)
|
19
src/cluster/websocket/Plugin.cc
Normal file
19
src/cluster/websocket/Plugin.cc
Normal file
|
@ -0,0 +1,19 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#include "zeek/cluster/websocket/Plugin.h"
|
||||
|
||||
namespace zeek::plugin::Cluster_WebSocket {
|
||||
// Definition of plugin.
|
||||
Plugin plugin;
|
||||
}; // namespace zeek::plugin::Cluster_WebSocket
|
||||
|
||||
namespace zeek::plugin::Cluster_WebSocket {
|
||||
|
||||
zeek::plugin::Configuration Plugin::Configure() {
|
||||
zeek::plugin::Configuration config;
|
||||
config.name = "Zeek::Cluster_WebSocket";
|
||||
config.description = "Provides WebSocket access to a Zeek cluster";
|
||||
return config;
|
||||
}
|
||||
|
||||
} // namespace zeek::plugin::Cluster_WebSocket
|
16
src/cluster/websocket/Plugin.h
Normal file
16
src/cluster/websocket/Plugin.h
Normal file
|
@ -0,0 +1,16 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "zeek/plugin/Plugin.h"
|
||||
|
||||
namespace zeek::plugin::Cluster_WebSocket {
|
||||
|
||||
class Plugin : public zeek::plugin::Plugin {
|
||||
public:
|
||||
zeek::plugin::Configuration Configure() override;
|
||||
};
|
||||
|
||||
extern Plugin plugin;
|
||||
|
||||
} // namespace zeek::plugin::Cluster_WebSocket
|
5
src/cluster/websocket/README
Normal file
5
src/cluster/websocket/README
Normal file
|
@ -0,0 +1,5 @@
|
|||
A Zeek Cluster's WebSocket interface
|
||||
====================================
|
||||
|
||||
This directory contains code that allows external applications
|
||||
to connect to Zeek using WebSocket connections.
|
170
src/cluster/websocket/WebSocket-IXWebSocket.cc
Normal file
170
src/cluster/websocket/WebSocket-IXWebSocket.cc
Normal file
|
@ -0,0 +1,170 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
// Implementation of a WebSocket server and clients using the IXWebSocket client library.
|
||||
#include "zeek/cluster/websocket/WebSocket.h"
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
|
||||
#include "zeek/Reporter.h"
|
||||
|
||||
#include "ixwebsocket/IXConnectionState.h"
|
||||
#include "ixwebsocket/IXSocketTLSOptions.h"
|
||||
#include "ixwebsocket/IXWebSocket.h"
|
||||
#include "ixwebsocket/IXWebSocketSendData.h"
|
||||
#include "ixwebsocket/IXWebSocketServer.h"
|
||||
|
||||
namespace zeek::cluster::websocket::detail::ixwebsocket {
|
||||
|
||||
/**
|
||||
* Implementation of WebSocketClient for the IXWebsocket library.
|
||||
*/
|
||||
class IxWebSocketClient : public WebSocketClient {
|
||||
public:
|
||||
IxWebSocketClient(std::shared_ptr<ix::ConnectionState> cs, std::shared_ptr<ix::WebSocket> ws)
|
||||
: cs(std::move(cs)), ws(std::move(ws)) {
|
||||
if ( ! this->cs || ! this->ws )
|
||||
throw std::invalid_argument("expected ws and cs to be set");
|
||||
}
|
||||
|
||||
bool IsTerminated() const override {
|
||||
if ( cs->isTerminated() )
|
||||
return true;
|
||||
|
||||
auto rs = ws->getReadyState();
|
||||
return rs == ix::ReadyState::Closing || rs == ix::ReadyState::Closed;
|
||||
}
|
||||
|
||||
void Close(uint16_t code, const std::string& reason) override { ws->close(code, reason); }
|
||||
|
||||
SendInfo SendText(std::string_view sv) override {
|
||||
if ( cs->isTerminated() )
|
||||
return {true}; // small lie
|
||||
|
||||
auto send_info = ws->sendUtf8Text(ix::IXWebSocketSendData{sv.data(), sv.size()});
|
||||
return SendInfo{send_info.success};
|
||||
}
|
||||
|
||||
const std::string& getId() override { return cs->getId(); }
|
||||
const std::string& getRemoteIp() override { return cs->getRemoteIp(); }
|
||||
int getRemotePort() override { return cs->getRemotePort(); }
|
||||
|
||||
private:
|
||||
std::shared_ptr<ix::ConnectionState> cs;
|
||||
std::shared_ptr<ix::WebSocket> ws;
|
||||
};
|
||||
|
||||
/**
|
||||
* Implementation of WebSocketServer using the IXWebsocket library.
|
||||
*/
|
||||
class IXWebSocketServer : public WebSocketServer {
|
||||
public:
|
||||
IXWebSocketServer(std::unique_ptr<WebSocketEventDispatcher> dispatcher, std::unique_ptr<ix::WebSocketServer> server)
|
||||
: WebSocketServer(std::move(dispatcher)), server(std::move(server)) {}
|
||||
|
||||
private:
|
||||
void DoTerminate() override {
|
||||
// Stop the server.
|
||||
server->stop();
|
||||
}
|
||||
|
||||
std::unique_ptr<ix::WebSocketServer> server;
|
||||
};
|
||||
|
||||
std::unique_ptr<WebSocketServer> StartServer(std::unique_ptr<WebSocketEventDispatcher> dispatcher,
|
||||
const ServerOptions& options) {
|
||||
auto server =
|
||||
std::make_unique<ix::WebSocketServer>(options.port, options.host, ix::SocketServer::kDefaultTcpBacklog,
|
||||
options.max_connections,
|
||||
ix::WebSocketServer::kDefaultHandShakeTimeoutSecs,
|
||||
ix::SocketServer::kDefaultAddressFamily, options.ping_interval_seconds);
|
||||
|
||||
if ( ! options.per_message_deflate )
|
||||
server->disablePerMessageDeflate();
|
||||
|
||||
const auto& tls_options = options.tls_options;
|
||||
if ( tls_options.TlsEnabled() ) {
|
||||
ix::SocketTLSOptions ix_tls_options{};
|
||||
ix_tls_options.tls = true;
|
||||
ix_tls_options.certFile = tls_options.cert_file.value();
|
||||
ix_tls_options.keyFile = tls_options.key_file.value();
|
||||
|
||||
if ( tls_options.enable_peer_verification ) {
|
||||
if ( ! tls_options.ca_file.empty() )
|
||||
ix_tls_options.caFile = tls_options.ca_file;
|
||||
}
|
||||
else {
|
||||
// This is the IXWebSocket library's way of
|
||||
// disabling peer verification.
|
||||
ix_tls_options.caFile = "NONE";
|
||||
}
|
||||
|
||||
if ( ! tls_options.ciphers.empty() )
|
||||
ix_tls_options.ciphers = tls_options.ciphers;
|
||||
|
||||
server->setTLSOptions(ix_tls_options);
|
||||
}
|
||||
|
||||
// Using the legacy IXWebsocketAPI API to acquire a shared_ptr to the ix::WebSocket instance.
|
||||
ix::WebSocketServer::OnConnectionCallback connection_callback =
|
||||
[dispatcher = dispatcher.get()](std::weak_ptr<ix::WebSocket> websocket,
|
||||
std::shared_ptr<ix::ConnectionState> cs) -> void {
|
||||
// Hold a shared_ptr to the WebSocket object until we see the close.
|
||||
std::shared_ptr<ix::WebSocket> ws = websocket.lock();
|
||||
|
||||
// Client already gone or terminated? Weird...
|
||||
if ( ! ws || cs->isTerminated() )
|
||||
return;
|
||||
|
||||
auto id = cs->getId();
|
||||
int remotePort = cs->getRemotePort();
|
||||
std::string remoteIp = cs->getRemoteIp();
|
||||
|
||||
auto ixws = std::make_shared<IxWebSocketClient>(std::move(cs), ws);
|
||||
|
||||
// These callbacks run in per client threads. The actual processing happens
|
||||
// on the main thread via a single WebSocketDemux instance.
|
||||
ix::OnMessageCallback message_callback = [dispatcher, id, remotePort, remoteIp,
|
||||
ixws](const ix::WebSocketMessagePtr& msg) mutable {
|
||||
if ( msg->type == ix::WebSocketMessageType::Open ) {
|
||||
dispatcher->QueueForProcessing(
|
||||
WebSocketOpen{id, msg->openInfo.uri, msg->openInfo.protocol, std::move(ixws)});
|
||||
}
|
||||
else if ( msg->type == ix::WebSocketMessageType::Message ) {
|
||||
dispatcher->QueueForProcessing(WebSocketMessage{id, msg->str});
|
||||
}
|
||||
else if ( msg->type == ix::WebSocketMessageType::Close ) {
|
||||
dispatcher->QueueForProcessing(WebSocketClose{id});
|
||||
}
|
||||
else if ( msg->type == ix::WebSocketMessageType::Error ) {
|
||||
dispatcher->QueueForProcessing(WebSocketClose{id});
|
||||
}
|
||||
};
|
||||
|
||||
ws->setOnMessageCallback(message_callback);
|
||||
};
|
||||
|
||||
server->setOnConnectionCallback(connection_callback);
|
||||
|
||||
const auto [success, reason] = server->listen();
|
||||
if ( ! success ) {
|
||||
zeek::reporter->Error("WebSocket: Unable to listen on %s:%d: %s", options.host.c_str(), options.port,
|
||||
reason.c_str());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
server->start();
|
||||
|
||||
return std::make_unique<IXWebSocketServer>(std::move(dispatcher), std::move(server));
|
||||
}
|
||||
|
||||
|
||||
} // namespace zeek::cluster::websocket::detail::ixwebsocket
|
||||
|
||||
using namespace zeek::cluster::websocket::detail;
|
||||
|
||||
std::unique_ptr<WebSocketServer> zeek::cluster::websocket::detail::StartServer(
|
||||
std::unique_ptr<WebSocketEventDispatcher> dispatcher, const ServerOptions& options) {
|
||||
// Just delegate to the above IXWebSocket specific implementation.
|
||||
return ixwebsocket::StartServer(std::move(dispatcher), options);
|
||||
}
|
504
src/cluster/websocket/WebSocket.cc
Normal file
504
src/cluster/websocket/WebSocket.cc
Normal file
|
@ -0,0 +1,504 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
// Implement Broker's WebSocket client handling in Zeek.
|
||||
|
||||
#include "zeek/cluster/websocket/WebSocket.h"
|
||||
|
||||
#include <memory>
|
||||
#include <string_view>
|
||||
#include <variant>
|
||||
|
||||
#include "zeek/Reporter.h"
|
||||
#include "zeek/cluster/Backend.h"
|
||||
#include "zeek/cluster/BifSupport.h"
|
||||
#include "zeek/cluster/Manager.h"
|
||||
#include "zeek/cluster/OnLoop.h"
|
||||
#include "zeek/cluster/Serializer.h"
|
||||
#include "zeek/cluster/serializer/broker/Serializer.h"
|
||||
#include "zeek/cluster/websocket/Plugin.h"
|
||||
#include "zeek/cluster/websocket/events.bif.h"
|
||||
#include "zeek/net_util.h"
|
||||
#include "zeek/threading/MsgThread.h"
|
||||
|
||||
#include "broker/data.bif.h"
|
||||
#include "broker/data_envelope.hh"
|
||||
#include "broker/error.hh"
|
||||
#include "broker/format/json.hh"
|
||||
#include "broker/zeek.hh"
|
||||
#include "rapidjson/document.h"
|
||||
#include "rapidjson/rapidjson.h"
|
||||
|
||||
|
||||
#define WS_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Cluster_WebSocket::plugin, __VA_ARGS__)
|
||||
|
||||
namespace zeek {
|
||||
const char* zeek_version();
|
||||
}
|
||||
|
||||
using namespace zeek::cluster::websocket::detail;
|
||||
|
||||
namespace {
|
||||
|
||||
class WebSocketEventHandlingStrategy : public zeek::cluster::detail::EventHandlingStrategy {
|
||||
public:
|
||||
WebSocketEventHandlingStrategy(std::shared_ptr<WebSocketClient> ws, WebSocketEventDispatcher* dispatcher)
|
||||
: wsc(std::move(ws)), dispatcher(dispatcher) {}
|
||||
|
||||
private:
|
||||
/**
|
||||
* Any received remote event is encoded into Broker's JSON v1 format and
|
||||
* send over to the WebSocket client.
|
||||
*
|
||||
* We leverage low-level Broker encoding functions here directly. This
|
||||
* will need some abstractions if client's can opt to use different encodings
|
||||
* of events in the future.
|
||||
*/
|
||||
bool DoHandleRemoteEvent(std::string_view topic, zeek::cluster::detail::Event e) override {
|
||||
// If the client has left, no point in sending it any pending event.
|
||||
if ( wsc->IsTerminated() )
|
||||
return true;
|
||||
|
||||
|
||||
// Any events received from the backend before an Ack was sent
|
||||
// are discarded.
|
||||
if ( ! wsc->IsAcked() )
|
||||
return true;
|
||||
|
||||
// XXX The serialization is somewhat slow, it would be good to offload
|
||||
// it to a thread, or try to go from Val's directly to JSON and see
|
||||
// if that's faster.
|
||||
auto ev = zeek::cluster::detail::to_broker_event(e);
|
||||
if ( ! ev ) {
|
||||
fprintf(stderr, "[ERROR] Unable to go from detail::Event to broker::event\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
buffer.clear();
|
||||
auto envelope = broker::data_envelope::make(topic, ev->as_data());
|
||||
broker::format::json::v1::encode(envelope, std::back_inserter(buffer));
|
||||
|
||||
dispatcher->QueueReply(WebSocketSendReply{wsc, buffer});
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Events from backends aren't enqueued into the event loop when
|
||||
* running for WebSocket clients.
|
||||
*/
|
||||
void DoEnqueueLocalEvent(zeek::EventHandlerPtr h, zeek::Args args) override {}
|
||||
|
||||
std::string buffer;
|
||||
std::shared_ptr<WebSocketClient> wsc;
|
||||
WebSocketEventDispatcher* dispatcher;
|
||||
};
|
||||
|
||||
class ReplyInputMessage : public zeek::threading::BasicInputMessage {
|
||||
public:
|
||||
ReplyInputMessage(WebSocketReply work) : zeek::threading::BasicInputMessage("ReplyInput"), work(std::move(work)) {};
|
||||
bool Process() override {
|
||||
return std::visit([this](auto& item) -> bool { return Process(item); }, work);
|
||||
};
|
||||
|
||||
private:
|
||||
bool Process(const WebSocketSendReply& sr) {
|
||||
const auto& wsc = sr.wsc;
|
||||
if ( wsc->IsTerminated() )
|
||||
return true;
|
||||
|
||||
auto send_info = wsc->SendText(sr.msg);
|
||||
if ( ! send_info.success && ! wsc->IsTerminated() )
|
||||
fprintf(stderr, "[ERROR] Failed to send reply to WebSocket client %s (%s:%d)\n", wsc->getId().c_str(),
|
||||
wsc->getRemoteIp().c_str(), wsc->getRemotePort());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Process(const WebSocketCloseReply& cr) {
|
||||
const auto& wsc = cr.wsc;
|
||||
if ( ! wsc->IsTerminated() )
|
||||
wsc->Close(cr.code, cr.reason);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
WebSocketReply work;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
|
||||
// Inspired by broker/internal/json_client.cc
|
||||
WebSocketClient::SendInfo WebSocketClient::SendError(std::string_view code, std::string_view message) {
|
||||
std::string buf;
|
||||
buf.reserve(code.size() + message.size() + 32);
|
||||
auto out = std::back_inserter(buf);
|
||||
*out++ = '{';
|
||||
broker::format::json::v1::append_field("type", "error", out);
|
||||
*out++ = ',';
|
||||
broker::format::json::v1::append_field("code", code, out);
|
||||
*out++ = ',';
|
||||
broker::format::json::v1::append_field("message", message, out);
|
||||
*out++ = '}';
|
||||
return SendText(buf);
|
||||
}
|
||||
|
||||
// Inspired by broker/internal/json_client.cc
|
||||
WebSocketClient::SendInfo WebSocketClient::SendAck(std::string_view endpoint, std::string_view version) {
|
||||
std::string buf;
|
||||
buf.reserve(endpoint.size() + version.size() + 32);
|
||||
auto out = std::back_inserter(buf);
|
||||
*out++ = '{';
|
||||
broker::format::json::v1::append_field("type", "ack", out);
|
||||
*out++ = ',';
|
||||
broker::format::json::v1::append_field("endpoint", endpoint, out);
|
||||
*out++ = ',';
|
||||
broker::format::json::v1::append_field("version", version, out);
|
||||
*out++ = '}';
|
||||
auto r = SendText(buf);
|
||||
acked = true;
|
||||
return r;
|
||||
}
|
||||
|
||||
void WebSocketClient::SetSubscriptions(const std::vector<std::string>& topic_prefixes) {
|
||||
for ( const auto& topic_prefix : topic_prefixes )
|
||||
subscriptions_state[topic_prefix] = false;
|
||||
}
|
||||
|
||||
void WebSocketClient::SetSubscriptionActive(const std::string& topic_prefix) {
|
||||
if ( subscriptions_state.count(topic_prefix) == 0 ) {
|
||||
zeek::reporter->InternalWarning("Unknown topic_prefix for WebSocket client %s!", topic_prefix.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
subscriptions_state[topic_prefix] = true;
|
||||
}
|
||||
|
||||
bool WebSocketClient::AllSubscriptionsActive() const {
|
||||
for ( const auto& [_, status] : subscriptions_state ) {
|
||||
if ( ! status )
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
const std::vector<std::string> WebSocketClient::GetSubscriptions() const {
|
||||
std::vector<std::string> subs;
|
||||
subs.reserve(subscriptions_state.size());
|
||||
|
||||
for ( const auto& [topic, _] : subscriptions_state )
|
||||
subs.emplace_back(topic);
|
||||
|
||||
return subs;
|
||||
}
|
||||
|
||||
class zeek::cluster::websocket::detail::ReplyMsgThread : public zeek::threading::MsgThread {
|
||||
public:
|
||||
ReplyMsgThread() : zeek::threading::MsgThread() { SetName("ws-reply-thread"); }
|
||||
|
||||
void Run() override {
|
||||
zeek::util::detail::set_thread_name("zk/ws-reply-thread");
|
||||
MsgThread::Run();
|
||||
}
|
||||
|
||||
bool OnHeartbeat(double network_time, double current_time) override { return true; }
|
||||
|
||||
bool OnFinish(double network_time) override { return true; }
|
||||
};
|
||||
|
||||
WebSocketEventDispatcher::WebSocketEventDispatcher() {
|
||||
onloop =
|
||||
new zeek::detail::OnLoopProcess<WebSocketEventDispatcher, WebSocketEvent>(this, "WebSocketEventDispatcher");
|
||||
// Register the onloop instance the IO loop. Lifetime will be managed by the loop.
|
||||
onloop->Register(false);
|
||||
|
||||
reply_msg_thread = new ReplyMsgThread();
|
||||
reply_msg_thread->Start();
|
||||
}
|
||||
|
||||
WebSocketEventDispatcher::~WebSocketEventDispatcher() {
|
||||
// Freed by threading manager.
|
||||
reply_msg_thread = nullptr;
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::Terminate() {
|
||||
WS_DEBUG("Terminating WebSocketEventDispatcher");
|
||||
|
||||
for ( auto& [_, client] : clients ) {
|
||||
const auto& wsc = client.wsc;
|
||||
const auto& backend = client.backend;
|
||||
WS_DEBUG("Sending close to WebSocket client %s (%s:%d)", wsc->getId().c_str(), wsc->getRemoteIp().c_str(),
|
||||
wsc->getRemotePort());
|
||||
|
||||
QueueReply(WebSocketCloseReply{wsc, 1001, "Terminating"});
|
||||
backend->Terminate();
|
||||
}
|
||||
|
||||
clients.clear();
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::QueueForProcessing(WebSocketEvent&& event) {
|
||||
// Just delegate to onloop. The work will be done in Process()
|
||||
onloop->QueueForProcessing(std::move(event));
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::QueueReply(WebSocketReply&& reply) {
|
||||
// Delegate to the reply thread.
|
||||
reply_msg_thread->SendIn(new ReplyInputMessage(std::move(reply)));
|
||||
}
|
||||
|
||||
// WebSocketDemux::Process() runs in the main thread.
|
||||
//
|
||||
// XXX: How is this going to work with class broker? With
|
||||
// ZeroMQ, each WebSocket client has its own XPUB/XSUB
|
||||
// connectivity to a central broker and similarly with NATS.
|
||||
// But with broker we need to do something different.
|
||||
// Maybe connect to the local endpoint.
|
||||
//
|
||||
// We cannot actually instantiate a Broker backend :-(
|
||||
//
|
||||
// We could also have InitPostScript() recognize Broker
|
||||
// and start its internal server instead.
|
||||
void WebSocketEventDispatcher::Process(const WebSocketEvent& event) {
|
||||
std::visit([this](auto&& arg) { Process(arg); }, event);
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::Process(const WebSocketOpen& open) {
|
||||
const auto& wsc = open.wsc;
|
||||
const auto& id = open.id;
|
||||
const auto& it = clients.find(id);
|
||||
if ( it != clients.end() ) {
|
||||
// This shouldn't happen!
|
||||
reporter->Error("Open for existing WebSocket client with id %s!", id.c_str());
|
||||
QueueReply(WebSocketCloseReply{wsc, 1001, "Internal error"});
|
||||
return;
|
||||
}
|
||||
|
||||
// As of now, terminate clients coming to anything other than /v1/messages/json.
|
||||
if ( open.uri != "/v1/messages/json" ) {
|
||||
open.wsc->SendError("invalid_uri", "Invalid URI - use /v1/messages/json");
|
||||
open.wsc->Close(1008, "Invalid URI - use /v1/messages/json");
|
||||
|
||||
// Still create an entry as we might see messages and close events coming in.
|
||||
clients[id] = WebSocketClientEntry{id, wsc, nullptr};
|
||||
return;
|
||||
}
|
||||
|
||||
// Generate an ID for this client.
|
||||
auto ws_id = cluster::backend->NodeId() + "-websocket-" + id;
|
||||
|
||||
const auto& event_serializer_val = id::find_val<zeek::EnumVal>("Cluster::event_serializer");
|
||||
auto event_serializer = cluster::manager->InstantiateEventSerializer(event_serializer_val);
|
||||
const auto& cluster_backend_val = id::find_val<zeek::EnumVal>("Cluster::backend");
|
||||
auto event_handling_strategy = std::make_unique<WebSocketEventHandlingStrategy>(wsc, this);
|
||||
auto backend = zeek::cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer), nullptr,
|
||||
std::move(event_handling_strategy));
|
||||
|
||||
WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(),
|
||||
wsc->getRemotePort(), ws_id.c_str(), backend.get());
|
||||
|
||||
// XXX: We call InitPostScript to populate member vars required for connectivity.
|
||||
backend->InitPostScript();
|
||||
backend->Init(std::move(ws_id));
|
||||
|
||||
clients[id] = WebSocketClientEntry{id, wsc, std::move(backend)};
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
|
||||
const auto& id = close.id;
|
||||
const auto& it = clients.find(id);
|
||||
|
||||
if ( it == clients.end() ) {
|
||||
reporter->Error("Close from non-existing WebSocket client with id %s!", id.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
auto& wsc = it->second.wsc;
|
||||
auto& backend = it->second.backend;
|
||||
|
||||
WS_DEBUG("Close from client %s (%s:%d) backend=%p", wsc->getId().c_str(), wsc->getRemoteIp().c_str(),
|
||||
wsc->getRemotePort(), backend.get());
|
||||
|
||||
// If the client doesn't have a backend, it wasn't ever properly instantiated.
|
||||
if ( backend ) {
|
||||
auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(),
|
||||
wsc->getRemotePort(), TRANSPORT_TCP);
|
||||
zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec));
|
||||
|
||||
backend->Terminate();
|
||||
}
|
||||
|
||||
clients.erase(it);
|
||||
}
|
||||
|
||||
// SubscribeFinished is produced internally.
|
||||
void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) {
|
||||
const auto& it = clients.find(fin.id);
|
||||
if ( it == clients.end() ) {
|
||||
reporter->Error("Subscribe finished from non-existing WebSocket client with id %s!", fin.id.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
auto& entry = it->second;
|
||||
auto& wsc = entry.wsc;
|
||||
|
||||
entry.wsc->SetSubscriptionActive(fin.topic_prefix);
|
||||
|
||||
if ( ! entry.wsc->AllSubscriptionsActive() ) {
|
||||
// More subscriptions to come.
|
||||
return;
|
||||
}
|
||||
|
||||
auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(),
|
||||
wsc->getRemotePort(), TRANSPORT_TCP);
|
||||
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
|
||||
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
|
||||
|
||||
entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version());
|
||||
|
||||
WS_DEBUG("Sent Ack to %s %s\n", fin.id.c_str(), entry.backend->NodeId().c_str());
|
||||
|
||||
// Process any queued messages now.
|
||||
for ( auto& msg : entry.queue ) {
|
||||
assert(entry.msg_count > 1);
|
||||
Process(msg);
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) {
|
||||
rapidjson::Document doc;
|
||||
doc.Parse(buf.data(), buf.size());
|
||||
if ( ! doc.IsArray() ) {
|
||||
entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "subscriptions not an array");
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<std::string> subscriptions;
|
||||
|
||||
for ( rapidjson::SizeType i = 0; i < doc.Size(); i++ ) {
|
||||
if ( ! doc[i].IsString() ) {
|
||||
entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed),
|
||||
"individual subscription not a string");
|
||||
return;
|
||||
}
|
||||
|
||||
subscriptions.emplace_back(doc[i].GetString());
|
||||
}
|
||||
|
||||
entry.wsc->SetSubscriptions(subscriptions);
|
||||
|
||||
auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic,
|
||||
const Backend::SubscriptionCallbackInfo& info) {
|
||||
if ( info.status == Backend::CallbackStatus::Error ) {
|
||||
zeek::reporter->Error("Subscribe for WebSocket client failed!");
|
||||
|
||||
// Is this going to work out?
|
||||
QueueReply(WebSocketCloseReply{wsc, 1011, "Could not subscribe. Something bad happened!"});
|
||||
}
|
||||
else {
|
||||
Process(WebSocketSubscribeFinished{id, topic});
|
||||
}
|
||||
};
|
||||
|
||||
for ( const auto& subscription : subscriptions ) {
|
||||
if ( ! entry.backend->Subscribe(subscription, cb) ) {
|
||||
zeek::reporter->Error("Subscribe for WebSocket client failed!");
|
||||
QueueReply(WebSocketCloseReply{entry.wsc, 1011, "Could not subscribe. Something bad happened!"});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::HandleEvent(WebSocketClientEntry& entry, std::string_view buf) {
|
||||
// Unserialize the message as an event.
|
||||
broker::variant res;
|
||||
auto err = broker::format::json::v1::decode(buf, res);
|
||||
if ( err ) {
|
||||
entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "failed to decode JSON object");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string topic = std::string(res->shared_envelope()->topic());
|
||||
|
||||
if ( topic == broker::topic::reserved ) {
|
||||
entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "no topic in top-level JSON object");
|
||||
return;
|
||||
}
|
||||
|
||||
broker::zeek::Event broker_ev(std::move(res));
|
||||
|
||||
// This is not guaranteed to work! If the node running the WebSocket
|
||||
// API does not have the declaration of the event that another node
|
||||
// is sending, it cannot instantiate the zeek::cluster::Event for
|
||||
// re-publishing to a cluster backend.
|
||||
//
|
||||
// Does that make conceptional sense? Basically the WebSocket API
|
||||
// has Zeek-script awareness.
|
||||
//
|
||||
// It works with Broker today because Broker treats messages opaquely.
|
||||
// It knows how to convert from JSON into Broker binary format as these
|
||||
// are compatible.
|
||||
//
|
||||
// However, the broker format is under specified (vectors are used for various
|
||||
// types without being tagged explicitly), so it's not possible to determine
|
||||
// the final Zeek type without having access to the script-layer.
|
||||
//
|
||||
// I'm not sure this is a real problem, other than it being unfortunate that
|
||||
// the Zeek process running the WebSocket API requires access to all declarations
|
||||
// of events being transmitted via WebSockets. Though this might be a given anyhow.
|
||||
//
|
||||
// See broker/Data.cc for broker::vector conversion to see the collisions:
|
||||
// vector, list, func, record, pattern, opaque are all encoded using
|
||||
// broker::vector rather than dedicated types.
|
||||
//
|
||||
// Switching to a JSON v2 format that ensures all Zeek types are represented
|
||||
// explicitly would help.
|
||||
const auto& zeek_ev = cluster::detail::to_zeek_event(broker_ev);
|
||||
if ( ! zeek_ev ) {
|
||||
entry.wsc->SendError(broker::enum_str(broker::ec::deserialization_failed), "failed to create Zeek event");
|
||||
return;
|
||||
}
|
||||
|
||||
WS_DEBUG("Publishing event %s to topic '%s'", std::string(zeek_ev->HandlerName()).c_str(), topic.c_str());
|
||||
entry.backend->PublishEvent(topic, *zeek_ev);
|
||||
}
|
||||
|
||||
// Process a WebSocket message from a client.
|
||||
//
|
||||
// If it's the first message, the code is expecting a subscriptions
|
||||
// array, otherwise it'll be a remote event.
|
||||
void WebSocketEventDispatcher::Process(const WebSocketMessage& msg) {
|
||||
const auto& id = msg.id;
|
||||
|
||||
const auto& it = clients.find(id);
|
||||
if ( it == clients.end() ) {
|
||||
reporter->Error("WebSocket message from non-existing WebSocket client %s", id.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
// Client without backend wasn't accepted, just discard its message.
|
||||
if ( ! it->second.backend )
|
||||
return;
|
||||
|
||||
auto& entry = it->second;
|
||||
const auto& wsc = entry.wsc;
|
||||
entry.msg_count++;
|
||||
|
||||
WS_DEBUG("Message %" PRIu64 " size=%zu from %s (%s:%d) backend=%p", entry.msg_count, msg.msg.size(),
|
||||
wsc->getId().c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), entry.backend.get());
|
||||
|
||||
// First message is the subscription message.
|
||||
if ( entry.msg_count == 1 ) {
|
||||
WS_DEBUG("Subscriptions from client: %s: (%s:%d)\n", id.c_str(), wsc->getRemoteIp().c_str(),
|
||||
wsc->getRemotePort());
|
||||
HandleSubscriptions(entry, msg.msg);
|
||||
}
|
||||
else {
|
||||
if ( ! wsc->IsAcked() ) {
|
||||
WS_DEBUG("Client sending messages before receiving ack!");
|
||||
entry.queue.push_back(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
HandleEvent(entry, msg.msg);
|
||||
}
|
||||
}
|
321
src/cluster/websocket/WebSocket.h
Normal file
321
src/cluster/websocket/WebSocket.h
Normal file
|
@ -0,0 +1,321 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
namespace zeek {
|
||||
|
||||
namespace detail {
|
||||
|
||||
template<class Proc, class Work>
|
||||
class OnLoopProcess;
|
||||
}
|
||||
|
||||
namespace cluster {
|
||||
|
||||
class Backend;
|
||||
|
||||
namespace websocket::detail {
|
||||
|
||||
|
||||
/**
|
||||
* Library independent interface for a WebSocket client.
|
||||
*
|
||||
* All methods should be safe to be called from Zeek's
|
||||
* main thread, though some may fail if the client has vanished
|
||||
* or vanishes during an operation.
|
||||
*/
|
||||
class WebSocketClient {
|
||||
public:
|
||||
virtual ~WebSocketClient() = default;
|
||||
|
||||
/**
|
||||
* @returns true if the WebSocket client has terminated
|
||||
*/
|
||||
virtual bool IsTerminated() const = 0;
|
||||
|
||||
/**
|
||||
* Close the WebSocket connection with the given code/reason.
|
||||
*/
|
||||
virtual void Close(uint16_t code = 1000, const std::string& reason = "Normal closure") = 0;
|
||||
|
||||
/**
|
||||
* Information about the send operation.
|
||||
*/
|
||||
struct SendInfo {
|
||||
bool success;
|
||||
};
|
||||
|
||||
/**
|
||||
* Thread safe sending.
|
||||
*
|
||||
* This might be called from Zeek's main thread and
|
||||
* must be safe to be called whether or not the connection
|
||||
* with the client is still alive.
|
||||
*
|
||||
* @param sv The buffer to send as a WebSocket message.
|
||||
*/
|
||||
virtual SendInfo SendText(std::string_view sv) = 0;
|
||||
|
||||
/**
|
||||
* Send an error in Broker JSON/v1 format to the client.
|
||||
*/
|
||||
SendInfo SendError(std::string_view code, std::string_view ctx);
|
||||
|
||||
/**
|
||||
* Send an ACK message Broker JSON/v1 format to the client.
|
||||
*/
|
||||
SendInfo SendAck(std::string_view endpoint, std::string_view version);
|
||||
|
||||
/**
|
||||
* @return - has an ACK been sent to the client?
|
||||
*/
|
||||
bool IsAcked() const { return acked; }
|
||||
|
||||
/**
|
||||
* @return The WebSocket client's identifier.
|
||||
*/
|
||||
virtual const std::string& getId() = 0;
|
||||
|
||||
/**
|
||||
* @return The WebSocket client's remote IP address.
|
||||
*/
|
||||
virtual const std::string& getRemoteIp() = 0;
|
||||
|
||||
/**
|
||||
* @return The WebSocket client's remote port.
|
||||
*/
|
||||
virtual int getRemotePort() = 0;
|
||||
|
||||
/**
|
||||
* Store the client's subscriptions as "not active".
|
||||
*/
|
||||
void SetSubscriptions(const std::vector<std::string>& topic_prefixes);
|
||||
|
||||
/**
|
||||
* @return The client's subscriptions.
|
||||
*/
|
||||
const std::vector<std::string> GetSubscriptions() const;
|
||||
|
||||
/**
|
||||
* Store the client's subscriptions as "not active".
|
||||
*/
|
||||
void SetSubscriptionActive(const std::string& topic_prefix);
|
||||
|
||||
/**
|
||||
* @return true if all subscriptions have an active status.
|
||||
*/
|
||||
bool AllSubscriptionsActive() const;
|
||||
|
||||
private:
|
||||
bool acked = false;
|
||||
std::map<std::string, bool> subscriptions_state;
|
||||
};
|
||||
|
||||
// An new WebSocket client connected. Client is locally identified by `id`.
|
||||
struct WebSocketOpen {
|
||||
std::string id;
|
||||
std::string uri;
|
||||
std::string protocol;
|
||||
std::shared_ptr<WebSocketClient> wsc;
|
||||
};
|
||||
|
||||
// A WebSocket client disconnected.
|
||||
struct WebSocketClose {
|
||||
std::string id;
|
||||
};
|
||||
|
||||
// A WebSocket client send a message.
|
||||
struct WebSocketMessage {
|
||||
std::string id;
|
||||
std::string msg;
|
||||
};
|
||||
|
||||
// Produced internally when a WebSocket client's
|
||||
// subscription has completed.
|
||||
struct WebSocketSubscribeFinished {
|
||||
std::string id;
|
||||
std::string topic_prefix;
|
||||
};
|
||||
|
||||
using WebSocketEvent = std::variant<WebSocketOpen, WebSocketSubscribeFinished, WebSocketClose, WebSocketMessage>;
|
||||
|
||||
struct WebSocketSendReply {
|
||||
std::shared_ptr<WebSocketClient> wsc;
|
||||
std::string msg;
|
||||
};
|
||||
|
||||
struct WebSocketCloseReply {
|
||||
std::shared_ptr<WebSocketClient> wsc;
|
||||
uint16_t code = 1000;
|
||||
std::string reason = "Normal closure";
|
||||
};
|
||||
|
||||
using WebSocketReply = std::variant<WebSocketSendReply, WebSocketCloseReply>;
|
||||
|
||||
|
||||
class ReplyMsgThread;
|
||||
|
||||
/**
|
||||
* Handle events produced by WebSocket clients.
|
||||
*
|
||||
* Any thread may call QueueForProcessing(). Process() runs
|
||||
* on Zeek's main thread.
|
||||
*/
|
||||
class WebSocketEventDispatcher {
|
||||
public:
|
||||
WebSocketEventDispatcher();
|
||||
|
||||
~WebSocketEventDispatcher();
|
||||
|
||||
/**
|
||||
* Called shutting down a WebSocket server.
|
||||
*/
|
||||
void Terminate();
|
||||
|
||||
/**
|
||||
* Queue the given WebSocket event to be processed on Zeek's main loop.
|
||||
*
|
||||
* @param work The WebSocket event to process.
|
||||
*/
|
||||
void QueueForProcessing(WebSocketEvent&& event);
|
||||
|
||||
/**
|
||||
* Send a reply to the given websocket client.
|
||||
*
|
||||
* The dispatcher has an internal thread for serializing
|
||||
* and sending out the event.
|
||||
*/
|
||||
void QueueReply(WebSocketReply&& reply);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Main processing function of the dispatcher.
|
||||
*
|
||||
* This runs on Zeek's main thread.
|
||||
*/
|
||||
void Process(const WebSocketEvent& event);
|
||||
|
||||
void Process(const WebSocketOpen& open);
|
||||
void Process(const WebSocketSubscribeFinished& fin);
|
||||
void Process(const WebSocketMessage& msg);
|
||||
void Process(const WebSocketClose& close);
|
||||
|
||||
|
||||
/**
|
||||
* Data structure for tracking WebSocket clients.
|
||||
*/
|
||||
struct WebSocketClientEntry {
|
||||
std::string id;
|
||||
std::shared_ptr<WebSocketClient> wsc;
|
||||
std::shared_ptr<zeek::cluster::Backend> backend;
|
||||
uint64_t msg_count = 0;
|
||||
std::list<WebSocketMessage> queue;
|
||||
};
|
||||
|
||||
|
||||
void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf);
|
||||
void HandleEvent(WebSocketClientEntry& entry, std::string_view buf);
|
||||
|
||||
// Allow access to Process(WebSocketEvent)
|
||||
friend zeek::detail::OnLoopProcess<WebSocketEventDispatcher, WebSocketEvent>;
|
||||
|
||||
// Clients that this dispatcher is tracking.
|
||||
std::map<std::string, WebSocketClientEntry> clients;
|
||||
|
||||
// Connector to the IO loop.
|
||||
zeek::detail::OnLoopProcess<WebSocketEventDispatcher, WebSocketEvent>* onloop = nullptr;
|
||||
|
||||
// Thread replying to clients. Zeek's threading manager takes ownership.
|
||||
ReplyMsgThread* reply_msg_thread = nullptr;
|
||||
};
|
||||
|
||||
/**
|
||||
* An abstract WebSocket server.
|
||||
*/
|
||||
class WebSocketServer {
|
||||
public:
|
||||
WebSocketServer(std::unique_ptr<WebSocketEventDispatcher> demux) : dispatcher(std::move(demux)) {}
|
||||
virtual ~WebSocketServer() = default;
|
||||
|
||||
/**
|
||||
* Stop this server.
|
||||
*/
|
||||
void Terminate() {
|
||||
dispatcher->Terminate();
|
||||
|
||||
DoTerminate();
|
||||
}
|
||||
|
||||
WebSocketEventDispatcher& Dispatcher() { return *dispatcher; }
|
||||
|
||||
private:
|
||||
/**
|
||||
* Hook to be implemented when a server is terminated.
|
||||
*/
|
||||
virtual void DoTerminate() = 0;
|
||||
|
||||
std::unique_ptr<WebSocketEventDispatcher> dispatcher;
|
||||
};
|
||||
|
||||
/**
|
||||
* TLS configuration for a WebSocket server.
|
||||
*/
|
||||
struct TLSOptions {
|
||||
std::optional<std::string> cert_file;
|
||||
std::optional<std::string> key_file;
|
||||
bool enable_peer_verification = false;
|
||||
std::string ca_file;
|
||||
std::string ciphers;
|
||||
|
||||
/**
|
||||
* Is TLS enabled?
|
||||
*/
|
||||
bool TlsEnabled() const { return cert_file.has_value() && key_file.has_value(); }
|
||||
|
||||
bool operator==(const TLSOptions& o) const {
|
||||
return cert_file == o.cert_file && key_file == o.key_file &&
|
||||
enable_peer_verification == o.enable_peer_verification && ca_file == o.ca_file && ciphers == o.ciphers;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Options for a WebSocket server.
|
||||
*/
|
||||
struct ServerOptions {
|
||||
std::string host;
|
||||
uint16_t port = 0;
|
||||
int ping_interval_seconds = 5;
|
||||
int max_connections = 100;
|
||||
bool per_message_deflate = false;
|
||||
struct TLSOptions tls_options;
|
||||
|
||||
bool operator==(const ServerOptions& o) const {
|
||||
return host == o.host && port == o.port && ping_interval_seconds == o.ping_interval_seconds &&
|
||||
max_connections == o.max_connections && per_message_deflate == o.per_message_deflate &&
|
||||
tls_options == o.tls_options;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Start a WebSocket server.
|
||||
*
|
||||
* @param dispatcher The dispatcher to use for the server.
|
||||
* @param options Options for the server.
|
||||
*
|
||||
* @return Pointer to a new WebSocketServer instance or nullptr on error.
|
||||
*/
|
||||
std::unique_ptr<WebSocketServer> StartServer(std::unique_ptr<WebSocketEventDispatcher> dispatcher,
|
||||
const ServerOptions& options);
|
||||
|
||||
} // namespace websocket::detail
|
||||
} // namespace cluster
|
||||
} // namespace zeek
|
7
src/cluster/websocket/auxil/CMakeLists.txt
Normal file
7
src/cluster/websocket/auxil/CMakeLists.txt
Normal file
|
@ -0,0 +1,7 @@
|
|||
option(IXWEBSOCKET_INSTALL "Install IXWebSocket" OFF)
|
||||
|
||||
set(BUILD_SHARED_LIBS OFF)
|
||||
set(USE_TLS ON)
|
||||
set(USE_OPEN_SSL ON)
|
||||
|
||||
add_subdirectory(IXWebSocket)
|
13
src/cluster/websocket/events.bif
Normal file
13
src/cluster/websocket/events.bif
Normal file
|
@ -0,0 +1,13 @@
|
|||
module Cluster;
|
||||
|
||||
## Generated when a new WebSocket client has connected.
|
||||
##
|
||||
## endpoint: Various information about the WebSocket client.
|
||||
##
|
||||
## subscriptions: The WebSocket client's subscriptions as provided in the handshake.
|
||||
event websocket_client_added%(endpoint: EndpointInfo, subscriptions: string_vec%);
|
||||
|
||||
## Generated when a WebSocket client was lost.
|
||||
##
|
||||
## endpoint: Various information about the WebSocket client.
|
||||
event websocket_client_lost%(endpoint: EndpointInfo%);
|
Loading…
Add table
Add a link
Reference in a new issue