broker integration: Comm::Data/Store::Handle opaque serialization

For now, this is needed when locally cloning Vals.  E.g. "when"
statements will clone an entire frame and data store queries use "when"
statements, so it's likely there will be locals of these opaque types
that get cloned.
This commit is contained in:
Jon Siwek 2015-02-03 13:54:40 -06:00
parent 441c46df76
commit bb9e6583e0
7 changed files with 130 additions and 12 deletions

View file

@ -113,6 +113,8 @@ SERIAL_VAL(TOPK_VAL, 20)
SERIAL_VAL(BLOOMFILTER_VAL, 21)
SERIAL_VAL(CARDINALITY_VAL, 22)
SERIAL_VAL(X509_VAL, 23)
SERIAL_VAL(COMM_STORE_HANDLE_VAL, 24)
SERIAL_VAL(COMM_DATA_VAL, 25)
#define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR)
SERIAL_EXPR(EXPR, 1)

View file

@ -10,6 +10,9 @@ if ( ROCKSDB_INCLUDE_DIR )
include_directories(BEFORE ${ROCKSDB_INCLUDE_DIR})
endif ()
include_directories(BEFORE ${LIBCAF_INCLUDE_DIR_CORE})
include_directories(BEFORE ${LIBCAF_INCLUDE_DIR_IO})
set(comm_SRCS
Data.cc
Manager.cc

View file

@ -1,5 +1,7 @@
#include "Data.h"
#include "comm/data.bif.h"
#include <caf/binary_serializer.hpp>
#include <caf/binary_deserializer.hpp>
using namespace std;
@ -663,3 +665,35 @@ broker::data& comm::opaque_field_to_data(RecordVal* v, Frame* f)
return static_cast<DataVal*>(d)->data;
}
IMPLEMENT_SERIAL(comm::DataVal, SER_COMM_DATA_VAL);
bool comm::DataVal::DoSerialize(SerialInfo* info) const
{
DO_SERIALIZE(SER_COMM_DATA_VAL, OpaqueVal);
std::string serial;
caf::binary_serializer bs(std::back_inserter(serial));
bs << data;
if ( ! SERIALIZE_STR(serial.data(), serial.size()) )
return false;
return true;
}
bool comm::DataVal::DoUnserialize(UnserialInfo* info)
{
DO_UNSERIALIZE(OpaqueVal);
const char* serial;
int len;
if ( ! UNSERIALIZE_STR(&serial, &len) )
return false;
caf::binary_deserializer bd(serial, len);
caf::uniform_typeid<broker::data>()->deserialize(&data, &bd);
delete [] serial;
return true;
}

View file

@ -27,7 +27,6 @@ broker::util::optional<broker::data> val_to_data(Val* v);
Val* data_to_val(broker::data d, BroType* type);
// TODO: actually need to implement Bro's serialization to support copying vals
class DataVal : public OpaqueVal {
public:
@ -42,7 +41,14 @@ public:
d->Add("}");
}
DECLARE_SERIAL(DataVal);
broker::data data;
protected:
DataVal()
{}
};
struct type_name_getter {

View file

@ -787,6 +787,7 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id,
++it;
}
delete it->second->store;
it->second->store = nullptr;
Unref(it->second);
return true;

View file

@ -78,17 +78,15 @@ comm::StoreHandleVal::StoreHandleVal(broker::store::identifier id,
switch ( store_type ) {
case StoreType::FRONTEND:
store.reset(new broker::store::frontend(comm_mgr->Endpoint(),
move(id)));
store = new broker::store::frontend(comm_mgr->Endpoint(), move(id));
break;
case StoreType::MASTER:
store.reset(new broker::store::master(comm_mgr->Endpoint(),
move(id), move(backend)));
store = new broker::store::master(comm_mgr->Endpoint(), move(id),
move(backend));
break;
case StoreType::CLONE:
store.reset(new broker::store::clone(comm_mgr->Endpoint(),
move(id), resync,
move(backend)));
store = new broker::store::clone(comm_mgr->Endpoint(), move(id), resync,
move(backend));
break;
default:
reporter->FatalError("unknown data store type: %d",
@ -139,3 +137,73 @@ void comm::StoreHandleVal::ValDescribe(ODesc* d) const
d->Add("}");
}
IMPLEMENT_SERIAL(comm::StoreHandleVal, SER_COMM_STORE_HANDLE_VAL);
bool comm::StoreHandleVal::DoSerialize(SerialInfo* info) const
{
DO_SERIALIZE(SER_COMM_STORE_HANDLE_VAL, OpaqueVal);
bool have_store = store != nullptr;
if ( ! SERIALIZE(have_store) )
return false;
if ( ! have_store )
return true;
if ( ! SERIALIZE(static_cast<int>(store_type)) )
return false;
if ( ! SERIALIZE_STR(store->id().data(), store->id().size()) )
return false;
return true;
}
bool comm::StoreHandleVal::DoUnserialize(UnserialInfo* info)
{
DO_UNSERIALIZE(OpaqueVal);
bool have_store;
if ( ! UNSERIALIZE(&have_store) )
return false;
if ( ! have_store )
{
store = nullptr;
return true;
}
int type;
if ( ! UNSERIALIZE(&type) )
return false;
const char* id_str;
int len;
if ( ! UNSERIALIZE_STR(&id_str, &len) )
return false;
broker::store::identifier id(id_str, len);
delete [] id_str;
auto handle = comm_mgr->LookupStore(id, static_cast<comm::StoreType>(type));
if ( ! handle )
{
// Passing serialized version of store handles to other Bro processes
// doesn't make sense, only allow local clones of the handle val.
reporter->Error("failed to look up unserialized store handle %s, %d",
id.data(), type);
store = nullptr;
return false;
}
store = handle->store;
store_type = handle->store_type;
backend_type = handle->backend_type;
return true;
}

View file

@ -98,9 +98,6 @@ private:
StoreType store_type;
};
// TODO: actually need to implement Bro's serialization to support copying vals
// but doesn't make sense to "copy" a master data store, so assert we can
// lookup a store by <id, type> pair locally (i.e. shouldn't send handles remotely).
class StoreHandleVal : public OpaqueVal {
public:
@ -112,9 +109,16 @@ public:
void ValDescribe(ODesc* d) const override;
std::unique_ptr<broker::store::frontend> store;
DECLARE_SERIAL(StoreHandleVal);
broker::store::frontend* store;
comm::StoreType store_type;
broker::util::optional<BifEnum::Store::BackendType> backend_type;
protected:
StoreHandleVal()
{}
};
} // namespace comm