mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Make &broker_store take argument of type string.
It turns out that directly passing a Broker::Store is not really a bright idea. Because - if we do that - we have to later try to intercept when the master/clone is generated to figure out what the actual name of the backing store is. Turns out that it is much easier to just use the name directly - and check if a store with that name exists when we want to insert something. I might want to reconsider this in the future in any case. At the moment this approach just stores one table into an entire store. In theory, one store should be able to handle several tables, but... that's more complex. So let's start with this for now.
This commit is contained in:
parent
031f0cac05
commit
8db83a5ed2
8 changed files with 88 additions and 27 deletions
|
@ -558,10 +558,11 @@ void Attributes::CheckAttr(Attr* a)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
const Expr *broker_store = a->AttrExpr();
|
if ( a->AttrExpr()->Type()->Tag() != TYPE_STRING )
|
||||||
if ( broker_store->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" )
|
{
|
||||||
Error("&broker_store must take an opaque of Broker::Store");
|
Error("&broker_store must take a string argument");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
|
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
|
||||||
if ( type->AsTableType()->IndexTypes()->length() != 1 )
|
if ( type->AsTableType()->IndexTypes()->length() != 1 )
|
||||||
|
|
38
src/Val.cc
38
src/Val.cc
|
@ -38,6 +38,7 @@
|
||||||
|
|
||||||
#include "broker/Data.h"
|
#include "broker/Data.h"
|
||||||
#include "broker/Store.h"
|
#include "broker/Store.h"
|
||||||
|
#include "broker/Manager.h"
|
||||||
|
|
||||||
#include "threading/formatters/JSON.h"
|
#include "threading/formatters/JSON.h"
|
||||||
|
|
||||||
|
@ -1471,8 +1472,14 @@ void TableVal::SetAttrs(IntrusivePtr<Attributes> a)
|
||||||
change_func = {NewRef{}, cf->AttrExpr()};
|
change_func = {NewRef{}, cf->AttrExpr()};
|
||||||
|
|
||||||
auto bs = attrs->FindAttr(ATTR_BROKER_STORE);
|
auto bs = attrs->FindAttr(ATTR_BROKER_STORE);
|
||||||
if ( bs )
|
if ( bs && broker_store.empty() ) // this does not mesh well with being updated several times
|
||||||
broker_store = {NewRef{}, bs->AttrExpr()};
|
{
|
||||||
|
IntrusivePtr<Val> c = bs->AttrExpr()->Eval(nullptr);
|
||||||
|
assert(c);
|
||||||
|
assert(c->Type()->Tag() == TYPE_STRING);
|
||||||
|
broker_store = c->AsStringVal()->AsString()->CheckString();
|
||||||
|
broker_mgr->AddForwardedStore(broker_store, {NewRef{}, this});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TableVal::CheckExpireAttr(attr_tag at)
|
void TableVal::CheckExpireAttr(attr_tag at)
|
||||||
|
@ -1552,16 +1559,16 @@ bool TableVal::Assign(Val* index, HashKey* k, IntrusivePtr<Val> new_val)
|
||||||
|
|
||||||
Modified();
|
Modified();
|
||||||
|
|
||||||
if ( change_func || broker_store )
|
if ( change_func || ( ! broker_store.empty() ) )
|
||||||
{
|
{
|
||||||
auto change_index = index ? IntrusivePtr<Val>{NewRef{}, index}
|
auto change_index = index ? IntrusivePtr<Val>{NewRef{}, index}
|
||||||
: RecoverIndex(&k_copy);
|
: RecoverIndex(&k_copy);
|
||||||
if ( broker_store )
|
if ( ! broker_store.empty() )
|
||||||
SendToStore(change_index.get(), new_val.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
|
SendToStore(change_index.get(), new_val.get(), old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
|
||||||
if ( change_func )
|
if ( change_func )
|
||||||
{
|
{
|
||||||
Val* v = old_entry_val ? old_entry_val->Value() : new_val.get();
|
Val* v = old_entry_val ? old_entry_val->Value() : new_val.get();
|
||||||
CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
|
CallChangeFunc(change_index.get(), v, old_entry_val ? ELEMENT_CHANGED : ELEMENT_NEW);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2062,23 +2069,16 @@ void TableVal::CallChangeFunc(const Val* index, Val* old_value, OnChangeType tpe
|
||||||
|
|
||||||
void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe)
|
void TableVal::SendToStore(const Val* index, const Val* new_value, OnChangeType tpe)
|
||||||
{
|
{
|
||||||
if ( ! broker_store || ! index )
|
if ( broker_store.empty() || ! index )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto thestore = broker_store->Eval(0);
|
auto handle = broker_mgr->LookupStore(broker_store);
|
||||||
|
|
||||||
if ( ! thestore )
|
if ( ! handle )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if ( thestore->Type()->Tag() != TYPE_OPAQUE || broker_store->Type()->AsOpaqueType()->Name() != "Broker::Store" )
|
|
||||||
{
|
|
||||||
thestore->Error("not a Broker::Store");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto handle = static_cast<bro_broker::StoreHandleVal*>(thestore.get());
|
|
||||||
if ( index->AsListVal()->Length() != 1 )
|
if ( index->AsListVal()->Length() != 1 )
|
||||||
{
|
{
|
||||||
builtin_error("table with complex index not supported for &broker_store");
|
builtin_error("table with complex index not supported for &broker_store");
|
||||||
|
@ -2149,7 +2149,7 @@ IntrusivePtr<Val> TableVal::Delete(const Val* index)
|
||||||
|
|
||||||
Modified();
|
Modified();
|
||||||
|
|
||||||
if ( broker_store )
|
if ( ! broker_store.empty() )
|
||||||
SendToStore(index, nullptr, ELEMENT_REMOVED);
|
SendToStore(index, nullptr, ELEMENT_REMOVED);
|
||||||
if ( change_func )
|
if ( change_func )
|
||||||
CallChangeFunc(index, va.get(), ELEMENT_REMOVED);
|
CallChangeFunc(index, va.get(), ELEMENT_REMOVED);
|
||||||
|
@ -2174,10 +2174,10 @@ IntrusivePtr<Val> TableVal::Delete(const HashKey* k)
|
||||||
|
|
||||||
Modified();
|
Modified();
|
||||||
|
|
||||||
if ( ( change_func && va ) || broker_store )
|
if ( ( change_func && va ) || ( ! broker_store.empty() ) )
|
||||||
{
|
{
|
||||||
auto index = table_hash->RecoverVals(k);
|
auto index = table_hash->RecoverVals(k);
|
||||||
if ( broker_store )
|
if ( ! broker_store.empty() )
|
||||||
SendToStore(index.get(), nullptr, ELEMENT_REMOVED);
|
SendToStore(index.get(), nullptr, ELEMENT_REMOVED);
|
||||||
if ( change_func && va )
|
if ( change_func && va )
|
||||||
CallChangeFunc(index.get(), va.get(), ELEMENT_REMOVED);
|
CallChangeFunc(index.get(), va.get(), ELEMENT_REMOVED);
|
||||||
|
|
|
@ -895,7 +895,7 @@ protected:
|
||||||
PrefixTable* subnets;
|
PrefixTable* subnets;
|
||||||
IntrusivePtr<Val> def_val;
|
IntrusivePtr<Val> def_val;
|
||||||
IntrusivePtr<Expr> change_func;
|
IntrusivePtr<Expr> change_func;
|
||||||
IntrusivePtr<Expr> broker_store;
|
std::string broker_store;
|
||||||
// prevent recursion of change functions
|
// prevent recursion of change functions
|
||||||
bool in_change_func = false;
|
bool in_change_func = false;
|
||||||
|
|
||||||
|
|
|
@ -212,6 +212,8 @@ void Manager::InitPostScript()
|
||||||
reporter->FatalError("Failed to register broker subscriber with iosource_mgr");
|
reporter->FatalError("Failed to register broker subscriber with iosource_mgr");
|
||||||
if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) )
|
if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) )
|
||||||
reporter->FatalError("Failed to register broker status subscriber with iosource_mgr");
|
reporter->FatalError("Failed to register broker status subscriber with iosource_mgr");
|
||||||
|
|
||||||
|
bstate->subscriber.add_topic(broker::topics::store_events, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::Terminate()
|
void Manager::Terminate()
|
||||||
|
@ -905,6 +907,20 @@ void Manager::Process()
|
||||||
auto& topic = broker::get_topic(message);
|
auto& topic = broker::get_topic(message);
|
||||||
auto& msg = broker::get_data(message);
|
auto& msg = broker::get_data(message);
|
||||||
|
|
||||||
|
if ( topic == broker::topics::store_events )
|
||||||
|
{
|
||||||
|
if (auto insert = broker::store_event::insert::make(msg))
|
||||||
|
{
|
||||||
|
reporter->Warning("It is an insert!");
|
||||||
|
reporter->Warning("Key/Data (endpoint): %s/%s (%s)", to_string(insert.key()).c_str(), to_string(insert.value()).c_str(), to_string(insert.publisher()).c_str());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
reporter->Warning("Unhandled event type");
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
DispatchMessage(topic, std::move(msg));
|
DispatchMessage(topic, std::move(msg));
|
||||||
|
@ -1449,6 +1465,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
|
||||||
|
|
||||||
data_stores.emplace(name, handle);
|
data_stores.emplace(name, handle);
|
||||||
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
|
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
|
||||||
|
CheckForwarding(name);
|
||||||
|
|
||||||
if ( bstate->endpoint.use_real_time() )
|
if ( bstate->endpoint.use_real_time() )
|
||||||
return handle;
|
return handle;
|
||||||
|
@ -1486,7 +1503,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
|
||||||
|
|
||||||
data_stores.emplace(name, handle);
|
data_stores.emplace(name, handle);
|
||||||
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
|
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
|
||||||
|
CheckForwarding(name);
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1504,6 +1521,9 @@ bool Manager::CloseStore(const string& name)
|
||||||
if ( s == data_stores.end() )
|
if ( s == data_stores.end() )
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
auto pubid = s->second->store.frontend_id();
|
||||||
|
forwarded_ids.erase(pubid);
|
||||||
|
|
||||||
iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this);
|
iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor(), this);
|
||||||
|
|
||||||
for ( auto i = pending_queries.begin(); i != pending_queries.end(); )
|
for ( auto i = pending_queries.begin(); i != pending_queries.end(); )
|
||||||
|
@ -1546,4 +1566,34 @@ const Stats& Manager::GetStatistics()
|
||||||
return statistics;
|
return statistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Manager::AddForwardedStore(const std::string& name, IntrusivePtr<TableVal> table)
|
||||||
|
{
|
||||||
|
if ( forwarded_stores.find(name) != forwarded_stores.end() )
|
||||||
|
{
|
||||||
|
reporter->Error("same &broker_store %s specified for two different variables", name.c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str());
|
||||||
|
forwarded_stores.emplace(name, table);
|
||||||
|
|
||||||
|
CheckForwarding(name);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Manager::CheckForwarding(const std::string &name)
|
||||||
|
{
|
||||||
|
auto handle = LookupStore(name);
|
||||||
|
if ( ! handle )
|
||||||
|
return;
|
||||||
|
|
||||||
|
if ( forwarded_stores.find(name) == forwarded_stores.end() )
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto pubid = handle->store.frontend_id();
|
||||||
|
|
||||||
|
DBG_LOG(DBG_BROKER, "Resolved publishder %s for table forward for data store %s", to_string(pubid).c_str(), name.c_str());
|
||||||
|
forwarded_ids.emplace(pubid, forwarded_stores.at(name));
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace bro_broker
|
} // namespace bro_broker
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
#include <broker/endpoint.hh>
|
#include <broker/endpoint.hh>
|
||||||
#include <broker/endpoint_info.hh>
|
#include <broker/endpoint_info.hh>
|
||||||
#include <broker/peer_info.hh>
|
#include <broker/peer_info.hh>
|
||||||
|
#include <broker/publisher_id.hh>
|
||||||
#include <broker/backend.hh>
|
#include <broker/backend.hh>
|
||||||
#include <broker/backend_options.hh>
|
#include <broker/backend_options.hh>
|
||||||
#include <broker/detail/hash.hh>
|
#include <broker/detail/hash.hh>
|
||||||
|
@ -294,6 +295,8 @@ public:
|
||||||
*/
|
*/
|
||||||
StoreHandleVal* LookupStore(const std::string& name);
|
StoreHandleVal* LookupStore(const std::string& name);
|
||||||
|
|
||||||
|
bool AddForwardedStore(const std::string& name, IntrusivePtr<TableVal> table);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close and unregister a data store. Any existing references to the
|
* Close and unregister a data store. Any existing references to the
|
||||||
* store handle will not be able to be used for any data store operations.
|
* store handle will not be able to be used for any data store operations.
|
||||||
|
@ -347,6 +350,7 @@ private:
|
||||||
void ProcessError(broker::error err);
|
void ProcessError(broker::error err);
|
||||||
void ProcessStoreResponse(StoreHandleVal*, broker::store::response response);
|
void ProcessStoreResponse(StoreHandleVal*, broker::store::response response);
|
||||||
void FlushPendingQueries();
|
void FlushPendingQueries();
|
||||||
|
void CheckForwarding(const std::string& name);
|
||||||
|
|
||||||
void Error(const char* format, ...)
|
void Error(const char* format, ...)
|
||||||
__attribute__((format (printf, 2, 3)));
|
__attribute__((format (printf, 2, 3)));
|
||||||
|
@ -381,6 +385,8 @@ private:
|
||||||
std::string default_log_topic_prefix;
|
std::string default_log_topic_prefix;
|
||||||
std::shared_ptr<BrokerState> bstate;
|
std::shared_ptr<BrokerState> bstate;
|
||||||
std::unordered_map<std::string, StoreHandleVal*> data_stores;
|
std::unordered_map<std::string, StoreHandleVal*> data_stores;
|
||||||
|
std::unordered_map<std::string, IntrusivePtr<TableVal>> forwarded_stores;
|
||||||
|
std::unordered_map<broker::publisher_id, IntrusivePtr<TableVal>> forwarded_ids;
|
||||||
std::unordered_map<query_id, StoreQueryCallback*,
|
std::unordered_map<query_id, StoreQueryCallback*,
|
||||||
query_id_hasher> pending_queries;
|
query_id_hasher> pending_queries;
|
||||||
std::vector<std::string> forwarded_prefixes;
|
std::vector<std::string> forwarded_prefixes;
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
#include "Trigger.h"
|
#include "Trigger.h"
|
||||||
|
|
||||||
#include <broker/store.hh>
|
#include <broker/store.hh>
|
||||||
|
#include <broker/store_event.hh>
|
||||||
#include <broker/backend.hh>
|
#include <broker/backend.hh>
|
||||||
#include <broker/backend_options.hh>
|
#include <broker/backend_options.hh>
|
||||||
|
|
||||||
|
|
|
@ -310,6 +310,7 @@ when return TOK_WHEN;
|
||||||
&redef return TOK_ATTR_REDEF;
|
&redef return TOK_ATTR_REDEF;
|
||||||
&write_expire return TOK_ATTR_EXPIRE_WRITE;
|
&write_expire return TOK_ATTR_EXPIRE_WRITE;
|
||||||
&on_change return TOK_ATTR_ON_CHANGE;
|
&on_change return TOK_ATTR_ON_CHANGE;
|
||||||
|
&broker_store return TOK_ATTR_BROKER_STORE;
|
||||||
|
|
||||||
@deprecated.* {
|
@deprecated.* {
|
||||||
auto num_files = file_stack.length();
|
auto num_files = file_stack.length();
|
||||||
|
|
|
@ -1,13 +1,15 @@
|
||||||
# @TEST-EXEC: zeek %INPUT >output
|
# @TEST-EXEC: zeek %INPUT >output
|
||||||
# @TEST-EXEC: btest-diff output
|
# @TEST-EXEC: btest-diff output
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
module TestModule;
|
module TestModule;
|
||||||
|
|
||||||
global tablestore: opaque of Broker::Store;
|
global tablestore: opaque of Broker::Store;
|
||||||
global setstore: opaque of Broker::Store;
|
global setstore: opaque of Broker::Store;
|
||||||
|
|
||||||
global t: table[string] of count &broker_store=tablestore;
|
global t: table[string] of count &broker_store="table";
|
||||||
global s: set[string] &broker_store=setstore;
|
global s: set[string] &broker_store="set";
|
||||||
|
|
||||||
event zeek_init()
|
event zeek_init()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue