Broker manager changes to match the new IOSource API and loop architecture

This commit is contained in:
Tim Wojtulewicz 2019-11-26 12:59:28 -07:00
parent c4d9566294
commit c5462eaa80
2 changed files with 16 additions and 56 deletions

View file

@ -127,15 +127,12 @@ Manager::Manager(bool arg_reading_pcaps)
reading_pcaps = arg_reading_pcaps;
after_zeek_init = false;
peer_count = 0;
times_processed_without_idle = 0;
log_batch_size = 0;
log_batch_interval = 0;
log_topic_func = nullptr;
vector_of_data_type = nullptr;
log_id_type = nullptr;
writer_id_type = nullptr;
SetIdle(false);
}
Manager::~Manager()
@ -209,12 +206,18 @@ void Manager::InitPostScript()
auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
bstate = std::make_shared<BrokerState>(std::move(config), cqs);
iosource_mgr->RegisterFd(bstate->subscriber.fd(), this);
iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this);
}
void Manager::Terminate()
{
FlushLogBuffers();
iosource_mgr->UnregisterFd(bstate->subscriber.fd());
iosource_mgr->UnregisterFd(bstate->status_subscriber.fd());
vector<string> stores_to_close;
for ( auto& x : data_stores )
@ -275,8 +278,6 @@ void Manager::FlushPendingQueries()
}
}
}
SetIdle(false);
}
uint16_t Manager::Listen(const string& addr, uint16_t port)
@ -815,24 +816,6 @@ bool Manager::Unsubscribe(const string& topic_prefix)
return true;
}
void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
read->Insert(bstate->subscriber.fd());
read->Insert(bstate->status_subscriber.fd());
for ( auto& x : data_stores )
read->Insert(x.second->proxy.mailbox().descriptor());
}
double Manager::NextTimestamp(double* local_network_time)
{
// We're only asked for a timestamp if either (1) a FD was ready
// or (2) we're not idle (and we go idle if when Process is no-op),
// so there's no case where returning -1 to signify a skip will help.
return timer_mgr->Time();
}
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
{
switch ( broker::zeek::Message::type(msg) ) {
@ -885,6 +868,10 @@ void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
void Manager::Process()
{
// Ensure that time gets update before processing broker messages, or events
// based on them might get scheduled wrong.
net_update_time(current_time());
bool had_input = false;
auto status_msgs = bstate->status_subscriber.poll();
@ -949,30 +936,6 @@ void Manager::Process()
// network_time, may as well do so now because otherwise the
// broker/cluster logs will end up using timestamp 0.
net_update_time(current_time());
++times_processed_without_idle;
// The max number of Process calls allowed to happen in a row without
// idling is chosen a bit arbitrarily, except 12 is around half of the
// SELECT_FREQUENCY (25).
//
// But probably the general idea should be for it to have some relation
// to the SELECT_FREQUENCY: less than it so other busy IOSources can
// fit several Process loops in before the next poll event (e.g. the
// select() call ), but still large enough such that we don't have to
// wait long before the next poll ourselves after being forced to idle.
if ( times_processed_without_idle > 12 )
{
times_processed_without_idle = 0;
SetIdle(true);
}
else
SetIdle(false);
}
else
{
times_processed_without_idle = 0;
SetIdle(true);
}
}
@ -1484,6 +1447,7 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
Ref(handle);
data_stores.emplace(name, handle);
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
if ( bstate->endpoint.use_real_time() )
return handle;
@ -1520,6 +1484,7 @@ StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
Ref(handle);
data_stores.emplace(name, handle);
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
return handle;
}
@ -1538,6 +1503,8 @@ bool Manager::CloseStore(const string& name)
if ( s == data_stores.end() )
return false;
iosource_mgr->UnregisterFd(s->second->proxy.mailbox().descriptor());
for ( auto i = pending_queries.begin(); i != pending_queries.end(); )
if ( i->second->Store().name() == name )
{

View file

@ -346,15 +346,9 @@ private:
__attribute__((format (printf, 2, 3)));
// IOSource interface overrides:
void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except) override;
double NextTimestamp(double* local_network_time) override;
void Process() override;
const char* Tag() override
{ return "Broker::Manager"; }
const char* Tag() override { return "Broker::Manager"; }
double GetNextTimeout() override { return -1; }
struct LogBuffer {
// Indexed by topic string.
@ -392,7 +386,6 @@ private:
bool reading_pcaps;
bool after_zeek_init;
int peer_count;
int times_processed_without_idle;
size_t log_batch_size;
double log_batch_interval;