diff --git a/src/Attr.cc b/src/Attr.cc index 32c318a825..6a736629ac 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -558,10 +558,11 @@ void Attributes::CheckAttr(Attr* a) break; } - const Expr *broker_store = a->AttrExpr(); - if ( broker_store->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" ) - Error("&broker_store must take an opaque of Broker::Store"); - + if ( a->AttrExpr()->Type()->Tag() != TYPE_STRING ) + { + Error("&broker_store must take a string argument"); + break; + } // Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector if ( type->AsTableType()->IndexTypes()->length() != 1 ) diff --git a/src/Val.cc b/src/Val.cc index e023ce69f5..c38f185a8c 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -38,6 +38,7 @@ #include "broker/Data.h" #include "broker/Store.h" +#include "broker/Manager.h" #include "threading/formatters/JSON.h" @@ -1471,8 +1472,14 @@ void TableVal::SetAttrs(IntrusivePtr a) change_func = {NewRef{}, cf->AttrExpr()}; auto bs = attrs->FindAttr(ATTR_BROKER_STORE); - if ( bs ) - broker_store = {NewRef{}, bs->AttrExpr()}; + if ( bs && broker_store.empty() ) // this does not mesh well with being updated several times + { + IntrusivePtr c = bs->AttrExpr()->Eval(nullptr); + assert(c); + assert(c->Type()->Tag() == TYPE_STRING); + broker_store = c->AsStringVal()->AsString()->CheckString(); + broker_mgr->AddForwardedStore(broker_store, {NewRef{}, this}); + } } void TableVal::CheckExpireAttr(attr_tag at) @@ -1552,16 +1559,16 @@ bool TableVal::Assign(Val* index, HashKey* k, IntrusivePtr new_val) Modified(); - if ( change_func || broker_store ) + if ( change_func || ( ! broker_store.empty() ) ) { auto change_index = index ? IntrusivePtr{NewRef{}, index} : RecoverIndex(&k_copy); - if ( broker_store ) + if ( ! broker_store.empty() ) SendToStore(change_index.get(), new_val.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); if ( change_func ) { - Val* v = old_entry_val ? old_entry_val->Value() : new_val.get(); - CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + Val* v = old_entry_val ? old_entry_val->Value() : new_val.get(); + CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); } } @@ -2062,23 +2069,16 @@ void TableVal::CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe) { - if ( ! broker_store || ! index ) + if ( broker_store.empty() || ! index ) return; try { - auto thestore = broker_store->Eval(0); + auto handle = broker_mgr->LookupStore(broker_store); - if ( ! thestore ) + if ( ! handle ) return; - if ( thestore->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" ) - { - thestore->Error("not a Broker::Store"); - return; - } - - auto handle = static_cast(thestore.get()); if ( index->AsListVal()->Length() != 1 ) { builtin_error("table with complex index not supported for &broker_store"); @@ -2149,7 +2149,7 @@ IntrusivePtr TableVal::Delete(const Val* index) Modified(); - if ( broker_store ) + if ( ! broker_store.empty() ) SendToStore(index, nullptr, ELEMENT_REMOVED); if ( change_func ) CallChangeFunc(index, va.get(), ELEMENT_REMOVED); @@ -2174,10 +2174,10 @@ IntrusivePtr TableVal::Delete(const HashKey* k) Modified(); - if ( ( change_func && va ) || broker_store ) + if ( ( change_func && va ) || ( ! broker_store.empty() ) ) { auto index = table_hash->RecoverVals(k); - if ( broker_store ) + if ( ! broker_store.empty() ) SendToStore(index.get(), nullptr, ELEMENT_REMOVED); if ( change_func && va ) CallChangeFunc(index.get(), va.get(), ELEMENT_REMOVED); diff --git a/src/Val.h b/src/Val.h index 4a074cdfa3..fdaa917565 100644 --- a/src/Val.h +++ b/src/Val.h @@ -895,7 +895,7 @@ protected: PrefixTable* subnets; IntrusivePtr def_val; IntrusivePtr change_func; - IntrusivePtr broker_store; + std::string broker_store; // prevent recursion of change functions bool in_change_func = false; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index d54b061580..d7ce3a3054 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -212,6 +212,8 @@ void Manager::InitPostScript() reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) ) reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); + + bstate->subscriber.add_topic(broker::topics::store_events, true); } void Manager::Terminate() @@ -905,6 +907,20 @@ void Manager::Process() auto& topic = broker::get_topic(message); auto& msg = broker::get_data(message); + if ( topic == broker::topics::store_events ) + { + if (auto insert = broker::store_event::insert::make(msg)) + { + reporter->Warning("It is an insert!"); + reporter->Warning("Key/Data (endpoint): %s/%s (%s)", to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), to_string(insert.publisher()).c_str()); + } + else + { + reporter->Warning("Unhandled event type"); + } + continue; + } + try { DispatchMessage(topic, std::move(msg)); @@ -1449,6 +1465,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); + CheckForwarding(name); if ( bstate->endpoint.use_real_time() ) return handle; @@ -1486,7 +1503,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); - + CheckForwarding(name); return handle; } @@ -1504,6 +1521,9 @@ bool Manager::CloseStore(const string& name) if ( s == data_stores.end() ) return false; + auto pubid = s->second->store.frontend_id(); + forwarded_ids.erase(pubid); + iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this); for ( auto i = pending_queries.begin(); i != pending_queries.end(); ) @@ -1546,4 +1566,34 @@ const Stats& Manager::GetStatistics() return statistics; } +bool Manager::AddForwardedStore(const std::string& name, IntrusivePtr table) + { + if ( forwarded_stores.find(name) != forwarded_stores.end() ) + { + reporter->Error("same &broker_store %s specified for two different variables", name.c_str()); + return false; + } + + DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str()); + forwarded_stores.emplace(name, table); + + CheckForwarding(name); + return true; + } + +void Manager::CheckForwarding(const std::string &name) + { + auto handle = LookupStore(name); + if ( ! handle ) + return; + + if ( forwarded_stores.find(name) == forwarded_stores.end() ) + return; + + auto pubid = handle->store.frontend_id(); + + DBG_LOG(DBG_BROKER, "Resolved publishder %s for table forward for data store %s", to_string(pubid).c_str(), name.c_str()); + forwarded_ids.emplace(pubid, forwarded_stores.at(name)); + } + } // namespace bro_broker diff --git a/src/broker/Manager.h b/src/broker/Manager.h index bbd38f88be..2746b89142 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -294,6 +295,8 @@ public: */ StoreHandleVal* LookupStore(const std::string& name); + bool AddForwardedStore(const std::string& name, IntrusivePtr table); + /** * Close and unregister a data store. Any existing references to the * store handle will not be able to be used for any data store operations. @@ -347,6 +350,7 @@ private: void ProcessError(broker::error err); void ProcessStoreResponse(StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); + void CheckForwarding(const std::string& name); void Error(const char* format, ...) __attribute__((format (printf, 2, 3))); @@ -381,6 +385,8 @@ private: std::string default_log_topic_prefix; std::shared_ptr bstate; std::unordered_map data_stores; + std::unordered_map> forwarded_stores; + std::unordered_map> forwarded_ids; std::unordered_map pending_queries; std::vector forwarded_prefixes; diff --git a/src/broker/Store.h b/src/broker/Store.h index 09799c8c68..4bf6dee717 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -6,6 +6,7 @@ #include "Trigger.h" #include +#include #include #include diff --git a/src/scan.l b/src/scan.l index 0fd1a65085..6f598cfe08 100644 --- a/src/scan.l +++ b/src/scan.l @@ -310,6 +310,7 @@ when return TOK_WHEN; &redef return TOK_ATTR_REDEF; &write_expire return TOK_ATTR_EXPIRE_WRITE; &on_change return TOK_ATTR_ON_CHANGE; +&broker_store return TOK_ATTR_BROKER_STORE; @deprecated.* { auto num_files = file_stack.length(); diff --git a/testing/btest/language/brokerstore.test b/testing/btest/language/brokerstore.test index 021a0d7463..cb1e207b36 100644 --- a/testing/btest/language/brokerstore.test +++ b/testing/btest/language/brokerstore.test @@ -1,13 +1,15 @@ # @TEST-EXEC: zeek %INPUT >output # @TEST-EXEC: btest-diff output +redef exit_only_after_terminate = T; + module TestModule; global tablestore: opaque of Broker::Store; global setstore: opaque of Broker::Store; -global t: table[string] of count &broker_store=tablestore; -global s: set[string] &broker_store=setstore; +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; event zeek_init() {