diff --git a/CHANGES b/CHANGES index 8edd6185cb..34970b5d46 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,21 @@ +3.3.0-dev.645 | 2020-12-10 14:14:12 -0800 + + * Fix message ordering of Broker messages (Dominik Charousset, Corelight) + + Using two separate Broker subscribers for status events and regular + messages introduces a race on the two objects. Even if Broker sends all + messages in a particular (deterministic) order, Zeek may still process + them in a different order as a result. Since several tests rely on a + strict ordering of Broker events, these tests could fail sporadically. + + Using only a single subscriber for all Broker messages makes sure that + Zeek observes all messages in the same order as Broker emits them. + + * Fix UB in shutdown of Broker manager (Dominik Charousset, Corelight) + + * Migrate to CAF 0.18 (Dominik Charousset, Corelight) + 3.3.0-dev.638 | 2020-12-09 17:29:03 -0800 * Update Mozilla Root Store (Johanna Amann, Corelight) diff --git a/CMakeLists.txt b/CMakeLists.txt index ab87906ec0..ba2a712bff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ # When changing the minimum version here, also adapt # auxil/zeek-aux/plugin-support/skeleton/CMakeLists.txt -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) +cmake_minimum_required(VERSION 3.5...3.18 FATAL_ERROR) project(Zeek C CXX) @@ -289,18 +289,35 @@ if ( PYTHON_VERSION_STRING VERSION_LESS ${ZEEK_PYTHON_MIN} ) message(FATAL_ERROR "Python ${ZEEK_PYTHON_MIN} or greater is required.") endif () -if ( CAF_ROOT_DIR ) - find_package(CAF COMPONENTS core io openssl REQUIRED) +if ( CAF_ROOT OR BROKER_ROOT_DIR ) + # TODO: drop < 3.12 compatibility check when raising the minimum CMake version + if ( CAF_ROOT AND CMAKE_VERSION VERSION_LESS 3.12 ) + find_package(CAF ${CAF_VERSION_MIN_REQUIRED} REQUIRED CONFIG + COMPONENTS openssl test io core + PATHS "${CAF_ROOT}") + else () + find_package(CAF ${CAF_VERSION_MIN_REQUIRED} REQUIRED CONFIG + COMPONENTS openssl test io core) + endif () + message(STATUS "Using system CAF version ${CAF_VERSION}") + # TODO: drop these legacy variables and simply use the targets consistently + set(CAF_LIBRARIES CAF::core CAF::io CAF::openssl CACHE INTERNAL "") + set(caf_dirs "") + foreach (caf_lib IN LISTS CAF_LIBRARIES ITEMS CAF::test) + get_target_property(dirs ${caf_lib} INTERFACE_INCLUDE_DIRECTORIES) + if ( dirs ) + list(APPEND caf_dirs ${dirs}) + endif () + endforeach () + list(REMOVE_DUPLICATES caf_dirs) + list(GET caf_dirs 0 caf_dir) + set(CAF_INCLUDE_DIRS "${caf_dirs}" CACHE INTERNAL "") endif () add_subdirectory(auxil/paraglob) set(zeekdeps ${zeekdeps} paraglob) if ( BROKER_ROOT_DIR ) - # Avoid calling find_package(CAF) twice. - if ( NOT CAF_ROOT_DIR ) - find_package(CAF COMPONENTS core io openssl REQUIRED) - endif () find_package(Broker REQUIRED) set(zeekdeps ${zeekdeps} ${BROKER_LIBRARY}) set(broker_includes ${BROKER_INCLUDE_DIR}) @@ -455,8 +472,8 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/zeek-config.h.in include_directories(${CMAKE_CURRENT_BINARY_DIR}) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/zeek-config.h DESTINATION include/zeek) -if ( CAF_ROOT_DIR ) - set(ZEEK_CONFIG_CAF_ROOT_DIR ${CAF_ROOT_DIR}) +if ( CAF_ROOT ) + set(ZEEK_CONFIG_CAF_ROOT_DIR ${CAF_ROOT}) else () set(ZEEK_CONFIG_CAF_ROOT_DIR ${ZEEK_ROOT_DIR}) endif () diff --git a/NEWS b/NEWS index e7179b402d..ee2fa48500 100644 --- a/NEWS +++ b/NEWS @@ -207,6 +207,10 @@ Removed Functionality - CMake versions less than 3.5 are no longer supported. +- CAF version 0.18 is now required and, by default, that is bundled with + the Zeek distribution and will get built unless overridden with the + ``--with-caf=`` configuration option. + Deprecated Functionality ------------------------ diff --git a/VERSION b/VERSION index aa424db489..4c3effd7f3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.3.0-dev.638 +3.3.0-dev.645 diff --git a/auxil/broker b/auxil/broker index bd49a3b789..8a3291800d 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit bd49a3b789529618e8425c1cb1981846b16bda61 +Subproject commit 8a3291800d2d40e1b7f421e1e4b9a3ffef778d59 diff --git a/auxil/zeekctl b/auxil/zeekctl index 43bf8270ee..c98d51bc6b 160000 --- a/auxil/zeekctl +++ b/auxil/zeekctl @@ -1 +1 @@ -Subproject commit 43bf8270eeddc56aaf99c698c4d2a639703a7609 +Subproject commit c98d51bc6b7a2d40cdc095108b985088ccb7e32c diff --git a/configure b/configure index 7b74d3edf7..162b5c4acf 100755 --- a/configure +++ b/configure @@ -328,7 +328,7 @@ while [ $# -ne 0 ]; do append_cache_entry BROKER_ROOT_DIR PATH $optarg ;; --with-caf=*) - append_cache_entry CAF_ROOT_DIR PATH $optarg + append_cache_entry CAF_ROOT PATH $optarg ;; --with-libkqueue=*) append_cache_entry LIBKQUEUE_ROOT_DIR PATH $optarg diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 5726a25550..7ded17be45 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -57,14 +57,14 @@ class BrokerState { public: BrokerState(BrokerConfig config, size_t congestion_queue_size) : endpoint(std::move(config)), - subscriber(endpoint.make_subscriber({}, congestion_queue_size)), - status_subscriber(endpoint.make_status_subscriber(true)) + subscriber(endpoint.make_subscriber({broker::topics::statuses, + broker::topics::errors}, + congestion_queue_size)) { } broker::endpoint endpoint; broker::subscriber subscriber; - broker::status_subscriber status_subscriber; }; const broker::endpoint_info Manager::NoPeer{{}, {}}; @@ -114,15 +114,18 @@ static std::string RenderMessage(const broker::vector& xs) return broker::to_string(xs); } -static std::string RenderMessage(const broker::status& s) +static std::string RenderMessage(broker::status_view s) { return broker::to_string(s.code()); } -static std::string RenderMessage(const broker::error& e) +static std::string RenderMessage(broker::error_view e) { - return util::fmt("%s (%s)", broker::to_string(e.code()).c_str(), - caf::to_string(e.context()).c_str()); + if ( auto ctx = e.context() ) + return util::fmt("%s (%s)", to_string(e.code()).c_str(), + to_string(*ctx).c_str()); + else + return util::fmt("%s (null)", to_string(e.code()).c_str()); } #endif @@ -176,36 +179,36 @@ void Manager::InitPostScript() auto scheduler_policy = get_option("Broker::scheduler_policy")->AsString()->CheckString(); if ( util::streq(scheduler_policy, "sharing") ) - config.set("scheduler.policy", caf::atom("sharing")); + config.set("caf.scheduler.policy", "sharing"); else if ( util::streq(scheduler_policy, "stealing") ) - config.set("scheduler.policy", caf::atom("stealing")); + config.set("caf.scheduler.policy", "stealing"); else reporter->FatalError("Invalid Broker::scheduler_policy: %s", scheduler_policy); auto max_threads_env = util::zeekenv("ZEEK_BROKER_MAX_THREADS"); if ( max_threads_env ) - config.set("scheduler.max-threads", atoi(max_threads_env)); + config.set("caf.scheduler.max-threads", atoi(max_threads_env)); else - config.set("scheduler.max-threads", + config.set("caf.scheduler.max-threads", get_option("Broker::max_threads")->AsCount()); - config.set("work-stealing.moderate-sleep-duration", caf::timespan( + config.set("caf.work-stealing.moderate-sleep-duration", caf::timespan( static_cast(get_option("Broker::moderate_sleep")->AsInterval() * 1e9))); - config.set("work-stealing.relaxed-sleep-duration", caf::timespan( + config.set("caf.work-stealing.relaxed-sleep-duration", caf::timespan( static_cast(get_option("Broker::relaxed_sleep")->AsInterval() * 1e9))); - config.set("work-stealing.aggressive-poll-attempts", + config.set("caf.work-stealing.aggressive-poll-attempts", get_option("Broker::aggressive_polls")->AsCount()); - config.set("work-stealing.moderate-poll-attempts", + config.set("caf.work-stealing.moderate-poll-attempts", get_option("Broker::moderate_polls")->AsCount()); - config.set("work-stealing.aggressive-steal-interval", + config.set("caf.work-stealing.aggressive-steal-interval", get_option("Broker::aggressive_interval")->AsCount()); - config.set("work-stealing.moderate-steal-interval", + config.set("caf.work-stealing.moderate-steal-interval", get_option("Broker::moderate_interval")->AsCount()); - config.set("work-stealing.relaxed-steal-interval", + config.set("caf.work-stealing.relaxed-steal-interval", get_option("Broker::relaxed_interval")->AsCount()); auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); @@ -213,8 +216,6 @@ void Manager::InitPostScript() if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) ) reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); - if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) ) - reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); bstate->subscriber.add_topic(broker::topics::store_events, true); @@ -270,7 +271,6 @@ void Manager::Terminate() FlushLogBuffers(); iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); - iosource_mgr->UnregisterFd(bstate->status_subscriber.fd(), this); vector stores_to_close; @@ -288,8 +288,6 @@ void Manager::Terminate() if ( p.peer.network ) bstate->endpoint.unpeer(p.peer.network->address, p.peer.network->port); - - bstate->endpoint.shutdown(); } bool Manager::Active() @@ -926,37 +924,44 @@ void Manager::Process() if ( use_real_time ) run_state::detail::update_network_time(util::current_time()); - bool had_input = false; - - auto status_msgs = bstate->status_subscriber.poll(); - - for ( auto& status_msg : status_msgs ) - { - had_input = true; - - if ( auto stat = caf::get_if(&status_msg) ) - { - ProcessStatus(std::move(*stat)); - continue; - } - - if ( auto err = caf::get_if(&status_msg) ) - { - ProcessError(std::move(*err)); - continue; - } - - reporter->InternalWarning("ignoring status_subscriber message with unexpected type"); - } - auto messages = bstate->subscriber.poll(); + bool had_input = ! messages.empty(); + for ( auto& message : messages ) { - had_input = true; - auto& topic = broker::get_topic(message); + if ( broker::topics::statuses.prefix_of(topic) ) + { + if ( auto stat = broker::make_status_view(get_data(message)) ) + { + ProcessStatus(stat); + } + else + { + auto str = to_string(message); + reporter->Warning("ignoring malformed Broker status event: %s", + str.c_str()); + } + continue; + } + + if ( broker::topics::errors.prefix_of(topic) ) + { + if ( auto err = broker::make_error_view(get_data(message)) ) + { + ProcessError(err); + } + else + { + auto str = to_string(message); + reporter->Warning("ignoring malformed Broker error event: %s", + str.c_str()); + } + continue; + } + if ( broker::topics::store_events.prefix_of(topic) ) { ProcessStoreEvent(broker::move_data(message)); @@ -1409,11 +1414,11 @@ bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu) return true; } -void Manager::ProcessStatus(broker::status stat) +void Manager::ProcessStatus(broker::status_view stat) { DBG_LOG(DBG_BROKER, "Received status message: %s", RenderMessage(stat).c_str()); - auto ctx = stat.context(); + auto ctx = stat.context(); EventHandlerPtr event; switch (stat.code()) { @@ -1477,36 +1482,46 @@ void Manager::ProcessStatus(broker::status stat) event_mgr.Enqueue(event, std::move(endpoint_info), std::move(msg)); } -void Manager::ProcessError(broker::error err) +void Manager::ProcessError(broker::error_view err) { DBG_LOG(DBG_BROKER, "Received error message: %s", RenderMessage(err).c_str()); if ( ! ::Broker::error ) return; + auto int_code = static_cast(err.code()); + BifEnum::Broker::ErrorCode ec; - std::string msg; - - if ( err.category() == caf::atom("broker") ) - { - static auto enum_type = id::find_type("Broker::ErrorCode"); - - if ( enum_type->Lookup(err.code()) ) - ec = static_cast(err.code()); - else - { - reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ", err.code()); - ec = BifEnum::Broker::ErrorCode::UNSPECIFIED; - } - - msg = caf::to_string(err.context()); - } + static auto enum_type = id::find_type("Broker::ErrorCode"); + if ( enum_type->Lookup(int_code) ) + ec = static_cast(int_code); else { - ec = BifEnum::Broker::ErrorCode::CAF_ERROR; - msg = util::fmt("[%s] %s", caf::to_string(err.category()).c_str(), caf::to_string(err.context()).c_str()); + + reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ", + static_cast(int_code)); + ec = BifEnum::Broker::ErrorCode::UNSPECIFIED; } + std::string msg; + // Note: we could also use to_string, but that would change the log output + // and we would have to update all baselines relying on this format. + if ( auto ctx = err.context() ) + { + msg += '('; + msg += to_string(ctx->node); + msg += ", "; + msg += caf::deep_to_string(ctx->network); + msg += ", "; + if ( auto what = err.message() ) + msg += caf::deep_to_string(*what); + else + msg += R"_("")_"; + msg += ')'; + } + else + msg = "(null)"; + event_mgr.Enqueue(::Broker::error, BifType::Enum::Broker::ErrorCode->GetEnumVal(ec), make_intrusive(msg)); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 5ef0e5efe2..493c88aa08 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -367,8 +367,8 @@ private: bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogWrite(broker::zeek::LogWrite lw); bool ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu); - void ProcessStatus(broker::status stat); - void ProcessError(broker::error err); + void ProcessStatus(broker::status_view stat); + void ProcessError(broker::error_view err); void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); // Initializes the masters for Broker backed Zeek tables when using the &backend attribute