diff --git a/test/src/all.py b/test/src/all.py index 88a257b..b0c098a 100644 --- a/test/src/all.py +++ b/test/src/all.py @@ -30,6 +30,7 @@ from scapy.layers.inet6 import ( ) from scapy.layers.l2 import ARP, Ether from scapy.packet import Raw +from scapy.volatile import RandInt from .conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR @@ -347,13 +348,22 @@ def test_tcp_syn(iface): 65438, ] for p in ports_to_test: - syn = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="S", dport=p) + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", dport=p, seq=seq_init) + ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ip_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( + syn_ack.ack, + seq_init + 1, + ) @test @@ -361,40 +371,56 @@ def test_ipv4_tcp_psh_ack(iface): ##### PSH-ACK ##### sport = 26695 port = 445 + seq_init = int(RandInt()) # send PSH-ACK first psh_ack = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="PA", sport=sport, dport=port) + / TCP(flags="PA", sport=sport, dport=port, seq=seq_init) / Raw("payload") ) syn_ack = iface.sr1(psh_ack, timeout=1) assert syn_ack is None, "no answer expected, got one" # test the anti-injection mechanism - syn = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="S", dport=port) + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", sport=sport, dport=port, seq=seq_init) + ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ip_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( + syn_ack.ack, + seq_init + 1, + ) ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="A", dport=port) # should fail because no ack given - psh_ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="PA", dport=port) + psh_ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1) + ) ack = iface.sr1(psh_ack, timeout=1) assert ack is None, "no answer expected, got one" # should get an answer this time psh_ack = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="PA", dport=port, ack=syn_ack.seq + 1) + / TCP( + flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1 + ) ) ack = iface.sr1(psh_ack, timeout=1) assert ack is not None, "expecting answer, got nothing" check_ip_checksum(ack) - assert TCP in ack + assert TCP in ack, "expecting TCP, got %r" % ack.summary() ack = ack[TCP] - assert ack.flags == "A" + assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags @test @@ -402,40 +428,55 @@ def test_ipv6_tcp_psh_ack(iface): ##### PSH-ACK ##### sport = 26695 port = 445 + seq_init = int(RandInt()) # send PSH-ACK first psh_ack = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="PA", sport=sport, dport=port) + / TCP(flags="PA", sport=sport, dport=port, seq=seq_init) / Raw("payload") ) syn_ack = iface.sr1(psh_ack, timeout=1) assert syn_ack is None, "no answer expected, got one" # test the anti-injection mechanism - syn = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="S", dport=port) + syn = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="S", sport=sport, dport=port, seq=seq_init) + ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ipv6_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( + syn_ack.ack, + seq_init + 1, + ) ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="A", dport=port) # should fail because no ack given - psh_ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="PA", dport=port) + psh_ack = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1) + ) ack = iface.sr1(psh_ack, timeout=1) assert ack is None, "no answer expected, got one" # should get an answer this time psh_ack = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="PA", dport=port, ack=syn_ack.seq + 1) + / TCP( + flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1 + ) ) ack = iface.sr1(psh_ack, timeout=1) assert ack is not None, "expecting answer, got nothing" check_ipv6_checksum(ack) - assert TCP in ack + assert TCP in ack, "expecting TCP, got %r" % ack.summary() ack = ack[TCP] - assert ack.flags == "A" + assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags @test @@ -443,33 +484,46 @@ def test_ipv4_tcp_http(iface): sport = 24592 dports = [80, 443, 5000, 53228] for dport in dports: + seq_init = int(RandInt()) syn = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ip_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags ack = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="A", sport=sport, dport=dport, ack=syn_ack.seq + 1) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) ) _ = iface.sr1(ack, timeout=1) req = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="PA", ack=syn_ack.seq + 1, sport=sport, dport=dport) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) / Raw("GET / HTTP/1.1\r\n\r\n") ) resp = iface.sr1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ip_checksum(resp) - assert TCP in resp + assert TCP in resp, "expecting TCP, got %r" % resp.summary() tcp = resp[TCP] assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") @@ -479,33 +533,46 @@ def test_ipv6_tcp_http(iface): sport = 24592 dports = [80, 443, 5000, 53228] for dport in dports: + seq_init = int(RandInt()) syn = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="S", sport=sport, dport=dport) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ipv6_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] assert syn_ack.flags == "SA" ack = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="A", sport=sport, dport=dport, ack=syn_ack.seq + 1) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) ) _ = iface.sr1(ack, timeout=1) req = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="PA", ack=syn_ack.seq + 1, sport=sport, dport=dport) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) / Raw("GET / HTTP/1.1\r\n\r\n") ) resp = iface.sr1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ipv6_checksum(resp) - assert TCP in resp + assert TCP in resp, "expecting TCP, got %r" % resp.summary() tcp = resp[TCP] assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") @@ -553,33 +620,46 @@ def test_ipv4_tcp_http_ko(iface): sport = 24592 dports = [80, 443, 5000, 53228] for dport in dports: + seq_init = int(RandInt()) syn = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ip_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] assert syn_ack.flags == "SA" ack = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="A", sport=sport, dport=dport, ack=syn_ack.seq + 1) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) ) _ = iface.sr1(ack, timeout=1) req = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="PA", ack=syn_ack.seq + 1, sport=sport, dport=dport) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) / Raw(bytes.fromhex("4f5054494f4e53")) ) resp = iface.sr1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ip_checksum(resp) - assert TCP in resp + assert TCP in resp, "expecting TCP, got %r" % resp.summary() assert "P" not in resp[TCP].flags assert len(resp[TCP].payload) == 0 @@ -604,33 +684,46 @@ def test_ipv6_tcp_http_ko(iface): sport = 24592 dports = [80, 443, 5000, 53228] for dport in dports: + seq_init = int(RandInt()) syn = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="S", sport=sport, dport=dport) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ipv6_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] assert syn_ack.flags == "SA" ack = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="A", sport=sport, dport=dport, ack=syn_ack.seq + 1) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) ) _ = iface.sr1(ack, timeout=1) req = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="PA", ack=syn_ack.seq + 1, sport=sport, dport=dport) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) / Raw(bytes.fromhex("4f5054494f4e53")) ) resp = iface.sr1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ipv6_checksum(resp) - assert TCP in resp + assert TCP in resp, "expecting TCP, got %r" % resp.summary() assert "P" not in resp[TCP].flags assert len(resp[TCP].payload) == 0 @@ -815,6 +908,7 @@ def test_ipv4_tcp_ssh(iface): sport = 37183 dports = [22, 80, 2222, 2022, 23874, 50000] for i, dport in enumerate(dports): + seq_init = int(RandInt()) banner = [ b"SSH-2.0-AsyncSSH_2.1.0", b"SSH-2.0-PuTTY", @@ -825,30 +919,42 @@ def test_ipv4_tcp_ssh(iface): syn = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ip_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] assert syn_ack.flags == "SA" ack = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="A", sport=sport, dport=dport, ack=syn_ack.seq + 1) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) ) _ = iface.sr1(ack, timeout=1) req = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) - / TCP(flags="PA", ack=syn_ack.seq + 1, sport=sport, dport=dport) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) / Raw(banner + b"\r\n") ) resp = iface.sr1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ip_checksum(resp) - assert TCP in resp + assert TCP in resp, "expecting TCP, got %r" % resp.summary() tcp = resp[TCP] assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags @@ -898,6 +1004,7 @@ def test_ipv6_tcp_ssh(iface): sport = 37183 dports = [22, 80, 2222, 2022, 23874, 50000] for i, dport in enumerate(dports): + seq_init = int(RandInt()) banner = [ b"SSH-2.0-AsyncSSH_2.1.0", b"SSH-2.0-PuTTY", @@ -908,30 +1015,42 @@ def test_ipv6_tcp_ssh(iface): syn = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="S", sport=sport, dport=dport) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) ) syn_ack = iface.sr1(syn, timeout=1) assert syn_ack is not None, "expecting answer, got nothing" check_ipv6_checksum(syn_ack) - assert TCP in syn_ack + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() syn_ack = syn_ack[TCP] assert syn_ack.flags == "SA" ack = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="A", sport=sport, dport=dport, ack=syn_ack.seq + 1) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) ) _ = iface.sr1(ack, timeout=1) req = ( Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) - / TCP(flags="PA", ack=syn_ack.seq + 1, sport=sport, dport=dport) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) / Raw(banner + b"\r\n") ) resp = iface.sr1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ipv6_checksum(resp) - assert TCP in resp + assert TCP in resp, "expecting TCP, got %r" % resp.summary() tcp = resp[TCP] assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags