From f6251e62a0814d9d95d036d18bb9bee3d8931cd0 Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Wed, 1 Jul 2020 17:10:43 -0700 Subject: [PATCH] BrokerStore<->Zeek tables: allow setting storage location & tests With this, the basic functionality of &backend seems to be working. It is not yet integrated with zeekctl, one has to manually specify the storage location for the sqlite files somewhere when using sqlite. Usage for memory stores: global table_to_share: table[string] of count &backend=Broker::MEMORY; Usage for sqlite stores: redef Broker::auto_store_db_directory = "[path]"; global table_to_share: table[string] of count &backend=Broker::SQLITE; In both cases, the cluster should automatically sync to changes done by any node. When using sqlite, data should also be saved to disk and re-loaded on startup. --- src/broker/Manager.cc | 25 +++- .../clone.out | 18 +++ .../master.out | 18 +++ .../clone.out | 18 +++ .../master.out | 18 +++ .../brokerstore-backend-simple-reverse.zeek | 135 ++++++++++++++++++ .../store/brokerstore-backend-sqlite.zeek | 123 ++++++++++++++++ 7 files changed, 349 insertions(+), 6 deletions(-) create mode 100644 testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out create mode 100644 testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out create mode 100644 testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out create mode 100644 testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out create mode 100644 testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek create mode 100644 testing/btest/broker/store/brokerstore-backend-sqlite.zeek diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index b897083a40..2dd32c2713 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -235,16 +235,29 @@ void Manager::InitializeBrokerStoreForwarding() id->GetVal()->AsTableVal()->SetBrokerStore(storename); AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()}); - // we only create masters here. For clones, we do all the work of setting up // the forwarding - but we do not try to initialize the clone. We can only initialize // the clone, once a node has a connection to a master. This is currently done in scriptland // - check FIXME. - if ( zeek_table_manager ) - { - auto backend = bro_broker::to_backend_type(e); - MakeMaster(storename, backend, broker::backend_options{}); - } + if ( ! zeek_table_manager ) + continue; + + auto backend = bro_broker::to_backend_type(e); + auto suffix = ".store"; + + switch ( backend ) { + case broker::backend::sqlite: + suffix = ".sqlite"; + break; + case broker::backend::rocksdb: + suffix = ".rocksdb"; + break; + default: + break; + } + auto path = zeek_table_db_directory + "/" + storename + suffix; + + MakeMaster(storename, backend, broker::backend_options{{"path", path}}); } } } diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek new file mode 100644 index 0000000000..64e8363730 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek @@ -0,0 +1,135 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone2.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + +event zeek_init() + { + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print t; + print s; + print r; + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + +event terminate_me() + { + terminate(); + } + +event dump_tables() + { + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + schedule 2sec { terminate_me() }; + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 1secs { dump_tables() }; + } +@TEST-END-FILE + +@TEST-START-FILE clone2.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 4secs { dump_tables() }; + } +@TEST-END-FILE + diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek new file mode 100644 index 0000000000..0eb728fe22 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek @@ -0,0 +1,123 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: zeek preseed-sqlite.zeek; +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + +@TEST-START-FILE preseed-sqlite.zeek + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::SQLITE; +global s: set[string] &backend=Broker::SQLITE; +global r: table[string] of testrec &backend=Broker::SQLITE; + +event zeek_init() + { + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +@TEST-END-FILE + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::SQLITE; +global s: set[string] &backend=Broker::SQLITE; +global r: table[string] of testrec &backend=Broker::SQLITE; + +redef Broker::auto_store_db_directory = ".."; + +event zeek_init() + { + print t; + print s; + print r; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 4secs { dump_tables() }; + } +@TEST-END-FILE