Zeek Table<->Brokerstore: cleanup, documentation, small fixes

This commit adds script/c++ documentation and fixes a few loose ends.
It also adds tests for corner cases and massively improves error
messages.

This also actually introduces type-compatibility checking and introduces
a new attribute that lets a user override this if they really know what
they are doing. I am not quite sure if we should really let that stay in
- but it can be very convenient to have this functionality.

One test is continuing to fail - the expiry test is very flaky. This is,
I think, caused by delays of the broker store forwarding. I am unsure if
we can actually do anything about that.
This commit is contained in:
Johanna Amann 2020-07-10 16:58:34 -07:00
parent 67917b83aa
commit 2b2a40f49c
26 changed files with 271 additions and 53 deletions

14
NEWS
View file

@ -55,6 +55,20 @@ New Functionality
for which no connection state information is available in the core anymore. These for which no connection state information is available in the core anymore. These
cases will raise the new ``expired_conn_weird`` event. cases will raise the new ``expired_conn_weird`` event.
- Broker Store table synchronization (Experimental).
Zeek now supports synchronizing tables/sets across clusters using a backing broker
store. The same feature also allows persistent storage of data in tables/sets
over Zeek restarts. This feature is implemented using the new ```&backend``` attribute.
To synchronize a table over a cluster, you can e.g. use:
```global t: table[string] of count &backend=Broker::MEMORY;```
This feature is documented in detail here: FIXME.
Note: this feature is experimental and the syntax/featureset can change in the future.
Changed Functionality Changed Functionality
--------------------- ---------------------

View file

@ -25,8 +25,14 @@ export {
## A negative/zero value indicates to never buffer commands. ## A negative/zero value indicates to never buffer commands.
const default_clone_mutation_buffer_interval = 2min &redef; const default_clone_mutation_buffer_interval = 2min &redef;
## If set to true, the current node is the master node for broker stores
## backing zeek tables. By default this value will be automatically set to
## true in standalone mode, or on the manager node of a cluster. This value
## should not typically be changed manually.
const auto_store_master = T &redef; const auto_store_master = T &redef;
## The directory used for storing persistent database files when using brokerstore
## backed zeek tables.
const auto_store_db_directory = "." &redef; const auto_store_db_directory = "." &redef;
## Whether a data store query could be completed or not. ## Whether a data store query could be completed or not.

View file

@ -1,11 +1,23 @@
##! This script deals with the cluster parts of broker backed zeek tables.
##! It makes sure that the master store is set correctly and that clones
##! are automatically created on the non-manager nodes.
# Note - this script should become unnecessary in the future, when we just can
# speculatively attach clones. This should be possible once the new ALM broker
# transport becomes available.
@load ./main @load ./main
module Broker; module Broker;
export { export {
## Event that is used by the manager to announce the master stores for zeek backed
## tables that is uses.
global announce_masters: event(masters: set[string]); global announce_masters: event(masters: set[string]);
} }
# If we are not the manager - disable automatically generating masters. We will attach
# clones instead.
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER ) @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
redef Broker::auto_store_master = F; redef Broker::auto_store_master = F;
@endif @endif
@ -24,6 +36,7 @@ event zeek_init()
} }
} }
# Send the auto masters we created to the newly connected node
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=1 event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=1
{ {
if ( ! Cluster::is_enabled() ) if ( ! Cluster::is_enabled() )
@ -39,6 +52,7 @@ event Broker::announce_masters(masters: set[string])
{ {
for ( i in masters ) for ( i in masters )
{ {
# this magic name for the store is created in broker/Manager.cc for the manager.
local name = "___sync_store_" + i; local name = "___sync_store_" + i;
Broker::create_clone(name); Broker::create_clone(name);
} }

View file

@ -8,6 +8,7 @@
#include "Val.h" #include "Val.h"
#include "IntrusivePtr.h" #include "IntrusivePtr.h"
#include "threading/SerialTypes.h" #include "threading/SerialTypes.h"
#include "input/Manager.h"
namespace zeek::detail { namespace zeek::detail {
@ -19,8 +20,8 @@ const char* attr_name(AttrTag t)
"&read_expire", "&write_expire", "&create_expire", "&read_expire", "&write_expire", "&create_expire",
"&raw_output", "&priority", "&raw_output", "&priority",
"&group", "&log", "&error_handler", "&type_column", "&group", "&log", "&error_handler", "&type_column",
"(&tracked)", "&on_change", "&broker_store", "&backend", "(&tracked)", "&on_change", "&broker_store",
"&deprecated", "&broker_allow_complex_type", "&backend", "&deprecated",
}; };
return attr_names[int(t)]; return attr_names[int(t)];
@ -450,6 +451,10 @@ void Attributes::CheckAttr(Attr* a)
{ {
Error("&broker_store and &read_expire cannot be used simultaneously"); Error("&broker_store and &read_expire cannot be used simultaneously");
} }
if ( Find(ATTR_BACKEND) )
{
Error("&broker_store and &backend cannot be used simultaneously");
}
} }
// fallthrough // fallthrough
case ATTR_EXPIRE_WRITE: case ATTR_EXPIRE_WRITE:
@ -535,6 +540,10 @@ void Attributes::CheckAttr(Attr* a)
{ {
Error("&broker_store and &expire_func cannot be used simultaneously"); Error("&broker_store and &expire_func cannot be used simultaneously");
} }
if ( Find(ATTR_BACKEND ) )
{
Error("&backend and &expire_func cannot be used simultaneously");
}
} }
break; break;
@ -607,7 +616,7 @@ void Attributes::CheckAttr(Attr* a)
{ {
if ( ! global_var || type->Tag() != TYPE_TABLE ) if ( ! global_var || type->Tag() != TYPE_TABLE )
{ {
Error("&broker_store only applicable to global sets/tables"); Error("&backend only applicable to global sets/tables");
break; break;
} }
@ -615,7 +624,7 @@ void Attributes::CheckAttr(Attr* a)
// are here. We will do that later - before actually attaching to a broker store // are here. We will do that later - before actually attaching to a broker store
if ( a->GetExpr()->GetType()->Tag() != TYPE_ENUM ) if ( a->GetExpr()->GetType()->Tag() != TYPE_ENUM )
{ {
Error("&broker_store must take an enum argument"); Error("&backend must take an enum argument");
break; break;
} }
@ -625,6 +634,14 @@ void Attributes::CheckAttr(Attr* a)
Error("&backend only supports one-element set/table indexes"); Error("&backend only supports one-element set/table indexes");
} }
// Only support atomic types for the moment.
if ( ! type->AsTableType()->IsSet() &&
! input::Manager::IsCompatibleType(type->AsTableType()->Yield().get(), true) &&
! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) )
{
Error("&backend only supports atomic types as table value");
}
if ( Find(ATTR_EXPIRE_FUNC ) ) if ( Find(ATTR_EXPIRE_FUNC ) )
{ {
Error("&backend and &expire_func cannot be used simultaneously"); Error("&backend and &expire_func cannot be used simultaneously");
@ -655,11 +672,18 @@ void Attributes::CheckAttr(Attr* a)
} }
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector // Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
if ( type->AsTableType()->GetIndexTypes().size() != 1 ) if ( type->AsTableType()->GetIndexTypes().size() != 1 && ! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) )
{ {
Error("&broker_store only supports one-element set/table indexes"); Error("&broker_store only supports one-element set/table indexes");
} }
if ( ! type->AsTableType()->IsSet() &&
! input::Manager::IsCompatibleType(type->AsTableType()->Yield().get(), true) &&
! Find(ATTR_BROKER_STORE_ALLOW_COMPLEX) )
{
Error("&broker_store only supports atomic types as table value");
}
if ( Find(ATTR_EXPIRE_FUNC ) ) if ( Find(ATTR_EXPIRE_FUNC ) )
{ {
Error("&broker_store and &expire_func cannot be used simultaneously"); Error("&broker_store and &expire_func cannot be used simultaneously");
@ -668,8 +692,21 @@ void Attributes::CheckAttr(Attr* a)
{ {
Error("&broker_store and &read_expire cannot be used simultaneously"); Error("&broker_store and &read_expire cannot be used simultaneously");
} }
if ( Find(ATTR_BACKEND) )
{
Error("&backend and &broker_store cannot be used simultaneously");
}
break; break;
} }
case ATTR_BROKER_STORE_ALLOW_COMPLEX:
{
if ( type->Tag() != TYPE_TABLE )
{
Error("&broker_allow_complex_type only applicable to sets/tables");
break;
}
}
case ATTR_TRACKED: case ATTR_TRACKED:
// FIXME: Check here for global ID? // FIXME: Check here for global ID?
break; break;

View file

@ -42,6 +42,7 @@ enum AttrTag {
ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry
ATTR_ON_CHANGE, // for table change tracking ATTR_ON_CHANGE, // for table change tracking
ATTR_BROKER_STORE, // for broker-store backed tables ATTR_BROKER_STORE, // for broker-store backed tables
ATTR_BROKER_STORE_ALLOW_COMPLEX, // for broker-store backed tables
ATTR_BACKEND, // for broker-store backed tabled ATTR_BACKEND, // for broker-store backed tabled
ATTR_DEPRECATED, ATTR_DEPRECATED,
NUM_ATTRS // this item should always be last NUM_ATTRS // this item should always be last

View file

@ -1515,7 +1515,7 @@ void TableVal::SetAttrs(detail::AttributesPtr a)
change_func = cf->GetExpr(); change_func = cf->GetExpr();
auto bs = attrs->Find(zeek::detail::ATTR_BROKER_STORE); auto bs = attrs->Find(zeek::detail::ATTR_BROKER_STORE);
if ( bs && broker_store.empty() ) // this does not mesh well with being updated several times if ( bs && broker_store.empty() )
{ {
IntrusivePtr<Val> c = bs->GetExpr()->Eval(nullptr); IntrusivePtr<Val> c = bs->GetExpr()->Eval(nullptr);
assert(c); assert(c);
@ -2181,8 +2181,6 @@ void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val,
index_val = index; index_val = index;
} }
// FIXME: switch back to just storing tables directly in the broker store?
// me which store a change came from - and this still seems to be missing from the store_events. (Or I am blind).
auto broker_index = bro_broker::val_to_data(index_val); auto broker_index = bro_broker::val_to_data(index_val);
if ( ! broker_index ) if ( ! broker_index )

View file

@ -1023,6 +1023,10 @@ public:
// on zeek::RecordTypes. // on zeek::RecordTypes.
static void DoneParsing(); static void DoneParsing();
/**
* Sets the name of the broker store that is backing this table.
* @param store store that is backing this table.
*/
void SetBrokerStore(const std::string& store) { broker_store = store; } void SetBrokerStore(const std::string& store) { broker_store = store; }
protected: protected:

View file

@ -238,7 +238,8 @@ void Manager::InitializeBrokerStoreForwarding()
// we only create masters here. For clones, we do all the work of setting up // we only create masters here. For clones, we do all the work of setting up
// the forwarding - but we do not try to initialize the clone. We can only initialize // the forwarding - but we do not try to initialize the clone. We can only initialize
// the clone, once a node has a connection to a master. This is currently done in scriptland // the clone, once a node has a connection to a master. This is currently done in scriptland
// - check FIXME. // in scripts/base/frameworks/cluster/broker-stores.zeek. Once the ALM transport is ready
// we can change over to doing this here.
if ( ! zeek_table_manager ) if ( ! zeek_table_manager )
continue; continue;
@ -956,7 +957,7 @@ void Manager::Process()
if ( broker::topics::store_events.prefix_of(topic) ) if ( broker::topics::store_events.prefix_of(topic) )
{ {
ProcessStoreEvent(topic, std::move(msg)); ProcessStoreEvent(std::move(msg));
continue; continue;
} }
@ -995,7 +996,7 @@ void Manager::Process()
} }
} }
void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg) void Manager::ProcessStoreEvent(broker::data msg)
{ {
if ( auto insert = broker::store_event::insert::make(msg) ) if ( auto insert = broker::store_event::insert::make(msg) )
{ {
@ -1016,20 +1017,18 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name()); DBG_LOG(DBG_BROKER, "Store %s: Insert: %s:%s (%s:%s)", insert.store_id().c_str(), to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), insert.key().get_type_name(), insert.value().get_type_name());
if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none ) if ( table->GetType()->IsSet() && data.get_type() != broker::data::type::none )
{ {
reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name()); reporter->Error("ProcessStoreEvent Insert got %s when expecting set", data.get_type_name());
return; return;
} }
// FIXME: expiry!
const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
assert( its.size() == 1 ); assert( its.size() == 1 );
auto zeek_key = data_to_val(std::move(key), its[0].get()); auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key ) if ( ! zeek_key )
{ {
reporter->Error("ProcessStoreEvent: failed to convert key"); reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote insert. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str());
return; return;
} }
@ -1040,10 +1039,10 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
} }
// it is a table // it is a table
auto zeek_value = data_to_val(std::move(data), table->GetType()->Yield().get()); auto zeek_value = data_to_val(data, table->GetType()->Yield().get());
if ( ! zeek_value ) if ( ! zeek_value )
{ {
reporter->Error("ProcessStoreEvent: failed to convert value"); reporter->Error("ProcessStoreEvent: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote insert. This probably means the tables have different types on different nodes.", to_string(data).c_str(), to_string(key).c_str(), insert.store_id().c_str());
return; return;
} }
table->Assign(zeek_key, zeek_value, false); table->Assign(zeek_key, zeek_value, false);
@ -1075,10 +1074,10 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
assert( its.size() == 1 ); assert( its.size() == 1 );
auto zeek_key = data_to_val(std::move(key), its[0].get()); auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key ) if ( ! zeek_key )
{ {
reporter->Error("ProcessStoreEvent: failed to convert key"); reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote update. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str());
return; return;
} }
@ -1089,10 +1088,10 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
} }
// it is a table // it is a table
auto zeek_value = data_to_val(std::move(data), table->GetType()->Yield().get()); auto zeek_value = data_to_val(data, table->GetType()->Yield().get());
if ( ! zeek_value ) if ( ! zeek_value )
{ {
reporter->Error("ProcessStoreEvent: failed to convert value"); reporter->Error("ProcessStoreEvent: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote update. This probably means the tables have different types on different nodes.", to_string(data).c_str(), to_string(key).c_str(), insert.store_id().c_str());
return; return;
} }
table->Assign(zeek_key, zeek_value, false); table->Assign(zeek_key, zeek_value, false);
@ -1115,17 +1114,17 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str()); DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str());
const auto& its = table->GetType()->AsTableType()->GetIndexTypes(); const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
assert( its.size() == 1 ); assert( its.size() == 1 );
auto zeek_key = data_to_val(std::move(key), its[0].get()); auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key ) if ( ! zeek_key )
{ {
reporter->Error("ProcessStoreEvent: failed to convert key"); reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote erase. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str());
return; return;
} }
table->Remove(*zeek_key, false); table->Remove(*zeek_key, false);
} }
else if ( auto expire = broker::store_event::expire::make(msg) ) else if ( auto expire = broker::store_event::expire::make(msg) )
{ {
// We just ignore expirys - expiring information on the Zeek side is handled by Zeek itself. // We just ignore expiries - expiring information on the Zeek side is handled by Zeek itself.
#ifdef DEBUG #ifdef DEBUG
// let's only debug log for stores that we know. // let's only debug log for stores that we know.
auto storehandle = broker_mgr->LookupStore(expire.store_id()); auto storehandle = broker_mgr->LookupStore(expire.store_id());
@ -1685,7 +1684,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
auto zeek_key = data_to_val(key, its[0].get()); auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key ) if ( ! zeek_key )
{ {
reporter->Error("Failed to convert key %s while importing broker store to table for store %s. Aborting import.", to_string(key).c_str(), name.c_str()); reporter->Error("Failed to convert key \"%s\" while importing broker store to table for store \"%s\". Aborting import.", to_string(key).c_str(), name.c_str());
// just abort - this probably means the types are incompatible // just abort - this probably means the types are incompatible
return; return;
} }

View file

@ -303,6 +303,15 @@ public:
*/ */
StoreHandleVal* LookupStore(const std::string& name); StoreHandleVal* LookupStore(const std::string& name);
/**
* Register a zeek table that is associated with a broker store that is backing it. This
* causes all changes that happen to the brokerstore in the future to be applied to the zeek
* table.
* A single broker store can only be forwarded to a single table.
* @param name name of the broker store
* @param table pointer to the table/set that is being backed
* @return true on success, false if the named store is already being forwarded.
*/
bool AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek::TableVal> table); bool AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek::TableVal> table);
/** /**
@ -350,7 +359,8 @@ public:
private: private:
void DispatchMessage(const broker::topic& topic, broker::data msg); void DispatchMessage(const broker::topic& topic, broker::data msg);
void ProcessStoreEvent(const broker::topic& topic, broker::data msg); // Process events used for broker store backed zeek tables
void ProcessStoreEvent(broker::data msg);
void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev);
bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogCreate(broker::zeek::LogCreate lc);
bool ProcessLogWrite(broker::zeek::LogWrite lw); bool ProcessLogWrite(broker::zeek::LogWrite lw);
@ -359,6 +369,7 @@ private:
void ProcessError(broker::error err); void ProcessError(broker::error err);
void ProcessStoreResponse(StoreHandleVal*, broker::store::response response); void ProcessStoreResponse(StoreHandleVal*, broker::store::response response);
void FlushPendingQueries(); void FlushPendingQueries();
// Initializes the masters for broker backed zeek tables when using the &backend attribute
void InitializeBrokerStoreForwarding(); void InitializeBrokerStoreForwarding();
// Check if a broker store is associated to a table on the Zeek side. // Check if a broker store is associated to a table on the Zeek side.
void CheckForwarding(const std::string& name); void CheckForwarding(const std::string& name);

View file

@ -5,7 +5,7 @@
// Switching parser table type fixes ambiguity problems. // Switching parser table type fixes ambiguity problems.
%define lr.type ielr %define lr.type ielr
%expect 123 %expect 129
%token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY %token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY
%token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF %token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF
@ -24,7 +24,8 @@
%token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF %token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF
%token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC %token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC
%token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE %token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE
%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE TOK_ATTR_BACKEND %token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE
%token TOK_ATTR_BROKER_STORE_ALLOW_COMPLEX TOK_ATTR_BACKEND
%token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER %token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER
%token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED %token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED
@ -1368,6 +1369,8 @@ attr:
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_ON_CHANGE, {zeek::AdoptRef{}, $3}); } { $$ = new zeek::detail::Attr(zeek::detail::ATTR_ON_CHANGE, {zeek::AdoptRef{}, $3}); }
| TOK_ATTR_BROKER_STORE '=' expr | TOK_ATTR_BROKER_STORE '=' expr
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE, {zeek::AdoptRef{}, $3}); } { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE, {zeek::AdoptRef{}, $3}); }
| TOK_ATTR_BROKER_STORE_ALLOW_COMPLEX
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_BROKER_STORE_ALLOW_COMPLEX); }
| TOK_ATTR_BACKEND '=' expr | TOK_ATTR_BACKEND '=' expr
{ $$ = new zeek::detail::Attr(zeek::detail::ATTR_BACKEND, {zeek::AdoptRef{}, $3}); } { $$ = new zeek::detail::Attr(zeek::detail::ATTR_BACKEND, {zeek::AdoptRef{}, $3}); }
| TOK_ATTR_EXPIRE_FUNC '=' expr | TOK_ATTR_EXPIRE_FUNC '=' expr

View file

@ -285,6 +285,7 @@ when return TOK_WHEN;
&write_expire return TOK_ATTR_EXPIRE_WRITE; &write_expire return TOK_ATTR_EXPIRE_WRITE;
&on_change return TOK_ATTR_ON_CHANGE; &on_change return TOK_ATTR_ON_CHANGE;
&broker_store return TOK_ATTR_BROKER_STORE; &broker_store return TOK_ATTR_BROKER_STORE;
&broker_allow_complex_type return TOK_ATTR_BROKER_STORE_ALLOW_COMPLEX;
&backend return TOK_ATTR_BACKEND; &backend return TOK_ATTR_BACKEND;
@deprecated.* { @deprecated.* {

View file

@ -0,0 +1,5 @@
error in /this/is/a/path/brokerstore-backend-invalid.zeek, line 12: &backend only supports one-element set/table indexes (&backend=Broker::MEMORY)
error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 13: &backend only supports atomic types as table value (&backend=Broker::MEMORY)
error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 14: &backend and &read_expire cannot be used simultaneously (&read_expire=5.0 secs, &backend=Broker::MEMORY)
error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 15: &backend and &broker_store cannot be used simultaneously (&broker_store=store, &backend=Broker::MEMORY)
error in /this/is/a/path/broker.store.brokerstore-backend-invalid/brokerstore-backend-invalid.zeek, line 16: &backend only applicable to global sets/tables (&backend=Broker::MEMORY)

View file

@ -0,0 +1,3 @@
error: ProcessStoreEvent: could not convert value "b" for key "a" in store "___sync_store_TestModule::s" while receiving remote insert. This probably means the tables have different types on different nodes.
error: ProcessStoreEvent: could not convert key "a" for store "___sync_store_TestModule::t" while receiving remote insert. This probably means the tables have different types on different nodes.
received termination signal

View file

@ -0,0 +1 @@
error: Failed to convert key "a" while importing broker store to table for store "___sync_store_TestModule::t". Aborting import.

View file

@ -27,7 +27,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {
@ -73,7 +73,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {
@ -133,7 +133,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {

View file

@ -62,7 +62,7 @@ function print_keys()
global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t; global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t;
global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s; global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s;
global r: table[string] of testrec &broker_store="rec" &write_expire=5sec &on_change=change_r; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec" &write_expire=5sec &on_change=change_r;
event zeek_init() event zeek_init()
{ {
@ -154,7 +154,7 @@ function change_r(tbl: any, tpe: TableChange, idx: string, idxb: testrec)
global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t; global t: table[string] of count &broker_store="table" &create_expire=4sec &on_change=change_t;
global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s; global s: table[string] of count &broker_store="set" &write_expire=3sec &on_change=change_s;
global r: table[string] of testrec &broker_store="rec" &write_expire=5sec &on_change=change_r; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec" &write_expire=5sec &on_change=change_r;
event zeek_init() event zeek_init()
{ {

View file

@ -29,7 +29,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {
@ -73,7 +73,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {
@ -114,7 +114,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {

View file

@ -24,7 +24,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {
@ -66,7 +66,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {

View file

@ -23,7 +23,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {
@ -83,7 +83,7 @@ type testrec: record {
global t: table[string] of count &broker_store="table"; global t: table[string] of count &broker_store="table";
global s: set[string] &broker_store="set"; global s: set[string] &broker_store="set";
global r: table[string] of testrec &broker_store="rec"; global r: table[string] of testrec &broker_allow_complex_type &broker_store="rec";
event zeek_init() event zeek_init()
{ {

View file

@ -0,0 +1,16 @@
# @TEST-EXEC-FAIL: zeek -B broker %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
module TestModule;
type testrec: record {
a: count;
b: string;
c: set[string];
};
global a: table[string, count] of count &backend=Broker::MEMORY;
global b: table[string] of testrec &backend=Broker::MEMORY;
global c: table[string] of count &read_expire=5sec &backend=Broker::MEMORY;
global d: table[string] of count &broker_store="store" &backend=Broker::MEMORY;
global f: count &backend=Broker::MEMORY;

View file

@ -0,0 +1,67 @@
# @TEST-PORT: BROKER_PORT1
# @TEST-PORT: BROKER_PORT2
# @TEST-PORT: BROKER_PORT3
# @TEST-EXEC: btest-bg-run manager-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -B broker ../master.zeek >../master.out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -B broker ../clone.zeek >../clone.out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -B broker ../clone.zeek >../clone2.out"
# @TEST-EXEC: btest-bg-wait 15
#
# @TEST-EXEC: btest-diff worker-1/.stderr
@TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $interface="eth0"],
};
@TEST-END-FILE
@TEST-START-FILE master.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
global t: table[string] of count &backend=Broker::MEMORY;
global s: table[string] of string &backend=Broker::MEMORY;
event zeek_init()
{
t["a"] = 5;
s["a"] = "b";
print t;
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
}
@TEST-END-FILE
@TEST-START-FILE clone.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
module TestModule;
global t: table[count] of count &backend=Broker::MEMORY;
global s: table[string] of count &backend=Broker::MEMORY;
event dump_tables()
{
print t;
print s;
terminate();
}
event Cluster::node_up(name: string, id: string)
{
#print "node up", name;
schedule 4secs { dump_tables() };
}
@TEST-END-FILE

View file

@ -36,7 +36,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::MEMORY; global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY; global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY;
event zeek_init() event zeek_init()
{ {
@ -67,7 +67,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::MEMORY; global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY; global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY;
event terminate_me() event terminate_me()
{ {
@ -116,7 +116,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::MEMORY; global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY; global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY;
event dump_tables() event dump_tables()
{ {

View file

@ -36,7 +36,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::MEMORY; global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY; global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY;
event zeek_init() event zeek_init()
{ {
@ -79,7 +79,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::MEMORY; global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY; global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY;
event dump_tables() event dump_tables()

View file

@ -0,0 +1,38 @@
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: zeek -B broker -b one.zeek > output1
# @TEST-EXEC-FAIL: zeek -B broker -b two.zeek > output2
# @TEST-EXEC: btest-diff .stderr
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
# the first test writes out the sqlite files...
@TEST-START-FILE one.zeek
module TestModule;
global t: table[string] of string &backend=Broker::SQLITE;
event zeek_init()
{
t["a"] = "a";
t["b"] = "b";
t["c"] = "c";
print t;
}
@TEST-END-FILE
@TEST-START-FILE two.zeek
# the second one reads them in again. Or not because the types are incompatible.
module TestModule;
global t: table[count] of count &backend=Broker::SQLITE;
event zeek_init()
{
print t;
}
@TEST-END-FILE

View file

@ -33,7 +33,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::SQLITE; global t: table[string] of count &backend=Broker::SQLITE;
global s: set[string] &backend=Broker::SQLITE; global s: set[string] &backend=Broker::SQLITE;
global r: table[string] of testrec &backend=Broker::SQLITE; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::SQLITE;
event zeek_init() event zeek_init()
{ {
@ -71,7 +71,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::SQLITE; global t: table[string] of count &backend=Broker::SQLITE;
global s: set[string] &backend=Broker::SQLITE; global s: set[string] &backend=Broker::SQLITE;
global r: table[string] of testrec &backend=Broker::SQLITE; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::SQLITE;
redef Broker::auto_store_db_directory = ".."; redef Broker::auto_store_db_directory = "..";
@ -104,7 +104,7 @@ type testrec: record {
global t: table[string] of count &backend=Broker::MEMORY; global t: table[string] of count &backend=Broker::MEMORY;
global s: set[string] &backend=Broker::MEMORY; global s: set[string] &backend=Broker::MEMORY;
global r: table[string] of testrec &backend=Broker::MEMORY; global r: table[string] of testrec &broker_allow_complex_type &backend=Broker::MEMORY;
event dump_tables() event dump_tables()