diff --git a/src/broker/WebSocketShim.cc b/src/broker/WebSocketShim.cc index 0deb979b8d..0b21eb333b 100644 --- a/src/broker/WebSocketShim.cc +++ b/src/broker/WebSocketShim.cc @@ -180,3 +180,247 @@ void WebSocketShim::ProcessMessage(std::string_view topic, broker::zeek::Invalid } } // namespace zeek::Broker + + +/* + * The below tests are sanity tests for broker's endpoints, hubs and subscriber functionality + * required by the WebSocketShim implementation. + */ +#include "broker/message.hh" +#include "broker/publisher.hh" + +#include "zeek/3rdparty/doctest.h" + +namespace { + +TEST_SUITE_BEGIN("broker-websocket-shim"); +using namespace std::literals; + +TEST_CASE("tests") { + // broker::set_console_logger("debug"); + broker::broker_options opts; + opts.skip_ssl_init = true; + opts.disable_forwarding = true; + opts.disable_ssl = true; + broker::configuration conf{opts}; + auto ep = broker::endpoint(std::move(conf)); + + auto hub1 = ep.make_hub({"/abc"}); + auto hub2 = ep.make_hub({"/abc", "/cde"}); + auto sub1 = ep.make_subscriber({"/abc"}); + auto sub2 = ep.make_subscriber({"/abc", "/cde"}); + + auto pub1 = ep.make_publisher(broker::topic{"/abc"}); + auto pub2 = ep.make_publisher(broker::topic{"/abc"}); + + auto make_message = [](int value) { return broker::list_builder{}.add(value).build(); }; + + auto make_data_message = [](auto topic, int value) { + auto msg = broker::list_builder{}.add(value).build(); + return broker::make_data_message(broker::topic{topic}, msg); + }; + + SUBCASE("endpoint-publish") { + // Publishing through the endpoint is visible to hubs, but not subscribers. + auto dmsg = make_data_message("/abc", 1); + ep.publish(dmsg->topic(), dmsg->value().to_data()); + + auto h1msg = hub1.get(1000ms); + REQUIRE(h1msg != nullptr); + CHECK_EQ("(1)", broker::to_string(h1msg->value())); + + auto h2msg = hub2.get(1000ms); + REQUIRE(h2msg != nullptr); + CHECK_EQ("(1)", broker::to_string(h2msg->value())); + + auto s1msg = sub1.get(150ms); + CHECK_FALSE(s1msg.has_value()); + auto s2msg = sub2.get(150ms); + CHECK_FALSE(s2msg.has_value()); + } + + SUBCASE("publisher-publish") { + // Publishing through a publisher is visible to hubs, but not subscribers. + auto dmsg = make_data_message("/abc", 2); + pub1.publish(dmsg->value().to_data()); + + auto h1msg = hub1.get(1000ms); + REQUIRE(h1msg != nullptr); + CHECK_EQ("(2)", broker::to_string(h1msg->value())); + + auto h2msg = hub2.get(1000ms); + REQUIRE(h2msg != nullptr); + CHECK_EQ("(2)", broker::to_string(h2msg->value())); + + auto s1msg = sub1.get(150ms); + CHECK_FALSE(s1msg.has_value()); + + auto s2msg = sub2.get(150ms); + CHECK_FALSE(s2msg.has_value()); + } + + SUBCASE("hub-publish") { + // Publishing on a hub is visible to subscribers and other hubs, but not the hub itself. + auto dmsg = make_data_message("/abc", 3); + hub1.publish(dmsg); + + auto h1msg = hub1.get(150ms); + REQUIRE(h1msg == nullptr); + + auto h2msg = hub2.get(1000ms); + REQUIRE(h2msg != nullptr); + CHECK_EQ("(3)", broker::to_string(h2msg->value())); + + auto s1msg = sub1.get(1000ms); + REQUIRE(s1msg.has_value()); + CHECK_EQ("(3)", broker::to_string(s1msg.value()->value())); + + auto s2msg = sub2.get(1000ms); + REQUIRE(s2msg.has_value()); + CHECK_EQ("(3)", broker::to_string(s2msg.value()->value())); + } + + SUBCASE("hub-publish-topic-cde") { + // Ensure subscription filtering works, hub1 and sub1 are not subscribed to /cde. + auto dmsg = make_data_message("/cde", 3); + hub1.publish(dmsg); + + auto h1msg = hub1.get(150ms); + REQUIRE(h1msg == nullptr); + + auto h2msg = hub2.get(1000ms); + REQUIRE(h2msg != nullptr); + CHECK_EQ("(3)", broker::to_string(h2msg->value())); + + // sub1 is not subscribed to /cde, doesn't see the message. + auto s1msg = sub1.get(150ms); + CHECK_FALSE(s1msg.has_value()); + + auto s2msg = sub2.get(1000ms); + REQUIRE(s2msg.has_value()); + CHECK_EQ("(3)", broker::to_string(s2msg.value()->value())); + } + + SUBCASE("hub-publish-recreated-hub-subscriber") { + // Re-create hub1 and sub1 with different subscriptions. + hub1 = ep.make_hub({"/efg"}); + sub1 = ep.make_subscriber({"/efg"}); + + // Re-create hub1 and sub1 with different subscriptions + auto dmsg = make_data_message("/efg", 5); + + // Publish through hub2, so hub1, sub1 and sub2 could see the message. + hub2.publish(dmsg); + + auto h1msg = hub1.get(1000ms); + REQUIRE(h1msg != nullptr); + CHECK_EQ("(5)", broker::to_string(h1msg->value())); + + auto h2msg = hub2.get(150ms); + CHECK(h2msg == nullptr); + + auto s1msg = sub1.get(1000ms); + REQUIRE(s1msg.has_value()); + CHECK_EQ("(5)", broker::to_string(s1msg.value()->value())); + + // s2 does not have a /efg subscription. + auto s2msg = sub2.get(150ms); + CHECK_FALSE(s2msg.has_value()); + } + + SUBCASE("remote") { + // Create a second endpoint ep2 and peer it with ep. + broker::configuration conf2{opts}; + auto ep2 = broker::endpoint(std::move(conf2)); + + auto ep2hub1 = ep2.make_hub({"/abc"}); + auto ep2hub2 = ep2.make_hub({"/abc", "/cde"}); + auto ep2sub1 = ep2.make_subscriber({"/abc"}); + auto ep2sub2 = ep2.make_subscriber({"/abc", "/cde"}); + + + auto ep2_port = ep2.listen("127.0.0.1"); + REQUIRE(ep2_port != 0); + ep.peer("127.0.0.1", ep2_port); + + REQUIRE(ep.await_peer(ep2.node_id())); + + SUBCASE("ep2 publish") { + // Publishing from remote endpoint ep2: All subscribers and all hubs attached to ep receive the message. + auto dmsg = make_data_message("/abc", 40); + ep2.publish(dmsg->topic(), dmsg->value().to_data()); + + auto h1msg = hub1.get(1000ms); + REQUIRE(h1msg != nullptr); + CHECK_EQ("(40)", broker::to_string(h1msg->value())); + + auto h2msg = hub2.get(1000ms); + REQUIRE(h2msg != nullptr); + CHECK_EQ("(40)", broker::to_string(h2msg->value())); + + auto s1msg = sub1.get(1000ms); + REQUIRE(s1msg.has_value()); + CHECK_EQ("(40)", broker::to_string(s1msg.value()->value())); + + auto s2msg = sub2.get(1000ms); + REQUIRE(s2msg.has_value()); + CHECK_EQ("(40)", broker::to_string(s2msg.value()->value())); + + // Ensure the local hubs see it, too, and the local subscribers don't + auto ep2h1msg = ep2hub1.get(1000ms); + REQUIRE(h1msg != nullptr); + CHECK_EQ("(40)", broker::to_string(h1msg->value())); + + auto ep2h2msg = ep2hub2.get(1000ms); + REQUIRE(h2msg != nullptr); + CHECK_EQ("(40)", broker::to_string(h2msg->value())); + + auto ep2s1msg = ep2sub1.get(150ms); + CHECK_FALSE(ep2s1msg.has_value()); + + auto ep2s2msg = ep2sub2.get(150ms); + CHECK_FALSE(ep2s2msg.has_value()); + } + + SUBCASE("ep2 hub") { + // Publishing from a hub on endpoint ep2: All subscribers and all hubs attached to ep receive the message. + auto dmsg = make_data_message("/abc", 41); + ep2hub1.publish(dmsg); + + auto h1msg = hub1.get(1000ms); + REQUIRE(h1msg != nullptr); + CHECK_EQ("(41)", broker::to_string(h1msg->value())); + + auto h2msg = hub2.get(1000ms); + REQUIRE(h2msg != nullptr); + CHECK_EQ("(41)", broker::to_string(h2msg->value())); + + auto s1msg = sub1.get(1000ms); + REQUIRE(s1msg.has_value()); + CHECK_EQ("(41)", broker::to_string(s1msg.value()->value())); + + auto s2msg = sub2.get(1000ms); + REQUIRE(s2msg.has_value()); + CHECK_EQ("(41)", broker::to_string(s2msg.value()->value())); + + // And all hubs and subscribers on ep2, too, but not the sending one. + auto ep2h1msg = ep2hub1.get(150ms); + CHECK(ep2h1msg == nullptr); + + auto ep2h2msg = ep2hub2.get(1000ms); + REQUIRE(ep2h2msg != nullptr); + CHECK_EQ("(41)", broker::to_string(h2msg->value())); + + auto ep2s1msg = ep2sub1.get(1000ms); + REQUIRE(ep2s1msg.has_value()); + CHECK_EQ("(41)", broker::to_string(ep2s1msg.value()->value())); + + auto ep2s2msg = ep2sub2.get(1000ms); + REQUIRE(ep2s2msg.has_value()); + CHECK_EQ("(41)", broker::to_string(s2msg.value()->value())); + } + }; +} + +TEST_SUITE_END(); +} // namespace