diff --git a/auxil/broker b/auxil/broker index 000834f60a..3dd913bacf 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 000834f60ab7540041c431a3657c23c7476e368d +Subproject commit 3dd913bacfc394f8108bf8306be0a60c253a3d13 diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index fda653364c..54eb2bac82 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -652,7 +652,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args) return true; DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); - broker::zeek::Event ev(std::move(name), std::move(args)); + broker::zeek::Event ev(std::move(name), std::move(args), run_state::network_time); bstate->endpoint.publish(std::move(topic), ev.move_data()); ++statistics.num_events_outgoing; return true; @@ -1386,8 +1386,15 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) auto name = std::move(ev.name()); auto args = std::move(ev.args()); + double ts; - DBG_LOG(DBG_BROKER, "Process event: %s %s", name.data(), RenderMessage(args).data()); + if ( auto ev_ts = ev.ts() ) + broker::convert(*ev_ts, ts); + else + // Default to current network time, if the received event did not contain a timestamp. + ts = run_state::network_time; + + DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", name.data(), ts, RenderMessage(args).data()); ++statistics.num_events_incoming; auto handler = event_registry->Lookup(name); @@ -1469,7 +1476,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) } if ( vl.size() == args.size() ) - event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER); + event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER, 0, nullptr, ts); } bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc) diff --git a/testing/btest/Baseline/broker.remote_event_ts/recv.recv.out b/testing/btest/Baseline/broker.remote_event_ts/recv.recv.out new file mode 100644 index 0000000000..a79faa982e --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts/recv.recv.out @@ -0,0 +1,12 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +receiver got ping: my-message intended for 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-01:00:00 stamped to 1989-12-13-01:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-02:00:00 stamped to 1989-12-13-02:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-03:00:00 stamped to 1989-12-13-03:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-04:00:00 stamped to 1989-12-13-04:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-05:00:00 stamped to 1989-12-13-05:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-06:00:00 stamped to 1989-12-13-06:00:00 (is_remote = T) +receiver got ping: my-message intended for 1989-12-13-07:00:00 stamped to 1989-12-13-07:00:00 (is_remote = T) diff --git a/testing/btest/Baseline/broker.remote_event_ts/send.send.out b/testing/btest/Baseline/broker.remote_event_ts/send.send.out new file mode 100644 index 0000000000..2a0a5c48ee --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts/send.send.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +>> Run 1 (1989-12-12-22:00:00) +>> Run 2 (1989-12-12-23:00:00) +>> Run 3 (1989-12-13-00:00:00) +>> Run 4 (1989-12-13-01:00:00) +>> Run 5 (1989-12-13-02:00:00) +>> Run 6 (1989-12-13-03:00:00) +>> Run 7 (1989-12-13-04:00:00) +>> Run 8 (1989-12-13-05:00:00) +>> Run 9 (1989-12-13-06:00:00) +>> Run 10 (1989-12-13-07:00:00) +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/broker/remote_event_ts.zeek b/testing/btest/broker/remote_event_ts.zeek new file mode 100644 index 0000000000..e4f2f508d1 --- /dev/null +++ b/testing/btest/broker/remote_event_ts.zeek @@ -0,0 +1,77 @@ +# @TEST-GROUP: broker +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 45 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +# @TEST-START-FILE send.zeek + +redef exit_only_after_terminate = T; + +global runs = 0; +global ping: event(msg: string, intended_ts: time); + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + suspend_processing(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + continue_processing(); + } + +event new_connection(c: connection) + { + print fmt(">> Run %s (%D)", ++runs, network_time()); + + local e = Broker::make_event(ping, "my-message", network_time()); + Broker::publish("zeek/event/my_topic", e); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + terminate(); + } + +# @TEST-END-FILE + + +# @TEST-START-FILE recv.zeek + +redef exit_only_after_terminate = T; + +global msg_count = 0; + +event zeek_init() + { + Broker::subscribe("zeek/event/my_topic"); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, intended_ts: time) + { + print fmt("receiver got ping: %s intended for %D stamped to %D (is_remote = %s)", + msg, intended_ts, current_event_time(), is_remote_event()); + + if ( ++msg_count >= 10 ) + terminate(); + } + +# @TEST-END-FILE