Update to recent broker changes.

Specifically the store name is now part of the messages.
This commit is contained in:
Johanna Amann 2020-06-05 14:30:44 -07:00
parent 9d9aefaec3
commit 62f208086c
3 changed files with 30 additions and 18 deletions

@ -1 +1 @@
Subproject commit 5c676e0fd42ecd5e36cbff19cca5b79a8f3b5a3d Subproject commit 8bba3c45c85d99752dc6f8da9b40b768e3c39804

View file

@ -1576,14 +1576,13 @@ bool TableVal::Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k,
{ {
auto change_index = index ? std::move(index) auto change_index = index ? std::move(index)
: RecreateIndex(k_copy); : RecreateIndex(k_copy);
if ( ! broker_store.empty() )
SendToStore(change_index.get(), new_val.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
if ( change_func )
{
const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal();
if ( ! broker_store.empty() )
SendToStore(change_index.get(), v.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
if ( change_func )
CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
} }
}
delete old_entry_val; delete old_entry_val;

View file

@ -950,10 +950,9 @@ void Manager::Process()
void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
{ {
auto topic_parts = broker::topic::split(topic); if ( auto insert = broker::store_event::insert::make(msg) )
const auto& store_name = topic_parts.back(); {
auto storehandle = broker_mgr->LookupStore(insert.store_id());
auto storehandle = broker_mgr->LookupStore(store_name);
if ( ! storehandle ) if ( ! storehandle )
return; return;
@ -961,8 +960,6 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
if ( ! table ) if ( ! table )
return; return;
if ( auto insert = broker::store_event::insert::make(msg) )
{
auto key = insert.key(); auto key = insert.key();
auto data = insert.value(); auto data = insert.value();
@ -1004,6 +1001,14 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
} }
else if ( auto update = broker::store_event::update::make(msg) ) else if ( auto update = broker::store_event::update::make(msg) )
{ {
auto storehandle = broker_mgr->LookupStore(update.store_id());
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
auto key = update.key(); auto key = update.key();
auto data = update.new_value(); auto data = update.new_value();
@ -1044,7 +1049,15 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
} }
else if ( auto erase = broker::store_event::erase::make(msg) ) else if ( auto erase = broker::store_event::erase::make(msg) )
{ {
DBG_LOG(DBG_BROKER, "Erase for key %s", store_name.c_str()); auto storehandle = broker_mgr->LookupStore(erase.store_id());
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
DBG_LOG(DBG_BROKER, "Erase for key %s", erase.store_id().c_str());
// We sent this message. Ignore it. // We sent this message. Ignore it.
if ( erase.publisher() == storehandle->store_pid ) if ( erase.publisher() == storehandle->store_pid )