mirror of
https://github.com/zeek/zeek.git
synced 2025-10-06 16:48:19 +00:00
way less compile errors.
This commit is contained in:
parent
e726bfe301
commit
833e724400
9 changed files with 257 additions and 151 deletions
|
@ -3,18 +3,20 @@
|
|||
#include <algorithm>
|
||||
|
||||
#include "Manager.h"
|
||||
#include "ReaderFrontend.h"
|
||||
#include "ReaderBackend.h"
|
||||
#include "readers/Ascii.h"
|
||||
|
||||
#include "Event.h"
|
||||
#include "EventHandler.h"
|
||||
#include "NetVar.h"
|
||||
#include "Net.h"
|
||||
|
||||
|
||||
#include "InputReader.h"
|
||||
|
||||
#include "InputReaderAscii.h"
|
||||
|
||||
#include "CompHash.h"
|
||||
|
||||
#include "../threading/SerializationTypes.h"
|
||||
|
||||
using namespace input;
|
||||
using threading::Value;
|
||||
using threading::Field;
|
||||
|
@ -99,7 +101,7 @@ Manager::TableFilter::~TableFilter() {
|
|||
struct Manager::ReaderInfo {
|
||||
EnumVal* id;
|
||||
EnumVal* type;
|
||||
InputReader* reader;
|
||||
ReaderFrontend* reader;
|
||||
|
||||
//list<string> events; // events we fire when "something" happens
|
||||
map<int, Manager::Filter*> filters; // filters that can prevent our actions
|
||||
|
@ -132,38 +134,27 @@ bool Manager::ReaderInfo::HasFilter(int id) {
|
|||
}
|
||||
|
||||
|
||||
struct InputReaderDefinition {
|
||||
struct ReaderDefinition {
|
||||
bro_int_t type; // the type
|
||||
const char *name; // descriptive name for error messages
|
||||
bool (*init)(); // optional one-time inifializing function
|
||||
InputReader* (*factory)(); // factory function for creating instances
|
||||
ReaderBackend* (*factory)(ReaderFrontend* frontend); // factory function for creating instances
|
||||
};
|
||||
|
||||
InputReaderDefinition input_readers[] = {
|
||||
{ BifEnum::Input::READER_ASCII, "Ascii", 0, InputReaderAscii::Instantiate },
|
||||
ReaderDefinition input_readers[] = {
|
||||
{ BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate },
|
||||
|
||||
// End marker
|
||||
{ BifEnum::Input::READER_DEFAULT, "None", 0, (InputReader* (*)())0 }
|
||||
{ BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 }
|
||||
};
|
||||
|
||||
Manager::Manager()
|
||||
{
|
||||
}
|
||||
|
||||
// create a new input reader object to be used at whomevers leisure lateron.
|
||||
InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
||||
{
|
||||
InputReaderDefinition* ir = input_readers;
|
||||
|
||||
RecordType* rtype = description->Type()->AsRecordType();
|
||||
if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) )
|
||||
{
|
||||
reporter->Error("Streamdescription argument not of right type");
|
||||
return 0;
|
||||
}
|
||||
ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) {
|
||||
ReaderDefinition* ir = input_readers;
|
||||
|
||||
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
|
||||
|
||||
while ( true ) {
|
||||
if ( ir->type == BifEnum::Input::READER_DEFAULT )
|
||||
{
|
||||
|
@ -171,7 +162,7 @@ InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
|||
return 0;
|
||||
}
|
||||
|
||||
if ( ir->type != reader->AsEnum() ) {
|
||||
if ( ir->type != type ) {
|
||||
// no, didn't find the right one...
|
||||
++ir;
|
||||
continue;
|
||||
|
@ -201,9 +192,30 @@ InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
|||
// all done. break.
|
||||
break;
|
||||
}
|
||||
|
||||
assert(ir->factory);
|
||||
InputReader* reader_obj = (*ir->factory)();
|
||||
|
||||
ReaderBackend* backend = (*ir->factory)(frontend);
|
||||
assert(backend);
|
||||
|
||||
frontend->ty_name = ir->name;
|
||||
return backend;
|
||||
}
|
||||
|
||||
// create a new input reader object to be used at whomevers leisure lateron.
|
||||
ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
||||
{
|
||||
ReaderDefinition* ir = input_readers;
|
||||
|
||||
RecordType* rtype = description->Type()->AsRecordType();
|
||||
if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) )
|
||||
{
|
||||
reporter->Error("Streamdescription argument not of right type");
|
||||
return 0;
|
||||
}
|
||||
|
||||
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
|
||||
|
||||
ReaderFrontend* reader_obj = new ReaderFrontend(id->AsEnum());
|
||||
assert(reader_obj);
|
||||
|
||||
// get the source...
|
||||
|
@ -217,16 +229,16 @@ InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
|||
|
||||
readers.push_back(info);
|
||||
|
||||
int success = reader_obj->Init(source);
|
||||
if ( success == false ) {
|
||||
reader_obj->Init(source);
|
||||
/* if ( success == false ) {
|
||||
assert( RemoveStream(id) );
|
||||
return 0;
|
||||
}
|
||||
success = reader_obj->Update();
|
||||
if ( success == false ) {
|
||||
} */
|
||||
reader_obj->Update();
|
||||
/* if ( success == false ) {
|
||||
assert ( RemoveStream(id) );
|
||||
return 0;
|
||||
}
|
||||
} */
|
||||
|
||||
return reader_obj;
|
||||
|
||||
|
@ -306,7 +318,7 @@ bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) {
|
|||
}
|
||||
|
||||
|
||||
vector<LogField*> fieldsV; // vector, because UnrollRecordType needs it
|
||||
vector<Field*> fieldsV; // vector, because UnrollRecordType needs it
|
||||
|
||||
bool status = !UnrollRecordType(&fieldsV, fields, "");
|
||||
|
||||
|
@ -316,7 +328,7 @@ bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) {
|
|||
}
|
||||
|
||||
|
||||
LogField** logf = new LogField*[fieldsV.size()];
|
||||
Field** logf = new Field*[fieldsV.size()];
|
||||
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) {
|
||||
logf[i] = fieldsV[i];
|
||||
}
|
||||
|
@ -410,7 +422,7 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) {
|
|||
|
||||
}
|
||||
|
||||
vector<LogField*> fieldsV; // vector, because we don't know the length beforehands
|
||||
vector<Field*> fieldsV; // vector, because we don't know the length beforehands
|
||||
|
||||
bool status = !UnrollRecordType(&fieldsV, idx, "");
|
||||
|
||||
|
@ -430,7 +442,7 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) {
|
|||
}
|
||||
|
||||
|
||||
LogField** fields = new LogField*[fieldsV.size()];
|
||||
Field** fields = new Field*[fieldsV.size()];
|
||||
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) {
|
||||
fields[i] = fieldsV[i];
|
||||
}
|
||||
|
@ -538,12 +550,12 @@ bool Manager::RemoveStream(const EnumVal* id) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Manager::UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend) {
|
||||
bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, const string& nameprepend) {
|
||||
for ( int i = 0; i < rec->NumFields(); i++ )
|
||||
{
|
||||
|
||||
if ( !IsCompatibleType(rec->FieldType(i)) ) {
|
||||
reporter->Error("Incompatible type \"%s\" in table definition for InputReader", type_name(rec->FieldType(i)->Tag()));
|
||||
reporter->Error("Incompatible type \"%s\" in table definition for ReaderFrontend", type_name(rec->FieldType(i)->Tag()));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -557,7 +569,7 @@ bool Manager::UnrollRecordType(vector<LogField*> *fields, const RecordType *rec,
|
|||
}
|
||||
|
||||
} else {
|
||||
LogField* field = new LogField();
|
||||
Field* field = new Field();
|
||||
field->name = nameprepend + rec->FieldName(i);
|
||||
field->type = rec->FieldType(i)->Tag();
|
||||
if ( field->type == TYPE_TABLE ) {
|
||||
|
@ -591,7 +603,9 @@ bool Manager::ForceUpdate(const EnumVal* id)
|
|||
return false;
|
||||
}
|
||||
|
||||
return i->reader->Update();
|
||||
i->reader->Update();
|
||||
|
||||
return true; // update is async :(
|
||||
}
|
||||
|
||||
bool Manager::RemoveTableFilter(EnumVal* id, const string &name) {
|
||||
|
@ -638,21 +652,21 @@ bool Manager::RemoveEventFilter(EnumVal* id, const string &name) {
|
|||
return true;
|
||||
}
|
||||
|
||||
Val* Manager::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) {
|
||||
Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) {
|
||||
Val* idxval;
|
||||
int position = 0;
|
||||
|
||||
|
||||
if ( num_fields == 1 && type->FieldType(0)->Tag() != TYPE_RECORD ) {
|
||||
idxval = LogValToVal(vals[0], type->FieldType(0));
|
||||
idxval = ValueToVal(vals[0], type->FieldType(0));
|
||||
position = 1;
|
||||
} else {
|
||||
ListVal *l = new ListVal(TYPE_ANY);
|
||||
for ( int j = 0 ; j < type->NumFields(); j++ ) {
|
||||
if ( type->FieldType(j)->Tag() == TYPE_RECORD ) {
|
||||
l->Append(LogValToRecordVal(vals, type->FieldType(j)->AsRecordType(), &position));
|
||||
l->Append(ValueToRecordVal(vals, type->FieldType(j)->AsRecordType(), &position));
|
||||
} else {
|
||||
l->Append(LogValToVal(vals[position], type->FieldType(j)));
|
||||
l->Append(ValueToVal(vals[position], type->FieldType(j)));
|
||||
position++;
|
||||
}
|
||||
}
|
||||
|
@ -666,7 +680,7 @@ Val* Manager::LogValToIndexVal(int num_fields, const RecordType *type, const Log
|
|||
}
|
||||
|
||||
|
||||
void Manager::SendEntry(const InputReader* reader, int id, const LogVal* const *vals) {
|
||||
void Manager::SendEntry(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader");
|
||||
|
@ -689,7 +703,7 @@ void Manager::SendEntry(const InputReader* reader, int id, const LogVal* const *
|
|||
|
||||
}
|
||||
|
||||
void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* const *vals) {
|
||||
void Manager::SendEntryTable(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
|
||||
bool updated = false;
|
||||
|
@ -701,12 +715,12 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co
|
|||
TableFilter* filter = (TableFilter*) i->filters[id];
|
||||
|
||||
//reporter->Error("Hashing %d index fields", i->num_idx_fields);
|
||||
HashKey* idxhash = HashLogVals(filter->num_idx_fields, vals);
|
||||
HashKey* idxhash = HashValues(filter->num_idx_fields, vals);
|
||||
//reporter->Error("Result: %d", (uint64_t) idxhash->Hash());
|
||||
//reporter->Error("Hashing %d val fields", i->num_val_fields);
|
||||
HashKey* valhash = 0;
|
||||
if ( filter->num_val_fields > 0 )
|
||||
HashLogVals(filter->num_val_fields, vals+filter->num_idx_fields);
|
||||
HashValues(filter->num_val_fields, vals+filter->num_idx_fields);
|
||||
|
||||
//reporter->Error("Result: %d", (uint64_t) valhash->Hash());
|
||||
|
||||
|
@ -731,16 +745,16 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co
|
|||
}
|
||||
|
||||
|
||||
Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
Val* valval;
|
||||
|
||||
int position = filter->num_idx_fields;
|
||||
if ( filter->num_val_fields == 0 ) {
|
||||
valval = 0;
|
||||
} else if ( filter->num_val_fields == 1 && !filter->want_record ) {
|
||||
valval = LogValToVal(vals[position], filter->rtype->FieldType(0));
|
||||
valval = ValueToVal(vals[position], filter->rtype->FieldType(0));
|
||||
} else {
|
||||
valval = LogValToRecordVal(vals, filter->rtype, &position);
|
||||
valval = ValueToRecordVal(vals, filter->rtype, &position);
|
||||
}
|
||||
|
||||
|
||||
|
@ -757,7 +771,7 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co
|
|||
EnumVal* ev;
|
||||
//Ref(idxval);
|
||||
int startpos = 0;
|
||||
Val* predidx = LogValToRecordVal(vals, filter->itype, &startpos);
|
||||
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos);
|
||||
Ref(valval);
|
||||
|
||||
if ( updated ) {
|
||||
|
@ -831,7 +845,7 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co
|
|||
}
|
||||
|
||||
|
||||
void Manager::EndCurrentSend(const InputReader* reader, int id) {
|
||||
void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader");
|
||||
|
@ -914,7 +928,7 @@ void Manager::EndCurrentSend(const InputReader* reader, int id) {
|
|||
filter->currDict = new PDict(InputHash);
|
||||
}
|
||||
|
||||
void Manager::Put(const InputReader* reader, int id, const LogVal* const *vals) {
|
||||
void Manager::Put(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader");
|
||||
|
@ -937,7 +951,7 @@ void Manager::Put(const InputReader* reader, int id, const LogVal* const *vals)
|
|||
|
||||
}
|
||||
|
||||
void Manager::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int id, const LogVal* const *vals) {
|
||||
void Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
|
||||
bool updated = false;
|
||||
|
@ -956,15 +970,15 @@ void Manager::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int
|
|||
|
||||
int position = 0;
|
||||
if ( filter->want_record ) {
|
||||
RecordVal * r = LogValToRecordVal(vals, filter->fields, &position);
|
||||
RecordVal * r = ValueToRecordVal(vals, filter->fields, &position);
|
||||
out_vals.push_back(r);
|
||||
} else {
|
||||
for ( int j = 0; j < filter->fields->NumFields(); j++) {
|
||||
Val* val = 0;
|
||||
if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) {
|
||||
val = LogValToRecordVal(vals, filter->fields->FieldType(j)->AsRecordType(), &position);
|
||||
val = ValueToRecordVal(vals, filter->fields->FieldType(j)->AsRecordType(), &position);
|
||||
} else {
|
||||
val = LogValToVal(vals[position], filter->fields->FieldType(j));
|
||||
val = ValueToVal(vals[position], filter->fields->FieldType(j));
|
||||
position++;
|
||||
}
|
||||
out_vals.push_back(val);
|
||||
|
@ -975,7 +989,7 @@ void Manager::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int
|
|||
|
||||
}
|
||||
|
||||
void Manager::PutTable(const InputReader* reader, int id, const LogVal* const *vals) {
|
||||
void Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
|
||||
assert(i);
|
||||
|
@ -984,22 +998,22 @@ void Manager::PutTable(const InputReader* reader, int id, const LogVal* const *v
|
|||
assert(i->filters[id]->filter_type == TABLE_FILTER);
|
||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
||||
|
||||
Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
Val* valval;
|
||||
|
||||
int position = filter->num_idx_fields;
|
||||
if ( filter->num_val_fields == 0 ) {
|
||||
valval = 0;
|
||||
} else if ( filter->num_val_fields == 1 && !filter->want_record ) {
|
||||
valval = LogValToVal(vals[filter->num_idx_fields], filter->rtype->FieldType(filter->num_idx_fields));
|
||||
valval = ValueToVal(vals[filter->num_idx_fields], filter->rtype->FieldType(filter->num_idx_fields));
|
||||
} else {
|
||||
valval = LogValToRecordVal(vals, filter->rtype, &position);
|
||||
valval = ValueToRecordVal(vals, filter->rtype, &position);
|
||||
}
|
||||
|
||||
filter->tab->Assign(idxval, valval);
|
||||
}
|
||||
|
||||
void Manager::Clear(const InputReader* reader, int id) {
|
||||
void Manager::Clear(const ReaderFrontend* reader, int id) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader");
|
||||
|
@ -1014,7 +1028,7 @@ void Manager::Clear(const InputReader* reader, int id) {
|
|||
filter->tab->RemoveAll();
|
||||
}
|
||||
|
||||
bool Manager::Delete(const InputReader* reader, int id, const LogVal* const *vals) {
|
||||
bool Manager::Delete(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
||||
ReaderInfo *i = FindReader(reader);
|
||||
if ( i == 0 ) {
|
||||
reporter->InternalError("Unknown reader");
|
||||
|
@ -1025,7 +1039,7 @@ bool Manager::Delete(const InputReader* reader, int id, const LogVal* const *val
|
|||
|
||||
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
||||
Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||
return( filter->tab->Delete(idxval) != 0 );
|
||||
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
|
||||
|
@ -1037,12 +1051,12 @@ bool Manager::Delete(const InputReader* reader, int id, const LogVal* const *val
|
|||
}
|
||||
}
|
||||
|
||||
void Manager::Error(InputReader* reader, const char* msg)
|
||||
void Manager::Error(ReaderFrontend* reader, const char* msg)
|
||||
{
|
||||
reporter->Error("error with input reader for %s: %s", reader->Source().c_str(), msg);
|
||||
}
|
||||
|
||||
bool Manager::SendEvent(const string& name, const int num_vals, const LogVal* const *vals)
|
||||
bool Manager::SendEvent(const string& name, const int num_vals, const Value* const *vals)
|
||||
{
|
||||
EventHandler* handler = event_registry->Lookup(name.c_str());
|
||||
if ( handler == 0 ) {
|
||||
|
@ -1059,7 +1073,7 @@ bool Manager::SendEvent(const string& name, const int num_vals, const LogVal* co
|
|||
|
||||
val_list* vl = new val_list;
|
||||
for ( int i = 0; i < num_vals; i++) {
|
||||
vl->append(LogValToVal(vals[i], type->FieldType(i)));
|
||||
vl->append(ValueToVal(vals[i], type->FieldType(i)));
|
||||
}
|
||||
|
||||
mgr.Dispatch(new Event(handler, vl));
|
||||
|
@ -1118,7 +1132,7 @@ RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type,
|
|||
|
||||
|
||||
|
||||
RecordVal* Manager::LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position) {
|
||||
RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *request_type, int* position) {
|
||||
if ( position == 0 ) {
|
||||
reporter->InternalError("Need position");
|
||||
return 0;
|
||||
|
@ -1136,9 +1150,9 @@ RecordVal* Manager::LogValToRecordVal(const LogVal* const *vals, RecordType *req
|
|||
|
||||
Val* fieldVal = 0;
|
||||
if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) {
|
||||
fieldVal = LogValToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position);
|
||||
fieldVal = ValueToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position);
|
||||
} else {
|
||||
fieldVal = LogValToVal(vals[*position], request_type->FieldType(i));
|
||||
fieldVal = ValueToVal(vals[*position], request_type->FieldType(i));
|
||||
(*position)++;
|
||||
}
|
||||
|
||||
|
@ -1150,7 +1164,7 @@ RecordVal* Manager::LogValToRecordVal(const LogVal* const *vals, RecordType *req
|
|||
}
|
||||
|
||||
|
||||
int Manager::GetLogValLength(const LogVal* val) {
|
||||
int Manager::GetValueLength(const Value* val) {
|
||||
int length = 0;
|
||||
|
||||
switch (val->type) {
|
||||
|
@ -1193,7 +1207,7 @@ int Manager::GetLogValLength(const LogVal* val) {
|
|||
|
||||
case TYPE_TABLE: {
|
||||
for ( int i = 0; i < val->val.set_val.size; i++ ) {
|
||||
length += GetLogValLength(val->val.set_val.vals[i]);
|
||||
length += GetValueLength(val->val.set_val.vals[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1201,20 +1215,20 @@ int Manager::GetLogValLength(const LogVal* val) {
|
|||
case TYPE_VECTOR: {
|
||||
int j = val->val.vector_val.size;
|
||||
for ( int i = 0; i < j; i++ ) {
|
||||
length += GetLogValLength(val->val.vector_val.vals[i]);
|
||||
length += GetValueLength(val->val.vector_val.vals[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
reporter->InternalError("unsupported type %d for GetLogValLength", val->type);
|
||||
reporter->InternalError("unsupported type %d for GetValueLength", val->type);
|
||||
}
|
||||
|
||||
return length;
|
||||
|
||||
}
|
||||
|
||||
int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) {
|
||||
int Manager::CopyValue(char *data, const int startpos, const Value* val) {
|
||||
switch ( val->type ) {
|
||||
case TYPE_BOOL:
|
||||
case TYPE_INT:
|
||||
|
@ -1276,7 +1290,7 @@ int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) {
|
|||
case TYPE_TABLE: {
|
||||
int length = 0;
|
||||
for ( int i = 0; i < val->val.set_val.size; i++ ) {
|
||||
length += CopyLogVal(data, startpos+length, val->val.set_val.vals[i]);
|
||||
length += CopyValue(data, startpos+length, val->val.set_val.vals[i]);
|
||||
}
|
||||
return length;
|
||||
break;
|
||||
|
@ -1286,14 +1300,14 @@ int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) {
|
|||
int length = 0;
|
||||
int j = val->val.vector_val.size;
|
||||
for ( int i = 0; i < j; i++ ) {
|
||||
length += CopyLogVal(data, startpos+length, val->val.vector_val.vals[i]);
|
||||
length += CopyValue(data, startpos+length, val->val.vector_val.vals[i]);
|
||||
}
|
||||
return length;
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
reporter->InternalError("unsupported type %d for CopyLogVal", val->type);
|
||||
reporter->InternalError("unsupported type %d for CopyValue", val->type);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1302,12 +1316,12 @@ int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) {
|
|||
|
||||
}
|
||||
|
||||
HashKey* Manager::HashLogVals(const int num_elements, const LogVal* const *vals) {
|
||||
HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) {
|
||||
int length = 0;
|
||||
|
||||
for ( int i = 0; i < num_elements; i++ ) {
|
||||
const LogVal* val = vals[i];
|
||||
length += GetLogValLength(val);
|
||||
const Value* val = vals[i];
|
||||
length += GetValueLength(val);
|
||||
}
|
||||
|
||||
//reporter->Error("Length: %d", length);
|
||||
|
@ -1318,8 +1332,8 @@ HashKey* Manager::HashLogVals(const int num_elements, const LogVal* const *vals)
|
|||
reporter->InternalError("Could not malloc?");
|
||||
}
|
||||
for ( int i = 0; i < num_elements; i++ ) {
|
||||
const LogVal* val = vals[i];
|
||||
position += CopyLogVal(data, position, val);
|
||||
const Value* val = vals[i];
|
||||
position += CopyValue(data, position, val);
|
||||
}
|
||||
|
||||
assert(position == length);
|
||||
|
@ -1328,7 +1342,7 @@ HashKey* Manager::HashLogVals(const int num_elements, const LogVal* const *vals)
|
|||
|
||||
}
|
||||
|
||||
Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) {
|
||||
Val* Manager::ValueToVal(const Value* val, BroType* request_type) {
|
||||
|
||||
if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) {
|
||||
reporter->InternalError("Typetags don't match: %d vs %d", request_type->Tag(), val->type);
|
||||
|
@ -1384,7 +1398,7 @@ Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) {
|
|||
SetType* s = new SetType(set_index, 0);
|
||||
TableVal* t = new TableVal(s);
|
||||
for ( int i = 0; i < val->val.set_val.size; i++ ) {
|
||||
t->Assign(LogValToVal( val->val.set_val.vals[i], type ), 0);
|
||||
t->Assign(ValueToVal( val->val.set_val.vals[i], type ), 0);
|
||||
}
|
||||
return t;
|
||||
break;
|
||||
|
@ -1396,7 +1410,7 @@ Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) {
|
|||
VectorType* vt = new VectorType(type->Ref());
|
||||
VectorVal* v = new VectorVal(vt);
|
||||
for ( int i = 0; i < val->val.vector_val.size; i++ ) {
|
||||
v->Assign(i, LogValToVal( val->val.set_val.vals[i], type ), 0);
|
||||
v->Assign(i, ValueToVal( val->val.set_val.vals[i], type ), 0);
|
||||
}
|
||||
return v;
|
||||
|
||||
|
@ -1425,7 +1439,7 @@ Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
Manager::ReaderInfo* Manager::FindReader(const InputReader* reader)
|
||||
Manager::ReaderInfo* Manager::FindReader(const ReaderFrontend* reader)
|
||||
{
|
||||
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
|
||||
{
|
||||
|
@ -1460,4 +1474,3 @@ string Manager::Hash(const string &input) {
|
|||
return out;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
namespace input {
|
||||
|
||||
class ReaderFrontend;
|
||||
class ReaderBackend;
|
||||
|
||||
class Manager {
|
||||
public:
|
||||
|
@ -30,7 +31,8 @@ public:
|
|||
bool RemoveEventFilter(EnumVal* id, const string &name);
|
||||
|
||||
protected:
|
||||
|
||||
friend class ReaderFrontend;
|
||||
|
||||
// Reports an error for the given reader.
|
||||
void Error(ReaderFrontend* reader, const char* msg);
|
||||
|
||||
|
@ -42,6 +44,8 @@ protected:
|
|||
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
|
||||
void SendEntry(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
||||
void EndCurrentSend(const ReaderFrontend* reader, int id);
|
||||
|
||||
ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type);
|
||||
|
||||
private:
|
||||
struct ReaderInfo;
|
||||
|
|
|
@ -1,13 +1,44 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#include "InputReader.h"
|
||||
#include "ReaderBackend.h"
|
||||
#include "ReaderFrontend.h"
|
||||
|
||||
using threading::Value;
|
||||
using threading::Field;
|
||||
|
||||
namespace logging {
|
||||
namespace input {
|
||||
|
||||
InputReader::InputReader(ReaderFrontend *arg_frontend) :MsgThread()
|
||||
class ErrorMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||
public:
|
||||
ErrorMessage(ReaderFrontend* reader, string message)
|
||||
: threading::OutputMessage<ReaderFrontend>("Error", reader),
|
||||
message(message) {}
|
||||
|
||||
virtual bool Process() {
|
||||
input_mgr->Error(object, message.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
string message;
|
||||
}
|
||||
|
||||
class PutMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||
public:
|
||||
PutMessage(ReaderFrontend* reader, int id, const Value* const *val)
|
||||
: threading::OutputMessage<ReaderFrontend>("Error", reader),
|
||||
id(id), val(val) {}
|
||||
|
||||
virtual bool Process() {
|
||||
return input_mgr->Put(object, id, val);
|
||||
}
|
||||
|
||||
private:
|
||||
int id;
|
||||
Value* val;
|
||||
}
|
||||
|
||||
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
|
||||
{
|
||||
buf = 0;
|
||||
buf_len = 1024;
|
||||
|
@ -18,38 +49,47 @@ InputReader::InputReader(ReaderFrontend *arg_frontend) :MsgThread()
|
|||
SetName(frontend->Name());
|
||||
}
|
||||
|
||||
InputReader::~InputReader()
|
||||
ReaderBackend::~ReaderBackend()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void InputReader::Error(const char *msg)
|
||||
void ReaderBackend::Error(const string &msg)
|
||||
{
|
||||
input_mgr->Error(this, msg);
|
||||
SendOut(new ErrorMessage(frontend, msg);
|
||||
}
|
||||
|
||||
void InputReader::Error(const string &msg)
|
||||
void ReaderBackend::Put(int id, const Value* const *val)
|
||||
{
|
||||
input_mgr->Error(this, msg.c_str());
|
||||
SendOut(new PutMessage(frontend, id, val);
|
||||
}
|
||||
|
||||
void InputReader::Put(int id, const LogVal* const *val)
|
||||
void ReaderBackend::Delete(int id, const Value* const *val)
|
||||
{
|
||||
input_mgr->Put(this, id, val);
|
||||
SendOut(new DeleteMessage(frontend, id, val);
|
||||
}
|
||||
|
||||
void InputReader::Clear(int id)
|
||||
void ReaderBackend::Clear(int id)
|
||||
{
|
||||
input_mgr->Clear(this, id);
|
||||
SendOut(new ClearMessage(frontend, id);
|
||||
}
|
||||
|
||||
void InputReader::Delete(int id, const LogVal* const *val)
|
||||
bool ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals)
|
||||
{
|
||||
input_mgr->Delete(this, id, val);
|
||||
SendOut(new SendEventMessage(frontend, name, num_vals, vals);
|
||||
}
|
||||
|
||||
void ReaderBackend::EndCurrentSend(int id)
|
||||
{
|
||||
SendOut(new EndCurrentSendMessage(frontent, id);
|
||||
}
|
||||
|
||||
void ReaderBackend::SendEntry(int id, const Value* const *vals)
|
||||
{
|
||||
SendOut(new SendEntryMessage(frontend, id, vals);
|
||||
}
|
||||
|
||||
bool InputReader::Init(string arg_source)
|
||||
bool ReaderBackend::Init(string arg_source)
|
||||
{
|
||||
source = arg_source;
|
||||
|
||||
|
@ -58,35 +98,31 @@ bool InputReader::Init(string arg_source)
|
|||
return !disabled;
|
||||
}
|
||||
|
||||
bool InputReader::AddFilter(int id, int arg_num_fields,
|
||||
const LogField* const * arg_fields)
|
||||
bool ReaderBackend::AddFilter(int id, int arg_num_fields,
|
||||
const Field* const * arg_fields)
|
||||
{
|
||||
return DoAddFilter(id, arg_num_fields, arg_fields);
|
||||
}
|
||||
|
||||
bool InputReader::RemoveFilter(int id)
|
||||
bool ReaderBackend::RemoveFilter(int id)
|
||||
{
|
||||
return DoRemoveFilter(id);
|
||||
}
|
||||
|
||||
void InputReader::Finish()
|
||||
void ReaderBackend::Finish()
|
||||
{
|
||||
DoFinish();
|
||||
disabled = true;
|
||||
}
|
||||
|
||||
bool InputReader::Update()
|
||||
bool ReaderBackend::Update()
|
||||
{
|
||||
return DoUpdate();
|
||||
}
|
||||
|
||||
bool InputReader::SendEvent(const string& name, const int num_vals, const LogVal* const *vals)
|
||||
{
|
||||
return input_mgr->SendEvent(name, num_vals, vals);
|
||||
}
|
||||
|
||||
// stolen from logwriter
|
||||
const char* InputReader::Fmt(const char* format, ...)
|
||||
const char* ReaderBackend::Fmt(const char* format, ...)
|
||||
{
|
||||
if ( ! buf )
|
||||
buf = (char*) malloc(buf_len);
|
||||
|
@ -111,14 +147,5 @@ const char* InputReader::Fmt(const char* format, ...)
|
|||
}
|
||||
|
||||
|
||||
void InputReader::SendEntry(int id, const LogVal* const *vals)
|
||||
{
|
||||
input_mgr->SendEntry(this, id, vals);
|
||||
}
|
||||
|
||||
void InputReader::EndCurrentSend(int id)
|
||||
{
|
||||
input_mgr->EndCurrentSend(this, id);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,26 +1,25 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
//
|
||||
// Same notes about thread safety as in LogWriter.h apply.
|
||||
|
||||
|
||||
#ifndef INPUT_READERBACKEND_H
|
||||
#define INPUT_READERBACKEND_H
|
||||
|
||||
#include "InputMgr.h"
|
||||
#include "BroString.h"
|
||||
#include "LogMgr.h"
|
||||
#include "../threading/SerializationTypes.h"
|
||||
#include "threading/MsgThread.h"
|
||||
|
||||
namespace input {
|
||||
|
||||
class ReaderFrontend;
|
||||
|
||||
class ReaderBackend : public threading::MsgThread {
|
||||
public:
|
||||
ReaderBackend(ReaderFrontend *frontend);
|
||||
ReaderBackend(ReaderFrontend* frontend);
|
||||
|
||||
virtual ~ReaderBackend();
|
||||
|
||||
bool Init(string arg_source);
|
||||
|
||||
bool AddFilter( int id, int arg_num_fields, const LogField* const* fields );
|
||||
bool AddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
|
||||
|
||||
bool RemoveFilter ( int id );
|
||||
|
||||
|
@ -32,7 +31,7 @@ protected:
|
|||
// Methods that have to be overwritten by the individual readers
|
||||
virtual bool DoInit(string arg_sources) = 0;
|
||||
|
||||
virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0;
|
||||
virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields ) = 0;
|
||||
|
||||
virtual bool DoRemoveFilter( int id ) = 0;
|
||||
|
||||
|
@ -51,15 +50,15 @@ protected:
|
|||
// A thread-safe version of fmt(). (stolen from logwriter)
|
||||
const char* Fmt(const char* format, ...);
|
||||
|
||||
bool SendEvent(const string& name, const int num_vals, const LogVal* const *vals);
|
||||
bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
|
||||
|
||||
// Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table
|
||||
void Put(int id, const LogVal* const *val);
|
||||
void Delete(int id, const LogVal* const *val);
|
||||
void Put(int id, const threading::Value* const *val);
|
||||
void Delete(int id, const threading::Value* const *val);
|
||||
void Clear(int id);
|
||||
|
||||
// Table-functions (tracking mode): Only changed lines are propagated.
|
||||
void SendEntry(int id, const LogVal* const *vals);
|
||||
void SendEntry(int id, const threading::Value* const *vals);
|
||||
void EndCurrentSend(int id);
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#ifndef INPUT_READERFRONTEND_H
|
||||
#define INPUT_READERFRONTEND_H
|
||||
|
||||
#include "Manager.h"
|
||||
|
||||
#include "threading/MsgThread.h"
|
||||
|
||||
namespace input {
|
||||
|
||||
class ReaderBackend;
|
||||
|
||||
class ReaderFrontend {
|
||||
public:
|
||||
ReaderFrontend(bro_int_t type);
|
||||
|
||||
virtual ~ReaderFrontend();
|
||||
|
||||
void Init(string arg_source);
|
||||
|
||||
void Update();
|
||||
|
||||
void AddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
|
||||
|
||||
void Finish();
|
||||
|
||||
/**
|
||||
* Returns a descriptive name for the reader, including the type of
|
||||
* the backend and the source used.
|
||||
*
|
||||
* This method is safe to call from any thread.
|
||||
*/
|
||||
string Name() const;
|
||||
|
||||
|
||||
protected:
|
||||
friend class Manager;
|
||||
|
||||
const string Source() const { return source; }
|
||||
|
||||
string ty_name; // Name of the backend type. Set by the manager.
|
||||
|
||||
private:
|
||||
string source;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
#endif /* INPUT_READERFRONTEND_H */
|
||||
|
||||
|
|
@ -1,13 +1,15 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#ifndef INPUTREADERASCII_H
|
||||
#define INPUTREADERASCII_H
|
||||
#ifndef INPUT_READERS_ASCII_H
|
||||
#define INPUT_READERS_ASCII_H
|
||||
|
||||
#include "InputReader.h"
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
|
||||
#include "../ReaderBackend.h"
|
||||
|
||||
namespace input { namespace reader {
|
||||
|
||||
// Description for input field mapping
|
||||
struct FieldMapping {
|
||||
string name;
|
||||
|
@ -28,18 +30,18 @@ struct FieldMapping {
|
|||
};
|
||||
|
||||
|
||||
class InputReaderAscii : public InputReader {
|
||||
class Ascii : public ReaderBackend {
|
||||
public:
|
||||
InputReaderAscii();
|
||||
~InputReaderAscii();
|
||||
Ascii(ReaderFrontend* frontend);
|
||||
~Ascii();
|
||||
|
||||
static InputReader* Instantiate() { return new InputReaderAscii; }
|
||||
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); }
|
||||
|
||||
protected:
|
||||
|
||||
virtual bool DoInit(string path);
|
||||
|
||||
virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields );
|
||||
virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
|
||||
|
||||
virtual bool DoRemoveFilter ( int id );
|
||||
|
||||
|
@ -52,7 +54,7 @@ private:
|
|||
struct Filter {
|
||||
unsigned int num_fields;
|
||||
|
||||
const LogField* const * fields; // raw mapping
|
||||
const threading::Field* const * fields; // raw mapping
|
||||
|
||||
// map columns in the file to columns to send back to the manager
|
||||
vector<FieldMapping> columnMap;
|
||||
|
@ -64,7 +66,7 @@ private:
|
|||
TransportProto StringToProto(const string &proto);
|
||||
|
||||
bool ReadHeader();
|
||||
LogVal* EntryToVal(string s, FieldMapping type);
|
||||
threading::Value* EntryToVal(string s, FieldMapping type);
|
||||
|
||||
bool GetLine(string& str);
|
||||
|
||||
|
@ -85,4 +87,7 @@ private:
|
|||
};
|
||||
|
||||
|
||||
#endif /* INPUTREADERASCII_H */
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* INPUT_READERS_ASCII_H */
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
#include "writers/Ascii.h"
|
||||
#include "writers/None.h"
|
||||
|
||||
#include "threading/SerializationTypes.h"
|
||||
#include "../threading/SerializationTypes.h"
|
||||
|
||||
using namespace logging;
|
||||
using threading::Value;
|
||||
|
|
|
@ -12,7 +12,8 @@ bool Field::Read(SerializationFormat* fmt)
|
|||
int t;
|
||||
int st;
|
||||
|
||||
bool success = (fmt->Read(&name, "name") && fmt->Read(&t, "type") && fmt->Read(&st, "subtype") );
|
||||
bool success = (fmt->Read(&name, "name") && fmt->Read(&secondary_name, "secondary_name") &&
|
||||
fmt->Read(&t, "type") && fmt->Read(&st, "subtype") );
|
||||
type = (TypeTag) t;
|
||||
subtype = (TypeTag) st;
|
||||
|
||||
|
@ -21,7 +22,8 @@ bool Field::Read(SerializationFormat* fmt)
|
|||
|
||||
bool Field::Write(SerializationFormat* fmt) const
|
||||
{
|
||||
return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype"));
|
||||
return (fmt->Write(name, "name") && fmt->Write(secondary_name, "secondary_name") && fmt->Write((int)type, "type") &&
|
||||
fmt->Write((int)subtype, "subtype"));
|
||||
}
|
||||
|
||||
Value::~Value()
|
||||
|
|
|
@ -13,6 +13,8 @@ namespace threading {
|
|||
*/
|
||||
struct Field {
|
||||
string name; //! Name of the field.
|
||||
// needed by input framework. port fields have two names (one for the port, one for the type) - this specifies the secondary name.
|
||||
string secondary_name;
|
||||
TypeTag type; //! Type of the field.
|
||||
TypeTag subtype; //! Inner type for sets.
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue