Add support to Bro for connecting with peers over IPv6.

- Communication::listen_ipv6 needs to be redef'd to true in order
  for IPv6 listening sockets to be opened.

- Added Communication::listen_retry option as an interval at which
  to retry binding to socket addresses that were already in use.

- Added some explicit baselines to check in the istate.events
  and istate.events-ssl tests -- the SSL test was incorrectly
  passing because it compared two empty files.  (The files being
  empty because "http/base" was given as an argument to Bro which
  it couldn't handle because that script doesn't exist anymore).
This commit is contained in:
Jon Siwek 2012-05-09 15:08:36 -05:00
parent ed9801db98
commit 1e66fe905a
20 changed files with 480 additions and 174 deletions

View file

@ -2,6 +2,7 @@
##! and/or transfer events. ##! and/or transfer events.
@load base/frameworks/packet-filter @load base/frameworks/packet-filter
@load base/utils/addrs
module Communication; module Communication;
@ -10,7 +11,7 @@ export {
## The communication logging stream identifier. ## The communication logging stream identifier.
redef enum Log::ID += { LOG }; redef enum Log::ID += { LOG };
## Which interface to listen on (0.0.0.0 for any interface). ## Which interface to listen on (``0.0.0.0`` or ``[::]`` are wildcards).
const listen_interface = 0.0.0.0 &redef; const listen_interface = 0.0.0.0 &redef;
## Which port to listen on. ## Which port to listen on.
@ -19,6 +20,14 @@ export {
## This defines if a listening socket should use SSL. ## This defines if a listening socket should use SSL.
const listen_ssl = F &redef; const listen_ssl = F &redef;
## Defines if a listening socket can bind to IPv6 addresses.
const listen_ipv6 = F &redef;
## Defines the interval at which to retry binding to
## :bro:id:`listen_interface` on :bro:id:`listen_port` if it's already in
## use.
const listen_retry = 30 secs &redef;
## Default compression level. Compression level is 0-9, with 0 = no ## Default compression level. Compression level is 0-9, with 0 = no
## compression. ## compression.
global compression_level = 0 &redef; global compression_level = 0 &redef;
@ -160,7 +169,7 @@ event remote_log(level: count, src: count, msg: string)
# This is a core generated event. # This is a core generated event.
event remote_log_peer(p: event_peer, level: count, src: count, msg: string) event remote_log_peer(p: event_peer, level: count, src: count, msg: string)
{ {
local rmsg = fmt("[#%d/%s:%d] %s", p$id, p$host, p$p, msg); local rmsg = fmt("[#%d/%s:%d] %s", p$id, addr_to_uri(p$host), p$p, msg);
do_script_log_common(level, src, rmsg); do_script_log_common(level, src, rmsg);
} }

View file

@ -98,3 +98,18 @@ function find_ip_addresses(input: string): string_array
} }
return output; return output;
} }
## Returns the string representation of an IP address suitable for inclusion
## in a URI. For IPv4, this does no special formatting, but for IPv6, the
## address is included in square brackets.
##
## a: the address to make suitable for URI inclusion.
##
## Returns: the string representation of *a* suitable for URI inclusion.
function addr_to_uri(a: addr): string
{
if ( is_v4_addr(a) )
return fmt("%s", a);
else
return fmt("[%s]", a);
}

View file

@ -8,5 +8,6 @@ module Communication;
event bro_init() &priority=-10 event bro_init() &priority=-10
{ {
enable_communication(); enable_communication();
listen(listen_interface, listen_port, listen_ssl); listen(listen_interface, listen_port, listen_ssl, listen_ipv6,
listen_retry);
} }

View file

@ -188,11 +188,16 @@ public:
* IPv4 to IPv6 address mapping to return a full 16 bytes. * IPv4 to IPv6 address mapping to return a full 16 bytes.
* *
* @param bytes The pointer to a memory location in which the * @param bytes The pointer to a memory location in which the
* raw bytes of the address are to be copied in network byte-order. * raw bytes of the address are to be copied.
*
* @param order The byte-order in which the returned raw bytes are copied.
* The default is network order.
*/ */
void CopyIPv6(uint32_t* bytes) const void CopyIPv6(uint32_t* bytes, ByteOrder order = Network) const
{ {
memcpy(bytes, in6.s6_addr, sizeof(in6.s6_addr)); memcpy(bytes, in6.s6_addr, sizeof(in6.s6_addr));
if ( order == Host )
for ( unsigned int i = 0; i < 4; ++i ) bytes[i] = ntohl(bytes[i]);
} }
/** /**
@ -280,6 +285,19 @@ public:
*/ */
string AsString() const; string AsString() const;
/**
* Returns a string representation of the address suitable for inclusion
* in an URI. For IPv4 addresses, this is the same as AsString(), but
* IPv6 addresses are encased in square brackets.
*/
string AsURIString() const
{
if ( GetFamily() == IPv4 )
return AsString();
else
return string("[") + AsString() + "]";
}
/** /**
* Returns a host-order, plain hex string representation of the address. * Returns a host-order, plain hex string representation of the address.
*/ */

View file

@ -147,6 +147,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <unistd.h> #include <unistd.h>
@ -195,7 +196,7 @@ extern "C" {
// Gets incremented each time there's an incompatible change // Gets incremented each time there's an incompatible change
// to the communication internals. // to the communication internals.
static const unsigned short PROTOCOL_VERSION = 0x07; static const unsigned short PROTOCOL_VERSION = 0x08;
static const char MSG_NONE = 0x00; static const char MSG_NONE = 0x00;
static const char MSG_VERSION = 0x01; static const char MSG_VERSION = 0x01;
@ -458,17 +459,6 @@ static inline char* fmt_uint32s(int nargs, va_list ap)
} }
#endif #endif
static inline const char* ip2a(uint32 ip)
{
static char buffer[32];
struct in_addr addr;
addr.s_addr = htonl(ip);
return bro_inet_ntop(AF_INET, &addr, buffer, 32);
}
static pid_t child_pid = 0; static pid_t child_pid = 0;
// Return true if message type is sent by a peer (rather than the child // Return true if message type is sent by a peer (rather than the child
@ -683,24 +673,20 @@ RemoteSerializer::PeerID RemoteSerializer::Connect(const IPAddr& ip,
if ( ! initialized ) if ( ! initialized )
reporter->InternalError("remote serializer not initialized"); reporter->InternalError("remote serializer not initialized");
if ( ip.GetFamily() == IPv6 )
Error("inter-Bro communication not supported over IPv6");
const uint32* bytes;
ip.GetBytes(&bytes);
uint32 ip4 = ntohl(*bytes);
if ( ! child_pid ) if ( ! child_pid )
Fork(); Fork();
Peer* p = AddPeer(ip4, port); Peer* p = AddPeer(ip, port);
p->orig = true; p->orig = true;
if ( our_class ) if ( our_class )
p->our_class = our_class; p->our_class = our_class;
if ( ! SendToChild(MSG_CONNECT_TO, p, 5, p->id, uint32 bytes[4];
ip4, port, uint32(retry), use_ssl) ) ip.CopyIPv6(bytes, IPAddr::Host);
if ( ! SendToChild(MSG_CONNECT_TO, p, 8, p->id, bytes[0], bytes[1],
bytes[2], bytes[3], port, uint32(retry), use_ssl) )
{ {
RemovePeer(p); RemovePeer(p);
return false; return false;
@ -1232,7 +1218,8 @@ bool RemoteSerializer::SendCapabilities(Peer* peer)
return caps ? SendToChild(MSG_CAPS, peer, 3, caps, 0, 0) : true; return caps ? SendToChild(MSG_CAPS, peer, 3, caps, 0, 0) : true;
} }
bool RemoteSerializer::Listen(const IPAddr& ip, uint16 port, bool expect_ssl) bool RemoteSerializer::Listen(const IPAddr& ip, uint16 port, bool expect_ssl,
bool ipv6, double retry)
{ {
if ( ! using_communication ) if ( ! using_communication )
return true; return true;
@ -1240,14 +1227,15 @@ bool RemoteSerializer::Listen(const IPAddr& ip, uint16 port, bool expect_ssl)
if ( ! initialized ) if ( ! initialized )
reporter->InternalError("remote serializer not initialized"); reporter->InternalError("remote serializer not initialized");
if ( ip.GetFamily() == IPv6 ) if ( ! ipv6 && ip.GetFamily() == IPv6 &&
Error("inter-Bro communication not supported over IPv6"); ip != IPAddr("0.0.0.0") && ip != IPAddr("::") )
reporter->FatalError("Attempt to listen on address %s, but IPv6 communication disabled", ip.AsString().c_str());
const uint32* bytes; uint32 bytes[4];
ip.GetBytes(&bytes); ip.CopyIPv6(bytes, IPAddr::Host);
uint32 ip4 = ntohl(*bytes);
if ( ! SendToChild(MSG_LISTEN, 0, 3, ip4, port, expect_ssl) ) if ( ! SendToChild(MSG_LISTEN, 0, 8, bytes[0], bytes[1], bytes[2], bytes[3],
port, expect_ssl, ipv6, (uint32) retry) )
return false; return false;
listening = true; listening = true;
@ -1784,7 +1772,7 @@ RecordVal* RemoteSerializer::MakePeerVal(Peer* peer)
RecordVal* v = new RecordVal(::peer); RecordVal* v = new RecordVal(::peer);
v->Assign(0, new Val(uint32(peer->id), TYPE_COUNT)); v->Assign(0, new Val(uint32(peer->id), TYPE_COUNT));
// Sic! Network order for AddrVal, host order for PortVal. // Sic! Network order for AddrVal, host order for PortVal.
v->Assign(1, new AddrVal(htonl(peer->ip))); v->Assign(1, new AddrVal(peer->ip));
v->Assign(2, new PortVal(peer->port, TRANSPORT_TCP)); v->Assign(2, new PortVal(peer->port, TRANSPORT_TCP));
v->Assign(3, new Val(false, TYPE_BOOL)); v->Assign(3, new Val(false, TYPE_BOOL));
v->Assign(4, new StringVal("")); // set when received v->Assign(4, new StringVal("")); // set when received
@ -1793,7 +1781,7 @@ RecordVal* RemoteSerializer::MakePeerVal(Peer* peer)
return v; return v;
} }
RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port, RemoteSerializer::Peer* RemoteSerializer::AddPeer(const IPAddr& ip, uint16 port,
PeerID id) PeerID id)
{ {
Peer* peer = new Peer; Peer* peer = new Peer;
@ -1960,8 +1948,8 @@ bool RemoteSerializer::ProcessConnected()
{ {
// IP and port follow. // IP and port follow.
uint32* args = (uint32*) current_args->data; uint32* args = (uint32*) current_args->data;
uint32 host = ntohl(args[0]); // ### Fix: only works for IPv4 IPAddr host = IPAddr(IPv6, args, IPAddr::Network);
uint16 port = (uint16) ntohl(args[1]); uint16 port = (uint16) ntohl(args[4]);
if ( ! current_peer ) if ( ! current_peer )
{ {
@ -2980,7 +2968,8 @@ void RemoteSerializer::Log(LogLevel level, const char* msg, Peer* peer,
if ( peer ) if ( peer )
len += snprintf(buffer + len, sizeof(buffer) - len, "[#%d/%s:%d] ", len += snprintf(buffer + len, sizeof(buffer) - len, "[#%d/%s:%d] ",
int(peer->id), ip2a(peer->ip), peer->port); int(peer->id), peer->ip.AsURIString().c_str(),
peer->port);
len += safe_snprintf(buffer + len, sizeof(buffer) - len, "%s", msg); len += safe_snprintf(buffer + len, sizeof(buffer) - len, "%s", msg);
@ -3266,8 +3255,10 @@ SocketComm::SocketComm()
terminating = false; terminating = false;
killing = false; killing = false;
listen_fd_clear = -1; listen_port = 0;
listen_fd_ssl = -1; listen_ssl = false;
enable_ipv6 = false;
bind_retry_interval = 0;
listen_next_try = 0; listen_next_try = 0;
// We don't want to use the signal handlers of our parent. // We don't want to use the signal handlers of our parent.
@ -3290,8 +3281,7 @@ SocketComm::~SocketComm()
delete peers[i]->io; delete peers[i]->io;
delete io; delete io;
close(listen_fd_clear); CloseListenFDs();
close(listen_fd_ssl);
} }
static unsigned int first_rtime = 0; static unsigned int first_rtime = 0;
@ -3340,20 +3330,13 @@ void SocketComm::Run()
} }
if ( listen_next_try && time(0) > listen_next_try ) if ( listen_next_try && time(0) > listen_next_try )
Listen(listen_if, listen_port, listen_ssl); Listen();
if ( listen_fd_clear >= 0 ) for ( size_t i = 0; i < listen_fds.size(); ++i )
{ {
FD_SET(listen_fd_clear, &fd_read); FD_SET(listen_fds[i], &fd_read);
if ( listen_fd_clear > max_fd ) if ( listen_fds[i] > max_fd )
max_fd = listen_fd_clear; max_fd = listen_fds[i];
}
if ( listen_fd_ssl >= 0 )
{
FD_SET(listen_fd_ssl, &fd_read);
if ( listen_fd_ssl > max_fd )
max_fd = listen_fd_ssl;
} }
if ( io->IsFillingUp() && ! shutting_conns_down ) if ( io->IsFillingUp() && ! shutting_conns_down )
@ -3442,12 +3425,9 @@ void SocketComm::Run()
} }
} }
if ( listen_fd_clear >= 0 && for ( size_t i = 0; i < listen_fds.size(); ++i )
FD_ISSET(listen_fd_clear, &fd_read) ) if ( FD_ISSET(listen_fds[i], &fd_read) )
AcceptConnection(listen_fd_clear); AcceptConnection(listen_fds[i]);
if ( listen_fd_ssl >= 0 && FD_ISSET(listen_fd_ssl, &fd_read) )
AcceptConnection(listen_fd_ssl);
// Hack to display CPU usage of the child, triggered via // Hack to display CPU usage of the child, triggered via
// SIGPROF. // SIGPROF.
@ -3571,13 +3551,8 @@ bool SocketComm::DoParentMessage()
case MSG_LISTEN_STOP: case MSG_LISTEN_STOP:
{ {
if ( listen_fd_ssl >= 0 ) CloseListenFDs();
close(listen_fd_ssl);
if ( listen_fd_clear >= 0 )
close(listen_fd_clear);
listen_fd_clear = listen_fd_ssl = -1;
Log("stopped listening"); Log("stopped listening");
return true; return true;
@ -3721,10 +3696,10 @@ bool SocketComm::ProcessConnectTo()
Peer* peer = new Peer; Peer* peer = new Peer;
peer->id = ntohl(args[0]); peer->id = ntohl(args[0]);
peer->ip = ntohl(args[1]); peer->ip = IPAddr(IPv6, &args[1], IPAddr::Network);
peer->port = ntohl(args[2]); peer->port = ntohl(args[5]);
peer->retry = ntohl(args[3]); peer->retry = ntohl(args[6]);
peer->ssl = ntohl(args[4]); peer->ssl = ntohl(args[7]);
return Connect(peer); return Connect(peer);
} }
@ -3734,11 +3709,13 @@ bool SocketComm::ProcessListen()
assert(parent_args); assert(parent_args);
uint32* args = (uint32*) parent_args->data; uint32* args = (uint32*) parent_args->data;
uint32 addr = ntohl(args[0]); listen_if = IPAddr(IPv6, args, IPAddr::Network);
uint16 port = uint16(ntohl(args[1])); listen_port = uint16(ntohl(args[4]));
uint32 ssl = ntohl(args[2]); listen_ssl = ntohl(args[5]) != 0;
enable_ipv6 = ntohl(args[6]) != 0;
bind_retry_interval = ntohl(args[7]);
return Listen(addr, port, ssl); return Listen();
} }
bool SocketComm::ProcessParentCompress() bool SocketComm::ProcessParentCompress()
@ -3900,29 +3877,53 @@ bool SocketComm::ProcessPeerCompress(Peer* peer)
bool SocketComm::Connect(Peer* peer) bool SocketComm::Connect(Peer* peer)
{ {
struct sockaddr_in server; int status;
addrinfo hints, *res, *res0;
bzero(&hints, sizeof(hints));
int sockfd = socket(PF_INET, SOCK_STREAM, 0); hints.ai_family = PF_UNSPEC;
if ( sockfd < 0 ) hints.ai_protocol = IPPROTO_TCP;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_NUMERICHOST;
char port_str[16];
modp_uitoa10(peer->port, port_str);
// TODO: better to accept string arguments from the user to pass into
// getaddrinfo? This might make it easier to explicitly connect to
// non-global IPv6 addresses with a scope zone identifier (RFC 4007).
status = getaddrinfo(peer->ip.AsString().c_str(), port_str, &hints, &res0);
if ( status != 0 )
{ {
Error(fmt("can't create socket, %s", strerror(errno))); Error(fmt("getaddrinfo error: %s", gai_strerror(status)));
return false; return false;
} }
bzero(&server, sizeof(server)); int sockfd = -1;
server.sin_family = AF_INET; for ( res = res0; res; res = res->ai_next )
server.sin_port = htons(peer->port); {
server.sin_addr.s_addr = htonl(peer->ip); sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if ( sockfd < 0 )
{
Error(fmt("can't create connect socket, %s", strerror(errno)));
continue;
}
bool connected = true; if ( connect(sockfd, res->ai_addr, res->ai_addrlen) < 0 )
if ( connect(sockfd, (sockaddr*) &server, sizeof(server)) < 0 )
{ {
Error(fmt("connect failed: %s", strerror(errno)), peer); Error(fmt("connect failed: %s", strerror(errno)), peer);
close(sockfd); close(sockfd);
connected = false; sockfd = -1;
continue;
} }
break;
}
freeaddrinfo(res0);
bool connected = sockfd != -1;
if ( ! (connected || peer->retry) ) if ( ! (connected || peer->retry) )
{ {
CloseConnection(peer, false); CloseConnection(peer, false);
@ -3947,9 +3948,7 @@ bool SocketComm::Connect(Peer* peer)
if ( connected ) if ( connected )
{ {
if ( peer->ssl ) if ( peer->ssl )
{
peer->io = new ChunkedIOSSL(sockfd, false); peer->io = new ChunkedIOSSL(sockfd, false);
}
else else
peer->io = new ChunkedIOFd(sockfd, "child->peer"); peer->io = new ChunkedIOFd(sockfd, "child->peer");
@ -3964,7 +3963,12 @@ bool SocketComm::Connect(Peer* peer)
if ( connected ) if ( connected )
{ {
Log("connected", peer); Log("connected", peer);
if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) )
uint32 bytes[4];
peer->ip.CopyIPv6(bytes, IPAddr::Host);
if ( ! SendToParent(MSG_CONNECTED, peer, 5, bytes[0], bytes[1],
bytes[2], bytes[3], peer->port) )
return false; return false;
} }
@ -4001,86 +4005,139 @@ bool SocketComm::CloseConnection(Peer* peer, bool reconnect)
return true; return true;
} }
bool SocketComm::Listen(uint32 ip, uint16 port, bool expect_ssl) bool SocketComm::Listen()
{ {
int* listen_fd = expect_ssl ? &listen_fd_ssl : &listen_fd_clear; int status, on = 1;
addrinfo hints, *res, *res0;
bzero(&hints, sizeof(hints));
if ( *listen_fd >= 0 ) if ( enable_ipv6 )
close(*listen_fd);
struct sockaddr_in server;
*listen_fd = socket(PF_INET, SOCK_STREAM, 0);
if ( *listen_fd < 0 )
{ {
Error(fmt("can't create listen socket, %s", if ( listen_if == IPAddr("0.0.0.0") || listen_if == IPAddr("::") )
strerror(errno))); hints.ai_family = PF_UNSPEC;
else
hints.ai_family = listen_if.GetFamily() == IPv4 ? PF_INET : PF_INET6;
}
else
hints.ai_family = PF_INET;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG | AI_NUMERICHOST;
char port_str[16];
modp_uitoa10(listen_port, port_str);
const char* addr_str = 0;
if ( listen_if != IPAddr("0.0.0.0") && listen_if != IPAddr("::") )
addr_str = listen_if.AsString().c_str();
CloseListenFDs();
// TODO: better to accept string arguments from the user to pass into
// getaddrinfo? This might make it easier to explicitly bind to a
// non-global IPv6 address with a scope zone identifier (RFC 4007).
if ( (status = getaddrinfo(addr_str, port_str, &hints, &res0)) != 0 )
{
Error(fmt("getaddrinfo error: %s", gai_strerror(status)));
return false; return false;
} }
// Set SO_REUSEADDR. for ( res = res0; res; res = res->ai_next )
int turn_on = 1;
if ( setsockopt(*listen_fd, SOL_SOCKET, SO_REUSEADDR,
&turn_on, sizeof(turn_on)) < 0 )
{ {
Error(fmt("can't set SO_REUSEADDR, %s", if ( res->ai_family != AF_INET && res->ai_family != AF_INET6 )
strerror(errno))); {
return false; Error(fmt("can't create listen socket: unknown address family, %d",
res->ai_family));
continue;
} }
bzero(&server, sizeof(server)); IPAddr a = res->ai_family == AF_INET ?
server.sin_family = AF_INET; IPAddr(((sockaddr_in*)res->ai_addr)->sin_addr) :
server.sin_port = htons(port); IPAddr(((sockaddr_in6*)res->ai_addr)->sin6_addr);
server.sin_addr.s_addr = htonl(ip);
if ( bind(*listen_fd, (sockaddr*) &server, sizeof(server)) < 0 ) int fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if ( fd < 0 )
{ {
Error(fmt("can't bind to port %d, %s", port, strerror(errno))); Error(fmt("can't create listen socket, %s", strerror(errno)));
close(*listen_fd); continue;
*listen_fd = -1; }
if ( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0 )
Error(fmt("can't set SO_REUSEADDR, %s", strerror(errno)));
// For IPv6 listening sockets, we don't want do dual binding to also
// get IPv4-mapped addresses because that's not as portable. e.g.
// many BSDs don't allow that.
if ( res->ai_family == AF_INET6 &&
setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) < 0 )
Error(fmt("can't set IPV6_V6ONLY, %s", strerror(errno)));
if ( bind(fd, res->ai_addr, res->ai_addrlen) < 0 )
{
Error(fmt("can't bind to %s:%s, %s", a.AsURIString().c_str(),
port_str, strerror(errno)));
close(fd);
if ( errno == EADDRINUSE ) if ( errno == EADDRINUSE )
{ {
listen_if = ip; // Abandon completely this attempt to set up listening sockets,
listen_port = port; // try again later.
listen_ssl = expect_ssl; CloseListenFDs();
// FIXME: Make this timeout configurable. listen_next_try = time(0) + bind_retry_interval;
listen_next_try = time(0) + 30;
}
return false; return false;
} }
continue;
}
if ( listen(*listen_fd, 50) < 0 ) if ( listen(fd, 50) < 0 )
{ {
Error(fmt("can't listen, %s", strerror(errno))); Error(fmt("can't listen on %s:%s, %s", a.AsURIString().c_str(),
return false; port_str, strerror(errno)));
close(fd);
continue;
} }
listen_fds.push_back(fd);
Log(fmt("listening on %s:%s (%s)", a.AsURIString().c_str(), port_str,
listen_ssl ? "ssl" : "clear"));
}
freeaddrinfo(res0);
listen_next_try = 0; listen_next_try = 0;
Log(fmt("listening on %s:%d (%s)", return listen_fds.size() > 0;
ip2a(ip), port, expect_ssl ? "ssl" : "clear"));
return true;
} }
bool SocketComm::AcceptConnection(int fd) bool SocketComm::AcceptConnection(int fd)
{ {
sockaddr_in client; sockaddr_storage client;
socklen_t len = sizeof(client); socklen_t len = sizeof(client);
int clientfd = accept(fd, (sockaddr*) &client, &len); int clientfd = accept(fd, (sockaddr*) &client, &len);
if ( clientfd < 0 ) if ( clientfd < 0 )
{ {
Error(fmt("accept failed, %s %d", Error(fmt("accept failed, %s %d", strerror(errno), errno));
strerror(errno), errno)); return false;
}
if ( client.ss_family != AF_INET && client.ss_family != AF_INET6 )
{
Error(fmt("accept fail, unknown address family %d", client.ss_family));
close(clientfd);
return false; return false;
} }
Peer* peer = new Peer; Peer* peer = new Peer;
peer->id = id_counter++; peer->id = id_counter++;
peer->ip = ntohl(client.sin_addr.s_addr); peer->ip = client.ss_family == AF_INET ?
peer->port = ntohs(client.sin_port); IPAddr(((sockaddr_in*)&client)->sin_addr) :
IPAddr(((sockaddr_in6*)&client)->sin6_addr);
peer->port = client.ss_family == AF_INET ?
ntohs(((sockaddr_in*)&client)->sin_port) :
ntohs(((sockaddr_in6*)&client)->sin6_port);
peer->connected = true; peer->connected = true;
peer->ssl = (fd == listen_fd_ssl); peer->ssl = listen_ssl;
peer->compressor = false; peer->compressor = false;
if ( peer->ssl ) if ( peer->ssl )
@ -4090,8 +4147,7 @@ bool SocketComm::AcceptConnection(int fd)
if ( ! peer->io->Init() ) if ( ! peer->io->Init() )
{ {
Error(fmt("can't init peer io: %s", Error(fmt("can't init peer io: %s", peer->io->Error()), false);
peer->io->Error()), false);
return false; return false;
} }
@ -4099,7 +4155,11 @@ bool SocketComm::AcceptConnection(int fd)
Log(fmt("accepted %s connection", peer->ssl ? "SSL" : "clear"), peer); Log(fmt("accepted %s connection", peer->ssl ? "SSL" : "clear"), peer);
if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) ) uint32 bytes[4];
peer->ip.CopyIPv6(bytes, IPAddr::Host);
if ( ! SendToParent(MSG_CONNECTED, peer, 5, bytes[0], bytes[1], bytes[2],
bytes[3], peer->port) )
return false; return false;
return true; return true;
@ -4117,12 +4177,19 @@ const char* SocketComm::MakeLogString(const char* msg, Peer* peer)
if ( peer ) if ( peer )
len = snprintf(buffer, BUFSIZE, "[#%d/%s:%d] ", int(peer->id), len = snprintf(buffer, BUFSIZE, "[#%d/%s:%d] ", int(peer->id),
ip2a(peer->ip), peer->port); peer->ip.AsURIString().c_str(), peer->port);
len += safe_snprintf(buffer + len, BUFSIZE - len, "%s", msg); len += safe_snprintf(buffer + len, BUFSIZE - len, "%s", msg);
return buffer; return buffer;
} }
void SocketComm::CloseListenFDs()
{
for ( size_t i = 0; i < listen_fds.size(); ++i )
close(listen_fds[i]);
listen_fds.clear();
}
void SocketComm::Error(const char* msg, bool kill_me) void SocketComm::Error(const char* msg, bool kill_me)
{ {
if ( kill_me ) if ( kill_me )
@ -4165,7 +4232,7 @@ void SocketComm::Log(const char* msg, Peer* peer)
void SocketComm::InternalError(const char* msg) void SocketComm::InternalError(const char* msg)
{ {
fprintf(stderr, "interal error in child: %s\n", msg); fprintf(stderr, "internal error in child: %s\n", msg);
Kill(); Kill();
} }
@ -4180,8 +4247,7 @@ void SocketComm::Kill()
LogProf(); LogProf();
Log("terminating"); Log("terminating");
close(listen_fd_clear); CloseListenFDs();
close(listen_fd_ssl);
kill(getpid(), SIGTERM); kill(getpid(), SIGTERM);

View file

@ -10,8 +10,7 @@
#include "Stats.h" #include "Stats.h"
#include "File.h" #include "File.h"
// All IP arguments are in host byte-order. #include <vector>
// FIXME: Change this to network byte order
class IncrementalSendTimer; class IncrementalSendTimer;
@ -63,7 +62,8 @@ public:
bool CompleteHandshake(PeerID peer); bool CompleteHandshake(PeerID peer);
// Start to listen. // Start to listen.
bool Listen(const IPAddr& ip, uint16 port, bool expect_ssl); bool Listen(const IPAddr& ip, uint16 port, bool expect_ssl, bool ipv6,
double retry);
// Stop it. // Stop it.
bool StopListening(); bool StopListening();
@ -179,9 +179,7 @@ protected:
struct Peer { struct Peer {
PeerID id; // Unique ID (non-zero) per peer. PeerID id; // Unique ID (non-zero) per peer.
// ### Fix: currently, we only work for IPv4. IPAddr ip;
// addr_type ip;
uint32 ip;
uint16 port; uint16 port;
handler_list handlers; handler_list handlers;
@ -277,7 +275,7 @@ protected:
bool ProcessLogWrite(); bool ProcessLogWrite();
bool ProcessRequestLogs(); bool ProcessRequestLogs();
Peer* AddPeer(uint32 ip, uint16 port, PeerID id = PEER_NONE); Peer* AddPeer(const IPAddr& ip, uint16 port, PeerID id = PEER_NONE);
Peer* LookupPeer(PeerID id, bool only_if_connected); Peer* LookupPeer(PeerID id, bool only_if_connected);
void RemovePeer(Peer* peer); void RemovePeer(Peer* peer);
bool IsConnectedPeer(PeerID id); bool IsConnectedPeer(PeerID id);
@ -412,7 +410,6 @@ protected:
{ {
id = 0; id = 0;
io = 0; io = 0;
ip = 0;
port = 0; port = 0;
state = 0; state = 0;
connected = false; connected = false;
@ -424,7 +421,7 @@ protected:
RemoteSerializer::PeerID id; RemoteSerializer::PeerID id;
ChunkedIO* io; ChunkedIO* io;
uint32 ip; IPAddr ip;
uint16 port; uint16 port;
char state; char state;
bool connected; bool connected;
@ -437,7 +434,7 @@ protected:
bool compressor; bool compressor;
}; };
bool Listen(uint32 ip, uint16 port, bool expect_ssl); bool Listen();
bool AcceptConnection(int listen_fd); bool AcceptConnection(int listen_fd);
bool Connect(Peer* peer); bool Connect(Peer* peer);
bool CloseConnection(Peer* peer, bool reconnect); bool CloseConnection(Peer* peer, bool reconnect);
@ -482,6 +479,9 @@ protected:
bool ForwardChunkToPeer(); bool ForwardChunkToPeer();
const char* MakeLogString(const char* msg, Peer *peer); const char* MakeLogString(const char* msg, Peer *peer);
// Closes all file descriptors associated with listening sockets.
void CloseListenFDs();
// Peers we are communicating with: // Peers we are communicating with:
declare(PList, Peer); declare(PList, Peer);
typedef PList(Peer) peer_list; typedef PList(Peer) peer_list;
@ -498,14 +498,15 @@ protected:
char parent_msgtype; char parent_msgtype;
ChunkedIO::Chunk* parent_args; ChunkedIO::Chunk* parent_args;
int listen_fd_clear; vector<int> listen_fds;
int listen_fd_ssl;
// If the port we're trying to bind to is already in use, we will retry // If the port we're trying to bind to is already in use, we will retry
// it regularly. // it regularly.
uint32 listen_if; // Fix: only supports IPv4 IPAddr listen_if;
uint16 listen_port; uint16 listen_port;
bool listen_ssl; bool listen_ssl;
bool enable_ipv6; // allow IPv6 listen sockets
uint32 bind_retry_interval;
time_t listen_next_try; time_t listen_next_try;
bool shutting_conns_down; bool shutting_conns_down;
bool terminating; bool terminating;

View file

@ -5402,12 +5402,17 @@ function set_compression_level%(p: event_peer, level: count%) : bool
## ##
## ssl: If true, Bro uses SSL to encrypt the session. ## ssl: If true, Bro uses SSL to encrypt the session.
## ##
## ipv6: If true, enable listening on IPv6 addresses.
##
## retry_interval: If address *ip* is found to be already in use, this is
## the interval at which to automatically retry binding.
##
## Returns: True on success. ## Returns: True on success.
## ##
## .. bro:see:: connect disconnect ## .. bro:see:: connect disconnect
function listen%(ip: addr, p: port, ssl: bool %) : bool function listen%(ip: addr, p: port, ssl: bool, ipv6: bool, retry_interval: interval%) : bool
%{ %{
return new Val(remote_serializer->Listen(ip->AsAddr(), p->Port(), ssl), TYPE_BOOL); return new Val(remote_serializer->Listen(ip->AsAddr(), p->Port(), ssl, ipv6, retry_interval), TYPE_BOOL);
%} %}
## Checks whether the last raised event came from a remote peer. ## Checks whether the last raised event came from a remote peer.

View file

@ -0,0 +1 @@
handshake done with peer: ::1

View file

@ -0,0 +1,2 @@
handshake done with peer: ::1
my_event: hello world

View file

@ -0,0 +1,33 @@
http_request
http_begin_entity
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_end_entity
http_message_done
http_signature_found
http_reply
http_begin_entity
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_end_entity
http_message_done

View file

@ -0,0 +1,33 @@
http_request
http_begin_entity
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_end_entity
http_message_done
http_signature_found
http_reply
http_begin_entity
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_end_entity
http_message_done

View file

@ -5,4 +5,4 @@
#path http #path http
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file
#types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file #types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file
1324314406.995958 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - - 1336588614.060989 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - -

View file

@ -5,4 +5,4 @@
#path http #path http
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file
#types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file #types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file
1324314406.995958 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - - 1336588614.060989 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - -

View file

@ -0,0 +1,33 @@
http_request
http_begin_entity
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_end_entity
http_message_done
http_signature_found
http_reply
http_begin_entity
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_end_entity
http_message_done

View file

@ -0,0 +1,33 @@
http_request
http_begin_entity
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_end_entity
http_message_done
http_signature_found
http_reply
http_begin_entity
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_header
http_all_headers
http_content_type
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_entity_data
http_end_entity
http_message_done

View file

@ -5,4 +5,4 @@
#path http #path http
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file
#types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file #types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file
1324314415.616486 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - - 1336587178.164598 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - -

View file

@ -5,4 +5,4 @@
#path http #path http
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p trans_depth method host uri referrer user_agent request_body_len response_body_len status_code status_msg info_code info_msg filename tags username password proxied mime_type md5 extraction_file
#types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file #types time string addr port addr port count string string string string string count count count string count string string table[enum] string string table[string] string string file
1324314415.616486 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - - 1336587178.164598 arKYeMETxOg 141.42.64.125 56730 125.190.109.199 80 1 GET www.icir.org / - Wget/1.10 0 9130 200 OK - - - (empty) - - - text/html - -

View file

@ -0,0 +1,52 @@
# @TEST-GROUP: comm
#
# @TEST-REQUIRES: ifconfig | grep -q "inet6 ::1"
#
# @TEST-EXEC: btest-bg-run recv bro -b ../recv.bro
# @TEST-EXEC: btest-bg-run send bro -b ../send.bro
# @TEST-EXEC: btest-bg-wait -k 20
#
# @TEST-EXEC: btest-diff recv/.stdout
# @TEST-EXEC: btest-diff send/.stdout
@TEST-START-FILE send.bro
@load base/frameworks/communication
redef Communication::nodes += {
["foo"] = [$host=[::1], $connect=T, $events=/my_event/]
};
global my_event: event(s: string);
event remote_connection_handshake_done(p: event_peer)
{
print fmt("handshake done with peer: %s", p$host);
}
event my_event(s: string)
{
print fmt("my_event: %s", s);
terminate();
}
@TEST-END-FILE
#############
@TEST-START-FILE recv.bro
@load frameworks/communication/listen
redef Communication::listen_ipv6=T;
global my_event: event(s: string);
event remote_connection_handshake_done(p: event_peer)
{
print fmt("handshake done with peer: %s", p$host);
event my_event("hello world");
terminate();
}
@TEST-END-FILE

View file

@ -8,8 +8,10 @@
# @TEST-EXEC: btest-diff receiver/http.log # @TEST-EXEC: btest-diff receiver/http.log
# @TEST-EXEC: cmp sender/http.log receiver/http.log # @TEST-EXEC: cmp sender/http.log receiver/http.log
# #
# @TEST-EXEC: bro -x sender/events.bst http/base | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log # @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log
# @TEST-EXEC: bro -x receiver/events.bst http/base | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log # @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log
# @TEST-EXEC: btest-diff events.rec.log
# @TEST-EXEC: btest-diff events.snd.log
# @TEST-EXEC: cmp events.rec.log events.snd.log # @TEST-EXEC: cmp events.rec.log events.snd.log
# #
# We don't compare the transmitted event paramerters anymore. With the dynamic # We don't compare the transmitted event paramerters anymore. With the dynamic

View file

@ -10,6 +10,8 @@
# #
# @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log # @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log
# @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log # @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log
# @TEST-EXEC: btest-diff events.rec.log
# @TEST-EXEC: btest-diff events.snd.log
# @TEST-EXEC: cmp events.rec.log events.snd.log # @TEST-EXEC: cmp events.rec.log events.snd.log
# #
# We don't compare the transmitted event paramerters anymore. With the dynamic # We don't compare the transmitted event paramerters anymore. With the dynamic