diff --git a/scripts/base/frameworks/sumstats/plugins/__load__.bro b/scripts/base/frameworks/sumstats/plugins/__load__.bro index c0ee3a6767..d2f89e41c5 100644 --- a/scripts/base/frameworks/sumstats/plugins/__load__.bro +++ b/scripts/base/frameworks/sumstats/plugins/__load__.bro @@ -5,5 +5,6 @@ @load ./sample @load ./std-dev @load ./sum +@load ./topk @load ./unique @load ./variance diff --git a/scripts/base/frameworks/sumstats/plugins/topk.bro b/scripts/base/frameworks/sumstats/plugins/topk.bro new file mode 100644 index 0000000000..58f8168f5b --- /dev/null +++ b/scripts/base/frameworks/sumstats/plugins/topk.bro @@ -0,0 +1,50 @@ +@load base/frameworks/sumstats + +module SumStats; + +export { + redef record Reducer += { + ## number of elements to keep in the top-k list + topk_size: count &default=500; + }; + + redef enum Calculation += { + TOPK + }; + + redef record ResultVal += { + topk: opaque of topk &optional; + }; + +} + +hook init_resultval_hook(r: Reducer, rv: ResultVal) + { + if ( TOPK in r$apply && ! rv?$topk ) + rv$topk = topk_init(r$topk_size); + } + +hook observe_hook(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { + if ( TOPK in r$apply ) + topk_add(rv$topk, obs); + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + if ( rv1?$topk ) + { + result$topk = topk_init(topk_size(rv1$topk)); + + topk_merge(result$topk, rv1$topk); + + if ( rv2?$topk ) + topk_merge(result$topk, rv2$topk); + } + + else if ( rv2?$topk ) + { + result$topk = topk_init(topk_size(rv2$topk)); + topk_merge(result$topk, rv2$topk); + } + } diff --git a/src/NetVar.cc b/src/NetVar.cc index 388aa46f10..a24a7e3534 100644 --- a/src/NetVar.cc +++ b/src/NetVar.cc @@ -242,6 +242,7 @@ OpaqueType* md5_type; OpaqueType* sha1_type; OpaqueType* sha256_type; OpaqueType* entropy_type; +OpaqueType* topk_type; OpaqueType* bloomfilter_type; #include "const.bif.netvar_def" @@ -308,6 +309,7 @@ void init_general_global_var() sha1_type = new OpaqueType("sha1"); sha256_type = new OpaqueType("sha256"); entropy_type = new OpaqueType("entropy"); + topk_type = new OpaqueType("topk"); bloomfilter_type = new OpaqueType("bloomfilter"); } diff --git a/src/NetVar.h b/src/NetVar.h index 7ce33d1a1a..048205d82f 100644 --- a/src/NetVar.h +++ b/src/NetVar.h @@ -247,6 +247,7 @@ extern OpaqueType* md5_type; extern OpaqueType* sha1_type; extern OpaqueType* sha256_type; extern OpaqueType* entropy_type; +extern OpaqueType* topk_type; extern OpaqueType* bloomfilter_type; // Initializes globals that don't pertain to network/event analysis. diff --git a/src/SerialTypes.h b/src/SerialTypes.h index 9933d005f0..5271caa2e3 100644 --- a/src/SerialTypes.h +++ b/src/SerialTypes.h @@ -108,7 +108,8 @@ SERIAL_VAL(MD5_VAL, 16) SERIAL_VAL(SHA1_VAL, 17) SERIAL_VAL(SHA256_VAL, 18) SERIAL_VAL(ENTROPY_VAL, 19) -SERIAL_VAL(BLOOMFILTER_VAL, 20) +SERIAL_VAL(TOPK_VAL, 20) +SERIAL_VAL(BLOOMFILTER_VAL, 21) #define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR) SERIAL_EXPR(EXPR, 1) diff --git a/src/Serializer.h b/src/Serializer.h index 72e0723880..c1af1a2c4f 100644 --- a/src/Serializer.h +++ b/src/Serializer.h @@ -125,7 +125,7 @@ protected: // This will be increased whenever there is an incompatible change // in the data format. - static const uint32 DATA_FORMAT_VERSION = 23; + static const uint32 DATA_FORMAT_VERSION = 24; ChunkedIO* io; diff --git a/src/probabilistic/CMakeLists.txt b/src/probabilistic/CMakeLists.txt index af062b24ae..a36dfbbd6b 100644 --- a/src/probabilistic/CMakeLists.txt +++ b/src/probabilistic/CMakeLists.txt @@ -10,9 +10,11 @@ set(probabilistic_SRCS BitVector.cc BloomFilter.cc CounterVector.cc - Hasher.cc) + Hasher.cc + Topk.cc) bif_target(bloom-filter.bif) +bif_target(top-k.bif) bro_add_subdir_library(probabilistic ${probabilistic_SRCS}) add_dependencies(bro_probabilistic generate_outputs) diff --git a/src/probabilistic/Topk.cc b/src/probabilistic/Topk.cc new file mode 100644 index 0000000000..dbfa3cdb83 --- /dev/null +++ b/src/probabilistic/Topk.cc @@ -0,0 +1,493 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "probabilistic/Topk.h" +#include "CompHash.h" +#include "Reporter.h" +#include "Serializer.h" +#include "NetVar.h" + +namespace probabilistic { + +IMPLEMENT_SERIAL(TopkVal, SER_TOPK_VAL); + +static void topk_element_hash_delete_func(void* val) + { + Element* e = (Element*) val; + delete e; + } + +Element::~Element() + { + Unref(value); + value = 0; + } + +HashKey* TopkVal::GetHash(Val* v) const + { + TypeList* tl = new TypeList(v->Type()); + tl->Append(v->Type()->Ref()); + CompositeHash* topk_hash = new CompositeHash(tl); + Unref(tl); + + HashKey* key = topk_hash->ComputeHash(v, 1); + assert(key); + delete topk_hash; + return key; + } + +TopkVal::TopkVal(uint64 arg_size) : OpaqueVal(topk_type) + { + elementDict = new PDict(Element); + elementDict->SetDeleteFunc(topk_element_hash_delete_func); + size = arg_size; + type = 0; + numElements = 0; + pruned = false; + } + +TopkVal::TopkVal() : OpaqueVal(topk_type) + { + elementDict = new PDict(Element); + elementDict->SetDeleteFunc(topk_element_hash_delete_func); + size = 0; + type = 0; + numElements = 0; + } + +TopkVal::~TopkVal() + { + elementDict->Clear(); + delete elementDict; + + // now all elements are already gone - delete the buckets + std::list::iterator bi = buckets.begin(); + while ( bi != buckets.end() ) + { + delete *bi; + bi++; + } + + Unref(type); + type = 0; + } + +void TopkVal::Merge(const TopkVal* value, bool doPrune) + { + if ( type == 0 ) + { + assert(numElements == 0); + type = value->type->Ref(); + } + + else + { + if ( ! same_type(type, value->type) ) + { + reporter->Error("Cannot merge top-k elements of differing types."); + return; + } + } + + std::list::const_iterator it = value->buckets.begin(); + while ( it != value->buckets.end() ) + { + Bucket* b = *it; + uint64_t currcount = b->count; + std::list::const_iterator eit = b->elements.begin(); + + while ( eit != b->elements.end() ) + { + Element* e = *eit; + // lookup if we already know this one... + HashKey* key = GetHash(e->value); + Element* olde = (Element*) elementDict->Lookup(key); + + if ( olde == 0 ) + { + olde = new Element(); + olde->epsilon = 0; + olde->value = e->value->Ref(); + // insert at bucket position 0 + if ( buckets.size() > 0 ) + { + assert (buckets.front()-> count > 0 ); + } + + Bucket* newbucket = new Bucket(); + newbucket->count = 0; + newbucket->bucketPos = buckets.insert(buckets.begin(), newbucket); + + olde->parent = newbucket; + newbucket->elements.insert(newbucket->elements.end(), olde); + + elementDict->Insert(key, olde); + numElements++; + + } + + // now that we are sure that the old element is present - increment epsilon + olde->epsilon += e->epsilon; + + // and increment position... + IncrementCounter(olde, currcount); + delete key; + + eit++; + } + + it++; + } + + // now we have added everything. And our top-k table could be too big. + // prune everything... + + assert(size > 0); + + if ( ! doPrune ) + return; + + while ( numElements > size ) + { + pruned = true; + assert(buckets.size() > 0 ); + Bucket* b = buckets.front(); + assert(b->elements.size() > 0); + + Element* e = b->elements.front(); + HashKey* key = GetHash(e->value); + elementDict->RemoveEntry(key); + delete e; + + b->elements.pop_front(); + + if ( b->elements.size() == 0 ) + { + delete b; + buckets.pop_front(); + } + + numElements--; + } + } + +bool TopkVal::DoSerialize(SerialInfo* info) const + { + DO_SERIALIZE(SER_TOPK_VAL, OpaqueVal); + + bool v = true; + + v &= SERIALIZE(size); + v &= SERIALIZE(numElements); + v &= SERIALIZE(pruned); + + bool type_present = (type != 0); + v &= SERIALIZE(type_present); + + if ( type_present ) + v &= type->Serialize(info); + else + assert(numElements == 0); + + uint64_t i = 0; + std::list::const_iterator it = buckets.begin(); + while ( it != buckets.end() ) + { + Bucket* b = *it; + uint32_t elements_count = b->elements.size(); + v &= SERIALIZE(elements_count); + v &= SERIALIZE(b->count); + + std::list::const_iterator eit = b->elements.begin(); + while ( eit != b->elements.end() ) + { + Element* element = *eit; + v &= SERIALIZE(element->epsilon); + v &= element->value->Serialize(info); + + eit++; + i++; + } + + it++; + } + + assert(i == numElements); + + return v; + } + +bool TopkVal::DoUnserialize(UnserialInfo* info) + { + DO_UNSERIALIZE(OpaqueVal); + + bool v = true; + + v &= UNSERIALIZE(&size); + v &= UNSERIALIZE(&numElements); + v &= UNSERIALIZE(&pruned); + + bool type_present = false; + v &= UNSERIALIZE(&type_present); + if ( type_present ) + { + type = BroType::Unserialize(info); + assert(type); + } + else + assert(numElements == 0); + + uint64_t i = 0; + while ( i < numElements ) + { + Bucket* b = new Bucket(); + uint32_t elements_count; + v &= UNSERIALIZE(&elements_count); + v &= UNSERIALIZE(&b->count); + b->bucketPos = buckets.insert(buckets.end(), b); + + for ( uint64_t j = 0; j < elements_count; j++ ) + { + Element* e = new Element(); + v &= UNSERIALIZE(&e->epsilon); + e->value = Val::Unserialize(info, type); + e->parent = b; + + b->elements.insert(b->elements.end(), e); + + HashKey* key = GetHash(e->value); + assert (elementDict->Lookup(key) == 0); + + elementDict->Insert(key, e); + delete key; + + i++; + } + } + + assert(i == numElements); + + return v; + } + + +VectorVal* TopkVal::getTopK(int k) const // returns vector + { + if ( numElements == 0 ) + { + reporter->Error("Cannot return topk of empty"); + return 0; + } + + TypeList* vector_index = new TypeList(type); + vector_index->Append(type->Ref()); + VectorType* v = new VectorType(vector_index); + VectorVal* t = new VectorVal(v); + + // this does no estimation if the results is correct! + // in any case - just to make this future-proof (and I am lazy) - this can return more than k. + + int read = 0; + std::list::const_iterator it = buckets.end(); + it--; + while (read < k ) + { + //printf("Bucket %llu\n", (*it)->count); + std::list::iterator eit = (*it)->elements.begin(); + while ( eit != (*it)->elements.end() ) + { + //printf("Size: %ld\n", (*it)->elements.size()); + t->Assign(read, (*eit)->value->Ref()); + read++; + eit++; + } + + if ( it == buckets.begin() ) + break; + + it--; + } + + Unref(v); + return t; + } + +uint64_t TopkVal::getCount(Val* value) const + { + HashKey* key = GetHash(value); + Element* e = (Element*) elementDict->Lookup(key); + + if ( e == 0 ) + { + reporter->Error("getCount for element that is not in top-k"); + return 0; + } + + delete key; + return e->parent->count; + } + +uint64_t TopkVal::getEpsilon(Val* value) const + { + HashKey* key = GetHash(value); + Element* e = (Element*) elementDict->Lookup(key); + + if ( e == 0 ) + { + reporter->Error("getEpsilon for element that is not in top-k"); + return 0; + } + + delete key; + return e->epsilon; + } + +uint64_t TopkVal::getSum() const + { + uint64_t sum = 0; + + std::list::const_iterator it = buckets.begin(); + while ( it != buckets.end() ) + { + sum += (*it)->elements.size() * (*it)->count; + + it++; + } + + if ( pruned ) + reporter->Warning("TopkVal::getSum() was used on a pruned data structure. Result values do not represent total element count"); + + return sum; + } + +void TopkVal::Encountered(Val* encountered) + { + // ok, let's see if we already know this one. + + //printf("NumElements: %d\n", numElements); + // check type compatibility + if ( numElements == 0 ) + type = encountered->Type()->Ref(); + else + if ( ! same_type(type, encountered->Type()) ) + { + reporter->Error("Trying to add element to topk with differing type from other elements"); + return; + } + + // Step 1 - get the hash. + HashKey* key = GetHash(encountered); + Element* e = (Element*) elementDict->Lookup(key); + + if ( e == 0 ) + { + e = new Element(); + e->epsilon = 0; + e->value = encountered->Ref(); // or no ref? + + // well, we do not know this one yet... + if ( numElements < size ) + { + // brilliant. just add it at position 1 + if ( buckets.size() == 0 || (*buckets.begin())->count > 1 ) + { + Bucket* b = new Bucket(); + b->count = 1; + std::list::iterator pos = buckets.insert(buckets.begin(), b); + b->bucketPos = pos; + b->elements.insert(b->elements.end(), e); + e->parent = b; + } + else + { + Bucket* b = *buckets.begin(); + assert(b->count == 1); + b->elements.insert(b->elements.end(), e); + e->parent = b; + } + + elementDict->Insert(key, e); + numElements++; + delete key; + + return; // done. it is at pos 1. + } + + else + { + // replace element with min-value + Bucket* b = *buckets.begin(); // bucket with smallest elements + + // evict oldest element with least hits. + assert(b->elements.size() > 0); + HashKey* deleteKey = GetHash((*(b->elements.begin()))->value); + b->elements.erase(b->elements.begin()); + Element* deleteElement = (Element*) elementDict->RemoveEntry(deleteKey); + assert(deleteElement); // there has to have been a minimal element... + delete deleteElement; + delete deleteKey; + + // and add the new one to the end + e->epsilon = b->count; + b->elements.insert(b->elements.end(), e); + elementDict->Insert(key, e); + e->parent = b; + + // fallthrough, increment operation has to run! + } + + } + + // ok, we now have an element in e + delete key; + IncrementCounter(e); // well, this certainly was anticlimatic. + } + +// increment by count +void TopkVal::IncrementCounter(Element* e, unsigned int count) + { + Bucket* currBucket = e->parent; + uint64 currcount = currBucket->count; + + // well, let's test if there is a bucket for currcount++ + std::list::iterator bucketIter = currBucket->bucketPos; + + Bucket* nextBucket = 0; + + bucketIter++; + + while ( bucketIter != buckets.end() && (*bucketIter)->count < currcount+count ) + bucketIter++; + + if ( bucketIter != buckets.end() && (*bucketIter)->count == currcount+count ) + nextBucket = *bucketIter; + + if ( nextBucket == 0 ) + { + // the bucket for the value that we want does not exist. + // create it... + + Bucket* b = new Bucket(); + b->count = currcount+count; + + std::list::iterator nextBucketPos = buckets.insert(bucketIter, b); + b->bucketPos = nextBucketPos; // and give it the iterator we know now. + + nextBucket = b; + } + + // ok, now we have the new bucket in nextBucket. Shift the element over... + currBucket->elements.remove(e); + nextBucket->elements.insert(nextBucket->elements.end(), e); + + e->parent = nextBucket; + + // if currBucket is empty, we have to delete it now + if ( currBucket->elements.size() == 0 ) + { + buckets.remove(currBucket); + delete currBucket; + currBucket = 0; + } + } + +}; diff --git a/src/probabilistic/Topk.h b/src/probabilistic/Topk.h new file mode 100644 index 0000000000..af15acf955 --- /dev/null +++ b/src/probabilistic/Topk.h @@ -0,0 +1,97 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef topk_h +#define topk_h + +#include +#include "Val.h" +#include "CompHash.h" +#include "OpaqueVal.h" + +// This class implements the top-k algorithm. Or - to be more precise - an +// interpretation of it. + +namespace probabilistic { + +struct Element; + +struct Bucket { + uint64 count; + std::list elements; + + // iterators only get invalidated for removed elements. This one + // points to us - so it is invalid when we are no longer there. Cute, + // isn't it? + std::list::iterator bucketPos; +}; + +struct Element { + uint64 epsilon; + Val* value; + Bucket* parent; + + ~Element(); +}; + +declare(PDict, Element); + +class TopkVal : public OpaqueVal { + +public: + // Initialize a TopkVal. Size specifies how many total elements are + // tracked + TopkVal(uint64 size); + ~TopkVal(); + + // Call this, when a new value is encountered. Note that on the first call, + // the Bro-Type of the value types that are counted is set. All following calls + // to encountered have to specify the same type + void Encountered(Val* value); + + // Return the first k elements of the result vector. At the moment, this does + // not check if it is in the right order or if we can prove that these are + // the correct top-k. Use count and epsilon for this. + VectorVal* getTopK(int k) const; // returns vector + + // Get the current count tracked in the top-k data structure for a certain val. + // Returns 0 if the val is unknown (and logs the error to reporter) + uint64_t getCount(Val* value) const; + + // Get the current epsilon tracked in the top-k data structure for a certain val. + // Returns 0 if the val is unknown (and logs the error to reporter) + uint64_t getEpsilon(Val* value) const; + + // Get the size set in the constructor + uint64_t getSize() const { return size; } + + // Get the sum of all counts of all tracked elements. This is equal to the number + // of total observations up to this moment, if no elements were pruned from the data + // structure. + uint64_t getSum() const; + + // Merge another top-k data structure in this one. + // doPrune specifies if the total count of elements is limited to size after + // merging. + // Please note, that pruning will invalidate the results of getSum. + void Merge(const TopkVal* value, bool doPrune=false); + +protected: + TopkVal(); // for deserialize + +private: + void IncrementCounter(Element* e, unsigned int count = 1); + HashKey* GetHash(Val*) const; // this probably should go somewhere else. + + BroType* type; + std::list buckets; + PDict(Element)* elementDict; + uint64 size; // how many elements are we tracking? + uint64 numElements; // how many elements do we have at the moment + bool pruned; // was this data structure pruned? + + DECLARE_SERIAL(TopkVal); +}; + +}; + +#endif diff --git a/src/probabilistic/top-k.bif b/src/probabilistic/top-k.bif new file mode 100644 index 0000000000..f4e7753d90 --- /dev/null +++ b/src/probabilistic/top-k.bif @@ -0,0 +1,122 @@ +# =========================================================================== +# +# Top-K Functions +# +# =========================================================================== + + +%%{ +#include "probabilistic/Topk.h" +%%} + +## Creates a top-k data structure which tracks *size* elements. +## +## Returns: Opaque pointer to the data structure. +function topk_init%(size: count%): opaque of topk + %{ + probabilistic::TopkVal* v = new probabilistic::TopkVal(size); + return v; + %} + +## Add a new observed object to the data structure. The first +## added object sets the type of data tracked by the top-k data +## structure. All following values have to be of the same type +function topk_add%(handle: opaque of topk, value: any%): any + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + h->Encountered(value); + + return 0; + %} + +## Get the first k elements of the top-k data structure +## +## Returns: vector of the first k elements +function topk_get_top%(handle: opaque of topk, k: count%): any + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return h->getTopK(k); + %} + +## Get an overestimated count of how often value has been encountered. +## value has to be part of the currently tracked elements, otherwise +## 0 will be returned and an error message will be added to reporter. +## +## Returns: Overestimated number for how often the element has been encountered +function topk_count%(handle: opaque of topk, value: any%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getCount(value), TYPE_COUNT); + %} + +## Get a the maximal overestimation for count. Same restrictiosn as for topk_count +## apply. +## +## Returns: Number which represents the maximal overesimation for the count of this element. +function topk_epsilon%(handle: opaque of topk, value: any%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getEpsilon(value), TYPE_COUNT); + %} + +## Get the number of elements this data structure is supposed to track (given on init). +## Note that the actual number of elements in the data structure can be lower or higher +## than this. (higher due to non-pruned merges) +## +## Returns: size given during initialization +function topk_size%(handle: opaque of topk%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getSize(), TYPE_COUNT); + %} + +## Get the sum of all counts of all elements in the data structure. Is equal to the number +## of all inserted objects if the data structure never has been pruned. Do not use after +## calling topk_merge_prune (will throw a warning message if used afterwards) +## +## Returns: sum of all counts +function topk_sum%(handle: opaque of topk%): count + %{ + assert(handle); + probabilistic::TopkVal* h = (probabilistic::TopkVal*) handle; + return new Val(h->getSum(), TYPE_COUNT); + %} + +## Merge the second topk data structure into the first. Does not remove any elements, the +## resulting data structure can be bigger than the maximum size given on initialization. +function topk_merge%(handle1: opaque of topk, handle2: opaque of topk%): any + %{ + assert(handle1); + assert(handle2); + + probabilistic::TopkVal* h1 = (probabilistic::TopkVal*) handle1; + probabilistic::TopkVal* h2 = (probabilistic::TopkVal*) handle2; + + h1->Merge(h2); + + return 0; + %} + +## Merge the second topk data structure into the first and prunes the final data structure +## back to the size given on initialization. Use with care and only when being aware of the +## restrictions this imposed. Do not call topk_size or topk_add afterwards, results will +## probably not be what you expect. +function topk_merge_prune%(handle1: opaque of topk, handle2: opaque of topk%): any + %{ + assert(handle1); + assert(handle2); + + probabilistic::TopkVal* h1 = (probabilistic::TopkVal*) handle1; + probabilistic::TopkVal* h2 = (probabilistic::TopkVal*) handle2; + + h1->Merge(h2, true); + + return 0; + %} + + diff --git a/testing/btest/Baseline/bifs.topk/.stderr b/testing/btest/Baseline/bifs.topk/.stderr new file mode 100644 index 0000000000..80626107aa --- /dev/null +++ b/testing/btest/Baseline/bifs.topk/.stderr @@ -0,0 +1,11 @@ +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k +warning: TopkVal::getSum() was used on a pruned data structure. Result values do not represent total element count +error: getCount for element that is not in top-k +error: getEpsilon for element that is not in top-k diff --git a/testing/btest/Baseline/bifs.topk/out b/testing/btest/Baseline/bifs.topk/out new file mode 100644 index 0000000000..1ce5c4b850 --- /dev/null +++ b/testing/btest/Baseline/bifs.topk/out @@ -0,0 +1,81 @@ +[b, c] +4 +0 +0 +2 +0 +2 +1 +[d, c] +5 +0 +0 +2 +1 +3 +2 +[d, e] +6 +3 +2 +3 +2 +[f, e] +7 +4 +3 +3 +2 +[f, e] +8 +4 +3 +4 +2 +[g, e] +9 +0 +0 +4 +2 +5 +4 +[c, e, d] +19 +6 +0 +5 +0 +4 +0 +[c, e] +6 +0 +5 +0 +0 +0 +[c, e] +22 +12 +0 +10 +0 +0 +0 +[c, e] +19 +6 +0 +5 +0 +4 +0 +[c, e, d] +38 +12 +0 +10 +0 +8 +0 diff --git a/testing/btest/Baseline/istate.topk/out b/testing/btest/Baseline/istate.topk/out new file mode 100644 index 0000000000..ef3d0cef30 --- /dev/null +++ b/testing/btest/Baseline/istate.topk/out @@ -0,0 +1,21 @@ +1 +2 +6 +4 +5 +1 +[c, e, d] +1 +2 +6 +4 +5 +1 +[c, e, d] +2 +4 +12 +8 +10 +2 +[c, e, d] diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk-cluster/manager-1..stdout new file mode 100644 index 0000000000..2d076eeac7 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk-cluster/manager-1..stdout @@ -0,0 +1,9 @@ +Top entries for key counter +Num: 995, count: 100, epsilon: 0 +Num: 1, count: 99, epsilon: 0 +Num: 2, count: 98, epsilon: 0 +Num: 3, count: 97, epsilon: 0 +Num: 4, count: 96, epsilon: 0 +Top entries for key two +Num: 2, count: 4, epsilon: 0 +Num: 1, count: 3, epsilon: 0 diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk/.stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk/.stdout new file mode 100644 index 0000000000..c85316eecc --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.topk/.stdout @@ -0,0 +1,8 @@ +Top entries for key counter +Num: 1, count: 99, epsilon: 0 +Num: 2, count: 98, epsilon: 0 +Num: 3, count: 97, epsilon: 0 +Num: 4, count: 96, epsilon: 0 +Num: 5, count: 95, epsilon: 0 +Top entries for key two +Num: 1, count: 2, epsilon: 0 diff --git a/testing/btest/bifs/topk.bro b/testing/btest/bifs/topk.bro new file mode 100644 index 0000000000..02d13c4195 --- /dev/null +++ b/testing/btest/bifs/topk.bro @@ -0,0 +1,154 @@ +# @TEST-EXEC: bro -b %INPUT > out +# @TEST-EXEC: btest-diff out +# @TEST-EXEC: btest-diff .stderr + +event bro_init() + { + local k1 = topk_init(2); + + # first - peculiarity check... + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + + local s = topk_get_top(k1, 5); + print s; + print topk_sum(k1); + print topk_count(k1, "a"); + print topk_epsilon(k1, "a"); + print topk_count(k1, "b"); + print topk_epsilon(k1, "b"); + print topk_count(k1, "c"); + print topk_epsilon(k1, "c"); + + topk_add(k1, "d"); + s = topk_get_top(k1, 5); + print s; + print topk_sum(k1); + print topk_count(k1, "b"); + print topk_epsilon(k1, "b"); + print topk_count(k1, "c"); + print topk_epsilon(k1, "c"); + print topk_count(k1, "d"); + print topk_epsilon(k1, "d"); + + topk_add(k1, "e"); + s = topk_get_top(k1, 5); + print s; + print topk_sum(k1); + print topk_count(k1, "d"); + print topk_epsilon(k1, "d"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); + + topk_add(k1, "f"); + s = topk_get_top(k1, 5); + print s; + print topk_sum(k1); + print topk_count(k1, "f"); + print topk_epsilon(k1, "f"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); + + topk_add(k1, "e"); + s = topk_get_top(k1, 5); + print s; + print topk_sum(k1); + print topk_count(k1, "f"); + print topk_epsilon(k1, "f"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); + + topk_add(k1, "g"); + s = topk_get_top(k1, 5); + print s; + print topk_sum(k1); + print topk_count(k1, "f"); + print topk_epsilon(k1, "f"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "e"); + print topk_count(k1, "g"); + print topk_epsilon(k1, "g"); + + k1 = topk_init(100); + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "f"); + s = topk_get_top(k1, 3); + print s; + print topk_sum(k1); + print topk_count(k1, "c"); + print topk_epsilon(k1, "c"); + print topk_count(k1, "e"); + print topk_epsilon(k1, "d"); + print topk_count(k1, "d"); + print topk_epsilon(k1, "d"); + + local k3 = topk_init(2); + topk_merge_prune(k3, k1); + + s = topk_get_top(k3, 3); + print s; + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); + + topk_merge_prune(k3, k1); + + s = topk_get_top(k3, 3); + print s; + print topk_sum(k3); # this gives a warning and a wrong result. + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); + + k3 = topk_init(2); + topk_merge(k3, k1); + print s; + print topk_sum(k3); + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); + + topk_merge(k3, k1); + + s = topk_get_top(k3, 3); + print s; + print topk_sum(k3); + print topk_count(k3, "c"); + print topk_epsilon(k3, "c"); + print topk_count(k3, "e"); + print topk_epsilon(k3, "e"); + print topk_count(k3, "d"); + print topk_epsilon(k3, "d"); + + + + +} diff --git a/testing/btest/istate/topk.bro b/testing/btest/istate/topk.bro new file mode 100644 index 0000000000..4d599c2780 --- /dev/null +++ b/testing/btest/istate/topk.bro @@ -0,0 +1,74 @@ +# @TEST-EXEC: bro -b %INPUT runnumber=1 >out +# @TEST-EXEC: bro -b %INPUT runnumber=2 >>out +# @TEST-EXEC: bro -b %INPUT runnumber=3 >>out +# @TEST-EXEC: btest-diff out + +global runnumber: count &redef; # differentiate runs + +global k1: opaque of topk &persistent; +global k2: opaque of topk &persistent; + +event bro_init() + { + + k2 = topk_init(20); + + if ( runnumber == 1 ) + { + k1 = topk_init(100); + + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "f"); + } + + local s = topk_get_top(k1, 3); + print topk_count(k1, "a"); + print topk_count(k1, "b"); + print topk_count(k1, "c"); + print topk_count(k1, "d"); + print topk_count(k1, "e"); + print topk_count(k1, "f"); + + if ( runnumber == 2 ) + { + topk_add(k1, "a"); + topk_add(k1, "b"); + topk_add(k1, "b"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "c"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "d"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "e"); + topk_add(k1, "f"); + } + + print s; + + } diff --git a/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro new file mode 100644 index 0000000000..0ade38e86c --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/topk-cluster.bro @@ -0,0 +1,110 @@ +# @TEST-SERIALIZE: comm +# +# @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 1 +# @TEST-EXEC: btest-bg-run worker-1 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT +# @TEST-EXEC: btest-bg-run worker-2 BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT +# @TEST-EXEC: btest-bg-wait 15 + +# @TEST-EXEC: btest-diff manager-1/.stdout +# +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $interface="eth1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + + +event bro_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test.metric", + $apply=set(SumStats::TOPK)]; + SumStats::create([$epoch=5secs, + $reducers=set(r1), + $epoch_finished(data: SumStats::ResultTable) = + { + for ( key in data ) + { + local r = data[key]["test.metric"]; + + local s: vector of SumStats::Observation; + s = topk_get_top(r$topk, 5); + + print fmt("Top entries for key %s", key$str); + for ( element in s ) + { + print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); + } + + terminate(); + } + } + ]); + + + } + +event remote_connection_closed(p: event_peer) + { + terminate(); + } + +global ready_for_data: event(); +redef Cluster::manager2worker_events += /^ready_for_data$/; + +event ready_for_data() + { + const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100}; + + + if ( Cluster::node == "worker-1" ) + { + + local a: count; + a = 0; + + for ( i in loop_v ) + { + a = a + 1; + for ( j in loop_v ) + { + if ( i < j ) + SumStats::observe("test.metric", [$str="counter"], [$num=a]); + } + } + + + SumStats::observe("test.metric", [$str="two"], [$num=1]); + SumStats::observe("test.metric", [$str="two"], [$num=1]); + } + if ( Cluster::node == "worker-2" ) + { + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=2]); + SumStats::observe("test.metric", [$str="two"], [$num=1]); + + for ( i in loop_v ) + { + SumStats::observe("test.metric", [$str="counter"], [$num=995]); + } + } + } + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +global peer_count = 0; +event remote_connection_handshake_done(p: event_peer) &priority=-5 + { + ++peer_count; + if ( peer_count == 2 ) + event ready_for_data(); + } + +@endif + diff --git a/testing/btest/scripts/base/frameworks/sumstats/topk.bro b/testing/btest/scripts/base/frameworks/sumstats/topk.bro new file mode 100644 index 0000000000..22a5af1bc7 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/sumstats/topk.bro @@ -0,0 +1,48 @@ +# @TEST-EXEC: bro %INPUT +# @TEST-EXEC: btest-diff .stdout + +event bro_init() &priority=5 + { + local r1: SumStats::Reducer = [$stream="test.metric", + $apply=set(SumStats::TOPK)]; + SumStats::create([$epoch=3secs, + $reducers=set(r1), + $epoch_finished(data: SumStats::ResultTable) = + { + for ( key in data ) + { + local r = data[key]["test.metric"]; + + local s: vector of SumStats::Observation; + s = topk_get_top(r$topk, 5); + + print fmt("Top entries for key %s", key$str); + for ( element in s ) + { + print fmt("Num: %d, count: %d, epsilon: %d", s[element]$num, topk_count(r$topk, s[element]), topk_epsilon(r$topk, s[element])); + } + + } + } + ]); + + + const loop_v: vector of count = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100}; + + local a: count; + a = 0; + + for ( i in loop_v ) + { + a = a + 1; + for ( j in loop_v ) + { + if ( i < j ) + SumStats::observe("test.metric", [$str="counter"], [$num=a]); + } + } + + + SumStats::observe("test.metric", [$str="two"], [$num=1]); + SumStats::observe("test.metric", [$str="two"], [$num=1]); + }