diff --git a/NEWS b/NEWS index ec2bb7efe8..fd246b46b3 100644 --- a/NEWS +++ b/NEWS @@ -65,7 +65,8 @@ New Functionality ```global t: table[string] of count &backend=Broker::MEMORY;``` - This feature is documented in detail here: FIXME. + This feature is documented in detail here: + https://docs.zeek.org/en/current/frameworks/broker.html#broker-store-backed-zeek-tables-for-data-synchronization-and-persistence Note: this feature is experimental and the syntax/featureset can change in the future. diff --git a/src/Attr.cc b/src/Attr.cc index 351a360618..66a084c08e 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -465,7 +465,7 @@ void Attributes::CheckAttr(Attr* a) } if ( Find(ATTR_BACKEND) ) { - Error("&broker_store and &backend cannot be used simultaneously"); + Error("&backend and &broker_store cannot be used simultaneously"); } } // fallthrough @@ -646,7 +646,7 @@ void Attributes::CheckAttr(Attr* a) Error("&backend only supports one-element set/table indexes"); } - // Only support atomic types for the moment. + // Only support atomic types for the moment, unless explicitly overriden if ( ! type->AsTableType()->IsSet() && ! input::Manager::IsCompatibleType(type->AsTableType()->Yield().get(), true) && ! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) ) @@ -689,6 +689,7 @@ void Attributes::CheckAttr(Attr* a) Error("&broker_store only supports one-element set/table indexes"); } + // Only support atomic types for the moment, unless explicitly overriden if ( ! type->AsTableType()->IsSet() && ! input::Manager::IsCompatibleType(type->AsTableType()->Yield().get(), true) && ! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) ) diff --git a/src/Attr.h b/src/Attr.h index 952bb7a92a..0e18f0ace2 100644 --- a/src/Attr.h +++ b/src/Attr.h @@ -44,7 +44,7 @@ enum AttrTag { ATTR_ON_CHANGE, // for table change tracking ATTR_BROKER_STORE, // for Broker store backed tables ATTR_BROKER_STORE_ALLOW_COMPLEX, // for Broker store backed tables - ATTR_BACKEND, // for Broker store backed tabled + ATTR_BACKEND, // for Broker store backed tables ATTR_DEPRECATED, NUM_ATTRS // this item should always be last }; diff --git a/src/Val.h b/src/Val.h index 40d35cc58c..8000c5ed06 100644 --- a/src/Val.h +++ b/src/Val.h @@ -1029,6 +1029,16 @@ public: */ void SetBrokerStore(const std::string& store) { broker_store = store; } + /** + * Disable change notification processing of &on_change until re-enabled. + */ + void DisableChangeNotifications() { in_change_func = true; } + + /** + * Re-enables change notifcations after being disabled by DisableChangeNotifications. + */ + void EnableChangeNotifications() { in_change_func = false; } + protected: void Init(zeek::TableTypePtr t); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 064b231855..49a1728a6c 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -1009,13 +1009,13 @@ void Manager::ProcessStoreEvent(broker::data msg) if ( ! table ) return; - auto key = insert.key(); - auto data = insert.value(); - // We sent this message. Ignore it. if ( insert.publisher() == storehandle->store_pid ) return; + auto key = insert.key(); + auto data = insert.value(); + DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name()); if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) @@ -1058,13 +1058,13 @@ void Manager::ProcessStoreEvent(broker::data msg) if ( ! table ) return; - auto key = update.key(); - auto data = update.new_value(); - // We sent this message. Ignore it. if ( update.publisher() == storehandle->store_pid ) return; + auto key = update.key(); + auto data = update.new_value(); + DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", update.store_id().c_str(), to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name()); if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) @@ -1653,7 +1653,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); - CheckForwarding(name); + PrepareForwarding(name); if ( ! bstate->endpoint.use_real_time() ) // Wait for master to become available/responsive. @@ -1667,7 +1667,6 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type, void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle) { - // consider if it might be wise to disable &on_change while filling the table if ( ! handle->forward_to ) return; @@ -1680,6 +1679,10 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); bool is_set = table->GetType()->IsSet(); assert( its.size() == 1 ); + + // disable &on_change notifications while filling the table. + table->DisableChangeNotifications(); + for ( const auto key : *set ) { auto zeek_key = data_to_val(key, its[0].get()); @@ -1687,6 +1690,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV { reporter->Error("Failed to convert key \"%s\" while importing broker store to table for store \"%s\". Aborting import.", to_string(key).c_str(), name.c_str()); // just abort - this probably means the types are incompatible + table->EnableChangeNotifications(); return; } @@ -1700,6 +1704,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV if ( ! value ) { reporter->Error("Failed to load value for key %s while importing Broker store %s to table", to_string(key).c_str(), name.c_str()); + table->EnableChangeNotifications(); continue; } @@ -1707,11 +1712,13 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV if ( ! zeek_value ) { reporter->Error("Could not convert %s to table value while trying to import Broker store %s. Aborting import.", to_string(value).c_str(), name.c_str()); + table->EnableChangeNotifications(); return; } table->Assign(zeek_key, zeek_value, false); } + table->EnableChangeNotifications(); return; } @@ -1742,7 +1749,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); - CheckForwarding(name); + PrepareForwarding(name); return handle; } @@ -1760,8 +1767,6 @@ bool Manager::CloseStore(const string& name) if ( s == data_stores.end() ) return false; - auto pubid = s->second->store.frontend_id(); - iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this); for ( auto i = pending_queries.begin(); i != pending_queries.end(); ) @@ -1815,11 +1820,11 @@ bool Manager::AddForwardedStore(const std::string& name, zeek::IntrusivePtr