diff --git a/test/src/tests/dns.py b/test/src/tests/dns.py index 2b265ef..49113ad 100644 --- a/test/src/tests/dns.py +++ b/test/src/tests/dns.py @@ -17,6 +17,7 @@ from socket import AF_INET6 import struct +from scapy.compat import raw from scapy.layers.dns import DNS, DNSQR from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 @@ -30,19 +31,20 @@ from ..core import test, check_ip_checksum, check_ipv6_checksum @test -def test_ipv4_udp_dns_a(): - sports = [13274] # [53, 13274, 12198, 888, 0] - dports = [80] # [53, 5353, 80, 161, 24732] +def test_ipv4_udp_dns_in_a(): + sports = [53, 13274, 0] + dports = [53, 5353, 80, 161, 24732] payload = DNS() for sport in sports: for dport in dports: for domain in ['example.com', 'www.example.com', 'masscan.ned']: qd = DNSQR(qname=domain, qtype="A", qclass="IN") + dns_req = DNS(id=1234, rd=False, opcode=0, qd=qd) req = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / UDP(sport=sport, dport=dport) - / DNS(id=1234, rd=False, opcode=0, qd=qd)) + / dns_req) resp = srp1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ip_checksum(resp) @@ -52,151 +54,76 @@ def test_ipv4_udp_dns_a(): assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport) if not DNS in udp: try: - rr = DNS(udp.load) + dns_rep = DNS(udp.load) except Exception: raise AssertionError("no DNS layer found") else: - rr = udp[DNS] - assert(rr.id == 1234), f"unexpected id value: {rr.id}" - assert(rr.qr == True), f"unexpected qr value" - assert(rr.opcode == 0), f"unexpected opcode value" - assert(rr.aa == True), f"unexpected aa value" - assert(rr.tc == False), f"unexpected tc value" - assert(rr.rd == False), f"unexpected rd value" - assert(rr.ra == False), f"unexpected ra value" - assert(rr.z == 0), f"unexpected z value" - assert(rr.rcode == 0), f"unexpected rcode value" - assert(rr.qdcount == 1), f"unexpected qdcount value" - assert(rr.ancount == 1), f"unexpected ancount value" - assert(rr.nscount == 0), f"unexpected nscount value" - assert(rr.arcount == 0), f"unexpected arcount value" - -""" + dns_rep = udp[DNS] + assert(dns_rep.id == 1234), f"unexpected id value: {rr.id}" + assert(dns_rep.qr == True), f"unexpected qr value" + assert(dns_rep.opcode == 0), f"unexpected opcode value" + assert(dns_rep.aa == True), f"unexpected aa value" + assert(dns_rep.tc == False), f"unexpected tc value" + assert(dns_rep.rd == False), f"unexpected rd value" + assert(dns_rep.ra == False), f"unexpected ra value" + assert(dns_rep.z == 0), f"unexpected z value" + assert(dns_rep.rcode == 0), f"unexpected rcode value" + assert(dns_rep.qdcount == 1), f"unexpected qdcount value" + assert(dns_rep.ancount == 1), f"unexpected ancount value" + assert(dns_rep.nscount == 0), f"unexpected nscount value" + assert(dns_rep.arcount == 0), f"unexpected arcount value" + assert(raw(dns_rep.qd[0]) == raw(dns_req.qd[0])), f"query in request and response do not match" + assert(raw(dns_rep.qd[0].qname) == raw(dns_req.qd[0].qname + b'.')), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" + assert(dns_rep.an[0].rrname == dns_req.qd[0].qname + b'.'), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" + assert(dns_rep.an[0].rclass == dns_req.qd[0].qclass), f"class in answer does not match query" + assert(dns_rep.an[0].type == dns_req.qd[0].qtype), f"type in answer does not match query" + assert(dns_rep.an[0].rdata == IPV4_ADDR) @test -def test_ipv6_udp_stun(): - sports = [12345, 55555, 80, 43273] - dports = [80, 800, 8000, 3478] - payload = bytes.fromhex("000100002112a442000000000000000000000000") - for sport in sports: - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(payload) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert UDP in resp - udp = resp[UDP] - assert udp.sport == dport - assert udp.dport == sport - resp_payload = udp.payload.load - type_, length, magic = struct.unpack(">HHI", resp_payload[:8]) - tid = resp_payload[8:20] - data = resp_payload[20:] - assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) - assert length == 24, "expected length 24, got {}".format(length) - assert ( - magic == 0x2112A442 - ), "expected magic 0x2112a442, got 0x{:08x}".format(magic) - assert ( - tid == b"\x00" * 12 - ), "expected tid 0x000000000000000000000000, got {:x}".format(tid) - expected_data = ( - bytes.fromhex("000100140002") - + struct.pack(">H", sport) - + inet_pton(AF_INET6, "2001:41d0::1234:5678") - ) - assert data == expected_data, "unexpected data: {}".format(data) - - -@test -def test_ipv4_udp_stun_change_port(): - sports = [12345, 55555, 80, 43273] - dports = [80, 800, 8000, 3478, 65535] - payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002") +def test_ipv4_udp_dns_in_a_multiple_queries(): + sports = [53, 13274, 12198, 888, 0] + dports = [53, 5353, 80, 161, 24732] + payload = DNS() for sport in sports: for dport in dports: + qd = DNSQR(qname="www.example1.com", qtype="A", qclass="IN")/DNSQR(qname="www.example2.com", qtype="A", qclass="IN")/DNSQR(qname="www.example3.com", qtype="A", qclass="IN") + dns_req = DNS(id=1234, rd=False, opcode=0, qd=qd) req = ( Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / UDP(sport=sport, dport=dport) - / Raw(payload) - ) + / dns_req) resp = srp1(req, timeout=1) assert resp is not None, "expecting answer, got nothing" check_ip_checksum(resp) assert UDP in resp, "no UDP layer found" udp = resp[UDP] - assert ( - udp.sport == (dport + 1) % 2**16 - ), "expected answer from UDP/{}, got it from UDP/{}".format( - (dport + 1) % 2**16, udp.sport - ) - assert ( - udp.dport == sport - ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport) - resp_payload = udp.payload.load - type_, length = struct.unpack(">HH", resp_payload[:4]) - tid = resp_payload[4:20] - data = resp_payload[20:] - assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) - assert length == 12, "expected length 12, got {}".format(length) - assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), ( - "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid - ) - expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack( - ">HBBBB", sport, 192, 0, 0, 0 - ) - assert ( - data == expected_data - ), f"unexpected data {data!r} != {expected_data!r}" - - -@test -def test_ipv6_udp_stun_change_port(): - sports = [12345, 55555, 80, 43273] - dports = [80, 800, 8000, 3478, 65535] - payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002") - for sport in sports: - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(payload) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert UDP in resp, "expecting UDP layer in answer, got nothing" - udp = resp[UDP] - assert ( - udp.sport == (dport + 1) % 2**16 - ), "expected answer from UDP/{}, got it from UDP/{}".format( - (dport + 1) % 2**16, udp.sport - ) - assert ( - udp.dport == sport - ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport) - resp_payload = udp.payload.load - type_, length = struct.unpack(">HH", resp_payload[:4]) - tid = resp_payload[4:20] - data = resp_payload[20:] - assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) - assert length == 24, "expected length 12, got {}".format(length) - assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), ( - "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid - ) - expected_data = ( - bytes.fromhex("000100140002") - + struct.pack(">H", sport) - + inet_pton(AF_INET6, "2001:41d0::1234:5678") - ) - assert ( - data == expected_data - ), f"unexpected data {data!r} != {expected_data!r}" -""" + assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport) + assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport) + if not DNS in udp: + try: + dns_rep = DNS(udp.load) + except Exception: + raise AssertionError("no DNS layer found") + else: + dns_rep = udp[DNS] + assert(dns_rep.id == 1234), f"unexpected id value: {rr.id}" + assert(dns_rep.qr == True), f"unexpected qr value" + assert(dns_rep.opcode == 0), f"unexpected opcode value" + assert(dns_rep.aa == True), f"unexpected aa value" + assert(dns_rep.tc == False), f"unexpected tc value" + assert(dns_rep.rd == False), f"unexpected rd value" + assert(dns_rep.ra == False), f"unexpected ra value" + assert(dns_rep.z == 0), f"unexpected z value" + assert(dns_rep.rcode == 0), f"unexpected rcode value" + assert(dns_rep.qdcount == 3), f"unexpected qdcount value" + assert(dns_rep.ancount == 3), f"unexpected ancount value" + assert(dns_rep.nscount == 0), f"unexpected nscount value" + assert(dns_rep.arcount == 0), f"unexpected arcount value" + for i, q in enumerate(qd): + assert(raw(dns_rep.qd[i]) == raw(dns_req.qd[i])), f"query in request and response do not match" + assert(raw(dns_rep.qd[i].qname) == raw(dns_req.qd[i].qname + b'.')), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" + assert(dns_rep.an[i].rrname == dns_req.qd[i].qname + b'.'), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" + assert(dns_rep.an[i].rclass == dns_req.qd[i].qclass), f"class in answer does not match query" + assert(dns_rep.an[i].type == dns_req.qd[i].qtype), f"type in answer does not match query" + assert(dns_rep.an[i].rdata == IPV4_ADDR)