FileAnalysis: buffer input that can't get unique file handle immediately

A retry happens on every new input and also periodically based on a
timer.  If a file handle is returned at those times, the input is
forwarded for analysis, else it keeps retrying until a timeout
threshold.
This commit is contained in:
Jon Siwek 2013-03-14 10:57:16 -05:00
parent 878dfff2f2
commit 637fe69cf9
11 changed files with 319 additions and 103 deletions

View file

@ -8,8 +8,19 @@
using namespace file_analysis;
Manager::Manager()
void DrainTimer::Dispatch(double t, int is_expire)
{
using BifConst::FileAnalysis::pending_file_drain_interval;
DBG_LOG(DBG_FILE_ANALYSIS, "DrainTimer dispatched");
file_mgr->DrainPending();
if ( ! is_expire )
timer_mgr->Add(new DrainTimer(pending_file_drain_interval));
}
Manager::Manager() : is_draining(false)
{
using BifConst::FileAnalysis::pending_file_drain_interval;
timer_mgr->Add(new DrainTimer(pending_file_drain_interval));
}
Manager::~Manager()
@ -38,10 +49,13 @@ string Manager::GetFileHandle(Analyzer* root, Connection* conn,
vl.append(new Val(is_orig, TYPE_BOOL));
Val* result = callback->AsFunc()->Call(&vl);
string rval = result->AsString()->CheckString();
Unref(result);
if ( ! rval.empty() ) return rval;
if ( result )
{
string rval = result->AsString()->CheckString();
Unref(result);
if ( ! rval.empty() ) return rval;
}
}
for ( analyzer_list::const_iterator it = root->GetChildren().begin();
@ -63,14 +77,29 @@ string Manager::GetFileHandle(Connection* conn, bool is_orig) const
void Manager::DrainPending()
{
for ( size_t i = 0; i < pending.size(); ++i )
pending[i].Retry();
if ( is_draining ) return;
pending.clear();
is_draining = true;
PendingList::iterator it = pending.begin();
while ( it != pending.end() )
{
if ( (*it)->Retry() || (*it)->IsStale() )
{
delete *it;
pending.erase(it++);
}
else
++it;
}
is_draining = false;
}
void Manager::Terminate()
{
DrainPending();
vector<FileID> keys;
for ( IDMap::iterator it = id_map.begin(); it != id_map.end(); ++it )
keys.push_back(it->first);
@ -78,19 +107,24 @@ void Manager::Terminate()
Timeout(keys[i], true);
}
void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
Connection* conn, bool is_orig, bool allow_retry)
bool Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
Connection* conn, bool is_orig)
{
DrainPending();
string unique = GetFileHandle(conn, is_orig);
if ( ! unique.empty() )
{
DataIn(data, len, offset, GetInfo(unique, conn));
return;
return true;
}
if ( allow_retry )
pending.push_back(PendingFile(data, len, offset, conn, is_orig));
if ( ! is_draining )
pending.push_back(new PendingDataInChunk(data, len, offset, conn,
is_orig));
return false;
}
void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
@ -102,6 +136,8 @@ void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
Info* info)
{
DrainPending();
if ( ! info ) return;
info->DataIn(data, len, offset);
@ -110,19 +146,23 @@ void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
RemoveFile(info->GetUnique());
}
void Manager::DataIn(const u_char* data, uint64 len, Connection* conn,
bool is_orig, bool allow_retry)
bool Manager::DataIn(const u_char* data, uint64 len, Connection* conn,
bool is_orig)
{
DrainPending();
string unique = GetFileHandle(conn, is_orig);
if ( ! unique.empty() )
{
DataIn(data, len, GetInfo(unique, conn));
return;
return true;
}
if ( allow_retry )
pending.push_back(PendingFile(data, len, conn, is_orig));
if ( ! is_draining )
pending.push_back(new PendingDataInStream(data, len, conn, is_orig));
return false;
}
void Manager::DataIn(const u_char* data, uint64 len, const string& unique)
@ -132,6 +172,8 @@ void Manager::DataIn(const u_char* data, uint64 len, const string& unique)
void Manager::DataIn(const u_char* data, uint64 len, Info* info)
{
DrainPending();
if ( ! info ) return;
info->DataIn(data, len);
@ -146,27 +188,46 @@ void Manager::EndOfFile(Connection* conn)
EndOfFile(conn, false);
}
void Manager::EndOfFile(Connection* conn, bool is_orig)
bool Manager::EndOfFile(Connection* conn, bool is_orig)
{
DrainPending();
string unique = GetFileHandle(conn, is_orig);
if ( unique.empty() ) return; // nothing to do
if ( ! unique.empty() )
{
RemoveFile(unique);
return true;
}
RemoveFile(unique);
if ( ! is_draining )
pending.push_back(new PendingEOF(conn, is_orig));
return false;
}
void Manager::EndOfFile(const string& unique)
{
DrainPending();
RemoveFile(unique);
}
void Manager::Gap(uint64 offset, uint64 len, Connection* conn, bool is_orig)
bool Manager::Gap(uint64 offset, uint64 len, Connection* conn, bool is_orig)
{
DrainPending();
string unique = GetFileHandle(conn, is_orig);
if ( unique.empty() ) return; // nothing to do since no data has been seen
if ( ! unique.empty() )
{
Gap(offset, len, GetInfo(unique, conn));
return true;
}
Gap(offset, len, GetInfo(unique, conn));
if ( ! is_draining )
pending.push_back(new PendingGap(offset, len, conn, is_orig));
return false;
}
void Manager::Gap(uint64 offset, uint64 len, const string& unique)
@ -176,18 +237,29 @@ void Manager::Gap(uint64 offset, uint64 len, const string& unique)
void Manager::Gap(uint64 offset, uint64 len, Info* info)
{
DrainPending();
if ( ! info ) return;
info->Gap(offset, len);
}
void Manager::SetSize(uint64 size, Connection* conn, bool is_orig)
bool Manager::SetSize(uint64 size, Connection* conn, bool is_orig)
{
DrainPending();
string unique = GetFileHandle(conn, is_orig);
if ( unique.empty() ) return; // ok assuming this always follows a DataIn()
if ( ! unique.empty() )
{
SetSize(size, GetInfo(unique, conn));
return true;
}
SetSize(size, GetInfo(unique, conn));
if ( ! is_draining )
pending.push_back(new PendingSize(size, conn, is_orig));
return false;
}
void Manager::SetSize(uint64 size, const string& unique)
@ -197,6 +269,8 @@ void Manager::SetSize(uint64 size, const string& unique)
void Manager::SetSize(uint64 size, Info* info)
{
DrainPending();
if ( ! info ) return;
info->SetTotalBytes(size);
@ -294,6 +368,8 @@ Info* Manager::Lookup(const FileID& file_id) const
void Manager::Timeout(const FileID& file_id, bool is_terminating)
{
DrainPending();
Info* info = Lookup(file_id);
if ( ! info ) return;