Move bro_broker code to zeek::Broker namespace

This commit is contained in:
Tim Wojtulewicz 2020-08-01 10:49:16 -07:00
parent cba1bc18a5
commit f1cfd5aa2b
18 changed files with 420 additions and 329 deletions

View file

@ -26,7 +26,7 @@
using namespace std;
namespace bro_broker {
namespace zeek::Broker {
static inline zeek::Val* get_option(const char* option)
{
@ -154,12 +154,12 @@ void Manager::InitPostScript()
zeek_table_manager = get_option("Broker::table_store_master")->AsBool();
zeek_table_db_directory = get_option("Broker::table_store_db_directory")->AsString()->CheckString();
opaque_of_data_type = zeek::make_intrusive<zeek::OpaqueType>("Broker::Data");
opaque_of_set_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::SetIterator");
opaque_of_table_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::TableIterator");
opaque_of_vector_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::VectorIterator");
opaque_of_record_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::RecordIterator");
opaque_of_store_handle = zeek::make_intrusive<zeek::OpaqueType>("Broker::Store");
detail::opaque_of_data_type = zeek::make_intrusive<zeek::OpaqueType>("Broker::Data");
detail::opaque_of_set_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::SetIterator");
detail::opaque_of_table_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::TableIterator");
detail::opaque_of_vector_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::VectorIterator");
detail::opaque_of_record_iterator = zeek::make_intrusive<zeek::OpaqueType>("Broker::RecordIterator");
detail::opaque_of_store_handle = zeek::make_intrusive<zeek::OpaqueType>("Broker::Store");
vector_of_data_type = zeek::make_intrusive<zeek::VectorType>(zeek::id::find_type("Broker::Data"));
// Register as a "dont-count" source first, we may change that later.
@ -243,7 +243,7 @@ void Manager::InitializeBrokerStoreForwarding()
if ( ! zeek_table_manager )
continue;
auto backend = bro_broker::to_backend_type(e);
auto backend = detail::to_backend_type(e);
auto suffix = ".store";
switch ( backend ) {
@ -448,7 +448,7 @@ bool Manager::PublishEvent(string topic, zeek::RecordVal* args)
for ( auto i = 0u; i < vv->Size(); ++i )
{
const auto& val = vv->At(i)->AsRecordVal()->GetField(0);
auto data_val = static_cast<DataVal*>(val.get());
auto data_val = static_cast<detail::DataVal*>(val.get());
xs.emplace_back(data_val->data);
}
@ -475,7 +475,7 @@ bool Manager::PublishIdentifier(std::string topic, std::string id)
// receiving side, but not sure what use that would be.
return false;
auto data = val_to_data(val.get());
auto data = detail::val_to_data(val.get());
if ( ! data )
{
@ -528,7 +528,7 @@ bool Manager::PublishLogCreate(zeek::EnumVal* stream, zeek::EnumVal* writer,
for ( auto i = 0; i < num_fields; ++i )
{
auto field_data = threading_field_to_data(fields[i]);
auto field_data = detail::threading_field_to_data(fields[i]);
fields_data.push_back(move(field_data));
}
@ -806,10 +806,10 @@ zeek::RecordVal* Manager::MakeEvent(val_list* args, zeek::detail::Frame* frame)
zeek::RecordValPtr data_val;
if ( same_type(got_type, bro_broker::DataVal::ScriptDataType()) )
if ( same_type(got_type, detail::DataVal::ScriptDataType()) )
data_val = {zeek::NewRef{}, (*args)[i]->AsRecordVal()};
else
data_val = make_data_val((*args)[i]);
data_val = detail::make_data_val((*args)[i]);
if ( ! data_val->GetField(0) )
{
@ -1031,7 +1031,7 @@ void Manager::ProcessStoreEventInsertUpdate(const zeek::TableValPtr& table,
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
assert( its.size() == 1 );
auto zeek_key = data_to_val(key, its[0].get());
auto zeek_key = detail::data_to_val(key, its[0].get());
if ( ! zeek_key )
{
zeek::reporter->Error("ProcessStoreEvent %s: could not convert key \"%s\" for store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(key).c_str(), store_id.c_str());
@ -1045,7 +1045,7 @@ void Manager::ProcessStoreEventInsertUpdate(const zeek::TableValPtr& table,
}
// it is a table
auto zeek_value = data_to_val(data, table->GetType()->Yield().get());
auto zeek_value = detail::data_to_val(data, table->GetType()->Yield().get());
if ( ! zeek_value )
{
zeek::reporter->Error("ProcessStoreEvent %s: could not convert value \"%s\" for key \"%s\" in store \"%s\" while receiving remote data. This probably means the tables have different types on different nodes.", type, to_string(data).c_str(), to_string(key).c_str(), store_id.c_str());
@ -1107,7 +1107,7 @@ void Manager::ProcessStoreEvent(broker::data msg)
DBG_LOG(zeek::DBG_BROKER, "Store %s: Erase key %s", erase.store_id().c_str(), to_string(key).c_str());
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
assert( its.size() == 1 );
auto zeek_key = data_to_val(key, its[0].get());
auto zeek_key = detail::data_to_val(key, its[0].get());
if ( ! zeek_key )
{
zeek::reporter->Error("ProcessStoreEvent: could not convert key \"%s\" for store \"%s\" while receiving remote erase. This probably means the tables have different types on different nodes.", to_string(key).c_str(), insert.store_id().c_str());
@ -1190,7 +1190,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
{
auto got_type = args[i].get_type_name();
const auto& expected_type = arg_types[i];
auto val = data_to_val(std::move(args[i]), expected_type.get());
auto val = detail::data_to_val(std::move(args[i]), expected_type.get());
if ( val )
vl.emplace_back(std::move(val));
@ -1219,7 +1219,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
zeek::event_mgr.Enqueue(handler, std::move(vl), SOURCE_BROKER);
}
bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
{
DBG_LOG(zeek::DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str());
if ( ! lc.valid() )
@ -1229,14 +1229,14 @@ bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
return false;
}
auto stream_id = data_to_val(std::move(lc.stream_id()), log_id_type);
auto stream_id = detail::data_to_val(std::move(lc.stream_id()), log_id_type);
if ( ! stream_id )
{
zeek::reporter->Warning("failed to unpack remote log stream id");
return false;
}
auto writer_id = data_to_val(std::move(lc.writer_id()), writer_id_type);
auto writer_id = detail::data_to_val(std::move(lc.writer_id()), writer_id_type);
if ( ! writer_id )
{
zeek::reporter->Warning("failed to unpack remote log writer id");
@ -1264,7 +1264,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
for ( size_t i = 0; i < num_fields; ++i )
{
if ( auto field = data_to_threading_field(std::move((*fields_data)[i])) )
if ( auto field = detail::data_to_threading_field(std::move((*fields_data)[i])) )
fields[i] = field;
else
{
@ -1284,7 +1284,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
return true;
}
bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
{
DBG_LOG(zeek::DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str());
@ -1299,7 +1299,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
auto& stream_id_name = lw.stream_id().name;
// Get stream ID.
auto stream_id = data_to_val(std::move(lw.stream_id()), log_id_type);
auto stream_id = detail::data_to_val(std::move(lw.stream_id()), log_id_type);
if ( ! stream_id )
{
@ -1309,7 +1309,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
}
// Get writer ID.
auto writer_id = data_to_val(std::move(lw.writer_id()), writer_id_type);
auto writer_id = detail::data_to_val(std::move(lw.writer_id()), writer_id_type);
if ( ! writer_id )
{
zeek::reporter->Warning("failed to unpack remote log writer id for stream: %s", stream_id_name.data());
@ -1391,7 +1391,7 @@ bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
return false;
}
auto val = data_to_val(std::move(id_value), id->GetType().get());
auto val = detail::data_to_val(std::move(id_value), id->GetType().get());
if ( ! val )
{
@ -1413,24 +1413,24 @@ void Manager::ProcessStatus(broker::status stat)
zeek::EventHandlerPtr event;
switch (stat.code()) {
case broker::sc::unspecified:
event = Broker::status;
event = ::Broker::status;
break;
case broker::sc::peer_added:
++peer_count;
assert(ctx);
zeek::log_mgr->SendAllWritersTo(*ctx);
event = Broker::peer_added;
event = ::Broker::peer_added;
break;
case broker::sc::peer_removed:
--peer_count;
event = Broker::peer_removed;
event = ::Broker::peer_removed;
break;
case broker::sc::peer_lost:
--peer_count;
event = Broker::peer_lost;
event = ::Broker::peer_lost;
break;
default:
@ -1476,7 +1476,7 @@ void Manager::ProcessError(broker::error err)
{
DBG_LOG(zeek::DBG_BROKER, "Received error message: %s", RenderMessage(err).c_str());
if ( ! Broker::error )
if ( ! ::Broker::error )
return;
BifEnum::Broker::ErrorCode ec;
@ -1502,12 +1502,12 @@ void Manager::ProcessError(broker::error err)
msg = fmt("[%s] %s", caf::to_string(err.category()).c_str(), caf::to_string(err.context()).c_str());
}
zeek::event_mgr.Enqueue(Broker::error,
zeek::event_mgr.Enqueue(::Broker::error,
zeek::BifType::Enum::Broker::ErrorCode->GetEnumVal(ec),
zeek::make_intrusive<zeek::StringVal>(msg));
}
void Manager::ProcessStoreResponse(StoreHandleVal* s, broker::store::response response)
void Manager::ProcessStoreResponse(detail::StoreHandleVal* s, broker::store::response response)
{
DBG_LOG(zeek::DBG_BROKER, "Received store response: %s", RenderMessage(response).c_str());
@ -1529,7 +1529,7 @@ void Manager::ProcessStoreResponse(StoreHandleVal* s, broker::store::response re
}
if ( response.answer )
request->second->Result(query_result(make_data_val(std::move(*response.answer))));
request->second->Result(detail::query_result(detail::make_data_val(std::move(*response.answer))));
else if ( response.answer.error() == broker::ec::request_timeout )
{
// Fine, trigger's timeout takes care of things.
@ -1543,7 +1543,7 @@ void Manager::ProcessStoreResponse(StoreHandleVal* s, broker::store::response re
// this type of error (which is less easily handled programmatically).
}
else if ( response.answer.error() == broker::ec::no_such_key )
request->second->Result(query_result());
request->second->Result(detail::query_result());
else
zeek::reporter->InternalWarning("unknown store response status: %s",
to_string(response.answer.error()).c_str());
@ -1552,8 +1552,8 @@ void Manager::ProcessStoreResponse(StoreHandleVal* s, broker::store::response re
pending_queries.erase(request);
}
StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
broker::backend_options opts)
detail::StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
broker::backend_options opts)
{
if ( bstate->endpoint.is_shutdown() )
return nullptr;
@ -1594,7 +1594,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
return nullptr;
}
auto handle = new StoreHandleVal{*result};
auto handle = new detail::StoreHandleVal{*result};
Ref(handle);
data_stores.emplace(name, handle);
@ -1611,7 +1611,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
return handle;
}
void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle)
void Manager::BrokerStoreToZeekTable(const std::string& name, const detail::StoreHandleVal* handle)
{
if ( ! handle->forward_to )
return;
@ -1631,7 +1631,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
for ( const auto& key : *set )
{
auto zeek_key = data_to_val(key, its[0].get());
auto zeek_key = detail::data_to_val(key, its[0].get());
if ( ! zeek_key )
{
zeek::reporter->Error("Failed to convert key \"%s\" while importing broker store to table for store \"%s\". Aborting import.", to_string(key).c_str(), name.c_str());
@ -1654,7 +1654,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
continue;
}
auto zeek_value = data_to_val(*value, table->GetType()->Yield().get());
auto zeek_value = detail::data_to_val(*value, table->GetType()->Yield().get());
if ( ! zeek_value )
{
zeek::reporter->Error("Could not convert %s to table value while trying to import Broker store %s. Aborting import.", to_string(value).c_str(), name.c_str());
@ -1669,9 +1669,9 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleV
return;
}
StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
double stale_interval,
double mutation_buffer_interval)
detail::StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
double stale_interval,
double mutation_buffer_interval)
{
if ( bstate->endpoint.is_shutdown() )
return nullptr;
@ -1691,7 +1691,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
return nullptr;
}
auto handle = new StoreHandleVal{*result};
auto handle = new detail::StoreHandleVal{*result};
Ref(handle);
data_stores.emplace(name, handle);
@ -1700,7 +1700,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
return handle;
}
StoreHandleVal* Manager::LookupStore(const string& name)
detail::StoreHandleVal* Manager::LookupStore(const string& name)
{
auto i = data_stores.find(name);
return i == data_stores.end() ? nullptr : i->second;
@ -1733,8 +1733,8 @@ bool Manager::CloseStore(const string& name)
return true;
}
bool Manager::TrackStoreQuery(StoreHandleVal* handle, broker::request_id id,
StoreQueryCallback* cb)
bool Manager::TrackStoreQuery(detail::StoreHandleVal* handle, broker::request_id id,
detail::StoreQueryCallback* cb)
{
auto rval = pending_queries.emplace(std::make_pair(id, handle), cb).second;
@ -1784,4 +1784,4 @@ void Manager::PrepareForwarding(const std::string &name)
DBG_LOG(zeek::DBG_BROKER, "Resolved table forward for data store %s", name.c_str());
}
} // namespace bro_broker
} // namespace zeek::Broker