Fix filter path_func to allow record argument as a subset of stream's columns.

This required adding the ability for RecordVal::CoerceTo functions to
optionally allow orphaning fields.  The default is to not allow it, but
now before writing to a log, the value of the stream's columns is coerced
down, if necessary, before passing it on to the filter's path_func.

Addresses #600.
This commit is contained in:
Jon Siwek 2011-09-09 14:57:22 -05:00
parent 3a3f58d5df
commit 7ff2a3e115
6 changed files with 82 additions and 6 deletions

View file

@ -902,7 +902,13 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
val_list vl(3); val_list vl(3);
vl.append(id->Ref()); vl.append(id->Ref());
vl.append(filter->path_val->Ref()); vl.append(filter->path_val->Ref());
vl.append(columns->Ref()); Val* rec_arg;
BroType* rt = filter->path_func->FType()->Args()->FieldType("rec");
if ( rt->Tag() == TYPE_RECORD )
rec_arg = columns->CoerceTo(rt->AsRecordType(), true);
else
rec_arg = columns->Ref();
vl.append(rec_arg);
Val* v = filter->path_func->Call(&vl); Val* v = filter->path_func->Call(&vl);
if ( ! v->Type()->Tag() == TYPE_STRING ) if ( ! v->Type()->Tag() == TYPE_STRING )

View file

@ -2863,7 +2863,7 @@ Val* RecordVal::LookupWithDefault(int field) const
return record_type->FieldDefault(field); return record_type->FieldDefault(field);
} }
RecordVal* RecordVal::CoerceTo(const RecordType* t, Val* aggr) const RecordVal* RecordVal::CoerceTo(const RecordType* t, Val* aggr, bool allow_orphaning) const
{ {
if ( ! record_promotion_compatible(t->AsRecordType(), Type()->AsRecordType()) ) if ( ! record_promotion_compatible(t->AsRecordType(), Type()->AsRecordType()) )
return 0; return 0;
@ -2883,6 +2883,8 @@ RecordVal* RecordVal::CoerceTo(const RecordType* t, Val* aggr) const
if ( t_i < 0 ) if ( t_i < 0 )
{ {
if ( allow_orphaning ) continue;
char buf[512]; char buf[512];
safe_snprintf(buf, sizeof(buf), safe_snprintf(buf, sizeof(buf),
"orphan field \"%s\" in initialization", "orphan field \"%s\" in initialization",
@ -2916,7 +2918,7 @@ RecordVal* RecordVal::CoerceTo(const RecordType* t, Val* aggr) const
return ar; return ar;
} }
RecordVal* RecordVal::CoerceTo(RecordType* t) RecordVal* RecordVal::CoerceTo(RecordType* t, bool allow_orphaning)
{ {
if ( same_type(Type(), t) ) if ( same_type(Type(), t) )
{ {
@ -2924,7 +2926,7 @@ RecordVal* RecordVal::CoerceTo(RecordType* t)
return this; return this;
} }
return CoerceTo(t, 0); return CoerceTo(t, 0, allow_orphaning);
} }
void RecordVal::Describe(ODesc* d) const void RecordVal::Describe(ODesc* d) const

View file

@ -909,8 +909,11 @@ public:
// *aggr* is optional; if non-zero, we add to it. See // *aggr* is optional; if non-zero, we add to it. See
// Expr::InitVal(). We leave it out in the non-const version to make // Expr::InitVal(). We leave it out in the non-const version to make
// the choice unambigious. // the choice unambigious.
RecordVal* CoerceTo(const RecordType* other, Val* aggr) const; //
RecordVal* CoerceTo(RecordType* other); // The *allow_orphaning* parameter allows for a record to be demoted down
// to a record type that contains less fields
RecordVal* CoerceTo(const RecordType* other, Val* aggr, bool allow_orphaning = false) const;
RecordVal* CoerceTo(RecordType* other, bool allow_orphaning = false);
unsigned int MemoryAllocation() const; unsigned int MemoryAllocation() const;
void DescribeReST(ODesc* d) const; void DescribeReST(ODesc* d) const;

View file

@ -0,0 +1,34 @@
#separator \x09
#path local
#fields ts id.orig_h
#types time addr
1300475168.652003 141.142.220.118
1300475168.724007 141.142.220.118
1300475168.859163 141.142.220.118
1300475168.902635 141.142.220.118
1300475168.892936 141.142.220.118
1300475168.892913 141.142.220.118
1300475168.855305 141.142.220.118
1300475168.855330 141.142.220.118
1300475168.895267 141.142.220.118
1300475168.853899 141.142.220.118
1300475168.893988 141.142.220.118
1300475168.894787 141.142.220.118
1300475173.117362 141.142.220.226
1300475173.153679 141.142.220.238
1300475168.857956 141.142.220.118
1300475168.854378 141.142.220.118
1300475168.854837 141.142.220.118
1300475167.099816 141.142.220.50
1300475168.891644 141.142.220.118
1300475168.892037 141.142.220.118
1300475171.677081 141.142.220.226
1300475168.894422 141.142.220.118
1300475167.096535 141.142.220.202
1300475168.858713 141.142.220.118
1300475168.902195 141.142.220.118
1300475169.899438 141.142.220.44
1300475168.892414 141.142.220.118
1300475168.858306 141.142.220.118
1300475168.901749 141.142.220.118
1300475170.862384 141.142.220.226

View file

@ -0,0 +1,5 @@
#separator \x09
#path remote
#fields ts id.orig_h
#types time addr
1300475169.780331 173.192.163.128

View file

@ -0,0 +1,26 @@
# @TEST-EXEC: bro -b -r $TRACES/wikipedia.trace %INPUT
# @TEST-EXEC: btest-diff local.log
# @TEST-EXEC: btest-diff remote.log
#
# The record value passed into the path_func should be allowed to contain a
# subset of the fields in the stream's columns.
@load base/utils/site
@load base/protocols/conn
@load base/frameworks/notice
redef Site::local_nets = {141.142.0.0/16};
function split_log(id: Log::ID, path: string, rec: record {id:conn_id;}): string
{
return Site::is_local_addr(rec$id$orig_h) ? "local" : "remote";
}
event bro_init()
{
# Add a new filter to the Conn::LOG stream that logs only
# timestamp and originator address.
local filter: Log::Filter = [$name="dst-only", $path_func=split_log,
$include=set("ts", "id.orig_h")];
Log::add_filter(Conn::LOG, filter);
}