Merge remote-tracking branch 'origin/topic/jsiwek/broker-store-process-n'

* origin/topic/jsiwek/broker-store-process-n:
  Improve Broker I/O loop integration: less mutex locking
  Improve processing of broker data store responses
This commit is contained in:
Jon Siwek 2019-05-28 17:43:52 -07:00
commit ebbeb4517b

View file

@ -805,15 +805,8 @@ bool Manager::Unsubscribe(const string& topic_prefix)
void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
if ( bstate->status_subscriber.available() || bstate->subscriber.available() )
SetIdle(false);
read->Insert(bstate->subscriber.fd());
read->Insert(bstate->status_subscriber.fd());
write->Insert(bstate->subscriber.fd());
write->Insert(bstate->status_subscriber.fd());
except->Insert(bstate->subscriber.fd());
except->Insert(bstate->status_subscriber.fd());
for ( auto& x : data_stores )
read->Insert(x.second->proxy.mailbox().descriptor());
@ -821,19 +814,10 @@ void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
double Manager::NextTimestamp(double* local_network_time)
{
if ( ! IsIdle() )
return timer_mgr->Time();
if ( bstate->status_subscriber.available() || bstate->subscriber.available() )
return timer_mgr->Time();
for ( auto& s : data_stores )
{
if ( ! s.second->proxy.mailbox().empty() )
return timer_mgr->Time();
}
return -1;
// We're only asked for a timestamp if either (1) a FD was ready
// or (2) we're not idle (and we go idle if when Process is no-op),
// so there's no case where returning -1 to signify a skip will help.
return timer_mgr->Time();
}
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
@ -933,11 +917,15 @@ void Manager::Process()
for ( auto& s : data_stores )
{
while ( ! s.second->proxy.mailbox().empty() )
auto num_available = s.second->proxy.mailbox().size();
if ( num_available > 0 )
{
had_input = true;
auto response = s.second->proxy.receive();
ProcessStoreResponse(s.second, move(response));
auto responses = s.second->proxy.receive(num_available);
for ( auto& r : responses )
ProcessStoreResponse(s.second, move(r));
}
}