mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
btest/bifs/hll_cluster: Use ZeroMQ
This commit is contained in:
parent
a501c35d8d
commit
02fd89fd16
1 changed files with 28 additions and 12 deletions
|
@ -1,10 +1,11 @@
|
|||
# @TEST-PORT: BROKER_MANAGER_PORT
|
||||
# @TEST-PORT: BROKER_WORKER1_PORT
|
||||
# @TEST-PORT: BROKER_WORKER2_PORT
|
||||
# @TEST-PORT: XPUB_PORT
|
||||
# @TEST-PORT: XSUB_PORT
|
||||
# @TEST-PORT: LOG_PULL_PORT
|
||||
#
|
||||
# @TEST-EXEC: cp $FILES/broker/cluster-layout.zeek .
|
||||
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
|
||||
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||
#
|
||||
# @TEST-EXEC: zeek -b %INPUT>out
|
||||
# @TEST-EXEC: zeek --parse-only -b %INPUT>out
|
||||
# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b %INPUT
|
||||
# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT runnumber=1
|
||||
# @TEST-EXEC: btest-bg-run worker-2 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -b %INPUT runnumber=2
|
||||
|
@ -14,23 +15,30 @@
|
|||
# @TEST-EXEC: btest-diff worker-1/.stdout
|
||||
# @TEST-EXEC: btest-diff worker-2/.stdout
|
||||
|
||||
@load base/frameworks/cluster
|
||||
@load zeromq-test-bootstrap
|
||||
|
||||
redef Log::default_rotation_interval = 0secs;
|
||||
|
||||
global hll_data: event(data: opaque of cardinality);
|
||||
global hll_data: event(data: opaque of cardinality) &is_used;
|
||||
|
||||
global do_terminate: event() &is_used;
|
||||
|
||||
@if ( Cluster::local_node_type() == Cluster::WORKER )
|
||||
|
||||
global runnumber: count &redef; # differentiate runs
|
||||
|
||||
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
|
||||
event do_terminate()
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
|
||||
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
# Workers see each other, but we only want
|
||||
# to publish to the manager.
|
||||
if ( name != "manager" )
|
||||
return;
|
||||
|
||||
local c = hll_cardinality_init(0.01, 0.95);
|
||||
|
||||
local add1 = 2001;
|
||||
|
@ -73,9 +81,8 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
|
|||
print hll_cardinality_estimate(c);
|
||||
}
|
||||
|
||||
Broker::publish(Cluster::manager_topic, hll_data, c);
|
||||
Cluster::publish(Cluster::manager_topic, hll_data, c);
|
||||
}
|
||||
|
||||
@endif
|
||||
|
||||
@if ( Cluster::local_node_type() == Cluster::MANAGER )
|
||||
|
@ -97,8 +104,17 @@ event hll_data(data: opaque of cardinality)
|
|||
{
|
||||
print "This value should be about 21:";
|
||||
print hll_cardinality_estimate(hll);
|
||||
terminate();
|
||||
Cluster::publish(Cluster::worker_topic, do_terminate);
|
||||
}
|
||||
}
|
||||
|
||||
# The manager runs the XPUB/XSUB proxy, so it waits until
|
||||
# the workers are gone before shutting down.
|
||||
global nodes_down = 0;
|
||||
event Cluster::node_down(name: string, id: string)
|
||||
{
|
||||
++nodes_down;
|
||||
if ( nodes_down == 2 )
|
||||
terminate();
|
||||
}
|
||||
@endif
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue