Hook into Broker logs via its new API

The new Broker API allows us to provide a custom logger to Broker that
pulls previously unattainable context information out of Broker to put
them into broker.log for users of Zeek.

Since Broker log events happen asynchronously, we cache them in a queue
and use a flare to notify Zeek of activity. Furthermore, the Broker
manager now implements the `ProcessFd` function to avoid unnecessary
polling of the new log queue. As a side effect, data stores are polled
less as well.
This commit is contained in:
Dominik Charousset 2024-09-29 14:59:35 +02:00 committed by Dominik Charousset
parent b7b31ebce5
commit 30615f425e
7 changed files with 245 additions and 20 deletions

@ -1 +1 @@
Subproject commit 5847b2a5458d03d56654e19b6b51a182476d36e5
Subproject commit c37005de5fd8cc487a6e1d804622376cc306afcf

View file

@ -14,7 +14,19 @@ export {
## An informational status update.
STATUS,
## An error situation.
ERROR
ERROR,
## Fatal event, normal operation has most likely broken down.
CRITICAL_EVENT,
## Unrecoverable event that imparts at least part of the system.
ERROR_EVENT,
## Unespected or conspicuous event that may still be recoverable.
WARNING_EVENT,
## Noteworthy event during normal operation.
INFO_EVENT,
## Information that might be relevant for a user to understand system behavior.
VERBOSE_EVENT,
## An event that is relevant only for troubleshooting and debugging.
DEBUG_EVENT,
};
## A record type containing the column fields of the Broker log.

View file

@ -176,6 +176,28 @@ export {
## will be sent.
const log_topic: function(id: Log::ID, path: string): string = default_log_topic &redef;
## The possible log event severity levels for Broker.
type LogSeverityLevel: enum {
## Fatal event, normal operation has most likely broken down.
LOG_CRITICAL,
## Unrecoverable event that imparts at least part of the system.
LOG_ERROR,
## Unespected or conspicuous event that may still be recoverable.
LOG_WARNING,
## Noteworthy event during normal operation.
LOG_INFO,
## Information that might be relevant for a user to understand system behavior.
LOG_VERBOSE,
## An event that is relevant only for troubleshooting and debugging.
LOG_DEBUG,
};
## The log event severity level for the Broker log output.
const log_severity_level = LOG_WARNING &redef;
## Event severity level for also printing the Broker log output to stderr.
const log_stderr_severity_level = LOG_CRITICAL &redef;
type ErrorCode: enum {
## The unspecified default error code.
UNSPECIFIED = 1,

View file

@ -5,6 +5,9 @@
#include <broker/config.hh>
#include <broker/configuration.hh>
#include <broker/endpoint.hh>
#include <broker/event.hh>
#include <broker/event_observer.hh>
#include <broker/logger.hh>
#include <broker/variant.hh>
#include <broker/zeek.hh>
#include <unistd.h>
@ -16,10 +19,12 @@
#include "zeek/DebugLogger.h"
#include "zeek/Desc.h"
#include "zeek/Event.h"
#include "zeek/Flare.h"
#include "zeek/Func.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
#include "zeek/RunState.h"
#include "zeek/Scope.h"
#include "zeek/SerializationFormat.h"
#include "zeek/Type.h"
#include "zeek/Var.h"
@ -86,6 +91,58 @@ void print_escaped(std::string& buf, std::string_view str) {
buf.push_back('"');
}
class LoggerQueue {
public:
void Push(broker::event_ptr event) {
std::list<broker::event_ptr> tmp;
tmp.emplace_back(std::move(event));
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.splice(queue_.end(), tmp);
if ( queue_.size() == 1 ) {
flare_.Fire();
}
}
}
auto Drain() {
std::list<broker::event_ptr> events;
std::lock_guard<std::mutex> lock(mutex_);
if ( ! queue_.empty() ) {
queue_.swap(events);
flare_.Extinguish();
}
return events;
}
auto FlareFd() const noexcept { return flare_.FD(); }
private:
std::mutex mutex_;
zeek::detail::Flare flare_;
std::list<broker::event_ptr> queue_;
};
using LoggerQueuePtr = std::shared_ptr<LoggerQueue>;
using BrokerSeverityLevel = broker::event::severity_level;
class LoggerAdapter : public broker::event_observer {
public:
using SeverityLevel = broker::event::severity_level;
explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue)
: severity_(severity), queue_(std::move(queue)) {}
void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); }
bool accepts(SeverityLevel severity, broker::event::component_type) const override { return severity <= severity_; }
private:
SeverityLevel severity_;
LoggerQueuePtr queue_;
};
} // namespace
namespace zeek::Broker {
@ -162,13 +219,19 @@ struct opt_mapping {
class BrokerState {
public:
BrokerState(broker::configuration config, size_t congestion_queue_size)
using SeverityLevel = LoggerAdapter::SeverityLevel;
BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue)
: endpoint(std::move(config), telemetry_mgr->GetRegistry()),
subscriber(
endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)) {}
endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)),
loggerQueue(std::move(queue)) {}
broker::endpoint endpoint;
broker::subscriber subscriber;
LoggerQueuePtr loggerQueue;
SeverityLevel logSeverity = SeverityLevel::critical;
SeverityLevel stderrSeverity = SeverityLevel::critical;
};
const broker::endpoint_info Manager::NoPeer{{}, {}};
@ -213,7 +276,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const
} // namespace
#endif
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr) {
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr), iosource::IOSource(true) {
bound_port = 0;
use_real_time = arg_use_real_time;
peer_count = 0;
@ -324,12 +387,32 @@ void Manager::DoInitPostScript() {
config.set("caf.work-stealing.moderate-steal-interval", get_option("Broker::moderate_interval")->AsCount());
config.set("caf.work-stealing.relaxed-steal-interval", get_option("Broker::relaxed_interval")->AsCount());
// Hook up the logger.
auto checkLogSeverity = [](int level) {
if ( level < 0 || level > static_cast<int>(BrokerSeverityLevel::debug) ) {
reporter->FatalError("Invalid Broker::log_severity_level: %d", level);
}
};
auto logSeverityVal = static_cast<int>(get_option("Broker::log_severity_level")->AsEnum());
checkLogSeverity(logSeverityVal);
auto stderrSeverityVal = static_cast<int>(get_option("Broker::log_stderr_severity_level")->AsEnum());
checkLogSeverity(stderrSeverityVal);
auto adapterVerbosity = static_cast<BrokerSeverityLevel>(std::max(logSeverityVal, stderrSeverityVal));
auto queue = std::make_shared<LoggerQueue>();
auto adapter = std::make_shared<LoggerAdapter>(adapterVerbosity, queue);
broker::logger(adapter); // *must* be called before creating the BrokerState
auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
bstate = std::make_shared<BrokerState>(std::move(config), cqs);
bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue);
bstate->logSeverity = static_cast<BrokerSeverityLevel>(logSeverityVal);
bstate->stderrSeverity = static_cast<BrokerSeverityLevel>(stderrSeverityVal);
if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) )
reporter->FatalError("Failed to register broker subscriber with iosource_mgr");
if ( ! iosource_mgr->RegisterFd(queue->FlareFd(), this) )
reporter->FatalError("Failed to register broker logger with iosource_mgr");
bstate->subscriber.add_topic(broker::topic::store_events(), true);
InitializeBrokerStoreForwarding();
@ -400,6 +483,8 @@ void Manager::DoTerminate() {
iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this);
iosource_mgr->UnregisterFd(bstate->loggerQueue->FlareFd(), this);
vector<string> stores_to_close;
for ( auto& x : data_stores )
@ -410,6 +495,8 @@ void Manager::DoTerminate() {
// modifies the map and invalidates iterators.
CloseStore(x);
ProcessLogEvents();
FlushLogBuffers();
}
@ -980,11 +1067,9 @@ bool Manager::DoUnsubscribe(const string& topic_prefix) {
return true;
}
void Manager::Process() {
void Manager::ProcessMessages() {
auto messages = bstate->subscriber.poll();
bool had_input = ! messages.empty();
for ( auto& message : messages ) {
auto&& topic = broker::get_topic(message);
@ -1029,18 +1114,107 @@ void Manager::Process() {
continue;
}
}
}
for ( auto& s : data_stores ) {
auto num_available = s.second->proxy.mailbox().size();
namespace {
// Note: copied from Stmt.cc, might be worth to move to a common place.
EnumValPtr lookup_enum_val(const char* module_name, const char* name) {
const auto& id = zeek::detail::lookup_ID(name, module_name);
assert(id);
assert(id->IsEnumConst());
EnumType* et = id->GetType()->AsEnumType();
int index = et->Lookup(module_name, name);
assert(index >= 0);
return et->GetEnumVal(index);
}
} // namespace
void Manager::ProcessLogEvents() {
static auto plval = lookup_enum_val("Broker", "LOG");
static auto lpli = id::find_type<RecordType>("Broker::Info");
static auto ev_critical = lookup_enum_val("Broker", "CRITICAL_EVENT");
static auto ev_error = lookup_enum_val("Broker", "ERROR_EVENT");
static auto ev_warning = lookup_enum_val("Broker", "WARNING_EVENT");
static auto ev_info = lookup_enum_val("Broker", "INFO_EVENT");
static auto ev_verbose = lookup_enum_val("Broker", "VERBOSE_EVENT");
static auto ev_debug = lookup_enum_val("Broker", "DEBUG_EVENT");
auto evType = [](BrokerSeverityLevel lvl) {
switch ( lvl ) {
case BrokerSeverityLevel::critical: return ev_critical;
case BrokerSeverityLevel::error: return ev_error;
case BrokerSeverityLevel::warning: return ev_warning;
case BrokerSeverityLevel::info: return ev_info;
case BrokerSeverityLevel::verbose: return ev_verbose;
default: return ev_debug;
}
};
constexpr const char* severity_names_tbl[] = {"critical", "error", "warning", "info", "verbose", "debug"};
auto events = bstate->loggerQueue->Drain();
for ( auto& event : events ) {
auto severity = event->severity;
if ( bstate->logSeverity >= severity ) {
auto record = make_intrusive<RecordVal>(lpli);
record->AssignTime(0, run_state::network_time);
record->Assign(1, evType(event->severity));
auto ev = make_intrusive<StringVal>(event->identifier);
record->Assign(2, ev);
auto msg = make_intrusive<StringVal>(event->description);
record->Assign(4, msg);
log_mgr->Write(plval.get(), record.get());
}
if ( bstate->stderrSeverity >= severity ) {
fprintf(stderr, "[BROKER/%s] %s\n", severity_names_tbl[static_cast<int>(severity)],
event->description.c_str());
}
}
}
void Manager::ProcessDataStore(detail::StoreHandleVal* store) {
auto num_available = store->proxy.mailbox().size();
if ( num_available > 0 ) {
had_input = true;
auto responses = s.second->proxy.receive(num_available);
auto responses = store->proxy.receive(num_available);
for ( auto& r : responses )
ProcessStoreResponse(s.second, std::move(r));
ProcessStoreResponse(store, std::move(r));
}
}
void Manager::ProcessDataStores() {
for ( auto& kvp : data_stores ) {
ProcessDataStore(kvp.second);
}
}
void Manager::ProcessFd(int fd, int flags) {
if ( fd == bstate->subscriber.fd() ) {
ProcessMessages();
}
else if ( fd == bstate->loggerQueue->FlareFd() ) {
ProcessLogEvents();
}
else {
for ( auto& kvp : data_stores ) {
if ( fd == kvp.second->proxy.mailbox().descriptor() ) {
ProcessDataStore(kvp.second);
return;
}
}
}
}
void Manager::Process() {
ProcessMessages();
ProcessLogEvents();
ProcessDataStores();
}
void Manager::ProcessStoreEventInsertUpdate(const TableValPtr& table, const std::string& store_id,

View file

@ -446,7 +446,20 @@ private:
void Error(const char* format, ...) __attribute__((format(printf, 2, 3)));
// Processes events from the Broker message queue.
void ProcessMessages();
// Process events from Broker logger.
void ProcessLogEvents();
// Process events from @p store.
void ProcessDataStore(detail::StoreHandleVal* store);
// Process events from all Broker data stores.
void ProcessDataStores();
// IOSource interface overrides:
void ProcessFd(int fd, int flags) override;
void Process() override;
const char* Tag() override { return "Broker::Manager"; }
double GetNextTimeout() override { return -1; }

View file

@ -16,6 +16,8 @@
@load base/frameworks/broker/store
redef Broker::log_stderr_severity_level = Broker::LOG_ERROR;
global test_store: opaque of Broker::Store;
global test_table: table[string] of count &broker_store="test_store_42";

View file

@ -5,13 +5,15 @@
# Evil
# @TEST-EXEC: dd if=/dev/zero of=path_to_db.sqlite seek=512 count=32 bs=1
# @TEST-EXEC-FAIL: zeek -b %INPUT >> out
# @TEST-EXEC: zeek -b %INPUT >> out
#
# @TEST-EXEC: grep 'database disk image is malformed' .stderr
# @TEST-EXEC: btest-diff out
@load base/frameworks/broker/store
redef Broker::log_stderr_severity_level = Broker::LOG_ERROR;
global test_store: opaque of Broker::Store;
global test_table: table[string] of count &broker_store="test_store_42";
@ -28,10 +30,10 @@ event zeek_init()
);
if ( Broker::is_closed(test_store) ) {
print("failed to open store");
exit(1);
} else {
print("store is open");
terminate();
return;
}
print("store is open");
local rows = 100;
local i = 0;