diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 49a1728a6c..a1b7c7d530 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -997,6 +997,49 @@ void Manager::Process() } } +void Manager::ProcessStoreEventInsertUpdate(zeek::IntrusivePtr table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert) + { + auto type = "Insert"; + if ( ! insert ) + type = "Update"; + + if ( insert ) + DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", store_id.c_str(), to_string(key).c_str(), to_string(data).c_str(), key.get_type_name(), data.get_type_name()); + else + DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", store_id.c_str(), to_string(old_value).c_str(), to_string(data).c_str(), data.get_type_name()); + + + if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) + { + reporter->Error("ProcessStoreEvent %s got %s when expecting set", type, data.get_type_name()); + return; + } + + const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); + assert( its.size() == 1 ); + auto zeek_key = data_to_val(key, its[0].get()); + if ( ! zeek_key ) + { + reporter->Error("ProcessStoreEvent %s: could not convert key \"%s\" for store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(key).c_str(), store_id.c_str()); + return; + } + + if ( table->GetType()->IsSet() ) + { + table->Assign(zeek_key, nullptr, false); + return; + } + + // it is a table + auto zeek_value = data_to_val(data, table->GetType()->Yield().get()); + if ( ! zeek_value ) + { + reporter->Error("ProcessStoreEvent %s: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(data).c_str(), to_string(key).c_str(), store_id.c_str()); + return; + } + table->Assign(zeek_key, zeek_value, false); + } + void Manager::ProcessStoreEvent(broker::data msg) { if ( auto insert = broker::store_event::insert::make(msg) ) @@ -1013,41 +1056,8 @@ void Manager::ProcessStoreEvent(broker::data msg) if ( insert.publisher() == storehandle->store_pid ) return; - auto key = insert.key(); - auto data = insert.value(); - - DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name()); - - if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) - { - reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name()); - return; - } - - const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); - assert( its.size() == 1 ); - auto zeek_key = data_to_val(key, its[0].get()); - if ( ! zeek_key ) - { - reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote insert. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str()); - return; - } - - if ( table->GetType()->IsSet() ) - { - table->Assign(zeek_key, nullptr, false); - return; - } - - // it is a table - auto zeek_value = data_to_val(data, table->GetType()->Yield().get()); - if ( ! zeek_value ) - { - reporter->Error("ProcessStoreEvent: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote insert. This probably means the tables have different types on different nodes.", to_string(data).c_str(), to_string(key).c_str(), insert.store_id().c_str()); - return; - } - table->Assign(zeek_key, zeek_value, false); - } + ProcessStoreEventInsertUpdate(table, insert.store_id(), insert.key(), insert.value(), {}, true); + } else if ( auto update = broker::store_event::update::make(msg) ) { auto storehandle = broker_mgr->LookupStore(update.store_id()); @@ -1062,40 +1072,7 @@ void Manager::ProcessStoreEvent(broker::data msg) if ( update.publisher() == storehandle->store_pid ) return; - auto key = update.key(); - auto data = update.new_value(); - - DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", update.store_id().c_str(), to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name()); - - if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) - { - reporter->Error("ProcessStoreEvent Update got %s when expecting set", data.get_type_name()); - return; - } - - const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); - assert( its.size() == 1 ); - auto zeek_key = data_to_val(key, its[0].get()); - if ( ! zeek_key ) - { - reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote update. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str()); - return; - } - - if ( table->GetType()->IsSet() ) - { - table->Assign(zeek_key, nullptr, false); - return; - } - - // it is a table - auto zeek_value = data_to_val(data, table->GetType()->Yield().get()); - if ( ! zeek_value ) - { - reporter->Error("ProcessStoreEvent: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote update. This probably means the tables have different types on different nodes.", to_string(data).c_str(), to_string(key).c_str(), insert.store_id().c_str()); - return; - } - table->Assign(zeek_key, zeek_value, false); + ProcessStoreEventInsertUpdate(table, update.store_id(), update.key(), update.new_value(), update.old_value(), false); } else if ( auto erase = broker::store_event::erase::make(msg) ) { diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 9b5af56183..bcd231aa9e 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -361,6 +361,8 @@ private: void DispatchMessage(const broker::topic& topic, broker::data msg); // Process events used for Broker store backed zeek tables void ProcessStoreEvent(broker::data msg); + // Common functionality for processing insert and update events. + void ProcessStoreEventInsertUpdate(zeek::IntrusivePtr table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert); void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogWrite(broker::zeek::LogWrite lw); diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-incompatible/worker-1..stderr b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-incompatible/worker-1..stderr index 8a5d27c280..ea16941541 100644 --- a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-incompatible/worker-1..stderr +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-incompatible/worker-1..stderr @@ -1,3 +1,3 @@ -error: ProcessStoreEvent: could not convert value "b" for key "a" in store "___sync_store_TestModule::s" while receiving remote insert. This probably means the tables have different types on different nodes. -error: ProcessStoreEvent: could not convert key "a" for store "___sync_store_TestModule::t" while receiving remote insert. This probably means the tables have different types on different nodes. +error: ProcessStoreEvent Insert: could not convert value "b" for key "a" in store "___sync_store_TestModule::s" while receiving remote data. This probably means the tables have different types on different nodes. +error: ProcessStoreEvent Insert: could not convert key "a" for store "___sync_store_TestModule::t" while receiving remote data. This probably means the tables have different types on different nodes. received termination signal