mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
BIT-1408: improve I/O loop and Broker IOSource.
This commit is contained in:
parent
a6618eb964
commit
58ea1ff458
4 changed files with 24 additions and 17 deletions
|
@ -24,6 +24,12 @@ int bro_broker::Manager::send_flags_self_idx;
|
||||||
int bro_broker::Manager::send_flags_peers_idx;
|
int bro_broker::Manager::send_flags_peers_idx;
|
||||||
int bro_broker::Manager::send_flags_unsolicited_idx;
|
int bro_broker::Manager::send_flags_unsolicited_idx;
|
||||||
|
|
||||||
|
bro_broker::Manager::Manager()
|
||||||
|
: iosource::IOSource(), next_timestamp(-1)
|
||||||
|
{
|
||||||
|
SetIdle(true);
|
||||||
|
}
|
||||||
|
|
||||||
bro_broker::Manager::~Manager()
|
bro_broker::Manager::~Manager()
|
||||||
{
|
{
|
||||||
vector<decltype(data_stores)::key_type> stores_to_close;
|
vector<decltype(data_stores)::key_type> stores_to_close;
|
||||||
|
@ -560,8 +566,10 @@ void bro_broker::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write
|
||||||
|
|
||||||
double bro_broker::Manager::NextTimestamp(double* local_network_time)
|
double bro_broker::Manager::NextTimestamp(double* local_network_time)
|
||||||
{
|
{
|
||||||
// TODO: do something better?
|
if ( next_timestamp < 0 )
|
||||||
return timer_mgr->Time();
|
next_timestamp = timer_mgr->Time();
|
||||||
|
|
||||||
|
return next_timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct response_converter {
|
struct response_converter {
|
||||||
|
@ -619,7 +627,6 @@ static RecordVal* response_to_val(broker::store::response r)
|
||||||
|
|
||||||
void bro_broker::Manager::Process()
|
void bro_broker::Manager::Process()
|
||||||
{
|
{
|
||||||
bool idle = true;
|
|
||||||
auto outgoing_connection_updates =
|
auto outgoing_connection_updates =
|
||||||
endpoint->outgoing_connection_status().want_pop();
|
endpoint->outgoing_connection_status().want_pop();
|
||||||
auto incoming_connection_updates =
|
auto incoming_connection_updates =
|
||||||
|
@ -630,8 +637,6 @@ void bro_broker::Manager::Process()
|
||||||
|
|
||||||
for ( auto& u : outgoing_connection_updates )
|
for ( auto& u : outgoing_connection_updates )
|
||||||
{
|
{
|
||||||
idle = false;
|
|
||||||
|
|
||||||
switch ( u.status ) {
|
switch ( u.status ) {
|
||||||
case broker::outgoing_connection_status::tag::established:
|
case broker::outgoing_connection_status::tag::established:
|
||||||
if ( BrokerComm::outgoing_connection_established )
|
if ( BrokerComm::outgoing_connection_established )
|
||||||
|
@ -677,8 +682,6 @@ void bro_broker::Manager::Process()
|
||||||
|
|
||||||
for ( auto& u : incoming_connection_updates )
|
for ( auto& u : incoming_connection_updates )
|
||||||
{
|
{
|
||||||
idle = false;
|
|
||||||
|
|
||||||
switch ( u.status ) {
|
switch ( u.status ) {
|
||||||
case broker::incoming_connection_status::tag::established:
|
case broker::incoming_connection_status::tag::established:
|
||||||
if ( BrokerComm::incoming_connection_established )
|
if ( BrokerComm::incoming_connection_established )
|
||||||
|
@ -714,7 +717,6 @@ void bro_broker::Manager::Process()
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
ps.second.received += print_messages.size();
|
ps.second.received += print_messages.size();
|
||||||
idle = false;
|
|
||||||
|
|
||||||
if ( ! BrokerComm::print_handler )
|
if ( ! BrokerComm::print_handler )
|
||||||
continue;
|
continue;
|
||||||
|
@ -751,7 +753,6 @@ void bro_broker::Manager::Process()
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
es.second.received += event_messages.size();
|
es.second.received += event_messages.size();
|
||||||
idle = false;
|
|
||||||
|
|
||||||
for ( auto& em : event_messages )
|
for ( auto& em : event_messages )
|
||||||
{
|
{
|
||||||
|
@ -822,7 +823,6 @@ void bro_broker::Manager::Process()
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
ls.second.received += log_messages.size();
|
ls.second.received += log_messages.size();
|
||||||
idle = false;
|
|
||||||
|
|
||||||
for ( auto& lm : log_messages )
|
for ( auto& lm : log_messages )
|
||||||
{
|
{
|
||||||
|
@ -890,7 +890,6 @@ void bro_broker::Manager::Process()
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
statistics.report_count += responses.size();
|
statistics.report_count += responses.size();
|
||||||
idle = false;
|
|
||||||
|
|
||||||
for ( auto& response : responses )
|
for ( auto& response : responses )
|
||||||
{
|
{
|
||||||
|
@ -940,8 +939,6 @@ void bro_broker::Manager::Process()
|
||||||
|
|
||||||
for ( auto& report : reports )
|
for ( auto& report : reports )
|
||||||
{
|
{
|
||||||
idle = false;
|
|
||||||
|
|
||||||
if ( report.size() < 2 )
|
if ( report.size() < 2 )
|
||||||
{
|
{
|
||||||
reporter->Warning("got broker report msg of size %zu, expect 4",
|
reporter->Warning("got broker report msg of size %zu, expect 4",
|
||||||
|
@ -979,7 +976,7 @@ void bro_broker::Manager::Process()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SetIdle(idle);
|
next_timestamp = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool bro_broker::Manager::AddStore(StoreHandleVal* handle)
|
bool bro_broker::Manager::AddStore(StoreHandleVal* handle)
|
||||||
|
|
|
@ -50,6 +50,11 @@ class Manager : public iosource::IOSource {
|
||||||
friend class StoreHandleVal;
|
friend class StoreHandleVal;
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*/
|
||||||
|
Manager();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destructor. Any still-pending data store queries are aborted.
|
* Destructor. Any still-pending data store queries are aborted.
|
||||||
*/
|
*/
|
||||||
|
@ -351,6 +356,7 @@ private:
|
||||||
std::unordered_set<StoreQueryCallback*> pending_queries;
|
std::unordered_set<StoreQueryCallback*> pending_queries;
|
||||||
|
|
||||||
Stats statistics;
|
Stats statistics;
|
||||||
|
double next_timestamp;
|
||||||
|
|
||||||
static VectorType* vector_of_data_type;
|
static VectorType* vector_of_data_type;
|
||||||
static EnumType* log_id_type;
|
static EnumType* log_id_type;
|
||||||
|
|
|
@ -118,9 +118,6 @@ IOSource* Manager::FindSoonest(double* ts)
|
||||||
|
|
||||||
src->Clear();
|
src->Clear();
|
||||||
src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
|
src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
|
||||||
if ( src->fd_read.Empty() ) src->fd_read.Insert(0);
|
|
||||||
if ( src->fd_write.Empty() ) src->fd_write.Insert(0);
|
|
||||||
if ( src->fd_except.Empty() ) src->fd_except.Insert(0);
|
|
||||||
src->SetFds(&fd_read, &fd_write, &fd_except, &maxx);
|
src->SetFds(&fd_read, &fd_write, &fd_except, &maxx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -240,6 +240,13 @@ void PktSrc::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
|
||||||
if ( IsOpen() && props.selectable_fd >= 0 )
|
if ( IsOpen() && props.selectable_fd >= 0 )
|
||||||
read->Insert(props.selectable_fd);
|
read->Insert(props.selectable_fd);
|
||||||
|
|
||||||
|
// TODO: This seems like a hack that should be removed, but doing so
|
||||||
|
// causes the main run loop to spin more frequently and increase cpu usage.
|
||||||
|
// See also commit 9cd85be308.
|
||||||
|
if ( read->Empty() ) read->Insert(0);
|
||||||
|
if ( write->Empty() ) write->Insert(0);
|
||||||
|
if ( except->Empty() ) except->Insert(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
double PktSrc::NextTimestamp(double* local_network_time)
|
double PktSrc::NextTimestamp(double* local_network_time)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue