get hll ready for merging

This commit is contained in:
Bernhard Amann 2013-07-30 16:47:26 -07:00
parent 5b9d80e50d
commit 18c10f3cb5
9 changed files with 240 additions and 218 deletions

View file

@ -7,75 +7,6 @@
#include "probabilistic/HyperLogLog.h"
// Creates a cardinality value that is not yet backed by a counter.
// Init() has to be called before the value is considered valid.
CardinalityVal::CardinalityVal() : OpaqueVal(cardinality_type)
	{
	valid = false;

	// Null the pointer explicitly so that Get() never returns an
	// indeterminate value when called before Init().
	c = 0;
	}
// Releases the attached counter (only when the value was successfully
// initialized) and resets the object to the "invalid" state.
CardinalityVal::~CardinalityVal()
	{
	const bool has_counter = valid && c != 0;

	if ( has_counter )
		delete c;

	valid = false;
	c = 0;
	}
IMPLEMENT_SERIAL(CardinalityVal, SER_CARDINALITY_VAL);
// Writes the validity flag and, for a valid value, the counter's m, V,
// alpha_m and every bucket. Returns false if any write failed.
bool CardinalityVal::DoSerialize(SerialInfo* info) const
	{
	DO_SERIALIZE(SER_CARDINALITY_VAL, OpaqueVal);

	bool ok = true;
	ok &= SERIALIZE(&valid);

	if ( IsValid() )
		{
		// A valid value must always carry a counter.
		assert(c);

		ok &= SERIALIZE(c->m);
		ok &= SERIALIZE(c->V);
		ok &= SERIALIZE(c->alpha_m);

		unsigned int i = 0;
		while ( i < c->m )
			{
			ok &= SERIALIZE( c->buckets[i] );
			i++;
			}
		}

	return ok;
	}
// Restores the value written by DoSerialize(): the validity flag and, if the
// value is valid, a counter of size m with its V, alpha_m and buckets.
//
// Returns true only if every read succeeded.
bool CardinalityVal::DoUnserialize(UnserialInfo* info)
	{
	DO_UNSERIALIZE(OpaqueVal);

	bool serialvalid = UNSERIALIZE(&valid);

	// Nothing more to read if the flag could not be read or the stored
	// value never had a counter.
	if ( ! serialvalid || ! IsValid() )
		return serialvalid;

	// The bucket count sizes the allocation below, so do not continue
	// with an unread (indeterminate) value.
	uint64_t m;
	if ( ! UNSERIALIZE(&m) )
		return false;

	c = new probabilistic::CardinalityCounter(m);
	serialvalid &= UNSERIALIZE(&c->V);
	serialvalid &= UNSERIALIZE(&c->alpha_m);

	uint8_t* buckets = c->buckets;

	// 64-bit index to match the type of m.
	for ( uint64_t i = 0; i < m; i++ )
		{
		uint8_t* currbucket = buckets + i;
		serialvalid &= UNSERIALIZE( currbucket );
		}

	// Report the accumulated read status. Returning the `valid` member
	// here would always yield true and hide failed reads.
	return serialvalid;
	}
// Attaches arg_c to this value and marks it valid.
// Returns false (leaving the value untouched) if a counter was already
// attached by an earlier call.
bool CardinalityVal::Init(probabilistic::CardinalityCounter* arg_c)
	{
	if ( ! valid )
		{
		c = arg_c;
		valid = true;
		return true;
		}

	return false;
	}
bool HashVal::IsValid() const
{
@ -738,3 +669,105 @@ bool BloomFilterVal::DoUnserialize(UnserialInfo* info)
bloom_filter = probabilistic::BloomFilter::Unserialize(info);
return bloom_filter != 0;
}
// Default constructor: creates a value with no counter, no element type and
// no hash; all three members start out as null pointers.
CardinalityVal::CardinalityVal()
	: OpaqueVal(cardinality_type), type(0), hash(0), c(0)
	{
	}
// Wraps an existing counter. The destructor deletes the counter, so the new
// value takes over ownership of arg_c. Element type and hash stay unset
// until Typify() is called.
CardinalityVal::CardinalityVal(probabilistic::CardinalityCounter* arg_c)
	: OpaqueVal(cardinality_type), type(0), hash(0), c(arg_c)
	{
	}
// Drops this value's reference on the element type and frees the counter
// and the hash object it owns.
CardinalityVal::~CardinalityVal()
{
// NOTE(review): type is null until Typify() ran; this relies on Unref()
// accepting a null pointer — confirm.
Unref(type);
delete c;
delete hash;
}
IMPLEMENT_SERIAL(CardinalityVal, SER_CARDINALITY_VAL);
// Serializes the value: a flag telling whether an element type is set, the
// type itself (if any), and then the counter's m, V, alpha_m and buckets.
//
// Returns false if there is no counter to write or if any write failed.
bool CardinalityVal::DoSerialize(SerialInfo* info) const
	{
	DO_SERIALIZE(SER_CARDINALITY_VAL, OpaqueVal);

	// The default constructor leaves c null. Fail cleanly instead of
	// relying on assert(), which is compiled out with NDEBUG.
	if ( ! c )
		return false;

	bool valid = true;
	bool is_typed = (type != 0);

	valid &= SERIALIZE(is_typed);

	if ( is_typed )
		valid &= type->Serialize(info);

	valid &= SERIALIZE(c->m);
	valid &= SERIALIZE(c->V);
	valid &= SERIALIZE(c->alpha_m);

	// 64-bit index: the counter's size is a 64-bit quantity, and a
	// narrower index could never reach a bound above its own range.
	for ( uint64_t i = 0; i < c->m; i++ )
		valid &= SERIALIZE( c->buckets[i] );

	return valid;
	}
// Restores a value written by DoSerialize(): the optional element type,
// followed by a counter of size m with its V, alpha_m and buckets.
//
// Returns true only if every read succeeded.
bool CardinalityVal::DoUnserialize(UnserialInfo* info)
	{
	DO_UNSERIALIZE(OpaqueVal);

	bool is_typed;
	if ( ! UNSERIALIZE(&is_typed) )
		return false;

	if ( is_typed )
		{
		BroType* t = BroType::Unserialize(info);

		// Defensive: Typify() dereferences its argument, so never
		// hand it a null type.
		if ( ! t )
			return false;

		// Typify() takes its own reference on success, so the local
		// reference is released on both the success and failure path.
		bool typified = Typify(t);
		Unref(t);

		if ( ! typified )
			return false;
		}

	// The bucket count sizes the allocation below, so do not continue
	// with an unread (indeterminate) value.
	uint64_t m;
	if ( ! UNSERIALIZE(&m) )
		return false;

	c = new probabilistic::CardinalityCounter(m);

	bool valid = true;
	valid &= UNSERIALIZE(&c->V);
	valid &= UNSERIALIZE(&c->alpha_m);

	uint8_t* buckets = c->buckets;

	// 64-bit index to match the type of m.
	for ( uint64_t i = 0; i < m; i++ )
		{
		uint8_t* currbucket = buckets + i;
		valid &= UNSERIALIZE( currbucket );
		}

	return valid;
	}
// Fixes the element type of this counter and builds the hash used for
// elements of that type.
//
// arg_type: the element type; this value takes its own reference on it.
//
// Returns false if arg_type is null or if a type has already been set;
// true otherwise.
bool CardinalityVal::Typify(BroType* arg_type)
	{
	// A null type cannot be referenced or hashed.
	if ( ! arg_type )
		return false;

	// The type may only be set once.
	if ( type )
		return false;

	type = arg_type;
	type->Ref();

	// NOTE(review): the type is handed to both the TypeList constructor
	// and Append() while only one extra reference is taken above; verify
	// which of these calls take their own reference.
	TypeList* tl = new TypeList(type);
	tl->Append(type);
	hash = new CompositeHash(tl);
	Unref(tl);

	return true;
	}
// Returns the element type set via Typify(), or null if none has been set.
// No additional reference is taken for the caller.
BroType* CardinalityVal::Type() const
{
return type;
}

View file

@ -15,22 +15,6 @@ namespace probabilistic {
class CardinalityCounter;
}
// Script-level opaque value that wraps a HyperLogLog cardinality counter.
// A freshly constructed value is invalid until Init() attaches a counter.
class CardinalityVal: public OpaqueVal {
public:
	CardinalityVal();
	~CardinalityVal();

	// Attaches a counter; returns false if one is already attached.
	bool Init(probabilistic::CardinalityCounter*);

	// True once Init() has succeeded.
	bool IsValid() const { return valid; }

	// The attached counter.
	probabilistic::CardinalityCounter* Get() { return c; }

private:
	bool valid;
	probabilistic::CardinalityCounter* c;

	DECLARE_SERIAL(CardinalityVal);
};
class HashVal : public OpaqueVal {
public:
virtual bool IsValid() const;
@ -164,4 +148,26 @@ private:
probabilistic::BloomFilter* bloom_filter;
};
// Script-level opaque value that wraps a HyperLogLog cardinality counter
// together with the type of the elements added to it.
class CardinalityVal: public OpaqueVal {
public:
	// Wraps the given counter; the destructor deletes it.
	explicit CardinalityVal(probabilistic::CardinalityCounter*);
	virtual ~CardinalityVal();

	// Element type set by Typify(), or null if not yet set.
	BroType* Type() const;

	// Sets the element type; returns false if a type is already set.
	bool Typify(BroType* type);

	// The wrapped counter.
	probabilistic::CardinalityCounter* Get() { return c; }

protected:
	// Creates an empty value with no counter attached.
	CardinalityVal();

private:
	BroType* type;
	CompositeHash* hash;
	probabilistic::CardinalityCounter* c;

	DECLARE_SERIAL(CardinalityVal);
};
#endif

View file

@ -4,23 +4,24 @@
#include <stdint.h>
#include "HyperLogLog.h"
#include <iostream>
#include "Reporter.h"
using namespace probabilistic;
// Computes the exponent b returned for a desired error rate.
//
// The starting point is 2 * log2(1.04 / error). From there b is increased
// (at least once) until erf(k / sqrt(2)) reaches HLL_CONF, where
// k = 2^((b - starting point) / 2).
//
// error: desired error rate; the logarithm requires it to be > 0.
//
// Returns: the first b that satisfies the HLL_CONF condition.
int CardinalityCounter::optimalB(double error)
	{
	double initial_estimate = 2*(log(1.04)-log(error))/log(2);
	int answer = (int) floor(initial_estimate);
	double k;

	// Each statement appears exactly once: a second copy of the
	// declarations would not compile, and a second copy of the loop would
	// push the result past the first value that satisfies the condition.
	do
		{
		answer++;
		k = pow(2, (answer - initial_estimate)/2);
		}
	while (erf(k/sqrt(2)) < HLL_CONF);

	return answer;
	}
CardinalityCounter::CardinalityCounter(uint64_t size)
@ -28,14 +29,16 @@ CardinalityCounter::CardinalityCounter(uint64_t size)
m = size;
buckets = new uint8_t[m];
if(m == 16)
if (m == 16)
alpha_m = 0.673;
else if(m == 32)
else if (m == 32)
alpha_m = 0.697;
else if(m == 64)
else if (m == 64)
alpha_m = 0.709;
else
else if (m >= 128)
alpha_m = 0.7213/(1+1.079/m);
else
reporter->InternalError("Invalid size %lld. Size either has to be 16, 32, 64 or bigger than 128", size);
for (uint64_t i = 0; i < m; i++)
buckets[i] = 0;
@ -55,8 +58,10 @@ CardinalityCounter::CardinalityCounter(double error_margin)
alpha_m = 0.697;
else if(m == 64)
alpha_m = 0.709;
else
else if(m >= 128)
alpha_m = 0.7213/(1+1.079/m);
else
reporter->InternalError("Invalid m %lld calculated for error margin %f", m, error_margin);
for (uint64_t i = 0; i < m; i++)
buckets[i] = 0;
@ -96,7 +101,7 @@ void CardinalityCounter::addElement(uint64_t hash)
if (temp > buckets[index])
buckets[index] = temp;
}
}
double CardinalityCounter::size()
{
@ -113,7 +118,7 @@ double CardinalityCounter::size()
return answer;
else
return -pow(2,64)*log(1-answer/pow(2,64));
}
}
void CardinalityCounter::merge(CardinalityCounter* c)
{

View file

@ -14,64 +14,88 @@ using namespace probabilistic;
module GLOBAL;
## Initializes a counter for the HyperLogLog cardinality counting algorithm.
##
## err: the desired error rate (e.g. 0.01).
##
## Returns: a hll cardinality handle.
##
## .. bro:see:: hll_cardinality_estimate hll_cardinality_merge_into hll_cardinality_add
##    hll_cardinality_copy
function hll_cardinality_init%(err: double%): opaque of cardinality
	%{
	// The counter is handed straight to the value, which deletes it in
	// its destructor. No null check: a failing new does not return null.
	CardinalityCounter* c = new CardinalityCounter(err);
	CardinalityVal* cv = new CardinalityVal(c);

	return cv;
	%}
## Adds an element to the HyperLogLog data structure.
##
## The first element added fixes the element type of the handle; adding an
## element of a different type afterwards is an error.
##
## handle: the hll handle.
##
## elem: the element to add
##
## Returns: 1 on success
##
## .. bro:see:: hll_cardinality_estimate hll_cardinality_merge_into
##    hll_cardinality_init hll_cardinality_copy
function hll_cardinality_add%(handle: opaque of cardinality, elem: any%): bool
	%{
	CardinalityVal* cv = static_cast<CardinalityVal*>(handle);

	if ( ! cv->Type() && ! cv->Typify(elem->Type()) )
		{
		reporter->Error("failed to set HLL type");
		return new Val(0, TYPE_BOOL);
		}

	else if ( ! same_type(cv->Type(), elem->Type()) )
		{
		reporter->Error("incompatible HLL data type");
		return new Val(0, TYPE_BOOL);
		}

	// Build a hasher for the element's type and hash the element.
	TypeList* tl = new TypeList(elem->Type());
	tl->Append(elem->Type());
	CompositeHash* hll_hash = new CompositeHash(tl);
	Unref(tl);

	HashKey* key = hll_hash->ComputeHash(elem, 1);

	// Defensive: do not dereference a missing key.
	if ( ! key )
		{
		delete hll_hash;
		reporter->Error("failed to hash HLL element");
		return new Val(0, TYPE_BOOL);
		}

	uint64_t hash = key->Hash();

	CardinalityCounter* h = cv->Get();
	h->addElement(hash);

	// NOTE(review): assumes the caller owns the key returned by
	// ComputeHash(); without this delete it is never released.
	delete key;
	delete hll_hash;

	return new Val(1, TYPE_BOOL);
	%}
## The data structure at index1 will contain the combined count for the
## elements measured by index1 and index2.
## It returns true if it either cloned the value at index2 into index1
## or if it merged the two data structures together.
## Merges the second hll data structure into the first
##
## .. note:: The same restrictions as for bloom-filter merging apply
##
## handle1: the first hll handle, which will contain the merged result
##
## handle2: the second hll handle, which will be merged into the first
##
## Returns: 1 on success
##
## .. bro:see:: hll_cardinality_estimate hll_cardinality_add
## hll_cardinality_init hll_cardinality_copy
function hll_cardinality_merge_into%(handle1: opaque of cardinality, handle2: opaque of cardinality%): bool
%{
CardinalityVal* v1 = (CardinalityVal*) handle1;
CardinalityVal* v2 = (CardinalityVal*) handle2;
CardinalityVal* v1 = static_cast<CardinalityVal*>(handle1);
CardinalityVal* v2 = static_cast<CardinalityVal*>(handle2);
if ( !v1->IsValid() || !v2->IsValid() ) {
reporter->Error("need valid handles");
if ( ( v1->Type() != v2->Type() ) && // both 0 is ok
( v1->Type() != 0 ) && // any one 0 also is ok
( v2->Type() != 0 ) &&
! same_type(v1->Type(), v2->Type()) )
{
reporter->Error("incompatible HLL types");
return new Val(0, TYPE_BOOL);
}
}
CardinalityCounter* h1 = v1->Get();
CardinalityCounter* h2 = v2->Get();
@ -81,51 +105,42 @@ function hll_cardinality_merge_into%(handle1: opaque of cardinality, handle2: op
return new Val(1, TYPE_BOOL);
%}
## Returns true if it destroyed something. False if it didn't.
#function hll_cardinality_destroy%(handle: opaque of cardinality%): bool
# %{
# if ( !((CardinalityVal*) handle)->IsValid() ) {
# reporter->Error("Need valid handle");
# return new Val(0, TYPE_BOOL);
# }
# CardinalityCounter* h = ((CardinalityVal*) handle)->Get();
# delete h;
# h = 0;
# return new Val(1, TYPE_BOOL);
# %}
## Estimate the cardinality of the HLL data structure.
##
## handle: the hll handle
##
## Returns: the cardinality estimate. Returns -1.0 if the structure is empty
##
## .. bro:see:: hll_cardinality_merge_into hll_cardinality_add
##    hll_cardinality_init hll_cardinality_copy
function hll_cardinality_estimate%(handle: opaque of cardinality%): double
	%{
	CardinalityVal* cv = static_cast<CardinalityVal*>(handle);
	CardinalityCounter* h = cv->Get();

	double estimate = h->size();

	// Always a double, matching the declared return type.
	return new Val(estimate, TYPE_DOUBLE);
	%}
## Copy a hll data structure
##
## handle: data structure to copy
##
## Returns: copy of handle
##
## .. bro:see:: hll_cardinality_estimate hll_cardinality_merge_into hll_cardinality_add
##    hll_cardinality_init
function hll_cardinality_copy%(handle: opaque of cardinality%): opaque of cardinality
	%{
	CardinalityVal* cv = static_cast<CardinalityVal*>(handle);
	CardinalityCounter* h = cv->Get();

	// Create a fresh counter of the same size (getM()) and fill it by
	// merging the source counter into it.
	uint64_t m = h->getM();
	CardinalityCounter* h2 = new CardinalityCounter(m);
	h2->merge(h);

	CardinalityVal* out = new CardinalityVal(h2);

	// Carry the element type over so the copy enforces the same type
	// restriction as the original.
	if ( cv->Type() )
		out->Typify(cv->Type());

	return out;
	%}

View file

@ -0,0 +1 @@
error: incompatible HLL data type

View file

@ -1,35 +1,37 @@
#
# @TEST-EXEC: bro %INPUT>out
# @TEST-EXEC: btest-diff out
# @TEST-EXEC: btest-diff .stderr
event bro_init()
{
local c1 = hll_cardinality_init(0.01);
local c2 = hll_cardinality_init(0.01);
local add1 = "hey";
local add2 = "hi";
local add3 = 123;
local add1 = 2001;
local add2 = 2002;
local add3 = 2003;
hll_cardinality_add(c1, add1);
hll_cardinality_add(c1, add2);
hll_cardinality_add(c1, add3);
hll_cardinality_add(c1, "a");
hll_cardinality_add(c1, "b");
hll_cardinality_add(c1, "c");
hll_cardinality_add(c1, "d");
hll_cardinality_add(c1, "e");
hll_cardinality_add(c1, "f");
hll_cardinality_add(c1, "g");
hll_cardinality_add(c1, "h");
hll_cardinality_add(c1, "i");
hll_cardinality_add(c1, "j");
hll_cardinality_add(c1, 1000);
hll_cardinality_add(c1, 1001);
hll_cardinality_add(c1, 101);
hll_cardinality_add(c1, 1003);
hll_cardinality_add(c1, 1004);
hll_cardinality_add(c1, 1005);
hll_cardinality_add(c1, 1006);
hll_cardinality_add(c1, 1007);
hll_cardinality_add(c1, 1008);
hll_cardinality_add(c1, 1009);
hll_cardinality_add(c2, add1);
hll_cardinality_add(c2, add2);
hll_cardinality_add(c2, add3);
hll_cardinality_add(c2, 1);
hll_cardinality_add(c2, "b");
hll_cardinality_add(c2, 101);
hll_cardinality_add(c2, 2);
hll_cardinality_add(c2, 3);
hll_cardinality_add(c2, 4);
@ -49,7 +51,7 @@ event bro_init()
print "This value should be around 0:";
print hll_cardinality_estimate(m2);
local c3 = hll_cardinality_clone(c1);
local c3 = hll_cardinality_copy(c1);
print "This value should be around 13:";
print hll_cardinality_estimate(c3);

View file

@ -1,40 +0,0 @@
# @TEST-EXEC: BRO_SEED_FILE="" bro -b %INPUT runnumber=1 >out
# @TEST-EXEC: BRO_SEED_FILE="" bro -b %INPUT runnumber=2 >>out
# @TEST-EXEC: BRO_SEED_FILE="" bro -b %INPUT runnumber=3 >>out
# @TEST-EXEC: btest-diff out
global runnumber: count &redef; # differentiate first and second run
global card: opaque of cardinality &persistent;
# Persistence test body. The test directives run this script three times,
# with runnumber redefined to 1, 2 and 3, and the output of all runs is
# collected in one file.
event bro_init()
{
print runnumber;
# Run 1: create the counter and add ten distinct strings.
if ( runnumber == 1 )
{
card = hll_cardinality_init(0.01);
hll_cardinality_add(card, "a");
hll_cardinality_add(card, "b");
hll_cardinality_add(card, "c");
hll_cardinality_add(card, "d");
hll_cardinality_add(card, "e");
hll_cardinality_add(card, "f");
hll_cardinality_add(card, "g");
hll_cardinality_add(card, "h");
hll_cardinality_add(card, "i");
hll_cardinality_add(card, "j");
}
# Printed on every run. In runs 2 and 3 card is not initialized here, so
# its value presumably comes from the &persistent state left by the
# previous run — confirm.
print hll_cardinality_estimate(card);
# Run 2: add three strings already added in run 1 plus one new string
# ("aa"); the effect is only observable in run 3's estimate.
if ( runnumber == 2 )
{
hll_cardinality_add(card, "a");
hll_cardinality_add(card, "b");
hll_cardinality_add(card, "c");
hll_cardinality_add(card, "aa");
}
}