diff --git a/src/Val.cc b/src/Val.cc index 37c936b073..1f81d15398 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -1520,7 +1520,7 @@ void TableVal::CheckExpireAttr(attr_tag at) } } -bool TableVal::Assign(IntrusivePtr index, IntrusivePtr new_val) +bool TableVal::Assign(IntrusivePtr index, IntrusivePtr new_val, bool broker_forward) { auto k = MakeHashKey(*index); @@ -1530,7 +1530,7 @@ bool TableVal::Assign(IntrusivePtr index, IntrusivePtr new_val) return false; } - return Assign(std::move(index), std::move(k), std::move(new_val)); + return Assign(std::move(index), std::move(k), std::move(new_val), broker_forward); } bool TableVal::Assign(Val* index, Val* new_val) @@ -1539,7 +1539,7 @@ bool TableVal::Assign(Val* index, Val* new_val) } bool TableVal::Assign(IntrusivePtr index, std::unique_ptr k, - IntrusivePtr new_val) + IntrusivePtr new_val, bool broker_forward) { bool is_set = table_type->IsSet(); @@ -1572,16 +1572,18 @@ bool TableVal::Assign(IntrusivePtr index, std::unique_ptr k, Modified(); - if ( change_func || ( ! broker_store.empty() ) ) + if ( change_func || ( broker_forward && ! broker_store.empty() ) ) { auto change_index = index ? std::move(index) : RecreateIndex(k_copy); - const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); if ( ! broker_store.empty() ) - SendToStore(change_index.get(), v.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + SendToStore(change_index.get(), new_entry_val->GetVal().get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); if ( change_func ) + { + const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + } } delete old_entry_val; @@ -2180,7 +2182,7 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType } } -IntrusivePtr TableVal::Remove(const Val& index) +IntrusivePtr TableVal::Remove(const Val& index, bool broker_forward) { auto k = MakeHashKey(index); TableEntryVal* v = k ? AsNonConstTable()->RemoveEntry(k.get()) : nullptr; @@ -2196,7 +2198,7 @@ IntrusivePtr TableVal::Remove(const Val& index) Modified(); - if ( ! broker_store.empty() ) + if ( broker_forward && ! broker_store.empty() ) SendToStore(&index, nullptr, ELEMENT_REMOVED); if ( change_func ) CallChangeFunc(&index, va, ELEMENT_REMOVED); diff --git a/src/Val.h b/src/Val.h index 564a464850..4870bcb74d 100644 --- a/src/Val.h +++ b/src/Val.h @@ -772,9 +772,11 @@ public: * @param index The key to assign. * @param new_val The value to assign at the index. For a set, this * must be nullptr. + * @param broker_forward Controls if the value will be forwarded to attached + * broker stores. * @return True if the assignment type-checked. */ - bool Assign(IntrusivePtr index, IntrusivePtr new_val); + bool Assign(IntrusivePtr index, IntrusivePtr new_val, bool broker_forward = true); /** * Assigns a value at an associated index in the table (or in the @@ -784,10 +786,12 @@ public: * @param k A precomputed hash key to use. * @param new_val The value to assign at the index. For a set, this * must be nullptr. + * @param broker_forward Controls if the value will be forwarded to attached + * broker stores. * @return True if the assignment type-checked. */ bool Assign(IntrusivePtr index, std::unique_ptr k, - IntrusivePtr new_val); + IntrusivePtr new_val, bool broker_forward = true); // Returns true if the assignment typechecked, false if not. The // methods take ownership of new_val, but not of the index. If we're @@ -909,12 +913,14 @@ public: /** * Remove an element from the table and return it. * @param index The index to remove. + * @param broker_forward Controls if the remove operation will be forwarded to attached + * broker stores. * @return The value associated with the index if it exists, else nullptr. * For a sets that don't really contain associated values, a placeholder * value is returned to differentiate it from non-existent index (nullptr), * but otherwise has no meaning in relation to the set's contents. */ - IntrusivePtr Remove(const Val& index); + IntrusivePtr Remove(const Val& index, bool broker_forward = true); /** * Same as Remove(const Val&), but uses a precomputed hash key. diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index fa4c586707..ef672ceb41 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -963,17 +963,19 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) auto key = insert.key(); auto data = insert.value(); - DBG_LOG(DBG_BROKER, "Insert Data: %s:%s (%s:%s)", to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name()); + // We sent this message. Ignore it. + if ( insert.publisher() == storehandle->store_pid ) + return; + + DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name()); + + if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) { reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name()); return; } - // We sent this message. Ignore it. - if ( insert.publisher() == storehandle->store_pid ) - return; - // FIXME: expiry! const auto& its = table->GetType()->AsTableType()->IndexTypes(); assert( its.size() == 1 ); @@ -986,7 +988,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) if ( table->GetType()->IsSet() ) { - table->Assign(zeek_key, nullptr); + table->Assign(zeek_key, nullptr, false); return; } @@ -997,7 +999,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) reporter->Error("ProcessStoreEvent: failed to convert value"); return; } - table->Assign(zeek_key, zeek_value); + table->Assign(zeek_key, zeek_value, false); } else if ( auto update = broker::store_event::update::make(msg) ) { @@ -1012,17 +1014,18 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) auto key = update.key(); auto data = update.new_value(); - DBG_LOG(DBG_BROKER, "Update Data: %s->%s (%s)", to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name()); + // We sent this message. Ignore it. + if ( update.publisher() == storehandle->store_pid ) + return; + + DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", update.store_id().c_str(), to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name()); + if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) { reporter->Error("ProcessStoreEvent Update got %s when expecting set", data.get_type_name()); return; } - // We sent this message. Ignore it. - if ( update.publisher() == storehandle->store_pid ) - return; - const auto& its = table->GetType()->AsTableType()->IndexTypes(); assert( its.size() == 1 ); auto zeek_key = data_to_val(std::move(key), its[0].get()); @@ -1034,7 +1037,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) if ( table->GetType()->IsSet() ) { - table->Assign(zeek_key, nullptr); + table->Assign(zeek_key, nullptr, false); return; } @@ -1045,7 +1048,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) reporter->Error("ProcessStoreEvent: failed to convert value"); return; } - table->Assign(zeek_key, zeek_value); + table->Assign(zeek_key, zeek_value, false); } else if ( auto erase = broker::store_event::erase::make(msg) ) { @@ -1057,13 +1060,12 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) if ( ! table ) return; - DBG_LOG(DBG_BROKER, "Erase for key %s", erase.store_id().c_str()); - // We sent this message. Ignore it. if ( erase.publisher() == storehandle->store_pid ) return; auto key = erase.key(); + DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str()); const auto& its = table->GetType()->AsTableType()->IndexTypes(); assert( its.size() == 1 ); auto zeek_key = data_to_val(std::move(key), its[0].get()); @@ -1072,7 +1074,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) reporter->Error("ProcessStoreEvent: failed to convert key"); return; } - table->Remove(*zeek_key); + table->Remove(*zeek_key, false); } else { diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-simple/clone.out b/testing/btest/Baseline/broker.store.brokerstore-attr-simple/clone.out new file mode 100644 index 0000000000..b2bbfd2600 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-simple/clone.out @@ -0,0 +1,19 @@ +Peer added +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/broker/store/brokerstore-attr-simple.zeek b/testing/btest/broker/store/brokerstore-attr-simple.zeek new file mode 100644 index 0000000000..ac2bfd2233 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-attr-simple.zeek @@ -0,0 +1,109 @@ +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff clone.out + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_store="rec"; + +event zeek_init() + { + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + tablestore = Broker::create_master("table"); + setstore = Broker::create_master("set"); + recordstore = Broker::create_master("rec"); + } + +event insert_stuff() + { + print "Inserting stuff"; + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added ", endpoint; + schedule 3secs { insert_stuff() }; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_store="rec"; + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added"; + tablestore = Broker::create_clone("table"); + setstore = Broker::create_clone("set"); + recordstore = Broker::create_clone("rec"); + schedule 5secs { dump_tables() }; + } +@TEST-END-FILE diff --git a/testing/btest/language/brokerstore.test b/testing/btest/language/brokerstore.test deleted file mode 100644 index c9fda7aa6f..0000000000 --- a/testing/btest/language/brokerstore.test +++ /dev/null @@ -1,34 +0,0 @@ -# @TEST-EXEC: zeek %INPUT >output -# @TEST-EXEC: btest-diff output - -redef exit_only_after_terminate = T; - -module TestModule; - -global tablestore: opaque of Broker::Store; -#global tabletwostore: opaque of Broker::Store; -global setstore: opaque of Broker::Store; - -global t: table[string] of count &broker_store="table"; -#global ct: table[string, string] of count &broker_store="table2"; -global s: set[string] &broker_store="set"; - -event zeek_init() - { - tablestore = Broker::create_master("table"); - #tabletwostore = Broker::create_master("table2"); - setstore = Broker::create_master("set"); - print "inserting"; - t["a"] = 5; - delete t["a"]; - #add s["hi"]; - #print "changing"; - t["a"] = 2; - t["a"] = 3; - t["b"] = 3; - t["c"] = 4; - t["whatever"] = 5; - #print "deleting"; - #delete t["a"]; - #delete s["hi"]; - }