From 69eee058c18376c75bfb6d75732961d800d8b6e1 Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Sat, 4 May 2019 11:13:48 -0700 Subject: [PATCH 1/2] Improve processing of broker data store responses Now retrieves and processes all N available responses at once instead of one-by-one-until-empty. The later may be problematic from two points: (1) hitting the shared queue/mailbox matching logic once per response instead of once per Process() and (2) looping until empty is not clearly bounded -- imagining a condition where there's a thread trying to push a large influx of responses into the mailbox while at the same time we're trying to take from it until it's empty. --- src/broker/Manager.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 959ef6cb9d..1da35a87b4 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -936,11 +936,15 @@ void Manager::Process() for ( auto& s : data_stores ) { - while ( ! s.second->proxy.mailbox().empty() ) + auto num_available = s.second->proxy.mailbox().size(); + + if ( num_available > 0 ) { had_input = true; - auto response = s.second->proxy.receive(); - ProcessStoreResponse(s.second, move(response)); + auto responses = s.second->proxy.receive(num_available); + + for ( auto& r : responses ) + ProcessStoreResponse(s.second, move(r)); } } From 3ae4ffc66ec51d929d7641b857aeacaa6c205d7e Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Fri, 10 May 2019 09:16:29 -0700 Subject: [PATCH 2/2] Improve Broker I/O loop integration: less mutex locking Checking a subscriber for available messages required locking a mutex, but we should never actually need to do that in the main-loop to check for Broker readiness since we can rely on file descriptor polling. --- src/broker/Manager.cc | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 1da35a87b4..1a6b79c445 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -808,15 +808,8 @@ bool Manager::Unsubscribe(const string& topic_prefix) void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, iosource::FD_Set* except) { - if ( bstate->status_subscriber.available() || bstate->subscriber.available() ) - SetIdle(false); - read->Insert(bstate->subscriber.fd()); read->Insert(bstate->status_subscriber.fd()); - write->Insert(bstate->subscriber.fd()); - write->Insert(bstate->status_subscriber.fd()); - except->Insert(bstate->subscriber.fd()); - except->Insert(bstate->status_subscriber.fd()); for ( auto& x : data_stores ) read->Insert(x.second->proxy.mailbox().descriptor()); @@ -824,19 +817,10 @@ void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, double Manager::NextTimestamp(double* local_network_time) { - if ( ! IsIdle() ) - return timer_mgr->Time(); - - if ( bstate->status_subscriber.available() || bstate->subscriber.available() ) - return timer_mgr->Time(); - - for ( auto& s : data_stores ) - { - if ( ! s.second->proxy.mailbox().empty() ) - return timer_mgr->Time(); - } - - return -1; + // We're only asked for a timestamp if either (1) a FD was ready + // or (2) we're not idle (and we go idle if when Process is no-op), + // so there's no case where returning -1 to signify a skip will help. + return timer_mgr->Time(); } void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)