mirror of
https://github.com/zeek/zeek.git
synced 2025-10-12 03:28:19 +00:00
Automatic bro table->brokerstore insert operations
We now have an &broker_store attribute which automatically sends inserts/deletes into a set/table to broker. This might work - I actually did not test if the data ends up in the broker store in the end. A limitation is that the table/set currently only can have a one-element type since Broker doesn't support the list type.
This commit is contained in:
parent
c306fcf3d7
commit
68f0fe9e8c
7 changed files with 156 additions and 11 deletions
26
src/Attr.cc
26
src/Attr.cc
|
@ -14,7 +14,8 @@ const char* attr_name(attr_tag t)
|
||||||
"&read_expire", "&write_expire", "&create_expire",
|
"&read_expire", "&write_expire", "&create_expire",
|
||||||
"&raw_output", "&priority",
|
"&raw_output", "&priority",
|
||||||
"&group", "&log", "&error_handler", "&type_column",
|
"&group", "&log", "&error_handler", "&type_column",
|
||||||
"(&tracked)", "&deprecated",
|
"(&tracked)", "&on_change", "&broker_store",
|
||||||
|
"&deprecated",
|
||||||
};
|
};
|
||||||
|
|
||||||
return attr_names[int(t)];
|
return attr_names[int(t)];
|
||||||
|
@ -486,7 +487,7 @@ void Attributes::CheckAttr(Attr* a)
|
||||||
{
|
{
|
||||||
if ( type->Tag() != TYPE_TABLE )
|
if ( type->Tag() != TYPE_TABLE )
|
||||||
{
|
{
|
||||||
Error("&on_change only applicable to tables");
|
Error("&on_change only applicable to sets/tables");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -548,6 +549,27 @@ void Attributes::CheckAttr(Attr* a)
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case ATTR_BROKER_STORE:
|
||||||
|
{
|
||||||
|
if ( type->Tag() != TYPE_TABLE )
|
||||||
|
{
|
||||||
|
Error("&broker_store only applicable to sets/tables");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const Expr *broker_store = a->AttrExpr();
|
||||||
|
if ( broker_store->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" )
|
||||||
|
Error("&broker_store must take an opaque of Broker::Store");
|
||||||
|
|
||||||
|
|
||||||
|
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
|
||||||
|
if ( type->AsTableType()->IndexTypes()->length() != 1 )
|
||||||
|
{
|
||||||
|
Error("&broker_store only supports one-element set/table indexes");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
case ATTR_TRACKED:
|
case ATTR_TRACKED:
|
||||||
// FIXME: Check here for global ID?
|
// FIXME: Check here for global ID?
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -28,6 +28,7 @@ typedef enum {
|
||||||
ATTR_TYPE_COLUMN, // for input framework
|
ATTR_TYPE_COLUMN, // for input framework
|
||||||
ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry
|
ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry
|
||||||
ATTR_ON_CHANGE, // for table change tracking
|
ATTR_ON_CHANGE, // for table change tracking
|
||||||
|
ATTR_BROKER_STORE, // for broker-store backed tables
|
||||||
ATTR_DEPRECATED,
|
ATTR_DEPRECATED,
|
||||||
#define NUM_ATTRS (int(ATTR_DEPRECATED) + 1)
|
#define NUM_ATTRS (int(ATTR_DEPRECATED) + 1)
|
||||||
} attr_tag;
|
} attr_tag;
|
||||||
|
|
105
src/Val.cc
105
src/Val.cc
|
@ -26,6 +26,7 @@
|
||||||
#include "IPAddr.h"
|
#include "IPAddr.h"
|
||||||
|
|
||||||
#include "broker/Data.h"
|
#include "broker/Data.h"
|
||||||
|
#include "broker/Store.h"
|
||||||
|
|
||||||
#include "threading/formatters/JSON.h"
|
#include "threading/formatters/JSON.h"
|
||||||
|
|
||||||
|
@ -1415,6 +1416,12 @@ void TableVal::SetAttrs(Attributes* a)
|
||||||
change_func = cf->AttrExpr();
|
change_func = cf->AttrExpr();
|
||||||
change_func->Ref();
|
change_func->Ref();
|
||||||
}
|
}
|
||||||
|
auto bs = attrs->FindAttr(ATTR_BROKER_STORE);
|
||||||
|
if ( bs )
|
||||||
|
{
|
||||||
|
broker_store = bs->AttrExpr();
|
||||||
|
broker_store->Ref();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TableVal::CheckExpireAttr(attr_tag at)
|
void TableVal::CheckExpireAttr(attr_tag at)
|
||||||
|
@ -1492,13 +1499,18 @@ int TableVal::Assign(Val* index, HashKey* k, Val* new_val)
|
||||||
|
|
||||||
Modified();
|
Modified();
|
||||||
|
|
||||||
if ( change_func )
|
if ( change_func || broker_store )
|
||||||
{
|
{
|
||||||
auto change_index = index->Ref();
|
auto change_index = index->Ref();
|
||||||
if ( ! change_index )
|
if ( ! change_index )
|
||||||
RecoverIndex(&k_copy);
|
RecoverIndex(&k_copy);
|
||||||
Val *v = old_entry_val ? old_entry_val->Value() : new_val;
|
if ( broker_store )
|
||||||
CallChangeFunc(change_index, v, old_entry_val ? element_changed : element_new);
|
SendToStore(change_index, new_val, old_entry_val ? element_changed : element_new);
|
||||||
|
if ( change_func )
|
||||||
|
{
|
||||||
|
Val *v = old_entry_val ? old_entry_val->Value() : new_val;
|
||||||
|
CallChangeFunc(change_index, v, old_entry_val ? element_changed : element_new);
|
||||||
|
}
|
||||||
Unref(change_index);
|
Unref(change_index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1971,9 +1983,7 @@ void TableVal::CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe
|
||||||
Val* thefunc = change_func->Eval(0);
|
Val* thefunc = change_func->Eval(0);
|
||||||
|
|
||||||
if ( ! thefunc )
|
if ( ! thefunc )
|
||||||
{
|
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
if ( thefunc->Type()->Tag() != TYPE_FUNC )
|
if ( thefunc->Type()->Tag() != TYPE_FUNC )
|
||||||
{
|
{
|
||||||
|
@ -2013,6 +2023,82 @@ void TableVal::CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe)
|
||||||
|
{
|
||||||
|
if ( ! broker_store || ! index )
|
||||||
|
return;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Val* thestore = broker_store->Eval(0);
|
||||||
|
|
||||||
|
if ( ! thestore )
|
||||||
|
return;
|
||||||
|
|
||||||
|
if ( thestore->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" )
|
||||||
|
{
|
||||||
|
thestore->Error("not a Broker::Store");
|
||||||
|
Unref(thestore);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto handle = static_cast<bro_broker::StoreHandleVal*>(thestore);
|
||||||
|
if ( index->AsListVal()->Length() != 1 )
|
||||||
|
{
|
||||||
|
builtin_error("table with complex index not supported for &broker_store");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto index_val = index->AsListVal()->Index(0);
|
||||||
|
auto key_val = new StringVal("test");
|
||||||
|
auto broker_key = bro_broker::val_to_data(key_val);
|
||||||
|
auto broker_index = bro_broker::val_to_data(index_val);
|
||||||
|
Unref(key_val);
|
||||||
|
|
||||||
|
if ( ! broker_key )
|
||||||
|
{
|
||||||
|
builtin_error("invalid Broker data conversion for &broker_store_key");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ( ! broker_index )
|
||||||
|
{
|
||||||
|
builtin_error("invalid Broker data conversation for table index");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch ( tpe )
|
||||||
|
{
|
||||||
|
case element_new:
|
||||||
|
case element_changed:
|
||||||
|
if ( table_type->IsSet() )
|
||||||
|
handle->store.insert_into(std::move(*broker_key), std::move(*broker_index));
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if ( ! new_value )
|
||||||
|
{
|
||||||
|
builtin_error("did not receive new value for broker-store send operation");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto broker_val = bro_broker::val_to_data(new_value);
|
||||||
|
if ( ! broker_val )
|
||||||
|
{
|
||||||
|
builtin_error("invalid Broker data conversation for table value");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
handle->store.insert_into(std::move(*broker_key), std::move(*broker_index), std::move(*broker_val));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case element_removed:
|
||||||
|
handle->store.remove_from(std::move(*broker_key), std::move(*broker_index));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch ( InterpreterException& e )
|
||||||
|
{
|
||||||
|
builtin_error("The previous error was encountered while trying to resolve the &broker_store attribute of the set/table. Potentially the Broker::Store has not been initialized before being used.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Val* TableVal::Delete(const Val* index)
|
Val* TableVal::Delete(const Val* index)
|
||||||
{
|
{
|
||||||
HashKey* k = ComputeHash(index);
|
HashKey* k = ComputeHash(index);
|
||||||
|
@ -2027,6 +2113,8 @@ Val* TableVal::Delete(const Val* index)
|
||||||
|
|
||||||
Modified();
|
Modified();
|
||||||
|
|
||||||
|
if ( broker_store )
|
||||||
|
SendToStore(index, nullptr, element_removed);
|
||||||
if ( change_func )
|
if ( change_func )
|
||||||
CallChangeFunc(index, va, element_removed);
|
CallChangeFunc(index, va, element_removed);
|
||||||
|
|
||||||
|
@ -2050,10 +2138,13 @@ Val* TableVal::Delete(const HashKey* k)
|
||||||
|
|
||||||
Modified();
|
Modified();
|
||||||
|
|
||||||
if ( change_func )
|
if ( change_func || broker_store )
|
||||||
{
|
{
|
||||||
auto index = table_hash->RecoverVals(k);
|
auto index = table_hash->RecoverVals(k);
|
||||||
CallChangeFunc(index, va->Ref(), element_removed);
|
if ( broker_store )
|
||||||
|
SendToStore(index, nullptr, element_removed);
|
||||||
|
if ( change_func )
|
||||||
|
CallChangeFunc(index, va->Ref(), element_removed);
|
||||||
Unref(index);
|
Unref(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -928,6 +928,9 @@ protected:
|
||||||
// Calls &change_func. Does not take ownership of values. (Refs if needed).
|
// Calls &change_func. Does not take ownership of values. (Refs if needed).
|
||||||
void CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe);
|
void CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe);
|
||||||
|
|
||||||
|
// Sends data on to backing Broker Store
|
||||||
|
void SendToStore(const Val* index, const Val* new_value, OnChangeType tpe);
|
||||||
|
|
||||||
Val* DoClone(CloneState* state) override;
|
Val* DoClone(CloneState* state) override;
|
||||||
|
|
||||||
TableType* table_type;
|
TableType* table_type;
|
||||||
|
@ -940,6 +943,7 @@ protected:
|
||||||
PrefixTable* subnets;
|
PrefixTable* subnets;
|
||||||
Val* def_val;
|
Val* def_val;
|
||||||
Expr* change_func = nullptr;
|
Expr* change_func = nullptr;
|
||||||
|
Expr* broker_store = nullptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
class RecordVal : public Val, public notifier::Modifiable {
|
class RecordVal : public Val, public notifier::Modifiable {
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
// Switching parser table type fixes ambiguity problems.
|
// Switching parser table type fixes ambiguity problems.
|
||||||
%define lr.type ielr
|
%define lr.type ielr
|
||||||
|
|
||||||
%expect 111
|
%expect 117
|
||||||
|
|
||||||
%token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY
|
%token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY
|
||||||
%token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF
|
%token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF
|
||||||
|
@ -24,7 +24,7 @@
|
||||||
%token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF
|
%token TOK_ATTR_ADD_FUNC TOK_ATTR_DEFAULT TOK_ATTR_OPTIONAL TOK_ATTR_REDEF
|
||||||
%token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC
|
%token TOK_ATTR_DEL_FUNC TOK_ATTR_EXPIRE_FUNC
|
||||||
%token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE
|
%token TOK_ATTR_EXPIRE_CREATE TOK_ATTR_EXPIRE_READ TOK_ATTR_EXPIRE_WRITE
|
||||||
%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE
|
%token TOK_ATTR_RAW_OUTPUT TOK_ATTR_ON_CHANGE TOK_ATTR_BROKER_STORE
|
||||||
%token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER
|
%token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER
|
||||||
%token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED
|
%token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED
|
||||||
|
|
||||||
|
@ -1329,6 +1329,8 @@ attr:
|
||||||
{ $$ = new Attr(ATTR_DEL_FUNC, $3); }
|
{ $$ = new Attr(ATTR_DEL_FUNC, $3); }
|
||||||
| TOK_ATTR_ON_CHANGE '=' expr
|
| TOK_ATTR_ON_CHANGE '=' expr
|
||||||
{ $$ = new Attr(ATTR_ON_CHANGE, $3); }
|
{ $$ = new Attr(ATTR_ON_CHANGE, $3); }
|
||||||
|
| TOK_ATTR_BROKER_STORE '=' expr
|
||||||
|
{ $$ = new Attr(ATTR_BROKER_STORE, $3); }
|
||||||
| TOK_ATTR_EXPIRE_FUNC '=' expr
|
| TOK_ATTR_EXPIRE_FUNC '=' expr
|
||||||
{ $$ = new Attr(ATTR_EXPIRE_FUNC, $3); }
|
{ $$ = new Attr(ATTR_EXPIRE_FUNC, $3); }
|
||||||
| TOK_ATTR_EXPIRE_CREATE '=' expr
|
| TOK_ATTR_EXPIRE_CREATE '=' expr
|
||||||
|
|
|
@ -299,6 +299,7 @@ when return TOK_WHEN;
|
||||||
&redef return TOK_ATTR_REDEF;
|
&redef return TOK_ATTR_REDEF;
|
||||||
&write_expire return TOK_ATTR_EXPIRE_WRITE;
|
&write_expire return TOK_ATTR_EXPIRE_WRITE;
|
||||||
&on_change return TOK_ATTR_ON_CHANGE;
|
&on_change return TOK_ATTR_ON_CHANGE;
|
||||||
|
&broker_store return TOK_ATTR_BROKER_STORE;
|
||||||
|
|
||||||
@deprecated.* {
|
@deprecated.* {
|
||||||
auto num_files = file_stack.length();
|
auto num_files = file_stack.length();
|
||||||
|
|
24
testing/btest/language/brokerstore.test
Normal file
24
testing/btest/language/brokerstore.test
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
# @TEST-EXEC: zeek %INPUT >output
|
||||||
|
# @TEST-EXEC: btest-diff output
|
||||||
|
|
||||||
|
module TestModule;
|
||||||
|
|
||||||
|
global tablestore: opaque of Broker::Store;
|
||||||
|
global setstore: opaque of Broker::Store;
|
||||||
|
|
||||||
|
global t: table[string] of count &broker_store=tablestore;
|
||||||
|
global s: set[string] &broker_store=setstore;
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
tablestore = Broker::create_master("table");
|
||||||
|
setstore = Broker::create_master("set");
|
||||||
|
print "inserting";
|
||||||
|
t["a"] = 5;
|
||||||
|
add s["hi"];
|
||||||
|
print "changing";
|
||||||
|
t["a"] = 2;
|
||||||
|
print "deleting";
|
||||||
|
delete t["a"];
|
||||||
|
delete s["hi"];
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue