Merge remote-tracking branch 'origin/topic/neverlord/broker-logging'

* origin/topic/neverlord/broker-logging:
  Integrate review feedback
  Hook into Broker logs via its new API
This commit is contained in:
Arne Welzel 2025-03-31 18:44:30 +02:00
commit 14697ea6ba
11 changed files with 297 additions and 21 deletions

View file

@ -5,6 +5,9 @@
#include <broker/config.hh>
#include <broker/configuration.hh>
#include <broker/endpoint.hh>
#include <broker/event.hh>
#include <broker/event_observer.hh>
#include <broker/logger.hh>
#include <broker/variant.hh>
#include <broker/zeek.hh>
#include <unistd.h>
@ -16,10 +19,12 @@
#include "zeek/DebugLogger.h"
#include "zeek/Desc.h"
#include "zeek/Event.h"
#include "zeek/Flare.h"
#include "zeek/Func.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
#include "zeek/RunState.h"
#include "zeek/Scope.h"
#include "zeek/SerializationFormat.h"
#include "zeek/Type.h"
#include "zeek/Var.h"
@ -86,6 +91,58 @@ void print_escaped(std::string& buf, std::string_view str) {
buf.push_back('"');
}
class LoggerQueue {
public:
void Push(broker::event_ptr event) {
std::list<broker::event_ptr> tmp;
tmp.emplace_back(std::move(event));
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.splice(queue_.end(), tmp);
if ( queue_.size() == 1 ) {
flare_.Fire();
}
}
}
auto Drain() {
std::list<broker::event_ptr> events;
std::lock_guard<std::mutex> lock(mutex_);
if ( ! queue_.empty() ) {
queue_.swap(events);
flare_.Extinguish();
}
return events;
}
auto FlareFd() const noexcept { return flare_.FD(); }
private:
std::mutex mutex_;
zeek::detail::Flare flare_;
std::list<broker::event_ptr> queue_;
};
using LoggerQueuePtr = std::shared_ptr<LoggerQueue>;
using BrokerSeverityLevel = broker::event::severity_level;
class LoggerAdapter : public broker::event_observer {
public:
using SeverityLevel = broker::event::severity_level;
explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue)
: severity_(severity), queue_(std::move(queue)) {}
void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); }
bool accepts(SeverityLevel severity, broker::event::component_type) const override { return severity <= severity_; }
private:
SeverityLevel severity_;
LoggerQueuePtr queue_;
};
} // namespace
namespace zeek::Broker {
@ -162,13 +219,19 @@ struct opt_mapping {
class BrokerState {
public:
BrokerState(broker::configuration config, size_t congestion_queue_size)
using SeverityLevel = LoggerAdapter::SeverityLevel;
BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue)
: endpoint(std::move(config), telemetry_mgr->GetRegistry()),
subscriber(
endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)) {}
endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)),
loggerQueue(std::move(queue)) {}
broker::endpoint endpoint;
broker::subscriber subscriber;
LoggerQueuePtr loggerQueue;
SeverityLevel logSeverity = SeverityLevel::critical;
SeverityLevel stderrSeverity = SeverityLevel::critical;
};
const broker::endpoint_info Manager::NoPeer{{}, {}};
@ -213,7 +276,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const
} // namespace
#endif
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr) {
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr), iosource::IOSource(true) {
bound_port = 0;
use_real_time = arg_use_real_time;
peer_count = 0;
@ -324,12 +387,32 @@ void Manager::DoInitPostScript() {
config.set("caf.work-stealing.moderate-steal-interval", get_option("Broker::moderate_interval")->AsCount());
config.set("caf.work-stealing.relaxed-steal-interval", get_option("Broker::relaxed_interval")->AsCount());
// Hook up the logger.
auto checkLogSeverity = [](int level) {
if ( level < 0 || level > static_cast<int>(BrokerSeverityLevel::debug) ) {
reporter->FatalError("Invalid Broker::log_severity_level: %d", level);
}
};
auto logSeverityVal = static_cast<int>(get_option("Broker::log_severity_level")->AsEnum());
checkLogSeverity(logSeverityVal);
auto stderrSeverityVal = static_cast<int>(get_option("Broker::log_stderr_severity_level")->AsEnum());
checkLogSeverity(stderrSeverityVal);
auto adapterVerbosity = static_cast<BrokerSeverityLevel>(std::max(logSeverityVal, stderrSeverityVal));
auto queue = std::make_shared<LoggerQueue>();
auto adapter = std::make_shared<LoggerAdapter>(adapterVerbosity, queue);
broker::logger(adapter); // *must* be called before creating the BrokerState
auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
bstate = std::make_shared<BrokerState>(std::move(config), cqs);
bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue);
bstate->logSeverity = static_cast<BrokerSeverityLevel>(logSeverityVal);
bstate->stderrSeverity = static_cast<BrokerSeverityLevel>(stderrSeverityVal);
if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) )
reporter->FatalError("Failed to register broker subscriber with iosource_mgr");
if ( ! iosource_mgr->RegisterFd(queue->FlareFd(), this) )
reporter->FatalError("Failed to register broker logger with iosource_mgr");
bstate->subscriber.add_topic(broker::topic::store_events(), true);
InitializeBrokerStoreForwarding();
@ -400,6 +483,8 @@ void Manager::DoTerminate() {
iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this);
iosource_mgr->UnregisterFd(bstate->loggerQueue->FlareFd(), this);
vector<string> stores_to_close;
for ( auto& x : data_stores )
@ -410,6 +495,8 @@ void Manager::DoTerminate() {
// modifies the map and invalidates iterators.
CloseStore(x);
ProcessLogEvents();
FlushLogBuffers();
}
@ -980,11 +1067,9 @@ bool Manager::DoUnsubscribe(const string& topic_prefix) {
return true;
}
void Manager::Process() {
void Manager::ProcessMessages() {
auto messages = bstate->subscriber.poll();
bool had_input = ! messages.empty();
for ( auto& message : messages ) {
auto&& topic = broker::get_topic(message);
@ -1029,20 +1114,105 @@ void Manager::Process() {
continue;
}
}
}
for ( auto& s : data_stores ) {
auto num_available = s.second->proxy.mailbox().size();
namespace {
if ( num_available > 0 ) {
had_input = true;
auto responses = s.second->proxy.receive(num_available);
// Note: copied from Stmt.cc, might be worth to move to a common place.
EnumValPtr lookup_enum_val(const char* module_name, const char* name) {
const auto& id = zeek::detail::lookup_ID(name, module_name);
assert(id);
assert(id->IsEnumConst());
for ( auto& r : responses )
ProcessStoreResponse(s.second, std::move(r));
EnumType* et = id->GetType()->AsEnumType();
int index = et->Lookup(module_name, name);
assert(index >= 0);
return et->GetEnumVal(index);
}
} // namespace
void Manager::ProcessLogEvents() {
static auto ev_critical = lookup_enum_val("Broker", "LOG_CRITICAL");
static auto ev_error = lookup_enum_val("Broker", "LOG_ERROR");
static auto ev_warning = lookup_enum_val("Broker", "LOG_WARNING");
static auto ev_info = lookup_enum_val("Broker", "LOG_INFO");
static auto ev_verbose = lookup_enum_val("Broker", "LOG_VERBOSE");
static auto ev_debug = lookup_enum_val("Broker", "LOG_DEBUG");
auto evType = [](BrokerSeverityLevel lvl) {
switch ( lvl ) {
case BrokerSeverityLevel::critical: return ev_critical;
case BrokerSeverityLevel::error: return ev_error;
case BrokerSeverityLevel::warning: return ev_warning;
case BrokerSeverityLevel::info: return ev_info;
case BrokerSeverityLevel::verbose: return ev_verbose;
default: return ev_debug;
}
};
constexpr const char* severity_names_tbl[] = {"critical", "error", "warning", "info", "verbose", "debug"};
auto events = bstate->loggerQueue->Drain();
for ( auto& event : events ) {
auto severity = event->severity;
if ( bstate->logSeverity >= severity ) {
auto args = Args{};
args.reserve(3);
args.emplace_back(evType(severity));
args.emplace_back(make_intrusive<StringVal>(event->identifier));
args.emplace_back(make_intrusive<StringVal>(event->description));
event_mgr.Enqueue(::Broker::internal_log_event, std::move(args));
}
if ( bstate->stderrSeverity >= severity ) {
fprintf(stderr, "[BROKER/%s] %s\n", severity_names_tbl[static_cast<int>(severity)],
event->description.c_str());
}
}
}
void Manager::ProcessDataStore(detail::StoreHandleVal* store) {
auto num_available = store->proxy.mailbox().size();
if ( num_available > 0 ) {
auto responses = store->proxy.receive(num_available);
for ( auto& r : responses )
ProcessStoreResponse(store, std::move(r));
}
}
void Manager::ProcessDataStores() {
for ( auto& kvp : data_stores ) {
ProcessDataStore(kvp.second);
}
}
void Manager::ProcessFd(int fd, int flags) {
if ( fd == bstate->subscriber.fd() ) {
ProcessMessages();
}
else if ( fd == bstate->loggerQueue->FlareFd() ) {
ProcessLogEvents();
}
else {
for ( auto& kvp : data_stores ) {
if ( fd == kvp.second->proxy.mailbox().descriptor() ) {
ProcessDataStore(kvp.second);
return;
}
}
}
}
void Manager::Process() {
ProcessMessages();
ProcessLogEvents();
ProcessDataStores();
}
void Manager::ProcessStoreEventInsertUpdate(const TableValPtr& table, const std::string& store_id,
const broker::data& key, const broker::data& data,
const broker::data& old_value, bool insert) {