diff --git a/aux/broker b/aux/broker index 08f41ccc24..17fcb73a30 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 08f41ccc2497f4c6567da0b95488593c39a12a01 +Subproject commit 17fcb73a30388862f0a921040a605ac38f47ff74 diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 20bba4426d..eb1ef5de05 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -34,6 +34,43 @@ static const double LOG_BUFFER_INTERVAL = 1.0; // subscribers consider themselves congested. static const size_t SUBSCRIBER_MAX_QSIZE = 20u; +static inline Val* get_option(const char* option) + { + auto id = global_scope()->Lookup(option); + + if ( ! (id && id->ID_Val()) ) + reporter->FatalError("Unknown Broker option %s", option); + + return id->ID_Val(); + } + +class BrokerConfig : public broker::configuration { +public: + BrokerConfig(broker::broker_options options) + : broker::configuration(options) + { + openssl_cafile = get_option("Broker::ssl_cafile")->AsString()->CheckString(); + openssl_capath = get_option("Broker::ssl_capath")->AsString()->CheckString(); + openssl_certificate = get_option("Broker::ssl_certificate")->AsString()->CheckString(); + openssl_key = get_option("Broker::ssl_keyfile")->AsString()->CheckString(); + openssl_passphrase = get_option("Broker::ssl_passphrase")->AsString()->CheckString(); + } +}; + +class BrokerState { +public: + BrokerState(BrokerConfig config) + : endpoint(std::move(config)), + subscriber(endpoint.make_subscriber({}, SUBSCRIBER_MAX_QSIZE)), + status_subscriber(endpoint.make_status_subscriber(true)) + { + } + + broker::endpoint endpoint; + broker::subscriber subscriber; + broker::status_subscriber status_subscriber; +}; + const broker::endpoint_info Manager::NoPeer{{}, {}}; VectorType* Manager::vector_of_data_type; @@ -103,33 +140,6 @@ static std::string RenderMessage(const broker::error& e) #endif -static inline Val* get_option(const char* option) - { - auto id = global_scope()->Lookup(option); - - if ( ! (id && id->ID_Val()) ) - reporter->FatalError("Unknown Broker option %s", option); - - return id->ID_Val(); - } - -Manager::BrokerConfig::BrokerConfig(broker::broker_options options) - : broker::configuration(options) - { - openssl_cafile = get_option("Broker::ssl_cafile")->AsString()->CheckString(); - openssl_capath = get_option("Broker::ssl_capath")->AsString()->CheckString(); - openssl_certificate = get_option("Broker::ssl_certificate")->AsString()->CheckString(); - openssl_key = get_option("Broker::ssl_keyfile")->AsString()->CheckString(); - openssl_passphrase = get_option("Broker::ssl_passphrase")->AsString()->CheckString(); - } - -Manager::BrokerState::BrokerState(BrokerConfig config) - : endpoint(std::move(config)), - subscriber(endpoint.make_subscriber({}, SUBSCRIBER_MAX_QSIZE)), - status_subscriber(endpoint.make_status_subscriber(true)) - { - } - Manager::Manager(bool reading_pcaps) { bound_port = 0; @@ -634,7 +644,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int if ( lb.message_count >= LOG_BATCH_SIZE || (network_time - lb.last_flush >= LOG_BUFFER_INTERVAL) ) - statistics.num_logs_outgoing += lb.Flush(Endpoint()); + statistics.num_logs_outgoing += lb.Flush(bstate->endpoint); return true; } @@ -671,7 +681,7 @@ size_t Manager::FlushLogBuffers() auto rval = 0u; for ( auto& lb : log_buffers ) - rval += lb.Flush(Endpoint()); + rval += lb.Flush(bstate->endpoint); return rval; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 8c5ab09dc6..b5b05adab1 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -16,6 +16,8 @@ namespace bro_broker { +class BrokerState; + /** * Communication statistics. */ @@ -197,8 +199,11 @@ public: * @param peer If given, send the message only to this peer. * @return true if the message is sent successfully. */ - bool PublishLogCreate(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, - int num_fields, const threading::Field* const * fields, const broker::endpoint_info& peer = NoPeer); + bool PublishLogCreate(EnumVal* stream, EnumVal* writer, + const logging::WriterBackend::WriterInfo& info, + int num_fields, + const threading::Field* const * fields, + const broker::endpoint_info& peer = NoPeer); /** * Send a log entry to any interested peers. The topic name used is @@ -268,7 +273,7 @@ public: * @return a pointer to the newly created store a nullptr on failure. */ StoreHandleVal* MakeMaster(const std::string& name, broker::backend type, - broker::backend_options opts); + broker::backend_options opts); /** * Create a new *clone* data store. @@ -339,19 +344,6 @@ public: private: - class BrokerConfig : public broker::configuration { - public: - BrokerConfig(broker::broker_options options); - }; - - class BrokerState { - public: - BrokerState(BrokerConfig config); - broker::endpoint endpoint; - broker::subscriber subscriber; - broker::status_subscriber status_subscriber; - }; - void DispatchMessage(broker::data msg); void ProcessEvent(std::string name, broker::vector args); void ProcessEvent(broker::bro::Event ev); @@ -379,9 +371,6 @@ private: const char* Tag() override { return "Broker::Manager"; } - broker::endpoint& Endpoint() - { assert(bstate); return bstate->endpoint; } - Func* log_topic_func; std::string default_log_topic_prefix; uint16_t bound_port;