From cf2b5f7e05898feab69f163dde5f8bddd6ce1a7d Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Mon, 7 Dec 2020 14:56:19 +0100 Subject: [PATCH 1/5] Migrate to CAF 0.18 --- CMakeLists.txt | 33 ++++++++++++++++++++++++--------- auxil/broker | 2 +- configure | 2 +- src/broker/Manager.cc | 28 +++++++++++++++------------- 4 files changed, 41 insertions(+), 24 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ab87906ec0..8ed9b2e83f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ # When changing the minimum version here, also adapt # auxil/zeek-aux/plugin-support/skeleton/CMakeLists.txt -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) +cmake_minimum_required(VERSION 3.5...3.18 FATAL_ERROR) project(Zeek C CXX) @@ -289,18 +289,33 @@ if ( PYTHON_VERSION_STRING VERSION_LESS ${ZEEK_PYTHON_MIN} ) message(FATAL_ERROR "Python ${ZEEK_PYTHON_MIN} or greater is required.") endif () -if ( CAF_ROOT_DIR ) - find_package(CAF COMPONENTS core io openssl REQUIRED) +if ( CAF_ROOT OR BROKER_ROOT_DIR ) + # TODO: drop < 3.12 compatibility check when raising the minimum CMake version + if ( CAF_ROOT AND CMAKE_VERSION VERSION_LESS 3.12 ) + find_package(CAF ${CAF_VERSION_MIN_REQUIRED} REQUIRED CONFIG + COMPONENTS openssl test io core + PATHS "${CAF_ROOT}") + else () + find_package(CAF ${CAF_VERSION_MIN_REQUIRED} REQUIRED CONFIG + COMPONENTS openssl test io core) + endif () + message(STATUS "Using system CAF version ${CAF_VERSION}") + # TODO: drop these legacy variables and simply use the targets consistently + set(CAF_LIBRARIES CAF::core CAF::io CAF::openssl CACHE INTERNAL "") + set(caf_dirs "") + foreach (caf_lib IN LISTS CAF_LIBRARIES ITEMS CAF::test) + get_target_property(dirs ${caf_lib} INTERFACE_INCLUDE_DIRECTORIES) + list(APPEND caf_dirs ${dirs}) + endforeach () + list(REMOVE_DUPLICATES caf_dirs) + list(GET caf_dirs 0 caf_dir) + set(CAF_INCLUDE_DIRS "${caf_dirs}" CACHE INTERNAL "") endif () add_subdirectory(auxil/paraglob) set(zeekdeps ${zeekdeps} paraglob) if ( BROKER_ROOT_DIR ) - # Avoid calling find_package(CAF) twice. - if ( NOT CAF_ROOT_DIR ) - find_package(CAF COMPONENTS core io openssl REQUIRED) - endif () find_package(Broker REQUIRED) set(zeekdeps ${zeekdeps} ${BROKER_LIBRARY}) set(broker_includes ${BROKER_INCLUDE_DIR}) @@ -455,8 +470,8 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/zeek-config.h.in include_directories(${CMAKE_CURRENT_BINARY_DIR}) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/zeek-config.h DESTINATION include/zeek) -if ( CAF_ROOT_DIR ) - set(ZEEK_CONFIG_CAF_ROOT_DIR ${CAF_ROOT_DIR}) +if ( CAF_ROOT ) + set(ZEEK_CONFIG_CAF_ROOT ${CAF_ROOT_DIR}) else () set(ZEEK_CONFIG_CAF_ROOT_DIR ${ZEEK_ROOT_DIR}) endif () diff --git a/auxil/broker b/auxil/broker index 8899280694..4f43a95824 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 8899280694d8d5ad3aaa0a03cc99e4c3d3fd7887 +Subproject commit 4f43a95824c700642bf466f6f41274fc4d1baebb diff --git a/configure b/configure index 7b74d3edf7..162b5c4acf 100755 --- a/configure +++ b/configure @@ -328,7 +328,7 @@ while [ $# -ne 0 ]; do append_cache_entry BROKER_ROOT_DIR PATH $optarg ;; --with-caf=*) - append_cache_entry CAF_ROOT_DIR PATH $optarg + append_cache_entry CAF_ROOT PATH $optarg ;; --with-libkqueue=*) append_cache_entry LIBKQUEUE_ROOT_DIR PATH $optarg diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 5726a25550..8aec4f5db3 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -176,36 +176,36 @@ void Manager::InitPostScript() auto scheduler_policy = get_option("Broker::scheduler_policy")->AsString()->CheckString(); if ( util::streq(scheduler_policy, "sharing") ) - config.set("scheduler.policy", caf::atom("sharing")); + config.set("caf.scheduler.policy", "sharing"); else if ( util::streq(scheduler_policy, "stealing") ) - config.set("scheduler.policy", caf::atom("stealing")); + config.set("caf.scheduler.policy", "stealing"); else reporter->FatalError("Invalid Broker::scheduler_policy: %s", scheduler_policy); auto max_threads_env = util::zeekenv("ZEEK_BROKER_MAX_THREADS"); if ( max_threads_env ) - config.set("scheduler.max-threads", atoi(max_threads_env)); + config.set("caf.scheduler.max-threads", atoi(max_threads_env)); else - config.set("scheduler.max-threads", + config.set("caf.scheduler.max-threads", get_option("Broker::max_threads")->AsCount()); - config.set("work-stealing.moderate-sleep-duration", caf::timespan( + config.set("caf.work-stealing.moderate-sleep-duration", caf::timespan( static_cast(get_option("Broker::moderate_sleep")->AsInterval() * 1e9))); - config.set("work-stealing.relaxed-sleep-duration", caf::timespan( + config.set("caf.work-stealing.relaxed-sleep-duration", caf::timespan( static_cast(get_option("Broker::relaxed_sleep")->AsInterval() * 1e9))); - config.set("work-stealing.aggressive-poll-attempts", + config.set("caf.work-stealing.aggressive-poll-attempts", get_option("Broker::aggressive_polls")->AsCount()); - config.set("work-stealing.moderate-poll-attempts", + config.set("caf.work-stealing.moderate-poll-attempts", get_option("Broker::moderate_polls")->AsCount()); - config.set("work-stealing.aggressive-steal-interval", + config.set("caf.work-stealing.aggressive-steal-interval", get_option("Broker::aggressive_interval")->AsCount()); - config.set("work-stealing.moderate-steal-interval", + config.set("caf.work-stealing.moderate-steal-interval", get_option("Broker::moderate_interval")->AsCount()); - config.set("work-stealing.relaxed-steal-interval", + config.set("caf.work-stealing.relaxed-steal-interval", get_option("Broker::relaxed_interval")->AsCount()); auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); @@ -1487,7 +1487,7 @@ void Manager::ProcessError(broker::error err) BifEnum::Broker::ErrorCode ec; std::string msg; - if ( err.category() == caf::atom("broker") ) + if ( err.category() == caf::type_id_v ) { static auto enum_type = id::find_type("Broker::ErrorCode"); @@ -1504,7 +1504,9 @@ void Manager::ProcessError(broker::error err) else { ec = BifEnum::Broker::ErrorCode::CAF_ERROR; - msg = util::fmt("[%s] %s", caf::to_string(err.category()).c_str(), caf::to_string(err.context()).c_str()); + auto sv = caf::query_type_name(err.category()); + std::string category{sv.begin(), sv.end()}; + msg = util::fmt("[%s] %s", category.c_str(), caf::to_string(err.context()).c_str()); } event_mgr.Enqueue(::Broker::error, From 38aba87e41cf0b95bb6ddd3c1a0e40f9e10785ef Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Mon, 7 Dec 2020 16:43:42 +0100 Subject: [PATCH 2/5] Preserve string output of Broker errors --- src/broker/Manager.cc | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 8aec4f5db3..a486af5bdc 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -1499,7 +1499,20 @@ void Manager::ProcessError(broker::error err) ec = BifEnum::Broker::ErrorCode::UNSPECIFIED; } - msg = caf::to_string(err.context()); + // Note: we could also use to_string, but that change the log output + // and we would have to update all baselines relying on this format. + if ( auto mv = caf::make_const_typed_message_view(err.context()) ) + { + msg += '('; + msg += to_string(get<0>(mv).node); + msg += ", "; + msg += caf::deep_to_string(get<0>(mv).network); + msg += ", "; + msg += caf::deep_to_string(get<1>(mv)); + msg += ')'; + } + else + msg = caf::to_string(err.context()); } else { From 9da68ddc3d538e30359f2b0cf663a955638a3ff4 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Mon, 7 Dec 2020 20:37:08 +0100 Subject: [PATCH 3/5] Fix UB in shutdown of Broker manager --- src/broker/Manager.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index a486af5bdc..e09d1117c4 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -288,8 +288,6 @@ void Manager::Terminate() if ( p.peer.network ) bstate->endpoint.unpeer(p.peer.network->address, p.peer.network->port); - - bstate->endpoint.shutdown(); } bool Manager::Active() From 25fef3da1bd4c53116321e1408c865e8fd467398 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Sun, 8 Nov 2020 13:41:00 +0100 Subject: [PATCH 4/5] Fix message ordering of Broker messages Using two separate Broker subscribers for status events and regular messages introduces a race on the two objects. Even if Broker sends all messages in a particular (deterministic) order, Zeek may still process them in a different order as a result. Since several tests rely on a strict ordering of Broker events, these tests could fail sporadically. Using only a single subscriber for all Broker messages makes sure that Zeek observes all messages in the same order as Broker emits them. --- auxil/broker | 2 +- src/broker/Manager.cc | 144 +++++++++++++++++++++--------------------- src/broker/Manager.h | 4 +- 3 files changed, 76 insertions(+), 74 deletions(-) diff --git a/auxil/broker b/auxil/broker index 4f43a95824..71534a16c5 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 4f43a95824c700642bf466f6f41274fc4d1baebb +Subproject commit 71534a16c50735ee268c43aa9942c57f23b9b4d4 diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index e09d1117c4..7ded17be45 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -57,14 +57,14 @@ class BrokerState { public: BrokerState(BrokerConfig config, size_t congestion_queue_size) : endpoint(std::move(config)), - subscriber(endpoint.make_subscriber({}, congestion_queue_size)), - status_subscriber(endpoint.make_status_subscriber(true)) + subscriber(endpoint.make_subscriber({broker::topics::statuses, + broker::topics::errors}, + congestion_queue_size)) { } broker::endpoint endpoint; broker::subscriber subscriber; - broker::status_subscriber status_subscriber; }; const broker::endpoint_info Manager::NoPeer{{}, {}}; @@ -114,15 +114,18 @@ static std::string RenderMessage(const broker::vector& xs) return broker::to_string(xs); } -static std::string RenderMessage(const broker::status& s) +static std::string RenderMessage(broker::status_view s) { return broker::to_string(s.code()); } -static std::string RenderMessage(const broker::error& e) +static std::string RenderMessage(broker::error_view e) { - return util::fmt("%s (%s)", broker::to_string(e.code()).c_str(), - caf::to_string(e.context()).c_str()); + if ( auto ctx = e.context() ) + return util::fmt("%s (%s)", to_string(e.code()).c_str(), + to_string(*ctx).c_str()); + else + return util::fmt("%s (null)", to_string(e.code()).c_str()); } #endif @@ -213,8 +216,6 @@ void Manager::InitPostScript() if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) ) reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); - if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) ) - reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); bstate->subscriber.add_topic(broker::topics::store_events, true); @@ -270,7 +271,6 @@ void Manager::Terminate() FlushLogBuffers(); iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); - iosource_mgr->UnregisterFd(bstate->status_subscriber.fd(), this); vector stores_to_close; @@ -924,37 +924,44 @@ void Manager::Process() if ( use_real_time ) run_state::detail::update_network_time(util::current_time()); - bool had_input = false; - - auto status_msgs = bstate->status_subscriber.poll(); - - for ( auto& status_msg : status_msgs ) - { - had_input = true; - - if ( auto stat = caf::get_if(&status_msg) ) - { - ProcessStatus(std::move(*stat)); - continue; - } - - if ( auto err = caf::get_if(&status_msg) ) - { - ProcessError(std::move(*err)); - continue; - } - - reporter->InternalWarning("ignoring status_subscriber message with unexpected type"); - } - auto messages = bstate->subscriber.poll(); + bool had_input = ! messages.empty(); + for ( auto& message : messages ) { - had_input = true; - auto& topic = broker::get_topic(message); + if ( broker::topics::statuses.prefix_of(topic) ) + { + if ( auto stat = broker::make_status_view(get_data(message)) ) + { + ProcessStatus(stat); + } + else + { + auto str = to_string(message); + reporter->Warning("ignoring malformed Broker status event: %s", + str.c_str()); + } + continue; + } + + if ( broker::topics::errors.prefix_of(topic) ) + { + if ( auto err = broker::make_error_view(get_data(message)) ) + { + ProcessError(err); + } + else + { + auto str = to_string(message); + reporter->Warning("ignoring malformed Broker error event: %s", + str.c_str()); + } + continue; + } + if ( broker::topics::store_events.prefix_of(topic) ) { ProcessStoreEvent(broker::move_data(message)); @@ -1407,11 +1414,11 @@ bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu) return true; } -void Manager::ProcessStatus(broker::status stat) +void Manager::ProcessStatus(broker::status_view stat) { DBG_LOG(DBG_BROKER, "Received status message: %s", RenderMessage(stat).c_str()); - auto ctx = stat.context(); + auto ctx = stat.context(); EventHandlerPtr event; switch (stat.code()) { @@ -1475,51 +1482,46 @@ void Manager::ProcessStatus(broker::status stat) event_mgr.Enqueue(event, std::move(endpoint_info), std::move(msg)); } -void Manager::ProcessError(broker::error err) +void Manager::ProcessError(broker::error_view err) { DBG_LOG(DBG_BROKER, "Received error message: %s", RenderMessage(err).c_str()); if ( ! ::Broker::error ) return; + auto int_code = static_cast(err.code()); + BifEnum::Broker::ErrorCode ec; - std::string msg; - - if ( err.category() == caf::type_id_v ) - { - static auto enum_type = id::find_type("Broker::ErrorCode"); - - if ( enum_type->Lookup(err.code()) ) - ec = static_cast(err.code()); - else - { - reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ", err.code()); - ec = BifEnum::Broker::ErrorCode::UNSPECIFIED; - } - - // Note: we could also use to_string, but that change the log output - // and we would have to update all baselines relying on this format. - if ( auto mv = caf::make_const_typed_message_view(err.context()) ) - { - msg += '('; - msg += to_string(get<0>(mv).node); - msg += ", "; - msg += caf::deep_to_string(get<0>(mv).network); - msg += ", "; - msg += caf::deep_to_string(get<1>(mv)); - msg += ')'; - } - else - msg = caf::to_string(err.context()); - } + static auto enum_type = id::find_type("Broker::ErrorCode"); + if ( enum_type->Lookup(int_code) ) + ec = static_cast(int_code); else { - ec = BifEnum::Broker::ErrorCode::CAF_ERROR; - auto sv = caf::query_type_name(err.category()); - std::string category{sv.begin(), sv.end()}; - msg = util::fmt("[%s] %s", category.c_str(), caf::to_string(err.context()).c_str()); + + reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ", + static_cast(int_code)); + ec = BifEnum::Broker::ErrorCode::UNSPECIFIED; } + std::string msg; + // Note: we could also use to_string, but that would change the log output + // and we would have to update all baselines relying on this format. + if ( auto ctx = err.context() ) + { + msg += '('; + msg += to_string(ctx->node); + msg += ", "; + msg += caf::deep_to_string(ctx->network); + msg += ", "; + if ( auto what = err.message() ) + msg += caf::deep_to_string(*what); + else + msg += R"_("")_"; + msg += ')'; + } + else + msg = "(null)"; + event_mgr.Enqueue(::Broker::error, BifType::Enum::Broker::ErrorCode->GetEnumVal(ec), make_intrusive(msg)); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 5ef0e5efe2..493c88aa08 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -367,8 +367,8 @@ private: bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogWrite(broker::zeek::LogWrite lw); bool ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu); - void ProcessStatus(broker::status stat); - void ProcessError(broker::error err); + void ProcessStatus(broker::status_view stat); + void ProcessError(broker::error_view err); void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); // Initializes the masters for Broker backed Zeek tables when using the &backend attribute From 27730aabfb6ecf77459c25a6b043ea9994c894bf Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Tue, 8 Dec 2020 09:29:22 +0100 Subject: [PATCH 5/5] Update submodule(s) --- auxil/broker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auxil/broker b/auxil/broker index 71534a16c5..675e25314e 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 71534a16c50735ee268c43aa9942c57f23b9b4d4 +Subproject commit 675e25314ec29709d69d9013bf8e26c96f09aa8e