mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
start reworking input framework...
does not compile at the moment, but there are a few uncommitted changes that will be reverted in the next commit.
This commit is contained in:
parent
988f859761
commit
e2c521fc4e
6 changed files with 208 additions and 217 deletions
|
@ -4,30 +4,36 @@ module Input;
|
||||||
export {
|
export {
|
||||||
const default_reader = READER_ASCII &redef;
|
const default_reader = READER_ASCII &redef;
|
||||||
|
|
||||||
type ReaderDescription: record {
|
type StreamDescription: record {
|
||||||
source: string;
|
source: string;
|
||||||
idx: any;
|
|
||||||
val: any;
|
|
||||||
destination: any;
|
|
||||||
want_record: bool &default=T;
|
|
||||||
reader: Reader &default=default_reader;
|
reader: Reader &default=default_reader;
|
||||||
};
|
};
|
||||||
|
|
||||||
type Filter: record {
|
type Filter: record {
|
||||||
name: string;
|
|
||||||
## descriptive name. for later removal
|
## descriptive name. for later removal
|
||||||
|
name: string;
|
||||||
|
|
||||||
|
## for tables
|
||||||
|
idx: any &optional;
|
||||||
|
val: any &optional;
|
||||||
|
destination: any &optional;
|
||||||
|
want_record: bool &default=T;
|
||||||
|
table_ev: any &optional; # event containing idx, val as values.
|
||||||
|
|
||||||
|
## decision function, that decides if an insertion, update or removal should really be executed.
|
||||||
|
## or events should be thought
|
||||||
pred: function(typ: Input::Event, left: any, right: any): bool &optional;
|
pred: function(typ: Input::Event, left: any, right: any): bool &optional;
|
||||||
## decision function, that decides if an inserton, update or removal should really be executed
|
|
||||||
|
## for "normalized" events
|
||||||
|
ev: any &optional;
|
||||||
|
ev_description: any &optional;
|
||||||
};
|
};
|
||||||
|
|
||||||
const no_filter: Filter = [$name="<not found>"]; # Sentinel.
|
const no_filter: Filter = [$name="<not found>"]; # Sentinel.
|
||||||
|
|
||||||
global create_reader: function(id: Log::ID, description: Input::ReaderDescription) : bool;
|
global create_stream: function(id: Log::ID, description: Input::ReaderDescription) : bool;
|
||||||
global remove_reader: function(id: Log::ID) : bool;
|
global remove_stream: function(id: Log::ID) : bool;
|
||||||
global force_update: function(id: Log::ID) : bool;
|
global force_update: function(id: Log::ID) : bool;
|
||||||
global add_event: function(id: Log::ID, name: string) : bool;
|
|
||||||
global remove_event: function(id: Log::ID, name: string) : bool;
|
|
||||||
global add_filter: function(id: Log::ID, filter: Input::Filter) : bool;
|
global add_filter: function(id: Log::ID, filter: Input::Filter) : bool;
|
||||||
global remove_filter: function(id: Log::ID, name: string) : bool;
|
global remove_filter: function(id: Log::ID, name: string) : bool;
|
||||||
global get_filter: function(id: ID, name: string) : Filter;
|
global get_filter: function(id: ID, name: string) : Filter;
|
||||||
|
@ -41,14 +47,14 @@ module Input;
|
||||||
|
|
||||||
global filters: table[ID, string] of Filter;
|
global filters: table[ID, string] of Filter;
|
||||||
|
|
||||||
function create_reader(id: Log::ID, description: Input::ReaderDescription) : bool
|
function create_stream(id: Log::ID, description: Input::ReaderDescription) : bool
|
||||||
{
|
{
|
||||||
return __create_reader(id, description);
|
return __create_stream(id, description);
|
||||||
}
|
}
|
||||||
|
|
||||||
function remove_reader(id: Log::ID) : bool
|
function remove_stream(id: Log::ID) : bool
|
||||||
{
|
{
|
||||||
return __remove_reader(id);
|
return __remove_stream(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
function force_update(id: Log::ID) : bool
|
function force_update(id: Log::ID) : bool
|
||||||
|
@ -56,16 +62,6 @@ function force_update(id: Log::ID) : bool
|
||||||
return __force_update(id);
|
return __force_update(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
function add_event(id: Log::ID, name: string) : bool
|
|
||||||
{
|
|
||||||
return __add_event(id, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
function remove_event(id: Log::ID, name: string) : bool
|
|
||||||
{
|
|
||||||
return __remove_event(id, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
function add_filter(id: Log::ID, filter: Input::Filter) : bool
|
function add_filter(id: Log::ID, filter: Input::Filter) : bool
|
||||||
{
|
{
|
||||||
filters[id, filter$name] = filter;
|
filters[id, filter$name] = filter;
|
||||||
|
|
263
src/InputMgr.cc
263
src/InputMgr.cc
|
@ -27,22 +27,11 @@ declare(PDict, InputHash);
|
||||||
struct InputMgr::Filter {
|
struct InputMgr::Filter {
|
||||||
EnumVal* id;
|
EnumVal* id;
|
||||||
string name;
|
string name;
|
||||||
Func* pred;
|
|
||||||
|
|
||||||
~Filter();
|
|
||||||
};
|
|
||||||
|
|
||||||
InputMgr::Filter::~Filter() {
|
|
||||||
Unref(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
struct InputMgr::ReaderInfo {
|
|
||||||
EnumVal* id;
|
|
||||||
EnumVal* type;
|
|
||||||
InputReader* reader;
|
|
||||||
unsigned int num_idx_fields;
|
unsigned int num_idx_fields;
|
||||||
unsigned int num_val_fields;
|
unsigned int num_val_fields;
|
||||||
bool want_record;
|
bool want_record;
|
||||||
|
EventHandlerPtr table_event;
|
||||||
|
|
||||||
TableVal* tab;
|
TableVal* tab;
|
||||||
RecordType* rtype;
|
RecordType* rtype;
|
||||||
|
@ -50,18 +39,42 @@ struct InputMgr::ReaderInfo {
|
||||||
|
|
||||||
PDict(InputHash)* currDict;
|
PDict(InputHash)* currDict;
|
||||||
PDict(InputHash)* lastDict;
|
PDict(InputHash)* lastDict;
|
||||||
|
|
||||||
list<string> events; // events we fire when "something" happens
|
Func* pred;
|
||||||
list<InputMgr::Filter> filters; // filters that can prevent our actions
|
|
||||||
|
EventHandlerPtr event;
|
||||||
|
RecordType* event_type;
|
||||||
|
|
||||||
|
~Filter();
|
||||||
|
};
|
||||||
|
|
||||||
|
InputMgr::Filter::~Filter() {
|
||||||
|
Unref(id);
|
||||||
|
if ( tab )
|
||||||
|
Unref(tab);
|
||||||
|
if ( itype )
|
||||||
|
Unref(itype);
|
||||||
|
if ( rtype )
|
||||||
|
Unref(rtype);
|
||||||
|
if ( event_type)
|
||||||
|
Unref(event_type);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct InputMgr::ReaderInfo {
|
||||||
|
EnumVal* id;
|
||||||
|
EnumVal* type;
|
||||||
|
InputReader* reader;
|
||||||
|
|
||||||
|
//list<string> events; // events we fire when "something" happens
|
||||||
|
map<int, InputMgr::Filter> filters; // filters that can prevent our actions
|
||||||
|
|
||||||
~ReaderInfo();
|
~ReaderInfo();
|
||||||
};
|
};
|
||||||
|
|
||||||
InputMgr::ReaderInfo::~ReaderInfo() {
|
InputMgr::ReaderInfo::~ReaderInfo() {
|
||||||
|
// all the contents of filters should delete themselves automatically...
|
||||||
|
|
||||||
Unref(type);
|
Unref(type);
|
||||||
Unref(tab);
|
|
||||||
Unref(itype);
|
|
||||||
Unref(rtype);
|
|
||||||
Unref(id);
|
Unref(id);
|
||||||
|
|
||||||
delete(reader);
|
delete(reader);
|
||||||
|
@ -86,14 +99,14 @@ InputMgr::InputMgr()
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new input reader object to be used at whomevers leisure lateron.
|
// create a new input reader object to be used at whomevers leisure lateron.
|
||||||
InputReader* InputMgr::CreateReader(EnumVal* id, RecordVal* description)
|
InputReader* InputMgr::CreateStream(EnumVal* id, RecordVal* description)
|
||||||
{
|
{
|
||||||
InputReaderDefinition* ir = input_readers;
|
InputReaderDefinition* ir = input_readers;
|
||||||
|
|
||||||
RecordType* rtype = description->Type()->AsRecordType();
|
RecordType* rtype = description->Type()->AsRecordType();
|
||||||
if ( ! same_type(rtype, BifType::Record::Input::ReaderDescription, 0) )
|
if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) )
|
||||||
{
|
{
|
||||||
reporter->Error("readerDescription argument not of right type");
|
reporter->Error("Streamdescription argument not of right type");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,55 +158,15 @@ InputReader* InputMgr::CreateReader(EnumVal* id, RecordVal* description)
|
||||||
const BroString* bsource = description->Lookup(rtype->FieldOffset("source"))->AsString();
|
const BroString* bsource = description->Lookup(rtype->FieldOffset("source"))->AsString();
|
||||||
string source((const char*) bsource->Bytes(), bsource->Len());
|
string source((const char*) bsource->Bytes(), bsource->Len());
|
||||||
|
|
||||||
RecordType *idx = description->Lookup(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
|
||||||
RecordType *val = description->Lookup(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
|
||||||
TableVal *dst = description->Lookup(rtype->FieldOffset("destination"))->AsTableVal();
|
|
||||||
|
|
||||||
|
|
||||||
vector<LogField*> fieldsV; // vector, because we don't know the length beforehands
|
|
||||||
|
|
||||||
|
|
||||||
bool status = !UnrollRecordType(&fieldsV, idx, "");
|
|
||||||
|
|
||||||
int idxfields = fieldsV.size();
|
|
||||||
|
|
||||||
status = status || !UnrollRecordType(&fieldsV, val, "");
|
|
||||||
int valfields = fieldsV.size() - idxfields;
|
|
||||||
|
|
||||||
if ( status ) {
|
|
||||||
reporter->Error("Problem unrolling");
|
|
||||||
Unref(reader);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
Val *want_record = description->LookupWithDefault(rtype->FieldOffset("want_record"));
|
|
||||||
|
|
||||||
LogField** fields = new LogField*[fieldsV.size()];
|
|
||||||
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) {
|
|
||||||
fields[i] = fieldsV[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
ReaderInfo* info = new ReaderInfo;
|
ReaderInfo* info = new ReaderInfo;
|
||||||
info->reader = reader_obj;
|
info->reader = reader_obj;
|
||||||
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
|
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
|
||||||
info->num_idx_fields = idxfields;
|
|
||||||
info->num_val_fields = valfields;
|
|
||||||
info->tab = dst->Ref()->AsTableVal();
|
|
||||||
info->rtype = val->Ref()->AsRecordType();
|
|
||||||
info->id = id->Ref()->AsEnumVal();
|
info->id = id->Ref()->AsEnumVal();
|
||||||
info->itype = idx->Ref()->AsRecordType();
|
|
||||||
info->currDict = new PDict(InputHash);
|
|
||||||
info->lastDict = new PDict(InputHash);
|
|
||||||
info->want_record = ( want_record->InternalInt() == 1 );
|
|
||||||
Unref(want_record); // ref'd by lookupwithdefault
|
|
||||||
|
|
||||||
if ( valfields > 1 ) {
|
|
||||||
assert(info->want_record);
|
|
||||||
}
|
|
||||||
|
|
||||||
readers.push_back(info);
|
readers.push_back(info);
|
||||||
|
|
||||||
int success = reader_obj->Init(source, fieldsV.size(), idxfields, fields);
|
int success = reader_obj->Init(source);
|
||||||
if ( success == false ) {
|
if ( success == false ) {
|
||||||
assert( RemoveReader(id) );
|
assert( RemoveReader(id) );
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -208,6 +181,86 @@ InputReader* InputMgr::CreateReader(EnumVal* id, RecordVal* description)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) {
|
||||||
|
ReaderInfo *i = FindReader(id);
|
||||||
|
if ( i == 0 ) {
|
||||||
|
reporter->Error("Stream not found");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
RecordType* rtype = fval->Type()->AsRecordType();
|
||||||
|
if ( ! same_type(rtype, BifType::Record::Input::Filter, 0) )
|
||||||
|
{
|
||||||
|
reporter->Error("filter argument not of right type");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Val* name = fval->Lookup(rtype->FieldOffset("name"));
|
||||||
|
Val* pred = fval->Lookup(rtype->FieldOffset("pred"));
|
||||||
|
|
||||||
|
RecordType *idx = fval->Lookup(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
||||||
|
RecordType *val = fval->Lookup(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
||||||
|
TableVal *dst = fval->Lookup(rtype->FieldOffset("destination"))->AsTableVal();
|
||||||
|
|
||||||
|
vector<LogField*> fieldsV; // vector, because we don't know the length beforehands
|
||||||
|
|
||||||
|
bool status = !UnrollRecordType(&fieldsV, idx, "");
|
||||||
|
|
||||||
|
int idxfields = fieldsV.size();
|
||||||
|
|
||||||
|
status = status || !UnrollRecordType(&fieldsV, val, "");
|
||||||
|
int valfields = fieldsV.size() - idxfields;
|
||||||
|
|
||||||
|
if ( status ) {
|
||||||
|
reporter->Error("Problem unrolling");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
|
||||||
|
|
||||||
|
LogField** fields = new LogField*[fieldsV.size()];
|
||||||
|
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) {
|
||||||
|
fields[i] = fieldsV[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
Filter filter;
|
||||||
|
filter.name = name->AsString()->CheckString();
|
||||||
|
filter.id = id->Ref()->AsEnumVal();
|
||||||
|
filter.pred = pred ? pred->AsFunc() : 0;
|
||||||
|
filter.num_idx_fields = idxfields;
|
||||||
|
filter.num_val_fields = valfields;
|
||||||
|
filter.tab = dst ? dst->Ref()->AsTableVal() : 0;
|
||||||
|
filter.rtype = rtype ? val->Ref()->AsRecordType() : 0;
|
||||||
|
filter.itype = itype ? idx->Ref()->AsRecordType() : 0;
|
||||||
|
// ya - well - we actually don't need them in every case... well, a few bytes of memory wasted
|
||||||
|
filter.currDict = new PDict(InputHash);
|
||||||
|
filter.lastDict = new PDict(InputHash);
|
||||||
|
filter.want_record = ( want_record->InternalInt() == 1 );
|
||||||
|
Unref(want_record); // ref'd by lookupwithdefault
|
||||||
|
|
||||||
|
if ( valfields > 1 ) {
|
||||||
|
assert(info->want_record);
|
||||||
|
}
|
||||||
|
|
||||||
|
i->filters[id->InternalInt()] = filter;
|
||||||
|
|
||||||
|
// ok, now we have to alert the reader of our new filter with our funky new fields
|
||||||
|
// the id is handled in a ... well, to be honest, a little bit sneaky manner.
|
||||||
|
// the "problem" is, that we can have several filters in the reader for one filter in the log manager.
|
||||||
|
// this is due to the fact, that a filter can either output it's result as a table, as an event...
|
||||||
|
// ...or as an table _and_ an event. And... if we have a table and an event, we actually need two different sets
|
||||||
|
// of filters in the reader, because the fields for the table and the event may differ and I absolutely do not want
|
||||||
|
// to build a union of these values and figure it out later.
|
||||||
|
// hence -> filter id is multiplicated with 2.
|
||||||
|
// filterId*2 -> results for table
|
||||||
|
// filterId*2+1 -> results for event
|
||||||
|
i->AddFilter( id->InternalInt() * 2, fieldsV.size(), idxfields, fields );
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
bool InputMgr::IsCompatibleType(BroType* t, bool atomic_only)
|
bool InputMgr::IsCompatibleType(BroType* t, bool atomic_only)
|
||||||
{
|
{
|
||||||
if ( ! t )
|
if ( ! t )
|
||||||
|
@ -258,7 +311,7 @@ bool InputMgr::IsCompatibleType(BroType* t, bool atomic_only)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool InputMgr::RemoveReader(const EnumVal* id) {
|
bool InputMgr::RemoveStream(const EnumVal* id) {
|
||||||
ReaderInfo *i = 0;
|
ReaderInfo *i = 0;
|
||||||
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
|
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
|
||||||
{
|
{
|
||||||
|
@ -281,42 +334,6 @@ bool InputMgr::RemoveReader(const EnumVal* id) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool InputMgr::RegisterEvent(const EnumVal* id, string eventName) {
|
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->InternalError("Reader not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
i->events.push_back(eventName);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove first event with name eventName
|
|
||||||
// (though there shouldn't really be several events with the same name...
|
|
||||||
bool InputMgr::UnregisterEvent(const EnumVal* id, string eventName) {
|
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->InternalError("Reader not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::list<string>::iterator it = i->events.begin();
|
|
||||||
while ( it != i->events.end() )
|
|
||||||
{
|
|
||||||
if ( *it == eventName ) {
|
|
||||||
it = i->events.erase(it);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool InputMgr::UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend) {
|
bool InputMgr::UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend) {
|
||||||
for ( int i = 0; i < rec->NumFields(); i++ )
|
for ( int i = 0; i < rec->NumFields(); i++ )
|
||||||
{
|
{
|
||||||
|
@ -363,34 +380,6 @@ bool InputMgr::ForceUpdate(const EnumVal* id)
|
||||||
return i->reader->Update();
|
return i->reader->Update();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) {
|
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->Error("Reader not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
RecordType* rtype = fval->Type()->AsRecordType();
|
|
||||||
if ( ! same_type(rtype, BifType::Record::Input::Filter, 0) )
|
|
||||||
{
|
|
||||||
reporter->Error("filter argument not of right type");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Val* name = fval->Lookup(rtype->FieldOffset("name"));
|
|
||||||
Val* pred = fval->Lookup(rtype->FieldOffset("pred"));
|
|
||||||
|
|
||||||
Filter filter;
|
|
||||||
filter.name = name->AsString()->CheckString();
|
|
||||||
filter.id = id->Ref()->AsEnumVal();
|
|
||||||
filter.pred = pred ? pred->AsFunc() : 0;
|
|
||||||
|
|
||||||
i->filters.push_back(filter);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool InputMgr::RemoveFilter(EnumVal* id, const string &name) {
|
bool InputMgr::RemoveFilter(EnumVal* id, const string &name) {
|
||||||
ReaderInfo *i = FindReader(id);
|
ReaderInfo *i = FindReader(id);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
|
@ -398,7 +387,7 @@ bool InputMgr::RemoveFilter(EnumVal* id, const string &name) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
std::list<InputMgr::Filter>::iterator it = i->filters.begin();
|
std::list<InputMgr::Filter>::iterator it = i->filters.begin();
|
||||||
while ( it != i->filters.end() )
|
while ( it != i->filters.end() )
|
||||||
{
|
{
|
||||||
|
@ -410,8 +399,15 @@ bool InputMgr::RemoveFilter(EnumVal* id, const string &name) {
|
||||||
else
|
else
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
return false;;
|
map<int, InputMgr::Filter>::iterator it = i->filters.find(id->InternalInt());
|
||||||
|
if ( it == i->filters.end() ) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
it->filters.erase(it);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -444,7 +440,7 @@ Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const Lo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
|
void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader");
|
||||||
|
@ -605,7 +601,7 @@ void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void InputMgr::EndCurrentSend(const InputReader* reader) {
|
void InputMgr::EndCurrentSend(const InputReader* reader, int id) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader");
|
||||||
|
@ -693,7 +689,7 @@ void InputMgr::EndCurrentSend(const InputReader* reader) {
|
||||||
i->currDict = new PDict(InputHash);
|
i->currDict = new PDict(InputHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
void InputMgr::Put(const InputReader* reader, const LogVal* const *vals) {
|
void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader");
|
||||||
|
@ -733,7 +729,7 @@ void InputMgr::Put(const InputReader* reader, const LogVal* const *vals) {
|
||||||
i->tab->Assign(idxval, valval);
|
i->tab->Assign(idxval, valval);
|
||||||
}
|
}
|
||||||
|
|
||||||
void InputMgr::Clear(const InputReader* reader) {
|
void InputMgr::Clear(const InputReader* reader, int id) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
ReaderInfo *i = FindReader(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader");
|
||||||
|
@ -873,7 +869,6 @@ int InputMgr::GetLogValLength(const LogVal* val) {
|
||||||
case TYPE_VECTOR: {
|
case TYPE_VECTOR: {
|
||||||
int j = val->val.vector_val.size;
|
int j = val->val.vector_val.size;
|
||||||
for ( int i = 0; i < j; i++ ) {
|
for ( int i = 0; i < j; i++ ) {
|
||||||
reporter->Error("size is %d", val->val.vector_val.size);
|
|
||||||
length += GetLogValLength(val->val.vector_val.vals[i]);
|
length += GetLogValLength(val->val.vector_val.vals[i]);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -20,11 +20,9 @@ class InputMgr {
|
||||||
public:
|
public:
|
||||||
InputMgr();
|
InputMgr();
|
||||||
|
|
||||||
InputReader* CreateReader(EnumVal* id, RecordVal* description);
|
InputReader* CreateStream(EnumVal* id, RecordVal* description);
|
||||||
bool ForceUpdate(const EnumVal* id);
|
bool ForceUpdate(const EnumVal* id);
|
||||||
bool RemoveReader(const EnumVal* id);
|
bool RemoveStream(const EnumVal* id);
|
||||||
bool RegisterEvent(const EnumVal* id, string eventName);
|
|
||||||
bool UnregisterEvent(const EnumVal* id, string eventName);
|
|
||||||
|
|
||||||
bool AddFilter(EnumVal *id, RecordVal* filter);
|
bool AddFilter(EnumVal *id, RecordVal* filter);
|
||||||
bool RemoveFilter(EnumVal* id, const string &name);
|
bool RemoveFilter(EnumVal* id, const string &name);
|
||||||
|
@ -36,12 +34,14 @@ protected:
|
||||||
// Reports an error for the given reader.
|
// Reports an error for the given reader.
|
||||||
void Error(InputReader* reader, const char* msg);
|
void Error(InputReader* reader, const char* msg);
|
||||||
|
|
||||||
void Put(const InputReader* reader, const LogVal* const *vals);
|
// for readers to write to input stream in direct mode (reporting new/deleted values directly)
|
||||||
void Clear(const InputReader* reader);
|
void Put(const InputReader* reader, int id. const LogVal* const *vals);
|
||||||
bool Delete(const InputReader* reader, const LogVal* const *vals);
|
void Clear(const InputReader* reader, int id);
|
||||||
|
bool Delete(const InputReader* reader, int id, const LogVal* const *vals);
|
||||||
|
|
||||||
void SendEntry(const InputReader* reader, const LogVal* const *vals);
|
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
|
||||||
void EndCurrentSend(const InputReader* reader);
|
void SendEntry(const InputReader* reader, int id, const LogVal* const *vals);
|
||||||
|
void EndCurrentSend(const InputReader* reader, int id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct ReaderInfo;
|
struct ReaderInfo;
|
||||||
|
|
|
@ -24,35 +24,42 @@ void InputReader::Error(const string &msg)
|
||||||
input_mgr->Error(this, msg.c_str());
|
input_mgr->Error(this, msg.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
void InputReader::Put(const LogVal* const *val)
|
void InputReader::Put(int id, const LogVal* const *val)
|
||||||
{
|
{
|
||||||
input_mgr->Put(this, val);
|
input_mgr->Put(this, int id, val);
|
||||||
}
|
}
|
||||||
|
|
||||||
void InputReader::Clear()
|
void InputReader::Clear(int id)
|
||||||
{
|
{
|
||||||
input_mgr->Clear(this);
|
input_mgr->Clear(this, int id);
|
||||||
}
|
}
|
||||||
|
|
||||||
void InputReader::Delete(const LogVal* const *val)
|
void InputReader::Delete(int id, const LogVal* const *val)
|
||||||
{
|
{
|
||||||
input_mgr->Delete(this, val);
|
input_mgr->Delete(this, int id, val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool InputReader::Init(string arg_source, int arg_num_fields, int arg_idx_fields,
|
bool InputReader::Init(string arg_source)
|
||||||
const LogField* const * arg_fields)
|
|
||||||
{
|
{
|
||||||
source = arg_source;
|
source = arg_source;
|
||||||
num_fields = arg_num_fields;
|
|
||||||
index_fields = arg_idx_fields;
|
|
||||||
fields = arg_fields;
|
|
||||||
|
|
||||||
// disable if DoInit returns error.
|
// disable if DoInit returns error.
|
||||||
disabled = !DoInit(arg_source, arg_num_fields, arg_idx_fields, arg_fields);
|
disabled = !DoInit(arg_source);
|
||||||
return !disabled;
|
return !disabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool InputReader::AddFilter(int id, int arg_num_fields,
|
||||||
|
const LogField* const * arg_fields)
|
||||||
|
{
|
||||||
|
return DoAddFilter(int id, arg_num_fields, arg_fields);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool InputReader::RemoveFilter(int id)
|
||||||
|
{
|
||||||
|
return DoRemoveFilter(int id);
|
||||||
|
}
|
||||||
|
|
||||||
void InputReader::Finish()
|
void InputReader::Finish()
|
||||||
{
|
{
|
||||||
DoFinish();
|
DoFinish();
|
||||||
|
@ -96,12 +103,12 @@ const char* InputReader::Fmt(const char* format, ...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void InputReader::SendEntry(const LogVal* const *vals)
|
void InputReader::SendEntry(int id, const LogVal* const *vals)
|
||||||
{
|
{
|
||||||
input_mgr->SendEntry(this, vals);
|
input_mgr->SendEntry(this, int id, vals);
|
||||||
}
|
}
|
||||||
|
|
||||||
void InputReader::EndCurrentSend()
|
void InputReader::EndCurrentSend(int id)
|
||||||
{
|
{
|
||||||
input_mgr->EndCurrentSend(this);
|
input_mgr->EndCurrentSend(this, int id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,11 @@ public:
|
||||||
InputReader();
|
InputReader();
|
||||||
virtual ~InputReader();
|
virtual ~InputReader();
|
||||||
|
|
||||||
bool Init(string arg_source, int arg_num_fields, int arg_idx_fields, const LogField* const* fields);
|
bool Init(string arg_source);
|
||||||
|
|
||||||
|
bool AddFilter( int id, int arg_num_fields, const LogField* const* fields );
|
||||||
|
|
||||||
|
bool RemoveFilter ( int id );
|
||||||
|
|
||||||
void Finish();
|
void Finish();
|
||||||
|
|
||||||
|
@ -23,8 +27,11 @@ public:
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// Methods that have to be overwritten by the individual readers
|
// Methods that have to be overwritten by the individual readers
|
||||||
virtual bool DoInit(string arg_source, int arg_num_fields, int arg_idx_fields, const LogField* const * fields) = 0;
|
virtual bool DoInit(string arg_sources) = 0;
|
||||||
|
|
||||||
|
virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0;
|
||||||
|
virtual bool DoRemoveFilter( int id );
|
||||||
|
|
||||||
virtual void DoFinish() = 0;
|
virtual void DoFinish() = 0;
|
||||||
|
|
||||||
// update file contents to logmgr
|
// update file contents to logmgr
|
||||||
|
@ -42,28 +49,26 @@ protected:
|
||||||
|
|
||||||
//void SendEvent(const string& name, const int num_vals, const LogVal* const *vals);
|
//void SendEvent(const string& name, const int num_vals, const LogVal* const *vals);
|
||||||
|
|
||||||
void Put(const LogVal* const *val);
|
// Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table
|
||||||
void Clear();
|
void Put(int id, const LogVal* const *val);
|
||||||
void Delete(const LogVal* const *val);
|
void Delete(int id, const LogVal* const *val);
|
||||||
|
void Clear(int id);
|
||||||
|
|
||||||
void SendEntry(const LogVal* const *vals);
|
// Table-functions (tracking mode): Only changed lines are propagated.
|
||||||
void EndCurrentSend();
|
void SendEntry(int id, const LogVal* const *vals);
|
||||||
|
void EndCurrentSend(int id);
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class InputMgr;
|
friend class InputMgr;
|
||||||
|
|
||||||
string source;
|
string source;
|
||||||
int num_fields;
|
|
||||||
int index_fields;
|
|
||||||
const LogField* const * fields;
|
|
||||||
|
|
||||||
// When an error occurs, this method is called to set a flag marking the
|
// When an error occurs, this method is called to set a flag marking the
|
||||||
// writer as disabled.
|
// writer as disabled.
|
||||||
|
|
||||||
bool disabled;
|
bool disabled;
|
||||||
|
bool Disabled() { return disabled; }
|
||||||
bool Disabled() { return disabled; }
|
|
||||||
|
|
||||||
// For implementing Fmt().
|
// For implementing Fmt().
|
||||||
char* buf;
|
char* buf;
|
||||||
|
|
|
@ -7,18 +7,18 @@ module Input;
|
||||||
#include "NetVar.h"
|
#include "NetVar.h"
|
||||||
%%}
|
%%}
|
||||||
|
|
||||||
type ReaderDescription: record;
|
type StreamDescription: record;
|
||||||
type Filter: record;
|
type Filter: record;
|
||||||
|
|
||||||
function Input::__create_reader%(id: Log::ID, description: Input::ReaderDescription%) : bool
|
function Input::__create_stream%(id: Log::ID, description: Input::StreamDescription%) : bool
|
||||||
%{
|
%{
|
||||||
InputReader *the_reader = input_mgr->CreateReader(id->AsEnumVal(), description->AsRecordVal());
|
InputReader *the_reader = input_mgr->CreateStream(id->AsEnumVal(), description->AsRecordVal());
|
||||||
return new Val( the_reader != 0, TYPE_BOOL );
|
return new Val( the_reader != 0, TYPE_BOOL );
|
||||||
%}
|
%}
|
||||||
|
|
||||||
function Input::__remove_reader%(id: Log::ID%) : bool
|
function Input::__remove_stream%(id: Log::ID%) : bool
|
||||||
%{
|
%{
|
||||||
bool res = input_mgr->RemoveReader(id->AsEnumVal());
|
bool res = input_mgr->RemoveStream(id->AsEnumVal());
|
||||||
return new Val( res, TYPE_BOOL );
|
return new Val( res, TYPE_BOOL );
|
||||||
%}
|
%}
|
||||||
|
|
||||||
|
@ -28,18 +28,6 @@ function Input::__force_update%(id: Log::ID%) : bool
|
||||||
return new Val( res, TYPE_BOOL );
|
return new Val( res, TYPE_BOOL );
|
||||||
%}
|
%}
|
||||||
|
|
||||||
function Input::__add_event%(id: Log::ID, name: string%) : bool
|
|
||||||
%{
|
|
||||||
bool res = input_mgr->RegisterEvent(id->AsEnumVal(), name->AsString()->CheckString());
|
|
||||||
return new Val( res, TYPE_BOOL );
|
|
||||||
%}
|
|
||||||
|
|
||||||
function Input::__remove_event%(id: Log::ID, name: string%) : bool
|
|
||||||
%{
|
|
||||||
bool res = input_mgr->UnregisterEvent(id->AsEnumVal(), name->AsString()->CheckString());
|
|
||||||
return new Val( res, TYPE_BOOL );
|
|
||||||
%}
|
|
||||||
|
|
||||||
function Input::__add_filter%(id: Log::ID, filter: Input::Filter%) : bool
|
function Input::__add_filter%(id: Log::ID, filter: Input::Filter%) : bool
|
||||||
%{
|
%{
|
||||||
bool res = input_mgr->AddFilter(id->AsEnumVal(), filter->AsRecordVal());
|
bool res = input_mgr->AddFilter(id->AsEnumVal(), filter->AsRecordVal());
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue