Add input interface to forward data for file analysis.

The new Input::add_analysis function automatically forwards input data
on to the file analysis framework.
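A minimal usage sketch, based on the AnalysisDescription record added below and the updated test case in this commit; the file path is a hypothetical example, READER_BINARY is the record's default reader, and MANUAL matches the mode used in the test:

event bro_init()
	{
	local fname = "/tmp/example.dat";  # hypothetical input file
	# Forward the file's bytes to the file analysis framework; the stream's
	# $name also becomes the *source* field of the resulting fa_file.
	Input::add_analysis([$source=fname, $reader=Input::READER_BINARY,
	                     $mode=Input::MANUAL, $name=fname]);
	}

A second sketch, showing how the forwarded data surfaces in file analysis events, follows the script-level diff below.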
Author: Jon Siwek, 2013-05-21 10:29:22 -05:00
parent 90fa331279
commit 0ef074594d
9 changed files with 219 additions and 45 deletions


@@ -122,6 +122,35 @@ export {
config: table[string] of string &default=table();
};
## A file analysis input stream type used to forward input data to the
## file analysis framework.
type AnalysisDescription: record {
## String that allows the reader to find the source.
## For `READER_ASCII`, this is the filename.
source: string;
## Reader to use for this stream. Compatible readers must be
## able to accept a filter of a single string type (i.e.
## they read a byte stream).
reader: Reader &default=Input::READER_BINARY;
## Read mode to use for this stream
mode: Mode &default=default_mode;
## Descriptive name that uniquely identifies the input source.
## Can be used to remove a stream at a later time.
## This will also be used for the unique *source* field of
## :bro:see:`fa_file`. Most of the time, the best choice for this
## field will be the same value as the *source* field.
name: string;
## A key/value table that will be passed to the reader.
## Interpretation of the values is left to the reader, but
## usually they will be used for configuration purposes.
config: table[string] of string &default=table();
};
## Create a new table input from a given source. Returns true on success.
##
## description: `TableDescription` record describing the source.
@@ -132,6 +161,14 @@ export {
## description: `TableDescription` record describing the source.
global add_event: function(description: Input::EventDescription) : bool;
## Create a new file analysis input from a given source. Data read from
## the source is automatically forwarded to the file analysis framework.
##
## description: A record describing the source.
##
## Returns: true on success.
global add_analysis: function(description: Input::AnalysisDescription) : bool;
## Remove an input stream. Returns true on success and false if the named stream was
## not found.
##
@@ -164,6 +201,11 @@ function add_event(description: Input::EventDescription) : bool
return __create_event_stream(description);
}
function add_analysis(description: Input::AnalysisDescription) : bool
{
return __create_analysis_stream(description);
}
function remove(id: string) : bool
{
return __remove_stream(id);

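Since the stream's name doubles as the fa_file *source* (see the AnalysisDescription documentation above), script-level handlers can recognize data that arrived via the input framework. A hedged sketch, reusing the hypothetical path from the earlier example and only events already present in this commit (file_state_remove, Input::end_of_data); the handler bodies are purely illustrative:

event file_state_remove(f: fa_file)
	{
	# Files fed in through add_analysis carry the stream's $name as their source.
	if ( f?$source && f$source == "/tmp/example.dat" )
		print "finished analyzing input-framework file", f$id;
	}

event Input::end_of_data(name: string, source: string)
	{
	# The reader has delivered all data; one reasonable spot to drop the stream.
	Input::remove(name);
	}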

@@ -85,14 +85,10 @@ File::File(const string& file_id, Connection* conn, AnalyzerTag::Tag tag,
if ( conn )
{
// add source, connection, is_orig fields
val->Assign(source_idx, new StringVal(::Analyzer::GetTagName(tag)));
SetSource(::Analyzer::GetTagName(tag));
val->Assign(is_orig_idx, new Val(is_orig, TYPE_BOOL));
UpdateConnectionFields(conn);
}
else
{
// TODO: what to use as source field? (input framework interface)
}
UpdateLastActivityTime();
}
@@ -172,6 +168,18 @@ int File::Idx(const string& field)
return rval;
}
string File::GetSource() const
{
Val* v = val->Lookup(source_idx);
return v ? v->AsString()->CheckString() : string();
}
void File::SetSource(const string& source)
{
val->Assign(source_idx, new StringVal(source.c_str()));
}
double File::GetTimeoutInterval() const
{
return LookupFieldDefaultInterval(timeout_interval_idx);


@@ -26,6 +26,17 @@ public:
*/
RecordVal* GetVal() const { return val; }
/**
* @return the value of the "source" field from #val record or an empty
* string if it's not initialized.
*/
string GetSource() const;
/**
* Set the "source" field from #val record to \a source.
*/
void SetSource(const string& source);
/**
* @return value (seconds) of the "timeout_interval" field from #val record.
*/


@@ -62,7 +62,8 @@ void Manager::SetHandle(const string& handle)
void Manager::DataIn(const u_char* data, uint64 len, uint64 offset,
AnalyzerTag::Tag tag, Connection* conn, bool is_orig)
{
File* file = GetFile(conn, tag, is_orig);
GetFileHandle(tag, conn, is_orig);
File* file = GetFile(current_file_id, conn, tag, is_orig);
if ( ! file )
return;
@@ -76,9 +77,10 @@ void Manager::DataIn(const u_char* data, uint64 len, AnalyzerTag::Tag tag,
void Manager::DataIn(const u_char* data, uint64 len, AnalyzerTag::Tag tag,
Connection* conn, bool is_orig)
{
GetFileHandle(tag, conn, is_orig);
// Sequential data input shouldn't be going over multiple conns, so don't
// do the check to update connection set.
File* file = GetFile(conn, tag, is_orig, false);
File* file = GetFile(current_file_id, conn, tag, is_orig, false);
if ( ! file )
return;
@@ -89,6 +91,23 @@ void Manager::DataIn(const u_char* data, uint64 len, AnalyzerTag::Tag tag,
RemoveFile(file->GetID());
}
void Manager::DataIn(const u_char* data, uint64 len, const string& file_id,
const string& source)
{
File* file = GetFile(file_id);
if ( ! file )
return;
if ( file->GetSource().empty() )
file->SetSource(source);
file->DataIn(data, len);
if ( file->IsComplete() )
RemoveFile(file->GetID());
}
void Manager::EndOfFile(AnalyzerTag::Tag tag, Connection* conn)
{
EndOfFile(tag, conn, true);
@@ -102,10 +121,16 @@ void Manager::EndOfFile(AnalyzerTag::Tag tag, Connection* conn, bool is_orig)
RemoveFile(current_file_id);
}
void Manager::EndOfFile(const string& file_id)
{
RemoveFile(file_id);
}
void Manager::Gap(uint64 offset, uint64 len, AnalyzerTag::Tag tag,
Connection* conn, bool is_orig)
{
File* file = GetFile(conn, tag, is_orig);
GetFileHandle(tag, conn, is_orig);
File* file = GetFile(current_file_id, conn, tag, is_orig);
if ( ! file )
return;
@@ -116,7 +141,8 @@ void Manager::Gap(uint64 offset, uint64 len, AnalyzerTag::Tag tag,
void Manager::SetSize(uint64 size, AnalyzerTag::Tag tag, Connection* conn,
bool is_orig)
{
File* file = GetFile(conn, tag, is_orig);
GetFileHandle(tag, conn, is_orig);
File* file = GetFile(current_file_id, conn, tag, is_orig);
if ( ! file )
return;
@@ -169,27 +195,23 @@ bool Manager::RemoveAnalyzer(const string& file_id, const RecordVal* args) const
return file->RemoveAnalyzer(args);
}
File* Manager::GetFile(Connection* conn, AnalyzerTag::Tag tag, bool is_orig,
bool update_conn)
File* Manager::GetFile(const string& file_id, Connection* conn,
AnalyzerTag::Tag tag, bool is_orig, bool update_conn)
{
// sets current_file_id for us
GetFileHandle(tag, conn, is_orig);
if ( current_file_id.empty() )
if ( file_id.empty() )
return 0;
if ( IsIgnored(current_file_id) )
if ( IsIgnored(file_id) )
return 0;
File* rval = id_map[current_file_id];
File* rval = id_map[file_id];
if ( ! rval )
{
rval = id_map[current_file_id] = new File(current_file_id, conn, tag,
is_orig);
rval = id_map[file_id] = new File(file_id, conn, tag, is_orig);
rval->ScheduleInactivityTimer();
if ( IsIgnored(current_file_id) )
if ( IsIgnored(file_id) )
return 0;
}
else


@@ -56,11 +56,18 @@ public:
void DataIn(const u_char* data, uint64 len, AnalyzerTag::Tag tag,
Connection* conn, bool is_orig);
/**
* Pass in sequential file data from an external source (e.g. the input framework).
*/
void DataIn(const u_char* data, uint64 len, const string& file_id,
const string& source);
/**
* Signal the end of file data.
*/
void EndOfFile(AnalyzerTag::Tag tag, Connection* conn);
void EndOfFile(AnalyzerTag::Tag tag, Connection* conn, bool is_orig);
void EndOfFile(const string& file_id);
/**
* Signal a gap in the file data stream.
@@ -118,13 +125,13 @@ protected:
typedef map<string, File*> IDMap;
/**
* @return the File object mapped to #current_file_id or a null pointer if
* @return the File object mapped to \a file_id or a null pointer if
* analysis is being ignored for the associated file. A File
* object may be created if a mapping doesn't exist, and if it did
* exist, the activity time is refreshed along with any
* connection-related fields.
*/
File* GetFile(Connection* conn = 0,
File* GetFile(const string& file_id, Connection* conn = 0,
AnalyzerTag::Tag tag = AnalyzerTag::Error,
bool is_orig = false, bool update_conn = true);


@@ -9,6 +9,7 @@ module Input;
type TableDescription: record;
type EventDescription: record;
type AnalysisDescription: record;
function Input::__create_table_stream%(description: Input::TableDescription%) : bool
%{
@@ -22,6 +23,12 @@ function Input::__create_event_stream%(description: Input::EventDescription%) :
return new Val(res, TYPE_BOOL);
%}
function Input::__create_analysis_stream%(description: Input::AnalysisDescription%) : bool
%{
bool res = input_mgr->CreateAnalysisStream(description->AsRecordVal());
return new Val(res, TYPE_BOOL);
%}
function Input::__remove_stream%(id: string%) : bool
%{
bool res = input_mgr->RemoveStream(id->AsString()->CheckString());


@@ -15,6 +15,7 @@
#include "EventHandler.h"
#include "NetVar.h"
#include "Net.h"
#include "../file_analysis/Manager.h"
#include "CompHash.h"
@@ -148,6 +149,14 @@ public:
~EventStream();
};
class Manager::AnalysisStream: public Manager::Stream {
public:
string file_id;
AnalysisStream();
~AnalysisStream();
};
Manager::TableStream::TableStream() : Manager::Stream::Stream()
{
stream_type = TABLE_STREAM;
@@ -198,6 +207,15 @@ Manager::TableStream::~TableStream()
}
}
Manager::AnalysisStream::AnalysisStream() : Manager::Stream::Stream()
{
stream_type = ANALYSIS_STREAM;
}
Manager::AnalysisStream::~AnalysisStream()
{
}
Manager::Manager()
{
end_of_data = internal_handler("Input::end_of_data");
@@ -274,7 +292,8 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
RecordType* rtype = description->Type()->AsRecordType();
if ( ! ( same_type(rtype, BifType::Record::Input::TableDescription, 0)
|| same_type(rtype, BifType::Record::Input::EventDescription, 0) ) )
|| same_type(rtype, BifType::Record::Input::EventDescription, 0)
|| same_type(rtype, BifType::Record::Input::AnalysisDescription, 0) ) )
{
reporter->Error("Streamdescription argument not of right type for new input stream");
return false;
@@ -680,6 +699,40 @@ bool Manager::CreateTableStream(RecordVal* fval)
return true;
}
bool Manager::CreateAnalysisStream(RecordVal* fval)
{
RecordType* rtype = fval->Type()->AsRecordType();
if ( ! same_type(rtype, BifType::Record::Input::AnalysisDescription, 0) )
{
reporter->Error("AnalysisDescription argument not of right type");
return false;
}
AnalysisStream* stream = new AnalysisStream();
{
if ( ! CreateStream(stream, fval) )
{
delete stream;
return false;
}
}
stream->file_id = file_mgr->HashHandle(stream->name);
assert(stream->reader);
// reader takes in a byte stream as the only field
Field** fields = new Field*[1];
fields[0] = new Field("bytestream", 0, TYPE_STRING, TYPE_VOID, false);
stream->reader->Init(1, fields);
readers[stream->reader] = stream;
DBG_LOG(DBG_INPUT, "Successfully created analysis stream %s",
stream->name.c_str());
return true;
}
bool Manager::IsCompatibleType(BroType* t, bool atomic_only)
{
@@ -966,6 +1019,15 @@ void Manager::SendEntry(ReaderFrontend* reader, Value* *vals)
readFields = SendEventStreamEvent(i, type, vals);
}
else if ( i->stream_type == ANALYSIS_STREAM )
{
readFields = 1;
assert(vals[0]->type == TYPE_STRING);
file_mgr->DataIn(reinterpret_cast<u_char*>(vals[0]->val.string_val.data),
vals[0]->val.string_val.length,
static_cast<AnalysisStream*>(i)->file_id, i->name);
}
else
assert(false);
@@ -1179,7 +1241,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", i->name.c_str());
#endif
if ( i->stream_type == EVENT_STREAM )
if ( i->stream_type != TABLE_STREAM )
{
// just signal the end of the data source
SendEndOfData(i);
@@ -1288,6 +1350,9 @@ void Manager::SendEndOfData(ReaderFrontend* reader)
void Manager::SendEndOfData(const Stream *i)
{
SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source));
if ( i->stream_type == ANALYSIS_STREAM )
file_mgr->EndOfFile(static_cast<const AnalysisStream*>(i)->file_id);
}
void Manager::Put(ReaderFrontend* reader, Value* *vals)
@@ -1310,6 +1375,15 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals)
readFields = SendEventStreamEvent(i, type, vals);
}
else if ( i->stream_type == ANALYSIS_STREAM )
{
readFields = 1;
assert(vals[0]->type == TYPE_STRING);
file_mgr->DataIn(reinterpret_cast<u_char*>(vals[0]->val.string_val.data),
vals[0]->val.string_val.length,
static_cast<AnalysisStream*>(i)->file_id, i->name);
}
else
assert(false);
@@ -1577,6 +1651,12 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals)
success = true;
}
else if ( i->stream_type == ANALYSIS_STREAM )
{
// can't do anything
success = true;
}
else
{
assert(false);


@@ -55,6 +55,18 @@ public:
*/
bool CreateEventStream(RecordVal* description);
/**
* Creates a new input stream which will forward the data from the data
* source on to the file analysis framework. The internal BiF defined
* in input.bif just forwards here. For an input reader to be compatible
* with this method, it must be able to accept a filter of a single string
* type (i.e. it reads a byte stream).
*
* @param description A record of the script type \c
* Input::AnalysisDescription
*/
bool CreateAnalysisStream(RecordVal* description);
/**
* Force update on an input stream. Forces a re-read of the whole
* input source. Usually used when an input stream is opened in
@@ -138,6 +150,7 @@ private:
class Stream;
class TableStream;
class EventStream;
class AnalysisStream;
// Actual RemoveStream implementation -- the function's public and
// protected definitions are wrappers around this function.
@@ -202,7 +215,7 @@ private:
Stream* FindStream(const string &name);
Stream* FindStream(ReaderFrontend* reader);
enum StreamType { TABLE_STREAM, EVENT_STREAM };
enum StreamType { TABLE_STREAM, EVENT_STREAM, ANALYSIS_STREAM };
map<ReaderFrontend*, Stream*> readers;


@@ -18,28 +18,12 @@ redef test_get_file_name = function(f: fa_file): string
T -42 SSH::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1315801931.273616 100.000000 hurz 2,4,1,3 CC,AA,BB EMPTY 10,20,30 EMPTY 4242
@TEST-END-FILE
module A;
type Val: record {
s: string;
};
event line(description: Input::EventDescription, tpe: Input::Event, s: string)
{
FileAnalysis::data_stream(description$source, s);
}
event Input::end_of_data(name: string, source: string)
{
FileAnalysis::eof(source);
}
event bro_init()
{
Input::add_event([$source="../input.log", $reader=Input::READER_BINARY,
$mode=Input::MANUAL, $name="input", $fields=Val,
$ev=line, $want_record=F]);
Input::remove("input");
local source: string = "../input.log";
Input::add_analysis([$source=source, $reader=Input::READER_BINARY,
$mode=Input::MANUAL, $name=source]);
Input::remove(source);
}
event file_state_remove(f: fa_file) &priority=-10