mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 22:58:20 +00:00
FileAnalysis: insert explicit event queue flush points.
And added an event called "event_queue_flush_point" to mark where that occured in the event stream. The FAF now uses an explicit event queue flush instead of buffering input in order to wait for a file handle to be returned from script-layer.
This commit is contained in:
parent
d9321e2203
commit
2747e839fb
15 changed files with 128 additions and 382 deletions
|
@ -29,38 +29,18 @@ void Manager::Terminate()
|
|||
Timeout(keys[i], true);
|
||||
}
|
||||
|
||||
void Manager::ReceiveHandle(const string& handle)
|
||||
void Manager::SetHandle(const string& handle)
|
||||
{
|
||||
if ( pending.empty() )
|
||||
reporter->InternalError("File analysis underflow");
|
||||
|
||||
PendingFile* pf = pending.front();
|
||||
if ( ! handle.empty() )
|
||||
pf->Finish(handle);
|
||||
delete pf;
|
||||
pending.pop();
|
||||
}
|
||||
|
||||
void Manager::EventDrainDone()
|
||||
{
|
||||
if ( pending.empty() ) return;
|
||||
|
||||
reporter->Error("Too few return_file_handle() calls, discarding pending"
|
||||
" file analysis input.");
|
||||
|
||||
while ( ! pending.empty() )
|
||||
{
|
||||
delete pending.front();
|
||||
pending.pop();
|
||||
}
|
||||
current_handle = handle;
|
||||
}
|
||||
|
||||
void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
|
||||
AnalyzerTag::Tag tag, Connection* conn, bool is_orig)
|
||||
{
|
||||
if ( IsDisabled(tag) ) return;
|
||||
if ( ! QueueHandleEvent(tag, conn, is_orig) ) return;
|
||||
pending.push(new PendingDataInChunk(data, len, offset, tag, conn));
|
||||
|
||||
GetFileHandle(tag, conn, is_orig);
|
||||
DataIn(data, len, offset, GetFile(current_handle, conn, tag));
|
||||
}
|
||||
|
||||
void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
|
||||
|
@ -84,8 +64,8 @@ void Manager::DataIn(const u_char* data, uint64 len, AnalyzerTag::Tag tag,
|
|||
Connection* conn, bool is_orig)
|
||||
{
|
||||
if ( IsDisabled(tag) ) return;
|
||||
if ( ! QueueHandleEvent(tag, conn, is_orig) ) return;
|
||||
pending.push(new PendingDataInStream(data, len, tag, conn));
|
||||
GetFileHandle(tag, conn, is_orig);
|
||||
DataIn(data, len, GetFile(current_handle, conn, tag));
|
||||
}
|
||||
|
||||
void Manager::DataIn(const u_char* data, uint64 len, const string& unique)
|
||||
|
@ -112,8 +92,9 @@ void Manager::EndOfFile(AnalyzerTag::Tag tag, Connection* conn)
|
|||
void Manager::EndOfFile(AnalyzerTag::Tag tag, Connection* conn, bool is_orig)
|
||||
{
|
||||
if ( IsDisabled(tag) ) return;
|
||||
if ( ! QueueHandleEvent(tag, conn, is_orig) ) return;
|
||||
pending.push(new PendingEOF(tag, conn));
|
||||
|
||||
GetFileHandle(tag, conn, is_orig);
|
||||
EndOfFile(current_handle);
|
||||
}
|
||||
|
||||
void Manager::EndOfFile(const string& unique)
|
||||
|
@ -125,8 +106,9 @@ void Manager::Gap(uint64 offset, uint64 len, AnalyzerTag::Tag tag,
|
|||
Connection* conn, bool is_orig)
|
||||
{
|
||||
if ( IsDisabled(tag) ) return;
|
||||
if ( ! QueueHandleEvent(tag, conn, is_orig) ) return;
|
||||
pending.push(new PendingGap(offset, len, tag, conn));
|
||||
|
||||
GetFileHandle(tag, conn, is_orig);
|
||||
Gap(offset, len, GetFile(current_handle, conn, tag));
|
||||
}
|
||||
|
||||
void Manager::Gap(uint64 offset, uint64 len, const string& unique)
|
||||
|
@ -145,8 +127,9 @@ void Manager::SetSize(uint64 size, AnalyzerTag::Tag tag, Connection* conn,
|
|||
bool is_orig)
|
||||
{
|
||||
if ( IsDisabled(tag) ) return;
|
||||
if ( ! QueueHandleEvent(tag, conn, is_orig) ) return;
|
||||
pending.push(new PendingSize(size, tag, conn));
|
||||
|
||||
GetFileHandle(tag, conn, is_orig);
|
||||
SetSize(size, GetFile(current_handle, conn, tag));
|
||||
}
|
||||
|
||||
void Manager::SetSize(uint64 size, const string& unique)
|
||||
|
@ -166,12 +149,13 @@ void Manager::SetSize(uint64 size, File* file)
|
|||
|
||||
void Manager::FileEvent(EventHandlerPtr h, File* file)
|
||||
{
|
||||
if ( IsIgnored(file->GetUnique()) ) return;
|
||||
if ( ! h ) return;
|
||||
if ( IsIgnored(file->GetUnique()) ) return;
|
||||
|
||||
val_list * vl = new val_list();
|
||||
vl->append(file->GetVal()->Ref());
|
||||
mgr.Dispatch(new Event(h, vl));
|
||||
|
||||
mgr.QueueEvent(h, vl);
|
||||
}
|
||||
|
||||
bool Manager::PostponeTimeout(const FileID& file_id) const
|
||||
|
@ -205,6 +189,7 @@ bool Manager::RemoveAction(const FileID& file_id, const RecordVal* args) const
|
|||
File* Manager::GetFile(const string& unique, Connection* conn,
|
||||
AnalyzerTag::Tag tag)
|
||||
{
|
||||
if ( unique.empty() ) return 0;
|
||||
if ( IsIgnored(unique) ) return 0;
|
||||
|
||||
File* rval = str_map[unique];
|
||||
|
@ -251,6 +236,7 @@ void Manager::Timeout(const FileID& file_id, bool is_terminating)
|
|||
file->postpone_timeout = false;
|
||||
|
||||
FileEvent(file_timeout, file);
|
||||
mgr.Drain(); // need immediate feedback about whether to postpone
|
||||
|
||||
if ( file->postpone_timeout && ! is_terminating )
|
||||
{
|
||||
|
@ -306,6 +292,21 @@ bool Manager::IsIgnored(const string& unique)
|
|||
return ignored.find(unique) != ignored.end();
|
||||
}
|
||||
|
||||
void Manager::GetFileHandle(AnalyzerTag::Tag tag, Connection* c, bool is_orig)
|
||||
{
|
||||
current_handle.clear();
|
||||
|
||||
if ( ! get_file_handle ) return;
|
||||
|
||||
val_list* vl = new val_list();
|
||||
vl->append(new Val(tag, TYPE_COUNT));
|
||||
vl->append(c->BuildConnVal());
|
||||
vl->append(new Val(is_orig, TYPE_BOOL));
|
||||
|
||||
mgr.QueueEvent(get_file_handle, vl);
|
||||
mgr.Drain(); // need file handle immediately so we don't have to buffer data
|
||||
}
|
||||
|
||||
bool Manager::IsDisabled(AnalyzerTag::Tag tag)
|
||||
{
|
||||
if ( ! disabled )
|
||||
|
@ -322,17 +323,3 @@ bool Manager::IsDisabled(AnalyzerTag::Tag tag)
|
|||
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool Manager::QueueHandleEvent(AnalyzerTag::Tag tag, Connection* conn,
|
||||
bool is_orig)
|
||||
{
|
||||
if ( ! get_file_handle ) return false;
|
||||
|
||||
val_list* vl = new val_list();
|
||||
vl->append(new Val(tag, TYPE_COUNT));
|
||||
vl->append(conn->BuildConnVal());
|
||||
vl->append(new Val(is_orig, TYPE_BOOL));
|
||||
|
||||
mgr.QueueEvent(get_file_handle, vl);
|
||||
return true;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue