mirror of
https://github.com/zeek/zeek.git
synced 2025-10-12 19:48:20 +00:00
Fix message ordering of Broker messages
Using two separate Broker subscribers for status events and regular messages introduces a race on the two objects. Even if Broker sends all messages in a particular (deterministic) order, Zeek may still process them in a different order as a result. Since several tests rely on a strict ordering of Broker events, these tests could fail sporadically. Using only a single subscriber for all Broker messages makes sure that Zeek observes all messages in the same order as Broker emits them.
This commit is contained in:
parent
9da68ddc3d
commit
25fef3da1b
3 changed files with 76 additions and 74 deletions
|
@ -1 +1 @@
|
|||
Subproject commit 4f43a95824c700642bf466f6f41274fc4d1baebb
|
||||
Subproject commit 71534a16c50735ee268c43aa9942c57f23b9b4d4
|
|
@ -57,14 +57,14 @@ class BrokerState {
|
|||
public:
|
||||
BrokerState(BrokerConfig config, size_t congestion_queue_size)
|
||||
: endpoint(std::move(config)),
|
||||
subscriber(endpoint.make_subscriber({}, congestion_queue_size)),
|
||||
status_subscriber(endpoint.make_status_subscriber(true))
|
||||
subscriber(endpoint.make_subscriber({broker::topics::statuses,
|
||||
broker::topics::errors},
|
||||
congestion_queue_size))
|
||||
{
|
||||
}
|
||||
|
||||
broker::endpoint endpoint;
|
||||
broker::subscriber subscriber;
|
||||
broker::status_subscriber status_subscriber;
|
||||
};
|
||||
|
||||
const broker::endpoint_info Manager::NoPeer{{}, {}};
|
||||
|
@ -114,15 +114,18 @@ static std::string RenderMessage(const broker::vector& xs)
|
|||
return broker::to_string(xs);
|
||||
}
|
||||
|
||||
static std::string RenderMessage(const broker::status& s)
|
||||
static std::string RenderMessage(broker::status_view s)
|
||||
{
|
||||
return broker::to_string(s.code());
|
||||
}
|
||||
|
||||
static std::string RenderMessage(const broker::error& e)
|
||||
static std::string RenderMessage(broker::error_view e)
|
||||
{
|
||||
return util::fmt("%s (%s)", broker::to_string(e.code()).c_str(),
|
||||
caf::to_string(e.context()).c_str());
|
||||
if ( auto ctx = e.context() )
|
||||
return util::fmt("%s (%s)", to_string(e.code()).c_str(),
|
||||
to_string(*ctx).c_str());
|
||||
else
|
||||
return util::fmt("%s (null)", to_string(e.code()).c_str());
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -213,8 +216,6 @@ void Manager::InitPostScript()
|
|||
|
||||
if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) )
|
||||
reporter->FatalError("Failed to register broker subscriber with iosource_mgr");
|
||||
if ( ! iosource_mgr->RegisterFd(bstate->status_subscriber.fd(), this) )
|
||||
reporter->FatalError("Failed to register broker status subscriber with iosource_mgr");
|
||||
|
||||
bstate->subscriber.add_topic(broker::topics::store_events, true);
|
||||
|
||||
|
@ -270,7 +271,6 @@ void Manager::Terminate()
|
|||
FlushLogBuffers();
|
||||
|
||||
iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this);
|
||||
iosource_mgr->UnregisterFd(bstate->status_subscriber.fd(), this);
|
||||
|
||||
vector<string> stores_to_close;
|
||||
|
||||
|
@ -924,37 +924,44 @@ void Manager::Process()
|
|||
if ( use_real_time )
|
||||
run_state::detail::update_network_time(util::current_time());
|
||||
|
||||
bool had_input = false;
|
||||
|
||||
auto status_msgs = bstate->status_subscriber.poll();
|
||||
|
||||
for ( auto& status_msg : status_msgs )
|
||||
{
|
||||
had_input = true;
|
||||
|
||||
if ( auto stat = caf::get_if<broker::status>(&status_msg) )
|
||||
{
|
||||
ProcessStatus(std::move(*stat));
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( auto err = caf::get_if<broker::error>(&status_msg) )
|
||||
{
|
||||
ProcessError(std::move(*err));
|
||||
continue;
|
||||
}
|
||||
|
||||
reporter->InternalWarning("ignoring status_subscriber message with unexpected type");
|
||||
}
|
||||
|
||||
auto messages = bstate->subscriber.poll();
|
||||
|
||||
bool had_input = ! messages.empty();
|
||||
|
||||
for ( auto& message : messages )
|
||||
{
|
||||
had_input = true;
|
||||
|
||||
auto& topic = broker::get_topic(message);
|
||||
|
||||
if ( broker::topics::statuses.prefix_of(topic) )
|
||||
{
|
||||
if ( auto stat = broker::make_status_view(get_data(message)) )
|
||||
{
|
||||
ProcessStatus(stat);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto str = to_string(message);
|
||||
reporter->Warning("ignoring malformed Broker status event: %s",
|
||||
str.c_str());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( broker::topics::errors.prefix_of(topic) )
|
||||
{
|
||||
if ( auto err = broker::make_error_view(get_data(message)) )
|
||||
{
|
||||
ProcessError(err);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto str = to_string(message);
|
||||
reporter->Warning("ignoring malformed Broker error event: %s",
|
||||
str.c_str());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( broker::topics::store_events.prefix_of(topic) )
|
||||
{
|
||||
ProcessStoreEvent(broker::move_data(message));
|
||||
|
@ -1407,11 +1414,11 @@ bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
|
|||
return true;
|
||||
}
|
||||
|
||||
void Manager::ProcessStatus(broker::status stat)
|
||||
void Manager::ProcessStatus(broker::status_view stat)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received status message: %s", RenderMessage(stat).c_str());
|
||||
|
||||
auto ctx = stat.context<broker::endpoint_info>();
|
||||
auto ctx = stat.context();
|
||||
|
||||
EventHandlerPtr event;
|
||||
switch (stat.code()) {
|
||||
|
@ -1475,51 +1482,46 @@ void Manager::ProcessStatus(broker::status stat)
|
|||
event_mgr.Enqueue(event, std::move(endpoint_info), std::move(msg));
|
||||
}
|
||||
|
||||
void Manager::ProcessError(broker::error err)
|
||||
void Manager::ProcessError(broker::error_view err)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received error message: %s", RenderMessage(err).c_str());
|
||||
|
||||
if ( ! ::Broker::error )
|
||||
return;
|
||||
|
||||
auto int_code = static_cast<uint8_t>(err.code());
|
||||
|
||||
BifEnum::Broker::ErrorCode ec;
|
||||
std::string msg;
|
||||
|
||||
if ( err.category() == caf::type_id_v<broker::ec> )
|
||||
{
|
||||
static auto enum_type = id::find_type<EnumType>("Broker::ErrorCode");
|
||||
|
||||
if ( enum_type->Lookup(err.code()) )
|
||||
ec = static_cast<BifEnum::Broker::ErrorCode>(err.code());
|
||||
else
|
||||
{
|
||||
reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ", err.code());
|
||||
ec = BifEnum::Broker::ErrorCode::UNSPECIFIED;
|
||||
}
|
||||
|
||||
// Note: we could also use to_string, but that change the log output
|
||||
// and we would have to update all baselines relying on this format.
|
||||
if ( auto mv = caf::make_const_typed_message_view<broker::endpoint_info, std::string>(err.context()) )
|
||||
{
|
||||
msg += '(';
|
||||
msg += to_string(get<0>(mv).node);
|
||||
msg += ", ";
|
||||
msg += caf::deep_to_string(get<0>(mv).network);
|
||||
msg += ", ";
|
||||
msg += caf::deep_to_string(get<1>(mv));
|
||||
msg += ')';
|
||||
}
|
||||
else
|
||||
msg = caf::to_string(err.context());
|
||||
}
|
||||
static auto enum_type = id::find_type<EnumType>("Broker::ErrorCode");
|
||||
if ( enum_type->Lookup(int_code) )
|
||||
ec = static_cast<BifEnum::Broker::ErrorCode>(int_code);
|
||||
else
|
||||
{
|
||||
ec = BifEnum::Broker::ErrorCode::CAF_ERROR;
|
||||
auto sv = caf::query_type_name(err.category());
|
||||
std::string category{sv.begin(), sv.end()};
|
||||
msg = util::fmt("[%s] %s", category.c_str(), caf::to_string(err.context()).c_str());
|
||||
|
||||
reporter->Warning("Unknown Broker error code %u: mapped to unspecificed enum value ",
|
||||
static_cast<unsigned>(int_code));
|
||||
ec = BifEnum::Broker::ErrorCode::UNSPECIFIED;
|
||||
}
|
||||
|
||||
std::string msg;
|
||||
// Note: we could also use to_string, but that would change the log output
|
||||
// and we would have to update all baselines relying on this format.
|
||||
if ( auto ctx = err.context() )
|
||||
{
|
||||
msg += '(';
|
||||
msg += to_string(ctx->node);
|
||||
msg += ", ";
|
||||
msg += caf::deep_to_string(ctx->network);
|
||||
msg += ", ";
|
||||
if ( auto what = err.message() )
|
||||
msg += caf::deep_to_string(*what);
|
||||
else
|
||||
msg += R"_("")_";
|
||||
msg += ')';
|
||||
}
|
||||
else
|
||||
msg = "(null)";
|
||||
|
||||
event_mgr.Enqueue(::Broker::error,
|
||||
BifType::Enum::Broker::ErrorCode->GetEnumVal(ec),
|
||||
make_intrusive<StringVal>(msg));
|
||||
|
|
|
@ -367,8 +367,8 @@ private:
|
|||
bool ProcessLogCreate(broker::zeek::LogCreate lc);
|
||||
bool ProcessLogWrite(broker::zeek::LogWrite lw);
|
||||
bool ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu);
|
||||
void ProcessStatus(broker::status stat);
|
||||
void ProcessError(broker::error err);
|
||||
void ProcessStatus(broker::status_view stat);
|
||||
void ProcessError(broker::error_view err);
|
||||
void ProcessStoreResponse(detail::StoreHandleVal*, broker::store::response response);
|
||||
void FlushPendingQueries();
|
||||
// Initializes the masters for Broker backed Zeek tables when using the &backend attribute
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue