Add logging framework metadata mechanism.

Example script coming soon.
This commit is contained in:
Seth Hall 2016-05-24 09:28:07 -04:00
parent b28801ce95
commit 3e3f6f13cc
3 changed files with 118 additions and 39 deletions

View file

@ -33,6 +33,9 @@ struct Manager::Filter {
TableVal* config;
TableVal* field_name_map;
string unrolling_sep;
string metadata_prefix;
Func* metadata_func;
int num_metadata;
bool local;
bool remote;
double interval;
@ -378,24 +381,45 @@ bool Manager::DisableStream(EnumVal* id)
bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
TableVal* include, TableVal* exclude, string path, list<int> indices)
{
for ( int i = 0; i < rt->NumFields(); ++i )
// Only include metadata for the outer record.
int num_metadata = (indices.size() == 0) ? filter->num_metadata : 0;
int i = 0;
for ( int j = 0; j < num_metadata + rt->NumFields(); ++j )
{
BroType* t = rt->FieldType(i);
RecordType* rtype;
// If this is a metadata field, set the rtype appropriately
if ( j < num_metadata )
{
i = j;
rtype = filter->metadata_func->FType()->YieldType()->AsRecordType();
}
else
{
i = j - num_metadata;
rtype = rt;
}
BroType* t = rtype->FieldType(i);
// Ignore if &log not specified.
if ( ! rt->FieldDecl(i)->FindAttr(ATTR_LOG) )
if ( ! rtype->FieldDecl(i)->FindAttr(ATTR_LOG) )
continue;
list<int> new_indices = indices;
new_indices.push_back(i);
new_indices.push_back(j);
// Build path name.
string new_path;
if ( ! path.size() )
new_path = rt->FieldName(i);
new_path = rtype->FieldName(i);
else
new_path = path + filter->unrolling_sep + rt->FieldName(i);
new_path = path + filter->unrolling_sep + rtype->FieldName(i);
// Add the metadata prefix if this is a metadata field.
if ( j < num_metadata )
new_path = filter->metadata_prefix + new_path;
if ( t->InternalType() == TYPE_INTERNAL_OTHER )
{
@ -466,7 +490,6 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
}
// Alright, we want this field.
filter->indices.push_back(new_indices);
void* tmp =
@ -490,7 +513,7 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
else if ( t->Tag() == TYPE_VECTOR )
st = t->AsVectorType()->YieldType()->Tag();
bool optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
bool optional = rtype->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
filter->fields[filter->num_fields - 1] = new threading::Field(new_path.c_str(), 0, t->Tag(), st, optional);
}
@ -527,6 +550,8 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Val* config = fval->Lookup("config", true);
Val* field_name_map = fval->Lookup("field_name_map", true);
Val* unrolling_sep = fval->Lookup("unrolling_sep", true);
Val* metadata_prefix = fval->Lookup("metadata_prefix", true);
Val* metadata_func = fval->Lookup("metadata_func", true);
Filter* filter = new Filter;
filter->name = name->AsString()->CheckString();
@ -541,6 +566,8 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
filter->config = config->Ref()->AsTableVal();
filter->field_name_map = field_name_map->Ref()->AsTableVal();
filter->unrolling_sep = unrolling_sep->AsString()->CheckString();
filter->metadata_prefix = metadata_prefix->AsString()->CheckString();
filter->metadata_func = metadata_func ? metadata_func->AsFunc() : 0;
Unref(name);
Unref(pred);
@ -552,12 +579,18 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Unref(config);
Unref(field_name_map);
Unref(unrolling_sep);
Unref(metadata_prefix);
Unref(metadata_func);
// Build the list of fields that the filter wants included, including
// potentially rolling out fields.
Val* include = fval->Lookup("include");
Val* exclude = fval->Lookup("exclude");
filter->num_metadata = 0;
if ( filter->metadata_func )
filter->num_metadata = filter->metadata_func->FType()->YieldType()->AsRecordType()->NumFields();
filter->num_fields = 0;
filter->fields = 0;
if ( ! TraverseRecord(stream, filter, stream->columns,
@ -1012,32 +1045,56 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
RecordVal* columns)
{
RecordVal* metadata_rec = 0;
if ( filter->metadata_func )
{
val_list vl(1);
vl.append(new StringVal(filter->path));
metadata_rec = filter->metadata_func->Call(&vl)->AsRecordVal();
}
threading::Value** vals = new threading::Value*[filter->num_fields];
for ( int i = 0; i < filter->num_fields; ++i )
for ( int i=0; i < filter->num_fields; ++i )
{
TypeTag type = TYPE_ERROR;
Val* val = columns;
// For each field, first find the right value, which can
// potentially be nested inside other records.
list<int>& indices = filter->indices[i];
for ( list<int>::iterator j = indices.begin(); j != indices.end(); ++j )
if ( i < filter->num_metadata )
{
type = val->Type()->AsRecordType()->FieldType(*j)->Tag();
val = val->AsRecordVal()->Lookup(*j);
if ( ! val )
{
// Value, or any of its parents, is not set.
vals[i] = new threading::Value(filter->fields[i]->type, false);
break;
}
vals[i] = ValToLogVal(metadata_rec->Lookup(i));
}
else
{
if ( val )
vals[i] = ValToLogVal(val);
TypeTag type = TYPE_ERROR;
Val* val = columns;
// For each field, first find the right value, which can
// potentially be nested inside other records.
list<int>& indices = filter->indices[i];
bool metadata_done = false;
for ( list<int>::iterator j = indices.begin(); j != indices.end(); ++j )
{
// Only check for metadata on the outermost record.
int nmd = 0;
if ( !metadata_done )
{
nmd = filter->num_metadata;
metadata_done = true;
}
val = val->AsRecordVal()->Lookup((*j) - nmd);
if ( ! val )
{
// Value, or any of its parents, is not set.
vals[i] = new threading::Value(filter->fields[i]->type, false);
break;
}
}
if ( val )
vals[i] = ValToLogVal(val);
}
}
return vals;
@ -1087,6 +1144,8 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
found_filter_match = true;
winfo->interval = f->interval;
winfo->postprocessor = f->postprocessor;
break;
}
}