diff --git a/src/SerialTypes.h b/src/SerialTypes.h index d2f227838c..4e6bbb11ac 100644 --- a/src/SerialTypes.h +++ b/src/SerialTypes.h @@ -113,6 +113,8 @@ SERIAL_VAL(TOPK_VAL, 20) SERIAL_VAL(BLOOMFILTER_VAL, 21) SERIAL_VAL(CARDINALITY_VAL, 22) SERIAL_VAL(X509_VAL, 23) +SERIAL_VAL(COMM_STORE_HANDLE_VAL, 24) +SERIAL_VAL(COMM_DATA_VAL, 25) #define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR) SERIAL_EXPR(EXPR, 1) diff --git a/src/comm/CMakeLists.txt b/src/comm/CMakeLists.txt index da726e54d6..6453e006bf 100644 --- a/src/comm/CMakeLists.txt +++ b/src/comm/CMakeLists.txt @@ -10,6 +10,9 @@ if ( ROCKSDB_INCLUDE_DIR ) include_directories(BEFORE ${ROCKSDB_INCLUDE_DIR}) endif () +include_directories(BEFORE ${LIBCAF_INCLUDE_DIR_CORE}) +include_directories(BEFORE ${LIBCAF_INCLUDE_DIR_IO}) + set(comm_SRCS Data.cc Manager.cc diff --git a/src/comm/Data.cc b/src/comm/Data.cc index 3b1a240988..0ea7666f9e 100644 --- a/src/comm/Data.cc +++ b/src/comm/Data.cc @@ -1,5 +1,7 @@ #include "Data.h" #include "comm/data.bif.h" +#include +#include using namespace std; @@ -663,3 +665,35 @@ broker::data& comm::opaque_field_to_data(RecordVal* v, Frame* f) return static_cast(d)->data; } + +IMPLEMENT_SERIAL(comm::DataVal, SER_COMM_DATA_VAL); + +bool comm::DataVal::DoSerialize(SerialInfo* info) const + { + DO_SERIALIZE(SER_COMM_DATA_VAL, OpaqueVal); + + std::string serial; + caf::binary_serializer bs(std::back_inserter(serial)); + bs << data; + + if ( ! SERIALIZE_STR(serial.data(), serial.size()) ) + return false; + + return true; + } + +bool comm::DataVal::DoUnserialize(UnserialInfo* info) + { + DO_UNSERIALIZE(OpaqueVal); + + const char* serial; + int len; + + if ( ! UNSERIALIZE_STR(&serial, &len) ) + return false; + + caf::binary_deserializer bd(serial, len); + caf::uniform_typeid()->deserialize(&data, &bd); + delete [] serial; + return true; + } diff --git a/src/comm/Data.h b/src/comm/Data.h index da10853127..ed3c16f677 100644 --- a/src/comm/Data.h +++ b/src/comm/Data.h @@ -27,7 +27,6 @@ broker::util::optional val_to_data(Val* v); Val* data_to_val(broker::data d, BroType* type); -// TODO: actually need to implement Bro's serialization to support copying vals class DataVal : public OpaqueVal { public: @@ -42,7 +41,14 @@ public: d->Add("}"); } + DECLARE_SERIAL(DataVal); + broker::data data; + +protected: + + DataVal() + {} }; struct type_name_getter { diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 3d6aad4d1e..cfce84a1c9 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -787,6 +787,7 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id, ++it; } + delete it->second->store; it->second->store = nullptr; Unref(it->second); return true; diff --git a/src/comm/Store.cc b/src/comm/Store.cc index 0d94795ce8..8c55c31785 100644 --- a/src/comm/Store.cc +++ b/src/comm/Store.cc @@ -78,17 +78,15 @@ comm::StoreHandleVal::StoreHandleVal(broker::store::identifier id, switch ( store_type ) { case StoreType::FRONTEND: - store.reset(new broker::store::frontend(comm_mgr->Endpoint(), - move(id))); + store = new broker::store::frontend(comm_mgr->Endpoint(), move(id)); break; case StoreType::MASTER: - store.reset(new broker::store::master(comm_mgr->Endpoint(), - move(id), move(backend))); + store = new broker::store::master(comm_mgr->Endpoint(), move(id), + move(backend)); break; case StoreType::CLONE: - store.reset(new broker::store::clone(comm_mgr->Endpoint(), - move(id), resync, - move(backend))); + store = new broker::store::clone(comm_mgr->Endpoint(), move(id), resync, + move(backend)); break; default: reporter->FatalError("unknown data store type: %d", @@ -139,3 +137,73 @@ void comm::StoreHandleVal::ValDescribe(ODesc* d) const d->Add("}"); } + +IMPLEMENT_SERIAL(comm::StoreHandleVal, SER_COMM_STORE_HANDLE_VAL); + +bool comm::StoreHandleVal::DoSerialize(SerialInfo* info) const + { + DO_SERIALIZE(SER_COMM_STORE_HANDLE_VAL, OpaqueVal); + + bool have_store = store != nullptr; + + if ( ! SERIALIZE(have_store) ) + return false; + + if ( ! have_store ) + return true; + + if ( ! SERIALIZE(static_cast(store_type)) ) + return false; + + if ( ! SERIALIZE_STR(store->id().data(), store->id().size()) ) + return false; + + return true; + } + +bool comm::StoreHandleVal::DoUnserialize(UnserialInfo* info) + { + DO_UNSERIALIZE(OpaqueVal); + + bool have_store; + + if ( ! UNSERIALIZE(&have_store) ) + return false; + + if ( ! have_store ) + { + store = nullptr; + return true; + } + + int type; + + if ( ! UNSERIALIZE(&type) ) + return false; + + const char* id_str; + int len; + + if ( ! UNSERIALIZE_STR(&id_str, &len) ) + return false; + + broker::store::identifier id(id_str, len); + delete [] id_str; + + auto handle = comm_mgr->LookupStore(id, static_cast(type)); + + if ( ! handle ) + { + // Passing serialized version of store handles to other Bro processes + // doesn't make sense, only allow local clones of the handle val. + reporter->Error("failed to look up unserialized store handle %s, %d", + id.data(), type); + store = nullptr; + return false; + } + + store = handle->store; + store_type = handle->store_type; + backend_type = handle->backend_type; + return true; + } diff --git a/src/comm/Store.h b/src/comm/Store.h index 3183afbbb3..b02c5b4f5b 100644 --- a/src/comm/Store.h +++ b/src/comm/Store.h @@ -98,9 +98,6 @@ private: StoreType store_type; }; -// TODO: actually need to implement Bro's serialization to support copying vals -// but doesn't make sense to "copy" a master data store, so assert we can -// lookup a store by pair locally (i.e. shouldn't send handles remotely). class StoreHandleVal : public OpaqueVal { public: @@ -112,9 +109,16 @@ public: void ValDescribe(ODesc* d) const override; - std::unique_ptr store; + DECLARE_SERIAL(StoreHandleVal); + + broker::store::frontend* store; comm::StoreType store_type; broker::util::optional backend_type; + +protected: + + StoreHandleVal() + {} }; } // namespace comm