diff --git a/CHANGES b/CHANGES index 259b322637..c817e25a41 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,18 @@ +6.0.0-dev.551 | 2023-05-11 14:00:31 +0200 + + * Add compatibility tests for timestamped events. (Jan Grashoefer, Corelight) + + This adds compatibility tests for receiving non-timestamped events as + well as providing timestamps via broker websockets. + + * Add timestamps to auto published broker events. (Jan Grashoefer, Corelight) + + * Add timestamps to manually published broker events. (Jan Grashoefer, Corelight) + + * Annotate scheduled events with intended timestamp. (Jan Grashoefer, Corelight) + + * Add timestamp to events. (Jan Grashoefer, Corelight) + 6.0.0-dev.544 | 2023-05-11 00:01:20 +0200 * GH-3028: policy/community-id: Do not use new_connection() (Arne Welzel, Corelight) diff --git a/NEWS b/NEWS index e58a91fc0f..caf01c0804 100644 --- a/NEWS +++ b/NEWS @@ -91,6 +91,19 @@ New Functionality To disable this functionality, pass ``--disable-javascript`` to configure. +- Zeek events now hold network timestamps. For scheduled events, the timestamp + represents the network time for which the event was scheduled for, otherwise + it is the network time at event creation. A new bif ``current_event_time()`` + allows to retrieve the current event's network timestamp within the script-layer. + + When Zeek sends events via Broker to other nodes in a cluster, an event's network + timestamp is attached to the Broker messages. On a receiving Zeek node executing a + handler for a remote event, ``current_event_time()`` returns the network time of + the sending node at the time the event was created. + + The Broker level implementation allows to exchange arbitrary event metadata, but + Zeek's script and C++ APIs currently only expose network timestamp functionality. + - A new bif ``from_json()`` can be used to parse JSON strings into records. type A: record { a: addr; }; diff --git a/VERSION b/VERSION index 3b81a39e43..e35d066411 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -6.0.0-dev.544 +6.0.0-dev.551 diff --git a/auxil/broker b/auxil/broker index 000834f60a..3dd913bacf 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 000834f60ab7540041c431a3657c23c7476e368d +Subproject commit 3dd913bacfc394f8108bf8306be0a60c253a3d13 diff --git a/src/Event.cc b/src/Event.cc index 065cee5839..de26955d56 100644 --- a/src/Event.cc +++ b/src/Event.cc @@ -7,7 +7,6 @@ #include "zeek/Desc.h" #include "zeek/Func.h" #include "zeek/NetVar.h" -#include "zeek/RunState.h" #include "zeek/Trigger.h" #include "zeek/Val.h" #include "zeek/iosource/Manager.h" @@ -19,10 +18,10 @@ zeek::EventMgr zeek::event_mgr; namespace zeek { -Event::Event(EventHandlerPtr arg_handler, zeek::Args arg_args, util::detail::SourceID arg_src, - analyzer::ID arg_aid, Obj* arg_obj) - : handler(arg_handler), args(std::move(arg_args)), src(arg_src), aid(arg_aid), obj(arg_obj), - next_event(nullptr) +Event::Event(const EventHandlerPtr& arg_handler, zeek::Args arg_args, + util::detail::SourceID arg_src, analyzer::ID arg_aid, Obj* arg_obj, double arg_ts) + : handler(arg_handler), args(std::move(arg_args)), src(arg_src), aid(arg_aid), ts(arg_ts), + obj(arg_obj), next_event(nullptr) { if ( obj ) Ref(obj); @@ -53,7 +52,7 @@ void Event::Dispatch(bool no_remote) try { - handler->Call(&args, no_remote); + handler->Call(&args, no_remote, ts); } catch ( InterpreterException& e ) @@ -74,6 +73,7 @@ EventMgr::EventMgr() head = tail = nullptr; current_src = util::detail::SOURCE_LOCAL; current_aid = 0; + current_ts = 0; src_val = nullptr; draining = false; } @@ -91,9 +91,9 @@ EventMgr::~EventMgr() } void EventMgr::Enqueue(const EventHandlerPtr& h, Args vl, util::detail::SourceID src, - analyzer::ID aid, Obj* obj) + analyzer::ID aid, Obj* obj, double ts) { - QueueEvent(new Event(h, std::move(vl), src, aid, obj)); + QueueEvent(new Event(h, std::move(vl), src, aid, obj, ts)); } void EventMgr::QueueEvent(Event* event) @@ -120,6 +120,8 @@ void EventMgr::QueueEvent(Event* event) void EventMgr::Dispatch(Event* event, bool no_remote) { current_src = event->Source(); + current_aid = event->Analyzer(); + current_ts = event->Time(); event->Dispatch(no_remote); Unref(event); } @@ -154,6 +156,7 @@ void EventMgr::Drain() current_src = current->Source(); current_aid = current->Analyzer(); + current_ts = current->Time(); current->Dispatch(); Unref(current); diff --git a/src/Event.h b/src/Event.h index dc3e931bd8..b746635007 100644 --- a/src/Event.h +++ b/src/Event.h @@ -15,14 +15,19 @@ namespace zeek { +namespace run_state + { +extern double network_time; + } // namespace run_state + class EventMgr; class Event final : public Obj { public: - Event(EventHandlerPtr handler, zeek::Args args, + Event(const EventHandlerPtr& handler, zeek::Args args, util::detail::SourceID src = util::detail::SOURCE_LOCAL, analyzer::ID aid = 0, - Obj* obj = nullptr); + Obj* obj = nullptr, double ts = run_state::network_time); void SetNext(Event* n) { next_event = n; } Event* NextEvent() const { return next_event; } @@ -31,6 +36,7 @@ public: analyzer::ID Analyzer() const { return aid; } EventHandlerPtr Handler() const { return handler; } const zeek::Args& Args() const { return args; } + double Time() const { return ts; } void Describe(ODesc* d) const override; @@ -45,6 +51,7 @@ protected: zeek::Args args; util::detail::SourceID src; analyzer::ID aid; + double ts; Obj* obj; Event* next_event; }; @@ -66,10 +73,12 @@ public: * @param aid identifies the protocol analyzer generating the event. * @param obj an arbitrary object to use as a "cookie" or just hold a * reference to until dispatching the event. + * @param ts timestamp at which the event is intended to be executed + * (defaults to current network time). */ void Enqueue(const EventHandlerPtr& h, zeek::Args vl, util::detail::SourceID src = util::detail::SOURCE_LOCAL, analyzer::ID aid = 0, - Obj* obj = nullptr); + Obj* obj = nullptr, double ts = run_state::network_time); /** * A version of Enqueue() taking a variable number of arguments. @@ -95,6 +104,11 @@ public: // non-analyzer event. analyzer::ID CurrentAnalyzer() const { return current_aid; } + // Returns the timestamp of the last raised event. The timestamp reflects the network time + // the event was intended to be executed. For scheduled events, this is the time the event + // was scheduled to. For any other event, this is the time when the event was created. + double CurrentEventTime() const { return current_ts; } + int Size() const { return num_events_queued - num_events_dispatched; } void Describe(ODesc* d) const override; @@ -114,6 +128,7 @@ protected: Event* tail; util::detail::SourceID current_src; analyzer::ID current_aid; + double current_ts; RecordVal* src_val; bool draining; detail::Flare queue_flare; diff --git a/src/EventHandler.cc b/src/EventHandler.cc index e15234d750..79f12d32cb 100644 --- a/src/EventHandler.cc +++ b/src/EventHandler.cc @@ -52,7 +52,7 @@ void EventHandler::SetFunc(FuncPtr f) local = std::move(f); } -void EventHandler::Call(Args* vl, bool no_remote) +void EventHandler::Call(Args* vl, bool no_remote, double ts) { if ( ! call_count ) { @@ -100,10 +100,10 @@ void EventHandler::Call(Args* vl, bool no_remote) ++it; if ( it != auto_publish.end() ) - broker_mgr->PublishEvent(topic, Name(), xs); + broker_mgr->PublishEvent(topic, Name(), xs, ts); else { - broker_mgr->PublishEvent(topic, Name(), std::move(xs)); + broker_mgr->PublishEvent(topic, Name(), std::move(xs), ts); break; } } diff --git a/src/EventHandler.h b/src/EventHandler.h index 258682adfc..89d5d705fc 100644 --- a/src/EventHandler.h +++ b/src/EventHandler.h @@ -14,6 +14,11 @@ namespace zeek { +namespace run_state + { +extern double network_time; + } // namespace run_state + class Func; using FuncPtr = IntrusivePtr; @@ -34,7 +39,7 @@ public: void AutoUnpublish(const std::string& topic) { auto_publish.erase(topic); } - void Call(zeek::Args* vl, bool no_remote = false); + void Call(zeek::Args* vl, bool no_remote = false, double ts = run_state::network_time); // Returns true if there is at least one local or remote handler. explicit operator bool() const; diff --git a/src/Expr.cc b/src/Expr.cc index 1a7c06020a..b3e8d5572d 100644 --- a/src/Expr.cc +++ b/src/Expr.cc @@ -4279,7 +4279,8 @@ ScheduleTimer::~ScheduleTimer() { } void ScheduleTimer::Dispatch(double /* t */, bool /* is_expire */) { if ( event ) - event_mgr.Enqueue(event, std::move(args)); + event_mgr.Enqueue(event, std::move(args), util::detail::SOURCE_LOCAL, 0, nullptr, + this->Time()); } ScheduleExpr::ScheduleExpr(ExprPtr arg_when, EventExprPtr arg_event) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index fda653364c..0c47096c34 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -643,7 +643,7 @@ std::string Manager::NodeID() const return to_string(bstate->endpoint.node_id()); } -bool Manager::PublishEvent(string topic, std::string name, broker::vector args) +bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { if ( bstate->endpoint.is_shutdown() ) return true; @@ -652,7 +652,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args) return true; DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); - broker::zeek::Event ev(std::move(name), std::move(args)); + broker::zeek::Event ev(std::move(name), std::move(args), broker::to_timestamp(ts)); bstate->endpoint.publish(std::move(topic), ev.move_data()); ++statistics.num_events_outgoing; return true; @@ -681,7 +681,10 @@ bool Manager::PublishEvent(string topic, RecordVal* args) xs.emplace_back(data_val->data); } - return PublishEvent(std::move(topic), event_name, std::move(xs)); + // At this point we come from script-land. This means that publishing of the event was + // explicitly triggered. Hence, the timestamp is set to the current network time. This also + // means that timestamping cannot be manipulated from script-land for now. + return PublishEvent(std::move(topic), event_name, std::move(xs), run_state::network_time); } bool Manager::PublishIdentifier(std::string topic, std::string id) @@ -1386,8 +1389,15 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) auto name = std::move(ev.name()); auto args = std::move(ev.args()); + double ts; - DBG_LOG(DBG_BROKER, "Process event: %s %s", name.data(), RenderMessage(args).data()); + if ( auto ev_ts = ev.ts() ) + broker::convert(*ev_ts, ts); + else + // Default to current network time, if the received event did not contain a timestamp. + ts = run_state::network_time; + + DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", name.data(), ts, RenderMessage(args).data()); ++statistics.num_events_incoming; auto handler = event_registry->Lookup(name); @@ -1469,7 +1479,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) } if ( vl.size() == args.size() ) - event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER); + event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER, 0, nullptr, ts); } bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc) diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 6e514990cb..eb3d050b9b 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -183,9 +183,11 @@ public: * of this topic name. * @param name the name of the event * @param args the event's arguments + * @param ts the timestamp the event is intended to be executed * @return true if the message is sent successfully. */ - bool PublishEvent(std::string topic, std::string name, broker::vector args); + bool PublishEvent(std::string topic, std::string name, broker::vector args, + double ts = run_state::network_time); /** * Send an event to any interested peers. @@ -193,7 +195,8 @@ public: * Peers advertise interest by registering a subscription to some prefix * of this topic name. * @param ev the event and its arguments to send to peers, in the form of - * a Broker::Event record type. + * a Broker::Event record type. The timestamp for the event is set to the + * current network time. * @return true if the message is sent successfully. */ bool PublishEvent(std::string topic, RecordVal* ev); diff --git a/src/zeek.bif b/src/zeek.bif index b0e3c6611a..1d4abd4e7a 100644 --- a/src/zeek.bif +++ b/src/zeek.bif @@ -354,6 +354,19 @@ function set_network_time%(nt: time%): bool return zeek::val_mgr->True(); %} +## Returns the timestamp of the last raised event. The timestamp reflects the +## network time the event was intended to be executed. For scheduled events, +## this is the time the event was scheduled for. For any other event, this is +## the time when the event was created. +## +## Returns: The timestamp of the last raised event. +## +## .. zeek:see:: current_time set_network_time +function current_event_time%(%): time + %{ + return zeek::make_intrusive(zeek::event_mgr.CurrentEventTime()); + %} + ## Returns a system environment variable. ## ## var: The name of the variable whose value to request. diff --git a/testing/btest/Baseline/broker.remote_event_auto_ts/recv.recv.out b/testing/btest/Baseline/broker.remote_event_auto_ts/recv.recv.out new file mode 100644 index 0000000000..d9ad0dd05e --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_auto_ts/recv.recv.out @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +receiver got ping: my-message-a intended for 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T) +receiver got ping: my-message-b intended for 1989-12-12-22:15:00 stamped to 1989-12-12-22:15:00 (is_remote = T) +receiver got ping: my-message-c intended for 1989-12-12-22:30:00 stamped to 1989-12-12-22:30:00 (is_remote = T) +receiver got ping: my-message-a intended for 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T) +receiver got ping: my-message-b intended for 1989-12-12-23:15:00 stamped to 1989-12-12-23:15:00 (is_remote = T) +receiver got ping: my-message-c intended for 1989-12-12-23:30:00 stamped to 1989-12-12-23:30:00 (is_remote = T) +receiver got ping: my-message-a intended for 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T) +receiver got ping: my-message-b intended for 1989-12-13-00:15:00 stamped to 1989-12-13-00:15:00 (is_remote = T) +receiver got ping: my-message-c intended for 1989-12-13-00:30:00 stamped to 1989-12-13-00:30:00 (is_remote = T) diff --git a/testing/btest/Baseline/broker.remote_event_auto_ts/send.send.out b/testing/btest/Baseline/broker.remote_event_auto_ts/send.send.out new file mode 100644 index 0000000000..2a0a5c48ee --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_auto_ts/send.send.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +>> Run 1 (1989-12-12-22:00:00) +>> Run 2 (1989-12-12-23:00:00) +>> Run 3 (1989-12-13-00:00:00) +>> Run 4 (1989-12-13-01:00:00) +>> Run 5 (1989-12-13-02:00:00) +>> Run 6 (1989-12-13-03:00:00) +>> Run 7 (1989-12-13-04:00:00) +>> Run 8 (1989-12-13-05:00:00) +>> Run 9 (1989-12-13-06:00:00) +>> Run 10 (1989-12-13-07:00:00) +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/Baseline/broker.remote_event_ts/recv.recv.out b/testing/btest/Baseline/broker.remote_event_ts/recv.recv.out new file mode 100644 index 0000000000..a79faa982e --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts/recv.recv.out @@ -0,0 +1,12 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +receiver got ping: my-message intended for 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-01:00:00 stamped to 1989-12-13-01:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-02:00:00 stamped to 1989-12-13-02:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-03:00:00 stamped to 1989-12-13-03:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-04:00:00 stamped to 1989-12-13-04:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-05:00:00 stamped to 1989-12-13-05:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-06:00:00 stamped to 1989-12-13-06:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-07:00:00 stamped to 1989-12-13-07:00:00 (is_remote = T) diff --git a/testing/btest/Baseline/broker.remote_event_ts/send.send.out b/testing/btest/Baseline/broker.remote_event_ts/send.send.out new file mode 100644 index 0000000000..2a0a5c48ee --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts/send.send.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +>> Run 1 (1989-12-12-22:00:00) +>> Run 2 (1989-12-12-23:00:00) +>> Run 3 (1989-12-13-00:00:00) +>> Run 4 (1989-12-13-01:00:00) +>> Run 5 (1989-12-13-02:00:00) +>> Run 6 (1989-12-13-03:00:00) +>> Run 7 (1989-12-13-04:00:00) +>> Run 8 (1989-12-13-05:00:00) +>> Run 9 (1989-12-13-06:00:00) +>> Run 10 (1989-12-13-07:00:00) +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/Baseline/broker.remote_event_ts_compat/client.output b/testing/btest/Baseline/broker.remote_event_ts_compat/client.output new file mode 100644 index 0000000000..d61cedbbe1 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts_compat/client.output @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +send event without timestamp +send event with timestamp diff --git a/testing/btest/Baseline/broker.remote_event_ts_compat/server.output b/testing/btest/Baseline/broker.remote_event_ts_compat/server.output new file mode 100644 index 0000000000..34a7c3a50b --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts_compat/server.output @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +added peer: endpoint=127.0.0.1 msg=handshake successful +got my_event(without ts) stamped to 42.0 at network time 42.0 +got my_event(with ts) stamped to 23.0 at network time 42.0 +lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/Baseline/broker.web-socket-events-metadata/client.output b/testing/btest/Baseline/broker.web-socket-events-metadata/client.output new file mode 100644 index 0000000000..5fa2aec2b5 --- /dev/null +++ b/testing/btest/Baseline/broker.web-socket-events-metadata/client.output @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +ping args ['my-message', 1] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:14.000'}]}] +ping args ['my-message', 2] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:24.000'}]}] +ping args ['my-message', 3] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:34.000'}]}] diff --git a/testing/btest/Baseline/broker.web-socket-events-metadata/server.output b/testing/btest/Baseline/broker.web-socket-events-metadata/server.output new file mode 100644 index 0000000000..92f0a20dbf --- /dev/null +++ b/testing/btest/Baseline/broker.web-socket-events-metadata/server.output @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +sender got pong: my-message, 1 network_time=1681819994.0 current_event_time=1681819995.0 +sender got pong: my-message, 2 network_time=1681820004.0 current_event_time=1681819996.0 +sender got pong: my-message, 3 network_time=1681820014.0 current_event_time=1681819997.0 +sender lost peer: endpoint=127.0.0.1 msg=lost connection to client diff --git a/testing/btest/Baseline/language.event-ts-scheduled/out b/testing/btest/Baseline/language.event-ts-scheduled/out new file mode 100644 index 0000000000..6890f28697 --- /dev/null +++ b/testing/btest/Baseline/language.event-ts-scheduled/out @@ -0,0 +1,41 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +>> Run 0 (1989-12-12-22:00:00) +<< Run 0 (1989-12-12-22:00:00) +[1989-12-12-23:00:00] Test was scheduled at 1989-12-12-22:00:00 for 1989-12-12-22:15:00 +[1989-12-12-23:00:00] Test was scheduled at 1989-12-12-22:00:00 for 1989-12-12-22:30:00 +>> Run 1 (1989-12-12-23:00:00) +<< Run 1 (1989-12-12-23:00:00) +[1989-12-13-00:00:00] Test was scheduled at 1989-12-12-23:00:00 for 1989-12-12-23:15:00 +[1989-12-13-00:00:00] Test was scheduled at 1989-12-12-23:00:00 for 1989-12-12-23:30:00 +>> Run 2 (1989-12-13-00:00:00) +<< Run 2 (1989-12-13-00:00:00) +[1989-12-13-01:00:00] Test was scheduled at 1989-12-13-00:00:00 for 1989-12-13-00:15:00 +[1989-12-13-01:00:00] Test was scheduled at 1989-12-13-00:00:00 for 1989-12-13-00:30:00 +>> Run 3 (1989-12-13-01:00:00) +<< Run 3 (1989-12-13-01:00:00) +[1989-12-13-02:00:00] Test was scheduled at 1989-12-13-01:00:00 for 1989-12-13-01:15:00 +[1989-12-13-02:00:00] Test was scheduled at 1989-12-13-01:00:00 for 1989-12-13-01:30:00 +>> Run 4 (1989-12-13-02:00:00) +<< Run 4 (1989-12-13-02:00:00) +[1989-12-13-03:00:00] Test was scheduled at 1989-12-13-02:00:00 for 1989-12-13-02:15:00 +[1989-12-13-03:00:00] Test was scheduled at 1989-12-13-02:00:00 for 1989-12-13-02:30:00 +>> Run 5 (1989-12-13-03:00:00) +<< Run 5 (1989-12-13-03:00:00) +[1989-12-13-04:00:00] Test was scheduled at 1989-12-13-03:00:00 for 1989-12-13-03:15:00 +[1989-12-13-04:00:00] Test was scheduled at 1989-12-13-03:00:00 for 1989-12-13-03:30:00 +>> Run 6 (1989-12-13-04:00:00) +<< Run 6 (1989-12-13-04:00:00) +[1989-12-13-05:00:00] Test was scheduled at 1989-12-13-04:00:00 for 1989-12-13-04:15:00 +[1989-12-13-05:00:00] Test was scheduled at 1989-12-13-04:00:00 for 1989-12-13-04:30:00 +>> Run 7 (1989-12-13-05:00:00) +<< Run 7 (1989-12-13-05:00:00) +[1989-12-13-06:00:00] Test was scheduled at 1989-12-13-05:00:00 for 1989-12-13-05:15:00 +[1989-12-13-06:00:00] Test was scheduled at 1989-12-13-05:00:00 for 1989-12-13-05:30:00 +>> Run 8 (1989-12-13-06:00:00) +<< Run 8 (1989-12-13-06:00:00) +[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-06:00:00 for 1989-12-13-06:15:00 +[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-06:00:00 for 1989-12-13-06:30:00 +>> Run 9 (1989-12-13-07:00:00) +<< Run 9 (1989-12-13-07:00:00) +[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-07:00:00 for 1989-12-13-07:15:00 +[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-07:00:00 for 1989-12-13-07:30:00 diff --git a/testing/btest/Baseline/language.event-ts/out b/testing/btest/Baseline/language.event-ts/out new file mode 100644 index 0000000000..7ced6e45a0 --- /dev/null +++ b/testing/btest/Baseline/language.event-ts/out @@ -0,0 +1,51 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +>> Run 0 (1989-12-12-22:00:00): +[1989-12-12-22:00:00] Test 4 was scheduled at 1989-12-12-22:00:00 +[1989-12-12-22:00:00] Test 3 was scheduled at 1989-12-12-22:00:00 +[1989-12-12-22:00:00] Test 2 was scheduled at 1989-12-12-22:00:00 +[1989-12-12-22:00:01] Test 1 was scheduled at 1989-12-12-22:00:00 +>> Run 1 (1989-12-12-22:00:01): +[1989-12-12-22:00:01] Test 4 was scheduled at 1989-12-12-22:00:01 +[1989-12-12-22:00:01] Test 3 was scheduled at 1989-12-12-22:00:01 +[1989-12-12-22:00:01] Test 2 was scheduled at 1989-12-12-22:00:01 +[1989-12-12-22:00:02] Test 1 was scheduled at 1989-12-12-22:00:01 +>> Run 2 (1989-12-12-22:00:02): +[1989-12-12-22:00:02] Test 4 was scheduled at 1989-12-12-22:00:02 +[1989-12-12-22:00:02] Test 3 was scheduled at 1989-12-12-22:00:02 +[1989-12-12-22:00:02] Test 2 was scheduled at 1989-12-12-22:00:02 +[1989-12-12-22:00:03] Test 1 was scheduled at 1989-12-12-22:00:02 +>> Run 3 (1989-12-12-22:00:03): +[1989-12-12-22:00:03] Test 4 was scheduled at 1989-12-12-22:00:03 +[1989-12-12-22:00:03] Test 3 was scheduled at 1989-12-12-22:00:03 +[1989-12-12-22:00:03] Test 2 was scheduled at 1989-12-12-22:00:03 +[1989-12-12-22:00:04] Test 1 was scheduled at 1989-12-12-22:00:03 +>> Run 4 (1989-12-12-22:00:04): +[1989-12-12-22:00:04] Test 4 was scheduled at 1989-12-12-22:00:04 +[1989-12-12-22:00:04] Test 3 was scheduled at 1989-12-12-22:00:04 +[1989-12-12-22:00:04] Test 2 was scheduled at 1989-12-12-22:00:04 +[1989-12-12-22:00:05] Test 1 was scheduled at 1989-12-12-22:00:04 +>> Run 5 (1989-12-12-22:00:05): +[1989-12-12-22:00:05] Test 4 was scheduled at 1989-12-12-22:00:05 +[1989-12-12-22:00:05] Test 3 was scheduled at 1989-12-12-22:00:05 +[1989-12-12-22:00:05] Test 2 was scheduled at 1989-12-12-22:00:05 +[1989-12-12-22:00:06] Test 1 was scheduled at 1989-12-12-22:00:05 +>> Run 6 (1989-12-12-22:00:06): +[1989-12-12-22:00:06] Test 4 was scheduled at 1989-12-12-22:00:06 +[1989-12-12-22:00:06] Test 3 was scheduled at 1989-12-12-22:00:06 +[1989-12-12-22:00:06] Test 2 was scheduled at 1989-12-12-22:00:06 +[1989-12-12-22:00:07] Test 1 was scheduled at 1989-12-12-22:00:06 +>> Run 7 (1989-12-12-22:00:07): +[1989-12-12-22:00:07] Test 4 was scheduled at 1989-12-12-22:00:07 +[1989-12-12-22:00:07] Test 3 was scheduled at 1989-12-12-22:00:07 +[1989-12-12-22:00:07] Test 2 was scheduled at 1989-12-12-22:00:07 +[1989-12-12-22:00:08] Test 1 was scheduled at 1989-12-12-22:00:07 +>> Run 8 (1989-12-12-22:00:08): +[1989-12-12-22:00:08] Test 4 was scheduled at 1989-12-12-22:00:08 +[1989-12-12-22:00:08] Test 3 was scheduled at 1989-12-12-22:00:08 +[1989-12-12-22:00:08] Test 2 was scheduled at 1989-12-12-22:00:08 +[1989-12-12-22:00:09] Test 1 was scheduled at 1989-12-12-22:00:08 +>> Run 9 (1989-12-12-22:00:09): +[1989-12-12-22:00:09] Test 4 was scheduled at 1989-12-12-22:00:09 +[1989-12-12-22:00:09] Test 3 was scheduled at 1989-12-12-22:00:09 +[1989-12-12-22:00:09] Test 2 was scheduled at 1989-12-12-22:00:09 +[1989-12-12-22:00:09] Test 1 was scheduled at 1989-12-12-22:00:09 diff --git a/testing/btest/Traces/ticks-dns-1hr.pcap b/testing/btest/Traces/ticks-dns-1hr.pcap new file mode 100644 index 0000000000..ea0fc4401b Binary files /dev/null and b/testing/btest/Traces/ticks-dns-1hr.pcap differ diff --git a/testing/btest/Traces/ticks-dns.pcap b/testing/btest/Traces/ticks-dns.pcap new file mode 100644 index 0000000000..41a9b3d74b Binary files /dev/null and b/testing/btest/Traces/ticks-dns.pcap differ diff --git a/testing/btest/broker/remote_event_auto_ts.zeek b/testing/btest/broker/remote_event_auto_ts.zeek new file mode 100644 index 0000000000..31bc2da59f --- /dev/null +++ b/testing/btest/broker/remote_event_auto_ts.zeek @@ -0,0 +1,82 @@ +# @TEST-GROUP: broker +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 45 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +# @TEST-START-FILE send.zeek + +redef exit_only_after_terminate = T; + +global runs = 0; +global ping: event(msg: string, intended_ts: time); + +event zeek_init() + { + suspend_processing(); + Broker::subscribe("zeek/event/my_topic"); + Broker::auto_publish("zeek/event/my_topic", ping); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + continue_processing(); + } + +event new_connection(c: connection) + { + print fmt(">> Run %s (%D)", ++runs, network_time()); + + event ping("my-message-a", network_time()); + schedule 30 mins { ping("my-message-c", network_time() + 30 mins) }; + schedule 15 mins { ping("my-message-b", network_time() + 15 mins) }; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + terminate(); + } + +# @TEST-END-FILE + + +# @TEST-START-FILE recv.zeek + +redef exit_only_after_terminate = T; + +global msg_count = 0; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, intended_ts: time) &is_used + { + if ( ++msg_count >= 10 ) + { + terminate(); + return; + } + + print fmt("receiver got ping: %s intended for %D stamped to %D (is_remote = %s)", + msg, intended_ts, current_event_time(), is_remote_event()); + } + +# @TEST-END-FILE diff --git a/testing/btest/broker/remote_event_ts.zeek b/testing/btest/broker/remote_event_ts.zeek new file mode 100644 index 0000000000..e4f2f508d1 --- /dev/null +++ b/testing/btest/broker/remote_event_ts.zeek @@ -0,0 +1,77 @@ +# @TEST-GROUP: broker +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 45 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +# @TEST-START-FILE send.zeek + +redef exit_only_after_terminate = T; + +global runs = 0; +global ping: event(msg: string, intended_ts: time); + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + suspend_processing(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + continue_processing(); + } + +event new_connection(c: connection) + { + print fmt(">> Run %s (%D)", ++runs, network_time()); + + local e = Broker::make_event(ping, "my-message", network_time()); + Broker::publish("zeek/event/my_topic", e); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + terminate(); + } + +# @TEST-END-FILE + + +# @TEST-START-FILE recv.zeek + +redef exit_only_after_terminate = T; + +global msg_count = 0; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, intended_ts: time) + { + print fmt("receiver got ping: %s intended for %D stamped to %D (is_remote = %s)", + msg, intended_ts, current_event_time(), is_remote_event()); + + if ( ++msg_count >= 10 ) + terminate(); + } + +# @TEST-END-FILE diff --git a/testing/btest/broker/remote_event_ts_compat.zeek b/testing/btest/broker/remote_event_ts_compat.zeek new file mode 100644 index 0000000000..3bf5fa6f7d --- /dev/null +++ b/testing/btest/broker/remote_event_ts_compat.zeek @@ -0,0 +1,92 @@ +# @TEST-DOC: Test compatibility with peers sending events without timestamps. +# +# @TEST-GROUP: broker +# @TEST-PORT: BROKER_PORT +# +# @TEST-REQUIRES: python3 -V +# @TEST-REQUIRES: TOPIC=/zeek/my_topic python3 client.py check +# +# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run server "zeek %INPUT >output" +# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run client "python3 ../client.py >output" +# +# @TEST-EXEC: btest-bg-wait 45 +# @TEST-EXEC: btest-diff server/output +# @TEST-EXEC: btest-diff client/output + +redef exit_only_after_terminate = T; +redef allow_network_time_forward = F; + +event zeek_init() + { + Broker::subscribe(getenv("TOPIC")); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + set_network_time(double_to_time(42.0)); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + terminate(); + } + +event my_event(msg: string) &is_used + { + print fmt("got my_event(%s) stamped to %s at network time %s", + msg, current_event_time(), network_time()); + } + + +@TEST-START-FILE client.py +""" +Python script sending timestamped and non-timestamped event to TOPIC +""" +import datetime +import os +import sys + +# Prep the PYTHONPATH for the build directory. +broker_path = os.path.join(os.environ["BUILD"], "auxil", "broker", "python") +sys.path.insert(0, broker_path) + +import broker + +# 1024/tcp +broker_port = int(os.environ["BROKER_PORT"].split("/")[0]) +broker_topic = os.environ["TOPIC"] + +# We were able to import broker and parse the broker_port, should be good. +if len(sys.argv) > 1 and sys.argv[1] == "check": + sys.exit(0) + +# Setup endpoint and connect to Zeek. +with broker.Endpoint() as ep, \ + ep.make_status_subscriber(True) as ss: + + ep.peer("127.0.0.1", broker_port) + st = ss.get(2) + if not (st[0].code() == broker.SC.EndpointDiscovered and + st[1].code() == broker.SC.PeerAdded): + print("could not connect") + exit(0) + + # Send events and close connection + print("send event without timestamp") + my_event = broker.zeek.Event("my_event", "without ts") + ep.publish(broker_topic, my_event) + + print("send event with timestamp") + ts = datetime.datetime.fromtimestamp(23.0, broker.utc) + metadata = { + broker.zeek.Metadata.NETWORK_TIMESTAMP: ts, + } + my_event = broker.zeek.Event("my_event", "with ts", metadata=metadata) + ep.publish(broker_topic, my_event) + + ep.shutdown() + +@TEST-END-FILE diff --git a/testing/btest/broker/web-socket-events-metadata.zeek b/testing/btest/broker/web-socket-events-metadata.zeek new file mode 100644 index 0000000000..52b1c8ab3b --- /dev/null +++ b/testing/btest/broker/web-socket-events-metadata.zeek @@ -0,0 +1,148 @@ +# @TEST-GROUP: broker +# +# This test requires the websockets module, available via +# "pip install websockets". +# @TEST-REQUIRES: python3 -c 'import websockets' +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run server "zeek -b %INPUT >output" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >output" +# +# @TEST-EXEC: btest-bg-wait 5 +# @TEST-EXEC: btest-diff client/output +# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff server/output + +redef allow_network_time_forward = F; +redef exit_only_after_terminate = T; +redef Broker::disable_ssl = T; + +global event_count = 0; + +global ping: event(msg: string, c: count); + +event zeek_init() + { + # Tue 18 Apr 2023 12:13:14 PM UTC + set_network_time(double_to_time(1681819994.0)); + Broker::subscribe("/zeek/event/my_topic"); + Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +function send_event() + { + ++event_count; + local e = Broker::make_event(ping, "my-message", event_count); + Broker::publish("/zeek/event/my_topic", e); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + send_event(); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + terminate(); + } + +event pong(msg: string, n: count) &is_used + { + print fmt("sender got pong: %s, %s network_time=%s current_event_time=%s", + msg, n, network_time(), current_event_time()); + set_network_time(network_time() + 10sec); + send_event(); + } + + +@TEST-START-FILE client.py +import asyncio, datetime, websockets, os, time, json, sys + +ws_port = os.environ['BROKER_PORT'].split('/')[0] +ws_url = 'ws://localhost:%s/v1/messages/json' % ws_port +topic = '"/zeek/event/my_topic"' + +def broker_value(type, val): + return { + '@data-type': type, + 'data': val + } + +async def do_run(): + # Try up to 30 times. + connected = False + for i in range(30): + try: + ws = await websockets.connect(ws_url) + connected = True + + # send filter and wait for ack + await ws.send('[%s]' % topic) + ack_json = await ws.recv() + ack = json.loads(ack_json) + if not 'type' in ack or ack['type'] != 'ack': + print('*** unexpected ACK from server:') + print(ack_json) + sys.exit() + except Exception as e: + if not connected: + print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr) + await asyncio.sleep(1) + continue + else: + print('exception: %s' % e, file=sys.stderr) + sys.exit() + + for round in range(3): + # wait for ping + msg = await ws.recv() + msg = json.loads(msg) + if not 'type' in msg or msg['type'] != 'data-message': + print("unexpected type", msg) + continue + ping = msg['data'][2]['data'] + if len(ping) < 3: + print("no metadata on event") + continue + + name = ping[0]['data'] + args = [x['data'] for x in ping[1]['data']] + metadata = ping[2]['data'] + print(name, "args", args, "metadata", metadata) + + # send pong + dt = datetime.datetime.utcfromtimestamp(1681819994 + args[1]) + ts_str = dt.isoformat('T', 'milliseconds') + pong = [ + broker_value('string', 'pong'), + broker_value('vector', [ + broker_value('string', args[0]), + broker_value('count', args[1]), + ]), + broker_value('vector', [ + broker_value('vector', [ + broker_value('count', 1), # network_timestamp + broker_value('timestamp', ts_str), + ]), + ]), + ] + + ev = [broker_value('count', 1), broker_value('count', 1), broker_value('vector', pong)] + msg = { + 'type': 'data-message', + 'topic': '/zeek/event/my_topic', + '@data-type': 'vector', 'data': ev + } + + msg = json.dumps(msg) + await ws.send(msg) + + await ws.close() + sys.exit() + +loop = asyncio.get_event_loop() +loop.run_until_complete(do_run()) + +@TEST-END-FILE diff --git a/testing/btest/broker/web-socket-events.zeek b/testing/btest/broker/web-socket-events.zeek index b94ef9c362..9002b803b4 100644 --- a/testing/btest/broker/web-socket-events.zeek +++ b/testing/btest/broker/web-socket-events.zeek @@ -84,7 +84,7 @@ async def do_run(): except Exception as e: if not connected: print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr) - time.sleep(1) + await asyncio.sleep(1) continue else: print('exception: %s' % e, file=sys.stderr) diff --git a/testing/btest/language/event-ts-scheduled.zeek b/testing/btest/language/event-ts-scheduled.zeek new file mode 100644 index 0000000000..bc0088845f --- /dev/null +++ b/testing/btest/language/event-ts-scheduled.zeek @@ -0,0 +1,20 @@ +# @TEST-EXEC: zeek -b -r $TRACES/ticks-dns-1hr.pcap %INPUT > out +# @TEST-EXEC: btest-diff out + +global runs = 0; + +event test(schedule_time: time) + { + print fmt("[%D] Test was scheduled at %D for %D", network_time(), + schedule_time, current_event_time()); + } + +event new_connection(c: connection) + { + local nt = network_time(); + print fmt(">> Run %s (%D)", runs, nt); + schedule 30 mins { test(nt) }; + schedule 15 mins { test(nt) }; + print fmt("<< Run %s (%D)", runs, nt); + ++runs; + } diff --git a/testing/btest/language/event-ts.zeek b/testing/btest/language/event-ts.zeek new file mode 100644 index 0000000000..f35c3e5994 --- /dev/null +++ b/testing/btest/language/event-ts.zeek @@ -0,0 +1,28 @@ +# @TEST-EXEC: zeek -b -r $TRACES/ticks-dns.pcap %INPUT > out +# @TEST-EXEC: btest-diff out + +# Note: We use a PCAP with DNS queries only so that we have a single packet per +# time step. Thus the run loop will be executed only once per time step. + +global runs = -1; + +event test(depth: count) + { + if ( depth == 0 ) + return; + + print fmt("[%D] Test %s was scheduled at %D", network_time(), depth, current_event_time()); + event test(--depth); + } + +event new_connection(c: connection) + { + print fmt(">> Run %s (%D):", ++runs, network_time()); + # Descend into recursion to enqueue events until we add an event that will + # be handled in the next run loop iteration, i.e. at a different timestamp + # than it was enqueued. Use four levels of recursion as every drain of the + # event queue handles two layers and the event queue is drained two times. + # First after processing a packet and second in the run loop. Finally, we + # expect an event so that network_time() > current_event_time(). + event test(4); + }