Broker Store updates: get a bit more infrastructure in place.

This compiles, but besides giving debug messages (and partially
performing inserts/updates) it is not really helpful and definitely WIP.

This also shows that I might have to re-think the approach that we will
take here. So far, we actually insert tables as tables into
Brokerstores. This opens up the potential to just have several tables
synchronized via a single brokerstore.

However, it turns out, that the current store_event API sends the
complete table with each update. Which is problematic for obvious
reasons - and not really sustainable.
This commit is contained in:
Johanna Amann 2020-05-29 14:32:16 -07:00
parent 8db83a5ed2
commit 558e89b3ba
5 changed files with 164 additions and 26 deletions

View file

@ -2079,14 +2079,27 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType
if ( ! handle ) if ( ! handle )
return; return;
// we wither get passed the raw index_val - or a ListVal with exactly one element.
// Since broker does not support ListVals, we have to unoll this in the second case.
const Val* index_val;
if ( index->Type()->Tag() == TYPE_LIST )
{
if ( index->AsListVal()->Length() != 1 ) if ( index->AsListVal()->Length() != 1 )
{ {
builtin_error("table with complex index not supported for &broker_store"); builtin_error("table with complex index not supported for &broker_store");
return; return;
} }
const auto index_val = index->AsListVal()->Index(0); index_val = index->AsListVal()->Index(0);
auto key_val = new StringVal("test"); }
else
{
index_val = index;
}
// FIXME: at the moment this is hardcoded to the name of the broker store. I needed something to be able to tell
// me which store a change came from - and this still seems to be missing from the store_events. (Or I am blind).
auto key_val = new StringVal(broker_store);
auto broker_key = bro_broker::val_to_data(key_val); auto broker_key = bro_broker::val_to_data(key_val);
auto broker_index = bro_broker::val_to_data(index_val); auto broker_index = bro_broker::val_to_data(index_val);
Unref(key_val); Unref(key_val);

View file

@ -909,15 +909,7 @@ void Manager::Process()
if ( topic == broker::topics::store_events ) if ( topic == broker::topics::store_events )
{ {
if (auto insert = broker::store_event::insert::make(msg)) ProcessStoreEvent(topic, std::move(msg));
{
reporter->Warning("It is an insert!");
reporter->Warning("Key/Data (endpoint): %s/%s (%s)", to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), to_string(insert.publisher()).c_str());
}
else
{
reporter->Warning("Unhandled event type");
}
continue; continue;
} }
@ -956,6 +948,132 @@ void Manager::Process()
} }
} }
void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
{
// for all of the following we currently cheat. The key always is the name of the store. This will change
// in the future.
if ( auto insert = broker::store_event::insert::make(msg) )
{
auto remstore = to_string(insert.key());
auto storehandle = broker_mgr->LookupStore(remstore);
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
auto data = insert.value();
reporter->Warning("Insert Data: %s (%s)", to_string(insert.value()).c_str(), insert.value().get_type_name());
if ( table->Type()->IsSet() && data.get_type() != broker::data::type::set )
{
reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name());
return;
}
else if ( ! table->Type()->IsSet() && data.get_type() != broker::data::type::table )
{
reporter->Error("ProcessStoreEvent Insert got %s when expecting table", data.get_type_name());
return;
}
// We sent this message. Ignore it.
//if ( insert.publisher() == storehandle->store_pid )
// continue;
// We currently use data_to_val here. It creates a bit of overhead, but makes the code easier. We could change
// this to manual unrolling in the future.
auto val = data_to_val(std::move(data), table->Type());
// So we are just doing it manually - at least for now.
auto temptable = val->AsTable();
auto temptableval = val->AsTableVal();
HashKey* k;
TableEntryVal* entry;
auto c = temptable->InitForIteration();
while ( (entry = temptable->NextEntry(k, c)))
{
auto lv = temptableval->RecoverIndex(k);
delete k;
Val* entry_key = lv->Length() == 1 ? lv->Index(0) : lv.get();
// note - at the moment this code creates a loop
// Fixme: expiry!
if ( temptableval->Type()->IsSet() )
table->Assign(entry_key, nullptr);
else
table->Assign(entry_key, entry->Value());
}
}
else if ( auto update = broker::store_event::update::make(msg) )
{
auto remstore = to_string(update.key());
auto storehandle = broker_mgr->LookupStore(remstore);
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
auto data = update.new_value();
reporter->Warning("Update Data: %s->%s (%s)", to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name());
if ( table->Type()->IsSet() && data.get_type() != broker::data::type::set )
{
reporter->Error("ProcessStoreEvent Update got %s when expecting set", data.get_type_name());
return;
}
else if ( ! table->Type()->IsSet() && data.get_type() != broker::data::type::table )
{
reporter->Error("ProcessStoreEvent Update got %s when expecting table", data.get_type_name());
return;
}
// We sent this message. Ignore it.
if ( update.publisher() == storehandle->store_pid )
return;
// We currently use data_to_val here. It creates a bit of overhead, but makes the code easier. We could change
// this to manual unrolling in the future.
auto val = data_to_val(std::move(data), table->Type());
// So we are just doing it manually - at least for now.
auto temptable = val->AsTable();
auto temptableval = val->AsTableVal();
HashKey* k;
TableEntryVal* entry;
auto c = temptable->InitForIteration();
while ( (entry = temptable->NextEntry(k, c)))
{
auto lv = temptableval->RecoverIndex(k);
delete k;
Val* entry_key = lv->Length() == 1 ? lv->Index(0) : lv.get();
// note - at the moment this code creates a loop
if ( temptableval->Type()->IsSet() )
table->Assign(entry_key, nullptr);
else
table->Assign(entry_key, entry->Value());
}
}
else if ( auto erase = broker::store_event::erase::make(msg) )
{
auto remstore = to_string(erase.key());
auto storehandle = broker_mgr->LookupStore(remstore);
if ( ! storehandle )
return;
auto table = storehandle->forward_to;
if ( ! table )
return;
// I'm actually not sure we can get an erase - since we will never delete keys...
reporter->Warning("Erase for key %s", remstore.c_str());
}
else
{
reporter->Warning("Unhandled event type");
}
}
void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
{ {
@ -1522,7 +1640,6 @@ bool Manager::CloseStore(const string& name)
return false; return false;
auto pubid = s->second->store.frontend_id(); auto pubid = s->second->store.frontend_id();
forwarded_ids.erase(pubid);
iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this); iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this);
@ -1590,10 +1707,8 @@ void Manager::CheckForwarding(const std::string &name)
if ( forwarded_stores.find(name) == forwarded_stores.end() ) if ( forwarded_stores.find(name) == forwarded_stores.end() )
return; return;
auto pubid = handle->store.frontend_id(); handle->forward_to = forwarded_stores.at(name);
DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str());
DBG_LOG(DBG_BROKER, "Resolved publishder %s for table forward for data store %s", to_string(pubid).c_str(), name.c_str());
forwarded_ids.emplace(pubid, forwarded_stores.at(name));
} }
} // namespace bro_broker } // namespace bro_broker

View file

@ -342,6 +342,7 @@ public:
private: private:
void DispatchMessage(const broker::topic& topic, broker::data msg); void DispatchMessage(const broker::topic& topic, broker::data msg);
void ProcessStoreEvent(const broker::topic& topic, broker::data msg);
void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev);
bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogCreate(broker::zeek::LogCreate lc);
bool ProcessLogWrite(broker::zeek::LogWrite lw); bool ProcessLogWrite(broker::zeek::LogWrite lw);
@ -386,7 +387,6 @@ private:
std::shared_ptr<BrokerState> bstate; std::shared_ptr<BrokerState> bstate;
std::unordered_map<std::string, StoreHandleVal*> data_stores; std::unordered_map<std::string, StoreHandleVal*> data_stores;
std::unordered_map<std::string, IntrusivePtr<TableVal>> forwarded_stores; std::unordered_map<std::string, IntrusivePtr<TableVal>> forwarded_stores;
std::unordered_map<broker::publisher_id, IntrusivePtr<TableVal>> forwarded_ids;
std::unordered_map<query_id, StoreQueryCallback*, std::unordered_map<query_id, StoreQueryCallback*,
query_id_hasher> pending_queries; query_id_hasher> pending_queries;
std::vector<std::string> forwarded_prefixes; std::vector<std::string> forwarded_prefixes;

View file

@ -95,13 +95,16 @@ private:
class StoreHandleVal : public OpaqueVal { class StoreHandleVal : public OpaqueVal {
public: public:
StoreHandleVal(broker::store s) StoreHandleVal(broker::store s)
: OpaqueVal(bro_broker::opaque_of_store_handle), store{s}, proxy{store} : OpaqueVal(bro_broker::opaque_of_store_handle), store{s}, proxy{store}, store_pid{store.frontend_id()}
{ } { }
void ValDescribe(ODesc* d) const override; void ValDescribe(ODesc* d) const override;
broker::store store; broker::store store;
broker::store::proxy proxy; broker::store::proxy proxy;
broker::publisher_id store_pid;
// Zeek table that events are forwarded to.
IntrusivePtr<TableVal> forward_to;
protected: protected:
StoreHandleVal() = default; StoreHandleVal() = default;

View file

@ -6,21 +6,28 @@ redef exit_only_after_terminate = T;
module TestModule; module TestModule;
global tablestore: opaque of Broker::Store; global tablestore: opaque of Broker::Store;
#global tabletwostore: opaque of Broker::Store;
global setstore: opaque of Broker::Store; global setstore: opaque of Broker::Store;
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
#global ct: table[string, string] of count &broker_store="table2";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
event zeek_init() event zeek_init()
{ {
tablestore = Broker::create_master("table"); tablestore = Broker::create_master("table");
#tabletwostore = Broker::create_master("table2");
setstore = Broker::create_master("set"); setstore = Broker::create_master("set");
print "inserting"; print "inserting";
t["a"] = 5; t["a"] = 5;
add s["hi"];
print "changing";
t["a"] = 2;
print "deleting";
delete t["a"]; delete t["a"];
delete s["hi"]; #add s["hi"];
#print "changing";
t["a"] = 2;
t["b"] = 3;
t["c"] = 4;
t["whatever"] = 5;
#print "deleting";
#delete t["a"];
#delete s["hi"];
} }