From 30615f425ef956755f4660a6b11fa3effc3e3c24 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Sun, 29 Sep 2024 14:59:35 +0200 Subject: [PATCH 1/2] Hook into Broker logs via its new API The new Broker API allows us to provide a custom logger to Broker that pulls previously unattainable context information out of Broker to put them into broker.log for users of Zeek. Since Broker log events happen asynchronously, we cache them in a queue and use a flare to notify Zeek of activity. Furthermore, the Broker manager now implements the `ProcessFd` function to avoid unnecessary polling of the new log queue. As a side effect, data stores are polled less as well. --- auxil/broker | 2 +- scripts/base/frameworks/broker/log.zeek | 14 +- scripts/base/frameworks/broker/main.zeek | 22 ++ src/broker/Manager.cc | 202 ++++++++++++++++-- src/broker/Manager.h | 13 ++ ...erstore-backend-sqlite-corrupt-delete.zeek | 2 + ...okerstore-backend-sqlite-corrupt-fail.zeek | 10 +- 7 files changed, 245 insertions(+), 20 deletions(-) diff --git a/auxil/broker b/auxil/broker index 5847b2a545..c37005de5f 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 5847b2a5458d03d56654e19b6b51a182476d36e5 +Subproject commit c37005de5fd8cc487a6e1d804622376cc306afcf diff --git a/scripts/base/frameworks/broker/log.zeek b/scripts/base/frameworks/broker/log.zeek index b044214735..73812442a7 100644 --- a/scripts/base/frameworks/broker/log.zeek +++ b/scripts/base/frameworks/broker/log.zeek @@ -14,7 +14,19 @@ export { ## An informational status update. STATUS, ## An error situation. - ERROR + ERROR, + ## Fatal event, normal operation has most likely broken down. + CRITICAL_EVENT, + ## Unrecoverable event that imparts at least part of the system. + ERROR_EVENT, + ## Unespected or conspicuous event that may still be recoverable. + WARNING_EVENT, + ## Noteworthy event during normal operation. + INFO_EVENT, + ## Information that might be relevant for a user to understand system behavior. + VERBOSE_EVENT, + ## An event that is relevant only for troubleshooting and debugging. + DEBUG_EVENT, }; ## A record type containing the column fields of the Broker log. diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 21d3cdaf74..34970f7875 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -176,6 +176,28 @@ export { ## will be sent. const log_topic: function(id: Log::ID, path: string): string = default_log_topic &redef; + ## The possible log event severity levels for Broker. + type LogSeverityLevel: enum { + ## Fatal event, normal operation has most likely broken down. + LOG_CRITICAL, + ## Unrecoverable event that imparts at least part of the system. + LOG_ERROR, + ## Unespected or conspicuous event that may still be recoverable. + LOG_WARNING, + ## Noteworthy event during normal operation. + LOG_INFO, + ## Information that might be relevant for a user to understand system behavior. + LOG_VERBOSE, + ## An event that is relevant only for troubleshooting and debugging. + LOG_DEBUG, + }; + + ## The log event severity level for the Broker log output. + const log_severity_level = LOG_WARNING &redef; + + ## Event severity level for also printing the Broker log output to stderr. + const log_stderr_severity_level = LOG_CRITICAL &redef; + type ErrorCode: enum { ## The unspecified default error code. UNSPECIFIED = 1, diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index adbf494878..54d295f488 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -16,10 +19,12 @@ #include "zeek/DebugLogger.h" #include "zeek/Desc.h" #include "zeek/Event.h" +#include "zeek/Flare.h" #include "zeek/Func.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/RunState.h" +#include "zeek/Scope.h" #include "zeek/SerializationFormat.h" #include "zeek/Type.h" #include "zeek/Var.h" @@ -86,6 +91,58 @@ void print_escaped(std::string& buf, std::string_view str) { buf.push_back('"'); } +class LoggerQueue { +public: + void Push(broker::event_ptr event) { + std::list tmp; + tmp.emplace_back(std::move(event)); + { + std::lock_guard lock(mutex_); + queue_.splice(queue_.end(), tmp); + if ( queue_.size() == 1 ) { + flare_.Fire(); + } + } + } + + auto Drain() { + std::list events; + std::lock_guard lock(mutex_); + if ( ! queue_.empty() ) { + queue_.swap(events); + flare_.Extinguish(); + } + return events; + } + + auto FlareFd() const noexcept { return flare_.FD(); } + +private: + std::mutex mutex_; + zeek::detail::Flare flare_; + std::list queue_; +}; + +using LoggerQueuePtr = std::shared_ptr; + +using BrokerSeverityLevel = broker::event::severity_level; + +class LoggerAdapter : public broker::event_observer { +public: + using SeverityLevel = broker::event::severity_level; + + explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue) + : severity_(severity), queue_(std::move(queue)) {} + + void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); } + + bool accepts(SeverityLevel severity, broker::event::component_type) const override { return severity <= severity_; } + +private: + SeverityLevel severity_; + LoggerQueuePtr queue_; +}; + } // namespace namespace zeek::Broker { @@ -162,13 +219,19 @@ struct opt_mapping { class BrokerState { public: - BrokerState(broker::configuration config, size_t congestion_queue_size) + using SeverityLevel = LoggerAdapter::SeverityLevel; + + BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue) : endpoint(std::move(config), telemetry_mgr->GetRegistry()), subscriber( - endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)) {} + endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)), + loggerQueue(std::move(queue)) {} broker::endpoint endpoint; broker::subscriber subscriber; + LoggerQueuePtr loggerQueue; + SeverityLevel logSeverity = SeverityLevel::critical; + SeverityLevel stderrSeverity = SeverityLevel::critical; }; const broker::endpoint_info Manager::NoPeer{{}, {}}; @@ -213,7 +276,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const } // namespace #endif -Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr) { +Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr), iosource::IOSource(true) { bound_port = 0; use_real_time = arg_use_real_time; peer_count = 0; @@ -324,12 +387,32 @@ void Manager::DoInitPostScript() { config.set("caf.work-stealing.moderate-steal-interval", get_option("Broker::moderate_interval")->AsCount()); config.set("caf.work-stealing.relaxed-steal-interval", get_option("Broker::relaxed_interval")->AsCount()); + // Hook up the logger. + auto checkLogSeverity = [](int level) { + if ( level < 0 || level > static_cast(BrokerSeverityLevel::debug) ) { + reporter->FatalError("Invalid Broker::log_severity_level: %d", level); + } + }; + auto logSeverityVal = static_cast(get_option("Broker::log_severity_level")->AsEnum()); + checkLogSeverity(logSeverityVal); + auto stderrSeverityVal = static_cast(get_option("Broker::log_stderr_severity_level")->AsEnum()); + checkLogSeverity(stderrSeverityVal); + auto adapterVerbosity = static_cast(std::max(logSeverityVal, stderrSeverityVal)); + auto queue = std::make_shared(); + auto adapter = std::make_shared(adapterVerbosity, queue); + broker::logger(adapter); // *must* be called before creating the BrokerState + auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); - bstate = std::make_shared(std::move(config), cqs); + bstate = std::make_shared(std::move(config), cqs, queue); + bstate->logSeverity = static_cast(logSeverityVal); + bstate->stderrSeverity = static_cast(stderrSeverityVal); if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) ) reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); + if ( ! iosource_mgr->RegisterFd(queue->FlareFd(), this) ) + reporter->FatalError("Failed to register broker logger with iosource_mgr"); + bstate->subscriber.add_topic(broker::topic::store_events(), true); InitializeBrokerStoreForwarding(); @@ -400,6 +483,8 @@ void Manager::DoTerminate() { iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); + iosource_mgr->UnregisterFd(bstate->loggerQueue->FlareFd(), this); + vector stores_to_close; for ( auto& x : data_stores ) @@ -410,6 +495,8 @@ void Manager::DoTerminate() { // modifies the map and invalidates iterators. CloseStore(x); + ProcessLogEvents(); + FlushLogBuffers(); } @@ -980,11 +1067,9 @@ bool Manager::DoUnsubscribe(const string& topic_prefix) { return true; } -void Manager::Process() { +void Manager::ProcessMessages() { auto messages = bstate->subscriber.poll(); - bool had_input = ! messages.empty(); - for ( auto& message : messages ) { auto&& topic = broker::get_topic(message); @@ -1029,20 +1114,109 @@ void Manager::Process() { continue; } } +} - for ( auto& s : data_stores ) { - auto num_available = s.second->proxy.mailbox().size(); +namespace { - if ( num_available > 0 ) { - had_input = true; - auto responses = s.second->proxy.receive(num_available); +// Note: copied from Stmt.cc, might be worth to move to a common place. +EnumValPtr lookup_enum_val(const char* module_name, const char* name) { + const auto& id = zeek::detail::lookup_ID(name, module_name); + assert(id); + assert(id->IsEnumConst()); - for ( auto& r : responses ) - ProcessStoreResponse(s.second, std::move(r)); + EnumType* et = id->GetType()->AsEnumType(); + + int index = et->Lookup(module_name, name); + assert(index >= 0); + + return et->GetEnumVal(index); +} + +} // namespace + +void Manager::ProcessLogEvents() { + static auto plval = lookup_enum_val("Broker", "LOG"); + static auto lpli = id::find_type("Broker::Info"); + static auto ev_critical = lookup_enum_val("Broker", "CRITICAL_EVENT"); + static auto ev_error = lookup_enum_val("Broker", "ERROR_EVENT"); + static auto ev_warning = lookup_enum_val("Broker", "WARNING_EVENT"); + static auto ev_info = lookup_enum_val("Broker", "INFO_EVENT"); + static auto ev_verbose = lookup_enum_val("Broker", "VERBOSE_EVENT"); + static auto ev_debug = lookup_enum_val("Broker", "DEBUG_EVENT"); + + auto evType = [](BrokerSeverityLevel lvl) { + switch ( lvl ) { + case BrokerSeverityLevel::critical: return ev_critical; + case BrokerSeverityLevel::error: return ev_error; + case BrokerSeverityLevel::warning: return ev_warning; + case BrokerSeverityLevel::info: return ev_info; + case BrokerSeverityLevel::verbose: return ev_verbose; + default: return ev_debug; + } + }; + + constexpr const char* severity_names_tbl[] = {"critical", "error", "warning", "info", "verbose", "debug"}; + + auto events = bstate->loggerQueue->Drain(); + for ( auto& event : events ) { + auto severity = event->severity; + if ( bstate->logSeverity >= severity ) { + auto record = make_intrusive(lpli); + record->AssignTime(0, run_state::network_time); + record->Assign(1, evType(event->severity)); + auto ev = make_intrusive(event->identifier); + record->Assign(2, ev); + auto msg = make_intrusive(event->description); + record->Assign(4, msg); + log_mgr->Write(plval.get(), record.get()); + } + if ( bstate->stderrSeverity >= severity ) { + fprintf(stderr, "[BROKER/%s] %s\n", severity_names_tbl[static_cast(severity)], + event->description.c_str()); } } } +void Manager::ProcessDataStore(detail::StoreHandleVal* store) { + auto num_available = store->proxy.mailbox().size(); + + if ( num_available > 0 ) { + auto responses = store->proxy.receive(num_available); + + for ( auto& r : responses ) + ProcessStoreResponse(store, std::move(r)); + } +} + +void Manager::ProcessDataStores() { + for ( auto& kvp : data_stores ) { + ProcessDataStore(kvp.second); + } +} + +void Manager::ProcessFd(int fd, int flags) { + if ( fd == bstate->subscriber.fd() ) { + ProcessMessages(); + } + else if ( fd == bstate->loggerQueue->FlareFd() ) { + ProcessLogEvents(); + } + else { + for ( auto& kvp : data_stores ) { + if ( fd == kvp.second->proxy.mailbox().descriptor() ) { + ProcessDataStore(kvp.second); + return; + } + } + } +} + +void Manager::Process() { + ProcessMessages(); + ProcessLogEvents(); + ProcessDataStores(); +} + void Manager::ProcessStoreEventInsertUpdate(const TableValPtr& table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert) { diff --git a/src/broker/Manager.h b/src/broker/Manager.h index f41ba97c2c..e72e5d0df5 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -446,7 +446,20 @@ private: void Error(const char* format, ...) __attribute__((format(printf, 2, 3))); + // Processes events from the Broker message queue. + void ProcessMessages(); + + // Process events from Broker logger. + void ProcessLogEvents(); + + // Process events from @p store. + void ProcessDataStore(detail::StoreHandleVal* store); + + // Process events from all Broker data stores. + void ProcessDataStores(); + // IOSource interface overrides: + void ProcessFd(int fd, int flags) override; void Process() override; const char* Tag() override { return "Broker::Manager"; } double GetNextTimeout() override { return -1; } diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek index 0df00a5a18..249ffea496 100644 --- a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek +++ b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek @@ -16,6 +16,8 @@ @load base/frameworks/broker/store +redef Broker::log_stderr_severity_level = Broker::LOG_ERROR; + global test_store: opaque of Broker::Store; global test_table: table[string] of count &broker_store="test_store_42"; diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek index c0631f3ead..4c6a37cb81 100644 --- a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek +++ b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek @@ -5,13 +5,15 @@ # Evil # @TEST-EXEC: dd if=/dev/zero of=path_to_db.sqlite seek=512 count=32 bs=1 -# @TEST-EXEC-FAIL: zeek -b %INPUT >> out +# @TEST-EXEC: zeek -b %INPUT >> out # # @TEST-EXEC: grep 'database disk image is malformed' .stderr # @TEST-EXEC: btest-diff out @load base/frameworks/broker/store +redef Broker::log_stderr_severity_level = Broker::LOG_ERROR; + global test_store: opaque of Broker::Store; global test_table: table[string] of count &broker_store="test_store_42"; @@ -28,10 +30,10 @@ event zeek_init() ); if ( Broker::is_closed(test_store) ) { print("failed to open store"); - exit(1); - } else { - print("store is open"); + terminate(); + return; } + print("store is open"); local rows = 100; local i = 0; From 20b3eca2575f70e37a3e86db412b0e154d8463eb Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Sat, 15 Feb 2025 16:37:24 +0100 Subject: [PATCH 2/2] Integrate review feedback --- scripts/base/frameworks/broker/log.zeek | 25 ++++++++++++++++++++++ src/broker/Manager.cc | 28 +++++++++++-------------- src/broker/comm.bif | 9 ++++++++ 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/scripts/base/frameworks/broker/log.zeek b/scripts/base/frameworks/broker/log.zeek index 73812442a7..c77fd59087 100644 --- a/scripts/base/frameworks/broker/log.zeek +++ b/scripts/base/frameworks/broker/log.zeek @@ -95,3 +95,28 @@ event Broker::error(code: ErrorCode, msg: string) Reporter::error(fmt("Broker error (%s): %s", code, msg)); } +event Broker::internal_log_event(lvl: LogSeverityLevel, id: string, description: string) + { + local severity = Broker::CRITICAL_EVENT; + switch lvl { + case Broker::LOG_ERROR: + severity = Broker::ERROR; + break; + case Broker::LOG_WARNING: + severity = Broker::WARNING_EVENT; + break; + case Broker::LOG_INFO: + severity = Broker::INFO_EVENT; + break; + case Broker::LOG_VERBOSE: + severity = Broker::VERBOSE_EVENT; + break; + case Broker::LOG_DEBUG: + severity = Broker::DEBUG_EVENT; + break; + } + Log::write(Broker::LOG, [$ts = network_time(), + $ty = severity, + $ev = id, + $message = description]); + } diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 54d295f488..fb04d30c17 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -1135,14 +1135,12 @@ EnumValPtr lookup_enum_val(const char* module_name, const char* name) { } // namespace void Manager::ProcessLogEvents() { - static auto plval = lookup_enum_val("Broker", "LOG"); - static auto lpli = id::find_type("Broker::Info"); - static auto ev_critical = lookup_enum_val("Broker", "CRITICAL_EVENT"); - static auto ev_error = lookup_enum_val("Broker", "ERROR_EVENT"); - static auto ev_warning = lookup_enum_val("Broker", "WARNING_EVENT"); - static auto ev_info = lookup_enum_val("Broker", "INFO_EVENT"); - static auto ev_verbose = lookup_enum_val("Broker", "VERBOSE_EVENT"); - static auto ev_debug = lookup_enum_val("Broker", "DEBUG_EVENT"); + static auto ev_critical = lookup_enum_val("Broker", "LOG_CRITICAL"); + static auto ev_error = lookup_enum_val("Broker", "LOG_ERROR"); + static auto ev_warning = lookup_enum_val("Broker", "LOG_WARNING"); + static auto ev_info = lookup_enum_val("Broker", "LOG_INFO"); + static auto ev_verbose = lookup_enum_val("Broker", "LOG_VERBOSE"); + static auto ev_debug = lookup_enum_val("Broker", "LOG_DEBUG"); auto evType = [](BrokerSeverityLevel lvl) { switch ( lvl ) { @@ -1161,14 +1159,12 @@ void Manager::ProcessLogEvents() { for ( auto& event : events ) { auto severity = event->severity; if ( bstate->logSeverity >= severity ) { - auto record = make_intrusive(lpli); - record->AssignTime(0, run_state::network_time); - record->Assign(1, evType(event->severity)); - auto ev = make_intrusive(event->identifier); - record->Assign(2, ev); - auto msg = make_intrusive(event->description); - record->Assign(4, msg); - log_mgr->Write(plval.get(), record.get()); + auto args = Args{}; + args.reserve(3); + args.emplace_back(evType(severity)); + args.emplace_back(make_intrusive(event->identifier)); + args.emplace_back(make_intrusive(event->description)); + event_mgr.Enqueue(::Broker::internal_log_event, std::move(args)); } if ( bstate->stderrSeverity >= severity ) { fprintf(stderr, "[BROKER/%s] %s\n", severity_names_tbl[static_cast(severity)], diff --git a/src/broker/comm.bif b/src/broker/comm.bif index 5cc2e89f8c..81f1114a22 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -7,6 +7,15 @@ module Broker; +## Generated when Broker emits an internal logging event. +## +## lvl: the severity of the event as reported by Broker. +## +## id: an identifier for the event type. +## +## description: a message providing additional context. +event Broker::internal_log_event%(lvl: LogSeverityLevel, id: string, description: string%); + ## Generated when a new peering has been established. Both sides of the peering ## receive this event, created independently in each endpoint. For the endpoint ## establishing the peering, the added endpoint's network information will match