mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
cluster/WebSocket: Include X-Application-Name in cluster.log
A bit ad-hoc formatting for the log, but that's mostly because cluster.log only has message field and I don't think having a dedicated application_name column is worth it. That could also be added by custom scripts if it's really wanted for a given deployment.
This commit is contained in:
parent
5847a2d32e
commit
1d931b5a2f
9 changed files with 40 additions and 22 deletions
|
@ -398,6 +398,8 @@ export {
|
|||
type EndpointInfo: record {
|
||||
id: string;
|
||||
network: NetworkInfo;
|
||||
## The value of the X-Application-Name HTTP header, if any.
|
||||
application_name: string &optional;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -675,17 +677,25 @@ function listen_websocket(options: WebSocketServerOptions): bool
|
|||
return Cluster::__listen_websocket(options);
|
||||
}
|
||||
|
||||
function format_endpoint_info(ei: EndpointInfo): string
|
||||
{
|
||||
local s = fmt("'%s' (%s:%d)", ei$id, ei$network$address, ei$network$bound_port);
|
||||
if ( ei?$application_name )
|
||||
s += fmt(" application_name=%s", ei$application_name);
|
||||
return s;
|
||||
}
|
||||
|
||||
event websocket_client_added(endpoint: EndpointInfo, subscriptions: string_vec)
|
||||
{
|
||||
local msg = fmt("WebSocket client '%s' (%s:%d) subscribed to %s",
|
||||
endpoint$id, endpoint$network$address, endpoint$network$bound_port, subscriptions);
|
||||
local msg = fmt("WebSocket client %s subscribed to %s",
|
||||
format_endpoint_info(endpoint), subscriptions);
|
||||
Cluster::log(msg);
|
||||
}
|
||||
|
||||
event websocket_client_lost(endpoint: EndpointInfo, code: count, reason: string)
|
||||
{
|
||||
local msg = fmt("WebSocket client '%s' (%s:%d) gone with code %d%s",
|
||||
endpoint$id, endpoint$network$address, endpoint$network$bound_port, code,
|
||||
local msg = fmt("WebSocket client %s gone with code %d%s",
|
||||
format_endpoint_info(endpoint), code,
|
||||
|reason| > 0 ? fmt(" and reason '%s'", reason) : "");
|
||||
Cluster::log(msg);
|
||||
}
|
||||
|
|
|
@ -138,7 +138,7 @@ bool is_cluster_pool(const zeek::Val* pool) {
|
|||
}
|
||||
|
||||
zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port,
|
||||
TransportProto proto) {
|
||||
TransportProto proto, std::optional<std::string> application_name) {
|
||||
static const auto ep_info_type = zeek::id::find_type<zeek::RecordType>("Cluster::EndpointInfo");
|
||||
static const auto net_info_type = zeek::id::find_type<zeek::RecordType>("Cluster::NetworkInfo");
|
||||
|
||||
|
@ -149,6 +149,8 @@ zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string&
|
|||
auto ep_rec = zeek::make_intrusive<zeek::RecordVal>(ep_info_type);
|
||||
ep_rec->Assign(0, id);
|
||||
ep_rec->Assign(1, net_rec);
|
||||
if ( application_name )
|
||||
ep_rec->Assign(2, zeek::make_intrusive<zeek::StringVal>(*application_name));
|
||||
|
||||
return ep_rec;
|
||||
}
|
||||
|
|
|
@ -2,6 +2,9 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
#include <string>
|
||||
|
||||
#include "zeek/IntrusivePtr.h"
|
||||
#include "zeek/Span.h"
|
||||
#include "zeek/net_util.h"
|
||||
|
@ -60,7 +63,7 @@ bool is_cluster_pool(const zeek::Val* pool);
|
|||
* @returns A record value of type Cluster::EndpointInfo filled with the provided info.
|
||||
*/
|
||||
zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port,
|
||||
TransportProto proto);
|
||||
TransportProto proto, std::optional<std::string> application_name);
|
||||
|
||||
/**
|
||||
* Helper to go from a vector or array of std::strings to a zeek::VectorVal.
|
||||
|
|
|
@ -350,7 +350,7 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) {
|
|||
backend->InitPostScript();
|
||||
backend->Init(std::move(ws_id));
|
||||
|
||||
clients[id] = WebSocketClientEntry{id, wsc, std::move(backend)};
|
||||
clients[id] = WebSocketClientEntry{id, wsc, std::move(backend), open.application_name};
|
||||
}
|
||||
|
||||
void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
|
||||
|
@ -375,8 +375,9 @@ void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
|
|||
// Raise Cluster::websocket_client_lost() after the backend has terminated.
|
||||
// In case any messages/events were still pending, Cluster::websocket_client_lost()
|
||||
// should be the last event related to this WebSocket client.
|
||||
auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(),
|
||||
wsc->getRemotePort(), TRANSPORT_TCP);
|
||||
auto rec =
|
||||
zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(), wsc->getRemotePort(),
|
||||
TRANSPORT_TCP, it->second.application_name);
|
||||
zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec), zeek::val_mgr->Count(close.code),
|
||||
zeek::make_intrusive<zeek::StringVal>(close.reason));
|
||||
}
|
||||
|
@ -481,8 +482,9 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry,
|
|||
void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) {
|
||||
auto& wsc = entry.wsc;
|
||||
|
||||
auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(),
|
||||
wsc->getRemotePort(), TRANSPORT_TCP);
|
||||
auto rec =
|
||||
zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(),
|
||||
wsc->getRemotePort(), TRANSPORT_TCP, entry.application_name);
|
||||
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
|
||||
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
|
||||
|
||||
|
|
|
@ -233,6 +233,7 @@ private:
|
|||
std::string id;
|
||||
std::shared_ptr<WebSocketClient> wsc;
|
||||
std::shared_ptr<zeek::cluster::Backend> backend;
|
||||
std::optional<std::string> application_name; // The value from the HTTP X-Application-Name header, if any.
|
||||
bool ready_to_publish = false;
|
||||
uint64_t msg_count = 0;
|
||||
std::list<WebSocketMessage> queue;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
error in <...>/main.zeek, line 675: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
|
||||
error in <...>/main.zeek, line 675: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
|
||||
error in <...>/main.zeek, line 675: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
|
||||
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
|
||||
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
|
||||
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
|
||||
received termination signal
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
error in <...>/main.zeek, line 675: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
|
||||
error in <...>/main.zeek, line 675: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))
|
||||
error in <...>/main.zeek, line 677: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
|
||||
error in <...>/main.zeek, line 677: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to [/topic/ws/1, /topic/ws/all]
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app subscribed to [/topic/ws/1, /topic/ws/all]
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to [/topic/ws/2, /topic/ws/all]
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to [/topic/ws/3, /topic/ws/all]
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1000
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1000
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app subscribed to [/topic/ws/3, /topic/ws/all]
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app gone with code 1000
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1000
|
||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app gone with code 1000
|
||||
|
|
|
@ -81,9 +81,9 @@ ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0]
|
|||
ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json'
|
||||
|
||||
def run(ws_url):
|
||||
with connect(ws_url) as ws1:
|
||||
with connect(ws_url, additional_headers={"X-Application-Name": "super-duper-app"}) as ws1:
|
||||
with connect(ws_url) as ws2:
|
||||
with connect(ws_url) as ws3:
|
||||
with connect(ws_url, additional_headers={"X-Application-Name": "super-duper-app"}) as ws3:
|
||||
clients = [ws1, ws2, ws3]
|
||||
print("Connected!")
|
||||
ids = set()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue