mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
broker/Manager: Fetch and forward all metadata from events
Also use the generic metadata version for publishing, keep the ts-based API for now, but only add timestamps when EventMetadata::add_network_timestamp is T. I'm not sure what the right way forward here is, maybe deprecating Broker's publish event variations and funneling through cluster.
This commit is contained in:
parent
96f2d5d369
commit
e1f70164e0
1 changed files with 16 additions and 16 deletions
|
@ -8,6 +8,7 @@
|
||||||
#include <broker/event.hh>
|
#include <broker/event.hh>
|
||||||
#include <broker/event_observer.hh>
|
#include <broker/event_observer.hh>
|
||||||
#include <broker/logger.hh>
|
#include <broker/logger.hh>
|
||||||
|
#include <broker/time.hh>
|
||||||
#include <broker/variant.hh>
|
#include <broker/variant.hh>
|
||||||
#include <broker/zeek.hh>
|
#include <broker/zeek.hh>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
@ -42,6 +43,8 @@
|
||||||
#include "zeek/telemetry/Manager.h"
|
#include "zeek/telemetry/Manager.h"
|
||||||
#include "zeek/util.h"
|
#include "zeek/util.h"
|
||||||
|
|
||||||
|
#include "const.bif.netvar_h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
@ -849,8 +852,15 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args,
|
||||||
if ( peer_count == 0 && hub_count == 0 )
|
if ( peer_count == 0 && hub_count == 0 )
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
broker::zeek::Event ev(name, args, broker::to_timestamp(ts));
|
broker::vector meta;
|
||||||
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, ev.args()).c_str());
|
if ( BifConst::EventMetadata::add_network_timestamp ) {
|
||||||
|
broker::vector entry{static_cast<broker::count>(zeek::detail::MetadataType::NetworkTimestamp),
|
||||||
|
broker::to_timestamp(ts)};
|
||||||
|
meta.emplace_back(std::move(entry));
|
||||||
|
}
|
||||||
|
|
||||||
|
broker::zeek::Event ev(name, args, meta);
|
||||||
|
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str());
|
||||||
bstate->endpoint.publish(std::move(topic), ev.move_data());
|
bstate->endpoint.publish(std::move(topic), ev.move_data());
|
||||||
num_events_outgoing_metric->Inc();
|
num_events_outgoing_metric->Inc();
|
||||||
return true;
|
return true;
|
||||||
|
@ -1571,15 +1581,10 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
|
||||||
|
|
||||||
auto&& name = ev.name();
|
auto&& name = ev.name();
|
||||||
auto&& args = ev.args();
|
auto&& args = ev.args();
|
||||||
double ts;
|
auto meta = cluster::detail::metadata_vector_from_broker_event(ev);
|
||||||
|
|
||||||
if ( auto ev_ts = ev.ts() )
|
DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(),
|
||||||
broker::convert(*ev_ts, ts);
|
meta ? meta->size() : 0, RenderMessage(args).c_str());
|
||||||
else
|
|
||||||
// Default to current network time, if the received event did not contain a timestamp.
|
|
||||||
ts = run_state::network_time;
|
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", std::string{name}.c_str(), ts, RenderMessage(args).c_str());
|
|
||||||
num_events_incoming_metric->Inc();
|
num_events_incoming_metric->Inc();
|
||||||
auto handler = event_registry->Lookup(name);
|
auto handler = event_registry->Lookup(name);
|
||||||
|
|
||||||
|
@ -1655,13 +1660,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( vl.size() == args.size() ) {
|
if ( vl.size() == args.size() )
|
||||||
zeek::detail::EventMetadataVectorPtr meta;
|
|
||||||
if ( ts >= 0.0 )
|
|
||||||
meta = zeek::detail::MakeEventMetadataVector(ts);
|
|
||||||
|
|
||||||
event_mgr.Enqueue(std::move(meta), handler, std::move(vl), util::detail::SOURCE_BROKER);
|
event_mgr.Enqueue(std::move(meta), handler, std::move(vl), util::detail::SOURCE_BROKER);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) {
|
bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue