Fix reassembly of data w/ sizes beyond 32-bit capacities (BIT-348).

The main change is that reassembly code (e.g. for TCP) now uses
int64/uint64 (signedness is situational) data types in place of int
types in order to support delivering data to analyzers that pass 2GB
thresholds.  There's also changes in logic that accompany the change in
data types, e.g. to fix TCP sequence space arithmetic inconsistencies.

Another significant change is in the Analyzer API: the *Packet and
*Undelivered methods now use a uint64 in place of an int for the
relative sequence space offset parameter.
This commit is contained in:
Jon Siwek 2014-04-09 13:03:24 -05:00
parent 2f57c26d5b
commit 2b3c2bd394
75 changed files with 1627 additions and 1540 deletions

View file

@ -14,11 +14,6 @@ using namespace analyzer::tcp;
// Note, sequence numbers are relative. I.e., they start with 1.
// TODO: The Reassembler should start using 64 bit ints for keeping track of
// sequence numbers; currently they become negative once 2GB are exceeded.
//
// See #348 for more information.
const bool DEBUG_tcp_contents = false;
const bool DEBUG_tcp_connection_close = false;
const bool DEBUG_tcp_match_undelivered = false;
@ -44,9 +39,7 @@ TCP_Reassembler::TCP_Reassembler(analyzer::Analyzer* arg_dst_analyzer,
deliver_tcp_contents = 0;
skip_deliveries = 0;
did_EOF = 0;
#ifdef ENABLE_SEQ_TO_SKIP
seq_to_skip = 0;
#endif
in_delivery = false;
if ( tcp_contents )
@ -73,12 +66,11 @@ TCP_Reassembler::~TCP_Reassembler()
void TCP_Reassembler::Done()
{
MatchUndelivered(-1);
MatchUndelivered(-1, true);
if ( record_contents_file )
{ // Record any undelivered data.
if ( blocks &&
seq_delta(last_reassem_seq, last_block->upper) < 0 )
if ( blocks && last_reassem_seq < last_block->upper )
RecordToSeq(last_reassem_seq, last_block->upper,
record_contents_file);
@ -86,13 +78,13 @@ void TCP_Reassembler::Done()
}
}
void TCP_Reassembler::SizeBufferedData(int& waiting_on_hole,
int& waiting_on_ack) const
void TCP_Reassembler::SizeBufferedData(uint64& waiting_on_hole,
uint64& waiting_on_ack) const
{
waiting_on_hole = waiting_on_ack = 0;
for ( DataBlock* b = blocks; b; b = b->next )
{
if ( seq_delta(b->seq, last_reassem_seq) <= 0 )
if ( b->seq <= last_reassem_seq )
// We must have delivered this block, but
// haven't yet trimmed it.
waiting_on_ack += b->Size();
@ -126,7 +118,7 @@ void TCP_Reassembler::SetContentsFile(BroFile* f)
}
void TCP_Reassembler::Undelivered(int up_to_seq)
void TCP_Reassembler::Undelivered(uint64 up_to_seq)
{
TCP_Endpoint* endpoint = endp;
TCP_Endpoint* peer = endpoint->peer;
@ -142,13 +134,7 @@ void TCP_Reassembler::Undelivered(int up_to_seq)
// was a keep-alive. So, in either case, just ignore it.
// TODO: Don't we need to update last_reassm_seq ????
if ( up_to_seq >=0 )
// Since seq are currently only 32 bit signed
// integers, they will become negative if a
// connection has more than 2GB of data. Remove the
// above if and always return here, once we're using
// 64 bit ints
return;
return;
}
#if 0
@ -156,15 +142,14 @@ void TCP_Reassembler::Undelivered(int up_to_seq)
{
// Make sure we're not worrying about undelivered
// FIN control octets!
int FIN_seq = endpoint->FIN_seq - endpoint->start_seq;
if ( seq_delta(up_to_seq, FIN_seq) >= 0 )
up_to_seq = FIN_seq - 1;
if ( up_to_seq >= endpoint->FIN_seq )
up_to_seq = endpoint->FIN_seq - 1;
}
#endif
if ( DEBUG_tcp_contents )
{
DEBUG_MSG("%.6f Undelivered: IsOrig()=%d up_to_seq=%d, last_reassm=%d, "
DEBUG_MSG("%.6f Undelivered: IsOrig()=%d up_to_seq=%"PRIu64", last_reassm=%"PRIu64", "
"endp: FIN_cnt=%d, RST_cnt=%d, "
"peer: FIN_cnt=%d, RST_cnt=%d\n",
network_time, IsOrig(), up_to_seq, last_reassem_seq,
@ -172,7 +157,7 @@ void TCP_Reassembler::Undelivered(int up_to_seq)
peer->FIN_cnt, peer->RST_cnt);
}
if ( seq_delta(up_to_seq, last_reassem_seq) <= 0 )
if ( up_to_seq <= last_reassem_seq )
// This should never happen. (Reassembler::TrimToSeq has the only call
// to this method and only if this condition is not true).
reporter->InternalError("Calling Undelivered for data that has already been delivered (or has already been marked as undelivered");
@ -195,10 +180,10 @@ void TCP_Reassembler::Undelivered(int up_to_seq)
{
if ( DEBUG_tcp_contents )
{
DEBUG_MSG("%.6f Undelivered: IsOrig()=%d, seq=%d, len=%d, "
DEBUG_MSG("%.6f Undelivered: IsOrig()=%d, seq=%"PRIu64", len=%"PRIu64", "
"skip_deliveries=%d\n",
network_time, IsOrig(), last_reassem_seq,
seq_delta(up_to_seq, last_reassem_seq),
up_to_seq - last_reassem_seq,
skip_deliveries);
}
@ -210,8 +195,8 @@ void TCP_Reassembler::Undelivered(int up_to_seq)
// packet, but it's undelievered because it's out of
// sequence.
int seq = last_reassem_seq;
int len = seq_delta(up_to_seq, last_reassem_seq);
uint64 seq = last_reassem_seq;
uint64 len = up_to_seq - last_reassem_seq;
// Only report on content gaps for connections that
// are in a cleanly established state. In other
@ -255,19 +240,19 @@ void TCP_Reassembler::Undelivered(int up_to_seq)
RecordToSeq(last_reassem_seq, up_to_seq, record_contents_file);
if ( tcp_match_undelivered )
MatchUndelivered(up_to_seq);
MatchUndelivered(up_to_seq, false);
// But we need to re-adjust last_reassem_seq in either case.
last_reassem_seq = up_to_seq; // we've done our best ...
}
void TCP_Reassembler::MatchUndelivered(int up_to_seq)
void TCP_Reassembler::MatchUndelivered(uint64 up_to_seq, bool use_last_upper)
{
if ( ! blocks || ! rule_matcher )
return;
ASSERT(last_block);
if ( up_to_seq == -1 )
if ( use_last_upper )
up_to_seq = last_block->upper;
// ### Note: the original code did not check whether blocks have
@ -277,36 +262,35 @@ void TCP_Reassembler::MatchUndelivered(int up_to_seq)
// We are to match any undelivered data, from last_reassem_seq to
// min(last_block->upper, up_to_seq).
// Is there such data?
if ( seq_delta(up_to_seq, last_reassem_seq) <= 0 ||
seq_delta(last_block->upper, last_reassem_seq) <= 0 )
if ( up_to_seq <= last_reassem_seq ||
last_block->upper <= last_reassem_seq )
return;
// Skip blocks that are already delivered (but not ACK'ed).
// Question: shall we instead keep a pointer to the first undelivered
// block?
DataBlock* b;
for ( b = blocks; b && seq_delta(b->upper, last_reassem_seq) <= 0;
b = b->next )
for ( b = blocks; b && b->upper <= last_reassem_seq; b = b->next )
tcp_analyzer->Conn()->Match(Rule::PAYLOAD, b->block, b->Size(),
false, false, IsOrig(), false);
ASSERT(b);
}
void TCP_Reassembler::RecordToSeq(int start_seq, int stop_seq, BroFile* f)
void TCP_Reassembler::RecordToSeq(uint64 start_seq, uint64 stop_seq, BroFile* f)
{
DataBlock* b = blocks;
// Skip over blocks up to the start seq.
while ( b && seq_delta(b->upper, start_seq) <= 0 )
while ( b && b->upper <= start_seq )
b = b->next;
if ( ! b )
return;
int last_seq = start_seq;
while ( b && seq_delta(b->upper, stop_seq) <= 0 )
uint64 last_seq = start_seq;
while ( b && b->upper <= stop_seq )
{
if ( seq_delta(b->seq, last_seq) > 0 )
if ( b->seq > last_seq )
RecordGap(last_seq, b->seq, f);
RecordBlock(b, f);
@ -316,7 +300,7 @@ void TCP_Reassembler::RecordToSeq(int start_seq, int stop_seq, BroFile* f)
if ( b )
// Check for final gap.
if ( seq_delta(last_seq, stop_seq) < 0 )
if ( last_seq < stop_seq )
RecordGap(last_seq, stop_seq, f);
}
@ -337,9 +321,9 @@ void TCP_Reassembler::RecordBlock(DataBlock* b, BroFile* f)
}
}
void TCP_Reassembler::RecordGap(int start_seq, int upper_seq, BroFile* f)
void TCP_Reassembler::RecordGap(uint64 start_seq, uint64 upper_seq, BroFile* f)
{
if ( f->Write(fmt("\n<<gap %d>>\n", seq_delta(upper_seq, start_seq))) )
if ( f->Write(fmt("\n<<gap %"PRIu64">>\n", upper_seq - start_seq)) )
return;
reporter->Error("TCP_Reassembler contents gap write failed");
@ -356,8 +340,8 @@ void TCP_Reassembler::RecordGap(int start_seq, int upper_seq, BroFile* f)
void TCP_Reassembler::BlockInserted(DataBlock* start_block)
{
if ( seq_delta(start_block->seq, last_reassem_seq) > 0 ||
seq_delta(start_block->upper, last_reassem_seq) <= 0 )
if ( start_block->seq > last_reassem_seq ||
start_block->upper <= last_reassem_seq )
return;
// We've filled a leading hole. Deliver as much as possible.
@ -367,12 +351,12 @@ void TCP_Reassembler::BlockInserted(DataBlock* start_block)
// loop we have to take care not to deliver already-delivered
// data.
for ( DataBlock* b = start_block;
b && seq_delta(b->seq, last_reassem_seq) <= 0; b = b->next )
b && b->seq <= last_reassem_seq; b = b->next )
{
if ( b->seq == last_reassem_seq )
{ // New stuff.
int len = b->Size();
int seq = last_reassem_seq;
uint64 len = b->Size();
uint64 seq = last_reassem_seq;
last_reassem_seq += len;
@ -406,10 +390,10 @@ void TCP_Reassembler::BlockInserted(DataBlock* start_block)
// TCP_Connection::NextPacket.
}
void TCP_Reassembler::Overlap(const u_char* b1, const u_char* b2, int n)
void TCP_Reassembler::Overlap(const u_char* b1, const u_char* b2, uint64 n)
{
if ( DEBUG_tcp_contents )
DEBUG_MSG("%.6f TCP contents overlap: %d IsOrig()=%d\n", network_time, n, IsOrig());
DEBUG_MSG("%.6f TCP contents overlap: %"PRIu64" IsOrig()=%d\n", network_time, n, IsOrig());
if ( rexmit_inconsistency &&
memcmp((const void*) b1, (const void*) b2, n) &&
@ -438,7 +422,7 @@ bool TCP_Reassembler::DoUnserialize(UnserialInfo* info)
return false; // Cannot be reached.
}
void TCP_Reassembler::Deliver(int seq, int len, const u_char* data)
void TCP_Reassembler::Deliver(uint64 seq, int len, const u_char* data)
{
if ( type == Direct )
dst_analyzer->NextStream(len, data, IsOrig());
@ -446,24 +430,24 @@ void TCP_Reassembler::Deliver(int seq, int len, const u_char* data)
dst_analyzer->ForwardStream(len, data, IsOrig());
}
int TCP_Reassembler::DataSent(double t, int seq, int len,
int TCP_Reassembler::DataSent(double t, uint64 seq, int len,
const u_char* data, bool replaying)
{
int ack = seq_delta(endp->AckSeq(), endp->StartSeq());
int upper_seq = seq + len;
uint64 ack = endp->ToRelativeSeqSpace(endp->AckSeq(), endp->AckWraps());
uint64 upper_seq = seq + len;
if ( DEBUG_tcp_contents )
{
DEBUG_MSG("%.6f DataSent: IsOrig()=%d seq=%d upper=%d ack=%d\n",
DEBUG_MSG("%.6f DataSent: IsOrig()=%d seq=%"PRIu64" upper=%"PRIu64" ack=%"PRIu64"\n",
network_time, IsOrig(), seq, upper_seq, ack);
}
if ( skip_deliveries )
return 0;
if ( seq_delta(seq, ack) < 0 && ! replaying )
if ( seq < ack && ! replaying )
{
if ( seq_delta(upper_seq, ack) <= 0 )
if ( upper_seq <= ack )
// We've already delivered this and it's been acked.
return 0;
@ -472,7 +456,7 @@ int TCP_Reassembler::DataSent(double t, int seq, int len,
// packet held [a, a+b) and this packet holds [a, a+c) for c>b
// (which some TCP's will do when retransmitting). Trim the
// packet to just the unacked data.
int amount_acked = seq_delta(ack, seq);
uint64 amount_acked = ack - seq;
seq += amount_acked;
data += amount_acked;
len -= amount_acked;
@ -500,16 +484,13 @@ int TCP_Reassembler::DataSent(double t, int seq, int len,
}
void TCP_Reassembler::AckReceived(int seq)
void TCP_Reassembler::AckReceived(uint64 seq)
{
if ( endp->FIN_cnt > 0 && seq_delta(seq, endp->FIN_seq) >= 0 )
// TrimToSeq: FIN_seq - 1
if ( endp->FIN_cnt > 0 && seq >= endp->FIN_seq )
seq = endp->FIN_seq - 1;
int bytes_covered = seq_delta(seq, trim_seq);
if ( bytes_covered <= 0 )
// Zero, or negative in sequence-space terms. Nothing to do.
if ( seq <= trim_seq )
// Nothing to do.
return;
bool test_active = ! skip_deliveries && ! tcp_analyzer->Skipping() &&
@ -517,12 +498,12 @@ void TCP_Reassembler::AckReceived(int seq)
(endp->state == TCP_ENDPOINT_ESTABLISHED &&
endp->peer->state == TCP_ENDPOINT_ESTABLISHED ) );
int num_missing = TrimToSeq(seq);
uint64 num_missing = TrimToSeq(seq);
if ( test_active )
{
++tot_ack_events;
tot_ack_bytes += bytes_covered;
tot_ack_bytes += seq - trim_seq;
if ( num_missing > 0 )
{
@ -602,20 +583,18 @@ void TCP_Reassembler::CheckEOF()
// Deliver, DeliverBlock is not virtual, and this allows us to insert
// operations that apply to all connections using TCP_Contents.
void TCP_Reassembler::DeliverBlock(int seq, int len, const u_char* data)
void TCP_Reassembler::DeliverBlock(uint64 seq, int len, const u_char* data)
{
#ifdef ENABLE_SEQ_TO_SKIP
if ( seq_delta(seq + len, seq_to_skip) <= 0 )
if ( seq + len <= seq_to_skip )
return;
if ( seq_delta(seq, seq_to_skip) < 0 )
if ( seq < seq_to_skip )
{
int to_skip = seq_delta(seq_to_skip, seq);
uint64 to_skip = seq_to_skip - seq;
len -= to_skip;
data += to_skip;
seq = seq_to_skip;
}
#endif
if ( deliver_tcp_contents )
{
@ -640,23 +619,21 @@ void TCP_Reassembler::DeliverBlock(int seq, int len, const u_char* data)
in_delivery = true;
Deliver(seq, len, data);
in_delivery = false;
#ifdef ENABLE_SEQ_TO_SKIP
if ( seq_delta(seq + len, seq_to_skip) < 0 )
if ( seq + len < seq_to_skip )
SkipToSeq(seq_to_skip);
#endif
}
#ifdef ENABLE_SEQ_TO_SKIP
void TCP_Reassembler::SkipToSeq(int seq)
void TCP_Reassembler::SkipToSeq(uint64 seq)
{
if ( seq_delta(seq, seq_to_skip) > 0 )
if ( seq > seq_to_skip )
{
seq_to_skip = seq;
if ( ! in_delivery )
TrimToSeq(seq);
}
}
#endif
int TCP_Reassembler::DataPending() const
{
@ -665,20 +642,28 @@ int TCP_Reassembler::DataPending() const
if ( skip_deliveries )
return 0;
uint32 delivered_seq = Endpoint()->StartSeq() + DataSeq();
uint64 delivered_seq = Endpoint()->StartSeqI64() + DataSeq();
uint64 last_seq = TCP_Endpoint::ToFullSeqSpace(Endpoint()->LastSeq(),
Endpoint()->SeqWraps());
if ( last_seq < delivered_seq )
return 0;
// Q. Can we say that?
// ASSERT(delivered_seq <= Endpoint()->LastSeq());
// ASSERT(delivered_seq <= last_seq);
//
// A. Yes, but only if we express it with 64-bit comparison
// to handle sequence wrapping around. (Or perhaps seq_delta
// is enough here?)
// A. That should be true if endpoints are always initialized w/
// trustworthy sequence numbers, though it seems that may not currently
// be the case. e.g. a RST packet may end up initializing the endpoint.
// In that case, maybe there's not any "right" way to initialize it, so
// the check for last_seq < delivered_seq sort of serves as a check for
// endpoints that weren't initialized w/ meaningful sequence numbers.
// We've delivered everything if we're up to the penultimate
// sequence number (since a FIN consumes an octet in the
// sequence space), or right at it (because a RST does not).
if ( delivered_seq != Endpoint()->LastSeq() - 1 &&
delivered_seq != Endpoint()->LastSeq() )
if ( delivered_seq != last_seq - 1 &&
delivered_seq != last_seq )
return 1;
// If we've sent RST, then we can't send ACKs any more.