Move internal broker/Manager classes out of header

This commit is contained in:
Jon Siwek 2018-06-25 15:50:28 -05:00
parent 0ae022205e
commit 0200b5bd88
3 changed files with 48 additions and 49 deletions

@ -1 +1 @@
Subproject commit 08f41ccc2497f4c6567da0b95488593c39a12a01
Subproject commit 17fcb73a30388862f0a921040a605ac38f47ff74

View file

@ -34,6 +34,43 @@ static const double LOG_BUFFER_INTERVAL = 1.0;
// subscribers consider themselves congested.
static const size_t SUBSCRIBER_MAX_QSIZE = 20u;
static inline Val* get_option(const char* option)
{
auto id = global_scope()->Lookup(option);
if ( ! (id && id->ID_Val()) )
reporter->FatalError("Unknown Broker option %s", option);
return id->ID_Val();
}
class BrokerConfig : public broker::configuration {
public:
BrokerConfig(broker::broker_options options)
: broker::configuration(options)
{
openssl_cafile = get_option("Broker::ssl_cafile")->AsString()->CheckString();
openssl_capath = get_option("Broker::ssl_capath")->AsString()->CheckString();
openssl_certificate = get_option("Broker::ssl_certificate")->AsString()->CheckString();
openssl_key = get_option("Broker::ssl_keyfile")->AsString()->CheckString();
openssl_passphrase = get_option("Broker::ssl_passphrase")->AsString()->CheckString();
}
};
class BrokerState {
public:
BrokerState(BrokerConfig config)
: endpoint(std::move(config)),
subscriber(endpoint.make_subscriber({}, SUBSCRIBER_MAX_QSIZE)),
status_subscriber(endpoint.make_status_subscriber(true))
{
}
broker::endpoint endpoint;
broker::subscriber subscriber;
broker::status_subscriber status_subscriber;
};
const broker::endpoint_info Manager::NoPeer{{}, {}};
VectorType* Manager::vector_of_data_type;
@ -103,33 +140,6 @@ static std::string RenderMessage(const broker::error& e)
#endif
static inline Val* get_option(const char* option)
{
auto id = global_scope()->Lookup(option);
if ( ! (id && id->ID_Val()) )
reporter->FatalError("Unknown Broker option %s", option);
return id->ID_Val();
}
Manager::BrokerConfig::BrokerConfig(broker::broker_options options)
: broker::configuration(options)
{
openssl_cafile = get_option("Broker::ssl_cafile")->AsString()->CheckString();
openssl_capath = get_option("Broker::ssl_capath")->AsString()->CheckString();
openssl_certificate = get_option("Broker::ssl_certificate")->AsString()->CheckString();
openssl_key = get_option("Broker::ssl_keyfile")->AsString()->CheckString();
openssl_passphrase = get_option("Broker::ssl_passphrase")->AsString()->CheckString();
}
Manager::BrokerState::BrokerState(BrokerConfig config)
: endpoint(std::move(config)),
subscriber(endpoint.make_subscriber({}, SUBSCRIBER_MAX_QSIZE)),
status_subscriber(endpoint.make_status_subscriber(true))
{
}
Manager::Manager(bool reading_pcaps)
{
bound_port = 0;
@ -634,7 +644,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
if ( lb.message_count >= LOG_BATCH_SIZE ||
(network_time - lb.last_flush >= LOG_BUFFER_INTERVAL) )
statistics.num_logs_outgoing += lb.Flush(Endpoint());
statistics.num_logs_outgoing += lb.Flush(bstate->endpoint);
return true;
}
@ -671,7 +681,7 @@ size_t Manager::FlushLogBuffers()
auto rval = 0u;
for ( auto& lb : log_buffers )
rval += lb.Flush(Endpoint());
rval += lb.Flush(bstate->endpoint);
return rval;
}

View file

@ -16,6 +16,8 @@
namespace bro_broker {
class BrokerState;
/**
* Communication statistics.
*/
@ -197,8 +199,11 @@ public:
* @param peer If given, send the message only to this peer.
* @return true if the message is sent successfully.
*/
bool PublishLogCreate(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info,
int num_fields, const threading::Field* const * fields, const broker::endpoint_info& peer = NoPeer);
bool PublishLogCreate(EnumVal* stream, EnumVal* writer,
const logging::WriterBackend::WriterInfo& info,
int num_fields,
const threading::Field* const * fields,
const broker::endpoint_info& peer = NoPeer);
/**
* Send a log entry to any interested peers. The topic name used is
@ -339,19 +344,6 @@ public:
private:
class BrokerConfig : public broker::configuration {
public:
BrokerConfig(broker::broker_options options);
};
class BrokerState {
public:
BrokerState(BrokerConfig config);
broker::endpoint endpoint;
broker::subscriber subscriber;
broker::status_subscriber status_subscriber;
};
void DispatchMessage(broker::data msg);
void ProcessEvent(std::string name, broker::vector args);
void ProcessEvent(broker::bro::Event ev);
@ -379,9 +371,6 @@ private:
const char* Tag() override
{ return "Broker::Manager"; }
broker::endpoint& Endpoint()
{ assert(bstate); return bstate->endpoint; }
Func* log_topic_func;
std::string default_log_topic_prefix;
uint16_t bound_port;