broker integration: add prof.log statistics

This commit is contained in:
Jon Siwek 2015-02-18 14:53:30 -06:00
parent 8d19bf6381
commit a87b4feaae
3 changed files with 149 additions and 51 deletions

View file

@ -10,6 +10,10 @@
#include "Trigger.h"
#include "threading/Manager.h"
#ifdef ENABLE_BROKER
#include "comm/Manager.h"
#endif
int killed_by_inactivity = 0;
uint64 tot_ack_events = 0;
@ -222,6 +226,26 @@ void ProfileLogger::Log()
));
}
#ifdef ENABLE_BROKER
auto cs = comm_mgr->ConsumeStatistics();
file->Write(fmt("%0.6f Comm: peers=%zu stores=%zu "
"store_queries=%zu store_responses=%zu "
"outgoing_conn_status=%zu incoming_conn_status=%zu "
"reports=%zu\n",
network_time, cs.outgoing_peer_count, cs.data_store_count,
cs.pending_query_count, cs.response_count,
cs.outgoing_conn_status_count, cs.incoming_conn_status_count,
cs.report_count));
for ( const auto& s : cs.print_count )
file->Write(fmt(" %-25s prints dequeued=%zu\n", s.first.data(), s.second));
for ( const auto& s : cs.event_count )
file->Write(fmt(" %-25s events dequeued=%zu\n", s.first.data(), s.second));
for ( const auto& s : cs.log_count )
file->Write(fmt(" %-25s logs dequeued=%zu\n", s.first.data(), s.second));
#endif
// Script-level state.
unsigned int size, mem = 0;
PDict(ID)* globals = global_scope()->Vars();

View file

@ -388,7 +388,7 @@ bool comm::Manager::SubscribeToPrints(string topic_prefix)
if ( ! Enabled() )
return false;
auto& q = print_subscriptions[topic_prefix];
auto& q = print_subscriptions[topic_prefix].q;
if ( q )
return false;
@ -410,7 +410,7 @@ bool comm::Manager::SubscribeToEvents(string topic_prefix)
if ( ! Enabled() )
return false;
auto& q = event_subscriptions[topic_prefix];
auto& q = event_subscriptions[topic_prefix].q;
if ( q )
return false;
@ -432,7 +432,7 @@ bool comm::Manager::SubscribeToLogs(string topic_prefix)
if ( ! Enabled() )
return false;
auto& q = log_subscriptions[topic_prefix];
auto& q = log_subscriptions[topic_prefix].q;
if ( q )
return false;
@ -515,13 +515,13 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
read->Insert(endpoint->incoming_connection_status().fd());
for ( const auto& ps : print_subscriptions )
read->Insert(ps.second.fd());
read->Insert(ps.second.q.fd());
for ( const auto& ps : event_subscriptions )
read->Insert(ps.second.fd());
read->Insert(ps.second.q.fd());
for ( const auto& ps : log_subscriptions )
read->Insert(ps.second.fd());
read->Insert(ps.second.q.fd());
for ( const auto& s : data_stores )
read->Insert(s.second->store->responses().fd());
@ -596,6 +596,9 @@ void comm::Manager::Process()
auto incoming_connection_updates =
endpoint->incoming_connection_status().want_pop();
statistics.outgoing_conn_status_count += outgoing_connection_updates.size();
statistics.incoming_conn_status_count += incoming_connection_updates.size();
for ( auto& u : outgoing_connection_updates )
{
idle = false;
@ -674,13 +677,14 @@ void comm::Manager::Process()
}
}
for ( const auto& ps : print_subscriptions )
for ( auto& ps : print_subscriptions )
{
auto print_messages = ps.second.want_pop();
auto print_messages = ps.second.q.want_pop();
if ( print_messages.empty() )
continue;
ps.second.received += print_messages.size();
idle = false;
if ( ! Comm::print_handler )
@ -710,13 +714,14 @@ void comm::Manager::Process()
}
}
for ( const auto& es : event_subscriptions )
for ( auto& es : event_subscriptions )
{
auto event_messages = es.second.want_pop();
auto event_messages = es.second.q.want_pop();
if ( event_messages.empty() )
continue;
es.second.received += event_messages.size();
idle = false;
for ( auto& em : event_messages )
@ -780,13 +785,14 @@ void comm::Manager::Process()
Val* val;
};
for ( const auto& ls : log_subscriptions )
for ( auto& ls : log_subscriptions )
{
auto log_messages = ls.second.want_pop();
auto log_messages = ls.second.q.want_pop();
if ( log_messages.empty() )
continue;
ls.second.received += log_messages.size();
idle = false;
for ( auto& lm : log_messages )
@ -854,6 +860,7 @@ void comm::Manager::Process()
if ( responses.empty() )
continue;
statistics.report_count += responses.size();
idle = false;
for ( auto& response : responses )
@ -900,13 +907,12 @@ void comm::Manager::Process()
}
auto reports = broker::report::default_queue->want_pop();
if ( ! reports.empty() )
{
idle = false;
statistics.report_count += reports.size();
for ( auto& report : reports )
{
idle = false;
if ( report.size() < 2 )
{
reporter->Warning("got broker report msg of size %zu, expect 4",
@ -943,7 +949,6 @@ void comm::Manager::Process()
break;
}
}
}
SetIdle(idle);
}
@ -1019,3 +1024,32 @@ bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb)
assert(Enabled());
return pending_queries.insert(cb).second;
}
comm::Stats comm::Manager::ConsumeStatistics()
{
statistics.outgoing_peer_count = peers.size();
statistics.data_store_count = data_stores.size();
statistics.pending_query_count = pending_queries.size();
for ( auto& s : print_subscriptions )
{
statistics.print_count[s.first] = s.second.received;
s.second.received = 0;
}
for ( auto& s : event_subscriptions )
{
statistics.event_count[s.first] = s.second.received;
s.second.received = 0;
}
for ( auto& s : log_subscriptions )
{
statistics.log_count[s.first] = s.second.received;
s.second.received = 0;
}
auto rval = move(statistics);
statistics = Stats{};
return rval;
}

View file

@ -14,6 +14,34 @@
namespace comm {
/**
* Communication statistics. Some are tracked in relation to last
* sample (comm::Manager::ConsumeStatistics()).
*/
struct Stats {
// Number of outgoing peer connections (at time of sample).
size_t outgoing_peer_count = 0;
// Number of data stores (at time of sample).
size_t data_store_count = 0;
// Number of pending data store queries (at time of sample).
size_t pending_query_count = 0;
// Number of data store responses received (since last sample).
size_t response_count = 0;
// Number of outgoing connection updates received (since last sample).
size_t outgoing_conn_status_count = 0;
// Number of incoming connection updates received (since last sample).
size_t incoming_conn_status_count = 0;
// Number of broker report messages (e.g. debug, warning, errors) received
// (since last sample).
size_t report_count = 0;
// Number of print messages received per topic-prefix (since last sample).
std::map<std::string, size_t> print_count;
// Number of event messages received per topic-prefix (since last sample).
std::map<std::string, size_t> event_count;
// Number of log messages received per topic-prefix (since last sample).
std::map<std::string, size_t> log_count;
};
/**
* Manages various forms of communication between peer Bro processes
* or other external applications via use of the Broker messaging library.
@ -279,6 +307,11 @@ public:
*/
bool TrackStoreQuery(StoreQueryCallback* cb);
/**
* @return communication statistics.
*/
Stats ConsumeStatistics();
/**
* Convert Comm::SendFlags to int flags for use with broker::send().
*/
@ -300,16 +333,23 @@ private:
broker::endpoint& Endpoint()
{ return *endpoint; }
struct QueueWithStats {
broker::message_queue q;
size_t received = 0;
};
std::unique_ptr<broker::endpoint> endpoint;
std::map<std::pair<std::string, uint16_t>, broker::peering> peers;
std::map<std::string, broker::message_queue> print_subscriptions;
std::map<std::string, broker::message_queue> event_subscriptions;
std::map<std::string, broker::message_queue> log_subscriptions;
std::map<std::string, QueueWithStats> print_subscriptions;
std::map<std::string, QueueWithStats> event_subscriptions;
std::map<std::string, QueueWithStats> log_subscriptions;
std::map<std::pair<broker::store::identifier, StoreType>,
StoreHandleVal*> data_stores;
std::unordered_set<StoreQueryCallback*> pending_queries;
Stats statistics;
static VectorType* vector_of_data_type;
static EnumType* log_id_type;
static int send_flags_self_idx;