mirror of
https://github.com/zeek/zeek.git
synced 2025-10-03 15:18:20 +00:00
Add regular debugging output for interesting operations (stream/filter operations) to input framework (this was way overdue)
This commit is contained in:
parent
c3d2f1d5fc
commit
b4e6971aab
4 changed files with 114 additions and 3 deletions
|
@ -5,7 +5,7 @@ module Input;
|
||||||
|
|
||||||
export {
|
export {
|
||||||
|
|
||||||
redef enum Input::ID += { TABLE_READ };
|
redef enum Input::ID += { TABLE_READ, EVENT_READ };
|
||||||
|
|
||||||
## The default input reader used. Defaults to `READER_ASCII`.
|
## The default input reader used. Defaults to `READER_ASCII`.
|
||||||
const default_reader = READER_ASCII &redef;
|
const default_reader = READER_ASCII &redef;
|
||||||
|
@ -123,6 +123,8 @@ export {
|
||||||
## filter: the `TableFilter` record describing the filter.
|
## filter: the `TableFilter` record describing the filter.
|
||||||
global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool;
|
global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool;
|
||||||
|
|
||||||
|
global read_event: function(description: Input::StreamDescription, filter: Input::EventFilter) : bool;
|
||||||
|
|
||||||
global update_finished: event(id: Input::ID);
|
global update_finished: event(id: Input::ID);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -182,6 +184,27 @@ function read_table(description: Input::StreamDescription, filter: Input::TableF
|
||||||
if ( ok ) {
|
if ( ok ) {
|
||||||
ok = add_tablefilter(id, filter);
|
ok = add_tablefilter(id, filter);
|
||||||
}
|
}
|
||||||
|
if ( ok ) {
|
||||||
|
ok = remove_tablefilter(id, filter$name);
|
||||||
|
}
|
||||||
|
if ( ok ) {
|
||||||
|
ok = remove_stream(id);
|
||||||
|
} else {
|
||||||
|
remove_stream(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
function read_event(description: Input::StreamDescription, filter: Input::EventFilter) : bool {
|
||||||
|
local ok: bool = T;
|
||||||
|
# since we create and delete it ourselves this should be ok... at least for singlethreaded operation
|
||||||
|
local id: Input::ID = Input::EVENT_READ;
|
||||||
|
|
||||||
|
ok = create_stream(id, description);
|
||||||
|
if ( ok ) {
|
||||||
|
ok = add_eventfilter(id, filter);
|
||||||
|
}
|
||||||
if ( ok ) {
|
if ( ok ) {
|
||||||
ok = remove_stream(id);
|
ok = remove_stream(id);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -15,7 +15,8 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = {
|
||||||
{ "compressor", 0, false }, {"string", 0, false },
|
{ "compressor", 0, false }, {"string", 0, false },
|
||||||
{ "notifiers", 0, false }, { "main-loop", 0, false },
|
{ "notifiers", 0, false }, { "main-loop", 0, false },
|
||||||
{ "dpd", 0, false }, { "tm", 0, false },
|
{ "dpd", 0, false }, { "tm", 0, false },
|
||||||
{ "logging", 0, false }, { "threading", 0, false }
|
{ "logging", 0, false }, {"input", 0, false },
|
||||||
|
{ "threading", 0, false }
|
||||||
};
|
};
|
||||||
|
|
||||||
DebugLogger::DebugLogger(const char* filename)
|
DebugLogger::DebugLogger(const char* filename)
|
||||||
|
|
|
@ -24,6 +24,7 @@ enum DebugStream {
|
||||||
DBG_DPD, // Dynamic application detection framework
|
DBG_DPD, // Dynamic application detection framework
|
||||||
DBG_TM, // Time-machine packet input via Brocolli
|
DBG_TM, // Time-machine packet input via Brocolli
|
||||||
DBG_LOGGING, // Logging streams
|
DBG_LOGGING, // Logging streams
|
||||||
|
DBG_INPUT, // Input streams
|
||||||
DBG_THREADING, // Threading system
|
DBG_THREADING, // Threading system
|
||||||
|
|
||||||
NUM_DBGS // Has to be last
|
NUM_DBGS // Has to be last
|
||||||
|
|
|
@ -205,12 +205,24 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type)
|
||||||
// create a new input reader object to be used at whomevers leisure lateron.
|
// create a new input reader object to be used at whomevers leisure lateron.
|
||||||
ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
ReaderInfo *i = FindReader(id);
|
||||||
|
if ( i != 0 ) {
|
||||||
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
reporter->Error("Trying create already existing input stream %s", desc.Description());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ReaderDefinition* ir = input_readers;
|
ReaderDefinition* ir = input_readers;
|
||||||
|
|
||||||
RecordType* rtype = description->Type()->AsRecordType();
|
RecordType* rtype = description->Type()->AsRecordType();
|
||||||
if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) )
|
if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) )
|
||||||
{
|
{
|
||||||
reporter->Error("Streamdescription argument not of right type");
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
reporter->Error("Streamdescription argument not of right type for new input stream %s", desc.Description());
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,6 +251,13 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
||||||
|
|
||||||
reader_obj->Init(source, mode->InternalInt(), do_autostart);
|
reader_obj->Init(source, mode->InternalInt(), do_autostart);
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Successfully created new input stream %s",
|
||||||
|
desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
return reader_obj;
|
return reader_obj;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -503,6 +522,13 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) {
|
||||||
i->filters[filterid] = filter;
|
i->filters[filterid] = filter;
|
||||||
i->reader->AddFilter( filterid, fieldsV.size(), fields );
|
i->reader->AddFilter( filterid, fieldsV.size(), fields );
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Successfully created new table filter %s for stream",
|
||||||
|
filter->name.c_str(), desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -574,6 +600,13 @@ bool Manager::RemoveStream(const EnumVal* id) {
|
||||||
|
|
||||||
i->reader->Finish();
|
i->reader->Finish();
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
|
||||||
|
desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -586,6 +619,12 @@ bool Manager::RemoveStreamContinuation(const ReaderFrontend* reader) {
|
||||||
if ( (*s)->reader && (*s)->reader == reader )
|
if ( (*s)->reader && (*s)->reader == reader )
|
||||||
{
|
{
|
||||||
i = *s;
|
i = *s;
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
i->id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Successfully executed removal of stream %s",
|
||||||
|
desc.Description());
|
||||||
|
#endif
|
||||||
delete(i);
|
delete(i);
|
||||||
readers.erase(s);
|
readers.erase(s);
|
||||||
return true;
|
return true;
|
||||||
|
@ -651,6 +690,13 @@ bool Manager::ForceUpdate(const EnumVal* id)
|
||||||
|
|
||||||
i->reader->Update();
|
i->reader->Update();
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Forcing update of stream %s",
|
||||||
|
desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
return true; // update is async :(
|
return true; // update is async :(
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -685,6 +731,13 @@ bool Manager::RemoveTableFilter(EnumVal* id, const string &name) {
|
||||||
|
|
||||||
i->reader->RemoveFilter(filterId);
|
i->reader->RemoveFilter(filterId);
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Queued removal of tablefilter %s for stream %s",
|
||||||
|
name.c_str(), desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -701,6 +754,13 @@ bool Manager::RemoveFilterContinuation(const ReaderFrontend* reader, const int f
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
i->id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Executed removal of (table|event)-filter %s for stream %s",
|
||||||
|
(*it).second->name.c_str(), desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
delete (*it).second;
|
delete (*it).second;
|
||||||
i->filters.erase(it);
|
i->filters.erase(it);
|
||||||
|
|
||||||
|
@ -736,6 +796,13 @@ bool Manager::RemoveEventFilter(EnumVal* id, const string &name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
i->reader->RemoveFilter(filterId);
|
i->reader->RemoveFilter(filterId);
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Queued removal of eventfilter %s for stream %s",
|
||||||
|
name.c_str(), desc.Description());
|
||||||
|
#endif
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -948,6 +1015,13 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
|
||||||
|
|
||||||
assert(i->HasFilter(id));
|
assert(i->HasFilter(id));
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
i->id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Got EndCurrentSend for filter %d and stream %s",
|
||||||
|
id, desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
||||||
// nothing to do..
|
// nothing to do..
|
||||||
return;
|
return;
|
||||||
|
@ -1018,6 +1092,11 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
|
||||||
filter->lastDict = filter->currDict;
|
filter->lastDict = filter->currDict;
|
||||||
filter->currDict = new PDict(InputHash);
|
filter->currDict = new PDict(InputHash);
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for filter %d and stream %s, queueing update_finished event",
|
||||||
|
id, desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
// Send event that the current update is indeed finished.
|
// Send event that the current update is indeed finished.
|
||||||
EventHandler* handler = event_registry->Lookup("Input::update_finished");
|
EventHandler* handler = event_registry->Lookup("Input::update_finished");
|
||||||
if ( handler == 0 ) {
|
if ( handler == 0 ) {
|
||||||
|
@ -1202,6 +1281,13 @@ void Manager::Clear(const ReaderFrontend* reader, int id) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
ODesc desc;
|
||||||
|
i->id->Describe(&desc);
|
||||||
|
DBG_LOG(DBG_INPUT, "Got Clear for filter %d and stream %s",
|
||||||
|
id, desc.Description());
|
||||||
|
#endif
|
||||||
|
|
||||||
assert(i->HasFilter(id));
|
assert(i->HasFilter(id));
|
||||||
|
|
||||||
assert(i->filters[id]->filter_type == TABLE_FILTER);
|
assert(i->filters[id]->filter_type == TABLE_FILTER);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue