Immediately apply broker subscriptions made during bro_init()

Otherwise that's begging for unit test failures due to races
This commit is contained in:
Jon Siwek 2018-08-10 17:12:53 -05:00
parent 083947af41
commit 67524f26d5
7 changed files with 24 additions and 5 deletions

10
CHANGES
View file

@ -1,4 +1,14 @@
2.5-831 | 2018-08-10 17:12:53 -0500
* Immediately apply broker subscriptions made during bro_init()
(Jon Siwek, Corelight)
* Update default broker threading configuration to use 4 threads and allow
tuning via BRO_BROKER_MAX_THREADS env. variable (Jon Siwek, Corelight)
* Misc. unit test improvements (Jon Siwek, Corelight)
2.5-826 | 2018-08-08 13:09:27 -0700 2.5-826 | 2018-08-08 13:09:27 -0700
* Add support for code coverage statistics for bro source files after running btest * Add support for code coverage statistics for bro source files after running btest

View file

@ -1 +1 @@
2.5-826 2.5-831

@ -1 +1 @@
Subproject commit 3ab9b647eedf6be674067198551d57b2b17e86d1 Subproject commit e0f9f6504db9285a48e0be490abddf959999a404

View file

@ -261,7 +261,8 @@ export {
global publish_id: function(topic: string, id: string): bool; global publish_id: function(topic: string, id: string): bool;
## Register interest in all peer event messages that use a certain topic ## Register interest in all peer event messages that use a certain topic
## prefix. ## prefix. Note that subscriptions may not be altered immediately after
## calling (except during :bro:see:`bro_init`).
## ##
## topic_prefix: a prefix to match against remote message topics. ## topic_prefix: a prefix to match against remote message topics.
## e.g. an empty prefix matches everything and "a" matches ## e.g. an empty prefix matches everything and "a" matches
@ -271,6 +272,8 @@ export {
global subscribe: function(topic_prefix: string): bool; global subscribe: function(topic_prefix: string): bool;
## Unregister interest in all peer event messages that use a topic prefix. ## Unregister interest in all peer event messages that use a topic prefix.
## Note that subscriptions may not be altered immediately after calling
## (except during :bro:see:`bro_init`).
## ##
## topic_prefix: a prefix previously supplied to a successful call to ## topic_prefix: a prefix previously supplied to a successful call to
## :bro:see:`Broker::subscribe`. ## :bro:see:`Broker::subscribe`.

View file

@ -137,6 +137,7 @@ Manager::Manager(bool arg_reading_pcaps)
{ {
bound_port = 0; bound_port = 0;
reading_pcaps = arg_reading_pcaps; reading_pcaps = arg_reading_pcaps;
after_bro_init = false;
peer_count = 0; peer_count = 0;
log_topic_func = nullptr; log_topic_func = nullptr;
vector_of_data_type = nullptr; vector_of_data_type = nullptr;
@ -847,14 +848,14 @@ RecordVal* Manager::MakeEvent(val_list* args, Frame* frame)
bool Manager::Subscribe(const string& topic_prefix) bool Manager::Subscribe(const string& topic_prefix)
{ {
DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str());
bstate->subscriber.add_topic(topic_prefix); bstate->subscriber.add_topic(topic_prefix, ! after_bro_init);
return true; return true;
} }
bool Manager::Unsubscribe(const string& topic_prefix) bool Manager::Unsubscribe(const string& topic_prefix)
{ {
DBG_LOG(DBG_BROKER, "Unsubscribing from topic prefix %s", topic_prefix.c_str()); DBG_LOG(DBG_BROKER, "Unsubscribing from topic prefix %s", topic_prefix.c_str());
bstate->subscriber.remove_topic(topic_prefix); bstate->subscriber.remove_topic(topic_prefix, ! after_bro_init);
return true; return true;
} }

View file

@ -66,6 +66,9 @@ public:
*/ */
void InitPostScript(); void InitPostScript();
void BroInitDone()
{ after_bro_init = true; }
/** /**
* Shuts Broker down at termination. * Shuts Broker down at termination.
*/ */
@ -404,6 +407,7 @@ private:
uint16_t bound_port; uint16_t bound_port;
bool reading_pcaps; bool reading_pcaps;
bool after_bro_init;
int peer_count; int peer_count;
Func* log_topic_func; Func* log_topic_func;

View file

@ -1182,6 +1182,7 @@ int main(int argc, char** argv)
// Drain the event queue here to support the protocols framework configuring DPM // Drain the event queue here to support the protocols framework configuring DPM
mgr.Drain(); mgr.Drain();
broker_mgr->BroInitDone();
analyzer_mgr->DumpDebug(); analyzer_mgr->DumpDebug();
have_pending_timers = ! reading_traces && timer_mgr->Size() > 0; have_pending_timers = ! reading_traces && timer_mgr->Size() > 0;