# @TEST-SERIALIZE: brokercomm
# @TEST-REQUIRES: grep -q ENABLE_BROKER:BOOL=true $BUILD/CMakeCache.txt

# @TEST-EXEC: btest-bg-run recv "bro -b ../common.bro ../recv.bro broker_port=$BROKER_PORT >recv.out"
# @TEST-EXEC: btest-bg-run send "bro -b ../common.bro ../send.bro broker_port=$BROKER_PORT >send.out"

# @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: btest-diff recv/recv.out
# @TEST-EXEC: btest-diff recv/test.log
# @TEST-EXEC: btest-diff send/send.out
# @TEST-EXEC: btest-diff send/test.log

@TEST-START-FILE common.bro

# Loaded by both the sender and the receiver process: declares the two
# shutdown events and the Test::LOG stream.

# Declared before "module Test" so both names live in the global namespace.
global quit_receiver: event();
global quit_sender: event();

module Test;

export {
	redef enum Log::ID += { LOG };

	# Column layout of the Test::LOG stream. "msg" and "num" carry &log;
	# "nolog" does not and only has a default value.
	type Info: record {
		msg: string &log;
		nolog: string &default="no";
		num: count &log;
	};
}

# Runs at priority 5, i.e. ahead of the default-priority bro_init handlers
# in recv.bro and send.bro, so Broker is enabled and the stream exists
# before they execute.
event bro_init() &priority=5
	{
	Broker::enable();
	Log::create_stream(Test::LOG, [$columns=Test::Info]);
	}

@TEST-END-FILE

@TEST-START-FILE recv.bro

# Receiving side: subscribes to log and event topics, listens on the
# loopback address, and exits once the sender delivers quit_receiver.

# No default here; the value is supplied on the bro command line.
const broker_port: port &redef;

# Keep the process alive until terminate() is called explicitly.
redef exit_only_after_terminate = T;

event bro_init()
	{
	Broker::subscribe_to_logs("bro/log/");
	Broker::subscribe_to_events("bro/event/");
	Broker::listen(broker_port, "127.0.0.1");
	}

# Raised remotely by the sender once it has finished writing.
event quit_receiver()
	{
	terminate();
	}

@TEST-END-FILE

@TEST-START-FILE send.bro

# Sending side: connects to the receiver, writes six entries to Test::LOG,
# then asks the receiver to quit and shuts itself down one second later.

# No default here; the value is supplied on the bro command line.
const broker_port: port &redef;

# Keep the process alive until terminate() is called explicitly.
redef exit_only_after_terminate = T;

event bro_init()
	{
	# NOTE(review): presumably this is what makes writes to Test::LOG
	# visible to the peer subscribed under "bro/log/" -- confirm against
	# the Broker framework documentation.
	Broker::enable_remote_logs(Test::LOG);
	Broker::publish_topic("bro/event/");
	Broker::connect("127.0.0.1", broker_port, 1secs);
	}

# Number of log entries written so far; also used as each entry's "num".
global n = 0;

# Writes one log entry per invocation and re-raises itself until six
# entries (num 0 through 5) have been written, then starts the shutdown.
event do_write()
	{
	if ( n != 6 )
		{
		Log::write(Test::LOG, [$msg = "ping", $num = n]);
		++n;
		event do_write();
		return;
		}

	# All entries are out: tell the receiver to stop, and stop locally
	# after one second.
	local quit_args = Broker::event_args(quit_receiver);
	Broker::send_event("bro/event/", quit_args);
	schedule 1sec { quit_sender() };
	}

event quit_sender()
	{
	terminate();
	}

# Start writing only once the connection to the receiver is up.
event Broker::outgoing_connection_established(peer_address: string,
                                              peer_port: port,
                                              peer_name: string)
	{
	print "Broker::outgoing_connection_established", peer_address, peer_port;
	event do_write();
	}

# Losing the connection ends the sender as well.
event Broker::outgoing_connection_broken(peer_address: string,
                                         peer_port: port)
	{
	terminate();
	}

@TEST-END-FILE