broker integration: add events for incoming connection status updates

e.g. for the listen() side of connections to tell when peers have
connected or disconnected.
This commit is contained in:
Jon Siwek 2015-02-09 15:48:42 -06:00
parent 0253f49a94
commit afc5767165
14 changed files with 182 additions and 90 deletions

View file

@ -8,6 +8,7 @@
#include "util.h"
#include "Var.h"
#include "Reporter.h"
#include "comm/comm.bif.h"
#include "comm/data.bif.h"
#include "comm/messaging.bif.h"
#include "comm/store.bif.h"
@ -444,7 +445,8 @@ int comm::Manager::GetFlags(Val* flags)
void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
read->Insert(endpoint->peer_status().fd());
read->Insert(endpoint->outgoing_connection_status().fd());
read->Insert(endpoint->incoming_connection_status().fd());
for ( const auto& ps : print_subscriptions )
read->Insert(ps.second.fd());
@ -523,57 +525,85 @@ static RecordVal* response_to_val(broker::store::response r)
void comm::Manager::Process()
{
bool idle = true;
auto peer_status_updates = endpoint->peer_status().want_pop();
auto outgoing_connection_updates =
endpoint->outgoing_connection_status().want_pop();
auto incoming_connection_updates =
endpoint->incoming_connection_status().want_pop();
if ( ! peer_status_updates.empty() )
for ( auto& u : outgoing_connection_updates )
{
idle = false;
for ( auto& u : peer_status_updates )
{
if ( ! u.relation.remote() )
continue;
switch ( u.status ) {
case broker::peer_status::tag::established:
if ( Comm::remote_connection_established )
case broker::outgoing_connection_status::tag::established:
if ( Comm::outgoing_connection_established )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.relation.remote_tuple().first));
vl->append(new PortVal(u.relation.remote_tuple().second,
TRANSPORT_TCP));
vl->append(new StringVal(u.peer_name));
mgr.QueueEvent(Comm::remote_connection_established, vl);
mgr.QueueEvent(Comm::outgoing_connection_established, vl);
}
break;
case broker::peer_status::tag::disconnected:
if ( Comm::remote_connection_broken )
case broker::outgoing_connection_status::tag::disconnected:
if ( Comm::outgoing_connection_broken )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.relation.remote_tuple().first));
vl->append(new PortVal(u.relation.remote_tuple().second,
TRANSPORT_TCP));
mgr.QueueEvent(Comm::remote_connection_broken, vl);
mgr.QueueEvent(Comm::outgoing_connection_broken, vl);
}
break;
case broker::peer_status::tag::incompatible:
if ( Comm::remote_connection_incompatible )
case broker::outgoing_connection_status::tag::incompatible:
if ( Comm::outgoing_connection_incompatible )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.relation.remote_tuple().first));
vl->append(new PortVal(u.relation.remote_tuple().second,
TRANSPORT_TCP));
mgr.QueueEvent(Comm::remote_connection_incompatible, vl);
mgr.QueueEvent(Comm::outgoing_connection_incompatible, vl);
}
break;
default:
reporter->InternalWarning("unknown broker::peer_status::tag : %d",
static_cast<int>(u.status));
reporter->InternalWarning(
"unknown broker::outgoing_connection_status::tag : %d",
static_cast<int>(u.status));
break;
}
}
for ( auto& u : incoming_connection_updates )
{
idle = false;
switch ( u.status ) {
case broker::incoming_connection_status::tag::established:
if ( Comm::incoming_connection_established )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.peer_name));
mgr.QueueEvent(Comm::incoming_connection_established, vl);
}
break;
case broker::incoming_connection_status::tag::disconnected:
if ( Comm::incoming_connection_broken )
{
val_list* vl = new val_list;
vl->append(new StringVal(u.peer_name));
mgr.QueueEvent(Comm::incoming_connection_broken, vl);
}
break;
default:
reporter->InternalWarning(
"unknown broker::incoming_connection_status::tag : %d",
static_cast<int>(u.status));
break;
}
}

View file

@ -11,3 +11,56 @@ function Comm::enable%(%): bool
%{
return new Val(comm_mgr->Enable(), TYPE_BOOL);
%}
event Comm::outgoing_connection_established%(peer_address: string,
peer_port: port,
peer_name: string%);
event Comm::outgoing_connection_broken%(peer_address: string,
peer_port: port%);
event Comm::outgoing_connection_incompatible%(peer_address: string,
peer_port: port%);
event Comm::incoming_connection_established%(peer_name: string%);
event Comm::incoming_connection_broken%(peer_name: string%);
function Comm::listen%(p: port, a: string &default = "",
reuse: bool &default = T%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("listen port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0,
reuse);
return new Val(rval, TYPE_BOOL);
%}
function Comm::connect%(a: string, p: port, retry: interval%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Connect(a->CheckString(), p->Port(),
std::chrono::duration<double>(retry));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disconnect%(a: string, p: port%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port());
return new Val(rval, TYPE_BOOL);
%}

View file

@ -12,55 +12,6 @@ type Comm::SendFlags: record;
type Comm::EventArgs: record;
event Comm::remote_connection_established%(peer_address: string,
peer_port: port,
peer_name: string%);
event Comm::remote_connection_broken%(peer_address: string,
peer_port: port%);
event Comm::remote_connection_incompatible%(peer_address: string,
peer_port: port%);
function Comm::listen%(p: port, a: string &default = "",
reuse: bool &default = T%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("listen port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0,
reuse);
return new Val(rval, TYPE_BOOL);
%}
function Comm::connect%(a: string, p: port, retry: interval%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Connect(a->CheckString(), p->Port(),
std::chrono::duration<double>(retry));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disconnect%(a: string, p: port%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port());
return new Val(rval, TYPE_BOOL);
%}
event Comm::print_handler%(msg: string%);
function Comm::print%(topic: string, msg: string,