Merge remote-tracking branch 'jgras/topic/jgras/event-ts'

* jgras/topic/jgras/event-ts:
  Add compatibility tests for timestamped events.
  Add timestamps to auto published broker events.
  Add timestamps to manually published broker events.
  Annotate scheduled events with intended timestamp.
  Add timestamp to events.

One timestamp to ts rename during the merge.
This commit is contained in:
Arne Welzel 2023-05-11 14:00:31 +02:00
commit 11776d60e0
31 changed files with 710 additions and 26 deletions

15
CHANGES
View file

@ -1,3 +1,18 @@
6.0.0-dev.551 | 2023-05-11 14:00:31 +0200
* Add compatibility tests for timestamped events. (Jan Grashoefer, Corelight)
This adds compatibility tests for receiving non-timestamped events as
well as providing timestamps via broker websockets.
* Add timestamps to auto published broker events. (Jan Grashoefer, Corelight)
* Add timestamps to manually published broker events. (Jan Grashoefer, Corelight)
* Annotate scheduled events with intended timestamp. (Jan Grashoefer, Corelight)
* Add timestamp to events. (Jan Grashoefer, Corelight)
6.0.0-dev.544 | 2023-05-11 00:01:20 +0200 6.0.0-dev.544 | 2023-05-11 00:01:20 +0200
* GH-3028: policy/community-id: Do not use new_connection() (Arne Welzel, Corelight) * GH-3028: policy/community-id: Do not use new_connection() (Arne Welzel, Corelight)

13
NEWS
View file

@ -91,6 +91,19 @@ New Functionality
To disable this functionality, pass ``--disable-javascript`` to configure. To disable this functionality, pass ``--disable-javascript`` to configure.
- Zeek events now hold network timestamps. For scheduled events, the timestamp
represents the network time for which the event was scheduled for, otherwise
it is the network time at event creation. A new bif ``current_event_time()``
allows to retrieve the current event's network timestamp within the script-layer.
When Zeek sends events via Broker to other nodes in a cluster, an event's network
timestamp is attached to the Broker messages. On a receiving Zeek node executing a
handler for a remote event, ``current_event_time()`` returns the network time of
the sending node at the time the event was created.
The Broker level implementation allows to exchange arbitrary event metadata, but
Zeek's script and C++ APIs currently only expose network timestamp functionality.
- A new bif ``from_json()`` can be used to parse JSON strings into records. - A new bif ``from_json()`` can be used to parse JSON strings into records.
type A: record { a: addr; }; type A: record { a: addr; };

View file

@ -1 +1 @@
6.0.0-dev.544 6.0.0-dev.551

@ -1 +1 @@
Subproject commit 000834f60ab7540041c431a3657c23c7476e368d Subproject commit 3dd913bacfc394f8108bf8306be0a60c253a3d13

View file

@ -7,7 +7,6 @@
#include "zeek/Desc.h" #include "zeek/Desc.h"
#include "zeek/Func.h" #include "zeek/Func.h"
#include "zeek/NetVar.h" #include "zeek/NetVar.h"
#include "zeek/RunState.h"
#include "zeek/Trigger.h" #include "zeek/Trigger.h"
#include "zeek/Val.h" #include "zeek/Val.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
@ -19,10 +18,10 @@ zeek::EventMgr zeek::event_mgr;
namespace zeek namespace zeek
{ {
Event::Event(EventHandlerPtr arg_handler, zeek::Args arg_args, util::detail::SourceID arg_src, Event::Event(const EventHandlerPtr& arg_handler, zeek::Args arg_args,
analyzer::ID arg_aid, Obj* arg_obj) util::detail::SourceID arg_src, analyzer::ID arg_aid, Obj* arg_obj, double arg_ts)
: handler(arg_handler), args(std::move(arg_args)), src(arg_src), aid(arg_aid), obj(arg_obj), : handler(arg_handler), args(std::move(arg_args)), src(arg_src), aid(arg_aid), ts(arg_ts),
next_event(nullptr) obj(arg_obj), next_event(nullptr)
{ {
if ( obj ) if ( obj )
Ref(obj); Ref(obj);
@ -53,7 +52,7 @@ void Event::Dispatch(bool no_remote)
try try
{ {
handler->Call(&args, no_remote); handler->Call(&args, no_remote, ts);
} }
catch ( InterpreterException& e ) catch ( InterpreterException& e )
@ -74,6 +73,7 @@ EventMgr::EventMgr()
head = tail = nullptr; head = tail = nullptr;
current_src = util::detail::SOURCE_LOCAL; current_src = util::detail::SOURCE_LOCAL;
current_aid = 0; current_aid = 0;
current_ts = 0;
src_val = nullptr; src_val = nullptr;
draining = false; draining = false;
} }
@ -91,9 +91,9 @@ EventMgr::~EventMgr()
} }
void EventMgr::Enqueue(const EventHandlerPtr& h, Args vl, util::detail::SourceID src, void EventMgr::Enqueue(const EventHandlerPtr& h, Args vl, util::detail::SourceID src,
analyzer::ID aid, Obj* obj) analyzer::ID aid, Obj* obj, double ts)
{ {
QueueEvent(new Event(h, std::move(vl), src, aid, obj)); QueueEvent(new Event(h, std::move(vl), src, aid, obj, ts));
} }
void EventMgr::QueueEvent(Event* event) void EventMgr::QueueEvent(Event* event)
@ -120,6 +120,8 @@ void EventMgr::QueueEvent(Event* event)
void EventMgr::Dispatch(Event* event, bool no_remote) void EventMgr::Dispatch(Event* event, bool no_remote)
{ {
current_src = event->Source(); current_src = event->Source();
current_aid = event->Analyzer();
current_ts = event->Time();
event->Dispatch(no_remote); event->Dispatch(no_remote);
Unref(event); Unref(event);
} }
@ -154,6 +156,7 @@ void EventMgr::Drain()
current_src = current->Source(); current_src = current->Source();
current_aid = current->Analyzer(); current_aid = current->Analyzer();
current_ts = current->Time();
current->Dispatch(); current->Dispatch();
Unref(current); Unref(current);

View file

@ -15,14 +15,19 @@
namespace zeek namespace zeek
{ {
namespace run_state
{
extern double network_time;
} // namespace run_state
class EventMgr; class EventMgr;
class Event final : public Obj class Event final : public Obj
{ {
public: public:
Event(EventHandlerPtr handler, zeek::Args args, Event(const EventHandlerPtr& handler, zeek::Args args,
util::detail::SourceID src = util::detail::SOURCE_LOCAL, analyzer::ID aid = 0, util::detail::SourceID src = util::detail::SOURCE_LOCAL, analyzer::ID aid = 0,
Obj* obj = nullptr); Obj* obj = nullptr, double ts = run_state::network_time);
void SetNext(Event* n) { next_event = n; } void SetNext(Event* n) { next_event = n; }
Event* NextEvent() const { return next_event; } Event* NextEvent() const { return next_event; }
@ -31,6 +36,7 @@ public:
analyzer::ID Analyzer() const { return aid; } analyzer::ID Analyzer() const { return aid; }
EventHandlerPtr Handler() const { return handler; } EventHandlerPtr Handler() const { return handler; }
const zeek::Args& Args() const { return args; } const zeek::Args& Args() const { return args; }
double Time() const { return ts; }
void Describe(ODesc* d) const override; void Describe(ODesc* d) const override;
@ -45,6 +51,7 @@ protected:
zeek::Args args; zeek::Args args;
util::detail::SourceID src; util::detail::SourceID src;
analyzer::ID aid; analyzer::ID aid;
double ts;
Obj* obj; Obj* obj;
Event* next_event; Event* next_event;
}; };
@ -66,10 +73,12 @@ public:
* @param aid identifies the protocol analyzer generating the event. * @param aid identifies the protocol analyzer generating the event.
* @param obj an arbitrary object to use as a "cookie" or just hold a * @param obj an arbitrary object to use as a "cookie" or just hold a
* reference to until dispatching the event. * reference to until dispatching the event.
* @param ts timestamp at which the event is intended to be executed
* (defaults to current network time).
*/ */
void Enqueue(const EventHandlerPtr& h, zeek::Args vl, void Enqueue(const EventHandlerPtr& h, zeek::Args vl,
util::detail::SourceID src = util::detail::SOURCE_LOCAL, analyzer::ID aid = 0, util::detail::SourceID src = util::detail::SOURCE_LOCAL, analyzer::ID aid = 0,
Obj* obj = nullptr); Obj* obj = nullptr, double ts = run_state::network_time);
/** /**
* A version of Enqueue() taking a variable number of arguments. * A version of Enqueue() taking a variable number of arguments.
@ -95,6 +104,11 @@ public:
// non-analyzer event. // non-analyzer event.
analyzer::ID CurrentAnalyzer() const { return current_aid; } analyzer::ID CurrentAnalyzer() const { return current_aid; }
// Returns the timestamp of the last raised event. The timestamp reflects the network time
// the event was intended to be executed. For scheduled events, this is the time the event
// was scheduled to. For any other event, this is the time when the event was created.
double CurrentEventTime() const { return current_ts; }
int Size() const { return num_events_queued - num_events_dispatched; } int Size() const { return num_events_queued - num_events_dispatched; }
void Describe(ODesc* d) const override; void Describe(ODesc* d) const override;
@ -114,6 +128,7 @@ protected:
Event* tail; Event* tail;
util::detail::SourceID current_src; util::detail::SourceID current_src;
analyzer::ID current_aid; analyzer::ID current_aid;
double current_ts;
RecordVal* src_val; RecordVal* src_val;
bool draining; bool draining;
detail::Flare queue_flare; detail::Flare queue_flare;

View file

@ -52,7 +52,7 @@ void EventHandler::SetFunc(FuncPtr f)
local = std::move(f); local = std::move(f);
} }
void EventHandler::Call(Args* vl, bool no_remote) void EventHandler::Call(Args* vl, bool no_remote, double ts)
{ {
if ( ! call_count ) if ( ! call_count )
{ {
@ -100,10 +100,10 @@ void EventHandler::Call(Args* vl, bool no_remote)
++it; ++it;
if ( it != auto_publish.end() ) if ( it != auto_publish.end() )
broker_mgr->PublishEvent(topic, Name(), xs); broker_mgr->PublishEvent(topic, Name(), xs, ts);
else else
{ {
broker_mgr->PublishEvent(topic, Name(), std::move(xs)); broker_mgr->PublishEvent(topic, Name(), std::move(xs), ts);
break; break;
} }
} }

View file

@ -14,6 +14,11 @@
namespace zeek namespace zeek
{ {
namespace run_state
{
extern double network_time;
} // namespace run_state
class Func; class Func;
using FuncPtr = IntrusivePtr<Func>; using FuncPtr = IntrusivePtr<Func>;
@ -34,7 +39,7 @@ public:
void AutoUnpublish(const std::string& topic) { auto_publish.erase(topic); } void AutoUnpublish(const std::string& topic) { auto_publish.erase(topic); }
void Call(zeek::Args* vl, bool no_remote = false); void Call(zeek::Args* vl, bool no_remote = false, double ts = run_state::network_time);
// Returns true if there is at least one local or remote handler. // Returns true if there is at least one local or remote handler.
explicit operator bool() const; explicit operator bool() const;

View file

@ -4279,7 +4279,8 @@ ScheduleTimer::~ScheduleTimer() { }
void ScheduleTimer::Dispatch(double /* t */, bool /* is_expire */) void ScheduleTimer::Dispatch(double /* t */, bool /* is_expire */)
{ {
if ( event ) if ( event )
event_mgr.Enqueue(event, std::move(args)); event_mgr.Enqueue(event, std::move(args), util::detail::SOURCE_LOCAL, 0, nullptr,
this->Time());
} }
ScheduleExpr::ScheduleExpr(ExprPtr arg_when, EventExprPtr arg_event) ScheduleExpr::ScheduleExpr(ExprPtr arg_when, EventExprPtr arg_event)

View file

@ -643,7 +643,7 @@ std::string Manager::NodeID() const
return to_string(bstate->endpoint.node_id()); return to_string(bstate->endpoint.node_id());
} }
bool Manager::PublishEvent(string topic, std::string name, broker::vector args) bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts)
{ {
if ( bstate->endpoint.is_shutdown() ) if ( bstate->endpoint.is_shutdown() )
return true; return true;
@ -652,7 +652,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
return true; return true;
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str());
broker::zeek::Event ev(std::move(name), std::move(args)); broker::zeek::Event ev(std::move(name), std::move(args), broker::to_timestamp(ts));
bstate->endpoint.publish(std::move(topic), ev.move_data()); bstate->endpoint.publish(std::move(topic), ev.move_data());
++statistics.num_events_outgoing; ++statistics.num_events_outgoing;
return true; return true;
@ -681,7 +681,10 @@ bool Manager::PublishEvent(string topic, RecordVal* args)
xs.emplace_back(data_val->data); xs.emplace_back(data_val->data);
} }
return PublishEvent(std::move(topic), event_name, std::move(xs)); // At this point we come from script-land. This means that publishing of the event was
// explicitly triggered. Hence, the timestamp is set to the current network time. This also
// means that timestamping cannot be manipulated from script-land for now.
return PublishEvent(std::move(topic), event_name, std::move(xs), run_state::network_time);
} }
bool Manager::PublishIdentifier(std::string topic, std::string id) bool Manager::PublishIdentifier(std::string topic, std::string id)
@ -1386,8 +1389,15 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
auto name = std::move(ev.name()); auto name = std::move(ev.name());
auto args = std::move(ev.args()); auto args = std::move(ev.args());
double ts;
DBG_LOG(DBG_BROKER, "Process event: %s %s", name.data(), RenderMessage(args).data()); if ( auto ev_ts = ev.ts() )
broker::convert(*ev_ts, ts);
else
// Default to current network time, if the received event did not contain a timestamp.
ts = run_state::network_time;
DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", name.data(), ts, RenderMessage(args).data());
++statistics.num_events_incoming; ++statistics.num_events_incoming;
auto handler = event_registry->Lookup(name); auto handler = event_registry->Lookup(name);
@ -1469,7 +1479,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
} }
if ( vl.size() == args.size() ) if ( vl.size() == args.size() )
event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER); event_mgr.Enqueue(handler, std::move(vl), util::detail::SOURCE_BROKER, 0, nullptr, ts);
} }
bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc) bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc)

View file

@ -183,9 +183,11 @@ public:
* of this topic name. * of this topic name.
* @param name the name of the event * @param name the name of the event
* @param args the event's arguments * @param args the event's arguments
* @param ts the timestamp the event is intended to be executed
* @return true if the message is sent successfully. * @return true if the message is sent successfully.
*/ */
bool PublishEvent(std::string topic, std::string name, broker::vector args); bool PublishEvent(std::string topic, std::string name, broker::vector args,
double ts = run_state::network_time);
/** /**
* Send an event to any interested peers. * Send an event to any interested peers.
@ -193,7 +195,8 @@ public:
* Peers advertise interest by registering a subscription to some prefix * Peers advertise interest by registering a subscription to some prefix
* of this topic name. * of this topic name.
* @param ev the event and its arguments to send to peers, in the form of * @param ev the event and its arguments to send to peers, in the form of
* a Broker::Event record type. * a Broker::Event record type. The timestamp for the event is set to the
* current network time.
* @return true if the message is sent successfully. * @return true if the message is sent successfully.
*/ */
bool PublishEvent(std::string topic, RecordVal* ev); bool PublishEvent(std::string topic, RecordVal* ev);

View file

@ -354,6 +354,19 @@ function set_network_time%(nt: time%): bool
return zeek::val_mgr->True(); return zeek::val_mgr->True();
%} %}
## Returns the timestamp of the last raised event. The timestamp reflects the
## network time the event was intended to be executed. For scheduled events,
## this is the time the event was scheduled for. For any other event, this is
## the time when the event was created.
##
## Returns: The timestamp of the last raised event.
##
## .. zeek:see:: current_time set_network_time
function current_event_time%(%): time
%{
return zeek::make_intrusive<zeek::TimeVal>(zeek::event_mgr.CurrentEventTime());
%}
## Returns a system environment variable. ## Returns a system environment variable.
## ##
## var: The name of the variable whose value to request. ## var: The name of the variable whose value to request.

View file

@ -0,0 +1,11 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
receiver got ping: my-message-a intended for 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T)
receiver got ping: my-message-b intended for 1989-12-12-22:15:00 stamped to 1989-12-12-22:15:00 (is_remote = T)
receiver got ping: my-message-c intended for 1989-12-12-22:30:00 stamped to 1989-12-12-22:30:00 (is_remote = T)
receiver got ping: my-message-a intended for 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T)
receiver got ping: my-message-b intended for 1989-12-12-23:15:00 stamped to 1989-12-12-23:15:00 (is_remote = T)
receiver got ping: my-message-c intended for 1989-12-12-23:30:00 stamped to 1989-12-12-23:30:00 (is_remote = T)
receiver got ping: my-message-a intended for 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T)
receiver got ping: my-message-b intended for 1989-12-13-00:15:00 stamped to 1989-12-13-00:15:00 (is_remote = T)
receiver got ping: my-message-c intended for 1989-12-13-00:30:00 stamped to 1989-12-13-00:30:00 (is_remote = T)

View file

@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sender added peer: endpoint=127.0.0.1 msg=handshake successful
>> Run 1 (1989-12-12-22:00:00)
>> Run 2 (1989-12-12-23:00:00)
>> Run 3 (1989-12-13-00:00:00)
>> Run 4 (1989-12-13-01:00:00)
>> Run 5 (1989-12-13-02:00:00)
>> Run 6 (1989-12-13-03:00:00)
>> Run 7 (1989-12-13-04:00:00)
>> Run 8 (1989-12-13-05:00:00)
>> Run 9 (1989-12-13-06:00:00)
>> Run 10 (1989-12-13-07:00:00)
sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer

View file

@ -0,0 +1,12 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
receiver added peer: endpoint=127.0.0.1 msg=handshake successful
receiver got ping: my-message intended for 1989-12-12-22:00:00 stamped to 1989-12-12-22:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-12-23:00:00 stamped to 1989-12-12-23:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-00:00:00 stamped to 1989-12-13-00:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-01:00:00 stamped to 1989-12-13-01:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-02:00:00 stamped to 1989-12-13-02:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-03:00:00 stamped to 1989-12-13-03:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-04:00:00 stamped to 1989-12-13-04:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-05:00:00 stamped to 1989-12-13-05:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-06:00:00 stamped to 1989-12-13-06:00:00 (is_remote = T)
receiver got ping: my-message intended for 1989-12-13-07:00:00 stamped to 1989-12-13-07:00:00 (is_remote = T)

View file

@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sender added peer: endpoint=127.0.0.1 msg=handshake successful
>> Run 1 (1989-12-12-22:00:00)
>> Run 2 (1989-12-12-23:00:00)
>> Run 3 (1989-12-13-00:00:00)
>> Run 4 (1989-12-13-01:00:00)
>> Run 5 (1989-12-13-02:00:00)
>> Run 6 (1989-12-13-03:00:00)
>> Run 7 (1989-12-13-04:00:00)
>> Run 8 (1989-12-13-05:00:00)
>> Run 9 (1989-12-13-06:00:00)
>> Run 10 (1989-12-13-07:00:00)
sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
send event without timestamp
send event with timestamp

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
added peer: endpoint=127.0.0.1 msg=handshake successful
got my_event(without ts) stamped to 42.0 at network time 42.0
got my_event(with ts) stamped to 23.0 at network time 42.0
lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
ping args ['my-message', 1] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:14.000'}]}]
ping args ['my-message', 2] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:24.000'}]}]
ping args ['my-message', 3] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:34.000'}]}]

View file

@ -0,0 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sender added peer: endpoint=127.0.0.1 msg=handshake successful
sender got pong: my-message, 1 network_time=1681819994.0 current_event_time=1681819995.0
sender got pong: my-message, 2 network_time=1681820004.0 current_event_time=1681819996.0
sender got pong: my-message, 3 network_time=1681820014.0 current_event_time=1681819997.0
sender lost peer: endpoint=127.0.0.1 msg=lost connection to client

View file

@ -0,0 +1,41 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
>> Run 0 (1989-12-12-22:00:00)
<< Run 0 (1989-12-12-22:00:00)
[1989-12-12-23:00:00] Test was scheduled at 1989-12-12-22:00:00 for 1989-12-12-22:15:00
[1989-12-12-23:00:00] Test was scheduled at 1989-12-12-22:00:00 for 1989-12-12-22:30:00
>> Run 1 (1989-12-12-23:00:00)
<< Run 1 (1989-12-12-23:00:00)
[1989-12-13-00:00:00] Test was scheduled at 1989-12-12-23:00:00 for 1989-12-12-23:15:00
[1989-12-13-00:00:00] Test was scheduled at 1989-12-12-23:00:00 for 1989-12-12-23:30:00
>> Run 2 (1989-12-13-00:00:00)
<< Run 2 (1989-12-13-00:00:00)
[1989-12-13-01:00:00] Test was scheduled at 1989-12-13-00:00:00 for 1989-12-13-00:15:00
[1989-12-13-01:00:00] Test was scheduled at 1989-12-13-00:00:00 for 1989-12-13-00:30:00
>> Run 3 (1989-12-13-01:00:00)
<< Run 3 (1989-12-13-01:00:00)
[1989-12-13-02:00:00] Test was scheduled at 1989-12-13-01:00:00 for 1989-12-13-01:15:00
[1989-12-13-02:00:00] Test was scheduled at 1989-12-13-01:00:00 for 1989-12-13-01:30:00
>> Run 4 (1989-12-13-02:00:00)
<< Run 4 (1989-12-13-02:00:00)
[1989-12-13-03:00:00] Test was scheduled at 1989-12-13-02:00:00 for 1989-12-13-02:15:00
[1989-12-13-03:00:00] Test was scheduled at 1989-12-13-02:00:00 for 1989-12-13-02:30:00
>> Run 5 (1989-12-13-03:00:00)
<< Run 5 (1989-12-13-03:00:00)
[1989-12-13-04:00:00] Test was scheduled at 1989-12-13-03:00:00 for 1989-12-13-03:15:00
[1989-12-13-04:00:00] Test was scheduled at 1989-12-13-03:00:00 for 1989-12-13-03:30:00
>> Run 6 (1989-12-13-04:00:00)
<< Run 6 (1989-12-13-04:00:00)
[1989-12-13-05:00:00] Test was scheduled at 1989-12-13-04:00:00 for 1989-12-13-04:15:00
[1989-12-13-05:00:00] Test was scheduled at 1989-12-13-04:00:00 for 1989-12-13-04:30:00
>> Run 7 (1989-12-13-05:00:00)
<< Run 7 (1989-12-13-05:00:00)
[1989-12-13-06:00:00] Test was scheduled at 1989-12-13-05:00:00 for 1989-12-13-05:15:00
[1989-12-13-06:00:00] Test was scheduled at 1989-12-13-05:00:00 for 1989-12-13-05:30:00
>> Run 8 (1989-12-13-06:00:00)
<< Run 8 (1989-12-13-06:00:00)
[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-06:00:00 for 1989-12-13-06:15:00
[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-06:00:00 for 1989-12-13-06:30:00
>> Run 9 (1989-12-13-07:00:00)
<< Run 9 (1989-12-13-07:00:00)
[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-07:00:00 for 1989-12-13-07:15:00
[1989-12-13-07:00:00] Test was scheduled at 1989-12-13-07:00:00 for 1989-12-13-07:30:00

View file

@ -0,0 +1,51 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
>> Run 0 (1989-12-12-22:00:00):
[1989-12-12-22:00:00] Test 4 was scheduled at 1989-12-12-22:00:00
[1989-12-12-22:00:00] Test 3 was scheduled at 1989-12-12-22:00:00
[1989-12-12-22:00:00] Test 2 was scheduled at 1989-12-12-22:00:00
[1989-12-12-22:00:01] Test 1 was scheduled at 1989-12-12-22:00:00
>> Run 1 (1989-12-12-22:00:01):
[1989-12-12-22:00:01] Test 4 was scheduled at 1989-12-12-22:00:01
[1989-12-12-22:00:01] Test 3 was scheduled at 1989-12-12-22:00:01
[1989-12-12-22:00:01] Test 2 was scheduled at 1989-12-12-22:00:01
[1989-12-12-22:00:02] Test 1 was scheduled at 1989-12-12-22:00:01
>> Run 2 (1989-12-12-22:00:02):
[1989-12-12-22:00:02] Test 4 was scheduled at 1989-12-12-22:00:02
[1989-12-12-22:00:02] Test 3 was scheduled at 1989-12-12-22:00:02
[1989-12-12-22:00:02] Test 2 was scheduled at 1989-12-12-22:00:02
[1989-12-12-22:00:03] Test 1 was scheduled at 1989-12-12-22:00:02
>> Run 3 (1989-12-12-22:00:03):
[1989-12-12-22:00:03] Test 4 was scheduled at 1989-12-12-22:00:03
[1989-12-12-22:00:03] Test 3 was scheduled at 1989-12-12-22:00:03
[1989-12-12-22:00:03] Test 2 was scheduled at 1989-12-12-22:00:03
[1989-12-12-22:00:04] Test 1 was scheduled at 1989-12-12-22:00:03
>> Run 4 (1989-12-12-22:00:04):
[1989-12-12-22:00:04] Test 4 was scheduled at 1989-12-12-22:00:04
[1989-12-12-22:00:04] Test 3 was scheduled at 1989-12-12-22:00:04
[1989-12-12-22:00:04] Test 2 was scheduled at 1989-12-12-22:00:04
[1989-12-12-22:00:05] Test 1 was scheduled at 1989-12-12-22:00:04
>> Run 5 (1989-12-12-22:00:05):
[1989-12-12-22:00:05] Test 4 was scheduled at 1989-12-12-22:00:05
[1989-12-12-22:00:05] Test 3 was scheduled at 1989-12-12-22:00:05
[1989-12-12-22:00:05] Test 2 was scheduled at 1989-12-12-22:00:05
[1989-12-12-22:00:06] Test 1 was scheduled at 1989-12-12-22:00:05
>> Run 6 (1989-12-12-22:00:06):
[1989-12-12-22:00:06] Test 4 was scheduled at 1989-12-12-22:00:06
[1989-12-12-22:00:06] Test 3 was scheduled at 1989-12-12-22:00:06
[1989-12-12-22:00:06] Test 2 was scheduled at 1989-12-12-22:00:06
[1989-12-12-22:00:07] Test 1 was scheduled at 1989-12-12-22:00:06
>> Run 7 (1989-12-12-22:00:07):
[1989-12-12-22:00:07] Test 4 was scheduled at 1989-12-12-22:00:07
[1989-12-12-22:00:07] Test 3 was scheduled at 1989-12-12-22:00:07
[1989-12-12-22:00:07] Test 2 was scheduled at 1989-12-12-22:00:07
[1989-12-12-22:00:08] Test 1 was scheduled at 1989-12-12-22:00:07
>> Run 8 (1989-12-12-22:00:08):
[1989-12-12-22:00:08] Test 4 was scheduled at 1989-12-12-22:00:08
[1989-12-12-22:00:08] Test 3 was scheduled at 1989-12-12-22:00:08
[1989-12-12-22:00:08] Test 2 was scheduled at 1989-12-12-22:00:08
[1989-12-12-22:00:09] Test 1 was scheduled at 1989-12-12-22:00:08
>> Run 9 (1989-12-12-22:00:09):
[1989-12-12-22:00:09] Test 4 was scheduled at 1989-12-12-22:00:09
[1989-12-12-22:00:09] Test 3 was scheduled at 1989-12-12-22:00:09
[1989-12-12-22:00:09] Test 2 was scheduled at 1989-12-12-22:00:09
[1989-12-12-22:00:09] Test 1 was scheduled at 1989-12-12-22:00:09

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,82 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"
# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out"
#
# @TEST-EXEC: btest-bg-wait 45
# @TEST-EXEC: btest-diff recv/recv.out
# @TEST-EXEC: btest-diff send/send.out
# @TEST-START-FILE send.zeek
redef exit_only_after_terminate = T;
global runs = 0;
global ping: event(msg: string, intended_ts: time);
event zeek_init()
{
suspend_processing();
Broker::subscribe("zeek/event/my_topic");
Broker::auto_publish("zeek/event/my_topic", ping);
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
continue_processing();
}
event new_connection(c: connection)
{
print fmt(">> Run %s (%D)", ++runs, network_time());
event ping("my-message-a", network_time());
schedule 30 mins { ping("my-message-c", network_time() + 30 mins) };
schedule 15 mins { ping("my-message-b", network_time() + 15 mins) };
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE recv.zeek
redef exit_only_after_terminate = T;
global msg_count = 0;
event zeek_init()
{
Broker::subscribe("zeek/event/my_topic");
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event ping(msg: string, intended_ts: time) &is_used
{
if ( ++msg_count >= 10 )
{
terminate();
return;
}
print fmt("receiver got ping: %s intended for %D stamped to %D (is_remote = %s)",
msg, intended_ts, current_event_time(), is_remote_event());
}
# @TEST-END-FILE

View file

@ -0,0 +1,77 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"
# @TEST-EXEC: btest-bg-run send "zeek -b -r $TRACES/ticks-dns-1hr.pcap ../send.zeek >send.out"
#
# @TEST-EXEC: btest-bg-wait 45
# @TEST-EXEC: btest-diff recv/recv.out
# @TEST-EXEC: btest-diff send/send.out
# @TEST-START-FILE send.zeek
redef exit_only_after_terminate = T;
global runs = 0;
global ping: event(msg: string, intended_ts: time);
event zeek_init()
{
Broker::subscribe("zeek/event/my_topic");
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
suspend_processing();
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
continue_processing();
}
event new_connection(c: connection)
{
print fmt(">> Run %s (%D)", ++runs, network_time());
local e = Broker::make_event(ping, "my-message", network_time());
Broker::publish("zeek/event/my_topic", e);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s",
endpoint$network$address, msg);
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE recv.zeek
redef exit_only_after_terminate = T;
global msg_count = 0;
event zeek_init()
{
Broker::subscribe("zeek/event/my_topic");
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event ping(msg: string, intended_ts: time)
{
print fmt("receiver got ping: %s intended for %D stamped to %D (is_remote = %s)",
msg, intended_ts, current_event_time(), is_remote_event());
if ( ++msg_count >= 10 )
terminate();
}
# @TEST-END-FILE

View file

@ -0,0 +1,92 @@
# @TEST-DOC: Test compatibility with peers sending events without timestamps.
#
# @TEST-GROUP: broker
# @TEST-PORT: BROKER_PORT
#
# @TEST-REQUIRES: python3 -V
# @TEST-REQUIRES: TOPIC=/zeek/my_topic python3 client.py check
#
# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run server "zeek %INPUT >output"
# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run client "python3 ../client.py >output"
#
# @TEST-EXEC: btest-bg-wait 45
# @TEST-EXEC: btest-diff server/output
# @TEST-EXEC: btest-diff client/output
redef exit_only_after_terminate = T;
redef allow_network_time_forward = F;
event zeek_init()
{
Broker::subscribe(getenv("TOPIC"));
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
set_network_time(double_to_time(42.0));
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
event my_event(msg: string) &is_used
{
print fmt("got my_event(%s) stamped to %s at network time %s",
msg, current_event_time(), network_time());
}
@TEST-START-FILE client.py
"""
Python script sending timestamped and non-timestamped event to TOPIC
"""
import datetime
import os
import sys
# Prep the PYTHONPATH for the build directory.
broker_path = os.path.join(os.environ["BUILD"], "auxil", "broker", "python")
sys.path.insert(0, broker_path)
import broker
# 1024/tcp
broker_port = int(os.environ["BROKER_PORT"].split("/")[0])
broker_topic = os.environ["TOPIC"]
# We were able to import broker and parse the broker_port, should be good.
if len(sys.argv) > 1 and sys.argv[1] == "check":
sys.exit(0)
# Setup endpoint and connect to Zeek.
with broker.Endpoint() as ep, \
ep.make_status_subscriber(True) as ss:
ep.peer("127.0.0.1", broker_port)
st = ss.get(2)
if not (st[0].code() == broker.SC.EndpointDiscovered and
st[1].code() == broker.SC.PeerAdded):
print("could not connect")
exit(0)
# Send events and close connection
print("send event without timestamp")
my_event = broker.zeek.Event("my_event", "without ts")
ep.publish(broker_topic, my_event)
print("send event with timestamp")
ts = datetime.datetime.fromtimestamp(23.0, broker.utc)
metadata = {
broker.zeek.Metadata.NETWORK_TIMESTAMP: ts,
}
my_event = broker.zeek.Event("my_event", "with ts", metadata=metadata)
ep.publish(broker_topic, my_event)
ep.shutdown()
@TEST-END-FILE

View file

@ -0,0 +1,148 @@
# @TEST-GROUP: broker
#
# This test requires the websockets module, available via
# "pip install websockets".
# @TEST-REQUIRES: python3 -c 'import websockets'
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run server "zeek -b %INPUT >output"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >output"
#
# @TEST-EXEC: btest-bg-wait 5
# @TEST-EXEC: btest-diff client/output
# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff server/output
redef allow_network_time_forward = F;
redef exit_only_after_terminate = T;
redef Broker::disable_ssl = T;
global event_count = 0;
global ping: event(msg: string, c: count);
event zeek_init()
{
# Tue 18 Apr 2023 12:13:14 PM UTC
set_network_time(double_to_time(1681819994.0));
Broker::subscribe("/zeek/event/my_topic");
Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
function send_event()
{
++event_count;
local e = Broker::make_event(ping, "my-message", event_count);
Broker::publish("/zeek/event/my_topic", e);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
send_event();
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
event pong(msg: string, n: count) &is_used
{
print fmt("sender got pong: %s, %s network_time=%s current_event_time=%s",
msg, n, network_time(), current_event_time());
set_network_time(network_time() + 10sec);
send_event();
}
@TEST-START-FILE client.py
import asyncio, datetime, websockets, os, time, json, sys
ws_port = os.environ['BROKER_PORT'].split('/')[0]
ws_url = 'ws://localhost:%s/v1/messages/json' % ws_port
topic = '"/zeek/event/my_topic"'
def broker_value(type, val):
return {
'@data-type': type,
'data': val
}
async def do_run():
# Try up to 30 times.
connected = False
for i in range(30):
try:
ws = await websockets.connect(ws_url)
connected = True
# send filter and wait for ack
await ws.send('[%s]' % topic)
ack_json = await ws.recv()
ack = json.loads(ack_json)
if not 'type' in ack or ack['type'] != 'ack':
print('*** unexpected ACK from server:')
print(ack_json)
sys.exit()
except Exception as e:
if not connected:
print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr)
await asyncio.sleep(1)
continue
else:
print('exception: %s' % e, file=sys.stderr)
sys.exit()
for round in range(3):
# wait for ping
msg = await ws.recv()
msg = json.loads(msg)
if not 'type' in msg or msg['type'] != 'data-message':
print("unexpected type", msg)
continue
ping = msg['data'][2]['data']
if len(ping) < 3:
print("no metadata on event")
continue
name = ping[0]['data']
args = [x['data'] for x in ping[1]['data']]
metadata = ping[2]['data']
print(name, "args", args, "metadata", metadata)
# send pong
dt = datetime.datetime.utcfromtimestamp(1681819994 + args[1])
ts_str = dt.isoformat('T', 'milliseconds')
pong = [
broker_value('string', 'pong'),
broker_value('vector', [
broker_value('string', args[0]),
broker_value('count', args[1]),
]),
broker_value('vector', [
broker_value('vector', [
broker_value('count', 1), # network_timestamp
broker_value('timestamp', ts_str),
]),
]),
]
ev = [broker_value('count', 1), broker_value('count', 1), broker_value('vector', pong)]
msg = {
'type': 'data-message',
'topic': '/zeek/event/my_topic',
'@data-type': 'vector', 'data': ev
}
msg = json.dumps(msg)
await ws.send(msg)
await ws.close()
sys.exit()
loop = asyncio.get_event_loop()
loop.run_until_complete(do_run())
@TEST-END-FILE

View file

@ -84,7 +84,7 @@ async def do_run():
except Exception as e: except Exception as e:
if not connected: if not connected:
print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr) print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr)
time.sleep(1) await asyncio.sleep(1)
continue continue
else: else:
print('exception: %s' % e, file=sys.stderr) print('exception: %s' % e, file=sys.stderr)

View file

@ -0,0 +1,20 @@
# @TEST-EXEC: zeek -b -r $TRACES/ticks-dns-1hr.pcap %INPUT > out
# @TEST-EXEC: btest-diff out
global runs = 0;
event test(schedule_time: time)
{
print fmt("[%D] Test was scheduled at %D for %D", network_time(),
schedule_time, current_event_time());
}
event new_connection(c: connection)
{
local nt = network_time();
print fmt(">> Run %s (%D)", runs, nt);
schedule 30 mins { test(nt) };
schedule 15 mins { test(nt) };
print fmt("<< Run %s (%D)", runs, nt);
++runs;
}

View file

@ -0,0 +1,28 @@
# @TEST-EXEC: zeek -b -r $TRACES/ticks-dns.pcap %INPUT > out
# @TEST-EXEC: btest-diff out
# Note: We use a PCAP with DNS queries only so that we have a single packet per
# time step. Thus the run loop will be executed only once per time step.
global runs = -1;
event test(depth: count)
{
if ( depth == 0 )
return;
print fmt("[%D] Test %s was scheduled at %D", network_time(), depth, current_event_time());
event test(--depth);
}
event new_connection(c: connection)
{
print fmt(">> Run %s (%D):", ++runs, network_time());
# Descend into recursion to enqueue events until we add an event that will
# be handled in the next run loop iteration, i.e. at a different timestamp
# than it was enqueued. Use four levels of recursion as every drain of the
# event queue handles two layers and the event queue is drained two times.
# First after processing a packet and second in the run loop. Finally, we
# expect an event so that network_time() > current_event_time().
event test(4);
}