BrokerStore<->Zeek tables: allow setting storage location & tests

With this, the basic functionality of &backend seems to be working.

It is not yet integrated with zeekctl, one has to manually specify the
storage location for the sqlite files somewhere when using sqlite.

Usage for memory stores:

global table_to_share: table[string] of count &backend=Broker::MEMORY;

Usage for sqlite stores:

redef Broker::auto_store_db_directory = "[path]";
global table_to_share: table[string] of count &backend=Broker::SQLITE;

In both cases, the cluster should automatically sync to changes done by
any node. When using sqlite, data should also be saved to disk and
re-loaded on startup.
This commit is contained in:
Johanna Amann 2020-07-01 17:10:43 -07:00
parent a220b02722
commit f6251e62a0
7 changed files with 349 additions and 6 deletions

View file

@ -235,16 +235,29 @@ void Manager::InitializeBrokerStoreForwarding()
id->GetVal()->AsTableVal()->SetBrokerStore(storename);
AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()});
// we only create masters here. For clones, we do all the work of setting up
// the forwarding - but we do not try to initialize the clone. We can only initialize
// the clone, once a node has a connection to a master. This is currently done in scriptland
// - check FIXME.
if ( zeek_table_manager )
{
if ( ! zeek_table_manager )
continue;
auto backend = bro_broker::to_backend_type(e);
MakeMaster(storename, backend, broker::backend_options{});
auto suffix = ".store";
switch ( backend ) {
case broker::backend::sqlite:
suffix = ".sqlite";
break;
case broker::backend::rocksdb:
suffix = ".rocksdb";
break;
default:
break;
}
auto path = zeek_table_db_directory + "/" + storename + suffix;
MakeMaster(storename, backend, broker::backend_options{{"path", path}});
}
}
}

View file

@ -0,0 +1,18 @@
{
[b] = 3,
[whatever] = 5,
[a] = 3
}
{
hi
}
{
[b] = [a=2, b=d, c={
elem1,
elem2
}],
[a] = [a=1, b=c, c={
elem1,
elem2
}]
}

View file

@ -0,0 +1,18 @@
{
[b] = 3,
[whatever] = 5,
[a] = 3
}
{
hi
}
{
[b] = [a=2, b=d, c={
elem1,
elem2
}],
[a] = [a=1, b=c, c={
elem1,
elem2
}]
}

View file

@ -0,0 +1,18 @@
{
[b] = 3,
[whatever] = 5,
[a] = 3
}
{
hi
}
{
[b] = [a=2, b=d, c={
elem1,
elem2
}],
[a] = [a=1, b=c, c={
elem1,
elem2
}]
}

View file

@ -0,0 +1,18 @@
{
[b] = 3,
[whatever] = 5,
[a] = 3
}
{
hi
}
{
[b] = [a=2, b=d, c={
elem1,
elem2
}],
[a] = [a=1, b=c, c={
elem1,
elem2
}]
}

View file

@ -0,0 +1,135 @@
# @TEST-PORT: BROKER_PORT1
# @TEST-PORT: BROKER_PORT2
# @TEST-PORT: BROKER_PORT3
# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone2.zeek >../clone2.out"
# @TEST-EXEC: btest-bg-wait 15
#
# @TEST-EXEC: btest-diff master.out
# @TEST-EXEC: btest-diff clone.out
# @TEST-EXEC: diff master.out clone.out
# @TEST-EXEC: diff master.out clone2.out
@TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"],
};
@TEST-END-FILE
@TEST-START-FILE master.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY;
event zeek_init()
{
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print t;
print s;
print r;
terminate();
}
@TEST-END-FILE
@TEST-START-FILE clone.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY;
event terminate_me()
{
terminate();
}
event dump_tables()
{
t["a"] = 5;
delete t["a"];
add s["hi"];
t["a"] = 2;
t["a"] = 3;
t["b"] = 3;
t["c"] = 4;
t["whatever"] = 5;
delete t["c"];
r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2"));
r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2"));
r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2"));
print t;
print s;
print r;
schedule 2sec { terminate_me() };
}
event Cluster::node_up(name: string, id: string)
{
#print "node up", name;
schedule 1secs { dump_tables() };
}
@TEST-END-FILE
@TEST-START-FILE clone2.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY;
event dump_tables()
{
print t;
print s;
print r;
terminate();
}
event Cluster::node_up(name: string, id: string)
{
#print "node up", name;
schedule 4secs { dump_tables() };
}
@TEST-END-FILE

View file

@ -0,0 +1,123 @@
# @TEST-PORT: BROKER_PORT1
# @TEST-PORT: BROKER_PORT2
# @TEST-PORT: BROKER_PORT3
# @TEST-EXEC: zeek preseed-sqlite.zeek;
# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out"
# @TEST-EXEC: btest-bg-wait 15
#
# @TEST-EXEC: btest-diff master.out
# @TEST-EXEC: btest-diff clone.out
# @TEST-EXEC: diff master.out clone.out
# @TEST-EXEC: diff master.out clone2.out
@TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"],
};
@TEST-END-FILE
@TEST-START-FILE preseed-sqlite.zeek
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::SQLITE;
global s: set[string] &backend=Broker::SQLITE;
global r: table[string] of testrec &backend=Broker::SQLITE;
event zeek_init()
{
t["a"] = 5;
delete t["a"];
add s["hi"];
t["a"] = 2;
t["a"] = 3;
t["b"] = 3;
t["c"] = 4;
t["whatever"] = 5;
delete t["c"];
r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2"));
r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2"));
r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2"));
print t;
print s;
print r;
}
@TEST-END-FILE
@TEST-START-FILE master.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::SQLITE;
global s: set[string] &backend=Broker::SQLITE;
global r: table[string] of testrec &backend=Broker::SQLITE;
redef Broker::auto_store_db_directory = "..";
event zeek_init()
{
print t;
print s;
print r;
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
}
@TEST-END-FILE
@TEST-START-FILE clone.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY;
event dump_tables()
{
print t;
print s;
print r;
terminate();
}
event Cluster::node_up(name: string, id: string)
{
#print "node up", name;
schedule 4secs { dump_tables() };
}
@TEST-END-FILE