mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
cluster/serializer: Add Broker based event serializers
This adds the first event serializers that use broker functionality. Binary and JSON formats.
This commit is contained in:
parent
ef04a199c8
commit
9ec872d161
10 changed files with 319 additions and 0 deletions
|
@ -10,3 +10,5 @@ zeek_add_subdir_library(
|
|||
Manager.cc
|
||||
BIFS
|
||||
cluster.bif)
|
||||
|
||||
add_subdirectory(serializer)
|
||||
|
|
1
src/cluster/serializer/CMakeLists.txt
Normal file
1
src/cluster/serializer/CMakeLists.txt
Normal file
|
@ -0,0 +1 @@
|
|||
add_subdirectory(broker)
|
9
src/cluster/serializer/broker/CMakeLists.txt
Normal file
9
src/cluster/serializer/broker/CMakeLists.txt
Normal file
|
@ -0,0 +1,9 @@
|
|||
zeek_add_plugin(
|
||||
Zeek
|
||||
Cluster_Serializer_Binary_Serialization_Format
|
||||
INCLUDE_DIRS
|
||||
${CMAKE_CURRENT_SOURCE_DIR}
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
SOURCES
|
||||
Plugin.cc
|
||||
Serializer.cc)
|
24
src/cluster/serializer/broker/Plugin.cc
Normal file
24
src/cluster/serializer/broker/Plugin.cc
Normal file
|
@ -0,0 +1,24 @@
|
|||
#include "Plugin.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "zeek/cluster/Component.h"
|
||||
|
||||
#include "Serializer.h"
|
||||
|
||||
using namespace zeek::cluster;
|
||||
using namespace zeek::plugin::Broker_Serializer;
|
||||
|
||||
zeek::plugin::Configuration Plugin::Configure() {
|
||||
AddComponent(new EventSerializerComponent("BROKER_BIN_V1", []() -> std::unique_ptr<EventSerializer> {
|
||||
return std::make_unique<cluster::detail::BrokerBinV1_Serializer>();
|
||||
}));
|
||||
AddComponent(new EventSerializerComponent("BROKER_JSON_V1", []() -> std::unique_ptr<EventSerializer> {
|
||||
return std::make_unique<cluster::detail::BrokerJsonV1_Serializer>();
|
||||
}));
|
||||
|
||||
zeek::plugin::Configuration config;
|
||||
config.name = "Zeek::Broker_Serializer";
|
||||
config.description = "Event serialization using Broker event formats (binary and json)";
|
||||
return config;
|
||||
}
|
14
src/cluster/serializer/broker/Plugin.h
Normal file
14
src/cluster/serializer/broker/Plugin.h
Normal file
|
@ -0,0 +1,14 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "zeek/plugin/Plugin.h"
|
||||
|
||||
namespace zeek::plugin::Broker_Serializer {
|
||||
|
||||
class Plugin : public zeek::plugin::Plugin {
|
||||
public:
|
||||
zeek::plugin::Configuration Configure() override;
|
||||
} plugin;
|
||||
|
||||
} // namespace zeek::plugin::Broker_Serializer
|
1
src/cluster/serializer/broker/README
Normal file
1
src/cluster/serializer/broker/README
Normal file
|
@ -0,0 +1 @@
|
|||
Contains event serializers using Broker's BIN and JSON formats.
|
219
src/cluster/serializer/broker/Serializer.cc
Normal file
219
src/cluster/serializer/broker/Serializer.cc
Normal file
|
@ -0,0 +1,219 @@
|
|||
#include "zeek/cluster/serializer/broker/Serializer.h"
|
||||
|
||||
#include <optional>
|
||||
|
||||
#include "zeek/Desc.h"
|
||||
#include "zeek/Func.h"
|
||||
#include "zeek/Reporter.h"
|
||||
#include "zeek/broker/Data.h"
|
||||
#include "zeek/cluster/Backend.h"
|
||||
|
||||
#include "broker/data_envelope.hh"
|
||||
#include "broker/error.hh"
|
||||
#include "broker/format/json.hh"
|
||||
#include "broker/zeek.hh"
|
||||
|
||||
#include "zeek/3rdparty/doctest.h"
|
||||
|
||||
using namespace zeek::cluster;
|
||||
|
||||
namespace {
|
||||
|
||||
/**
|
||||
* Convert a cluster::detail::Event to a broker::zeek::Event.
|
||||
*
|
||||
* @param ev The cluster::detail::Event
|
||||
* @return A broker::zeek::Event to be serialized, or nullopt in case of errors.
|
||||
*/
|
||||
std::optional<broker::zeek::Event> to_broker_event(const detail::Event& ev) {
|
||||
broker::vector xs;
|
||||
xs.reserve(ev.args.size());
|
||||
for ( const auto& a : ev.args ) {
|
||||
if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) {
|
||||
xs.emplace_back(std::move(res.value()));
|
||||
}
|
||||
else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.timestamp));
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a broker::zeek::Event to cluster::detail::Event by looking
|
||||
* it up in Zeek's event handler registry and converting event arguments
|
||||
* to the appropriate Val instances.
|
||||
*
|
||||
* @param broker_ev The broker side event.
|
||||
* @returns A zeek::cluster::detail::Event instance, or std::nullopt if the conversion failed.
|
||||
*/
|
||||
std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev) {
|
||||
auto&& name = ev.name();
|
||||
auto&& args = ev.args();
|
||||
|
||||
// Meh, technically need to convert ev.metadata() and
|
||||
// expose it to script land as `table[count] of any`
|
||||
// where consumers then know what to do with it.
|
||||
//
|
||||
// For now, handle the timestamp explicitly.
|
||||
double ts;
|
||||
if ( auto ev_ts = ev.ts() )
|
||||
broker::convert(*ev_ts, ts);
|
||||
else
|
||||
ts = zeek::run_state::network_time;
|
||||
|
||||
zeek::Args vl;
|
||||
zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name);
|
||||
if ( ! handler ) {
|
||||
zeek::reporter->Error("Failed to lookup handler for '%s'", std::string(name).c_str());
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
const auto& arg_types = handler->GetFunc()->GetType()->ParamList()->GetTypes();
|
||||
|
||||
if ( arg_types.size() != args.size() ) {
|
||||
std::string event_name(name);
|
||||
zeek::reporter->Error("Unserialize error '%s' arg_types.size()=%zu and args.size()=%zu", event_name.c_str(),
|
||||
arg_types.size(), args.size());
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
for ( size_t i = 0; i < args.size(); ++i ) {
|
||||
const auto& expected_type = arg_types[i];
|
||||
auto arg = args[i].to_data();
|
||||
auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get());
|
||||
if ( val )
|
||||
vl.emplace_back(std::move(val));
|
||||
else {
|
||||
std::string event_name(name);
|
||||
auto got_type = args[i].get_type_name();
|
||||
std::string argstr = broker::to_string(arg);
|
||||
zeek::reporter
|
||||
->Error("Unserialize error for event '%s': broker value '%s' type '%s' to Zeek type '%s' failed",
|
||||
event_name.c_str(), argstr.c_str(), got_type, obj_desc(expected_type.get()).c_str());
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
return detail::Event{handler, std::move(vl), ts};
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool detail::BrokerBinV1_Serializer::SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) {
|
||||
auto ev = to_broker_event(event);
|
||||
if ( ! ev )
|
||||
return false;
|
||||
|
||||
// The produced broker::zeek::Event is already in bin::v1 format after
|
||||
// constructing it, so we can take the raw bytes directly rather than
|
||||
// going through encode() again.
|
||||
//
|
||||
// broker::format::bin::v1::encode(ev->move_data(), std::back_inserter(buf));
|
||||
assert(ev->raw()->shared_envelope() != nullptr);
|
||||
auto [raw, size] = ev->raw().shared_envelope()->raw_bytes();
|
||||
buf.insert(buf.begin(), raw, raw + size);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::optional<detail::Event> detail::BrokerBinV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) {
|
||||
auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "",
|
||||
buf.data(), buf.size());
|
||||
if ( ! r )
|
||||
return std::nullopt;
|
||||
|
||||
broker::zeek::Event ev(*r);
|
||||
return to_zeek_event(ev);
|
||||
}
|
||||
|
||||
|
||||
// Convert char to std::byte during push_back() so that
|
||||
// we don't need to copy from std::vector<char> to a
|
||||
// std::vector<std::byte> when rendering JSON.
|
||||
template<typename T>
|
||||
struct PushBackAdapter {
|
||||
explicit PushBackAdapter(T& c) : container(&c) {}
|
||||
using value_type = char;
|
||||
|
||||
void push_back(char c) { container->push_back(static_cast<std::byte>(c)); }
|
||||
|
||||
T* container;
|
||||
};
|
||||
|
||||
|
||||
bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
|
||||
auto ev = to_broker_event(event);
|
||||
if ( ! ev )
|
||||
return false;
|
||||
|
||||
auto push_back_adaptor = PushBackAdapter(buf);
|
||||
broker::format::json::v1::encode(ev->move_data(), std::back_inserter(push_back_adaptor));
|
||||
return true;
|
||||
}
|
||||
|
||||
std::optional<detail::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) {
|
||||
broker::variant res;
|
||||
auto err =
|
||||
broker::format::json::v1::decode(std::string_view{reinterpret_cast<const char*>(buf.data()), buf.size()}, res);
|
||||
if ( err ) {
|
||||
zeek::reporter->Error("Decode error for JSON payload: '%s'",
|
||||
err.message() ? err.message()->c_str() : "unknown");
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
broker::zeek::Event ev(std::move(res));
|
||||
return to_zeek_event(ev);
|
||||
}
|
||||
|
||||
TEST_SUITE_BEGIN("cluster serializer broker");
|
||||
|
||||
#include "zeek/EventRegistry.h"
|
||||
|
||||
TEST_CASE("roundtrip") {
|
||||
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
|
||||
detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)}};
|
||||
detail::byte_buffer buf;
|
||||
|
||||
SUBCASE("json") {
|
||||
detail::BrokerJsonV1_Serializer serializer;
|
||||
std::string expected =
|
||||
R"({"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"count","data":1},{"@data-type":"vector","data":[{"@data-type":"string","data":"Supervisor::node_status"},{"@data-type":"vector","data":[{"@data-type":"string","data":"TEST"},{"@data-type":"count","data":42}]},{"@data-type":"vector","data":[{"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"timestamp","data":"1970-01-01T00:00:00.000"}]}]}]}]})";
|
||||
|
||||
serializer.SerializeEvent(buf, e);
|
||||
|
||||
CHECK_EQ(expected, std::string{reinterpret_cast<char*>(buf.data()), buf.size()});
|
||||
|
||||
auto result = serializer.UnserializeEvent(buf);
|
||||
REQUIRE(result);
|
||||
CHECK_EQ(result->Handler(), handler);
|
||||
CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
|
||||
CHECK_EQ(result->args.size(), 2);
|
||||
}
|
||||
|
||||
SUBCASE("binary") {
|
||||
detail::BrokerBinV1_Serializer serializer;
|
||||
unsigned char expected_bytes[] = {0x0e, 0x03, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0e, 0x03, 0x05, 0x17, 0x53, 0x75,
|
||||
0x70, 0x65, 0x72, 0x76, 0x69, 0x73, 0x6f, 0x72, 0x3a, 0x3a, 0x6e, 0x6f, 0x64,
|
||||
0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x0e, 0x02, 0x05, 0x04, 0x54,
|
||||
0x45, 0x53, 0x54, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2a, 0x0e,
|
||||
0x01, 0x0e, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x09,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
|
||||
std::byte* p = reinterpret_cast<std::byte*>(&expected_bytes[0]);
|
||||
detail::byte_buffer expected{p, p + sizeof(expected_bytes)};
|
||||
|
||||
serializer.SerializeEvent(buf, e);
|
||||
|
||||
CHECK_EQ(expected, buf);
|
||||
|
||||
auto result = serializer.UnserializeEvent(buf);
|
||||
REQUIRE(result);
|
||||
CHECK_EQ(result->Handler(), handler);
|
||||
CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
|
||||
CHECK_EQ(result->args.size(), 2);
|
||||
}
|
||||
}
|
||||
TEST_SUITE_END();
|
31
src/cluster/serializer/broker/Serializer.h
Normal file
31
src/cluster/serializer/broker/Serializer.h
Normal file
|
@ -0,0 +1,31 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "zeek/cluster/Serializer.h"
|
||||
|
||||
namespace zeek::cluster::detail {
|
||||
|
||||
// Implementation of the EventSerializer using the existing broker::detail::val_to_data()
|
||||
// and broker::format::bin::v1::encode().
|
||||
class BrokerBinV1_Serializer : public EventSerializer {
|
||||
public:
|
||||
BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {}
|
||||
|
||||
bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) override;
|
||||
|
||||
std::optional<detail::Event> UnserializeEvent(detail::byte_buffer_span buf) override;
|
||||
};
|
||||
|
||||
// Implementation of the EventSerializer that uses the existing broker::detail::val_to_data()
|
||||
// and broker::format::json::v1::encode()
|
||||
class BrokerJsonV1_Serializer : public EventSerializer {
|
||||
public:
|
||||
BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {}
|
||||
|
||||
bool SerializeEvent(zeek::cluster::detail::byte_buffer& buf, const detail::Event& event) override;
|
||||
|
||||
std::optional<detail::Event> UnserializeEvent(detail::byte_buffer_span buf) override;
|
||||
};
|
||||
|
||||
} // namespace zeek::cluster::detail
|
7
testing/btest/Baseline/cluster.serializer-enum/out
Normal file
7
testing/btest/Baseline/cluster.serializer-enum/out
Normal file
|
@ -0,0 +1,7 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
Zeek::Broker_Serializer - Event serialization using Broker event formats (binary and json) (built-in)
|
||||
[Event Serializer] BROKER_BIN_V1 (Cluster::EVENT_SERIALIZER_BROKER_BIN_V1)
|
||||
[Event Serializer] BROKER_JSON_V1 (Cluster::EVENT_SERIALIZER_BROKER_JSON_V1)
|
||||
|
||||
Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, Cluster::EventSerializerTag
|
||||
Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, Cluster::EventSerializerTag
|
11
testing/btest/cluster/serializer-enum.zeek
Normal file
11
testing/btest/cluster/serializer-enum.zeek
Normal file
|
@ -0,0 +1,11 @@
|
|||
# @TEST-DOC: Test cluster backend enum
|
||||
#
|
||||
# @TEST-EXEC: zeek -NN Zeek::Broker_Serializer >>out
|
||||
# @TEST-EXEC: zeek -b %INPUT >>out
|
||||
# @TEST-EXEC: btest-diff out
|
||||
|
||||
event zeek_init()
|
||||
{
|
||||
print Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_BIN_V1);
|
||||
print Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_JSON_V1);
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue