diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 392057e0f9..a7ad3a1df6 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -127,15 +127,12 @@ Manager::Manager(bool arg_reading_pcaps) reading_pcaps = arg_reading_pcaps; after_zeek_init = false; peer_count = 0; - times_processed_without_idle = 0; log_batch_size = 0; log_batch_interval = 0; log_topic_func = nullptr; vector_of_data_type = nullptr; log_id_type = nullptr; writer_id_type = nullptr; - - SetIdle(false); } Manager::~Manager() @@ -209,12 +206,18 @@ void Manager::InitPostScript() auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); bstate = std::make_shared(std::move(config), cqs); + + iosource_mgr->RegisterFd(bstate->subscriber.fd(), this); + iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this); } void Manager::Terminate() { FlushLogBuffers(); + iosource_mgr->UnregisterFd(bstate->subscriber.fd()); + iosource_mgr->UnregisterFd(bstate->status_subscriber.fd()); + vector stores_to_close; for ( auto& x : data_stores ) @@ -275,8 +278,6 @@ void Manager::FlushPendingQueries() } } } - - SetIdle(false); } uint16_t Manager::Listen(const string& addr, uint16_t port) @@ -815,24 +816,6 @@ bool Manager::Unsubscribe(const string& topic_prefix) return true; } -void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, - iosource::FD_Set* except) - { - read->Insert(bstate->subscriber.fd()); - read->Insert(bstate->status_subscriber.fd()); - - for ( auto& x : data_stores ) - read->Insert(x.second->proxy.mailbox().descriptor()); - } - -double Manager::NextTimestamp(double* local_network_time) - { - // We're only asked for a timestamp if either (1) a FD was ready - // or (2) we're not idle (and we go idle if when Process is no-op), - // so there's no case where returning -1 to signify a skip will help. - return timer_mgr->Time(); - } - void Manager::DispatchMessage(const broker::topic& topic, broker::data msg) { switch ( broker::zeek::Message::type(msg) ) { @@ -885,6 +868,10 @@ void Manager::DispatchMessage(const broker::topic& topic, broker::data msg) void Manager::Process() { + // Ensure that time gets update before processing broker messages, or events + // based on them might get scheduled wrong. + net_update_time(current_time()); + bool had_input = false; auto status_msgs = bstate->status_subscriber.poll(); @@ -949,30 +936,6 @@ void Manager::Process() // network_time, may as well do so now because otherwise the // broker/cluster logs will end up using timestamp 0. net_update_time(current_time()); - - ++times_processed_without_idle; - - // The max number of Process calls allowed to happen in a row without - // idling is chosen a bit arbitrarily, except 12 is around half of the - // SELECT_FREQUENCY (25). - // - // But probably the general idea should be for it to have some relation - // to the SELECT_FREQUENCY: less than it so other busy IOSources can - // fit several Process loops in before the next poll event (e.g. the - // select() call ), but still large enough such that we don't have to - // wait long before the next poll ourselves after being forced to idle. - if ( times_processed_without_idle > 12 ) - { - times_processed_without_idle = 0; - SetIdle(true); - } - else - SetIdle(false); - } - else - { - times_processed_without_idle = 0; - SetIdle(true); } } @@ -1484,6 +1447,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type, Ref(handle); data_stores.emplace(name, handle); + iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); if ( bstate->endpoint.use_real_time() ) return handle; @@ -1520,6 +1484,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval, Ref(handle); data_stores.emplace(name, handle); + iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this); return handle; } @@ -1538,6 +1503,8 @@ bool Manager::CloseStore(const string& name) if ( s == data_stores.end() ) return false; + iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor()); + for ( auto i = pending_queries.begin(); i != pending_queries.end(); ) if ( i->second->Store().name() == name ) { diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 3b36d189dd..43698a22d3 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -346,15 +346,9 @@ private: __attribute__((format (printf, 2, 3))); // IOSource interface overrides: - void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, - iosource::FD_Set* except) override; - - double NextTimestamp(double* local_network_time) override; - void Process() override; - - const char* Tag() override - { return "Broker::Manager"; } + const char* Tag() override { return "Broker::Manager"; } + double GetNextTimeout() override { return -1; } struct LogBuffer { // Indexed by topic string. @@ -392,7 +386,6 @@ private: bool reading_pcaps; bool after_zeek_init; int peer_count; - int times_processed_without_idle; size_t log_batch_size; double log_batch_interval;