BrokerStore<->Zeek table - introdude &backend attribute

The &backend attribute allows for a much more convenient way of
interacting with brokerstores. One does not need to create a broker
store anymore - instead all of this is done internally.

The current state of this partially works. This should work fine for
persistence - but clones are currently not yet correctly attached.
This commit is contained in:
Johanna Amann 2020-06-30 16:33:52 -07:00
parent 43d2289754
commit 318a72c303
10 changed files with 187 additions and 3 deletions

View file

@ -25,6 +25,10 @@ export {
## A negative/zero value indicates to never buffer commands. ## A negative/zero value indicates to never buffer commands.
const default_clone_mutation_buffer_interval = 2min &redef; const default_clone_mutation_buffer_interval = 2min &redef;
const auto_store_master = T &redef;
const auto_store_db_directory = "." &redef;
## Whether a data store query could be completed or not. ## Whether a data store query could be completed or not.
type QueryStatus: enum { type QueryStatus: enum {
SUCCESS, SUCCESS,

View file

@ -249,6 +249,10 @@ export {
global nodeid_topic: function(id: string): string; global nodeid_topic: function(id: string): string;
} }
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) )
redef Broker::store_master = T;
@endif
global active_worker_ids: set[string] = set(); global active_worker_ids: set[string] = set();
type NamedNode: record { type NamedNode: record {

View file

@ -19,7 +19,7 @@ const char* attr_name(attr_tag t)
"&read_expire", "&write_expire", "&create_expire", "&read_expire", "&write_expire", "&create_expire",
"&raw_output", "&priority", "&raw_output", "&priority",
"&group", "&log", "&error_handler", "&type_column", "&group", "&log", "&error_handler", "&type_column",
"(&tracked)", "&on_change", "&broker_store", "(&tracked)", "&on_change", "&broker_store", "&backend",
"&deprecated", "&deprecated",
}; };
@ -616,6 +616,43 @@ void Attributes::CheckAttr(Attr* a)
} }
break; break;
case ATTR_BACKEND:
{
if ( ! global_var || type->Tag() != TYPE_TABLE )
{
Error("&broker_store only applicable to global sets/tables");
break;
}
// cannot do better equality check - the broker types are not actually existing yet when we
// are here. We will do that later - before actually attaching to a broker store
if ( a->GetExpr()->GetType()->Tag() != TYPE_ENUM )
{
Error("&broker_store must take an enum argument");
break;
}
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
if ( type->AsTableType()->IndexTypes().size() != 1 )
{
Error("&backend only supports one-element set/table indexes");
}
if ( Find(ATTR_EXPIRE_FUNC ) )
{
Error("&backend and &expire_func cannot be used simultaneously");
}
if ( Find(ATTR_EXPIRE_READ) )
{
Error("&backend and &read_expire cannot be used simultaneously");
}
if ( Find(ATTR_BROKER_STORE) )
{
Error("&backend and &broker_store cannot be used simultaneously");
}
break;
}
case ATTR_BROKER_STORE: case ATTR_BROKER_STORE:
{ {
if ( type->Tag() != TYPE_TABLE ) if ( type->Tag() != TYPE_TABLE )
@ -635,6 +672,7 @@ void Attributes::CheckAttr(Attr* a)
{ {
Error("&broker_store only supports one-element set/table indexes"); Error("&broker_store only supports one-element set/table indexes");
} }
if ( Find(ATTR_EXPIRE_FUNC ) ) if ( Find(ATTR_EXPIRE_FUNC ) )
{ {
Error("&broker_store and &expire_func cannot be used simultaneously"); Error("&broker_store and &expire_func cannot be used simultaneously");

View file

@ -33,6 +33,7 @@ enum [[deprecated("Remove in v4.1. Use zeek::detail::attr_tag instead.")]] attr_
ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry
ATTR_ON_CHANGE, // for table change tracking ATTR_ON_CHANGE, // for table change tracking
ATTR_BROKER_STORE, // for broker-store backed tables ATTR_BROKER_STORE, // for broker-store backed tables
ATTR_BACKEND, // for broker-store backed tabled
ATTR_DEPRECATED, ATTR_DEPRECATED,
NUM_ATTRS // this item should always be last NUM_ATTRS // this item should always be last
}; };
@ -58,6 +59,7 @@ enum attr_tag {
ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry
ATTR_ON_CHANGE, // for table change tracking ATTR_ON_CHANGE, // for table change tracking
ATTR_BROKER_STORE, // for broker-store backed tables ATTR_BROKER_STORE, // for broker-store backed tables
ATTR_BACKEND, // for broker-store backed tabled
ATTR_DEPRECATED, ATTR_DEPRECATED,
NUM_ATTRS // this item should always be last NUM_ATTRS // this item should always be last
}; };

View file

@ -1030,6 +1030,8 @@ public:
// on zeek::RecordTypes. // on zeek::RecordTypes.
static void DoneParsing(); static void DoneParsing();
void SetBrokerStore(const std::string& store) { broker_store = store; }
protected: protected:
void Init(IntrusivePtr<zeek::TableType> t); void Init(IntrusivePtr<zeek::TableType> t);

View file

@ -151,6 +151,8 @@ void Manager::InitPostScript()
log_topic_func = get_option("Broker::log_topic")->AsFunc(); log_topic_func = get_option("Broker::log_topic")->AsFunc();
log_id_type = zeek::id::find_type("Log::ID")->AsEnumType(); log_id_type = zeek::id::find_type("Log::ID")->AsEnumType();
writer_id_type = zeek::id::find_type("Log::Writer")->AsEnumType(); writer_id_type = zeek::id::find_type("Log::Writer")->AsEnumType();
zeek_table_manager = get_option("Broker::auto_store_master")->AsBool();
zeek_table_db_directory = get_option("Broker::auto_store_db_directory")->AsString()->CheckString();
opaque_of_data_type = make_intrusive<zeek::OpaqueType>("Broker::Data"); opaque_of_data_type = make_intrusive<zeek::OpaqueType>("Broker::Data");
opaque_of_set_iterator = make_intrusive<zeek::OpaqueType>("Broker::SetIterator"); opaque_of_set_iterator = make_intrusive<zeek::OpaqueType>("Broker::SetIterator");
@ -214,6 +216,35 @@ void Manager::InitPostScript()
reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); reporter->FatalError("Failed to register broker status subscriber with iosource_mgr");
bstate->subscriber.add_topic(broker::topics::store_events, true); bstate->subscriber.add_topic(broker::topics::store_events, true);
InitializeBrokerStoreForwarding();
}
void Manager::InitializeBrokerStoreForwarding()
{
const auto& globals = global_scope()->Vars();
for ( const auto& global : globals )
{
auto& id = global.second;
if ( id->HasVal() && id->GetAttr(zeek::detail::ATTR_BACKEND) )
{
const auto& attr = id->GetAttr(zeek::detail::ATTR_BACKEND);
auto e = static_cast<BifEnum::Broker::BackendType>(attr->GetExpr()->Eval(nullptr)->AsEnum());
auto storename = std::string("___sync_store_") + global.first;
id->GetVal()->AsTableVal()->SetBrokerStore(storename);
AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()});
auto backend = bro_broker::to_backend_type(e);
if ( zeek_table_manager )
MakeMaster(storename, backend, broker::backend_options{});
else
{
MakeClone(storename);
}
}
}
} }
void Manager::Terminate() void Manager::Terminate()

View file

@ -355,6 +355,7 @@ private:
void ProcessError(broker::error err); void ProcessError(broker::error err);
void ProcessStoreResponse(StoreHandleVal*, broker::store::response response); void ProcessStoreResponse(StoreHandleVal*, broker::store::response response);
void FlushPendingQueries(); void FlushPendingQueries();
void InitializeBrokerStoreForwarding();
// Check if a broker store is associated to a table on the Zeek side. // Check if a broker store is associated to a table on the Zeek side.
void CheckForwarding(const std::string& name); void CheckForwarding(const std::string& name);
// Send the content of a broker store to the backing table. This is typically used // Send the content of a broker store to the backing table. This is typically used
@ -411,6 +412,8 @@ private:
IntrusivePtr<zeek::VectorType> vector_of_data_type; IntrusivePtr<zeek::VectorType> vector_of_data_type;
zeek::EnumType* log_id_type; zeek::EnumType* log_id_type;
zeek::EnumType* writer_id_type; zeek::EnumType* writer_id_type;
bool zeek_table_manager = false;
std::string zeek_table_db_directory;
static int script_scope; static int script_scope;
}; };

View file

@ -5,7 +5,7 @@
// Switching parser table type fixes ambiguity problems. // Switching parser table type fixes ambiguity problems.
%define lr.type ielr %define lr.type ielr
%expect 117 %expect 123
%token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY %token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY
%token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF %token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF
@ -24,7 +24,7 @@
%token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF %token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF
%token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC %token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC
%token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE %token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE
%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE %token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE TOK_ATTR_BACKEND
%token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER %token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER
%token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED %token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED
@ -1367,6 +1367,8 @@ attr:
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_ON_CHANGE, {AdoptRef{}, $3}); } { $$ = new zeek::detail::Attr(zeek::detail::ATTR_ON_CHANGE, {AdoptRef{}, $3}); }
| TOK_ATTR_BROKER_STORE '=' expr | TOK_ATTR_BROKER_STORE '=' expr
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE, {AdoptRef{}, $3}); } { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE, {AdoptRef{}, $3}); }
| TOK_ATTR_BACKEND '=' expr
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_BACKEND, {AdoptRef{}, $3}); }
| TOK_ATTR_EXPIRE_FUNC '=' expr | TOK_ATTR_EXPIRE_FUNC '=' expr
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_EXPIRE_FUNC, {AdoptRef{}, $3}); } { $$ = new zeek::detail::Attr(zeek::detail::ATTR_EXPIRE_FUNC, {AdoptRef{}, $3}); }
| TOK_ATTR_EXPIRE_CREATE '=' expr | TOK_ATTR_EXPIRE_CREATE '=' expr

View file

@ -313,6 +313,7 @@ when return TOK_WHEN;
&write_expire return TOK_ATTR_EXPIRE_WRITE; &write_expire return TOK_ATTR_EXPIRE_WRITE;
&on_change return TOK_ATTR_ON_CHANGE; &on_change return TOK_ATTR_ON_CHANGE;
&broker_store return TOK_ATTR_BROKER_STORE; &broker_store return TOK_ATTR_BROKER_STORE;
&backend return TOK_ATTR_BACKEND;
@deprecated.* { @deprecated.* {
auto num_files = file_stack.length(); auto num_files = file_stack.length();

View file

@ -0,0 +1,97 @@
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out"
# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out"
# @TEST-EXEC: btest-bg-wait 15
#
# @TEST-EXEC: btest-diff clone.out
@TEST-START-FILE master.zeek
redef exit_only_after_terminate = T;
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY;
event zeek_init()
{
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event insert_stuff()
{
print "Inserting stuff";
t["a"] = 5;
delete t["a"];
add s["hi"];
t["a"] = 2;
t["a"] = 3;
t["b"] = 3;
t["c"] = 4;
t["whatever"] = 5;
delete t["c"];
r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2"));
r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2"));
r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2"));
print t;
print s;
print r;
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "Peer added ", endpoint;
schedule 3secs { insert_stuff() };
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
}
@TEST-END-FILE
@TEST-START-FILE clone.zeek
redef exit_only_after_terminate = T;
redef Broker::auto_store_master = F;
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY;
event zeek_init()
{
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event dump_tables()
{
print t;
print s;
print r;
terminate();
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "Peer added";
schedule 5secs { dump_tables() };
}
@TEST-END-FILE