Merge branch 'topic/robin/topk-merge'

BIT-1048 #merged

I'm reverting the serializer version update for now as that breaks
Broccoli. Let's do that later for 2.2.

* topic/robin/topk-merge:
  update documentation, rename get* to Get* and make hasher persistent
  adapt to new folder structure
  fix opaqueval-related memleak
  synchronize pruned attribute
  potentially found wrong Ref.
  add sum function that can be used to get the number of total observed elements.
  in cluster settings, the resultvals can apparently been uninitialized in some special cases
  fix memory leaks
  fix warnings
  add topk cluster test
  make size of topk-list configureable when using sumstats
  implement merging for top-k.
  add serialization for topk
  make the get function const
  topk for sumstats
  well, a test that works..
  implement topk.
This commit is contained in:
Robin Sommer 2013-08-01 14:37:16 -07:00
commit 04ccb12183
21 changed files with 1440 additions and 3 deletions

17
CHANGES
View file

@ -1,4 +1,21 @@
2.1-1004 | 2013-08-01 14:37:43 -0700
* Adding a probabilistic data structure for computing "top k"
elements. (Bernhard Amann)
The corresponding functions are:
topk_init(size: count): opaque of topk
topk_add(handle: opaque of topk, value: any)
topk_get_top(handle: opaque of topk, k: count)
topk_count(handle: opaque of topk, value: any): count
topk_epsilon(handle: opaque of topk, value: any): count
topk_size(handle: opaque of topk): count
topk_sum(handle: opaque of topk): count
topk_merge(handle1: opaque of topk, handle2: opaque of topk)
topk_merge_prune(handle1: opaque of topk, handle2: opaque of topk)
2.1-971 | 2013-08-01 13:28:32 -0700 2.1-971 | 2013-08-01 13:28:32 -0700
* Fix some build errors. (Jon Siwek) * Fix some build errors. (Jon Siwek)

15
NEWS
View file

@ -122,6 +122,21 @@ New Functionality
See <INSERT LINK> for full documentation. See <INSERT LINK> for full documentation.
- Bro now provides a probabilistic data structure for computing
"top k" elements. The corresponding functions are:
topk_init(size: count): opaque of topk
topk_add(handle: opaque of topk, value: any)
topk_get_top(handle: opaque of topk, k: count)
topk_count(handle: opaque of topk, value: any): count
topk_epsilon(handle: opaque of topk, value: any): count
topk_size(handle: opaque of topk): count
topk_sum(handle: opaque of topk): count
topk_merge(handle1: opaque of topk, handle2: opaque of topk)
topk_merge_prune(handle1: opaque of topk, handle2: opaque of topk)
See <INSERT LINK> for full documentation.
- base/utils/exec.bro provides a module to start external processes - base/utils/exec.bro provides a module to start external processes
asynchronously and retrieve their output on termination. asynchronously and retrieve their output on termination.
base/utils/dir.bro uses it to monitor a directory for changes, and base/utils/dir.bro uses it to monitor a directory for changes, and

View file

@ -1 +1 @@
2.1-971 2.1-1004

View file

@ -5,5 +5,6 @@
@load ./sample @load ./sample
@load ./std-dev @load ./std-dev
@load ./sum @load ./sum
@load ./topk
@load ./unique @load ./unique
@load ./variance @load ./variance

View file

@ -0,0 +1,50 @@
@load base/frameworks/sumstats
module SumStats;
export {
redef record Reducer += {
## number of elements to keep in the top-k list
topk_size: count &default=500;
};
redef enum Calculation += {
TOPK
};
redef record ResultVal += {
topk: opaque of topk &optional;
};
}
hook init_resultval_hook(r: Reducer, rv: ResultVal)
	{
	# Lazily set up the opaque top-k state for this result value, but
	# only when the reducer actually requests the TOPK calculation.
	if ( ! rv?$topk && TOPK in r$apply )
		rv$topk = topk_init(r$topk_size);
	}
hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal)
	{
	# Feed every observation into the top-k structure. Note that the
	# whole Observation record is added, not the double val.
	if ( TOPK in r$apply )
		topk_add(rv$topk, obs);
	}
hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal)
	{
	# Combine the top-k structures of two result values into a fresh
	# one, sized after whichever input structure is present first.
	# Merging is done without pruning, so the composed structure can
	# hold more than topk_size elements.
	if ( rv1?$topk )
		{
		result$topk = topk_init(topk_size(rv1$topk));
		topk_merge(result$topk, rv1$topk);

		if ( rv2?$topk )
			topk_merge(result$topk, rv2$topk);
		}
	else if ( rv2?$topk )
		{
		result$topk = topk_init(topk_size(rv2$topk));
		topk_merge(result$topk, rv2$topk);
		}
	}

View file

@ -244,6 +244,7 @@ OpaqueType* md5_type;
OpaqueType* sha1_type; OpaqueType* sha1_type;
OpaqueType* sha256_type; OpaqueType* sha256_type;
OpaqueType* entropy_type; OpaqueType* entropy_type;
OpaqueType* topk_type;
OpaqueType* bloomfilter_type; OpaqueType* bloomfilter_type;
#include "const.bif.netvar_def" #include "const.bif.netvar_def"
@ -312,6 +313,7 @@ void init_general_global_var()
sha1_type = new OpaqueType("sha1"); sha1_type = new OpaqueType("sha1");
sha256_type = new OpaqueType("sha256"); sha256_type = new OpaqueType("sha256");
entropy_type = new OpaqueType("entropy"); entropy_type = new OpaqueType("entropy");
topk_type = new OpaqueType("topk");
bloomfilter_type = new OpaqueType("bloomfilter"); bloomfilter_type = new OpaqueType("bloomfilter");
} }

View file

@ -249,6 +249,7 @@ extern OpaqueType* md5_type;
extern OpaqueType* sha1_type; extern OpaqueType* sha1_type;
extern OpaqueType* sha256_type; extern OpaqueType* sha256_type;
extern OpaqueType* entropy_type; extern OpaqueType* entropy_type;
extern OpaqueType* topk_type;
extern OpaqueType* bloomfilter_type; extern OpaqueType* bloomfilter_type;
// Initializes globals that don't pertain to network/event analysis. // Initializes globals that don't pertain to network/event analysis.

View file

@ -108,7 +108,8 @@ SERIAL_VAL(MD5_VAL, 16)
SERIAL_VAL(SHA1_VAL, 17) SERIAL_VAL(SHA1_VAL, 17)
SERIAL_VAL(SHA256_VAL, 18) SERIAL_VAL(SHA256_VAL, 18)
SERIAL_VAL(ENTROPY_VAL, 19) SERIAL_VAL(ENTROPY_VAL, 19)
SERIAL_VAL(BLOOMFILTER_VAL, 20) SERIAL_VAL(TOPK_VAL, 20)
SERIAL_VAL(BLOOMFILTER_VAL, 21)
#define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR) #define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR)
SERIAL_EXPR(EXPR, 1) SERIAL_EXPR(EXPR, 1)

View file

@ -10,9 +10,11 @@ set(probabilistic_SRCS
BitVector.cc BitVector.cc
BloomFilter.cc BloomFilter.cc
CounterVector.cc CounterVector.cc
Hasher.cc) Hasher.cc
Topk.cc)
bif_target(bloom-filter.bif) bif_target(bloom-filter.bif)
bif_target(top-k.bif)
bro_add_subdir_library(probabilistic ${probabilistic_SRCS}) bro_add_subdir_library(probabilistic ${probabilistic_SRCS})
add_dependencies(bro_probabilistic generate_outputs) add_dependencies(bro_probabilistic generate_outputs)

499
src/probabilistic/Topk.cc Normal file
View file

@ -0,0 +1,499 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "probabilistic/Topk.h"
#include "CompHash.h"
#include "Reporter.h"
#include "Serializer.h"
#include "NetVar.h"
namespace probabilistic {
IMPLEMENT_SERIAL(TopkVal, SER_TOPK_VAL);
// Deletion callback installed on elementDict; frees the Element stored
// as the dictionary's value (its destructor unrefs the contained Val).
static void topk_element_hash_delete_func(void* val)
	{
	Element* e = (Element*) val;
	delete e;
	}
// An Element owns a reference to its tracked value; give it back here.
Element::~Element()
	{
	Unref(value);
	}
// Fix the Bro type of the values this TopkVal tracks and build the
// corresponding hasher. May only be called once (asserted below); all
// later values have to be of this type.
void TopkVal::Typify(BroType* t)
	{
	assert(!hash && !type);
	type = t->Ref();

	// Build a one-element type list to derive a CompositeHash for
	// values of this type.
	// NOTE(review): t is passed both to the TypeList constructor and
	// to Append() with an extra Ref() — confirm against TypeList's
	// ownership rules that this is not one reference too many.
	TypeList* tl = new TypeList(t);
	tl->Append(t->Ref());
	hash = new CompositeHash(tl);
	Unref(tl);
	}
// Compute the hash key for v using the hasher built in Typify().
// The caller owns the returned key and must delete it.
HashKey* TopkVal::GetHash(Val* v) const
	{
	HashKey* key = hash->ComputeHash(v, 1);
	assert(key);
	return key;
	}
// Construct a TopkVal tracking up to arg_size elements. The element
// type is not fixed yet; Typify() sets it when the first value is
// encountered.
TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(topk_type)
	{
	elementDict = new PDict(Element);
	// Make the dictionary free its Elements on removal/clear.
	elementDict->SetDeleteFunc(topk_element_hash_delete_func);
	size = arg_size;
	type = 0;
	numElements = 0;
	pruned = false;
	hash = 0;
	}
// Construct an empty TopkVal; only used as the target of
// unserialization, which overwrites all fields afterwards.
TopkVal::TopkVal() : OpaqueVal(topk_type)
	{
	elementDict = new PDict(Element);
	// Make the dictionary free its Elements on removal/clear.
	elementDict->SetDeleteFunc(topk_element_hash_delete_func);
	size = 0;
	type = 0;
	numElements = 0;
	// Fix: pruned was previously left uninitialized here (the sized
	// constructor initializes it), leaving the object in an undefined
	// state until DoUnserialize() runs.
	pruned = false;
	hash = 0;
	}
TopkVal::~TopkVal()
	{
	// Clear() invokes topk_element_hash_delete_func on each entry, so
	// this frees all Elements (and unrefs their values).
	elementDict->Clear();
	delete elementDict;

	// now all elements are already gone - delete the buckets
	std::list<Bucket*>::iterator bi = buckets.begin();
	while ( bi != buckets.end() )
		{
		delete *bi;
		bi++;
		}

	Unref(type);
	delete hash;
	}
// Merge another top-k structure into this one. Every element of value
// is inserted (or re-counted) here; if doPrune is set, the structure is
// afterwards pruned back to at most size elements, which invalidates
// GetSum() results (tracked via the pruned flag).
void TopkVal::Merge(const TopkVal* value, bool doPrune)
	{
	// Adopt the other side's type if we are still untyped; otherwise
	// both structures must track the same type.
	if ( type == 0 )
		{
		assert(numElements == 0);
		Typify(value->type);
		}
	else
		{
		if ( ! same_type(type, value->type) )
			{
			reporter->Error("Cannot merge top-k elements of differing types.");
			return;
			}
		}

	std::list<Bucket*>::const_iterator it = value->buckets.begin();
	while ( it != value->buckets.end() )
		{
		Bucket* b = *it;
		uint64_t currcount = b->count;
		std::list<Element*>::const_iterator eit = b->elements.begin();

		while ( eit != b->elements.end() )
			{
			Element* e = *eit;
			// lookup if we already know this one...
			HashKey* key = GetHash(e->value);
			Element* olde = (Element*) elementDict->Lookup(key);

			if ( olde == 0 )
				{
				// Unknown element: create it with count 0 in a fresh
				// front bucket; IncrementCounter() below moves it to
				// its real position.
				olde = new Element();
				olde->epsilon = 0;
				olde->value = e->value->Ref();

				// insert at bucket position 0
				if ( buckets.size() > 0 )
					{
					assert (buckets.front()-> count > 0 );
					}

				Bucket* newbucket = new Bucket();
				newbucket->count = 0;
				newbucket->bucketPos = buckets.insert(buckets.begin(), newbucket);

				olde->parent = newbucket;
				newbucket->elements.insert(newbucket->elements.end(), olde);

				elementDict->Insert(key, olde);
				numElements++;
				}

			// now that we are sure that the old element is present - increment epsilon
			olde->epsilon += e->epsilon;

			// and increment position...
			IncrementCounter(olde, currcount);

			delete key;
			eit++;
			}

		it++;
		}

	// now we have added everything. And our top-k table could be too big.
	// prune everything...
	assert(size > 0);

	if ( ! doPrune )
		return;

	while ( numElements > size )
		{
		pruned = true;
		assert(buckets.size() > 0 );
		Bucket* b = buckets.front();
		assert(b->elements.size() > 0);

		// Evict the front element of the lowest-count bucket.
		Element* e = b->elements.front();
		HashKey* key = GetHash(e->value);
		elementDict->RemoveEntry(key);
		// Fix: RemoveEntry() does not take ownership of the key (see
		// the parallel code in Encountered(), which deletes it); the
		// original leaked it on every pruned element.
		delete key;
		delete e;
		b->elements.pop_front();

		if ( b->elements.size() == 0 )
			{
			delete b;
			buckets.pop_front();
			}

		numElements--;
		}
	}
// Serialize the structure: scalar members first, then the (optional)
// element type, then all buckets front-to-back with their elements.
bool TopkVal::DoSerialize(SerialInfo* info) const
	{
	DO_SERIALIZE(SER_TOPK_VAL, OpaqueVal);

	bool v = true;

	v &= SERIALIZE(size);
	v &= SERIALIZE(numElements);
	v &= SERIALIZE(pruned);

	// The element type is only known after the first value was added,
	// so its presence is flagged explicitly.
	bool type_present = (type != 0);
	v &= SERIALIZE(type_present);
	if ( type_present )
		v &= type->Serialize(info);
	else
		assert(numElements == 0);

	// Each bucket is written as (elements_count, count) followed by
	// its elements; bucket boundaries are reconstructed from the
	// per-bucket element counts in DoUnserialize().
	uint64_t i = 0;
	std::list<Bucket*>::const_iterator it = buckets.begin();
	while ( it != buckets.end() )
		{
		Bucket* b = *it;
		uint32_t elements_count = b->elements.size();
		v &= SERIALIZE(elements_count);
		v &= SERIALIZE(b->count);

		std::list<Element*>::const_iterator eit = b->elements.begin();
		while ( eit != b->elements.end() )
			{
			Element* element = *eit;
			v &= SERIALIZE(element->epsilon);
			v &= element->value->Serialize(info);

			eit++;
			i++;
			}

		it++;
		}

	// Sanity check: exactly numElements elements must have been written.
	assert(i == numElements);
	return v;
	}
// Rebuild the structure in the exact order DoSerialize() wrote it:
// scalars, optional type, then buckets with their elements.
bool TopkVal::DoUnserialize(UnserialInfo* info)
	{
	DO_UNSERIALIZE(OpaqueVal);

	bool v = true;

	v &= UNSERIALIZE(&size);
	v &= UNSERIALIZE(&numElements);
	v &= UNSERIALIZE(&pruned);

	bool type_present = false;
	v &= UNSERIALIZE(&type_present);
	if ( type_present )
		{
		BroType* deserialized_type = BroType::Unserialize(info);
		// Typify takes its own reference, so drop ours afterwards.
		Typify(deserialized_type);
		Unref(deserialized_type);
		assert(type);
		}
	else
		assert(numElements == 0);

	// Re-create buckets in serialized (ascending-count) order and
	// re-register every element in the lookup dictionary.
	uint64_t i = 0;
	while ( i < numElements )
		{
		Bucket* b = new Bucket();
		uint32_t elements_count;
		v &= UNSERIALIZE(&elements_count);
		v &= UNSERIALIZE(&b->count);
		b->bucketPos = buckets.insert(buckets.end(), b);

		for ( uint64_t j = 0; j < elements_count; j++ )
			{
			Element* e = new Element();
			v &= UNSERIALIZE(&e->epsilon);
			e->value = Val::Unserialize(info, type);
			e->parent = b;
			b->elements.insert(b->elements.end(), e);

			HashKey* key = GetHash(e->value);
			assert (elementDict->Lookup(key) == 0);
			elementDict->Insert(key, e);
			delete key;

			i++;
			}
		}

	assert(i == numElements);
	return v;
	}
// Return (at least) the k elements with the highest counts as a Bro
// vector of the tracked type. Returns 0 (and reports an error) when
// the structure is empty.
VectorVal* TopkVal::GetTopK(int k) const // returns vector
	{
	if ( numElements == 0 )
		{
		reporter->Error("Cannot return topk of empty");
		return 0;
		}

	// Build a "vector of <type>" result value.
	TypeList* vector_index = new TypeList(type);
	vector_index->Append(type->Ref());
	VectorType* v = new VectorType(vector_index);
	VectorVal* t = new VectorVal(v);

	// this does no estimation if the results is correct!
	// in any case - just to make this future-proof (and I am lazy) - this can return more than k.

	// Buckets are ordered by ascending count, so walk from the back
	// (highest counts) towards the front, emitting whole buckets until
	// at least k elements were read.
	int read = 0;
	std::list<Bucket*>::const_iterator it = buckets.end();
	it--;
	while (read < k )
		{
		//printf("Bucket %llu\n", (*it)->count);
		std::list<Element*>::iterator eit = (*it)->elements.begin();
		while ( eit != (*it)->elements.end() )
			{
			//printf("Size: %ld\n", (*it)->elements.size());
			t->Assign(read, (*eit)->value->Ref());
			read++;
			eit++;
			}

		if ( it == buckets.begin() )
			break;

		it--;
		}

	Unref(v);
	return t;
	}
// Return the (over-)estimated count for value, or 0 (with a reported
// error) if value is not currently tracked.
uint64_t TopkVal::GetCount(Val* value) const
	{
	HashKey* key = GetHash(value);
	Element* e = (Element*) elementDict->Lookup(key);
	// Fix: free the key on every path; the original leaked it when
	// the element was not found.
	delete key;

	if ( e == 0 )
		{
		reporter->Error("GetCount for element that is not in top-k");
		return 0;
		}

	// All elements of a bucket share that bucket's count.
	return e->parent->count;
	}
// Return the maximal possible overestimation (epsilon) for value, or 0
// (with a reported error) if value is not currently tracked.
uint64_t TopkVal::GetEpsilon(Val* value) const
	{
	HashKey* key = GetHash(value);
	Element* e = (Element*) elementDict->Lookup(key);
	// Fix: free the key on every path; the original leaked it when
	// the element was not found.
	delete key;

	if ( e == 0 )
		{
		reporter->Error("GetEpsilon for element that is not in top-k");
		return 0;
		}

	return e->epsilon;
	}
uint64_t TopkVal::GetSum() const
{
uint64_t sum = 0;
std::list<Bucket*>::const_iterator it = buckets.begin();
while ( it != buckets.end() )
{
sum += (*it)->elements.size() * (*it)->count;
it++;
}
if ( pruned )
reporter->Warning("TopkVal::GetSum() was used on a pruned data structure. Result values do not represent total element count");
return sum;
}
// Register one observation of encountered. The first call fixes the
// tracked type; later calls with a different type are rejected. New
// elements either take a free slot (numElements < size) or replace the
// current minimum, inheriting its count as their epsilon.
void TopkVal::Encountered(Val* encountered)
	{
	// ok, let's see if we already know this one.
	if ( numElements == 0 )
		Typify(encountered->Type());
	else
		if ( ! same_type(type, encountered->Type()) )
			{
			reporter->Error("Trying to add element to topk with differing type from other elements");
			return;
			}

	// Step 1 - get the hash.
	HashKey* key = GetHash(encountered);
	Element* e = (Element*) elementDict->Lookup(key);

	if ( e == 0 )
		{
		e = new Element();
		e->epsilon = 0;
		e->value = encountered->Ref(); // or no ref?

		// well, we do not know this one yet...
		if ( numElements < size )
			{
			// brilliant. just add it at position 1
			if ( buckets.size() == 0 || (*buckets.begin())->count > 1 )
				{
				// No count-1 bucket exists yet - create one at the front.
				Bucket* b = new Bucket();
				b->count = 1;
				std::list<Bucket*>::iterator pos = buckets.insert(buckets.begin(), b);
				b->bucketPos = pos;
				b->elements.insert(b->elements.end(), e);
				e->parent = b;
				}
			else
				{
				// Reuse the existing count-1 bucket at the front.
				Bucket* b = *buckets.begin();
				assert(b->count == 1);
				b->elements.insert(b->elements.end(), e);
				e->parent = b;
				}

			elementDict->Insert(key, e);
			numElements++;
			delete key;
			return; // done. it is at pos 1.
			}
		else
			{
			// replace element with min-value
			Bucket* b = *buckets.begin(); // bucket with smallest elements

			// evict oldest element with least hits.
			assert(b->elements.size() > 0);
			HashKey* deleteKey = GetHash((*(b->elements.begin()))->value);
			b->elements.erase(b->elements.begin());
			Element* deleteElement = (Element*) elementDict->RemoveEntry(deleteKey);
			assert(deleteElement); // there has to have been a minimal element...
			delete deleteElement;
			delete deleteKey;

			// and add the new one to the end; its count may be
			// overestimated by up to the evicted bucket's count,
			// which is recorded as epsilon.
			e->epsilon = b->count;
			b->elements.insert(b->elements.end(), e);
			elementDict->Insert(key, e);
			e->parent = b;

			// fallthrough, increment operation has to run!
			}
		}

	// ok, we now have an element in e
	delete key;
	IncrementCounter(e); // well, this certainly was anticlimactic.
	}
// Move element e from its current bucket to the bucket representing
// (current count + count), creating that bucket if necessary and
// deleting the old bucket when it becomes empty. Buckets are kept in
// ascending count order.
void TopkVal::IncrementCounter(Element* e, unsigned int count)
	{
	Bucket* currBucket = e->parent;
	uint64 currcount = currBucket->count;

	// well, let's test if there is a bucket for currcount+count
	std::list<Bucket*>::iterator bucketIter = currBucket->bucketPos;

	Bucket* nextBucket = 0;

	bucketIter++;

	// Skip buckets whose count is still below the target count.
	while ( bucketIter != buckets.end() && (*bucketIter)->count < currcount+count )
		bucketIter++;

	if ( bucketIter != buckets.end() && (*bucketIter)->count == currcount+count )
		nextBucket = *bucketIter;

	if ( nextBucket == 0 )
		{
		// the bucket for the value that we want does not exist.
		// create it... (insert before bucketIter keeps the list sorted)
		Bucket* b = new Bucket();
		b->count = currcount+count;

		std::list<Bucket*>::iterator nextBucketPos = buckets.insert(bucketIter, b);
		b->bucketPos = nextBucketPos; // and give it the iterator we know now.

		nextBucket = b;
		}

	// ok, now we have the new bucket in nextBucket. Shift the element over...
	currBucket->elements.remove(e);
	nextBucket->elements.insert(nextBucket->elements.end(), e);

	e->parent = nextBucket;

	// if currBucket is empty, we have to delete it now
	if ( currBucket->elements.size() == 0 )
		{
		buckets.remove(currBucket);
		delete currBucket;
		currBucket = 0;
		}
	}
};

149
src/probabilistic/Topk.h Normal file
View file

@ -0,0 +1,149 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef topk_h
#define topk_h
#include <list>
#include "Val.h"
#include "CompHash.h"
#include "OpaqueVal.h"
// This class implements the top-k algorithm. Or - to be more precise - an
// interpretation of it.
namespace probabilistic {
struct Element;
// A bucket groups all elements currently sharing the same count.
struct Bucket {
	// The observation count shared by every element in this bucket.
	uint64 count;

	// Elements currently carrying this count.
	std::list<Element*> elements;

	// Iterators only get invalidated for removed elements. This one
	// points to us - so it is invalid when we are no longer there. Cute,
	// isn't it?
	std::list<Bucket*>::iterator bucketPos;
};
// One tracked value together with its error bound and current bucket.
struct Element {
	// Maximum possible overestimation of this element's count.
	uint64 epsilon;

	// The tracked value; owned (referenced) by this Element and
	// unref'd in the destructor.
	Val* value;

	// The bucket holding this element's current count.
	Bucket* parent;

	~Element();
};
declare(PDict, Element);
class TopkVal : public OpaqueVal {
public:
/**
* Construct a TopkVal.
*
* @param size specifies how many total elements are tracked
*
* @return A newly initialized TopkVal
*/
TopkVal(uint64 size);
/**
* Destructor.
*/
~TopkVal();
/**
* Call this when a new value is encountered. Note that on the first
* call, the Bro type of the value types that are counted is set. All
* following calls to encountered have to specify the same type.
*
* @param value The encountered element
*/
void Encountered(Val* value);
/**
* Get the first *k* elements of the result vector. At the moment,
* this does not check if it is in the right order or if we can prove
* that these are the correct top-k. Use count and epsilon for this.
*
* @param k Number of top-elements to return
*
* @returns The top-k encountered elements
*/
VectorVal* GetTopK(int k) const;
/**
* Get the current count tracked in the top-k data structure for a
* certain val. Returns 0 if the val is unknown (and logs the error
* to reporter).
*
* @param value Bro value to get counts for
*
* @returns internal count for val, 0 if unknown
*/
uint64_t GetCount(Val* value) const;
/**
* Get the current epsilon tracked in the top-k data structure for a
* certain val.
*
* @param value Bro value to get epsilons for
*
* @returns the epsilon. Returns 0 if the val is unknown (and logs
* the error to reporter)
*/
uint64_t GetEpsilon(Val* value) const;
/**
* Get the size set in the constructor
*
* @returns size of the top-k structure
*/
uint64_t GetSize() const { return size; }
/**
* Get the sum of all counts of all tracked elements. This is equal
* to the number of total observations up to this moment, if no
* elements were pruned from the data structure.
*
* @returns sum of all counts
*/
uint64_t GetSum() const;
/**
* Merge another top-k data structure into this one. doPrune
* specifies if the total count of elements is limited to size after
* merging. Please note, that pruning will invalidate the results of
* getSum.
*
* @param value TopkVal to merge into this TopkVal
*
* @param doPrune prune resulting TopkVal to size after merging
*/
void Merge(const TopkVal* value, bool doPrune=false);
protected:
/**
* Construct an empty TopkVal. Only used for deserialization
*/
TopkVal();
private:
void IncrementCounter(Element* e, unsigned int count = 1);
HashKey* GetHash(Val*) const; // this probably should go somewhere else.
void Typify(BroType*);
BroType* type;
CompositeHash* hash;
std::list<Bucket*> buckets;
PDict(Element)* elementDict;
uint64 size; // how many elements are we tracking?
uint64 numElements; // how many elements do we have at the moment
bool pruned; // was this data structure pruned?
DECLARE_SERIAL(TopkVal);
};
};
#endif

184
src/probabilistic/top-k.bif Normal file
View file

@ -0,0 +1,184 @@
# ===========================================================================
#
# Top-K Functions
#
# ===========================================================================
%%{
#include "probabilistic/Topk.h"
%%}
## Creates a top-k data structure which tracks *size* elements.
##
## size: number of elements to track
##
## Returns: Opaque pointer to the data structure.
##
## .. bro:see:: topk_add topk_get_top topk_count topk_epsilon
## topk_size topk_sum topk_merge topk_merge_prune
function topk_init%(size: count%): opaque of topk
%{
probabilistic::TopkVal* v = new probabilistic::TopkVal(size);
return v;
%}
## Add a new observed object to the data structure.
##
## .. note:: The first added object sets the type of data tracked by
## the top-k data structure. All following values have to be of the same
## type.
##
## handle: the TopK handle
##
## value: observed value
##
## .. bro:see:: topk_init topk_get_top topk_count topk_epsilon
## topk_size topk_sum topk_merge topk_merge_prune
function topk_add%(handle: opaque of topk, value: any%): any
%{
assert(handle);
probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle;
h->Encountered(value);
return 0;
%}
## Get the first *k* elements of the top-k data structure.
##
## handle: the TopK handle
##
## k: number of elements to return
##
## Returns: vector of the first k elements
##
## .. bro:see:: topk_init topk_add topk_count topk_epsilon
## topk_size topk_sum topk_merge topk_merge_prune
function topk_get_top%(handle: opaque of topk, k: count%): any
%{
assert(handle);
probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle;
return h->GetTopK(k);
%}
## Get an overestimated count of how often value has been encountered.
##
## .. note:: value has to be part of the currently tracked elements, otherwise
## 0 will be returned and an error message will be added to reporter.
##
## handle: the TopK handle
##
## value: Value to look up count for.
##
## Returns: Overestimated number for how often the element has been encountered
##
## .. bro:see:: topk_init topk_add topk_get_top topk_epsilon
## topk_size topk_sum topk_merge topk_merge_prune
function topk_count%(handle: opaque of topk, value: any%): count
%{
assert(handle);
probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle;
return new Val(h->GetCount(value), TYPE_COUNT);
%}
## Get the maximal overestimation for count.
##
## .. note:: Same restrictions as for :bro:id:`topk_count` apply.
##
## handle: the TopK handle
##
## value: Value to look up epsilon for.
##
## Returns: Number which represents the maximal overestimation for the count of this element.
##
## .. bro:see:: topk_init topk_add topk_get_top topk_count
## topk_size topk_sum topk_merge topk_merge_prune
function topk_epsilon%(handle: opaque of topk, value: any%): count
%{
assert(handle);
probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle;
return new Val(h->GetEpsilon(value), TYPE_COUNT);
%}
## Get the number of elements this data structure is supposed to track (given on init).
##
## .. note:: The actual number of elements in the data structure can be lower
## or higher (due to non-pruned merges) than this.
##
## handle: the TopK handle
##
## Returns: size given during initialization
##
## .. bro:see:: topk_init topk_add topk_get_top topk_count topk_epsilon
## topk_sum topk_merge topk_merge_prune
function topk_size%(handle: opaque of topk%): count
%{
assert(handle);
probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle;
return new Val(h->GetSize(), TYPE_COUNT);
%}
## Get the sum of all counts of all elements in the data structure.
##
## .. note:: This is equal to the number of all inserted objects if the data structure
## never has been pruned. Do not use after calling topk_merge_prune (will throw a
## warning message if used afterwards)
##
## handle: the TopK handle
##
## Returns: sum of all counts
##
## .. bro:see:: topk_init topk_add topk_get_top topk_count topk_epsilon
## topk_size topk_merge topk_merge_prune
function topk_sum%(handle: opaque of topk%): count
%{
assert(handle);
probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle;
return new Val(h->GetSum(), TYPE_COUNT);
%}
## Merge the second topk data structure into the first.
##
## .. note:: This does not remove any elements, the resulting data structure can be
## bigger than the maximum size given on initialization.
##
## .. bro:see:: topk_init topk_add topk_get_top topk_count topk_epsilon
## topk_size topk_sum topk_merge_prune
function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any
%{
assert(handle1);
assert(handle2);
probabilistic::TopkVal* h1 = (probabilistic::TopkVal*) handle1;
probabilistic::TopkVal* h2 = (probabilistic::TopkVal*) handle2;
h1->Merge(h2);
return 0;
%}
## Merge the second topk data structure into the first and prunes the final data
## structure back to the size given on initialization.
##
## .. note:: Use with care and only when being aware of the restrictions this
## entails. Do not call :bro:id:`topk_size` or :bro:id:`topk_add` afterwards,
## results will probably not be what you expect.
##
## handle1: the TopK handle in which the second TopK structure is merged
##
## handle2: the TopK handle in which is merged into the first TopK structure
##
## .. bro:see:: topk_init topk_add topk_get_top topk_count topk_epsilon
## topk_size topk_sum topk_merge
function topk_merge_prune%(handle1: opaque of topk, handle2: opaque of topk%): any
%{
assert(handle1);
assert(handle2);
probabilistic::TopkVal* h1 = (probabilistic::TopkVal*) handle1;
probabilistic::TopkVal* h2 = (probabilistic::TopkVal*) handle2;
h1->Merge(h2, true);
return 0;
%}

View file

@ -0,0 +1,11 @@
error: GetCount for element that is not in top-k
error: GetEpsilon for element that is not in top-k
error: GetCount for element that is not in top-k
error: GetEpsilon for element that is not in top-k
error: GetCount for element that is not in top-k
error: GetEpsilon for element that is not in top-k
error: GetCount for element that is not in top-k
error: GetEpsilon for element that is not in top-k
warning: TopkVal::GetSum() was used on a pruned data structure. Result values do not represent total element count
error: GetCount for element that is not in top-k
error: GetEpsilon for element that is not in top-k

View file

@ -0,0 +1,81 @@
[b, c]
4
0
0
2
0
2
1
[d, c]
5
0
0
2
1
3
2
[d, e]
6
3
2
3
2
[f, e]
7
4
3
3
2
[f, e]
8
4
3
4
2
[g, e]
9
0
0
4
2
5
4
[c, e, d]
19
6
0
5
0
4
0
[c, e]
6
0
5
0
0
0
[c, e]
22
12
0
10
0
0
0
[c, e]
19
6
0
5
0
4
0
[c, e, d]
38
12
0
10
0
8
0

View file

@ -0,0 +1,21 @@
1
2
6
4
5
1
[c, e, d]
1
2
6
4
5
1
[c, e, d]
2
4
12
8
10
2
[c, e, d]

View file

@ -0,0 +1,9 @@
Top entries for key counter
Num: 995, count: 100, epsilon: 0
Num: 1, count: 99, epsilon: 0
Num: 2, count: 98, epsilon: 0
Num: 3, count: 97, epsilon: 0
Num: 4, count: 96, epsilon: 0
Top entries for key two
Num: 2, count: 4, epsilon: 0
Num: 1, count: 3, epsilon: 0

View file

@ -0,0 +1,8 @@
Top entries for key counter
Num: 1, count: 99, epsilon: 0
Num: 2, count: 98, epsilon: 0
Num: 3, count: 97, epsilon: 0
Num: 4, count: 96, epsilon: 0
Num: 5, count: 95, epsilon: 0
Top entries for key two
Num: 1, count: 2, epsilon: 0

154
testing/btest/bifs/topk.bro Normal file
View file

@ -0,0 +1,154 @@
# @TEST-EXEC: bro -b %INPUT > out
# @TEST-EXEC: btest-diff out
# @TEST-EXEC: btest-diff .stderr
event bro_init()
{
local k1 = topk_init(2);
# first - peculiarity check...
topk_add(k1, "a");
topk_add(k1, "b");
topk_add(k1, "b");
topk_add(k1, "c");
local s = topk_get_top(k1, 5);
print s;
print topk_sum(k1);
print topk_count(k1, "a");
print topk_epsilon(k1, "a");
print topk_count(k1, "b");
print topk_epsilon(k1, "b");
print topk_count(k1, "c");
print topk_epsilon(k1, "c");
topk_add(k1, "d");
s = topk_get_top(k1, 5);
print s;
print topk_sum(k1);
print topk_count(k1, "b");
print topk_epsilon(k1, "b");
print topk_count(k1, "c");
print topk_epsilon(k1, "c");
print topk_count(k1, "d");
print topk_epsilon(k1, "d");
topk_add(k1, "e");
s = topk_get_top(k1, 5);
print s;
print topk_sum(k1);
print topk_count(k1, "d");
print topk_epsilon(k1, "d");
print topk_count(k1, "e");
print topk_epsilon(k1, "e");
topk_add(k1, "f");
s = topk_get_top(k1, 5);
print s;
print topk_sum(k1);
print topk_count(k1, "f");
print topk_epsilon(k1, "f");
print topk_count(k1, "e");
print topk_epsilon(k1, "e");
topk_add(k1, "e");
s = topk_get_top(k1, 5);
print s;
print topk_sum(k1);
print topk_count(k1, "f");
print topk_epsilon(k1, "f");
print topk_count(k1, "e");
print topk_epsilon(k1, "e");
topk_add(k1, "g");
s = topk_get_top(k1, 5);
print s;
print topk_sum(k1);
print topk_count(k1, "f");
print topk_epsilon(k1, "f");
print topk_count(k1, "e");
print topk_epsilon(k1, "e");
print topk_count(k1, "g");
print topk_epsilon(k1, "g");
k1 = topk_init(100);
topk_add(k1, "a");
topk_add(k1, "b");
topk_add(k1, "b");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "f");
s = topk_get_top(k1, 3);
print s;
print topk_sum(k1);
print topk_count(k1, "c");
print topk_epsilon(k1, "c");
print topk_count(k1, "e");
print topk_epsilon(k1, "d");
print topk_count(k1, "d");
print topk_epsilon(k1, "d");
local k3 = topk_init(2);
topk_merge_prune(k3, k1);
s = topk_get_top(k3, 3);
print s;
print topk_count(k3, "c");
print topk_epsilon(k3, "c");
print topk_count(k3, "e");
print topk_epsilon(k3, "e");
print topk_count(k3, "d");
print topk_epsilon(k3, "d");
topk_merge_prune(k3, k1);
s = topk_get_top(k3, 3);
print s;
print topk_sum(k3); # this gives a warning and a wrong result.
print topk_count(k3, "c");
print topk_epsilon(k3, "c");
print topk_count(k3, "e");
print topk_epsilon(k3, "e");
print topk_count(k3, "d");
print topk_epsilon(k3, "d");
k3 = topk_init(2);
topk_merge(k3, k1);
print s;
print topk_sum(k3);
print topk_count(k3, "c");
print topk_epsilon(k3, "c");
print topk_count(k3, "e");
print topk_epsilon(k3, "e");
print topk_count(k3, "d");
print topk_epsilon(k3, "d");
topk_merge(k3, k1);
s = topk_get_top(k3, 3);
print s;
print topk_sum(k3);
print topk_count(k3, "c");
print topk_epsilon(k3, "c");
print topk_count(k3, "e");
print topk_epsilon(k3, "e");
print topk_count(k3, "d");
print topk_epsilon(k3, "d");
}

View file

@ -0,0 +1,74 @@
# @TEST-EXEC: bro -b %INPUT runnumber=1 >out
# @TEST-EXEC: bro -b %INPUT runnumber=2 >>out
# @TEST-EXEC: bro -b %INPUT runnumber=3 >>out
# @TEST-EXEC: btest-diff out
global runnumber: count &redef; # differentiate runs
global k1: opaque of topk &persistent;
global k2: opaque of topk &persistent;
event bro_init()
{
k2 = topk_init(20);
if ( runnumber == 1 )
{
k1 = topk_init(100);
topk_add(k1, "a");
topk_add(k1, "b");
topk_add(k1, "b");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "f");
}
local s = topk_get_top(k1, 3);
print topk_count(k1, "a");
print topk_count(k1, "b");
print topk_count(k1, "c");
print topk_count(k1, "d");
print topk_count(k1, "e");
print topk_count(k1, "f");
if ( runnumber == 2 )
{
topk_add(k1, "a");
topk_add(k1, "b");
topk_add(k1, "b");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "c");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "d");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "e");
topk_add(k1, "f");
}
print s;
}

View file

@ -0,0 +1,110 @@
# @TEST-SERIALIZE: comm
#
# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
# @TEST-EXEC: sleep 1
# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
# @TEST-EXEC: btest-bg-wait 15
# @TEST-EXEC: btest-diff manager-1/.stdout
#
@TEST-START-FILE cluster-layout.bro
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"],
};
@TEST-END-FILE
redef Log::default_rotation_interval = 0secs;
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test.metric",
$apply=set(SumStats::TOPK)];
SumStats::create([$epoch=5secs,
$reducers=set(r1),
$epoch_finished(data: SumStats::ResultTable) =
{
for ( key in data )
{
local r = data[key]["test.metric"];
local s: vector of SumStats::Observation;
s = topk_get_top(r$topk, 5);
print fmt("Top entries for key %s", key$str);
for ( element in s )
{
print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element]));
}
terminate();
}
}
]);
}
event remote_connection_closed(p: event_peer)
{
terminate();
}
global ready_for_data: event();
redef Cluster::manager2worker_events += /^ready_for_data$/;
event ready_for_data()
{
const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100};
if ( Cluster::node == "worker-1" )
{
local a: count;
a = 0;
for ( i in loop_v )
{
a = a + 1;
for ( j in loop_v )
{
if ( i < j )
SumStats::observe("test.metric", [$str="counter"], [$num=a]);
}
}
SumStats::observe("test.metric", [$str="two"], [$num=1]);
SumStats::observe("test.metric", [$str="two"], [$num=1]);
}
if ( Cluster::node == "worker-2" )
{
SumStats::observe("test.metric", [$str="two"], [$num=2]);
SumStats::observe("test.metric", [$str="two"], [$num=2]);
SumStats::observe("test.metric", [$str="two"], [$num=2]);
SumStats::observe("test.metric", [$str="two"], [$num=2]);
SumStats::observe("test.metric", [$str="two"], [$num=1]);
for ( i in loop_v )
{
SumStats::observe("test.metric", [$str="counter"], [$num=995]);
}
}
}
@if ( Cluster::local_node_type() == Cluster::MANAGER )
global peer_count = 0;
event remote_connection_handshake_done(p: event_peer) &priority=-5
{
++peer_count;
if ( peer_count == 2 )
event ready_for_data();
}
@endif

View file

@ -0,0 +1,48 @@
# @TEST-EXEC: bro %INPUT
# @TEST-EXEC: btest-diff .stdout
event bro_init() &priority=5
{
local r1: SumStats::Reducer = [$stream="test.metric",
$apply=set(SumStats::TOPK)];
SumStats::create([$epoch=3secs,
$reducers=set(r1),
$epoch_finished(data: SumStats::ResultTable) =
{
for ( key in data )
{
local r = data[key]["test.metric"];
local s: vector of SumStats::Observation;
s = topk_get_top(r$topk, 5);
print fmt("Top entries for key %s", key$str);
for ( element in s )
{
print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element]));
}
}
}
]);
const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100};
local a: count;
a = 0;
for ( i in loop_v )
{
a = a + 1;
for ( j in loop_v )
{
if ( i < j )
SumStats::observe("test.metric", [$str="counter"], [$num=a]);
}
}
SumStats::observe("test.metric", [$str="two"], [$num=1]);
SumStats::observe("test.metric", [$str="two"], [$num=1]);
}