broker integration: add unit test for store clones

This commit is contained in:
Jon Siwek 2015-02-03 11:09:39 -06:00
parent 05a865a907
commit 441c46df76
9 changed files with 205 additions and 30 deletions

@ -1 +1 @@
Subproject commit 177bdfac2c768d9ed8f3edb10e9e2dbd0d6f8723
Subproject commit c217119d9a484da941161d182cdc0a1f86a0d40f

View file

@ -27,6 +27,7 @@ broker::util::optional<broker::data> val_to_data(Val* v);
Val* data_to_val(broker::data d, BroType* type);
// TODO: actually need to implement Bro's serialization to support copying vals
class DataVal : public OpaqueVal {
public:

View file

@ -23,7 +23,7 @@ int comm::Manager::send_flags_unsolicited_idx;
comm::Manager::~Manager()
{
for ( auto& s : data_stores )
CloseStore(s.first);
CloseStore(s.first.first, s.first.second);
}
bool comm::Manager::InitPreScript()
@ -741,17 +741,34 @@ bool comm::Manager::AddStore(StoreHandleVal* handle)
if ( ! handle->store )
return false;
if ( data_stores.find(handle->store->id()) != data_stores.end() )
auto key = make_pair(handle->store->id(), handle->store_type);
if ( data_stores.find(key) != data_stores.end() )
return false;
data_stores[handle->store->id()] = handle;
data_stores[key] = handle;
Ref(handle);
return true;
}
bool comm::Manager::CloseStore(const broker::store::identifier& id)
comm::StoreHandleVal*
comm::Manager::LookupStore(const broker::store::identifier& id,
comm::StoreType type)
{
auto it = data_stores.find(id);
auto key = make_pair(id, type);
auto it = data_stores.find(key);
if ( it == data_stores.end() )
return nullptr;
return it->second;
}
bool comm::Manager::CloseStore(const broker::store::identifier& id,
StoreType type)
{
auto key = make_pair(id, type);
auto it = data_stores.find(key);
if ( it == data_stores.end() )
return false;
@ -760,7 +777,7 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id)
{
auto query = *it;
if ( query->StoreID() == id )
if ( query->GetStoreType() == type && query->StoreID() == id )
{
it = pending_queries.erase(it);
query->Abort();

View file

@ -63,7 +63,9 @@ public:
bool AddStore(StoreHandleVal* handle);
bool CloseStore(const broker::store::identifier& id);
StoreHandleVal* LookupStore(const broker::store::identifier& id, StoreType type);
bool CloseStore(const broker::store::identifier& id, StoreType type);
bool TrackStoreQuery(StoreQueryCallback* cb);
@ -91,7 +93,8 @@ private:
std::map<std::string, broker::message_queue> event_subscriptions;
std::map<std::string, broker::message_queue> log_subscriptions;
std::map<broker::store::identifier, StoreHandleVal*> data_stores;
std::map<std::pair<broker::store::identifier, StoreType>,
StoreHandleVal*> data_stores;
std::unordered_set<StoreQueryCallback*> pending_queries;
static VectorType* vector_of_data_type;

View file

@ -56,8 +56,10 @@ class StoreQueryCallback {
public:
StoreQueryCallback(Trigger* arg_trigger, const CallExpr* arg_call,
broker::store::identifier arg_store_id)
: trigger(arg_trigger), call(arg_call), store_id(move(arg_store_id))
broker::store::identifier arg_store_id,
StoreType arg_store_type)
: trigger(arg_trigger), call(arg_call), store_id(move(arg_store_id)),
store_type(arg_store_type)
{
Ref(trigger);
}
@ -85,13 +87,20 @@ public:
const broker::store::identifier& StoreID() const
{ return store_id; }
StoreType GetStoreType() const
{ return store_type; }
private:
Trigger* trigger;
const CallExpr* call;
broker::store::identifier store_id;
StoreType store_type;
};
// TODO: actually need to implement Bro's serialization to support copying vals
// but doesn't make sense to "copy" a master data store, so assert we can
// lookup a store by <id, type> pair locally (i.e. shouldn't send handles remotely).
class StoreHandleVal : public OpaqueVal {
public:

View file

@ -25,10 +25,20 @@ enum BackendType %{
function Store::create_master%(id: string, b: BackendType &default = MEMORY,
options: BackendOptions &default = BackendOptions()%): opaque of Store::Handle
%{
auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::MASTER,
static_cast<BifEnum::Store::BackendType>(b->AsEnum()),
options->AsRecordVal());
comm_mgr->AddStore(rval);
auto id_str = id->CheckString();
auto type = comm::StoreType::MASTER;
auto rval = comm_mgr->LookupStore(id_str, type);
if ( rval )
{
Ref(rval);
return rval;
}
rval = new comm::StoreHandleVal(id_str, type,
static_cast<BifEnum::Store::BackendType>(b->AsEnum()),
options->AsRecordVal());
assert(comm_mgr->AddStore(rval));
return rval;
%}
@ -36,25 +46,39 @@ function Store::create_clone%(id: string, b: BackendType &default = MEMORY,
options: BackendOptions &default = BackendOptions(),
resync: interval &default = 1sec%): opaque of Store::Handle
%{
auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::CLONE,
static_cast<BifEnum::Store::BackendType>(b->AsEnum()),
options->AsRecordVal(),
std::chrono::duration<double>(resync));
comm_mgr->AddStore(rval);
auto id_str = id->CheckString();
auto type = comm::StoreType::CLONE;
auto rval = comm_mgr->LookupStore(id_str, type);
if ( rval )
{
Ref(rval);
return rval;
}
rval = new comm::StoreHandleVal(id_str, type,
static_cast<BifEnum::Store::BackendType>(b->AsEnum()),
options->AsRecordVal(),
std::chrono::duration<double>(resync));
assert(comm_mgr->AddStore(rval));
return rval;
%}
function Store::create_frontend%(id: string%): opaque of Store::Handle
%{
auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::FRONTEND,
{}, nullptr);
comm_mgr->AddStore(rval);
return rval;
%}
auto id_str = id->CheckString();
auto type = comm::StoreType::FRONTEND;
auto rval = comm_mgr->LookupStore(id_str, type);
function Store::close_by_name%(id: string%): bool
%{
return new Val(comm_mgr->CloseStore(id->CheckString()), TYPE_BOOL);
if ( rval )
{
Ref(rval);
return rval;
}
rval = new comm::StoreHandleVal(id_str, type, {}, nullptr);
assert(comm_mgr->AddStore(rval));
return rval;
%}
function Store::close_by_handle%(h: opaque of Store::Handle%): bool
@ -64,7 +88,8 @@ function Store::close_by_handle%(h: opaque of Store::Handle%): bool
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
return new Val(comm_mgr->CloseStore(handle->store->id()), TYPE_BOOL);
return new Val(comm_mgr->CloseStore(handle->store->id(),
handle->store_type), TYPE_BOOL);
%}
###########################
@ -264,7 +289,8 @@ static bool prepare_for_query(Val* opaque, Frame* frame,
frame->SetDelayed();
trigger->Hold();
*cb = new comm::StoreQueryCallback(trigger, frame->GetCall(),
(*handle)->store->id());
(*handle)->store->id(),
(*handle)->store_type);
comm_mgr->TrackStoreQuery(*cb);
return true;
}

View file

@ -0,0 +1,5 @@
clone keys, [status=Store::SUCCESS, result=[d=broker::data{[one, two, myset, myvec]}]]
lookup, one, [status=Store::SUCCESS, result=[d=broker::data{111}]]
lookup, two, [status=Store::SUCCESS, result=[d=broker::data{222}]]
lookup, myset, [status=Store::SUCCESS, result=[d=broker::data{{a, c, d}}]]
lookup, myvec, [status=Store::SUCCESS, result=[d=broker::data{[delta, alpha, beta, gamma, omega]}]]

View file

@ -0,0 +1,114 @@
# @TEST_SERIALIZE: brokercomm
# @TEST_REQUIRES: grep -q ENABLE_BROKER $BUILD/CMakeCache.txt
# @TEST-EXEC: btest-bg-run clone "bro -b ../clone.bro >clone.out"
# @TEST-EXEC: btest-bg-run master "bro -b ../master.bro >master.out"
# @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff clone/clone.out
# @TEST-EXEC: btest-diff master/master.out
@TEST-START-FILE clone.bro
redef exit_only_after_terminate = T;
global h: opaque of Store::Handle;
global expected_key_count = 4;
global key_count = 0;
event done()
{
terminate();
}
function do_lookup(key: string)
{
when ( local res = Store::lookup(h, Comm::data(key)) )
{
++key_count;
print "lookup", key, res;
if ( key_count == expected_key_count )
event done();
}
timeout 10sec
{ print "timeout"; }
}
event ready()
{
h = Store::create_clone("mystore");
when ( local res = Store::keys(h) )
{
print "clone keys", res;
do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 0)));
do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 1)));
do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 2)));
do_lookup(Comm::refine_to_string(Comm::vector_lookup(res$result, 3)));
}
timeout 10sec
{ print "timeout"; }
}
event bro_init()
{
Comm::listen(9999/tcp, "127.0.0.1");
Comm::subscribe_to_events("bro/event/ready");
Comm::auto_event("bro/event/done", done);
}
@TEST-END-FILE
@TEST-START-FILE master.bro
redef exit_only_after_terminate = T;
global h: opaque of Store::Handle;
function dv(d: Comm::Data): Comm::DataVector
{
local rval: Comm::DataVector;
rval[0] = d;
return rval;
}
global ready: event();
event done()
{
terminate();
}
event Comm::remote_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
local myset: set[string] = {"a", "b", "c"};
local myvec: vector of string = {"alpha", "beta", "gamma"};
h = Store::create_master("mystore");
Store::insert(h, Comm::data("one"), Comm::data(110));
Store::insert(h, Comm::data("two"), Comm::data(223));
Store::insert(h, Comm::data("myset"), Comm::data(myset));
Store::insert(h, Comm::data("myvec"), Comm::data(myvec));
Store::increment(h, Comm::data("one"));
Store::decrement(h, Comm::data("two"));
Store::add_to_set(h, Comm::data("myset"), Comm::data("d"));
Store::remove_from_set(h, Comm::data("myset"), Comm::data("b"));
Store::push_left(h, Comm::data("myvec"), dv(Comm::data("delta")));
Store::push_right(h, Comm::data("myvec"), dv(Comm::data("omega")));
when ( local res = Store::size(h) )
{ event ready(); }
timeout 10sec
{ print "timeout"; }
}
event bro_init()
{
Comm::connect("127.0.0.1", 9999/tcp, 1secs);
Comm::auto_event("bro/event/ready", ready);
Comm::subscribe_to_events("bro/event/done");
}
@TEST-END-FILE