BrokerStore<->Zeek tables: &backend works for in-memory stores.

Currently this requires using this with a normal cluster - or sending
messages by yourself.

It, in principle, should also work with SQLITE - but that is a bit
nonsensical without being able to change the storage location.
This commit is contained in:
Johanna Amann 2020-07-01 16:38:10 -07:00
parent 318a72c303
commit a220b02722
9 changed files with 118 additions and 33 deletions

View file

@ -49,5 +49,7 @@ redef Broker::log_topic = Cluster::rr_log_topic;
@load ./nodes/worker
@endif
@load ./broker-stores.zeek
@endif
@endif

View file

@ -0,0 +1,46 @@
module Broker;
export {
global announce_masters: event(masters: set[string]);
}
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
redef Broker::auto_store_master = F;
@endif
@if ( Broker::auto_store_master )
global broker_backed_ids: set[string];
event zeek_init()
{
local globals = global_ids();
for ( id in globals )
{
if ( globals[id]$broker_backend )
add broker_backed_ids[id];
}
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=1
{
if ( ! Cluster::is_enabled() )
return;
local e = Broker::make_event(Broker::announce_masters, broker_backed_ids);
Broker::publish(Cluster::nodeid_topic(endpoint$id), e);
}
@else
event Broker::announce_masters(masters: set[string])
{
for ( i in masters )
{
local name = "___sync_store_" + i;
Broker::create_clone(name);
}
}
@endif

View file

@ -249,10 +249,6 @@ export {
global nodeid_topic: function(id: string): string;
}
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) )
redef Broker::store_master = T;
@endif
global active_worker_ids: set[string] = set();
type NamedNode: record {

View file

@ -734,6 +734,7 @@ type script_id: record {
enum_constant: bool; ##< True if the identifier is an enum value.
option_value: bool; ##< True if the identifier is an option.
redefinable: bool; ##< True if the identifier is declared with the :zeek:attr:`&redef` attribute.
broker_backend: bool; ##< True if the identifier has a broker backend defined using the :zeek:attr:`&backend` attribute.
value: any &optional; ##< The current value of the identifier.
};

View file

@ -235,13 +235,15 @@ void Manager::InitializeBrokerStoreForwarding()
id->GetVal()->AsTableVal()->SetBrokerStore(storename);
AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()});
auto backend = bro_broker::to_backend_type(e);
// we only create masters here. For clones, we do all the work of setting up
// the forwarding - but we do not try to initialize the clone. We can only initialize
// the clone, once a node has a connection to a master. This is currently done in scriptland
// - check FIXME.
if ( zeek_table_manager )
MakeMaster(storename, backend, broker::backend_options{});
else
{
MakeClone(storename);
auto backend = bro_broker::to_backend_type(e);
MakeMaster(storename, backend, broker::backend_options{});
}
}
}

View file

@ -1955,9 +1955,10 @@ function global_ids%(%): id_table
rec->Assign(3, val_mgr->Bool(id->IsEnumConst()));
rec->Assign(4, val_mgr->Bool(id->IsOption()));
rec->Assign(5, val_mgr->Bool(id->IsRedefinable()));
rec->Assign(6, val_mgr->Bool(id->GetAttr(zeek::detail::ATTR_BACKEND) != zeek::detail::Attr::nil));
if ( id->HasVal() )
rec->Assign(6, id->GetVal());
rec->Assign(7, id->GetVal());
auto id_name = make_intrusive<StringVal>(id->Name());
ids->Assign(std::move(id_name), std::move(rec));

View file

@ -0,0 +1,18 @@
{
[b] = 3,
[whatever] = 5,
[a] = 3
}
{
hi
}
{
[b] = [a=2, b=d, c={
elem1,
elem2
}],
[a] = [a=1, b=c, c={
elem1,
elem2
}]
}

View file

@ -0,0 +1,18 @@
{
[b] = 3,
[whatever] = 5,
[a] = 3
}
{
hi
}
{
[b] = [a=2, b=d, c={
elem1,
elem2
}],
[a] = [a=1, b=c, c={
elem1,
elem2
}]
}

View file

@ -1,13 +1,30 @@
# @TEST-PORT: BROKER_PORT
# @TEST-PORT: BROKER_PORT1
# @TEST-PORT: BROKER_PORT2
# @TEST-PORT: BROKER_PORT3
# @TEST-EXEC: btest-bg-run master "zeek -B broker -b ../master.zeek >../master.out"
# @TEST-EXEC: btest-bg-run clone "zeek -B broker -b ../clone.zeek >../clone.out"
# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out"
# @TEST-EXEC: btest-bg-wait 15
#
# @TEST-EXEC: btest-diff master.out
# @TEST-EXEC: btest-diff clone.out
# @TEST-EXEC: diff master.out clone.out
# @TEST-EXEC: diff master.out clone2.out
@TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"],
};
@TEST-END-FILE
@TEST-START-FILE master.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
@ -23,12 +40,6 @@ global r: table[string] of testrec &backend=Broker::MEMORY;
event zeek_init()
{
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event insert_stuff()
{
print "Inserting stuff";
t["a"] = 5;
delete t["a"];
add s["hi"];
@ -46,12 +57,6 @@ event insert_stuff()
print r;
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "Peer added ", endpoint;
schedule 3secs { insert_stuff() };
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
@ -61,7 +66,8 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
@TEST-START-FILE clone.zeek
redef exit_only_after_terminate = T;
redef Broker::auto_store_master = F;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
@ -76,11 +82,6 @@ global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY;
event zeek_init()
{
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event dump_tables()
{
print t;
@ -89,9 +90,9 @@ event dump_tables()
terminate();
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
event Cluster::node_up(name: string, id: string)
{
print "Peer added";
schedule 5secs { dump_tables() };
#print "node up", name;
schedule 4secs { dump_tables() };
}
@TEST-END-FILE