diff --git a/CHANGES b/CHANGES index e4a4789f80..ca665576b1 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,18 @@ +2.1-1216 | 2013-08-31 10:39:40 -0700 + + + * Support for probabilistic set cardinality, using the HyperLogLog + algorithm. (Bernhard Amann, Soumya Basu) + + Bro now provides the following BiFs: + + hll_cardinality_init(err: double, confidence: double): opaque of cardinality + hll_cardinality_add(handle: opaque of cardinality, elem: any): bool + hll_cardinality_merge_into(handle1: opaque of cardinality, handle2: opaque of cardinality): bool + hll_cardinality_estimate(handle: opaque of cardinality): double + hll_cardinality_copy(handle: opaque of cardinality): opaque of cardinality + 2.1-1154 | 2013-08-30 08:27:45 -0700 * Fix global opaque val segfault. Addresses BIT-1071. (Jon Siwek) diff --git a/VERSION b/VERSION index 7da5601ca6..e8782adeb0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.1-1154 +2.1-1216 diff --git a/scripts/base/frameworks/sumstats/plugins/__load__.bro b/scripts/base/frameworks/sumstats/plugins/__load__.bro index d2f89e41c5..0b57597e58 100644 --- a/scripts/base/frameworks/sumstats/plugins/__load__.bro +++ b/scripts/base/frameworks/sumstats/plugins/__load__.bro @@ -1,4 +1,5 @@ @load ./average +@load ./hll_unique @load ./last @load ./max @load ./min diff --git a/scripts/base/frameworks/sumstats/plugins/hll_unique.bro b/scripts/base/frameworks/sumstats/plugins/hll_unique.bro new file mode 100644 index 0000000000..ed2d3b6e6c --- /dev/null +++ b/scripts/base/frameworks/sumstats/plugins/hll_unique.bro @@ -0,0 +1,62 @@ +@load base/frameworks/sumstats + +module SumStats; + +export { + redef record Reducer += { + ## The error margin for HLL. + hll_error_margin: double &default=0.01; + + ## The confidence for HLL. + hll_confidence: double &default=0.95; + }; + + redef enum Calculation += { + ## Calculate the number of unique values. + HLL_UNIQUE + }; + + redef record ResultVal += { + ## If cardinality is being tracked, the number of unique + ## items is tracked here. + hll_unique: count &default=0; + }; +} + +redef record ResultVal += { + # Internal use only. This is not meant to be publically available + # because probabilistic data structures have to be examined using + # specialized bifs. + card: opaque of cardinality &optional; + + # We need these in the compose hook. + hll_error_margin: double &optional; + hll_confidence: double &optional; +}; + +hook register_observe_plugins() + { + register_observe_plugin(HLL_UNIQUE, function(r: Reducer, val: double, obs: Observation, rv: ResultVal) + { + if ( ! 
rv?$card ) + { + rv$card = hll_cardinality_init(r$hll_error_margin, r$hll_confidence); + rv$hll_error_margin = r$hll_error_margin; + rv$hll_confidence = r$hll_confidence; + rv$hll_unique = 0; + } + + hll_cardinality_add(rv$card, obs); + rv$hll_unique = double_to_count(hll_cardinality_estimate(rv$card)); + }); + } + +hook compose_resultvals_hook(result: ResultVal, rv1: ResultVal, rv2: ResultVal) + { + local rhll = hll_cardinality_init(rv1$hll_error_margin, rv1$hll_confidence); + hll_cardinality_merge_into(rhll, rv1$card); + hll_cardinality_merge_into(rhll, rv2$card); + + result$card = rhll; + result$hll_unique = double_to_count(hll_cardinality_estimate(rhll)); + } diff --git a/scripts/base/init-bare.bro b/scripts/base/init-bare.bro index 59f13bf2eb..fe3b84a93b 100644 --- a/scripts/base/init-bare.bro +++ b/scripts/base/init-bare.bro @@ -702,7 +702,6 @@ type entropy_test_result: record { @load base/bif/strings.bif @load base/bif/bro.bif @load base/bif/reporter.bif -@load base/bif/bloom-filter.bif ## Deprecated. This is superseded by the new logging framework. global log_file_name: function(tag: string): string &redef; diff --git a/src/OpaqueVal.cc b/src/OpaqueVal.cc index 0c1d9d509d..b7ccd770ce 100644 --- a/src/OpaqueVal.cc +++ b/src/OpaqueVal.cc @@ -4,6 +4,8 @@ #include "NetVar.h" #include "Reporter.h" #include "Serializer.h" +#include "probabilistic/BloomFilter.h" +#include "probabilistic/CardinalityCounter.h" bool HashVal::IsValid() const { @@ -551,7 +553,7 @@ bool BloomFilterVal::Typify(BroType* arg_type) type->Ref(); TypeList* tl = new TypeList(type); - tl->Append(type); + tl->Append(type->Ref()); hash = new CompositeHash(tl); Unref(tl); @@ -673,3 +675,92 @@ bool BloomFilterVal::DoUnserialize(UnserialInfo* info) bloom_filter = probabilistic::BloomFilter::Unserialize(info); return bloom_filter != 0; } + +CardinalityVal::CardinalityVal() : OpaqueVal(cardinality_type) + { + c = 0; + type = 0; + hash = 0; + } + +CardinalityVal::CardinalityVal(probabilistic::CardinalityCounter* arg_c) + : OpaqueVal(cardinality_type) + { + c = arg_c; + type = 0; + hash = 0; + } + +CardinalityVal::~CardinalityVal() + { + Unref(type); + delete c; + delete hash; + } + +IMPLEMENT_SERIAL(CardinalityVal, SER_CARDINALITY_VAL); + +bool CardinalityVal::DoSerialize(SerialInfo* info) const + { + DO_SERIALIZE(SER_CARDINALITY_VAL, OpaqueVal); + + bool valid = true; + bool is_typed = (type != 0); + + valid &= SERIALIZE(is_typed); + + if ( is_typed ) + valid &= type->Serialize(info); + + return c->Serialize(info); + } + +bool CardinalityVal::DoUnserialize(UnserialInfo* info) + { + DO_UNSERIALIZE(OpaqueVal); + + bool is_typed; + if ( ! UNSERIALIZE(&is_typed) ) + return false; + + if ( is_typed ) + { + BroType* t = BroType::Unserialize(info); + if ( ! 
Typify(t) ) + return false; + + Unref(t); + } + + c = probabilistic::CardinalityCounter::Unserialize(info); + return c != 0; + } + +bool CardinalityVal::Typify(BroType* arg_type) + { + if ( type ) + return false; + + type = arg_type; + type->Ref(); + + TypeList* tl = new TypeList(type); + tl->Append(type->Ref()); + hash = new CompositeHash(tl); + Unref(tl); + + return true; + } + +BroType* CardinalityVal::Type() const + { + return type; + } + +void CardinalityVal::Add(const Val* val) + { + HashKey* key = hash->ComputeHash(val, 1); + c->AddElement(key->Hash()); + delete key; + } + diff --git a/src/OpaqueVal.h b/src/OpaqueVal.h index 08a20b1a31..70ba48f8d1 100644 --- a/src/OpaqueVal.h +++ b/src/OpaqueVal.h @@ -9,10 +9,9 @@ #include "Val.h" #include "digest.h" -#include "probabilistic/BloomFilter.h" - namespace probabilistic { class BloomFilter; + class CardinalityCounter; } class HashVal : public OpaqueVal { @@ -149,4 +148,28 @@ private: probabilistic::BloomFilter* bloom_filter; }; + +class CardinalityVal: public OpaqueVal { +public: + explicit CardinalityVal(probabilistic::CardinalityCounter*); + virtual ~CardinalityVal(); + + void Add(const Val* val); + + BroType* Type() const; + bool Typify(BroType* type); + + probabilistic::CardinalityCounter* Get() { return c; }; + +protected: + CardinalityVal(); + +private: + BroType* type; + CompositeHash* hash; + probabilistic::CardinalityCounter* c; + + DECLARE_SERIAL(CardinalityVal); +}; + #endif diff --git a/src/SerialTypes.h b/src/SerialTypes.h index 5271caa2e3..69927afb74 100644 --- a/src/SerialTypes.h +++ b/src/SerialTypes.h @@ -110,6 +110,7 @@ SERIAL_VAL(SHA256_VAL, 18) SERIAL_VAL(ENTROPY_VAL, 19) SERIAL_VAL(TOPK_VAL, 20) SERIAL_VAL(BLOOMFILTER_VAL, 21) +SERIAL_VAL(CARDINALITY_VAL, 22) #define SERIAL_EXPR(name, val) SERIAL_CONST(name, val, EXPR) SERIAL_EXPR(EXPR, 1) diff --git a/src/SerializationFormat.cc b/src/SerializationFormat.cc index 10dd4f29ea..a005a103cf 100644 --- a/src/SerializationFormat.cc +++ b/src/SerializationFormat.cc @@ -107,6 +107,15 @@ bool BinarySerializationFormat::Read(int* v, const char* tag) return true; } +bool BinarySerializationFormat::Read(uint8* v, const char* tag) + { + if ( ! ReadData(v, sizeof(*v)) ) + return false; + + DBG_LOG(DBG_SERIAL, "Read uint8 %hu [%s]", *v, tag); + return true; + } + bool BinarySerializationFormat::Read(uint16* v, const char* tag) { if ( ! 
ReadData(v, sizeof(*v)) ) @@ -301,6 +310,12 @@ bool BinarySerializationFormat::Write(char v, const char* tag) return WriteData(&v, 1); } +bool BinarySerializationFormat::Write(uint8 v, const char* tag) + { + DBG_LOG(DBG_SERIAL, "Write uint8 %hu [%s]", v, tag); + return WriteData(&v, sizeof(v)); + } + bool BinarySerializationFormat::Write(uint16 v, const char* tag) { DBG_LOG(DBG_SERIAL, "Write uint16 %hu [%s]", v, tag); @@ -447,6 +462,12 @@ bool XMLSerializationFormat::Read(int* v, const char* tag) return false; } +bool XMLSerializationFormat::Read(uint8* v, const char* tag) + { + reporter->InternalError("no reading of xml"); + return false; + } + bool XMLSerializationFormat::Read(uint16* v, const char* tag) { reporter->InternalError("no reading of xml"); @@ -530,6 +551,13 @@ bool XMLSerializationFormat::Write(char v, const char* tag) return WriteElem(tag, "char", &v, 1); } +bool XMLSerializationFormat::Write(uint8 v, const char* tag) + { + const char* tmp = fmt("%" PRIu8, v); + return WriteElem(tag, "uint8", tmp, strlen(tmp)); + } + + bool XMLSerializationFormat::Write(uint16 v, const char* tag) { const char* tmp = fmt("%" PRIu16, v); diff --git a/src/SerializationFormat.h b/src/SerializationFormat.h index f270b61bae..05cf56d961 100644 --- a/src/SerializationFormat.h +++ b/src/SerializationFormat.h @@ -23,6 +23,7 @@ public: virtual void EndRead(); virtual bool Read(int* v, const char* tag) = 0; + virtual bool Read(uint8* v, const char* tag) = 0; virtual bool Read(uint16* v, const char* tag) = 0; virtual bool Read(uint32* v, const char* tag) = 0; virtual bool Read(int64* v, const char* tag) = 0; @@ -47,6 +48,7 @@ public: virtual uint32 EndWrite(char** data); // passes ownership virtual bool Write(int v, const char* tag) = 0; + virtual bool Write(uint8 v, const char* tag) = 0; virtual bool Write(uint16 v, const char* tag) = 0; virtual bool Write(uint32 v, const char* tag) = 0; virtual bool Write(int64 v, const char* tag) = 0; @@ -92,6 +94,7 @@ public: virtual ~BinarySerializationFormat(); virtual bool Read(int* v, const char* tag); + virtual bool Read(uint8* v, const char* tag); virtual bool Read(uint16* v, const char* tag); virtual bool Read(uint32* v, const char* tag); virtual bool Read(int64* v, const char* tag); @@ -106,6 +109,7 @@ public: virtual bool Read(struct in_addr* addr, const char* tag); virtual bool Read(struct in6_addr* addr, const char* tag); virtual bool Write(int v, const char* tag); + virtual bool Write(uint8 v, const char* tag); virtual bool Write(uint16 v, const char* tag); virtual bool Write(uint32 v, const char* tag); virtual bool Write(int64 v, const char* tag); @@ -132,6 +136,7 @@ public: // We don't write anything if tag is nil. virtual bool Write(int v, const char* tag); + virtual bool Write(uint8 v, const char* tag); virtual bool Write(uint16 v, const char* tag); virtual bool Write(uint32 v, const char* tag); virtual bool Write(int64 v, const char* tag); @@ -152,6 +157,7 @@ public: // Not implemented. 
virtual bool Read(int* v, const char* tag); + virtual bool Read(uint8* v, const char* tag); virtual bool Read(uint16* v, const char* tag); virtual bool Read(uint32* v, const char* tag); virtual bool Read(int64* v, const char* tag); diff --git a/src/Serializer.h b/src/Serializer.h index 72e0723880..719d4dc527 100644 --- a/src/Serializer.h +++ b/src/Serializer.h @@ -54,6 +54,7 @@ public: DECLARE_WRITE(type) DECLARE_IO(int) + DECLARE_IO(uint8) DECLARE_IO(uint16) DECLARE_IO(uint32) DECLARE_IO(int64) diff --git a/src/Type.h b/src/Type.h index 52fdfe5043..a6163d5152 100644 --- a/src/Type.h +++ b/src/Type.h @@ -613,6 +613,7 @@ extern OpaqueType* md5_type; extern OpaqueType* sha1_type; extern OpaqueType* sha256_type; extern OpaqueType* entropy_type; +extern OpaqueType* cardinality_type; extern OpaqueType* topk_type; extern OpaqueType* bloomfilter_type; diff --git a/src/bro.bif b/src/bro.bif index e4d7db1d5b..c9b77e4368 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -1,8 +1,7 @@ ##! A collection of built-in functions that implement a variety of things ##! such as general programming algorithms, string processing, math functions, -##! introspection, type conversion, file/directory manipulation, packet -##! filtering, inter-process communication and controlling protocol analyzer -##! behavior. +##! introspection, type conversion, file/directory manipulation, packet filtering, +##! inter-process communication and controlling protocol analyzer behavior. %%{ // C segment #include diff --git a/src/main.cc b/src/main.cc index bc47e21fc5..5acba12367 100644 --- a/src/main.cc +++ b/src/main.cc @@ -128,6 +128,7 @@ OpaqueType* md5_type = 0; OpaqueType* sha1_type = 0; OpaqueType* sha256_type = 0; OpaqueType* entropy_type = 0; +OpaqueType* cardinality_type = 0; OpaqueType* topk_type = 0; OpaqueType* bloomfilter_type = 0; @@ -856,6 +857,7 @@ int main(int argc, char** argv) sha1_type = new OpaqueType("sha1"); sha256_type = new OpaqueType("sha256"); entropy_type = new OpaqueType("entropy"); + cardinality_type = new OpaqueType("cardinality"); topk_type = new OpaqueType("topk"); bloomfilter_type = new OpaqueType("bloomfilter"); diff --git a/src/probabilistic/CMakeLists.txt b/src/probabilistic/CMakeLists.txt index a36dfbbd6b..b845ecc7a2 100644 --- a/src/probabilistic/CMakeLists.txt +++ b/src/probabilistic/CMakeLists.txt @@ -9,11 +9,13 @@ include_directories(BEFORE set(probabilistic_SRCS BitVector.cc BloomFilter.cc + CardinalityCounter.cc CounterVector.cc Hasher.cc Topk.cc) bif_target(bloom-filter.bif) +bif_target(cardinality-counter.bif) bif_target(top-k.bif) bro_add_subdir_library(probabilistic ${probabilistic_SRCS}) diff --git a/src/probabilistic/CardinalityCounter.cc b/src/probabilistic/CardinalityCounter.cc new file mode 100644 index 0000000000..7a1814d6ae --- /dev/null +++ b/src/probabilistic/CardinalityCounter.cc @@ -0,0 +1,191 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#include <math.h> +#include <stdint.h> +#include <inttypes.h> + +#include "CardinalityCounter.h" +#include "Reporter.h" +#include "Serializer.h" + +using namespace probabilistic; + +int CardinalityCounter::OptimalB(double error, double confidence) + { + double initial_estimate = 2 * (log(1.04) - log(error)) / log(2); + int answer = (int) floor(initial_estimate); + + double k = 0; + + do { + answer++; + k = pow(2, (answer - initial_estimate) / 2); + } while ( erf(k / sqrt(2)) < confidence ); + + return answer; + } + +void CardinalityCounter::Init(uint64 size) + { + m = size; + buckets = new uint8_t[m]; + + // The following magic values are taken directly out of the + // description of the HyperLogLog algorithm. + + if ( m == 16 ) + alpha_m = 0.673; + + else if ( m == 32 ) + alpha_m = 0.697; + + else if ( m == 64 ) + alpha_m = 0.709; + + else if ( m >= 128 ) + alpha_m = 0.7213 / (1 + 1.079 / m); + + else + reporter->InternalError("Invalid size %" PRIu64 ". Size either has to be 16, 32, 64 or bigger than 128", size); + + for ( uint64 i = 0; i < m; i++ ) + buckets[i] = 0; + + V = m; + } + +CardinalityCounter::CardinalityCounter(double error_margin, double confidence) + { + int b = OptimalB(error_margin, confidence); + Init((uint64) pow(2, b)); + } + +CardinalityCounter::CardinalityCounter(uint64 size) + { + Init(size); + } + +CardinalityCounter::CardinalityCounter(uint64 arg_size, uint64 arg_V, double arg_alpha_m) + { + m = arg_size; + buckets = new uint8_t[m]; + alpha_m = arg_alpha_m; + V = arg_V; + } + +CardinalityCounter::~CardinalityCounter() + { + delete [] buckets; + } + +uint8_t CardinalityCounter::Rank(uint64 hash_modified) + { + uint8_t answer = 0; + + hash_modified = (uint64)(hash_modified / m); + hash_modified *= 2; + + do { + hash_modified = (uint64)(hash_modified / 2); + answer++; + } while ( hash_modified % 2 == 0); + + return answer; + } + +void CardinalityCounter::AddElement(uint64 hash) + { + uint64 index = hash % m; + hash = hash-index; + + if( buckets[index] == 0 ) + V--; + + uint8_t temp = Rank(hash); + + if ( temp > buckets[index] ) + buckets[index] = temp; + } + +double CardinalityCounter::Size() + { + double answer = 0; + for ( unsigned int i = 0; i < m; i++ ) + answer += pow(2, -((int)buckets[i])); + + answer = 1 / answer; + answer = (alpha_m * m * m * answer); + + if ( answer <= 5.0 * (m/2) ) + return m * log(((double)m) / V); + + else if ( answer <= (pow(2, 64) / 30) ) + return answer; + + else + return -pow(2, 64) * log(1 - (answer / pow(2, 64))); + } + +void CardinalityCounter::Merge(CardinalityCounter* c) + { + uint8_t* temp = c->GetBuckets(); + + V = 0; + + for ( unsigned int i = 0; i < m; i++ ) + { + if ( temp[i] > buckets[i] ) + buckets[i] = temp[i]; + + if ( buckets[i] == 0 ) + ++V; + } + } + +uint8_t* CardinalityCounter::GetBuckets() + { + return buckets; + } + +uint64 CardinalityCounter::GetM() + { + return m; + } + +bool CardinalityCounter::Serialize(SerialInfo* info) const + { + bool valid = true; + + valid &= SERIALIZE(m); + valid &= SERIALIZE(V); + valid &= SERIALIZE(alpha_m); + + for ( unsigned int i = 0; i < m; i++ ) + valid &= SERIALIZE(buckets[i]); + + return valid; + } + +CardinalityCounter* CardinalityCounter::Unserialize(UnserialInfo* info) + { + uint64_t m; + uint64 V; + double alpha_m; + + bool valid = true; + valid &= UNSERIALIZE(&m); + valid &= UNSERIALIZE(&V); + valid &= UNSERIALIZE(&alpha_m); + + CardinalityCounter* c = new CardinalityCounter(m, V, alpha_m); + + uint8_t* buckets = c->buckets; + + for ( unsigned int i = 0; i < m; i++ ) + { + uint8_t* currbucket = buckets + 
i; + valid &= UNSERIALIZE(currbucket); + } + + return valid ? c : 0; + } diff --git a/src/probabilistic/CardinalityCounter.h b/src/probabilistic/CardinalityCounter.h new file mode 100644 index 0000000000..2707c53808 --- /dev/null +++ b/src/probabilistic/CardinalityCounter.h @@ -0,0 +1,168 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef PROBABILISTIC_CARDINALITYCOUNTER_H +#define PROBABILISTIC_CARDINALITYCOUNTER_H + +#include +#include + +namespace probabilistic { + +/** + * A probabilistic cardinality counter using the HyperLogLog algorithm. + * + * TODO: Update doc string. + */ +class CardinalityCounter { +public: + /** + * Constructor. + * + * Based on the error_margin, the number of buckets that need to be + * kept will be determined. Based on the max_size, the number of bits + * that will be used from the hash function will be determined. + * + * We need the hash function to return integers that are uniformly + * distributed from 0 to 2^L-1. And if that happens, the maximum + * cardinality that this counter can handle is approximately 2^L. By + * default, we will assume a value of 64 bits. + * + * The confidence expresses how likely it is that the actual + * cardinality lies within the error margin of the estimate. + * + * In other words, if the cardinality is estimated to be 100 with a 2% + * error margin and a confidence of 0.95, then we are 95% sure that the + * actual cardinality is between 98 and 102. + */ + CardinalityCounter(double error_margin, double confidence = 0.95); + + /** + * Constructor used for cloning. + * + * The error margin will be 1.04/sqrt(m) with approximately 68% + * probability. + */ + CardinalityCounter(uint64 size); + + /** + * Deletes the class variables. + */ + ~CardinalityCounter(); + + /** + * This will add an element to the counter. It's responsible for + * adding an element and updating the value of V, if that applies. + */ + void AddElement(uint64 hash); + + /** + * Returns the size estimate of the set. First, it has the "raw" + * HyperLogLog estimate. And then, we check if it's too "large" or + * "small" because the raw estimate doesn't do well in those cases. + * Thus, we correct for those errors as specified in the paper. + */ + double Size(); + + /** + * Returns the buckets array that holds all of the rough cardinality + * estimates. + */ + uint8_t* GetBuckets(); + + /** + * Merges the argument cardinality counter with this one. The error + * margins are assumed to be the same, so they have the same number of + * buckets. If any of the conditions are violated, then the return + * value of size() is meaningless. + */ + void Merge(CardinalityCounter* c); + + /** + * Returns the value of m. Should be used only for statistical + * purposes. + */ + uint64 GetM(); + + /** + * Serializes the cardinality counter. + * + * @param info The serialization information to use. + * + * @return True if successful. + */ + bool Serialize(SerialInfo* info) const; + + /** + * Unserializes a cardinality counter. + * + * @param info The serialization information to use. + * + * @return The unserialized cardinality counter, or null if an error + * occurred. + */ + static CardinalityCounter* Unserialize(UnserialInfo* info); + +private: + /** + * Constructor used when unserializing, i.e., all parameters are + * known. + */ + CardinalityCounter(uint64 size, uint64 V, double alpha_m); + + /** + * Helper function with code used jointly by multiple constructors. 
+ */ + void Init(uint64 arg_size); + + /** + * This function will calculate the smallest value of b that will + * satisfy the constraints of a specified error margin and + * confidence level. + * + * The exact expression for b is as follows: + * Define x = 2*(log(1.04*k/error)/log(2)). Then b is the ceiling of x + * + * error is the error margin. + * + * k is the number of standard deviations that we have to go to have + * a confidence level of conf. + * + * confidence: TODO. + */ + int OptimalB(double error, double confidence); + + /** + * Computes when the first one appears in the element. It looks at the + * bitstring from the end though. A precondition is that the argument + * is already divisible by m, so we just ignore the last b bits, since + * m = 2^b and the last b bits will always be 0. + */ + uint8_t Rank(uint64 hash_modified); + + /** + * This is the number of buckets that will be stored. The standard + * error is 1.04/sqrt(m), so the actual cardinality will be the + * estimate +/- 1.04/sqrt(m) with approximately 68% probability. + */ + uint64 m; + + /** + * These are the actual buckets that are storing an estimate of the + * cardinality. All these need to do is count when the first 1 bit + * appears in the bitstring and that location is at most 65, so not + * that many bits are needed to store it. + */ + uint8_t* buckets; + + /** + * There are some state constants that need to be kept track of to + * make the final estimate easier. V is the number of values in + * buckets that are 0 and this is used in the small error correction. + * alpha_m is a multiplicative constant used in the algorithm. + */ + uint64 V; + double alpha_m; +}; + +} + +#endif diff --git a/src/probabilistic/cardinality-counter.bif b/src/probabilistic/cardinality-counter.bif new file mode 100644 index 0000000000..4d5316c9a3 --- /dev/null +++ b/src/probabilistic/cardinality-counter.bif @@ -0,0 +1,139 @@ +# =========================================================================== +# +# HyperLogLog Functions +# +# =========================================================================== + + +%%{ +#include "probabilistic/CardinalityCounter.h" + +using namespace probabilistic; +%%} + +module GLOBAL; + +## Initializes a probabilistic cardinality counter that uses the HyperLogLog algorithm. +## +## err: the desired error rate (e.g. 0.01). +## +## confidence: the desired confidence for the error rate (e.g., 0.95). +## +## Returns: a HLL cardinality handle. +## +## .. bro:see:: hll_cardinality_estimate hll_cardinality_merge_into hll_cardinality_add +## hll_cardinality_copy +function hll_cardinality_init%(err: double, confidence: double%): opaque of cardinality + %{ + CardinalityCounter* c = new CardinalityCounter(err, confidence); + CardinalityVal* cv = new CardinalityVal(c); + + return cv; + %} + +## Adds an element to a HyperLogLog cardinality counter. +## +## handle: the HLL handle. +## +## elem: the element to add +## +## Returns: true on success +## +## .. bro:see:: hll_cardinality_estimate hll_cardinality_merge_into +## hll_cardinality_init hll_cardinality_copy +function hll_cardinality_add%(handle: opaque of cardinality, elem: any%): bool + %{ + CardinalityVal* cv = static_cast<CardinalityVal*>(handle); + + if ( ! cv->Type() && ! cv->Typify(elem->Type()) ) + { + reporter->Error("failed to set HLL type"); + return new Val(0, TYPE_BOOL); + } + + else if ( ! 
same_type(cv->Type(), elem->Type()) ) + { + reporter->Error("incompatible HLL data type"); + return new Val(0, TYPE_BOOL); + } + + cv->Add(elem); + return new Val(1, TYPE_BOOL); + %} + +## Merges a HLL cardinality counter into another. +## +## .. note:: The same restrictions as for Bloom filter merging apply, see +## :bro:id:`bloomfilter_merge`. +## +## handle1: the first HLL handle, which will contain the merged result +## +## handle2: the second HLL handle, which will be merged into the first +## +## Returns: true on success +## +## .. bro:see:: hll_cardinality_estimate hll_cardinality_add +## hll_cardinality_init hll_cardinality_copy +function hll_cardinality_merge_into%(handle1: opaque of cardinality, handle2: opaque of cardinality%): bool + %{ + CardinalityVal* v1 = static_cast<CardinalityVal*>(handle1); + CardinalityVal* v2 = static_cast<CardinalityVal*>(handle2); + + if ( (v1->Type() != v2->Type()) && // both 0 is ok + (v1->Type() != 0) && // any one 0 also is ok + (v2->Type() != 0) && + ! same_type(v1->Type(), v2->Type()) ) + { + reporter->Error("incompatible HLL types"); + return new Val(0, TYPE_BOOL); + } + + CardinalityCounter* h1 = v1->Get(); + CardinalityCounter* h2 = v2->Get(); + + h1->Merge(h2); + + return new Val(1, TYPE_BOOL); + %} + +## Estimate the current cardinality of an HLL cardinality counter. + ## +## handle: the HLL handle +## +## Returns: the cardinality estimate. Returns 0.0 if the counter is empty. +## +## .. bro:see:: hll_cardinality_merge_into hll_cardinality_add +## hll_cardinality_init hll_cardinality_copy +function hll_cardinality_estimate%(handle: opaque of cardinality%): double + %{ + CardinalityVal* cv = static_cast<CardinalityVal*>(handle); + CardinalityCounter* h = cv->Get(); + + double estimate = h->Size(); + + return new Val(estimate, TYPE_DOUBLE); + %} + +## Copy a HLL cardinality counter. +## +## handle: cardinality counter to copy +## +## Returns: copy of handle +## +## .. 
bro:see:: hll_cardinality_estimate hll_cardinality_merge_into hll_cardinality_add +## hll_cardinality_init +function hll_cardinality_copy%(handle: opaque of cardinality%): opaque of cardinality + %{ + CardinalityVal* cv = static_cast<CardinalityVal*>(handle); + CardinalityCounter* h = cv->Get(); + + uint64_t m = h->GetM(); + CardinalityCounter* h2 = new CardinalityCounter(m); + + int i = 0; + h2->Merge(h); + CardinalityVal* out = new CardinalityVal(h2); + + return out; + %} + diff --git a/testing/btest/Baseline/bifs.hll_cardinality/.stderr b/testing/btest/Baseline/bifs.hll_cardinality/.stderr new file mode 100644 index 0000000000..840ee3363e --- /dev/null +++ b/testing/btest/Baseline/bifs.hll_cardinality/.stderr @@ -0,0 +1 @@ +error: incompatible HLL data type diff --git a/testing/btest/Baseline/bifs.hll_cardinality/out b/testing/btest/Baseline/bifs.hll_cardinality/out new file mode 100644 index 0000000000..8d20248cc3 --- /dev/null +++ b/testing/btest/Baseline/bifs.hll_cardinality/out @@ -0,0 +1,23 @@ +This value should be around 13: +13.00129 +This value should be about 12: +12.001099 +This value should be around 0: +0.0 +This value should be around 13: +13.00129 +This value should be 0: +0.0 +This value should be true: +T +This value should be about 12: +12.001099 +12.001099 +This value should be true: +T +This value should be about 21: +21.003365 +This value should be about 13: +13.00129 +This value should be about 12: +12.001099 diff --git a/testing/btest/Baseline/core.leaks.hll_cluster/manager-1..stdout b/testing/btest/Baseline/core.leaks.hll_cluster/manager-1..stdout new file mode 100644 index 0000000000..910a87642c --- /dev/null +++ b/testing/btest/Baseline/core.leaks.hll_cluster/manager-1..stdout @@ -0,0 +1,2 @@ +This value should be about 21: +21.003365 diff --git a/testing/btest/Baseline/core.leaks.hll_cluster/worker-1..stdout b/testing/btest/Baseline/core.leaks.hll_cluster/worker-1..stdout new file mode 100644 index 0000000000..e64c2b30aa --- /dev/null +++ b/testing/btest/Baseline/core.leaks.hll_cluster/worker-1..stdout @@ -0,0 +1,2 @@ +This value should be around 13: +13.00129 diff --git a/testing/btest/Baseline/core.leaks.hll_cluster/worker-2..stdout b/testing/btest/Baseline/core.leaks.hll_cluster/worker-2..stdout new file mode 100644 index 0000000000..d2b4f08b8d --- /dev/null +++ b/testing/btest/Baseline/core.leaks.hll_cluster/worker-2..stdout @@ -0,0 +1,2 @@ +This value should be about 12: +12.001099 diff --git a/testing/btest/Baseline/istate.hll/out b/testing/btest/Baseline/istate.hll/out new file mode 100644 index 0000000000..f5bb99d960 --- /dev/null +++ b/testing/btest/Baseline/istate.hll/out @@ -0,0 +1,6 @@ +1 +10.000763 +2 +10.000763 +3 +11.000923 diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic-cluster/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic-cluster/manager-1..stdout index ea8904d2e6..ab25d52947 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic-cluster/manager-1..stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic-cluster/manager-1..stdout @@ -1,4 +1,4 @@ -Host: 6.5.4.3 - num:2 - sum:6.0 - avg:3.0 - max:5.0 - min:1.0 - var:8.0 - std_dev:2.8 - unique:2 -Host: 10.10.10.10 - num:1 - sum:5.0 - avg:5.0 - max:5.0 - min:5.0 - var:0.0 - std_dev:0.0 - unique:1 -Host: 1.2.3.4 - num:9 - sum:437.0 - avg:48.6 - max:95.0 - min:3.0 - var:758.8 - std_dev:27.5 - unique:8 -Host: 7.2.1.5 - num:2 - sum:145.0 - avg:72.5 - max:91.0 - min:54.0 - var:684.5 - std_dev:26.2 - unique:2 +Host: 
6.5.4.3 - num:2 - sum:6.0 - avg:3.0 - max:5.0 - min:1.0 - var:8.0 - std_dev:2.8 - unique:2 - hllunique:2 +Host: 10.10.10.10 - num:1 - sum:5.0 - avg:5.0 - max:5.0 - min:5.0 - var:0.0 - std_dev:0.0 - unique:1 - hllunique:1 +Host: 1.2.3.4 - num:9 - sum:437.0 - avg:48.6 - max:95.0 - min:3.0 - var:758.8 - std_dev:27.5 - unique:8 - hllunique:8 +Host: 7.2.1.5 - num:2 - sum:145.0 - avg:72.5 - max:91.0 - min:54.0 - var:684.5 - std_dev:26.2 - unique:2 - hllunique:2 diff --git a/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic/.stdout b/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic/.stdout index 208b6103b7..0ada495cfc 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic/.stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.sumstats.basic/.stdout @@ -1,3 +1,3 @@ -Host: 6.5.4.3 - num:1 - sum:2.0 - var:0.0 - avg:2.0 - max:2.0 - min:2.0 - std_dev:0.0 - unique:1 -Host: 1.2.3.4 - num:5 - sum:221.0 - var:1144.2 - avg:44.2 - max:94.0 - min:5.0 - std_dev:33.8 - unique:4 -Host: 7.2.1.5 - num:1 - sum:1.0 - var:0.0 - avg:1.0 - max:1.0 - min:1.0 - std_dev:0.0 - unique:1 +Host: 6.5.4.3 - num:1 - sum:2.0 - var:0.0 - avg:2.0 - max:2.0 - min:2.0 - std_dev:0.0 - unique:1 - hllunique:1 +Host: 1.2.3.4 - num:5 - sum:221.0 - var:1144.2 - avg:44.2 - max:94.0 - min:5.0 - std_dev:33.8 - unique:4 - hllunique:4 +Host: 7.2.1.5 - num:1 - sum:1.0 - var:0.0 - avg:1.0 - max:1.0 - min:1.0 - std_dev:0.0 - unique:1 - hllunique:1 diff --git a/testing/btest/bifs/hll_cardinality.bro b/testing/btest/bifs/hll_cardinality.bro new file mode 100644 index 0000000000..d1b0807416 --- /dev/null +++ b/testing/btest/bifs/hll_cardinality.bro @@ -0,0 +1,83 @@ +# +# @TEST-EXEC: bro %INPUT>out +# @TEST-EXEC: btest-diff out +# @TEST-EXEC: btest-diff .stderr + +event bro_init() + { + local c1 = hll_cardinality_init(0.01, 0.95); + local c2 = hll_cardinality_init(0.01, 0.95); + + local add1 = 2001; + local add2 = 2002; + local add3 = 2003; + + hll_cardinality_add(c1, add1); + hll_cardinality_add(c1, add2); + hll_cardinality_add(c1, add3); + hll_cardinality_add(c1, 1000); + hll_cardinality_add(c1, 1001); + hll_cardinality_add(c1, 101); + hll_cardinality_add(c1, 1003); + hll_cardinality_add(c1, 1004); + hll_cardinality_add(c1, 1005); + hll_cardinality_add(c1, 1006); + hll_cardinality_add(c1, 1007); + hll_cardinality_add(c1, 1008); + hll_cardinality_add(c1, 1009); + + hll_cardinality_add(c2, add1); + hll_cardinality_add(c2, add2); + hll_cardinality_add(c2, add3); + hll_cardinality_add(c2, 1); + hll_cardinality_add(c2, "b"); + hll_cardinality_add(c2, 101); + hll_cardinality_add(c2, 2); + hll_cardinality_add(c2, 3); + hll_cardinality_add(c2, 4); + hll_cardinality_add(c2, 5); + hll_cardinality_add(c2, 6); + hll_cardinality_add(c2, 7); + hll_cardinality_add(c2, 8); + + print "This value should be around 13:"; + print hll_cardinality_estimate(c1); + + print "This value should be about 12:"; + print hll_cardinality_estimate(c2); + + local m2 = hll_cardinality_init(0.02, 0.95); + + print "This value should be around 0:"; + print hll_cardinality_estimate(m2); + + local c3 = hll_cardinality_copy(c1); + + print "This value should be around 13:"; + print hll_cardinality_estimate(c3); + + c3 = hll_cardinality_init(0.01, 0.95); + print "This value should be 0:"; + print hll_cardinality_estimate(c3); + + print "This value should be true:"; + print hll_cardinality_merge_into(c3, c2); + + print "This value should be about 12:"; + print hll_cardinality_estimate(c2); + print hll_cardinality_estimate(c3); + + print "This 
value should be true:"; + print hll_cardinality_merge_into(c2, c1); + + print "This value should be about 21:"; + print hll_cardinality_estimate(c2); + + print "This value should be about 13:"; + print hll_cardinality_estimate(c1); + + print "This value should be about 12:"; + print hll_cardinality_estimate(c3); + + } + diff --git a/testing/btest/core/leaks/hll_cluster.bro b/testing/btest/core/leaks/hll_cluster.bro new file mode 100644 index 0000000000..65fe8da447 --- /dev/null +++ b/testing/btest/core/leaks/hll_cluster.bro @@ -0,0 +1,111 @@ +# Needs perftools support. +# +# @TEST-SERIALIZE: comm +# @TEST-GROUP: leaks +# +# @TEST-REQUIRES: bro --help 2>&1 | grep -q mem-leaks +# +# @TEST-EXEC: bro %INPUT>out +# @TEST-EXEC: btest-bg-run manager-1 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT +# @TEST-EXEC: sleep 2 +# @TEST-EXEC: btest-bg-run worker-1 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro runnumber=1 %INPUT +# @TEST-EXEC: btest-bg-run worker-2 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro runnumber=2 %INPUT +# @TEST-EXEC: btest-bg-wait -k 10 +# +# @TEST-EXEC: btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff worker-1/.stdout +# @TEST-EXEC: btest-diff worker-2/.stdout + +@TEST-START-FILE cluster-layout.bro +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1", "worker-2")], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1"], +}; +@TEST-END-FILE + +redef Log::default_rotation_interval = 0secs; + +global hll_data: event(data: opaque of cardinality); + +redef Cluster::worker2manager_events += /hll_data/; + +@if ( Cluster::local_node_type() == Cluster::WORKER ) + +global runnumber: count &redef; # differentiate runs + +event remote_connection_handshake_done(p: event_peer) + { + local c = hll_cardinality_init(0.01, 0.95); + + local add1 = 2001; + local add2 = 2002; + local add3 = 2003; + + if ( runnumber == 1 ) + { + hll_cardinality_add(c, add1); + hll_cardinality_add(c, add2); + hll_cardinality_add(c, add3); + hll_cardinality_add(c, 1000); + hll_cardinality_add(c, 1001); + hll_cardinality_add(c, 101); + hll_cardinality_add(c, 1003); + hll_cardinality_add(c, 1004); + hll_cardinality_add(c, 1005); + hll_cardinality_add(c, 1006); + hll_cardinality_add(c, 1007); + hll_cardinality_add(c, 1008); + hll_cardinality_add(c, 1009); + print "This value should be around 13:"; + print hll_cardinality_estimate(c); + } + else if ( runnumber == 2 ) + { + hll_cardinality_add(c, add1); + hll_cardinality_add(c, add2); + hll_cardinality_add(c, add3); + hll_cardinality_add(c, 1); + hll_cardinality_add(c, 101); + hll_cardinality_add(c, 2); + hll_cardinality_add(c, 3); + hll_cardinality_add(c, 4); + hll_cardinality_add(c, 5); + hll_cardinality_add(c, 6); + hll_cardinality_add(c, 7); + hll_cardinality_add(c, 8); + print "This value should be about 12:"; + print hll_cardinality_estimate(c); + } + + event hll_data(c); + + terminate(); + } + +@endif + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +global result_count = 0; +global hll: opaque of cardinality; + +event bro_init() + { + hll = hll_cardinality_init(0.01, 0.95); + } + +event hll_data(data: opaque of cardinality) + { + hll_cardinality_merge_into(hll, data); + ++result_count; + + if ( 
result_count == 2 ) + { + print "This value should be about 21:"; + print hll_cardinality_estimate(hll); + terminate(); + } + } + +@endif diff --git a/testing/btest/istate/hll.bro b/testing/btest/istate/hll.bro new file mode 100644 index 0000000000..511a892644 --- /dev/null +++ b/testing/btest/istate/hll.bro @@ -0,0 +1,40 @@ +# @TEST-EXEC: bro -b %INPUT runnumber=1 >out +# @TEST-EXEC: bro -b %INPUT runnumber=2 >>out +# @TEST-EXEC: bro -b %INPUT runnumber=3 >>out +# @TEST-EXEC: btest-diff out + +global runnumber: count &redef; # differentiate runs + +global card: opaque of cardinality &persistent; + +event bro_init() + { + print runnumber; + + if ( runnumber == 1 ) + { + card = hll_cardinality_init(0.01, 0.95); + + hll_cardinality_add(card, "a"); + hll_cardinality_add(card, "b"); + hll_cardinality_add(card, "c"); + hll_cardinality_add(card, "d"); + hll_cardinality_add(card, "e"); + hll_cardinality_add(card, "f"); + hll_cardinality_add(card, "g"); + hll_cardinality_add(card, "h"); + hll_cardinality_add(card, "i"); + hll_cardinality_add(card, "j"); + } + + print hll_cardinality_estimate(card); + + if ( runnumber == 2 ) + { + hll_cardinality_add(card, "a"); + hll_cardinality_add(card, "b"); + hll_cardinality_add(card, "c"); + hll_cardinality_add(card, "aa"); + } + } + diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro index 2206673c3c..e83096919a 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic-cluster.bro @@ -22,14 +22,14 @@ global n = 0; event bro_init() &priority=5 { - local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE)]; + local r1: SumStats::Reducer = [$stream="test", $apply=set(SumStats::SUM, SumStats::MIN, SumStats::MAX, SumStats::AVERAGE, SumStats::STD_DEV, SumStats::VARIANCE, SumStats::UNIQUE, SumStats::HLL_UNIQUE)]; SumStats::create([$name="test", $epoch=5secs, $reducers=set(r1), $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = { local r = result["test"]; - print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique); + print fmt("Host: %s - num:%d - sum:%.1f - avg:%.1f - max:%.1f - min:%.1f - var:%.1f - std_dev:%.1f - unique:%d - hllunique:%d", key$host, r$num, r$sum, r$average, r$max, r$min, r$variance, r$std_dev, r$unique, r$hll_unique); }, $epoch_finished(ts: time) = { diff --git a/testing/btest/scripts/base/frameworks/sumstats/basic.bro b/testing/btest/scripts/base/frameworks/sumstats/basic.bro index 906d69a6f3..d8738b71b7 100644 --- a/testing/btest/scripts/base/frameworks/sumstats/basic.bro +++ b/testing/btest/scripts/base/frameworks/sumstats/basic.bro @@ -10,14 +10,15 @@ event bro_init() &priority=5 SumStats::MAX, SumStats::MIN, SumStats::STD_DEV, - SumStats::UNIQUE)]; + SumStats::UNIQUE, + SumStats::HLL_UNIQUE)]; SumStats::create([$name="test", $epoch=3secs, $reducers=set(r1), $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = { local r = result["test.metric"]; - print fmt("Host: %s - 
num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique); + print fmt("Host: %s - num:%d - sum:%.1f - var:%.1f - avg:%.1f - max:%.1f - min:%.1f - std_dev:%.1f - unique:%d - hllunique:%d", key$host, r$num, r$sum, r$variance, r$average, r$max, r$min, r$std_dev, r$unique, r$hll_unique); } ]);
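
For script authors, a minimal usage sketch of the new HLL_UNIQUE sumstats calculation. The stream name "test.hll", the key choice, and the one-minute epoch are illustrative assumptions; the reducer fields hll_error_margin/hll_confidence and the result field hll_unique come from hll_unique.bro above, and SumStats::observe() is the standard sumstats entry point.

@load base/frameworks/sumstats

event bro_init()
	{
	# Track an approximate number of distinct values per key using the
	# HyperLogLog-based HLL_UNIQUE calculation added by this patch.
	# hll_error_margin and hll_confidence are the optional Reducer fields
	# defined in hll_unique.bro (defaults: 0.01 and 0.95).
	local r1: SumStats::Reducer = [$stream="test.hll",
	                               $apply=set(SumStats::HLL_UNIQUE),
	                               $hll_error_margin=0.02,
	                               $hll_confidence=0.95];

	SumStats::create([$name="hll-example", $epoch=1min, $reducers=set(r1),
	                  $epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) =
	                  	{
	                  	local r = result["test.hll"];
	                  	# hll_unique holds the estimated number of distinct observations.
	                  	print fmt("%s saw roughly %d distinct values", key$host, r$hll_unique);
	                  	}]);
	}

event connection_established(c: connection)
	{
	# One observation per connection; duplicates are absorbed by the counter.
	SumStats::observe("test.hll", [$host=c$id$orig_h], [$str=fmt("%s", c$id$resp_p)]);
	}

Unlike SumStats::UNIQUE, which remembers every distinct value it has seen, HLL_UNIQUE keeps only a fixed-size register array per key, so memory stays bounded no matter how many distinct observations arrive, at the cost of the configured error margin.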