diff --git a/aux/broker b/aux/broker index 5c676e0fd4..8bba3c45c8 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 5c676e0fd42ecd5e36cbff19cca5b79a8f3b5a3d +Subproject commit 8bba3c45c85d99752dc6f8da9b40b768e3c39804 diff --git a/src/Val.cc b/src/Val.cc index 8c1cbe34db..37c936b073 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -1576,13 +1576,12 @@ bool TableVal::Assign(IntrusivePtr index, std::unique_ptr k, { auto change_index = index ? std::move(index) : RecreateIndex(k_copy); + const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); + if ( ! broker_store.empty() ) - SendToStore(change_index.get(), new_val.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + SendToStore(change_index.get(), v.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); if ( change_func ) - { - const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); - } } delete old_entry_val; @@ -2138,7 +2137,7 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType index_val = index; } - // FIXME: switch back to just storing tables directly in the broker store? + // FIXME: switch back to just storing tables directly in the broker store? // me which store a change came from - and this still seems to be missing from the store_events. (Or I am blind). auto broker_index = bro_broker::val_to_data(index_val); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index d248fde171..fa4c586707 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -950,19 +950,16 @@ void Manager::Process() void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) { - auto topic_parts = broker::topic::split(topic); - const auto& store_name = topic_parts.back(); - - auto storehandle = broker_mgr->LookupStore(store_name); - if ( ! storehandle ) - return; - - auto table = storehandle->forward_to; - if ( ! table ) - return; - if ( auto insert = broker::store_event::insert::make(msg) ) { + auto storehandle = broker_mgr->LookupStore(insert.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + auto key = insert.key(); auto data = insert.value(); @@ -1004,6 +1001,14 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) } else if ( auto update = broker::store_event::update::make(msg) ) { + auto storehandle = broker_mgr->LookupStore(update.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + auto key = update.key(); auto data = update.new_value(); @@ -1044,7 +1049,15 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) } else if ( auto erase = broker::store_event::erase::make(msg) ) { - DBG_LOG(DBG_BROKER, "Erase for key %s", store_name.c_str()); + auto storehandle = broker_mgr->LookupStore(erase.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + DBG_LOG(DBG_BROKER, "Erase for key %s", erase.store_id().c_str()); // We sent this message. Ignore it. if ( erase.publisher() == storehandle->store_pid )