diff --git a/aux/broker b/aux/broker index 425bab3bf4..ebc66f484a 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 425bab3bf420898d8dbd14280f94aee9d420f617 +Subproject commit ebc66f484af27a32dc5d91b1c985638847e35cf6 diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index ffe68970a8..bc0bc3f8a8 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -72,9 +72,9 @@ bool comm::Manager::InitPostScript() return true; } -bool comm::Manager::Listen(uint16_t port, const char* addr) +bool comm::Manager::Listen(uint16_t port, const char* addr, bool reuse_addr) { - auto rval = endpoint->listen(port, addr); + auto rval = endpoint->listen(port, addr, reuse_addr); if ( ! rval ) { diff --git a/src/comm/Manager.h b/src/comm/Manager.h index 3c1e80827b..5e3ec350b8 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -23,7 +23,8 @@ public: bool InitPostScript(); - bool Listen(uint16_t port, const char* addr = nullptr); + bool Listen(uint16_t port, const char* addr = nullptr, + bool reuse_addr = true); bool Connect(std::string addr, uint16_t port, std::chrono::duration retry_interval); diff --git a/src/comm/comm.bif b/src/comm/comm.bif index e1c2bc533f..ebe206d266 100644 --- a/src/comm/comm.bif +++ b/src/comm/comm.bif @@ -28,7 +28,8 @@ event Comm::remote_connection_broken%(peer_address: string, event Comm::remote_connection_incompatible%(peer_address: string, peer_port: port%); -function Comm::listen%(p: port, a: string &default = ""%): bool +function Comm::listen%(p: port, a: string &default = "", + reuse: bool &default = T%): bool %{ if ( ! p->IsTCP() ) { @@ -36,7 +37,8 @@ function Comm::listen%(p: port, a: string &default = ""%): bool return new Val(false, TYPE_BOOL); } - auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0); + auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0, + reuse); return new Val(rval, TYPE_BOOL); %} diff --git a/src/main.cc b/src/main.cc index a7099cb90b..5385ca7993 100644 --- a/src/main.cc +++ b/src/main.cc @@ -944,7 +944,7 @@ int main(int argc, char** argv) #ifdef ENABLE_BROKER comm_mgr->InitPostScript(); - iosource_mgr->Register(comm_mgr); + iosource_mgr->Register(comm_mgr, true); #endif #ifdef USE_PERFTOOLS_DEBUG diff --git a/testing/btest/Baseline/comm.remote_event/recv.recv.out b/testing/btest/Baseline/comm.remote_event/recv.recv.out new file mode 100644 index 0000000000..7dab0284ea --- /dev/null +++ b/testing/btest/Baseline/comm.remote_event/recv.recv.out @@ -0,0 +1,6 @@ +got event msg, ping, 0 +got event msg, ping, 1 +got event msg, ping, 2 +got event msg, ping, 3 +got event msg, ping, 4 +got event msg, ping, 5 diff --git a/testing/btest/Baseline/comm.remote_event/send.send.out b/testing/btest/Baseline/comm.remote_event/send.send.out new file mode 100644 index 0000000000..ef1f7bc7e1 --- /dev/null +++ b/testing/btest/Baseline/comm.remote_event/send.send.out @@ -0,0 +1,13 @@ +Comm::remote_connection_established, 127.0.0.1, 9999/tcp +got event msg, pong, 0 +got auto event msg, ping, 0 +got event msg, pong, 1 +got auto event msg, ping, 1 +got event msg, pong, 2 +got auto event msg, ping, 2 +got event msg, pong, 3 +got auto event msg, ping, 3 +got event msg, pong, 4 +got auto event msg, ping, 4 +got event msg, pong, 5 +got auto event msg, ping, 5 diff --git a/testing/btest/Baseline/comm.remote_log/recv.recv.out b/testing/btest/Baseline/comm.remote_log/recv.recv.out new file mode 100644 index 0000000000..3e0957442d --- /dev/null +++ b/testing/btest/Baseline/comm.remote_log/recv.recv.out @@ -0,0 +1,6 @@ +wrote log, [msg=ping, num=0] +wrote log, [msg=ping, num=1] +wrote log, [msg=ping, num=2] +wrote log, [msg=ping, num=3] +wrote log, [msg=ping, num=4] +wrote log, [msg=ping, num=5] diff --git a/testing/btest/Baseline/comm.remote_log/recv.test.log b/testing/btest/Baseline/comm.remote_log/recv.test.log new file mode 100644 index 0000000000..0d6dae756c --- /dev/null +++ b/testing/btest/Baseline/comm.remote_log/recv.test.log @@ -0,0 +1,15 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#open 2015-01-26-22-47-11 +#fields msg num +#types string count +ping 0 +ping 1 +ping 2 +ping 3 +ping 4 +ping 5 +#close 2015-01-26-22-47-11 diff --git a/testing/btest/Baseline/comm.remote_log/send.send.out b/testing/btest/Baseline/comm.remote_log/send.send.out new file mode 100644 index 0000000000..0968e6beb9 --- /dev/null +++ b/testing/btest/Baseline/comm.remote_log/send.send.out @@ -0,0 +1 @@ +Comm::remote_connection_established, 127.0.0.1, 9999/tcp diff --git a/testing/btest/Baseline/comm.remote_log/send.test.log b/testing/btest/Baseline/comm.remote_log/send.test.log new file mode 100644 index 0000000000..0d6dae756c --- /dev/null +++ b/testing/btest/Baseline/comm.remote_log/send.test.log @@ -0,0 +1,15 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#open 2015-01-26-22-47-11 +#fields msg num +#types string count +ping 0 +ping 1 +ping 2 +ping 3 +ping 4 +ping 5 +#close 2015-01-26-22-47-11 diff --git a/testing/btest/Baseline/comm.remote_print/recv.recv.out b/testing/btest/Baseline/comm.remote_print/recv.recv.out new file mode 100644 index 0000000000..6e5a37abbf --- /dev/null +++ b/testing/btest/Baseline/comm.remote_print/recv.recv.out @@ -0,0 +1,6 @@ +got print msg, ping 0 +got print msg, ping 1 +got print msg, ping 2 +got print msg, ping 3 +got print msg, ping 4 +got print msg, ping 5 diff --git a/testing/btest/Baseline/comm.remote_print/send.send.out b/testing/btest/Baseline/comm.remote_print/send.send.out new file mode 100644 index 0000000000..982ee993f6 --- /dev/null +++ b/testing/btest/Baseline/comm.remote_print/send.send.out @@ -0,0 +1,7 @@ +Comm::remote_connection_established, 127.0.0.1, 9999/tcp +got print msg, pong 0 +got print msg, pong 1 +got print msg, pong 2 +got print msg, pong 3 +got print msg, pong 4 +got print msg, pong 5 diff --git a/testing/btest/btest.cfg b/testing/btest/btest.cfg index 43f29d40a1..2eea514357 100644 --- a/testing/btest/btest.cfg +++ b/testing/btest/btest.cfg @@ -1,5 +1,5 @@ [btest] -TestDirs = doc bifs language core scripts istate coverage signatures plugins +TestDirs = doc bifs language core scripts istate coverage signatures plugins comm TmpDir = %(testbase)s/.tmp BaselineDir = %(testbase)s/Baseline IgnoreDirs = .svn CVS .tmp diff --git a/testing/btest/comm/remote_event.test b/testing/btest/comm/remote_event.test new file mode 100644 index 0000000000..9ab9a6b224 --- /dev/null +++ b/testing/btest/comm/remote_event.test @@ -0,0 +1,100 @@ +# @TEST_SERIALIZE: brokercomm +# @TEST_REQUIRES: grep -q ENABLE_BROKER $BUILD/CMakeCache.txt + +# @TEST-EXEC: btest-bg-run recv "bro -b ../recv.bro >recv.out" +# @TEST-EXEC: btest-bg-run send "bro -b ../send.bro >send.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +@TEST-START-FILE recv.bro + +redef exit_only_after_terminate = T; + +global event_handler: event(msg: string, c: count); +global auto_event_handler: event(msg: string, c: count); + +event bro_init() + { + Comm::listen(9999/tcp, "127.0.0.1"); + Comm::subscribe_to_events("bro/event/"); + Comm::auto_event("bro/event/my_topic", auto_event_handler); + } + +global event_count = 0; + +event event_handler(msg: string, n: count) + { + event auto_event_handler(msg, n); + print "got event msg", msg, n; + local args = Comm::event_args(event_handler, "pong", event_count); + Comm::event("bro/event/my_topic", args); + ++event_count; + + if ( n == 5 ) + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE send.bro + +redef exit_only_after_terminate = T; + +global event_handler: event(msg: string, c: count); +global auto_event_handler: event(msg: string, c: count); + +event bro_init() + { + Comm::subscribe_to_events("bro/event/my_topic"); + Comm::connect("127.0.0.1", 9999/tcp, 1secs); + } + +global event_count = 0; + +event Comm::remote_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + print "Comm::remote_connection_established", peer_address, peer_port; + local args = Comm::event_args(event_handler, "ping", event_count); + Comm::event("bro/event/hi", args); + ++event_count; + } + +global done = F; +global done_auto = F; + +function check_terminate() + { + if ( done && done_auto ) + terminate(); + } + +event event_handler(msg: string, n: count) + { + print "got event msg", msg, n; + local args = Comm::event_args(event_handler, "ping", event_count); + Comm::event("bro/event/hi", args); + ++event_count; + + if ( n == 5 ) + { + done = T; + check_terminate(); + } + } + +event auto_event_handler(msg: string, n: count) + { + print "got auto event msg", msg, n; + + if ( n == 5 ) + { + done_auto = T; + check_terminate(); + } + } + +@TEST-END-FILE diff --git a/testing/btest/comm/remote_log.test b/testing/btest/comm/remote_log.test new file mode 100644 index 0000000000..aea88cdc25 --- /dev/null +++ b/testing/btest/comm/remote_log.test @@ -0,0 +1,87 @@ +# @TEST_SERIALIZE: brokercomm +# @TEST_REQUIRES: grep -q ENABLE_BROKER $BUILD/CMakeCache.txt + +# @TEST-EXEC: btest-bg-run recv "bro -b ../common.bro ../recv.bro >recv.out" +# @TEST-EXEC: btest-bg-run send "bro -b ../common.bro ../send.bro >send.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff recv/test.log +# @TEST-EXEC: btest-diff send/send.out +# @TEST-EXEC: btest-diff send/test.log + +@TEST-START-FILE common.bro + +module Test; + +export { + redef enum Log::ID += { LOG }; + + type Info: record { + msg: string &log; + num: count &log; + }; + + global log_test: event(rec: Test::Info); + + event bro_init() &priority=5 + { + Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]); + } +} + +@TEST-END-FILE + +@TEST-START-FILE recv.bro + +redef exit_only_after_terminate = T; + +event bro_init() + { + Comm::listen(9999/tcp, "127.0.0.1"); + Comm::subscribe_to_logs("bro/log/"); + } + +event Test::log_test(rec: Test::Info) + { + print "wrote log", rec; + + if ( rec$num == 5 ) + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE send.bro + +redef exit_only_after_terminate = T; + +event bro_init() + { + Comm::enable_remote_logs(Test::LOG); + Comm::connect("127.0.0.1", 9999/tcp, 1secs); + } + +global n = 0; + +event do_write() + { + if ( n == 6 ) + terminate(); + else + { + Log::write(Test::LOG, [$msg = "ping", $num = n]); + ++n; + event do_write(); + } + } + +event Comm::remote_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + print "Comm::remote_connection_established", peer_address, peer_port; + event do_write(); + } + +@TEST-END-FILE diff --git a/testing/btest/comm/remote_print.test b/testing/btest/comm/remote_print.test new file mode 100644 index 0000000000..48dfd98bed --- /dev/null +++ b/testing/btest/comm/remote_print.test @@ -0,0 +1,66 @@ +# @TEST_SERIALIZE: brokercomm +# @TEST_REQUIRES: grep -q ENABLE_BROKER $BUILD/CMakeCache.txt + +# @TEST-EXEC: btest-bg-run recv "bro -b ../recv.bro >recv.out" +# @TEST-EXEC: btest-bg-run send "bro -b ../send.bro >send.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +@TEST-START-FILE recv.bro + +redef exit_only_after_terminate = T; + +event bro_init() + { + Comm::listen(9999/tcp, "127.0.0.1"); + Comm::subscribe_to_prints("bro/print/"); + } + +global n = 0; + +event Comm::print_handler(msg: string) + { + print "got print msg", msg; + Comm::print("bro/print/my_topic", fmt("pong %d", n)); + ++n; + + if ( msg == "ping 5" ) + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE send.bro + +redef exit_only_after_terminate = T; + +event bro_init() + { + Comm::subscribe_to_prints("bro/print/my_topic"); + Comm::connect("127.0.0.1", 9999/tcp, 1secs); + } + +global n = 0; + +event Comm::remote_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + print "Comm::remote_connection_established", peer_address, peer_port; + Comm::print("bro/print/hi", fmt("ping %d", n)); + ++n; + } + +event Comm::print_handler(msg: string) + { + print "got print msg", msg; + Comm::print("bro/print/hi", fmt("ping %d", n)); + ++n; + + if ( msg == "pong 5" ) + terminate(); + } + +@TEST-END-FILE