Merge branch 'topic/bernhard/input' into topic/bernhard/input-threads

most stuff is inplace, logging framework needs a few changes merged before continuing here...

Conflicts:
	src/CMakeLists.txt
	src/LogMgr.h
	src/logging/Manager.cc
	src/main.cc
This commit is contained in:
Bernhard Amann 2012-02-06 10:54:07 -08:00
commit f6c6387c52
48 changed files with 3399 additions and 7 deletions

1463
src/input/Manager.cc Normal file

File diff suppressed because it is too large Load diff

90
src/input/Manager.h Normal file
View file

@ -0,0 +1,90 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef INPUT_MANAGER_H
#define INPUT_MANAGER_H
#include "../BroString.h"
#include "../Val.h"
#include "../EventHandler.h"
#include "../RemoteSerializer.h"
#include <vector>
namespace input {
class ReaderFrontend;
class Manager {
public:
Manager();
ReaderFrontend* CreateStream(EnumVal* id, RecordVal* description);
bool ForceUpdate(const EnumVal* id);
bool RemoveStream(const EnumVal* id);
bool AddTableFilter(EnumVal *id, RecordVal* filter);
bool RemoveTableFilter(EnumVal* id, const string &name);
bool AddEventFilter(EnumVal *id, RecordVal* filter);
bool RemoveEventFilter(EnumVal* id, const string &name);
protected:
// Reports an error for the given reader.
void Error(ReaderFrontend* reader, const char* msg);
// for readers to write to input stream in direct mode (reporting new/deleted values directly)
void Put(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
void Clear(const ReaderFrontend* reader, int id);
bool Delete(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
void SendEntry(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
void EndCurrentSend(const ReaderFrontend* reader, int id);
private:
struct ReaderInfo;
void SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
void PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
void SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals);
bool IsCompatibleType(BroType* t, bool atomic_only=false);
bool UnrollRecordType(vector<threading::Field*> *fields, const RecordType *rec, const string& nameprepend);
void SendEvent(EventHandlerPtr ev, const int numvals, ...);
void SendEvent(EventHandlerPtr ev, list<Val*> events);
bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
HashKey* HashValues(const int num_elements, const threading::Value* const *vals);
int GetValueLength(const threading::Value* val);
int CopyValue(char *data, const int startpos, const threading::Value* val);
Val* ValueToVal(const threading::Value* val, BroType* request_type);
Val* ValueToIndexVal(int num_fields, const RecordType* type, const threading::Value* const *vals);
RecordVal* ValueToRecordVal(const threading::Value* const *vals, RecordType *request_type, int* position);
RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position);
ReaderInfo* FindReader(const ReaderFrontend* reader);
ReaderInfo* FindReader(const EnumVal* id);
vector<ReaderInfo*> readers;
string Hash(const string &input);
class Filter;
class TableFilter;
class EventFilter;
enum FilterType { TABLE_FILTER, EVENT_FILTER };
};
}
extern input::Manager* input_mgr;
#endif /* INPUT_MANAGER_H */

124
src/input/ReaderBackend.cc Normal file
View file

@ -0,0 +1,124 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "InputReader.h"
using threading::Value;
using threading::Field;
namespace logging {
InputReader::InputReader(ReaderFrontend *arg_frontend) :MsgThread()
{
buf = 0;
buf_len = 1024;
disabled = true; // disabled will be set correcty in init.
frontend = arg_frontend;
SetName(frontend->Name());
}
InputReader::~InputReader()
{
}
void InputReader::Error(const char *msg)
{
input_mgr->Error(this, msg);
}
void InputReader::Error(const string &msg)
{
input_mgr->Error(this, msg.c_str());
}
void InputReader::Put(int id, const LogVal* const *val)
{
input_mgr->Put(this, id, val);
}
void InputReader::Clear(int id)
{
input_mgr->Clear(this, id);
}
void InputReader::Delete(int id, const LogVal* const *val)
{
input_mgr->Delete(this, id, val);
}
bool InputReader::Init(string arg_source)
{
source = arg_source;
// disable if DoInit returns error.
disabled = !DoInit(arg_source);
return !disabled;
}
bool InputReader::AddFilter(int id, int arg_num_fields,
const LogField* const * arg_fields)
{
return DoAddFilter(id, arg_num_fields, arg_fields);
}
bool InputReader::RemoveFilter(int id)
{
return DoRemoveFilter(id);
}
void InputReader::Finish()
{
DoFinish();
disabled = true;
}
bool InputReader::Update()
{
return DoUpdate();
}
bool InputReader::SendEvent(const string& name, const int num_vals, const LogVal* const *vals)
{
return input_mgr->SendEvent(name, num_vals, vals);
}
// stolen from logwriter
const char* InputReader::Fmt(const char* format, ...)
{
if ( ! buf )
buf = (char*) malloc(buf_len);
va_list al;
va_start(al, format);
int n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
if ( (unsigned int) n >= buf_len )
{ // Not enough room, grow the buffer.
buf_len = n + 32;
buf = (char*) realloc(buf, buf_len);
// Is it portable to restart?
va_start(al, format);
n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
}
return buf;
}
void InputReader::SendEntry(int id, const LogVal* const *vals)
{
input_mgr->SendEntry(this, id, vals);
}
void InputReader::EndCurrentSend(int id)
{
input_mgr->EndCurrentSend(this, id);
}
}

86
src/input/ReaderBackend.h Normal file
View file

@ -0,0 +1,86 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Same notes about thread safety as in LogWriter.h apply.
#ifndef INPUT_READERBACKEND_H
#define INPUT_READERBACKEND_H
#include "InputMgr.h"
#include "BroString.h"
#include "LogMgr.h"
namespace input {
class ReaderBackend : public threading::MsgThread {
public:
ReaderBackend(ReaderFrontend *frontend);
virtual ~ReaderBackend();
bool Init(string arg_source);
bool AddFilter( int id, int arg_num_fields, const LogField* const* fields );
bool RemoveFilter ( int id );
void Finish();
bool Update();
protected:
// Methods that have to be overwritten by the individual readers
virtual bool DoInit(string arg_sources) = 0;
virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0;
virtual bool DoRemoveFilter( int id ) = 0;
virtual void DoFinish() = 0;
// update file contents to logmgr
virtual bool DoUpdate() = 0;
// Reports an error to the user.
void Error(const string &msg);
void Error(const char *msg);
// The following methods return the information as passed to Init().
const string Source() const { return source; }
// A thread-safe version of fmt(). (stolen from logwriter)
const char* Fmt(const char* format, ...);
bool SendEvent(const string& name, const int num_vals, const LogVal* const *vals);
// Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table
void Put(int id, const LogVal* const *val);
void Delete(int id, const LogVal* const *val);
void Clear(int id);
// Table-functions (tracking mode): Only changed lines are propagated.
void SendEntry(int id, const LogVal* const *vals);
void EndCurrentSend(int id);
private:
// Frontend that instantiated us. This object must not be access from
// this class, it's running in a different thread!
ReaderFrontend* frontend;
string source;
// When an error occurs, this method is called to set a flag marking the
// writer as disabled.
bool disabled;
bool Disabled() { return disabled; }
// For implementing Fmt().
char* buf;
unsigned int buf_len;
};
}
#endif /* INPUT_READERBACKEND_H */

View file

@ -0,0 +1,28 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef INPUT_READERFRONTEND_H
#define INPUT_READERFRONTEND_H
#include "Manager.h"
#include "threading/MsgThread.h"
namespace logging {
class ReaderBackend;
class ReaderFrontend {
ReaderFrontend(bro_int_t type);
virtual ~ReaderFrontend();
protected:
friend class Manager;
};
}
#endif /* INPUT_READERFRONTEND_H */

View file

457
src/input/readers/Ascii.cc Normal file
View file

@ -0,0 +1,457 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "InputReaderAscii.h"
#include "DebugLogger.h"
#include "NetVar.h"
#include <sstream>
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position)
: name(arg_name), type(arg_type)
{
position = arg_position;
secondary_position = -1;
}
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position)
: name(arg_name), type(arg_type), subtype(arg_subtype)
{
position = arg_position;
secondary_position = -1;
}
FieldMapping::FieldMapping(const FieldMapping& arg)
: name(arg.name), type(arg.type), subtype(arg.subtype)
{
position = arg.position;
secondary_position = arg.secondary_position;
}
FieldMapping FieldMapping::subType() {
return FieldMapping(name, subtype, position);
}
InputReaderAscii::InputReaderAscii()
{
file = 0;
//keyMap = new map<string, string>();
separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), BifConst::InputAscii::separator->Len());
if ( separator.size() != 1 ) {
Error("separator length has to be 1. Separator will be truncated.");
}
set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(), BifConst::InputAscii::set_separator->Len());
if ( set_separator.size() != 1 ) {
Error("set_separator length has to be 1. Separator will be truncated.");
}
empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), BifConst::InputAscii::empty_field->Len());
unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), BifConst::InputAscii::unset_field->Len());
}
InputReaderAscii::~InputReaderAscii()
{
DoFinish();
}
void InputReaderAscii::DoFinish()
{
filters.empty();
if ( file != 0 ) {
file->close();
delete(file);
file = 0;
}
}
bool InputReaderAscii::DoInit(string path)
{
fname = path;
file = new ifstream(path.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
return false;
}
return true;
}
bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) {
if ( HasFilter(id) ) {
return false; // no, we don't want to add this a second time
}
Filter f;
f.num_fields = arg_num_fields;
f.fields = fields;
filters[id] = f;
return true;
}
bool InputReaderAscii::DoRemoveFilter ( int id ) {
if (!HasFilter(id) ) {
return false;
}
assert ( filters.erase(id) == 1 );
return true;
}
bool InputReaderAscii::HasFilter(int id) {
map<int, Filter>::iterator it = filters.find(id);
if ( it == filters.end() ) {
return false;
}
return true;
}
bool InputReaderAscii::ReadHeader() {
// try to read the header line...
string line;
if ( !GetLine(line) ) {
Error("could not read first line");
return false;
}
map<string, uint32_t> fields;
// construcr list of field names.
istringstream splitstream(line);
int pos=0;
while ( splitstream ) {
string s;
if ( !getline(splitstream, s, separator[0]))
break;
fields[s] = pos;
pos++;
}
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
const LogField* field = (*it).second.fields[i];
map<string, uint32_t>::iterator fit = fields.find(field->name);
if ( fit == fields.end() ) {
Error(Fmt("Did not find requested field %s in input data file.", field->name.c_str()));
return false;
}
FieldMapping f(field->name, field->type, field->subtype, fields[field->name]);
if ( field->secondary_name != "" ) {
map<string, uint32_t>::iterator fit2 = fields.find(field->secondary_name);
if ( fit2 == fields.end() ) {
Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str()));
return false;
}
f.secondary_position = fields[field->secondary_name];
}
(*it).second.columnMap.push_back(f);
}
}
// well, that seems to have worked...
return true;
}
bool InputReaderAscii::GetLine(string& str) {
while ( getline(*file, str) ) {
if ( str[0] != '#' ) {
return true;
}
if ( str.compare(0,8, "#fields\t") == 0 ) {
str = str.substr(8);
return true;
}
}
return false;
}
TransportProto InputReaderAscii::StringToProto(const string &proto) {
if ( proto == "unknown" ) {
return TRANSPORT_UNKNOWN;
} else if ( proto == "tcp" ) {
return TRANSPORT_TCP;
} else if ( proto == "udp" ) {
return TRANSPORT_UDP;
} else if ( proto == "icmp" ) {
return TRANSPORT_ICMP;
}
//assert(false);
reporter->Error("Tried to parse invalid/unknown protocol: %s", proto.c_str());
return TRANSPORT_UNKNOWN;
}
LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) {
LogVal* val = new LogVal(field.type, true);
if ( s.compare(unset_field) == 0 ) { // field is not set...
return new LogVal(field.type, false);
}
switch ( field.type ) {
case TYPE_ENUM:
case TYPE_STRING:
val->val.string_val = new string(s);
break;
case TYPE_BOOL:
if ( s == "T" ) {
val->val.int_val = 1;
} else if ( s == "F" ) {
val->val.int_val = 0;
} else {
Error(Fmt("Invalid value for boolean: %s", s.c_str()));
return false;
}
break;
case TYPE_INT:
val->val.int_val = atoi(s.c_str());
break;
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
val->val.double_val = atof(s.c_str());
break;
case TYPE_COUNT:
case TYPE_COUNTER:
val->val.uint_val = atoi(s.c_str());
break;
case TYPE_PORT:
val->val.port_val.port = atoi(s.c_str());
val->val.port_val.proto = TRANSPORT_UNKNOWN;
break;
case TYPE_SUBNET: {
int pos = s.find("/");
string width = s.substr(pos+1);
val->val.subnet_val.width = atoi(width.c_str());
string addr = s.substr(0, pos);
s = addr;
// NOTE: dotted_to_addr BREAKS THREAD SAFETY! it uses reporter.
// Solve this some other time....
#ifdef BROv6
if ( s.find(':') != s.npos ) {
uint32* addr = dotted_to_addr6(s.c_str());
copy_addr(val->val.subnet_val.net, addr);
delete addr;
} else {
val->val.subnet_val.net[0] = val->val.subnet_val.net[1] = val->val.subnet_val.net[2] = 0;
val->val.subnet_val.net[3] = dotted_to_addr(s.c_str());
}
#else
val->val.subnet_val.net = dotted_to_addr(s.c_str());
#endif
break;
}
case TYPE_ADDR: {
// NOTE: dottet_to_addr BREAKS THREAD SAFETY! it uses reporter.
// Solve this some other time....
#ifdef BROv6
if ( s.find(':') != s.npos ) {
uint32* addr = dotted_to_addr6(s.c_str());
copy_addr(val->val.addr_val, addr);
delete addr;
} else {
val->val.addr_val[0] = val->val.addr_val[1] = val->val.addr_val[2] = 0;
val->val.addr_val[3] = dotted_to_addr(s.c_str());
}
#else
uint32 t = dotted_to_addr(s.c_str());
copy_addr(&t, val->val.addr_val);
#endif
break;
}
case TYPE_TABLE:
case TYPE_VECTOR:
// First - common initialization
// Then - initialization for table.
// Then - initialization for vector.
// Then - common stuff
{
// how many entries do we have...
unsigned int length = 1;
for ( unsigned int i = 0; i < s.size(); i++ )
if ( s[i] == ',') length++;
unsigned int pos = 0;
if ( s.compare(empty_field) == 0 )
length = 0;
LogVal** lvals = new LogVal* [length];
if ( field.type == TYPE_TABLE ) {
val->val.set_val.vals = lvals;
val->val.set_val.size = length;
} else if ( field.type == TYPE_VECTOR ) {
val->val.vector_val.vals = lvals;
val->val.vector_val.size = length;
} else {
assert(false);
}
if ( length == 0 )
break; //empty
istringstream splitstream(s);
while ( splitstream ) {
string element;
if ( !getline(splitstream, element, set_separator[0]) )
break;
if ( pos >= length ) {
Error(Fmt("Internal error while parsing set. pos %d >= length %d. Element: %s", pos, length, element.c_str()));
break;
}
LogVal* newval = EntryToVal(element, field.subType());
if ( newval == 0 ) {
Error("Error while reading set");
return 0;
}
lvals[pos] = newval;
pos++;
}
if ( pos != length ) {
Error("Internal error while parsing set: did not find all elements");
return 0;
}
break;
}
default:
Error(Fmt("unsupported field format %d for %s", field.type,
field.name.c_str()));
return 0;
}
return val;
}
// read the entire file and send appropriate thingies back to InputMgr
bool InputReaderAscii::DoUpdate() {
// dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad)
if ( file && file->is_open() ) {
file->close();
}
file = new ifstream(fname.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
return false;
}
//
// file->seekg(0, ios::beg); // do not forget clear.
if ( ReadHeader() == false ) {
return false;
}
string line;
while ( GetLine(line ) ) {
// split on tabs
istringstream splitstream(line);
map<int, string> stringfields;
int pos = 0;
while ( splitstream ) {
string s;
if ( !getline(splitstream, s, separator[0]) )
break;
stringfields[pos] = s;
pos++;
}
pos--; // for easy comparisons of max element.
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
LogVal** fields = new LogVal*[(*it).second.num_fields];
int fpos = 0;
for ( vector<FieldMapping>::iterator fit = (*it).second.columnMap.begin();
fit != (*it).second.columnMap.end();
fit++ ){
if ( (*fit).position > pos || (*fit).secondary_position > pos ) {
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position));
return false;
}
LogVal* val = EntryToVal(stringfields[(*fit).position], *fit);
if ( val == 0 ) {
return false;
}
if ( (*fit).secondary_position != -1 ) {
// we have a port definition :)
assert(val->type == TYPE_PORT );
// Error(Fmt("Got type %d != PORT with secondary position!", val->type));
val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]);
}
fields[fpos] = val;
fpos++;
}
assert ( (unsigned int) fpos == (*it).second.num_fields );
SendEntry((*it).first, fields);
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
delete fields[i];
}
delete [] fields;
}
}
//file->clear(); // remove end of file evil bits
//file->seekg(0, ios::beg); // and seek to start.
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
EndCurrentSend((*it).first);
}
return true;
}

88
src/input/readers/Ascii.h Normal file
View file

@ -0,0 +1,88 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef INPUTREADERASCII_H
#define INPUTREADERASCII_H
#include "InputReader.h"
#include <fstream>
#include <iostream>
#include <vector>
// Description for input field mapping
struct FieldMapping {
string name;
TypeTag type;
// internal type for sets and vectors
TypeTag subtype;
int position;
// for ports: pos of the second field
int secondary_position;
FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position);
FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position);
FieldMapping(const FieldMapping& arg);
FieldMapping() { position = -1; secondary_position = -1; }
FieldMapping subType();
//bool IsEmpty() { return position == -1; }
};
class InputReaderAscii : public InputReader {
public:
InputReaderAscii();
~InputReaderAscii();
static InputReader* Instantiate() { return new InputReaderAscii; }
protected:
virtual bool DoInit(string path);
virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields );
virtual bool DoRemoveFilter ( int id );
virtual void DoFinish();
virtual bool DoUpdate();
private:
struct Filter {
unsigned int num_fields;
const LogField* const * fields; // raw mapping
// map columns in the file to columns to send back to the manager
vector<FieldMapping> columnMap;
};
bool HasFilter(int id);
TransportProto StringToProto(const string &proto);
bool ReadHeader();
LogVal* EntryToVal(string s, FieldMapping type);
bool GetLine(string& str);
ifstream* file;
string fname;
map<int, Filter> filters;
// Options set from the script-level.
string separator;
string set_separator;
string empty_field;
string unset_field;
};
#endif /* INPUTREADERASCII_H */