Port Zeek to latest Broker API

This commit is contained in:
Dominik Charousset 2020-09-04 09:31:58 +02:00 committed by Dominik Charousset
parent 013070d1a9
commit 4ea1a593a9
33 changed files with 208 additions and 23 deletions

@ -1 +1 @@
Subproject commit 8493e1734c639ea9b3c66547ba26ebea972e102b
Subproject commit cfb97d338834af1f5c6f9e13ade8a1c5141ae062

View file

@ -275,7 +275,7 @@ void Manager::InitPostScript()
broker::broker_options options;
options.disable_ssl = get_option("Broker::disable_ssl")->AsBool();
options.forward = get_option("Broker::forward_messages")->AsBool();
options.disable_forwarding = !get_option("Broker::forward_messages")->AsBool();
options.use_real_time = use_real_time;
broker::configuration config{std::move(options)};
@ -474,10 +474,6 @@ void Manager::Terminate()
CloseStore(x);
FlushLogBuffers();
for ( auto& p : bstate->endpoint.peers() )
if ( p.peer.network )
bstate->endpoint.unpeer(p.peer.network->address, p.peer.network->port);
}
bool Manager::Active()
@ -554,7 +550,8 @@ void Manager::Peer(const string& addr, uint16_t port, double retry)
if ( bstate->endpoint.is_shutdown() )
return;
DBG_LOG(DBG_BROKER, "Starting to peer with %s:%" PRIu16, addr.c_str(), port);
DBG_LOG(DBG_BROKER, "Starting to peer with %s:%" PRIu16 " (retry: %fs)",
addr.c_str(), port, retry);
auto e = getenv("ZEEK_DEFAULT_CONNECT_RETRY");
@ -1657,6 +1654,14 @@ void Manager::ProcessStatus(broker::status_view stat)
event = ::Broker::peer_lost;
break;
case broker::sc::endpoint_discovered:
event = ::Broker::endpoint_discovered;
break;
case broker::sc::endpoint_unreachable:
event = ::Broker::endpoint_unreachable;
break;
default:
reporter->Warning("Unhandled Broker status: %s", to_string(stat).data());
break;

View file

@ -8,10 +8,6 @@
#include <broker/endpoint_info.hh>
#include <broker/error.hh>
#include <broker/peer_info.hh>
#include <broker/publisher_id.hh>
#include <broker/status.hh>
#include <broker/store.hh>
#include <broker/topic.hh>
#include <broker/zeek.hh>
#include <memory>
#include <string>

View file

@ -122,7 +122,7 @@ public:
broker::store store;
broker::store::proxy proxy;
broker::publisher_id store_pid;
broker::entity_id store_pid;
// Zeek table that events are forwarded to.
TableValPtr forward_to;
bool have_store = false;

View file

@ -19,6 +19,12 @@ event Broker::peer_removed%(endpoint: EndpointInfo, msg: string%);
## Generated when an existing peering has been lost.
event Broker::peer_lost%(endpoint: EndpointInfo, msg: string%);
## Generated when a new Broker endpoint appeared.
event Broker::endpoint_discovered%(endpoint: EndpointInfo, msg: string%);
## Generated when the last path to a Broker endpoint has been lost.
event Broker::endpoint_unreachable%(endpoint: EndpointInfo, msg: string%);
## Generated when an error occurs in the Broker sub-system.
event Broker::error%(code: ErrorCode, msg: string%);

View file

@ -100,6 +100,11 @@ function Broker::__close%(h: opaque of Broker::Store%): bool
return val_mgr->False();
}
if ( ! handle->have_store )
{
return val_mgr->False();
}
return zeek::val_mgr->Bool(broker_mgr->CloseStore(handle->store.name()));
%}
@ -325,6 +330,12 @@ function Broker::__keys%(h: opaque of Broker::Store%): Broker::QueryResult
return zeek::Broker::detail::query_result();
}
if ( ! handle->have_store )
{
zeek::emit_builtin_error("cannot retrieve keys from a closed store", h);
return zeek::Broker::detail::query_result();
}
auto trigger = frame->GetTrigger();
if ( ! trigger )

View file

@ -0,0 +1,33 @@
redef exit_only_after_terminate = T;
global event_count = 0;
global event_1: event(val: count);
event event_1(value: count)
{
++event_count;
}
event bye_bye()
{
print "received bye-bye event";
terminate();
}
event print_stats()
{
print "received ", event_count, " events/s";
event_count = 0;
schedule 1sec { print_stats() };
}
event zeek_init()
{
local broker_port = to_port(getenv("BROKER_PORT"));
print "trying to connect to port ", broker_port;
Broker::subscribe("benchmark/terminate");
Broker::subscribe("benchmark/events");
Broker::peer("127.0.0.1", broker_port);
schedule 1sec { print_stats() };
}

View file

@ -0,0 +1,28 @@
redef exit_only_after_terminate = T;
global value = 0;
global event_1: event(val: count);
event bye_bye()
{
print "received bye-bye event";
terminate();
}
event publish_next()
{
Broker::publish("benchmark/events", event_1, value);
++value;
schedule 1msec { publish_next() };
}
event zeek_init()
{
local broker_port = to_port(getenv("BROKER_PORT"));
print fmt("trying to connect to port %s", broker_port);
Broker::subscribe("benchmark/terminate");
Broker::peer("127.0.0.1", broker_port);
schedule 250usec { publish_next() };
}

View file

@ -0,0 +1,33 @@
redef exit_only_after_terminate = T;
global event_count = 0;
global event_1: event(val: count);
event event_1(value: count)
{
++event_count;
}
event bye_bye()
{
print "received bye-bye event";
terminate();
}
event print_stats()
{
print "received ", event_count, " events/s";
event_count = 0;
schedule 1sec { print_stats() };
}
event zeek_init()
{
local broker_port = to_port(getenv("BROKER_PORT"));
Broker::subscribe("benchmark/terminate");
Broker::subscribe("benchmark/events");
Broker::listen("127.0.0.1", broker_port);
print fmt("listening on port %d", broker_port);
schedule 1sec { print_stats() };
}

View file

@ -1,3 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
endpoint discovered, found a new peer in the network
peer added, handshake successful
receiver got event, 1

View file

@ -1,3 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
endpoint discovered, found a new peer in the network
peer added, handshake successful
receiver got event, 2

View file

@ -1,5 +1,9 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
endpoint discovered, found a new peer in the network
peer added, handshake successful
peer lost, lost connection to remote peer
endpoint unreachable, lost the last path
endpoint discovered, found a new peer in the network
peer added, handshake successful
peer lost, lost connection to remote peer
endpoint unreachable, lost the last path

View file

@ -1,2 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error, Broker::PEER_INVALID, (invalid-node, *1.2.3.4:1947, "cannot unpeer from unknown peer")
error, Broker::PEER_INVALID, (00000000-0000-0000-0000-000000000000, *1.2.3.4:1947, "cannot unpeer from unknown peer")

View file

@ -1,2 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sender error: code=Broker::PEER_UNAVAILABLE msg=(invalid-node, *<endpoint addr:port>, "unable to connect to remote peer")
sender error: code=Broker::PEER_UNAVAILABLE msg=(00000000-0000-0000-0000-000000000000, *<endpoint addr:port>, "unable to connect to remote peer")

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"
@ -7,7 +9,7 @@
# @TEST-EXEC: btest-bg-run recv2 "zeek -b ../recv.zeek >recv2.out"
# @TEST-EXEC: btest-bg-wait 45
#
# @TEST-EXEC: btest-diff send/send.out
# @TEST-EXEC: btest-diff recv/recv.out
# @TEST-EXEC: btest-diff recv2/recv2.out
@ -33,10 +35,6 @@ event zeek_init()
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer lost", msg;
system("touch lost");
if ( peers == 2 )
terminate();
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
@ -46,6 +44,20 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
Broker::publish(test_topic, my_event, peers);
}
event Broker::endpoint_discovered(endpoint: Broker::EndpointInfo, msg: string)
{
print "endpoint discovered", msg;
}
event Broker::endpoint_unreachable(endpoint: Broker::EndpointInfo, msg: string)
{
print "endpoint unreachable", msg;
system("touch lost");
if ( peers == 2 )
terminate();
}
@TEST-END-FILE
@ -77,4 +89,14 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
print "peer added", msg;
}
event Broker::endpoint_discovered(endpoint: Broker::EndpointInfo, msg: string)
{
print "endpoint discovered", msg;
}
event Broker::endpoint_unreachable(endpoint: Broker::EndpointInfo, msg: string)
{
print "endpoint unreachable", msg;
}
@TEST-END-FILE

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-EXEC: zeek -b send.zeek >send.out
# @TEST-EXEC: btest-diff send.out
#

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-EXEC: zeek -b %INPUT >out
# @TEST-EXEC: btest-diff out
# @TEST-EXEC: btest-diff .stderr

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run listen "zeek -b %INPUT connect=F Broker::disable_ssl=T"

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"

View file

@ -70,6 +70,9 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
@TEST-START-FILE clone.zeek
global has_node_up: bool = F;
global has_announce_masters: bool = F;
event dump_tables()
{
t["a"] = 5;
@ -95,6 +98,8 @@ event dump_tables()
event Cluster::node_up(name: string, id: string)
{
Reporter::info(fmt("Node Up: %s", name));
has_node_up = T;
if ( has_announce_masters )
event dump_tables();
}
@ -106,6 +111,9 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
event Broker::announce_masters(masters: set[string])
{
Reporter::info(fmt("Received announce_masters: %s", cat(masters)));
has_announce_masters = T;
if ( has_node_up )
event dump_tables();
}
@TEST-END-FILE

View file

@ -125,7 +125,12 @@ event dump_tables()
event check_all_set()
{
if ( "whatever" in t && "hi" in s && "b" in r )
# Note: 'a' gets inserted first into 'r'. However, we may still observe 'r'
# with 'b' but without 'a'. This may happen if the clone completes
# its handshake with the server after 'a' and 'b' are already in 'r'.
# In this case, the master sends a snapshot of its state and the
# insertion events for 'a' and 'b' they may trigger in any order.
if ( "whatever" in t && "hi" in s && "a" in r && "b" in r )
event dump_tables();
else
schedule 0.1sec { check_all_set() };

View file

@ -1,3 +1,5 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out"