mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Add timestamps to manually published broker events.
This commit is contained in:
parent
ae152f1777
commit
c12640b048
5 changed files with 113 additions and 4 deletions
|
@ -1 +1 @@
|
||||||
Subproject commit 000834f60ab7540041c431a3657c23c7476e368d
|
Subproject commit 3dd913bacfc394f8108bf8306be0a60c253a3d13
|
|
@ -652,7 +652,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str());
|
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str());
|
||||||
broker::zeek::Event ev(std::move(name), std::move(args));
|
broker::zeek::Event ev(std::move(name), std::move(args), run_state::network_time);
|
||||||
bstate->endpoint.publish(std::move(topic), ev.move_data());
|
bstate->endpoint.publish(std::move(topic), ev.move_data());
|
||||||
++statistics.num_events_outgoing;
|
++statistics.num_events_outgoing;
|
||||||
return true;
|
return true;
|
||||||
|
@ -1386,8 +1386,15 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
|
||||||
|
|
||||||
auto name = std::move(ev.name());
|
auto name = std::move(ev.name());
|
||||||
auto args = std::move(ev.args());
|
auto args = std::move(ev.args());
|
||||||
|
double ts;
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Process event: %s %s", name.data(), RenderMessage(args).data());
|
if ( auto ev_ts = ev.ts() )
|
||||||
|
broker::convert(*ev_ts, ts);
|
||||||
|
else
|
||||||
|
// Default to current network time, if the received event did not contain a timestamp.
|
||||||
|
ts = run_state::network_time;
|
||||||
|
|
||||||
|
DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", name.data(), ts, RenderMessage(args).data());
|
||||||
++statistics.num_events_incoming;
|
++statistics.num_events_incoming;
|
||||||
auto handler = event_registry->Lookup(name);
|
auto handler = event_registry->Lookup(name);
|
||||||
|
|
||||||
|
@ -1469,7 +1476,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( vl.size() == args.size() )
|
if ( vl.size() == args.size() )
|
||||||
event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER);
|
event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER, 0, nullptr, ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
||||||
|
|
12
testing/btest/Baseline/broker.remote_event_ts/recv.recv.out
Normal file
12
testing/btest/Baseline/broker.remote_event_ts/recv.recv.out
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
|
||||||
|
receiver got ping: my-message intended for 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-01:00:00 stamped to 1989-12-13-01:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-02:00:00 stamped to 1989-12-13-02:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-03:00:00 stamped to 1989-12-13-03:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-04:00:00 stamped to 1989-12-13-04:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-05:00:00 stamped to 1989-12-13-05:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-06:00:00 stamped to 1989-12-13-06:00:00 (is_remote = T)
|
||||||
|
receiver got ping: my-message intended for 1989-12-13-07:00:00 stamped to 1989-12-13-07:00:00 (is_remote = T)
|
13
testing/btest/Baseline/broker.remote_event_ts/send.send.out
Normal file
13
testing/btest/Baseline/broker.remote_event_ts/send.send.out
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
sender added peer: endpoint=127.0.0.1 msg=handshake successful
|
||||||
|
>> Run 1 (1989-12-12-22:00:00)
|
||||||
|
>> Run 2 (1989-12-12-23:00:00)
|
||||||
|
>> Run 3 (1989-12-13-00:00:00)
|
||||||
|
>> Run 4 (1989-12-13-01:00:00)
|
||||||
|
>> Run 5 (1989-12-13-02:00:00)
|
||||||
|
>> Run 6 (1989-12-13-03:00:00)
|
||||||
|
>> Run 7 (1989-12-13-04:00:00)
|
||||||
|
>> Run 8 (1989-12-13-05:00:00)
|
||||||
|
>> Run 9 (1989-12-13-06:00:00)
|
||||||
|
>> Run 10 (1989-12-13-07:00:00)
|
||||||
|
sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer
|
77
testing/btest/broker/remote_event_ts.zeek
Normal file
77
testing/btest/broker/remote_event_ts.zeek
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
# @TEST-GROUP: broker
|
||||||
|
#
|
||||||
|
# @TEST-PORT: BROKER_PORT
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"
|
||||||
|
# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out"
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-wait 45
|
||||||
|
# @TEST-EXEC: btest-diff recv/recv.out
|
||||||
|
# @TEST-EXEC: btest-diff send/send.out
|
||||||
|
|
||||||
|
# @TEST-START-FILE send.zeek
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
global runs = 0;
|
||||||
|
global ping: event(msg: string, intended_ts: time);
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Broker::subscribe("zeek/event/my_topic");
|
||||||
|
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
suspend_processing();
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("sender added peer: endpoint=%s msg=%s",
|
||||||
|
endpoint$network$address, msg);
|
||||||
|
continue_processing();
|
||||||
|
}
|
||||||
|
|
||||||
|
event new_connection(c: connection)
|
||||||
|
{
|
||||||
|
print fmt(">> Run %s (%D)", ++runs, network_time());
|
||||||
|
|
||||||
|
local e = Broker::make_event(ping, "my-message", network_time());
|
||||||
|
Broker::publish("zeek/event/my_topic", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("sender lost peer: endpoint=%s msg=%s",
|
||||||
|
endpoint$network$address, msg);
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
|
||||||
|
# @TEST-START-FILE recv.zeek
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
global msg_count = 0;
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Broker::subscribe("zeek/event/my_topic");
|
||||||
|
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
event ping(msg: string, intended_ts: time)
|
||||||
|
{
|
||||||
|
print fmt("receiver got ping: %s intended for %D stamped to %D (is_remote = %s)",
|
||||||
|
msg, intended_ts, current_event_time(), is_remote_event());
|
||||||
|
|
||||||
|
if ( ++msg_count >= 10 )
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
# @TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue