Zeek/Brokerstore updates: first working end-to-end test

This commit fixes a few more loose ends to actually make the
Zeek Table<->brokerstore syncing work. This mostly slightly changes the
TableVal assign/remove operators to prevent loops when a remote change
arrives.

The tests inserts a value into a table on the manager, and it pops out
in a table on a clone - which is the easiest case.

Timeouts are still not handled at all; the behavior when inserting into
a clone is untested.
This commit is contained in:
Johanna Amann 2020-06-05 16:14:51 -07:00
parent 62f208086c
commit 65c12ba6e9
6 changed files with 166 additions and 62 deletions

View file

@ -1520,7 +1520,7 @@ void TableVal::CheckExpireAttr(attr_tag at)
} }
} }
bool TableVal::Assign(IntrusivePtr<Val> index, IntrusivePtr<Val> new_val) bool TableVal::Assign(IntrusivePtr<Val> index, IntrusivePtr<Val> new_val, bool broker_forward)
{ {
auto k = MakeHashKey(*index); auto k = MakeHashKey(*index);
@ -1530,7 +1530,7 @@ bool TableVal::Assign(IntrusivePtr<Val> index, IntrusivePtr<Val> new_val)
return false; return false;
} }
return Assign(std::move(index), std::move(k), std::move(new_val)); return Assign(std::move(index), std::move(k), std::move(new_val), broker_forward);
} }
bool TableVal::Assign(Val* index, Val* new_val) bool TableVal::Assign(Val* index, Val* new_val)
@ -1539,7 +1539,7 @@ bool TableVal::Assign(Val* index, Val* new_val)
} }
bool TableVal::Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k, bool TableVal::Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k,
IntrusivePtr<Val> new_val) IntrusivePtr<Val> new_val, bool broker_forward)
{ {
bool is_set = table_type->IsSet(); bool is_set = table_type->IsSet();
@ -1572,17 +1572,19 @@ bool TableVal::Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k,
Modified(); Modified();
if ( change_func || ( ! broker_store.empty() ) ) if ( change_func || ( broker_forward && ! broker_store.empty() ) )
{ {
auto change_index = index ? std::move(index) auto change_index = index ? std::move(index)
: RecreateIndex(k_copy); : RecreateIndex(k_copy);
const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal();
if ( ! broker_store.empty() ) if ( ! broker_store.empty() )
SendToStore(change_index.get(), v.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); SendToStore(change_index.get(), new_entry_val->GetVal().get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
if ( change_func ) if ( change_func )
{
const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal();
CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
} }
}
delete old_entry_val; delete old_entry_val;
@ -2180,7 +2182,7 @@ void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType
} }
} }
IntrusivePtr<Val> TableVal::Remove(const Val& index) IntrusivePtr<Val> TableVal::Remove(const Val& index, bool broker_forward)
{ {
auto k = MakeHashKey(index); auto k = MakeHashKey(index);
TableEntryVal* v = k ? AsNonConstTable()->RemoveEntry(k.get()) : nullptr; TableEntryVal* v = k ? AsNonConstTable()->RemoveEntry(k.get()) : nullptr;
@ -2196,7 +2198,7 @@ IntrusivePtr<Val> TableVal::Remove(const Val& index)
Modified(); Modified();
if ( ! broker_store.empty() ) if ( broker_forward && ! broker_store.empty() )
SendToStore(&index, nullptr, ELEMENT_REMOVED); SendToStore(&index, nullptr, ELEMENT_REMOVED);
if ( change_func ) if ( change_func )
CallChangeFunc(&index, va, ELEMENT_REMOVED); CallChangeFunc(&index, va, ELEMENT_REMOVED);

View file

@ -772,9 +772,11 @@ public:
* @param index The key to assign. * @param index The key to assign.
* @param new_val The value to assign at the index. For a set, this * @param new_val The value to assign at the index. For a set, this
* must be nullptr. * must be nullptr.
* @param broker_forward Controls if the value will be forwarded to attached
* broker stores.
* @return True if the assignment type-checked. * @return True if the assignment type-checked.
*/ */
bool Assign(IntrusivePtr<Val> index, IntrusivePtr<Val> new_val); bool Assign(IntrusivePtr<Val> index, IntrusivePtr<Val> new_val, bool broker_forward = true);
/** /**
* Assigns a value at an associated index in the table (or in the * Assigns a value at an associated index in the table (or in the
@ -784,10 +786,12 @@ public:
* @param k A precomputed hash key to use. * @param k A precomputed hash key to use.
* @param new_val The value to assign at the index. For a set, this * @param new_val The value to assign at the index. For a set, this
* must be nullptr. * must be nullptr.
* @param broker_forward Controls if the value will be forwarded to attached
* broker stores.
* @return True if the assignment type-checked. * @return True if the assignment type-checked.
*/ */
bool Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k, bool Assign(IntrusivePtr<Val> index, std::unique_ptr<HashKey> k,
IntrusivePtr<Val> new_val); IntrusivePtr<Val> new_val, bool broker_forward = true);
// Returns true if the assignment typechecked, false if not. The // Returns true if the assignment typechecked, false if not. The
// methods take ownership of new_val, but not of the index. If we're // methods take ownership of new_val, but not of the index. If we're
@ -909,12 +913,14 @@ public:
/** /**
* Remove an element from the table and return it. * Remove an element from the table and return it.
* @param index The index to remove. * @param index The index to remove.
* @param broker_forward Controls if the remove operation will be forwarded to attached
* broker stores.
* @return The value associated with the index if it exists, else nullptr. * @return The value associated with the index if it exists, else nullptr.
* For a sets that don't really contain associated values, a placeholder * For a sets that don't really contain associated values, a placeholder
* value is returned to differentiate it from non-existent index (nullptr), * value is returned to differentiate it from non-existent index (nullptr),
* but otherwise has no meaning in relation to the set's contents. * but otherwise has no meaning in relation to the set's contents.
*/ */
IntrusivePtr<Val> Remove(const Val& index); IntrusivePtr<Val> Remove(const Val& index, bool broker_forward = true);
/** /**
* Same as Remove(const Val&), but uses a precomputed hash key. * Same as Remove(const Val&), but uses a precomputed hash key.

View file

@ -963,17 +963,19 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
auto key = insert.key(); auto key = insert.key();
auto data = insert.value(); auto data = insert.value();
DBG_LOG(DBG_BROKER, "Insert Data: %s:%s (%s:%s)", to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name()); // We sent this message. Ignore it.
if ( insert.publisher() == storehandle->store_pid )
return;
DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name());
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
{ {
reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name()); reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name());
return; return;
} }
// We sent this message. Ignore it.
if ( insert.publisher() == storehandle->store_pid )
return;
// FIXME: expiry! // FIXME: expiry!
const auto& its = table->GetType()->AsTableType()->IndexTypes(); const auto& its = table->GetType()->AsTableType()->IndexTypes();
assert( its.size() == 1 ); assert( its.size() == 1 );
@ -986,7 +988,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
if ( table->GetType()->IsSet() ) if ( table->GetType()->IsSet() )
{ {
table->Assign(zeek_key, nullptr); table->Assign(zeek_key, nullptr, false);
return; return;
} }
@ -997,7 +999,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
reporter->Error("ProcessStoreEvent: failed to convert value"); reporter->Error("ProcessStoreEvent: failed to convert value");
return; return;
} }
table->Assign(zeek_key, zeek_value); table->Assign(zeek_key, zeek_value, false);
} }
else if ( auto update = broker::store_event::update::make(msg) ) else if ( auto update = broker::store_event::update::make(msg) )
{ {
@ -1012,17 +1014,18 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
auto key = update.key(); auto key = update.key();
auto data = update.new_value(); auto data = update.new_value();
DBG_LOG(DBG_BROKER, "Update Data: %s->%s (%s)", to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name()); // We sent this message. Ignore it.
if ( update.publisher() == storehandle->store_pid )
return;
DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", update.store_id().c_str(), to_string(update.old_value()).c_str(), to_string(update.new_value()).c_str(), update.new_value().get_type_name());
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
{ {
reporter->Error("ProcessStoreEvent Update got %s when expecting set", data.get_type_name()); reporter->Error("ProcessStoreEvent Update got %s when expecting set", data.get_type_name());
return; return;
} }
// We sent this message. Ignore it.
if ( update.publisher() == storehandle->store_pid )
return;
const auto& its = table->GetType()->AsTableType()->IndexTypes(); const auto& its = table->GetType()->AsTableType()->IndexTypes();
assert( its.size() == 1 ); assert( its.size() == 1 );
auto zeek_key = data_to_val(std::move(key), its[0].get()); auto zeek_key = data_to_val(std::move(key), its[0].get());
@ -1034,7 +1037,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
if ( table->GetType()->IsSet() ) if ( table->GetType()->IsSet() )
{ {
table->Assign(zeek_key, nullptr); table->Assign(zeek_key, nullptr, false);
return; return;
} }
@ -1045,7 +1048,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
reporter->Error("ProcessStoreEvent: failed to convert value"); reporter->Error("ProcessStoreEvent: failed to convert value");
return; return;
} }
table->Assign(zeek_key, zeek_value); table->Assign(zeek_key, zeek_value, false);
} }
else if ( auto erase = broker::store_event::erase::make(msg) ) else if ( auto erase = broker::store_event::erase::make(msg) )
{ {
@ -1057,13 +1060,12 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
if ( ! table ) if ( ! table )
return; return;
DBG_LOG(DBG_BROKER, "Erase for key %s", erase.store_id().c_str());
// We sent this message. Ignore it. // We sent this message. Ignore it.
if ( erase.publisher() == storehandle->store_pid ) if ( erase.publisher() == storehandle->store_pid )
return; return;
auto key = erase.key(); auto key = erase.key();
DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str());
const auto& its = table->GetType()->AsTableType()->IndexTypes(); const auto& its = table->GetType()->AsTableType()->IndexTypes();
assert( its.size() == 1 ); assert( its.size() == 1 );
auto zeek_key = data_to_val(std::move(key), its[0].get()); auto zeek_key = data_to_val(std::move(key), its[0].get());
@ -1072,7 +1074,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
reporter->Error("ProcessStoreEvent: failed to convert key"); reporter->Error("ProcessStoreEvent: failed to convert key");
return; return;
} }
table->Remove(*zeek_key); table->Remove(*zeek_key, false);
} }
else else
{ {

View file

@ -0,0 +1,19 @@
Peer added
{
[b] = 3,
[whatever] = 5,
[a] = 3
}
{
hi
}
{
[b] = [a=2, b=d, c={
elem1,
elem2
}],
[a] = [a=1, b=c, c={
elem1,
elem2
}]
}

View file

@ -0,0 +1,109 @@
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out"
# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out"
# @TEST-EXEC: btest-bg-wait 15
#
# @TEST-EXEC: btest-diff clone.out
@TEST-START-FILE master.zeek
redef exit_only_after_terminate = T;
module TestModule;
global tablestore: opaque of Broker::Store;
global setstore: opaque of Broker::Store;
global recordstore: opaque of Broker::Store;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec";
event zeek_init()
{
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
tablestore = Broker::create_master("table");
setstore = Broker::create_master("set");
recordstore = Broker::create_master("rec");
}
event insert_stuff()
{
print "Inserting stuff";
t["a"] = 5;
delete t["a"];
add s["hi"];
t["a"] = 2;
t["a"] = 3;
t["b"] = 3;
t["c"] = 4;
t["whatever"] = 5;
delete t["c"];
r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2"));
r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2"));
r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2"));
print t;
print s;
print r;
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "Peer added ", endpoint;
schedule 3secs { insert_stuff() };
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
}
@TEST-END-FILE
@TEST-START-FILE clone.zeek
redef exit_only_after_terminate = T;
module TestModule;
global tablestore: opaque of Broker::Store;
global setstore: opaque of Broker::Store;
global recordstore: opaque of Broker::Store;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec";
event zeek_init()
{
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event dump_tables()
{
print t;
print s;
print r;
terminate();
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "Peer added";
tablestore = Broker::create_clone("table");
setstore = Broker::create_clone("set");
recordstore = Broker::create_clone("rec");
schedule 5secs { dump_tables() };
}
@TEST-END-FILE

View file

@ -1,34 +0,0 @@
# @TEST-EXEC: zeek %INPUT >output
# @TEST-EXEC: btest-diff output
redef exit_only_after_terminate = T;
module TestModule;
global tablestore: opaque of Broker::Store;
#global tabletwostore: opaque of Broker::Store;
global setstore: opaque of Broker::Store;
global t: table[string] of count &broker_store="table";
#global ct: table[string, string] of count &broker_store="table2";
global s: set[string] &broker_store="set";
event zeek_init()
{
tablestore = Broker::create_master("table");
#tabletwostore = Broker::create_master("table2");
setstore = Broker::create_master("set");
print "inserting";
t["a"] = 5;
delete t["a"];
#add s["hi"];
#print "changing";
t["a"] = 2;
t["a"] = 3;
t["b"] = 3;
t["c"] = 4;
t["whatever"] = 5;
#print "deleting";
#delete t["a"];
#delete s["hi"];
}