Merge remote-tracking branch 'origin/topic/jsiwek/ipv6-comm'

* origin/topic/jsiwek/ipv6-comm:
  Enable Bro to communicate with peers over non-global IPv6 addresses.
  Add unit tests for Broccoli SSL and Broccoli IPv6 connectivity.
  Remove AI_ADDRCONFIG getaddrinfo hints flag for listening sockets.
  Undo communication protocol version bump.
  Add support to Bro for connecting with peers over IPv6.

Closes #820.

Conflicts:
	src/bro.bif
This commit is contained in:
Robin Sommer 2012-05-24 17:01:34 -07:00
commit f7261a7851
31 changed files with 749 additions and 201 deletions

View file

@ -147,6 +147,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <unistd.h>
@ -172,6 +173,9 @@
#include <sys/resource.h>
#include <algorithm>
#include <string>
#include <sstream>
#include <vector>
#include "RemoteSerializer.h"
#include "Func.h"
@ -322,6 +326,18 @@ static const char* msgToStr(int msg)
}
}
static vector<string> tokenize(const string& s, char delim)
{
vector<string> tokens;
stringstream ss(s);
string token;
while ( std::getline(ss, token, delim) )
tokens.push_back(token);
return tokens;
}
// Start of every message between two processes. We do the low-level work
// ourselves to make this 64-bit safe. (The actual layout is an artifact of
// an earlier design that depended on how a 32-bit GCC lays out its structs ...)
@ -458,17 +474,6 @@ static inline char* fmt_uint32s(int nargs, va_list ap)
}
#endif
static inline const char* ip2a(uint32 ip)
{
static char buffer[32];
struct in_addr addr;
addr.s_addr = htonl(ip);
return bro_inet_ntop(AF_INET, &addr, buffer, 32);
}
static pid_t child_pid = 0;
// Return true if message type is sent by a peer (rather than the child
@ -675,7 +680,8 @@ void RemoteSerializer::Fork()
}
RemoteSerializer::PeerID RemoteSerializer::Connect(const IPAddr& ip,
uint16 port, const char* our_class, double retry, bool use_ssl)
const string& zone_id, uint16 port, const char* our_class, double retry,
bool use_ssl)
{
if ( ! using_communication )
return true;
@ -683,24 +689,22 @@ RemoteSerializer::PeerID RemoteSerializer::Connect(const IPAddr& ip,
if ( ! initialized )
reporter->InternalError("remote serializer not initialized");
if ( ip.GetFamily() == IPv6 )
Error("inter-Bro communication not supported over IPv6");
const uint32* bytes;
ip.GetBytes(&bytes);
uint32 ip4 = ntohl(*bytes);
if ( ! child_pid )
Fork();
Peer* p = AddPeer(ip4, port);
Peer* p = AddPeer(ip, port);
p->orig = true;
if ( our_class )
p->our_class = our_class;
if ( ! SendToChild(MSG_CONNECT_TO, p, 5, p->id,
ip4, port, uint32(retry), use_ssl) )
const size_t BUFSIZE = 1024;
char* data = new char[BUFSIZE];
snprintf(data, BUFSIZE, "%"PRIu64",%s,%s,%"PRIu16",%"PRIu32",%d", p->id,
ip.AsString().c_str(), zone_id.c_str(), port, uint32(retry),
use_ssl);
if ( ! SendToChild(MSG_CONNECT_TO, p, data) )
{
RemovePeer(p);
return false;
@ -1232,7 +1236,8 @@ bool RemoteSerializer::SendCapabilities(Peer* peer)
return caps ? SendToChild(MSG_CAPS, peer, 3, caps, 0, 0) : true;
}
bool RemoteSerializer::Listen(const IPAddr& ip, uint16 port, bool expect_ssl)
bool RemoteSerializer::Listen(const IPAddr& ip, uint16 port, bool expect_ssl,
bool ipv6, const string& zone_id, double retry)
{
if ( ! using_communication )
return true;
@ -1240,14 +1245,18 @@ bool RemoteSerializer::Listen(const IPAddr& ip, uint16 port, bool expect_ssl)
if ( ! initialized )
reporter->InternalError("remote serializer not initialized");
if ( ip.GetFamily() == IPv6 )
Error("inter-Bro communication not supported over IPv6");
if ( ! ipv6 && ip.GetFamily() == IPv6 &&
ip != IPAddr("0.0.0.0") && ip != IPAddr("::") )
reporter->FatalError("Attempt to listen on address %s, but IPv6 "
"communication disabled", ip.AsString().c_str());
const uint32* bytes;
ip.GetBytes(&bytes);
uint32 ip4 = ntohl(*bytes);
const size_t BUFSIZE = 1024;
char* data = new char[BUFSIZE];
snprintf(data, BUFSIZE, "%s,%"PRIu16",%d,%d,%s,%"PRIu32,
ip.AsString().c_str(), port, expect_ssl, ipv6, zone_id.c_str(),
(uint32) retry);
if ( ! SendToChild(MSG_LISTEN, 0, 3, ip4, port, expect_ssl) )
if ( ! SendToChild(MSG_LISTEN, 0, data) )
return false;
listening = true;
@ -1784,7 +1793,7 @@ RecordVal* RemoteSerializer::MakePeerVal(Peer* peer)
RecordVal* v = new RecordVal(::peer);
v->Assign(0, new Val(uint32(peer->id), TYPE_COUNT));
// Sic! Network order for AddrVal, host order for PortVal.
v->Assign(1, new AddrVal(htonl(peer->ip)));
v->Assign(1, new AddrVal(peer->ip));
v->Assign(2, new PortVal(peer->port, TRANSPORT_TCP));
v->Assign(3, new Val(false, TYPE_BOOL));
v->Assign(4, new StringVal("")); // set when received
@ -1793,8 +1802,8 @@ RecordVal* RemoteSerializer::MakePeerVal(Peer* peer)
return v;
}
RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port,
PeerID id)
RemoteSerializer::Peer* RemoteSerializer::AddPeer(const IPAddr& ip, uint16 port,
PeerID id)
{
Peer* peer = new Peer;
peer->id = id != PEER_NONE ? id : id_counter++;
@ -1959,9 +1968,22 @@ bool RemoteSerializer::EnterPhaseRunning(Peer* peer)
bool RemoteSerializer::ProcessConnected()
{
// IP and port follow.
uint32* args = (uint32*) current_args->data;
uint32 host = ntohl(args[0]); // ### Fix: only works for IPv4
uint16 port = (uint16) ntohl(args[1]);
vector<string> args = tokenize(current_args->data, ',');
if ( args.size() != 2 )
{
InternalCommError("ProcessConnected() bad number of arguments");
return false;
}
IPAddr host = IPAddr(args[0]);
uint16 port;
if ( ! atoi_n(args[1].size(), args[1].c_str(), 0, 10, port) )
{
InternalCommError("ProcessConnected() bad peer port string");
return false;
}
if ( ! current_peer )
{
@ -2980,7 +3002,8 @@ void RemoteSerializer::Log(LogLevel level, const char* msg, Peer* peer,
if ( peer )
len += snprintf(buffer + len, sizeof(buffer) - len, "[#%d/%s:%d] ",
int(peer->id), ip2a(peer->ip), peer->port);
int(peer->id), peer->ip.AsURIString().c_str(),
peer->port);
len += safe_snprintf(buffer + len, sizeof(buffer) - len, "%s", msg);
@ -3266,8 +3289,10 @@ SocketComm::SocketComm()
terminating = false;
killing = false;
listen_fd_clear = -1;
listen_fd_ssl = -1;
listen_port = 0;
listen_ssl = false;
enable_ipv6 = false;
bind_retry_interval = 0;
listen_next_try = 0;
// We don't want to use the signal handlers of our parent.
@ -3290,8 +3315,7 @@ SocketComm::~SocketComm()
delete peers[i]->io;
delete io;
close(listen_fd_clear);
close(listen_fd_ssl);
CloseListenFDs();
}
static unsigned int first_rtime = 0;
@ -3340,20 +3364,13 @@ void SocketComm::Run()
}
if ( listen_next_try && time(0) > listen_next_try )
Listen(listen_if, listen_port, listen_ssl);
Listen();
if ( listen_fd_clear >= 0 )
for ( size_t i = 0; i < listen_fds.size(); ++i )
{
FD_SET(listen_fd_clear, &fd_read);
if ( listen_fd_clear > max_fd )
max_fd = listen_fd_clear;
}
if ( listen_fd_ssl >= 0 )
{
FD_SET(listen_fd_ssl, &fd_read);
if ( listen_fd_ssl > max_fd )
max_fd = listen_fd_ssl;
FD_SET(listen_fds[i], &fd_read);
if ( listen_fds[i] > max_fd )
max_fd = listen_fds[i];
}
if ( io->IsFillingUp() && ! shutting_conns_down )
@ -3442,12 +3459,9 @@ void SocketComm::Run()
}
}
if ( listen_fd_clear >= 0 &&
FD_ISSET(listen_fd_clear, &fd_read) )
AcceptConnection(listen_fd_clear);
if ( listen_fd_ssl >= 0 && FD_ISSET(listen_fd_ssl, &fd_read) )
AcceptConnection(listen_fd_ssl);
for ( size_t i = 0; i < listen_fds.size(); ++i )
if ( FD_ISSET(listen_fds[i], &fd_read) )
AcceptConnection(listen_fds[i]);
// Hack to display CPU usage of the child, triggered via
// SIGPROF.
@ -3571,13 +3585,8 @@ bool SocketComm::DoParentMessage()
case MSG_LISTEN_STOP:
{
if ( listen_fd_ssl >= 0 )
close(listen_fd_ssl);
CloseListenFDs();
if ( listen_fd_clear >= 0 )
close(listen_fd_clear);
listen_fd_clear = listen_fd_ssl = -1;
Log("stopped listening");
return true;
@ -3717,14 +3726,43 @@ bool SocketComm::ForwardChunkToPeer()
bool SocketComm::ProcessConnectTo()
{
assert(parent_args);
uint32* args = (uint32*) parent_args->data;
vector<string> args = tokenize(parent_args->data, ',');
if ( args.size() != 6 )
{
Error(fmt("ProcessConnectTo() bad number of arguments"));
return false;
}
Peer* peer = new Peer;
peer->id = ntohl(args[0]);
peer->ip = ntohl(args[1]);
peer->port = ntohl(args[2]);
peer->retry = ntohl(args[3]);
peer->ssl = ntohl(args[4]);
if ( ! atoi_n(args[0].size(), args[0].c_str(), 0, 10, peer->id) )
{
Error(fmt("ProccessConnectTo() bad peer id string"));
delete peer;
return false;
}
peer->ip = IPAddr(args[1]);
peer->zone_id = args[2];
if ( ! atoi_n(args[3].size(), args[3].c_str(), 0, 10, peer->port) )
{
Error(fmt("ProcessConnectTo() bad peer port string"));
delete peer;
return false;
}
if ( ! atoi_n(args[4].size(), args[4].c_str(), 0, 10, peer->retry) )
{
Error(fmt("ProcessConnectTo() bad peer retry string"));
delete peer;
return false;
}
peer->ssl = false;
if ( args[5] != "0" )
peer->ssl = true;
return Connect(peer);
}
@ -3732,13 +3770,39 @@ bool SocketComm::ProcessConnectTo()
bool SocketComm::ProcessListen()
{
assert(parent_args);
uint32* args = (uint32*) parent_args->data;
vector<string> args = tokenize(parent_args->data, ',');
uint32 addr = ntohl(args[0]);
uint16 port = uint16(ntohl(args[1]));
uint32 ssl = ntohl(args[2]);
if ( args.size() != 6 )
{
Error(fmt("ProcessListen() bad number of arguments"));
return false;
}
return Listen(addr, port, ssl);
listen_if = args[0];
if ( ! atoi_n(args[1].size(), args[1].c_str(), 0, 10, listen_port) )
{
Error(fmt("ProcessListen() bad peer port string"));
return false;
}
listen_ssl = false;
if ( args[2] != "0" )
listen_ssl = true;
enable_ipv6 = false;
if ( args[3] != "0" )
enable_ipv6 = true;
listen_zone_id = args[4];
if ( ! atoi_n(args[5].size(), args[5].c_str(), 0, 10, bind_retry_interval) )
{
Error(fmt("ProcessListen() bad peer port string"));
return false;
}
return Listen();
}
bool SocketComm::ProcessParentCompress()
@ -3900,29 +3964,54 @@ bool SocketComm::ProcessPeerCompress(Peer* peer)
bool SocketComm::Connect(Peer* peer)
{
struct sockaddr_in server;
int status;
addrinfo hints, *res, *res0;
bzero(&hints, sizeof(hints));
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
if ( sockfd < 0 )
hints.ai_family = PF_UNSPEC;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_NUMERICHOST;
char port_str[16];
modp_uitoa10(peer->port, port_str);
string gaihostname(peer->ip.AsString());
if ( peer->zone_id != "" )
gaihostname.append("%").append(peer->zone_id);
status = getaddrinfo(gaihostname.c_str(), port_str, &hints, &res0);
if ( status != 0 )
{
Error(fmt("can't create socket, %s", strerror(errno)));
Error(fmt("getaddrinfo error: %s", gai_strerror(status)));
return false;
}
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(peer->port);
server.sin_addr.s_addr = htonl(peer->ip);
bool connected = true;
if ( connect(sockfd, (sockaddr*) &server, sizeof(server)) < 0 )
int sockfd = -1;
for ( res = res0; res; res = res->ai_next )
{
Error(fmt("connect failed: %s", strerror(errno)), peer);
close(sockfd);
connected = false;
sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if ( sockfd < 0 )
{
Error(fmt("can't create connect socket, %s", strerror(errno)));
continue;
}
if ( connect(sockfd, res->ai_addr, res->ai_addrlen) < 0 )
{
Error(fmt("connect failed: %s", strerror(errno)), peer);
close(sockfd);
sockfd = -1;
continue;
}
break;
}
freeaddrinfo(res0);
bool connected = sockfd != -1;
if ( ! (connected || peer->retry) )
{
CloseConnection(peer, false);
@ -3947,9 +4036,7 @@ bool SocketComm::Connect(Peer* peer)
if ( connected )
{
if ( peer->ssl )
{
peer->io = new ChunkedIOSSL(sockfd, false);
}
else
peer->io = new ChunkedIOFd(sockfd, "child->peer");
@ -3964,7 +4051,13 @@ bool SocketComm::Connect(Peer* peer)
if ( connected )
{
Log("connected", peer);
if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) )
const size_t BUFSIZE = 1024;
char* data = new char[BUFSIZE];
snprintf(data, BUFSIZE, "%s,%"PRIu32, peer->ip.AsString().c_str(),
peer->port);
if ( ! SendToParent(MSG_CONNECTED, peer, data) )
return false;
}
@ -4001,86 +4094,148 @@ bool SocketComm::CloseConnection(Peer* peer, bool reconnect)
return true;
}
bool SocketComm::Listen(uint32 ip, uint16 port, bool expect_ssl)
bool SocketComm::Listen()
{
int* listen_fd = expect_ssl ? &listen_fd_ssl : &listen_fd_clear;
int status, on = 1;
addrinfo hints, *res, *res0;
bzero(&hints, sizeof(hints));
if ( *listen_fd >= 0 )
close(*listen_fd);
IPAddr listen_ip(listen_if);
struct sockaddr_in server;
*listen_fd = socket(PF_INET, SOCK_STREAM, 0);
if ( *listen_fd < 0 )
if ( enable_ipv6 )
{
Error(fmt("can't create listen socket, %s",
strerror(errno)));
if ( listen_ip == IPAddr("0.0.0.0") || listen_ip == IPAddr("::") )
hints.ai_family = PF_UNSPEC;
else
hints.ai_family = (listen_ip.GetFamily() == IPv4 ? PF_INET : PF_INET6);
}
else
hints.ai_family = PF_INET;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE | AI_NUMERICHOST;
char port_str[16];
modp_uitoa10(listen_port, port_str);
string scoped_addr(listen_if);
if ( listen_zone_id != "" )
scoped_addr.append("%").append(listen_zone_id);
const char* addr_str = 0;
if ( listen_ip != IPAddr("0.0.0.0") && listen_ip != IPAddr("::") )
addr_str = scoped_addr.c_str();
CloseListenFDs();
if ( (status = getaddrinfo(addr_str, port_str, &hints, &res0)) != 0 )
{
Error(fmt("getaddrinfo error: %s", gai_strerror(status)));
return false;
}
// Set SO_REUSEADDR.
int turn_on = 1;
if ( setsockopt(*listen_fd, SOL_SOCKET, SO_REUSEADDR,
&turn_on, sizeof(turn_on)) < 0 )
for ( res = res0; res; res = res->ai_next )
{
Error(fmt("can't set SO_REUSEADDR, %s",
strerror(errno)));
return false;
}
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = htonl(ip);
if ( bind(*listen_fd, (sockaddr*) &server, sizeof(server)) < 0 )
{
Error(fmt("can't bind to port %d, %s", port, strerror(errno)));
close(*listen_fd);
*listen_fd = -1;
if ( errno == EADDRINUSE )
if ( res->ai_family != AF_INET && res->ai_family != AF_INET6 )
{
listen_if = ip;
listen_port = port;
listen_ssl = expect_ssl;
// FIXME: Make this timeout configurable.
listen_next_try = time(0) + 30;
Error(fmt("can't create listen socket: unknown address family, %d",
res->ai_family));
continue;
}
return false;
IPAddr a = (res->ai_family == AF_INET) ?
IPAddr(((sockaddr_in*)res->ai_addr)->sin_addr) :
IPAddr(((sockaddr_in6*)res->ai_addr)->sin6_addr);
string l_addr_str(a.AsURIString());
if ( listen_zone_id != "")
l_addr_str.append("%").append(listen_zone_id);
int fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if ( fd < 0 )
{
Error(fmt("can't create listen socket, %s", strerror(errno)));
continue;
}
if ( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0 )
Error(fmt("can't set SO_REUSEADDR, %s", strerror(errno)));
// For IPv6 listening sockets, we don't want do dual binding to also
// get IPv4-mapped addresses because that's not as portable. e.g.
// many BSDs don't allow that.
if ( res->ai_family == AF_INET6 &&
setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) < 0 )
Error(fmt("can't set IPV6_V6ONLY, %s", strerror(errno)));
if ( bind(fd, res->ai_addr, res->ai_addrlen) < 0 )
{
Error(fmt("can't bind to %s:%s, %s", l_addr_str.c_str(),
port_str, strerror(errno)));
close(fd);
if ( errno == EADDRINUSE )
{
// Abandon completely this attempt to set up listening sockets,
// try again later.
CloseListenFDs();
listen_next_try = time(0) + bind_retry_interval;
return false;
}
continue;
}
if ( listen(fd, 50) < 0 )
{
Error(fmt("can't listen on %s:%s, %s", l_addr_str.c_str(),
port_str, strerror(errno)));
close(fd);
continue;
}
listen_fds.push_back(fd);
Log(fmt("listening on %s:%s (%s)", l_addr_str.c_str(), port_str,
listen_ssl ? "ssl" : "clear"));
}
if ( listen(*listen_fd, 50) < 0 )
{
Error(fmt("can't listen, %s", strerror(errno)));
return false;
}
freeaddrinfo(res0);
listen_next_try = 0;
Log(fmt("listening on %s:%d (%s)",
ip2a(ip), port, expect_ssl ? "ssl" : "clear"));
return true;
return listen_fds.size() > 0;
}
bool SocketComm::AcceptConnection(int fd)
{
sockaddr_in client;
sockaddr_storage client;
socklen_t len = sizeof(client);
int clientfd = accept(fd, (sockaddr*) &client, &len);
if ( clientfd < 0 )
{
Error(fmt("accept failed, %s %d",
strerror(errno), errno));
Error(fmt("accept failed, %s %d", strerror(errno), errno));
return false;
}
if ( client.ss_family != AF_INET && client.ss_family != AF_INET6 )
{
Error(fmt("accept fail, unknown address family %d", client.ss_family));
close(clientfd);
return false;
}
Peer* peer = new Peer;
peer->id = id_counter++;
peer->ip = ntohl(client.sin_addr.s_addr);
peer->port = ntohs(client.sin_port);
peer->ip = client.ss_family == AF_INET ?
IPAddr(((sockaddr_in*)&client)->sin_addr) :
IPAddr(((sockaddr_in6*)&client)->sin6_addr);
peer->port = client.ss_family == AF_INET ?
ntohs(((sockaddr_in*)&client)->sin_port) :
ntohs(((sockaddr_in6*)&client)->sin6_port);
peer->connected = true;
peer->ssl = (fd == listen_fd_ssl);
peer->ssl = listen_ssl;
peer->compressor = false;
if ( peer->ssl )
@ -4090,8 +4245,7 @@ bool SocketComm::AcceptConnection(int fd)
if ( ! peer->io->Init() )
{
Error(fmt("can't init peer io: %s",
peer->io->Error()), false);
Error(fmt("can't init peer io: %s", peer->io->Error()), false);
return false;
}
@ -4099,7 +4253,12 @@ bool SocketComm::AcceptConnection(int fd)
Log(fmt("accepted %s connection", peer->ssl ? "SSL" : "clear"), peer);
if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) )
const size_t BUFSIZE = 1024;
char* data = new char[BUFSIZE];
snprintf(data, BUFSIZE, "%s,%"PRIu32, peer->ip.AsString().c_str(),
peer->port);
if ( ! SendToParent(MSG_CONNECTED, peer, data) )
return false;
return true;
@ -4116,13 +4275,27 @@ const char* SocketComm::MakeLogString(const char* msg, Peer* peer)
int len = 0;
if ( peer )
{
string scoped_addr(peer->ip.AsURIString());
if ( peer->zone_id != "" )
scoped_addr.append("%").append(peer->zone_id);
len = snprintf(buffer, BUFSIZE, "[#%d/%s:%d] ", int(peer->id),
ip2a(peer->ip), peer->port);
scoped_addr.c_str(), peer->port);
}
len += safe_snprintf(buffer + len, BUFSIZE - len, "%s", msg);
return buffer;
}
void SocketComm::CloseListenFDs()
{
for ( size_t i = 0; i < listen_fds.size(); ++i )
close(listen_fds[i]);
listen_fds.clear();
}
void SocketComm::Error(const char* msg, bool kill_me)
{
if ( kill_me )
@ -4165,7 +4338,7 @@ void SocketComm::Log(const char* msg, Peer* peer)
void SocketComm::InternalError(const char* msg)
{
fprintf(stderr, "interal error in child: %s\n", msg);
fprintf(stderr, "internal error in child: %s\n", msg);
Kill();
}
@ -4180,8 +4353,7 @@ void SocketComm::Kill()
LogProf();
Log("terminating");
close(listen_fd_clear);
close(listen_fd_ssl);
CloseListenFDs();
kill(getpid(), SIGTERM);