Misc/minor cleanup after reviewing new Broker table store code

Just some naming/style + minor memory management tweaks (avoid
some copies; use move semantics).
This commit is contained in:
Jon Siwek 2020-07-24 13:57:19 -07:00
parent 823d4e6220
commit b62fd71181
7 changed files with 31 additions and 20 deletions

View file

@ -1,4 +1,11 @@
3.2.0-dev.986 | 2020-07-24 13:57:19 -0700
* Misc/minor cleanup after reviewing new Broker table store code (Jon Siwek, Corelight)
Just some naming/style + minor memory management tweaks (avoid
some copies; use move semantics).
3.2.0-dev.985 | 2020-07-24 11:40:22 -0700 3.2.0-dev.985 | 2020-07-24 11:40:22 -0700
* MySQL: Fix EOF parsing logic bug. (Vlad Grigorescu) * MySQL: Fix EOF parsing logic bug. (Vlad Grigorescu)

View file

@ -1 +1 @@
3.2.0-dev.985 3.2.0-dev.986

@ -1 +1 @@
Subproject commit 5a34500a5b0ddf88c4b75743a7da1063ae30897a Subproject commit e65d7095bf1f079d090e4bf8926e8b78c33192b5

View file

@ -1517,7 +1517,7 @@ void TableVal::SetAttrs(detail::AttributesPtr a)
auto bs = attrs->Find(zeek::detail::ATTR_BROKER_STORE); auto bs = attrs->Find(zeek::detail::ATTR_BROKER_STORE);
if ( bs && broker_store.empty() ) if ( bs && broker_store.empty() )
{ {
IntrusivePtr<Val> c = bs->GetExpr()->Eval(nullptr); auto c = bs->GetExpr()->Eval(nullptr);
assert(c); assert(c);
assert(c->GetType()->Tag() == zeek::TYPE_STRING); assert(c->GetType()->Tag() == zeek::TYPE_STRING);
broker_store = c->AsStringVal()->AsString()->CheckString(); broker_store = c->AsStringVal()->AsString()->CheckString();
@ -2280,7 +2280,7 @@ ValPtr TableVal::Remove(const Val& index, bool broker_forward)
if ( change_func ) if ( change_func )
{ {
// this is totally cheating around the fact that we need a Intrusive pointer. // this is totally cheating around the fact that we need a Intrusive pointer.
IntrusivePtr<Val> changefunc_val = RecreateIndex(*(k.get())); ValPtr changefunc_val = RecreateIndex(*(k.get()));
CallChangeFunc(changefunc_val, va, ELEMENT_REMOVED); CallChangeFunc(changefunc_val, va, ELEMENT_REMOVED);
} }

View file

@ -233,7 +233,7 @@ void Manager::InitializeBrokerStoreForwarding()
auto e = static_cast<BifEnum::Broker::BackendType>(attr->GetExpr()->Eval(nullptr)->AsEnum()); auto e = static_cast<BifEnum::Broker::BackendType>(attr->GetExpr()->Eval(nullptr)->AsEnum());
auto storename = std::string("___sync_store_") + global.first; auto storename = std::string("___sync_store_") + global.first;
id->GetVal()->AsTableVal()->SetBrokerStore(storename); id->GetVal()->AsTableVal()->SetBrokerStore(storename);
AddForwardedStore(storename, {zeek::NewRef{}, id->GetVal()->AsTableVal()}); AddForwardedStore(storename, zeek::cast_intrusive<zeek::TableVal>(id->GetVal()));
// We only create masters here. For clones, we do all the work of setting up // We only create masters here. For clones, we do all the work of setting up
// the forwarding - but we do not try to initialize the clone. We can only initialize // the forwarding - but we do not try to initialize the clone. We can only initialize
@ -955,17 +955,16 @@ void Manager::Process()
had_input = true; had_input = true;
auto& topic = broker::get_topic(message); auto& topic = broker::get_topic(message);
auto& msg = broker::get_data(message);
if ( broker::topics::store_events.prefix_of(topic) ) if ( broker::topics::store_events.prefix_of(topic) )
{ {
ProcessStoreEvent(std::move(msg)); ProcessStoreEvent(broker::move_data(message));
continue; continue;
} }
try try
{ {
DispatchMessage(topic, std::move(msg)); DispatchMessage(topic, broker::move_data(message));
} }
catch ( std::runtime_error& e ) catch ( std::runtime_error& e )
{ {
@ -998,7 +997,12 @@ void Manager::Process()
} }
} }
void Manager::ProcessStoreEventInsertUpdate(zeek::IntrusivePtr<zeek::TableVal> table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert) void Manager::ProcessStoreEventInsertUpdate(const zeek::TableValPtr& table,
const std::string& store_id,
const broker::data& key,
const broker::data& data,
const broker::data& old_value,
bool insert)
{ {
auto type = "Insert"; auto type = "Insert";
if ( ! insert ) if ( ! insert )
@ -1053,7 +1057,7 @@ void Manager::ProcessStoreEvent(broker::data msg)
if ( ! storehandle ) if ( ! storehandle )
return; return;
auto table = storehandle->forward_to; const auto& table = storehandle->forward_to;
if ( ! table ) if ( ! table )
return; return;
@ -1069,7 +1073,7 @@ void Manager::ProcessStoreEvent(broker::data msg)
if ( ! storehandle ) if ( ! storehandle )
return; return;
auto table = storehandle->forward_to; const auto& table = storehandle->forward_to;
if ( ! table ) if ( ! table )
return; return;
@ -1265,14 +1269,13 @@ bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
} }
} }
if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.get(), num_fields, fields) ) if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.release(), num_fields, fields) )
{ {
ODesc d; ODesc d;
stream_id->Describe(&d); stream_id->Describe(&d);
reporter->Warning("failed to create remote log stream for %s locally", d.Description()); reporter->Warning("failed to create remote log stream for %s locally", d.Description());
} }
writer_info.release(); // log_mgr took ownership.
return true; return true;
} }
@ -1622,7 +1625,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
// disable &on_change notifications while filling the table. // disable &on_change notifications while filling the table.
table->DisableChangeNotifications(); table->DisableChangeNotifications();
for ( const auto key : *set ) for ( const auto& key : *set )
{ {
auto zeek_key = data_to_val(key, its[0].get()); auto zeek_key = data_to_val(key, its[0].get());
if ( ! zeek_key ) if ( ! zeek_key )
@ -1749,7 +1752,7 @@ const Stats& Manager::GetStatistics()
return statistics; return statistics;
} }
bool Manager::AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek::TableVal> table) bool Manager::AddForwardedStore(const std::string& name, zeek::TableValPtr table)
{ {
if ( forwarded_stores.find(name) != forwarded_stores.end() ) if ( forwarded_stores.find(name) != forwarded_stores.end() )
{ {
@ -1758,7 +1761,7 @@ bool Manager::AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek
} }
DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str()); DBG_LOG(DBG_BROKER, "Adding table forward for data store %s", name.c_str());
forwarded_stores.emplace(name, table); forwarded_stores.emplace(name, std::move(table));
PrepareForwarding(name); PrepareForwarding(name);
return true; return true;

View file

@ -29,6 +29,7 @@ ZEEK_FORWARD_DECLARE_NAMESPACED(TableVal, zeek);
namespace zeek { namespace zeek {
using VectorTypePtr = zeek::IntrusivePtr<zeek::VectorType>; using VectorTypePtr = zeek::IntrusivePtr<zeek::VectorType>;
using TableValPtr = zeek::IntrusivePtr<zeek::TableVal>;
} }
namespace bro_broker { namespace bro_broker {
@ -312,7 +313,7 @@ public:
* @param table pointer to the table/set that is being backed. * @param table pointer to the table/set that is being backed.
* @return true on success, false if the named store is already being forwarded. * @return true on success, false if the named store is already being forwarded.
*/ */
bool AddForwardedStore(const std::string& name, zeek::IntrusivePtr<zeek::TableVal> table); bool AddForwardedStore(const std::string& name, zeek::TableValPtr table);
/** /**
* Close and unregister a data store. Any existing references to the * Close and unregister a data store. Any existing references to the
@ -362,7 +363,7 @@ private:
// Process events used for Broker store backed zeek tables // Process events used for Broker store backed zeek tables
void ProcessStoreEvent(broker::data msg); void ProcessStoreEvent(broker::data msg);
// Common functionality for processing insert and update events. // Common functionality for processing insert and update events.
void ProcessStoreEventInsertUpdate(zeek::IntrusivePtr<zeek::TableVal> table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert); void ProcessStoreEventInsertUpdate(const zeek::TableValPtr& table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert);
void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev); void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev);
bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogCreate(broker::zeek::LogCreate lc);
bool ProcessLogWrite(broker::zeek::LogWrite lw); bool ProcessLogWrite(broker::zeek::LogWrite lw);
@ -412,7 +413,7 @@ private:
std::string default_log_topic_prefix; std::string default_log_topic_prefix;
std::shared_ptr<BrokerState> bstate; std::shared_ptr<BrokerState> bstate;
std::unordered_map<std::string, StoreHandleVal*> data_stores; std::unordered_map<std::string, StoreHandleVal*> data_stores;
std::unordered_map<std::string, zeek::IntrusivePtr<zeek::TableVal>> forwarded_stores; std::unordered_map<std::string, zeek::TableValPtr> forwarded_stores;
std::unordered_map<query_id, StoreQueryCallback*, std::unordered_map<query_id, StoreQueryCallback*,
query_id_hasher> pending_queries; query_id_hasher> pending_queries;
std::vector<std::string> forwarded_prefixes; std::vector<std::string> forwarded_prefixes;

View file

@ -123,7 +123,7 @@ public:
broker::store::proxy proxy; broker::store::proxy proxy;
broker::publisher_id store_pid; broker::publisher_id store_pid;
// Zeek table that events are forwarded to. // Zeek table that events are forwarded to.
zeek::IntrusivePtr<zeek::TableVal> forward_to; zeek::TableValPtr forward_to;
protected: protected: