Zeek/Brokerstore updates: expiration

Expiration is done separately by the manager, the clones, and the
brokerstore. But - it should happen roughly at the same time.
This commit is contained in:
Johanna Amann 2020-06-24 16:49:18 -07:00
parent c8a3a90339
commit 09119ae69d
7 changed files with 281 additions and 31 deletions

View file

@ -1488,7 +1488,7 @@ void TableVal::SetAttrs(IntrusivePtr<zeek::detail::Attributes> a)
{
IntrusivePtr<Val> c = bs->GetExpr()->Eval(nullptr);
assert(c);
assert(c->GetType()->Tag() == TYPE_STRING);
assert(c->GetType()->Tag() == zeek::TYPE_STRING);
broker_store = c->AsStringVal()->AsString()->CheckString();
broker_mgr->AddForwardedStore(broker_store, {NewRef{}, this});
}
@ -1578,7 +1578,7 @@ bool TableVal::Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k,
: RecreateIndex(k_copy);
if ( ! broker_store.empty() )
SendToStore(change_index.get(), new_entry_val->GetVal().get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
SendToStore(change_index.get(), new_entry_val, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
if ( change_func )
{
const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal();
@ -2110,7 +2110,7 @@ void TableVal::CallChangeFunc(const Val* index,
in_change_func = false;
}
void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe)
void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val, OnChangeType tpe)
{
if ( broker_store.empty() || ! index )
return;
@ -2125,7 +2125,7 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType
// we wither get passed the raw index_val - or a ListVal with exactly one element.
// Since broker does not support ListVals, we have to unoll this in the second case.
const Val* index_val;
if ( index->GetType()->Tag() == TYPE_LIST )
if ( index->GetType()->Tag() == zeek::TYPE_LIST )
{
if ( index->AsListVal()->Length() != 1 )
{
@ -2154,27 +2154,55 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType
{
case ELEMENT_NEW:
case ELEMENT_CHANGED:
{
broker::optional<broker::timespan> expiry;
auto expire_time = GetExpireTime();
if ( expire_time == 0 )
// Entry is set to immediately expire. Let's not forward it.
break;
if ( expire_time > 0 )
{
if ( attrs->Find(zeek::detail::ATTR_EXPIRE_CREATE) )
{
// for create expiry, we have to substract the already elapsed time from the expiry.
auto e = expire_time - (network_time - new_entry_val->ExpireAccessTime());
if ( e <= 0 )
// element already expired? Let's not insert it.
break;
expiry = bro_broker::convert_expiry(e);
}
else
expiry = bro_broker::convert_expiry(expire_time);
}
if ( table_type->IsSet() )
handle->store.put(std::move(*broker_index), broker::data());
handle->store.put(std::move(*broker_index), broker::data(), expiry);
else
{
if ( ! new_value )
if ( ! new_entry_val )
{
builtin_error("did not receive new value for broker-store send operation");
return;
}
auto new_value = new_entry_val->GetVal().get();
auto broker_val = bro_broker::val_to_data(new_value);
if ( ! broker_val )
{
builtin_error("invalid Broker data conversation for table value");
return;
}
handle->store.put(std::move(*broker_index), std::move(*broker_val));
handle->store.put(std::move(*broker_index), std::move(*broker_val), expiry);
}
break;
}
case ELEMENT_REMOVED:
handle->store.erase(std::move(*broker_index));
break;
case ELEMENT_EXPIRED:
// we do nothing here. The broker store does its own expiration - so the element
// should expire at about the same time.
break;
}
}
catch ( InterpreterException& e )