Sync new broker options, fix name inconsistencies

This commit is contained in:
Dominik Charousset 2021-05-25 17:22:45 +02:00
parent f9cd05f00b
commit 7767c3d36c
5 changed files with 293 additions and 72 deletions

View file

@ -41,6 +41,86 @@ static inline Val* get_option(const char* option)
return id->GetVal().get();
}
template <class T>
static inline void set_option(const char* option, const T& value)
{
const auto& id = zeek::detail::global_scope()->Find(option);
if ( ! id )
reporter->FatalError("Unknown Broker option %s", option);
if constexpr ( std::is_same_v<T, broker::port> )
{
switch ( value.type() ) {
case broker::port::protocol::tcp:
id->SetVal(val_mgr->Port(value.number(), TRANSPORT_TCP));
break;
case broker::port::protocol::udp:
id->SetVal(val_mgr->Port(value.number(), TRANSPORT_UDP));
break;
case broker::port::protocol::icmp:
id->SetVal(val_mgr->Port(value.number(), TRANSPORT_ICMP));
break;
default:
id->SetVal(val_mgr->Port(value.number(), TRANSPORT_UNKNOWN ));
}
}
else if constexpr ( std::is_same_v<T, broker::timespan> )
{
using std::chrono::duration_cast;
auto ts = duration_cast<broker::fractional_seconds>(value);
id->SetVal(make_intrusive<IntervalVal>(ts.count()));
}
else if constexpr ( std::is_same_v<T, std::vector<std::string>> )
{
auto ptr = make_intrusive<VectorVal>(zeek::id::string_vec);
for ( const auto& str : value )
ptr->Append(make_intrusive<StringVal>(str));
}
else
{
static_assert(std::is_same_v<T, std::string>);
id->SetVal(make_intrusive<StringVal>(value));
}
}
namespace {
struct opt_mapping {
broker::configuration* cfg;
std::string_view broker_name;
const char* zeek_name;
template <class T>
auto broker_read()
{
return caf::get_as<T>(*cfg, broker_name);
}
template <class T>
auto broker_write(T&& val)
{
cfg->set(broker_name, std::forward<T>(val));
}
auto zeek_read()
{
return get_option(zeek_name);
}
template <class T>
auto zeek_write(const T& val)
{
set_option(zeek_name, val);
}
};
#define WITH_OPT_MAPPING(broker_name, zeek_name) \
if ( auto opt = opt_mapping{&config, broker_name, zeek_name}; true )
}//namespace
class BrokerConfig : public broker::configuration {
public:
BrokerConfig(broker::broker_options options)
@ -212,6 +292,89 @@ void Manager::InitPostScript()
config.set("caf.work-stealing.relaxed-steal-interval",
get_option("Broker::relaxed_interval")->AsCount());
// Before launching Broker, we check whether the configuration contains
// values for the metric_exporter_* options. If Broker already has picked up
// values from environment variables (or config files) then we write then
// back. Otherwise, we forward user-defined values from script land (but
// ignore defaults).
WITH_OPT_MAPPING("broker.metrics.port", "Broker::metrics_port")
{
if ( auto port = opt.broker_read<uint16_t>() )
{
opt.zeek_write(broker::port{*port, broker::port::protocol::tcp});
}
else
{
auto ptr = opt.zeek_read()->AsPortVal();
if ( ptr->IsTCP() )
opt.broker_write(ptr->Port());
}
}
WITH_OPT_MAPPING("broker.metrics.export.interval",
"Broker::metrics_export_interval")
{
if ( auto ts = opt.broker_read<broker::timespan>() )
{
opt.zeek_write(*ts);
}
else
{
using std::chrono::duration_cast;
auto val = opt.zeek_read()->AsInterval();
auto frac_ts = broker::fractional_seconds{val};
if ( frac_ts.count() > 0.0 )
opt.broker_write(duration_cast<broker::timespan>(frac_ts));
}
}
WITH_OPT_MAPPING("broker.metrics.export.topic",
"Broker::metrics_export_topic")
{
if ( auto str = opt.broker_read<std::string>() )
{
opt.zeek_write(*str);
}
else
{
auto ptr = opt.zeek_read()->AsStringVal();
if ( ptr->Len() > 0 )
opt.broker_write(ptr->ToStdString());
}
}
WITH_OPT_MAPPING("broker.metrics.endpoint-name",
"Broker::metrics_export_endpoint_name")
{
if ( auto str = opt.broker_read<std::string>() )
{
opt.zeek_write(*str);
}
else
{
auto ptr = opt.zeek_read()->AsStringVal();
if ( ptr->Len() > 0 )
opt.broker_write(ptr->ToStdString());
}
}
WITH_OPT_MAPPING("broker.metrics.export.prefixes",
"Broker::metrics_export_prefixes")
{
if ( auto str = opt.broker_read<std::vector<std::string>>() )
{
opt.zeek_write(*str);
}
else
{
auto ptr = opt.zeek_read()->AsVectorVal();
std::vector<std::string> str_ls;
for ( unsigned index = 0; index < ptr->Size(); ++index )
str_ls.emplace_back(ptr->StringValAt(index)->ToStdString());
opt.broker_write(std::move(str_ls));
}
}
auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
bstate = std::make_shared<BrokerState>(std::move(config), cqs);
@ -1807,24 +1970,24 @@ void Manager::PrepareForwarding(const std::string &name)
DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str());
}
void Manager::SetMetricsExporterInterval(double value)
void Manager::SetMetricsExportInterval(double value)
{
broker::timespan ts;
if ( broker::convert(value, ts) )
bstate->endpoint.metrics_exporter().set_interval(ts);
}
void Manager::SetMetricsExporterTarget(std::string value)
void Manager::SetMetricsExportTopic(std::string value)
{
bstate->endpoint.metrics_exporter().set_target(std::move(value));
}
void Manager::SetMetricsExporterId(std::string value)
void Manager::SetMetricsExportEndpointName(std::string value)
{
bstate->endpoint.metrics_exporter().set_id(std::move(value));
}
void Manager::SetMetricsExporterPrefixes(std::vector<std::string> filter)
void Manager::SetMetricsExportPrefixes(std::vector<std::string> filter)
{
bstate->endpoint.metrics_exporter().set_prefixes(std::move(filter));
}