Merge remote-tracking branch 'origin/topic/johanna/table-changes'

* origin/topic/johanna/table-changes: (26 commits)
  TableSync: try to make test more robust & add debug output
  Increase timeouts to see if FreeBSD will be happy with this.
  Try to make FreeBSD test happy with larger timeout.
  TableSync: refactor common functionality into function
  TableSync: don't raise &on_change, smaller fixes
  TableSync: rename auto_store -> table_store
  SyncTables: address feedback part 1 - naming (broker and zeek)
  BrokerStore <-> Zeek Tables: cleanup and bug workaround
  Zeek Table<->Brokerstore: cleanup, documentation, small fixes
  BrokerStore<->Zeek table: adopt to recent Zeek API changes
  BrokerStore<->Zeek Tables Fix a few small test failures.
  BrokerStore<->Zeek tables: allow setting storage location & tests
  BrokerStore<->Zeek tables: &backend works for in-memory stores.
  BrokerStore<->Zeek table - introdude &backend attribute
  BrokerStore<->Zeek tables: test for clones synchronizing to a master
  BrokerStore<->Zeek tables: load persistent tables on startup.
  Brokerstore<->Tables: attribute conflicts
  Zeek/Brokerstore updates: expiration
  Zeek/Brokerstore updates: add test that includes updates from clones
  Zeek/Brokerstore updates: first working end-to-end test
  ...
This commit is contained in:
Robin Sommer 2020-07-21 14:54:46 +00:00
commit c3f4971eb2
50 changed files with 2290 additions and 78 deletions

View file

@ -151,6 +151,8 @@ void Manager::InitPostScript()
log_topic_func = get_option("Broker::log_topic")->AsFunc();
log_id_type = zeek::id::find_type("Log::ID")->AsEnumType();
writer_id_type = zeek::id::find_type("Log::Writer")->AsEnumType();
zeek_table_manager = get_option("Broker::table_store_master")->AsBool();
zeek_table_db_directory = get_option("Broker::table_store_db_directory")->AsString()->CheckString();
opaque_of_data_type = zeek::make_intrusive<zeek::OpaqueType>("Broker::Data");
opaque_of_set_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::SetIterator");
@ -212,6 +214,54 @@ void Manager::InitPostScript()
reporter->FatalError("Failed to register broker subscriber with iosource_mgr");
if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) )
reporter->FatalError("Failed to register broker status subscriber with iosource_mgr");
bstate->subscriber.add_topic(broker::topics::store_events, true);
InitializeBrokerStoreForwarding();
}
void Manager::InitializeBrokerStoreForwarding()
{
const auto& globals = zeek::detail::global_scope()->Vars();
for ( const auto& global : globals )
{
auto& id = global.second;
if ( id->HasVal() && id->GetAttr(zeek::detail::ATTR_BACKEND) )
{
const auto& attr = id->GetAttr(zeek::detail::ATTR_BACKEND);
auto e = static_cast<BifEnum::Broker::BackendType>(attr->GetExpr()->Eval(nullptr)->AsEnum());
auto storename = std::string("___sync_store_") + global.first;
id->GetVal()->AsTableVal()->SetBrokerStore(storename);
AddForwardedStore(storename, {zeek::NewRef{}, id->GetVal()->AsTableVal()});
// We only create masters here. For clones, we do all the work of setting up
// the forwarding - but we do not try to initialize the clone. We can only initialize
// the clone, once a node has a connection to a master. This is currently done in scriptland
// in scripts/base/frameworks/cluster/broker-stores.zeek. Once the ALM transport is ready
// we can change over to doing this here.
if ( ! zeek_table_manager )
continue;
auto backend = bro_broker::to_backend_type(e);
auto suffix = ".store";
switch ( backend ) {
case broker::backend::sqlite:
suffix = ".sqlite";
break;
case broker::backend::rocksdb:
suffix = ".rocksdb";
break;
default:
break;
}
auto path = zeek_table_db_directory + "/" + storename + suffix;
MakeMaster(storename, backend, broker::backend_options{{"path", path}});
}
}
}
void Manager::Terminate()
@ -872,7 +922,8 @@ void Manager::Process()
{
// Ensure that time gets update before processing broker messages, or events
// based on them might get scheduled wrong.
net_update_time(current_time());
if ( use_real_time )
net_update_time(current_time());
bool had_input = false;
@ -906,6 +957,12 @@ void Manager::Process()
auto& topic = broker::get_topic(message);
auto& msg = broker::get_data(message);
if ( broker::topics::store_events.prefix_of(topic) )
{
ProcessStoreEvent(std::move(msg));
continue;
}
try
{
DispatchMessage(topic, std::move(msg));
@ -941,6 +998,135 @@ void Manager::Process()
}
}
void Manager::ProcessStoreEventInsertUpdate(zeek::IntrusivePtr<zeek::TableVal> table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert)
{
auto type = "Insert";
if ( ! insert )
type = "Update";
if ( insert )
{
DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", store_id.c_str(), to_string(key).c_str(), to_string(data).c_str(), key.get_type_name(), data.get_type_name());
}
else
{
DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", store_id.c_str(), to_string(old_value).c_str(), to_string(data).c_str(), data.get_type_name());
}
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
{
reporter->Error("ProcessStoreEvent %s got %s when expecting set", type, data.get_type_name());
return;
}
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
assert( its.size() == 1 );
auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key )
{
reporter->Error("ProcessStoreEvent %s: could not convert key \"%s\" for store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(key).c_str(), store_id.c_str());
return;
}
if ( table->GetType()->IsSet() )
{
table->Assign(zeek_key, nullptr, false);
return;
}
// it is a table
auto zeek_value = data_to_val(data, table->GetType()->Yield().get());
if ( ! zeek_value )
{
reporter->Error("ProcessStoreEvent %s: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(data).c_str(), to_string(key).c_str(), store_id.c_str());
return;
}
table->Assign(zeek_key, zeek_value, false);
}
void Manager::ProcessStoreEvent(broker::data msg)
{
if ( auto insert = broker::store_event::insert::make(msg) )
{
auto storehandle = broker_mgr->LookupStore(insert.store_id());
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
// We sent this message. Ignore it.
if ( insert.publisher() == storehandle->store_pid )
return;
ProcessStoreEventInsertUpdate(table, insert.store_id(), insert.key(), insert.value(), {}, true);
}
else if ( auto update = broker::store_event::update::make(msg) )
{
auto storehandle = broker_mgr->LookupStore(update.store_id());
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
// We sent this message. Ignore it.
if ( update.publisher() == storehandle->store_pid )
return;
ProcessStoreEventInsertUpdate(table, update.store_id(), update.key(), update.new_value(), update.old_value(), false);
}
else if ( auto erase = broker::store_event::erase::make(msg) )
{
auto storehandle = broker_mgr->LookupStore(erase.store_id());
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
// We sent this message. Ignore it.
if ( erase.publisher() == storehandle->store_pid )
return;
auto key = erase.key();
DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str());
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
assert( its.size() == 1 );
auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key )
{
reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote erase. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str());
return;
}
table->Remove(*zeek_key, false);
}
else if ( auto expire = broker::store_event::expire::make(msg) )
{
// We just ignore expiries - expiring information on the Zeek side is handled by Zeek itself.
#ifdef DEBUG
// let's only debug log for stores that we know.
auto storehandle = broker_mgr->LookupStore(expire.store_id());
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
DBG_LOG(DBG_BROKER, "Store %s: Store expired key %s", expire.store_id().c_str(), to_string(expire.key()).c_str());
#endif /* DEBUG */
}
else
{
reporter->Error("ProcessStoreEvent: Unhandled event type");
}
}
void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
{
@ -1406,16 +1592,76 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
data_stores.emplace(name, handle);
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
PrepareForwarding(name);
if ( bstate->endpoint.use_real_time() )
return handle;
if ( ! bstate->endpoint.use_real_time() )
// Wait for master to become available/responsive.
// Possibly avoids timeouts in scripts during unit tests.
handle->store.exists("");
BrokerStoreToZeekTable(name, handle);
// Wait for master to become available/responsive.
// Possibly avoids timeouts in scripts during unit tests.
handle->store.exists("");
return handle;
}
void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle)
{
if ( ! handle->forward_to )
return;
auto keys = handle->store.keys();
if ( ! keys )
return;
auto set = caf::get_if<broker::set>(&(keys->get_data()));
auto table = handle->forward_to;
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
bool is_set = table->GetType()->IsSet();
assert( its.size() == 1 );
// disable &on_change notifications while filling the table.
table->DisableChangeNotifications();
for ( const auto key : *set )
{
auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key )
{
reporter->Error("Failed to convert key \"%s\" while importing broker store to table for store \"%s\". Aborting import.", to_string(key).c_str(), name.c_str());
// just abort - this probably means the types are incompatible
table->EnableChangeNotifications();
return;
}
if ( is_set )
{
table->Assign(zeek_key, nullptr, false);
continue;
}
auto value = handle->store.get(key);
if ( ! value )
{
reporter->Error("Failed to load value for key %s while importing Broker store %s to table", to_string(key).c_str(), name.c_str());
table->EnableChangeNotifications();
continue;
}
auto zeek_value = data_to_val(*value, table->GetType()->Yield().get());
if ( ! zeek_value )
{
reporter->Error("Could not convert %s to table value while trying to import Broker store %s. Aborting import.", to_string(value).c_str(), name.c_str());
table->EnableChangeNotifications();
return;
}
table->Assign(zeek_key, zeek_value, false);
}
table->EnableChangeNotifications();
return;
}
StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
double stale_interval,
double mutation_buffer_interval)
@ -1443,7 +1689,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
data_stores.emplace(name, handle);
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
PrepareForwarding(name);
return handle;
}
@ -1503,4 +1749,32 @@ const Stats& Manager::GetStatistics()
return statistics;
}
bool Manager::AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek::TableVal> table)
{
if ( forwarded_stores.find(name) != forwarded_stores.end() )
{
reporter->Error("same &broker_store %s specified for two different variables", name.c_str());
return false;
}
DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str());
forwarded_stores.emplace(name, table);
PrepareForwarding(name);
return true;
}
void Manager::PrepareForwarding(const std::string &name)
{
auto handle = LookupStore(name);
if ( ! handle )
return;
if ( forwarded_stores.find(name) == forwarded_stores.end() )
return;
handle->forward_to = forwarded_stores.at(name);
DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str());
}
} // namespace bro_broker