Merge remote-tracking branch 'origin/topic/seth/log-framework-ext'

* origin/topic/seth/log-framework-ext:
  Log extensions: series of small fixes and new tests.
  Change the function for log extension to take a path only and update tests.
  Final changes to log framework ext code.
  Add logging framework metadata mechanism.
  Add unrolling separator & field name map to logging framework.
This commit is contained in:
Johanna Amann 2016-08-10 20:29:48 -07:00
commit 0e44b91cd9
24 changed files with 882 additions and 292 deletions

View file

@ -23,6 +23,7 @@
using namespace logging;
struct Manager::Filter {
Val* fval;
string name;
EnumVal* id;
Func* pred;
@ -31,6 +32,11 @@ struct Manager::Filter {
Val* path_val;
EnumVal* writer;
TableVal* config;
TableVal* field_name_map;
string scope_sep;
string ext_prefix;
Func* ext_func;
int num_ext_fields;
bool local;
bool remote;
double interval;
@ -83,6 +89,11 @@ struct Manager::Stream {
Manager::Filter::~Filter()
{
Unref(fval);
Unref(field_name_map);
Unref(writer);
Unref(id);
for ( int i = 0; i < num_fields; ++i )
delete fields[i];
@ -376,12 +387,29 @@ bool Manager::DisableStream(EnumVal* id)
bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
TableVal* include, TableVal* exclude, string path, list<int> indices)
{
for ( int i = 0; i < rt->NumFields(); ++i )
// Only include extensions for the outer record.
int num_ext_fields = (indices.size() == 0) ? filter->num_ext_fields : 0;
int i = 0;
for ( int j = 0; j < num_ext_fields + rt->NumFields(); ++j )
{
BroType* t = rt->FieldType(i);
RecordType* rtype;
// If this is an ext field, set the rtype appropriately
if ( j < num_ext_fields )
{
i = j;
rtype = filter->ext_func->FType()->YieldType()->AsRecordType();
}
else
{
i = j - num_ext_fields;
rtype = rt;
}
BroType* t = rtype->FieldType(i);
// Ignore if &log not specified.
if ( ! rt->FieldDecl(i)->FindAttr(ATTR_LOG) )
if ( ! rtype->FieldDecl(i)->FindAttr(ATTR_LOG) )
continue;
list<int> new_indices = indices;
@ -391,9 +419,13 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
string new_path;
if ( ! path.size() )
new_path = rt->FieldName(i);
new_path = rtype->FieldName(i);
else
new_path = path + "." + rt->FieldName(i);
new_path = path + filter->scope_sep + rtype->FieldName(i);
// Add the ext prefix if this is an ext field.
if ( j < num_ext_fields )
new_path = filter->ext_prefix + new_path;
if ( t->InternalType() == TYPE_INTERNAL_OTHER )
{
@ -464,7 +496,6 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
}
// Alright, we want this field.
filter->indices.push_back(new_indices);
void* tmp =
@ -488,7 +519,7 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
else if ( t->Tag() == TYPE_VECTOR )
st = t->AsVectorType()->YieldType()->Tag();
bool optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
bool optional = rtype->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
filter->fields[filter->num_fields - 1] = new threading::Field(new_path.c_str(), 0, t->Tag(), st, optional);
}
@ -523,8 +554,13 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Val* interv = fval->Lookup("interv", true);
Val* postprocessor = fval->Lookup("postprocessor", true);
Val* config = fval->Lookup("config", true);
Val* field_name_map = fval->Lookup("field_name_map", true);
Val* scope_sep = fval->Lookup("scope_sep", true);
Val* ext_prefix = fval->Lookup("ext_prefix", true);
Val* ext_func = fval->Lookup("ext_func", true);
Filter* filter = new Filter;
filter->fval = fval->Ref();
filter->name = name->AsString()->CheckString();
filter->id = id->Ref()->AsEnumVal();
filter->pred = pred ? pred->AsFunc() : 0;
@ -535,6 +571,10 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
filter->interval = interv->AsInterval();
filter->postprocessor = postprocessor ? postprocessor->AsFunc() : 0;
filter->config = config->Ref()->AsTableVal();
filter->field_name_map = field_name_map->Ref()->AsTableVal();
filter->scope_sep = scope_sep->AsString()->CheckString();
filter->ext_prefix = ext_prefix->AsString()->CheckString();
filter->ext_func = ext_func ? ext_func->AsFunc() : 0;
Unref(name);
Unref(pred);
@ -544,12 +584,35 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Unref(interv);
Unref(postprocessor);
Unref(config);
Unref(field_name_map);
Unref(scope_sep);
Unref(ext_prefix);
Unref(ext_func);
// Build the list of fields that the filter wants included, including
// potentially rolling out fields.
Val* include = fval->Lookup("include");
Val* exclude = fval->Lookup("exclude");
filter->num_ext_fields = 0;
if ( filter->ext_func )
{
if ( filter->ext_func->FType()->YieldType()->Tag() == TYPE_RECORD )
{
filter->num_ext_fields = filter->ext_func->FType()->YieldType()->AsRecordType()->NumFields();
}
else if ( filter->ext_func->FType()->YieldType()->Tag() == TYPE_VOID )
{
// This is a special marker for the default no-implementation
// of the ext_func and we'll allow it to slide.
}
else
{
reporter->Error("Return value of log_ext is not a record (got %s)", type_name(filter->ext_func->FType()->YieldType()->Tag()));
return false;
}
}
filter->num_fields = 0;
filter->fields = 0;
if ( ! TraverseRecord(stream, filter, stream->columns,
@ -793,7 +856,22 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
threading::Field** arg_fields = new threading::Field*[filter->num_fields];
for ( int j = 0; j < filter->num_fields; ++j )
{
// Rename fields if a field name map is set.
if ( filter->field_name_map )
{
const char* name = filter->fields[j]->name;
StringVal *fn = new StringVal(name);
Val *val = 0;
if ( (val = filter->field_name_map->Lookup(fn, false)) != 0 )
{
delete [] filter->fields[j]->name;
filter->fields[j]->name = val->AsStringVal()->CheckString();
}
delete fn;
}
arg_fields[j] = new threading::Field(*filter->fields[j]);
}
WriterBackend::WriterInfo* info = new WriterBackend::WriterInfo;
info->path = copy_string(path.c_str());
@ -978,7 +1056,7 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
}
default:
reporter->InternalError("unsupported type for log_write");
reporter->InternalError("unsupported type %s for log_write", type_name(lval->type));
}
return lval;
@ -987,12 +1065,34 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
RecordVal* columns)
{
RecordVal* ext_rec = nullptr;
if ( filter->num_ext_fields > 0 )
{
val_list vl(1);
vl.append(filter->path_val->Ref());
Val* res = filter->ext_func->Call(&vl);
if ( res )
ext_rec = res->AsRecordVal();
}
threading::Value** vals = new threading::Value*[filter->num_fields];
for ( int i = 0; i < filter->num_fields; ++i )
{
TypeTag type = TYPE_ERROR;
Val* val = columns;
Val* val;
if ( i < filter->num_ext_fields )
{
if ( ! ext_rec )
{
// executing function did not return record. Send empty for all vals.
vals[i] = new threading::Value(filter->fields[i]->type, false);
continue;
}
val = ext_rec;
}
else
val = columns;
// For each field, first find the right value, which can
// potentially be nested inside other records.
@ -1000,7 +1100,6 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
for ( list<int>::iterator j = indices.begin(); j != indices.end(); ++j )
{
type = val->Type()->AsRecordType()->FieldType(*j)->Tag();
val = val->AsRecordVal()->Lookup(*j);
if ( ! val )
@ -1015,6 +1114,9 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
vals[i] = ValToLogVal(val);
}
if ( ext_rec )
Unref(ext_rec);
return vals;
}
@ -1062,6 +1164,8 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
found_filter_match = true;
winfo->interval = f->interval;
winfo->postprocessor = f->postprocessor;
break;
}
}