diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index a4da087ff1..14a063943d 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -10,3 +10,5 @@ zeek_add_subdir_library( Manager.cc BIFS cluster.bif) + +add_subdirectory(serializer) diff --git a/src/cluster/serializer/CMakeLists.txt b/src/cluster/serializer/CMakeLists.txt new file mode 100644 index 0000000000..4f3a423cd9 --- /dev/null +++ b/src/cluster/serializer/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(broker) diff --git a/src/cluster/serializer/broker/CMakeLists.txt b/src/cluster/serializer/broker/CMakeLists.txt new file mode 100644 index 0000000000..4854cbb333 --- /dev/null +++ b/src/cluster/serializer/broker/CMakeLists.txt @@ -0,0 +1,9 @@ +zeek_add_plugin( + Zeek + Cluster_Serializer_Binary_Serialization_Format + INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + SOURCES + Plugin.cc + Serializer.cc) diff --git a/src/cluster/serializer/broker/Plugin.cc b/src/cluster/serializer/broker/Plugin.cc new file mode 100644 index 0000000000..aa9ab83d79 --- /dev/null +++ b/src/cluster/serializer/broker/Plugin.cc @@ -0,0 +1,24 @@ +#include "Plugin.h" + +#include + +#include "zeek/cluster/Component.h" + +#include "Serializer.h" + +using namespace zeek::cluster; +using namespace zeek::plugin::Broker_Serializer; + +zeek::plugin::Configuration Plugin::Configure() { + AddComponent(new EventSerializerComponent("BROKER_BIN_V1", []() -> std::unique_ptr { + return std::make_unique(); + })); + AddComponent(new EventSerializerComponent("BROKER_JSON_V1", []() -> std::unique_ptr { + return std::make_unique(); + })); + + zeek::plugin::Configuration config; + config.name = "Zeek::Broker_Serializer"; + config.description = "Event serialization using Broker event formats (binary and json)"; + return config; +} diff --git a/src/cluster/serializer/broker/Plugin.h b/src/cluster/serializer/broker/Plugin.h new file mode 100644 index 0000000000..a14eb8a8ea --- /dev/null +++ b/src/cluster/serializer/broker/Plugin.h @@ -0,0 +1,14 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/plugin/Plugin.h" + +namespace zeek::plugin::Broker_Serializer { + +class Plugin : public zeek::plugin::Plugin { +public: + zeek::plugin::Configuration Configure() override; +} plugin; + +} // namespace zeek::plugin::Broker_Serializer diff --git a/src/cluster/serializer/broker/README b/src/cluster/serializer/broker/README new file mode 100644 index 0000000000..b316631357 --- /dev/null +++ b/src/cluster/serializer/broker/README @@ -0,0 +1 @@ +Contains event serializers using Broker's BIN and JSON formats. diff --git a/src/cluster/serializer/broker/Serializer.cc b/src/cluster/serializer/broker/Serializer.cc new file mode 100644 index 0000000000..068dd20767 --- /dev/null +++ b/src/cluster/serializer/broker/Serializer.cc @@ -0,0 +1,219 @@ +#include "zeek/cluster/serializer/broker/Serializer.h" + +#include + +#include "zeek/Desc.h" +#include "zeek/Func.h" +#include "zeek/Reporter.h" +#include "zeek/broker/Data.h" +#include "zeek/cluster/Backend.h" + +#include "broker/data_envelope.hh" +#include "broker/error.hh" +#include "broker/format/json.hh" +#include "broker/zeek.hh" + +#include "zeek/3rdparty/doctest.h" + +using namespace zeek::cluster; + +namespace { + +/** + * Convert a cluster::detail::Event to a broker::zeek::Event. + * + * @param ev The cluster::detail::Event + * @return A broker::zeek::Event to be serialized, or nullopt in case of errors. + */ +std::optional to_broker_event(const detail::Event& ev) { + broker::vector xs; + xs.reserve(ev.args.size()); + for ( const auto& a : ev.args ) { + if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) { + xs.emplace_back(std::move(res.value())); + } + else { + return std::nullopt; + } + } + + return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.timestamp)); +} + +/** + * Convert a broker::zeek::Event to cluster::detail::Event by looking + * it up in Zeek's event handler registry and converting event arguments + * to the appropriate Val instances. + * + * @param broker_ev The broker side event. + * @returns A zeek::cluster::detail::Event instance, or std::nullopt if the conversion failed. + */ +std::optional to_zeek_event(const broker::zeek::Event& ev) { + auto&& name = ev.name(); + auto&& args = ev.args(); + + // Meh, technically need to convert ev.metadata() and + // expose it to script land as `table[count] of any` + // where consumers then know what to do with it. + // + // For now, handle the timestamp explicitly. + double ts; + if ( auto ev_ts = ev.ts() ) + broker::convert(*ev_ts, ts); + else + ts = zeek::run_state::network_time; + + zeek::Args vl; + zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name); + if ( ! handler ) { + zeek::reporter->Error("Failed to lookup handler for '%s'", std::string(name).c_str()); + return std::nullopt; + } + + const auto& arg_types = handler->GetFunc()->GetType()->ParamList()->GetTypes(); + + if ( arg_types.size() != args.size() ) { + std::string event_name(name); + zeek::reporter->Error("Unserialize error '%s' arg_types.size()=%zu and args.size()=%zu", event_name.c_str(), + arg_types.size(), args.size()); + + return std::nullopt; + } + + for ( size_t i = 0; i < args.size(); ++i ) { + const auto& expected_type = arg_types[i]; + auto arg = args[i].to_data(); + auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get()); + if ( val ) + vl.emplace_back(std::move(val)); + else { + std::string event_name(name); + auto got_type = args[i].get_type_name(); + std::string argstr = broker::to_string(arg); + zeek::reporter + ->Error("Unserialize error for event '%s': broker value '%s' type '%s' to Zeek type '%s' failed", + event_name.c_str(), argstr.c_str(), got_type, obj_desc(expected_type.get()).c_str()); + + return std::nullopt; + } + } + + return detail::Event{handler, std::move(vl), ts}; +} + +} // namespace + +bool detail::BrokerBinV1_Serializer::SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) { + auto ev = to_broker_event(event); + if ( ! ev ) + return false; + + // The produced broker::zeek::Event is already in bin::v1 format after + // constructing it, so we can take the raw bytes directly rather than + // going through encode() again. + // + // broker::format::bin::v1::encode(ev->move_data(), std::back_inserter(buf)); + assert(ev->raw()->shared_envelope() != nullptr); + auto [raw, size] = ev->raw().shared_envelope()->raw_bytes(); + buf.insert(buf.begin(), raw, raw + size); + return true; +} + +std::optional detail::BrokerBinV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) { + auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "", + buf.data(), buf.size()); + if ( ! r ) + return std::nullopt; + + broker::zeek::Event ev(*r); + return to_zeek_event(ev); +} + + +// Convert char to std::byte during push_back() so that +// we don't need to copy from std::vector to a +// std::vector when rendering JSON. +template +struct PushBackAdapter { + explicit PushBackAdapter(T& c) : container(&c) {} + using value_type = char; + + void push_back(char c) { container->push_back(static_cast(c)); } + + T* container; +}; + + +bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) { + auto ev = to_broker_event(event); + if ( ! ev ) + return false; + + auto push_back_adaptor = PushBackAdapter(buf); + broker::format::json::v1::encode(ev->move_data(), std::back_inserter(push_back_adaptor)); + return true; +} + +std::optional detail::BrokerJsonV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) { + broker::variant res; + auto err = + broker::format::json::v1::decode(std::string_view{reinterpret_cast(buf.data()), buf.size()}, res); + if ( err ) { + zeek::reporter->Error("Decode error for JSON payload: '%s'", + err.message() ? err.message()->c_str() : "unknown"); + return std::nullopt; + } + + broker::zeek::Event ev(std::move(res)); + return to_zeek_event(ev); +} + +TEST_SUITE_BEGIN("cluster serializer broker"); + +#include "zeek/EventRegistry.h" + +TEST_CASE("roundtrip") { + auto* handler = zeek::event_registry->Lookup("Supervisor::node_status"); + detail::Event e{handler, zeek::Args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}}; + detail::byte_buffer buf; + + SUBCASE("json") { + detail::BrokerJsonV1_Serializer serializer; + std::string expected = + R"({"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"count","data":1},{"@data-type":"vector","data":[{"@data-type":"string","data":"Supervisor::node_status"},{"@data-type":"vector","data":[{"@data-type":"string","data":"TEST"},{"@data-type":"count","data":42}]},{"@data-type":"vector","data":[{"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"timestamp","data":"1970-01-01T00:00:00.000"}]}]}]}]})"; + + serializer.SerializeEvent(buf, e); + + CHECK_EQ(expected, std::string{reinterpret_cast(buf.data()), buf.size()}); + + auto result = serializer.UnserializeEvent(buf); + REQUIRE(result); + CHECK_EQ(result->Handler(), handler); + CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); + CHECK_EQ(result->args.size(), 2); + } + + SUBCASE("binary") { + detail::BrokerBinV1_Serializer serializer; + unsigned char expected_bytes[] = {0x0e, 0x03, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0e, 0x03, 0x05, 0x17, 0x53, 0x75, + 0x70, 0x65, 0x72, 0x76, 0x69, 0x73, 0x6f, 0x72, 0x3a, 0x3a, 0x6e, 0x6f, 0x64, + 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x0e, 0x02, 0x05, 0x04, 0x54, + 0x45, 0x53, 0x54, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2a, 0x0e, + 0x01, 0x0e, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x09, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + std::byte* p = reinterpret_cast(&expected_bytes[0]); + detail::byte_buffer expected{p, p + sizeof(expected_bytes)}; + + serializer.SerializeEvent(buf, e); + + CHECK_EQ(expected, buf); + + auto result = serializer.UnserializeEvent(buf); + REQUIRE(result); + CHECK_EQ(result->Handler(), handler); + CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); + CHECK_EQ(result->args.size(), 2); + } +} +TEST_SUITE_END(); diff --git a/src/cluster/serializer/broker/Serializer.h b/src/cluster/serializer/broker/Serializer.h new file mode 100644 index 0000000000..00973b90c1 --- /dev/null +++ b/src/cluster/serializer/broker/Serializer.h @@ -0,0 +1,31 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/cluster/Serializer.h" + +namespace zeek::cluster::detail { + +// Implementation of the EventSerializer using the existing broker::detail::val_to_data() +// and broker::format::bin::v1::encode(). +class BrokerBinV1_Serializer : public EventSerializer { +public: + BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {} + + bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) override; + + std::optional UnserializeEvent(detail::byte_buffer_span buf) override; +}; + +// Implementation of the EventSerializer that uses the existing broker::detail::val_to_data() +// and broker::format::json::v1::encode() +class BrokerJsonV1_Serializer : public EventSerializer { +public: + BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {} + + bool SerializeEvent(zeek::cluster::detail::byte_buffer& buf, const detail::Event& event) override; + + std::optional UnserializeEvent(detail::byte_buffer_span buf) override; +}; + +} // namespace zeek::cluster::detail diff --git a/testing/btest/Baseline/cluster.serializer-enum/out b/testing/btest/Baseline/cluster.serializer-enum/out new file mode 100644 index 0000000000..adf5b130cd --- /dev/null +++ b/testing/btest/Baseline/cluster.serializer-enum/out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Zeek::Broker_Serializer - Event serialization using Broker event formats (binary and json) (built-in) + [Event Serializer] BROKER_BIN_V1 (Cluster::EVENT_SERIALIZER_BROKER_BIN_V1) + [Event Serializer] BROKER_JSON_V1 (Cluster::EVENT_SERIALIZER_BROKER_JSON_V1) + +Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, Cluster::EventSerializerTag +Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, Cluster::EventSerializerTag diff --git a/testing/btest/cluster/serializer-enum.zeek b/testing/btest/cluster/serializer-enum.zeek new file mode 100644 index 0000000000..c619ef557f --- /dev/null +++ b/testing/btest/cluster/serializer-enum.zeek @@ -0,0 +1,11 @@ +# @TEST-DOC: Test cluster backend enum +# +# @TEST-EXEC: zeek -NN Zeek::Broker_Serializer >>out +# @TEST-EXEC: zeek -b %INPUT >>out +# @TEST-EXEC: btest-diff out + +event zeek_init() + { + print Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_BIN_V1); + print Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_JSON_V1); + }