diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index b897083a40..2dd32c2713 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -235,16 +235,29 @@ void Manager::InitializeBrokerStoreForwarding() id->GetVal()->AsTableVal()->SetBrokerStore(storename); AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()}); - // we only create masters here. For clones, we do all the work of setting up // the forwarding - but we do not try to initialize the clone. We can only initialize // the clone, once a node has a connection to a master. This is currently done in scriptland // - check FIXME. - if ( zeek_table_manager ) - { - auto backend = bro_broker::to_backend_type(e); - MakeMaster(storename, backend, broker::backend_options{}); - } + if ( ! zeek_table_manager ) + continue; + + auto backend = bro_broker::to_backend_type(e); + auto suffix = ".store"; + + switch ( backend ) { + case broker::backend::sqlite: + suffix = ".sqlite"; + break; + case broker::backend::rocksdb: + suffix = ".rocksdb"; + break; + default: + break; + } + auto path = zeek_table_db_directory + "/" + storename + suffix; + + MakeMaster(storename, backend, broker::backend_options{{"path", path}}); } } } diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek new file mode 100644 index 0000000000..64e8363730 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek @@ -0,0 +1,135 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone2.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + +event zeek_init() + { + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print t; + print s; + print r; + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + +event terminate_me() + { + terminate(); + } + +event dump_tables() + { + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + schedule 2sec { terminate_me() }; + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 1secs { dump_tables() }; + } +@TEST-END-FILE + +@TEST-START-FILE clone2.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 4secs { dump_tables() }; + } +@TEST-END-FILE + diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek new file mode 100644 index 0000000000..0eb728fe22 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek @@ -0,0 +1,123 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: zeek preseed-sqlite.zeek; +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + +@TEST-START-FILE preseed-sqlite.zeek + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::SQLITE; +global s: set[string] &backend=Broker::SQLITE; +global r: table[string] of testrec &backend=Broker::SQLITE; + +event zeek_init() + { + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +@TEST-END-FILE + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::SQLITE; +global s: set[string] &backend=Broker::SQLITE; +global r: table[string] of testrec &backend=Broker::SQLITE; + +redef Broker::auto_store_db_directory = ".."; + +event zeek_init() + { + print t; + print s; + print r; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 4secs { dump_tables() }; + } +@TEST-END-FILE