broker/WebSocketShim: Add tests for endpoint, hubs and subscribers

These are really testing broker functionality, mostly added to ensure
the behavior is as expected by the WebSocketShim functionality.
This commit is contained in:
Arne Welzel 2025-04-22 12:03:54 +02:00
parent cb243e2d28
commit 7acedd18d0

View file

@ -180,3 +180,247 @@ void WebSocketShim::ProcessMessage(std::string_view topic, broker::zeek::Invalid
}
} // namespace zeek::Broker
/*
* The below tests are sanity tests for broker's endpoints, hubs and subscriber functionality
* required by the WebSocketShim implementation.
*/
#include "broker/message.hh"
#include "broker/publisher.hh"
#include "zeek/3rdparty/doctest.h"
namespace {
TEST_SUITE_BEGIN("broker-websocket-shim");
using namespace std::literals;
TEST_CASE("tests") {
// broker::set_console_logger("debug");
broker::broker_options opts;
opts.skip_ssl_init = true;
opts.disable_forwarding = true;
opts.disable_ssl = true;
broker::configuration conf{opts};
auto ep = broker::endpoint(std::move(conf));
auto hub1 = ep.make_hub({"/abc"});
auto hub2 = ep.make_hub({"/abc", "/cde"});
auto sub1 = ep.make_subscriber({"/abc"});
auto sub2 = ep.make_subscriber({"/abc", "/cde"});
auto pub1 = ep.make_publisher(broker::topic{"/abc"});
auto pub2 = ep.make_publisher(broker::topic{"/abc"});
auto make_message = [](int value) { return broker::list_builder{}.add(value).build(); };
auto make_data_message = [](auto topic, int value) {
auto msg = broker::list_builder{}.add(value).build();
return broker::make_data_message(broker::topic{topic}, msg);
};
SUBCASE("endpoint-publish") {
// Publishing through the endpoint is visible to hubs, but not subscribers.
auto dmsg = make_data_message("/abc", 1);
ep.publish(dmsg->topic(), dmsg->value().to_data());
auto h1msg = hub1.get(1000ms);
REQUIRE(h1msg != nullptr);
CHECK_EQ("(1)", broker::to_string(h1msg->value()));
auto h2msg = hub2.get(1000ms);
REQUIRE(h2msg != nullptr);
CHECK_EQ("(1)", broker::to_string(h2msg->value()));
auto s1msg = sub1.get(150ms);
CHECK_FALSE(s1msg.has_value());
auto s2msg = sub2.get(150ms);
CHECK_FALSE(s2msg.has_value());
}
SUBCASE("publisher-publish") {
// Publishing through a publisher is visible to hubs, but not subscribers.
auto dmsg = make_data_message("/abc", 2);
pub1.publish(dmsg->value().to_data());
auto h1msg = hub1.get(1000ms);
REQUIRE(h1msg != nullptr);
CHECK_EQ("(2)", broker::to_string(h1msg->value()));
auto h2msg = hub2.get(1000ms);
REQUIRE(h2msg != nullptr);
CHECK_EQ("(2)", broker::to_string(h2msg->value()));
auto s1msg = sub1.get(150ms);
CHECK_FALSE(s1msg.has_value());
auto s2msg = sub2.get(150ms);
CHECK_FALSE(s2msg.has_value());
}
SUBCASE("hub-publish") {
// Publishing on a hub is visible to subscribers and other hubs, but not the hub itself.
auto dmsg = make_data_message("/abc", 3);
hub1.publish(dmsg);
auto h1msg = hub1.get(150ms);
REQUIRE(h1msg == nullptr);
auto h2msg = hub2.get(1000ms);
REQUIRE(h2msg != nullptr);
CHECK_EQ("(3)", broker::to_string(h2msg->value()));
auto s1msg = sub1.get(1000ms);
REQUIRE(s1msg.has_value());
CHECK_EQ("(3)", broker::to_string(s1msg.value()->value()));
auto s2msg = sub2.get(1000ms);
REQUIRE(s2msg.has_value());
CHECK_EQ("(3)", broker::to_string(s2msg.value()->value()));
}
SUBCASE("hub-publish-topic-cde") {
// Ensure subscription filtering works, hub1 and sub1 are not subscribed to /cde.
auto dmsg = make_data_message("/cde", 3);
hub1.publish(dmsg);
auto h1msg = hub1.get(150ms);
REQUIRE(h1msg == nullptr);
auto h2msg = hub2.get(1000ms);
REQUIRE(h2msg != nullptr);
CHECK_EQ("(3)", broker::to_string(h2msg->value()));
// sub1 is not subscribed to /cde, doesn't see the message.
auto s1msg = sub1.get(150ms);
CHECK_FALSE(s1msg.has_value());
auto s2msg = sub2.get(1000ms);
REQUIRE(s2msg.has_value());
CHECK_EQ("(3)", broker::to_string(s2msg.value()->value()));
}
SUBCASE("hub-publish-recreated-hub-subscriber") {
// Re-create hub1 and sub1 with different subscriptions.
hub1 = ep.make_hub({"/efg"});
sub1 = ep.make_subscriber({"/efg"});
// Re-create hub1 and sub1 with different subscriptions
auto dmsg = make_data_message("/efg", 5);
// Publish through hub2, so hub1, sub1 and sub2 could see the message.
hub2.publish(dmsg);
auto h1msg = hub1.get(1000ms);
REQUIRE(h1msg != nullptr);
CHECK_EQ("(5)", broker::to_string(h1msg->value()));
auto h2msg = hub2.get(150ms);
CHECK(h2msg == nullptr);
auto s1msg = sub1.get(1000ms);
REQUIRE(s1msg.has_value());
CHECK_EQ("(5)", broker::to_string(s1msg.value()->value()));
// s2 does not have a /efg subscription.
auto s2msg = sub2.get(150ms);
CHECK_FALSE(s2msg.has_value());
}
SUBCASE("remote") {
// Create a second endpoint ep2 and peer it with ep.
broker::configuration conf2{opts};
auto ep2 = broker::endpoint(std::move(conf2));
auto ep2hub1 = ep2.make_hub({"/abc"});
auto ep2hub2 = ep2.make_hub({"/abc", "/cde"});
auto ep2sub1 = ep2.make_subscriber({"/abc"});
auto ep2sub2 = ep2.make_subscriber({"/abc", "/cde"});
auto ep2_port = ep2.listen("127.0.0.1");
REQUIRE(ep2_port != 0);
ep.peer("127.0.0.1", ep2_port);
REQUIRE(ep.await_peer(ep2.node_id()));
SUBCASE("ep2 publish") {
// Publishing from remote endpoint ep2: All subscribers and all hubs attached to ep receive the message.
auto dmsg = make_data_message("/abc", 40);
ep2.publish(dmsg->topic(), dmsg->value().to_data());
auto h1msg = hub1.get(1000ms);
REQUIRE(h1msg != nullptr);
CHECK_EQ("(40)", broker::to_string(h1msg->value()));
auto h2msg = hub2.get(1000ms);
REQUIRE(h2msg != nullptr);
CHECK_EQ("(40)", broker::to_string(h2msg->value()));
auto s1msg = sub1.get(1000ms);
REQUIRE(s1msg.has_value());
CHECK_EQ("(40)", broker::to_string(s1msg.value()->value()));
auto s2msg = sub2.get(1000ms);
REQUIRE(s2msg.has_value());
CHECK_EQ("(40)", broker::to_string(s2msg.value()->value()));
// Ensure the local hubs see it, too, and the local subscribers don't
auto ep2h1msg = ep2hub1.get(1000ms);
REQUIRE(h1msg != nullptr);
CHECK_EQ("(40)", broker::to_string(h1msg->value()));
auto ep2h2msg = ep2hub2.get(1000ms);
REQUIRE(h2msg != nullptr);
CHECK_EQ("(40)", broker::to_string(h2msg->value()));
auto ep2s1msg = ep2sub1.get(150ms);
CHECK_FALSE(ep2s1msg.has_value());
auto ep2s2msg = ep2sub2.get(150ms);
CHECK_FALSE(ep2s2msg.has_value());
}
SUBCASE("ep2 hub") {
// Publishing from a hub on endpoint ep2: All subscribers and all hubs attached to ep receive the message.
auto dmsg = make_data_message("/abc", 41);
ep2hub1.publish(dmsg);
auto h1msg = hub1.get(1000ms);
REQUIRE(h1msg != nullptr);
CHECK_EQ("(41)", broker::to_string(h1msg->value()));
auto h2msg = hub2.get(1000ms);
REQUIRE(h2msg != nullptr);
CHECK_EQ("(41)", broker::to_string(h2msg->value()));
auto s1msg = sub1.get(1000ms);
REQUIRE(s1msg.has_value());
CHECK_EQ("(41)", broker::to_string(s1msg.value()->value()));
auto s2msg = sub2.get(1000ms);
REQUIRE(s2msg.has_value());
CHECK_EQ("(41)", broker::to_string(s2msg.value()->value()));
// And all hubs and subscribers on ep2, too, but not the sending one.
auto ep2h1msg = ep2hub1.get(150ms);
CHECK(ep2h1msg == nullptr);
auto ep2h2msg = ep2hub2.get(1000ms);
REQUIRE(ep2h2msg != nullptr);
CHECK_EQ("(41)", broker::to_string(h2msg->value()));
auto ep2s1msg = ep2sub1.get(1000ms);
REQUIRE(ep2s1msg.has_value());
CHECK_EQ("(41)", broker::to_string(ep2s1msg.value()->value()));
auto ep2s2msg = ep2sub2.get(1000ms);
REQUIRE(ep2s2msg.has_value());
CHECK_EQ("(41)", broker::to_string(s2msg.value()->value()));
}
};
}
TEST_SUITE_END();
} // namespace