diff --git a/scripts/base/frameworks/broker/store.zeek b/scripts/base/frameworks/broker/store.zeek index 50559c4522..2456ccbf2f 100644 --- a/scripts/base/frameworks/broker/store.zeek +++ b/scripts/base/frameworks/broker/store.zeek @@ -25,6 +25,10 @@ export { ## A negative/zero value indicates to never buffer commands. const default_clone_mutation_buffer_interval = 2min &redef; + const auto_store_master = T &redef; + + const auto_store_db_directory = "." &redef; + ## Whether a data store query could be completed or not. type QueryStatus: enum { SUCCESS, diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 1f39a7f7cf..28c4630d7a 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -249,6 +249,10 @@ export { global nodeid_topic: function(id: string): string; } +@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) ) +redef Broker::store_master = T; +@endif + global active_worker_ids: set[string] = set(); type NamedNode: record { diff --git a/src/Attr.cc b/src/Attr.cc index 5b56fe1a07..1bcd8827ae 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -19,7 +19,7 @@ const char* attr_name(attr_tag t) "&read_expire", "&write_expire", "&create_expire", "&raw_output", "&priority", "&group", "&log", "&error_handler", "&type_column", - "(&tracked)", "&on_change", "&broker_store", + "(&tracked)", "&on_change", "&broker_store", "&backend", "&deprecated", }; @@ -616,6 +616,43 @@ void Attributes::CheckAttr(Attr* a) } break; + case ATTR_BACKEND: + { + if ( ! global_var || type->Tag() != TYPE_TABLE ) + { + Error("&broker_store only applicable to global sets/tables"); + break; + } + + // cannot do better equality check - the broker types are not actually existing yet when we + // are here. We will do that later - before actually attaching to a broker store + if ( a->GetExpr()->GetType()->Tag() != TYPE_ENUM ) + { + Error("&broker_store must take an enum argument"); + break; + } + + // Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector + if ( type->AsTableType()->IndexTypes().size() != 1 ) + { + Error("&backend only supports one-element set/table indexes"); + } + + if ( Find(ATTR_EXPIRE_FUNC ) ) + { + Error("&backend and &expire_func cannot be used simultaneously"); + } + if ( Find(ATTR_EXPIRE_READ) ) + { + Error("&backend and &read_expire cannot be used simultaneously"); + } + if ( Find(ATTR_BROKER_STORE) ) + { + Error("&backend and &broker_store cannot be used simultaneously"); + } + + break; + } case ATTR_BROKER_STORE: { if ( type->Tag() != TYPE_TABLE ) @@ -635,6 +672,7 @@ void Attributes::CheckAttr(Attr* a) { Error("&broker_store only supports one-element set/table indexes"); } + if ( Find(ATTR_EXPIRE_FUNC ) ) { Error("&broker_store and &expire_func cannot be used simultaneously"); diff --git a/src/Attr.h b/src/Attr.h index a1a2677981..be4cc7c605 100644 --- a/src/Attr.h +++ b/src/Attr.h @@ -33,6 +33,7 @@ enum [[deprecated("Remove in v4.1. Use zeek::detail::attr_tag instead.")]] attr_ ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry ATTR_ON_CHANGE, // for table change tracking ATTR_BROKER_STORE, // for broker-store backed tables + ATTR_BACKEND, // for broker-store backed tabled ATTR_DEPRECATED, NUM_ATTRS // this item should always be last }; @@ -58,6 +59,7 @@ enum attr_tag { ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry ATTR_ON_CHANGE, // for table change tracking ATTR_BROKER_STORE, // for broker-store backed tables + ATTR_BACKEND, // for broker-store backed tabled ATTR_DEPRECATED, NUM_ATTRS // this item should always be last }; diff --git a/src/Val.h b/src/Val.h index d3f3e18f1a..be87a59ee3 100644 --- a/src/Val.h +++ b/src/Val.h @@ -1030,6 +1030,8 @@ public: // on zeek::RecordTypes. static void DoneParsing(); + void SetBrokerStore(const std::string& store) { broker_store = store; } + protected: void Init(IntrusivePtr t); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index b9c95125e1..435f2e921a 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -151,6 +151,8 @@ void Manager::InitPostScript() log_topic_func = get_option("Broker::log_topic")->AsFunc(); log_id_type = zeek::id::find_type("Log::ID")->AsEnumType(); writer_id_type = zeek::id::find_type("Log::Writer")->AsEnumType(); + zeek_table_manager = get_option("Broker::auto_store_master")->AsBool(); + zeek_table_db_directory = get_option("Broker::auto_store_db_directory")->AsString()->CheckString(); opaque_of_data_type = make_intrusive("Broker::Data"); opaque_of_set_iterator = make_intrusive("Broker::SetIterator"); @@ -214,6 +216,35 @@ void Manager::InitPostScript() reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); bstate->subscriber.add_topic(broker::topics::store_events, true); + + InitializeBrokerStoreForwarding(); + } + +void Manager::InitializeBrokerStoreForwarding() + { + const auto& globals = global_scope()->Vars(); + + for ( const auto& global : globals ) + { + auto& id = global.second; + if ( id->HasVal() && id->GetAttr(zeek::detail::ATTR_BACKEND) ) + { + const auto& attr = id->GetAttr(zeek::detail::ATTR_BACKEND); + auto e = static_cast(attr->GetExpr()->Eval(nullptr)->AsEnum()); + auto storename = std::string("___sync_store_") + global.first; + id->GetVal()->AsTableVal()->SetBrokerStore(storename); + AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()}); + + auto backend = bro_broker::to_backend_type(e); + + if ( zeek_table_manager ) + MakeMaster(storename, backend, broker::backend_options{}); + else + { + MakeClone(storename); + } + } + } } void Manager::Terminate() diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 9ccefde4b0..201040aaaf 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -355,6 +355,7 @@ private: void ProcessError(broker::error err); void ProcessStoreResponse(StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); + void InitializeBrokerStoreForwarding(); // Check if a broker store is associated to a table on the Zeek side. void CheckForwarding(const std::string& name); // Send the content of a broker store to the backing table. This is typically used @@ -411,6 +412,8 @@ private: IntrusivePtr vector_of_data_type; zeek::EnumType* log_id_type; zeek::EnumType* writer_id_type; + bool zeek_table_manager = false; + std::string zeek_table_db_directory; static int script_scope; }; diff --git a/src/parse.y b/src/parse.y index f5576f7995..d8d7abcf95 100644 --- a/src/parse.y +++ b/src/parse.y @@ -5,7 +5,7 @@ // Switching parser table type fixes ambiguity problems. %define lr.type ielr -%expect 117 +%expect 123 %token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY %token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF @@ -24,7 +24,7 @@ %token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF %token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC %token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE -%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE +%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE TOK_ATTR_BACKEND %token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER %token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED @@ -1367,6 +1367,8 @@ attr: { $$ = new zeek::detail::Attr(zeek::detail::ATTR_ON_CHANGE, {AdoptRef{}, $3}); } | TOK_ATTR_BROKER_STORE '=' expr { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE, {AdoptRef{}, $3}); } + | TOK_ATTR_BACKEND '=' expr + { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BACKEND, {AdoptRef{}, $3}); } | TOK_ATTR_EXPIRE_FUNC '=' expr { $$ = new zeek::detail::Attr(zeek::detail::ATTR_EXPIRE_FUNC, {AdoptRef{}, $3}); } | TOK_ATTR_EXPIRE_CREATE '=' expr diff --git a/src/scan.l b/src/scan.l index bcaa084663..b2a716229a 100644 --- a/src/scan.l +++ b/src/scan.l @@ -313,6 +313,7 @@ when return TOK_WHEN; &write_expire return TOK_ATTR_EXPIRE_WRITE; &on_change return TOK_ATTR_ON_CHANGE; &broker_store return TOK_ATTR_BROKER_STORE; +&backend return TOK_ATTR_BACKEND; @deprecated.* { auto num_files = file_stack.length(); diff --git a/testing/btest/broker/store/brokerstore-backend-simple.zeek b/testing/btest/broker/store/brokerstore-backend-simple.zeek new file mode 100644 index 0000000000..1382fdc7e3 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-simple.zeek @@ -0,0 +1,97 @@ +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff clone.out + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + +event zeek_init() + { + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event insert_stuff() + { + print "Inserting stuff"; + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added ", endpoint; + schedule 3secs { insert_stuff() }; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Broker::auto_store_master = F; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &backend=Broker::MEMORY; + + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added"; + schedule 5secs { dump_tables() }; + } +@TEST-END-FILE