mirror of
https://github.com/zeek/zeek.git
synced 2025-10-10 10:38:20 +00:00
rename a couple of structures and make the names in manager fit the api more.
This should it make easier for other people to understand what is going on without having knowledge of an "internal api * means * in external api" mapping.
This commit is contained in:
parent
a5cc98bb5d
commit
1967f6f81c
2 changed files with 65 additions and 57 deletions
|
@ -23,6 +23,11 @@ using namespace input;
|
|||
using threading::Value;
|
||||
using threading::Field;
|
||||
|
||||
/**
|
||||
* InputHashes are used as Dictionaries to store the value and index hashes for all lines currently stored in a table.
|
||||
* Index hash is stored as HashKey*, because it is thrown into other Bro functions that need the complex structure of it.
|
||||
* For everything we do (with values), we just take the hash_t value and compare it directly with ==
|
||||
*/
|
||||
struct InputHash {
|
||||
hash_t valhash;
|
||||
HashKey* idxkey;
|
||||
|
@ -41,7 +46,10 @@ static void input_hash_delete_func(void* val) {
|
|||
|
||||
declare(PDict, InputHash);
|
||||
|
||||
class Manager::Filter {
|
||||
/**
|
||||
* Base stuff that every stream can do
|
||||
*/
|
||||
class Manager::Stream {
|
||||
public:
|
||||
string name;
|
||||
string source;
|
||||
|
@ -49,25 +57,25 @@ public:
|
|||
|
||||
int mode;
|
||||
|
||||
FilterType filter_type; // to distinguish between event and table filters
|
||||
StreamType filter_type; // to distinguish between event and table filters
|
||||
|
||||
EnumVal* type;
|
||||
ReaderFrontend* reader;
|
||||
|
||||
RecordVal* description;
|
||||
|
||||
Filter();
|
||||
virtual ~Filter();
|
||||
Stream();
|
||||
virtual ~Stream();
|
||||
};
|
||||
|
||||
Manager::Filter::Filter() {
|
||||
Manager::Stream::Stream() {
|
||||
type = 0;
|
||||
reader = 0;
|
||||
description = 0;
|
||||
removed = false;
|
||||
}
|
||||
|
||||
Manager::Filter::~Filter() {
|
||||
Manager::Stream::~Stream() {
|
||||
if ( type )
|
||||
Unref(type);
|
||||
if ( description )
|
||||
|
@ -77,7 +85,7 @@ Manager::Filter::~Filter() {
|
|||
delete(reader);
|
||||
}
|
||||
|
||||
class Manager::TableFilter: public Manager::Filter {
|
||||
class Manager::TableStream: public Manager::Stream {
|
||||
public:
|
||||
|
||||
unsigned int num_idx_fields;
|
||||
|
@ -96,11 +104,11 @@ public:
|
|||
|
||||
EventHandlerPtr event;
|
||||
|
||||
TableFilter();
|
||||
~TableFilter();
|
||||
TableStream();
|
||||
~TableStream();
|
||||
};
|
||||
|
||||
class Manager::EventFilter: public Manager::Filter {
|
||||
class Manager::EventStream: public Manager::Stream {
|
||||
public:
|
||||
EventHandlerPtr event;
|
||||
|
||||
|
@ -108,11 +116,11 @@ public:
|
|||
unsigned int num_fields;
|
||||
|
||||
bool want_record;
|
||||
EventFilter();
|
||||
~EventFilter();
|
||||
EventStream();
|
||||
~EventStream();
|
||||
};
|
||||
|
||||
Manager::TableFilter::TableFilter() : Manager::Filter::Filter() {
|
||||
Manager::TableStream::TableStream() : Manager::Stream::Stream() {
|
||||
filter_type = TABLE_FILTER;
|
||||
|
||||
tab = 0;
|
||||
|
@ -125,18 +133,18 @@ Manager::TableFilter::TableFilter() : Manager::Filter::Filter() {
|
|||
pred = 0;
|
||||
}
|
||||
|
||||
Manager::EventFilter::EventFilter() : Manager::Filter::Filter() {
|
||||
Manager::EventStream::EventStream() : Manager::Stream::Stream() {
|
||||
fields = 0;
|
||||
filter_type = EVENT_FILTER;
|
||||
}
|
||||
|
||||
Manager::EventFilter::~EventFilter() {
|
||||
Manager::EventStream::~EventStream() {
|
||||
if ( fields ) {
|
||||
Unref(fields);
|
||||
}
|
||||
}
|
||||
|
||||
Manager::TableFilter::~TableFilter() {
|
||||
Manager::TableStream::~TableStream() {
|
||||
if ( tab )
|
||||
Unref(tab);
|
||||
if ( itype )
|
||||
|
@ -176,7 +184,7 @@ Manager::Manager()
|
|||
}
|
||||
|
||||
Manager::~Manager() {
|
||||
for ( map<ReaderFrontend*, Filter*>::iterator s = readers.begin(); s != readers.end(); ++s ) {
|
||||
for ( map<ReaderFrontend*, Stream*>::iterator s = readers.begin(); s != readers.end(); ++s ) {
|
||||
delete s->second;
|
||||
delete s->first;
|
||||
}
|
||||
|
@ -233,7 +241,7 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type)
|
|||
}
|
||||
|
||||
// create a new input reader object to be used at whomevers leisure lateron.
|
||||
bool Manager::CreateStream(Filter* info, RecordVal* description)
|
||||
bool Manager::CreateStream(Stream* info, RecordVal* description)
|
||||
{
|
||||
ReaderDefinition* ir = input_readers;
|
||||
|
||||
|
@ -249,7 +257,7 @@ bool Manager::CreateStream(Filter* info, RecordVal* description)
|
|||
Unref(name_val);
|
||||
|
||||
{
|
||||
Filter *i = FindFilter(name);
|
||||
Stream *i = FindStream(name);
|
||||
if ( i != 0 ) {
|
||||
reporter->Error("Trying create already existing input stream %s", name.c_str());
|
||||
return false;
|
||||
|
@ -296,7 +304,7 @@ bool Manager::CreateEventStream(RecordVal* fval) {
|
|||
return false;
|
||||
}
|
||||
|
||||
EventFilter* filter = new EventFilter();
|
||||
EventStream* filter = new EventStream();
|
||||
{
|
||||
bool res = CreateStream(filter, fval);
|
||||
if ( res == false ) {
|
||||
|
@ -412,7 +420,7 @@ bool Manager::CreateTableStream(RecordVal* fval) {
|
|||
return false;
|
||||
}
|
||||
|
||||
TableFilter* filter = new TableFilter();
|
||||
TableStream* filter = new TableStream();
|
||||
{
|
||||
bool res = CreateStream(filter, fval);
|
||||
if ( res == false ) {
|
||||
|
@ -620,7 +628,7 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only)
|
|||
|
||||
|
||||
bool Manager::RemoveStream(const string &name) {
|
||||
Filter *i = FindFilter(name);
|
||||
Stream *i = FindStream(name);
|
||||
|
||||
if ( i == 0 ) {
|
||||
return false; // not found
|
||||
|
@ -644,7 +652,7 @@ bool Manager::RemoveStream(const string &name) {
|
|||
}
|
||||
|
||||
bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) {
|
||||
Filter *i = FindFilter(reader);
|
||||
Stream *i = FindStream(reader);
|
||||
|
||||
if ( i == 0 ) {
|
||||
reporter->Error("Stream not found in RemoveStreamContinuation");
|
||||
|
@ -712,7 +720,7 @@ bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, co
|
|||
|
||||
bool Manager::ForceUpdate(const string &name)
|
||||
{
|
||||
Filter *i = FindFilter(name);
|
||||
Stream *i = FindStream(name);
|
||||
if ( i == 0 ) {
|
||||
reporter->Error("Stream %s not found", name.c_str());
|
||||
return false;
|
||||
|
@ -786,7 +794,7 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu
|
|||
|
||||
|
||||
void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) {
|
||||
Filter *i = FindFilter(reader);
|
||||
Stream *i = FindStream(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader in SendEntry");
|
||||
return;
|
||||
|
@ -797,7 +805,7 @@ void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) {
|
|||
readFields = SendEntryTable(i, vals);
|
||||
} else if ( i->filter_type == EVENT_FILTER ) {
|
||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
||||
readFields = SendEventFilterEvent(i, type, vals);
|
||||
readFields = SendEventStreamEvent(i, type, vals);
|
||||
} else {
|
||||
assert(false);
|
||||
}
|
||||
|
@ -808,13 +816,13 @@ void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) {
|
|||
delete [] vals;
|
||||
}
|
||||
|
||||
int Manager::SendEntryTable(Filter* i, const Value* const *vals) {
|
||||
int Manager::SendEntryTable(Stream* i, const Value* const *vals) {
|
||||
bool updated = false;
|
||||
|
||||
assert(i);
|
||||
|
||||
assert(i->filter_type == TABLE_FILTER);
|
||||
TableFilter* filter = (TableFilter*) i;
|
||||
TableStream* filter = (TableStream*) i;
|
||||
|
||||
HashKey* idxhash = HashValues(filter->num_idx_fields, vals);
|
||||
|
||||
|
@ -979,7 +987,7 @@ int Manager::SendEntryTable(Filter* i, const Value* const *vals) {
|
|||
|
||||
|
||||
void Manager::EndCurrentSend(ReaderFrontend* reader) {
|
||||
Filter *i = FindFilter(reader);
|
||||
Stream *i = FindStream(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader in EndCurrentSend");
|
||||
return;
|
||||
|
@ -996,7 +1004,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) {
|
|||
}
|
||||
|
||||
assert(i->filter_type == TABLE_FILTER);
|
||||
TableFilter* filter = (TableFilter*) i;
|
||||
TableStream* filter = (TableStream*) i;
|
||||
|
||||
// lastdict contains all deleted entries and should be empty apart from that
|
||||
IterCookie *c = filter->lastDict->InitForIteration();
|
||||
|
@ -1083,7 +1091,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) {
|
|||
}
|
||||
|
||||
void Manager::Put(ReaderFrontend* reader, Value* *vals) {
|
||||
Filter *i = FindFilter(reader);
|
||||
Stream *i = FindStream(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader in Put");
|
||||
return;
|
||||
|
@ -1094,7 +1102,7 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) {
|
|||
readFields = PutTable(i, vals);
|
||||
} else if ( i->filter_type == EVENT_FILTER ) {
|
||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
||||
readFields = SendEventFilterEvent(i, type, vals);
|
||||
readFields = SendEventStreamEvent(i, type, vals);
|
||||
} else {
|
||||
assert(false);
|
||||
}
|
||||
|
@ -1106,11 +1114,11 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) {
|
|||
|
||||
}
|
||||
|
||||
int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const *vals) {
|
||||
int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const *vals) {
|
||||
assert(i);
|
||||
|
||||
assert(i->filter_type == EVENT_FILTER);
|
||||
EventFilter* filter = (EventFilter*) i;
|
||||
EventStream* filter = (EventStream*) i;
|
||||
|
||||
Val *val;
|
||||
list<Val*> out_vals;
|
||||
|
@ -1143,11 +1151,11 @@ int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const *
|
|||
|
||||
}
|
||||
|
||||
int Manager::PutTable(Filter* i, const Value* const *vals) {
|
||||
int Manager::PutTable(Stream* i, const Value* const *vals) {
|
||||
assert(i);
|
||||
|
||||
assert(i->filter_type == TABLE_FILTER);
|
||||
TableFilter* filter = (TableFilter*) i;
|
||||
TableStream* filter = (TableStream*) i;
|
||||
|
||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
Val* valval;
|
||||
|
@ -1244,7 +1252,7 @@ int Manager::PutTable(Filter* i, const Value* const *vals) {
|
|||
|
||||
// Todo:: perhaps throw some kind of clear-event?
|
||||
void Manager::Clear(ReaderFrontend* reader) {
|
||||
Filter *i = FindFilter(reader);
|
||||
Stream *i = FindStream(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader in Clear");
|
||||
return;
|
||||
|
@ -1256,14 +1264,14 @@ void Manager::Clear(ReaderFrontend* reader) {
|
|||
#endif
|
||||
|
||||
assert(i->filter_type == TABLE_FILTER);
|
||||
TableFilter* filter = (TableFilter*) i;
|
||||
TableStream* filter = (TableStream*) i;
|
||||
|
||||
filter->tab->RemoveAll();
|
||||
}
|
||||
|
||||
// put interface: delete old entry from table.
|
||||
bool Manager::Delete(ReaderFrontend* reader, Value* *vals) {
|
||||
Filter *i = FindFilter(reader);
|
||||
Stream *i = FindStream(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader in Delete");
|
||||
return false;
|
||||
|
@ -1273,7 +1281,7 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) {
|
|||
int readVals = 0;
|
||||
|
||||
if ( i->filter_type == TABLE_FILTER ) {
|
||||
TableFilter* filter = (TableFilter*) i;
|
||||
TableStream* filter = (TableStream*) i;
|
||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
assert(idxval != 0);
|
||||
readVals = filter->num_idx_fields + filter->num_val_fields;
|
||||
|
@ -1320,7 +1328,7 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) {
|
|||
}
|
||||
} else if ( i->filter_type == EVENT_FILTER ) {
|
||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
|
||||
readVals = SendEventFilterEvent(i, type, vals);
|
||||
readVals = SendEventStreamEvent(i, type, vals);
|
||||
success = true;
|
||||
} else {
|
||||
assert(false);
|
||||
|
@ -1840,9 +1848,9 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
Manager::Filter* Manager::FindFilter(const string &name)
|
||||
Manager::Stream* Manager::FindStream(const string &name)
|
||||
{
|
||||
for ( map<ReaderFrontend*, Filter*>::iterator s = readers.begin(); s != readers.end(); ++s )
|
||||
for ( map<ReaderFrontend*, Stream*>::iterator s = readers.begin(); s != readers.end(); ++s )
|
||||
{
|
||||
if ( (*s).second->name == name )
|
||||
{
|
||||
|
@ -1853,9 +1861,9 @@ Manager::Filter* Manager::FindFilter(const string &name)
|
|||
return 0;
|
||||
}
|
||||
|
||||
Manager::Filter* Manager::FindFilter(ReaderFrontend* reader)
|
||||
Manager::Stream* Manager::FindStream(ReaderFrontend* reader)
|
||||
{
|
||||
map<ReaderFrontend*, Filter*>::iterator s = readers.find(reader);
|
||||
map<ReaderFrontend*, Stream*>::iterator s = readers.find(reader);
|
||||
if ( s != readers.end() ) {
|
||||
return s->second;
|
||||
}
|
||||
|
|
|
@ -107,25 +107,25 @@ protected:
|
|||
// Functions are called from the ReaderBackend to notify the manager, that a filter has been removed
|
||||
// or a stream has been closed.
|
||||
// Used to prevent race conditions where data for a specific filter is still in the queue when the
|
||||
// RemoveFilter directive is executed by the main thread.
|
||||
// RemoveStream directive is executed by the main thread.
|
||||
// This makes sure all data that has ben queued for a filter is still received.
|
||||
bool RemoveStreamContinuation(ReaderFrontend* reader);
|
||||
|
||||
private:
|
||||
class Filter;
|
||||
class TableFilter;
|
||||
class EventFilter;
|
||||
class Stream;
|
||||
class TableStream;
|
||||
class EventStream;
|
||||
|
||||
bool CreateStream(Filter*, RecordVal* description);
|
||||
bool CreateStream(Stream*, RecordVal* description);
|
||||
|
||||
// SendEntry implementation for Tablefilter
|
||||
int SendEntryTable(Filter* i, const threading::Value* const *vals);
|
||||
int SendEntryTable(Stream* i, const threading::Value* const *vals);
|
||||
|
||||
// Put implementation for Tablefilter
|
||||
int PutTable(Filter* i, const threading::Value* const *vals);
|
||||
int PutTable(Stream* i, const threading::Value* const *vals);
|
||||
|
||||
// SendEntry and Put implementation for Eventfilter
|
||||
int SendEventFilterEvent(Filter* i, EnumVal* type, const threading::Value* const *vals);
|
||||
int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals);
|
||||
|
||||
// Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types
|
||||
// from the log framework
|
||||
|
@ -163,12 +163,12 @@ private:
|
|||
// Converts a Bro ListVal to a RecordVal given the record type
|
||||
RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position);
|
||||
|
||||
Filter* FindFilter(const string &name);
|
||||
Filter* FindFilter(ReaderFrontend* reader);
|
||||
Stream* FindStream(const string &name);
|
||||
Stream* FindStream(ReaderFrontend* reader);
|
||||
|
||||
enum FilterType { TABLE_FILTER, EVENT_FILTER };
|
||||
enum StreamType { TABLE_FILTER, EVENT_FILTER };
|
||||
|
||||
map<ReaderFrontend*, Filter*> readers;
|
||||
map<ReaderFrontend*, Stream*> readers;
|
||||
};
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue