From c21c18ea45d24a20f48c42b0d828c184d1f66ebd Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 22 Apr 2013 01:10:29 -0700 Subject: [PATCH 01/16] implement topk. This is _completely_ untested. It compiles. It will probably do nothing else (well, besides crashing Bro). --- src/CMakeLists.txt | 1 + src/Topk.cc | 224 +++++++++++++++++++++++++++++++++++++++++++++ src/Topk.h | 56 ++++++++++++ 3 files changed, 281 insertions(+) create mode 100644 src/Topk.cc create mode 100644 src/Topk.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 83a018ccde..bc2512af68 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -408,6 +408,7 @@ set(bro_SRCS Telnet.cc Teredo.cc Timer.cc + Topk.cc Traverse.cc Trigger.cc TunnelEncapsulation.cc diff --git a/src/Topk.cc b/src/Topk.cc new file mode 100644 index 0000000000..ef7d7bfbd8 --- /dev/null +++ b/src/Topk.cc @@ -0,0 +1,224 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "Topk.h" +#include "CompHash.h" +#include "Reporter.h" + +namespace Topk { + +static void topk_element_hash_delete_func(void* val) + { + Element* e = (Element*) val; + delete e; + } + +Element::~Element() + { + if ( value ) + Unref(value); + value=0; + } + +HashKey* Topk::GetHash(Val* v) + { + TypeList* tl = new TypeList(v->Type()); + tl->Append(v->Type()); + CompositeHash* topk_hash = new CompositeHash(tl); + Unref(tl); + + HashKey* key = topk_hash->ComputeHash(v, 1); + assert(key); + return key; + } + +Topk::Topk(uint64 arg_size) + { + elementDict = new PDict(Element); + elementDict->SetDeleteFunc(topk_element_hash_delete_func); + size = arg_size; + type = 0; + } + +Topk::~Topk() + { + elementDict->Clear(); + delete elementDict; + + // now all elements are already gone - delete the buckets + std::list::iterator bi = buckets.begin(); + while ( bi != buckets.end() ) + { + delete *bi; + bi++; + } + + if ( type ) + Unref(type); + type = 0; + } + +VectorVal* Topk::getTopK(int k) // returns vector + { + if ( numElements == 0 ) + { + reporter->Error("Cannot return topk of empty"); + return 0; + } + + TypeList* vector_index = new TypeList(type); + vector_index->Append(type); + VectorType* v = new VectorType(vector_index); + VectorVal* t = new VectorVal(v); + + // this does no estimation if the results is correct! + // in any case - just to make this future-proof (and I am lazy) - this can return more than k. + + int read = 0; + std::list::iterator it = buckets.end(); + while (read < k ) + { + std::list::iterator eit = (*it)->elements.begin(); + while (eit != (*it)->elements.end() ) + { + t->Assign(read, (*eit)->value->Ref()); + read++; + } + + if ( it == buckets.begin() ) + break; + } + + + Unref(v); + return t; + } + +void Topk::Encountered(Val* encountered) + { + // ok, let's see if we already know this one. + + // check type compatibility + if ( numElements == 0 ) + type = encountered->Type()->Ref(); + else + if ( !same_type(type, encountered->Type()) ) + { + reporter->Error("Trying to add element to topk with differing type from other elements"); + return; + } + + + // Step 1 - get the hash. + HashKey* key = GetHash(encountered); + Element* e = (Element*) elementDict->Lookup(key); + + if ( e == 0 ) + { + e = new Element(); + e->epsilon = 0; + e->value = encountered->Ref(); // or no ref? + + + // well, we do not know this one yet... + if ( numElements < size ) + { + // brilliant. just add it at position 1 + if ( buckets.size() == 0 || (*buckets.begin())->count > 1 ) + { + Bucket* b = new Bucket(); + b->count = 1; + std::list::iterator pos = buckets.insert(buckets.begin(), b); + b->bucketPos = pos; + b->elements.insert(b->elements.end(), e); + e->parent = b; + } + else + { + Bucket* b = *buckets.begin(); + assert(b->count == 1); + b->elements.insert(b->elements.end(), e); + e->parent = b; + } + + elementDict->Insert(key, e); + numElements++; + delete key; + return; // done. it is at pos 1. + } + else + { + // replace element with min-value + Bucket* b = *buckets.begin(); // bucket with smallest elements + // evict oldest element with least hits. + assert(b->elements.size() > 0); + HashKey* deleteKey = GetHash((*(b->elements.begin()))->value); + b->elements.erase(b->elements.begin()); + Element* deleteElement = (Element*) elementDict->RemoveEntry(deleteKey); + assert(deleteElement); // there has to have been a minimal element... + delete deleteElement; + delete deleteKey; + // and add the new one to the end + e->epsilon = b->count; + b->elements.insert(b->elements.end(), e); + elementDict->Insert(key, e); + // fallthrough, increment operation has to run! + } + + } + + // ok, we now have an element in e + delete key; + IncrementCounter(e); // well, this certainly was anticlimatic. + + } + +void Topk::IncrementCounter(Element* e) + { + Bucket* currBucket = e->parent; + uint64 currcount = currBucket->count; + + // well, let's test if there is a bucket for currcount++ + std::list::iterator bucketIter = currBucket->bucketPos; + + Bucket* nextBucket = 0; + + bucketIter++; + + if ( bucketIter != buckets.end() ) + { + if ( (*bucketIter)->count == currcount+1 ) + nextBucket = *bucketIter; + } + + if ( nextBucket == 0 ) + { + // the bucket for the value that we want does not exist. + // create it... + + Bucket* b = new Bucket(); + b->count = currcount+1; + + std::list::iterator nextBucketPos = buckets.insert(bucketIter, b); + b->bucketPos = nextBucketPos; // and give it the iterator we know now. + + nextBucket = b; + } + + // ok, now we have the new bucket in nextBucket. Shift the element over... + currBucket->elements.remove(e); + nextBucket->elements.insert(nextBucket->elements.end(), e); + + e->parent = nextBucket; + + // if currBucket is empty, we have to delete it now + if ( currBucket->elements.size() == 0 ) + { + buckets.remove(currBucket); + delete currBucket; + currBucket = 0; + } + + + } + +}; diff --git a/src/Topk.h b/src/Topk.h new file mode 100644 index 0000000000..b38e1e8ab3 --- /dev/null +++ b/src/Topk.h @@ -0,0 +1,56 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef topk_h +#define topk_h + +#include +#include "Val.h" +#include "CompHash.h" + +// This class implements the top-k algorithm. Or - to be more precise - my interpretation of it. + +namespace Topk { + +struct Element; + +struct Bucket { + uint64 count; + std::list elements; + std::list::iterator bucketPos; // iterators only get invalidated for removed elements. This one points to us - so it is invalid when we are no longer there. Cute, isn't it? +}; + +struct Element { + uint64 epsilon; + Val* value; + Bucket* parent; + + ~Element(); +}; + + +declare(PDict, Element); + +class Topk { + +public: + Topk(uint64 size); + ~Topk(); + void Encountered(Val* value); // we saw something + VectorVal* getTopK(int k); // returns vector + +private: + void IncrementCounter(Element* e); + HashKey* GetHash(Val*); // this probably should go somewhere else. + + BroType* type; + std::list buckets; + PDict(Element)* elementDict; + uint64 size; // how many elements are we tracking? + uint64 numElements; // how many elements do we have at the moment + + +}; + +}; + +#endif From ce7ad003f251e8c76be3d07190907157cd9a87c1 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 22 Apr 2013 02:40:42 -0700 Subject: [PATCH 02/16] well, a test that works.. Note: merging top-k data structures is not yet possible (and is actually quite awkward/expensive). I will have to think about how to do that for a bit... --- src/Topk.cc | 23 +++++++---- src/Topk.h | 9 ++-- src/bro.bif | 27 ++++++++++++ testing/btest/Baseline/bifs.topk/out | 7 ++++ testing/btest/bifs/topk.bro | 61 ++++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 12 deletions(-) create mode 100644 testing/btest/Baseline/bifs.topk/out create mode 100644 testing/btest/bifs/topk.bro diff --git a/src/Topk.cc b/src/Topk.cc index ef7d7bfbd8..8f4d63ed78 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -19,7 +19,7 @@ Element::~Element() value=0; } -HashKey* Topk::GetHash(Val* v) +HashKey* TopkVal::GetHash(Val* v) { TypeList* tl = new TypeList(v->Type()); tl->Append(v->Type()); @@ -31,15 +31,16 @@ HashKey* Topk::GetHash(Val* v) return key; } -Topk::Topk(uint64 arg_size) +TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(new OpaqueType("topk")) { elementDict = new PDict(Element); elementDict->SetDeleteFunc(topk_element_hash_delete_func); size = arg_size; type = 0; + numElements = 0; } -Topk::~Topk() +TopkVal::~TopkVal() { elementDict->Clear(); delete elementDict; @@ -57,7 +58,7 @@ Topk::~Topk() type = 0; } -VectorVal* Topk::getTopK(int k) // returns vector +VectorVal* TopkVal::getTopK(int k) // returns vector { if ( numElements == 0 ) { @@ -75,17 +76,23 @@ VectorVal* Topk::getTopK(int k) // returns vector int read = 0; std::list::iterator it = buckets.end(); + it--; while (read < k ) { + //printf("Bucket %llu\n", (*it)->count); std::list::iterator eit = (*it)->elements.begin(); while (eit != (*it)->elements.end() ) { + //printf("Size: %ld\n", (*it)->elements.size()); t->Assign(read, (*eit)->value->Ref()); read++; + eit++; } if ( it == buckets.begin() ) break; + + it--; } @@ -93,13 +100,14 @@ VectorVal* Topk::getTopK(int k) // returns vector return t; } -void Topk::Encountered(Val* encountered) +void TopkVal::Encountered(Val* encountered) { // ok, let's see if we already know this one. + //printf("NumElements: %d\n", numElements); // check type compatibility if ( numElements == 0 ) - type = encountered->Type()->Ref(); + type = encountered->Type()->Ref()->Ref(); else if ( !same_type(type, encountered->Type()) ) { @@ -161,6 +169,7 @@ void Topk::Encountered(Val* encountered) e->epsilon = b->count; b->elements.insert(b->elements.end(), e); elementDict->Insert(key, e); + e->parent = b; // fallthrough, increment operation has to run! } @@ -172,7 +181,7 @@ void Topk::Encountered(Val* encountered) } -void Topk::IncrementCounter(Element* e) +void TopkVal::IncrementCounter(Element* e) { Bucket* currBucket = e->parent; uint64 currcount = currBucket->count; diff --git a/src/Topk.h b/src/Topk.h index b38e1e8ab3..7c983ebdfc 100644 --- a/src/Topk.h +++ b/src/Topk.h @@ -6,6 +6,7 @@ #include #include "Val.h" #include "CompHash.h" +#include "OpaqueVal.h" // This class implements the top-k algorithm. Or - to be more precise - my interpretation of it. @@ -30,11 +31,11 @@ struct Element { declare(PDict, Element); -class Topk { +class TopkVal : public OpaqueVal { public: - Topk(uint64 size); - ~Topk(); + TopkVal(uint64 size); + ~TopkVal(); void Encountered(Val* value); // we saw something VectorVal* getTopK(int k); // returns vector @@ -47,8 +48,6 @@ private: PDict(Element)* elementDict; uint64 size; // how many elements are we tracking? uint64 numElements; // how many elements do we have at the moment - - }; }; diff --git a/src/bro.bif b/src/bro.bif index ac54da0e75..695337bcf1 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -5642,3 +5642,30 @@ function anonymize_addr%(a: addr, cl: IPAddrAnonymizationClass%): addr } %} + + +%%{ +#include "Topk.h" +%%} + +function topk_init%(size: count%): opaque of topk + %{ + Topk::TopkVal* v = new Topk::TopkVal(size); + return v; + %} + +function topk_add%(handle: opaque of topk, value: any%): any + %{ + assert(handle); + Topk::TopkVal* h = (Topk::TopkVal*) handle; + h->Encountered(value); + + return 0; + %} + +function topk_get_top%(handle: opaque of topk, k: count%): any + %{ + assert(handle); + Topk::TopkVal* h = (Topk::TopkVal*) handle; + return h->getTopK(k); + %} diff --git a/testing/btest/Baseline/bifs.topk/out b/testing/btest/Baseline/bifs.topk/out new file mode 100644 index 0000000000..94aa5bd572 --- /dev/null +++ b/testing/btest/Baseline/bifs.topk/out @@ -0,0 +1,7 @@ +[b, c] +[d, c] +[d, e] +[f, e] +[f, e] +[g, e] +[c, e, d] diff --git a/testing/btest/bifs/topk.bro b/testing/btest/bifs/topk.bro new file mode 100644 index 0000000000..af1f38c773 --- /dev/null +++ b/testing/btest/bifs/topk.bro @@ -0,0 +1,61 @@ +# @TEST-EXEC: bro -b %INPUT > out +# @TEST-EXEC: btest-diff out + +event bro_init() + { + local k1 = topk_init(2); + + # first - peculiarity check... + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + + local s = topk_get_top(k1, 5); + print s; + + topk_add(k1, "d"); + s = topk_get_top(k1, 5); + print s; + + topk_add(k1, "e"); + s = topk_get_top(k1, 5); + print s; + + topk_add(k1, "f"); + s = topk_get_top(k1, 5); + print s; + + topk_add(k1, "e"); + s = topk_get_top(k1, 5); + print s; + + topk_add(k1, "g"); + s = topk_get_top(k1, 5); + print s; + + k1 = topk_init(100); + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "f"); + s = topk_get_top(k1, 3); + print s; + + +} From de5769a88fd123cf6f34f978cf67ec5ee494de15 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Tue, 23 Apr 2013 15:19:01 -0700 Subject: [PATCH 03/16] topk for sumstats --- .../frameworks/sumstats/plugins/__load__.bro | 3 +- .../base/frameworks/sumstats/plugins/topk.bro | 24 ++++++++++ src/Topk.cc | 33 ++++++++++++- src/Topk.h | 4 +- src/bro.bif | 15 ++++++ testing/btest/Baseline/bifs.topk/.stderr | 6 +++ testing/btest/Baseline/bifs.topk/out | 30 ++++++++++++ .../.stdout | 8 ++++ testing/btest/bifs/topk.bro | 31 ++++++++++++ .../scripts/base/frameworks/sumstats/topk.bro | 48 +++++++++++++++++++ 10 files changed, 198 insertions(+), 4 deletions(-) create mode 100644 scripts/base/frameworks/sumstats/plugins/topk.bro create mode 100644 testing/btest/Baseline/bifs.topk/.stderr create mode 100644 testing/btest/Baseline/scripts.base.frameworks.sumstats.topk/.stdout create mode 100644 testing/btest/scripts/base/frameworks/sumstats/topk.bro diff --git a/scripts/base/frameworks/sumstats/plugins/__load__.bro b/scripts/base/frameworks/sumstats/plugins/__load__.bro index 0d4c2ed302..35191a4776 100644 --- a/scripts/base/frameworks/sumstats/plugins/__load__.bro +++ b/scripts/base/frameworks/sumstats/plugins/__load__.bro @@ -4,5 +4,6 @@ @load ./sample @load ./std-dev @load ./sum +@load ./topk @load ./unique -@load ./variance \ No newline at end of file +@load ./variance diff --git a/scripts/base/frameworks/sumstats/plugins/topk.bro b/scripts/base/frameworks/sumstats/plugins/topk.bro new file mode 100644 index 0000000000..f64e9fb18d --- /dev/null +++ b/scripts/base/frameworks/sumstats/plugins/topk.bro @@ -0,0 +1,24 @@ +@load base/frameworks/sumstats + +module SumStats; + +export { + redef enum Calculation += { + TOPK + }; + + redef record ResultVal += { + topk: opaque of topk &default=topk_init(500); + }; + +} + +hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { + if ( TOPK in r$apply ) + { + topk_add(rv$topk, obs); + } + } + + diff --git a/src/Topk.cc b/src/Topk.cc index 8f4d63ed78..d5866b4f41 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -19,7 +19,7 @@ Element::~Element() value=0; } -HashKey* TopkVal::GetHash(Val* v) +HashKey* TopkVal::GetHash(Val* v) const { TypeList* tl = new TypeList(v->Type()); tl->Append(v->Type()); @@ -58,7 +58,8 @@ TopkVal::~TopkVal() type = 0; } -VectorVal* TopkVal::getTopK(int k) // returns vector + +VectorVal* TopkVal::getTopK(int k) // returns vector { if ( numElements == 0 ) { @@ -100,6 +101,34 @@ VectorVal* TopkVal::getTopK(int k) // returns vector return t; } +uint64_t TopkVal::getCount(Val* value) const + { + HashKey* key = GetHash(value); + Element* e = (Element*) elementDict->Lookup(key); + + if ( e == 0 ) + { + reporter->Error("getCount for element that is not in top-k"); + return 0; + } + + return e->parent->count; + } + +uint64_t TopkVal::getEpsilon(Val* value) const + { + HashKey* key = GetHash(value); + Element* e = (Element*) elementDict->Lookup(key); + + if ( e == 0 ) + { + reporter->Error("getEpsilon for element that is not in top-k"); + return 0; + } + + return e->epsilon; + } + void TopkVal::Encountered(Val* encountered) { // ok, let's see if we already know this one. diff --git a/src/Topk.h b/src/Topk.h index 7c983ebdfc..e4c6aa5aea 100644 --- a/src/Topk.h +++ b/src/Topk.h @@ -38,10 +38,12 @@ public: ~TopkVal(); void Encountered(Val* value); // we saw something VectorVal* getTopK(int k); // returns vector + uint64_t getCount(Val* value) const; + uint64_t getEpsilon(Val* value) const; private: void IncrementCounter(Element* e); - HashKey* GetHash(Val*); // this probably should go somewhere else. + HashKey* GetHash(Val*) const; // this probably should go somewhere else. BroType* type; std::list buckets; diff --git a/src/bro.bif b/src/bro.bif index 695337bcf1..e8e78c7872 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -5669,3 +5669,18 @@ function topk_get_top%(handle: opaque of topk, k: count%): any Topk::TopkVal* h = (Topk::TopkVal*) handle; return h->getTopK(k); %} + +function topk_count%(handle: opaque of topk, value: any%): count + %{ + assert(handle); + Topk::TopkVal* h = (Topk::TopkVal*) handle; + return new Val(h->getCount(value), TYPE_COUNT); + %} + +function topk_epsilon%(handle: opaque of topk, value: any%): count + %{ + assert(handle); + Topk::TopkVal* h = (Topk::TopkVal*) handle; + return new Val(h->getEpsilon(value), TYPE_COUNT); + %} + diff --git a/testing/btest/Baseline/bifs.topk/.stderr b/testing/btest/Baseline/bifs.topk/.stderr new file mode 100644 index 0000000000..f57e35ca51 --- /dev/null +++ b/testing/btest/Baseline/bifs.topk/.stderr @@ -0,0 +1,6 @@ +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k diff --git a/testing/btest/Baseline/bifs.topk/out b/testing/btest/Baseline/bifs.topk/out index 94aa5bd572..2116a30a12 100644 --- a/testing/btest/Baseline/bifs.topk/out +++ b/testing/btest/Baseline/bifs.topk/out @@ -1,7 +1,37 @@ [b, c] +0 +0 +2 +0 +2 +1 [d, c] +0 +0 +2 +1 +3 +2 [d, e] +3 +2 +3 +2 [f, e] +4 +3 +3 +2 [f, e] +4 +3 +4 +2 [g, e] +0 +0 +4 +2 +5 +4 [c, e, d] diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk/.stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk/.stdout new file mode 100644 index 0000000000..c85316eecc --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk/.stdout @@ -0,0 +1,8 @@ +Top entries for key counter +Num: 1, count: 99, epsilon: 0 +Num: 2, count: 98, epsilon: 0 +Num: 3, count: 97, epsilon: 0 +Num: 4, count: 96, epsilon: 0 +Num: 5, count: 95, epsilon: 0 +Top entries for key two +Num: 1, count: 2, epsilon: 0 diff --git a/testing/btest/bifs/topk.bro b/testing/btest/bifs/topk.bro index af1f38c773..9d936ce2f4 100644 --- a/testing/btest/bifs/topk.bro +++ b/testing/btest/bifs/topk.bro @@ -1,5 +1,6 @@ # @TEST-EXEC: bro -b %INPUT > out # @TEST-EXEC: btest-diff out +# @TEST-EXEC: btest-diff .stderr event bro_init() { @@ -13,26 +14,56 @@ event bro_init() local s = topk_get_top(k1, 5); print s; + print topk_count(k1, "a"); + print topk_epsilon(k1, "a"); + print topk_count(k1, "b"); + print topk_epsilon(k1, "b"); + print topk_count(k1, "c"); + print topk_epsilon(k1, "c"); topk_add(k1, "d"); s = topk_get_top(k1, 5); print s; + print topk_count(k1, "b"); + print topk_epsilon(k1, "b"); + print topk_count(k1, "c"); + print topk_epsilon(k1, "c"); + print topk_count(k1, "d"); + print topk_epsilon(k1, "d"); topk_add(k1, "e"); s = topk_get_top(k1, 5); print s; + print topk_count(k1, "d"); + print topk_epsilon(k1, "d"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); topk_add(k1, "f"); s = topk_get_top(k1, 5); print s; + print topk_count(k1, "f"); + print topk_epsilon(k1, "f"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); topk_add(k1, "e"); s = topk_get_top(k1, 5); print s; + print topk_count(k1, "f"); + print topk_epsilon(k1, "f"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); topk_add(k1, "g"); s = topk_get_top(k1, 5); print s; + print topk_count(k1, "f"); + print topk_epsilon(k1, "f"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); + print topk_count(k1, "g"); + print topk_epsilon(k1, "g"); k1 = topk_init(100); topk_add(k1, "a"); diff --git a/testing/btest/scripts/base/frameworks/sumstats/topk.bro b/testing/btest/scripts/base/frameworks/sumstats/topk.bro new file mode 100644 index 0000000000..22a5af1bc7 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/topk.bro @@ -0,0 +1,48 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +event bro_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test.metric", + $apply=set(SumStats::TOPK)]; + SumStats::create([$epoch=3secs, + $reducers=set(r1), + $epoch_finished(data: SumStats::ResultTable) = + { + for ( key in data ) + { + local r = data[key]["test.metric"]; + + local s: vector of SumStats::Observation; + s = topk_get_top(r$topk, 5); + + print fmt("Top entries for key %s", key$str); + for ( element in s ) + { + print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); + } + + } + } + ]); + + + const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100}; + + local a: count; + a = 0; + + for ( i in loop_v ) + { + a = a + 1; + for ( j in loop_v ) + { + if ( i < j ) + SumStats::observe("test.metric", [$str="counter"], [$num=a]); + } + } + + + SumStats::observe("test.metric", [$str="two"], [$num=1]); + SumStats::observe("test.metric", [$str="two"], [$num=1]); + } From a426c7612270b5cc40143cc14b4e3c42c6499617 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Tue, 23 Apr 2013 18:23:34 -0700 Subject: [PATCH 04/16] make the get function const --- src/Topk.cc | 4 ++-- src/Topk.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Topk.cc b/src/Topk.cc index d5866b4f41..b89fa2e96f 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -59,7 +59,7 @@ TopkVal::~TopkVal() } -VectorVal* TopkVal::getTopK(int k) // returns vector +VectorVal* TopkVal::getTopK(int k) const // returns vector { if ( numElements == 0 ) { @@ -76,7 +76,7 @@ VectorVal* TopkVal::getTopK(int k) // returns vector // in any case - just to make this future-proof (and I am lazy) - this can return more than k. int read = 0; - std::list::iterator it = buckets.end(); + std::list::const_iterator it = buckets.end(); it--; while (read < k ) { diff --git a/src/Topk.h b/src/Topk.h index e4c6aa5aea..f486948c5c 100644 --- a/src/Topk.h +++ b/src/Topk.h @@ -37,7 +37,7 @@ public: TopkVal(uint64 size); ~TopkVal(); void Encountered(Val* value); // we saw something - VectorVal* getTopK(int k); // returns vector + VectorVal* getTopK(int k) const; // returns vector uint64_t getCount(Val* value) const; uint64_t getEpsilon(Val* value) const; From 6f863d2259a5d388068f4ad70571ab62dcaa9cd4 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Tue, 23 Apr 2013 23:24:02 -0700 Subject: [PATCH 05/16] add serialization for topk --- src/SerialTypes.h | 1 + src/Topk.cc | 108 ++++++++++++++++++ src/Topk.h | 5 + .../btest/Baseline/bifs.topk_persistence/out | 21 ++++ testing/btest/bifs/topk_persistence.bro | 74 ++++++++++++ 5 files changed, 209 insertions(+) create mode 100644 testing/btest/Baseline/bifs.topk_persistence/out create mode 100644 testing/btest/bifs/topk_persistence.bro diff --git a/src/SerialTypes.h b/src/SerialTypes.h index 723badab1e..f07392eff4 100644 --- a/src/SerialTypes.h +++ b/src/SerialTypes.h @@ -104,6 +104,7 @@ SERIAL_VAL(MD5_VAL, 16) SERIAL_VAL(SHA1_VAL, 17) SERIAL_VAL(SHA256_VAL, 18) SERIAL_VAL(ENTROPY_VAL, 19) +SERIAL_VAL(TOPK_VAL, 20) #define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR) SERIAL_EXPR(EXPR, 1) diff --git a/src/Topk.cc b/src/Topk.cc index b89fa2e96f..a31f49adf4 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -3,9 +3,13 @@ #include "Topk.h" #include "CompHash.h" #include "Reporter.h" +#include "Serializer.h" + namespace Topk { +IMPLEMENT_SERIAL(TopkVal, SER_TOPK_VAL); + static void topk_element_hash_delete_func(void* val) { Element* e = (Element*) val; @@ -40,6 +44,15 @@ TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(new OpaqueType("topk")) numElements = 0; } +TopkVal::TopkVal() : OpaqueVal(new OpaqueType("topk")) + { + elementDict = new PDict(Element); + elementDict->SetDeleteFunc(topk_element_hash_delete_func); + size = 0; + type = 0; + numElements = 0; + } + TopkVal::~TopkVal() { elementDict->Clear(); @@ -59,6 +72,101 @@ TopkVal::~TopkVal() } +bool TopkVal::DoSerialize(SerialInfo* info) const + { + DO_SERIALIZE(SER_TOPK_VAL, OpaqueVal); + + bool v = true; + + v &= SERIALIZE(size); + v &= SERIALIZE(numElements); + bool type_present = (type != 0); + v &= SERIALIZE(type_present); + if ( type_present ) + v &= type->Serialize(info); + else + assert(numElements == 0); + + int i = 0; + std::list::const_iterator it = buckets.begin(); + while ( it != buckets.end() ) + { + Bucket* b = *it; + uint32_t elements_count = b->elements.size(); + v &= SERIALIZE(elements_count); + v &= SERIALIZE(b->count); + std::list::const_iterator eit = b->elements.begin(); + while ( eit != b->elements.end() ) + { + Element* element = *eit; + v &= SERIALIZE(element->epsilon); + v &= element->value->Serialize(info); + + eit++; + i++; + } + + it++; + } + + assert(i == numElements); + + return v; + } + +bool TopkVal::DoUnserialize(UnserialInfo* info) + { + DO_UNSERIALIZE(OpaqueVal); + + bool v = true; + + v &= UNSERIALIZE(&size); + v &= UNSERIALIZE(&numElements); + bool type_present = false; + v &= UNSERIALIZE(&type_present); + if ( type_present ) + { + type = BroType::Unserialize(info); + assert(type); + } + else + assert(numElements == 0); + + int i = 0; + while ( i < numElements ) + { + Bucket* b = new Bucket(); + uint32_t elements_count; + v &= UNSERIALIZE(&elements_count); + v &= UNSERIALIZE(&b->count); + b->bucketPos = buckets.insert(buckets.end(), b); + + for ( int j = 0; j < elements_count; j++ ) + { + Element* e = new Element(); + v &= UNSERIALIZE(&e->epsilon); + e->value = Val::Unserialize(info, type); + e->parent = b; + + b->elements.insert(b->elements.end(), e); + + HashKey* key = GetHash(e->value); + assert ( elementDict->Lookup(key) == 0 ); + + elementDict->Insert(key, e); + delete key; + + + i++; + } + } + + assert(i == numElements); + + return v; + } + + VectorVal* TopkVal::getTopK(int k) const // returns vector { if ( numElements == 0 ) diff --git a/src/Topk.h b/src/Topk.h index f486948c5c..0e38319380 100644 --- a/src/Topk.h +++ b/src/Topk.h @@ -41,6 +41,9 @@ public: uint64_t getCount(Val* value) const; uint64_t getEpsilon(Val* value) const; +protected: + TopkVal(); // for deserialize + private: void IncrementCounter(Element* e); HashKey* GetHash(Val*) const; // this probably should go somewhere else. @@ -50,6 +53,8 @@ private: PDict(Element)* elementDict; uint64 size; // how many elements are we tracking? uint64 numElements; // how many elements do we have at the moment + + DECLARE_SERIAL(TopkVal); }; }; diff --git a/testing/btest/Baseline/bifs.topk_persistence/out b/testing/btest/Baseline/bifs.topk_persistence/out new file mode 100644 index 0000000000..ef3d0cef30 --- /dev/null +++ b/testing/btest/Baseline/bifs.topk_persistence/out @@ -0,0 +1,21 @@ +1 +2 +6 +4 +5 +1 +[c, e, d] +1 +2 +6 +4 +5 +1 +[c, e, d] +2 +4 +12 +8 +10 +2 +[c, e, d] diff --git a/testing/btest/bifs/topk_persistence.bro b/testing/btest/bifs/topk_persistence.bro new file mode 100644 index 0000000000..4d599c2780 --- /dev/null +++ b/testing/btest/bifs/topk_persistence.bro @@ -0,0 +1,74 @@ +# @TEST-EXEC: bro -b %INPUT runnumber=1 >out +# @TEST-EXEC: bro -b %INPUT runnumber=2 >>out +# @TEST-EXEC: bro -b %INPUT runnumber=3 >>out +# @TEST-EXEC: btest-diff out + +global runnumber: count &redef; # differentiate runs + +global k1: opaque of topk &persistent; +global k2: opaque of topk &persistent; + +event bro_init() + { + + k2 = topk_init(20); + + if ( runnumber == 1 ) + { + k1 = topk_init(100); + + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "f"); + } + + local s = topk_get_top(k1, 3); + print topk_count(k1, "a"); + print topk_count(k1, "b"); + print topk_count(k1, "c"); + print topk_count(k1, "d"); + print topk_count(k1, "e"); + print topk_count(k1, "f"); + + if ( runnumber == 2 ) + { + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "f"); + } + + print s; + + } From 2f48008c423019c05a209939f00d8c52d29eb1ee Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 24 Apr 2013 06:17:51 -0700 Subject: [PATCH 06/16] implement merging for top-k. I am not (entirely) sure that this is mathematically correct, but I am (more and more) getting the feeling that it... might be. In any case - this was the last step and now it should work in cluster settings. --- .../base/frameworks/sumstats/plugins/topk.bro | 7 ++ src/Topk.cc | 106 ++++++++++++++++-- src/Topk.h | 3 +- src/bro.bif | 13 +++ testing/btest/Baseline/bifs.topk/.stderr | 4 + testing/btest/Baseline/bifs.topk/out | 20 ++++ testing/btest/bifs/topk.bro | 28 +++++ 7 files changed, 173 insertions(+), 8 deletions(-) diff --git a/scripts/base/frameworks/sumstats/plugins/topk.bro b/scripts/base/frameworks/sumstats/plugins/topk.bro index f64e9fb18d..6107a252ae 100644 --- a/scripts/base/frameworks/sumstats/plugins/topk.bro +++ b/scripts/base/frameworks/sumstats/plugins/topk.bro @@ -22,3 +22,10 @@ hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) } +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + result$topk = topk_init(500); + + topk_merge(result$topk, rv1$topk); + topk_merge(result$topk, rv2$topk); + } diff --git a/src/Topk.cc b/src/Topk.cc index a31f49adf4..8ad2113235 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -71,6 +71,97 @@ TopkVal::~TopkVal() type = 0; } +void TopkVal::Merge(const TopkVal* value) + { + + if ( type == 0 ) + { + assert(numElements == 0); + type = value->type->Ref(); + } + else + if ( !same_type(type, value->type) ) + { + reporter->Error("Tried to merge top-k elements of differing types. Aborted"); + return; + } + + std::list::const_iterator it = value->buckets.begin(); + while ( it != value->buckets.end() ) + { + Bucket* b = *it; + uint64_t currcount = b->count; + std::list::const_iterator eit = b->elements.begin(); + + while ( eit != b->elements.end() ) + { + Element* e = *eit; + // lookup if we already know this one... + HashKey* key = GetHash(e->value); + Element* olde = (Element*) elementDict->Lookup(key); + + if ( olde == 0 ) + { + olde = new Element(); + olde->epsilon=0; + olde->value = e->value->Ref(); + // insert at bucket position 0 + if ( buckets.size() > 0 ) + { + assert (buckets.front()-> count > 0 ); + } + + Bucket* newbucket = new Bucket(); + newbucket->count = 0; + newbucket->bucketPos = buckets.insert(buckets.begin(), newbucket); + + olde->parent = newbucket; + newbucket->elements.insert(newbucket->elements.end(), olde); + + elementDict->Insert(key, olde); + numElements++; + + } + + // now that we are sure that the old element is present - increment epsilon + olde->epsilon += e->epsilon; + // and increment position... + IncrementCounter(olde, currcount); + delete key; + + eit++; + } + + it++; + } + + // now we have added everything. And our top-k table could be too big. + // prune everything... + + assert(size > 0); + while ( numElements > size ) + { + assert(buckets.size() > 0 ); + Bucket* b = buckets.front(); + assert(b->elements.size() > 0); + + Element* e = b->elements.front(); + HashKey* key = GetHash(e->value); + elementDict->RemoveEntry(key); + delete e; + + b->elements.pop_front(); + + if ( b->elements.size() == 0 ) + { + delete b; + buckets.pop_front(); + } + + numElements--; + } + + } bool TopkVal::DoSerialize(SerialInfo* info) const { @@ -318,7 +409,8 @@ void TopkVal::Encountered(Val* encountered) } -void TopkVal::IncrementCounter(Element* e) +// increment by count +void TopkVal::IncrementCounter(Element* e, unsigned int count) { Bucket* currBucket = e->parent; uint64 currcount = currBucket->count; @@ -330,11 +422,11 @@ void TopkVal::IncrementCounter(Element* e) bucketIter++; - if ( bucketIter != buckets.end() ) - { - if ( (*bucketIter)->count == currcount+1 ) - nextBucket = *bucketIter; - } + while ( bucketIter != buckets.end() && (*bucketIter)->count < currcount+count ) + bucketIter++; + + if ( bucketIter != buckets.end() && (*bucketIter)->count == currcount+count ) + nextBucket = *bucketIter; if ( nextBucket == 0 ) { @@ -342,7 +434,7 @@ void TopkVal::IncrementCounter(Element* e) // create it... Bucket* b = new Bucket(); - b->count = currcount+1; + b->count = currcount+count; std::list::iterator nextBucketPos = buckets.insert(bucketIter, b); b->bucketPos = nextBucketPos; // and give it the iterator we know now. diff --git a/src/Topk.h b/src/Topk.h index 0e38319380..30e87f7a99 100644 --- a/src/Topk.h +++ b/src/Topk.h @@ -40,12 +40,13 @@ public: VectorVal* getTopK(int k) const; // returns vector uint64_t getCount(Val* value) const; uint64_t getEpsilon(Val* value) const; + void Merge(const TopkVal* value); protected: TopkVal(); // for deserialize private: - void IncrementCounter(Element* e); + void IncrementCounter(Element* e, unsigned int count = 1); HashKey* GetHash(Val*) const; // this probably should go somewhere else. BroType* type; diff --git a/src/bro.bif b/src/bro.bif index e8e78c7872..b6f101c025 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -5684,3 +5684,16 @@ function topk_epsilon%(handle: opaque of topk, value: any%): count return new Val(h->getEpsilon(value), TYPE_COUNT); %} +function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any + %{ + assert(handle1); + assert(handle2); + + Topk::TopkVal* h1 = (Topk::TopkVal*) handle1; + Topk::TopkVal* h2 = (Topk::TopkVal*) handle2; + + h1->Merge(h2); + + return 0; + %} + diff --git a/testing/btest/Baseline/bifs.topk/.stderr b/testing/btest/Baseline/bifs.topk/.stderr index f57e35ca51..f2bd316fd8 100644 --- a/testing/btest/Baseline/bifs.topk/.stderr +++ b/testing/btest/Baseline/bifs.topk/.stderr @@ -4,3 +4,7 @@ error: getCount for element that is not in top-k error: getEpsilon for element that is not in top-k error: getCount for element that is not in top-k error: getEpsilon for element that is not in top-k +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k diff --git a/testing/btest/Baseline/bifs.topk/out b/testing/btest/Baseline/bifs.topk/out index 2116a30a12..8db55eeca8 100644 --- a/testing/btest/Baseline/bifs.topk/out +++ b/testing/btest/Baseline/bifs.topk/out @@ -35,3 +35,23 @@ 5 4 [c, e, d] +6 +0 +5 +0 +4 +0 +[c, e] +6 +0 +5 +0 +0 +0 +[c, e] +12 +0 +10 +0 +0 +0 diff --git a/testing/btest/bifs/topk.bro b/testing/btest/bifs/topk.bro index 9d936ce2f4..92a68999cc 100644 --- a/testing/btest/bifs/topk.bro +++ b/testing/btest/bifs/topk.bro @@ -87,6 +87,34 @@ event bro_init() topk_add(k1, "f"); s = topk_get_top(k1, 3); print s; + print topk_count(k1, "c"); + print topk_epsilon(k1, "c"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "d"); + print topk_count(k1, "d"); + print topk_epsilon(k1, "d"); + local k3 = topk_init(2); + topk_merge(k3, k1); + + s = topk_get_top(k3, 3); + print s; + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); + + topk_merge(k3, k1); + + s = topk_get_top(k3, 3); + print s; + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); } From c0890f2a0f1448dc81f4b3e6b3e54a45a6e5a08b Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 24 Apr 2013 15:01:06 -0700 Subject: [PATCH 07/16] make size of topk-list configureable when using sumstats --- scripts/base/frameworks/sumstats/plugins/topk.bro | 15 +++++++++++++-- src/Topk.h | 1 + src/bro.bif | 7 +++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/scripts/base/frameworks/sumstats/plugins/topk.bro b/scripts/base/frameworks/sumstats/plugins/topk.bro index 6107a252ae..a830b1c5ec 100644 --- a/scripts/base/frameworks/sumstats/plugins/topk.bro +++ b/scripts/base/frameworks/sumstats/plugins/topk.bro @@ -3,16 +3,27 @@ module SumStats; export { + redef record Reducer += { + ## number of elements to keep in the top-k list + topk_size: count &default=500; + }; + redef enum Calculation += { TOPK }; redef record ResultVal += { - topk: opaque of topk &default=topk_init(500); + topk: opaque of topk &optional; }; } +hook init_resultval_hook(r: Reducer, rv: ResultVal) + { + if ( TOPK in r$apply && ! rv?$topk ) + rv$topk = topk_init(r$topk_size); + } + hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) { if ( TOPK in r$apply ) @@ -24,7 +35,7 @@ hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { - result$topk = topk_init(500); + result$topk = topk_init(topk_size(rv1$topk)); topk_merge(result$topk, rv1$topk); topk_merge(result$topk, rv2$topk); diff --git a/src/Topk.h b/src/Topk.h index 30e87f7a99..51a2d75251 100644 --- a/src/Topk.h +++ b/src/Topk.h @@ -40,6 +40,7 @@ public: VectorVal* getTopK(int k) const; // returns vector uint64_t getCount(Val* value) const; uint64_t getEpsilon(Val* value) const; + uint64_t getSize() const { return size; } void Merge(const TopkVal* value); protected: diff --git a/src/bro.bif b/src/bro.bif index b6f101c025..4d0db54c8c 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -5684,6 +5684,13 @@ function topk_epsilon%(handle: opaque of topk, value: any%): count return new Val(h->getEpsilon(value), TYPE_COUNT); %} +function topk_size%(handle: opaque of topk%): count + %{ + assert(handle); + Topk::TopkVal* h = (Topk::TopkVal*) handle; + return new Val(h->getSize(), TYPE_COUNT); + %} + function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any %{ assert(handle1); From 12cbf20ce07a8cd8a9eb6fd866d2c83855eda865 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 24 Apr 2013 15:30:24 -0700 Subject: [PATCH 08/16] add topk cluster test --- .../manager-1..stdout | 9 ++ .../base/frameworks/sumstats/topk-cluster.bro | 110 ++++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.sumstats.topk-cluster/manager-1..stdout create mode 100644 testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk-cluster/manager-1..stdout new file mode 100644 index 0000000000..2d076eeac7 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk-cluster/manager-1..stdout @@ -0,0 +1,9 @@ +Top entries for key counter +Num: 995, count: 100, epsilon: 0 +Num: 1, count: 99, epsilon: 0 +Num: 2, count: 98, epsilon: 0 +Num: 3, count: 97, epsilon: 0 +Num: 4, count: 96, epsilon: 0 +Top entries for key two +Num: 2, count: 4, epsilon: 0 +Num: 1, count: 3, epsilon: 0 diff --git a/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro new file mode 100644 index 0000000000..0ade38e86c --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro @@ -0,0 +1,110 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 15 + +# @TEST-EXEC: btest-diff manager-1/.stdout +# +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + + +event bro_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test.metric", + $apply=set(SumStats::TOPK)]; + SumStats::create([$epoch=5secs, + $reducers=set(r1), + $epoch_finished(data: SumStats::ResultTable) = + { + for ( key in data ) + { + local r = data[key]["test.metric"]; + + local s: vector of SumStats::Observation; + s = topk_get_top(r$topk, 5); + + print fmt("Top entries for key %s", key$str); + for ( element in s ) + { + print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); + } + + terminate(); + } + } + ]); + + + } + +event remote_connection_closed(p: event_peer) + { + terminate(); + } + +global ready_for_data: event(); +redef Cluster::manager2worker_events += /^ready_for_data$/; + +event ready_for_data() + { + const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100}; + + + if ( Cluster::node == "worker-1" ) + { + + local a: count; + a = 0; + + for ( i in loop_v ) + { + a = a + 1; + for ( j in loop_v ) + { + if ( i < j ) + SumStats::observe("test.metric", [$str="counter"], [$num=a]); + } + } + + + SumStats::observe("test.metric", [$str="two"], [$num=1]); + SumStats::observe("test.metric", [$str="two"], [$num=1]); + } + if ( Cluster::node == "worker-2" ) + { + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=1]); + + for ( i in loop_v ) + { + SumStats::observe("test.metric", [$str="counter"], [$num=995]); + } + } + } + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +global peer_count = 0; +event remote_connection_handshake_done(p: event_peer) &priority=-5 + { + ++peer_count; + if ( peer_count == 2 ) + event ready_for_data(); + } + +@endif + From fd2e0503068117d406a87b9a6d7d843b59289f59 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Fri, 26 Apr 2013 11:34:07 -0700 Subject: [PATCH 09/16] fix warnings --- src/Topk.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Topk.cc b/src/Topk.cc index 8ad2113235..26f14c4fcb 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -178,7 +178,7 @@ bool TopkVal::DoSerialize(SerialInfo* info) const else assert(numElements == 0); - int i = 0; + uint64_t i = 0; std::list::const_iterator it = buckets.begin(); while ( it != buckets.end() ) { @@ -223,7 +223,7 @@ bool TopkVal::DoUnserialize(UnserialInfo* info) else assert(numElements == 0); - int i = 0; + uint64_t i = 0; while ( i < numElements ) { Bucket* b = new Bucket(); @@ -232,7 +232,7 @@ bool TopkVal::DoUnserialize(UnserialInfo* info) v &= UNSERIALIZE(&b->count); b->bucketPos = buckets.insert(buckets.end(), b); - for ( int j = 0; j < elements_count; j++ ) + for ( uint64_t j = 0; j < elements_count; j++ ) { Element* e = new Element(); v &= UNSERIALIZE(&e->epsilon); From 1accee41edb48f7ed162f879bdf593f505d84b5b Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Fri, 26 Apr 2013 14:06:38 -0700 Subject: [PATCH 10/16] fix memory leaks --- src/Topk.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Topk.cc b/src/Topk.cc index 26f14c4fcb..116a4d3de4 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -26,12 +26,13 @@ Element::~Element() HashKey* TopkVal::GetHash(Val* v) const { TypeList* tl = new TypeList(v->Type()); - tl->Append(v->Type()); + tl->Append(v->Type()->Ref()); CompositeHash* topk_hash = new CompositeHash(tl); Unref(tl); HashKey* key = topk_hash->ComputeHash(v, 1); assert(key); + delete topk_hash; return key; } @@ -311,6 +312,7 @@ uint64_t TopkVal::getCount(Val* value) const return 0; } + delete key; return e->parent->count; } @@ -325,6 +327,7 @@ uint64_t TopkVal::getEpsilon(Val* value) const return 0; } + delete key; return e->epsilon; } From 07ecd31bbdcb85db52a206cb24ced7ea9b32c9f7 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sun, 28 Apr 2013 21:21:22 -0700 Subject: [PATCH 11/16] in cluster settings, the resultvals can apparently been uninitialized in some special cases --- .../base/frameworks/sumstats/plugins/topk.bro | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/scripts/base/frameworks/sumstats/plugins/topk.bro b/scripts/base/frameworks/sumstats/plugins/topk.bro index a830b1c5ec..ed6074b081 100644 --- a/scripts/base/frameworks/sumstats/plugins/topk.bro +++ b/scripts/base/frameworks/sumstats/plugins/topk.bro @@ -35,8 +35,18 @@ hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) { - result$topk = topk_init(topk_size(rv1$topk)); + if ( rv1?$topk ) + { + result$topk = topk_init(topk_size(rv1$topk)); - topk_merge(result$topk, rv1$topk); - topk_merge(result$topk, rv2$topk); + topk_merge(result$topk, rv1$topk); + if ( rv2?$topk ) + topk_merge(result$topk, rv2$topk); + } + else if ( rv2?$topk ) + { + result$topk = topk_init(topk_size(rv2$topk)); + topk_merge(result$topk, rv2$topk); + } + } From 160da6f1a6c246532705ac8a0f5ab49eee51c00e Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sun, 28 Apr 2013 21:55:06 -0700 Subject: [PATCH 12/16] add sum function that can be used to get the number of total observed elements. Add methods to merge with and without pruning (before only merge method was with pruning, which invalidates the number of total observed elements) --- src/Topk.cc | 60 +++++++++++++++++------- src/Topk.h | 32 ++++++++++++- src/bro.bif | 54 +++++++++++++++++++++ testing/btest/Baseline/bifs.topk/.stderr | 1 + testing/btest/Baseline/bifs.topk/out | 24 ++++++++++ testing/btest/bifs/topk.bro | 38 ++++++++++++++- 6 files changed, 187 insertions(+), 22 deletions(-) diff --git a/src/Topk.cc b/src/Topk.cc index 116a4d3de4..2527ecc4bc 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -43,6 +43,7 @@ TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(new OpaqueType("topk")) size = arg_size; type = 0; numElements = 0; + pruned = false; } TopkVal::TopkVal() : OpaqueVal(new OpaqueType("topk")) @@ -72,7 +73,7 @@ TopkVal::~TopkVal() type = 0; } -void TopkVal::Merge(const TopkVal* value) +void TopkVal::Merge(const TopkVal* value, bool doPrune) { if ( type == 0 ) @@ -140,26 +141,31 @@ void TopkVal::Merge(const TopkVal* value) // prune everything... assert(size > 0); - while ( numElements > size ) + + if ( doPrune ) { - assert(buckets.size() > 0 ); - Bucket* b = buckets.front(); - assert(b->elements.size() > 0); - - Element* e = b->elements.front(); - HashKey* key = GetHash(e->value); - elementDict->RemoveEntry(key); - delete e; - - b->elements.pop_front(); - - if ( b->elements.size() == 0 ) + while ( numElements > size ) { - delete b; - buckets.pop_front(); - } + pruned = true; + assert(buckets.size() > 0 ); + Bucket* b = buckets.front(); + assert(b->elements.size() > 0); - numElements--; + Element* e = b->elements.front(); + HashKey* key = GetHash(e->value); + elementDict->RemoveEntry(key); + delete e; + + b->elements.pop_front(); + + if ( b->elements.size() == 0 ) + { + delete b; + buckets.pop_front(); + } + + numElements--; + } } } @@ -330,6 +336,24 @@ uint64_t TopkVal::getEpsilon(Val* value) const delete key; return e->epsilon; } + +uint64_t TopkVal::getSum() const + { + uint64_t sum = 0; + + std::list::const_iterator it = buckets.begin(); + while ( it != buckets.end() ) + { + sum += (*it)->elements.size() * (*it)->count; + + it++; + } + + if ( pruned ) + reporter->Warning("TopkVal::getSum() was used on a pruned data structure. Result values do not represent total element count"); + + return sum; + } void TopkVal::Encountered(Val* encountered) { diff --git a/src/Topk.h b/src/Topk.h index 51a2d75251..608b810ddb 100644 --- a/src/Topk.h +++ b/src/Topk.h @@ -34,14 +34,41 @@ declare(PDict, Element); class TopkVal : public OpaqueVal { public: + // Initialize a TopkVal. Size specifies how many total elements are tracked TopkVal(uint64 size); ~TopkVal(); - void Encountered(Val* value); // we saw something + + // Call this, when a new value is encountered. Note that on the first call, + // the Bro-Type of the value types that are counted is set. All following calls + // to encountered have to specify the same type + void Encountered(Val* value); + + // Return the first k elements of the result vector. At the moment, this does + // not check if it is in the right order or if we can prove that these are + // the correct top-k. Use count and epsilon for this. VectorVal* getTopK(int k) const; // returns vector + + // Get the current count tracked in the top-k data structure for a certain val. + // Returns 0 if the val is unknown (and logs the error to reporter) uint64_t getCount(Val* value) const; + + // Get the current epsilon tracked in the top-k data structure for a certain val. + // Returns 0 if the val is unknown (and logs the error to reporter) uint64_t getEpsilon(Val* value) const; + + // Get the size set in the constructor uint64_t getSize() const { return size; } - void Merge(const TopkVal* value); + + // Get the sum of all counts of all tracked elements. This is equal to the number + // of total observations up to this moment, if no elements were pruned from the data + // structure. + uint64_t getSum() const; + + // Merge another top-k data structure in this one. + // doPrune specifies if the total count of elements is limited to size after + // merging. + // Please note, that pruning will invalidate the results of getSum. + void Merge(const TopkVal* value, bool doPrune=false); protected: TopkVal(); // for deserialize @@ -55,6 +82,7 @@ private: PDict(Element)* elementDict; uint64 size; // how many elements are we tracking? uint64 numElements; // how many elements do we have at the moment + bool pruned; // was this data structure pruned? DECLARE_SERIAL(TopkVal); }; diff --git a/src/bro.bif b/src/bro.bif index 195e4c2bde..4c46b23241 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -5739,12 +5739,18 @@ function anonymize_addr%(a: addr, cl: IPAddrAnonymizationClass%): addr #include "Topk.h" %%} +## Creates a top-k data structure which tracks size elements. +## +## Returns: Opaque pointer to the data structure. function topk_init%(size: count%): opaque of topk %{ Topk::TopkVal* v = new Topk::TopkVal(size); return v; %} +## Add a new observed object to the data structure. The first +## added object sets the type of data tracked by the top-k data +## structure. All following values have to be of the same type function topk_add%(handle: opaque of topk, value: any%): any %{ assert(handle); @@ -5754,6 +5760,9 @@ function topk_add%(handle: opaque of topk, value: any%): any return 0; %} +## Get the first k elements of the top-k data structure +## +## Returns: vector of the first k elements function topk_get_top%(handle: opaque of topk, k: count%): any %{ assert(handle); @@ -5761,6 +5770,11 @@ function topk_get_top%(handle: opaque of topk, k: count%): any return h->getTopK(k); %} +## Get an overestimated count of how often value has been encountered. +## value has to be part of the currently tracked elements, otherwise +## 0 will be returned and an error message will be added to reporter. +## +## Returns: Overestimated number for how often the element has been encountered function topk_count%(handle: opaque of topk, value: any%): count %{ assert(handle); @@ -5768,6 +5782,10 @@ function topk_count%(handle: opaque of topk, value: any%): count return new Val(h->getCount(value), TYPE_COUNT); %} +## Get a the maximal overestimation for count. Same restrictiosn as for topk_count +## apply. +## +## Returns: Number which represents the maximal overesimation for the count of this element. function topk_epsilon%(handle: opaque of topk, value: any%): count %{ assert(handle); @@ -5775,6 +5793,11 @@ function topk_epsilon%(handle: opaque of topk, value: any%): count return new Val(h->getEpsilon(value), TYPE_COUNT); %} +## Get the number of elements this data structure is supposed to track (given on init). +## Note that the actual number of elements in the data structure can be lower or higher +## than this. (higher due to non-pruned merges) +## +## Returns: size given during initialization function topk_size%(handle: opaque of topk%): count %{ assert(handle); @@ -5782,6 +5805,20 @@ function topk_size%(handle: opaque of topk%): count return new Val(h->getSize(), TYPE_COUNT); %} +## Get the sum of all counts of all elements in the data structure. Is equal to the number +## of all inserted objects if the data structure never has been pruned. Do not use after +## calling topk_merge_prune (will throw a warning message if used afterwards) +## +## Returns: sum of all counts +function topk_sum%(handle: opaque of topk%): count + %{ + assert(handle); + Topk::TopkVal* h = (Topk::TopkVal*) handle; + return new Val(h->getSum(), TYPE_COUNT); + %} + +## Merge the second topk data structure into the first. Does not remove any elements, the +## resulting data structure can be bigger than the maximum size given on initialization. function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any %{ assert(handle1); @@ -5795,3 +5832,20 @@ function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any return 0; %} +## Merge the second topk data structure into the first and prunes the final data structure +## back to the size given on initialization. Use with care and only when being aware of the +## restrictions this imposed. Do not call topk_size or topk_add afterwards, results will +## probably not be what you expect. +function topk_merge_prune%(handle1: opaque of topk, handle2: opaque of topk%): any + %{ + assert(handle1); + assert(handle2); + + Topk::TopkVal* h1 = (Topk::TopkVal*) handle1; + Topk::TopkVal* h2 = (Topk::TopkVal*) handle2; + + h1->Merge(h2, true); + + return 0; + %} + diff --git a/testing/btest/Baseline/bifs.topk/.stderr b/testing/btest/Baseline/bifs.topk/.stderr index f2bd316fd8..80626107aa 100644 --- a/testing/btest/Baseline/bifs.topk/.stderr +++ b/testing/btest/Baseline/bifs.topk/.stderr @@ -6,5 +6,6 @@ error: getCount for element that is not in top-k error: getEpsilon for element that is not in top-k error: getCount for element that is not in top-k error: getEpsilon for element that is not in top-k +warning: TopkVal::getSum() was used on a pruned data structure. Result values do not represent total element count error: getCount for element that is not in top-k error: getEpsilon for element that is not in top-k diff --git a/testing/btest/Baseline/bifs.topk/out b/testing/btest/Baseline/bifs.topk/out index 8db55eeca8..1ce5c4b850 100644 --- a/testing/btest/Baseline/bifs.topk/out +++ b/testing/btest/Baseline/bifs.topk/out @@ -1,4 +1,5 @@ [b, c] +4 0 0 2 @@ -6,6 +7,7 @@ 2 1 [d, c] +5 0 0 2 @@ -13,21 +15,25 @@ 3 2 [d, e] +6 3 2 3 2 [f, e] +7 4 3 3 2 [f, e] +8 4 3 4 2 [g, e] +9 0 0 4 @@ -35,6 +41,7 @@ 5 4 [c, e, d] +19 6 0 5 @@ -49,9 +56,26 @@ 0 0 [c, e] +22 12 0 10 0 0 0 +[c, e] +19 +6 +0 +5 +0 +4 +0 +[c, e, d] +38 +12 +0 +10 +0 +8 +0 diff --git a/testing/btest/bifs/topk.bro b/testing/btest/bifs/topk.bro index 92a68999cc..02d13c4195 100644 --- a/testing/btest/bifs/topk.bro +++ b/testing/btest/bifs/topk.bro @@ -14,6 +14,7 @@ event bro_init() local s = topk_get_top(k1, 5); print s; + print topk_sum(k1); print topk_count(k1, "a"); print topk_epsilon(k1, "a"); print topk_count(k1, "b"); @@ -24,6 +25,7 @@ event bro_init() topk_add(k1, "d"); s = topk_get_top(k1, 5); print s; + print topk_sum(k1); print topk_count(k1, "b"); print topk_epsilon(k1, "b"); print topk_count(k1, "c"); @@ -34,6 +36,7 @@ event bro_init() topk_add(k1, "e"); s = topk_get_top(k1, 5); print s; + print topk_sum(k1); print topk_count(k1, "d"); print topk_epsilon(k1, "d"); print topk_count(k1, "e"); @@ -42,6 +45,7 @@ event bro_init() topk_add(k1, "f"); s = topk_get_top(k1, 5); print s; + print topk_sum(k1); print topk_count(k1, "f"); print topk_epsilon(k1, "f"); print topk_count(k1, "e"); @@ -50,6 +54,7 @@ event bro_init() topk_add(k1, "e"); s = topk_get_top(k1, 5); print s; + print topk_sum(k1); print topk_count(k1, "f"); print topk_epsilon(k1, "f"); print topk_count(k1, "e"); @@ -58,6 +63,7 @@ event bro_init() topk_add(k1, "g"); s = topk_get_top(k1, 5); print s; + print topk_sum(k1); print topk_count(k1, "f"); print topk_epsilon(k1, "f"); print topk_count(k1, "e"); @@ -87,6 +93,7 @@ event bro_init() topk_add(k1, "f"); s = topk_get_top(k1, 3); print s; + print topk_sum(k1); print topk_count(k1, "c"); print topk_epsilon(k1, "c"); print topk_count(k1, "e"); @@ -95,7 +102,7 @@ event bro_init() print topk_epsilon(k1, "d"); local k3 = topk_init(2); - topk_merge(k3, k1); + topk_merge_prune(k3, k1); s = topk_get_top(k3, 3); print s; @@ -106,10 +113,11 @@ event bro_init() print topk_count(k3, "d"); print topk_epsilon(k3, "d"); - topk_merge(k3, k1); + topk_merge_prune(k3, k1); s = topk_get_top(k3, 3); print s; + print topk_sum(k3); # this gives a warning and a wrong result. print topk_count(k3, "c"); print topk_epsilon(k3, "c"); print topk_count(k3, "e"); @@ -117,4 +125,30 @@ event bro_init() print topk_count(k3, "d"); print topk_epsilon(k3, "d"); + k3 = topk_init(2); + topk_merge(k3, k1); + print s; + print topk_sum(k3); + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); + + topk_merge(k3, k1); + + s = topk_get_top(k3, 3); + print s; + print topk_sum(k3); + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); + + + + } From c6e69ddc05cc6cdcaa8006e0d2fb2535a885f30b Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 1 May 2013 17:06:45 -0700 Subject: [PATCH 13/16] potentially found wrong Ref. --- src/Topk.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Topk.cc b/src/Topk.cc index 2527ecc4bc..9c0a04607d 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -274,7 +274,7 @@ VectorVal* TopkVal::getTopK(int k) const // returns vector } TypeList* vector_index = new TypeList(type); - vector_index->Append(type); + vector_index->Append(type->Ref()); VectorType* v = new VectorType(vector_index); VectorVal* t = new VectorVal(v); @@ -362,7 +362,7 @@ void TopkVal::Encountered(Val* encountered) //printf("NumElements: %d\n", numElements); // check type compatibility if ( numElements == 0 ) - type = encountered->Type()->Ref()->Ref(); + type = encountered->Type()->Ref(); else if ( !same_type(type, encountered->Type()) ) { From 075bfc5b3ded8b2f223805eaa48c4d04451868a8 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Thu, 2 May 2013 12:09:35 -0700 Subject: [PATCH 14/16] synchronize pruned attribute --- src/Topk.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Topk.cc b/src/Topk.cc index 9c0a04607d..2b84b389b0 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -178,6 +178,7 @@ bool TopkVal::DoSerialize(SerialInfo* info) const v &= SERIALIZE(size); v &= SERIALIZE(numElements); + v &= SERIALIZE(pruned); bool type_present = (type != 0); v &= SERIALIZE(type_present); if ( type_present ) @@ -220,6 +221,7 @@ bool TopkVal::DoUnserialize(UnserialInfo* info) v &= UNSERIALIZE(&size); v &= UNSERIALIZE(&numElements); + v &= UNSERIALIZE(&pruned); bool type_present = false; v &= UNSERIALIZE(&type_present); if ( type_present ) From cf6e768ad641988320a3379b5ead701ea09a0ff1 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Fri, 3 May 2013 23:08:26 -0700 Subject: [PATCH 15/16] fix opaqueval-related memleak --- src/NetVar.cc | 2 ++ src/NetVar.h | 1 + src/Topk.cc | 5 +++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/NetVar.cc b/src/NetVar.cc index 012e4a85bc..1a2e604e90 100644 --- a/src/NetVar.cc +++ b/src/NetVar.cc @@ -243,6 +243,7 @@ OpaqueType* md5_type; OpaqueType* sha1_type; OpaqueType* sha256_type; OpaqueType* entropy_type; +OpaqueType* topk_type; #include "const.bif.netvar_def" #include "types.bif.netvar_def" @@ -308,6 +309,7 @@ void init_general_global_var() sha1_type = new OpaqueType("sha1"); sha256_type = new OpaqueType("sha256"); entropy_type = new OpaqueType("entropy"); + topk_type = new OpaqueType("topk"); } void init_net_var() diff --git a/src/NetVar.h b/src/NetVar.h index d7590b20e7..37ed3c7c85 100644 --- a/src/NetVar.h +++ b/src/NetVar.h @@ -248,6 +248,7 @@ extern OpaqueType* md5_type; extern OpaqueType* sha1_type; extern OpaqueType* sha256_type; extern OpaqueType* entropy_type; +extern OpaqueType* topk_type; // Initializes globals that don't pertain to network/event analysis. extern void init_general_global_var(); diff --git a/src/Topk.cc b/src/Topk.cc index 2b84b389b0..10374f3087 100644 --- a/src/Topk.cc +++ b/src/Topk.cc @@ -4,6 +4,7 @@ #include "CompHash.h" #include "Reporter.h" #include "Serializer.h" +#include "NetVar.h" namespace Topk { @@ -36,7 +37,7 @@ HashKey* TopkVal::GetHash(Val* v) const return key; } -TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(new OpaqueType("topk")) +TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(topk_type) { elementDict = new PDict(Element); elementDict->SetDeleteFunc(topk_element_hash_delete_func); @@ -46,7 +47,7 @@ TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(new OpaqueType("topk")) pruned = false; } -TopkVal::TopkVal() : OpaqueVal(new OpaqueType("topk")) +TopkVal::TopkVal() : OpaqueVal(topk_type) { elementDict = new PDict(Element); elementDict->SetDeleteFunc(topk_element_hash_delete_func); From 5122bf4a7cbe5e78802042729d53009d5cc28ab5 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 31 Jul 2013 12:06:59 -0700 Subject: [PATCH 16/16] adapt to new folder structure --- src/CMakeLists.txt | 1 - src/bro.bif | 114 ---------------- src/probabilistic/CMakeLists.txt | 4 +- src/{ => probabilistic}/Topk.cc | 4 +- src/{ => probabilistic}/Topk.h | 2 +- src/probabilistic/top-k.bif | 122 ++++++++++++++++++ .../out | 0 .../topk_persistence.bro => istate/topk.bro} | 0 8 files changed, 128 insertions(+), 119 deletions(-) rename src/{ => probabilistic}/Topk.cc (99%) rename src/{ => probabilistic}/Topk.h (99%) create mode 100644 src/probabilistic/top-k.bif rename testing/btest/Baseline/{bifs.topk_persistence => istate.topk}/out (100%) rename testing/btest/{bifs/topk_persistence.bro => istate/topk.bro} (100%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2693e1f280..4a65ddd4d3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -320,7 +320,6 @@ set(bro_SRCS Stats.cc Stmt.cc Timer.cc - Topk.cc Traverse.cc Trigger.cc TunnelEncapsulation.cc diff --git a/src/bro.bif b/src/bro.bif index fab11c7e90..efb913bbf7 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -4976,117 +4976,3 @@ function anonymize_addr%(a: addr, cl: IPAddrAnonymizationClass%): addr } %} -%%{ -#include "Topk.h" -%%} - -## Creates a top-k data structure which tracks size elements. -## -## Returns: Opaque pointer to the data structure. -function topk_init%(size: count%): opaque of topk - %{ - Topk::TopkVal* v = new Topk::TopkVal(size); - return v; - %} - -## Add a new observed object to the data structure. The first -## added object sets the type of data tracked by the top-k data -## structure. All following values have to be of the same type -function topk_add%(handle: opaque of topk, value: any%): any - %{ - assert(handle); - Topk::TopkVal* h = (Topk::TopkVal*) handle; - h->Encountered(value); - - return 0; - %} - -## Get the first k elements of the top-k data structure -## -## Returns: vector of the first k elements -function topk_get_top%(handle: opaque of topk, k: count%): any - %{ - assert(handle); - Topk::TopkVal* h = (Topk::TopkVal*) handle; - return h->getTopK(k); - %} - -## Get an overestimated count of how often value has been encountered. -## value has to be part of the currently tracked elements, otherwise -## 0 will be returned and an error message will be added to reporter. -## -## Returns: Overestimated number for how often the element has been encountered -function topk_count%(handle: opaque of topk, value: any%): count - %{ - assert(handle); - Topk::TopkVal* h = (Topk::TopkVal*) handle; - return new Val(h->getCount(value), TYPE_COUNT); - %} - -## Get a the maximal overestimation for count. Same restrictiosn as for topk_count -## apply. -## -## Returns: Number which represents the maximal overesimation for the count of this element. -function topk_epsilon%(handle: opaque of topk, value: any%): count - %{ - assert(handle); - Topk::TopkVal* h = (Topk::TopkVal*) handle; - return new Val(h->getEpsilon(value), TYPE_COUNT); - %} - -## Get the number of elements this data structure is supposed to track (given on init). -## Note that the actual number of elements in the data structure can be lower or higher -## than this. (higher due to non-pruned merges) -## -## Returns: size given during initialization -function topk_size%(handle: opaque of topk%): count - %{ - assert(handle); - Topk::TopkVal* h = (Topk::TopkVal*) handle; - return new Val(h->getSize(), TYPE_COUNT); - %} - -## Get the sum of all counts of all elements in the data structure. Is equal to the number -## of all inserted objects if the data structure never has been pruned. Do not use after -## calling topk_merge_prune (will throw a warning message if used afterwards) -## -## Returns: sum of all counts -function topk_sum%(handle: opaque of topk%): count - %{ - assert(handle); - Topk::TopkVal* h = (Topk::TopkVal*) handle; - return new Val(h->getSum(), TYPE_COUNT); - %} - -## Merge the second topk data structure into the first. Does not remove any elements, the -## resulting data structure can be bigger than the maximum size given on initialization. -function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any - %{ - assert(handle1); - assert(handle2); - - Topk::TopkVal* h1 = (Topk::TopkVal*) handle1; - Topk::TopkVal* h2 = (Topk::TopkVal*) handle2; - - h1->Merge(h2); - - return 0; - %} - -## Merge the second topk data structure into the first and prunes the final data structure -## back to the size given on initialization. Use with care and only when being aware of the -## restrictions this imposed. Do not call topk_size or topk_add afterwards, results will -## probably not be what you expect. -function topk_merge_prune%(handle1: opaque of topk, handle2: opaque of topk%): any - %{ - assert(handle1); - assert(handle2); - - Topk::TopkVal* h1 = (Topk::TopkVal*) handle1; - Topk::TopkVal* h2 = (Topk::TopkVal*) handle2; - - h1->Merge(h2, true); - - return 0; - %} - diff --git a/src/probabilistic/CMakeLists.txt b/src/probabilistic/CMakeLists.txt index af062b24ae..a36dfbbd6b 100644 --- a/src/probabilistic/CMakeLists.txt +++ b/src/probabilistic/CMakeLists.txt @@ -10,9 +10,11 @@ set(probabilistic_SRCS BitVector.cc BloomFilter.cc CounterVector.cc - Hasher.cc) + Hasher.cc + Topk.cc) bif_target(bloom-filter.bif) +bif_target(top-k.bif) bro_add_subdir_library(probabilistic ${probabilistic_SRCS}) add_dependencies(bro_probabilistic generate_outputs) diff --git a/src/Topk.cc b/src/probabilistic/Topk.cc similarity index 99% rename from src/Topk.cc rename to src/probabilistic/Topk.cc index 10374f3087..d03a10ccfc 100644 --- a/src/Topk.cc +++ b/src/probabilistic/Topk.cc @@ -1,13 +1,13 @@ // See the file "COPYING" in the main distribution directory for copyright. -#include "Topk.h" +#include "probabilistic/Topk.h" #include "CompHash.h" #include "Reporter.h" #include "Serializer.h" #include "NetVar.h" -namespace Topk { +namespace probabilistic { IMPLEMENT_SERIAL(TopkVal, SER_TOPK_VAL); diff --git a/src/Topk.h b/src/probabilistic/Topk.h similarity index 99% rename from src/Topk.h rename to src/probabilistic/Topk.h index 608b810ddb..2c47fbd181 100644 --- a/src/Topk.h +++ b/src/probabilistic/Topk.h @@ -10,7 +10,7 @@ // This class implements the top-k algorithm. Or - to be more precise - my interpretation of it. -namespace Topk { +namespace probabilistic { struct Element; diff --git a/src/probabilistic/top-k.bif b/src/probabilistic/top-k.bif new file mode 100644 index 0000000000..83d8e275c1 --- /dev/null +++ b/src/probabilistic/top-k.bif @@ -0,0 +1,122 @@ +# =========================================================================== +# +# Top-K Functions +# +# =========================================================================== + + +%%{ +#include "probabilistic/Topk.h" +%%} + +## Creates a top-k data structure which tracks size elements. +## +## Returns: Opaque pointer to the data structure. +function topk_init%(size: count%): opaque of topk + %{ + probabilistic::TopkVal* v = new probabilistic::TopkVal(size); + return v; + %} + +## Add a new observed object to the data structure. The first +## added object sets the type of data tracked by the top-k data +## structure. All following values have to be of the same type +function topk_add%(handle: opaque of topk, value: any%): any + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + h->Encountered(value); + + return 0; + %} + +## Get the first k elements of the top-k data structure +## +## Returns: vector of the first k elements +function topk_get_top%(handle: opaque of topk, k: count%): any + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return h->getTopK(k); + %} + +## Get an overestimated count of how often value has been encountered. +## value has to be part of the currently tracked elements, otherwise +## 0 will be returned and an error message will be added to reporter. +## +## Returns: Overestimated number for how often the element has been encountered +function topk_count%(handle: opaque of topk, value: any%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getCount(value), TYPE_COUNT); + %} + +## Get a the maximal overestimation for count. Same restrictiosn as for topk_count +## apply. +## +## Returns: Number which represents the maximal overesimation for the count of this element. +function topk_epsilon%(handle: opaque of topk, value: any%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getEpsilon(value), TYPE_COUNT); + %} + +## Get the number of elements this data structure is supposed to track (given on init). +## Note that the actual number of elements in the data structure can be lower or higher +## than this. (higher due to non-pruned merges) +## +## Returns: size given during initialization +function topk_size%(handle: opaque of topk%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getSize(), TYPE_COUNT); + %} + +## Get the sum of all counts of all elements in the data structure. Is equal to the number +## of all inserted objects if the data structure never has been pruned. Do not use after +## calling topk_merge_prune (will throw a warning message if used afterwards) +## +## Returns: sum of all counts +function topk_sum%(handle: opaque of topk%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getSum(), TYPE_COUNT); + %} + +## Merge the second topk data structure into the first. Does not remove any elements, the +## resulting data structure can be bigger than the maximum size given on initialization. +function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any + %{ + assert(handle1); + assert(handle2); + + probabilistic::TopkVal* h1 = (probabilistic::TopkVal*) handle1; + probabilistic::TopkVal* h2 = (probabilistic::TopkVal*) handle2; + + h1->Merge(h2); + + return 0; + %} + +## Merge the second topk data structure into the first and prunes the final data structure +## back to the size given on initialization. Use with care and only when being aware of the +## restrictions this imposed. Do not call topk_size or topk_add afterwards, results will +## probably not be what you expect. +function topk_merge_prune%(handle1: opaque of topk, handle2: opaque of topk%): any + %{ + assert(handle1); + assert(handle2); + + probabilistic::TopkVal* h1 = (probabilistic::TopkVal*) handle1; + probabilistic::TopkVal* h2 = (probabilistic::TopkVal*) handle2; + + h1->Merge(h2, true); + + return 0; + %} + + diff --git a/testing/btest/Baseline/bifs.topk_persistence/out b/testing/btest/Baseline/istate.topk/out similarity index 100% rename from testing/btest/Baseline/bifs.topk_persistence/out rename to testing/btest/Baseline/istate.topk/out diff --git a/testing/btest/bifs/topk_persistence.bro b/testing/btest/istate/topk.bro similarity index 100% rename from testing/btest/bifs/topk_persistence.bro rename to testing/btest/istate/topk.bro