Make port annotation work and make the ASCII input reader far more robust, with better error messages.

This commit is contained in:
Bernhard Amann 2011-12-06 10:42:37 -08:00
parent aecbbdd966
commit 4a690484ec
8 changed files with 187 additions and 81 deletions

View file

@ -17,7 +17,7 @@ const char* attr_name(attr_tag t)
"&persistent", "&synchronized", "&postprocessor",
"&encrypt", "&match", "&disable_print_hook",
"&raw_output", "&mergeable", "&priority",
"&group", "&log", "&error_handler", "(&tracked)",
"&group", "&log", "&error_handler", "&type_column", "(&tracked)",
};
return attr_names[int(t)];
@ -417,6 +417,26 @@ void Attributes::CheckAttr(Attr* a)
Error("&log applied to a type that cannot be logged");
break;
case ATTR_TYPE_COLUMN:
{
if ( type->Tag() != TYPE_PORT )
{
Error("type_column tag only applicable to ports");
break;
}
BroType* atype = a->AttrExpr()->Type();
if ( atype->Tag() != TYPE_STRING ) {
Error("type column needs to have a string argument");
break;
}
break;
}
default:
BadTag("Attributes::CheckAttr", attr_name(a->Tag()));
}

View file

@ -1167,10 +1167,15 @@ int InputMgr::GetLogValLength(const LogVal* val) {
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
length += sizeof(val->val.uint_val);
break;
case TYPE_PORT:
length += sizeof(val->val.port_val.port);
if ( val->val.port_val.proto != 0 )
length += val->val.port_val.proto->size();
break;
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
@ -1228,12 +1233,24 @@ int InputMgr::CopyLogVal(char *data, const int startpos, const LogVal* val) {
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
//*(data+startpos) = val->val.uint_val;
memcpy(data+startpos, (const void*) &(val->val.uint_val), sizeof(val->val.uint_val));
return sizeof(val->val.uint_val);
break;
case TYPE_PORT: {
	// A port is serialized as the numeric port followed directly by the
	// bytes of the protocol string, when one is set. The returned length
	// covers both parts.
	int length = 0;
	memcpy(data+startpos, (const void*) &(val->val.port_val.port), sizeof(val->val.port_val.port));
	length += sizeof(val->val.port_val.port);
	if ( val->val.port_val.proto != 0 ) {
		// Write behind the port number. Copying to data+startpos again
		// would overwrite the port bytes that were just stored and leave
		// the tail of the reported range unwritten.
		memcpy(data+startpos+length, val->val.port_val.proto->c_str(), val->val.port_val.proto->size());
		length += val->val.port_val.proto->size();
	}
	return length;
}
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
@ -1320,6 +1337,24 @@ HashKey* InputMgr::HashLogVals(const int num_elements, const LogVal* const *vals
}
// Maps the textual protocol name ("tcp", "udp", "icmp" or "unknown") to the
// corresponding TransportProto value. The comparison is an exact string
// match. Any other input is reported via reporter->Error() and yields
// TRANSPORT_UNKNOWN.
TransportProto InputMgr::StringToProto(const string &proto) {
	if ( proto == "tcp" )
		return TRANSPORT_TCP;

	if ( proto == "udp" )
		return TRANSPORT_UDP;

	if ( proto == "icmp" )
		return TRANSPORT_ICMP;

	// "unknown" is an accepted spelling and must not raise an error;
	// everything else that reaches this point is invalid input.
	if ( proto != "unknown" )
		reporter->Error("Tried to parse invalid/unknown protocol: %s", proto.c_str());

	return TRANSPORT_UNKNOWN;
}
Val* InputMgr::LogValToVal(const LogVal* val, BroType* request_type) {
if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) {
@ -1357,7 +1392,10 @@ Val* InputMgr::LogValToVal(const LogVal* val, BroType* request_type) {
}
case TYPE_PORT:
return new PortVal(val->val.uint_val);
if ( val->val.port_val.proto == 0 )
return new PortVal(val->val.port_val.port);
else
return new PortVal(val->val.port_val.port, StringToProto(*val->val.port_val.proto));
break;
case TYPE_ADDR:

View file

@ -68,6 +68,8 @@ private:
Val* LogValToIndexVal(int num_fields, const RecordType* type, const LogVal* const *vals);
RecordVal* LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position);
TransportProto StringToProto(const string &proto);
ReaderInfo* FindReader(const InputReader* reader);
ReaderInfo* FindReader(const EnumVal* id);

View file

@ -10,25 +10,27 @@ FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int
: name(arg_name), type(arg_type)
{
position = arg_position;
secondary_position = -1;
}
// Creates a mapping for a field that also carries a subtype.
//   arg_name     - name of the field
//   arg_type     - type tag of the field
//   arg_subtype  - subtype tag stored alongside the type
//   arg_position - column position of the field
// The secondary position starts out as -1, meaning "not set".
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position)
	: name(arg_name),
	  type(arg_type),
	  subtype(arg_subtype),
	  position(arg_position),
	  secondary_position(-1)
{
}
// Copy constructor: duplicates name, type, subtype and both column
// positions of the given mapping.
FieldMapping::FieldMapping(const FieldMapping& arg)
	: name(arg.name),
	  type(arg.type),
	  subtype(arg.subtype),
	  position(arg.position),
	  secondary_position(arg.secondary_position)
{
}
// Returns a mapping with the same name and position whose type is this
// mapping's subtype. The secondary position is not carried over; the
// three-argument constructor resets it to -1.
FieldMapping FieldMapping::subType() {
	FieldMapping element(name, subtype, position);
	return element;
}
InputReaderAscii::InputReaderAscii()
{
file = 0;
@ -122,45 +124,45 @@ bool InputReaderAscii::ReadHeader() {
return false;
}
map<string, uint32_t> fields;
// construct list of field names.
istringstream splitstream(line);
int pos=0;
while ( splitstream ) {
string s;
if ( !getline(splitstream, s, separator[0]))
break;
fields[s] = pos;
pos++;
}
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
// split on tabs...
istringstream splitstream(line);
unsigned int currTab = 0;
int wantFields = 0;
while ( splitstream ) {
string s;
if ( !getline(splitstream, s, separator[0]))
break;
// current found heading in s... compare if we want it
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
const LogField* field = (*it).second.fields[i];
if ( field->name == s ) {
// cool, found field. note position
FieldMapping f(field->name, field->type, field->subtype, i);
(*it).second.columnMap.push_back(f);
wantFields++;
break; // done with searching
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
const LogField* field = (*it).second.fields[i];
map<string, uint32_t>::iterator fit = fields.find(field->name);
if ( fit == fields.end() ) {
Error(Fmt("Did not find requested field %s in input data file.", field->name.c_str()));
return false;
}
FieldMapping f(field->name, field->type, field->subtype, fields[field->name]);
if ( field->secondary_name != "" ) {
map<string, uint32_t>::iterator fit2 = fields.find(field->secondary_name);
if ( fit2 == fields.end() ) {
Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str()));
return false;
}
f.secondary_position = fields[field->secondary_name];
}
// look if we did push something...
if ( (*it).second.columnMap.size() == currTab ) {
// no, we didn't. note that...
FieldMapping empty;
(*it).second.columnMap.push_back(empty);
}
// done
currTab++;
(*it).second.columnMap.push_back(f);
}
if ( wantFields != (int) (*it).second.num_fields ) {
// we did not find all fields?
// :(
Error(Fmt("One of the requested fields could not be found in the input data file. Found %d fields, wanted %d. Filternum: %d", wantFields, (*it).second.num_fields, (*it).first));
return false;
}
}
// well, that seems to have worked...
@ -220,10 +222,14 @@ LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) {
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
val->val.uint_val = atoi(s.c_str());
break;
case TYPE_PORT:
val->val.port_val.port = atoi(s.c_str());
val->val.port_val.proto = 0;
break;
case TYPE_SUBNET: {
int pos = s.find("/");
string width = s.substr(pos+1);
@ -346,59 +352,55 @@ bool InputReaderAscii::DoUpdate() {
string line;
while ( GetLine(line ) ) {
// split on tabs
istringstream splitstream(line);
map<int, string> stringfields;
int pos = 0;
while ( splitstream ) {
string s;
if ( !getline(splitstream, s, separator[0]) )
break;
stringfields[pos] = s;
pos++;
}
pos--; // for easy comparisons of max element.
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
// split on tabs
istringstream splitstream(line);
LogVal** fields = new LogVal*[(*it).second.num_fields];
//string string_fields[num_fields];
unsigned int currTab = 0;
unsigned int currField = 0;
while ( splitstream ) {
int fpos = 0;
for ( vector<FieldMapping>::iterator fit = (*it).second.columnMap.begin();
fit != (*it).second.columnMap.end();
fit++ ){
string s;
if ( !getline(splitstream, s, separator[0]) )
break;
if ( currTab >= (*it).second.columnMap.size() ) {
Error("Tabs in heading do not match tabs in data?");
//disabled = true;
if ( (*fit).position > pos || (*fit).secondary_position > pos ) {
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position));
return false;
}
FieldMapping currMapping = (*it).second.columnMap[currTab];
currTab++;
if ( currMapping.IsEmpty() ) {
// well, that was easy
continue;
}
if ( currField >= (*it).second.num_fields ) {
Error("internal error - fieldnum greater as possible");
return false;
}
LogVal* val = EntryToVal(s, currMapping);
LogVal* val = EntryToVal(stringfields[(*fit).position], *fit);
if ( val == 0 ) {
return false;
}
fields[currMapping.position] = val;
//string_fields[currMapping.position] = s;
currField++;
}
if ( currField != (*it).second.num_fields ) {
Error("curr_field != num_fields in DoUpdate. Columns in file do not match column definition.");
return false;
if ( (*fit).secondary_position != -1 ) {
// we have a port definition :)
assert(val->type == TYPE_PORT );
// Error(Fmt("Got type %d != PORT with secondary position!", val->type));
val->val.port_val.proto = new string(stringfields[(*fit).secondary_position]);
}
fields[fpos] = val;
fpos++;
}
assert ( (unsigned int) fpos == (*it).second.num_fields );
SendEntry((*it).first, fields);

View file

@ -21,10 +21,10 @@ struct FieldMapping {
FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position);
FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position);
FieldMapping(const FieldMapping& arg);
FieldMapping() { position = -1; }
FieldMapping() { position = -1; secondary_position = -1; }
FieldMapping subType();
bool IsEmpty() { return position == -1; }
//bool IsEmpty() { return position == -1; }
};

View file

@ -1088,7 +1088,6 @@ string LogMgr::TransportProtoToString(TransportProto p) {
}
assert(false);
return "";
}
LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty)

View file

@ -0,0 +1,6 @@
Trying to find field: t
{
[1.2.3.4] = [p=80/tcp],
[1.2.4.6] = [p=30/unknown],
[1.2.3.5] = [p=52/udp]
}

View file

@ -0,0 +1,39 @@
#
# @TEST-EXEC: bro %INPUT >out
# @TEST-EXEC: btest-diff out
@TEST-START-FILE input.log
#fields i p t
1.2.3.4 80 tcp
1.2.3.5 52 udp
1.2.4.6 30 unknown
@TEST-END-FILE
# Override the ASCII reader's empty_field setting with the string "EMPTY"
# (presumably the marker the reader treats as an empty field - confirm
# against the reader's documentation).
redef InputAscii::empty_field = "EMPTY";
module A;
export {
# Stream identifier used for all Input:: calls in this test.
redef enum Input::ID += { INPUT };
}
# Index record of the destination table: the address from column "i".
type Idx: record {
i: addr;
};
# Value record of the destination table: the port from column "p".
# &type_column="t" names the input column "t", which holds tcp/udp/unknown
# in input.log; the expected baseline shows that value as the port's protocol.
type Val: record {
p: port &type_column="t";
};
# Table that the table filter fills from input.log.
global servers: table[addr] of Val = table();
# bro_init handler: reads input.log into the servers table through the
# input framework, prints the table, then removes the filter and the stream.
event bro_init()
{
# Create the input stream on the embedded input.log file.
Input::create_stream(A::INPUT, [$source="input.log"]);
# Attach a table filter named "ssh": Idx describes the index, Val the value,
# and servers is the destination table.
Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]);
# Request a read of the stream's source.
Input::force_update(A::INPUT);
# The printed table is what btest-diff compares against the baseline.
print servers;
# Tear down in reverse order of setup.
Input::remove_tablefilter(A::INPUT, "ssh");
Input::remove_stream(A::INPUT);
}