From a220b027221faa310341a23ee6d6ef9607e07011 Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Wed, 1 Jul 2020 16:38:10 -0700 Subject: [PATCH] BrokerStore<->Zeek tables: &backend works for in-memory stores. Currently this requires using this with a normal cluster - or sending messages by yourself. It, in principle, should also work with SQLITE - but that is a bit nonsensical without being able to change the storage location. --- scripts/base/frameworks/cluster/__load__.zeek | 2 + .../frameworks/cluster/broker-stores.zeek | 46 +++++++++++++++++ scripts/base/frameworks/cluster/main.zeek | 4 -- scripts/base/init-bare.zeek | 1 + src/broker/Manager.cc | 10 ++-- src/zeek.bif | 3 +- .../clone.out | 18 +++++++ .../master.out | 18 +++++++ .../store/brokerstore-backend-simple.zeek | 49 ++++++++++--------- 9 files changed, 118 insertions(+), 33 deletions(-) create mode 100644 scripts/base/frameworks/cluster/broker-stores.zeek create mode 100644 testing/btest/Baseline/broker.store.brokerstore-backend-simple/clone.out create mode 100644 testing/btest/Baseline/broker.store.brokerstore-backend-simple/master.out diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index a04d6744d2..47918e7d0d 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -49,5 +49,7 @@ redef Broker::log_topic = Cluster::rr_log_topic; @load ./nodes/worker @endif +@load ./broker-stores.zeek + @endif @endif diff --git a/scripts/base/frameworks/cluster/broker-stores.zeek b/scripts/base/frameworks/cluster/broker-stores.zeek new file mode 100644 index 0000000000..11155e5387 --- /dev/null +++ b/scripts/base/frameworks/cluster/broker-stores.zeek @@ -0,0 +1,46 @@ + +module Broker; + +export { + global announce_masters: event(masters: set[string]); +} + +@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) +redef Broker::auto_store_master = F; +@endif + +@if ( Broker::auto_store_master ) + +global broker_backed_ids: set[string]; + +event zeek_init() + { + local globals = global_ids(); + for ( id in globals ) + { + if ( globals[id]$broker_backend ) + add broker_backed_ids[id]; + } + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=1 + { + if ( ! Cluster::is_enabled() ) + return; + + local e = Broker::make_event(Broker::announce_masters, broker_backed_ids); + Broker::publish(Cluster::nodeid_topic(endpoint$id), e); + } + +@else + +event Broker::announce_masters(masters: set[string]) + { + for ( i in masters ) + { + local name = "___sync_store_" + i; + Broker::create_clone(name); + } + } + +@endif diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 28c4630d7a..1f39a7f7cf 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -249,10 +249,6 @@ export { global nodeid_topic: function(id: string): string; } -@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) ) -redef Broker::store_master = T; -@endif - global active_worker_ids: set[string] = set(); type NamedNode: record { diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index a02a676df9..d65b97b116 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -734,6 +734,7 @@ type script_id: record { enum_constant: bool; ##< True if the identifier is an enum value. option_value: bool; ##< True if the identifier is an option. redefinable: bool; ##< True if the identifier is declared with the :zeek:attr:`&redef` attribute. + broker_backend: bool; ##< True if the identifier has a broker backend defined using the :zeek:attr:`&backend` attribute. value: any &optional; ##< The current value of the identifier. }; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 435f2e921a..b897083a40 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -235,13 +235,15 @@ void Manager::InitializeBrokerStoreForwarding() id->GetVal()->AsTableVal()->SetBrokerStore(storename); AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()}); - auto backend = bro_broker::to_backend_type(e); + // we only create masters here. For clones, we do all the work of setting up + // the forwarding - but we do not try to initialize the clone. We can only initialize + // the clone, once a node has a connection to a master. This is currently done in scriptland + // - check FIXME. if ( zeek_table_manager ) - MakeMaster(storename, backend, broker::backend_options{}); - else { - MakeClone(storename); + auto backend = bro_broker::to_backend_type(e); + MakeMaster(storename, backend, broker::backend_options{}); } } } diff --git a/src/zeek.bif b/src/zeek.bif index d63bf1453f..6a7fc740d8 100644 --- a/src/zeek.bif +++ b/src/zeek.bif @@ -1955,9 +1955,10 @@ function global_ids%(%): id_table rec->Assign(3, val_mgr->Bool(id->IsEnumConst())); rec->Assign(4, val_mgr->Bool(id->IsOption())); rec->Assign(5, val_mgr->Bool(id->IsRedefinable())); + rec->Assign(6, val_mgr->Bool(id->GetAttr(zeek::detail::ATTR_BACKEND) != zeek::detail::Attr::nil)); if ( id->HasVal() ) - rec->Assign(6, id->GetVal()); + rec->Assign(7, id->GetVal()); auto id_name = make_intrusive(id->Name()); ids->Assign(std::move(id_name), std::move(rec)); diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/broker/store/brokerstore-backend-simple.zeek b/testing/btest/broker/store/brokerstore-backend-simple.zeek index 1382fdc7e3..f21906302d 100644 --- a/testing/btest/broker/store/brokerstore-backend-simple.zeek +++ b/testing/btest/broker/store/brokerstore-backend-simple.zeek @@ -1,13 +1,30 @@ -# @TEST-PORT: BROKER_PORT +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 -# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out" -# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out" # @TEST-EXEC: btest-bg-wait 15 # +# @TEST-EXEC: btest-diff master.out # @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + @TEST-START-FILE master.zeek redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; module TestModule; @@ -23,12 +40,6 @@ global r: table[string] of testrec &backend=Broker::MEMORY; event zeek_init() { - Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); - } - -event insert_stuff() - { - print "Inserting stuff"; t["a"] = 5; delete t["a"]; add s["hi"]; @@ -46,12 +57,6 @@ event insert_stuff() print r; } -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) - { - print "Peer added ", endpoint; - schedule 3secs { insert_stuff() }; - } - event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) { terminate(); @@ -61,7 +66,8 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) @TEST-START-FILE clone.zeek redef exit_only_after_terminate = T; -redef Broker::auto_store_master = F; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; module TestModule; @@ -76,11 +82,6 @@ global s: set[string] &backend=Broker::MEMORY; global r: table[string] of testrec &backend=Broker::MEMORY; -event zeek_init() - { - Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); - } - event dump_tables() { print t; @@ -89,9 +90,9 @@ event dump_tables() terminate(); } -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) +event Cluster::node_up(name: string, id: string) { - print "Peer added"; - schedule 5secs { dump_tables() }; + #print "node up", name; + schedule 4secs { dump_tables() }; } @TEST-END-FILE