diff --git a/src/Event.cc b/src/Event.cc index 3a2426c121..d80787d325 100644 --- a/src/Event.cc +++ b/src/Event.cc @@ -53,7 +53,7 @@ void Event::Dispatch(bool no_remote) try { - handler->Call(&args, no_remote); + handler->Call(&args, no_remote, timestamp); } catch ( InterpreterException& e ) diff --git a/src/EventHandler.cc b/src/EventHandler.cc index e15234d750..79f12d32cb 100644 --- a/src/EventHandler.cc +++ b/src/EventHandler.cc @@ -52,7 +52,7 @@ void EventHandler::SetFunc(FuncPtr f) local = std::move(f); } -void EventHandler::Call(Args* vl, bool no_remote) +void EventHandler::Call(Args* vl, bool no_remote, double ts) { if ( ! call_count ) { @@ -100,10 +100,10 @@ void EventHandler::Call(Args* vl, bool no_remote) ++it; if ( it != auto_publish.end() ) - broker_mgr->PublishEvent(topic, Name(), xs); + broker_mgr->PublishEvent(topic, Name(), xs, ts); else { - broker_mgr->PublishEvent(topic, Name(), std::move(xs)); + broker_mgr->PublishEvent(topic, Name(), std::move(xs), ts); break; } } diff --git a/src/EventHandler.h b/src/EventHandler.h index 258682adfc..89d5d705fc 100644 --- a/src/EventHandler.h +++ b/src/EventHandler.h @@ -14,6 +14,11 @@ namespace zeek { +namespace run_state + { +extern double network_time; + } // namespace run_state + class Func; using FuncPtr = IntrusivePtr; @@ -34,7 +39,7 @@ public: void AutoUnpublish(const std::string& topic) { auto_publish.erase(topic); } - void Call(zeek::Args* vl, bool no_remote = false); + void Call(zeek::Args* vl, bool no_remote = false, double ts = run_state::network_time); // Returns true if there is at least one local or remote handler. explicit operator bool() const; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 54eb2bac82..0c47096c34 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -643,7 +643,7 @@ std::string Manager::NodeID() const return to_string(bstate->endpoint.node_id()); } -bool Manager::PublishEvent(string topic, std::string name, broker::vector args) +bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { if ( bstate->endpoint.is_shutdown() ) return true; @@ -652,7 +652,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args) return true; DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); - broker::zeek::Event ev(std::move(name), std::move(args), run_state::network_time); + broker::zeek::Event ev(std::move(name), std::move(args), broker::to_timestamp(ts)); bstate->endpoint.publish(std::move(topic), ev.move_data()); ++statistics.num_events_outgoing; return true; @@ -681,7 +681,10 @@ bool Manager::PublishEvent(string topic, RecordVal* args) xs.emplace_back(data_val->data); } - return PublishEvent(std::move(topic), event_name, std::move(xs)); + // At this point we come from script-land. This means that publishing of the event was + // explicitly triggered. Hence, the timestamp is set to the current network time. This also + // means that timestamping cannot be manipulated from script-land for now. + return PublishEvent(std::move(topic), event_name, std::move(xs), run_state::network_time); } bool Manager::PublishIdentifier(std::string topic, std::string id) diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 6e514990cb..eb3d050b9b 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -183,9 +183,11 @@ public: * of this topic name. * @param name the name of the event * @param args the event's arguments + * @param ts the timestamp the event is intended to be executed * @return true if the message is sent successfully. */ - bool PublishEvent(std::string topic, std::string name, broker::vector args); + bool PublishEvent(std::string topic, std::string name, broker::vector args, + double ts = run_state::network_time); /** * Send an event to any interested peers. @@ -193,7 +195,8 @@ public: * Peers advertise interest by registering a subscription to some prefix * of this topic name. * @param ev the event and its arguments to send to peers, in the form of - * a Broker::Event record type. + * a Broker::Event record type. The timestamp for the event is set to the + * current network time. * @return true if the message is sent successfully. */ bool PublishEvent(std::string topic, RecordVal* ev); diff --git a/testing/btest/Baseline/broker.remote_event_auto_ts/recv.recv.out b/testing/btest/Baseline/broker.remote_event_auto_ts/recv.recv.out new file mode 100644 index 0000000000..d9ad0dd05e --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_auto_ts/recv.recv.out @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +receiver got ping: my-message-a intended for 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T) +receiver got ping: my-message-b intended for 1989-12-12-22:15:00 stamped to 1989-12-12-22:15:00 (is_remote = T) +receiver got ping: my-message-c intended for 1989-12-12-22:30:00 stamped to 1989-12-12-22:30:00 (is_remote = T) +receiver got ping: my-message-a intended for 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T) +receiver got ping: my-message-b intended for 1989-12-12-23:15:00 stamped to 1989-12-12-23:15:00 (is_remote = T) +receiver got ping: my-message-c intended for 1989-12-12-23:30:00 stamped to 1989-12-12-23:30:00 (is_remote = T) +receiver got ping: my-message-a intended for 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T) +receiver got ping: my-message-b intended for 1989-12-13-00:15:00 stamped to 1989-12-13-00:15:00 (is_remote = T) +receiver got ping: my-message-c intended for 1989-12-13-00:30:00 stamped to 1989-12-13-00:30:00 (is_remote = T) diff --git a/testing/btest/Baseline/broker.remote_event_auto_ts/send.send.out b/testing/btest/Baseline/broker.remote_event_auto_ts/send.send.out new file mode 100644 index 0000000000..2a0a5c48ee --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_auto_ts/send.send.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +>> Run 1 (1989-12-12-22:00:00) +>> Run 2 (1989-12-12-23:00:00) +>> Run 3 (1989-12-13-00:00:00) +>> Run 4 (1989-12-13-01:00:00) +>> Run 5 (1989-12-13-02:00:00) +>> Run 6 (1989-12-13-03:00:00) +>> Run 7 (1989-12-13-04:00:00) +>> Run 8 (1989-12-13-05:00:00) +>> Run 9 (1989-12-13-06:00:00) +>> Run 10 (1989-12-13-07:00:00) +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/broker/remote_event_auto_ts.zeek b/testing/btest/broker/remote_event_auto_ts.zeek new file mode 100644 index 0000000000..31bc2da59f --- /dev/null +++ b/testing/btest/broker/remote_event_auto_ts.zeek @@ -0,0 +1,82 @@ +# @TEST-GROUP: broker +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 45 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +# @TEST-START-FILE send.zeek + +redef exit_only_after_terminate = T; + +global runs = 0; +global ping: event(msg: string, intended_ts: time); + +event zeek_init() + { + suspend_processing(); + Broker::subscribe("zeek/event/my_topic"); + Broker::auto_publish("zeek/event/my_topic", ping); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + continue_processing(); + } + +event new_connection(c: connection) + { + print fmt(">> Run %s (%D)", ++runs, network_time()); + + event ping("my-message-a", network_time()); + schedule 30 mins { ping("my-message-c", network_time() + 30 mins) }; + schedule 15 mins { ping("my-message-b", network_time() + 15 mins) }; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + terminate(); + } + +# @TEST-END-FILE + + +# @TEST-START-FILE recv.zeek + +redef exit_only_after_terminate = T; + +global msg_count = 0; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, intended_ts: time) &is_used + { + if ( ++msg_count >= 10 ) + { + terminate(); + return; + } + + print fmt("receiver got ping: %s intended for %D stamped to %D (is_remote = %s)", + msg, intended_ts, current_event_time(), is_remote_event()); + } + +# @TEST-END-FILE