mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
BrokerStore<->Zeek table: adopt to recent Zeek API changes
This commit is contained in:
parent
e1a45d33e0
commit
41dd7df69a
6 changed files with 20 additions and 21 deletions
|
@ -620,7 +620,7 @@ void Attributes::CheckAttr(Attr* a)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
|
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
|
||||||
if ( type->AsTableType()->IndexTypes().size() != 1 )
|
if ( type->AsTableType()->GetIndexTypes().size() != 1 )
|
||||||
{
|
{
|
||||||
Error("&backend only supports one-element set/table indexes");
|
Error("&backend only supports one-element set/table indexes");
|
||||||
}
|
}
|
||||||
|
@ -655,7 +655,7 @@ void Attributes::CheckAttr(Attr* a)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
|
// Temporary since Broker does not support ListVals - and we cannot easily convert to set/vector
|
||||||
if ( type->AsTableType()->IndexTypes().size() != 1 )
|
if ( type->AsTableType()->GetIndexTypes().size() != 1 )
|
||||||
{
|
{
|
||||||
Error("&broker_store only supports one-element set/table indexes");
|
Error("&broker_store only supports one-element set/table indexes");
|
||||||
}
|
}
|
||||||
|
|
13
src/Val.cc
13
src/Val.cc
|
@ -2151,7 +2151,6 @@ void TableVal::CallChangeFunc(const ValPtr& index,
|
||||||
in_change_func = false;
|
in_change_func = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val, OnChangeType tpe)
|
void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val, OnChangeType tpe)
|
||||||
{
|
{
|
||||||
if ( broker_store.empty() || ! index )
|
if ( broker_store.empty() || ! index )
|
||||||
|
@ -2171,7 +2170,7 @@ void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val,
|
||||||
{
|
{
|
||||||
if ( index->AsListVal()->Length() != 1 )
|
if ( index->AsListVal()->Length() != 1 )
|
||||||
{
|
{
|
||||||
builtin_error("table with complex index not supported for &broker_store");
|
zeek::emit_builtin_error("table with complex index not supported for &broker_store");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2188,7 +2187,7 @@ void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val,
|
||||||
|
|
||||||
if ( ! broker_index )
|
if ( ! broker_index )
|
||||||
{
|
{
|
||||||
builtin_error("invalid Broker data conversation for table index");
|
zeek::emit_builtin_error("invalid Broker data conversation for table index");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2224,14 +2223,14 @@ void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val,
|
||||||
{
|
{
|
||||||
if ( ! new_entry_val )
|
if ( ! new_entry_val )
|
||||||
{
|
{
|
||||||
builtin_error("did not receive new value for broker-store send operation");
|
zeek::emit_builtin_error("did not receive new value for broker-store send operation");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto new_value = new_entry_val->GetVal().get();
|
auto new_value = new_entry_val->GetVal().get();
|
||||||
auto broker_val = bro_broker::val_to_data(new_value);
|
auto broker_val = bro_broker::val_to_data(new_value);
|
||||||
if ( ! broker_val )
|
if ( ! broker_val )
|
||||||
{
|
{
|
||||||
builtin_error("invalid Broker data conversation for table value");
|
zeek::emit_builtin_error("invalid Broker data conversation for table value");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
handle->store.put(std::move(*broker_index), std::move(*broker_val), expiry);
|
handle->store.put(std::move(*broker_index), std::move(*broker_val), expiry);
|
||||||
|
@ -2249,11 +2248,11 @@ void TableVal::SendToStore(const Val* index, const TableEntryVal* new_entry_val,
|
||||||
}
|
}
|
||||||
catch ( InterpreterException& e )
|
catch ( InterpreterException& e )
|
||||||
{
|
{
|
||||||
builtin_error("The previous error was encountered while trying to resolve the &broker_store attribute of the set/table. Potentially the Broker::Store has not been initialized before being used.");
|
zeek::emit_builtin_error("The previous error was encountered while trying to resolve the &broker_store attribute of the set/table. Potentially the Broker::Store has not been initialized before being used.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ValPtr TableVal::Remove(const Val& index, bool broker_forward))
|
ValPtr TableVal::Remove(const Val& index, bool broker_forward)
|
||||||
{
|
{
|
||||||
auto k = MakeHashKey(index);
|
auto k = MakeHashKey(index);
|
||||||
// this is totally cheating around the fact that we need a Intrusive pointer.
|
// this is totally cheating around the fact that we need a Intrusive pointer.
|
||||||
|
|
|
@ -1058,7 +1058,7 @@ protected:
|
||||||
enum OnChangeType { ELEMENT_NEW, ELEMENT_CHANGED, ELEMENT_REMOVED, ELEMENT_EXPIRED };
|
enum OnChangeType { ELEMENT_NEW, ELEMENT_CHANGED, ELEMENT_REMOVED, ELEMENT_EXPIRED };
|
||||||
|
|
||||||
// Calls &change_func.
|
// Calls &change_func.
|
||||||
void CallChangeFunc(const Val* index, const ValPtr& old_value,
|
void CallChangeFunc(const ValPtr& index, const ValPtr& old_value,
|
||||||
OnChangeType tpe);
|
OnChangeType tpe);
|
||||||
|
|
||||||
// Sends data on to backing Broker Store
|
// Sends data on to backing Broker Store
|
||||||
|
|
|
@ -222,7 +222,7 @@ void Manager::InitPostScript()
|
||||||
|
|
||||||
void Manager::InitializeBrokerStoreForwarding()
|
void Manager::InitializeBrokerStoreForwarding()
|
||||||
{
|
{
|
||||||
const auto& globals = global_scope()->Vars();
|
const auto& globals = zeek::detail::global_scope()->Vars();
|
||||||
|
|
||||||
for ( const auto& global : globals )
|
for ( const auto& global : globals )
|
||||||
{
|
{
|
||||||
|
@ -233,7 +233,7 @@ void Manager::InitializeBrokerStoreForwarding()
|
||||||
auto e = static_cast<BifEnum::Broker::BackendType>(attr->GetExpr()->Eval(nullptr)->AsEnum());
|
auto e = static_cast<BifEnum::Broker::BackendType>(attr->GetExpr()->Eval(nullptr)->AsEnum());
|
||||||
auto storename = std::string("___sync_store_") + global.first;
|
auto storename = std::string("___sync_store_") + global.first;
|
||||||
id->GetVal()->AsTableVal()->SetBrokerStore(storename);
|
id->GetVal()->AsTableVal()->SetBrokerStore(storename);
|
||||||
AddForwardedStore(storename, {NewRef{}, id->GetVal()->AsTableVal()});
|
AddForwardedStore(storename, {zeek::NewRef{}, id->GetVal()->AsTableVal()});
|
||||||
|
|
||||||
// we only create masters here. For clones, we do all the work of setting up
|
// we only create masters here. For clones, we do all the work of setting up
|
||||||
// the forwarding - but we do not try to initialize the clone. We can only initialize
|
// the forwarding - but we do not try to initialize the clone. We can only initialize
|
||||||
|
@ -1024,7 +1024,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: expiry!
|
// FIXME: expiry!
|
||||||
const auto& its = table->GetType()->AsTableType()->IndexTypes();
|
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
||||||
assert( its.size() == 1 );
|
assert( its.size() == 1 );
|
||||||
auto zeek_key = data_to_val(std::move(key), its[0].get());
|
auto zeek_key = data_to_val(std::move(key), its[0].get());
|
||||||
if ( ! zeek_key )
|
if ( ! zeek_key )
|
||||||
|
@ -1073,7 +1073,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto& its = table->GetType()->AsTableType()->IndexTypes();
|
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
||||||
assert( its.size() == 1 );
|
assert( its.size() == 1 );
|
||||||
auto zeek_key = data_to_val(std::move(key), its[0].get());
|
auto zeek_key = data_to_val(std::move(key), its[0].get());
|
||||||
if ( ! zeek_key )
|
if ( ! zeek_key )
|
||||||
|
@ -1113,7 +1113,7 @@ void Manager::ProcessStoreEvent(const broker::topic& topic, broker::data msg)
|
||||||
|
|
||||||
auto key = erase.key();
|
auto key = erase.key();
|
||||||
DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str());
|
DBG_LOG(DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str());
|
||||||
const auto& its = table->GetType()->AsTableType()->IndexTypes();
|
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
||||||
assert( its.size() == 1 );
|
assert( its.size() == 1 );
|
||||||
auto zeek_key = data_to_val(std::move(key), its[0].get());
|
auto zeek_key = data_to_val(std::move(key), its[0].get());
|
||||||
if ( ! zeek_key )
|
if ( ! zeek_key )
|
||||||
|
@ -1677,7 +1677,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
|
||||||
|
|
||||||
auto set = caf::get_if<broker::set>(&(keys->get_data()));
|
auto set = caf::get_if<broker::set>(&(keys->get_data()));
|
||||||
auto table = handle->forward_to;
|
auto table = handle->forward_to;
|
||||||
const auto& its = table->GetType()->AsTableType()->IndexTypes();
|
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
||||||
bool is_set = table->GetType()->IsSet();
|
bool is_set = table->GetType()->IsSet();
|
||||||
assert( its.size() == 1 );
|
assert( its.size() == 1 );
|
||||||
for ( const auto key : *set )
|
for ( const auto key : *set )
|
||||||
|
@ -1804,7 +1804,7 @@ const Stats& Manager::GetStatistics()
|
||||||
return statistics;
|
return statistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::AddForwardedStore(const std::string& name, IntrusivePtr<TableVal> table)
|
bool Manager::AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek::TableVal> table)
|
||||||
{
|
{
|
||||||
if ( forwarded_stores.find(name) != forwarded_stores.end() )
|
if ( forwarded_stores.find(name) != forwarded_stores.end() )
|
||||||
{
|
{
|
||||||
|
@ -1812,7 +1812,7 @@ bool Manager::AddForwardedStore(const std::string& name, IntrusivePtr<TableVal>
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str());
|
DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str());
|
||||||
forwarded_stores.emplace(name, table);
|
forwarded_stores.emplace(name, table);
|
||||||
|
|
||||||
CheckForwarding(name);
|
CheckForwarding(name);
|
||||||
|
|
|
@ -303,7 +303,7 @@ public:
|
||||||
*/
|
*/
|
||||||
StoreHandleVal* LookupStore(const std::string& name);
|
StoreHandleVal* LookupStore(const std::string& name);
|
||||||
|
|
||||||
bool AddForwardedStore(const std::string& name, IntrusivePtr<TableVal> table);
|
bool AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek::TableVal> table);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close and unregister a data store. Any existing references to the
|
* Close and unregister a data store. Any existing references to the
|
||||||
|
@ -399,7 +399,7 @@ private:
|
||||||
std::string default_log_topic_prefix;
|
std::string default_log_topic_prefix;
|
||||||
std::shared_ptr<BrokerState> bstate;
|
std::shared_ptr<BrokerState> bstate;
|
||||||
std::unordered_map<std::string, StoreHandleVal*> data_stores;
|
std::unordered_map<std::string, StoreHandleVal*> data_stores;
|
||||||
std::unordered_map<std::string, IntrusivePtr<TableVal>> forwarded_stores;
|
std::unordered_map<std::string, zeek::IntrusivePtr<zeek::TableVal>> forwarded_stores;
|
||||||
std::unordered_map<query_id, StoreQueryCallback*,
|
std::unordered_map<query_id, StoreQueryCallback*,
|
||||||
query_id_hasher> pending_queries;
|
query_id_hasher> pending_queries;
|
||||||
std::vector<std::string> forwarded_prefixes;
|
std::vector<std::string> forwarded_prefixes;
|
||||||
|
|
|
@ -123,7 +123,7 @@ public:
|
||||||
broker::store::proxy proxy;
|
broker::store::proxy proxy;
|
||||||
broker::publisher_id store_pid;
|
broker::publisher_id store_pid;
|
||||||
// Zeek table that events are forwarded to.
|
// Zeek table that events are forwarded to.
|
||||||
IntrusivePtr<TableVal> forward_to;
|
zeek::IntrusivePtr<zeek::TableVal> forward_to;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue