diff --git a/aux/broker b/aux/broker index 177bdfac2c..c217119d9a 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 177bdfac2c768d9ed8f3edb10e9e2dbd0d6f8723 +Subproject commit c217119d9a484da941161d182cdc0a1f86a0d40f diff --git a/src/comm/Data.h b/src/comm/Data.h index c720dcda71..da10853127 100644 --- a/src/comm/Data.h +++ b/src/comm/Data.h @@ -27,6 +27,7 @@ broker::util::optional val_to_data(Val* v); Val* data_to_val(broker::data d, BroType* type); +// TODO: actually need to implement Bro's serialization to support copying vals class DataVal : public OpaqueVal { public: diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 0b887d4f37..3d6aad4d1e 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -23,7 +23,7 @@ int comm::Manager::send_flags_unsolicited_idx; comm::Manager::~Manager() { for ( auto& s : data_stores ) - CloseStore(s.first); + CloseStore(s.first.first, s.first.second); } bool comm::Manager::InitPreScript() @@ -741,17 +741,34 @@ bool comm::Manager::AddStore(StoreHandleVal* handle) if ( ! handle->store ) return false; - if ( data_stores.find(handle->store->id()) != data_stores.end() ) + auto key = make_pair(handle->store->id(), handle->store_type); + + if ( data_stores.find(key) != data_stores.end() ) return false; - data_stores[handle->store->id()] = handle; + data_stores[key] = handle; Ref(handle); return true; } -bool comm::Manager::CloseStore(const broker::store::identifier& id) +comm::StoreHandleVal* +comm::Manager::LookupStore(const broker::store::identifier& id, + comm::StoreType type) { - auto it = data_stores.find(id); + auto key = make_pair(id, type); + auto it = data_stores.find(key); + + if ( it == data_stores.end() ) + return nullptr; + + return it->second; + } + +bool comm::Manager::CloseStore(const broker::store::identifier& id, + StoreType type) + { + auto key = make_pair(id, type); + auto it = data_stores.find(key); if ( it == data_stores.end() ) return false; @@ -760,7 +777,7 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id) { auto query = *it; - if ( query->StoreID() == id ) + if ( query->GetStoreType() == type && query->StoreID() == id ) { it = pending_queries.erase(it); query->Abort(); diff --git a/src/comm/Manager.h b/src/comm/Manager.h index c9cc2c8464..31fdfa56c1 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -63,7 +63,9 @@ public: bool AddStore(StoreHandleVal* handle); - bool CloseStore(const broker::store::identifier& id); + StoreHandleVal* LookupStore(const broker::store::identifier& id, StoreType type); + + bool CloseStore(const broker::store::identifier& id, StoreType type); bool TrackStoreQuery(StoreQueryCallback* cb); @@ -91,7 +93,8 @@ private: std::map event_subscriptions; std::map log_subscriptions; - std::map data_stores; + std::map, + StoreHandleVal*> data_stores; std::unordered_set pending_queries; static VectorType* vector_of_data_type; diff --git a/src/comm/Store.h b/src/comm/Store.h index b3a8ccb339..3183afbbb3 100644 --- a/src/comm/Store.h +++ b/src/comm/Store.h @@ -56,8 +56,10 @@ class StoreQueryCallback { public: StoreQueryCallback(Trigger* arg_trigger, const CallExpr* arg_call, - broker::store::identifier arg_store_id) - : trigger(arg_trigger), call(arg_call), store_id(move(arg_store_id)) + broker::store::identifier arg_store_id, + StoreType arg_store_type) + : trigger(arg_trigger), call(arg_call), store_id(move(arg_store_id)), + store_type(arg_store_type) { Ref(trigger); } @@ -85,13 +87,20 @@ public: const broker::store::identifier& StoreID() const { return store_id; } + StoreType GetStoreType() const + { return store_type; } + private: Trigger* trigger; const CallExpr* call; broker::store::identifier store_id; + StoreType store_type; }; +// TODO: actually need to implement Bro's serialization to support copying vals +// but doesn't make sense to "copy" a master data store, so assert we can +// lookup a store by pair locally (i.e. shouldn't send handles remotely). class StoreHandleVal : public OpaqueVal { public: diff --git a/src/comm/store.bif b/src/comm/store.bif index 176e55268e..7d09704d31 100644 --- a/src/comm/store.bif +++ b/src/comm/store.bif @@ -25,10 +25,20 @@ enum BackendType %{ function Store::create_master%(id: string, b: BackendType &default = MEMORY, options: BackendOptions &default = BackendOptions()%): opaque of Store::Handle %{ - auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::MASTER, - static_cast(b->AsEnum()), - options->AsRecordVal()); - comm_mgr->AddStore(rval); + auto id_str = id->CheckString(); + auto type = comm::StoreType::MASTER; + auto rval = comm_mgr->LookupStore(id_str, type); + + if ( rval ) + { + Ref(rval); + return rval; + } + + rval = new comm::StoreHandleVal(id_str, type, + static_cast(b->AsEnum()), + options->AsRecordVal()); + assert(comm_mgr->AddStore(rval)); return rval; %} @@ -36,25 +46,39 @@ function Store::create_clone%(id: string, b: BackendType &default = MEMORY, options: BackendOptions &default = BackendOptions(), resync: interval &default = 1sec%): opaque of Store::Handle %{ - auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::CLONE, - static_cast(b->AsEnum()), - options->AsRecordVal(), - std::chrono::duration(resync)); - comm_mgr->AddStore(rval); + auto id_str = id->CheckString(); + auto type = comm::StoreType::CLONE; + auto rval = comm_mgr->LookupStore(id_str, type); + + if ( rval ) + { + Ref(rval); + return rval; + } + + rval = new comm::StoreHandleVal(id_str, type, + static_cast(b->AsEnum()), + options->AsRecordVal(), + std::chrono::duration(resync)); + assert(comm_mgr->AddStore(rval)); return rval; %} function Store::create_frontend%(id: string%): opaque of Store::Handle %{ - auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::FRONTEND, - {}, nullptr); - comm_mgr->AddStore(rval); - return rval; - %} + auto id_str = id->CheckString(); + auto type = comm::StoreType::FRONTEND; + auto rval = comm_mgr->LookupStore(id_str, type); -function Store::close_by_name%(id: string%): bool - %{ - return new Val(comm_mgr->CloseStore(id->CheckString()), TYPE_BOOL); + if ( rval ) + { + Ref(rval); + return rval; + } + + rval = new comm::StoreHandleVal(id_str, type, {}, nullptr); + assert(comm_mgr->AddStore(rval)); + return rval; %} function Store::close_by_handle%(h: opaque of Store::Handle%): bool @@ -64,7 +88,8 @@ function Store::close_by_handle%(h: opaque of Store::Handle%): bool if ( ! handle->store ) return new Val(false, TYPE_BOOL); - return new Val(comm_mgr->CloseStore(handle->store->id()), TYPE_BOOL); + return new Val(comm_mgr->CloseStore(handle->store->id(), + handle->store_type), TYPE_BOOL); %} ########################### @@ -264,7 +289,8 @@ static bool prepare_for_query(Val* opaque, Frame* frame, frame->SetDelayed(); trigger->Hold(); *cb = new comm::StoreQueryCallback(trigger, frame->GetCall(), - (*handle)->store->id()); + (*handle)->store->id(), + (*handle)->store_type); comm_mgr->TrackStoreQuery(*cb); return true; } diff --git a/testing/btest/Baseline/comm.clone_store/clone.clone.out b/testing/btest/Baseline/comm.clone_store/clone.clone.out new file mode 100644 index 0000000000..8a7c89a19b --- /dev/null +++ b/testing/btest/Baseline/comm.clone_store/clone.clone.out @@ -0,0 +1,5 @@ +clone keys, [status=Store::SUCCESS, result=[d=broker::data{[one, two, myset, myvec]}]] +lookup, one, [status=Store::SUCCESS, result=[d=broker::data{111}]] +lookup, two, [status=Store::SUCCESS, result=[d=broker::data{222}]] +lookup, myset, [status=Store::SUCCESS, result=[d=broker::data{{a, c, d}}]] +lookup, myvec, [status=Store::SUCCESS, result=[d=broker::data{[delta, alpha, beta, gamma, omega]}]] diff --git a/testing/btest/Baseline/comm.clone_store/master.master.out b/testing/btest/Baseline/comm.clone_store/master.master.out new file mode 100644 index 0000000000..e69de29bb2 diff --git a/testing/btest/comm/clone_store.bro b/testing/btest/comm/clone_store.bro new file mode 100644 index 0000000000..03e0fe172f --- /dev/null +++ b/testing/btest/comm/clone_store.bro @@ -0,0 +1,114 @@ +# @TEST_SERIALIZE: brokercomm +# @TEST_REQUIRES: grep -q ENABLE_BROKER $BUILD/CMakeCache.txt + +# @TEST-EXEC: btest-bg-run clone "bro -b ../clone.bro >clone.out" +# @TEST-EXEC: btest-bg-run master "bro -b ../master.bro >master.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff clone/clone.out +# @TEST-EXEC: btest-diff master/master.out + +@TEST-START-FILE clone.bro + +redef exit_only_after_terminate = T; + +global h: opaque of Store::Handle; +global expected_key_count = 4; +global key_count = 0; + +event done() + { + terminate(); + } + +function do_lookup(key: string) + { + when ( local res = Store::lookup(h, Comm::data(key)) ) + { + ++key_count; + print "lookup", key, res; + + if ( key_count == expected_key_count ) + event done(); + } + timeout 10sec + { print "timeout"; } + } + +event ready() + { + h = Store::create_clone("mystore"); + + when ( local res = Store::keys(h) ) + { + print "clone keys", res; + do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 0))); + do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 1))); + do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 2))); + do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 3))); + } + timeout 10sec + { print "timeout"; } + } + +event bro_init() + { + Comm::listen(9999/tcp, "127.0.0.1"); + Comm::subscribe_to_events("bro/event/ready"); + Comm::auto_event("bro/event/done", done); + } + +@TEST-END-FILE + +@TEST-START-FILE master.bro + +redef exit_only_after_terminate = T; + +global h: opaque of Store::Handle; + +function dv(d: Comm::Data): Comm::DataVector + { + local rval: Comm::DataVector; + rval[0] = d; + return rval; + } + +global ready: event(); + +event done() + { + terminate(); + } + +event Comm::remote_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + local myset: set[string] = {"a", "b", "c"}; + local myvec: vector of string = {"alpha", "beta", "gamma"}; + h = Store::create_master("mystore"); + Store::insert(h, Comm::data("one"), Comm::data(110)); + Store::insert(h, Comm::data("two"), Comm::data(223)); + Store::insert(h, Comm::data("myset"), Comm::data(myset)); + Store::insert(h, Comm::data("myvec"), Comm::data(myvec)); + Store::increment(h, Comm::data("one")); + Store::decrement(h, Comm::data("two")); + Store::add_to_set(h, Comm::data("myset"), Comm::data("d")); + Store::remove_from_set(h, Comm::data("myset"), Comm::data("b")); + Store::push_left(h, Comm::data("myvec"), dv(Comm::data("delta"))); + Store::push_right(h, Comm::data("myvec"), dv(Comm::data("omega"))); + + when ( local res = Store::size(h) ) + { event ready(); } + timeout 10sec + { print "timeout"; } + } + +event bro_init() + { + Comm::connect("127.0.0.1", 9999/tcp, 1secs); + Comm::auto_event("bro/event/ready", ready); + Comm::subscribe_to_events("bro/event/done"); + } + +@TEST-END-FILE