diff --git a/auxil/broker b/auxil/broker index 8493e1734c..cfb97d3388 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 8493e1734c639ea9b3c66547ba26ebea972e102b +Subproject commit cfb97d338834af1f5c6f9e13ade8a1c5141ae062 diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 20ba72f28e..d9e00b2291 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -275,7 +275,7 @@ void Manager::InitPostScript() broker::broker_options options; options.disable_ssl = get_option("Broker::disable_ssl")->AsBool(); - options.forward = get_option("Broker::forward_messages")->AsBool(); + options.disable_forwarding = !get_option("Broker::forward_messages")->AsBool(); options.use_real_time = use_real_time; broker::configuration config{std::move(options)}; @@ -474,10 +474,6 @@ void Manager::Terminate() CloseStore(x); FlushLogBuffers(); - - for ( auto& p : bstate->endpoint.peers() ) - if ( p.peer.network ) - bstate->endpoint.unpeer(p.peer.network->address, p.peer.network->port); } bool Manager::Active() @@ -554,7 +550,8 @@ void Manager::Peer(const string& addr, uint16_t port, double retry) if ( bstate->endpoint.is_shutdown() ) return; - DBG_LOG(DBG_BROKER, "Starting to peer with %s:%" PRIu16, addr.c_str(), port); + DBG_LOG(DBG_BROKER, "Starting to peer with %s:%" PRIu16 " (retry: %fs)", + addr.c_str(), port, retry); auto e = getenv("ZEEK_DEFAULT_CONNECT_RETRY"); @@ -1657,6 +1654,14 @@ void Manager::ProcessStatus(broker::status_view stat) event = ::Broker::peer_lost; break; + case broker::sc::endpoint_discovered: + event = ::Broker::endpoint_discovered; + break; + + case broker::sc::endpoint_unreachable: + event = ::Broker::endpoint_unreachable; + break; + default: reporter->Warning("Unhandled Broker status: %s", to_string(stat).data()); break; diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 4649a668ef..8e03474c41 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -8,10 +8,6 @@ #include #include #include -#include -#include -#include -#include #include #include #include diff --git a/src/broker/Store.h b/src/broker/Store.h index 70cba682b6..10ebad95e5 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -122,7 +122,7 @@ public: broker::store store; broker::store::proxy proxy; - broker::publisher_id store_pid; + broker::entity_id store_pid; // Zeek table that events are forwarded to. TableValPtr forward_to; bool have_store = false; diff --git a/src/broker/comm.bif b/src/broker/comm.bif index 2b6d4be802..e4552d84f5 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -19,6 +19,12 @@ event Broker::peer_removed%(endpoint: EndpointInfo, msg: string%); ## Generated when an existing peering has been lost. event Broker::peer_lost%(endpoint: EndpointInfo, msg: string%); +## Generated when a new Broker endpoint appeared. +event Broker::endpoint_discovered%(endpoint: EndpointInfo, msg: string%); + +## Generated when the last path to a Broker endpoint has been lost. +event Broker::endpoint_unreachable%(endpoint: EndpointInfo, msg: string%); + ## Generated when an error occurs in the Broker sub-system. event Broker::error%(code: ErrorCode, msg: string%); diff --git a/src/broker/store.bif b/src/broker/store.bif index 87f614f373..d29a44191d 100644 --- a/src/broker/store.bif +++ b/src/broker/store.bif @@ -100,6 +100,11 @@ function Broker::__close%(h: opaque of Broker::Store%): bool return val_mgr->False(); } + if ( ! handle->have_store ) + { + return val_mgr->False(); + } + return zeek::val_mgr->Bool(broker_mgr->CloseStore(handle->store.name())); %} @@ -325,6 +330,12 @@ function Broker::__keys%(h: opaque of Broker::Store%): Broker::QueryResult return zeek::Broker::detail::query_result(); } + if ( ! handle->have_store ) + { + zeek::emit_builtin_error("cannot retrieve keys from a closed store", h); + return zeek::Broker::detail::query_result(); + } + auto trigger = frame->GetTrigger(); if ( ! trigger ) diff --git a/testing/benchmark/broker/node.zeek b/testing/benchmark/broker/node.zeek new file mode 100644 index 0000000000..14c7afd34a --- /dev/null +++ b/testing/benchmark/broker/node.zeek @@ -0,0 +1,33 @@ +redef exit_only_after_terminate = T; + +global event_count = 0; + +global event_1: event(val: count); + +event event_1(value: count) + { + ++event_count; + } + +event bye_bye() + { + print "received bye-bye event"; + terminate(); + } + +event print_stats() + { + print "received ", event_count, " events/s"; + event_count = 0; + schedule 1sec { print_stats() }; + } + +event zeek_init() + { + local broker_port = to_port(getenv("BROKER_PORT")); + print "trying to connect to port ", broker_port; + Broker::subscribe("benchmark/terminate"); + Broker::subscribe("benchmark/events"); + Broker::peer("127.0.0.1", broker_port); + schedule 1sec { print_stats() }; + } diff --git a/testing/benchmark/broker/sender.zeek b/testing/benchmark/broker/sender.zeek new file mode 100644 index 0000000000..da020e69e6 --- /dev/null +++ b/testing/benchmark/broker/sender.zeek @@ -0,0 +1,28 @@ +redef exit_only_after_terminate = T; + +global value = 0; + +global event_1: event(val: count); + +event bye_bye() + { + print "received bye-bye event"; + terminate(); + } + +event publish_next() + { + Broker::publish("benchmark/events", event_1, value); + ++value; + schedule 1msec { publish_next() }; + } + +event zeek_init() + { + local broker_port = to_port(getenv("BROKER_PORT")); + print fmt("trying to connect to port %s", broker_port); + Broker::subscribe("benchmark/terminate"); + Broker::peer("127.0.0.1", broker_port); + schedule 250usec { publish_next() }; + } + diff --git a/testing/benchmark/broker/server.zeek b/testing/benchmark/broker/server.zeek new file mode 100644 index 0000000000..d85e7e754f --- /dev/null +++ b/testing/benchmark/broker/server.zeek @@ -0,0 +1,33 @@ +redef exit_only_after_terminate = T; + +global event_count = 0; + +global event_1: event(val: count); + +event event_1(value: count) + { + ++event_count; + } + +event bye_bye() + { + print "received bye-bye event"; + terminate(); + } + +event print_stats() + { + print "received ", event_count, " events/s"; + event_count = 0; + schedule 1sec { print_stats() }; + } + +event zeek_init() + { + local broker_port = to_port(getenv("BROKER_PORT")); + Broker::subscribe("benchmark/terminate"); + Broker::subscribe("benchmark/events"); + Broker::listen("127.0.0.1", broker_port); + print fmt("listening on port %d", broker_port); + schedule 1sec { print_stats() }; + } diff --git a/testing/btest/Baseline/broker.disconnect/recv.recv.out b/testing/btest/Baseline/broker.disconnect/recv.recv.out index c512d83470..73c3301683 100644 --- a/testing/btest/Baseline/broker.disconnect/recv.recv.out +++ b/testing/btest/Baseline/broker.disconnect/recv.recv.out @@ -1,3 +1,4 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +endpoint discovered, found a new peer in the network peer added, handshake successful receiver got event, 1 diff --git a/testing/btest/Baseline/broker.disconnect/recv2.recv2.out b/testing/btest/Baseline/broker.disconnect/recv2.recv2.out index 335187e4b6..6091d6a0e4 100644 --- a/testing/btest/Baseline/broker.disconnect/recv2.recv2.out +++ b/testing/btest/Baseline/broker.disconnect/recv2.recv2.out @@ -1,3 +1,4 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +endpoint discovered, found a new peer in the network peer added, handshake successful receiver got event, 2 diff --git a/testing/btest/Baseline/broker.disconnect/send.send.out b/testing/btest/Baseline/broker.disconnect/send.send.out index 95f77841f4..ce37aaa5a1 100644 --- a/testing/btest/Baseline/broker.disconnect/send.send.out +++ b/testing/btest/Baseline/broker.disconnect/send.send.out @@ -1,5 +1,9 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +endpoint discovered, found a new peer in the network peer added, handshake successful peer lost, lost connection to remote peer +endpoint unreachable, lost the last path +endpoint discovered, found a new peer in the network peer added, handshake successful peer lost, lost connection to remote peer +endpoint unreachable, lost the last path diff --git a/testing/btest/Baseline/broker.error/send.out b/testing/btest/Baseline/broker.error/send.out index da3c12e129..33955b2d4a 100644 --- a/testing/btest/Baseline/broker.error/send.out +++ b/testing/btest/Baseline/broker.error/send.out @@ -1,2 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error, Broker::PEER_INVALID, (invalid-node, *1.2.3.4:1947, "cannot unpeer from unknown peer") +error, Broker::PEER_INVALID, (00000000-0000-0000-0000-000000000000, *1.2.3.4:1947, "cannot unpeer from unknown peer") diff --git a/testing/btest/Baseline/broker.ssl_auth_failure/send.send.out b/testing/btest/Baseline/broker.ssl_auth_failure/send.send.out index 861f3823e9..0085e61336 100644 --- a/testing/btest/Baseline/broker.ssl_auth_failure/send.send.out +++ b/testing/btest/Baseline/broker.ssl_auth_failure/send.send.out @@ -1,2 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -sender error: code=Broker::PEER_UNAVAILABLE msg=(invalid-node, *, "unable to connect to remote peer") +sender error: code=Broker::PEER_UNAVAILABLE msg=(00000000-0000-0000-0000-000000000000, *, "unable to connect to remote peer") diff --git a/testing/btest/broker/connect-on-retry.zeek b/testing/btest/broker/connect-on-retry.zeek index 42ecd21c4d..7fe700a2cb 100644 --- a/testing/btest/broker/connect-on-retry.zeek +++ b/testing/btest/broker/connect-on-retry.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/disconnect.zeek b/testing/btest/broker/disconnect.zeek index f44fad143b..701172eeb6 100644 --- a/testing/btest/broker/disconnect.zeek +++ b/testing/btest/broker/disconnect.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -7,7 +9,7 @@ # @TEST-EXEC: btest-bg-run recv2 "zeek -b ../recv.zeek >recv2.out" # @TEST-EXEC: btest-bg-wait 45 - +# # @TEST-EXEC: btest-diff send/send.out # @TEST-EXEC: btest-diff recv/recv.out # @TEST-EXEC: btest-diff recv2/recv2.out @@ -33,10 +35,6 @@ event zeek_init() event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) { print "peer lost", msg; - system("touch lost"); - - if ( peers == 2 ) - terminate(); } event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) @@ -46,6 +44,20 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) Broker::publish(test_topic, my_event, peers); } +event Broker::endpoint_discovered(endpoint: Broker::EndpointInfo, msg: string) + { + print "endpoint discovered", msg; + } + +event Broker::endpoint_unreachable(endpoint: Broker::EndpointInfo, msg: string) + { + print "endpoint unreachable", msg; + system("touch lost"); + + if ( peers == 2 ) + terminate(); + } + @TEST-END-FILE @@ -77,4 +89,14 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) print "peer added", msg; } +event Broker::endpoint_discovered(endpoint: Broker::EndpointInfo, msg: string) + { + print "endpoint discovered", msg; + } + +event Broker::endpoint_unreachable(endpoint: Broker::EndpointInfo, msg: string) + { + print "endpoint unreachable", msg; + } + @TEST-END-FILE diff --git a/testing/btest/broker/error.zeek b/testing/btest/broker/error.zeek index 4a7887707e..c2686274b7 100644 --- a/testing/btest/broker/error.zeek +++ b/testing/btest/broker/error.zeek @@ -1,6 +1,8 @@ +# @TEST-GROUP: broker +# # @TEST-EXEC: zeek -b send.zeek >send.out # @TEST-EXEC: btest-diff send.out -# +# @TEST-START-FILE send.zeek diff --git a/testing/btest/broker/opaque.zeek b/testing/btest/broker/opaque.zeek index 41823a59ed..374ab7dce5 100644 --- a/testing/btest/broker/opaque.zeek +++ b/testing/btest/broker/opaque.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-EXEC: zeek -b %INPUT >out # @TEST-EXEC: btest-diff out # @TEST-EXEC: btest-diff .stderr diff --git a/testing/btest/broker/remote_event.zeek b/testing/btest/broker/remote_event.zeek index e863af5523..ddc4092841 100644 --- a/testing/btest/broker/remote_event.zeek +++ b/testing/btest/broker/remote_event.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_event_any.zeek b/testing/btest/broker/remote_event_any.zeek index 28a5a1abbe..2a290376ca 100644 --- a/testing/btest/broker/remote_event_any.zeek +++ b/testing/btest/broker/remote_event_any.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_event_auto.zeek b/testing/btest/broker/remote_event_auto.zeek index c5497997ac..264f131708 100644 --- a/testing/btest/broker/remote_event_auto.zeek +++ b/testing/btest/broker/remote_event_auto.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_event_ssl_auth.zeek b/testing/btest/broker/remote_event_ssl_auth.zeek index 95ce393e0a..d3882ab2b1 100644 --- a/testing/btest/broker/remote_event_ssl_auth.zeek +++ b/testing/btest/broker/remote_event_ssl_auth.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_event_vector_any.zeek b/testing/btest/broker/remote_event_vector_any.zeek index 628180331d..36bc896a59 100644 --- a/testing/btest/broker/remote_event_vector_any.zeek +++ b/testing/btest/broker/remote_event_vector_any.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_id.zeek b/testing/btest/broker/remote_id.zeek index 35e2416912..c88fa80ac0 100644 --- a/testing/btest/broker/remote_id.zeek +++ b/testing/btest/broker/remote_id.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_log.zeek b/testing/btest/broker/remote_log.zeek index 2f50b0a766..6158e0572a 100644 --- a/testing/btest/broker/remote_log.zeek +++ b/testing/btest/broker/remote_log.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_log_batch.zeek b/testing/btest/broker/remote_log_batch.zeek index b378deb56b..898968bceb 100644 --- a/testing/btest/broker/remote_log_batch.zeek +++ b/testing/btest/broker/remote_log_batch.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_log_late_join.zeek b/testing/btest/broker/remote_log_late_join.zeek index 7e69bdd496..271c77f71a 100644 --- a/testing/btest/broker/remote_log_late_join.zeek +++ b/testing/btest/broker/remote_log_late_join.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/remote_log_types.zeek b/testing/btest/broker/remote_log_types.zeek index a14a12586d..e1ff5f304c 100644 --- a/testing/btest/broker/remote_log_types.zeek +++ b/testing/btest/broker/remote_log_types.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/ssl-mismatch.zeek b/testing/btest/broker/ssl-mismatch.zeek index 60ab15462a..d013b2b978 100644 --- a/testing/btest/broker/ssl-mismatch.zeek +++ b/testing/btest/broker/ssl-mismatch.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run listen "zeek -b %INPUT connect=F Broker::disable_ssl=T" diff --git a/testing/btest/broker/ssl_auth_failure.zeek b/testing/btest/broker/ssl_auth_failure.zeek index 1dffd27618..86275f0e96 100644 --- a/testing/btest/broker/ssl_auth_failure.zeek +++ b/testing/btest/broker/ssl_auth_failure.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" diff --git a/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek index 611e8f19cd..e435bc6f85 100644 --- a/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek +++ b/testing/btest/broker/store/brokerstore-backend-simple-reverse.zeek @@ -70,6 +70,9 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) @TEST-START-FILE clone.zeek +global has_node_up: bool = F; +global has_announce_masters: bool = F; + event dump_tables() { t["a"] = 5; @@ -95,7 +98,9 @@ event dump_tables() event Cluster::node_up(name: string, id: string) { Reporter::info(fmt("Node Up: %s", name)); - event dump_tables(); + has_node_up = T; + if ( has_announce_masters ) + event dump_tables(); } event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) @@ -106,6 +111,9 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) event Broker::announce_masters(masters: set[string]) { Reporter::info(fmt("Received announce_masters: %s", cat(masters))); + has_announce_masters = T; + if ( has_node_up ) + event dump_tables(); } @TEST-END-FILE diff --git a/testing/btest/broker/store/brokerstore-backend-sqlite.zeek b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek index 39c781a347..042c5bec55 100644 --- a/testing/btest/broker/store/brokerstore-backend-sqlite.zeek +++ b/testing/btest/broker/store/brokerstore-backend-sqlite.zeek @@ -125,7 +125,12 @@ event dump_tables() event check_all_set() { - if ( "whatever" in t && "hi" in s && "b" in r ) + # Note: 'a' gets inserted first into 'r'. However, we may still observe 'r' + # with 'b' but without 'a'. This may happen if the clone completes + # its handshake with the server after 'a' and 'b' are already in 'r'. + # In this case, the master sends a snapshot of its state and the + # insertion events for 'a' and 'b' they may trigger in any order. + if ( "whatever" in t && "hi" in s && "a" in r && "b" in r ) event dump_tables(); else schedule 0.1sec { check_all_set() }; diff --git a/testing/btest/broker/unpeer.zeek b/testing/btest/broker/unpeer.zeek index fd057af241..6cb3cc9a8c 100644 --- a/testing/btest/broker/unpeer.zeek +++ b/testing/btest/broker/unpeer.zeek @@ -1,3 +1,5 @@ +# @TEST-GROUP: broker +# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -6,7 +8,7 @@ # @TEST-EXEC: btest-bg-wait 45 # @TEST-EXEC: btest-diff recv/recv.out # @TEST-EXEC: btest-diff send/send.out -# +# # @TEST-EXEC: cat recv/broker.log | awk '/Broker::STATUS/ { $5="XXX"; print; }' >recv/broker.filtered.log # @TEST-EXEC: cat send/broker.log | awk '/Broker::STATUS/ { $5="XXX"; print; }' >send/broker.filtered.log # @TEST-EXEC: btest-diff recv/broker.filtered.log