From 8db83a5ed276dfb1f1af78eef9f60d55e0190c5b Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Thu, 28 May 2020 12:26:58 -0700 Subject: [PATCH] Make &broker_store take argument of type string. It turns out that directly passing a Broker::Store is not really a bright idea. Because - if we do that - we have to later try to intercept when the master/clone is generated to figure out what the actual name of the backing store is. Turns out that it is much easier to just use the name directly - and check if a store with that name exists when we want to insert something. I might want to reconsider this in the future in any case. At the moment this approach just stores one table into an entire store. In theory, one store should be able to handle several tables, but... that's more complex. So let's start with this for now. --- src/Attr.cc | 9 +++-- src/Val.cc | 38 +++++++++--------- src/Val.h | 2 +- src/broker/Manager.cc | 52 ++++++++++++++++++++++++- src/broker/Manager.h | 6 +++ src/broker/Store.h | 1 + src/scan.l | 1 + testing/btest/language/brokerstore.test | 6 ++- 8 files changed, 88 insertions(+), 27 deletions(-) diff --git a/src/Attr.cc b/src/Attr.cc index 32c318a825..6a736629ac 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -558,10 +558,11 @@ void Attributes::CheckAttr(Attr* a) break; } - const Expr *broker_store = a->AttrExpr(); - if ( broker_store->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" ) - Error("&broker_store must take an opaque of Broker::Store"); - + if ( a->AttrExpr()->Type()->Tag() != TYPE_STRING ) + { + Error("&broker_store must take a string argument"); + break; + } // Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector if ( type->AsTableType()->IndexTypes()->length() != 1 ) diff --git a/src/Val.cc b/src/Val.cc index e023ce69f5..c38f185a8c 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -38,6 +38,7 @@ #include "broker/Data.h" #include "broker/Store.h" +#include "broker/Manager.h" #include "threading/formatters/JSON.h" @@ -1471,8 +1472,14 @@ void TableVal::SetAttrs(IntrusivePtr a) change_func = {NewRef{}, cf->AttrExpr()}; auto bs = attrs->FindAttr(ATTR_BROKER_STORE); - if ( bs ) - broker_store = {NewRef{}, bs->AttrExpr()}; + if ( bs && broker_store.empty() ) // this does not mesh well with being updated several times + { + IntrusivePtr c = bs->AttrExpr()->Eval(nullptr); + assert(c); + assert(c->Type()->Tag() == TYPE_STRING); + broker_store = c->AsStringVal()->AsString()->CheckString(); + broker_mgr->AddForwardedStore(broker_store, {NewRef{}, this}); + } } void TableVal::CheckExpireAttr(attr_tag at) @@ -1552,16 +1559,16 @@ bool TableVal::Assign(Val* index, HashKey* k, IntrusivePtr new_val) Modified(); - if ( change_func || broker_store ) + if ( change_func || ( ! broker_store.empty() ) ) { auto change_index = index ? IntrusivePtr{NewRef{}, index} : RecoverIndex(&k_copy); - if ( broker_store ) + if ( ! broker_store.empty() ) SendToStore(change_index.get(), new_val.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); if ( change_func ) { - Val* v = old_entry_val ? old_entry_val->Value() : new_val.get(); - CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + Val* v = old_entry_val ? old_entry_val->Value() : new_val.get(); + CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); } } @@ -2062,23 +2069,16 @@ void TableVal::CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe) { - if ( ! broker_store || ! index ) + if ( broker_store.empty() || ! index ) return; try { - auto thestore = broker_store->Eval(0); + auto handle = broker_mgr->LookupStore(broker_store); - if ( ! thestore ) + if ( ! handle ) return; - if ( thestore->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" ) - { - thestore->Error("not a Broker::Store"); - return; - } - - auto handle = static_cast(thestore.get()); if ( index->AsListVal()->Length() != 1 ) { builtin_error("table with complex index not supported for &broker_store"); @@ -2149,7 +2149,7 @@ IntrusivePtr TableVal::Delete(const Val* index) Modified(); - if ( broker_store ) + if ( ! broker_store.empty() ) SendToStore(index, nullptr, ELEMENT_REMOVED); if ( change_func ) CallChangeFunc(index, va.get(), ELEMENT_REMOVED); @@ -2174,10 +2174,10 @@ IntrusivePtr TableVal::Delete(const HashKey* k) Modified(); - if ( ( change_func && va ) || broker_store ) + if ( ( change_func && va ) || ( ! broker_store.empty() ) ) { auto index = table_hash->RecoverVals(k); - if ( broker_store ) + if ( ! broker_store.empty() ) SendToStore(index.get(), nullptr, ELEMENT_REMOVED); if ( change_func && va ) CallChangeFunc(index.get(), va.get(), ELEMENT_REMOVED); diff --git a/src/Val.h b/src/Val.h index 4a074cdfa3..fdaa917565 100644 --- a/src/Val.h +++ b/src/Val.h @@ -895,7 +895,7 @@ protected: PrefixTable* subnets; IntrusivePtr def_val; IntrusivePtr change_func; - IntrusivePtr broker_store; + std::string broker_store; // prevent recursion of change functions bool in_change_func = false; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index d54b061580..d7ce3a3054 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -212,6 +212,8 @@ void Manager::InitPostScript() reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) ) reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); + + bstate->subscriber.add_topic(broker::topics::store_events, true); } void Manager::Terminate() @@ -905,6 +907,20 @@ void Manager::Process() auto& topic = broker::get_topic(message); auto& msg = broker::get_data(message); + if ( topic == broker::topics::store_events ) + { + if (auto insert = broker::store_event::insert::make(msg)) + { + reporter->Warning("It is an insert!"); + reporter->Warning("Key/Data (endpoint): %s/%s (%s)", to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), to_string(insert.publisher()).c_str()); + } + else + { + reporter->Warning("Unhandled event type"); + } + continue; + } + try { DispatchMessage(topic, std::move(msg)); @@ -1449,6 +1465,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); + CheckForwarding(name); if ( bstate->endpoint.use_real_time() ) return handle; @@ -1486,7 +1503,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); - + CheckForwarding(name); return handle; } @@ -1504,6 +1521,9 @@ bool Manager::CloseStore(const string& name) if ( s == data_stores.end() ) return false; + auto pubid = s->second->store.frontend_id(); + forwarded_ids.erase(pubid); + iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this); for ( auto i = pending_queries.begin(); i != pending_queries.end(); ) @@ -1546,4 +1566,34 @@ const Stats& Manager::GetStatistics() return statistics; } +bool Manager::AddForwardedStore(const std::string& name, IntrusivePtr table) + { + if ( forwarded_stores.find(name) != forwarded_stores.end() ) + { + reporter->Error("same &broker_store %s specified for two different variables", name.c_str()); + return false; + } + + DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str()); + forwarded_stores.emplace(name, table); + + CheckForwarding(name); + return true; + } + +void Manager::CheckForwarding(const std::string &name) + { + auto handle = LookupStore(name); + if ( ! handle ) + return; + + if ( forwarded_stores.find(name) == forwarded_stores.end() ) + return; + + auto pubid = handle->store.frontend_id(); + + DBG_LOG(DBG_BROKER, "Resolved publishder %s for table forward for data store %s", to_string(pubid).c_str(), name.c_str()); + forwarded_ids.emplace(pubid, forwarded_stores.at(name)); + } + } // namespace bro_broker diff --git a/src/broker/Manager.h b/src/broker/Manager.h index bbd38f88be..2746b89142 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -294,6 +295,8 @@ public: */ StoreHandleVal* LookupStore(const std::string& name); + bool AddForwardedStore(const std::string& name, IntrusivePtr table); + /** * Close and unregister a data store. Any existing references to the * store handle will not be able to be used for any data store operations. @@ -347,6 +350,7 @@ private: void ProcessError(broker::error err); void ProcessStoreResponse(StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); + void CheckForwarding(const std::string& name); void Error(const char* format, ...) __attribute__((format (printf, 2, 3))); @@ -381,6 +385,8 @@ private: std::string default_log_topic_prefix; std::shared_ptr bstate; std::unordered_map data_stores; + std::unordered_map> forwarded_stores; + std::unordered_map> forwarded_ids; std::unordered_map pending_queries; std::vector forwarded_prefixes; diff --git a/src/broker/Store.h b/src/broker/Store.h index 09799c8c68..4bf6dee717 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -6,6 +6,7 @@ #include "Trigger.h" #include +#include #include #include diff --git a/src/scan.l b/src/scan.l index 0fd1a65085..6f598cfe08 100644 --- a/src/scan.l +++ b/src/scan.l @@ -310,6 +310,7 @@ when return TOK_WHEN; &redef return TOK_ATTR_REDEF; &write_expire return TOK_ATTR_EXPIRE_WRITE; &on_change return TOK_ATTR_ON_CHANGE; +&broker_store return TOK_ATTR_BROKER_STORE; @deprecated.* { auto num_files = file_stack.length(); diff --git a/testing/btest/language/brokerstore.test b/testing/btest/language/brokerstore.test index 021a0d7463..cb1e207b36 100644 --- a/testing/btest/language/brokerstore.test +++ b/testing/btest/language/brokerstore.test @@ -1,13 +1,15 @@ # @TEST-EXEC: zeek %INPUT >output # @TEST-EXEC: btest-diff output +redef exit_only_after_terminate = T; + module TestModule; global tablestore: opaque of Broker::Store; global setstore: opaque of Broker::Store; -global t: table[string] of count &broker_store=tablestore; -global s: set[string] &broker_store=setstore; +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; event zeek_init() {