send events when input entries change

This commit is contained in:
Bernhard Amann 2011-11-03 14:04:13 -07:00
parent b5a77aa77b
commit 4845c3a9a6
5 changed files with 133 additions and 14 deletions

View file

@ -39,6 +39,8 @@ struct InputMgr::ReaderInfo {
PDict(InputHash)* currDict;
PDict(InputHash)* lastDict;
list<string>* events; // events we fire when "something" happens
};
struct InputReaderDefinition {
@ -165,6 +167,7 @@ InputReader* InputMgr::CreateReader(EnumVal* id, RecordVal* description)
info->currDict = new PDict(InputHash);
info->lastDict = new PDict(InputHash);
info->events = new list<string>();
int success = reader_obj->Init(source, fieldsV.size(), idxfields, fields);
if ( success == false ) {
@ -223,7 +226,7 @@ bool InputMgr::IsCompatibleType(BroType* t)
return false;
}
bool InputMgr::RemoveReader(EnumVal* id) {
bool InputMgr::RemoveReader(const EnumVal* id) {
ReaderInfo *i = 0;
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
{
@ -254,6 +257,43 @@ bool InputMgr::RemoveReader(EnumVal* id) {
return true;
}
bool InputMgr::RegisterEvent(const EnumVal* id, string eventName) {
ReaderInfo *i = FindReader(id);
if ( i == 0 ) {
reporter->InternalError("Reader not found");
return false;
}
i->events->push_back(eventName);
return true;
}
bool InputMgr::UnregisterEvent(const EnumVal* id, string eventName) {
ReaderInfo *i = FindReader(id);
if ( i == 0 ) {
reporter->InternalError("Reader not found");
return false;
}
bool erased = false;
std::list<string>::iterator it = i->events->begin();
while ( it != i->events->end() )
{
if ( *it == eventName ) {
it = i->events->erase(it);
erased = true;
}
else
++it;
}
return erased;
}
bool InputMgr::UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend) {
for ( int i = 0; i < rec->NumFields(); i++ )
{
@ -286,7 +326,7 @@ bool InputMgr::UnrollRecordType(vector<LogField*> *fields, const RecordType *rec
}
bool InputMgr::ForceUpdate(EnumVal* id)
bool InputMgr::ForceUpdate(const EnumVal* id)
{
ReaderInfo *i = FindReader(id);
if ( i == 0 ) {
@ -333,6 +373,8 @@ void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
return;
}
bool updated = false;
//reporter->Error("Hashing %d index fields", i->num_idx_fields);
HashKey* idxhash = HashLogVals(i->num_idx_fields, vals);
@ -355,6 +397,7 @@ void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
// updated
i->lastDict->Remove(idxhash);
delete(h);
updated = true;
}
}
@ -411,8 +454,25 @@ void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
i->currDict->Insert(idxhash, ih);
std::list<string>::iterator it = i->events->begin();
while ( it != i->events->end() ) {
EnumVal* ev;
if ( updated ) {
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
} else {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
}
Ref(idxval);
Ref(valval);
SendEvent(*it, ev, idxval, valval);
++it;
}
}
void InputMgr::EndCurrentSend(const InputReader* reader) {
ReaderInfo *i = FindReader(reader);
if ( i == 0 ) {
@ -423,6 +483,23 @@ void InputMgr::EndCurrentSend(const InputReader* reader) {
IterCookie *c = i->lastDict->InitForIteration();
InputHash* ih;
while ( ( ih = i->lastDict->NextEntry(c) ) ) {
if ( i->events->size() > 0 ) {
ListVal *idx = i->tab->RecoverIndex(ih->idxkey);
assert(idx != 0);
Val *val = i->tab->Lookup(idx);
assert(val != 0);
std::list<string>::iterator it = i->events->begin();
while ( it != i->events->end() ) {
Ref(idx);
Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(*it, ev, idx, val);
++it;
}
}
reporter->Error("Expiring element");
i->tab->Delete(ih->idxkey);
}
@ -523,6 +600,22 @@ void InputMgr::SendEvent(const string& name, const int num_vals, const LogVal* c
mgr.Dispatch(new Event(handler, vl));
}
void InputMgr::SendEvent(const string& name, EnumVal* event, Val* left, Val* right)
{
EventHandler* handler = event_registry->Lookup(name.c_str());
if ( handler == 0 ) {
reporter->Error("Event %s not found", name.c_str());
return;
}
val_list* vl = new val_list;
vl->append(event);
vl->append(left);
vl->append(right);
mgr.Dispatch(new Event(handler, vl));
}
Val* InputMgr::LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position) {
if ( position == 0 ) {

View file

@ -21,8 +21,11 @@ public:
InputMgr();
InputReader* CreateReader(EnumVal* id, RecordVal* description);
bool ForceUpdate(EnumVal* id);
bool RemoveReader(EnumVal* id);
bool ForceUpdate(const EnumVal* id);
bool RemoveReader(const EnumVal* id);
bool RegisterEvent(const EnumVal* id, string eventName);
bool UnregisterEvent(const EnumVal* id, string eventName);
protected:
friend class InputReader;
@ -43,6 +46,7 @@ private:
bool IsCompatibleType(BroType* t);
bool UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend);
void SendEvent(const string& name, EnumVal* event, Val* left, Val* right);
HashKey* HashLogVals(const int num_elements, const LogVal* const *vals);

View file

@ -1,13 +1,12 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "InputReader.h"
// #include "EventRegistry.h"
// #include "Event.h"
InputReader::InputReader()
{
buf = 0;
buf_len = 1024;
disabled = true; // disabled will be set correcty in init.
disabled = true; // disabled will be set correcty in init.
}
InputReader::~InputReader()
@ -21,9 +20,9 @@ void InputReader::Error(const char *msg)
}
void InputReader::Error(const string &msg)
{
{
input_mgr->Error(this, msg.c_str());
}
}
void InputReader::Put(const LogVal* const *val)
{
@ -54,16 +53,19 @@ bool InputReader::Init(string arg_source, int arg_num_fields, int arg_idx_fields
return !disabled;
}
void InputReader::Finish() {
void InputReader::Finish()
{
DoFinish();
disabled = true;
}
bool InputReader::Update() {
bool InputReader::Update()
{
return DoUpdate();
}
void InputReader::SendEvent(const string& name, const int num_vals, const LogVal* const *vals) {
void InputReader::SendEvent(const string& name, const int num_vals, const LogVal* const *vals)
{
input_mgr->SendEvent(name, num_vals, vals);
}
@ -93,10 +95,12 @@ const char* InputReader::Fmt(const char* format, ...)
}
void InputReader::SendEntry(const LogVal* const *vals) {
void InputReader::SendEntry(const LogVal* const *vals)
{
input_mgr->SendEntry(this, vals);
}
void InputReader::EndCurrentSend() {
void InputReader::EndCurrentSend()
{
input_mgr->EndCurrentSend(this);
}

View file

@ -21,6 +21,17 @@ function Input::__force_update%(id: ID%) : bool
return new Val( res, TYPE_BOOL );
%}
function Input::__add_event%(id: ID, name: string%) : bool
%{
bool res = input_mgr->RegisterEvent(id->AsEnumVal(), name->AsString()->CheckString());
return new Val( res, TYPE_BOOL );
%}
function Input::__remove_event%(id: ID, name: string%) : bool
%{
bool res = input_mgr->UnregisterEvent(id->AsEnumVal(), name->AsString()->CheckString());
return new Val( res, TYPE_BOOL );
%}
function Input::__remove_reader%(id: ID%) : bool
%{

View file

@ -174,6 +174,13 @@ enum Reader %{
READER_ASCII,
%}
enum Event %{
EVENT_NEW,
EVENT_CHANGED,
EVENT_REMOVED,
%}
enum ID %{
Unknown,
%}