zeek/src/RemoteSerializer.cc
Robin Sommer 2f7fa3470b Merge remote branch 'origin/topic/seth/fix-compiler-warnings'
* origin/topic/seth/fix-compiler-warnings:
  Fixed problem with PRI macros.
  PRI macros are currently not working for some reason.
  Two more small compile time error fixes.
  Cleaned up the output from running binpac.
  Added line to expect shift/reduce errors in parse.in
  Cleaned up g++ warnings.

Addition: I fixed a few more warnings I was getting, and tweaked some
of the existing changes slightly.
2011-02-09 08:10:41 -08:00

3910 lines
82 KiB
C++

// $Id: RemoteSerializer.cc 6951 2009-12-04 22:23:28Z vern $
//
// Processes involved in the communication:
//
// (Local-Parent) <-> (Local-Child) <-> (Remote-Child) <-> (Remote-Parent)
//
// Message types (for parent<->child communication the CMsg's peer indicates
// about whom we're talking).
//
// Communication protocol version
// VERSION <version> <cache_size> <data-format-version>
// <run-time> [<class:string>]
//
// Send serialization
// SERIAL <serialization>
//
// Terminate(d) connection
// CLOSE
//
// Close(d) all connections
// CLOSE_ALL
//
// Connect to remote side
// CONNECT_TO <id-of-new-peer> <ip> <port> <retry-interval> <use-ssl>
//
// Connected to remote side
// CONNECTED <ip> <port>
//
// Request events from remote side
// REQUEST_EVENTS <list of events>
//
// Request synchronization of IDs with remote side
// REQUEST_SYNC <authorative:bool>
//
// Listen for connection on ip/port (ip may be INADDR_ANY)
// LISTEN <ip> <port> <use_ssl>
//
// Close listen ports.
// LISTEN_STOP
//
// Error caused by host
// ERROR <msg>
//
// Some statistics about the given peer connection
// STATS <string>
//
// Requests to set a new capture_filter
// CAPTURE_FILTER <string>
//
// Ping to peer
// PING <struct ping_args>
//
// Pong from peer
// PONG <struct ping_args>
//
// Announce our capabilities
// CAPS <flags> <reserved> <reserved>
//
// Activate compression (parent->child)
// COMPRESS <level>
//
// Indicate that all following blocks are compressed (child->child)
// COMPRESS
//
// Synchronize for pseudo-realtime processing.
// Signals that we have reached sync-point number <count>.
// SYNC_POINT <count>
//
// Signals the child that we want to terminate. Anything sent after this may
// get lost. When the child answers with another TERMINATE it is safe to
// shutdown.
// TERMINATE
//
// Debug-only: tell child to dump recently received/sent data to disk.
// DEBUG_DUMP
//
// Valid messages between processes:
//
// Main -> Child
// CONNECT_TO
// REQUEST_EVENTS
// SERIAL
// CLOSE
// CLOSE_ALL
// LISTEN
// LISTEN_STOP
// CAPTURE_FILTER
// VERSION
// REQUEST_SYNC
// PHASE_DONE
// PING
// PONG
// CAPS
// COMPRESS
// SYNC_POINT
// DEBUG_DUMP
// REMOTE_PRINT
//
// Child -> Main
// CONNECTED
// REQUEST_EVENTS
// SERIAL
// CLOSE
// ERROR
// STATS
// VERSION
// CAPTURE_FILTER
// REQUEST_SYNC
// PHASE_DONE
// PING
// PONG
// CAPS
// LOG
// SYNC_POINT
// REMOTE_PRINT
//
// Child <-> Child
// VERSION
// SERIAL
// REQUEST_EVENTS
// CAPTURE_FILTER
// REQUEST_SYNC
// PHASE_DONE
// PING
// PONG
// CAPS
// COMPRESS
// SYNC_POINT
// REMOTE_PRINT
//
// A connection between two peers has four phases:
//
// Setup:
// Initial phase.
// VERSION messages must be exchanged.
// Ends when both peers have sent VERSION.
// Handshake:
// REQUEST_EVENTS/REQUEST_SYNC/CAPTURE_FILTER/CAPS/selected SERIALs
// may be exchanged.
// Phase ends when both peers have sent PHASE_DONE.
// State synchronization:
// Entered iff at least one of the peers has sent REQUEST_SYNC.
// The peer with the smallest runtime (incl. in VERSION msg) sends
// SERIAL messages compromising all of its state.
// Phase ends when peer sends another PHASE_DONE.
// Running:
// Peers exchange SERIAL (and PING/PONG) messages.
// Phase ends with connection tear-down by one of the peers.
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <signal.h>
#include <strings.h>
#include <stdarg.h>
#include "config.h"
#ifdef TIME_WITH_SYS_TIME
# include <sys/time.h>
# include <time.h>
#else
# ifdef HAVE_SYS_TIME_H
# include <sys/time.h>
# else
# include <time.h>
# endif
#endif
#include <sys/resource.h>
#include "RemoteSerializer.h"
#include "Func.h"
#include "EventRegistry.h"
#include "Event.h"
#include "Net.h"
#include "NetVar.h"
#include "Scope.h"
#include "Sessions.h"
#include "File.h"
#include "Conn.h"
extern "C" {
#include "setsignal.h"
};
// Gets incremented each time there's an incompatible change
// to the communication internals.
static const unsigned short PROTOCOL_VERSION = 0x06;
static const char MSG_NONE = 0x00;
static const char MSG_VERSION = 0x01;
static const char MSG_SERIAL = 0x02;
static const char MSG_CLOSE = 0x03;
static const char MSG_CLOSE_ALL = 0x04;
static const char MSG_ERROR = 0x05;
static const char MSG_CONNECT_TO = 0x06;
static const char MSG_CONNECTED = 0x07;
static const char MSG_REQUEST_EVENTS = 0x08;
static const char MSG_LISTEN = 0x09;
static const char MSG_LISTEN_STOP = 0x0a;
static const char MSG_STATS = 0x0b;
static const char MSG_CAPTURE_FILTER = 0x0c;
static const char MSG_REQUEST_SYNC = 0x0d;
static const char MSG_PHASE_DONE = 0x0e;
static const char MSG_PING = 0x0f;
static const char MSG_PONG = 0x10;
static const char MSG_CAPS = 0x11;
static const char MSG_COMPRESS = 0x12;
static const char MSG_LOG = 0x13;
static const char MSG_SYNC_POINT = 0x14;
static const char MSG_TERMINATE = 0x15;
static const char MSG_DEBUG_DUMP = 0x16;
static const char MSG_REMOTE_PRINT = 0x17;
// Update this one whenever adding a new ID:
static const char MSG_ID_MAX = MSG_REMOTE_PRINT;
static const uint32 FINAL_SYNC_POINT = /* UINT32_MAX */ 4294967295U;
// Buffer size for remote-print data
static const int PRINT_BUFFER_SIZE = 10 * 1024;
static const int SOCKBUF_SIZE = 1024 * 1024;
struct ping_args {
uint32 seq;
double time1; // Round-trip time parent1<->parent2
double time2; // Round-trip time child1<->parent2
double time3; // Round-trip time child2<->parent2
};
#ifdef DEBUG
# define DEBUG_COMM(msg) DBG_LOG(DBG_COMM, msg)
#else
# define DEBUG_COMM(msg)
#endif
#define READ_CHUNK(i, c, do_if_eof) \
{ \
if ( ! i->Read(&c) ) \
{ \
if ( i->Eof() ) \
{ \
do_if_eof; \
} \
else \
Error(fmt("can't read data chunk: %s", io->Error()), i == io); \
return false; \
} \
\
if ( ! c ) \
return true; \
}
#define READ_CHUNK_FROM_CHILD(c) \
{ \
if ( ! io->Read(&c) ) \
{ \
if ( io->Eof() ) \
ChildDied(); \
else \
Error(fmt("can't read data chunk: %s", io->Error())); \
return false; \
} \
\
if ( ! c ) \
{ \
idle = io->IsIdle();\
return true; \
} \
idle = false; \
}
static const char* msgToStr(int msg)
{
# define MSG_STR(x) case x: return #x;
switch ( msg ) {
MSG_STR(MSG_VERSION)
MSG_STR(MSG_NONE)
MSG_STR(MSG_SERIAL)
MSG_STR(MSG_CLOSE)
MSG_STR(MSG_CLOSE_ALL)
MSG_STR(MSG_ERROR)
MSG_STR(MSG_CONNECT_TO)
MSG_STR(MSG_CONNECTED)
MSG_STR(MSG_REQUEST_EVENTS)
MSG_STR(MSG_LISTEN)
MSG_STR(MSG_LISTEN_STOP)
MSG_STR(MSG_STATS)
MSG_STR(MSG_CAPTURE_FILTER)
MSG_STR(MSG_REQUEST_SYNC)
MSG_STR(MSG_PHASE_DONE)
MSG_STR(MSG_PING)
MSG_STR(MSG_PONG)
MSG_STR(MSG_CAPS)
MSG_STR(MSG_COMPRESS)
MSG_STR(MSG_LOG)
MSG_STR(MSG_SYNC_POINT)
MSG_STR(MSG_TERMINATE)
MSG_STR(MSG_DEBUG_DUMP)
MSG_STR(MSG_REMOTE_PRINT)
default:
return "UNKNOWN_MSG";
}
}
// Start of every message between two processes. We do the low-level work
// ourselves to make this 64-bit safe. (The actual layout is an artifact of
// an earlier design that depended on how a 32-bit GCC lays out its structs ...)
class CMsg {
public:
CMsg(char type, RemoteSerializer::PeerID peer)
{
buffer[0] = type;
uint32 tmp = htonl(peer);
memcpy(buffer + 4, &tmp, sizeof(tmp));
}
char Type() { return buffer[0]; }
RemoteSerializer::PeerID Peer()
{
// Wow, is this ugly...
return ntohl(*(uint32*)(buffer + 4));
}
const char* Raw() { return buffer; }
private:
char buffer[8];
};
static bool sendCMsg(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id)
{
// We use the new[] operator here to avoid mismatches
// when deleting the data.
CMsg* msg = (CMsg*) new char[sizeof(CMsg)];
new (msg) CMsg(msg_type, id);
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = sizeof(CMsg);
c->data = (char*) msg;
return io->Write(c);
}
static ChunkedIO::Chunk* makeSerialMsg(RemoteSerializer::PeerID id)
{
// We use the new[] operator here to avoid mismatches
// when deleting the data.
CMsg* msg = (CMsg*) new char[sizeof(CMsg)];
new (msg) CMsg(MSG_SERIAL, id);
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = sizeof(CMsg);
c->data = (char*) msg;
return c;
}
inline void RemoteSerializer::SetupSerialInfo(SerialInfo* info, Peer* peer)
{
info->chunk = makeSerialMsg(peer->id);
if ( peer->caps & Peer::NO_CACHING )
info->cache = false;
if ( ! (peer->caps & Peer::PID_64BIT) || peer->phase != Peer::RUNNING )
info->pid_32bit = true;
if ( (peer->caps & Peer::NEW_CACHE_STRATEGY) &&
peer->phase == Peer::RUNNING )
info->new_cache_strategy = true;
info->include_locations = false;
}
static bool sendToIO(ChunkedIO* io, ChunkedIO::Chunk* c)
{
if ( ! io->Write(c) )
{
warn(fmt("can't send chunk: %s", io->Error()));
return false;
}
return true;
}
static bool sendToIO(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id,
const char* str, int len = -1)
{
if ( ! sendCMsg(io, msg_type, id) )
{
warn(fmt("can't send message of type %d: %s", msg_type, io->Error()));
return false;
}
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = len >= 0 ? len : strlen(str) + 1;
c->data = const_cast<char*>(str);
return sendToIO(io, c);
}
static bool sendToIO(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id,
int nargs, va_list ap)
{
if ( ! sendCMsg(io, msg_type, id) )
{
warn(fmt("can't send message of type %d: %s", msg_type, io->Error()));
return false;
}
if ( nargs == 0 )
return true;
uint32* args = new uint32[nargs];
for ( int i = 0; i < nargs; i++ )
args[i] = htonl(va_arg(ap, uint32));
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = sizeof(uint32) * nargs;
c->data = (char*) args;
return sendToIO(io, c);
}
#ifdef DEBUG
static inline char* fmt_uint32s(int nargs, va_list ap)
{
static char buf[512];
char* p = buf;
*p = '\0';
for ( int i = 0; i < nargs; i++ )
p += snprintf(p, sizeof(buf) - (p - buf),
" 0x%08x", va_arg(ap, uint32));
buf[511] = '\0';
return buf;
}
#endif
static inline const char* ip2a(uint32 ip)
{
static char buffer[32];
struct in_addr addr;
addr.s_addr = htonl(ip);
return inet_ntop(AF_INET, &addr, buffer, 32);
}
static pid_t child_pid = 0;
// Return true if message type is sent by a peer (rather than the child
// process itself).
static inline bool is_peer_msg(int msg)
{
return msg == MSG_VERSION ||
msg == MSG_SERIAL ||
msg == MSG_REQUEST_EVENTS ||
msg == MSG_REQUEST_SYNC ||
msg == MSG_CAPTURE_FILTER ||
msg == MSG_PHASE_DONE ||
msg == MSG_PING ||
msg == MSG_PONG ||
msg == MSG_CAPS ||
msg == MSG_COMPRESS ||
msg == MSG_SYNC_POINT ||
msg == MSG_REMOTE_PRINT;
}
bool RemoteSerializer::IsConnectedPeer(PeerID id)
{
if ( id == PEER_NONE )
return true;
return LookupPeer(id, true) != 0;
}
class IncrementalSendTimer : public Timer {
public:
IncrementalSendTimer(double t, RemoteSerializer::Peer* p, SerialInfo* i)
: Timer(t, TIMER_INCREMENTAL_SEND), info(i), peer(p) {}
virtual void Dispatch(double t, int is_expire)
{
// Never suspend when we're finishing up.
if ( terminating )
info->may_suspend = false;
remote_serializer->SendAllSynchronized(peer, info);
}
SerialInfo* info;
RemoteSerializer::Peer* peer;
};
RemoteSerializer::RemoteSerializer()
{
initialized = false;
current_peer = 0;
msgstate = TYPE;
id_counter = 1;
listening = false;
ignore_accesses = false;
propagate_accesses = 1;
current_sync_point = 0;
syncing_times = false;
io = 0;
closed = false;
terminating = false;
in_sync = 0;
}
RemoteSerializer::~RemoteSerializer()
{
if ( child_pid )
{
kill(child_pid, SIGKILL);
waitpid(child_pid, 0, 0);
}
delete io;
}
void RemoteSerializer::Init()
{
if ( initialized )
return;
if ( reading_traces && ! pseudo_realtime )
{
using_communication = 0;
return;
}
Fork();
io_sources.Register(this);
Log(LogInfo, fmt("communication started, parent pid is %d, child pid is %d", getpid(), child_pid));
initialized = 1;
}
void RemoteSerializer::SetSocketBufferSize(int fd, int opt, const char *what, int size, int verbose)
{
int defsize = 0;
socklen_t len = sizeof(defsize);
if ( getsockopt(fd, SOL_SOCKET, opt, (void *)&defsize, &len) < 0 )
{
if ( verbose )
Log(LogInfo, fmt("warning: cannot get socket buffer size (%s): %s", what, strerror(errno)));
return;
}
for ( int trysize = size; trysize > defsize; trysize -= 1024 )
{
if ( setsockopt(fd, SOL_SOCKET, opt, &trysize, sizeof(trysize)) >= 0 )
{
if ( verbose )
{
if ( trysize == size )
Log(LogInfo, fmt("raised pipe's socket buffer size from %dK to %dK", defsize / 1024, trysize / 1024));
else
Log(LogInfo, fmt("raised pipe's socket buffer size from %dK to %dK (%dK was requested)", defsize / 1024, trysize / 1024, size / 1024));
}
return;
}
}
Log(LogInfo, fmt("warning: cannot increase %s socket buffer size from %dK (%dK was requested)", what, defsize / 1024, size / 1024));
}
void RemoteSerializer::Fork()
{
if ( child_pid )
return;
// If we are re-forking, remove old entries
loop_over_list(peers, i)
RemovePeer(peers[i]);
// Create pipe for communication between parent and child.
int pipe[2];
if ( socketpair(AF_UNIX, SOCK_STREAM, 0, pipe) < 0 )
{
Error(fmt("can't create pipe: %s", strerror(errno)));
return;
}
// Try to increase the size of the socket send and receive buffers.
SetSocketBufferSize(pipe[0], SO_SNDBUF, "SO_SNDBUF", SOCKBUF_SIZE, 1);
SetSocketBufferSize(pipe[0], SO_RCVBUF, "SO_RCVBUF", SOCKBUF_SIZE, 0);
SetSocketBufferSize(pipe[1], SO_SNDBUF, "SO_SNDBUF", SOCKBUF_SIZE, 0);
SetSocketBufferSize(pipe[1], SO_RCVBUF, "SO_RCVBUF", SOCKBUF_SIZE, 0);
child_pid = 0;
int pid = fork();
if ( pid < 0 )
{
Error(fmt("can't fork: %s", strerror(errno)));
return;
}
if ( pid > 0 )
{
// Parent
child_pid = pid;
io = new ChunkedIOFd(pipe[0], "parent->child", child_pid);
if ( ! io->Init() )
{
Error(fmt("can't init child io: %s", io->Error()));
exit(1); // FIXME: Better way to handle this?
}
close(pipe[1]);
return;
}
else
{ // child
SocketComm child;
ChunkedIOFd* io =
new ChunkedIOFd(pipe[1], "child->parent", getppid());
if ( ! io->Init() )
{
Error(fmt("can't init parent io: %s", io->Error()));
exit(1);
}
child.SetParentIO(io);
close(pipe[0]);
// Close file descriptors.
close(0);
close(1);
close(2);
// Be nice.
setpriority(PRIO_PROCESS, 0, 5);
child.Run();
internal_error("cannot be reached");
}
}
RemoteSerializer::PeerID RemoteSerializer::Connect(addr_type ip, uint16 port,
const char* our_class, double retry, bool use_ssl)
{
if ( ! using_communication )
return true;
if ( ! initialized )
internal_error("remote serializer not initialized");
#ifdef BROv6
if ( ! is_v4_addr(ip) )
Error("inter-Bro communication not supported over IPv6");
uint32 ip4 = to_v4_addr(ip);
#else
uint32 ip4 = ip;
#endif
ip4 = ntohl(ip4);
if ( ! child_pid )
Fork();
Peer* p = AddPeer(ip4, port);
p->orig = true;
if ( our_class )
p->our_class = our_class;
if ( ! SendToChild(MSG_CONNECT_TO, p, 5, p->id,
ip4, port, uint32(retry), use_ssl) )
{
RemovePeer(p);
return false;
}
p->state = Peer::PENDING;
return p->id;
}
bool RemoteSerializer::CloseConnection(Peer* peer)
{
if ( peer->suspended_processing )
{
net_continue_processing();
peer->suspended_processing = false;
}
if ( peer->state == Peer::CLOSING )
return true;
FlushPrintBuffer(peer);
Log(LogInfo, "closing connection", peer);
peer->state = Peer::CLOSING;
return SendToChild(MSG_CLOSE, peer, 0);
}
bool RemoteSerializer::RequestSync(PeerID id, bool auth)
{
if ( ! using_communication )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
{
run_time(fmt("unknown peer id %d for request sync", int(id)));
return false;
}
if ( peer->phase != Peer::HANDSHAKE )
{
run_time(fmt("can't request sync from peer; wrong phase %d",
peer->phase));
return false;
}
if ( ! SendToChild(MSG_REQUEST_SYNC, peer, 1, auth ? 1 : 0) )
return false;
peer->sync_requested |= Peer::WE | (auth ? Peer::AUTH_WE : 0);
return true;
}
bool RemoteSerializer::RequestEvents(PeerID id, RE_Matcher* pattern)
{
if ( ! using_communication )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
{
run_time(fmt("unknown peer id %d for request sync", int(id)));
return false;
}
if ( peer->phase != Peer::HANDSHAKE )
{
run_time(fmt("can't request events from peer; wrong phase %d",
peer->phase));
return false;
}
EventRegistry::string_list* handlers = event_registry->Match(pattern);
// Concat the handlers' names.
int len = 0;
loop_over_list(*handlers, i)
len += strlen((*handlers)[i]) + 1;
if ( ! len )
{
Log(LogInfo, "warning: no events to request");
delete handlers;
return true;
}
char* data = new char[len];
char* d = data;
loop_over_list(*handlers, j)
{
for ( const char* p = (*handlers)[j]; *p; *d++ = *p++ )
;
*d++ = '\0';
}
delete handlers;
return SendToChild(MSG_REQUEST_EVENTS, peer, data, len);
}
bool RemoteSerializer::SetAcceptState(PeerID id, bool accept)
{
Peer* p = LookupPeer(id, false);
if ( ! p )
return true;
p->accept_state = accept;
return true;
}
bool RemoteSerializer::SetCompressionLevel(PeerID id, int level)
{
Peer* p = LookupPeer(id, false);
if ( ! p )
return true;
p->comp_level = level;
return true;
}
bool RemoteSerializer::CompleteHandshake(PeerID id)
{
Peer* p = LookupPeer(id, false);
if ( ! p )
return true;
if ( p->phase != Peer::HANDSHAKE )
{
run_time(fmt("can't complete handshake; wrong phase %d",
p->phase));
return false;
}
p->handshake_done |= Peer::WE;
if ( ! SendToChild(MSG_PHASE_DONE, p, 0) )
return false;
if ( p->handshake_done == Peer::BOTH )
HandshakeDone(p);
return true;
}
bool RemoteSerializer::SendCall(SerialInfo* info, PeerID id,
const char* name, val_list* vl)
{
if ( ! using_communication || terminating )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
return false;
return SendCall(info, peer, name, vl);
}
bool RemoteSerializer::SendCall(SerialInfo* info, Peer* peer,
const char* name, val_list* vl)
{
if ( peer->phase != Peer::RUNNING || terminating )
return false;
++stats.events.out;
SetCache(peer->cache_out);
SetupSerialInfo(info, peer);
if ( ! Serialize(info, name, vl) )
{
FatalError(io->Error());
return false;
}
return true;
}
bool RemoteSerializer::SendCall(SerialInfo* info, const char* name,
val_list* vl)
{
if ( ! IsOpen() || ! PropagateAccesses() || terminating )
return true;
loop_over_list(peers, i)
{
// Do not send event back to originating peer.
if ( peers[i] == current_peer )
continue;
SerialInfo new_info(*info);
if ( ! SendCall(&new_info, peers[i], name, vl) )
return false;
}
return true;
}
bool RemoteSerializer::SendAccess(SerialInfo* info, Peer* peer,
const StateAccess& access)
{
if ( ! (peer->sync_requested & Peer::PEER) || terminating )
return true;
#ifdef DEBUG
ODesc desc;
access.Describe(&desc);
DBG_LOG(DBG_COMM, "Sending %s", desc.Description());
#endif
++stats.accesses.out;
SetCache(peer->cache_out);
SetupSerialInfo(info, peer);
info->globals_as_names = true;
if ( ! Serialize(info, access) )
{
FatalError(io->Error());
return false;
}
return true;
}
bool RemoteSerializer::SendAccess(SerialInfo* info, PeerID pid,
const StateAccess& access)
{
Peer* p = LookupPeer(pid, false);
if ( ! p )
return true;
return SendAccess(info, p, access);
}
bool RemoteSerializer::SendAccess(SerialInfo* info, const StateAccess& access)
{
if ( ! IsOpen() || ! PropagateAccesses() || terminating )
return true;
// A real broadcast would be nice here. But the different peers have
// different serialization caches, so we cannot simply send the same
// serialization to all of them ...
loop_over_list(peers, i)
{
// Do not send access back to originating peer.
if ( peers[i] == source_peer )
continue;
// Only sent accesses for fully setup peers.
if ( peers[i]->phase != Peer::RUNNING )
continue;
SerialInfo new_info(*info);
if ( ! SendAccess(&new_info, peers[i], access) )
return false;
}
return true;
}
bool RemoteSerializer::SendAllSynchronized(Peer* peer, SerialInfo* info)
{
// FIXME: When suspending ID serialization works, remove!
DisableSuspend suspend(info);
current_peer = peer;
Continuation* cont = &info->cont;
ptr_compat_int index;
if ( info->cont.NewInstance() )
{
Log(LogInfo, "starting to send full state", peer);
index = 0;
}
else
{
index = int(ptr_compat_int(cont->RestoreState()));
if ( ! cont->ChildSuspended() )
cont->Resume();
}
for ( ; index < sync_ids.length(); ++index )
{
cont->SaveContext();
StateAccess sa(OP_ASSIGN, sync_ids[index],
sync_ids[index]->ID_Val());
// FIXME: When suspending ID serialization works, we need to
// addsupport to StateAccesses, too.
bool result = SendAccess(info, peer, sa);
cont->RestoreContext();
if ( ! result )
return false;
if ( cont->ChildSuspended() || info->may_suspend )
{
double t = network_time + state_write_delay;
timer_mgr->Add(new IncrementalSendTimer(t, peer, info));
cont->SaveState((void*) index);
if ( info->may_suspend )
cont->Suspend();
return true;
}
}
if ( ! SendToChild(MSG_PHASE_DONE, peer, 0) )
return false;
suspend.Release();
delete info;
Log(LogInfo, "done sending full state", peer);
return EnterPhaseRunning(peer);
}
bool RemoteSerializer::SendID(SerialInfo* info, Peer* peer, const ID& id)
{
if ( terminating )
return true;
// FIXME: When suspending ID serialization works, remove!
DisableSuspend suspend(info);
if ( info->cont.NewInstance() )
++stats.ids.out;
SetCache(peer->cache_out);
SetupSerialInfo(info, peer);
info->cont.SaveContext();
bool result = Serialize(info, id);
info->cont.RestoreContext();
if ( ! result )
{
FatalError(io->Error());
return false;
}
return true;
}
bool RemoteSerializer::SendID(SerialInfo* info, PeerID pid, const ID& id)
{
if ( ! using_communication || terminating )
return true;
Peer* peer = LookupPeer(pid, true);
if ( ! peer )
return false;
if ( peer->phase != Peer::RUNNING )
return false;
return SendID(info, peer, id);
}
bool RemoteSerializer::SendConnection(SerialInfo* info, PeerID id,
const Connection& c)
{
if ( ! using_communication || terminating )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
return false;
if ( peer->phase != Peer::RUNNING )
return false;
++stats.conns.out;
SetCache(peer->cache_out);
SetupSerialInfo(info, peer);
if ( ! Serialize(info, c) )
{
FatalError(io->Error());
return false;
}
return true;
}
bool RemoteSerializer::SendCaptureFilter(PeerID id, const char* filter)
{
if ( ! using_communication || terminating )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
return false;
if ( peer->phase != Peer::HANDSHAKE )
{
run_time(fmt("can't sent capture filter to peer; wrong phase %d", peer->phase));
return false;
}
return SendToChild(MSG_CAPTURE_FILTER, peer, copy_string(filter));
}
bool RemoteSerializer::SendPacket(SerialInfo* info, const Packet& p)
{
if ( ! IsOpen() || !PropagateAccesses() || terminating )
return true;
loop_over_list(peers, i)
{
// Only sent packet for fully setup peers.
if ( peers[i]->phase != Peer::RUNNING )
continue;
SerialInfo new_info(*info);
if ( ! SendPacket(&new_info, peers[i], p) )
return false;
}
return true;
}
bool RemoteSerializer::SendPacket(SerialInfo* info, PeerID id, const Packet& p)
{
if ( ! using_communication || terminating )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
return false;
return SendPacket(info, peer, p);
}
bool RemoteSerializer::SendPacket(SerialInfo* info, Peer* peer, const Packet& p)
{
++stats.packets.out;
SetCache(peer->cache_out);
SetupSerialInfo(info, peer);
if ( ! Serialize(info, p) )
{
FatalError(io->Error());
return false;
}
return true;
}
bool RemoteSerializer::SendPing(PeerID id, uint32 seq)
{
if ( ! using_communication || terminating )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
return false;
char* data = new char[sizeof(ping_args)];
ping_args* args = (ping_args*) data;
args->seq = htonl(seq);
args->time1 = htond(current_time(true));
args->time2 = 0;
args->time3 = 0;
return SendToChild(MSG_PING, peer, data, sizeof(ping_args));
}
bool RemoteSerializer::SendCapabilities(Peer* peer)
{
if ( peer->phase != Peer::HANDSHAKE )
{
run_time(fmt("can't sent capabilties to peer; wrong phase %d",
peer->phase));
return false;
}
uint32 caps = 0;
#ifdef HAVE_LIBZ
caps |= Peer::COMPRESSION;
#endif
caps |= Peer::PID_64BIT;
caps |= Peer::NEW_CACHE_STRATEGY;
return caps ? SendToChild(MSG_CAPS, peer, 3, caps, 0, 0) : true;
}
bool RemoteSerializer::Listen(addr_type ip, uint16 port, bool expect_ssl)
{
if ( ! using_communication )
return true;
if ( ! initialized )
internal_error("remote serializer not initialized");
#ifdef BROv6
if ( ! is_v4_addr(ip) )
Error("inter-Bro communication not supported over IPv6");
uint32 ip4 = to_v4_addr(ip);
#else
uint32 ip4 = ip;
#endif
ip4 = ntohl(ip4);
if ( ! SendToChild(MSG_LISTEN, 0, 3, ip4, port, expect_ssl) )
return false;
listening = true;
closed = false;
return true;
}
void RemoteSerializer::SendSyncPoint(uint32 point)
{
if ( ! (remote_trace_sync_interval && pseudo_realtime) || terminating )
return;
current_sync_point = point;
loop_over_list(peers, i)
if ( peers[i]->phase == Peer::RUNNING &&
! SendToChild(MSG_SYNC_POINT, peers[i],
1, current_sync_point) )
return;
if ( ! syncing_times )
{
Log(LogInfo, "waiting for peers");
syncing_times = true;
loop_over_list(peers, i)
{
// Need to do this once per peer to correctly
// track the number of suspend calls.
net_suspend_processing();
peers[i]->suspended_processing = true;
}
}
CheckSyncPoints();
}
uint32 RemoteSerializer::SendSyncPoint()
{
Log(LogInfo, fmt("reached sync-point %u", current_sync_point));
SendSyncPoint(current_sync_point + 1);
return current_sync_point;
}
void RemoteSerializer::SendFinalSyncPoint()
{
Log(LogInfo, fmt("reached end of trace, sending final sync point"));
SendSyncPoint(FINAL_SYNC_POINT);
}
bool RemoteSerializer::Terminate()
{
Log(LogInfo, fmt("terminating..."));
return terminating = SendToChild(MSG_TERMINATE, 0, 0);
}
bool RemoteSerializer::StopListening()
{
if ( ! listening )
return true;
if ( ! SendToChild(MSG_LISTEN_STOP, 0, 0) )
return false;
listening = false;
closed = ! IsActive();
return true;
}
void RemoteSerializer::Register(ID* id)
{
DBG_LOG(DBG_STATE, "&synchronized %s", id->Name());
Unregister(id);
Ref(id);
sync_ids.append(id);
}
void RemoteSerializer::Unregister(ID* id)
{
loop_over_list(sync_ids, i)
if ( streq(sync_ids[i]->Name(), id->Name()) )
{
Unref(sync_ids[i]);
sync_ids.remove_nth(i);
break;
}
}
void RemoteSerializer::GetFds(int* read, int* write, int* except)
{
*read = io->Fd();
if ( io->CanWrite() )
*write = io->Fd();
}
double RemoteSerializer::NextTimestamp(double* local_network_time)
{
Poll(false);
double et = events.length() ? events[0]->time : -1;
double pt = packets.length() ? packets[0]->time : -1;
if ( ! et )
et = timer_mgr->Time();
if ( ! pt )
pt = timer_mgr->Time();
if ( packets.length() )
idle = false;
if ( et >= 0 && (et < pt || pt < 0) )
return et;
if ( pt >= 0 )
{
// Return packet time as network time.
*local_network_time = packets[0]->p->time;
return pt;
}
return -1;
}
TimerMgr::Tag* RemoteSerializer::GetCurrentTag()
{
return packets.length() ? &packets[0]->p->tag : 0;
}
void RemoteSerializer::Process()
{
Poll(false);
int i = 0;
while ( events.length() )
{
if ( max_remote_events_processed &&
++i > max_remote_events_processed )
break;
BufferedEvent* be = events[0];
::Event* event = new ::Event(be->handler, be->args, be->src);
Peer* old_current_peer = current_peer;
// Prevent the source peer from getting the event back.
current_peer = LookupPeer(be->src, true); // may be null.
mgr.Dispatch(event, ! forward_remote_events);
current_peer = old_current_peer;
assert(events[0] == be);
delete be;
events.remove_nth(0);
}
// We shouldn't pass along more than one packet, as otherwise the
// timer mgr will not advance.
if ( packets.length() )
{
BufferedPacket* bp = packets[0];
Packet* p = bp->p;
// FIXME: The following chunk of code is copied from
// net_packet_dispatch(). We should change that function
// to accept an IOSource instead of the PktSrc.
network_time = p->time;
SegmentProfiler(segment_logger, "expiring-timers");
TimerMgr* tmgr = sessions->LookupTimerMgr(GetCurrentTag());
current_dispatched =
tmgr->Advance(network_time, max_timer_expires);
current_hdr = p->hdr;
current_pkt = p->pkt;
current_pktsrc = 0;
current_iosrc = this;
sessions->NextPacket(p->time, p->hdr, p->pkt, p->hdr_size, 0);
mgr.Drain();
current_hdr = 0; // done with these
current_pkt = 0;
current_iosrc = 0;
delete p;
delete bp;
packets.remove_nth(0);
}
if ( packets.length() )
idle = false;
}
void RemoteSerializer::Finish()
{
if ( ! using_communication )
return;
do
Poll(true);
while ( io->CanWrite() );
loop_over_list(peers, i)
CloseConnection(peers[i]);
}
bool RemoteSerializer::Poll(bool may_block)
{
if ( ! child_pid )
return true;
// See if there's any peer waiting for initial state synchronization.
if ( sync_pending.length() && ! in_sync )
{
Peer* p = sync_pending[0];
sync_pending.remove_nth(0);
HandshakeDone(p);
}
io->Flush();
idle = false;
switch ( msgstate ) {
case TYPE:
{
current_peer = 0;
current_msgtype = MSG_NONE;
// CMsg follows
ChunkedIO::Chunk* c;
READ_CHUNK_FROM_CHILD(c);
CMsg* msg = (CMsg*) c->data;
current_peer = LookupPeer(msg->Peer(), false);
current_id = msg->Peer();
current_msgtype = msg->Type();
current_args = 0;
delete [] c->data;
delete c;
switch ( current_msgtype ) {
case MSG_CLOSE:
case MSG_CLOSE_ALL:
case MSG_LISTEN_STOP:
case MSG_PHASE_DONE:
case MSG_TERMINATE:
case MSG_DEBUG_DUMP:
{
// No further argument chunk.
msgstate = TYPE;
return DoMessage();
}
case MSG_VERSION:
case MSG_SERIAL:
case MSG_ERROR:
case MSG_CONNECT_TO:
case MSG_CONNECTED:
case MSG_REQUEST_EVENTS:
case MSG_REQUEST_SYNC:
case MSG_LISTEN:
case MSG_STATS:
case MSG_CAPTURE_FILTER:
case MSG_PING:
case MSG_PONG:
case MSG_CAPS:
case MSG_COMPRESS:
case MSG_LOG:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
{
// One further argument chunk.
msgstate = ARGS;
return Poll(may_block);
}
case MSG_NONE:
InternalCommError(fmt("unexpected msg type %d",
current_msgtype));
return true;
default:
InternalCommError(fmt("unknown msg type %d in Poll()",
current_msgtype));
return true;
}
}
case ARGS:
{
// Argument chunk follows.
ChunkedIO::Chunk* c;
READ_CHUNK_FROM_CHILD(c);
current_args = c;
msgstate = TYPE;
bool result = DoMessage();
delete [] current_args->data;
delete current_args;
current_args = 0;
return result;
}
default:
internal_error("unknown msgstate");
}
internal_error("cannot be reached");
}
bool RemoteSerializer::DoMessage()
{
if ( current_peer &&
(current_peer->state == Peer::CLOSING ||
current_peer->state == Peer::CLOSED) &&
is_peer_msg(current_msgtype) )
{
// We shut the connection to this peer down,
// so we ignore all further messages.
DEBUG_COMM(fmt("parent: ignoring %s due to shutdown of peer #%" PRI_SOURCE_ID,
msgToStr(current_msgtype),
current_peer ? current_peer->id : 0));
return true;
}
DEBUG_COMM(fmt("parent: %s from child; peer is #%" PRI_SOURCE_ID,
msgToStr(current_msgtype),
current_peer ? current_peer->id : 0));
if ( current_peer &&
(current_msgtype < 0 || current_msgtype > MSG_ID_MAX) )
{
Log(LogError, "garbage message from peer, shutting down",
current_peer);
CloseConnection(current_peer);
return true;
}
// As long as we haven't finished the version
// handshake, no other messages than MSG_VERSION
// are allowed from peer.
if ( current_peer && current_peer->phase == Peer::SETUP &&
is_peer_msg(current_msgtype) && current_msgtype != MSG_VERSION )
{
Log(LogError, "peer did not send version", current_peer);
CloseConnection(current_peer);
return true;
}
switch ( current_msgtype ) {
case MSG_CLOSE:
PeerDisconnected(current_peer);
return true;
case MSG_CONNECTED:
return ProcessConnected();
case MSG_SERIAL:
return ProcessSerialization();
case MSG_REQUEST_EVENTS:
return ProcessRequestEventsMsg();
case MSG_REQUEST_SYNC:
return ProcessRequestSyncMsg();
case MSG_PHASE_DONE:
return ProcessPhaseDone();
case MSG_ERROR:
return ProcessLogMsg(true);
case MSG_LOG:
return ProcessLogMsg(false);
case MSG_STATS:
return ProcessStatsMsg();
case MSG_CAPTURE_FILTER:
return ProcessCaptureFilterMsg();
case MSG_VERSION:
return ProcessVersionMsg();
case MSG_PING:
return ProcessPingMsg();
case MSG_PONG:
return ProcessPongMsg();
case MSG_CAPS:
return ProcessCapsMsg();
case MSG_SYNC_POINT:
return ProcessSyncPointMsg();
case MSG_TERMINATE:
assert(terminating);
io_sources.Terminate();
return true;
case MSG_REMOTE_PRINT:
return ProcessRemotePrint();
default:
DEBUG_COMM(fmt("unexpected msg type: %d",
int(current_msgtype)));
InternalCommError(fmt("unexpected msg type in DoMessage(): %d",
int(current_msgtype)));
return true; // keep going
}
internal_error("cannot be reached");
return false;
}
void RemoteSerializer::PeerDisconnected(Peer* peer)
{
assert(peer);
if ( peer->suspended_processing )
{
net_continue_processing();
peer->suspended_processing = false;
}
if ( peer->state == Peer::CLOSED || peer->state == Peer::INIT )
return;
if ( peer->state == Peer::PENDING )
{
peer->state = Peer::CLOSED;
Log(LogError, "could not connect", peer);
return;
}
Log(LogInfo, "peer disconnected", peer);
if ( peer->phase != Peer::SETUP )
RaiseEvent(remote_connection_closed, peer);
if ( in_sync == peer )
in_sync = 0;
peer->state = Peer::CLOSED;
peer->phase = Peer::UNKNOWN;
peer->cache_in->Clear();
peer->cache_out->Clear();
UnregisterHandlers(peer);
}
void RemoteSerializer::PeerConnected(Peer* peer)
{
if ( peer->state == Peer::CONNECTED )
return;
peer->state = Peer::CONNECTED;
peer->phase = Peer::SETUP;
peer->sent_version = Peer::NONE;
peer->sync_requested = Peer::NONE;
peer->handshake_done = Peer::NONE;
peer->cache_in->Clear();
peer->cache_out->Clear();
peer->our_runtime = int(current_time(true) - bro_start_time);
peer->sync_point = 0;
if ( ! SendCMsgToChild(MSG_VERSION, peer) )
return;
int len = 4 * sizeof(uint32) + peer->our_class.size() + 1;
char* data = new char[len];
uint32* args = (uint32*) data;
*args++ = htonl(PROTOCOL_VERSION);
*args++ = htonl(peer->cache_out->GetMaxCacheSize());
*args++ = htonl(DATA_FORMAT_VERSION);
*args++ = htonl(peer->our_runtime);
strcpy((char*) args, peer->our_class.c_str());
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = len;
c->data = data;
if ( peer->our_class.size() )
Log(LogInfo, fmt("sending class \"%s\"", peer->our_class.c_str()), peer);
if ( ! SendToChild(c) )
{
Log(LogError, "can't send version message");
CloseConnection(peer);
return;
}
peer->sent_version |= Peer::WE;
Log(LogInfo, "peer connected", peer);
Log(LogInfo, "phase: version", peer);
}
RecordVal* RemoteSerializer::MakePeerVal(Peer* peer)
{
RecordVal* v = new RecordVal(::peer);
v->Assign(0, new Val(uint32(peer->id), TYPE_COUNT));
// Sic! Network order for AddrVal, host order for PortVal.
v->Assign(1, new AddrVal(htonl(peer->ip)));
v->Assign(2, new PortVal(peer->port, TRANSPORT_TCP));
v->Assign(3, new Val(false, TYPE_BOOL));
v->Assign(4, new StringVal("")); // set when received
v->Assign(5, peer->peer_class.size() ?
new StringVal(peer->peer_class.c_str()) : 0);
return v;
}
RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port,
PeerID id)
{
Peer* peer = new Peer;
peer->id = id != PEER_NONE ? id : id_counter++;
peer->ip = ip;
peer->port = port;
peer->state = Peer::INIT;
peer->phase = Peer::UNKNOWN;
peer->sent_version = Peer::NONE;
peer->sync_requested = Peer::NONE;
peer->handshake_done = Peer::NONE;
peer->orig = false;
peer->accept_state = false;
peer->send_state = false;
peer->caps = 0;
peer->comp_level = 0;
peer->suspended_processing = false;
peer->caps = 0;
peer->val = MakePeerVal(peer);
peer->cache_in = new SerializationCache(MAX_CACHE_SIZE);
peer->cache_out = new SerializationCache(MAX_CACHE_SIZE);
peer->sync_point = 0;
peer->print_buffer = 0;
peer->print_buffer_used = 0;
peers.append(peer);
Log(LogInfo, "added peer", peer);
return peer;
}
void RemoteSerializer::UnregisterHandlers(Peer* peer)
{
// Unregister the peers for the EventHandlers.
loop_over_list(peer->handlers, i)
{
peer->handlers[i]->RemoveRemoteHandler(peer->id);
}
}
void RemoteSerializer::RemovePeer(Peer* peer)
{
if ( peer->suspended_processing )
{
net_continue_processing();
peer->suspended_processing = false;
}
peers.remove(peer);
UnregisterHandlers(peer);
Log(LogInfo, "removed peer", peer);
int id = peer->id;
Unref(peer->val);
delete [] peer->print_buffer;
delete peer->cache_in;
delete peer->cache_out;
delete peer;
closed = ! IsActive();
if ( in_sync == peer )
in_sync = 0;
}
RemoteSerializer::Peer* RemoteSerializer::LookupPeer(PeerID id,
bool only_if_connected)
{
Peer* peer = 0;
loop_over_list(peers, i)
if ( peers[i]->id == id )
{
peer = peers[i];
break;
}
if ( ! only_if_connected || (peer && peer->state == Peer::CONNECTED) )
return peer;
else
return 0;
}
bool RemoteSerializer::ProcessVersionMsg()
{
uint32* args = (uint32*) current_args->data;
uint32 version = ntohl(args[0]);
uint32 data_version = ntohl(args[2]);
if ( PROTOCOL_VERSION != version )
{
Log(LogError, fmt("remote protocol version mismatch: got %d, but expected %d",
version, PROTOCOL_VERSION), current_peer);
CloseConnection(current_peer);
return true;
}
// For backwards compatibility, data_version may be null.
if ( data_version && DATA_FORMAT_VERSION != data_version )
{
Log(LogError, fmt("remote data version mismatch: got %d, but expected %d",
data_version, DATA_FORMAT_VERSION),
current_peer);
CloseConnection(current_peer);
return true;
}
uint32 cache_size = ntohl(args[1]);
current_peer->cache_in->SetMaxCacheSize(cache_size);
current_peer->runtime = ntohl(args[3]);
current_peer->sent_version |= Peer::PEER;
if ( current_args->len > 4 * sizeof(uint32) )
{
// The peer sends us a class string.
const char* pclass = (const char*) &args[4];
current_peer->peer_class = pclass;
if ( *pclass )
Log(LogInfo, fmt("peer sent class \"%s\"", pclass), current_peer);
if ( current_peer->val )
current_peer->val->Assign(5, new StringVal(pclass));
}
assert(current_peer->sent_version == Peer::BOTH);
current_peer->phase = Peer::HANDSHAKE;
Log(LogInfo, "phase: handshake", current_peer);
if ( ! SendCapabilities(current_peer) )
return false;
RaiseEvent(remote_connection_established, current_peer);
return true;
}
bool RemoteSerializer::EnterPhaseRunning(Peer* peer)
{
if ( in_sync == peer )
in_sync = 0;
peer->phase = Peer::RUNNING;
Log(LogInfo, "phase: running", peer);
RaiseEvent(remote_connection_handshake_done, peer);
if ( remote_trace_sync_interval )
{
loop_over_list(peers, i)
{
if ( ! SendToChild(MSG_SYNC_POINT, peers[i],
1, current_sync_point) )
return false;
}
}
return true;
}
bool RemoteSerializer::ProcessConnected()
{
// IP and port follow.
uint32* args = (uint32*) current_args->data;
uint32 host = ntohl(args[0]); // ### Fix: only works for IPv4
uint16 port = (uint16) ntohl(args[1]);
if ( ! current_peer )
{
// The other side connected to one of our listening ports.
current_peer = AddPeer(host, port, current_id);
current_peer->orig = false;
}
else if ( current_peer->orig )
{
// It's a successful retry.
current_peer->port = port;
current_peer->accept_state = false;
Unref(current_peer->val);
current_peer->val = MakePeerVal(current_peer);
}
PeerConnected(current_peer);
ID* descr = global_scope()->Lookup("peer_description");
if ( ! descr )
internal_error("peer_description not defined");
SerialInfo info(this);
SendID(&info, current_peer, *descr);
return true;
}
bool RemoteSerializer::ProcessRequestEventsMsg()
{
if ( ! current_peer )
return false;
// Register new handlers.
char* p = current_args->data;
while ( p < current_args->data + current_args->len )
{
EventHandler* handler = event_registry->Lookup(p);
if ( handler )
{
handler->AddRemoteHandler(current_peer->id);
current_peer->handlers.append(handler);
RaiseEvent(remote_event_registered, current_peer, p);
Log(LogInfo, fmt("registered for event %s", p),
current_peer);
// If the other side requested the print_hook event,
// we initialize the buffer.
if ( current_peer->print_buffer == 0 &&
streq(p, "print_hook") )
{
current_peer->print_buffer =
new char[PRINT_BUFFER_SIZE];
current_peer->print_buffer_used = 0;
Log(LogInfo, "initialized print buffer",
current_peer);
}
}
else
Log(LogInfo, fmt("request for unknown event %s", p),
current_peer);
p += strlen(p) + 1;
}
return true;
}
bool RemoteSerializer::ProcessRequestSyncMsg()
{
if ( ! current_peer )
return false;
int auth = 0;
uint32* args = (uint32*) current_args->data;
if ( ntohl(args[0]) != 0 )
{
Log(LogInfo, "peer considers its state authoritative", current_peer);
auth = Peer::AUTH_PEER;
}
current_peer->sync_requested |= Peer::PEER | auth;
return true;
}
bool RemoteSerializer::ProcessPhaseDone()
{
switch ( current_peer->phase ) {
case Peer::HANDSHAKE:
{
current_peer->handshake_done |= Peer::PEER;
if ( current_peer->handshake_done == Peer::BOTH )
HandshakeDone(current_peer);
break;
}
case Peer::SYNC:
{
// Make sure that the other side is supposed to sent us this.
if ( current_peer->send_state )
{
Log(LogError, "unexpected phase_done in sync phase from peer", current_peer);
CloseConnection(current_peer);
return false;
}
if ( ! EnterPhaseRunning(current_peer) )
{
if ( current_peer->suspended_processing )
{
net_continue_processing();
current_peer->suspended_processing = false;
}
return false;
}
if ( current_peer->suspended_processing )
{
net_continue_processing();
current_peer->suspended_processing = false;
}
break;
}
default:
Log(LogError, "unexpected phase_done", current_peer);
CloseConnection(current_peer);
}
return true;
}
bool RemoteSerializer::HandshakeDone(Peer* peer)
{
#ifdef HAVE_LIBZ
if ( peer->caps & Peer::COMPRESSION && peer->comp_level > 0 )
if ( ! SendToChild(MSG_COMPRESS, peer, 1, peer->comp_level) )
return false;
#endif
if ( ! (peer->caps & Peer::PID_64BIT) )
Log(LogInfo, "peer does not support 64bit PIDs; using compatibility mode", peer);
if ( (peer->caps & Peer::NEW_CACHE_STRATEGY) )
Log(LogInfo, "peer supports keep-in-cache; using that", peer);
if ( peer->sync_requested != Peer::NONE )
{
if ( in_sync )
{
Log(LogInfo, "another sync in progress, waiting...",
peer);
sync_pending.append(peer);
return true;
}
if ( (peer->sync_requested & Peer::AUTH_PEER) &&
(peer->sync_requested & Peer::AUTH_WE) )
{
Log(LogError, "misconfiguration: authoritative state on both sides",
current_peer);
CloseConnection(peer);
return false;
}
in_sync = peer;
peer->phase = Peer::SYNC;
// If only one side has requested state synchronization,
// it will get all the state from the peer.
//
// If both sides have shown interest, the one considering
// itself authoritative will send the state. If none is
// authoritative, the peer which is running longest sends
// its state.
//
if ( (peer->sync_requested & Peer::BOTH) != Peer::BOTH )
{
// One side.
if ( peer->sync_requested & Peer::PEER )
peer->send_state = true;
else if ( peer->sync_requested & Peer::WE )
peer->send_state = false;
else
internal_error("illegal sync_requested value");
}
else
{
// Both.
if ( peer->sync_requested & Peer::AUTH_WE )
peer->send_state = true;
else if ( peer->sync_requested & Peer::AUTH_PEER )
peer->send_state = false;
else
{
if ( peer->our_runtime == peer->runtime )
peer->send_state = peer->orig;
else
peer->send_state = (peer->our_runtime >
peer->runtime);
}
}
Log(LogInfo, fmt("phase: sync (%s)", (peer->send_state ? "sender" : "receiver")), peer);
if ( peer->send_state )
{
SerialInfo* info = new SerialInfo(this);
SendAllSynchronized(peer, info);
}
else
{
// Suspend until we got everything.
net_suspend_processing();
peer->suspended_processing = true;
}
}
else
return EnterPhaseRunning(peer);
return true;
}
bool RemoteSerializer::ProcessPingMsg()
{
if ( ! current_peer )
return false;
if ( ! SendToChild(MSG_PONG, current_peer,
current_args->data, current_args->len) )
return false;
return true;
}
bool RemoteSerializer::ProcessPongMsg()
{
if ( ! current_peer )
return false;
ping_args* args = (ping_args*) current_args->data;
val_list* vl = new val_list;
vl->append(current_peer->val->Ref());
vl->append(new Val((unsigned int) ntohl(args->seq), TYPE_COUNT));
vl->append(new Val(current_time(true) - ntohd(args->time1),
TYPE_INTERVAL));
vl->append(new Val(ntohd(args->time2), TYPE_INTERVAL));
vl->append(new Val(ntohd(args->time3), TYPE_INTERVAL));
mgr.QueueEvent(remote_pong, vl);
return true;
}
bool RemoteSerializer::ProcessCapsMsg()
{
if ( ! current_peer )
return false;
uint32* args = (uint32*) current_args->data;
current_peer->caps = ntohl(args[0]);
return true;
}
bool RemoteSerializer::ProcessLogMsg(bool is_error)
{
Log(is_error ? LogError : LogInfo, current_args->data, 0, LogChild);
return true;
}
bool RemoteSerializer::ProcessStatsMsg()
{
// Take the opportunity to log our stats, too.
LogStats();
// Split the concatenated child stats into indiviual log messages.
int count = 0;
for ( char* p = current_args->data;
p < current_args->data + current_args->len; p += strlen(p) + 1 )
Log(LogInfo, fmt("child statistics: [%d] %s", count++, p),
current_peer);
return true;
}
bool RemoteSerializer::ProcessCaptureFilterMsg()
{
if ( ! current_peer )
return false;
RaiseEvent(remote_capture_filter, current_peer, current_args->data);
return true;
}
bool RemoteSerializer::CheckSyncPoints()
{
if ( ! current_sync_point )
return false;
int ready = 0;
loop_over_list(peers, i)
if ( peers[i]->sync_point >= current_sync_point )
ready++;
if ( ready < remote_trace_sync_peers )
return false;
if ( current_sync_point == FINAL_SYNC_POINT )
{
Log(LogInfo, fmt("all peers reached final sync-point, going to finish"));
Terminate();
}
else
Log(LogInfo, fmt("all peers reached sync-point %u",
current_sync_point));
if ( syncing_times )
{
loop_over_list(peers, i)
{
if ( peers[i]->suspended_processing )
{
net_continue_processing();
peers[i]->suspended_processing = false;
}
}
syncing_times = false;
}
return true;
}
bool RemoteSerializer::ProcessSyncPointMsg()
{
if ( ! current_peer )
return false;
uint32* args = (uint32*) current_args->data;
uint32 count = ntohl(args[0]);
current_peer->sync_point = max(current_peer->sync_point, count);
if ( current_peer->sync_point == FINAL_SYNC_POINT )
Log(LogInfo, fmt("reached final sync-point"), current_peer);
else
Log(LogInfo, fmt("reached sync-point %u", current_peer->sync_point), current_peer);
if ( syncing_times )
CheckSyncPoints();
return true;
}
bool RemoteSerializer::ProcessSerialization()
{
if ( current_peer->state == Peer::CLOSING )
return false;
SetCache(current_peer->cache_in);
UnserialInfo info(this);
bool accept_state = current_peer->accept_state;
#if 0
// If processing is suspended, we unserialize the data but throw
// it away.
if ( current_peer->phase == Peer::RUNNING &&
net_is_processing_suspended() )
accept_state = false;
#endif
assert(current_args);
info.chunk = current_args;
info.install_globals = accept_state;
info.install_conns = accept_state;
info.ignore_callbacks = ! accept_state;
if ( current_peer->phase != Peer::RUNNING )
info.id_policy = UnserialInfo::InstantiateNew;
else
info.id_policy = accept_state ?
UnserialInfo::CopyNewToCurrent :
UnserialInfo::Keep;
if ( ! (current_peer->caps & Peer::PID_64BIT) ||
current_peer->phase != Peer::RUNNING )
info.pid_32bit = true;
if ( (current_peer->caps & Peer::NEW_CACHE_STRATEGY) &&
current_peer->phase == Peer::RUNNING )
info.new_cache_strategy = true;
if ( ! forward_remote_state_changes )
ignore_accesses = true;
source_peer = current_peer;
int i = Unserialize(&info);
source_peer = 0;
if ( ! forward_remote_state_changes )
ignore_accesses = false;
if ( i < 0 )
{
Log(LogError, "unserialization error", current_peer);
CloseConnection(current_peer);
// Error
return false;
}
return true;
}
bool RemoteSerializer::FlushPrintBuffer(Peer* p)
{
if ( p->state == Peer::CLOSING )
return false;
if ( ! p->print_buffer )
return true;
SendToChild(MSG_REMOTE_PRINT, p, p->print_buffer, p->print_buffer_used);
p->print_buffer = new char[PRINT_BUFFER_SIZE];
p->print_buffer_used = 0;
return true;
}
bool RemoteSerializer::SendPrintHookEvent(BroFile* f, const char* txt)
{
loop_over_list(peers, i)
{
Peer* p = peers[i];
if ( ! p->print_buffer )
continue;
const char* fname = f->Name();
if ( ! fname )
continue; // not a managed file.
int len = strlen(txt);
// We cut off everything after the max buffer size. That
// makes the code a bit easier, and we shouldn't have such
// long lines anyway.
len = min(len, PRINT_BUFFER_SIZE - strlen(fname) - 2);
// If there's not enough space in the buffer, flush it.
int need = strlen(fname) + 1 + len + 1;
if ( p->print_buffer_used + need > PRINT_BUFFER_SIZE )
{
if ( ! FlushPrintBuffer(p) )
return false;
}
assert(p->print_buffer_used + need <= PRINT_BUFFER_SIZE);
char* dst = p->print_buffer + p->print_buffer_used;
strcpy(dst, fname);
dst += strlen(fname) + 1;
memcpy(dst, txt, len);
dst += len;
*dst++ = '\0';
p->print_buffer_used = dst - p->print_buffer;
}
return true;
}
bool RemoteSerializer::ProcessRemotePrint()
{
if ( current_peer->state == Peer::CLOSING )
return false;
const char* p = current_args->data;
while ( p < current_args->data + current_args->len )
{
const char* fname = p;
p += strlen(p) + 1;
const char* txt = p;
p += strlen(p) + 1;
val_list* vl = new val_list(2);
BroFile* f = BroFile::GetFile(fname);
Ref(f);
vl->append(new Val(f));
vl->append(new StringVal(txt));
GotEvent("print_hook", -1.0, print_hook, vl);
}
return true;
}
void RemoteSerializer::GotEvent(const char* name, double time,
EventHandlerPtr event, val_list* args)
{
if ( time >= 0 )
{
// Marker for being called from ProcessRemotePrint().
DEBUG_COMM("parent: got event");
++stats.events.in;
}
if ( ! current_peer )
{
Error("unserialized event from unknown peer");
return;
}
BufferedEvent* e = new BufferedEvent;
// Our time, not the time when the event was generated.
e->time = pkt_srcs.length() ?
time_t(network_time) : time_t(timer_mgr->Time());
e->src = current_peer->id;
e->handler = event;
e->args = args;
events.append(e);
}
void RemoteSerializer::GotFunctionCall(const char* name, double time,
Func* function, val_list* args)
{
DEBUG_COMM("parent: got function call");
++stats.events.in;
if ( ! current_peer )
{
Error("unserialized function from unknown peer");
return;
}
function->Call(args);
}
void RemoteSerializer::GotID(ID* id, Val* val)
{
++stats.ids.in;
if ( ! current_peer )
{
Error("unserialized id from unknown peer");
Unref(id);
return;
}
if ( current_peer->phase == Peer::HANDSHAKE &&
streq(id->Name(), "peer_description") )
{
if ( val->Type()->Tag() != TYPE_STRING )
{
Error("peer_description not a string");
Unref(id);
return;
}
const char* desc = val->AsString()->CheckString();
current_peer->val->Assign(4, new StringVal(desc));
Log(LogInfo, fmt("peer_description is %s",
(desc && *desc) ? desc : "not set"),
current_peer);
Unref(id);
return;
}
if ( id->Name()[0] == '#' )
{
// This is a globally unique, non-user-visible ID.
// Only MutableVals can be bound to names starting with '#'.
assert(val->IsMutableVal());
// It must be already installed in the global namespace:
// either we saw it before, or MutableVal::Unserialize()
// installed it.
assert(global_scope()->Lookup(id->Name()));
// Only synchronized values can arrive here.
assert(((MutableVal*) val)->GetProperties() & MutableVal::SYNCHRONIZED);
DBG_LOG(DBG_COMM, "got ID %s from peer\n", id->Name());
}
Unref(id);
}
void RemoteSerializer::GotConnection(Connection* c)
{
++stats.conns.in;
// Nothing else to-do. Connection will be installed automatically
// (if allowed).
Unref(c);
}
void RemoteSerializer::GotStateAccess(StateAccess* s)
{
++stats.accesses.in;
ODesc d;
DBG_LOG(DBG_COMM, "got StateAccess: %s", (s->Describe(&d), d.Description()));
if ( ! current_peer )
{
Error("unserialized function from unknown peer");
return;
}
if ( current_peer->sync_requested & Peer::WE )
s->Replay();
delete s;
}
void RemoteSerializer::GotTimer(Timer* s)
{
run_time("RemoteSerializer::GotTimer not implemented");
}
void RemoteSerializer::GotPacket(Packet* p)
{
++stats.packets.in;
BufferedPacket* bp = new BufferedPacket;
bp->time = time_t(timer_mgr->Time());
bp->p = p;
packets.append(bp);
}
void RemoteSerializer::Log(LogLevel level, const char* msg)
{
Log(level, msg, 0, LogParent);
}
void RemoteSerializer::Log(LogLevel level, const char* msg, Peer* peer,
LogSrc src)
{
const int BUFSIZE = 1024;
char buffer[BUFSIZE];
int len = 0;
if ( peer )
len += snprintf(buffer + len, sizeof(buffer) - len,
"[#%d/%s:%d] ", int(peer->id), ip2a(peer->ip),
peer->port);
len += safe_snprintf(buffer + len, sizeof(buffer) - len, "%s", msg);
val_list* vl = new val_list();
vl->append(new Val(level, TYPE_COUNT));
vl->append(new Val(src, TYPE_COUNT));
vl->append(new StringVal(buffer));
mgr.QueueEvent(remote_log, vl);
DEBUG_COMM(fmt("parent: %.6f %s", current_time(), buffer));
}
void RemoteSerializer::RaiseEvent(EventHandlerPtr event, Peer* peer,
const char* arg)
{
val_list* vl = new val_list;
if ( peer )
{
Ref(peer->val);
vl->append(peer->val);
}
else
{
Val* v = mgr.GetLocalPeerVal();
v->Ref();
vl->append(v);
}
if ( arg )
vl->append(new StringVal(arg));
// If we only have remote sources, the network time
// will not increase as long as no peers are connected.
// Therefore, we send these events immediately.
mgr.Dispatch(new Event(event, vl, PEER_LOCAL));
}
void RemoteSerializer::LogStats()
{
if ( ! io )
return;
char buffer[512];
io->Stats(buffer, 512);
Log(LogInfo, fmt("parent statistics: %s events=%lu/%lu operations=%lu/%lu",
buffer, stats.events.in, stats.events.out,
stats.accesses.in, stats.accesses.out));
}
RecordVal* RemoteSerializer::GetPeerVal(PeerID id)
{
Peer* peer = LookupPeer(id, true);
if ( ! peer )
return 0;
Ref(peer->val);
return peer->val;
}
void RemoteSerializer::ChildDied()
{
Log(LogError, "child died");
closed = true;
child_pid = 0;
// Shut down the main process as well.
terminate_processing();
}
bool RemoteSerializer::SendCMsgToChild(char msg_type, Peer* peer)
{
if ( ! sendCMsg(io, msg_type, peer ? peer->id : PEER_NONE) )
{
warn(fmt("can't send message of type %d: %s",
msg_type, io->Error()));
return false;
}
return true;
}
bool RemoteSerializer::SendToChild(char type, Peer* peer, char* str, int len)
{
DEBUG_COMM(fmt("parent: (->child) %s (#%" PRI_SOURCE_ID ", %s)", msgToStr(type), peer ? peer->id : PEER_NONE, str));
if ( ! child_pid )
return false;
if ( sendToIO(io, type, peer ? peer->id : PEER_NONE, str, len) )
return true;
if ( io->Eof() )
ChildDied();
FatalError(io->Error());
return false;
}
bool RemoteSerializer::SendToChild(char type, Peer* peer, int nargs, ...)
{
va_list ap;
if ( ! child_pid )
return false;
#ifdef DEBUG
va_start(ap, nargs);
DEBUG_COMM(fmt("parent: (->child) %s (#%" PRI_SOURCE_ID ",%s)",
msgToStr(type), peer ? peer->id : PEER_NONE, fmt_uint32s(nargs, ap)));
va_end(ap);
#endif
va_start(ap, nargs);
bool ret = sendToIO(io, type, peer ? peer->id : PEER_NONE, nargs, ap);
va_end(ap);
if ( ret )
return true;
if ( io->Eof() )
ChildDied();
FatalError(io->Error());
return false;
}
bool RemoteSerializer::SendToChild(ChunkedIO::Chunk* c)
{
DEBUG_COMM(fmt("parent: (->child) chunk of size %d", c->len));
if ( ! child_pid )
return false;
if ( sendToIO(io, c) )
return true;
if ( io->Eof() )
ChildDied();
FatalError(io->Error());
return false;
}
void RemoteSerializer::FatalError(const char* msg)
{
msg = fmt("fatal error, shutting down communication: %s", msg);
Log(LogError, msg);
error(msg);
closed = true;
kill(child_pid, SIGQUIT);
child_pid = 0;
using_communication = false;
io->Clear();
}
bool RemoteSerializer::IsActive()
{
if ( listening )
return true;
loop_over_list(peers, i)
if ( peers[i]->state == Peer::PENDING ||
peers[i]->state == Peer::CONNECTED )
return true;
return false;
}
const char* const* RemoteSerializer::GetBuiltins() const
{
static const char* builtins[] = { "connect", "listen", 0 };
return builtins;
}
void RemoteSerializer::ReportError(const char* msg)
{
if ( current_peer && current_peer->phase != Peer::SETUP )
RaiseEvent(remote_connection_error, current_peer, msg);
Log(LogError, msg, current_peer);
}
void RemoteSerializer::InternalCommError(const char* msg)
{
#ifdef DEBUG_COMMUNICATION
DumpDebugData();
#else
internal_error(msg);
#endif
}
#ifdef DEBUG_COMMUNICATION
void RemoteSerializer::DumpDebugData()
{
Log(LogError, "dumping debug data and terminating ...");
io->DumpDebugData("comm-dump.parent", true);
io->DumpDebugData("comm-dump.parent", false);
SendToChild(MSG_DEBUG_DUMP, 0, 0);
Terminate();
}
static ChunkedIO* openDump(const char* file)
{
int fd = open(file, O_RDONLY, 0600);
if ( fd < 0 )
{
fprintf(stderr, "cannot open %s: %s\n", file, strerror(errno));
return 0;
}
return new ChunkedIOFd(fd, "dump-file");
}
void RemoteSerializer::ReadDumpAsMessageType(const char* file)
{
ChunkedIO* io = openDump(file);
if ( ! io )
return;
ChunkedIO::Chunk* chunk;
if ( ! io->Read(&chunk, true ) )
{
fprintf(stderr, "cannot read %s: %s\n", file, strerror(errno));
return;
}
CMsg* msg = (CMsg*) chunk->data;
delete [] chunk->data;
delete io;
}
void RemoteSerializer::ReadDumpAsSerialization(const char* file)
{
FileSerializer s;
UnserialInfo info(&s);
info.print = stdout;
info.install_uniques = info.ignore_callbacks = true;
s.Read(&info, file, false);
}
#endif
////////////////////////////
// If true (set by signal handler), we will log some stats to parent.
static bool log_stats = false;
static bool log_prof = false;
// How often stats are sent (in seconds).
// Perhaps we should make this configurable...
const int STATS_INTERVAL = 60;
static RETSIGTYPE sig_handler_log(int signo)
{
// SIGALRM is the only one we get.
log_stats = true;
}
static RETSIGTYPE sig_handler_prof(int signo)
{
log_prof = true;
}
SocketComm::SocketComm()
{
io = 0;
// We start the ID counter high so that IDs assigned by us
// (hopefully) don't conflict with those of our parent.
id_counter = 10000;
parent_peer = 0;
parent_msgstate = TYPE;
shutting_conns_down = false;
terminating = false;
killing = false;
listen_fd_clear = -1;
listen_fd_ssl = -1;
listen_next_try = 0;
// We don't want to use the signal handlers of our parent.
(void) setsignal(SIGTERM, SIG_DFL);
(void) setsignal(SIGINT, SIG_DFL);
(void) setsignal(SIGUSR1, SIG_DFL);
(void) setsignal(SIGUSR2, SIG_DFL);
(void) setsignal(SIGCONT, SIG_DFL);
(void) setsignal(SIGCHLD, SIG_DFL);
// Raping SIGPROF for profiling
(void) setsignal(SIGPROF, sig_handler_prof);
(void) setsignal(SIGALRM, sig_handler_log);
alarm(STATS_INTERVAL);
}
SocketComm::~SocketComm()
{
loop_over_list(peers, i)
delete peers[i]->io;
delete io;
close(listen_fd_clear);
close(listen_fd_ssl);
}
static unsigned int first_rtime = 0;
void SocketComm::Run()
{
first_rtime = (unsigned int) current_time(true);
while ( true )
{
// Logging signaled?
if ( log_stats )
LogStats();
// Termination signaled
if ( terminating )
CheckFinished();
// Build FDSets for select.
fd_set fd_read, fd_write, fd_except;
FD_ZERO(&fd_read);
FD_ZERO(&fd_write);
FD_ZERO(&fd_except);
int max_fd = 0;
FD_SET(io->Fd(), &fd_read);
max_fd = io->Fd();
loop_over_list(peers, i)
{
if ( peers[i]->connected )
{
FD_SET(peers[i]->io->Fd(), &fd_read);
if ( peers[i]->io->Fd() > max_fd )
max_fd = peers[i]->io->Fd();
}
else
{
if ( peers[i]->next_try > 0 &&
time(0) > peers[i]->next_try )
// Try reconnect.
Connect(peers[i]);
}
}
if ( listen_next_try && time(0) > listen_next_try )
Listen(listen_if, listen_port, listen_ssl);
if ( listen_fd_clear >= 0 )
{
FD_SET(listen_fd_clear, &fd_read);
if ( listen_fd_clear > max_fd )
max_fd = listen_fd_clear;
}
if ( listen_fd_ssl >= 0 )
{
FD_SET(listen_fd_ssl, &fd_read);
if ( listen_fd_ssl > max_fd )
max_fd = listen_fd_ssl;
}
if ( io->IsFillingUp() && ! shutting_conns_down )
{
Error("queue to parent filling up; shutting down heaviest connection");
const ChunkedIO::Statistics* stats = 0;
unsigned long max = 0;
Peer* max_peer = 0;
loop_over_list(peers, i)
{
if ( ! peers[i]->connected )
continue;
stats = peers[i]->io->Stats();
if ( stats->bytes_read > max )
{
max = stats->bytes_read;
max_peer = peers[i];
}
}
if ( max_peer )
CloseConnection(max_peer, true);
shutting_conns_down = true;
}
if ( ! io->IsFillingUp() && shutting_conns_down )
shutting_conns_down = false;
// We cannot rely solely on select() as the there may
// be some data left in our input/output queues. So, we use
// a small timeout for select and check for data
// manually afterwards.
static long selects = 0;
static long canwrites = 0;
static long timeouts = 0;
++selects;
if ( io->CanWrite() )
++canwrites;
// FIXME: Fine-tune this (timeouts, flush, etc.)
struct timeval small_timeout;
small_timeout.tv_sec = 0;
small_timeout.tv_usec =
io->CanWrite() || io->CanRead() ? 1 : 10;
int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except,
&small_timeout);
if ( a == 0 )
++timeouts;
if ( selects % 100000 == 0 )
Log(fmt("selects=%ld canwrites=%ld timeouts=%ld", selects, canwrites, timeouts));
if ( a < 0 )
// Ignore errors for now.
continue;
if ( io->CanRead() )
ProcessParentMessage();
io->Flush();
loop_over_list(peers, j)
{
// We have to be careful here as the peer may
// be removed when an error occurs.
Peer* current = peers[j];
int round = 0;
while ( ++round <= 10 && j < peers.length() &&
peers[j] == current && current->connected &&
current->io->CanRead() )
{
ProcessRemoteMessage(current);
}
}
if ( listen_fd_clear >= 0 &&
FD_ISSET(listen_fd_clear, &fd_read) )
AcceptConnection(listen_fd_clear);
if ( listen_fd_ssl >= 0 && FD_ISSET(listen_fd_ssl, &fd_read) )
AcceptConnection(listen_fd_ssl);
// Hack to display CPU usage of the child, triggered via
// SIGPROF.
static unsigned int first_rtime = 0;
if ( first_rtime == 0 )
first_rtime = (unsigned int) current_time(true);
if ( log_prof )
{
LogProf();
log_prof = false;
}
}
}
bool SocketComm::ProcessParentMessage()
{
switch ( parent_msgstate ) {
case TYPE:
{
parent_peer = 0;
parent_msgtype = MSG_NONE;
// CMsg follows
ChunkedIO::Chunk* c;
if ( ! io->Read(&c) )
{
if ( io->Eof() )
Error("parent died", true);
Error(fmt("can't read parent's cmsg: %s",
io->Error()), true);
return false;
}
if ( ! c )
return true;
CMsg* msg = (CMsg*) c->data;
parent_peer = LookupPeer(msg->Peer(), false);
parent_id = msg->Peer();
parent_msgtype = msg->Type();
parent_args = 0;
delete [] c->data;
delete c;
switch ( parent_msgtype ) {
case MSG_LISTEN_STOP:
case MSG_CLOSE:
case MSG_CLOSE_ALL:
case MSG_TERMINATE:
case MSG_PHASE_DONE:
case MSG_DEBUG_DUMP:
{
// No further argument chunk.
parent_msgstate = TYPE;
return DoParentMessage();
}
case MSG_LISTEN:
case MSG_CONNECT_TO:
case MSG_COMPRESS:
case MSG_PING:
case MSG_PONG:
case MSG_REQUEST_EVENTS:
case MSG_REQUEST_SYNC:
case MSG_SERIAL:
case MSG_CAPTURE_FILTER:
case MSG_VERSION:
case MSG_CAPS:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
{
// One further argument chunk.
parent_msgstate = ARGS;
return ProcessParentMessage();
}
default:
internal_error("unknown msg type %d", parent_msgtype);
return true;
}
internal_error("cannot be reached");
}
case ARGS:
{
// Argument chunk follows.
ChunkedIO::Chunk* c = 0;
READ_CHUNK(io, c, Error("parent died", true));
parent_args = c;
parent_msgstate = TYPE;
bool result = DoParentMessage();
if ( parent_args )
{
delete [] parent_args->data;
delete parent_args;
parent_args = 0;
}
return result;
}
default:
internal_error("unknown msgstate");
}
internal_error("cannot be reached");
}
bool SocketComm::DoParentMessage()
{
switch ( parent_msgtype ) {
case MSG_LISTEN_STOP:
{
if ( listen_fd_ssl >= 0 )
close(listen_fd_ssl);
if ( listen_fd_clear >= 0 )
close(listen_fd_clear);
listen_fd_clear = listen_fd_ssl = -1;
Log("stopped listening");
return true;
}
case MSG_CLOSE:
{
if ( parent_peer && parent_peer->connected )
CloseConnection(parent_peer, false);
return true;
}
case MSG_CLOSE_ALL:
{
loop_over_list(peers, i)
{
if ( peers[i]->connected )
CloseConnection(peers[i], false);
}
return true;
}
case MSG_TERMINATE:
{
terminating = true;
CheckFinished();
return true;
}
case MSG_DEBUG_DUMP:
{
#ifdef DEBUG_COMMUNICATION
io->DumpDebugData("comm-dump.child.pipe", true);
io->DumpDebugData("comm-dump.child.pipe", false);
loop_over_list(peers, j)
{
RemoteSerializer::PeerID id = peers[j]->id;
peers[j]->io->DumpDebugData(fmt("comm-dump.child.peer.%d", id), true);
peers[j]->io->DumpDebugData(fmt("comm-dump.child.peer.%d", id), false);
}
#else
internal_error("DEBUG_DUMP support not compiled in");
#endif
return true;
}
case MSG_PHASE_DONE:
{
// No argument block follows.
if ( parent_peer && parent_peer->connected )
{
DEBUG_COMM("child: forwarding with MSG_PHASE_DONE to peer");
if ( ! SendToPeer(parent_peer, MSG_PHASE_DONE, 0) )
return false;
}
return true;
}
case MSG_LISTEN:
return ProcessListen();
case MSG_CONNECT_TO:
return ProcessConnectTo();
case MSG_COMPRESS:
return ProcessParentCompress();
case MSG_PING:
{
// Set time2.
assert(parent_args);
ping_args* args = (ping_args*) parent_args->data;
args->time2 = htond(current_time(true));
return ForwardChunkToPeer();
}
case MSG_PONG:
{
assert(parent_args);
// Calculate time delta.
ping_args* args = (ping_args*) parent_args->data;
args->time3 = htond(current_time(true) - ntohd(args->time3));
return ForwardChunkToPeer();
}
case MSG_REQUEST_EVENTS:
case MSG_REQUEST_SYNC:
case MSG_SERIAL:
case MSG_CAPTURE_FILTER:
case MSG_VERSION:
case MSG_CAPS:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
assert(parent_args);
return ForwardChunkToPeer();
default:
internal_error("ProcessParentMessage: unexpected state");
}
internal_error("cannot be reached");
}
bool SocketComm::ForwardChunkToPeer()
{
char state = parent_msgtype;
if ( parent_peer && parent_peer->connected )
{
DEBUG_COMM("child: forwarding with 1 arg to peer");
if ( ! SendToPeer(parent_peer, state, 0) )
return false;
if ( ! SendToPeer(parent_peer, parent_args) )
return false;
parent_args = 0;
}
else
{
#ifdef DEBUG
if ( parent_peer )
DEBUG_COMM(fmt("child: not connected to #%" PRI_SOURCE_ID, parent_id));
#endif
}
return true;
}
bool SocketComm::ProcessConnectTo()
{
assert(parent_args);
uint32* args = (uint32*) parent_args->data;
Peer* peer = new Peer;
peer->id = ntohl(args[0]);
peer->ip = ntohl(args[1]);
peer->port = ntohl(args[2]);
peer->retry = ntohl(args[3]);
peer->ssl = ntohl(args[4]);
Connect(peer);
return true;
}
bool SocketComm::ProcessListen()
{
assert(parent_args);
uint32* args = (uint32*) parent_args->data;
uint32 addr = ntohl(args[0]);
uint16 port = uint16(ntohl(args[1]));
uint32 ssl = ntohl(args[2]);
return Listen(addr, port, ssl);
}
bool SocketComm::ProcessParentCompress()
{
#ifndef HAVE_LIBZ
internal_error("supposed to enable compression but don't have zlib");
return false;
#else
assert(parent_args);
uint32* args = (uint32*) parent_args->data;
uint32 level = ntohl(args[0]);
if ( ! parent_peer->compressor )
{
parent_peer->io = new CompressedChunkedIO(parent_peer->io);
parent_peer->io->Init();
parent_peer->compressor = true;
}
// Signal compression to peer.
if ( ! SendToPeer(parent_peer, MSG_COMPRESS, 0) )
return false;
// This cast is safe.
CompressedChunkedIO* comp_io = (CompressedChunkedIO*) parent_peer->io;
comp_io->EnableCompression(level);
Log(fmt("enabling compression (level %d)", level), parent_peer);
return true;
#endif
}
bool SocketComm::ProcessRemoteMessage(SocketComm::Peer* peer)
{
assert(peer);
peer->io->Flush();
switch ( peer->state ) {
case MSG_NONE:
{ // CMsg follows
ChunkedIO::Chunk* c;
READ_CHUNK(peer->io, c,
(CloseConnection(peer, true), peer))
CMsg* msg = (CMsg*) c->data;
DEBUG_COMM(fmt("child: %s from peer #%" PRI_SOURCE_ID,
msgToStr(msg->Type()), peer->id));
switch ( msg->Type() ) {
case MSG_PHASE_DONE:
// No further argument block.
DEBUG_COMM("child: forwarding with 0 args to parent");
if ( ! SendToParent(msg->Type(), peer, 0) )
return false;
break;
default:
peer->state = msg->Type();
}
delete [] c->data;
delete c;
break;
}
case MSG_COMPRESS:
ProcessPeerCompress(peer);
break;
case MSG_PING:
{
// Messages with one further argument block which we simply
// forward to our parent.
ChunkedIO::Chunk* c;
READ_CHUNK(peer->io, c,
(CloseConnection(peer, true), peer))
// Set time3.
ping_args* args = (ping_args*) c->data;
args->time3 = htond(current_time(true));
return ForwardChunkToParent(peer, c);
}
case MSG_PONG:
{
// Messages with one further argument block which we simply
// forward to our parent.
ChunkedIO::Chunk* c;
READ_CHUNK(peer->io, c,
(CloseConnection(peer, true), peer))
// Calculate time delta.
ping_args* args = (ping_args*) c->data;
args->time2 = htond(current_time(true) - ntohd(args->time2));
return ForwardChunkToParent(peer, c);
}
case MSG_REQUEST_EVENTS:
case MSG_REQUEST_SYNC:
case MSG_SERIAL:
case MSG_CAPTURE_FILTER:
case MSG_VERSION:
case MSG_CAPS:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
{
// Messages with one further argument block which we simply
// forward to our parent.
ChunkedIO::Chunk* c;
READ_CHUNK(peer->io, c,
(CloseConnection(peer, true), peer))
return ForwardChunkToParent(peer, c);
}
default:
internal_error("ProcessRemoteMessage: unexpected state");
}
return true;
}
bool SocketComm::ForwardChunkToParent(Peer* peer, ChunkedIO::Chunk* c)
{
char state = peer->state;
peer->state = MSG_NONE;
DEBUG_COMM("child: forwarding message with 1 arg to parent");
if ( ! SendToParent(state, peer, 0) )
return false;
if ( ! SendToParent(c) )
return false;
io->Flush(); // FIXME: Needed?
return true;
}
bool SocketComm::ProcessPeerCompress(Peer* peer)
{
peer->state = MSG_NONE;
#ifndef HAVE_LIBZ
Error("peer compresses although we do not support it", peer);
return false;
#else
if ( ! parent_peer->compressor )
{
parent_peer->io = new CompressedChunkedIO(parent_peer->io);
parent_peer->io->Init();
parent_peer->compressor = true;
}
// This cast is safe here.
((CompressedChunkedIO*) peer->io)->EnableDecompression();
Log("enabling decompression", peer);
return true;
#endif
}
bool SocketComm::Connect(Peer* peer)
{
struct sockaddr_in server;
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
if ( sockfd < 0 )
{
Error(fmt("can't create socket, %s", strerror(errno)));
return false;
}
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(peer->port);
server.sin_addr.s_addr = htonl(peer->ip);
bool connected = true;
if ( connect(sockfd, (sockaddr*) &server, sizeof(server)) < 0 )
{
Error(fmt("connect failed: %s", strerror(errno)), peer);
close(sockfd);
connected = false;
}
if ( ! (connected || peer->retry) )
{
CloseConnection(peer, false);
return false;
}
Peer* existing_peer = LookupPeer(peer->id, false);
if ( existing_peer )
{
*existing_peer = *peer;
peer = existing_peer;
}
else
peers.append(peer);
peer->connected = connected;
peer->next_try = connected ? 0 : time(0) + peer->retry;
peer->state = MSG_NONE;
peer->io = 0;
peer->compressor = false;
if ( connected )
{
if ( peer->ssl )
{
peer->io = new ChunkedIOSSL(sockfd, false);
}
else
peer->io = new ChunkedIOFd(sockfd, "child->peer");
if ( ! peer->io->Init() )
{
Error(fmt("can't init peer io: %s",
peer->io->Error()), peer);
return 0;
}
}
if ( connected )
{
Log("connected", peer);
if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) )
return false;
}
return connected;
}
bool SocketComm::CloseConnection(Peer* peer, bool reconnect)
{
if ( ! SendToParent(MSG_CLOSE, peer, 0) )
return false;
Log("connection closed", peer);
if ( ! peer->retry || ! reconnect )
{
peers.remove(peer);
delete peer->io; // This will close the fd.
delete peer;
}
else
{
delete peer->io; // This will close the fd.
peer->io = 0;
peer->connected = false;
peer->next_try = time(0) + peer->retry;
}
if ( parent_peer == peer )
{
parent_peer = 0;
parent_id = RemoteSerializer::PEER_NONE;
}
return true;
}
bool SocketComm::Listen(uint32 ip, uint16 port, bool expect_ssl)
{
int* listen_fd = expect_ssl ? &listen_fd_ssl : &listen_fd_clear;
if ( *listen_fd >= 0 )
close(*listen_fd);
struct sockaddr_in server;
*listen_fd = socket(PF_INET, SOCK_STREAM, 0);
if ( *listen_fd < 0 )
{
Error(fmt("can't create listen socket, %s",
strerror(errno)));
return false;
}
// Set SO_REUSEADDR.
int turn_on = 1;
if ( setsockopt(*listen_fd, SOL_SOCKET, SO_REUSEADDR,
&turn_on, sizeof(turn_on)) < 0 )
{
Error(fmt("can't set SO_REUSEADDR, %s",
strerror(errno)));
return false;
}
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = htonl(ip);
if ( bind(*listen_fd, (sockaddr*) &server, sizeof(server)) < 0 )
{
Error(fmt("can't bind to port %d, %s", port, strerror(errno)));
*listen_fd = -1;
if ( errno == EADDRINUSE )
{
listen_if = ip;
listen_port = port;
listen_ssl = expect_ssl;
// FIXME: Make this timeout configurable.
listen_next_try = time(0) + 30;
}
return false;
}
if ( listen(*listen_fd, 50) < 0 )
{
Error(fmt("can't listen, %s", strerror(errno)));
return false;
}
listen_next_try = 0;
Log(fmt("listening on %s:%d (%s)",
ip2a(ip), port, expect_ssl ? "ssl" : "clear"));
return true;
}
bool SocketComm::AcceptConnection(int fd)
{
sockaddr_in client;
socklen_t len = sizeof(client);
int clientfd = accept(fd, (sockaddr*) &client, &len);
if ( clientfd < 0 )
{
Error(fmt("accept failed, %s %d",
strerror(errno), errno));
return false;
}
Peer* peer = new Peer;
peer->id = id_counter++;
peer->ip = ntohl(client.sin_addr.s_addr);
peer->port = ntohs(client.sin_port);
peer->connected = true;
peer->ssl = (fd == listen_fd_ssl);
peer->compressor = false;
if ( peer->ssl )
peer->io = new ChunkedIOSSL(clientfd, true);
else
peer->io = new ChunkedIOFd(clientfd, "child->peer");
if ( ! peer->io->Init() )
{
Error(fmt("can't init peer io: %s",
peer->io->Error()), peer);
return false;
}
peers.append(peer);
Log(fmt("accepted %s connection", peer->ssl ? "SSL" : "clear"), peer);
if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) )
return false;
return true;
}
const char* SocketComm::MakeLogString(const char* msg, Peer* peer)
{
const int BUFSIZE = 1024;
static char* buffer = 0;
if ( ! buffer )
buffer = new char[BUFSIZE];
int len = 0;
if ( peer )
len = snprintf(buffer, BUFSIZE, "[#%d/%s:%d] ", int(peer->id),
ip2a(peer->ip), peer->port);
len += safe_snprintf(buffer + len, BUFSIZE - len, "%s", msg);
return buffer;
}
void SocketComm::Error(const char* msg, bool kill_me)
{
if ( kill_me )
{
fprintf(stderr, "fatal error in child: %s\n", msg);
Kill();
}
else
{
if ( io->Eof() )
// Can't send to parent, so fall back to stderr.
fprintf(stderr, "error in child: %s", msg);
else
SendToParent(MSG_ERROR, 0, copy_string(msg));
}
DEBUG_COMM(fmt("child: %s", msg));
}
bool SocketComm::Error(const char* msg, Peer* peer)
{
const char* buffer = MakeLogString(msg, peer);
Error(buffer);
// If a remote peer causes an error, we shutdown the connection
// as resynchronizing is in general not possible. But we may
// try again later.
if ( peer->connected )
CloseConnection(peer, true);
return true;
}
void SocketComm::Log(const char* msg, Peer* peer)
{
const char* buffer = MakeLogString(msg, peer);
SendToParent(MSG_LOG, 0, copy_string(buffer));
DEBUG_COMM(fmt("child: %s", buffer));
}
void SocketComm::Kill()
{
if ( killing )
// Ignore recursive calls.
return;
killing = true;
LogProf();
Log("terminating");
close(listen_fd_clear);
close(listen_fd_ssl);
kill(getpid(), SIGTERM);
while ( 1 )
; // loop until killed
}
SocketComm::Peer* SocketComm::LookupPeer(RemoteSerializer::PeerID id,
bool only_if_connected)
{
loop_over_list(peers, i)
if ( peers[i]->id == id )
return ! only_if_connected ||
peers[i]->connected ? peers[i] : 0;
return 0;
}
bool SocketComm::LogStats()
{
if ( ! peers.length() )
return true;
// Concat stats of all peers into single buffer.
char* buffer = new char[peers.length() * 512];
int pos = 0;
loop_over_list(peers, i)
{
if ( peers[i]->connected )
peers[i]->io->Stats(buffer+pos, 512);
else
strcpy(buffer+pos, "not connected");
pos += strlen(buffer+pos) + 1;
}
// Send it.
if ( ! SendToParent(MSG_STATS, 0, buffer, pos) )
return false;
log_stats = false;
alarm(STATS_INTERVAL);
return true;
}
bool SocketComm::LogProf()
{
static struct rusage cld_res;
getrusage(RUSAGE_SELF, &cld_res);
double Utime = cld_res.ru_utime.tv_sec + cld_res.ru_utime.tv_usec / 1e6;
double Stime = cld_res.ru_stime.tv_sec + cld_res.ru_stime.tv_usec / 1e6;
double Rtime = current_time(true);
SocketComm::Log(fmt("CPU usage: user %.03f sys %.03f real %0.03f",
Utime, Stime, Rtime - first_rtime));
return true;
}
void SocketComm::CheckFinished()
{
assert(terminating);
loop_over_list(peers, i)
{
if ( ! peers[i]->connected )
continue;
if ( ! peers[i]->io->IsIdle() )
return;
}
LogProf();
Log("terminating");
// All done.
SendToParent(MSG_TERMINATE, 0, 0);
}
bool SocketComm::SendToParent(char type, Peer* peer, const char* str, int len)
{
#ifdef DEBUG
// str may already by constructed with fmt()
const char* tmp = copy_string(str);
DEBUG_COMM(fmt("child: (->parent) %s (#%" PRI_SOURCE_ID ", %s)", msgToStr(type), peer ? peer->id : RemoteSerializer::PEER_NONE, tmp));
delete [] tmp;
#endif
if ( sendToIO(io, type, peer ? peer->id : RemoteSerializer::PEER_NONE,
str, len) )
return true;
if ( io->Eof() )
Error("parent died", true);
return false;
}
bool SocketComm::SendToParent(char type, Peer* peer, int nargs, ...)
{
va_list ap;
#ifdef DEBUG
va_start(ap,nargs);
DEBUG_COMM(fmt("child: (->parent) %s (#%" PRI_SOURCE_ID ",%s)", msgToStr(type), peer ? peer->id : RemoteSerializer::PEER_NONE, fmt_uint32s(nargs, ap)));
va_end(ap);
#endif
va_start(ap, nargs);
bool ret = sendToIO(io, type,
peer ? peer->id : RemoteSerializer::PEER_NONE,
nargs, ap);
va_end(ap);
if ( ret )
return true;
if ( io->Eof() )
Error("parent died", true);
return false;
}
bool SocketComm::SocketComm::SendToParent(ChunkedIO::Chunk* c)
{
DEBUG_COMM(fmt("child: (->parent) chunk of size %d", c->len));
if ( sendToIO(io, c) )
return true;
if ( io->Eof() )
Error("parent died", true);
return false;
}
bool SocketComm::SendToPeer(Peer* peer, char type, const char* str, int len)
{
#ifdef DEBUG
// str may already by constructed with fmt()
const char* tmp = copy_string(str);
DEBUG_COMM(fmt("child: (->peer) %s to #%" PRI_SOURCE_ID " (%s)", msgToStr(type), peer->id, tmp));
delete [] tmp;
#endif
if ( ! sendToIO(peer->io, type, RemoteSerializer::PEER_NONE, str, len) )
{
Error(fmt("child: write error %s", io->Error()), peer);
return false;
}
return true;
}
bool SocketComm::SendToPeer(Peer* peer, char type, int nargs, ...)
{
va_list ap;
#ifdef DEBUG
va_start(ap,nargs);
DEBUG_COMM(fmt("child: (->peer) %s to #%" PRI_SOURCE_ID " (%s)",
msgToStr(type), peer->id, fmt_uint32s(nargs, ap)));
va_end(ap);
#endif
va_start(ap, nargs);
bool ret = sendToIO(peer->io, type, RemoteSerializer::PEER_NONE,
nargs, ap);
va_end(ap);
if ( ! ret )
{
Error(fmt("child: write error %s", io->Error()), peer);
return false;
}
return true;
}
bool SocketComm::SendToPeer(Peer* peer, ChunkedIO::Chunk* c)
{
DEBUG_COMM(fmt("child: (->peer) chunk of size %d to #%" PRI_SOURCE_ID, c->len, peer->id));
if ( ! sendToIO(peer->io, c) )
{
Error(fmt("child: write error %s", io->Error()), peer);
return false;
}
return true;
}