mirror of
https://github.com/zeek/zeek.git
synced 2025-10-05 08:08:19 +00:00
It works. Even including all unit tests.
But: there are still a few places where I am sure that there are race conditions & memory leaks & I do not really like the current interface & I have to add a few more messages between the front and backend. But - it works :)
This commit is contained in:
parent
4e868d282d
commit
88233efb2c
16 changed files with 127 additions and 49 deletions
|
@ -114,6 +114,8 @@ export {
|
||||||
## filter: the `TableFilter` record describing the filter.
|
## filter: the `TableFilter` record describing the filter.
|
||||||
global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool;
|
global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool;
|
||||||
|
|
||||||
|
global update_finished: event(id: Input::ID);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@load base/input.bif
|
@load base/input.bif
|
||||||
|
|
|
@ -215,7 +215,7 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
||||||
|
|
||||||
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
|
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
|
||||||
|
|
||||||
ReaderFrontend* reader_obj = new ReaderFrontend(id->AsEnum());
|
ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt());
|
||||||
assert(reader_obj);
|
assert(reader_obj);
|
||||||
|
|
||||||
// get the source...
|
// get the source...
|
||||||
|
@ -680,7 +680,7 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Manager::SendEntry(const ReaderFrontend* reader, const int id, const Value* const *vals) {
|
void Manager::SendEntry(const ReaderFrontend* reader, const int id, Value* *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader");
|
||||||
|
@ -692,18 +692,25 @@ void Manager::SendEntry(const ReaderFrontend* reader, const int id, const Value*
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int readFields;
|
||||||
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
||||||
SendEntryTable(reader, id, vals);
|
readFields = SendEntryTable(reader, id, vals);
|
||||||
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
||||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
||||||
SendEventFilterEvent(reader, type, id, vals);
|
readFields = SendEventFilterEvent(reader, type, id, vals);
|
||||||
} else {
|
} else {
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for ( int i = 0; i < readFields; i++ ) {
|
||||||
|
delete vals[i];
|
||||||
|
}
|
||||||
|
delete [] vals;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) {
|
int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
|
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
|
@ -733,7 +740,7 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V
|
||||||
// ok, exact duplicate
|
// ok, exact duplicate
|
||||||
filter->lastDict->Remove(idxhash);
|
filter->lastDict->Remove(idxhash);
|
||||||
filter->currDict->Insert(idxhash, h);
|
filter->currDict->Insert(idxhash, h);
|
||||||
return;
|
return filter->num_val_fields + filter->num_idx_fields;
|
||||||
} else {
|
} else {
|
||||||
assert( filter->num_val_fields > 0 );
|
assert( filter->num_val_fields > 0 );
|
||||||
// updated
|
// updated
|
||||||
|
@ -794,11 +801,11 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V
|
||||||
if ( !updated ) {
|
if ( !updated ) {
|
||||||
// throw away. Hence - we quit. And remove the entry from the current dictionary...
|
// throw away. Hence - we quit. And remove the entry from the current dictionary...
|
||||||
delete(filter->currDict->RemoveEntry(idxhash));
|
delete(filter->currDict->RemoveEntry(idxhash));
|
||||||
return;
|
return filter->num_val_fields + filter->num_idx_fields;
|
||||||
} else {
|
} else {
|
||||||
// keep old one
|
// keep old one
|
||||||
filter->currDict->Insert(idxhash, h);
|
filter->currDict->Insert(idxhash, h);
|
||||||
return;
|
return filter->num_val_fields + filter->num_idx_fields;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -809,7 +816,7 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V
|
||||||
HashKey* k = filter->tab->ComputeHash(idxval);
|
HashKey* k = filter->tab->ComputeHash(idxval);
|
||||||
if ( !k ) {
|
if ( !k ) {
|
||||||
reporter->InternalError("could not hash");
|
reporter->InternalError("could not hash");
|
||||||
return;
|
return filter->num_val_fields + filter->num_idx_fields;
|
||||||
}
|
}
|
||||||
|
|
||||||
filter->tab->Assign(idxval, k, valval);
|
filter->tab->Assign(idxval, k, valval);
|
||||||
|
@ -842,6 +849,9 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return filter->num_val_fields + filter->num_idx_fields;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -926,9 +936,21 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
|
||||||
|
|
||||||
filter->lastDict = filter->currDict;
|
filter->lastDict = filter->currDict;
|
||||||
filter->currDict = new PDict(InputHash);
|
filter->currDict = new PDict(InputHash);
|
||||||
|
|
||||||
|
// Send event that the current update is indeed finished.
|
||||||
|
|
||||||
|
|
||||||
|
EventHandler* handler = event_registry->Lookup("Input::update_finished");
|
||||||
|
if ( handler == 0 ) {
|
||||||
|
reporter->InternalError("Input::update_finished not found!");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Ref(i->id);
|
||||||
|
SendEvent(handler, 1, i->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::Put(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
void Manager::Put(const ReaderFrontend* reader, int id, Value* *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader");
|
||||||
|
@ -951,7 +973,7 @@ void Manager::Put(const ReaderFrontend* reader, int id, const Value* const *vals
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) {
|
int Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
|
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
|
@ -985,11 +1007,13 @@ void Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SendEvent(filter->event, out_vals);
|
SendEvent(filter->event, out_vals);
|
||||||
|
|
||||||
|
return filter->fields->NumFields();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
|
|
||||||
assert(i);
|
assert(i);
|
||||||
|
@ -1011,6 +1035,8 @@ void Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const
|
||||||
}
|
}
|
||||||
|
|
||||||
filter->tab->Assign(idxval, valval);
|
filter->tab->Assign(idxval, valval);
|
||||||
|
|
||||||
|
return filter->num_idx_fields + filter->num_val_fields;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::Clear(const ReaderFrontend* reader, int id) {
|
void Manager::Clear(const ReaderFrontend* reader, int id) {
|
||||||
|
@ -1028,7 +1054,7 @@ void Manager::Clear(const ReaderFrontend* reader, int id) {
|
||||||
filter->tab->RemoveAll();
|
filter->tab->RemoveAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::Delete(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader");
|
||||||
|
@ -1037,18 +1063,29 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, const Value* const *v
|
||||||
|
|
||||||
assert(i->HasFilter(id));
|
assert(i->HasFilter(id));
|
||||||
|
|
||||||
|
bool success = false;
|
||||||
|
int readVals = 0;
|
||||||
|
|
||||||
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
||||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
TableFilter* filter = (TableFilter*) i->filters[id];
|
||||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||||
return( filter->tab->Delete(idxval) != 0 );
|
readVals = filter->num_idx_fields + filter->num_val_fields;
|
||||||
|
success = ( filter->tab->Delete(idxval) != 0 );
|
||||||
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
||||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
|
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
|
||||||
SendEventFilterEvent(reader, type, id, vals);
|
readVals = SendEventFilterEvent(reader, type, id, vals);
|
||||||
return true;
|
success = true;
|
||||||
} else {
|
} else {
|
||||||
assert(false);
|
assert(false);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for ( int i = 0; i < readVals; i++ ) {
|
||||||
|
delete vals[i];
|
||||||
|
}
|
||||||
|
delete [] vals;
|
||||||
|
|
||||||
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::Error(ReaderFrontend* reader, const char* msg)
|
void Manager::Error(ReaderFrontend* reader, const char* msg)
|
||||||
|
@ -1056,7 +1093,7 @@ void Manager::Error(ReaderFrontend* reader, const char* msg)
|
||||||
reporter->Error("error with input reader for %s: %s", reader->Source().c_str(), msg);
|
reporter->Error("error with input reader for %s: %s", reader->Source().c_str(), msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::SendEvent(const string& name, const int num_vals, const Value* const *vals)
|
bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals)
|
||||||
{
|
{
|
||||||
EventHandler* handler = event_registry->Lookup(name.c_str());
|
EventHandler* handler = event_registry->Lookup(name.c_str());
|
||||||
if ( handler == 0 ) {
|
if ( handler == 0 ) {
|
||||||
|
@ -1078,6 +1115,11 @@ bool Manager::SendEvent(const string& name, const int num_vals, const Value* con
|
||||||
|
|
||||||
mgr.Dispatch(new Event(handler, vl));
|
mgr.Dispatch(new Event(handler, vl));
|
||||||
|
|
||||||
|
for ( int i = 0; i < num_vals; i++ ) {
|
||||||
|
delete vals[i];
|
||||||
|
}
|
||||||
|
delete [] vals;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,15 +44,15 @@ protected:
|
||||||
void Error(ReaderFrontend* reader, const char* msg);
|
void Error(ReaderFrontend* reader, const char* msg);
|
||||||
|
|
||||||
// for readers to write to input stream in direct mode (reporting new/deleted values directly)
|
// for readers to write to input stream in direct mode (reporting new/deleted values directly)
|
||||||
void Put(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
void Put(const ReaderFrontend* reader, int id, threading::Value* *vals);
|
||||||
void Clear(const ReaderFrontend* reader, int id);
|
void Clear(const ReaderFrontend* reader, int id);
|
||||||
bool Delete(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
bool Delete(const ReaderFrontend* reader, int id, threading::Value* *vals);
|
||||||
|
|
||||||
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
|
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
|
||||||
void SendEntry(const ReaderFrontend* reader, const int id, const threading::Value* const *vals);
|
void SendEntry(const ReaderFrontend* reader, const int id, threading::Value* *vals);
|
||||||
void EndCurrentSend(const ReaderFrontend* reader, const int id);
|
void EndCurrentSend(const ReaderFrontend* reader, const int id);
|
||||||
|
|
||||||
bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
|
bool SendEvent(const string& name, const int num_vals, threading::Value* *vals);
|
||||||
|
|
||||||
ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type);
|
ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type);
|
||||||
|
|
||||||
|
@ -60,9 +60,9 @@ protected:
|
||||||
private:
|
private:
|
||||||
struct ReaderInfo;
|
struct ReaderInfo;
|
||||||
|
|
||||||
void SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
int SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
||||||
void PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
int PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
||||||
void SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals);
|
int SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals);
|
||||||
|
|
||||||
bool IsCompatibleType(BroType* t, bool atomic_only=false);
|
bool IsCompatibleType(BroType* t, bool atomic_only=false);
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ private:
|
||||||
|
|
||||||
class PutMessage : public threading::OutputMessage<ReaderFrontend> {
|
class PutMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
PutMessage(ReaderFrontend* reader, int id, const Value* const *val)
|
PutMessage(ReaderFrontend* reader, int id, Value* *val)
|
||||||
: threading::OutputMessage<ReaderFrontend>("Put", reader),
|
: threading::OutputMessage<ReaderFrontend>("Put", reader),
|
||||||
id(id), val(val) {}
|
id(id), val(val) {}
|
||||||
|
|
||||||
|
@ -37,12 +37,12 @@ public:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int id;
|
int id;
|
||||||
const Value* const *val;
|
Value* *val;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DeleteMessage : public threading::OutputMessage<ReaderFrontend> {
|
class DeleteMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
DeleteMessage(ReaderFrontend* reader, int id, const Value* const *val)
|
DeleteMessage(ReaderFrontend* reader, int id, Value* *val)
|
||||||
: threading::OutputMessage<ReaderFrontend>("Delete", reader),
|
: threading::OutputMessage<ReaderFrontend>("Delete", reader),
|
||||||
id(id), val(val) {}
|
id(id), val(val) {}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ public:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int id;
|
int id;
|
||||||
const Value* const *val;
|
Value* *val;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ClearMessage : public threading::OutputMessage<ReaderFrontend> {
|
class ClearMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
|
@ -72,7 +72,7 @@ private:
|
||||||
|
|
||||||
class SendEventMessage : public threading::OutputMessage<ReaderFrontend> {
|
class SendEventMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, const Value* const *val)
|
SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, Value* *val)
|
||||||
: threading::OutputMessage<ReaderFrontend>("SendEvent", reader),
|
: threading::OutputMessage<ReaderFrontend>("SendEvent", reader),
|
||||||
name(name), num_vals(num_vals), val(val) {}
|
name(name), num_vals(num_vals), val(val) {}
|
||||||
|
|
||||||
|
@ -83,14 +83,14 @@ public:
|
||||||
private:
|
private:
|
||||||
const string name;
|
const string name;
|
||||||
const int num_vals;
|
const int num_vals;
|
||||||
const Value* const *val;
|
Value* *val;
|
||||||
};
|
};
|
||||||
|
|
||||||
class SendEntryMessage : public threading::OutputMessage<ReaderFrontend> {
|
class SendEntryMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
SendEntryMessage(ReaderFrontend* reader, const int id, const Value* const *val)
|
SendEntryMessage(ReaderFrontend* reader, const int id, Value* *val)
|
||||||
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
|
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
|
||||||
id(id), val(val) {}
|
id(id), val(val) { }
|
||||||
|
|
||||||
virtual bool Process() {
|
virtual bool Process() {
|
||||||
input_mgr->SendEntry(Object(), id, val);
|
input_mgr->SendEntry(Object(), id, val);
|
||||||
|
@ -99,13 +99,13 @@ public:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const int id;
|
const int id;
|
||||||
const Value* const *val;
|
Value* *val;
|
||||||
};
|
};
|
||||||
|
|
||||||
class EndCurrentSendMessage : public threading::OutputMessage<ReaderFrontend> {
|
class EndCurrentSendMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
EndCurrentSendMessage(ReaderFrontend* reader, int id)
|
EndCurrentSendMessage(ReaderFrontend* reader, int id)
|
||||||
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
|
: threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader),
|
||||||
id(id) {}
|
id(id) {}
|
||||||
|
|
||||||
virtual bool Process() {
|
virtual bool Process() {
|
||||||
|
@ -145,12 +145,12 @@ void ReaderBackend::Error(const char *msg)
|
||||||
} */
|
} */
|
||||||
|
|
||||||
|
|
||||||
void ReaderBackend::Put(int id, const Value* const *val)
|
void ReaderBackend::Put(int id, Value* *val)
|
||||||
{
|
{
|
||||||
SendOut(new PutMessage(frontend, id, val));
|
SendOut(new PutMessage(frontend, id, val));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::Delete(int id, const Value* const *val)
|
void ReaderBackend::Delete(int id, Value* *val)
|
||||||
{
|
{
|
||||||
SendOut(new DeleteMessage(frontend, id, val));
|
SendOut(new DeleteMessage(frontend, id, val));
|
||||||
}
|
}
|
||||||
|
@ -160,7 +160,7 @@ void ReaderBackend::Clear(int id)
|
||||||
SendOut(new ClearMessage(frontend, id));
|
SendOut(new ClearMessage(frontend, id));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals)
|
void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals)
|
||||||
{
|
{
|
||||||
SendOut(new SendEventMessage(frontend, name, num_vals, vals));
|
SendOut(new SendEventMessage(frontend, name, num_vals, vals));
|
||||||
}
|
}
|
||||||
|
@ -170,7 +170,7 @@ void ReaderBackend::EndCurrentSend(int id)
|
||||||
SendOut(new EndCurrentSendMessage(frontend, id));
|
SendOut(new EndCurrentSendMessage(frontend, id));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::SendEntry(int id, const Value* const *vals)
|
void ReaderBackend::SendEntry(int id, Value* *vals)
|
||||||
{
|
{
|
||||||
SendOut(new SendEntryMessage(frontend, id, vals));
|
SendOut(new SendEntryMessage(frontend, id, vals));
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,15 +50,15 @@ protected:
|
||||||
// A thread-safe version of fmt(). (stolen from logwriter)
|
// A thread-safe version of fmt(). (stolen from logwriter)
|
||||||
const char* Fmt(const char* format, ...);
|
const char* Fmt(const char* format, ...);
|
||||||
|
|
||||||
void SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
|
void SendEvent(const string& name, const int num_vals, threading::Value* *vals);
|
||||||
|
|
||||||
// Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table
|
// Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table
|
||||||
void Put(int id, const threading::Value* const *val);
|
void Put(int id, threading::Value* *val);
|
||||||
void Delete(int id, const threading::Value* const *val);
|
void Delete(int id, threading::Value* *val);
|
||||||
void Clear(int id);
|
void Clear(int id);
|
||||||
|
|
||||||
// Table-functions (tracking mode): Only changed lines are propagated.
|
// Table-functions (tracking mode): Only changed lines are propagated.
|
||||||
void SendEntry(int id, const threading::Value* const *vals);
|
void SendEntry(int id, threading::Value* *vals);
|
||||||
void EndCurrentSend(int id);
|
void EndCurrentSend(int id);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,7 @@ private:
|
||||||
const threading::Field* const* fields;
|
const threading::Field* const* fields;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
ReaderFrontend::ReaderFrontend(bro_int_t type) {
|
ReaderFrontend::ReaderFrontend(bro_int_t type) {
|
||||||
disabled = initialized = false;
|
disabled = initialized = false;
|
||||||
ty_name = "<not set>";
|
ty_name = "<not set>";
|
||||||
|
|
|
@ -445,10 +445,12 @@ bool Ascii::DoUpdate() {
|
||||||
|
|
||||||
SendEntry((*it).first, fields);
|
SendEntry((*it).first, fields);
|
||||||
|
|
||||||
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
|
/* Do not do this, ownership changes to other thread
|
||||||
|
* for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
|
||||||
delete fields[i];
|
delete fields[i];
|
||||||
}
|
}
|
||||||
delete [] fields;
|
delete [] fields;
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,6 +295,7 @@ void terminate_bro()
|
||||||
|
|
||||||
log_mgr->Terminate();
|
log_mgr->Terminate();
|
||||||
thread_mgr->Terminate();
|
thread_mgr->Terminate();
|
||||||
|
mgr.Drain();
|
||||||
|
|
||||||
delete timer_mgr;
|
delete timer_mgr;
|
||||||
delete dns_mgr;
|
delete dns_mgr;
|
||||||
|
|
|
@ -111,7 +111,7 @@ void Manager::Process()
|
||||||
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
string s = msg->Name() + " failed, terminating thread";
|
string s = msg->Name() + " failed, terminating thread " + t->Name() + " (in ThreadManager)";
|
||||||
reporter->Error("%s", s.c_str());
|
reporter->Error("%s", s.c_str());
|
||||||
t->Stop();
|
t->Stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -267,7 +267,7 @@ void MsgThread::Run()
|
||||||
|
|
||||||
if ( ! result )
|
if ( ! result )
|
||||||
{
|
{
|
||||||
string s = msg->Name() + " failed, terminating thread";
|
string s = msg->Name() + " failed, terminating thread (MsgThread)";
|
||||||
Error(s.c_str());
|
Error(s.c_str());
|
||||||
Stop();
|
Stop();
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -48,7 +48,10 @@ event bro_init()
|
||||||
Input::create_stream(A::INPUT, [$source="input.log"]);
|
Input::create_stream(A::INPUT, [$source="input.log"]);
|
||||||
Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]);
|
Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]);
|
||||||
Input::force_update(A::INPUT);
|
Input::force_update(A::INPUT);
|
||||||
print servers;
|
|
||||||
Input::remove_tablefilter(A::INPUT, "ssh");
|
Input::remove_tablefilter(A::INPUT, "ssh");
|
||||||
Input::remove_stream(A::INPUT);
|
Input::remove_stream(A::INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event Input::update_finished(id: Input::ID) {
|
||||||
|
print servers;
|
||||||
|
}
|
||||||
|
|
|
@ -34,5 +34,9 @@ event bro_init()
|
||||||
Input::create_stream(A::INPUT, [$source="input.log"]);
|
Input::create_stream(A::INPUT, [$source="input.log"]);
|
||||||
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F]);
|
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F]);
|
||||||
Input::force_update(A::INPUT);
|
Input::force_update(A::INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Input::update_finished(id: Input::ID) {
|
||||||
print servers;
|
print servers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,5 +34,9 @@ event bro_init()
|
||||||
Input::create_stream(A::INPUT, [$source="input.log"]);
|
Input::create_stream(A::INPUT, [$source="input.log"]);
|
||||||
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]);
|
Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]);
|
||||||
Input::force_update(A::INPUT);
|
Input::force_update(A::INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Input::update_finished(id: Input::ID) {
|
||||||
print servers;
|
print servers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,3 +39,10 @@ event bro_init()
|
||||||
Input::remove_tablefilter(A::INPUT, "input");
|
Input::remove_tablefilter(A::INPUT, "input");
|
||||||
Input::remove_stream(A::INPUT);
|
Input::remove_stream(A::INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event Input::update_finished(id: Input::ID) {
|
||||||
|
print servers[1.2.3.4];
|
||||||
|
print servers[1.2.3.5];
|
||||||
|
print servers[1.2.3.6];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,9 @@ event bro_init()
|
||||||
$pred(typ: Input::Event, left: Idx, right: bool) = { return right; }
|
$pred(typ: Input::Event, left: Idx, right: bool) = { return right; }
|
||||||
]);
|
]);
|
||||||
Input::force_update(A::INPUT);
|
Input::force_update(A::INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Input::update_finished(id: Input::ID) {
|
||||||
if ( 1 in servers ) {
|
if ( 1 in servers ) {
|
||||||
print "VALID";
|
print "VALID";
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ type Val: record {
|
||||||
global destination1: table[int] of Val = table();
|
global destination1: table[int] of Val = table();
|
||||||
global destination2: table[int] of Val = table();
|
global destination2: table[int] of Val = table();
|
||||||
|
|
||||||
|
global done: bool = F;
|
||||||
|
|
||||||
event bro_init()
|
event bro_init()
|
||||||
{
|
{
|
||||||
# first read in the old stuff into the table...
|
# first read in the old stuff into the table...
|
||||||
|
@ -45,6 +47,15 @@ event bro_init()
|
||||||
Input::add_tablefilter(A::INPUT, [$name="input2",$idx=Idx, $val=Val, $destination=destination2]);
|
Input::add_tablefilter(A::INPUT, [$name="input2",$idx=Idx, $val=Val, $destination=destination2]);
|
||||||
|
|
||||||
Input::force_update(A::INPUT);
|
Input::force_update(A::INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Input::update_finished(id: Input::ID) {
|
||||||
|
if ( done == T ) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
done = T;
|
||||||
|
|
||||||
if ( 1 in destination1 ) {
|
if ( 1 in destination1 ) {
|
||||||
print "VALID";
|
print "VALID";
|
||||||
}
|
}
|
||||||
|
@ -90,6 +101,4 @@ event bro_init()
|
||||||
if ( 7 in destination2 ) {
|
if ( 7 in destination2 ) {
|
||||||
print "VALID";
|
print "VALID";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue