diff --git a/scripts/base/frameworks/input/readers/benchmark.bro b/scripts/base/frameworks/input/readers/benchmark.bro index b5adc70861..fe44914271 100644 --- a/scripts/base/frameworks/input/readers/benchmark.bro +++ b/scripts/base/frameworks/input/readers/benchmark.bro @@ -19,5 +19,5 @@ export { const stopspreadat = 0 &redef; ## 1 -> enable timed spreading - const timedspread = 0 &redef; + const timedspread = 0.0 &redef; } diff --git a/src/input.bif b/src/input.bif index 63cbb2796d..0749ac0287 100644 --- a/src/input.bif +++ b/src/input.bif @@ -52,4 +52,4 @@ const spread: count; const autospread: double; const addfactor: count; const stopspreadat: count; -const timedspread: count; +const timedspread: double; diff --git a/src/input/Manager.cc b/src/input/Manager.cc index f8ad493e11..218a9209ee 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -133,11 +133,15 @@ Manager::TableFilter::~TableFilter() { if ( rtype ) // can be 0 for sets Unref(rtype); - if ( currDict != 0 ) + if ( currDict != 0 ) { + currDict->Clear(); delete currDict; + } - if ( lastDict != 0 ) + if ( lastDict != 0 ) { + lastDict->Clear();; delete lastDict; + } } struct ReaderDefinition { @@ -898,6 +902,7 @@ int Manager::SendEntryTable(Filter* i, const Value* const *vals) { } //i->tab->Assign(idxval, valval); + assert(idxval); HashKey* k = filter->tab->ComputeHash(idxval); if ( !k ) { reporter->InternalError("could not hash"); @@ -1067,8 +1072,6 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) { } int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const *vals) { - bool updated = false; - assert(i); assert(i->filter_type == EVENT_FILTER); diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 391fdd7435..118b57f616 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -13,6 +13,8 @@ #include #include +#include "../../threading/Manager.h" + using namespace input::reader; using threading::Value; using threading::Field; @@ -27,7 +29,7 @@ Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend) add = int(BifConst::InputBenchmark::addfactor); autospread_time = 0; stopspreadat = int(BifConst::InputBenchmark::stopspreadat); - timedspread = int(BifConst::InputBenchmark::timedspread); + timedspread = double(BifConst::InputBenchmark::timedspread); } @@ -87,7 +89,8 @@ double Benchmark::CurrTime() { // read the entire file and send appropriate thingies back to InputMgr bool Benchmark::DoUpdate() { - for ( int i = 0; i < num_lines; i++ ) { + int linestosend = num_lines * threading::Manager::HEART_BEAT_INTERVAL; + for ( int i = 0; i < linestosend; i++ ) { Value** field = new Value*[num_fields]; for (unsigned int j = 0; j < num_fields; j++ ) { field[j] = EntryToVal(fields[j]->type, fields[j]->subtype); @@ -108,12 +111,13 @@ bool Benchmark::DoUpdate() { usleep( autospread_time ); } - if ( timedspread == 1 ) { + if ( timedspread != 0.0 ) { double diff; do { diff = CurrTime() - heartbeatstarttime; //printf("%d %f\n", i, diff); - } while ( diff < i/(num_lines + (num_lines * 0.15) ) ); + //} while ( diff < i/threading::Manager::HEART_BEAT_INTERVAL*(num_lines + (num_lines * timedspread) ) ); + } while ( diff/threading::Manager::HEART_BEAT_INTERVAL < i/(linestosend + (linestosend * timedspread) ) ); //} while ( diff < 0.8); } @@ -226,6 +230,15 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) { bool Benchmark::DoHeartbeat(double network_time, double current_time) { + /* + * This does not work the way I envisioned it, because the queueing is the problem. + printf("%f\n", CurrTime() - current_time); + if ( CurrTime() - current_time > 0.25 ) { + // event has hung for a time. refuse. + SendEvent("EndBenchmark", 0, 0); + return true; + } */ + ReaderBackend::DoHeartbeat(network_time, current_time); num_lines = (int) ( (double) num_lines*multiplication_factor); num_lines += add; diff --git a/src/input/readers/Benchmark.h b/src/input/readers/Benchmark.h index e5dca66889..ca248586da 100644 --- a/src/input/readers/Benchmark.h +++ b/src/input/readers/Benchmark.h @@ -45,7 +45,7 @@ private: int add; int stopspreadat; double heartbeatstarttime; - int timedspread; + double timedspread; string RandomString(const int len); diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 7d9ba766d4..d5d78b288a 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -9,6 +9,10 @@ #include "BasicThread.h" #include "MsgThread.h" +namespace input { namespace reader { + class Benchmark; +}} + namespace threading { /** @@ -80,6 +84,7 @@ public: protected: friend class BasicThread; friend class MsgThread; + friend class input::reader::Benchmark; // needs heartbeat /** * Registers a new basic thread with the manager. This is @@ -118,9 +123,10 @@ protected: * Part of the IOSource interface. */ virtual const char* Tag() { return "threading::Manager"; } + + static const int HEART_BEAT_INTERVAL = 10; private: - static const int HEART_BEAT_INTERVAL = 1; typedef std::list all_thread_list; all_thread_list all_threads;