From 558e89b3ba6c9d1cc4ac12426f8fe226ff38972b Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Fri, 29 May 2020 14:32:16 -0700 Subject: [PATCH] Broker Store updates: get a bit more infrastructure in place. This compiles, but besides giving debug messages (and partially performing inserts/updates) it is not really helpful and definitely WIP. This also shows that I might have to re-think the approach that we will take here. So far, we actually insert tables as tables into Brokerstores. This opens up the potential to just have several tables synchronized via a single brokerstore. However, it turns out, that the current store_event API sends the complete table with each update. Which is problematic for obvious reasons - and not really sustainable. --- src/Val.cc | 23 +++- src/broker/Manager.cc | 143 +++++++++++++++++++++--- src/broker/Manager.h | 2 +- src/broker/Store.h | 5 +- testing/btest/language/brokerstore.test | 17 ++- 5 files changed, 164 insertions(+), 26 deletions(-) diff --git a/src/Val.cc b/src/Val.cc index c38f185a8c..a88474d994 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -2079,14 +2079,27 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType if ( ! handle ) return; - if ( index->AsListVal()->Length() != 1 ) + // we wither get passed the raw index_val - or a ListVal with exactly one element. + // Since broker does not support ListVals, we have to unoll this in the second case. + const Val* index_val; + if ( index->Type()->Tag() == TYPE_LIST ) { - builtin_error("table with complex index not supported for &broker_store"); - return; + if ( index->AsListVal()->Length() != 1 ) + { + builtin_error("table with complex index not supported for &broker_store"); + return; + } + + index_val = index->AsListVal()->Index(0); + } + else + { + index_val = index; } - const auto index_val = index->AsListVal()->Index(0); - auto key_val = new StringVal("test"); + // FIXME: at the moment this is hardcoded to the name of the broker store. I needed something to be able to tell + // me which store a change came from - and this still seems to be missing from the store_events. (Or I am blind). + auto key_val = new StringVal(broker_store); auto broker_key = bro_broker::val_to_data(key_val); auto broker_index = bro_broker::val_to_data(index_val); Unref(key_val); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index d7ce3a3054..d99f47423c 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -909,15 +909,7 @@ void Manager::Process() if ( topic == broker::topics::store_events ) { - if (auto insert = broker::store_event::insert::make(msg)) - { - reporter->Warning("It is an insert!"); - reporter->Warning("Key/Data (endpoint): %s/%s (%s)", to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), to_string(insert.publisher()).c_str()); - } - else - { - reporter->Warning("Unhandled event type"); - } + ProcessStoreEvent(topic, std::move(msg)); continue; } @@ -956,6 +948,132 @@ void Manager::Process() } } +void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) + { + // for all of the following we currently cheat. The key always is the name of the store. This will change + // in the future. + if ( auto insert = broker::store_event::insert::make(msg) ) + { + auto remstore = to_string(insert.key()); + auto storehandle = broker_mgr->LookupStore(remstore); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + auto data = insert.value(); + + reporter->Warning("Insert Data: %s (%s)", to_string(insert.value()).c_str(), insert.value().get_type_name()); + if ( table->Type()->IsSet() && data.get_type() != broker::data::type::set ) + { + reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name()); + return; + } + else if ( ! table->Type()->IsSet() && data.get_type() != broker::data::type::table ) + { + reporter->Error("ProcessStoreEvent Insert got %s when expecting table", data.get_type_name()); + return; + } + + // We sent this message. Ignore it. + //if ( insert.publisher() == storehandle->store_pid ) + // continue; + + // We currently use data_to_val here. It creates a bit of overhead, but makes the code easier. We could change + // this to manual unrolling in the future. + auto val = data_to_val(std::move(data), table->Type()); + // So we are just doing it manually - at least for now. + auto temptable = val->AsTable(); + auto temptableval = val->AsTableVal(); + HashKey* k; + TableEntryVal* entry; + auto c = temptable->InitForIteration(); + while ( (entry = temptable->NextEntry(k, c))) + { + auto lv = temptableval->RecoverIndex(k); + delete k; + Val* entry_key = lv->Length() == 1 ? lv->Index(0) : lv.get(); + + // note - at the moment this code creates a loop + // Fixme: expiry! + if ( temptableval->Type()->IsSet() ) + table->Assign(entry_key, nullptr); + else + table->Assign(entry_key, entry->Value()); + } + } + else if ( auto update = broker::store_event::update::make(msg) ) + { + auto remstore = to_string(update.key()); + auto storehandle = broker_mgr->LookupStore(remstore); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + auto data = update.new_value(); + + reporter->Warning("Update Data: %s->%s (%s)", to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name()); + if ( table->Type()->IsSet() && data.get_type() != broker::data::type::set ) + { + reporter->Error("ProcessStoreEvent Update got %s when expecting set", data.get_type_name()); + return; + } + else if ( ! table->Type()->IsSet() && data.get_type() != broker::data::type::table ) + { + reporter->Error("ProcessStoreEvent Update got %s when expecting table", data.get_type_name()); + return; + } + + // We sent this message. Ignore it. + if ( update.publisher() == storehandle->store_pid ) + return; + + // We currently use data_to_val here. It creates a bit of overhead, but makes the code easier. We could change + // this to manual unrolling in the future. + auto val = data_to_val(std::move(data), table->Type()); + // So we are just doing it manually - at least for now. + auto temptable = val->AsTable(); + auto temptableval = val->AsTableVal(); + HashKey* k; + TableEntryVal* entry; + auto c = temptable->InitForIteration(); + while ( (entry = temptable->NextEntry(k, c))) + { + auto lv = temptableval->RecoverIndex(k); + delete k; + Val* entry_key = lv->Length() == 1 ? lv->Index(0) : lv.get(); + + // note - at the moment this code creates a loop + if ( temptableval->Type()->IsSet() ) + table->Assign(entry_key, nullptr); + else + table->Assign(entry_key, entry->Value()); + } + } + else if ( auto erase = broker::store_event::erase::make(msg) ) + { + auto remstore = to_string(erase.key()); + auto storehandle = broker_mgr->LookupStore(remstore); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + // I'm actually not sure we can get an erase - since we will never delete keys... + reporter->Warning("Erase for key %s", remstore.c_str()); + } + else + { + reporter->Warning("Unhandled event type"); + } + } void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) { @@ -1522,7 +1640,6 @@ bool Manager::CloseStore(const string& name) return false; auto pubid = s->second->store.frontend_id(); - forwarded_ids.erase(pubid); iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this); @@ -1590,10 +1707,8 @@ void Manager::CheckForwarding(const std::string &name) if ( forwarded_stores.find(name) == forwarded_stores.end() ) return; - auto pubid = handle->store.frontend_id(); - - DBG_LOG(DBG_BROKER, "Resolved publishder %s for table forward for data store %s", to_string(pubid).c_str(), name.c_str()); - forwarded_ids.emplace(pubid, forwarded_stores.at(name)); + handle->forward_to = forwarded_stores.at(name); + DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str()); } } // namespace bro_broker diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 2746b89142..dd04093945 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -342,6 +342,7 @@ public: private: void DispatchMessage(const broker::topic& topic, broker::data msg); + void ProcessStoreEvent(const broker::topic& topic, broker::data msg); void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogWrite(broker::zeek::LogWrite lw); @@ -386,7 +387,6 @@ private: std::shared_ptr bstate; std::unordered_map data_stores; std::unordered_map> forwarded_stores; - std::unordered_map> forwarded_ids; std::unordered_map pending_queries; std::vector forwarded_prefixes; diff --git a/src/broker/Store.h b/src/broker/Store.h index 4bf6dee717..2f1462939e 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -95,13 +95,16 @@ private: class StoreHandleVal : public OpaqueVal { public: StoreHandleVal(broker::store s) - : OpaqueVal(bro_broker::opaque_of_store_handle), store{s}, proxy{store} + : OpaqueVal(bro_broker::opaque_of_store_handle), store{s}, proxy{store}, store_pid{store.frontend_id()} { } void ValDescribe(ODesc* d) const override; broker::store store; broker::store::proxy proxy; + broker::publisher_id store_pid; + // Zeek table that events are forwarded to. + IntrusivePtr forward_to; protected: StoreHandleVal() = default; diff --git a/testing/btest/language/brokerstore.test b/testing/btest/language/brokerstore.test index cb1e207b36..b983e83851 100644 --- a/testing/btest/language/brokerstore.test +++ b/testing/btest/language/brokerstore.test @@ -6,21 +6,28 @@ redef exit_only_after_terminate = T; module TestModule; global tablestore: opaque of Broker::Store; +#global tabletwostore: opaque of Broker::Store; global setstore: opaque of Broker::Store; global t: table[string] of count &broker_store="table"; +#global ct: table[string, string] of count &broker_store="table2"; global s: set[string] &broker_store="set"; event zeek_init() { tablestore = Broker::create_master("table"); + #tabletwostore = Broker::create_master("table2"); setstore = Broker::create_master("set"); print "inserting"; t["a"] = 5; - add s["hi"]; - print "changing"; - t["a"] = 2; - print "deleting"; delete t["a"]; - delete s["hi"]; + #add s["hi"]; + #print "changing"; + t["a"] = 2; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + #print "deleting"; + #delete t["a"]; + #delete s["hi"]; }