mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
TableSync: refactor common functionality into function
This addresses feedback and puts the common update and insert functionality into its own function.
This commit is contained in:
parent
36db9d8369
commit
5982b1e4b2
3 changed files with 50 additions and 71 deletions
|
@ -997,6 +997,49 @@ void Manager::Process()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Manager::ProcessStoreEventInsertUpdate(zeek::IntrusivePtr<zeek::TableVal> table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert)
|
||||||
|
{
|
||||||
|
auto type = "Insert";
|
||||||
|
if ( ! insert )
|
||||||
|
type = "Update";
|
||||||
|
|
||||||
|
if ( insert )
|
||||||
|
DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", store_id.c_str(), to_string(key).c_str(), to_string(data).c_str(), key.get_type_name(), data.get_type_name());
|
||||||
|
else
|
||||||
|
DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", store_id.c_str(), to_string(old_value).c_str(), to_string(data).c_str(), data.get_type_name());
|
||||||
|
|
||||||
|
|
||||||
|
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
|
||||||
|
{
|
||||||
|
reporter->Error("ProcessStoreEvent %s got %s when expecting set", type, data.get_type_name());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
||||||
|
assert( its.size() == 1 );
|
||||||
|
auto zeek_key = data_to_val(key, its[0].get());
|
||||||
|
if ( ! zeek_key )
|
||||||
|
{
|
||||||
|
reporter->Error("ProcessStoreEvent %s: could not convert key \"%s\" for store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(key).c_str(), store_id.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( table->GetType()->IsSet() )
|
||||||
|
{
|
||||||
|
table->Assign(zeek_key, nullptr, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// it is a table
|
||||||
|
auto zeek_value = data_to_val(data, table->GetType()->Yield().get());
|
||||||
|
if ( ! zeek_value )
|
||||||
|
{
|
||||||
|
reporter->Error("ProcessStoreEvent %s: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(data).c_str(), to_string(key).c_str(), store_id.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
table->Assign(zeek_key, zeek_value, false);
|
||||||
|
}
|
||||||
|
|
||||||
void Manager::ProcessStoreEvent(broker::data msg)
|
void Manager::ProcessStoreEvent(broker::data msg)
|
||||||
{
|
{
|
||||||
if ( auto insert = broker::store_event::insert::make(msg) )
|
if ( auto insert = broker::store_event::insert::make(msg) )
|
||||||
|
@ -1013,40 +1056,7 @@ void Manager::ProcessStoreEvent(broker::data msg)
|
||||||
if ( insert.publisher() == storehandle->store_pid )
|
if ( insert.publisher() == storehandle->store_pid )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto key = insert.key();
|
ProcessStoreEventInsertUpdate(table, insert.store_id(), insert.key(), insert.value(), {}, true);
|
||||||
auto data = insert.value();
|
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name());
|
|
||||||
|
|
||||||
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
|
|
||||||
{
|
|
||||||
reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
|
||||||
assert( its.size() == 1 );
|
|
||||||
auto zeek_key = data_to_val(key, its[0].get());
|
|
||||||
if ( ! zeek_key )
|
|
||||||
{
|
|
||||||
reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote insert. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( table->GetType()->IsSet() )
|
|
||||||
{
|
|
||||||
table->Assign(zeek_key, nullptr, false);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// it is a table
|
|
||||||
auto zeek_value = data_to_val(data, table->GetType()->Yield().get());
|
|
||||||
if ( ! zeek_value )
|
|
||||||
{
|
|
||||||
reporter->Error("ProcessStoreEvent: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote insert. This probably means the tables have different types on different nodes.", to_string(data).c_str(), to_string(key).c_str(), insert.store_id().c_str());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
table->Assign(zeek_key, zeek_value, false);
|
|
||||||
}
|
}
|
||||||
else if ( auto update = broker::store_event::update::make(msg) )
|
else if ( auto update = broker::store_event::update::make(msg) )
|
||||||
{
|
{
|
||||||
|
@ -1062,40 +1072,7 @@ void Manager::ProcessStoreEvent(broker::data msg)
|
||||||
if ( update.publisher() == storehandle->store_pid )
|
if ( update.publisher() == storehandle->store_pid )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto key = update.key();
|
ProcessStoreEventInsertUpdate(table, update.store_id(), update.key(), update.new_value(), update.old_value(), false);
|
||||||
auto data = update.new_value();
|
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", update.store_id().c_str(), to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name());
|
|
||||||
|
|
||||||
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
|
|
||||||
{
|
|
||||||
reporter->Error("ProcessStoreEvent Update got %s when expecting set", data.get_type_name());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
|
||||||
assert( its.size() == 1 );
|
|
||||||
auto zeek_key = data_to_val(key, its[0].get());
|
|
||||||
if ( ! zeek_key )
|
|
||||||
{
|
|
||||||
reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote update. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( table->GetType()->IsSet() )
|
|
||||||
{
|
|
||||||
table->Assign(zeek_key, nullptr, false);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// it is a table
|
|
||||||
auto zeek_value = data_to_val(data, table->GetType()->Yield().get());
|
|
||||||
if ( ! zeek_value )
|
|
||||||
{
|
|
||||||
reporter->Error("ProcessStoreEvent: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote update. This probably means the tables have different types on different nodes.", to_string(data).c_str(), to_string(key).c_str(), insert.store_id().c_str());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
table->Assign(zeek_key, zeek_value, false);
|
|
||||||
}
|
}
|
||||||
else if ( auto erase = broker::store_event::erase::make(msg) )
|
else if ( auto erase = broker::store_event::erase::make(msg) )
|
||||||
{
|
{
|
||||||
|
|
|
@ -361,6 +361,8 @@ private:
|
||||||
void DispatchMessage(const broker::topic& topic, broker::data msg);
|
void DispatchMessage(const broker::topic& topic, broker::data msg);
|
||||||
// Process events used for Broker store backed zeek tables
|
// Process events used for Broker store backed zeek tables
|
||||||
void ProcessStoreEvent(broker::data msg);
|
void ProcessStoreEvent(broker::data msg);
|
||||||
|
// Common functionality for processing insert and update events.
|
||||||
|
void ProcessStoreEventInsertUpdate(zeek::IntrusivePtr<zeek::TableVal> table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert);
|
||||||
void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev);
|
void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev);
|
||||||
bool ProcessLogCreate(broker::zeek::LogCreate lc);
|
bool ProcessLogCreate(broker::zeek::LogCreate lc);
|
||||||
bool ProcessLogWrite(broker::zeek::LogWrite lw);
|
bool ProcessLogWrite(broker::zeek::LogWrite lw);
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
error: ProcessStoreEvent: could not convert value "b" for key "a" in store "___sync_store_TestModule::s" while receiving remote insert. This probably means the tables have different types on different nodes.
|
error: ProcessStoreEvent Insert: could not convert value "b" for key "a" in store "___sync_store_TestModule::s" while receiving remote data. This probably means the tables have different types on different nodes.
|
||||||
error: ProcessStoreEvent: could not convert key "a" for store "___sync_store_TestModule::t" while receiving remote insert. This probably means the tables have different types on different nodes.
|
error: ProcessStoreEvent Insert: could not convert key "a" for store "___sync_store_TestModule::t" while receiving remote data. This probably means the tables have different types on different nodes.
|
||||||
received termination signal
|
received termination signal
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue