diff --git a/src/Stats.cc b/src/Stats.cc index 01ca0a41d3..111af52598 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -10,6 +10,10 @@ #include "Trigger.h" #include "threading/Manager.h" +#ifdef ENABLE_BROKER +#include "comm/Manager.h" +#endif + int killed_by_inactivity = 0; uint64 tot_ack_events = 0; @@ -222,6 +226,26 @@ void ProfileLogger::Log() )); } +#ifdef ENABLE_BROKER + auto cs = comm_mgr->ConsumeStatistics(); + + file->Write(fmt("%0.6f Comm: peers=%zu stores=%zu " + "store_queries=%zu store_responses=%zu " + "outgoing_conn_status=%zu incoming_conn_status=%zu " + "reports=%zu\n", + network_time, cs.outgoing_peer_count, cs.data_store_count, + cs.pending_query_count, cs.response_count, + cs.outgoing_conn_status_count, cs.incoming_conn_status_count, + cs.report_count)); + + for ( const auto& s : cs.print_count ) + file->Write(fmt(" %-25s prints dequeued=%zu\n", s.first.data(), s.second)); + for ( const auto& s : cs.event_count ) + file->Write(fmt(" %-25s events dequeued=%zu\n", s.first.data(), s.second)); + for ( const auto& s : cs.log_count ) + file->Write(fmt(" %-25s logs dequeued=%zu\n", s.first.data(), s.second)); +#endif + // Script-level state. unsigned int size, mem = 0; PDict(ID)* globals = global_scope()->Vars(); diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 65a7bddbf6..b3377bdec2 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -388,7 +388,7 @@ bool comm::Manager::SubscribeToPrints(string topic_prefix) if ( ! Enabled() ) return false; - auto& q = print_subscriptions[topic_prefix]; + auto& q = print_subscriptions[topic_prefix].q; if ( q ) return false; @@ -410,7 +410,7 @@ bool comm::Manager::SubscribeToEvents(string topic_prefix) if ( ! Enabled() ) return false; - auto& q = event_subscriptions[topic_prefix]; + auto& q = event_subscriptions[topic_prefix].q; if ( q ) return false; @@ -432,7 +432,7 @@ bool comm::Manager::SubscribeToLogs(string topic_prefix) if ( ! Enabled() ) return false; - auto& q = log_subscriptions[topic_prefix]; + auto& q = log_subscriptions[topic_prefix].q; if ( q ) return false; @@ -515,13 +515,13 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, read->Insert(endpoint->incoming_connection_status().fd()); for ( const auto& ps : print_subscriptions ) - read->Insert(ps.second.fd()); + read->Insert(ps.second.q.fd()); for ( const auto& ps : event_subscriptions ) - read->Insert(ps.second.fd()); + read->Insert(ps.second.q.fd()); for ( const auto& ps : log_subscriptions ) - read->Insert(ps.second.fd()); + read->Insert(ps.second.q.fd()); for ( const auto& s : data_stores ) read->Insert(s.second->store->responses().fd()); @@ -596,6 +596,9 @@ void comm::Manager::Process() auto incoming_connection_updates = endpoint->incoming_connection_status().want_pop(); + statistics.outgoing_conn_status_count += outgoing_connection_updates.size(); + statistics.incoming_conn_status_count += incoming_connection_updates.size(); + for ( auto& u : outgoing_connection_updates ) { idle = false; @@ -674,13 +677,14 @@ void comm::Manager::Process() } } - for ( const auto& ps : print_subscriptions ) + for ( auto& ps : print_subscriptions ) { - auto print_messages = ps.second.want_pop(); + auto print_messages = ps.second.q.want_pop(); if ( print_messages.empty() ) continue; + ps.second.received += print_messages.size(); idle = false; if ( ! Comm::print_handler ) @@ -710,13 +714,14 @@ void comm::Manager::Process() } } - for ( const auto& es : event_subscriptions ) + for ( auto& es : event_subscriptions ) { - auto event_messages = es.second.want_pop(); + auto event_messages = es.second.q.want_pop(); if ( event_messages.empty() ) continue; + es.second.received += event_messages.size(); idle = false; for ( auto& em : event_messages ) @@ -780,13 +785,14 @@ void comm::Manager::Process() Val* val; }; - for ( const auto& ls : log_subscriptions ) + for ( auto& ls : log_subscriptions ) { - auto log_messages = ls.second.want_pop(); + auto log_messages = ls.second.q.want_pop(); if ( log_messages.empty() ) continue; + ls.second.received += log_messages.size(); idle = false; for ( auto& lm : log_messages ) @@ -854,6 +860,7 @@ void comm::Manager::Process() if ( responses.empty() ) continue; + statistics.report_count += responses.size(); idle = false; for ( auto& response : responses ) @@ -900,48 +907,46 @@ void comm::Manager::Process() } auto reports = broker::report::default_queue->want_pop(); + statistics.report_count += reports.size(); - if ( ! reports.empty() ) + for ( auto& report : reports ) { idle = false; - for ( auto& report : reports ) + if ( report.size() < 2 ) { - if ( report.size() < 2 ) - { - reporter->Warning("got broker report msg of size %zu, expect 4", - report.size()); - continue; - } - - uint64_t* level = broker::get(report[1]); - - if ( ! level ) - { - reporter->Warning("got broker report msg w/ bad level type: %d", - static_cast(broker::which(report[1]))); - continue; - } - - auto lvl = static_cast(*level); - - switch ( lvl ) { - case broker::report::level::debug: - DBG_LOG(DBG_BROKER, broker::to_string(report).data()); - break; - case broker::report::level::info: - reporter->Info("broker info: %s", - broker::to_string(report).data()); - break; - case broker::report::level::warn: - reporter->Warning("broker warning: %s", - broker::to_string(report).data()); - break; - case broker::report::level::error: - reporter->Error("broker error: %s", - broker::to_string(report).data()); - break; + reporter->Warning("got broker report msg of size %zu, expect 4", + report.size()); + continue; } + + uint64_t* level = broker::get(report[1]); + + if ( ! level ) + { + reporter->Warning("got broker report msg w/ bad level type: %d", + static_cast(broker::which(report[1]))); + continue; + } + + auto lvl = static_cast(*level); + + switch ( lvl ) { + case broker::report::level::debug: + DBG_LOG(DBG_BROKER, broker::to_string(report).data()); + break; + case broker::report::level::info: + reporter->Info("broker info: %s", + broker::to_string(report).data()); + break; + case broker::report::level::warn: + reporter->Warning("broker warning: %s", + broker::to_string(report).data()); + break; + case broker::report::level::error: + reporter->Error("broker error: %s", + broker::to_string(report).data()); + break; } } @@ -1019,3 +1024,32 @@ bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb) assert(Enabled()); return pending_queries.insert(cb).second; } + +comm::Stats comm::Manager::ConsumeStatistics() + { + statistics.outgoing_peer_count = peers.size(); + statistics.data_store_count = data_stores.size(); + statistics.pending_query_count = pending_queries.size(); + + for ( auto& s : print_subscriptions ) + { + statistics.print_count[s.first] = s.second.received; + s.second.received = 0; + } + + for ( auto& s : event_subscriptions ) + { + statistics.event_count[s.first] = s.second.received; + s.second.received = 0; + } + + for ( auto& s : log_subscriptions ) + { + statistics.log_count[s.first] = s.second.received; + s.second.received = 0; + } + + auto rval = move(statistics); + statistics = Stats{}; + return rval; + } diff --git a/src/comm/Manager.h b/src/comm/Manager.h index bd1236bf34..0093c0bd90 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -14,6 +14,34 @@ namespace comm { +/** + * Communication statistics. Some are tracked in relation to last + * sample (comm::Manager::ConsumeStatistics()). + */ +struct Stats { + // Number of outgoing peer connections (at time of sample). + size_t outgoing_peer_count = 0; + // Number of data stores (at time of sample). + size_t data_store_count = 0; + // Number of pending data store queries (at time of sample). + size_t pending_query_count = 0; + // Number of data store responses received (since last sample). + size_t response_count = 0; + // Number of outgoing connection updates received (since last sample). + size_t outgoing_conn_status_count = 0; + // Number of incoming connection updates received (since last sample). + size_t incoming_conn_status_count = 0; + // Number of broker report messages (e.g. debug, warning, errors) received + // (since last sample). + size_t report_count = 0; + // Number of print messages received per topic-prefix (since last sample). + std::map print_count; + // Number of event messages received per topic-prefix (since last sample). + std::map event_count; + // Number of log messages received per topic-prefix (since last sample). + std::map log_count; +}; + /** * Manages various forms of communication between peer Bro processes * or other external applications via use of the Broker messaging library. @@ -279,6 +307,11 @@ public: */ bool TrackStoreQuery(StoreQueryCallback* cb); + /** + * @return communication statistics. + */ + Stats ConsumeStatistics(); + /** * Convert Comm::SendFlags to int flags for use with broker::send(). */ @@ -300,16 +333,23 @@ private: broker::endpoint& Endpoint() { return *endpoint; } + struct QueueWithStats { + broker::message_queue q; + size_t received = 0; + }; + std::unique_ptr endpoint; std::map, broker::peering> peers; - std::map print_subscriptions; - std::map event_subscriptions; - std::map log_subscriptions; + std::map print_subscriptions; + std::map event_subscriptions; + std::map log_subscriptions; std::map, StoreHandleVal*> data_stores; std::unordered_set pending_queries; + Stats statistics; + static VectorType* vector_of_data_type; static EnumType* log_id_type; static int send_flags_self_idx;