Merge remote-tracking branch 'origin/topic/awelzel/cluster-event-metadata-fixes-for-8.0'

* origin/topic/awelzel/cluster-event-metadata-fixes-for-8.0:
  cluster/Backend: Fall back to current network time when current event has no timestamp
  cluster/serializer/broker: Do not send empty metadata vectors around

(cherry picked from commit 3e89e6b328)
This commit is contained in:
Arne Welzel 2025-08-22 10:12:35 +02:00 committed by Tim Wojtulewicz
parent 4bfac4a087
commit c0a80fe610
18 changed files with 196 additions and 6 deletions

22
CHANGES
View file

@ -1,3 +1,25 @@
8.0.0-6 | 2025-08-22 09:25:14 -0700
* cluster/Backend: Fall back to current network time when current event has no timestamp (Arne Welzel, Corelight)
When a WebSocket client sends an event to Zeek without explicit network
timestamp metadata, Zeek would use -1.0 as a timestamp for any events
published while handling this event. Instead, it seems far more sensible
to use the current network time in that scenario.
(cherry picked from commit 3e89e6b3288453b7a0f89fb742384e213cf5cc94)
* cluster/serializer/broker: Do not send empty metadata vectors around (Arne Welzel, Corelight)
Even when there's no metadata attached to an event, we'd still use the
constructor passing an empty metadata vector, resulting in an on-the-wire
representation with an empty trailing vector.
Particularly visible when just snooping events via websocat. There also
seems to be some bug with the timestamp -1 handling.
(cherry picked from commit 3e89e6b3288453b7a0f89fb742384e213cf5cc94)
8.0.0-5 | 2025-08-22 09:24:21 -0700
* cluster/serializer/broker: Do not special case Broker::Data anymore (Arne Welzel, Corelight)

View file

@ -1 +1 @@
8.0.0-5
8.0.0-6

View file

@ -132,10 +132,19 @@ std::optional<Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args
*
* @J-Gras prefers the current behavior. @awelzel wonders if there should
* be an opt-in/opt-out for this behavior. Procrastinating it for now.
*
* In any case, if the current event has no timestamp information
* (detail::NO_TIMESTAMP is -1.0), use the current network time for
* the outgoing event instead as network timestamp metadata.
*/
zeek::detail::EventMetadataVectorPtr meta;
if ( zeek::BifConst::EventMetadata::add_network_timestamp )
meta = zeek::detail::MakeEventMetadataVector(zeek::event_mgr.CurrentEventTime());
if ( zeek::BifConst::EventMetadata::add_network_timestamp ) {
auto ts = zeek::event_mgr.CurrentEventTime();
if ( ts == zeek::detail::NO_TIMESTAMP )
ts = run_state::network_time;
meta = zeek::detail::MakeEventMetadataVector(ts);
}
return Event{eh, std::move(*checked_args), std::move(meta)};
}

View file

@ -66,8 +66,8 @@ std::optional<broker::zeek::Event> detail::to_broker_event(const zeek::cluster::
}
// Convert metadata from the cluster::detail::Event event to broker's event metadata format.
broker::vector broker_meta;
if ( const auto* meta = ev.Metadata(); meta != nullptr ) {
broker::vector broker_meta;
broker_meta.reserve(meta->size());
for ( const auto& m : *meta ) {
@ -81,9 +81,11 @@ std::optional<broker::zeek::Event> detail::to_broker_event(const zeek::cluster::
obj_desc_short(m.Val()).c_str());
}
}
return broker::zeek::Event(ev.HandlerName(), xs, broker_meta);
}
return broker::zeek::Event(ev.HandlerName(), xs, broker_meta);
return broker::zeek::Event(ev.HandlerName(), xs);
}
std::optional<zeek::cluster::Event> detail::to_zeek_event(const broker::zeek::Event& ev) {

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Connected!
ack {'type': 'ack', 'endpoint': 'endpoint', 'version': 'endpoint'}
ping {'type': 'data-message', 'topic': '/test/pings/', '@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'count', 'data': 1}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'ping'}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'fourty-two'}, {'@data-type': 'count', 'data': 42}]}, {'@data-type': 'vector', 'data': [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '1970-01-01T01:42:42'}]}]}]}]}
pong {'type': 'data-message', 'topic': '/test/pongs/', '@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'count', 'data': 1}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'pong'}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'fourty-two fourty-two'}, {'@data-type': 'count', 'data': 84}]}, {'@data-type': 'vector', 'data': [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '1970-01-01T01:42:42.000'}]}]}]}]}

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Connected!
ack {'type': 'ack', 'endpoint': 'endpoint', 'version': 'endpoint'}
ping {'type': 'data-message', 'topic': '/test/pings/', '@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'count', 'data': 1}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'ping'}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'fourty-two'}, {'@data-type': 'count', 'data': 42}]}]}]}
pong {'type': 'data-message', 'topic': '/test/pongs/', '@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'count', 'data': 1}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'pong'}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'fourty-two fourty-two'}, {'@data-type': 'count', 'data': 84}]}, {'@data-type': 'vector', 'data': [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '1970-01-01T01:18:31.000'}]}]}]}]}

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Connected!
ack {'type': 'ack', 'endpoint': 'endpoint', 'version': 'endpoint'}
ping {'type': 'data-message', 'topic': '/test/pings/', '@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'count', 'data': 1}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'ping'}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'fourty-two'}, {'@data-type': 'count', 'data': 42}]}]}]}
pong {'type': 'data-message', 'topic': '/test/pongs/', '@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'count', 'data': 1}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'pong'}, {'@data-type': 'vector', 'data': [{'@data-type': 'string', 'data': 'fourty-two fourty-two'}, {'@data-type': 'count', 'data': 84}]}]}]}

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
<params>, line 1: received termination signal

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Cluster::websocket_client_added, [/test/pongs/, /zeek/wstest/ws1/]
ping: fourty-two, 42 (metadata=[[id=EventMetadata::NETWORK_TIMESTAMP, val=6162.0]]), sending pong...
Cluster::websocket_client_lost

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
<params>, line 1: received termination signal

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Cluster::websocket_client_added, [/test/pongs/, /zeek/wstest/ws1/]
ping: fourty-two, 42 (metadata=[]), sending pong...
Cluster::websocket_client_lost

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Cluster::websocket_client_added, [/test/pongs/, /zeek/wstest/ws1/]
ping: fourty-two, 42 (metadata=[]), sending pong...
Cluster::websocket_client_lost

View file

@ -105,7 +105,7 @@ def run(ws_url):
# This should be good ping(string, count)
ws.send(json.dumps(make_ping([{"@data-type": "string", "data": "Hello"}, {"@data-type": "count", "data": 42}])))
pong = json.loads(ws.recv())
name, args, _ = pong["data"][2]["data"]
name, args = pong["data"][2]["data"]
print("pong", name, args)
# This one fails again

View file

@ -0,0 +1,121 @@
# @TEST-DOC: Run a single node cluster (manager) with a websocket server and have a single client connect to check the metadata it receives.
#
# @TEST-REQUIRES: have-zeromq
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
# @TEST-EXEC: cp $FILES/ws/wstest.py .
#
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager-no-metadata "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run client-no-metadata "python3 ../client.py >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager-no-metadata/out
# @TEST-EXEC: btest-diff ./manager-no-metadata/.stderr
# @TEST-EXEC: btest-diff ./client-no-metadata/out
# @TEST-EXEC: btest-diff ./client-no-metadata/.stderr
#
# @TEST-EXEC: btest-bg-run manager-metadata "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek EventMetadata::add_network_timestamp=T >out"
# @TEST-EXEC: btest-bg-run client-metadata "python3 ../client.py >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager-metadata/out
# @TEST-EXEC: btest-diff ./manager-metadata/.stderr
# @TEST-EXEC: btest-diff ./client-metadata/out
# @TEST-EXEC: btest-diff ./client-metadata/.stderr
#
# @TEST-EXEC: btest-bg-run manager-metadata-from-client "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek EventMetadata::add_network_timestamp=T >out"
# @TEST-EXEC: btest-bg-run client-metadata-from-client "NETWORK_TIMESTAMP=1970-01-01T01:42:42 python3 ../client.py >out"
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager-metadata-from-client/out
# @TEST-EXEC: btest-diff ./manager-metadata-from-client/.stderr
# @TEST-EXEC: btest-diff ./client-metadata-from-client/out
# @TEST-EXEC: btest-diff ./client-metadata-from-client/.stderr
# @TEST-START-FILE manager.zeek
@load ./zeromq-test-bootstrap
redef exit_only_after_terminate = T;
redef allow_network_time_forward = F;
global ping: event(msg: string, c: count) &is_used;
global pong: event(msg: string, c: count) &is_used;
# Test setup: pins the network time, subscribes to the ping topic and starts
# the WebSocket listener on 127.0.0.1 at the port taken from the
# WEBSOCKET_PORT environment variable.
event zeek_init()
{
# Fixed network time of 4711.0 seconds; presumably chosen so that any
# network timestamp metadata derived from it is deterministic in the baselines.
set_network_time(double_to_time(4711.0));
Cluster::subscribe("/test/pings/");
Cluster::listen_websocket([$listen_addr=127.0.0.1, $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
}
# Handler for the ping event: prints the arguments together with all metadata
# attached to the currently handled event, then publishes a pong to
# "/test/pongs/" carrying the message twice (space separated) and the count doubled.
event ping(msg: string, n: count) &is_used
{
print fmt("ping: %s, %s (metadata=%s), sending pong...", msg, n, EventMetadata::current_all());
Cluster::publish("/test/pongs/", pong, msg + " " + msg, n + n);
}
# Prints the subscriptions reported for a newly added WebSocket client so they
# show up in the manager's output baseline.
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
{
print "Cluster::websocket_client_added", subscriptions;
}
# Prints a marker line when the WebSocket client is lost and then terminates
# Zeek; with exit_only_after_terminate=T this is what ends the manager process.
event Cluster::websocket_client_lost(info: Cluster::EndpointInfo, code: count, reason: string)
{
print "Cluster::websocket_client_lost";
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import os
import wstest
# Client side of the test: connects as "ws1" to ws_url, performs the v1 hello
# with the "/test/pongs/" topic, sends a single ping event to "/test/pings/"
# and prints the ack, the ping and the pong it receives.
def run(ws_url):
with wstest.connect("ws1", ws_url) as c:
print("Connected!")
ack = c.hello_v1(["/test/pongs/"])
assert "type" in ack
assert ack["type"] == "ack"
assert "endpoint" in ack
assert "version" in ack
# Overwrite the endpoint and version values with a fixed string before
# printing; presumably they vary between runs and would break the baseline.
ack["endpoint"] = "endpoint"
ack["version"] = "endpoint"
print("ack", ack)
ping = wstest.build_event_v1("/test/pings/", "ping", ["fourty-two", 42])
if ts_str := os.environ.get("NETWORK_TIMESTAMP"):
# Sneak timestamp metadata into the ping if the env variable is set
# The appended element is a vector of metadata entries, each entry being a
# [count id, value] pair; id 1 presumably is the network timestamp
# metadata identifier - TODO confirm.
ping["data"][2]["data"] += [{
"@data-type": "vector",
"data": [{
"@data-type": "vector", "data": [
{"@data-type": "count", "data": 1},
{"@data-type": "timestamp", "data": ts_str}
],
}]
}]
# The ping is printed whether or not metadata was added.
print("ping", ping)
c.send_json(ping)
pong = c.recv_json()
print("pong", pong)
# Script entry point: hands run() and the WS4_URL_V1 URL constant to wstest.main().
if __name__ == "__main__":
wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE