diff --git a/CHANGES b/CHANGES index 5894ead16d..2a671a7613 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,23 @@ +3.2.0-dev.959 | 2020-07-21 15:34:59 +0000 + + * Broker Store table synchronizatio, (Johanna Amann, Corelight) + + Zeek now supports synchronizing tables/sets across clusters using + a backing Broker store. The same feature also allows persistent + storage of data in tables/sets over Zeek restarts. This feature is + implemented using the new ``&backend`` attribute. + + To synchronize a table over a cluster, you can, e.g., use: + + global t: table[string] of count &backend=Broker::MEMORY; + + This feature is documented in detail here: + + https://docs.zeek.org/en/current/frameworks/broker.html#broker-store-backed-zeek-tables-for-data-synchronization-and-persistence + + This feature is experimental and the syntax/specifics can change in the future. + 3.2.0-dev.919 | 2020-07-17 16:37:11 -0700 * Use namespaced version of Location to silence warnings (Tim Wojtulewicz, Corelight) diff --git a/NEWS b/NEWS index 41b44de954..4bdfe4ea10 100644 --- a/NEWS +++ b/NEWS @@ -55,6 +55,22 @@ New Functionality for which no connection state information is available in the core anymore. These cases will raise the new ``expired_conn_weird`` event. +- Broker Store table synchronization (experimental). + + Zeek now supports synchronizing tables/sets across clusters using a backing Broker + store. The same feature also allows persistent storage of data in tables/sets + over Zeek restarts. This feature is implemented using the new ``&backend`` attribute. + + To synchronize a table over a cluster, you can, e.g., use: + + global t: table[string] of count &backend=Broker::MEMORY; + + This feature is documented in detail here: + + https://docs.zeek.org/en/current/frameworks/broker.html#broker-store-backed-zeek-tables-for-data-synchronization-and-persistence + + Note: this feature is experimental and the syntax/specifics can change in the future. + Changed Functionality --------------------- diff --git a/VERSION b/VERSION index e749c36f90..f8bb279c65 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.2.0-dev.919 +3.2.0-dev.959 diff --git a/auxil/zeekctl b/auxil/zeekctl index d1a52ae1b6..51794e4446 160000 --- a/auxil/zeekctl +++ b/auxil/zeekctl @@ -1 +1 @@ -Subproject commit d1a52ae1b65d66d56394cb155f10dae4435a5bba +Subproject commit 51794e44462b6df2dc88700fa000be42107b7337 diff --git a/doc b/doc index 9958974168..0ec41dce98 160000 --- a/doc +++ b/doc @@ -1 +1 @@ -Subproject commit 9958974168016e84879af38410881022d1f1fd6d +Subproject commit 0ec41dce98a5a1693d3a241455b95d84bd116cbd diff --git a/scripts/base/frameworks/broker/store.zeek b/scripts/base/frameworks/broker/store.zeek index 50559c4522..2b79cb2c76 100644 --- a/scripts/base/frameworks/broker/store.zeek +++ b/scripts/base/frameworks/broker/store.zeek @@ -25,6 +25,16 @@ export { ## A negative/zero value indicates to never buffer commands. const default_clone_mutation_buffer_interval = 2min &redef; + ## If set to true, the current node is the master node for Broker stores + ## backing Zeek tables. By default this value will be automatically set to + ## true in standalone mode, and on the manager node of a cluster. This value + ## should not typically be changed manually. + const table_store_master = T &redef; + + ## The directory used for storing persistent database files when using Broker + ## store backed Zeek tables. + const table_store_db_directory = "." &redef; + ## Whether a data store query could be completed or not. type QueryStatus: enum { SUCCESS, @@ -136,7 +146,7 @@ export { global store_name: function(h: opaque of Broker::Store): string; ## Check if a key exists in a data store. - ## + ## ## h: the handle of the store to query. ## ## k: the key to lookup. @@ -178,7 +188,7 @@ export { ## k: the key of the container value to lookup. ## ## i: the index to retrieve from the container value. - ## + ## ## Returns: For tables and vectors, the value at the given index, or ## failure if the index doesn't exist. For sets, a boolean ## indicating whether the index exists. Returns failure if the key @@ -217,7 +227,7 @@ export { ## k: the key whose associated value is to be modified. The key must ## already exist. ## - ## a: the amount to increment the value by. + ## a: the amount to increment the value by. ## ## e: the new expiration interval of the modified key. If null, the ## current expiration time isn't changed. @@ -235,7 +245,7 @@ export { ## k: the key whose associated value is to be modified. The key must ## already exist. ## - ## amount: the amount to decrement the value by. + ## amount: the amount to decrement the value by. ## ## e: the new expiration interval of the modified key. If null, the current ## expiration time isn't changed. @@ -258,7 +268,7 @@ export { ## current expiration time isn't changed. ## ## Returns: false if the store handle was not valid. - global append: function(h: opaque of Broker::Store, k: any, s: string, + global append: function(h: opaque of Broker::Store, k: any, s: string, e: interval &default=0sec) : bool; ## Inserts an element into an existing set. @@ -275,7 +285,7 @@ export { ## ## Returns: false if the store handle was not valid. global insert_into_set: function(h: opaque of Broker::Store, - k: any, i: any, + k: any, i: any, e: interval &default=0sec) : bool; ## Inserts an element into an existing table. @@ -286,7 +296,7 @@ export { ## already exist. ## ## i: the index to insert into the table - ## + ## ## v: the value to associate with the index. ## ## e: the new expiration interval of the modified key. If null, the @@ -311,7 +321,7 @@ export { ## ## Returns: false if the store handle was not valid. global remove_from: function(h: opaque of Broker::Store, - k: any, i: any, + k: any, i: any, e: interval &default=0sec) : bool; ## Appends an element to an existing vector. @@ -328,7 +338,7 @@ export { ## ## Returns: false if the store handle was not valid. global push: function(h: opaque of Broker::Store, - k: any, v: any, + k: any, v: any, e: interval &default=0sec) : bool; ## Removes the last element of an existing vector. @@ -383,7 +393,7 @@ export { ## d: the communication data. ## ## Returns: The data type associated with the communication data. - ## Note that broker represents records in the same way as + ## Note that Broker represents records in the same way as ## vectors, so there is no "record" type. global data_type: function(d: Broker::Data): Broker::DataType; diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index a04d6744d2..47918e7d0d 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -49,5 +49,7 @@ redef Broker::log_topic = Cluster::rr_log_topic; @load ./nodes/worker @endif +@load ./broker-stores.zeek + @endif @endif diff --git a/scripts/base/frameworks/cluster/broker-stores.zeek b/scripts/base/frameworks/cluster/broker-stores.zeek new file mode 100644 index 0000000000..04be1fe782 --- /dev/null +++ b/scripts/base/frameworks/cluster/broker-stores.zeek @@ -0,0 +1,61 @@ +##! This script deals with the cluster parts of Broker backed Zeek tables. +##! It makes sure that the master store is set correctly and that clones +##! are automatically created on the non-manager nodes. + +# Note - this script should become unnecessary in the future, when we just can +# speculatively attach clones. This should be possible once the new ALM Broker +# transport becomes available. + +@load ./main + +module Broker; + +export { + ## Event that is used by the manager to announce the master stores for Broker backed + ## tables. + global announce_masters: event(masters: set[string]); +} + +# If we are not the manager, disable automatically generating masters. We will attach +# clones instead. +@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) +redef Broker::table_store_master = F; +@endif + +@if ( Broker::table_store_master ) + +global broker_backed_ids: set[string]; + +event zeek_init() + { + local globals = global_ids(); + for ( id in globals ) + { + if ( globals[id]$broker_backend ) + add broker_backed_ids[id]; + } + } + +# Send the auto masters we created to the newly connected node +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=1 + { + if ( ! Cluster::is_enabled() ) + return; + + local e = Broker::make_event(Broker::announce_masters, broker_backed_ids); + Broker::publish(Cluster::nodeid_topic(endpoint$id), e); + } + +@else + +event Broker::announce_masters(masters: set[string]) + { + for ( i in masters ) + { + # this magic name for the store is created in broker/Manager.cc for the manager. + local name = "___sync_store_" + i; + Broker::create_clone(name); + } + } + +@endif diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index 5813e63c9e..8278694dcd 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -747,6 +747,7 @@ type script_id: record { enum_constant: bool; ##< True if the identifier is an enum value. option_value: bool; ##< True if the identifier is an option. redefinable: bool; ##< True if the identifier is declared with the :zeek:attr:`&redef` attribute. + broker_backend: bool; ##< True if the identifier has a Broker backend defined using the :zeek:attr:`&backend` attribute. value: any &optional; ##< The current value of the identifier. }; diff --git a/src/Attr.cc b/src/Attr.cc index c464be6f54..5fbced8740 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -7,6 +7,7 @@ #include "Desc.h" #include "Val.h" #include "IntrusivePtr.h" +#include "input/Manager.h" #include "threading/SerialTypes.h" namespace zeek::detail { @@ -19,7 +20,8 @@ const char* attr_name(AttrTag t) "&read_expire", "&write_expire", "&create_expire", "&raw_output", "&priority", "&group", "&log", "&error_handler", "&type_column", - "(&tracked)", "&on_change", "&deprecated", + "(&tracked)", "&on_change", "&broker_store", + "&broker_allow_complex_type", "&backend", "&deprecated", }; return attr_names[int(t)]; @@ -456,6 +458,15 @@ void Attributes::CheckAttr(Attr* a) break; case ATTR_EXPIRE_READ: + { + if ( Find(ATTR_BROKER_STORE) ) + Error("&broker_store and &read_expire cannot be used simultaneously"); + + if ( Find(ATTR_BACKEND) ) + Error("&backend and &read_expire cannot be used simultaneously"); + } + // fallthrough + case ATTR_EXPIRE_WRITE: case ATTR_EXPIRE_CREATE: { @@ -534,14 +545,22 @@ void Attributes::CheckAttr(Attr* a) if ( ! e_ft->CheckArgs(&expected_args) ) Error("&expire_func argument type clash"); - } + + if ( Find(ATTR_BROKER_STORE ) ) + Error("&broker_store and &expire_func cannot be used simultaneously"); + + if ( Find(ATTR_BACKEND ) ) + Error("&backend and &expire_func cannot be used simultaneously"); + break; + } + case ATTR_ON_CHANGE: { if ( type->Tag() != TYPE_TABLE ) { - Error("&on_change only applicable to tables"); + Error("&on_change only applicable to sets/tables"); break; } @@ -602,6 +621,98 @@ void Attributes::CheckAttr(Attr* a) } break; + case ATTR_BACKEND: + { + if ( ! global_var || type->Tag() != TYPE_TABLE ) + { + Error("&backend only applicable to global sets/tables"); + break; + } + + // cannot do better equality check - the Broker types are not + // actually existing yet when we are here. We will do that + // later - before actually attaching to a broker store + if ( a->GetExpr()->GetType()->Tag() != TYPE_ENUM ) + { + Error("&backend must take an enum argument"); + break; + } + + // Temporary since Broker does not support ListVals - and we + // cannot easily convert to set/vector + if ( type->AsTableType()->GetIndexTypes().size() != 1 ) + Error("&backend only supports one-element set/table indexes"); + + // Only support atomic types for the moment, unless + // explicitly overriden + if ( ! type->AsTableType()->IsSet() && + ! input::Manager::IsCompatibleType(type->AsTableType()->Yield().get(), true) && + ! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) ) + { + Error("&backend only supports atomic types as table value"); + } + + if ( Find(ATTR_EXPIRE_FUNC ) ) + Error("&backend and &expire_func cannot be used simultaneously"); + + if ( Find(ATTR_EXPIRE_READ) ) + Error("&backend and &read_expire cannot be used simultaneously"); + + if ( Find(ATTR_BROKER_STORE) ) + Error("&backend and &broker_store cannot be used simultaneously"); + + break; + } + + case ATTR_BROKER_STORE: + { + if ( type->Tag() != TYPE_TABLE ) + { + Error("&broker_store only applicable to sets/tables"); + break; + } + + if ( a->GetExpr()->GetType()->Tag() != TYPE_STRING ) + { + Error("&broker_store must take a string argument"); + break; + } + + // Temporary since Broker does not support ListVals - and we + // cannot easily convert to set/vector + if ( type->AsTableType()->GetIndexTypes().size() != 1 && ! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) ) + Error("&broker_store only supports one-element set/table indexes"); + + // Only support atomic types for the moment, unless + // explicitly overriden + if ( ! type->AsTableType()->IsSet() && + ! input::Manager::IsCompatibleType(type->AsTableType()->Yield().get(), true) && + ! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) ) + { + Error("&broker_store only supports atomic types as table value"); + } + + if ( Find(ATTR_EXPIRE_FUNC ) ) + Error("&broker_store and &expire_func cannot be used simultaneously"); + + if ( Find(ATTR_EXPIRE_READ) ) + Error("&broker_store and &read_expire cannot be used simultaneously"); + + if ( Find(ATTR_BACKEND) ) + Error("&backend and &broker_store cannot be used simultaneously"); + + break; + } + + case ATTR_BROKER_STORE_ALLOW_COMPLEX: + { + if ( type->Tag() != TYPE_TABLE ) + { + Error("&broker_allow_complex_type only applicable to sets/tables"); + break; + } + } + case ATTR_TRACKED: // FIXME: Check here for global ID? break; diff --git a/src/Attr.h b/src/Attr.h index 2c9d278a73..0e18f0ace2 100644 --- a/src/Attr.h +++ b/src/Attr.h @@ -15,7 +15,6 @@ ZEEK_FORWARD_DECLARE_NAMESPACED(Expr, zeek::detail); // modify expressions or supply metadata on types, and the kind that // are extra metadata on every variable instance. - namespace zeek { class Type; @@ -43,6 +42,9 @@ enum AttrTag { ATTR_TYPE_COLUMN, // for input framework ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry ATTR_ON_CHANGE, // for table change tracking + ATTR_BROKER_STORE, // for Broker store backed tables + ATTR_BROKER_STORE_ALLOW_COMPLEX, // for Broker store backed tables + ATTR_BACKEND, // for Broker store backed tables ATTR_DEPRECATED, NUM_ATTRS // this item should always be last }; diff --git a/src/Val.cc b/src/Val.cc index 7b51ce9620..9079a47a6f 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -37,6 +37,8 @@ #include "ID.h" #include "broker/Data.h" +#include "broker/Store.h" +#include "broker/Manager.h" #include "threading/formatters/JSON.h" @@ -1511,6 +1513,16 @@ void TableVal::SetAttrs(detail::AttributesPtr a) if ( cf ) change_func = cf->GetExpr(); + + auto bs = attrs->Find(zeek::detail::ATTR_BROKER_STORE); + if ( bs && broker_store.empty() ) + { + IntrusivePtr c = bs->GetExpr()->Eval(nullptr); + assert(c); + assert(c->GetType()->Tag() == zeek::TYPE_STRING); + broker_store = c->AsStringVal()->AsString()->CheckString(); + broker_mgr->AddForwardedStore(broker_store, {NewRef{}, this}); + } } void TableVal::CheckExpireAttr(detail::AttrTag at) @@ -1539,7 +1551,7 @@ void TableVal::CheckExpireAttr(detail::AttrTag at) } } -bool TableVal::Assign(ValPtr index, ValPtr new_val) +bool TableVal::Assign(ValPtr index, ValPtr new_val, bool broker_forward) { auto k = MakeHashKey(*index); @@ -1549,7 +1561,7 @@ bool TableVal::Assign(ValPtr index, ValPtr new_val) return false; } - return Assign(std::move(index), std::move(k), std::move(new_val)); + return Assign(std::move(index), std::move(k), std::move(new_val), broker_forward); } bool TableVal::Assign(Val* index, Val* new_val) @@ -1558,7 +1570,7 @@ bool TableVal::Assign(Val* index, Val* new_val) } bool TableVal::Assign(ValPtr index, std::unique_ptr k, - ValPtr new_val) + ValPtr new_val, bool broker_forward) { bool is_set = table_type->IsSet(); @@ -1591,11 +1603,19 @@ bool TableVal::Assign(ValPtr index, std::unique_ptr k, Modified(); - if ( change_func ) + if ( change_func || ( broker_forward && ! broker_store.empty() ) ) { - auto change_index = index ? std::move(index) : RecreateIndex(k_copy); - const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); - CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + auto change_index = index ? std::move(index) + : RecreateIndex(k_copy); + + if ( broker_forward && ! broker_store.empty() ) + SendToStore(change_index.get(), new_entry_val, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + + if ( change_func ) + { + const auto& v = old_entry_val ? old_entry_val->GetVal() : new_entry_val->GetVal(); + CallChangeFunc(change_index, v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW); + } } delete old_entry_val; @@ -2061,7 +2081,7 @@ ListValPtr TableVal::RecreateIndex(const HashKey& k) const return table_hash->RecoverVals(k); } -void TableVal::CallChangeFunc(const Val* index, +void TableVal::CallChangeFunc(const ValPtr& index, const ValPtr& old_value, OnChangeType tpe) { @@ -2076,9 +2096,7 @@ void TableVal::CallChangeFunc(const Val* index, auto thefunc = change_func->Eval(nullptr); if ( ! thefunc ) - { return; - } if ( thefunc->GetType()->Tag() != TYPE_FUNC ) { @@ -2087,10 +2105,14 @@ void TableVal::CallChangeFunc(const Val* index, } const zeek::Func* f = thefunc->AsFunc(); - auto lv = index->AsListVal(); + zeek::Args vl; + + // we either get passed the raw index_val - or a ListVal with exactly one element. + if ( index->GetType()->Tag() == zeek::TYPE_LIST ) + vl.reserve(2 + index->AsListVal()->Length() + table_type->IsTable()); + else + vl.reserve(3 + table_type->IsTable()); - Args vl; - vl.reserve(2 + lv->Length() + table_type->IsTable()); vl.emplace_back(NewRef{}, this); switch ( tpe ) @@ -2108,8 +2130,14 @@ void TableVal::CallChangeFunc(const Val* index, vl.emplace_back(BifType::Enum::TableChange->GetEnumVal(BifEnum::TableChange::TABLE_ELEMENT_EXPIRED)); } - for ( const auto& v : lv->Vals() ) - vl.emplace_back(v); + + if ( index->GetType()->Tag() == zeek::TYPE_LIST ) + { + for ( const auto& v : index->AsListVal()->Vals() ) + vl.emplace_back(v); + } + else + vl.emplace_back(index); if ( table_type->IsTable() ) vl.emplace_back(old_value); @@ -2124,9 +2152,115 @@ void TableVal::CallChangeFunc(const Val* index, in_change_func = false; } -ValPtr TableVal::Remove(const Val& index) +void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val, OnChangeType tpe) + { + if ( broker_store.empty() || ! index ) + return; + + try + { + auto handle = broker_mgr->LookupStore(broker_store); + + if ( ! handle ) + return; + + // we either get passed the raw index_val - or a ListVal with exactly one element. + // Since Broker does not support ListVals, we have to unoll this in the second case. + const Val* index_val; + if ( index->GetType()->Tag() == zeek::TYPE_LIST ) + { + if ( index->AsListVal()->Length() != 1 ) + { + zeek::emit_builtin_error("table with complex index not supported for &broker_store"); + return; + } + + index_val = index->AsListVal()->Idx(0).get(); + } + else + { + index_val = index; + } + + auto broker_index = bro_broker::val_to_data(index_val); + + if ( ! broker_index ) + { + zeek::emit_builtin_error("invalid Broker data conversation for table index"); + return; + } + + switch ( tpe ) + { + case ELEMENT_NEW: + case ELEMENT_CHANGED: + { + broker::optional expiry; + + auto expire_time = GetExpireTime(); + if ( expire_time == 0 ) + // Entry is set to immediately expire. Let's not forward it. + break; + + if ( expire_time > 0 ) + { + if ( attrs->Find(zeek::detail::ATTR_EXPIRE_CREATE) ) + { + // for create expiry, we have to substract the already elapsed time from the expiry. + auto e = expire_time - (network_time - new_entry_val->ExpireAccessTime()); + if ( e <= 0 ) + // element already expired? Let's not insert it. + break; + + expiry = bro_broker::convert_expiry(e); + } + else + expiry = bro_broker::convert_expiry(expire_time); + } + + if ( table_type->IsSet() ) + handle->store.put(std::move(*broker_index), broker::data(), expiry); + else + { + if ( ! new_entry_val ) + { + zeek::emit_builtin_error("did not receive new value for Broker datastore send operation"); + return; + } + + auto new_value = new_entry_val->GetVal().get(); + auto broker_val = bro_broker::val_to_data(new_value); + if ( ! broker_val ) + { + zeek::emit_builtin_error("invalid Broker data conversation for table value"); + return; + } + + handle->store.put(std::move(*broker_index), std::move(*broker_val), expiry); + } + break; + } + + case ELEMENT_REMOVED: + handle->store.erase(std::move(*broker_index)); + break; + + case ELEMENT_EXPIRED: + // we do nothing here. The Broker store does its own expiration - so the element + // should expire at about the same time. + break; + } + } + catch ( InterpreterException& e ) + { + zeek::emit_builtin_error("The previous error was encountered while trying to resolve the &broker_store attribute of the set/table. Potentially the Broker::Store has not been initialized before being used."); + } + } + +ValPtr TableVal::Remove(const Val& index, bool broker_forward) { auto k = MakeHashKey(index); + TableEntryVal* v = k ? AsNonConstTable()->RemoveEntry(k.get()) : nullptr; ValPtr va; @@ -2140,8 +2274,15 @@ ValPtr TableVal::Remove(const Val& index) Modified(); + if ( broker_forward && ! broker_store.empty() ) + SendToStore(&index, nullptr, ELEMENT_REMOVED); + if ( change_func ) - CallChangeFunc(&index, va, ELEMENT_REMOVED); + { + // this is totally cheating around the fact that we need a Intrusive pointer. + IntrusivePtr changefunc_val = RecreateIndex(*(k.get())); + CallChangeFunc(changefunc_val, va, ELEMENT_REMOVED); + } return va; } @@ -2166,10 +2307,14 @@ ValPtr TableVal::Remove(const HashKey& k) Modified(); - if ( change_func && va ) + if ( va && ( change_func || ! broker_store.empty() ) ) { auto index = table_hash->RecoverVals(k); - CallChangeFunc(index.get(), va, ELEMENT_REMOVED); + if ( ! broker_store.empty() ) + SendToStore(index.get(), nullptr, ELEMENT_REMOVED); + + if ( change_func && va ) + CallChangeFunc(index, va, ELEMENT_REMOVED); } return va; @@ -2473,7 +2618,8 @@ void TableVal::DoExpire(double t) { if ( ! idx ) idx = RecreateIndex(*k); - CallChangeFunc(idx.get(), v->GetVal(), ELEMENT_EXPIRED); + + CallChangeFunc(idx, v->GetVal(), ELEMENT_EXPIRED); } delete v; diff --git a/src/Val.h b/src/Val.h index 33f0d2290c..0a99526fbb 100644 --- a/src/Val.h +++ b/src/Val.h @@ -784,9 +784,11 @@ public: * @param index The key to assign. * @param new_val The value to assign at the index. For a set, this * must be nullptr. + * @param broker_forward Controls if the value will be forwarded to attached + * Broker stores. * @return True if the assignment type-checked. */ - bool Assign(ValPtr index, ValPtr new_val); + bool Assign(ValPtr index, ValPtr new_val, bool broker_forward = true); /** * Assigns a value at an associated index in the table (or in the @@ -796,10 +798,12 @@ public: * @param k A precomputed hash key to use. * @param new_val The value to assign at the index. For a set, this * must be nullptr. + * @param broker_forward Controls if the value will be forwarded to attached + * Broker stores. * @return True if the assignment type-checked. */ bool Assign(ValPtr index, std::unique_ptr k, - ValPtr new_val); + ValPtr new_val, bool broker_forward = true); // Returns true if the assignment typechecked, false if not. The // methods take ownership of new_val, but not of the index. If we're @@ -921,12 +925,14 @@ public: /** * Remove an element from the table and return it. * @param index The index to remove. + * @param broker_forward Controls if the remove operation will be forwarded to attached + * Broker stores. * @return The value associated with the index if it exists, else nullptr. * For a sets that don't really contain associated values, a placeholder * value is returned to differentiate it from non-existent index (nullptr), * but otherwise has no meaning in relation to the set's contents. */ - ValPtr Remove(const Val& index); + ValPtr Remove(const Val& index, bool broker_forward = true); /** * Same as Remove(const Val&), but uses a precomputed hash key. @@ -1017,6 +1023,22 @@ public: // on zeek::RecordTypes. static void DoneParsing(); + /** + * Sets the name of the Broker store that is backing this table. + * @param store store that is backing this table. + */ + void SetBrokerStore(const std::string& store) { broker_store = store; } + + /** + * Disable change notification processing of &on_change until re-enabled. + */ + void DisableChangeNotifications() { in_change_func = true; } + + /** + * Re-enables change notifcations after being disabled by DisableChangeNotifications. + */ + void EnableChangeNotifications() { in_change_func = false; } + protected: void Init(zeek::TableTypePtr t); @@ -1049,10 +1071,13 @@ protected: // Enum for the different kinds of changes an &on_change handler can see enum OnChangeType { ELEMENT_NEW, ELEMENT_CHANGED, ELEMENT_REMOVED, ELEMENT_EXPIRED }; - // Calls &change_func. Does not take ownership of values. (Refs if needed). - void CallChangeFunc(const Val* index, const ValPtr& old_value, + // Calls &change_func. + void CallChangeFunc(const ValPtr& index, const ValPtr& old_value, OnChangeType tpe); + // Sends data on to backing Broker Store + void SendToStore(const Val* index, const TableEntryVal* new_entry_val, OnChangeType tpe); + ValPtr DoClone(CloneState* state) override; zeek::TableTypePtr table_type; @@ -1065,6 +1090,7 @@ protected: PrefixTable* subnets; ValPtr def_val; zeek::detail::ExprPtr change_func; + std::string broker_store; // prevent recursion of change functions bool in_change_func = false; diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index b42ebc2057..03805c5810 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -151,6 +151,8 @@ void Manager::InitPostScript() log_topic_func = get_option("Broker::log_topic")->AsFunc(); log_id_type = zeek::id::find_type("Log::ID")->AsEnumType(); writer_id_type = zeek::id::find_type("Log::Writer")->AsEnumType(); + zeek_table_manager = get_option("Broker::table_store_master")->AsBool(); + zeek_table_db_directory = get_option("Broker::table_store_db_directory")->AsString()->CheckString(); opaque_of_data_type = zeek::make_intrusive("Broker::Data"); opaque_of_set_iterator = zeek::make_intrusive("Broker::SetIterator"); @@ -212,6 +214,54 @@ void Manager::InitPostScript() reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) ) reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); + + bstate->subscriber.add_topic(broker::topics::store_events, true); + + InitializeBrokerStoreForwarding(); + } + +void Manager::InitializeBrokerStoreForwarding() + { + const auto& globals = zeek::detail::global_scope()->Vars(); + + for ( const auto& global : globals ) + { + auto& id = global.second; + if ( id->HasVal() && id->GetAttr(zeek::detail::ATTR_BACKEND) ) + { + const auto& attr = id->GetAttr(zeek::detail::ATTR_BACKEND); + auto e = static_cast(attr->GetExpr()->Eval(nullptr)->AsEnum()); + auto storename = std::string("___sync_store_") + global.first; + id->GetVal()->AsTableVal()->SetBrokerStore(storename); + AddForwardedStore(storename, {zeek::NewRef{}, id->GetVal()->AsTableVal()}); + + // We only create masters here. For clones, we do all the work of setting up + // the forwarding - but we do not try to initialize the clone. We can only initialize + // the clone, once a node has a connection to a master. This is currently done in scriptland + // in scripts/base/frameworks/cluster/broker-stores.zeek. Once the ALM transport is ready + // we can change over to doing this here. + if ( ! zeek_table_manager ) + continue; + + auto backend = bro_broker::to_backend_type(e); + auto suffix = ".store"; + + switch ( backend ) { + case broker::backend::sqlite: + suffix = ".sqlite"; + break; + case broker::backend::rocksdb: + suffix = ".rocksdb"; + break; + default: + break; + } + + auto path = zeek_table_db_directory + "/" + storename + suffix; + + MakeMaster(storename, backend, broker::backend_options{{"path", path}}); + } + } } void Manager::Terminate() @@ -872,7 +922,8 @@ void Manager::Process() { // Ensure that time gets update before processing broker messages, or events // based on them might get scheduled wrong. - net_update_time(current_time()); + if ( use_real_time ) + net_update_time(current_time()); bool had_input = false; @@ -906,6 +957,12 @@ void Manager::Process() auto& topic = broker::get_topic(message); auto& msg = broker::get_data(message); + if ( broker::topics::store_events.prefix_of(topic) ) + { + ProcessStoreEvent(std::move(msg)); + continue; + } + try { DispatchMessage(topic, std::move(msg)); @@ -941,6 +998,135 @@ void Manager::Process() } } +void Manager::ProcessStoreEventInsertUpdate(zeek::IntrusivePtr table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert) + { + auto type = "Insert"; + if ( ! insert ) + type = "Update"; + + if ( insert ) + { + DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", store_id.c_str(), to_string(key).c_str(), to_string(data).c_str(), key.get_type_name(), data.get_type_name()); + } + else + { + DBG_LOG(DBG_BROKER, "Store %s: Update: %s->%s (%s)", store_id.c_str(), to_string(old_value).c_str(), to_string(data).c_str(), data.get_type_name()); + } + + if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) + { + reporter->Error("ProcessStoreEvent %s got %s when expecting set", type, data.get_type_name()); + return; + } + + const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); + assert( its.size() == 1 ); + auto zeek_key = data_to_val(key, its[0].get()); + if ( ! zeek_key ) + { + reporter->Error("ProcessStoreEvent %s: could not convert key \"%s\" for store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(key).c_str(), store_id.c_str()); + return; + } + + if ( table->GetType()->IsSet() ) + { + table->Assign(zeek_key, nullptr, false); + return; + } + + // it is a table + auto zeek_value = data_to_val(data, table->GetType()->Yield().get()); + if ( ! zeek_value ) + { + reporter->Error("ProcessStoreEvent %s: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(data).c_str(), to_string(key).c_str(), store_id.c_str()); + return; + } + + table->Assign(zeek_key, zeek_value, false); + } + +void Manager::ProcessStoreEvent(broker::data msg) + { + if ( auto insert = broker::store_event::insert::make(msg) ) + { + auto storehandle = broker_mgr->LookupStore(insert.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + // We sent this message. Ignore it. + if ( insert.publisher() == storehandle->store_pid ) + return; + + ProcessStoreEventInsertUpdate(table, insert.store_id(), insert.key(), insert.value(), {}, true); + } + else if ( auto update = broker::store_event::update::make(msg) ) + { + auto storehandle = broker_mgr->LookupStore(update.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + // We sent this message. Ignore it. + if ( update.publisher() == storehandle->store_pid ) + return; + + ProcessStoreEventInsertUpdate(table, update.store_id(), update.key(), update.new_value(), update.old_value(), false); + } + else if ( auto erase = broker::store_event::erase::make(msg) ) + { + auto storehandle = broker_mgr->LookupStore(erase.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + // We sent this message. Ignore it. + if ( erase.publisher() == storehandle->store_pid ) + return; + + auto key = erase.key(); + DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str()); + const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); + assert( its.size() == 1 ); + auto zeek_key = data_to_val(key, its[0].get()); + if ( ! zeek_key ) + { + reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote erase. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str()); + return; + } + + table->Remove(*zeek_key, false); + } + else if ( auto expire = broker::store_event::expire::make(msg) ) + { + // We just ignore expiries - expiring information on the Zeek side is handled by Zeek itself. +#ifdef DEBUG + // let's only debug log for stores that we know. + auto storehandle = broker_mgr->LookupStore(expire.store_id()); + if ( ! storehandle ) + return; + + auto table = storehandle->forward_to; + if ( ! table ) + return; + + DBG_LOG(DBG_BROKER, "Store %s: Store expired key %s", expire.store_id().c_str(), to_string(expire.key()).c_str()); +#endif /* DEBUG */ + } + else + { + reporter->Error("ProcessStoreEvent: Unhandled event type"); + } + } void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev) { @@ -1406,16 +1592,76 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); + PrepareForwarding(name); - if ( bstate->endpoint.use_real_time() ) - return handle; + if ( ! bstate->endpoint.use_real_time() ) + // Wait for master to become available/responsive. + // Possibly avoids timeouts in scripts during unit tests. + handle->store.exists(""); + + BrokerStoreToZeekTable(name, handle); - // Wait for master to become available/responsive. - // Possibly avoids timeouts in scripts during unit tests. - handle->store.exists(""); return handle; } +void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle) + { + if ( ! handle->forward_to ) + return; + + auto keys = handle->store.keys(); + if ( ! keys ) + return; + + auto set = caf::get_if(&(keys->get_data())); + auto table = handle->forward_to; + const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); + bool is_set = table->GetType()->IsSet(); + assert( its.size() == 1 ); + + // disable &on_change notifications while filling the table. + table->DisableChangeNotifications(); + + for ( const auto key : *set ) + { + auto zeek_key = data_to_val(key, its[0].get()); + if ( ! zeek_key ) + { + reporter->Error("Failed to convert key \"%s\" while importing broker store to table for store \"%s\". Aborting import.", to_string(key).c_str(), name.c_str()); + // just abort - this probably means the types are incompatible + table->EnableChangeNotifications(); + return; + } + + if ( is_set ) + { + table->Assign(zeek_key, nullptr, false); + continue; + } + + auto value = handle->store.get(key); + if ( ! value ) + { + reporter->Error("Failed to load value for key %s while importing Broker store %s to table", to_string(key).c_str(), name.c_str()); + table->EnableChangeNotifications(); + continue; + } + + auto zeek_value = data_to_val(*value, table->GetType()->Yield().get()); + if ( ! zeek_value ) + { + reporter->Error("Could not convert %s to table value while trying to import Broker store %s. Aborting import.", to_string(value).c_str(), name.c_str()); + table->EnableChangeNotifications(); + return; + } + + table->Assign(zeek_key, zeek_value, false); + } + + table->EnableChangeNotifications(); + return; + } + StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval, double stale_interval, double mutation_buffer_interval) @@ -1443,7 +1689,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval, data_stores.emplace(name, handle); iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); - + PrepareForwarding(name); return handle; } @@ -1503,4 +1749,32 @@ const Stats& Manager::GetStatistics() return statistics; } +bool Manager::AddForwardedStore(const std::string& name, zeek::IntrusivePtr table) + { + if ( forwarded_stores.find(name) != forwarded_stores.end() ) + { + reporter->Error("same &broker_store %s specified for two different variables", name.c_str()); + return false; + } + + DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str()); + forwarded_stores.emplace(name, table); + + PrepareForwarding(name); + return true; + } + +void Manager::PrepareForwarding(const std::string &name) + { + auto handle = LookupStore(name); + if ( ! handle ) + return; + + if ( forwarded_stores.find(name) == forwarded_stores.end() ) + return; + + handle->forward_to = forwarded_stores.at(name); + DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str()); + } + } // namespace bro_broker diff --git a/src/broker/Manager.h b/src/broker/Manager.h index ca079ca890..bcd231aa9e 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,7 @@ ZEEK_FORWARD_DECLARE_NAMESPACED(Func, zeek); ZEEK_FORWARD_DECLARE_NAMESPACED(Frame, zeek::detail); ZEEK_FORWARD_DECLARE_NAMESPACED(VectorType, zeek); +ZEEK_FORWARD_DECLARE_NAMESPACED(TableVal, zeek); namespace zeek { using VectorTypePtr = zeek::IntrusivePtr; @@ -301,6 +303,17 @@ public: */ StoreHandleVal* LookupStore(const std::string& name); + /** + * Register a Zeek table that is associated with a Broker store that is backing it. This + * causes all changes that happen to the Broker store in the future to be applied to theZzeek + * table. + * A single Broker store can only be forwarded to a single table. + * @param name name of the Broker store. + * @param table pointer to the table/set that is being backed. + * @return true on success, false if the named store is already being forwarded. + */ + bool AddForwardedStore(const std::string& name, zeek::IntrusivePtr table); + /** * Close and unregister a data store. Any existing references to the * store handle will not be able to be used for any data store operations. @@ -346,6 +359,10 @@ public: private: void DispatchMessage(const broker::topic& topic, broker::data msg); + // Process events used for Broker store backed zeek tables + void ProcessStoreEvent(broker::data msg); + // Common functionality for processing insert and update events. + void ProcessStoreEventInsertUpdate(zeek::IntrusivePtr table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert); void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogWrite(broker::zeek::LogWrite lw); @@ -354,6 +371,13 @@ private: void ProcessError(broker::error err); void ProcessStoreResponse(StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); + // Initializes the masters for Broker backed Zeek tables when using the &backend attribute + void InitializeBrokerStoreForwarding(); + // Check if a Broker store is associated to a table on the Zeek side. + void PrepareForwarding(const std::string& name); + // Send the content of a Broker store to the backing table. This is typically used + // when a master/clone is created. + void BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle); void Error(const char* format, ...) __attribute__((format (printf, 2, 3))); @@ -388,6 +412,7 @@ private: std::string default_log_topic_prefix; std::shared_ptr bstate; std::unordered_map data_stores; + std::unordered_map> forwarded_stores; std::unordered_map pending_queries; std::vector forwarded_prefixes; @@ -404,6 +429,8 @@ private: zeek::VectorTypePtr vector_of_data_type; zeek::EnumType* log_id_type; zeek::EnumType* writer_id_type; + bool zeek_table_manager = false; + std::string zeek_table_db_directory; static int script_scope; }; diff --git a/src/broker/Store.h b/src/broker/Store.h index cd4c9e2298..b355f848c5 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -6,6 +6,7 @@ #include "Trigger.h" #include +#include #include #include @@ -45,6 +46,25 @@ inline zeek::RecordValPtr query_result(zeek::RecordValPtr data) return rval; } +/** + * Convert an expiry from a double (used by Zeek) to the format required by Broker + * @param e: expire interval as double; 0 if no expiry + * @return expire interval in Broker format + */ +static broker::optional convert_expiry(double e) + { + broker::optional ts; + + if ( e ) + { + broker::timespan x; + broker::convert(e, x); + ts = x; + } + + return ts; + } + /** * Used for asynchronous data store queries which use "when" statements. */ @@ -94,13 +114,16 @@ private: class StoreHandleVal : public zeek::OpaqueVal { public: StoreHandleVal(broker::store s) - : zeek::OpaqueVal(bro_broker::opaque_of_store_handle), store{s}, proxy{store} + : zeek::OpaqueVal(bro_broker::opaque_of_store_handle), store{s}, proxy{store}, store_pid{store.frontend_id()} { } void ValDescribe(ODesc* d) const override; broker::store store; broker::store::proxy proxy; + broker::publisher_id store_pid; + // Zeek table that events are forwarded to. + zeek::IntrusivePtr forward_to; protected: diff --git a/src/broker/store.bif b/src/broker/store.bif index 8bca7e1144..81dfe0e44f 100644 --- a/src/broker/store.bif +++ b/src/broker/store.bif @@ -7,20 +7,6 @@ #include "broker/Data.h" #include "Trigger.h" -static broker::optional prepare_expiry(double e) - { - broker::optional ts; - - if ( e ) - { - broker::timespan x; - broker::convert(e, x); - ts = x; - } - - return ts; - } - static bro_broker::StoreHandleVal* to_store_handle(zeek::Val* h) { return dynamic_cast(h); } %%} @@ -263,7 +249,7 @@ function Broker::__put_unique%(h: opaque of Broker::Store, handle->store); auto req_id = handle->proxy.put_unique(std::move(*key), std::move(*val), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); broker_mgr->TrackStoreQuery(handle, req_id, cb); return nullptr; @@ -387,7 +373,7 @@ function Broker::__put%(h: opaque of Broker::Store, return zeek::val_mgr->False(); } - handle->store.put(std::move(*key), std::move(*val), prepare_expiry(e)); + handle->store.put(std::move(*key), std::move(*val), bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -440,7 +426,7 @@ function Broker::__increment%(h: opaque of Broker::Store, k: any, a: any, } handle->store.increment(std::move(*key), std::move(*amount), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -470,7 +456,7 @@ function Broker::__decrement%(h: opaque of Broker::Store, k: any, a: any, return zeek::val_mgr->False(); } - handle->store.decrement(std::move(*key), std::move(*amount), prepare_expiry(e)); + handle->store.decrement(std::move(*key), std::move(*amount), bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -500,7 +486,7 @@ function Broker::__append%(h: opaque of Broker::Store, k: any, s: any, return zeek::val_mgr->False(); } - handle->store.append(std::move(*key), std::move(*str), prepare_expiry(e)); + handle->store.append(std::move(*key), std::move(*str), bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -531,7 +517,7 @@ function Broker::__insert_into_set%(h: opaque of Broker::Store, k: any, i: any, } handle->store.insert_into(std::move(*key), std::move(*idx), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -569,7 +555,7 @@ function Broker::__insert_into_table%(h: opaque of Broker::Store, k: any, } handle->store.insert_into(std::move(*key), std::move(*idx), - std::move(*val), prepare_expiry(e)); + std::move(*val), bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -600,7 +586,7 @@ function Broker::__remove_from%(h: opaque of Broker::Store, k: any, i: any, } handle->store.remove_from(std::move(*key), std::move(*idx), - prepare_expiry(e)); + bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -630,7 +616,7 @@ function Broker::__push%(h: opaque of Broker::Store, k: any, v: any, return zeek::val_mgr->False(); } - handle->store.push(std::move(*key), std::move(*val), prepare_expiry(e)); + handle->store.push(std::move(*key), std::move(*val), bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} @@ -652,7 +638,7 @@ function Broker::__pop%(h: opaque of Broker::Store, k: any, e: interval%): bool return zeek::val_mgr->False(); } - handle->store.pop(std::move(*key), prepare_expiry(e)); + handle->store.pop(std::move(*key), bro_broker::convert_expiry(e)); return zeek::val_mgr->True(); %} diff --git a/src/parse.y b/src/parse.y index 1e163b1432..f42f4b04bd 100644 --- a/src/parse.y +++ b/src/parse.y @@ -5,7 +5,7 @@ // Switching parser table type fixes ambiguity problems. %define lr.type ielr -%expect 111 +%expect 129 %token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY %token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF @@ -24,7 +24,8 @@ %token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF %token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC %token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE -%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE +%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE +%token TOK_ATTR_BROKER_STORE_ALLOW_COMPLEX TOK_ATTR_BACKEND %token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER %token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED @@ -1354,7 +1355,7 @@ attr_list: attr: TOK_ATTR_DEFAULT '=' expr - { $$ = new zeek::detail::Attr(zeek::detail::ATTR_DEFAULT, {zeek::AdoptRef{}, $3}); } + { $$ = new zeek::detail::Attr(zeek::detail::ATTR_DEFAULT, {zeek::AdoptRef{}, $3}); } | TOK_ATTR_OPTIONAL { $$ = new zeek::detail::Attr(zeek::detail::ATTR_OPTIONAL); } | TOK_ATTR_REDEF @@ -1365,6 +1366,12 @@ attr: { $$ = new zeek::detail::Attr(zeek::detail::ATTR_DEL_FUNC, {zeek::AdoptRef{}, $3}); } | TOK_ATTR_ON_CHANGE '=' expr { $$ = new zeek::detail::Attr(zeek::detail::ATTR_ON_CHANGE, {zeek::AdoptRef{}, $3}); } + | TOK_ATTR_BROKER_STORE '=' expr + { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE, {zeek::AdoptRef{}, $3}); } + | TOK_ATTR_BROKER_STORE_ALLOW_COMPLEX + { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE_ALLOW_COMPLEX); } + | TOK_ATTR_BACKEND '=' expr + { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BACKEND, {zeek::AdoptRef{}, $3}); } | TOK_ATTR_EXPIRE_FUNC '=' expr { $$ = new zeek::detail::Attr(zeek::detail::ATTR_EXPIRE_FUNC, {zeek::AdoptRef{}, $3}); } | TOK_ATTR_EXPIRE_CREATE '=' expr diff --git a/src/scan.l b/src/scan.l index f42c1277ce..2c2562d344 100644 --- a/src/scan.l +++ b/src/scan.l @@ -284,6 +284,9 @@ when return TOK_WHEN; &redef return TOK_ATTR_REDEF; &write_expire return TOK_ATTR_EXPIRE_WRITE; &on_change return TOK_ATTR_ON_CHANGE; +&broker_store return TOK_ATTR_BROKER_STORE; +&broker_allow_complex_type return TOK_ATTR_BROKER_STORE_ALLOW_COMPLEX; +&backend return TOK_ATTR_BACKEND; @deprecated.* { auto num_files = file_stack.length(); diff --git a/src/zeek.bif b/src/zeek.bif index a1df858dc7..f1af07ea36 100644 --- a/src/zeek.bif +++ b/src/zeek.bif @@ -1955,9 +1955,10 @@ function global_ids%(%): id_table rec->Assign(3, zeek::val_mgr->Bool(id->IsEnumConst())); rec->Assign(4, zeek::val_mgr->Bool(id->IsOption())); rec->Assign(5, zeek::val_mgr->Bool(id->IsRedefinable())); + rec->Assign(6, zeek::val_mgr->Bool(id->GetAttr(zeek::detail::ATTR_BACKEND) != zeek::detail::Attr::nil)); if ( id->HasVal() ) - rec->Assign(6, id->GetVal()); + rec->Assign(7, id->GetVal()); auto id_name = zeek::make_intrusive(id->Name()); ids->Assign(std::move(id_name), std::move(rec)); diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-clone/clonetwo.out b/testing/btest/Baseline/broker.store.brokerstore-attr-clone/clonetwo.out new file mode 100644 index 0000000000..b2bbfd2600 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-clone/clonetwo.out @@ -0,0 +1,19 @@ +Peer added +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-clone/master.out b/testing/btest/Baseline/broker.store.brokerstore-attr-clone/master.out new file mode 100644 index 0000000000..b18d9195da --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-clone/master.out @@ -0,0 +1,38 @@ +Peer added +Peer added +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-expire/clone.out b/testing/btest/Baseline/broker.store.brokerstore-attr-expire/clone.out new file mode 100644 index 0000000000..100aaba603 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-expire/clone.out @@ -0,0 +1,19 @@ +Peer added +Expiring s: expire_first +Expiring t: b +Expiring t: whatever +Expiring t: a +Expiring r: recb +Expiring s: expire_later +Expiring t: expire_later_in_t_not_with_a +Expiring r: reca +{ + +} +{ + +} +{ + +} +terminating diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output1 b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output1 new file mode 100644 index 0000000000..1710727ce2 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output1 @@ -0,0 +1,20 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 5 +} +{ +I am really a set!, +Believe me - I am a set, +I am a set! +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output2 b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output2 new file mode 100644 index 0000000000..1710727ce2 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output2 @@ -0,0 +1,20 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 5 +} +{ +I am really a set!, +Believe me - I am a set, +I am a set! +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output3 b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output3 new file mode 100644 index 0000000000..1710727ce2 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence-clone/output3 @@ -0,0 +1,20 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 5 +} +{ +I am really a set!, +Believe me - I am a set, +I am a set! +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-persistence/output1 b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence/output1 new file mode 100644 index 0000000000..1710727ce2 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence/output1 @@ -0,0 +1,20 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 5 +} +{ +I am really a set!, +Believe me - I am a set, +I am a set! +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-persistence/output2 b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence/output2 new file mode 100644 index 0000000000..1710727ce2 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-persistence/output2 @@ -0,0 +1,20 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 5 +} +{ +I am really a set!, +Believe me - I am a set, +I am a set! +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-attr-simple/clone.out b/testing/btest/Baseline/broker.store.brokerstore-attr-simple/clone.out new file mode 100644 index 0000000000..b2bbfd2600 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-attr-simple/clone.out @@ -0,0 +1,19 @@ +Peer added +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-invalid/.stderr b/testing/btest/Baseline/broker.store.brokerstore-backend-invalid/.stderr new file mode 100644 index 0000000000..7894cdb98f --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-invalid/.stderr @@ -0,0 +1,5 @@ +error in /this/is/a/path/brokerstore-backend-invalid.zeek, line 12: &backend only supports one-element set/table indexes (&backend=Broker::MEMORY) +error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 13: &backend only supports atomic types as table value (&backend=Broker::MEMORY) +error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 14: &backend and &read_expire cannot be used simultaneously (&read_expire=5.0 secs, &backend=Broker::MEMORY) +error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 15: &backend and &broker_store cannot be used simultaneously (&broker_store=store, &backend=Broker::MEMORY) +error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 16: &backend only applicable to global sets/tables (&backend=Broker::MEMORY) diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-incompatible/worker-1..stderr b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-incompatible/worker-1..stderr new file mode 100644 index 0000000000..ea16941541 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-incompatible/worker-1..stderr @@ -0,0 +1,3 @@ +error: ProcessStoreEvent Insert: could not convert value "b" for key "a" in store "___sync_store_TestModule::s" while receiving remote data. This probably means the tables have different types on different nodes. +error: ProcessStoreEvent Insert: could not convert key "a" for store "___sync_store_TestModule::t" while receiving remote data. This probably means the tables have different types on different nodes. +received termination signal diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple-reverse/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-simple/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-simple/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite-incompatible/.stderr b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite-incompatible/.stderr new file mode 100644 index 0000000000..ce13e7d1a4 --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite-incompatible/.stderr @@ -0,0 +1 @@ +error: Failed to convert key "a" while importing broker store to table for store "___sync_store_TestModule::t". Aborting import. diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/clone.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out new file mode 100644 index 0000000000..06d6a343ba --- /dev/null +++ b/testing/btest/Baseline/broker.store.brokerstore-backend-sqlite/master.out @@ -0,0 +1,18 @@ +{ +[b] = 3, +[whatever] = 5, +[a] = 3 +} +{ +hi +} +{ +[b] = [a=2, b=d, c={ +elem1, +elem2 +}], +[a] = [a=1, b=c, c={ +elem1, +elem2 +}] +} diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 893a603972..8d6a158c72 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -1,3 +1,4 @@ +-./frameworks/cluster/broker-stores.zeek -./frameworks/cluster/nodes/logger.zeek -./frameworks/cluster/nodes/manager.zeek -./frameworks/cluster/nodes/proxy.zeek diff --git a/testing/btest/broker/store/brokerstore-attr-clone.zeek b/testing/btest/broker/store/brokerstore-attr-clone.zeek new file mode 100644 index 0000000000..429c8e6d4f --- /dev/null +++ b/testing/btest/broker/store/brokerstore-attr-clone.zeek @@ -0,0 +1,159 @@ +# Start master and two clones. One clone changes table and the change ends up in master + other clone. + +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run cloneone "zeek -B broker -b ../cloneone.zeek >../cloneone.out" +# @TEST-EXEC: btest-bg-run clonetwo "zeek -B broker -b ../clonetwo.zeek >../clonetwo.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clonetwo.out + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + tablestore = Broker::create_master("table"); + setstore = Broker::create_master("set"); + recordstore = Broker::create_master("rec"); + } + +event dump_tables() + { + print t; + print s; + print r; + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added "; + schedule 5secs { dump_tables() }; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } +@TEST-END-FILE + +@TEST-START-FILE cloneone.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event send_stuff_over() + { + print "Inserting stuff"; + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +event killmeplease() + { + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added", endpoint; + tablestore = Broker::create_clone("table"); + setstore = Broker::create_clone("set"); + recordstore = Broker::create_clone("rec"); + schedule 2secs { send_stuff_over() }; + schedule 5secs { killmeplease() }; + } +@TEST-END-FILE + +@TEST-START-FILE clonetwo.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added"; + tablestore = Broker::create_clone("table"); + setstore = Broker::create_clone("set"); + recordstore = Broker::create_clone("rec"); + schedule 5secs { dump_tables() }; + } +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-attr-expire.zeek b/testing/btest/broker/store/brokerstore-attr-expire.zeek new file mode 100644 index 0000000000..f43be15deb --- /dev/null +++ b/testing/btest/broker/store/brokerstore-attr-expire.zeek @@ -0,0 +1,189 @@ +# So - this test currently is not really that great. The goal was to test expiration after +# syncing values with Broker. However, it turns out that the delays introduced by Broker seem +# a bit random - and too high to really test this without the test taking forever. +# +# so - instead we just check that expiries do indeed happen - however the ordering is not as +# guaranteed as I would have liked to have it. + + +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-wait 20 +# +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff clone.out + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef table_expire_interval = 0.5sec; + +module TestModule; + +global start_time: time; + +function time_past(): interval + { + return network_time() - start_time; + } + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +function change_t(tbl: any, tpe: TableChange, idx: string, idxb: count) + { + if ( tpe == TABLE_ELEMENT_EXPIRED ) + print fmt("Expiring t: %s", idx); + } + +function change_s(tbl: any, tpe: TableChange, idx: string, idbx: count) + { + if ( tpe == TABLE_ELEMENT_EXPIRED ) + print fmt("Expiring s: %s", idx); + } + +function change_r(tbl: any, tpe: TableChange, idx: string, idxb: testrec) + { + if ( tpe == TABLE_ELEMENT_EXPIRED ) + print fmt("Expiring r: %s", idx); + } + +function print_keys() + { + print "Printing keys"; + when ( local s = Broker::keys(tablestore) ) + { + print "keys", s; + } + timeout 2sec + { + print fmt(""); + } + } + +global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t; +global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec" &write_expire=5sec &on_change=change_r; + +event zeek_init() + { + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + tablestore = Broker::create_master("table"); + setstore = Broker::create_master("set"); + recordstore = Broker::create_master("rec"); + } + +event update_stuff() + { + t["a"] = 3; + t["expire_later_in_t_not_with_a"] = 4; + s["expire_later"] = 2; + r["reca"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + } + +event insert_stuff() + { + print "Inserting stuff"; + start_time = network_time(); + t["a"] = 5; + delete t["a"]; + s["expire_first"] = 0; + s["expire_later"] = 1; + t["a"] = 2; + t["b"] = 3; + t["whatever"] = 5; + r["reca"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["recb"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + schedule 1.5sec { update_stuff() }; + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added ", endpoint; + schedule 3secs { insert_stuff() }; + } + +event terminate_me() + { + print "Terminating"; + terminate(); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print_keys(); + schedule 3secs { terminate_me() }; + } +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef table_expire_interval = 0.5sec; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +function change_t(tbl: any, tpe: TableChange, idx: string, idxb: count) + { + if ( tpe == TABLE_ELEMENT_EXPIRED ) + print fmt("Expiring t: %s", idx); + } + +function change_s(tbl: any, tpe: TableChange, idx: string, idbx: count) + { + if ( tpe == TABLE_ELEMENT_EXPIRED ) + print fmt("Expiring s: %s", idx); + } + +function change_r(tbl: any, tpe: TableChange, idx: string, idxb: testrec) + { + if ( tpe == TABLE_ELEMENT_EXPIRED ) + print fmt("Expiring r: %s", idx); + } + +global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t; +global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec" &write_expire=5sec &on_change=change_r; + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event dump_tables() + { + print t; + print s; + print r; + print "terminating"; + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added"; + tablestore = Broker::create_clone("table"); + setstore = Broker::create_clone("set"); + recordstore = Broker::create_clone("rec"); + schedule 15secs { dump_tables() }; + } +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-attr-persistence-clone.zeek b/testing/btest/broker/store/brokerstore-attr-persistence-clone.zeek new file mode 100644 index 0000000000..bece4f119b --- /dev/null +++ b/testing/btest/broker/store/brokerstore-attr-persistence-clone.zeek @@ -0,0 +1,141 @@ +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: zeek -B broker -b one.zeek > output1 +# @TEST-EXEC: btest-bg-run master "cp ../*.sqlite . && zeek -B broker -b ../two.zeek >../output2" +# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../three.zeek >../output3" +# @TEST-EXEC: btest-bg-wait 15 + +# @TEST-EXEC: btest-diff output1 +# @TEST-EXEC: btest-diff output2 +# @TEST-EXEC: btest-diff output3 +# @TEST-EXEC: diff output1 output2 +# @TEST-EXEC: diff output2 output3 + +# the first test writes out the sqlite files... + +@TEST-START-FILE one.zeek + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + tablestore = Broker::create_master("table", Broker::SQLITE); + setstore = Broker::create_master("set", Broker::SQLITE); + recordstore = Broker::create_master("rec", Broker::SQLITE); + t["a"] = 5; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + add s["I am a set!"]; + add s["I am really a set!"]; + add s["Believe me - I am a set"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +@TEST-END-FILE +@TEST-START-FILE two.zeek + +# read in again - and serve to clones + +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + tablestore = Broker::create_master("table", Broker::SQLITE); + setstore = Broker::create_master("set", Broker::SQLITE); + recordstore = Broker::create_master("rec", Broker::SQLITE); + print t; + print s; + print r; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE three.zeek + +# get copy from master + +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event print_me() + { + print t; + print s; + print r; + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + tablestore = Broker::create_clone("table"); + setstore = Broker::create_clone("set"); + recordstore = Broker::create_clone("rec"); + schedule 2sec { print_me() }; + } + + +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-attr-persistence.zeek b/testing/btest/broker/store/brokerstore-attr-persistence.zeek new file mode 100644 index 0000000000..abd40df407 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-attr-persistence.zeek @@ -0,0 +1,80 @@ +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: zeek -B broker -b one.zeek > output1 +# @TEST-EXEC: zeek -B broker -b two.zeek > output2 +# @TEST-EXEC: btest-diff output1 +# @TEST-EXEC: btest-diff output2 +# @TEST-EXEC: diff output1 output2 + +# the first test writes out the sqlite files... + +@TEST-START-FILE one.zeek + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + tablestore = Broker::create_master("table", Broker::SQLITE); + setstore = Broker::create_master("set", Broker::SQLITE); + recordstore = Broker::create_master("rec", Broker::SQLITE); + t["a"] = 5; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + add s["I am a set!"]; + add s["I am really a set!"]; + add s["Believe me - I am a set"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +@TEST-END-FILE +@TEST-START-FILE two.zeek + +# the second one reads them in again + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + tablestore = Broker::create_master("table", Broker::SQLITE); + setstore = Broker::create_master("set", Broker::SQLITE); + recordstore = Broker::create_master("rec", Broker::SQLITE); + print t; + print s; + print r; + } +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-attr-simple.zeek b/testing/btest/broker/store/brokerstore-attr-simple.zeek new file mode 100644 index 0000000000..95ef10454f --- /dev/null +++ b/testing/btest/broker/store/brokerstore-attr-simple.zeek @@ -0,0 +1,109 @@ +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff clone.out + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + tablestore = Broker::create_master("table"); + setstore = Broker::create_master("set"); + recordstore = Broker::create_master("rec"); + } + +event insert_stuff() + { + print "Inserting stuff"; + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added ", endpoint; + schedule 3secs { insert_stuff() }; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; +global recordstore: opaque of Broker::Store; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &broker_store="table"; +global s: set[string] &broker_store="set"; +global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec"; + +event zeek_init() + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print "Peer added"; + tablestore = Broker::create_clone("table"); + setstore = Broker::create_clone("set"); + recordstore = Broker::create_clone("rec"); + schedule 5secs { dump_tables() }; + } +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-backend-invalid.zeek b/testing/btest/broker/store/brokerstore-backend-invalid.zeek new file mode 100644 index 0000000000..cad1828213 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-invalid.zeek @@ -0,0 +1,16 @@ +# @TEST-EXEC-FAIL: zeek -B broker %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global a: table[string, count] of count &backend=Broker::MEMORY; +global b: table[string] of testrec &backend=Broker::MEMORY; +global c: table[string] of count &read_expire=5sec &backend=Broker::MEMORY; +global d: table[string] of count &broker_store="store" &backend=Broker::MEMORY; +global f: count &backend=Broker::MEMORY; diff --git a/testing/btest/broker/store/brokerstore-backend-simple-incompatible.zeek b/testing/btest/broker/store/brokerstore-backend-simple-incompatible.zeek new file mode 100644 index 0000000000..ce07488da0 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-simple-incompatible.zeek @@ -0,0 +1,67 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff worker-1/.stderr + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: table[string] of string &backend=Broker::MEMORY; + +event zeek_init() + { + t["a"] = 5; + s["a"] = "b"; + print t; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +global t: table[count] of count &backend=Broker::MEMORY; +global s: table[string] of count &backend=Broker::MEMORY; + +event dump_tables() + { + print t; + print s; + terminate(); + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 4secs { dump_tables() }; + } +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek new file mode 100644 index 0000000000..c44f66f597 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek @@ -0,0 +1,158 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone2.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 40 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY; + +global terminate_count = 0; + +event zeek_init() + { + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=1 + { + Reporter::info(fmt("Peer added: %s", cat(endpoint))); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + Reporter::info(fmt("Peer lost: %s", cat(endpoint))); + terminate_count += 1; + if ( terminate_count == 2) + { + terminate(); + print t; + print s; + print r; + } + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY; + +event terminate_me() + { + terminate(); + } + +event dump_tables() + { + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + schedule 10sec { terminate_me() }; + } + +event Cluster::node_up(name: string, id: string) + { + Reporter::info(fmt("Node Up: %s", name)); + schedule 5secs { dump_tables() }; + } + +event Broker::announce_masters(masters: set[string]) + { + Reporter::info(fmt("Received announce_masters: %s", cat(masters))); + } + +@TEST-END-FILE + +@TEST-START-FILE clone2.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY; + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Broker::announce_masters(masters: set[string]) + { + Reporter::info(fmt("Received announce_masters: %s", cat(masters))); + } + +event Cluster::node_up(name: string, id: string) + { + Reporter::info(fmt("Node Up: %s", name)); + schedule 20secs { dump_tables() }; + } +@TEST-END-FILE + diff --git a/testing/btest/broker/store/brokerstore-backend-simple.zeek b/testing/btest/broker/store/brokerstore-backend-simple.zeek new file mode 100644 index 0000000000..6f47e5d3e0 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-simple.zeek @@ -0,0 +1,98 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY; + +event zeek_init() + { + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY; + + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 4secs { dump_tables() }; + } +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite-incompatible.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite-incompatible.zeek new file mode 100644 index 0000000000..c895a9c6e8 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-sqlite-incompatible.zeek @@ -0,0 +1,38 @@ +# @TEST-PORT: BROKER_PORT + +# @TEST-EXEC: zeek -B broker -b one.zeek > output1 +# @TEST-EXEC-FAIL: zeek -B broker -b two.zeek > output2 +# @TEST-EXEC: btest-diff .stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr + +# the first test writes out the sqlite files... + +@TEST-START-FILE one.zeek + +module TestModule; + +global t: table[string] of string &backend=Broker::SQLITE; + +event zeek_init() + { + t["a"] = "a"; + t["b"] = "b"; + t["c"] = "c"; + print t; + } + +@TEST-END-FILE +@TEST-START-FILE two.zeek + +# the second one reads them in again. Or not because the types are incompatible. + +module TestModule; + +global t: table[count] of count &backend=Broker::SQLITE; + + +event zeek_init() + { + print t; + } +@TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek new file mode 100644 index 0000000000..eb34a51379 --- /dev/null +++ b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek @@ -0,0 +1,129 @@ +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 + +# @TEST-EXEC: zeek preseed-sqlite.zeek; +# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out" +# @TEST-EXEC: btest-bg-wait 40 +# +# @TEST-EXEC: btest-diff master.out +# @TEST-EXEC: btest-diff clone.out +# @TEST-EXEC: diff master.out clone.out +# @TEST-EXEC: diff master.out clone2.out + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"], +}; +@TEST-END-FILE + +@TEST-START-FILE preseed-sqlite.zeek + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::SQLITE; +global s: set[string] &backend=Broker::SQLITE; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::SQLITE; + +event zeek_init() + { + t["a"] = 5; + delete t["a"]; + add s["hi"]; + t["a"] = 2; + t["a"] = 3; + t["b"] = 3; + t["c"] = 4; + t["whatever"] = 5; + delete t["c"]; + r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2")); + r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2")); + r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2")); + print t; + print s; + print r; + } + +@TEST-END-FILE + +@TEST-START-FILE master.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +function change_function(t: table[string] of count, tpe: TableChange, idxa: string, val: count) + { + print "This should not print"; + print "change_function", idxa, val, tpe; + } + +global t: table[string] of count &backend=Broker::SQLITE &on_change=change_function; +global s: set[string] &backend=Broker::SQLITE; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::SQLITE; + +redef Broker::table_store_db_directory = ".."; + +event zeek_init() + { + print t; + print s; + print r; + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE clone.zeek +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; + +module TestModule; + +type testrec: record { + a: count; + b: string; + c: set[string]; +}; + +global t: table[string] of count &backend=Broker::MEMORY; +global s: set[string] &backend=Broker::MEMORY; +global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY; + + +event dump_tables() + { + print t; + print s; + print r; + terminate(); + } + +event Cluster::node_up(name: string, id: string) + { + #print "node up", name; + schedule 15secs { dump_tables() }; + } +@TEST-END-FILE