mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge remote-tracking branch 'origin/topic/awelzel/cluster-log-websocket-application-name'
* origin/topic/awelzel/cluster-log-websocket-application-name: cluster/WebSocket: Include X-Application-Name in cluster.log
This commit is contained in:
commit
c725311d07
11 changed files with 50 additions and 23 deletions
9
CHANGES
9
CHANGES
|
@ -1,3 +1,12 @@
|
||||||
|
8.0.0-dev.562 | 2025-06-30 17:55:58 +0200
|
||||||
|
|
||||||
|
* cluster/WebSocket: Include X-Application-Name in cluster.log (Arne Welzel, Corelight)
|
||||||
|
|
||||||
|
A bit ad-hoc formatting for the log, but that's mostly because cluster.log
|
||||||
|
only has message field and I don't think having a dedicated application_name
|
||||||
|
column is worth it. That could also be added by custom scripts if it's really
|
||||||
|
wanted for a given deployment.
|
||||||
|
|
||||||
8.0.0-dev.560 | 2025-06-30 13:29:07 +0200
|
8.0.0-dev.560 | 2025-06-30 13:29:07 +0200
|
||||||
|
|
||||||
* cluster/Telemetry: Cache CallExpr locations (Arne Welzel, Corelight)
|
* cluster/Telemetry: Cache CallExpr locations (Arne Welzel, Corelight)
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
8.0.0-dev.560
|
8.0.0-dev.562
|
||||||
|
|
|
@ -398,6 +398,8 @@ export {
|
||||||
type EndpointInfo: record {
|
type EndpointInfo: record {
|
||||||
id: string;
|
id: string;
|
||||||
network: NetworkInfo;
|
network: NetworkInfo;
|
||||||
|
## The value of the X-Application-Name HTTP header, if any.
|
||||||
|
application_name: string &optional;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -675,17 +677,25 @@ function listen_websocket(options: WebSocketServerOptions): bool
|
||||||
return Cluster::__listen_websocket(options);
|
return Cluster::__listen_websocket(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function format_endpoint_info(ei: EndpointInfo): string
|
||||||
|
{
|
||||||
|
local s = fmt("'%s' (%s:%d)", ei$id, ei$network$address, ei$network$bound_port);
|
||||||
|
if ( ei?$application_name )
|
||||||
|
s += fmt(" application_name=%s", ei$application_name);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
event websocket_client_added(endpoint: EndpointInfo, subscriptions: string_vec)
|
event websocket_client_added(endpoint: EndpointInfo, subscriptions: string_vec)
|
||||||
{
|
{
|
||||||
local msg = fmt("WebSocket client '%s' (%s:%d) subscribed to %s",
|
local msg = fmt("WebSocket client %s subscribed to %s",
|
||||||
endpoint$id, endpoint$network$address, endpoint$network$bound_port, subscriptions);
|
format_endpoint_info(endpoint), subscriptions);
|
||||||
Cluster::log(msg);
|
Cluster::log(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
event websocket_client_lost(endpoint: EndpointInfo, code: count, reason: string)
|
event websocket_client_lost(endpoint: EndpointInfo, code: count, reason: string)
|
||||||
{
|
{
|
||||||
local msg = fmt("WebSocket client '%s' (%s:%d) gone with code %d%s",
|
local msg = fmt("WebSocket client %s gone with code %d%s",
|
||||||
endpoint$id, endpoint$network$address, endpoint$network$bound_port, code,
|
format_endpoint_info(endpoint), code,
|
||||||
|reason| > 0 ? fmt(" and reason '%s'", reason) : "");
|
|reason| > 0 ? fmt(" and reason '%s'", reason) : "");
|
||||||
Cluster::log(msg);
|
Cluster::log(msg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ bool is_cluster_pool(const zeek::Val* pool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port,
|
zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port,
|
||||||
TransportProto proto) {
|
TransportProto proto, std::optional<std::string> application_name) {
|
||||||
static const auto ep_info_type = zeek::id::find_type<zeek::RecordType>("Cluster::EndpointInfo");
|
static const auto ep_info_type = zeek::id::find_type<zeek::RecordType>("Cluster::EndpointInfo");
|
||||||
static const auto net_info_type = zeek::id::find_type<zeek::RecordType>("Cluster::NetworkInfo");
|
static const auto net_info_type = zeek::id::find_type<zeek::RecordType>("Cluster::NetworkInfo");
|
||||||
|
|
||||||
|
@ -149,6 +149,8 @@ zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string&
|
||||||
auto ep_rec = zeek::make_intrusive<zeek::RecordVal>(ep_info_type);
|
auto ep_rec = zeek::make_intrusive<zeek::RecordVal>(ep_info_type);
|
||||||
ep_rec->Assign(0, id);
|
ep_rec->Assign(0, id);
|
||||||
ep_rec->Assign(1, net_rec);
|
ep_rec->Assign(1, net_rec);
|
||||||
|
if ( application_name )
|
||||||
|
ep_rec->Assign(2, zeek::make_intrusive<zeek::StringVal>(*application_name));
|
||||||
|
|
||||||
return ep_rec;
|
return ep_rec;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,9 @@
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <optional>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
#include "zeek/IntrusivePtr.h"
|
#include "zeek/IntrusivePtr.h"
|
||||||
#include "zeek/Span.h"
|
#include "zeek/Span.h"
|
||||||
#include "zeek/net_util.h"
|
#include "zeek/net_util.h"
|
||||||
|
@ -60,7 +63,7 @@ bool is_cluster_pool(const zeek::Val* pool);
|
||||||
* @returns A record value of type Cluster::EndpointInfo filled with the provided info.
|
* @returns A record value of type Cluster::EndpointInfo filled with the provided info.
|
||||||
*/
|
*/
|
||||||
zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port,
|
zeek::RecordValPtr make_endpoint_info(const std::string& id, const std::string& address, uint32_t port,
|
||||||
TransportProto proto);
|
TransportProto proto, std::optional<std::string> application_name);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper to go from a vector or array of std::strings to a zeek::VectorVal.
|
* Helper to go from a vector or array of std::strings to a zeek::VectorVal.
|
||||||
|
|
|
@ -350,7 +350,7 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) {
|
||||||
backend->InitPostScript();
|
backend->InitPostScript();
|
||||||
backend->Init(std::move(ws_id));
|
backend->Init(std::move(ws_id));
|
||||||
|
|
||||||
clients[id] = WebSocketClientEntry{id, wsc, std::move(backend)};
|
clients[id] = WebSocketClientEntry{id, wsc, std::move(backend), open.application_name};
|
||||||
}
|
}
|
||||||
|
|
||||||
void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
|
void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
|
||||||
|
@ -375,8 +375,9 @@ void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
|
||||||
// Raise Cluster::websocket_client_lost() after the backend has terminated.
|
// Raise Cluster::websocket_client_lost() after the backend has terminated.
|
||||||
// In case any messages/events were still pending, Cluster::websocket_client_lost()
|
// In case any messages/events were still pending, Cluster::websocket_client_lost()
|
||||||
// should be the last event related to this WebSocket client.
|
// should be the last event related to this WebSocket client.
|
||||||
auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(),
|
auto rec =
|
||||||
wsc->getRemotePort(), TRANSPORT_TCP);
|
zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(), wsc->getRemotePort(),
|
||||||
|
TRANSPORT_TCP, it->second.application_name);
|
||||||
zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec), zeek::val_mgr->Count(close.code),
|
zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec), zeek::val_mgr->Count(close.code),
|
||||||
zeek::make_intrusive<zeek::StringVal>(close.reason));
|
zeek::make_intrusive<zeek::StringVal>(close.reason));
|
||||||
}
|
}
|
||||||
|
@ -481,8 +482,9 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry,
|
||||||
void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) {
|
void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) {
|
||||||
auto& wsc = entry.wsc;
|
auto& wsc = entry.wsc;
|
||||||
|
|
||||||
auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(),
|
auto rec =
|
||||||
wsc->getRemotePort(), TRANSPORT_TCP);
|
zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(),
|
||||||
|
wsc->getRemotePort(), TRANSPORT_TCP, entry.application_name);
|
||||||
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
|
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
|
||||||
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
|
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
|
||||||
|
|
||||||
|
|
|
@ -233,6 +233,7 @@ private:
|
||||||
std::string id;
|
std::string id;
|
||||||
std::shared_ptr<WebSocketClient> wsc;
|
std::shared_ptr<WebSocketClient> wsc;
|
||||||
std::shared_ptr<zeek::cluster::Backend> backend;
|
std::shared_ptr<zeek::cluster::Backend> backend;
|
||||||
|
std::optional<std::string> application_name; // The value from the HTTP X-Application-Name header, if any.
|
||||||
bool ready_to_publish = false;
|
bool ready_to_publish = false;
|
||||||
uint64_t msg_count = 0;
|
uint64_t msg_count = 0;
|
||||||
std::list<WebSocketMessage> queue;
|
std::list<WebSocketMessage> queue;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
error in <...>/main.zeek, line 675: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
|
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
|
||||||
error in <...>/main.zeek, line 675: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
|
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
|
||||||
error in <...>/main.zeek, line 675: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
|
error in <...>/main.zeek, line 677: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
|
||||||
received termination signal
|
received termination signal
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
error in <...>/main.zeek, line 675: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
|
error in <...>/main.zeek, line 677: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
|
||||||
error in <...>/main.zeek, line 675: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))
|
error in <...>/main.zeek, line 677: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to [/topic/ws/1, /topic/ws/all]
|
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app subscribed to [/topic/ws/1, /topic/ws/all]
|
||||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to [/topic/ws/2, /topic/ws/all]
|
manager WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to [/topic/ws/2, /topic/ws/all]
|
||||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to [/topic/ws/3, /topic/ws/all]
|
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app subscribed to [/topic/ws/3, /topic/ws/all]
|
||||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1000
|
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app gone with code 1000
|
||||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1000
|
|
||||||
manager WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1000
|
manager WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1000
|
||||||
|
manager WebSocket client <nodeid> (127.0.0.1:<port>) application_name=super-duper-app gone with code 1000
|
||||||
|
|
|
@ -81,9 +81,9 @@ ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0]
|
||||||
ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json'
|
ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json'
|
||||||
|
|
||||||
def run(ws_url):
|
def run(ws_url):
|
||||||
with connect(ws_url) as ws1:
|
with connect(ws_url, additional_headers={"X-Application-Name": "super-duper-app"}) as ws1:
|
||||||
with connect(ws_url) as ws2:
|
with connect(ws_url) as ws2:
|
||||||
with connect(ws_url) as ws3:
|
with connect(ws_url, additional_headers={"X-Application-Name": "super-duper-app"}) as ws3:
|
||||||
clients = [ws1, ws2, ws3]
|
clients = [ws1, ws2, ws3]
|
||||||
print("Connected!")
|
print("Connected!")
|
||||||
ids = set()
|
ids = set()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue