diff --git a/aux/broker b/aux/broker index b0d97b1fcb..4fae86cd67 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit b0d97b1fcbdcb9027bd34031c8706be0c0ab315b +Subproject commit 4fae86cd67b999f48a2f2f354c91e4b1b343b2a1 diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 7db80ebb40..1dc7cc5415 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -8,6 +8,7 @@ #include "util.h" #include "Var.h" #include "Reporter.h" +#include "comm/comm.bif.h" #include "comm/data.bif.h" #include "comm/messaging.bif.h" #include "comm/store.bif.h" @@ -444,7 +445,8 @@ int comm::Manager::GetFlags(Val* flags) void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, iosource::FD_Set* except) { - read->Insert(endpoint->peer_status().fd()); + read->Insert(endpoint->outgoing_connection_status().fd()); + read->Insert(endpoint->incoming_connection_status().fd()); for ( const auto& ps : print_subscriptions ) read->Insert(ps.second.fd()); @@ -523,57 +525,85 @@ static RecordVal* response_to_val(broker::store::response r) void comm::Manager::Process() { bool idle = true; - auto peer_status_updates = endpoint->peer_status().want_pop(); + auto outgoing_connection_updates = + endpoint->outgoing_connection_status().want_pop(); + auto incoming_connection_updates = + endpoint->incoming_connection_status().want_pop(); - if ( ! peer_status_updates.empty() ) + for ( auto& u : outgoing_connection_updates ) + { idle = false; - for ( auto& u : peer_status_updates ) - { - if ( ! u.relation.remote() ) - continue; - switch ( u.status ) { - case broker::peer_status::tag::established: - if ( Comm::remote_connection_established ) + case broker::outgoing_connection_status::tag::established: + if ( Comm::outgoing_connection_established ) { val_list* vl = new val_list; vl->append(new StringVal(u.relation.remote_tuple().first)); vl->append(new PortVal(u.relation.remote_tuple().second, TRANSPORT_TCP)); vl->append(new StringVal(u.peer_name)); - mgr.QueueEvent(Comm::remote_connection_established, vl); + mgr.QueueEvent(Comm::outgoing_connection_established, vl); } - break; - case broker::peer_status::tag::disconnected: - if ( Comm::remote_connection_broken ) + case broker::outgoing_connection_status::tag::disconnected: + if ( Comm::outgoing_connection_broken ) { val_list* vl = new val_list; vl->append(new StringVal(u.relation.remote_tuple().first)); vl->append(new PortVal(u.relation.remote_tuple().second, TRANSPORT_TCP)); - mgr.QueueEvent(Comm::remote_connection_broken, vl); + mgr.QueueEvent(Comm::outgoing_connection_broken, vl); } - break; - case broker::peer_status::tag::incompatible: - if ( Comm::remote_connection_incompatible ) + case broker::outgoing_connection_status::tag::incompatible: + if ( Comm::outgoing_connection_incompatible ) { val_list* vl = new val_list; vl->append(new StringVal(u.relation.remote_tuple().first)); vl->append(new PortVal(u.relation.remote_tuple().second, TRANSPORT_TCP)); - mgr.QueueEvent(Comm::remote_connection_incompatible, vl); + mgr.QueueEvent(Comm::outgoing_connection_incompatible, vl); } - break; default: - reporter->InternalWarning("unknown broker::peer_status::tag : %d", - static_cast(u.status)); + reporter->InternalWarning( + "unknown broker::outgoing_connection_status::tag : %d", + static_cast(u.status)); + break; + } + } + + for ( auto& u : incoming_connection_updates ) + { + idle = false; + + switch ( u.status ) { + case broker::incoming_connection_status::tag::established: + if ( Comm::incoming_connection_established ) + { + val_list* vl = new val_list; + vl->append(new StringVal(u.peer_name)); + mgr.QueueEvent(Comm::incoming_connection_established, vl); + } + break; + + case broker::incoming_connection_status::tag::disconnected: + if ( Comm::incoming_connection_broken ) + { + val_list* vl = new val_list; + vl->append(new StringVal(u.peer_name)); + mgr.QueueEvent(Comm::incoming_connection_broken, vl); + } + break; + + default: + reporter->InternalWarning( + "unknown broker::incoming_connection_status::tag : %d", + static_cast(u.status)); break; } } diff --git a/src/comm/comm.bif b/src/comm/comm.bif index 7f8d85b720..e87c6c1144 100644 --- a/src/comm/comm.bif +++ b/src/comm/comm.bif @@ -11,3 +11,56 @@ function Comm::enable%(%): bool %{ return new Val(comm_mgr->Enable(), TYPE_BOOL); %} + +event Comm::outgoing_connection_established%(peer_address: string, + peer_port: port, + peer_name: string%); + +event Comm::outgoing_connection_broken%(peer_address: string, + peer_port: port%); + +event Comm::outgoing_connection_incompatible%(peer_address: string, + peer_port: port%); + +event Comm::incoming_connection_established%(peer_name: string%); + +event Comm::incoming_connection_broken%(peer_name: string%); + +function Comm::listen%(p: port, a: string &default = "", + reuse: bool &default = T%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("listen port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0, + reuse); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::connect%(a: string, p: port, retry: interval%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("remote connection port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Connect(a->CheckString(), p->Port(), + std::chrono::duration(retry)); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::disconnect%(a: string, p: port%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("remote connection port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port()); + return new Val(rval, TYPE_BOOL); + %} diff --git a/src/comm/messaging.bif b/src/comm/messaging.bif index f5034f842f..26f9497449 100644 --- a/src/comm/messaging.bif +++ b/src/comm/messaging.bif @@ -12,55 +12,6 @@ type Comm::SendFlags: record; type Comm::EventArgs: record; -event Comm::remote_connection_established%(peer_address: string, - peer_port: port, - peer_name: string%); - -event Comm::remote_connection_broken%(peer_address: string, - peer_port: port%); - -event Comm::remote_connection_incompatible%(peer_address: string, - peer_port: port%); - -function Comm::listen%(p: port, a: string &default = "", - reuse: bool &default = T%): bool - %{ - if ( ! p->IsTCP() ) - { - reporter->Error("listen port must use tcp"); - return new Val(false, TYPE_BOOL); - } - - auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0, - reuse); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::connect%(a: string, p: port, retry: interval%): bool - %{ - if ( ! p->IsTCP() ) - { - reporter->Error("remote connection port must use tcp"); - return new Val(false, TYPE_BOOL); - } - - auto rval = comm_mgr->Connect(a->CheckString(), p->Port(), - std::chrono::duration(retry)); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::disconnect%(a: string, p: port%): bool - %{ - if ( ! p->IsTCP() ) - { - reporter->Error("remote connection port must use tcp"); - return new Val(false, TYPE_BOOL); - } - - auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port()); - return new Val(rval, TYPE_BOOL); - %} - event Comm::print_handler%(msg: string%); function Comm::print%(topic: string, msg: string, diff --git a/testing/btest/Baseline/comm.connection_updates/recv.recv.out b/testing/btest/Baseline/comm.connection_updates/recv.recv.out new file mode 100644 index 0000000000..3f2a1a9670 --- /dev/null +++ b/testing/btest/Baseline/comm.connection_updates/recv.recv.out @@ -0,0 +1,2 @@ +Comm::incoming_connection_established, connector +Comm::incoming_connection_broken, connector diff --git a/testing/btest/Baseline/comm.connection_updates/send.send.out b/testing/btest/Baseline/comm.connection_updates/send.send.out new file mode 100644 index 0000000000..e23422e320 --- /dev/null +++ b/testing/btest/Baseline/comm.connection_updates/send.send.out @@ -0,0 +1 @@ +Comm::outgoing_connection_established, 127.0.0.1, 9999/tcp, listener diff --git a/testing/btest/Baseline/comm.remote_event/send.send.out b/testing/btest/Baseline/comm.remote_event/send.send.out index ef1f7bc7e1..9fbb21f245 100644 --- a/testing/btest/Baseline/comm.remote_event/send.send.out +++ b/testing/btest/Baseline/comm.remote_event/send.send.out @@ -1,4 +1,4 @@ -Comm::remote_connection_established, 127.0.0.1, 9999/tcp +Comm::outgoing_connection_established, 127.0.0.1, 9999/tcp got event msg, pong, 0 got auto event msg, ping, 0 got event msg, pong, 1 diff --git a/testing/btest/Baseline/comm.remote_log/send.send.out b/testing/btest/Baseline/comm.remote_log/send.send.out index 0968e6beb9..e2415290d6 100644 --- a/testing/btest/Baseline/comm.remote_log/send.send.out +++ b/testing/btest/Baseline/comm.remote_log/send.send.out @@ -1 +1 @@ -Comm::remote_connection_established, 127.0.0.1, 9999/tcp +Comm::outgoing_connection_established, 127.0.0.1, 9999/tcp diff --git a/testing/btest/Baseline/comm.remote_print/send.send.out b/testing/btest/Baseline/comm.remote_print/send.send.out index 982ee993f6..fc5996194d 100644 --- a/testing/btest/Baseline/comm.remote_print/send.send.out +++ b/testing/btest/Baseline/comm.remote_print/send.send.out @@ -1,4 +1,4 @@ -Comm::remote_connection_established, 127.0.0.1, 9999/tcp +Comm::outgoing_connection_established, 127.0.0.1, 9999/tcp got print msg, pong 0 got print msg, pong 1 got print msg, pong 2 diff --git a/testing/btest/comm/clone_store.bro b/testing/btest/comm/clone_store.bro index 3ea0347024..7a8ccb3a56 100644 --- a/testing/btest/comm/clone_store.bro +++ b/testing/btest/comm/clone_store.bro @@ -81,9 +81,9 @@ event done() terminate(); } -event Comm::remote_connection_established(peer_address: string, - peer_port: port, - peer_name: string) +event Comm::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) { local myset: set[string] = {"a", "b", "c"}; local myvec: vector of string = {"alpha", "beta", "gamma"}; diff --git a/testing/btest/comm/connection_updates.bro b/testing/btest/comm/connection_updates.bro new file mode 100644 index 0000000000..a1e8c517d2 --- /dev/null +++ b/testing/btest/comm/connection_updates.bro @@ -0,0 +1,55 @@ +# @TEST_SERIALIZE: brokercomm +# @TEST_REQUIRES: grep -q ENABLE_BROKER $BUILD/CMakeCache.txt + +# @TEST-EXEC: btest-bg-run recv "bro -b ../recv.bro >recv.out" +# @TEST-EXEC: btest-bg-run send "bro -b ../send.bro >send.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +@TEST-START-FILE recv.bro + +redef exit_only_after_terminate = T; +redef Comm::endpoint_name = "listener"; + +event bro_init() + { + Comm::enable(); + Comm::listen(9999/tcp, "127.0.0.1"); + } + +event Comm::incoming_connection_established(peer_name: string) + { + print "Comm::incoming_connection_established", peer_name;; + } + +event Comm::incoming_connection_broken(peer_name: string) + { + print "Comm::incoming_connection_broken", peer_name;; + terminate(); + } + +@TEST-END-FILE + +@TEST-START-FILE send.bro + +redef exit_only_after_terminate = T; +redef Comm::endpoint_name = "connector"; + +event bro_init() + { + Comm::enable(); + Comm::connect("127.0.0.1", 9999/tcp, 1sec); + } + +event Comm::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + print "Comm::outgoing_connection_established", + peer_address, peer_port, peer_name;; + terminate(); + } + +@TEST-END-FILE diff --git a/testing/btest/comm/remote_event.test b/testing/btest/comm/remote_event.test index f44ed0df10..fc34ad79ec 100644 --- a/testing/btest/comm/remote_event.test +++ b/testing/btest/comm/remote_event.test @@ -55,11 +55,11 @@ event bro_init() global event_count = 0; -event Comm::remote_connection_established(peer_address: string, - peer_port: port, - peer_name: string) +event Comm::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) { - print "Comm::remote_connection_established", peer_address, peer_port; + print "Comm::outgoing_connection_established", peer_address, peer_port; local args = Comm::event_args(event_handler, "ping", event_count); Comm::event("bro/event/hi", args); ++event_count; diff --git a/testing/btest/comm/remote_log.test b/testing/btest/comm/remote_log.test index 7cdc2ab97d..47227e2fba 100644 --- a/testing/btest/comm/remote_log.test +++ b/testing/btest/comm/remote_log.test @@ -77,11 +77,11 @@ event do_write() } } -event Comm::remote_connection_established(peer_address: string, - peer_port: port, - peer_name: string) +event Comm::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) { - print "Comm::remote_connection_established", peer_address, peer_port; + print "Comm::outgoing_connection_established", peer_address, peer_port; event do_write(); } diff --git a/testing/btest/comm/remote_print.test b/testing/btest/comm/remote_print.test index 03e7517f20..28e5bccc95 100644 --- a/testing/btest/comm/remote_print.test +++ b/testing/btest/comm/remote_print.test @@ -46,11 +46,11 @@ event bro_init() global n = 0; -event Comm::remote_connection_established(peer_address: string, - peer_port: port, - peer_name: string) +event Comm::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) { - print "Comm::remote_connection_established", peer_address, peer_port; + print "Comm::outgoing_connection_established", peer_address, peer_port; Comm::print("bro/print/hi", fmt("ping %d", n)); ++n; }