diff --git a/src/broker/Manager.h b/src/broker/Manager.h index c9e7f3c8e9..d2dfbf2261 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -400,8 +400,7 @@ private: // This should never be reached, broker itself doesn't call this and overrides // the generic DoPublishEvent() method that would call this. - bool DoPublishEvent(const std::string& topic, const std::string& format, - const cluster::detail::byte_buffer& buf) override { + bool DoPublishEvent(const std::string& topic, const std::string& format, const byte_buffer& buf) override { throw std::logic_error("not implemented"); } @@ -416,7 +415,7 @@ private: } bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, - cluster::detail::byte_buffer& buf) override { + byte_buffer& buf) override { // Not implemented by broker. throw std::logic_error("not implemented"); } diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 8fba80b48b..53b70047cc 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -100,7 +100,7 @@ std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsS // Default implementation doing the serialization. bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) { - cluster::detail::byte_buffer buf; + byte_buffer buf; if ( ! event_serializer->SerializeEvent(buf, event) ) return false; @@ -111,7 +111,7 @@ bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& e // Default implementation doing log record serialization. bool Backend::DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, zeek::Span records) { - cluster::detail::byte_buffer buf; + byte_buffer buf; if ( ! log_serializer->SerializeLogWrite(buf, header, records) ) return false; @@ -123,8 +123,7 @@ void Backend::EnqueueEvent(EventHandlerPtr h, zeek::Args args) { event_handling_strategy->EnqueueLocalEvent(h, std::move(args)); } -bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format, - const detail::byte_buffer_span payload) { +bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format, const byte_buffer_span payload) { if ( format != event_serializer->Name() ) { zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(), event_serializer->Name().c_str()); @@ -143,7 +142,7 @@ bool Backend::ProcessEventMessage(std::string_view topic, std::string_view forma return event_handling_strategy->HandleRemoteEvent(topic, std::move(*r)); } -bool Backend::ProcessLogMessage(std::string_view format, detail::byte_buffer_span payload) { +bool Backend::ProcessLogMessage(std::string_view format, byte_buffer_span payload) { // We could also dynamically lookup the right de-serializer, but // for now assume we just receive what is configured. if ( format != log_serializer->Name() ) { @@ -162,7 +161,7 @@ bool Backend::ProcessLogMessage(std::string_view format, detail::byte_buffer_spa return zeek::log_mgr->WriteBatchFromRemote(result->header, std::move(result->records)); } -bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span payload) { +bool ThreadedBackend::ProcessBackendMessage(int tag, byte_buffer_span payload) { return DoProcessBackendMessage(tag, payload); } diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 5f0495ac18..03f6de3e49 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -311,12 +311,12 @@ protected: /** * Process an incoming event message. */ - bool ProcessEventMessage(std::string_view topic, std::string_view format, detail::byte_buffer_span payload); + bool ProcessEventMessage(std::string_view topic, std::string_view format, byte_buffer_span payload); /** * Process an incoming log message. */ - bool ProcessLogMessage(std::string_view format, detail::byte_buffer_span payload); + bool ProcessLogMessage(std::string_view format, byte_buffer_span payload); private: /** @@ -350,7 +350,7 @@ private: /** * Publish a cluster::detail::Event to the given topic. * - * The default implementation serializes to a detail::byte_buffer and + * The default implementation serializes to a byte_buffer and * calls DoPublishEvent() with the resulting buffer. * * This hook method only exists for the existing Broker implementation that @@ -373,8 +373,7 @@ private: * @param buf the serialized Event. * @return true if the message has been published successfully. */ - virtual bool DoPublishEvent(const std::string& topic, const std::string& format, - const detail::byte_buffer& buf) = 0; + virtual bool DoPublishEvent(const std::string& topic, const std::string& format, const byte_buffer& buf) = 0; /** * Register interest in messages that use a certain topic prefix. @@ -405,7 +404,7 @@ private: /** * Serialize a log batch, then forward it to DoPublishLogWrites() below. - * The default implementation serializes to a detail::byte_buffer and + * The default implementation serializes to a byte_buffer and * calls DoPublishLogWrites() with the resulting buffer. * * This hook method only exists for the existing Broker implementation that @@ -440,7 +439,7 @@ private: * @return true if the message has been published successfully. */ virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format, - detail::byte_buffer& buf) = 0; + byte_buffer& buf) = 0; std::string name; zeek::Tag tag; @@ -471,7 +470,7 @@ private: struct EventMessage { std::string topic; std::string format; - detail::byte_buffer payload; + byte_buffer payload; auto payload_span() const { return Span(payload.data(), payload.size()); }; }; @@ -481,7 +480,7 @@ struct EventMessage { */ struct LogMessage { std::string format; - detail::byte_buffer payload; + byte_buffer payload; auto payload_span() const { return Span(payload.data(), payload.size()); }; }; @@ -494,7 +493,7 @@ struct LogMessage { */ struct BackendMessage { int tag; - detail::byte_buffer payload; + byte_buffer payload; auto payload_span() const { return Span(payload.data(), payload.size()); }; }; @@ -546,13 +545,13 @@ private: /** * Process a backend specific message queued as BackendMessage. */ - bool ProcessBackendMessage(int tag, detail::byte_buffer_span payload); + bool ProcessBackendMessage(int tag, byte_buffer_span payload); /** * If a cluster backend produces messages of type BackendMessage, * this method will be invoked by the main thread to process it. */ - virtual bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { return false; }; + virtual bool DoProcessBackendMessage(int tag, byte_buffer_span payload) { return false; }; /** * Hook method for OnLooProcess. diff --git a/src/cluster/Serializer.h b/src/cluster/Serializer.h index 2be72ceca2..cc2402641b 100644 --- a/src/cluster/Serializer.h +++ b/src/cluster/Serializer.h @@ -6,7 +6,6 @@ #include #include -#include #include "zeek/Span.h" #include "zeek/logging/Types.h" @@ -15,13 +14,8 @@ namespace zeek::cluster { namespace detail { class Event; - -using byte_buffer = std::vector; -using byte_buffer_span = Span; - } // namespace detail - /** * This class handles encoding of events into byte buffers and back. * @@ -40,7 +34,7 @@ public: * * @returns True on success, false in exceptional cases (e.g. unsupported serialization). */ - virtual bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) = 0; + virtual bool SerializeEvent(byte_buffer& buf, const detail::Event& event) = 0; /** * Unserialize an event from a given byte buffer. @@ -49,7 +43,7 @@ public: * * @returns The event, or std::nullopt on error. */ - virtual std::optional UnserializeEvent(detail::byte_buffer_span buf) = 0; + virtual std::optional UnserializeEvent(byte_buffer_span buf) = 0; /** * @returns The name of this event serializer instance. @@ -85,7 +79,7 @@ public: * @param header The log batch header. * @param records The actual log writes. */ - virtual bool SerializeLogWrite(detail::byte_buffer& buf, const logging::detail::LogWriteHeader& header, + virtual bool SerializeLogWrite(byte_buffer& buf, const logging::detail::LogWriteHeader& header, zeek::Span records) = 0; /** @@ -93,7 +87,7 @@ public: * * @param buf The span representing received log writes. */ - virtual std::optional UnserializeLogWrite(detail::byte_buffer_span buf) = 0; + virtual std::optional UnserializeLogWrite(byte_buffer_span buf) = 0; /** * @returns The name of this log serializer instance. diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 43a48aec89..3e587dd963 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -253,8 +253,7 @@ bool ZeroMQBackend::SpawnZmqProxyThread() { return proxy_thread->Start(); } -bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& format, - const cluster::detail::byte_buffer& buf) { +bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& format, const byte_buffer& buf) { // Publishing an event happens as a multipart message with 4 parts: // // * The topic to publish to - this is required by XPUB/XSUB @@ -336,7 +335,7 @@ bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { } bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, - cluster::detail::byte_buffer& buf) { + byte_buffer& buf) { ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str()); static std::string message_type = "log-write"; @@ -405,7 +404,7 @@ void ZeroMQBackend::Run() { continue; } - detail::byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; @@ -487,7 +486,7 @@ void ZeroMQBackend::Run() { QueueMessage qm; auto* start = msg[0].data() + 1; auto* end = msg[0].data() + msg[0].size(); - detail::byte_buffer topic(start, end); + byte_buffer topic(start, end); if ( first == 1 ) { qm = BackendMessage{1, std::move(topic)}; } @@ -516,7 +515,7 @@ void ZeroMQBackend::Run() { if ( sender == NodeId() ) continue; - detail::byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; EventMessage em{.topic = std::string(msg[0].data(), msg[0].size()), .format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; @@ -644,7 +643,7 @@ void ZeroMQBackend::Run() { } } -bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { +bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) { if ( tag == 0 || tag == 1 ) { std::string topic{reinterpret_cast(payload.data()), payload.size()}; zeek::EventHandlerPtr eh; diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index fed3585394..088f922f34 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -60,17 +60,16 @@ private: void DoTerminate() override; - bool DoPublishEvent(const std::string& topic, const std::string& format, - const cluster::detail::byte_buffer& buf) override; + bool DoPublishEvent(const std::string& topic, const std::string& format, const byte_buffer& buf) override; bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) override; bool DoUnsubscribe(const std::string& topic_prefix) override; bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, - cluster::detail::byte_buffer& buf) override; + byte_buffer& buf) override; - bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) override; + bool DoProcessBackendMessage(int tag, byte_buffer_span payload) override; // Script level variables. std::string connect_xsub_endpoint; diff --git a/src/cluster/serializer/binary-serialization-format/Serializer.cc b/src/cluster/serializer/binary-serialization-format/Serializer.cc index 1bb2e49710..b26087e34c 100644 --- a/src/cluster/serializer/binary-serialization-format/Serializer.cc +++ b/src/cluster/serializer/binary-serialization-format/Serializer.cc @@ -77,7 +77,7 @@ bool detail::BinarySerializationFormatLogSerializer::SerializeLogWrite(byte_buff } std::optional detail::BinarySerializationFormatLogSerializer::UnserializeLogWrite( - detail::byte_buffer_span buf) { + byte_buffer_span buf) { zeek::detail::BinarySerializationFormat fmt; fmt.StartRead(reinterpret_cast(buf.data()), buf.size()); @@ -145,7 +145,7 @@ std::optional detail::BinarySerializationF TEST_SUITE_BEGIN("cluster serializer binary-serialization-format"); TEST_CASE("roundtrip") { - detail::byte_buffer buf; + zeek::byte_buffer buf; detail::BinarySerializationFormatLogSerializer serializer; static const auto& stream_id_type = zeek::id::find_type("Log::ID"); @@ -161,7 +161,7 @@ TEST_CASE("roundtrip") { 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x16, 0x01, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; std::byte* p = reinterpret_cast(&expected_bytes[0]); - detail::byte_buffer expected{p, p + sizeof(expected_bytes)}; + zeek::byte_buffer expected{p, p + sizeof(expected_bytes)}; auto s = stream_id_type->Lookup("Log::UNKNOWN"); REQUIRE_GE(s, 0); diff --git a/src/cluster/serializer/binary-serialization-format/Serializer.h b/src/cluster/serializer/binary-serialization-format/Serializer.h index 2fa6bbc4c6..79640635e4 100644 --- a/src/cluster/serializer/binary-serialization-format/Serializer.h +++ b/src/cluster/serializer/binary-serialization-format/Serializer.h @@ -13,10 +13,10 @@ class BinarySerializationFormatLogSerializer : public cluster::LogSerializer { public: BinarySerializationFormatLogSerializer() : LogSerializer("zeek-bin-serializer") {} - bool SerializeLogWrite(cluster::detail::byte_buffer& buf, const logging::detail::LogWriteHeader& header, + bool SerializeLogWrite(byte_buffer& buf, const logging::detail::LogWriteHeader& header, zeek::Span records) override; - std::optional UnserializeLogWrite(detail::byte_buffer_span buf) override; + std::optional UnserializeLogWrite(byte_buffer_span buf) override; }; } // namespace zeek::cluster::detail diff --git a/src/cluster/serializer/broker/Serializer.cc b/src/cluster/serializer/broker/Serializer.cc index 5bd32f8738..d47e61018d 100644 --- a/src/cluster/serializer/broker/Serializer.cc +++ b/src/cluster/serializer/broker/Serializer.cc @@ -101,7 +101,7 @@ std::optional detail::to_zeek_event(const broker::zeek::Event& ev return detail::Event{handler, std::move(vl), ts}; } -bool detail::BrokerBinV1_Serializer::SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) { +bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) { auto ev = to_broker_event(event); if ( ! ev ) return false; @@ -117,7 +117,7 @@ bool detail::BrokerBinV1_Serializer::SerializeEvent(detail::byte_buffer& buf, co return true; } -std::optional detail::BrokerBinV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) { +std::optional detail::BrokerBinV1_Serializer::UnserializeEvent(byte_buffer_span buf) { auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "", buf.data(), buf.size()); if ( ! r ) @@ -152,7 +152,7 @@ bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const det return true; } -std::optional detail::BrokerJsonV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) { +std::optional detail::BrokerJsonV1_Serializer::UnserializeEvent(byte_buffer_span buf) { broker::variant res; auto err = broker::format::json::v1::decode(std::string_view{reinterpret_cast(buf.data()), buf.size()}, res); @@ -173,7 +173,7 @@ TEST_SUITE_BEGIN("cluster serializer broker"); TEST_CASE("roundtrip") { auto* handler = zeek::event_registry->Lookup("Supervisor::node_status"); detail::Event e{handler, zeek::Args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}}; - detail::byte_buffer buf; + zeek::byte_buffer buf; SUBCASE("json") { detail::BrokerJsonV1_Serializer serializer; @@ -201,7 +201,7 @@ TEST_CASE("roundtrip") { 0x01, 0x0e, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; std::byte* p = reinterpret_cast(&expected_bytes[0]); - detail::byte_buffer expected{p, p + sizeof(expected_bytes)}; + zeek::byte_buffer expected{p, p + sizeof(expected_bytes)}; serializer.SerializeEvent(buf, e); diff --git a/src/cluster/serializer/broker/Serializer.h b/src/cluster/serializer/broker/Serializer.h index 4b9adae241..60e2691dbf 100644 --- a/src/cluster/serializer/broker/Serializer.h +++ b/src/cluster/serializer/broker/Serializer.h @@ -34,9 +34,9 @@ class BrokerBinV1_Serializer : public EventSerializer { public: BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {} - bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) override; + bool SerializeEvent(byte_buffer& buf, const detail::Event& event) override; - std::optional UnserializeEvent(detail::byte_buffer_span buf) override; + std::optional UnserializeEvent(byte_buffer_span buf) override; }; // Implementation of the EventSerializer that uses the existing broker::detail::val_to_data() @@ -45,9 +45,9 @@ class BrokerJsonV1_Serializer : public EventSerializer { public: BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {} - bool SerializeEvent(zeek::cluster::detail::byte_buffer& buf, const detail::Event& event) override; + bool SerializeEvent(byte_buffer& buf, const detail::Event& event) override; - std::optional UnserializeEvent(detail::byte_buffer_span buf) override; + std::optional UnserializeEvent(byte_buffer_span buf) override; }; } // namespace zeek::cluster::detail diff --git a/src/storage/Serializer.h b/src/storage/Serializer.h index dea136fcb2..8b5643bf1e 100644 --- a/src/storage/Serializer.h +++ b/src/storage/Serializer.h @@ -2,16 +2,10 @@ #pragma once -#include "zeek/Span.h" #include "zeek/Val.h" namespace zeek::storage { -namespace detail { -using byte_buffer = std::vector; -using byte_buffer_span = Span; -} // namespace detail - /** * Base class for a serializer used by storage backends. */ @@ -27,7 +21,7 @@ public: * @return On success, a byte buffer containing the serialized data. std::nullopt will * be returned on failure. */ - virtual std::optional Serialize(ValPtr val) = 0; + virtual std::optional Serialize(ValPtr val) = 0; /** * Unserializes a byte buffer into Zeek Val objects of a specific type. @@ -38,7 +32,7 @@ public: * @return A zeek::expected containing either the unserialized Val data on success, or * a string containing an error message on failure. */ - virtual zeek::expected Unserialize(detail::byte_buffer_span buf, TypePtr type) = 0; + virtual zeek::expected Unserialize(byte_buffer_span buf, TypePtr type) = 0; protected: Serializer(std::string name) : name(std::move(name)) {} diff --git a/src/storage/serializer/json/JSON.cc b/src/storage/serializer/json/JSON.cc index 8625eaa0be..a639112524 100644 --- a/src/storage/serializer/json/JSON.cc +++ b/src/storage/serializer/json/JSON.cc @@ -10,8 +10,8 @@ std::unique_ptr JSON::Instantiate() { return std::make_unique( JSON::JSON() : Serializer("JSON") {} -std::optional JSON::Serialize(ValPtr val) { - detail::byte_buffer buf; +std::optional JSON::Serialize(ValPtr val) { + byte_buffer buf; auto json = val->ToJSON(); buf.reserve(json->Len()); @@ -21,7 +21,7 @@ std::optional JSON::Serialize(ValPtr val) { return buf; } -zeek::expected JSON::Unserialize(detail::byte_buffer_span buf, TypePtr type) { +zeek::expected JSON::Unserialize(byte_buffer_span buf, TypePtr type) { std::string_view text{reinterpret_cast(buf.data()), buf.size()}; return zeek::detail::ValFromJSON(text, type, Func::nil); } diff --git a/src/storage/serializer/json/JSON.h b/src/storage/serializer/json/JSON.h index 2b5475936a..c9789d6b9f 100644 --- a/src/storage/serializer/json/JSON.h +++ b/src/storage/serializer/json/JSON.h @@ -13,8 +13,8 @@ public: JSON(); ~JSON() override = default; - std::optional Serialize(ValPtr val) override; - zeek::expected Unserialize(detail::byte_buffer_span buf, TypePtr type) override; + std::optional Serialize(ValPtr val) override; + zeek::expected Unserialize(byte_buffer_span buf, TypePtr type) override; }; } // namespace zeek::storage::serializer::json diff --git a/src/util.h b/src/util.h index a59d2d1962..b5cff50e6e 100644 --- a/src/util.h +++ b/src/util.h @@ -104,6 +104,8 @@ template using unexpected = nonstd::unexpected; } // namespace zeek +#include "zeek/Span.h" + using zeek_int_t = int64_t; using zeek_uint_t = uint64_t; @@ -119,6 +121,10 @@ namespace zeek { class ODesc; class RecordVal; +// Byte buffer types used by serialization code in storage and cluster. +using byte_buffer = std::vector; +using byte_buffer_span = Span; + namespace util { namespace detail { diff --git a/testing/btest/plugins/storage-plugin/src/StorageDummy.h b/testing/btest/plugins/storage-plugin/src/StorageDummy.h index c295aee82e..3e1e5d2a9d 100644 --- a/testing/btest/plugins/storage-plugin/src/StorageDummy.h +++ b/testing/btest/plugins/storage-plugin/src/StorageDummy.h @@ -50,7 +50,7 @@ public: zeek::storage::OperationResult DoErase(zeek::storage::ResultCallback* cb, zeek::ValPtr key) override; private: - std::map data; + std::map data; bool open = false; };