diff --git a/aux/broker b/aux/broker index c217119d9a..0760c6808c 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit c217119d9a484da941161d182cdc0a1f86a0d40f +Subproject commit 0760c6808c1d035b7e9f484daefe8ba0a3d6ee13 diff --git a/src/DebugLogger.cc b/src/DebugLogger.cc index 6f025e3c2b..3ce5d92888 100644 --- a/src/DebugLogger.cc +++ b/src/DebugLogger.cc @@ -19,7 +19,7 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = { { "logging", 0, false }, {"input", 0, false }, { "threading", 0, false }, { "file_analysis", 0, false }, { "plugins", 0, false }, { "broxygen", 0, false }, - { "pktio", 0, false} + { "pktio", 0, false }, { "broker", 0, false } }; DebugLogger::DebugLogger(const char* filename) diff --git a/src/DebugLogger.h b/src/DebugLogger.h index 9cd09dada1..13124657e7 100644 --- a/src/DebugLogger.h +++ b/src/DebugLogger.h @@ -32,6 +32,7 @@ enum DebugStream { DBG_PLUGINS, // Plugin system DBG_BROXYGEN, // Broxygen DBG_PKTIO, // Packet sources and dumpers. + DBG_BROKER, // Broker communication NUM_DBGS // Has to be last }; diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index cfce84a1c9..443c5f90da 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -2,6 +2,7 @@ #include "Data.h" #include "Store.h" #include +#include #include #include #include "util.h" @@ -11,6 +12,7 @@ #include "comm/messaging.bif.h" #include "comm/store.bif.h" #include "logging/Manager.h" +#include "DebugLogger.h" using namespace std; @@ -67,6 +69,15 @@ bool comm::Manager::InitPostScript() return false; } + res = broker::report::init(true); + + if ( res ) + { + fprintf(stderr, "broker::report::init failed: %s\n", + broker::strerror(res)); + return false; + } + const char* name; auto name_from_script = internal_val("Comm::endpoint_name")->AsString(); @@ -398,6 +409,8 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, for ( const auto& s : data_stores ) read->Insert(s.second->store->responses().fd()); + + read->Insert(broker::report::default_queue->fd()); } double comm::Manager::NextTimestamp(double* local_network_time) @@ -733,6 +746,52 @@ void comm::Manager::Process() } } + auto reports = broker::report::default_queue->want_pop(); + + if ( ! reports.empty() ) + { + idle = false; + + for ( auto& report : reports ) + { + if ( report.size() < 2 ) + { + reporter->Warning("got broker report msg of size %zu, expect 4", + report.size()); + continue; + } + + uint64_t* level = broker::get(report[1]); + + if ( ! level ) + { + reporter->Warning("got broker report msg w/ bad level type: %d", + static_cast(broker::which(report[1]))); + continue; + } + + auto lvl = static_cast(*level); + + switch ( lvl ) { + case broker::report::level::debug: + DBG_LOG(DBG_BROKER, broker::to_string(report).data()); + break; + case broker::report::level::info: + reporter->Info("broker info: %s", + broker::to_string(report).data()); + break; + case broker::report::level::warn: + reporter->Warning("broker warning: %s", + broker::to_string(report).data()); + break; + case broker::report::level::error: + reporter->Error("broker error: %s", + broker::to_string(report).data()); + break; + } + } + } + SetIdle(idle); }