mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Merge remote-tracking branch 'origin/topic/neverlord/hide-caf'
* origin/topic/neverlord/hide-caf: Fix GCC builds and string output for Broker errors Update to latest Broker without public CAF dep
This commit is contained in:
commit
5f1f005142
34 changed files with 393 additions and 1066 deletions
|
@ -1,6 +1,7 @@
|
|||
#include "zeek/broker/Manager.h"
|
||||
|
||||
#include <broker/broker.hh>
|
||||
#include <broker/configuration.hh>
|
||||
#include <broker/zeek.hh>
|
||||
#include <unistd.h>
|
||||
#include <cstdio>
|
||||
|
@ -27,6 +28,58 @@
|
|||
|
||||
using namespace std;
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
void print_escaped(std::string& buf, std::string_view str)
|
||||
{
|
||||
buf.push_back('"');
|
||||
for ( auto c : str )
|
||||
{
|
||||
switch ( c )
|
||||
{
|
||||
default:
|
||||
buf.push_back(c);
|
||||
break;
|
||||
case '\\':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('\\');
|
||||
break;
|
||||
case '\b':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('b');
|
||||
break;
|
||||
case '\f':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('f');
|
||||
break;
|
||||
case '\n':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('n');
|
||||
break;
|
||||
case '\r':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('r');
|
||||
break;
|
||||
case '\t':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('t');
|
||||
break;
|
||||
case '\v':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('v');
|
||||
break;
|
||||
case '"':
|
||||
buf.push_back('\\');
|
||||
buf.push_back('"');
|
||||
break;
|
||||
}
|
||||
}
|
||||
buf.push_back('"');
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace zeek::Broker
|
||||
{
|
||||
|
||||
|
@ -90,10 +143,10 @@ namespace
|
|||
struct opt_mapping
|
||||
{
|
||||
broker::configuration* cfg;
|
||||
std::string_view broker_name;
|
||||
std::string broker_name;
|
||||
const char* zeek_name;
|
||||
|
||||
template <class T> auto broker_read() { return caf::get_as<T>(*cfg, broker_name); }
|
||||
template <class T> auto broker_read() { return broker::get_as<T>(*cfg, broker_name); }
|
||||
|
||||
template <class T> auto broker_write(T&& val) { cfg->set(broker_name, std::forward<T>(val)); }
|
||||
|
||||
|
@ -107,23 +160,10 @@ struct opt_mapping
|
|||
|
||||
} // namespace
|
||||
|
||||
class BrokerConfig : public broker::configuration
|
||||
{
|
||||
public:
|
||||
BrokerConfig(broker::broker_options options) : broker::configuration(options)
|
||||
{
|
||||
openssl_cafile = get_option("Broker::ssl_cafile")->AsString()->CheckString();
|
||||
openssl_capath = get_option("Broker::ssl_capath")->AsString()->CheckString();
|
||||
openssl_certificate = get_option("Broker::ssl_certificate")->AsString()->CheckString();
|
||||
openssl_key = get_option("Broker::ssl_keyfile")->AsString()->CheckString();
|
||||
openssl_passphrase = get_option("Broker::ssl_passphrase")->AsString()->CheckString();
|
||||
}
|
||||
};
|
||||
|
||||
class BrokerState
|
||||
{
|
||||
public:
|
||||
BrokerState(BrokerConfig config, size_t congestion_queue_size)
|
||||
BrokerState(broker::configuration config, size_t congestion_queue_size)
|
||||
: endpoint(std::move(config)),
|
||||
subscriber(endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()},
|
||||
congestion_queue_size))
|
||||
|
@ -238,7 +278,13 @@ void Manager::InitPostScript()
|
|||
options.forward = get_option("Broker::forward_messages")->AsBool();
|
||||
options.use_real_time = use_real_time;
|
||||
|
||||
BrokerConfig config{std::move(options)};
|
||||
broker::configuration config{std::move(options)};
|
||||
|
||||
config.openssl_cafile(get_option("Broker::ssl_cafile")->AsString()->CheckString());
|
||||
config.openssl_capath(get_option("Broker::ssl_capath")->AsString()->CheckString());
|
||||
config.openssl_certificate(get_option("Broker::ssl_certificate")->AsString()->CheckString());
|
||||
config.openssl_key(get_option("Broker::ssl_keyfile")->AsString()->CheckString());
|
||||
config.openssl_passphrase(get_option("Broker::ssl_passphrase")->AsString()->CheckString());
|
||||
|
||||
auto scheduler_policy = get_option("Broker::scheduler_policy")->AsString()->CheckString();
|
||||
|
||||
|
@ -257,11 +303,11 @@ void Manager::InitPostScript()
|
|||
config.set("caf.scheduler.max-threads", get_option("Broker::max_threads")->AsCount());
|
||||
|
||||
config.set("caf.work-stealing.moderate-sleep-duration",
|
||||
caf::timespan(static_cast<unsigned>(
|
||||
broker::timespan(static_cast<unsigned>(
|
||||
get_option("Broker::moderate_sleep")->AsInterval() * 1e9)));
|
||||
|
||||
config.set("caf.work-stealing.relaxed-sleep-duration",
|
||||
caf::timespan(
|
||||
broker::timespan(
|
||||
static_cast<unsigned>(get_option("Broker::relaxed_sleep")->AsInterval() * 1e9)));
|
||||
|
||||
config.set("caf.work-stealing.aggressive-poll-attempts",
|
||||
|
@ -363,6 +409,8 @@ void Manager::InitPostScript()
|
|||
|
||||
bstate->subscriber.add_topic(broker::topic::store_events(), true);
|
||||
|
||||
telemetry_mgr->InitPostBrokerSetup(bstate->endpoint);
|
||||
|
||||
InitializeBrokerStoreForwarding();
|
||||
}
|
||||
|
||||
|
@ -1030,7 +1078,7 @@ void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
|
|||
if ( ! batch.valid() )
|
||||
{
|
||||
reporter->Warning("received invalid broker Batch: %s",
|
||||
broker::to_string(batch).data());
|
||||
broker::to_string(batch.as_data()).data());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1376,7 +1424,8 @@ bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
|||
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str());
|
||||
if ( ! lc.valid() )
|
||||
{
|
||||
reporter->Warning("received invalid broker LogCreate: %s", broker::to_string(lc).data());
|
||||
reporter->Warning("received invalid broker LogCreate: %s",
|
||||
broker::to_string(lc.as_data()).data());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1402,7 +1451,7 @@ bool Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
|||
}
|
||||
|
||||
// Get log fields.
|
||||
auto fields_data = caf::get_if<broker::vector>(&lc.fields_data());
|
||||
auto fields_data = get_if<broker::vector>(&lc.fields_data());
|
||||
|
||||
if ( ! fields_data )
|
||||
{
|
||||
|
@ -1442,7 +1491,8 @@ bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
|||
|
||||
if ( ! lw.valid() )
|
||||
{
|
||||
reporter->Warning("received invalid broker LogWrite: %s", broker::to_string(lw).data());
|
||||
reporter->Warning("received invalid broker LogWrite: %s",
|
||||
broker::to_string(lw.as_data()).data());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1467,7 +1517,7 @@ bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
|||
return false;
|
||||
}
|
||||
|
||||
auto path = caf::get_if<std::string>(&lw.path());
|
||||
auto path = get_if<std::string>(&lw.path());
|
||||
|
||||
if ( ! path )
|
||||
{
|
||||
|
@ -1476,7 +1526,7 @@ bool Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
|||
return false;
|
||||
}
|
||||
|
||||
auto serial_data = caf::get_if<std::string>(&lw.serial_data());
|
||||
auto serial_data = get_if<std::string>(&lw.serial_data());
|
||||
|
||||
if ( ! serial_data )
|
||||
{
|
||||
|
@ -1531,7 +1581,7 @@ bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
|
|||
if ( ! iu.valid() )
|
||||
{
|
||||
reporter->Warning("received invalid broker IdentifierUpdate: %s",
|
||||
broker::to_string(iu).data());
|
||||
broker::to_string(iu.as_data()).data());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1655,12 +1705,12 @@ void Manager::ProcessError(broker::error_view err)
|
|||
if ( auto ctx = err.context() )
|
||||
{
|
||||
msg += '(';
|
||||
msg += to_string(ctx->node);
|
||||
msg += broker::to_string(ctx->node);
|
||||
msg += ", ";
|
||||
msg += caf::deep_to_string(ctx->network);
|
||||
msg += broker::to_string(ctx->network);
|
||||
msg += ", ";
|
||||
if ( auto what = err.message() )
|
||||
msg += caf::deep_to_string(*what);
|
||||
print_escaped(msg, *what);
|
||||
else
|
||||
msg += R"_("")_";
|
||||
msg += ')';
|
||||
|
@ -1783,7 +1833,7 @@ void Manager::BrokerStoreToZeekTable(const std::string& name, const detail::Stor
|
|||
if ( ! keys )
|
||||
return;
|
||||
|
||||
auto set = caf::get_if<broker::set>(&(keys->get_data()));
|
||||
auto set = get_if<broker::set>(&(keys->get_data()));
|
||||
auto table = handle->forward_to;
|
||||
const auto& its = table->GetType()->AsTableType()->GetIndexTypes();
|
||||
bool is_set = table->GetType()->IsSet();
|
||||
|
@ -1982,53 +2032,4 @@ void Manager::SetMetricsExportPrefixes(std::vector<std::string> filter)
|
|||
bstate->endpoint.metrics_exporter().set_prefixes(std::move(filter));
|
||||
}
|
||||
|
||||
std::unique_ptr<telemetry::Manager> Manager::NewTelemetryManager()
|
||||
{
|
||||
// The telemetry Manager actually only has a dependency on the actor system,
|
||||
// not to the Broker Manager. By having the telemetry Manager hold on to a
|
||||
// shared_ptr to our Broker state, we make sure the Broker endpoint, which
|
||||
// owns the CAF actor system, lives for as long as necessary. This also
|
||||
// makes sure that the Broker Manager may even get destroyed before the
|
||||
// telemetry Manager.
|
||||
struct TM final : public telemetry::Manager
|
||||
{
|
||||
using MetricRegistryPtr = std::unique_ptr<caf::telemetry::metric_registry>;
|
||||
|
||||
static auto getPimpl(BrokerState& st)
|
||||
{
|
||||
auto registry = std::addressof(st.endpoint.system().metrics());
|
||||
return reinterpret_cast<telemetry::Manager::Impl*>(registry);
|
||||
}
|
||||
|
||||
static auto getPimpl(MetricRegistryPtr& ptr)
|
||||
{
|
||||
return reinterpret_cast<telemetry::Manager::Impl*>(ptr.get());
|
||||
}
|
||||
|
||||
explicit TM(Broker::Manager* parent, MetricRegistryPtr ptr)
|
||||
: telemetry::Manager(getPimpl(ptr)), parent(parent), tmp(std::move(ptr))
|
||||
{
|
||||
assert(tmp != nullptr);
|
||||
assert(parent != nullptr);
|
||||
}
|
||||
|
||||
void InitPostScript() override
|
||||
{
|
||||
assert(parent->bstate != nullptr);
|
||||
ptr = parent->bstate;
|
||||
auto registry = std::addressof(ptr->endpoint.system().metrics());
|
||||
registry->merge(*tmp);
|
||||
tmp.reset();
|
||||
pimpl = reinterpret_cast<telemetry::Manager::Impl*>(registry);
|
||||
}
|
||||
|
||||
Broker::Manager* parent;
|
||||
MetricRegistryPtr tmp;
|
||||
std::shared_ptr<BrokerState> ptr;
|
||||
};
|
||||
|
||||
auto tmp = std::make_unique<caf::telemetry::metric_registry>();
|
||||
return std::make_unique<TM>(this, std::move(tmp));
|
||||
}
|
||||
|
||||
} // namespace zeek::Broker
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue