diff --git a/CHANGES b/CHANGES index 1b82d323dc..64d116eb21 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,17 @@ +7.2.0-dev.454 | 2025-03-31 18:44:59 +0200 + + * Hook into Broker logs via its new API (Dominik Charousset) + + The new Broker API allows us to provide a custom logger to Broker that + pulls previously unattainable context information out of Broker to put + them into broker.log for users of Zeek. + + Since Broker log events happen asynchronously, we cache them in a queue + and use a flare to notify Zeek of activity. Furthermore, the Broker + manager now implements the `ProcessFd` function to avoid unnecessary + polling of the new log queue. As a side effect, data stores are polled + less as well. + 7.2.0-dev.451 | 2025-03-31 09:37:02 -0700 * GH-3526: Add "U" to QUIC history docstrings and expand version string docs (Christian Kreibich, Corelight) diff --git a/NEWS b/NEWS index 7feedfb65c..3d1dbf88da 100644 --- a/NEWS +++ b/NEWS @@ -55,6 +55,13 @@ New Functionality backend for NATS that will be available as an external plugin, but it is not quite ready yet. Both of the existing backends support usage in a cluster environment. +- Broker now exposes more information through ``broker.log``. Broker generated + log messages are now propagated as events to Zeek. This allows exposing more + information for debugging and operational behavior of Broker via Zeek logs. + Two new script-level options ``Broker::log_severity_level`` and + ``Broker::log_stderr_severity_level`` have been introduced to control + the which events to expose by default. + Changed Functionality --------------------- diff --git a/VERSION b/VERSION index a431d2c414..ee3dfa8ea8 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.451 +7.2.0-dev.454 diff --git a/auxil/broker b/auxil/broker index c99696a69e..62a73dbfc9 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit c99696a69e5ced0a91bf7c19098d391a57f279ce +Subproject commit 62a73dbfc9cfee2a386240b7f3239f417affce75 diff --git a/scripts/base/frameworks/broker/log.zeek b/scripts/base/frameworks/broker/log.zeek index b044214735..6b133f1ffe 100644 --- a/scripts/base/frameworks/broker/log.zeek +++ b/scripts/base/frameworks/broker/log.zeek @@ -14,7 +14,19 @@ export { ## An informational status update. STATUS, ## An error situation. - ERROR + ERROR, + ## Fatal event, normal operation has most likely broken down. + CRITICAL_EVENT, + ## Unrecoverable event that imparts at least part of the system. + ERROR_EVENT, + ## Unexpected or conspicuous event that may still be recoverable. + WARNING_EVENT, + ## Noteworthy event during normal operation. + INFO_EVENT, + ## Information that might be relevant for a user to understand system behavior. + VERBOSE_EVENT, + ## An event that is relevant only for troubleshooting and debugging. + DEBUG_EVENT, }; ## A record type containing the column fields of the Broker log. @@ -83,3 +95,28 @@ event Broker::error(code: ErrorCode, msg: string) Reporter::error(fmt("Broker error (%s): %s", code, msg)); } +event Broker::internal_log_event(lvl: LogSeverityLevel, id: string, description: string) + { + local severity = Broker::CRITICAL_EVENT; + switch lvl { + case Broker::LOG_ERROR: + severity = Broker::ERROR_EVENT; + break; + case Broker::LOG_WARNING: + severity = Broker::WARNING_EVENT; + break; + case Broker::LOG_INFO: + severity = Broker::INFO_EVENT; + break; + case Broker::LOG_VERBOSE: + severity = Broker::VERBOSE_EVENT; + break; + case Broker::LOG_DEBUG: + severity = Broker::DEBUG_EVENT; + break; + } + Log::write(Broker::LOG, [$ts = network_time(), + $ty = severity, + $ev = id, + $message = description]); + } diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 21d3cdaf74..805f1f1e6c 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -176,6 +176,28 @@ export { ## will be sent. const log_topic: function(id: Log::ID, path: string): string = default_log_topic &redef; + ## The possible log event severity levels for Broker. + type LogSeverityLevel: enum { + ## Fatal event, normal operation has most likely broken down. + LOG_CRITICAL, + ## Unrecoverable event that imparts at least part of the system. + LOG_ERROR, + ## Unexpected or conspicuous event that may still be recoverable. + LOG_WARNING, + ## Noteworthy event during normal operation. + LOG_INFO, + ## Information that might be relevant for a user to understand system behavior. + LOG_VERBOSE, + ## An event that is relevant only for troubleshooting and debugging. + LOG_DEBUG, + }; + + ## The log event severity level for the Broker log output. + const log_severity_level = LOG_WARNING &redef; + + ## Event severity level for also printing the Broker log output to stderr. + const log_stderr_severity_level = LOG_CRITICAL &redef; + type ErrorCode: enum { ## The unspecified default error code. UNSPECIFIED = 1, diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index e5e6c66e75..f7f5f40303 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -16,10 +19,12 @@ #include "zeek/DebugLogger.h" #include "zeek/Desc.h" #include "zeek/Event.h" +#include "zeek/Flare.h" #include "zeek/Func.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/RunState.h" +#include "zeek/Scope.h" #include "zeek/SerializationFormat.h" #include "zeek/Type.h" #include "zeek/Var.h" @@ -86,6 +91,58 @@ void print_escaped(std::string& buf, std::string_view str) { buf.push_back('"'); } +class LoggerQueue { +public: + void Push(broker::event_ptr event) { + std::list tmp; + tmp.emplace_back(std::move(event)); + { + std::lock_guard lock(mutex_); + queue_.splice(queue_.end(), tmp); + if ( queue_.size() == 1 ) { + flare_.Fire(); + } + } + } + + auto Drain() { + std::list events; + std::lock_guard lock(mutex_); + if ( ! queue_.empty() ) { + queue_.swap(events); + flare_.Extinguish(); + } + return events; + } + + auto FlareFd() const noexcept { return flare_.FD(); } + +private: + std::mutex mutex_; + zeek::detail::Flare flare_; + std::list queue_; +}; + +using LoggerQueuePtr = std::shared_ptr; + +using BrokerSeverityLevel = broker::event::severity_level; + +class LoggerAdapter : public broker::event_observer { +public: + using SeverityLevel = broker::event::severity_level; + + explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue) + : severity_(severity), queue_(std::move(queue)) {} + + void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); } + + bool accepts(SeverityLevel severity, broker::event::component_type) const override { return severity <= severity_; } + +private: + SeverityLevel severity_; + LoggerQueuePtr queue_; +}; + } // namespace namespace zeek::Broker { @@ -162,13 +219,19 @@ struct opt_mapping { class BrokerState { public: - BrokerState(broker::configuration config, size_t congestion_queue_size) + using SeverityLevel = LoggerAdapter::SeverityLevel; + + BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue) : endpoint(std::move(config), telemetry_mgr->GetRegistry()), subscriber( - endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)) {} + endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)), + loggerQueue(std::move(queue)) {} broker::endpoint endpoint; broker::subscriber subscriber; + LoggerQueuePtr loggerQueue; + SeverityLevel logSeverity = SeverityLevel::critical; + SeverityLevel stderrSeverity = SeverityLevel::critical; }; const broker::endpoint_info Manager::NoPeer{{}, {}}; @@ -213,7 +276,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const } // namespace #endif -Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr) { +Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr), iosource::IOSource(true) { bound_port = 0; use_real_time = arg_use_real_time; peer_count = 0; @@ -324,12 +387,32 @@ void Manager::DoInitPostScript() { config.set("caf.work-stealing.moderate-steal-interval", get_option("Broker::moderate_interval")->AsCount()); config.set("caf.work-stealing.relaxed-steal-interval", get_option("Broker::relaxed_interval")->AsCount()); + // Hook up the logger. + auto checkLogSeverity = [](int level) { + if ( level < 0 || level > static_cast(BrokerSeverityLevel::debug) ) { + reporter->FatalError("Invalid Broker::log_severity_level: %d", level); + } + }; + auto logSeverityVal = static_cast(get_option("Broker::log_severity_level")->AsEnum()); + checkLogSeverity(logSeverityVal); + auto stderrSeverityVal = static_cast(get_option("Broker::log_stderr_severity_level")->AsEnum()); + checkLogSeverity(stderrSeverityVal); + auto adapterVerbosity = static_cast(std::max(logSeverityVal, stderrSeverityVal)); + auto queue = std::make_shared(); + auto adapter = std::make_shared(adapterVerbosity, queue); + broker::logger(adapter); // *must* be called before creating the BrokerState + auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); - bstate = std::make_shared(std::move(config), cqs); + bstate = std::make_shared(std::move(config), cqs, queue); + bstate->logSeverity = static_cast(logSeverityVal); + bstate->stderrSeverity = static_cast(stderrSeverityVal); if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) ) reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); + if ( ! iosource_mgr->RegisterFd(queue->FlareFd(), this) ) + reporter->FatalError("Failed to register broker logger with iosource_mgr"); + bstate->subscriber.add_topic(broker::topic::store_events(), true); InitializeBrokerStoreForwarding(); @@ -400,6 +483,8 @@ void Manager::DoTerminate() { iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); + iosource_mgr->UnregisterFd(bstate->loggerQueue->FlareFd(), this); + vector stores_to_close; for ( auto& x : data_stores ) @@ -410,6 +495,8 @@ void Manager::DoTerminate() { // modifies the map and invalidates iterators. CloseStore(x); + ProcessLogEvents(); + FlushLogBuffers(); } @@ -980,11 +1067,9 @@ bool Manager::DoUnsubscribe(const string& topic_prefix) { return true; } -void Manager::Process() { +void Manager::ProcessMessages() { auto messages = bstate->subscriber.poll(); - bool had_input = ! messages.empty(); - for ( auto& message : messages ) { auto&& topic = broker::get_topic(message); @@ -1029,20 +1114,105 @@ void Manager::Process() { continue; } } +} - for ( auto& s : data_stores ) { - auto num_available = s.second->proxy.mailbox().size(); +namespace { - if ( num_available > 0 ) { - had_input = true; - auto responses = s.second->proxy.receive(num_available); +// Note: copied from Stmt.cc, might be worth to move to a common place. +EnumValPtr lookup_enum_val(const char* module_name, const char* name) { + const auto& id = zeek::detail::lookup_ID(name, module_name); + assert(id); + assert(id->IsEnumConst()); - for ( auto& r : responses ) - ProcessStoreResponse(s.second, std::move(r)); + EnumType* et = id->GetType()->AsEnumType(); + + int index = et->Lookup(module_name, name); + assert(index >= 0); + + return et->GetEnumVal(index); +} + +} // namespace + +void Manager::ProcessLogEvents() { + static auto ev_critical = lookup_enum_val("Broker", "LOG_CRITICAL"); + static auto ev_error = lookup_enum_val("Broker", "LOG_ERROR"); + static auto ev_warning = lookup_enum_val("Broker", "LOG_WARNING"); + static auto ev_info = lookup_enum_val("Broker", "LOG_INFO"); + static auto ev_verbose = lookup_enum_val("Broker", "LOG_VERBOSE"); + static auto ev_debug = lookup_enum_val("Broker", "LOG_DEBUG"); + + auto evType = [](BrokerSeverityLevel lvl) { + switch ( lvl ) { + case BrokerSeverityLevel::critical: return ev_critical; + case BrokerSeverityLevel::error: return ev_error; + case BrokerSeverityLevel::warning: return ev_warning; + case BrokerSeverityLevel::info: return ev_info; + case BrokerSeverityLevel::verbose: return ev_verbose; + default: return ev_debug; + } + }; + + constexpr const char* severity_names_tbl[] = {"critical", "error", "warning", "info", "verbose", "debug"}; + + auto events = bstate->loggerQueue->Drain(); + for ( auto& event : events ) { + auto severity = event->severity; + if ( bstate->logSeverity >= severity ) { + auto args = Args{}; + args.reserve(3); + args.emplace_back(evType(severity)); + args.emplace_back(make_intrusive(event->identifier)); + args.emplace_back(make_intrusive(event->description)); + event_mgr.Enqueue(::Broker::internal_log_event, std::move(args)); + } + if ( bstate->stderrSeverity >= severity ) { + fprintf(stderr, "[BROKER/%s] %s\n", severity_names_tbl[static_cast(severity)], + event->description.c_str()); } } } +void Manager::ProcessDataStore(detail::StoreHandleVal* store) { + auto num_available = store->proxy.mailbox().size(); + + if ( num_available > 0 ) { + auto responses = store->proxy.receive(num_available); + + for ( auto& r : responses ) + ProcessStoreResponse(store, std::move(r)); + } +} + +void Manager::ProcessDataStores() { + for ( auto& kvp : data_stores ) { + ProcessDataStore(kvp.second); + } +} + +void Manager::ProcessFd(int fd, int flags) { + if ( fd == bstate->subscriber.fd() ) { + ProcessMessages(); + } + else if ( fd == bstate->loggerQueue->FlareFd() ) { + ProcessLogEvents(); + } + else { + for ( auto& kvp : data_stores ) { + if ( fd == kvp.second->proxy.mailbox().descriptor() ) { + ProcessDataStore(kvp.second); + return; + } + } + } +} + +void Manager::Process() { + ProcessMessages(); + ProcessLogEvents(); + ProcessDataStores(); +} + void Manager::ProcessStoreEventInsertUpdate(const TableValPtr& table, const std::string& store_id, const broker::data& key, const broker::data& data, const broker::data& old_value, bool insert) { diff --git a/src/broker/Manager.h b/src/broker/Manager.h index f41ba97c2c..e72e5d0df5 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -446,7 +446,20 @@ private: void Error(const char* format, ...) __attribute__((format(printf, 2, 3))); + // Processes events from the Broker message queue. + void ProcessMessages(); + + // Process events from Broker logger. + void ProcessLogEvents(); + + // Process events from @p store. + void ProcessDataStore(detail::StoreHandleVal* store); + + // Process events from all Broker data stores. + void ProcessDataStores(); + // IOSource interface overrides: + void ProcessFd(int fd, int flags) override; void Process() override; const char* Tag() override { return "Broker::Manager"; } double GetNextTimeout() override { return -1; } diff --git a/src/broker/comm.bif b/src/broker/comm.bif index 5cc2e89f8c..81f1114a22 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -7,6 +7,15 @@ module Broker; +## Generated when Broker emits an internal logging event. +## +## lvl: the severity of the event as reported by Broker. +## +## id: an identifier for the event type. +## +## description: a message providing additional context. +event Broker::internal_log_event%(lvl: LogSeverityLevel, id: string, description: string%); + ## Generated when a new peering has been established. Both sides of the peering ## receive this event, created independently in each endpoint. For the endpoint ## establishing the peering, the added endpoint's network information will match diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek index 0df00a5a18..249ffea496 100644 --- a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek +++ b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-delete.zeek @@ -16,6 +16,8 @@ @load base/frameworks/broker/store +redef Broker::log_stderr_severity_level = Broker::LOG_ERROR; + global test_store: opaque of Broker::Store; global test_table: table[string] of count &broker_store="test_store_42"; diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek index c0631f3ead..4c6a37cb81 100644 --- a/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek +++ b/testing/btest/broker/store/brokerstore-backend-sqlite-corrupt-fail.zeek @@ -5,13 +5,15 @@ # Evil # @TEST-EXEC: dd if=/dev/zero of=path_to_db.sqlite seek=512 count=32 bs=1 -# @TEST-EXEC-FAIL: zeek -b %INPUT >> out +# @TEST-EXEC: zeek -b %INPUT >> out # # @TEST-EXEC: grep 'database disk image is malformed' .stderr # @TEST-EXEC: btest-diff out @load base/frameworks/broker/store +redef Broker::log_stderr_severity_level = Broker::LOG_ERROR; + global test_store: opaque of Broker::Store; global test_table: table[string] of count &broker_store="test_store_42"; @@ -28,10 +30,10 @@ event zeek_init() ); if ( Broker::is_closed(test_store) ) { print("failed to open store"); - exit(1); - } else { - print("store is open"); + terminate(); + return; } + print("store is open"); local rows = 100; local i = 0;