From fdde1e98411d053e6e079246a9a7cdf61429e8c0 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 14 Nov 2024 18:38:45 +0100 Subject: [PATCH] cluster/serializer: Add binary-serialization-format This is a serializer for log records that uses SerialTypes for serializing and un-serializing. Essentially, this is similar to what broker does except for the envelope. --- src/cluster/serializer/CMakeLists.txt | 1 + .../CMakeLists.txt | 9 + .../binary-serialization-format/Plugin.cc | 22 ++ .../binary-serialization-format/Plugin.h | 14 ++ .../binary-serialization-format/Serializer.cc | 198 ++++++++++++++++++ .../binary-serialization-format/Serializer.h | 22 ++ .../Baseline/cluster.serializer-enum/out | 4 + .../scripts.base.files.x509.files/files.log | 12 +- testing/btest/cluster/serializer-enum.zeek | 2 + 9 files changed, 278 insertions(+), 6 deletions(-) create mode 100644 src/cluster/serializer/binary-serialization-format/CMakeLists.txt create mode 100644 src/cluster/serializer/binary-serialization-format/Plugin.cc create mode 100644 src/cluster/serializer/binary-serialization-format/Plugin.h create mode 100644 src/cluster/serializer/binary-serialization-format/Serializer.cc create mode 100644 src/cluster/serializer/binary-serialization-format/Serializer.h diff --git a/src/cluster/serializer/CMakeLists.txt b/src/cluster/serializer/CMakeLists.txt index 4f3a423cd9..58a5accbe7 100644 --- a/src/cluster/serializer/CMakeLists.txt +++ b/src/cluster/serializer/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(broker) +add_subdirectory(binary-serialization-format) diff --git a/src/cluster/serializer/binary-serialization-format/CMakeLists.txt b/src/cluster/serializer/binary-serialization-format/CMakeLists.txt new file mode 100644 index 0000000000..ba4b332bdc --- /dev/null +++ b/src/cluster/serializer/binary-serialization-format/CMakeLists.txt @@ -0,0 +1,9 @@ +zeek_add_plugin( + Zeek + Zeek_Binary_Serializer + INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + 
${CMAKE_CURRENT_BINARY_DIR}
+    SOURCES
+    Plugin.cc
+    Serializer.cc)
diff --git a/src/cluster/serializer/binary-serialization-format/Plugin.cc b/src/cluster/serializer/binary-serialization-format/Plugin.cc
new file mode 100644
index 0000000000..dc328b131a
--- /dev/null
+++ b/src/cluster/serializer/binary-serialization-format/Plugin.cc
@@ -0,0 +1,22 @@
+#include "zeek/cluster/serializer/binary-serialization-format/Plugin.h"
+
+#include "zeek/cluster/Component.h"
+#include "zeek/cluster/serializer/binary-serialization-format/Serializer.h"
+
+using namespace zeek::cluster;
+
+namespace zeek::plugin::Zeek_Binary_Serializer {
+
+Plugin plugin; // Also declared "extern" in Serializer.cc, where it is used for debug logging.
+
+zeek::plugin::Configuration Plugin::Configure() {
+    AddComponent(new LogSerializerComponent("ZEEK_BIN_V1", []() -> std::unique_ptr<LogSerializer> {
+        return std::make_unique<cluster::detail::BinarySerializationFormatLogSerializer>();
+    }));
+
+    zeek::plugin::Configuration config;
+    config.name = "Zeek::Binary_Serializer";
+    config.description = "Serialization using Zeek's custom binary serialization format";
+    return config;
+}
+} // namespace zeek::plugin::Zeek_Binary_Serializer
diff --git a/src/cluster/serializer/binary-serialization-format/Plugin.h b/src/cluster/serializer/binary-serialization-format/Plugin.h
new file mode 100644
index 0000000000..3403813954
--- /dev/null
+++ b/src/cluster/serializer/binary-serialization-format/Plugin.h
@@ -0,0 +1,14 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+
+#pragma once
+
+#include "zeek/plugin/Plugin.h"
+
+namespace zeek::plugin::Zeek_Binary_Serializer {
+
+class Plugin : public zeek::plugin::Plugin {
+public:
+    zeek::plugin::Configuration Configure() override; // Registers the ZEEK_BIN_V1 log serializer component.
+};
+
+} // namespace zeek::plugin::Zeek_Binary_Serializer
diff --git a/src/cluster/serializer/binary-serialization-format/Serializer.cc b/src/cluster/serializer/binary-serialization-format/Serializer.cc
new file mode 100644
index 0000000000..fdddeacd54
--- /dev/null
+++ b/src/cluster/serializer/binary-serialization-format/Serializer.cc
@@ -0,0 +1,198 @@
+#include "zeek/cluster/serializer/binary-serialization-format/Serializer.h"
+
+#include <cstdint>
+
+#include "zeek/DebugLogger.h"
+#include "zeek/ID.h"
+#include "zeek/Reporter.h"
+#include "zeek/SerializationFormat.h"
+#include "zeek/Type.h"
+#include "zeek/Val.h"
+#include "zeek/cluster/Serializer.h"
+#include "zeek/cluster/serializer/binary-serialization-format/Plugin.h"
+#include "zeek/logging/Types.h"
+#include "zeek/threading/SerialTypes.h"
+
+using namespace zeek::cluster;
+
+namespace zeek::plugin::Zeek_Binary_Serializer {
+
+extern Plugin plugin;
+
+}
+
+#define SERIALIZER_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Zeek_Binary_Serializer::plugin, __VA_ARGS__)
+
+bool detail::BinarySerializationFormatLogSerializer::SerializeLogWrite(byte_buffer& buf,
+                                                                       const logging::detail::LogWriteHeader& header,
+                                                                       zeek::Span<logging::detail::LogRecord> records) {
+    zeek::detail::BinarySerializationFormat fmt;
+
+    SERIALIZER_DEBUG("Serializing stream=%s writer=%s filter=%s path=%s num_fields=%zu num_records=%zu",
+                     header.stream_name.c_str(), header.writer_name.c_str(), header.filter_name.c_str(),
+                     header.path.c_str(), header.fields.size(), records.size());
+
+    fmt.StartWrite();
+
+    // Header: stream_name, writer_id, filter_name, path, num_fields, schema fields, num_records
+    bool success = true;
+    success &= fmt.Write(header.stream_name, "stream_id");
+    success &= fmt.Write(header.writer_name, "writer_id");
+    success &= fmt.Write(header.filter_name, "filter_name");
+    success &= fmt.Write(header.path, "path");
+    success &= fmt.Write(static_cast<uint32_t>(header.fields.size()), "num_fields");
+    for ( const auto& f : header.fields )
+        success &= f.Write(&fmt);
+
+    success &= fmt.Write(static_cast<uint32_t>(records.size()), "num_records");
+
+    if ( ! success ) {
+        reporter->Error("Failed to remotely log stream %s: header serialization failed", header.stream_name.c_str());
+        return false;
+    }
+
+    // Write out the payload: the fields of each record back to back, with no per-record framing.
+    for ( const auto& rec : records ) {
+        for ( size_t i = 0; i < rec.size(); ++i ) {
+            if ( ! rec[i].Write(&fmt) ) {
+                reporter->Error("Failed to remotely log stream %s: field %zu serialization failed",
+                                header.stream_name.c_str(), i);
+                return false;
+            }
+        }
+    }
+
+    char* data;
+    uint32_t len;
+    len = fmt.EndWrite(&data);
+
+    // Copy result into buffer. Would be nice to serialize directly into the buffer,
+    // but the SerializationFormat doesn't really allow for that.
+    buf.resize(len);
+    memcpy(buf.data(), data, len);
+    free(data); // Release the buffer returned by EndWrite().
+    return true;
+}
+
+std::optional<zeek::logging::detail::LogWriteBatch> detail::BinarySerializationFormatLogSerializer::UnserializeLogWrite(
+    detail::byte_buffer_span buf) {
+    zeek::detail::BinarySerializationFormat fmt;
+    fmt.StartRead(reinterpret_cast<const char*>(buf.data()), buf.size());
+
+    logging::detail::LogWriteHeader header;
+    std::vector<logging::detail::LogRecord> records;
+
+    fmt.Read(&header.stream_name, "stream_id");
+    fmt.Read(&header.writer_name, "writer_id");
+    fmt.Read(&header.filter_name, "filter_name");
+    fmt.Read(&header.path, "path");
+
+    if ( ! header.PopulateEnumVals() ) {
+        reporter->Error("Failed to populate enum vals from stream_name='%s' writer_name='%s'",
+                        header.stream_name.c_str(), header.writer_name.c_str());
+        return {};
+    }
+
+    uint32_t num_fields;
+    if ( ! fmt.Read(&num_fields, "num_fields") ) {
+        reporter->Error("Failed to read num_fields");
+        return {};
+    }
+
+    header.fields.resize(num_fields);
+
+    for ( size_t i = 0; i < header.fields.size(); i++ )
+        if ( ! header.fields[i].Read(&fmt) ) {
+            reporter->Error("Failed to read schema field %zu", i);
+            return {};
+        }
+
+    uint32_t num_records;
+    if ( ! fmt.Read(&num_records, "num_records") ) {
+        reporter->Error("Failed to read num_records");
+        return {};
+    }
+
+    SERIALIZER_DEBUG("stream=%s writer=%s filter=%s path=%s num_fields=%u num_records=%u", header.stream_name.c_str(),
+                     header.writer_name.c_str(), header.filter_name.c_str(), header.path.c_str(), num_fields,
+                     num_records);
+
+    records.reserve(num_records);
+
+    for ( uint32_t i = 0; i < num_records; i++ ) {
+        logging::detail::LogRecord rec(num_fields);
+        for ( uint32_t j = 0; j < num_fields; j++ ) {
+            if ( ! rec[j].Read(&fmt) ) {
+                reporter->Error("Failed to read record %u field %u", i, j);
+                return {};
+            }
+        }
+
+        records.push_back(std::move(rec));
+    }
+
+    fmt.EndRead();
+
+    return logging::detail::LogWriteBatch{std::move(header), std::move(records)};
+}
+
+#include "zeek/ID.h"
+
+#include "zeek/3rdparty/doctest.h"
+
+TEST_SUITE_BEGIN("cluster serializer binary-serialization-format");
+
+TEST_CASE("roundtrip") {
+    detail::byte_buffer buf;
+    detail::BinarySerializationFormatLogSerializer serializer;
+
+    static const auto& stream_id_type = zeek::id::find_type<zeek::EnumType>("Log::ID");
+    static const auto& writer_id_type = zeek::id::find_type<zeek::EnumType>("Log::Writer");
+
+    unsigned char expected_bytes[] = {0x00, 0x00, 0x00, 0x0c, 0x4c, 0x6f, 0x67, 0x3a, 0x3a, 0x55, 0x4e, 0x4b, 0x4e,
+                                      0x4f, 0x57, 0x4e, 0x00, 0x00, 0x00, 0x10, 0x4c, 0x6f, 0x67, 0x3a, 0x3a, 0x57,
+                                      0x52, 0x49, 0x54, 0x45, 0x52, 0x5f, 0x4e, 0x4f, 0x4e, 0x45, 0x00, 0x00, 0x00,
+                                      0x07, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x00, 0x00, 0x00, 0x07, 0x6d,
+                                      0x79, 0x2d, 0x70, 0x61, 0x74, 0x68, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
+                                      0x00, 0x02, 0x74, 0x73, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x16, 0x00,
+                                      0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x16, 0x01,
+                                      0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00,
+                                      0x00, 0x00, 0x16, 0x01, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+    std::byte* p = reinterpret_cast<std::byte*>(&expected_bytes[0]);
+    detail::byte_buffer expected{p, p + sizeof(expected_bytes)};
+
+    auto s = stream_id_type->Lookup("Log::UNKNOWN");
+    REQUIRE_GE(s, 0);
+    auto w = writer_id_type->Lookup("Log::WRITER_NONE");
+    REQUIRE_GE(w, 0);
+
+    const auto& stream = stream_id_type->GetEnumVal(s);
+    const auto& writer = writer_id_type->GetEnumVal(w);
+
+    zeek::logging::detail::LogWriteHeader hdr(stream, writer, "default", "my-path");
+    hdr.fields = {zeek::threading::Field{"ts", nullptr, zeek::TYPE_TIME, zeek::TYPE_ERROR, false}};
+
+    std::vector<zeek::logging::detail::LogRecord> records;
+    records.push_back({zeek::threading::Value{zeek::TYPE_TIME, zeek::TYPE_ERROR, true}});
+    records.push_back({zeek::threading::Value{zeek::TYPE_TIME, zeek::TYPE_ERROR, true}});
+    records[0][0].val.double_val = 1.0;
+    records[1][0].val.double_val = 2.0;
+
+    REQUIRE(serializer.SerializeLogWrite(buf, hdr, records));
+    CHECK_EQ(expected, buf);
+
+    // for ( auto c : buf )
+    //     std::fprintf(stderr, "0x%02x,", int(c));
+
+
+    auto result = serializer.UnserializeLogWrite(buf);
+    REQUIRE(result);
+
+    CHECK_EQ(result->header.fields.size(), 1);
+    CHECK_EQ(result->records.size(), 2);
+    CHECK_EQ(result->records[0][0].val.double_val, 1.0);
+    CHECK_EQ(result->records[1][0].val.double_val, 2.0);
+    CHECK_EQ("Log::UNKNOWN", result->header.stream_name);
+    CHECK_EQ("Log::WRITER_NONE", result->header.writer_name);
+}
+TEST_SUITE_END();
diff --git a/src/cluster/serializer/binary-serialization-format/Serializer.h b/src/cluster/serializer/binary-serialization-format/Serializer.h
new file mode 100644
index 0000000000..2fa6bbc4c6
--- /dev/null
+++ b/src/cluster/serializer/binary-serialization-format/Serializer.h
@@ -0,0 +1,22 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+
+#pragma once
+
+#include <optional>
+
+#include "zeek/cluster/Serializer.h"
+#include "zeek/logging/Types.h"
+
+namespace zeek::cluster::detail {
+
+class BinarySerializationFormatLogSerializer : public cluster::LogSerializer {
+public:
+    BinarySerializationFormatLogSerializer() : LogSerializer("zeek-bin-serializer") {}
+
+    bool SerializeLogWrite(cluster::detail::byte_buffer& buf, const logging::detail::LogWriteHeader& header,
+                           zeek::Span<logging::detail::LogRecord> records) override;
+
+    std::optional<logging::detail::LogWriteBatch> UnserializeLogWrite(detail::byte_buffer_span buf) override;
+};
+
+} // namespace zeek::cluster::detail
diff --git a/testing/btest/Baseline/cluster.serializer-enum/out b/testing/btest/Baseline/cluster.serializer-enum/out index adf5b130cd..8e3b28a865 100644 --- a/testing/btest/Baseline/cluster.serializer-enum/out +++ b/testing/btest/Baseline/cluster.serializer-enum/out @@ -3,5 +3,9 @@ Zeek::Broker_Serializer - Event serialization using Broker event formats (binary [Event Serializer] BROKER_BIN_V1 (Cluster::EVENT_SERIALIZER_BROKER_BIN_V1) [Event Serializer] BROKER_JSON_V1 (Cluster::EVENT_SERIALIZER_BROKER_JSON_V1) +Zeek::Binary_Serializer - Serialization using Zeek's custom binary serialization format (built-in) + [Log Serializer] ZEEK_BIN_V1 (Cluster::LOG_SERIALIZER_ZEEK_BIN_V1) + Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, Cluster::EventSerializerTag Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, Cluster::EventSerializerTag +Cluster::LOG_SERIALIZER_ZEEK_BIN_V1, Cluster::LogSerializerTag diff --git a/testing/btest/Baseline/scripts.base.files.x509.files/files.log b/testing/btest/Baseline/scripts.base.files.x509.files/files.log index ce19924fa1..e64dfc52c0 100644 --- a/testing/btest/Baseline/scripts.base.files.x509.files/files.log +++ b/testing/btest/Baseline/scripts.base.files.x509.files/files.log @@ -7,10 +7,10 @@ #open XXXX-XX-XX-XX-XX-XX #fields ts fuid uid id.orig_h id.orig_p id.resp_h id.resp_p source depth analyzers mime_type filename duration local_orig is_orig seen_bytes total_bytes missing_bytes 
overflow_bytes timedout parent_fuid md5 sha1 sha256 #types time string string addr port addr port string count set[string] string string interval bool bool count count count count bool string string string string -XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 -XXXXXXXXXX.XXXXXX Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d -XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 -XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 -XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d -XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 
2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 +XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 +XXXXXXXXXX.XXXXXX Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d +XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 +XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 +XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d +XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 
3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 #close XXXX-XX-XX-XX-XX-XX diff --git a/testing/btest/cluster/serializer-enum.zeek b/testing/btest/cluster/serializer-enum.zeek index c619ef557f..0b877544ff 100644 --- a/testing/btest/cluster/serializer-enum.zeek +++ b/testing/btest/cluster/serializer-enum.zeek @@ -1,6 +1,7 @@ # @TEST-DOC: Test cluster backend enum # # @TEST-EXEC: zeek -NN Zeek::Broker_Serializer >>out +# @TEST-EXEC: zeek -NN Zeek::Binary_Serializer >>out # @TEST-EXEC: zeek -b %INPUT >>out # @TEST-EXEC: btest-diff out @@ -8,4 +9,5 @@ event zeek_init() { print Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_BIN_V1); print Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_JSON_V1); + print Cluster::LOG_SERIALIZER_ZEEK_BIN_V1, type_name(Cluster::LOG_SERIALIZER_ZEEK_BIN_V1); }