Reimplement serialization infrastructure for OpaqueVals.

We need this to send opaques through Broker, and we also leverage it for
cloning opaques. The serialization methods now produce Broker data
instances directly, and no longer go through the binary formatter.

Summary of the new API for types derived from OpaqueVal:

    - Add DECLARE_OPAQUE_VALUE(<class>) to the class declaration
    - Add IMPLEMENT_OPAQUE_VALUE(<class>) to the class' implementation file
    - Implement these two methods (which are declared by the 1st macro):
        - broker::data DoSerialize() const
        - bool DoUnserialize(const broker::data& data)

This machinery should work correctly from dynamic plugins as well.

OpaqueVal provides a default implementation of DoClone() as well that
goes through serialization. Derived classes can provide a more
efficient version if they want.

The declaration of the "OpaqueVal" class has moved into the header
file "OpaqueVal.h", along with the new serialization infrastructure.
This is breaking existing code that relies on the location, but
because the API is changing anyways that seems fine.

This adds an internal BiF
"Broker::__opaque_clone_through_serialization" that does what the name
says: deep-copying an opaque by serializing, then deserializing. That
can be used to test the new functionality from btests.

Not quite done yet. TODO:
    - Not all tests pass yet:
        [  0%] language.named-set-ctors ... failed
        [ 16%] language.copy-all-opaques ... failed
        [ 33%] language.set-type-checking ... failed
        [ 50%] language.table-init-container-ctors ... failed
        [ 66%] coverage.sphinx-zeekygen-docs ... failed
        [ 83%] scripts.base.frameworks.sumstats.basic-cluster ... failed

      (Some of the serialization may still be buggy.)

    - Clean up the code a bit more.
This commit is contained in:
Robin Sommer 2019-06-15 21:19:21 +00:00
parent 1ce5521ecc
commit 01e662b3e0
28 changed files with 1556 additions and 52 deletions

View file

@ -505,6 +505,47 @@ uint64 BitVector::Hash() const
return digest;
}
/**
 * Encodes the bit vector as a Broker vector laid out as
 * [number of bits, number of blocks, block 0, block 1, ...].
 *
 * @return The encoded vector; this implementation has no failure path.
 */
broker::expected<broker::data> BitVector::Serialize() const
	{
	const auto num_blocks = bits.size();

	// The two leading entries form the header that Unserialize() validates.
	broker::vector out = {static_cast<uint64>(num_bits), static_cast<uint64>(num_blocks)};
	out.reserve(2 + num_blocks);

	for ( const auto& block : bits )
		out.emplace_back(static_cast<uint64>(block));

	return {out};
	}
/**
 * Rebuilds a bit vector from the layout produced by Serialize():
 * [number of bits, number of blocks, block 0, block 1, ...].
 *
 * @param data The serialized representation.
 *
 * @return The reconstructed bit vector, or a null pointer if *data* is not
 * a vector, a header field has the wrong type, the block count does not
 * match the number of remaining entries, or a block is not a count.
 */
std::unique_ptr<BitVector> BitVector::Unserialize(const broker::data& data)
	{
	auto fields = caf::get_if<broker::vector>(&data);

	if ( ! fields || fields->size() < 2 )
		return nullptr;

	auto bit_count = caf::get_if<uint64>(&(*fields)[0]);
	auto block_count = caf::get_if<uint64>(&(*fields)[1]);

	if ( ! bit_count || ! block_count )
		return nullptr;

	// The header must account for exactly the entries that follow it.
	if ( fields->size() != 2 + *block_count )
		return nullptr;

	std::unique_ptr<BitVector> result(new BitVector());
	result->num_bits = *bit_count;

	// Everything past the two header entries is a block.
	for ( auto it = fields->begin() + 2; it != fields->end(); ++it )
		{
		auto block = caf::get_if<uint64>(&*it);

		if ( ! block )
			return nullptr;

		result->bits.push_back(*block);
		}

	return result;
	}
BitVector::size_type BitVector::lowest_bit(block_type block)
{
block_type x = block - (block & (block - 1));

View file

@ -6,6 +6,8 @@
#include <iterator>
#include <vector>
#include <broker/Data.h>
namespace probabilistic {
/**
@ -281,6 +283,9 @@ public:
*/
uint64_t Hash() const;
/**
 * Serializes the bit vector into Broker data.
 *
 * @return A Broker vector holding the number of bits, the number of
 * blocks, and then one entry per block.
 */
broker::expected<broker::data> Serialize() const;
/**
 * Reconstructs a bit vector from data produced by Serialize().
 *
 * @param data The serialized representation.
 *
 * @return The new bit vector, or a null pointer if *data* does not have
 * the expected layout.
 */
static std::unique_ptr<BitVector> Unserialize(const broker::data& data);
private:
/**
* Computes the number of excess/unused bits in the bit vector.

View file

@ -28,6 +28,51 @@ BloomFilter::~BloomFilter()
delete hasher;
}
/**
 * Serializes the filter as a three-element Broker vector:
 * [filter type, hasher state, derived-class state].
 *
 * @return The encoded vector, or broker::ec::invalid_data if the hasher
 * fails to serialize or the derived class reports failure by returning
 * broker::none() from DoSerialize().
 */
broker::expected<broker::data> BloomFilter::Serialize() const
	{
	auto hasher_state = hasher->Serialize();
	auto filter_state = DoSerialize();

	if ( ! hasher_state )
		return broker::ec::invalid_data; // Cannot serialize

	if ( filter_state == broker::none() )
		return broker::ec::invalid_data; // Cannot serialize

	return broker::vector{static_cast<uint64>(Type()), std::move(*hasher_state), std::move(filter_state)};
	}
/**
 * Reconstructs a Bloom filter from the three-element vector produced by
 * Serialize(): [filter type, hasher state, derived-class state].
 *
 * @param data The serialized representation.
 *
 * @return The reconstructed filter, or a null pointer if *data* is
 * malformed, the hasher cannot be restored, the type tag is unknown, or
 * the derived class rejects its state.
 */
std::unique_ptr<BloomFilter> BloomFilter::Unserialize(const broker::data& data)
	{
	auto v = caf::get_if<broker::vector>(&data);

	if ( ! (v && v->size() == 3) )
		return nullptr;

	auto type = caf::get_if<uint64>(&(*v)[0]);
	if ( ! type )
		return nullptr;

	auto hasher_ = Hasher::Unserialize((*v)[1]);
	if ( ! hasher_ )
		return nullptr;

	std::unique_ptr<BloomFilter> bf;

	switch ( *type ) {
	case Basic:
		bf = std::unique_ptr<BloomFilter>(new BasicBloomFilter());
		break;

	case Counting:
		bf = std::unique_ptr<BloomFilter>(new CountingBloomFilter());
		break;

	default:
		// The tag comes from serialized input. Without this case an
		// unknown value would leave bf null and crash on the call below.
		return nullptr;
	}

	if ( ! bf->DoUnserialize((*v)[2]) )
		return nullptr;

	// Hand over the hasher only once the filter state is known to be good,
	// so that every failure path above lets hasher_ clean up after itself.
	bf->hasher = hasher_.release();
	return bf;
	}
size_t BasicBloomFilter::M(double fp, size_t capacity)
{
double ln2 = std::log(2);
@ -126,6 +171,25 @@ size_t BasicBloomFilter::Count(const HashKey* key) const
return 1;
}
/**
 * Serializes the filter's state, which is just its underlying bit vector.
 *
 * @return The encoded bit vector, or broker::none() if encoding it failed.
 */
broker::data BasicBloomFilter::DoSerialize() const
	{
	auto encoded = bits->Serialize();

	if ( encoded )
		return *encoded;

	return broker::none();
	}
/**
 * Restores the filter's bit vector from serialized data.
 *
 * @param data The state as produced by DoSerialize().
 *
 * @return True on success; false if the bit vector could not be decoded,
 * in which case the current bit vector pointer is left untouched.
 */
bool BasicBloomFilter::DoUnserialize(const broker::data& data)
	{
	auto restored = BitVector::Unserialize(data);

	if ( restored )
		{
		// The filter takes over ownership through its raw pointer member.
		bits = restored.release();
		return true;
		}

	return false;
	}
CountingBloomFilter::CountingBloomFilter()
{
cells = 0;
@ -217,3 +281,22 @@ size_t CountingBloomFilter::Count(const HashKey* key) const
return min;
}
/**
 * Serializes the filter's state, which is just its counter vector.
 *
 * @return The encoded counter vector, or broker::none() if encoding it
 * failed.
 */
broker::data CountingBloomFilter::DoSerialize() const
	{
	auto encoded = cells->Serialize();

	if ( encoded )
		return *encoded;

	return broker::none();
	}
/**
 * Restores the filter's counter vector from serialized data.
 *
 * @param data The state as produced by DoSerialize().
 *
 * @return True on success; false if the counter vector could not be
 * decoded, in which case the current pointer is left untouched.
 */
bool CountingBloomFilter::DoUnserialize(const broker::data& data)
	{
	auto restored = CounterVector::Unserialize(data);

	if ( restored )
		{
		// The filter takes over ownership through its raw pointer member.
		cells = restored.release();
		return true;
		}

	return false;
	}

View file

@ -4,6 +4,9 @@
#define PROBABILISTIC_BLOOMFILTER_H
#include <vector>
#include <broker/Data.h>
#include "BitVector.h"
#include "Hasher.h"
@ -11,6 +14,9 @@ namespace probabilistic {
class CounterVector;
/**
 * Types of derived BloomFilter classes.
 *
 * The numeric value of an enumerator is written into the serialized form
 * by BloomFilter::Serialize() and switched on by BloomFilter::Unserialize(),
 * so reordering or inserting enumerators changes the serialized format.
 */
enum BloomFilterType { Basic, Counting };
/**
* The abstract base class for Bloom filters.
*/
@ -71,6 +77,9 @@ public:
*/
virtual string InternalState() const = 0;
/**
 * Serializes the Bloom filter, including its hasher, into Broker data.
 *
 * @return A Broker vector of the filter type, the hasher state, and the
 * derived class's state; or an error if any part cannot be serialized.
 */
broker::expected<broker::data> Serialize() const;
/**
 * Reconstructs a Bloom filter from data produced by Serialize().
 *
 * @param data The serialized representation.
 *
 * @return The new filter, or a null pointer if *data* cannot be decoded.
 */
static std::unique_ptr<BloomFilter> Unserialize(const broker::data& data);
protected:
/**
* Default constructor.
@ -84,6 +93,10 @@ protected:
*/
explicit BloomFilter(const Hasher* hasher);
/**
 * Serializes the state specific to the derived class. Returning
 * broker::none() signals failure to Serialize().
 */
virtual broker::data DoSerialize() const = 0;
/**
 * Restores the state specific to the derived class from *data*.
 *
 * @return True on success, false if *data* cannot be decoded.
 */
virtual bool DoUnserialize(const broker::data& data) = 0;
/**
 * Returns the tag identifying the derived class; Serialize() stores it
 * so that Unserialize() knows which class to instantiate.
 */
virtual BloomFilterType Type() const = 0;
const Hasher* hasher;
};
@ -144,6 +157,8 @@ public:
string InternalState() const override;
protected:
friend class BloomFilter;
/**
* Default constructor.
*/
@ -152,6 +167,10 @@ protected:
// Overridden from BloomFilter.
void Add(const HashKey* key) override;
size_t Count(const HashKey* key) const override;
// Serialization hooks: the state of a basic filter is its bit vector.
broker::data DoSerialize() const override;
bool DoUnserialize(const broker::data& data) override;
// Tag stored in the serialized form to select this class on restore.
BloomFilterType Type() const override
{ return BloomFilterType::Basic; }
private:
BitVector* bits;
@ -187,6 +206,8 @@ public:
string InternalState() const override;
protected:
friend class BloomFilter;
/**
* Default constructor.
*/
@ -195,6 +216,10 @@ protected:
// Overridden from BloomFilter.
void Add(const HashKey* key) override;
size_t Count(const HashKey* key) const override;
// Serialization hooks: the state of a counting filter is its counter vector.
broker::data DoSerialize() const override;
bool DoUnserialize(const broker::data& data) override;
// Tag stored in the serialized form to select this class on restore.
BloomFilterType Type() const override
{ return BloomFilterType::Counting; }
private:
CounterVector* cells;

View file

@ -196,6 +196,47 @@ uint64_t CardinalityCounter::GetM() const
return m;
}
/**
 * Encodes the counter as a Broker vector laid out as
 * [m, V, alpha_m, bucket 0, ..., bucket m-1].
 *
 * @return The encoded vector; this implementation has no failure path.
 */
broker::expected<broker::data> CardinalityCounter::Serialize() const
	{
	// Three header fields, followed by one entry per bucket.
	broker::vector out = {m, V, alpha_m};
	out.reserve(3 + m);

	for ( size_t pos = 0; pos != m; ++pos )
		{
		const auto bucket = static_cast<uint64>(buckets[pos]);
		out.emplace_back(bucket);
		}

	return {out};
	}
/**
 * Rebuilds a cardinality counter from the layout produced by Serialize():
 * [m, V, alpha_m, bucket 0, ..., bucket m-1].
 *
 * @param data The serialized representation.
 *
 * @return The reconstructed counter, or a null pointer if *data* is not a
 * vector, a header field has the wrong type, the number of bucket entries
 * does not equal m, or a bucket entry is not a count.
 */
std::unique_ptr<CardinalityCounter> CardinalityCounter::Unserialize(const broker::data& data)
	{
	auto v = caf::get_if<broker::vector>(&data);
	if ( ! (v && v->size() >= 3) )
		return nullptr;

	auto m = caf::get_if<uint64>(&(*v)[0]);
	auto V = caf::get_if<uint64>(&(*v)[1]);
	auto alpha_m = caf::get_if<double>(&(*v)[2]);

	if ( ! (m && V && alpha_m) )
		return nullptr;

	// Checking this before constructing the counter also bounds m by the
	// amount of data actually received.
	if ( v->size() != 3 + *m )
		return nullptr;

	auto cc = std::unique_ptr<CardinalityCounter>(new CardinalityCounter(*m, *V, *alpha_m));

	// NOTE(review): the three-argument constructor is defined outside this
	// chunk. If it pre-fills the bucket vector, appending the restored
	// values would produce more than m buckets, with the restored values
	// at the wrong indices. Starting from an empty vector gives exactly m
	// restored buckets either way.
	cc->buckets.clear();

	for ( size_t i = 0; i < *m; ++i )
		{
		auto x = caf::get_if<uint64>(&(*v)[3 + i]);
		if ( ! x )
			return nullptr;

		cc->buckets.push_back(*x);
		}

	return cc;
	}
/**
* The following function is copied from libc/string/flsll.c from the FreeBSD source
* tree. Original copyright message follows

View file

@ -84,6 +84,9 @@ public:
*/
bool Merge(CardinalityCounter* c);
/**
 * Serializes the cardinality counter into Broker data.
 *
 * @return A Broker vector holding m, V, and alpha_m, followed by one
 * entry per bucket.
 */
broker::expected<broker::data> Serialize() const;
/**
 * Reconstructs a cardinality counter from data produced by Serialize().
 *
 * @param data The serialized representation.
 *
 * @return The new counter, or a null pointer if *data* does not have the
 * expected layout.
 */
static std::unique_ptr<CardinalityCounter> Unserialize(const broker::data& data);
protected:
/**
* Return the number of buckets.

View file

@ -158,3 +158,29 @@ uint64_t CounterVector::Hash() const
return bits->Hash();
}
/**
 * Serializes the counter vector as a two-element Broker vector:
 * [counter width, underlying bit vector].
 *
 * @return The encoded vector, or broker::ec::invalid_data if the
 * underlying bit vector fails to serialize.
 */
broker::expected<broker::data> CounterVector::Serialize() const
	{
	auto bits_data = bits->Serialize();

	if ( bits_data )
		return broker::vector{static_cast<uint64>(width), std::move(*bits_data)};

	return broker::ec::invalid_data; // Cannot serialize
	}
/**
 * Rebuilds a counter vector from the layout produced by Serialize():
 * [counter width, underlying bit vector].
 *
 * @param data The serialized representation.
 *
 * @return The reconstructed counter vector, or a null pointer if *data*
 * is not a vector with at least two entries, the width is not a count, or
 * the bit vector cannot be decoded.
 */
std::unique_ptr<CounterVector> CounterVector::Unserialize(const broker::data& data)
	{
	auto v = caf::get_if<broker::vector>(&data);
	if ( ! (v && v->size() >= 2) )
		return nullptr;

	// get_if() yields a null pointer on a type mismatch, so the width must
	// be checked before it is dereferenced.
	auto width = caf::get_if<uint64>(&(*v)[0]);
	if ( ! width )
		return nullptr;

	// A counter vector without bits is unusable; reject instead of
	// handing out an object with a null bit vector.
	auto bits = BitVector::Unserialize((*v)[1]);
	if ( ! bits )
		return nullptr;

	// NOTE(review): the default constructor is not visible in this chunk;
	// this assumes it does not allocate a bit vector of its own that the
	// assignment below would overwrite - confirm.
	auto cv = std::unique_ptr<CounterVector>(new CounterVector());
	cv->width = *width;
	cv->bits = bits.release();
	return cv;
	}

View file

@ -6,6 +6,8 @@
#include <cstddef>
#include <cstdint>
#include <broker/Data.h>
namespace probabilistic {
class BitVector;
@ -134,6 +136,9 @@ public:
*/
uint64_t Hash() const;
/**
 * Serializes the counter vector into Broker data.
 *
 * @return A Broker vector holding the counter width and the serialized
 * underlying bit vector, or an error if the bit vector cannot be
 * serialized.
 */
broker::expected<broker::data> Serialize() const;
/**
 * Reconstructs a counter vector from data produced by Serialize().
 *
 * @param data The serialized representation.
 *
 * @return The new counter vector, or a null pointer if *data* is not a
 * vector with at least two entries.
 */
static std::unique_ptr<CounterVector> Unserialize(const broker::data& data);
protected:
friend CounterVector operator|(const CounterVector& x,
const CounterVector& y);

View file

@ -46,6 +46,47 @@ Hasher::Hasher(size_t arg_k, seed_t arg_seed)
seed = arg_seed;
}
/**
 * Serializes the hasher as a four-element Broker vector:
 * [hasher type, k, seed.h1, seed.h2].
 *
 * @return The encoded vector; this implementation has no failure path.
 */
broker::expected<broker::data> Hasher::Serialize() const
	{
	broker::vector fields = {
		static_cast<uint64>(Type()),
		static_cast<uint64>(k),
		seed.h1,
		seed.h2,
	};

	return {std::move(fields)};
	}
/**
 * Rebuilds a hasher from the four-element vector produced by Serialize():
 * [hasher type, k, seed.h1, seed.h2].
 *
 * @param data The serialized representation.
 *
 * @return The reconstructed hasher, or a null pointer if *data* is not a
 * four-element vector of counts or the type tag is not recognized.
 */
std::unique_ptr<Hasher> Hasher::Unserialize(const broker::data& data)
	{
	auto fields = caf::get_if<broker::vector>(&data);

	if ( ! fields || fields->size() != 4 )
		return nullptr;

	auto type = caf::get_if<uint64>(&(*fields)[0]);
	auto k = caf::get_if<uint64>(&(*fields)[1]);
	auto h1 = caf::get_if<uint64>(&(*fields)[2]);
	auto h2 = caf::get_if<uint64>(&(*fields)[3]);

	if ( ! type || ! k || ! h1 || ! h2 )
		return nullptr;

	// Stays null for an unrecognized type tag, which callers see as failure.
	std::unique_ptr<Hasher> result;

	if ( *type == Default )
		result = std::unique_ptr<Hasher>(new DefaultHasher(*k, {*h1, *h2}));
	else if ( *type == Double )
		result = std::unique_ptr<Hasher>(new DoubleHasher(*k, {*h1, *h2}));

	// Note that the derived classes don't hold any further state of
	// their own. They reconstruct all their information from their
	// constructors' arguments.
	return result;
	}
UHF::UHF()
{
memset(&seed, 0, sizeof(seed));

View file

@ -3,10 +3,15 @@
#ifndef PROBABILISTIC_HASHER_H
#define PROBABILISTIC_HASHER_H
#include <broker/Data.h>
#include "Hash.h"
namespace probabilistic {
/**
 * Types of derived Hasher classes.
 *
 * The numeric value of an enumerator is written into the serialized form
 * by Hasher::Serialize() and switched on by Hasher::Unserialize(), so
 * reordering or inserting enumerators changes the serialized format.
 */
enum HasherType { Default, Double };
/**
* Abstract base class for hashers. A hasher creates a family of hash
* functions to hash an element *k* times.
@ -98,6 +103,9 @@ public:
*/
seed_t Seed() const { return seed; }
/**
 * Serializes the hasher into Broker data.
 *
 * @return A Broker vector holding the hasher type, k, and the two halves
 * of the seed.
 */
broker::expected<broker::data> Serialize() const;
/**
 * Reconstructs a hasher from data produced by Serialize().
 *
 * @param data The serialized representation.
 *
 * @return The new hasher, or a null pointer if *data* does not have the
 * expected layout or names an unknown hasher type.
 */
static std::unique_ptr<Hasher> Unserialize(const broker::data& data);
protected:
Hasher() { }
@ -110,6 +118,8 @@ protected:
*/
Hasher(size_t arg_k, seed_t arg_seed);
/**
 * Returns the tag identifying the derived class; Serialize() stores it
 * so that Unserialize() knows which class to instantiate.
 */
virtual HasherType Type() const = 0;
private:
size_t k;
seed_t seed;
@ -175,6 +185,9 @@ public:
return ! (x == y);
}
// Serialization interface for the hash function itself.
// NOTE(review): no definitions for these two members are visible in this
// chunk, and Hasher::Unserialize() rebuilds hashers from constructor
// arguments without calling them - confirm they are implemented and needed.
broker::expected<broker::data> Serialize() const;
static UHF Unserialize(const broker::data& data);
private:
static size_t compute_seed(Hasher::seed_t seed);
@ -205,6 +218,9 @@ public:
private:
DefaultHasher() { }
// Tag stored in the serialized form to select this class on restore.
HasherType Type() const override
{ return HasherType::Default; }
std::vector<UHF> hash_functions;
};
@ -231,6 +247,9 @@ public:
private:
DoubleHasher() { }
// Tag stored in the serialized form to select this class on restore.
HasherType Type() const override
{ return HasherType::Double; }
UHF h1;
UHF h2;
};

View file

@ -1,5 +1,6 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "broker/Data.h"
#include "probabilistic/Topk.h"
#include "CompHash.h"
#include "Reporter.h"
@ -405,4 +406,126 @@ void TopkVal::IncrementCounter(Element* e, unsigned int count)
}
}
};
// Implementation-file half of the OpaqueVal serialization API for TopkVal;
// it pairs with DECLARE_OPAQUE_VALUE(TopkVal) in the class declaration.
// NOTE(review): the macro's expansion is not visible in this chunk.
IMPLEMENT_OPAQUE_VALUE(TopkVal)
/**
 * Serializes the top-k state as a flat Broker vector:
 *
 *   [size, numElements, pruned, type-or-none,
 *    then for each bucket: number of elements, count,
 *    then for each element of that bucket: epsilon, value]
 *
 * @return The encoded vector, or broker::none() if the type or one of the
 * element values cannot be converted.
 */
broker::data TopkVal::DoSerialize() const
	{
	broker::vector out = {size, numElements, pruned};

	if ( ! type )
		out.emplace_back(broker::none());
	else
		{
		auto type_data = SerializeType(type);

		if ( type_data == broker::none() )
			return broker::none();

		out.emplace_back(type_data);
		}

	// Counts the elements written so the total can be cross-checked below.
	uint64_t written = 0;

	for ( Bucket* bucket : buckets )
		{
		out.emplace_back(static_cast<uint64>(bucket->elements.size()));
		out.emplace_back(bucket->count);

		for ( Element* element : bucket->elements )
			{
			out.emplace_back(element->epsilon);

			auto value_data = bro_broker::val_to_data(element->value);

			if ( ! value_data )
				return broker::none();

			out.emplace_back(*value_data);
			++written;
			}
		}

	assert(written == numElements);
	return out;
	}
/**
 * Restores the top-k state from the flat vector produced by DoSerialize():
 *
 *   [size, numElements, pruned, type-or-none,
 *    then for each bucket: number of elements, count,
 *    then for each element of that bucket: epsilon, value]
 *
 * Every read is bounds-checked against the vector, and nothing is
 * allocated until the fields it depends on have been validated, so
 * truncated or inconsistent input returns false without reading past the
 * end of the data and without leaking a bucket or element.
 *
 * @param data The serialized representation.
 *
 * @return True on success, false if *data* is malformed. On failure the
 * object may hold the buckets and elements that were decoded before the
 * error was detected.
 */
bool TopkVal::DoUnserialize(const broker::data& data)
	{
	auto v = caf::get_if<broker::vector>(&data);

	if ( ! (v && v->size() >= 4) )
		return false;

	auto size_ = caf::get_if<uint64>(&(*v)[0]);
	auto numElements_ = caf::get_if<uint64>(&(*v)[1]);
	auto pruned_ = caf::get_if<bool>(&(*v)[2]);

	if ( ! (size_ && numElements_ && pruned_) )
		return false;

	size = *size_;
	numElements = *numElements_;
	pruned = *pruned_;

	auto no_type = caf::get_if<broker::none>(&(*v)[3]);
	if ( ! no_type )
		{
		BroType* t = UnserializeType((*v)[3]);
		if ( ! t )
			return false;

		Typify(t);
		Unref(t);
		}

	uint64_t i = 0;

	// Invariant: idx <= v->size(), so the subtractions below cannot wrap.
	uint64_t idx = 4;

	while ( i < numElements )
		{
		// A bucket header is two entries: element count and bucket count.
		if ( v->size() - idx < 2 )
			return false;

		auto elements_count = caf::get_if<uint64>(&(*v)[idx]);
		auto count = caf::get_if<uint64>(&(*v)[idx + 1]);

		if ( ! (elements_count && count) )
			return false;

		idx += 2;

		// A bucket cannot hold more elements than are still outstanding;
		// rejecting here keeps the final assertion true for any input.
		if ( *elements_count > numElements - i )
			return false;

		// Allocate only after validation; once inserted, the bucket is
		// reachable through the bucket list.
		Bucket* b = new Bucket();
		b->count = *count;
		b->bucketPos = buckets.insert(buckets.end(), b);

		for ( uint64_t j = 0; j < *elements_count; j++ )
			{
			// An element is two entries: epsilon and value.
			if ( v->size() - idx < 2 )
				return false;

			auto epsilon = caf::get_if<uint64>(&(*v)[idx]);
			if ( ! epsilon )
				return false;

			Val* val = bro_broker::data_to_val((*v)[idx + 1], type);
			if ( ! val )
				return false;

			idx += 2;

			Element* e = new Element();
			e->epsilon = *epsilon;
			e->value = val;
			e->parent = b;
			b->elements.insert(b->elements.end(), e);

			HashKey* key = GetHash(e->value);
			assert (elementDict->Lookup(key) == 0);

			elementDict->Insert(key, e);
			delete key;

			i++;
			}
		}

	assert(i == numElements);
	return true;
	}
}

View file

@ -131,6 +131,8 @@ public:
*/
Val* DoClone(CloneState* state) override;
// Class-declaration half of the OpaqueVal serialization API; it declares
// DoSerialize() and DoUnserialize(), which Topk's implementation file defines.
DECLARE_OPAQUE_VALUE(TopkVal)
protected:
/**
* Construct an empty TopkVal. Only used for deserialization