diff --git a/src/Attr.cc b/src/Attr.cc index 72b984e924..6a56fcc275 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -14,7 +14,8 @@ const char* attr_name(attr_tag t) "&read_expire", "&write_expire", "&create_expire", "&raw_output", "&priority", "&group", "&log", "&error_handler", "&type_column", - "(&tracked)", "&deprecated", + "(&tracked)", "&on_change", "&broker_store", + "&deprecated", }; return attr_names[int(t)]; @@ -486,7 +487,7 @@ void Attributes::CheckAttr(Attr* a) { if ( type->Tag() != TYPE_TABLE ) { - Error("&on_change only applicable to tables"); + Error("&on_change only applicable to sets/tables"); break; } @@ -548,6 +549,27 @@ void Attributes::CheckAttr(Attr* a) } break; + case ATTR_BROKER_STORE: + { + if ( type->Tag() != TYPE_TABLE ) + { + Error("&broker_store only applicable to sets/tables"); + break; + } + + const Expr *broker_store = a->AttrExpr(); + if ( broker_store->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" ) + Error("&broker_store must take an opaque of Broker::Store"); + + + // Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector + if ( type->AsTableType()->IndexTypes()->length() != 1 ) + { + Error("&broker_store only supports one-element set/table indexes"); + } + + } + case ATTR_TRACKED: // FIXME: Check here for global ID? break; diff --git a/src/Attr.h b/src/Attr.h index e9d75e7ae0..afb112f652 100644 --- a/src/Attr.h +++ b/src/Attr.h @@ -28,6 +28,7 @@ typedef enum { ATTR_TYPE_COLUMN, // for input framework ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry ATTR_ON_CHANGE, // for table change tracking + ATTR_BROKER_STORE, // for broker-store backed tables ATTR_DEPRECATED, #define NUM_ATTRS (int(ATTR_DEPRECATED) + 1) } attr_tag; diff --git a/src/Val.cc b/src/Val.cc index c5948d981c..921d952217 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -26,6 +26,7 @@ #include "IPAddr.h" #include "broker/Data.h" +#include "broker/Store.h" #include "threading/formatters/JSON.h" @@ -1415,6 +1416,12 @@ void TableVal::SetAttrs(Attributes* a) change_func = cf->AttrExpr(); change_func->Ref(); } + auto bs = attrs->FindAttr(ATTR_BROKER_STORE); + if ( bs ) + { + broker_store = bs->AttrExpr(); + broker_store->Ref(); + } } void TableVal::CheckExpireAttr(attr_tag at) @@ -1492,13 +1499,18 @@ int TableVal::Assign(Val* index, HashKey* k, Val* new_val) Modified(); - if ( change_func ) + if ( change_func || broker_store ) { auto change_index = index->Ref(); if ( ! change_index ) RecoverIndex(&k_copy); - Val *v = old_entry_val ? old_entry_val->Value() : new_val; - CallChangeFunc(change_index, v, old_entry_val ? element_changed : element_new); + if ( broker_store ) + SendToStore(change_index, new_val, old_entry_val ? element_changed : element_new); + if ( change_func ) + { + Val *v = old_entry_val ? old_entry_val->Value() : new_val; + CallChangeFunc(change_index, v, old_entry_val ? element_changed : element_new); + } Unref(change_index); } @@ -1971,9 +1983,7 @@ void TableVal::CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe Val* thefunc = change_func->Eval(0); if ( ! thefunc ) - { return; - } if ( thefunc->Type()->Tag() != TYPE_FUNC ) { @@ -2013,6 +2023,82 @@ void TableVal::CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe } } +void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe) + { + if ( ! broker_store || ! index ) + return; + + try + { + Val* thestore = broker_store->Eval(0); + + if ( ! thestore ) + return; + + if ( thestore->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" ) + { + thestore->Error("not a Broker::Store"); + Unref(thestore); + return; + } + + auto handle = static_cast(thestore); + if ( index->AsListVal()->Length() != 1 ) + { + builtin_error("table with complex index not supported for &broker_store"); + return; + } + + const auto index_val = index->AsListVal()->Index(0); + auto key_val = new StringVal("test"); + auto broker_key = bro_broker::val_to_data(key_val); + auto broker_index = bro_broker::val_to_data(index_val); + Unref(key_val); + + if ( ! broker_key ) + { + builtin_error("invalid Broker data conversion for &broker_store_key"); + return; + } + if ( ! broker_index ) + { + builtin_error("invalid Broker data conversation for table index"); + return; + } + + switch ( tpe ) + { + case element_new: + case element_changed: + if ( table_type->IsSet() ) + handle->store.insert_into(std::move(*broker_key), std::move(*broker_index)); + else + { + if ( ! new_value ) + { + builtin_error("did not receive new value for broker-store send operation"); + return; + } + auto broker_val = bro_broker::val_to_data(new_value); + if ( ! broker_val ) + { + builtin_error("invalid Broker data conversation for table value"); + return; + } + handle->store.insert_into(std::move(*broker_key), std::move(*broker_index), std::move(*broker_val)); + } + break; + case element_removed: + handle->store.remove_from(std::move(*broker_key), std::move(*broker_index)); + break; + } + } + catch ( InterpreterException& e ) + { + builtin_error("The previous error was encountered while trying to resolve the &broker_store attribute of the set/table. Potentially the Broker::Store has not been initialized before being used."); + } + } + Val* TableVal::Delete(const Val* index) { HashKey* k = ComputeHash(index); @@ -2027,6 +2113,8 @@ Val* TableVal::Delete(const Val* index) Modified(); + if ( broker_store ) + SendToStore(index, nullptr, element_removed); if ( change_func ) CallChangeFunc(index, va, element_removed); @@ -2050,10 +2138,13 @@ Val* TableVal::Delete(const HashKey* k) Modified(); - if ( change_func ) + if ( change_func || broker_store ) { auto index = table_hash->RecoverVals(k); - CallChangeFunc(index, va->Ref(), element_removed); + if ( broker_store ) + SendToStore(index, nullptr, element_removed); + if ( change_func ) + CallChangeFunc(index, va->Ref(), element_removed); Unref(index); } diff --git a/src/Val.h b/src/Val.h index 9124df2f74..bd1a051d6a 100644 --- a/src/Val.h +++ b/src/Val.h @@ -928,6 +928,9 @@ protected: // Calls &change_func. Does not take ownership of values. (Refs if needed). void CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe); + // Sends data on to backing Broker Store + void SendToStore(const Val* index, const Val* new_value, OnChangeType tpe); + Val* DoClone(CloneState* state) override; TableType* table_type; @@ -940,6 +943,7 @@ protected: PrefixTable* subnets; Val* def_val; Expr* change_func = nullptr; + Expr* broker_store = nullptr; }; class RecordVal : public Val, public notifier::Modifiable { diff --git a/src/parse.y b/src/parse.y index ff65c4b184..58adc2a21d 100644 --- a/src/parse.y +++ b/src/parse.y @@ -5,7 +5,7 @@ // Switching parser table type fixes ambiguity problems. %define lr.type ielr -%expect 111 +%expect 117 %token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY %token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF @@ -24,7 +24,7 @@ %token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF %token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC %token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE -%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE +%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE %token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER %token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED @@ -1329,6 +1329,8 @@ attr: { $$ = new Attr(ATTR_DEL_FUNC, $3); } | TOK_ATTR_ON_CHANGE '=' expr { $$ = new Attr(ATTR_ON_CHANGE, $3); } + | TOK_ATTR_BROKER_STORE '=' expr + { $$ = new Attr(ATTR_BROKER_STORE, $3); } | TOK_ATTR_EXPIRE_FUNC '=' expr { $$ = new Attr(ATTR_EXPIRE_FUNC, $3); } | TOK_ATTR_EXPIRE_CREATE '=' expr diff --git a/src/scan.l b/src/scan.l index d8792a1470..704c0452c6 100644 --- a/src/scan.l +++ b/src/scan.l @@ -299,6 +299,7 @@ when return TOK_WHEN; &redef return TOK_ATTR_REDEF; &write_expire return TOK_ATTR_EXPIRE_WRITE; &on_change return TOK_ATTR_ON_CHANGE; +&broker_store return TOK_ATTR_BROKER_STORE; @deprecated.* { auto num_files = file_stack.length(); diff --git a/testing/btest/language/brokerstore.test b/testing/btest/language/brokerstore.test new file mode 100644 index 0000000000..021a0d7463 --- /dev/null +++ b/testing/btest/language/brokerstore.test @@ -0,0 +1,24 @@ +# @TEST-EXEC: zeek %INPUT >output +# @TEST-EXEC: btest-diff output + +module TestModule; + +global tablestore: opaque of Broker::Store; +global setstore: opaque of Broker::Store; + +global t: table[string] of count &broker_store=tablestore; +global s: set[string] &broker_store=setstore; + +event zeek_init() + { + tablestore = Broker::create_master("table"); + setstore = Broker::create_master("set"); + print "inserting"; + t["a"] = 5; + add s["hi"]; + print "changing"; + t["a"] = 2; + print "deleting"; + delete t["a"]; + delete s["hi"]; + }