diff --git a/auxil/broker b/auxil/broker index 4f43a95824..71534a16c5 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 4f43a95824c700642bf466f6f41274fc4d1baebb +Subproject commit 71534a16c50735ee268c43aa9942c57f23b9b4d4 diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index e09d1117c4..7ded17be45 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -57,14 +57,14 @@ class BrokerState { public: BrokerState(BrokerConfig config, size_t congestion_queue_size) : endpoint(std::move(config)), - subscriber(endpoint.make_subscriber({}, congestion_queue_size)), - status_subscriber(endpoint.make_status_subscriber(true)) + subscriber(endpoint.make_subscriber({broker::topics::statuses, + broker::topics::errors}, + congestion_queue_size)) { } broker::endpoint endpoint; broker::subscriber subscriber; - broker::status_subscriber status_subscriber; }; const broker::endpoint_info Manager::NoPeer{{}, {}}; @@ -114,15 +114,18 @@ static std::string RenderMessage(const broker::vector& xs) return broker::to_string(xs); } -static std::string RenderMessage(const broker::status& s) +static std::string RenderMessage(broker::status_view s) { return broker::to_string(s.code()); } -static std::string RenderMessage(const broker::error& e) +static std::string RenderMessage(broker::error_view e) { - return util::fmt("%s (%s)", broker::to_string(e.code()).c_str(), - caf::to_string(e.context()).c_str()); + if ( auto ctx = e.context() ) + return util::fmt("%s (%s)", to_string(e.code()).c_str(), + to_string(*ctx).c_str()); + else + return util::fmt("%s (null)", to_string(e.code()).c_str()); } #endif @@ -213,8 +216,6 @@ void Manager::InitPostScript() if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) ) reporter->FatalError("Failed to register broker subscriber with iosource_mgr"); - if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) ) - reporter->FatalError("Failed to register broker status subscriber with iosource_mgr"); bstate->subscriber.add_topic(broker::topics::store_events, true); @@ -270,7 +271,6 @@ void Manager::Terminate() FlushLogBuffers(); iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); - iosource_mgr->UnregisterFd(bstate->status_subscriber.fd(), this); vector stores_to_close; @@ -924,37 +924,44 @@ void Manager::Process() if ( use_real_time ) run_state::detail::update_network_time(util::current_time()); - bool had_input = false; - - auto status_msgs = bstate->status_subscriber.poll(); - - for ( auto& status_msg : status_msgs ) - { - had_input = true; - - if ( auto stat = caf::get_if(&status_msg) ) - { - ProcessStatus(std::move(*stat)); - continue; - } - - if ( auto err = caf::get_if(&status_msg) ) - { - ProcessError(std::move(*err)); - continue; - } - - reporter->InternalWarning("ignoring status_subscriber message with unexpected type"); - } - auto messages = bstate->subscriber.poll(); + bool had_input = ! messages.empty(); + for ( auto& message : messages ) { - had_input = true; - auto& topic = broker::get_topic(message); + if ( broker::topics::statuses.prefix_of(topic) ) + { + if ( auto stat = broker::make_status_view(get_data(message)) ) + { + ProcessStatus(stat); + } + else + { + auto str = to_string(message); + reporter->Warning("ignoring malformed Broker status event: %s", + str.c_str()); + } + continue; + } + + if ( broker::topics::errors.prefix_of(topic) ) + { + if ( auto err = broker::make_error_view(get_data(message)) ) + { + ProcessError(err); + } + else + { + auto str = to_string(message); + reporter->Warning("ignoring malformed Broker error event: %s", + str.c_str()); + } + continue; + } + if ( broker::topics::store_events.prefix_of(topic) ) { ProcessStoreEvent(broker::move_data(message)); @@ -1407,11 +1414,11 @@ bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu) return true; } -void Manager::ProcessStatus(broker::status stat) +void Manager::ProcessStatus(broker::status_view stat) { DBG_LOG(DBG_BROKER, "Received status message: %s", RenderMessage(stat).c_str()); - auto ctx = stat.context(); + auto ctx = stat.context(); EventHandlerPtr event; switch (stat.code()) { @@ -1475,51 +1482,46 @@ void Manager::ProcessStatus(broker::status stat) event_mgr.Enqueue(event, std::move(endpoint_info), std::move(msg)); } -void Manager::ProcessError(broker::error err) +void Manager::ProcessError(broker::error_view err) { DBG_LOG(DBG_BROKER, "Received error message: %s", RenderMessage(err).c_str()); if ( ! ::Broker::error ) return; + auto int_code = static_cast(err.code()); + BifEnum::Broker::ErrorCode ec; - std::string msg; - - if ( err.category() == caf::type_id_v ) - { - static auto enum_type = id::find_type("Broker::ErrorCode"); - - if ( enum_type->Lookup(err.code()) ) - ec = static_cast(err.code()); - else - { - reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ", err.code()); - ec = BifEnum::Broker::ErrorCode::UNSPECIFIED; - } - - // Note: we could also use to_string, but that change the log output - // and we would have to update all baselines relying on this format. - if ( auto mv = caf::make_const_typed_message_view(err.context()) ) - { - msg += '('; - msg += to_string(get<0>(mv).node); - msg += ", "; - msg += caf::deep_to_string(get<0>(mv).network); - msg += ", "; - msg += caf::deep_to_string(get<1>(mv)); - msg += ')'; - } - else - msg = caf::to_string(err.context()); - } + static auto enum_type = id::find_type("Broker::ErrorCode"); + if ( enum_type->Lookup(int_code) ) + ec = static_cast(int_code); else { - ec = BifEnum::Broker::ErrorCode::CAF_ERROR; - auto sv = caf::query_type_name(err.category()); - std::string category{sv.begin(), sv.end()}; - msg = util::fmt("[%s] %s", category.c_str(), caf::to_string(err.context()).c_str()); + + reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ", + static_cast(int_code)); + ec = BifEnum::Broker::ErrorCode::UNSPECIFIED; } + std::string msg; + // Note: we could also use to_string, but that would change the log output + // and we would have to update all baselines relying on this format. + if ( auto ctx = err.context() ) + { + msg += '('; + msg += to_string(ctx->node); + msg += ", "; + msg += caf::deep_to_string(ctx->network); + msg += ", "; + if ( auto what = err.message() ) + msg += caf::deep_to_string(*what); + else + msg += R"_("")_"; + msg += ')'; + } + else + msg = "(null)"; + event_mgr.Enqueue(::Broker::error, BifType::Enum::Broker::ErrorCode->GetEnumVal(ec), make_intrusive(msg)); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 5ef0e5efe2..493c88aa08 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -367,8 +367,8 @@ private: bool ProcessLogCreate(broker::zeek::LogCreate lc); bool ProcessLogWrite(broker::zeek::LogWrite lw); bool ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu); - void ProcessStatus(broker::status stat); - void ProcessError(broker::error err); + void ProcessStatus(broker::status_view stat); + void ProcessError(broker::error_view err); void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response); void FlushPendingQueries(); // Initializes the masters for Broker backed Zeek tables when using the &backend attribute