mirror of
https://github.com/zeek/zeek.git
synced 2025-10-05 16:18:19 +00:00
make benchmark reader hartbeat inverval aware
fix small memleak on tablereader destruction make timespread better configureable
This commit is contained in:
parent
b47620e501
commit
1170a87769
6 changed files with 34 additions and 12 deletions
|
@ -19,5 +19,5 @@ export {
|
||||||
const stopspreadat = 0 &redef;
|
const stopspreadat = 0 &redef;
|
||||||
|
|
||||||
## 1 -> enable timed spreading
|
## 1 -> enable timed spreading
|
||||||
const timedspread = 0 &redef;
|
const timedspread = 0.0 &redef;
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,4 +52,4 @@ const spread: count;
|
||||||
const autospread: double;
|
const autospread: double;
|
||||||
const addfactor: count;
|
const addfactor: count;
|
||||||
const stopspreadat: count;
|
const stopspreadat: count;
|
||||||
const timedspread: count;
|
const timedspread: double;
|
||||||
|
|
|
@ -133,11 +133,15 @@ Manager::TableFilter::~TableFilter() {
|
||||||
if ( rtype ) // can be 0 for sets
|
if ( rtype ) // can be 0 for sets
|
||||||
Unref(rtype);
|
Unref(rtype);
|
||||||
|
|
||||||
if ( currDict != 0 )
|
if ( currDict != 0 ) {
|
||||||
|
currDict->Clear();
|
||||||
delete currDict;
|
delete currDict;
|
||||||
|
}
|
||||||
|
|
||||||
if ( lastDict != 0 )
|
if ( lastDict != 0 ) {
|
||||||
|
lastDict->Clear();;
|
||||||
delete lastDict;
|
delete lastDict;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ReaderDefinition {
|
struct ReaderDefinition {
|
||||||
|
@ -898,6 +902,7 @@ int Manager::SendEntryTable(Filter* i, const Value* const *vals) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//i->tab->Assign(idxval, valval);
|
//i->tab->Assign(idxval, valval);
|
||||||
|
assert(idxval);
|
||||||
HashKey* k = filter->tab->ComputeHash(idxval);
|
HashKey* k = filter->tab->ComputeHash(idxval);
|
||||||
if ( !k ) {
|
if ( !k ) {
|
||||||
reporter->InternalError("could not hash");
|
reporter->InternalError("could not hash");
|
||||||
|
@ -1067,8 +1072,6 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const *vals) {
|
int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const *vals) {
|
||||||
bool updated = false;
|
|
||||||
|
|
||||||
assert(i);
|
assert(i);
|
||||||
|
|
||||||
assert(i->filter_type == EVENT_FILTER);
|
assert(i->filter_type == EVENT_FILTER);
|
||||||
|
|
|
@ -13,6 +13,8 @@
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "../../threading/Manager.h"
|
||||||
|
|
||||||
using namespace input::reader;
|
using namespace input::reader;
|
||||||
using threading::Value;
|
using threading::Value;
|
||||||
using threading::Field;
|
using threading::Field;
|
||||||
|
@ -27,7 +29,7 @@ Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend)
|
||||||
add = int(BifConst::InputBenchmark::addfactor);
|
add = int(BifConst::InputBenchmark::addfactor);
|
||||||
autospread_time = 0;
|
autospread_time = 0;
|
||||||
stopspreadat = int(BifConst::InputBenchmark::stopspreadat);
|
stopspreadat = int(BifConst::InputBenchmark::stopspreadat);
|
||||||
timedspread = int(BifConst::InputBenchmark::timedspread);
|
timedspread = double(BifConst::InputBenchmark::timedspread);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +89,8 @@ double Benchmark::CurrTime() {
|
||||||
|
|
||||||
// read the entire file and send appropriate thingies back to InputMgr
|
// read the entire file and send appropriate thingies back to InputMgr
|
||||||
bool Benchmark::DoUpdate() {
|
bool Benchmark::DoUpdate() {
|
||||||
for ( int i = 0; i < num_lines; i++ ) {
|
int linestosend = num_lines * threading::Manager::HEART_BEAT_INTERVAL;
|
||||||
|
for ( int i = 0; i < linestosend; i++ ) {
|
||||||
Value** field = new Value*[num_fields];
|
Value** field = new Value*[num_fields];
|
||||||
for (unsigned int j = 0; j < num_fields; j++ ) {
|
for (unsigned int j = 0; j < num_fields; j++ ) {
|
||||||
field[j] = EntryToVal(fields[j]->type, fields[j]->subtype);
|
field[j] = EntryToVal(fields[j]->type, fields[j]->subtype);
|
||||||
|
@ -108,12 +111,13 @@ bool Benchmark::DoUpdate() {
|
||||||
usleep( autospread_time );
|
usleep( autospread_time );
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( timedspread == 1 ) {
|
if ( timedspread != 0.0 ) {
|
||||||
double diff;
|
double diff;
|
||||||
do {
|
do {
|
||||||
diff = CurrTime() - heartbeatstarttime;
|
diff = CurrTime() - heartbeatstarttime;
|
||||||
//printf("%d %f\n", i, diff);
|
//printf("%d %f\n", i, diff);
|
||||||
} while ( diff < i/(num_lines + (num_lines * 0.15) ) );
|
//} while ( diff < i/threading::Manager::HEART_BEAT_INTERVAL*(num_lines + (num_lines * timedspread) ) );
|
||||||
|
} while ( diff/threading::Manager::HEART_BEAT_INTERVAL < i/(linestosend + (linestosend * timedspread) ) );
|
||||||
//} while ( diff < 0.8);
|
//} while ( diff < 0.8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,6 +230,15 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) {
|
||||||
|
|
||||||
bool Benchmark::DoHeartbeat(double network_time, double current_time)
|
bool Benchmark::DoHeartbeat(double network_time, double current_time)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* This does not work the way I envisioned it, because the queueing is the problem.
|
||||||
|
printf("%f\n", CurrTime() - current_time);
|
||||||
|
if ( CurrTime() - current_time > 0.25 ) {
|
||||||
|
// event has hung for a time. refuse.
|
||||||
|
SendEvent("EndBenchmark", 0, 0);
|
||||||
|
return true;
|
||||||
|
} */
|
||||||
|
|
||||||
ReaderBackend::DoHeartbeat(network_time, current_time);
|
ReaderBackend::DoHeartbeat(network_time, current_time);
|
||||||
num_lines = (int) ( (double) num_lines*multiplication_factor);
|
num_lines = (int) ( (double) num_lines*multiplication_factor);
|
||||||
num_lines += add;
|
num_lines += add;
|
||||||
|
|
|
@ -45,7 +45,7 @@ private:
|
||||||
int add;
|
int add;
|
||||||
int stopspreadat;
|
int stopspreadat;
|
||||||
double heartbeatstarttime;
|
double heartbeatstarttime;
|
||||||
int timedspread;
|
double timedspread;
|
||||||
|
|
||||||
string RandomString(const int len);
|
string RandomString(const int len);
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,10 @@
|
||||||
#include "BasicThread.h"
|
#include "BasicThread.h"
|
||||||
#include "MsgThread.h"
|
#include "MsgThread.h"
|
||||||
|
|
||||||
|
namespace input { namespace reader {
|
||||||
|
class Benchmark;
|
||||||
|
}}
|
||||||
|
|
||||||
namespace threading {
|
namespace threading {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -80,6 +84,7 @@ public:
|
||||||
protected:
|
protected:
|
||||||
friend class BasicThread;
|
friend class BasicThread;
|
||||||
friend class MsgThread;
|
friend class MsgThread;
|
||||||
|
friend class input::reader::Benchmark; // needs heartbeat
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers a new basic thread with the manager. This is
|
* Registers a new basic thread with the manager. This is
|
||||||
|
@ -119,8 +124,9 @@ protected:
|
||||||
*/
|
*/
|
||||||
virtual const char* Tag() { return "threading::Manager"; }
|
virtual const char* Tag() { return "threading::Manager"; }
|
||||||
|
|
||||||
|
static const int HEART_BEAT_INTERVAL = 10;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int HEART_BEAT_INTERVAL = 1;
|
|
||||||
|
|
||||||
typedef std::list<BasicThread*> all_thread_list;
|
typedef std::list<BasicThread*> all_thread_list;
|
||||||
all_thread_list all_threads;
|
all_thread_list all_threads;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue