Add test for multiple queries in one request

This commit is contained in:
_Frky 2022-08-04 16:49:18 +02:00
parent 2e296d7546
commit 9638e0900c

View file

@ -17,6 +17,7 @@
from socket import AF_INET6 from socket import AF_INET6
import struct import struct
from scapy.compat import raw
from scapy.layers.dns import DNS, DNSQR from scapy.layers.dns import DNS, DNSQR
from scapy.layers.inet import IP, UDP from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6 from scapy.layers.inet6 import IPv6
@ -30,19 +31,20 @@ from ..core import test, check_ip_checksum, check_ipv6_checksum
@test @test
def test_ipv4_udp_dns_a(): def test_ipv4_udp_dns_in_a():
sports = [13274] # [53, 13274, 12198, 888, 0] sports = [53, 13274, 0]
dports = [80] # [53, 5353, 80, 161, 24732] dports = [53, 5353, 80, 161, 24732]
payload = DNS() payload = DNS()
for sport in sports: for sport in sports:
for dport in dports: for dport in dports:
for domain in ['example.com', 'www.example.com', 'masscan.ned']: for domain in ['example.com', 'www.example.com', 'masscan.ned']:
qd = DNSQR(qname=domain, qtype="A", qclass="IN") qd = DNSQR(qname=domain, qtype="A", qclass="IN")
dns_req = DNS(id=1234, rd=False, opcode=0, qd=qd)
req = ( req = (
Ether(dst=MAC_ADDR) Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR) / IP(dst=IPV4_ADDR)
/ UDP(sport=sport, dport=dport) / UDP(sport=sport, dport=dport)
/ DNS(id=1234, rd=False, opcode=0, qd=qd)) / dns_req)
resp = srp1(req, timeout=1) resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing" assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp) check_ip_checksum(resp)
@ -52,151 +54,76 @@ def test_ipv4_udp_dns_a():
assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport) assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
if not DNS in udp: if not DNS in udp:
try: try:
rr = DNS(udp.load) dns_rep = DNS(udp.load)
except Exception: except Exception:
raise AssertionError("no DNS layer found") raise AssertionError("no DNS layer found")
else: else:
rr = udp[DNS] dns_rep = udp[DNS]
assert(rr.id == 1234), f"unexpected id value: {rr.id}" assert(dns_rep.id == 1234), f"unexpected id value: {rr.id}"
assert(rr.qr == True), f"unexpected qr value" assert(dns_rep.qr == True), f"unexpected qr value"
assert(rr.opcode == 0), f"unexpected opcode value" assert(dns_rep.opcode == 0), f"unexpected opcode value"
assert(rr.aa == True), f"unexpected aa value" assert(dns_rep.aa == True), f"unexpected aa value"
assert(rr.tc == False), f"unexpected tc value" assert(dns_rep.tc == False), f"unexpected tc value"
assert(rr.rd == False), f"unexpected rd value" assert(dns_rep.rd == False), f"unexpected rd value"
assert(rr.ra == False), f"unexpected ra value" assert(dns_rep.ra == False), f"unexpected ra value"
assert(rr.z == 0), f"unexpected z value" assert(dns_rep.z == 0), f"unexpected z value"
assert(rr.rcode == 0), f"unexpected rcode value" assert(dns_rep.rcode == 0), f"unexpected rcode value"
assert(rr.qdcount == 1), f"unexpected qdcount value" assert(dns_rep.qdcount == 1), f"unexpected qdcount value"
assert(rr.ancount == 1), f"unexpected ancount value" assert(dns_rep.ancount == 1), f"unexpected ancount value"
assert(rr.nscount == 0), f"unexpected nscount value" assert(dns_rep.nscount == 0), f"unexpected nscount value"
assert(rr.arcount == 0), f"unexpected arcount value" assert(dns_rep.arcount == 0), f"unexpected arcount value"
assert(raw(dns_rep.qd[0]) == raw(dns_req.qd[0])), f"query in request and response do not match"
""" assert(raw(dns_rep.qd[0].qname) == raw(dns_req.qd[0].qname + b'.')), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
assert(dns_rep.an[0].rrname == dns_req.qd[0].qname + b'.'), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
assert(dns_rep.an[0].rclass == dns_req.qd[0].qclass), f"class in answer does not match query"
assert(dns_rep.an[0].type == dns_req.qd[0].qtype), f"type in answer does not match query"
assert(dns_rep.an[0].rdata == IPV4_ADDR)
@test @test
def test_ipv6_udp_stun(): def test_ipv4_udp_dns_in_a_multiple_queries():
sports = [12345, 55555, 80, 43273] sports = [53, 13274, 12198, 888, 0]
dports = [80, 800, 8000, 3478] dports = [53, 5353, 80, 161, 24732]
payload = bytes.fromhex("000100002112a442000000000000000000000000") payload = DNS()
for sport in sports:
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(payload)
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert UDP in resp
udp = resp[UDP]
assert udp.sport == dport
assert udp.dport == sport
resp_payload = udp.payload.load
type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
tid = resp_payload[8:20]
data = resp_payload[20:]
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
assert length == 24, "expected length 24, got {}".format(length)
assert (
magic == 0x2112A442
), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
assert (
tid == b"\x00" * 12
), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
expected_data = (
bytes.fromhex("000100140002")
+ struct.pack(">H", sport)
+ inet_pton(AF_INET6, "2001:41d0::1234:5678")
)
assert data == expected_data, "unexpected data: {}".format(data)
@test
def test_ipv4_udp_stun_change_port():
sports = [12345, 55555, 80, 43273]
dports = [80, 800, 8000, 3478, 65535]
payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
for sport in sports: for sport in sports:
for dport in dports: for dport in dports:
qd = DNSQR(qname="www.example1.com", qtype="A", qclass="IN")/DNSQR(qname="www.example2.com", qtype="A", qclass="IN")/DNSQR(qname="www.example3.com", qtype="A", qclass="IN")
dns_req = DNS(id=1234, rd=False, opcode=0, qd=qd)
req = ( req = (
Ether(dst=MAC_ADDR) Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR) / IP(dst=IPV4_ADDR)
/ UDP(sport=sport, dport=dport) / UDP(sport=sport, dport=dport)
/ Raw(payload) / dns_req)
)
resp = srp1(req, timeout=1) resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing" assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp) check_ip_checksum(resp)
assert UDP in resp, "no UDP layer found" assert UDP in resp, "no UDP layer found"
udp = resp[UDP] udp = resp[UDP]
assert ( assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport)
udp.sport == (dport + 1) % 2**16 assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
), "expected answer from UDP/{}, got it from UDP/{}".format( if not DNS in udp:
(dport + 1) % 2**16, udp.sport try:
) dns_rep = DNS(udp.load)
assert ( except Exception:
udp.dport == sport raise AssertionError("no DNS layer found")
), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport) else:
resp_payload = udp.payload.load dns_rep = udp[DNS]
type_, length = struct.unpack(">HH", resp_payload[:4]) assert(dns_rep.id == 1234), f"unexpected id value: {rr.id}"
tid = resp_payload[4:20] assert(dns_rep.qr == True), f"unexpected qr value"
data = resp_payload[20:] assert(dns_rep.opcode == 0), f"unexpected opcode value"
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) assert(dns_rep.aa == True), f"unexpected aa value"
assert length == 12, "expected length 12, got {}".format(length) assert(dns_rep.tc == False), f"unexpected tc value"
assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), ( assert(dns_rep.rd == False), f"unexpected rd value"
"expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid assert(dns_rep.ra == False), f"unexpected ra value"
) assert(dns_rep.z == 0), f"unexpected z value"
expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack( assert(dns_rep.rcode == 0), f"unexpected rcode value"
">HBBBB", sport, 192, 0, 0, 0 assert(dns_rep.qdcount == 3), f"unexpected qdcount value"
) assert(dns_rep.ancount == 3), f"unexpected ancount value"
assert ( assert(dns_rep.nscount == 0), f"unexpected nscount value"
data == expected_data assert(dns_rep.arcount == 0), f"unexpected arcount value"
), f"unexpected data {data!r} != {expected_data!r}" for i, q in enumerate(qd):
assert(raw(dns_rep.qd[i]) == raw(dns_req.qd[i])), f"query in request and response do not match"
assert(raw(dns_rep.qd[i].qname) == raw(dns_req.qd[i].qname + b'.')), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
@test assert(dns_rep.an[i].rrname == dns_req.qd[i].qname + b'.'), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
def test_ipv6_udp_stun_change_port(): assert(dns_rep.an[i].rclass == dns_req.qd[i].qclass), f"class in answer does not match query"
sports = [12345, 55555, 80, 43273] assert(dns_rep.an[i].type == dns_req.qd[i].qtype), f"type in answer does not match query"
dports = [80, 800, 8000, 3478, 65535] assert(dns_rep.an[i].rdata == IPV4_ADDR)
payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
for sport in sports:
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(payload)
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert UDP in resp, "expecting UDP layer in answer, got nothing"
udp = resp[UDP]
assert (
udp.sport == (dport + 1) % 2**16
), "expected answer from UDP/{}, got it from UDP/{}".format(
(dport + 1) % 2**16, udp.sport
)
assert (
udp.dport == sport
), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
resp_payload = udp.payload.load
type_, length = struct.unpack(">HH", resp_payload[:4])
tid = resp_payload[4:20]
data = resp_payload[20:]
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
assert length == 24, "expected length 12, got {}".format(length)
assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
"expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
)
expected_data = (
bytes.fromhex("000100140002")
+ struct.pack(">H", sport)
+ inet_pton(AF_INET6, "2001:41d0::1234:5678")
)
assert (
data == expected_data
), f"unexpected data {data!r} != {expected_data!r}"
"""