diff --git a/src/Val.cc b/src/Val.cc index b4c3b9c1ad..5d19a9dc77 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -1488,7 +1488,7 @@ void TableVal::SetAttrs(IntrusivePtr a) { IntrusivePtr c = bs->GetExpr()->Eval(nullptr); assert(c); - assert(c->GetType()->Tag() == TYPE_STRING); + assert(c->GetType()->Tag() == zeek::TYPE_STRING); broker_store = c->AsStringVal()->AsString()->CheckString(); broker_mgr->AddForwardedStore(broker_store, {NewRef{}, this}); } @@ -1578,7 +1578,7 @@ bool TableVal::Assign(IntrusivePtr index, std::unique_ptr k, : RecreateIndex(k_copy); if ( ! broker_store.empty() ) - SendToStore(change_index.get(), new_entry_val->GetVal().get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + SendToStore(change_index.get(), new_entry_val, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); if ( change_func ) { const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); @@ -2110,7 +2110,7 @@ void TableVal::CallChangeFunc(const Val* index, in_change_func = false; } -void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe) +void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val, OnChangeType tpe) { if ( broker_store.empty() || ! index ) return; @@ -2125,7 +2125,7 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType // we wither get passed the raw index_val - or a ListVal with exactly one element. // Since broker does not support ListVals, we have to unoll this in the second case. const Val* index_val; - if ( index->GetType()->Tag() == TYPE_LIST ) + if ( index->GetType()->Tag() == zeek::TYPE_LIST ) { if ( index->AsListVal()->Length() != 1 ) { @@ -2154,27 +2154,55 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType { case ELEMENT_NEW: case ELEMENT_CHANGED: + { + broker::optional expiry; + + auto expire_time = GetExpireTime(); + if ( expire_time == 0 ) + // Entry is set to immediately expire. Let's not forward it. + break; + + if ( expire_time > 0 ) + { + if ( attrs->Find(zeek::detail::ATTR_EXPIRE_CREATE) ) + { + // for create expiry, we have to substract the already elapsed time from the expiry. + auto e = expire_time - (network_time - new_entry_val->ExpireAccessTime()); + if ( e <= 0 ) + // element already expired? Let's not insert it. + break; + expiry = bro_broker::convert_expiry(e); + } + else + expiry = bro_broker::convert_expiry(expire_time); + } if ( table_type->IsSet() ) - handle->store.put(std::move(*broker_index), broker::data()); + handle->store.put(std::move(*broker_index), broker::data(), expiry); else { - if ( ! new_value ) + if ( ! new_entry_val ) { builtin_error("did not receive new value for broker-store send operation"); return; } + auto new_value = new_entry_val->GetVal().get(); auto broker_val = bro_broker::val_to_data(new_value); if ( ! broker_val ) { builtin_error("invalid Broker data conversation for table value"); return; } - handle->store.put(std::move(*broker_index), std::move(*broker_val)); + handle->store.put(std::move(*broker_index), std::move(*broker_val), expiry); } break; + } case ELEMENT_REMOVED: handle->store.erase(std::move(*broker_index)); break; + case ELEMENT_EXPIRED: + // we do nothing here. The broker store does its own expiration - so the element + // should expire at about the same time. + break; } } catch ( InterpreterException& e ) diff --git a/src/Val.h b/src/Val.h index ed5bbf5692..bbd1fb7e2d 100644 --- a/src/Val.h +++ b/src/Val.h @@ -1075,7 +1075,7 @@ protected: OnChangeType tpe); // Sends data on to backing Broker Store - void SendToStore(const Val* index, const Val* new_value, OnChangeType tpe); + void SendToStore(const Val* index, const TableEntryVal* new_entry_val, OnChangeType tpe); IntrusivePtr DoClone(CloneState* state) override; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 9659a146d5..18403b9f9c 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -1076,6 +1076,22 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) } table->Remove(*zeek_key, false); } + else if ( auto expire = broker::store_event::expire::make(msg) ) + { + // We just ignore expirys - expiring information on the Zeek side is handled by Zeek itself. +#ifdef DEBUG + // let's only debug log for stores that we know. + auto storehandle = broker_mgr->LookupStore(expire.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + DBG_LOG(DBG_BROKER, "Store %s: Store expired key %s", expire.store_id().c_str(), to_string(expire.key()).c_str()); +#endif /* DEBUG */ + } else { reporter->Error("ProcessStoreEvent: Unhandled event type"); diff --git a/src/broker/Store.h b/src/broker/Store.h index 48bb4c36a6..46787c0525 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -46,6 +46,25 @@ inline IntrusivePtr query_result(IntrusivePtr data) return rval; } +/** + * Convert an expiry from a double (used to Zeek) to the format required by Broker + * @param e: expire interval as double; 0 if no expiry + * @return expire interval in broker format + */ +static broker::optional convert_expiry(double e) + { + broker::optional ts; + + if ( e ) + { + broker::timespan x; + broker::convert(e, x); + ts = x; + } + + return ts; + } + /** * Used for asynchronous data store queries which use "when" statements. */ diff --git a/src/broker/store.bif b/src/broker/store.bif index 68262f667b..6f1fdca8fd 100644 --- a/src/broker/store.bif +++ b/src/broker/store.bif @@ -7,19 +7,6 @@ #include "broker/Data.h" #include "Trigger.h" -static broker::optional prepare_expiry(double e) - { - broker::optional ts; - - if ( e ) - { - broker::timespan x; - broker::convert(e, x); - ts = x; - } - - return ts; - } %%} module Broker; @@ -262,7 +249,7 @@ function Broker::__put_unique%(h: opaque of Broker::Store, handle->store); auto req_id = handle->proxy.put_unique(std::move(*key), std::move(*val), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); broker_mgr->TrackStoreQuery(handle, req_id, cb); return nullptr; @@ -384,7 +371,7 @@ function Broker::__put%(h: opaque of Broker::Store, return val_mgr->False(); } - handle->store.put(std::move(*key), std::move(*val), prepare_expiry(e)); + handle->store.put(std::move(*key), std::move(*val), bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -435,7 +422,7 @@ function Broker::__increment%(h: opaque of Broker::Store, k: any, a: any, } handle->store.increment(std::move(*key), std::move(*amount), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -464,7 +451,7 @@ function Broker::__decrement%(h: opaque of Broker::Store, k: any, a: any, return val_mgr->False(); } - handle->store.decrement(std::move(*key), std::move(*amount), prepare_expiry(e)); + handle->store.decrement(std::move(*key), std::move(*amount), bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -493,7 +480,7 @@ function Broker::__append%(h: opaque of Broker::Store, k: any, s: any, return val_mgr->False(); } - handle->store.append(std::move(*key), std::move(*str), prepare_expiry(e)); + handle->store.append(std::move(*key), std::move(*str), bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -523,7 +510,7 @@ function Broker::__insert_into_set%(h: opaque of Broker::Store, k: any, i: any, } handle->store.insert_into(std::move(*key), std::move(*idx), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -560,7 +547,7 @@ function Broker::__insert_into_table%(h: opaque of Broker::Store, k: any, } handle->store.insert_into(std::move(*key), std::move(*idx), - std::move(*val), prepare_expiry(e)); + std::move(*val), bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -590,7 +577,7 @@ function Broker::__remove_from%(h: opaque of Broker::Store, k: any, i: any, } handle->store.remove_from(std::move(*key), std::move(*idx), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -619,7 +606,7 @@ function Broker::__push%(h: opaque of Broker::Store, k: any, v: any, return val_mgr->False(); } - handle->store.push(std::move(*key), std::move(*val), prepare_expiry(e)); + handle->store.push(std::move(*key), std::move(*val), bro_broker::convert_expiry(e)); return val_mgr->True(); %} @@ -640,7 +627,7 @@ function Broker::__pop%(h: opaque of Broker::Store, k: any, e: interval%): bool return val_mgr->False(); } - handle->store.pop(std::move(*key), prepare_expiry(e)); + handle->store.pop(std::move(*key), bro_broker::convert_expiry(e)); return val_mgr->True(); %} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-expire/clone.out b/testing/btest/Baseline/broker.store.brokerstore-attr-expire/clone.out new file mode 100644 index 0000000000..100aaba603 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-expire/clone.out @@ -0,0 +1,19 @@ +Peer added +Expiring s: expire_first +Expiring t: b +Expiring t: whatever +Expiring t: a +Expiring r: recb +Expiring s: expire_later +Expiring t: expire_later_in_t_not_with_a +Expiring r: reca +{ + +} +{ + +} +{ + +} +terminating diff --git a/testing/btest/broker/store/brokerstore-attr-expire.zeek b/testing/btest/broker/store/brokerstore-attr-expire.zeek new file mode 100644 index 0000000000..32ca097de7 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-attr-expire.zeek @@ -0,0 +1,181 @@ +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-wait 20 +# +# @TEST-EXEC: btest-diff clone.out + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef table_expire_interval = 0.5sec; + +module TestModule; + +global start_time: time; + +function time_past(): interval + { + return network_time() - start_time; + } + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +function expire_t(tbl: any, idx: string): interval + { + print fmt("Expiring t: %s", idx); + return 0sec; + } + +function expire_s(tbl: any, idx: string): interval + { + print fmt("Expiring s: %s", idx); + return 0sec; + } + +function expire_r(tbl: any, idx: string): interval + { + print fmt("Expiring r: %s", idx); + return 0sec; + } + +function print_keys() + { + print "Printing keys"; + when ( local s = Broker::keys(tablestore) ) + { + print "keys", s; + } + timeout 2sec + { + print fmt(""); + } + } + +global t: table[string] of count &broker_store="table" &create_expire=4sec &expire_func=expire_t; +global s: table[string] of count &broker_store="set" &write_expire=3sec &expire_func=expire_s; +global r: table[string] of testrec &broker_store="rec" &read_expire=5sec &expire_func=expire_r; + +event zeek_init() + { + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + tablestore = Broker::create_master("table"); + setstore = Broker::create_master("set"); + recordstore = Broker::create_master("rec"); + } + +event update_stuff() + { + t["a"] = 3; + t["expire_later_in_t_not_with_a"] = 4; + s["expire_later"] = 2; + r["reca"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + } + +event insert_stuff() + { + print "Inserting stuff"; + start_time = network_time(); + t["a"] = 5; + delete t["a"]; + s["expire_first"] = 0; + s["expire_later"] = 1; + t["a"] = 2; + t["b"] = 3; + t["whatever"] = 5; + r["reca"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["recb"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + schedule 1.5sec { update_stuff() }; + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added ", endpoint; + schedule 3secs { insert_stuff() }; + } + +event terminate_me() + { + print "Terminating"; + terminate(); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print_keys(); + schedule 3secs { terminate_me() }; + } +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef table_expire_interval = 0.5sec; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +function expire_t(tbl: any, idx: string): interval + { + print fmt("Expiring t: %s", idx); + return 0sec; + } + +function expire_s(tbl: any, idx: string): interval + { + print fmt("Expiring s: %s", idx); + return 0sec; + } + +function expire_r(tbl: any, idx: string): interval + { + print fmt("Expiring r: %s", idx); + return 0sec; + } + +global t: table[string] of count &broker_store="table" &create_expire=4sec &expire_func=expire_t; +global s: table[string] of count &broker_store="set" &write_expire=3sec &expire_func=expire_s; +global r: table[string] of testrec &broker_store="rec" &read_expire=5sec &expire_func=expire_r; + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event dump_tables() + { + print t; + print s; + print r; + print "terminating"; + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added"; + tablestore = Broker::create_clone("table"); + setstore = Broker::create_clone("set"); + recordstore = Broker::create_clone("rec"); + schedule 15secs { dump_tables() }; + } +@TEST-END-FILE