TableSync: don't raise &on_change, smaller fixes

This addresses PR feedback. The main component in this commit is to
disable &on_change notifications when &backend loads a table from sqlite
on startup.
This commit is contained in:
Johanna Amann 2020-07-17 14:21:27 -07:00
parent 930a5c8ebd
commit 36db9d8369
8 changed files with 43 additions and 20 deletions

View file

@ -1009,13 +1009,13 @@ void Manager::ProcessStoreEvent(broker::data msg)
if ( ! table )
return;
auto key = insert.key();
auto data = insert.value();
// We sent this message. Ignore it.
if ( insert.publisher() == storehandle->store_pid )
return;
auto key = insert.key();
auto data = insert.value();
DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name());
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
@ -1058,13 +1058,13 @@ void Manager::ProcessStoreEvent(broker::data msg)
if ( ! table )
return;
auto key = update.key();
auto data = update.new_value();
// We sent this message. Ignore it.
if ( update.publisher() == storehandle->store_pid )
return;
auto key = update.key();
auto data = update.new_value();
DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", update.store_id().c_str(), to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name());
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
@ -1653,7 +1653,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
data_stores.emplace(name, handle);
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
CheckForwarding(name);
PrepareForwarding(name);
if ( ! bstate->endpoint.use_real_time() )
// Wait for master to become available/responsive.
@ -1667,7 +1667,6 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle)
{
// consider if it might be wise to disable &on_change while filling the table
if ( ! handle->forward_to )
return;
@ -1680,6 +1679,10 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
bool is_set = table->GetType()->IsSet();
assert( its.size() == 1 );
// disable &on_change notifications while filling the table.
table->DisableChangeNotifications();
for ( const auto key : *set )
{
auto zeek_key = data_to_val(key, its[0].get());
@ -1687,6 +1690,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
{
reporter->Error("Failed to convert key \"%s\" while importing broker store to table for store \"%s\". Aborting import.", to_string(key).c_str(), name.c_str());
// just abort - this probably means the types are incompatible
table->EnableChangeNotifications();
return;
}
@ -1700,6 +1704,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
if ( ! value )
{
reporter->Error("Failed to load value for key %s while importing Broker store %s to table", to_string(key).c_str(), name.c_str());
table->EnableChangeNotifications();
continue;
}
@ -1707,11 +1712,13 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
if ( ! zeek_value )
{
reporter->Error("Could not convert %s to table value while trying to import Broker store %s. Aborting import.", to_string(value).c_str(), name.c_str());
table->EnableChangeNotifications();
return;
}
table->Assign(zeek_key, zeek_value, false);
}
table->EnableChangeNotifications();
return;
}
@ -1742,7 +1749,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
data_stores.emplace(name, handle);
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
CheckForwarding(name);
PrepareForwarding(name);
return handle;
}
@ -1760,8 +1767,6 @@ bool Manager::CloseStore(const string& name)
if ( s == data_stores.end() )
return false;
auto pubid = s->second->store.frontend_id();
iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this);
for ( auto i = pending_queries.begin(); i != pending_queries.end(); )
@ -1815,11 +1820,11 @@ bool Manager::AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek
DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str());
forwarded_stores.emplace(name, table);
CheckForwarding(name);
PrepareForwarding(name);
return true;
}
void Manager::CheckForwarding(const std::string &name)
void Manager::PrepareForwarding(const std::string &name)
{
auto handle = LookupStore(name);
if ( ! handle )