Fix flake8

This commit is contained in:
_Frky 2022-08-04 17:33:30 +02:00
parent aaf2eb5e8f
commit 1030dc7d43

View file

@ -14,27 +14,20 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>. # along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from socket import AF_INET6
import struct
from scapy.compat import raw from scapy.compat import raw
from scapy.layers.dns import DNS, DNSQR from scapy.layers.dns import DNS, DNSQR
from scapy.layers.inet import IP, UDP from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.pton_ntop import inet_pton
from scapy.sendrecv import srp1 from scapy.sendrecv import srp1
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR from ..conf import IPV4_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum, check_ipv6_checksum from ..core import test, check_ip_checksum
@test @test
def test_ipv4_udp_dns_in_a(): def test_ipv4_udp_dns_in_a():
sports = [53, 13274, 0] sports = [53, 13274, 0]
dports = [53, 5353, 80, 161, 24732] dports = [53, 5353, 80, 161, 24732]
payload = DNS()
for sport in sports: for sport in sports:
for dport in dports: for dport in dports:
for domain in ["example.com", "www.example.com", "masscan.ned"]: for domain in ["example.com", "www.example.com", "masscan.ned"]:
@ -53,41 +46,41 @@ def test_ipv4_udp_dns_in_a():
udp = resp[UDP] udp = resp[UDP]
assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport) assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport)
assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport) assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
if not DNS in udp: if DNS not in udp:
try: try:
dns_rep = DNS(udp.load) dns_rep = DNS(udp.load)
except Exception: except Exception:
raise AssertionError("no DNS layer found") raise AssertionError("no DNS layer found")
else: else:
dns_rep = udp[DNS] dns_rep = udp[DNS]
assert dns_rep.id == 1234, f"unexpected id value: {rr.id}" assert dns_rep.id == 1234, f"unexpected id value: {dns_rep.id}"
assert dns_rep.qr == True, f"unexpected qr value" assert dns_rep.qr, "unexpected qr value"
assert dns_rep.opcode == 0, f"unexpected opcode value" assert dns_rep.opcode == 0, "unexpected opcode value"
assert dns_rep.aa == True, f"unexpected aa value" assert dns_rep.aa, "unexpected aa value"
assert dns_rep.tc == False, f"unexpected tc value" assert not dns_rep.tc, "unexpected tc value"
assert dns_rep.rd == False, f"unexpected rd value" assert not dns_rep.rd, "unexpected rd value"
assert dns_rep.ra == False, f"unexpected ra value" assert not dns_rep.ra, "unexpected ra value"
assert dns_rep.z == 0, f"unexpected z value" assert dns_rep.z == 0, "unexpected z value"
assert dns_rep.rcode == 0, f"unexpected rcode value" assert dns_rep.rcode == 0, "unexpected rcode value"
assert dns_rep.qdcount == 1, f"unexpected qdcount value" assert dns_rep.qdcount == 1, "unexpected qdcount value"
assert dns_rep.ancount == 1, f"unexpected ancount value" assert dns_rep.ancount == 1, "unexpected ancount value"
assert dns_rep.nscount == 0, f"unexpected nscount value" assert dns_rep.nscount == 0, "unexpected nscount value"
assert dns_rep.arcount == 0, f"unexpected arcount value" assert dns_rep.arcount == 0, "unexpected arcount value"
assert raw(dns_rep.qd[0]) == raw( assert raw(dns_rep.qd[0]) == raw(
dns_req.qd[0] dns_req.qd[0]
), f"query in request and response do not match" ), "query in request and response do not match"
assert raw(dns_rep.qd[0].qname) == raw( assert raw(dns_rep.qd[0].qname) == raw(
dns_req.qd[0].qname + b"." dns_req.qd[0].qname + b"."
), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" ), "if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
assert ( assert (
dns_rep.an[0].rrname == dns_req.qd[0].qname + b"." dns_rep.an[0].rrname == dns_req.qd[0].qname + b"."
), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" ), "if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
assert ( assert (
dns_rep.an[0].rclass == dns_req.qd[0].qclass dns_rep.an[0].rclass == dns_req.qd[0].qclass
), f"class in answer does not match query" ), "class in answer does not match query"
assert ( assert (
dns_rep.an[0].type == dns_req.qd[0].qtype dns_rep.an[0].type == dns_req.qd[0].qtype
), f"type in answer does not match query" ), "type in answer does not match query"
assert dns_rep.an[0].rdata == IPV4_ADDR assert dns_rep.an[0].rdata == IPV4_ADDR
@ -95,7 +88,6 @@ def test_ipv4_udp_dns_in_a():
def test_ipv4_udp_dns_in_a_multiple_queries(): def test_ipv4_udp_dns_in_a_multiple_queries():
sports = [53, 13274, 12198, 888, 0] sports = [53, 13274, 12198, 888, 0]
dports = [53, 5353, 80, 161, 24732] dports = [53, 5353, 80, 161, 24732]
payload = DNS()
for sport in sports: for sport in sports:
for dport in dports: for dport in dports:
qd = ( qd = (
@ -117,40 +109,40 @@ def test_ipv4_udp_dns_in_a_multiple_queries():
udp = resp[UDP] udp = resp[UDP]
assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport) assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport)
assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport) assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
if not DNS in udp: if DNS not in udp:
try: try:
dns_rep = DNS(udp.load) dns_rep = DNS(udp.load)
except Exception: except Exception:
raise AssertionError("no DNS layer found") raise AssertionError("no DNS layer found")
else: else:
dns_rep = udp[DNS] dns_rep = udp[DNS]
assert dns_rep.id == 1234, f"unexpected id value: {rr.id}" assert dns_rep.id == 1234, f"unexpected id value: {dns_rep.id}"
assert dns_rep.qr == True, f"unexpected qr value" assert dns_rep.qr, "unexpected qr value"
assert dns_rep.opcode == 0, f"unexpected opcode value" assert dns_rep.opcode == 0, "unexpected opcode value"
assert dns_rep.aa == True, f"unexpected aa value" assert dns_rep.aa, "unexpected aa value"
assert dns_rep.tc == False, f"unexpected tc value" assert not dns_rep.tc, "unexpected tc value"
assert dns_rep.rd == False, f"unexpected rd value" assert not dns_rep.rd, "unexpected rd value"
assert dns_rep.ra == False, f"unexpected ra value" assert not dns_rep.ra, "unexpected ra value"
assert dns_rep.z == 0, f"unexpected z value" assert dns_rep.z == 0, "unexpected z value"
assert dns_rep.rcode == 0, f"unexpected rcode value" assert dns_rep.rcode == 0, "unexpected rcode value"
assert dns_rep.qdcount == 3, f"unexpected qdcount value" assert dns_rep.qdcount == 3, "unexpected qdcount value"
assert dns_rep.ancount == 3, f"unexpected ancount value" assert dns_rep.ancount == 3, "unexpected ancount value"
assert dns_rep.nscount == 0, f"unexpected nscount value" assert dns_rep.nscount == 0, "unexpected nscount value"
assert dns_rep.arcount == 0, f"unexpected arcount value" assert dns_rep.arcount == 0, "unexpected arcount value"
for i, q in enumerate(qd): for i, q in enumerate(qd):
assert raw(dns_rep.qd[i]) == raw( assert raw(dns_rep.qd[i]) == raw(
dns_req.qd[i] dns_req.qd[i]
), f"query in request and response do not match" ), "query in request and response do not match"
assert raw(dns_rep.qd[i].qname) == raw( assert raw(dns_rep.qd[i].qname) == raw(
dns_req.qd[i].qname + b"." dns_req.qd[i].qname + b"."
), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" ), "if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
assert ( assert (
dns_rep.an[i].rrname == dns_req.qd[i].qname + b"." dns_rep.an[i].rrname == dns_req.qd[i].qname + b"."
), f"if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test" ), "if this test fails, it may mean that scapy fixed the bug in dns.py L134 - if that is so, remove \" + b'.'\" in the test"
assert ( assert (
dns_rep.an[i].rclass == dns_req.qd[i].qclass dns_rep.an[i].rclass == dns_req.qd[i].qclass
), f"class in answer does not match query" ), "class in answer does not match query"
assert ( assert (
dns_rep.an[i].type == dns_req.qd[i].qtype dns_rep.an[i].type == dns_req.qd[i].qtype
), f"type in answer does not match query" ), "type in answer does not match query"
assert dns_rep.an[i].rdata == IPV4_ADDR assert dns_rep.an[i].rdata == IPV4_ADDR