Brokerstore<->Tables: attribute conflicts

Makes some attributes conflict with each other. This also needed the
test to change.

The test is a bit flaky - but I can, for the heck of it, not figure out
why. I am punting that for the future after spending a few hours on it.
This commit is contained in:
Johanna Amann 2020-06-25 19:28:35 -07:00
parent 09119ae69d
commit b027b69f5d
4 changed files with 74 additions and 41 deletions

View file

@ -458,6 +458,13 @@ void Attributes::CheckAttr(Attr* a)
break; break;
case ATTR_EXPIRE_READ: case ATTR_EXPIRE_READ:
{
if ( Find(ATTR_BROKER_STORE) )
{
Error("&broker_store and &read_expire cannot be used simultaneously");
}
}
// fallthrough
case ATTR_EXPIRE_WRITE: case ATTR_EXPIRE_WRITE:
case ATTR_EXPIRE_CREATE: case ATTR_EXPIRE_CREATE:
{ {
@ -536,6 +543,11 @@ void Attributes::CheckAttr(Attr* a)
if ( ! e_ft->CheckArgs(&expected_args) ) if ( ! e_ft->CheckArgs(&expected_args) )
Error("&expire_func argument type clash"); Error("&expire_func argument type clash");
if ( Find(ATTR_BROKER_STORE ) )
{
Error("&broker_store and &expire_func cannot be used simultaneously");
}
} }
break; break;
@ -623,9 +635,16 @@ void Attributes::CheckAttr(Attr* a)
{ {
Error("&broker_store only supports one-element set/table indexes"); Error("&broker_store only supports one-element set/table indexes");
} }
if ( Find(ATTR_EXPIRE_FUNC ) )
{
Error("&broker_store and &expire_func cannot be used simultaneously");
}
if ( Find(ATTR_EXPIRE_READ) )
{
Error("&broker_store and &read_expire cannot be used simultaneously");
}
break;
} }
case ATTR_TRACKED: case ATTR_TRACKED:
// FIXME: Check here for global ID? // FIXME: Check here for global ID?
break; break;

View file

@ -1577,12 +1577,12 @@ bool TableVal::Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k,
auto change_index = index ? std::move(index) auto change_index = index ? std::move(index)
: RecreateIndex(k_copy); : RecreateIndex(k_copy);
if ( ! broker_store.empty() ) if ( broker_forward && ! broker_store.empty() )
SendToStore(change_index.get(), new_entry_val, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); SendToStore(change_index.get(), new_entry_val, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
if ( change_func ) if ( change_func )
{ {
const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal();
CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); CallChangeFunc(change_index, v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
} }
} }
@ -2049,7 +2049,7 @@ IntrusivePtr<ListVal> TableVal::RecreateIndex(const HashKey& k) const
return table_hash->RecoverVals(k); return table_hash->RecoverVals(k);
} }
void TableVal::CallChangeFunc(const Val* index, void TableVal::CallChangeFunc(const IntrusivePtr<Val>& index,
const IntrusivePtr<Val>& old_value, const IntrusivePtr<Val>& old_value,
OnChangeType tpe) OnChangeType tpe)
{ {
@ -2073,10 +2073,14 @@ void TableVal::CallChangeFunc(const Val* index,
} }
const Func* f = thefunc->AsFunc(); const Func* f = thefunc->AsFunc();
auto lv = index->AsListVal();
zeek::Args vl; zeek::Args vl;
vl.reserve(2 + lv->Length() + table_type->IsTable());
// we either get passed the raw index_val - or a ListVal with exactly one element.
if ( index->GetType()->Tag() == zeek::TYPE_LIST )
vl.reserve(2 + index->AsListVal()->Length() + table_type->IsTable());
else
vl.reserve(3 + table_type->IsTable());
vl.emplace_back(NewRef{}, this); vl.emplace_back(NewRef{}, this);
switch ( tpe ) switch ( tpe )
@ -2094,8 +2098,14 @@ void TableVal::CallChangeFunc(const Val* index,
vl.emplace_back(zeek::BifType::Enum::TableChange->GetVal(BifEnum::TableChange::TABLE_ELEMENT_EXPIRED)); vl.emplace_back(zeek::BifType::Enum::TableChange->GetVal(BifEnum::TableChange::TABLE_ELEMENT_EXPIRED));
} }
for ( const auto& v : lv->Vals() )
vl.emplace_back(v); if ( index->GetType()->Tag() == zeek::TYPE_LIST )
{
for ( const auto& v : index->AsListVal()->Vals() )
vl.emplace_back(v);
}
else
vl.emplace_back(index);
if ( table_type->IsTable() ) if ( table_type->IsTable() )
vl.emplace_back(old_value); vl.emplace_back(old_value);
@ -2122,7 +2132,7 @@ void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val,
if ( ! handle ) if ( ! handle )
return; return;
// we wither get passed the raw index_val - or a ListVal with exactly one element. // we either get passed the raw index_val - or a ListVal with exactly one element.
// Since broker does not support ListVals, we have to unoll this in the second case. // Since broker does not support ListVals, we have to unoll this in the second case.
const Val* index_val; const Val* index_val;
if ( index->GetType()->Tag() == zeek::TYPE_LIST ) if ( index->GetType()->Tag() == zeek::TYPE_LIST )
@ -2214,6 +2224,10 @@ void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val,
IntrusivePtr<Val> TableVal::Remove(const Val& index, bool broker_forward) IntrusivePtr<Val> TableVal::Remove(const Val& index, bool broker_forward)
{ {
auto k = MakeHashKey(index); auto k = MakeHashKey(index);
// this is totally cheating around the fact that we need a Intrusive pointer.
IntrusivePtr<Val> changefunc_val;
if ( change_func )
changefunc_val = RecreateIndex(*(k.get()));
TableEntryVal* v = k ? AsNonConstTable()->RemoveEntry(k.get()) : nullptr; TableEntryVal* v = k ? AsNonConstTable()->RemoveEntry(k.get()) : nullptr;
IntrusivePtr<Val> va; IntrusivePtr<Val> va;
@ -2230,7 +2244,7 @@ IntrusivePtr<Val> TableVal::Remove(const Val& index, bool broker_forward)
if ( broker_forward && ! broker_store.empty() ) if ( broker_forward && ! broker_store.empty() )
SendToStore(&index, nullptr, ELEMENT_REMOVED); SendToStore(&index, nullptr, ELEMENT_REMOVED);
if ( change_func ) if ( change_func )
CallChangeFunc(&index, va, ELEMENT_REMOVED); CallChangeFunc(changefunc_val, va, ELEMENT_REMOVED);
return va; return va;
} }
@ -2255,13 +2269,13 @@ IntrusivePtr<Val> TableVal::Remove(const HashKey& k)
Modified(); Modified();
if ( ( change_func && va ) || ( ! broker_store.empty() ) ) if ( va && ( change_func || ! broker_store.empty() ) )
{ {
auto index = table_hash->RecoverVals(k); auto index = table_hash->RecoverVals(k);
if ( ! broker_store.empty() ) if ( ! broker_store.empty() )
SendToStore(index.get(), nullptr, ELEMENT_REMOVED); SendToStore(index.get(), nullptr, ELEMENT_REMOVED);
if ( change_func && va ) if ( change_func && va )
CallChangeFunc(index.get(), va, ELEMENT_REMOVED); CallChangeFunc(index, va, ELEMENT_REMOVED);
} }
return va; return va;
@ -2565,7 +2579,7 @@ void TableVal::DoExpire(double t)
{ {
if ( ! idx ) if ( ! idx )
idx = RecreateIndex(*k); idx = RecreateIndex(*k);
CallChangeFunc(idx.get(), v->GetVal(), ELEMENT_EXPIRED); CallChangeFunc(idx, v->GetVal(), ELEMENT_EXPIRED);
} }
delete v; delete v;

View file

@ -1070,8 +1070,8 @@ protected:
// Enum for the different kinds of changes an &on_change handler can see // Enum for the different kinds of changes an &on_change handler can see
enum OnChangeType { ELEMENT_NEW, ELEMENT_CHANGED, ELEMENT_REMOVED, ELEMENT_EXPIRED }; enum OnChangeType { ELEMENT_NEW, ELEMENT_CHANGED, ELEMENT_REMOVED, ELEMENT_EXPIRED };
// Calls &change_func. Does not take ownership of values. (Refs if needed). // Calls &change_func.
void CallChangeFunc(const Val* index, const IntrusivePtr<Val>& old_value, void CallChangeFunc(const IntrusivePtr<Val>& index, const IntrusivePtr<Val>& old_value,
OnChangeType tpe); OnChangeType tpe);
// Sends data on to backing Broker Store // Sends data on to backing Broker Store

View file

@ -29,22 +29,22 @@ type testrec: record {
c: set[string]; c: set[string];
}; };
function expire_t(tbl: any, idx: string): interval function change_t(tbl: any, tpe: TableChange, idx: string, idxb: count)
{ {
print fmt("Expiring t: %s", idx); if ( tpe == TABLE_ELEMENT_EXPIRED )
return 0sec; print fmt("Expiring t: %s", idx);
} }
function expire_s(tbl: any, idx: string): interval function change_s(tbl: any, tpe: TableChange, idx: string, idbx: count)
{ {
print fmt("Expiring s: %s", idx); if ( tpe == TABLE_ELEMENT_EXPIRED )
return 0sec; print fmt("Expiring s: %s", idx);
} }
function expire_r(tbl: any, idx: string): interval function change_r(tbl: any, tpe: TableChange, idx: string, idxb: testrec)
{ {
print fmt("Expiring r: %s", idx); if ( tpe == TABLE_ELEMENT_EXPIRED )
return 0sec; print fmt("Expiring r: %s", idx);
} }
function print_keys() function print_keys()
@ -60,9 +60,9 @@ function print_keys()
} }
} }
global t: table[string] of count &broker_store="table" &create_expire=4sec &expire_func=expire_t; global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t;
global s: table[string] of count &broker_store="set" &write_expire=3sec &expire_func=expire_s; global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s;
global r: table[string] of testrec &broker_store="rec" &read_expire=5sec &expire_func=expire_r; global r: table[string] of testrec &broker_store="rec" &write_expire=5sec &on_change=change_r;
event zeek_init() event zeek_init()
{ {
@ -134,27 +134,27 @@ type testrec: record {
c: set[string]; c: set[string];
}; };
function expire_t(tbl: any, idx: string): interval function change_t(tbl: any, tpe: TableChange, idx: string, idxb: count)
{ {
print fmt("Expiring t: %s", idx); if ( tpe == TABLE_ELEMENT_EXPIRED )
return 0sec; print fmt("Expiring t: %s", idx);
} }
function expire_s(tbl: any, idx: string): interval function change_s(tbl: any, tpe: TableChange, idx: string, idbx: count)
{ {
print fmt("Expiring s: %s", idx); if ( tpe == TABLE_ELEMENT_EXPIRED )
return 0sec; print fmt("Expiring s: %s", idx);
} }
function expire_r(tbl: any, idx: string): interval function change_r(tbl: any, tpe: TableChange, idx: string, idxb: testrec)
{ {
print fmt("Expiring r: %s", idx); if ( tpe == TABLE_ELEMENT_EXPIRED )
return 0sec; print fmt("Expiring r: %s", idx);
} }
global t: table[string] of count &broker_store="table" &create_expire=4sec &expire_func=expire_t; global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t;
global s: table[string] of count &broker_store="set" &write_expire=3sec &expire_func=expire_s; global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s;
global r: table[string] of testrec &broker_store="rec" &read_expire=5sec &expire_func=expire_r; global r: table[string] of testrec &broker_store="rec" &write_expire=5sec &on_change=change_r;
event zeek_init() event zeek_init()
{ {