From bb9e6583e0b113aa82553c4f55341034d61d87a2 Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Tue, 3 Feb 2015 13:54:40 -0600 Subject: [PATCH] broker integration: Comm::Data/Store::Handle opaque serialization For now, this is needed when locally cloning Vals. E.g. "when" statements will clone an entire frame and data store queries use "when" statements, so it's likely there will be locals of these opaque types that get cloned. --- src/SerialTypes.h | 2 + src/comm/CMakeLists.txt | 3 ++ src/comm/Data.cc | 34 +++++++++++++++++ src/comm/Data.h | 8 +++- src/comm/Manager.cc | 1 + src/comm/Store.cc | 82 +++++++++++++++++++++++++++++++++++++---- src/comm/Store.h | 12 ++++-- 7 files changed, 130 insertions(+), 12 deletions(-) diff --git a/src/SerialTypes.h b/src/SerialTypes.h index d2f227838c..4e6bbb11ac 100644 --- a/src/SerialTypes.h +++ b/src/SerialTypes.h @@ -113,6 +113,8 @@ SERIAL_VAL(TOPK_VAL, 20) SERIAL_VAL(BLOOMFILTER_VAL, 21) SERIAL_VAL(CARDINALITY_VAL, 22) SERIAL_VAL(X509_VAL, 23) +SERIAL_VAL(COMM_STORE_HANDLE_VAL, 24) +SERIAL_VAL(COMM_DATA_VAL, 25) #define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR) SERIAL_EXPR(EXPR, 1) diff --git a/src/comm/CMakeLists.txt b/src/comm/CMakeLists.txt index da726e54d6..6453e006bf 100644 --- a/src/comm/CMakeLists.txt +++ b/src/comm/CMakeLists.txt @@ -10,6 +10,9 @@ if ( ROCKSDB_INCLUDE_DIR ) include_directories(BEFORE ${ROCKSDB_INCLUDE_DIR}) endif () +include_directories(BEFORE ${LIBCAF_INCLUDE_DIR_CORE}) +include_directories(BEFORE ${LIBCAF_INCLUDE_DIR_IO}) + set(comm_SRCS Data.cc Manager.cc diff --git a/src/comm/Data.cc b/src/comm/Data.cc index 3b1a240988..0ea7666f9e 100644 --- a/src/comm/Data.cc +++ b/src/comm/Data.cc @@ -1,5 +1,7 @@ #include "Data.h" #include "comm/data.bif.h" +#include +#include using namespace std; @@ -663,3 +665,35 @@ broker::data& comm::opaque_field_to_data(RecordVal* v, Frame* f) return static_cast(d)->data; } + +IMPLEMENT_SERIAL(comm::DataVal, SER_COMM_DATA_VAL); + +bool comm::DataVal::DoSerialize(SerialInfo* info) const + { + DO_SERIALIZE(SER_COMM_DATA_VAL, OpaqueVal); + + std::string serial; + caf::binary_serializer bs(std::back_inserter(serial)); + bs << data; + + if ( ! SERIALIZE_STR(serial.data(), serial.size()) ) + return false; + + return true; + } + +bool comm::DataVal::DoUnserialize(UnserialInfo* info) + { + DO_UNSERIALIZE(OpaqueVal); + + const char* serial; + int len; + + if ( ! UNSERIALIZE_STR(&serial, &len) ) + return false; + + caf::binary_deserializer bd(serial, len); + caf::uniform_typeid()->deserialize(&data, &bd); + delete [] serial; + return true; + } diff --git a/src/comm/Data.h b/src/comm/Data.h index da10853127..ed3c16f677 100644 --- a/src/comm/Data.h +++ b/src/comm/Data.h @@ -27,7 +27,6 @@ broker::util::optional val_to_data(Val* v); Val* data_to_val(broker::data d, BroType* type); -// TODO: actually need to implement Bro's serialization to support copying vals class DataVal : public OpaqueVal { public: @@ -42,7 +41,14 @@ public: d->Add("}"); } + DECLARE_SERIAL(DataVal); + broker::data data; + +protected: + + DataVal() + {} }; struct type_name_getter { diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 3d6aad4d1e..cfce84a1c9 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -787,6 +787,7 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id, ++it; } + delete it->second->store; it->second->store = nullptr; Unref(it->second); return true; diff --git a/src/comm/Store.cc b/src/comm/Store.cc index 0d94795ce8..8c55c31785 100644 --- a/src/comm/Store.cc +++ b/src/comm/Store.cc @@ -78,17 +78,15 @@ comm::StoreHandleVal::StoreHandleVal(broker::store::identifier id, switch ( store_type ) { case StoreType::FRONTEND: - store.reset(new broker::store::frontend(comm_mgr->Endpoint(), - move(id))); + store = new broker::store::frontend(comm_mgr->Endpoint(), move(id)); break; case StoreType::MASTER: - store.reset(new broker::store::master(comm_mgr->Endpoint(), - move(id), move(backend))); + store = new broker::store::master(comm_mgr->Endpoint(), move(id), + move(backend)); break; case StoreType::CLONE: - store.reset(new broker::store::clone(comm_mgr->Endpoint(), - move(id), resync, - move(backend))); + store = new broker::store::clone(comm_mgr->Endpoint(), move(id), resync, + move(backend)); break; default: reporter->FatalError("unknown data store type: %d", @@ -139,3 +137,73 @@ void comm::StoreHandleVal::ValDescribe(ODesc* d) const d->Add("}"); } + +IMPLEMENT_SERIAL(comm::StoreHandleVal, SER_COMM_STORE_HANDLE_VAL); + +bool comm::StoreHandleVal::DoSerialize(SerialInfo* info) const + { + DO_SERIALIZE(SER_COMM_STORE_HANDLE_VAL, OpaqueVal); + + bool have_store = store != nullptr; + + if ( ! SERIALIZE(have_store) ) + return false; + + if ( ! have_store ) + return true; + + if ( ! SERIALIZE(static_cast(store_type)) ) + return false; + + if ( ! SERIALIZE_STR(store->id().data(), store->id().size()) ) + return false; + + return true; + } + +bool comm::StoreHandleVal::DoUnserialize(UnserialInfo* info) + { + DO_UNSERIALIZE(OpaqueVal); + + bool have_store; + + if ( ! UNSERIALIZE(&have_store) ) + return false; + + if ( ! have_store ) + { + store = nullptr; + return true; + } + + int type; + + if ( ! UNSERIALIZE(&type) ) + return false; + + const char* id_str; + int len; + + if ( ! UNSERIALIZE_STR(&id_str, &len) ) + return false; + + broker::store::identifier id(id_str, len); + delete [] id_str; + + auto handle = comm_mgr->LookupStore(id, static_cast(type)); + + if ( ! handle ) + { + // Passing serialized version of store handles to other Bro processes + // doesn't make sense, only allow local clones of the handle val. + reporter->Error("failed to look up unserialized store handle %s, %d", + id.data(), type); + store = nullptr; + return false; + } + + store = handle->store; + store_type = handle->store_type; + backend_type = handle->backend_type; + return true; + } diff --git a/src/comm/Store.h b/src/comm/Store.h index 3183afbbb3..b02c5b4f5b 100644 --- a/src/comm/Store.h +++ b/src/comm/Store.h @@ -98,9 +98,6 @@ private: StoreType store_type; }; -// TODO: actually need to implement Bro's serialization to support copying vals -// but doesn't make sense to "copy" a master data store, so assert we can -// lookup a store by pair locally (i.e. shouldn't send handles remotely). class StoreHandleVal : public OpaqueVal { public: @@ -112,9 +109,16 @@ public: void ValDescribe(ODesc* d) const override; - std::unique_ptr store; + DECLARE_SERIAL(StoreHandleVal); + + broker::store::frontend* store; comm::StoreType store_type; broker::util::optional backend_type; + +protected: + + StoreHandleVal() + {} }; } // namespace comm