fix memory leak for tables... nearly completely.

There is still a tiny where I have not yet found where the delete could be missing.

For big table imports the memory footprint is significant nevertheless -- with tables of > 200000 entries, memory consumption can apparently reach in excess of 1.5Gb - and on a first glance this seems legitimate.

(The reason for this is probably that we use several hash tables to keep the performance impact small).
This commit is contained in:
Bernhard Amann 2012-03-29 01:09:11 -07:00
parent b7bbda7244
commit 8e526a7f83

View file

@ -34,6 +34,11 @@ InputHash::~InputHash() {
delete idxkey; delete idxkey;
} }
static void input_hash_delete_func(void* val) {
InputHash* h = (InputHash*) val;
delete h;
}
declare(PDict, InputHash); declare(PDict, InputHash);
class Manager::Filter { class Manager::Filter {
@ -170,6 +175,14 @@ Manager::Manager()
{ {
} }
Manager::~Manager() {
for ( map<ReaderFrontend*, Filter*>::iterator s = readers.begin(); s != readers.end(); ++s ) {
delete s->second;
delete s->first;
}
}
ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) { ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) {
ReaderDefinition* ir = input_readers; ReaderDefinition* ir = input_readers;
@ -527,7 +540,9 @@ bool Manager::CreateTableStream(RecordVal* fval) {
filter->itype = idx->AsRecordType(); filter->itype = idx->AsRecordType();
filter->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0; filter->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0;
filter->currDict = new PDict(InputHash); filter->currDict = new PDict(InputHash);
filter->currDict->SetDeleteFunc(input_hash_delete_func);
filter->lastDict = new PDict(InputHash); filter->lastDict = new PDict(InputHash);
filter->lastDict->SetDeleteFunc(input_hash_delete_func);
filter->want_record = ( want_record->InternalInt() == 1 ); filter->want_record = ( want_record->InternalInt() == 1 );
Unref(want_record); // ref'd by lookupwithdefault Unref(want_record); // ref'd by lookupwithdefault
@ -820,20 +835,20 @@ int Manager::SendEntryTable(Filter* i, const Value* const *vals) {
} }
} }
InputHash *h = filter->lastDict->Lookup(idxhash); InputHash *h = filter->lastDict->Lookup(idxhash);
if ( h != 0 ) { if ( h != 0 ) {
// seen before // seen before
if ( filter->num_val_fields == 0 || h->valhash == valhash ) { if ( filter->num_val_fields == 0 || h->valhash == valhash ) {
// ok, exact duplicate // ok, exact duplicate, move entry to new dicrionary and do nothing else.
filter->lastDict->Remove(idxhash); filter->lastDict->Remove(idxhash);
filter->currDict->Insert(idxhash, h); filter->currDict->Insert(idxhash, h);
delete idxhash; delete idxhash;
return filter->num_val_fields + filter->num_idx_fields; return filter->num_val_fields + filter->num_idx_fields;
} else { } else {
assert( filter->num_val_fields > 0 ); assert( filter->num_val_fields > 0 );
// updated // entry was updated in some way
filter->lastDict->Remove(idxhash); filter->lastDict->Remove(idxhash);
delete(h); // keep h for predicates
updated = true; updated = true;
} }
@ -881,8 +896,10 @@ int Manager::SendEntryTable(Filter* i, const Value* const *vals) {
Unref(predidx); Unref(predidx);
if ( !updated ) { if ( !updated ) {
// throw away. Hence - we quit. And remove the entry from the current dictionary... // throw away. Hence - we quit. And remove the entry from the current dictionary...
delete(filter->currDict->RemoveEntry(idxhash)); // (but why should it be in there? assert this).
assert ( filter->currDict->RemoveEntry(idxhash) == 0 );
delete idxhash; delete idxhash;
delete h;
return filter->num_val_fields + filter->num_idx_fields; return filter->num_val_fields + filter->num_idx_fields;
} else { } else {
// keep old one // keep old one
@ -893,6 +910,12 @@ int Manager::SendEntryTable(Filter* i, const Value* const *vals) {
} }
} }
// now we don't need h anymore - if we are here, the entry is updated and a new h is created.
if ( h ) {
delete h;
h = 0;
}
Val* idxval; Val* idxval;
@ -1014,6 +1037,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) {
Unref(predidx); Unref(predidx);
Unref(ev); Unref(ev);
filter->currDict->Insert(lastDictIdxKey, filter->lastDict->RemoveEntry(lastDictIdxKey)); filter->currDict->Insert(lastDictIdxKey, filter->lastDict->RemoveEntry(lastDictIdxKey));
delete lastDictIdxKey;
continue; continue;
} }
} }
@ -1030,8 +1054,9 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) {
if ( ev ) if ( ev )
Unref(ev); Unref(ev);
filter->tab->Delete(ih->idxkey); Unref(filter->tab->Delete(ih->idxkey));
filter->lastDict->Remove(lastDictIdxKey); // deletex in next line filter->lastDict->Remove(lastDictIdxKey); // delete in next line
delete lastDictIdxKey;
delete(ih); delete(ih);
} }
@ -1040,6 +1065,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) {
filter->lastDict = filter->currDict; filter->lastDict = filter->currDict;
filter->currDict = new PDict(InputHash); filter->currDict = new PDict(InputHash);
filter->currDict->SetDeleteFunc(input_hash_delete_func);
#ifdef DEBUG #ifdef DEBUG
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event", DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event",
@ -1284,9 +1310,12 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) {
// only if filter = true -> no filtering // only if filter = true -> no filtering
if ( filterresult ) { if ( filterresult ) {
success = ( filter->tab->Delete(idxval) != 0 ); Val* retptr = filter->tab->Delete(idxval);
success = ( retptr != 0 );
if ( !success ) { if ( !success ) {
reporter->Error("Internal error while deleting values from input table"); reporter->Error("Internal error while deleting values from input table");
} else {
Unref(retptr);
} }
} }
} else if ( i->filter_type == EVENT_FILTER ) { } else if ( i->filter_type == EVENT_FILTER ) {