From beefcc9185638ef02c238c3bff69a339ede83421 Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Tue, 15 Feb 2022 14:55:09 +0100 Subject: [PATCH 1/2] Test suite refactor --- .github/workflows/test.yml | 2 +- README.md | 8 + src/proto/smb.rs | 3 +- test/src/all.py | 1308 +----------------------------------- test/src/core.py | 98 +++ test/src/tests/__init__.py | 15 + test/src/tests/arp.py | 51 ++ test/src/tests/ghost.py | 87 +++ test/src/tests/http.py | 339 ++++++++++ test/src/tests/icmpv4.py | 45 ++ test/src/tests/icmpv6.py | 87 +++ test/src/tests/ip46.py | 291 ++++++++ test/src/tests/rpc.py | 99 +++ test/src/tests/smb.py | 65 ++ test/src/tests/ssh.py | 217 ++++++ test/src/tests/stun.py | 196 ++++++ test/test_masscanned.py | 14 +- 17 files changed, 1625 insertions(+), 1300 deletions(-) create mode 100644 test/src/core.py create mode 100644 test/src/tests/__init__.py create mode 100644 test/src/tests/arp.py create mode 100644 test/src/tests/ghost.py create mode 100644 test/src/tests/http.py create mode 100644 test/src/tests/icmpv4.py create mode 100644 test/src/tests/icmpv6.py create mode 100644 test/src/tests/ip46.py create mode 100644 test/src/tests/rpc.py create mode 100644 test/src/tests/smb.py create mode 100644 test/src/tests/ssh.py create mode 100644 test/src/tests/stun.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6ac573d..05c8984 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -87,7 +87,7 @@ jobs: run: sudo pip install -U flake8 black - name: Install packages for tests - run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind + run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind smbclient - name: Run black run: black -t py36 --check test/test_masscanned.py test/src/ diff --git a/README.md b/README.md index 41574c3..f3ebf81 100644 --- a/README.md +++ b/README.md @@ -290,6 +290,14 @@ tcpdump: pcap_loop: The interface disappeared 0 packets dropped by kernel ``` +You can also chose what tests to run using the `TESTS` environment variable +``` +TESTS=smb ./test/test_masscanned.py +INFO test_smb1_network_req.............................OK +INFO test_smb2_network_req.............................OK +INFO Ran 2 tests with 1 errors +``` + ## Logging ### Console Logger diff --git a/src/proto/smb.rs b/src/proto/smb.rs index 0e1ad4b..dcd5993 100644 --- a/src/proto/smb.rs +++ b/src/proto/smb.rs @@ -20,7 +20,6 @@ use std::convert::TryInto; use std::time::SystemTime; use crate::client::ClientInfo; -use crate::logger::MetaLogger; use crate::Masscanned; // NBTSession + SMB Header @@ -990,6 +989,8 @@ pub fn repl_smb2<'a>( #[cfg(test)] mod tests { use super::*; + use crate::logger::MetaLogger; + use itertools::assert_equal; use pnet::util::MacAddr; use std::str::FromStr; diff --git a/test/src/all.py b/test/src/all.py index e11d7d4..96f9062 100644 --- a/test/src/all.py +++ b/test/src/all.py @@ -14,1290 +14,28 @@ # You should have received a copy of the GNU General Public License # along with Masscanned. If not, see . -import json -import logging +import importlib import os -import re -from socket import AF_INET6 -from subprocess import check_call -import struct -from tempfile import NamedTemporaryFile -import zlib -from ivre.db import DBNmap -from scapy.compat import raw -from scapy.data import ETHER_BROADCAST -from scapy.layers.inet import ICMP, IP, TCP, UDP -from scapy.layers.inet6 import ( - ICMPv6EchoReply, - ICMPv6EchoRequest, - ICMPv6ND_NA, - ICMPv6ND_NS, - ICMPv6NDOptDstLLAddr, - IPv6, -) -from scapy.layers.l2 import ARP, Ether -from scapy.pton_ntop import inet_pton -from scapy.packet import Raw -from scapy.sendrecv import srp1 -from scapy.volatile import RandInt - -from .conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR - - -def setup_logs(): - ch = logging.StreamHandler() - ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s")) - ch.setLevel(logging.DEBUG) - log = logging.getLogger(__name__) - log.setLevel(logging.DEBUG) - log.addHandler(ch) - return log - - -LOG = setup_logs() -TESTS = [] -ERRORS = [] - - -# decorator to automatically add a function to tests -def test(f): - global ERRORS, TESTS - OK = "\033[1mOK\033[0m" - KO = "\033[1m\033[1;%dmKO\033[0m" % 31 - fname = f.__name__.ljust(50, ".") - - def w(): - try: - f() - LOG.info("{}{}".format(fname, OK)) - except AssertionError as e: - LOG.error("{}{}: {}".format(fname, KO, e)) - ERRORS.append(fname) - - TESTS.append(w) - return w - - -def multicast(ip6): - a, b = ip6.split(":")[-2:] - mac = ["33", "33", "ff"] - if len(a) == 4: - mac.append(a[2:]) - else: - mac.append("00") - if len(b) >= 2: - mac.append(b[:2]) - else: - mac.append("00") - if len(b) >= 4: - mac.append(b[2:]) - else: - mac.append("00") - return ":".join(mac) - - -def check_ip_checksum(pkt): - assert IP in pkt, "no IP layer found" - ip_pkt = pkt[IP] - chksum = ip_pkt.chksum - del ip_pkt.chksum - assert IP(raw(ip_pkt)).chksum == chksum, "bad IPv4 checksum" - - -def check_ipv6_checksum(pkt): - assert IPv6 in pkt, "no IP layer found" - ip_pkt = pkt[IPv6] - chksum = ip_pkt.chksum - del ip_pkt.chksum - assert IPv6(raw(ip_pkt)).chksum == chksum, "bad IPv6 checksum" - - -@test -def test_arp_req(): - ##### ARP ##### - arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst=IPV4_ADDR) - arp_repl = srp1(arp_req, timeout=1) - assert arp_repl is not None, "expecting answer, got nothing" - assert ARP in arp_repl, "no ARP layer found" - arp_repl = arp_repl[ARP] - # check answer - ## op is "is-at" - assert arp_repl.op == 2, "unexpected ARP op: {}".format(arp_repl.op) - ## answer for the requested IP - assert arp_repl.psrc == arp_req.pdst, "unexpected ARP psrc: {}".format( - arp_repl.psrc - ) - assert arp_repl.pdst == arp_req.psrc, "unexpected ARP pdst: {}".format( - arp_repl.pdst - ) - ## answer is expected MAC address - assert arp_repl.hwsrc == MAC_ADDR, "unexpected ARP hwsrc: {}".format(arp_repl.hwsrc) - - -@test -def test_arp_req_other_ip(): - ##### ARP ##### - arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst="1.2.3.4") - arp_repl = srp1(arp_req, timeout=1) - assert arp_repl is None, "responding to ARP requests for other IP addresses" - - -@test -def test_ipv4_req(): - ##### IP ##### - ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0) - ip_repl = srp1(ip_req, timeout=1) - assert ip_repl is not None, "expecting answer, got nothing" - check_ip_checksum(ip_repl) - assert IP in ip_repl, "no IP layer in response" - ip_repl = ip_repl[IP] - assert ip_repl.id == 0, "IP identification unexpected" - - -@test -def test_eth_req_other_mac(): - #### ETH #### - ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0) - ip_repl = srp1(ip_req, timeout=1) - assert ip_repl is None, "responding to other MAC addresses" - - -@test -def test_ipv4_req_other_ip(): - ##### IP ##### - ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0) - ip_repl = srp1(ip_req, timeout=1) - assert ip_repl is None, "responding to other IP addresses" - - -@test -def test_icmpv4_echo_req(): - ##### ICMPv4 ##### - icmp_req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / ICMP(type=8, code=0) - / Raw("idrinkwaytoomuchcoffee") - ) - icmp_repl = srp1(icmp_req, timeout=1) - assert icmp_repl is not None, "expecting answer, got nothing" - check_ip_checksum(icmp_repl) - assert ICMP in icmp_repl - icmp_repl = icmp_repl[ICMP] - # check answer - ## type is "echo-reply" - assert icmp_repl.type == 0 - assert icmp_repl.code == 0 - ## data is the same as sent - assert icmp_repl.load == icmp_req.load - - -@test -def test_icmpv6_neighbor_solicitation(): - ##### IPv6 Neighbor Solicitation ##### - for mac in [ - "ff:ff:ff:ff:ff:ff", - "33:33:00:00:00:01", - MAC_ADDR, - multicast(IPV6_ADDR), - ]: - nd_ns = Ether(dst=mac) / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR) - nd_na = srp1(nd_ns, timeout=1) - assert nd_na is not None, "expecting answer, got nothing" - assert ICMPv6ND_NA in nd_na - nd_na = nd_na[ICMPv6ND_NA] - # check answer content - assert nd_na.code == 0 - assert nd_na.R == 0 - assert nd_na.S == 1 - assert nd_na.O == 1 # noqa: E741 - assert nd_na.tgt == IPV6_ADDR - # check ND Option - assert nd_na.haslayer(ICMPv6NDOptDstLLAddr) - assert nd_na.getlayer(ICMPv6NDOptDstLLAddr).lladdr == MAC_ADDR - for mac in ["00:00:00:00:00:00", "33:33:33:00:00:01"]: - nd_ns = Ether(dst="ff:ff:ff:ff:ff:ff") / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR) - nd_na = srp1(nd_ns, timeout=1) - assert nd_na is not None, "expecting no answer, got one" - - -@test -def test_icmpv6_neighbor_solicitation_other_ip(): - ##### IPv6 Neighbor Solicitation ##### - nd_ns = ( - Ether(dst="ff:ff:ff:ff:ff:ff") - / IPv6() - / ICMPv6ND_NS(tgt="2020:4141:3030:2020::bdbd") - ) - nd_na = srp1(nd_ns, timeout=1) - assert nd_na is None, "responding to ND_NS for other IP addresses" - - -@test -def test_icmpv6_echo_req(): - ##### IPv6 Ping ##### - echo_req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / ICMPv6EchoRequest(data="waytoomanynapkins") - ) - echo_repl = srp1(echo_req, timeout=1) - assert echo_repl is not None, "expecting answer, got nothing" - assert ICMPv6EchoReply in echo_repl - echo_repl = echo_repl[ICMPv6EchoReply] - # check answer content - assert echo_repl.code == 0 - assert echo_repl.data == echo_req.data - - -@test -def test_tcp_syn(): - ##### SYN-ACK ##### - # test a list of ports, randomly generated once - ports_to_test = [ - 1152, - 2003, - 2193, - 3709, - 4054, - 6605, - 6737, - 6875, - 7320, - 8898, - 9513, - 9738, - 10623, - 10723, - 11253, - 12125, - 12189, - 12873, - 14648, - 14659, - 16242, - 16243, - 17209, - 17492, - 17667, - 17838, - 18081, - 18682, - 18790, - 19124, - 19288, - 19558, - 19628, - 19789, - 20093, - 21014, - 21459, - 21740, - 24070, - 24312, - 24576, - 26939, - 27136, - 27165, - 27361, - 29971, - 31088, - 33011, - 33068, - 34990, - 35093, - 35958, - 36626, - 36789, - 37130, - 37238, - 37256, - 37697, - 37890, - 38958, - 42131, - 43864, - 44420, - 44655, - 44868, - 45157, - 46213, - 46497, - 46955, - 49049, - 49067, - 49452, - 49480, - 50498, - 50945, - 51181, - 52890, - 53301, - 53407, - 53417, - 53980, - 55827, - 56483, - 58552, - 58713, - 58836, - 59362, - 59560, - 60534, - 60555, - 60660, - 61615, - 62402, - 62533, - 62941, - 63240, - 63339, - 63616, - 64380, - 65438, - ] - for p in ports_to_test: - seq_init = int(RandInt()) - syn = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="S", dport=p, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ip_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags - assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( - syn_ack.ack, - seq_init + 1, - ) - - -@test -def test_ipv4_tcp_psh_ack(): - ##### PSH-ACK ##### - sport = 26695 - port = 445 - seq_init = int(RandInt()) - # send PSH-ACK first - psh_ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="PA", sport=sport, dport=port, seq=seq_init) - / Raw("payload") - ) - syn_ack = srp1(psh_ack, timeout=1) - assert syn_ack is None, "no answer expected, got one" - # test the anti-injection mechanism - seq_init = int(RandInt()) - syn = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=port, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ip_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags - assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( - syn_ack.ack, - seq_init + 1, - ) - ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="A", dport=port) - # should fail because no ack given - psh_ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1) - ) - ack = srp1(psh_ack, timeout=1) - assert ack is None, "no answer expected, got one" - # should get an answer this time - psh_ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1 - ) - ) - ack = srp1(psh_ack, timeout=1) - assert ack is not None, "expecting answer, got nothing" - check_ip_checksum(ack) - assert TCP in ack, "expecting TCP, got %r" % ack.summary() - ack = ack[TCP] - assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags - - -@test -def test_ipv6_tcp_psh_ack(): - ##### PSH-ACK ##### - sport = 26695 - port = 445 - seq_init = int(RandInt()) - # send PSH-ACK first - psh_ack = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP(flags="PA", sport=sport, dport=port, seq=seq_init) - / Raw("payload") - ) - syn_ack = srp1(psh_ack, timeout=1) - assert syn_ack is None, "no answer expected, got one" - # test the anti-injection mechanism - syn = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP(flags="S", sport=sport, dport=port, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ipv6_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags - assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( - syn_ack.ack, - seq_init + 1, - ) - ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="A", dport=port) - # should fail because no ack given - psh_ack = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1) - ) - ack = srp1(psh_ack, timeout=1) - assert ack is None, "no answer expected, got one" - # should get an answer this time - psh_ack = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP( - flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1 - ) - ) - ack = srp1(psh_ack, timeout=1) - assert ack is not None, "expecting answer, got nothing" - check_ipv6_checksum(ack) - assert TCP in ack, "expecting TCP, got %r" % ack.summary() - ack = ack[TCP] - assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags - - -@test -def test_ipv4_tcp_http(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - seq_init = int(RandInt()) - syn = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ip_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags - ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - / Raw("GET / HTTP/1.1\r\n\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - tcp = resp[TCP] - assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") - - -@test -def test_ipv4_tcp_http_incomplete(): - sport = 24595 - dports = [80, 443, 5000, 53228] - for dport in dports: - seq_init = int(RandInt()) - syn = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ip_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags - ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - # purposedly incomplete request (missing additionnal ending \r\n) - / Raw("GET / HTTP/1.1\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting an answer, got none" - check_ip_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - tcp = resp[TCP] - assert tcp.flags == "A", "expecting TCP flag A, got {}".format(tcp.flags) - - -@test -def test_ipv6_tcp_http(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - seq_init = int(RandInt()) - syn = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ipv6_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" - ack = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - / Raw("GET / HTTP/1.1\r\n\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - tcp = resp[TCP] - assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") - - -@test -def test_ipv4_udp_http(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / UDP(sport=sport, dport=dport) - / Raw("GET / HTTP/1.1\r\n\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert UDP in resp - udp = resp[UDP] - assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") - - -@test -def test_ipv6_udp_http(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / UDP(sport=sport, dport=dport) - / Raw("GET / HTTP/1.1\r\n\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert UDP in resp - udp = resp[UDP] - assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") - - -@test -def test_ipv4_tcp_http_ko(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - seq_init = int(RandInt()) - syn = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ip_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" - ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - / Raw(bytes.fromhex("4f5054494f4e53")) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - assert "P" not in resp[TCP].flags - assert len(resp[TCP].payload) == 0 - - -@test -def test_ipv4_udp_http_ko(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(bytes.fromhex("4f5054494f4e53")) - ) - resp = srp1(req, timeout=1) - assert resp is None, "expecting no answer, got one" - - -@test -def test_ipv6_tcp_http_ko(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - seq_init = int(RandInt()) - syn = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ipv6_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" - ack = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - / Raw(bytes.fromhex("4f5054494f4e53")) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - assert "P" not in resp[TCP].flags - assert len(resp[TCP].payload) == 0 - - -@test -def test_ipv6_udp_http_ko(): - sport = 24592 - dports = [80, 443, 5000, 53228] - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(bytes.fromhex("4f5054494f4e53")) - ) - resp = srp1(req, timeout=1) - assert resp is None, "expecting no answer, got one" - - -@test -def test_ipv4_udp_stun(): - sports = [12345, 55555, 80, 43273] - dports = [80, 800, 8000, 3478] - payload = bytes.fromhex("000100002112a442000000000000000000000000") - for sport in sports: - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(payload) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert UDP in resp, "no UDP layer found" - udp = resp[UDP] - assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport) - assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport) - resp_payload = udp.payload.load - type_, length, magic = struct.unpack(">HHI", resp_payload[:8]) - tid = resp_payload[8:20] - data = resp_payload[20:] - assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) - assert length == 12, "expected length 12, got {}".format(length) - assert ( - magic == 0x2112A442 - ), "expected magic 0x2112a442, got 0x{:08x}".format(magic) - assert ( - tid == b"\x00" * 12 - ), "expected tid 0x000000000000000000000000, got {:x}".format(tid) - expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack( - ">HBBBB", sport, 192, 0, 0, 0 - ) - assert ( - data == expected_data - ), f"unexpected data {data!r} != {expected_data!r}" - - -@test -def test_ipv6_udp_stun(): - sports = [12345, 55555, 80, 43273] - dports = [80, 800, 8000, 3478] - payload = bytes.fromhex("000100002112a442000000000000000000000000") - for sport in sports: - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(payload) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert UDP in resp - udp = resp[UDP] - assert udp.sport == dport - assert udp.dport == sport - resp_payload = udp.payload.load - type_, length, magic = struct.unpack(">HHI", resp_payload[:8]) - tid = resp_payload[8:20] - data = resp_payload[20:] - assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) - assert length == 24, "expected length 24, got {}".format(length) - assert ( - magic == 0x2112A442 - ), "expected magic 0x2112a442, got 0x{:08x}".format(magic) - assert ( - tid == b"\x00" * 12 - ), "expected tid 0x000000000000000000000000, got {:x}".format(tid) - expected_data = ( - bytes.fromhex("000100140002") - + struct.pack(">H", sport) - + inet_pton(AF_INET6, "2001:41d0::1234:5678") - ) - assert data == expected_data, "unexpected data: {}".format(data) - - -@test -def test_ipv4_udp_stun_change_port(): - sports = [12345, 55555, 80, 43273] - dports = [80, 800, 8000, 3478, 65535] - payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002") - for sport in sports: - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(payload) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert UDP in resp, "no UDP layer found" - udp = resp[UDP] - assert ( - udp.sport == (dport + 1) % 2**16 - ), "expected answer from UDP/{}, got it from UDP/{}".format( - (dport + 1) % 2**16, udp.sport - ) - assert ( - udp.dport == sport - ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport) - resp_payload = udp.payload.load - type_, length = struct.unpack(">HH", resp_payload[:4]) - tid = resp_payload[4:20] - data = resp_payload[20:] - assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) - assert length == 12, "expected length 12, got {}".format(length) - assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), ( - "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid - ) - expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack( - ">HBBBB", sport, 192, 0, 0, 0 - ) - assert ( - data == expected_data - ), f"unexpected data {data!r} != {expected_data!r}" - - -@test -def test_ipv6_udp_stun_change_port(): - sports = [12345, 55555, 80, 43273] - dports = [80, 800, 8000, 3478, 65535] - payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002") - for sport in sports: - for dport in dports: - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(payload) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert UDP in resp, "expecting UDP layer in answer, got nothing" - udp = resp[UDP] - assert ( - udp.sport == (dport + 1) % 2**16 - ), "expected answer from UDP/{}, got it from UDP/{}".format( - (dport + 1) % 2**16, udp.sport - ) - assert ( - udp.dport == sport - ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport) - resp_payload = udp.payload.load - type_, length = struct.unpack(">HH", resp_payload[:4]) - tid = resp_payload[4:20] - data = resp_payload[20:] - assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) - assert length == 24, "expected length 12, got {}".format(length) - assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), ( - "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid - ) - expected_data = ( - bytes.fromhex("000100140002") - + struct.pack(">H", sport) - + inet_pton(AF_INET6, "2001:41d0::1234:5678") - ) - assert ( - data == expected_data - ), f"unexpected data {data!r} != {expected_data!r}" - - -@test -def test_ipv4_tcp_ssh(): - sport = 37183 - dports = [22, 80, 2222, 2022, 23874, 50000] - for i, dport in enumerate(dports): - seq_init = int(RandInt()) - banner = [ - b"SSH-2.0-AsyncSSH_2.1.0", - b"SSH-2.0-PuTTY", - b"SSH-2.0-libssh2_1.4.3", - b"SSH-2.0-Go", - b"SSH-2.0-PUTTY", - ][i % 5] - syn = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ip_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" - ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - / Raw(banner + b"\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - tcp = resp[TCP] - assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags - assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags - assert len(tcp.payload) > 0, "expecting payload, got none" - assert tcp.payload.load.startswith(b"SSH-2.0-"), ( - "unexpected banner: %r" % tcp.payload.load - ) - assert tcp.payload.load.endswith(b"\r\n"), ( - "unexpected banner: %r" % tcp.payload.load - ) - - -@test -def test_ipv4_udp_ssh(): - sport = 37183 - dports = [22, 80, 2222, 2022, 23874, 50000] - for i, dport in enumerate(dports): - banner = [ - b"SSH-2.0-AsyncSSH_2.1.0", - b"SSH-2.0-PuTTY", - b"SSH-2.0-libssh2_1.4.3", - b"SSH-2.0-Go", - b"SSH-2.0-PUTTY", - ][i % 5] - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(banner + b"\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert UDP in resp - udp = resp[UDP] - assert len(udp.payload) > 0, "expecting payload, got none" - assert udp.payload.load.startswith(b"SSH-2.0-"), ( - "unexpected banner: %r" % udp.payload.load - ) - assert udp.payload.load.endswith(b"\r\n"), ( - "unexpected banner: %r" % udp.payload.load - ) - - -@test -def test_ipv6_tcp_ssh(): - sport = 37183 - dports = [22, 80, 2222, 2022, 23874, 50000] - for i, dport in enumerate(dports): - seq_init = int(RandInt()) - banner = [ - b"SSH-2.0-AsyncSSH_2.1.0", - b"SSH-2.0-PuTTY", - b"SSH-2.0-libssh2_1.4.3", - b"SSH-2.0-Go", - b"SSH-2.0-PUTTY", - ][i % 5] - syn = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ipv6_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" - ack = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - / Raw(banner + b"\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - tcp = resp[TCP] - assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags - assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags - assert len(tcp.payload) > 0, "expecting payload, got none" - assert tcp.payload.load.startswith(b"SSH-2.0-"), ( - "unexpected banner: %r" % tcp.payload.load - ) - assert tcp.payload.load.endswith(b"\r\n"), ( - "unexpected banner: %r" % tcp.payload.load - ) - - -@test -def test_ipv6_udp_ssh(): - sport = 37183 - dports = [22, 80, 2222, 2022, 23874, 50000] - for i, dport in enumerate(dports): - banner = [ - b"SSH-2.0-AsyncSSH_2.1.0", - b"SSH-2.0-PuTTY", - b"SSH-2.0-libssh2_1.4.3", - b"SSH-2.0-Go", - b"SSH-2.0-PUTTY", - ][i % 5] - req = ( - Ether(dst=MAC_ADDR) - / IPv6(dst=IPV6_ADDR) - / UDP(sport=sport, dport=dport) - / Raw(banner + b"\r\n") - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ipv6_checksum(resp) - assert UDP in resp - udp = resp[UDP] - assert len(udp.payload) > 0, "expecting payload, got none" - assert udp.payload.load.startswith(b"SSH-2.0-"), ( - "unexpected banner: %r" % udp.payload.load - ) - assert udp.payload.load.endswith(b"\r\n"), ( - "unexpected banner: %r" % udp.payload.load - ) - - -@test -def test_ipv4_tcp_ghost(): - sport = 37184 - dports = [22, 23874] - for dport in dports: - seq_init = int(RandInt()) - banner = b"Gh0st\xad\x00\x00\x00\xe0\x00\x00\x00x\x9cKS``\x98\xc3\xc0\xc0\xc0\x06\xc4\x8c@\xbcQ\x96\x81\x81\tH\x07\xa7\x16\x95e&\xa7*\x04$&g+\x182\x94\xf6\xb000\xac\xa8rc\x00\x01\x11\xa0\x82\x1f\\`&\x83\xc7K7\x86\x19\xe5n\x0c9\x95n\x0c;\x84\x0f3\xac\xe8sch\xa8^\xcf4'J\x97\xa9\x82\xe30\xc3\x91h]&\x90\xf8\xce\x97S\xcbA4L?2=\xe1\xc4\x92\x86\x0b@\xf5`\x0cT\x1f\xae\xaf]\nr\x0b\x03#\xa3\xdc\x02~\x06\x86\x03+\x18m\xc2=\xfdtC,C\xfdL<<==\\\x9d\x19\x88\x00\xe5 \x02\x00T\xf5+\\" - syn = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) - ) - syn_ack = srp1(syn, timeout=1) - assert syn_ack is not None, "expecting answer, got nothing" - check_ip_checksum(syn_ack) - assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() - syn_ack = syn_ack[TCP] - assert syn_ack.flags == "SA" - ack = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="A", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - ) - _ = srp1(ack, timeout=1) - req = ( - Ether(dst=MAC_ADDR) - / IP(dst=IPV4_ADDR) - / TCP( - flags="PA", - sport=sport, - dport=dport, - seq=seq_init + 1, - ack=syn_ack.seq + 1, - ) - / Raw(banner) - ) - resp = srp1(req, timeout=1) - assert resp is not None, "expecting answer, got nothing" - check_ip_checksum(resp) - assert TCP in resp, "expecting TCP, got %r" % resp.summary() - tcp = resp[TCP] - assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags - assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags - data = raw(tcp.payload) - assert data, "expecting payload, got none" - assert data.startswith(b"Gh0st"), "unexpected banner: %r" % tcp.payload.load - data_len, uncompressed_len = struct.unpack(". + +import logging + +from scapy.compat import raw +from scapy.layers.inet import IP +from scapy.layers.inet6 import IPv6 + + +def setup_logs(): + log = logging.getLogger() + log.setLevel(logging.DEBUG) + if not log.handlers: + ch = logging.StreamHandler() + ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s")) + ch.setLevel(logging.DEBUG) + log.addHandler(ch) + return log + + +LOG = setup_logs() +TESTS = [] +ERRORS = [] + +# decorator to automatically add a function to tests +def test(f): + global ERRORS, TESTS + OK = "\033[1mOK\033[0m" + KO = "\033[1m\033[1;%dmKO\033[0m" % 31 + fname = f.__name__.ljust(50, ".") + + def w(): + try: + f() + LOG.info("{}{}".format(fname, OK)) + except AssertionError as e: + LOG.error("{}{}: {}".format(fname, KO, e)) + ERRORS.append(fname) + + TESTS.append(w) + return w + + +def test_all(): + global ERRORS, TESTS + # execute tests + for t in TESTS: + t() + LOG.info(f"\033[1mRan {len(TESTS)} tests with {len(ERRORS)} errors\033[0m") + return len(ERRORS) + + +def multicast(ip6): + a, b = ip6.split(":")[-2:] + mac = ["33", "33", "ff"] + if len(a) == 4: + mac.append(a[2:]) + else: + mac.append("00") + if len(b) >= 2: + mac.append(b[:2]) + else: + mac.append("00") + if len(b) >= 4: + mac.append(b[2:]) + else: + mac.append("00") + return ":".join(mac) + + +def check_ip_checksum(pkt): + assert IP in pkt, "no IP layer found" + ip_pkt = pkt[IP] + chksum = ip_pkt.chksum + del ip_pkt.chksum + assert IP(raw(ip_pkt)).chksum == chksum, "bad IPv4 checksum" + + +def check_ipv6_checksum(pkt): + assert IPv6 in pkt, "no IP layer found" + ip_pkt = pkt[IPv6] + chksum = ip_pkt.chksum + del ip_pkt.chksum + assert IPv6(raw(ip_pkt)).chksum == chksum, "bad IPv6 checksum" diff --git a/test/src/tests/__init__.py b/test/src/tests/__init__.py new file mode 100644 index 0000000..831f613 --- /dev/null +++ b/test/src/tests/__init__.py @@ -0,0 +1,15 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . diff --git a/test/src/tests/arp.py b/test/src/tests/arp.py new file mode 100644 index 0000000..4f65cb7 --- /dev/null +++ b/test/src/tests/arp.py @@ -0,0 +1,51 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from scapy.layers.l2 import Ether, ARP, ETHER_BROADCAST +from scapy.sendrecv import srp1 + +from ..conf import IPV4_ADDR, MAC_ADDR +from ..core import test + + +@test +def test_arp_req(): + ##### ARP ##### + arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst=IPV4_ADDR) + arp_repl = srp1(arp_req, timeout=1) + assert arp_repl is not None, "expecting answer, got nothing" + assert ARP in arp_repl, "no ARP layer found" + arp_repl = arp_repl[ARP] + # check answer + ## op is "is-at" + assert arp_repl.op == 2, "unexpected ARP op: {}".format(arp_repl.op) + ## answer for the requested IP + assert arp_repl.psrc == arp_req.pdst, "unexpected ARP psrc: {}".format( + arp_repl.psrc + ) + assert arp_repl.pdst == arp_req.psrc, "unexpected ARP pdst: {}".format( + arp_repl.pdst + ) + ## answer is expected MAC address + assert arp_repl.hwsrc == MAC_ADDR, "unexpected ARP hwsrc: {}".format(arp_repl.hwsrc) + + +@test +def test_arp_req_other_ip(): + ##### ARP ##### + arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst="1.2.3.4") + arp_repl = srp1(arp_req, timeout=1) + assert arp_repl is None, "responding to ARP requests for other IP addresses" diff --git a/test/src/tests/ghost.py b/test/src/tests/ghost.py new file mode 100644 index 0000000..303e58e --- /dev/null +++ b/test/src/tests/ghost.py @@ -0,0 +1,87 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +import struct +import zlib + +from scapy.compat import raw +from scapy.layers.inet import IP, TCP +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.sendrecv import srp1 +from scapy.volatile import RandInt + +from ..conf import IPV4_ADDR, MAC_ADDR +from ..core import test, check_ip_checksum + + +@test +def test_ipv4_tcp_ghost(): + sport = 37184 + dports = [22, 23874] + for dport in dports: + seq_init = int(RandInt()) + banner = b"Gh0st\xad\x00\x00\x00\xe0\x00\x00\x00x\x9cKS``\x98\xc3\xc0\xc0\xc0\x06\xc4\x8c@\xbcQ\x96\x81\x81\tH\x07\xa7\x16\x95e&\xa7*\x04$&g+\x182\x94\xf6\xb000\xac\xa8rc\x00\x01\x11\xa0\x82\x1f\\`&\x83\xc7K7\x86\x19\xe5n\x0c9\x95n\x0c;\x84\x0f3\xac\xe8sch\xa8^\xcf4'J\x97\xa9\x82\xe30\xc3\x91h]&\x90\xf8\xce\x97S\xcbA4L?2=\xe1\xc4\x92\x86\x0b@\xf5`\x0cT\x1f\xae\xaf]\nr\x0b\x03#\xa3\xdc\x02~\x06\x86\x03+\x18m\xc2=\xfdtC,C\xfdL<<==\\\x9d\x19\x88\x00\xe5 \x02\x00T\xf5+\\" + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ip_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA" + ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + / Raw(banner) + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + tcp = resp[TCP] + assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags + assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags + data = raw(tcp.payload) + assert data, "expecting payload, got none" + assert data.startswith(b"Gh0st"), "unexpected banner: %r" % tcp.payload.load + data_len, uncompressed_len = struct.unpack(". + +from scapy.layers.inet import IP, TCP, UDP +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.sendrecv import srp1 +from scapy.volatile import RandInt + +from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR +from ..core import test, check_ip_checksum, check_ipv6_checksum + + +@test +def test_ipv4_tcp_http(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ip_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + / Raw("GET / HTTP/1.1\r\n\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + tcp = resp[TCP] + assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") + + +@test +def test_ipv4_tcp_http_incomplete(): + sport = 24595 + dports = [80, 443, 5000, 53228] + for dport in dports: + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ip_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + # purposedly incomplete request (missing additionnal ending \r\n) + / Raw("GET / HTTP/1.1\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting an answer, got none" + check_ip_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + tcp = resp[TCP] + assert tcp.flags == "A", "expecting TCP flag A, got {}".format(tcp.flags) + + +@test +def test_ipv6_tcp_http(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ipv6_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA" + ack = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + / Raw("GET / HTTP/1.1\r\n\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ipv6_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + tcp = resp[TCP] + assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") + + +@test +def test_ipv4_udp_http(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / UDP(sport=sport, dport=dport) + / Raw("GET / HTTP/1.1\r\n\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert UDP in resp + udp = resp[UDP] + assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") + + +@test +def test_ipv6_udp_http(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / UDP(sport=sport, dport=dport) + / Raw("GET / HTTP/1.1\r\n\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ipv6_checksum(resp) + assert UDP in resp + udp = resp[UDP] + assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n") + + +@test +def test_ipv4_tcp_http_ko(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ip_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA" + ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + / Raw(bytes.fromhex("4f5054494f4e53")) + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + assert "P" not in resp[TCP].flags + assert len(resp[TCP].payload) == 0 + + +@test +def test_ipv4_udp_http_ko(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(bytes.fromhex("4f5054494f4e53")) + ) + resp = srp1(req, timeout=1) + assert resp is None, "expecting no answer, got one" + + +@test +def test_ipv6_tcp_http_ko(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ipv6_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA" + ack = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + / Raw(bytes.fromhex("4f5054494f4e53")) + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ipv6_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + assert "P" not in resp[TCP].flags + assert len(resp[TCP].payload) == 0 + + +@test +def test_ipv6_udp_http_ko(): + sport = 24592 + dports = [80, 443, 5000, 53228] + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(bytes.fromhex("4f5054494f4e53")) + ) + resp = srp1(req, timeout=1) + assert resp is None, "expecting no answer, got one" diff --git a/test/src/tests/icmpv4.py b/test/src/tests/icmpv4.py new file mode 100644 index 0000000..7722148 --- /dev/null +++ b/test/src/tests/icmpv4.py @@ -0,0 +1,45 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from scapy.layers.inet import IP, ICMP +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.sendrecv import srp1 + +from ..conf import IPV4_ADDR, MAC_ADDR +from ..core import test, check_ip_checksum + + +@test +def test_icmpv4_echo_req(): + ##### ICMPv4 ##### + icmp_req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / ICMP(type=8, code=0) + / Raw("idrinkwaytoomuchcoffee") + ) + icmp_repl = srp1(icmp_req, timeout=1) + assert icmp_repl is not None, "expecting answer, got nothing" + check_ip_checksum(icmp_repl) + assert ICMP in icmp_repl + icmp_repl = icmp_repl[ICMP] + # check answer + ## type is "echo-reply" + assert icmp_repl.type == 0 + assert icmp_repl.code == 0 + ## data is the same as sent + assert icmp_repl.load == icmp_req.load diff --git a/test/src/tests/icmpv6.py b/test/src/tests/icmpv6.py new file mode 100644 index 0000000..13a8817 --- /dev/null +++ b/test/src/tests/icmpv6.py @@ -0,0 +1,87 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from scapy.layers.inet6 import ( + ICMPv6EchoReply, + ICMPv6EchoRequest, + ICMPv6NDOptDstLLAddr, + ICMPv6ND_NA, + ICMPv6ND_NS, + IPv6, +) +from scapy.layers.l2 import Ether +from scapy.sendrecv import srp1 + +from ..conf import IPV6_ADDR, MAC_ADDR +from ..core import test, multicast + + +@test +def test_icmpv6_neighbor_solicitation(): + ##### IPv6 Neighbor Solicitation ##### + for mac in [ + "ff:ff:ff:ff:ff:ff", + "33:33:00:00:00:01", + MAC_ADDR, + multicast(IPV6_ADDR), + ]: + nd_ns = Ether(dst=mac) / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR) + nd_na = srp1(nd_ns, timeout=1) + assert nd_na is not None, "expecting answer, got nothing" + assert ICMPv6ND_NA in nd_na + nd_na = nd_na[ICMPv6ND_NA] + # check answer content + assert nd_na.code == 0 + assert nd_na.R == 0 + assert nd_na.S == 1 + assert nd_na.O == 1 # noqa: E741 + assert nd_na.tgt == IPV6_ADDR + # check ND Option + assert nd_na.haslayer(ICMPv6NDOptDstLLAddr) + assert nd_na.getlayer(ICMPv6NDOptDstLLAddr).lladdr == MAC_ADDR + for mac in ["00:00:00:00:00:00", "33:33:33:00:00:01"]: + nd_ns = Ether(dst="ff:ff:ff:ff:ff:ff") / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR) + nd_na = srp1(nd_ns, timeout=1) + assert nd_na is not None, "expecting no answer, got one" + + +@test +def test_icmpv6_neighbor_solicitation_other_ip(): + ##### IPv6 Neighbor Solicitation ##### + nd_ns = ( + Ether(dst="ff:ff:ff:ff:ff:ff") + / IPv6() + / ICMPv6ND_NS(tgt="2020:4141:3030:2020::bdbd") + ) + nd_na = srp1(nd_ns, timeout=1) + assert nd_na is None, "responding to ND_NS for other IP addresses" + + +@test +def test_icmpv6_echo_req(): + ##### IPv6 Ping ##### + echo_req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / ICMPv6EchoRequest(data="waytoomanynapkins") + ) + echo_repl = srp1(echo_req, timeout=1) + assert echo_repl is not None, "expecting answer, got nothing" + assert ICMPv6EchoReply in echo_repl + echo_repl = echo_repl[ICMPv6EchoReply] + # check answer content + assert echo_repl.code == 0 + assert echo_repl.data == echo_req.data diff --git a/test/src/tests/ip46.py b/test/src/tests/ip46.py new file mode 100644 index 0000000..62f3912 --- /dev/null +++ b/test/src/tests/ip46.py @@ -0,0 +1,291 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from scapy.layers.inet import IP, ICMP, TCP +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.sendrecv import srp1 +from scapy.volatile import RandInt + +from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR +from ..core import test, check_ip_checksum, check_ipv6_checksum + + +@test +def test_ipv4_req(): + ##### IP ##### + ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0) + ip_repl = srp1(ip_req, timeout=1) + assert ip_repl is not None, "expecting answer, got nothing" + check_ip_checksum(ip_repl) + assert IP in ip_repl, "no IP layer in response" + ip_repl = ip_repl[IP] + assert ip_repl.id == 0, "IP identification unexpected" + + +@test +def test_eth_req_other_mac(): + #### ETH #### + ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0) + ip_repl = srp1(ip_req, timeout=1) + assert ip_repl is None, "responding to other MAC addresses" + + +@test +def test_ipv4_req_other_ip(): + ##### IP ##### + ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0) + ip_repl = srp1(ip_req, timeout=1) + assert ip_repl is None, "responding to other IP addresses" + + +@test +def test_tcp_syn(): + ##### SYN-ACK ##### + # test a list of ports, randomly generated once + ports_to_test = [ + 1152, + 2003, + 2193, + 3709, + 4054, + 6605, + 6737, + 6875, + 7320, + 8898, + 9513, + 9738, + 10623, + 10723, + 11253, + 12125, + 12189, + 12873, + 14648, + 14659, + 16242, + 16243, + 17209, + 17492, + 17667, + 17838, + 18081, + 18682, + 18790, + 19124, + 19288, + 19558, + 19628, + 19789, + 20093, + 21014, + 21459, + 21740, + 24070, + 24312, + 24576, + 26939, + 27136, + 27165, + 27361, + 29971, + 31088, + 33011, + 33068, + 34990, + 35093, + 35958, + 36626, + 36789, + 37130, + 37238, + 37256, + 37697, + 37890, + 38958, + 42131, + 43864, + 44420, + 44655, + 44868, + 45157, + 46213, + 46497, + 46955, + 49049, + 49067, + 49452, + 49480, + 50498, + 50945, + 51181, + 52890, + 53301, + 53407, + 53417, + 53980, + 55827, + 56483, + 58552, + 58713, + 58836, + 59362, + 59560, + 60534, + 60555, + 60660, + 61615, + 62402, + 62533, + 62941, + 63240, + 63339, + 63616, + 64380, + 65438, + ] + for p in ports_to_test: + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", dport=p, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ip_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( + syn_ack.ack, + seq_init + 1, + ) + + +@test +def test_ipv4_tcp_psh_ack(): + ##### PSH-ACK ##### + sport = 26695 + port = 445 + seq_init = int(RandInt()) + # send PSH-ACK first + psh_ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="PA", sport=sport, dport=port, seq=seq_init) + / Raw("payload") + ) + syn_ack = srp1(psh_ack, timeout=1) + assert syn_ack is None, "no answer expected, got one" + # test the anti-injection mechanism + seq_init = int(RandInt()) + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", sport=sport, dport=port, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ip_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( + syn_ack.ack, + seq_init + 1, + ) + ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="A", dport=port) + # should fail because no ack given + psh_ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1) + ) + ack = srp1(psh_ack, timeout=1) + assert ack is None, "no answer expected, got one" + # should get an answer this time + psh_ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1 + ) + ) + ack = srp1(psh_ack, timeout=1) + assert ack is not None, "expecting answer, got nothing" + check_ip_checksum(ack) + assert TCP in ack, "expecting TCP, got %r" % ack.summary() + ack = ack[TCP] + assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags + + +@test +def test_ipv6_tcp_psh_ack(): + ##### PSH-ACK ##### + sport = 26695 + port = 445 + seq_init = int(RandInt()) + # send PSH-ACK first + psh_ack = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="PA", sport=sport, dport=port, seq=seq_init) + / Raw("payload") + ) + syn_ack = srp1(psh_ack, timeout=1) + assert syn_ack is None, "no answer expected, got one" + # test the anti-injection mechanism + syn = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="S", sport=sport, dport=port, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ipv6_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags + assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % ( + syn_ack.ack, + seq_init + 1, + ) + ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="A", dport=port) + # should fail because no ack given + psh_ack = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1) + ) + ack = srp1(psh_ack, timeout=1) + assert ack is None, "no answer expected, got one" + # should get an answer this time + psh_ack = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP( + flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1 + ) + ) + ack = srp1(psh_ack, timeout=1) + assert ack is not None, "expecting answer, got nothing" + check_ipv6_checksum(ack) + assert TCP in ack, "expecting TCP, got %r" % ack.summary() + ack = ack[TCP] + assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags diff --git a/test/src/tests/rpc.py b/test/src/tests/rpc.py new file mode 100644 index 0000000..62e42a1 --- /dev/null +++ b/test/src/tests/rpc.py @@ -0,0 +1,99 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from subprocess import check_call +from tempfile import NamedTemporaryFile +import json +import os +import re + +from ivre.db import DBNmap + +from ..conf import IPV4_ADDR +from ..core import test + + +@test +def test_rpc_nmap(): + for scan in "SU": + with NamedTemporaryFile(delete=False) as xml_result: + check_call( + [ + "nmap", + "-n", + "-vv", + "-oX", + "-", + IPV4_ADDR, + f"-s{scan}V", + "-p", + "111", + "--script", + "rpcinfo,rpc-grind", + ], + stdout=xml_result, + ) + with NamedTemporaryFile(delete=False, mode="w") as json_result: + DBNmap(output=json_result).store_scan(xml_result.name) + os.unlink(xml_result.name) + with open(json_result.name) as fdesc: + results = [json.loads(line) for line in fdesc] + os.unlink(json_result.name) + assert len(results) == 1, f"Expected 1 result, got {len(results)}" + result = results[0] + assert len(result["ports"]) == 1, f"Expected 1 port, got {len(result['ports'])}" + port = result["ports"][0] + assert port["port"] == 111 and port["protocol"] == ( + "tcp" if scan == "S" else "udp" + ) + assert port["service_name"] in {"rpcbind", "nfs"} + assert port["service_extrainfo"] in {"RPC #100000", "RPC #100003"} + assert ( + len(port["scripts"]) == 1 + ), f"Expected 1 script, got {len(port['scripts'])}" + script = port["scripts"][0] + assert script["id"] == "rpcinfo", "Expected rpcinfo script, not found" + assert len(script["rpcinfo"]) == 1 + + +@test +def test_rpcinfo(): + with NamedTemporaryFile(delete=False) as rpcout: + check_call(["rpcinfo", "-p", IPV4_ADDR], stdout=rpcout) + with open(rpcout.name) as fdesc: + found = [] + for line in fdesc: + line = line.split() + if line[0] == "program": + # header + continue + assert line[0] == "100000", f"Expected program 100000, got {line[0]}" + found.append(int(line[1])) + assert len(found) == 3, f"Expected three versions, got {found}" + for i in range(2, 5): + assert i in found, f"Missing version {i} in {found}" + os.unlink(rpcout.name) + with NamedTemporaryFile(delete=False) as rpcout: + check_call(["rpcinfo", "-u", IPV4_ADDR, "100000"], stdout=rpcout) + with open(rpcout.name) as fdesc: + found = [] + expr = re.compile("^program 100000 version ([0-9]) ready and waiting$") + for line in fdesc: + found.append(int(expr.search(line.strip()).group(1))) + assert len(found) == 3, f"Expected three versions, got {found}" + for i in range(2, 5): + assert i in found, f"Missing version {i} in {found}" + os.unlink(rpcout.name) diff --git a/test/src/tests/smb.py b/test/src/tests/smb.py new file mode 100644 index 0000000..3ce421d --- /dev/null +++ b/test/src/tests/smb.py @@ -0,0 +1,65 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +import subprocess + +from ..core import test +from ..conf import IPV4_ADDR + + +@test +def test_smb1_network_req(): + proc = subprocess.Popen( + [ + "smbclient", + "-U ''", + "-N", + "-d 6", + "-t 1", + "-L", + IPV4_ADDR, + "--option=client min protocol=NT1", + ], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + out, _ = proc.communicate() + assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out + assert "session request ok" in out, "\n" + out + assert f"negotiated dialect[NT1] against server[{IPV4_ADDR}]" in out, "\n" + out + + +@test +def test_smb2_network_req(): + proc = subprocess.Popen( + [ + "smbclient", + "-U ''", + "-N", + "-d 5", + "-t 1", + "-L", + IPV4_ADDR, + ], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + out, _ = proc.communicate() + assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out + assert "session request ok" in out, "\n" + out + assert f"negotiated dialect[SMB2_02] against server[{IPV4_ADDR}]" in out, "\n" + out diff --git a/test/src/tests/ssh.py b/test/src/tests/ssh.py new file mode 100644 index 0000000..31501e4 --- /dev/null +++ b/test/src/tests/ssh.py @@ -0,0 +1,217 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from scapy.layers.inet import IP, TCP, UDP +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.sendrecv import srp1 +from scapy.volatile import RandInt + +from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR +from ..core import test, check_ip_checksum, check_ipv6_checksum + + +@test +def test_ipv4_tcp_ssh(): + sport = 37183 + dports = [22, 80, 2222, 2022, 23874, 50000] + for i, dport in enumerate(dports): + seq_init = int(RandInt()) + banner = [ + b"SSH-2.0-AsyncSSH_2.1.0", + b"SSH-2.0-PuTTY", + b"SSH-2.0-libssh2_1.4.3", + b"SSH-2.0-Go", + b"SSH-2.0-PUTTY", + ][i % 5] + syn = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ip_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA" + ack = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + / Raw(banner + b"\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + tcp = resp[TCP] + assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags + assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags + assert len(tcp.payload) > 0, "expecting payload, got none" + assert tcp.payload.load.startswith(b"SSH-2.0-"), ( + "unexpected banner: %r" % tcp.payload.load + ) + assert tcp.payload.load.endswith(b"\r\n"), ( + "unexpected banner: %r" % tcp.payload.load + ) + + +@test +def test_ipv4_udp_ssh(): + sport = 37183 + dports = [22, 80, 2222, 2022, 23874, 50000] + for i, dport in enumerate(dports): + banner = [ + b"SSH-2.0-AsyncSSH_2.1.0", + b"SSH-2.0-PuTTY", + b"SSH-2.0-libssh2_1.4.3", + b"SSH-2.0-Go", + b"SSH-2.0-PUTTY", + ][i % 5] + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(banner + b"\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert UDP in resp + udp = resp[UDP] + assert len(udp.payload) > 0, "expecting payload, got none" + assert udp.payload.load.startswith(b"SSH-2.0-"), ( + "unexpected banner: %r" % udp.payload.load + ) + assert udp.payload.load.endswith(b"\r\n"), ( + "unexpected banner: %r" % udp.payload.load + ) + + +@test +def test_ipv6_tcp_ssh(): + sport = 37183 + dports = [22, 80, 2222, 2022, 23874, 50000] + for i, dport in enumerate(dports): + seq_init = int(RandInt()) + banner = [ + b"SSH-2.0-AsyncSSH_2.1.0", + b"SSH-2.0-PuTTY", + b"SSH-2.0-libssh2_1.4.3", + b"SSH-2.0-Go", + b"SSH-2.0-PUTTY", + ][i % 5] + syn = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP(flags="S", sport=sport, dport=dport, seq=seq_init) + ) + syn_ack = srp1(syn, timeout=1) + assert syn_ack is not None, "expecting answer, got nothing" + check_ipv6_checksum(syn_ack) + assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary() + syn_ack = syn_ack[TCP] + assert syn_ack.flags == "SA" + ack = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP( + flags="A", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + ) + _ = srp1(ack, timeout=1) + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / TCP( + flags="PA", + sport=sport, + dport=dport, + seq=seq_init + 1, + ack=syn_ack.seq + 1, + ) + / Raw(banner + b"\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ipv6_checksum(resp) + assert TCP in resp, "expecting TCP, got %r" % resp.summary() + tcp = resp[TCP] + assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags + assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags + assert len(tcp.payload) > 0, "expecting payload, got none" + assert tcp.payload.load.startswith(b"SSH-2.0-"), ( + "unexpected banner: %r" % tcp.payload.load + ) + assert tcp.payload.load.endswith(b"\r\n"), ( + "unexpected banner: %r" % tcp.payload.load + ) + + +@test +def test_ipv6_udp_ssh(): + sport = 37183 + dports = [22, 80, 2222, 2022, 23874, 50000] + for i, dport in enumerate(dports): + banner = [ + b"SSH-2.0-AsyncSSH_2.1.0", + b"SSH-2.0-PuTTY", + b"SSH-2.0-libssh2_1.4.3", + b"SSH-2.0-Go", + b"SSH-2.0-PUTTY", + ][i % 5] + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(banner + b"\r\n") + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ipv6_checksum(resp) + assert UDP in resp + udp = resp[UDP] + assert len(udp.payload) > 0, "expecting payload, got none" + assert udp.payload.load.startswith(b"SSH-2.0-"), ( + "unexpected banner: %r" % udp.payload.load + ) + assert udp.payload.load.endswith(b"\r\n"), ( + "unexpected banner: %r" % udp.payload.load + ) diff --git a/test/src/tests/stun.py b/test/src/tests/stun.py new file mode 100644 index 0000000..4062ec9 --- /dev/null +++ b/test/src/tests/stun.py @@ -0,0 +1,196 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from socket import AF_INET6 +import struct + +from scapy.layers.inet import IP, UDP +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.pton_ntop import inet_pton +from scapy.sendrecv import srp1 + +from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR +from ..core import test, check_ip_checksum, check_ipv6_checksum + + +@test +def test_ipv4_udp_stun(): + sports = [12345, 55555, 80, 43273] + dports = [80, 800, 8000, 3478] + payload = bytes.fromhex("000100002112a442000000000000000000000000") + for sport in sports: + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(payload) + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert UDP in resp, "no UDP layer found" + udp = resp[UDP] + assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport) + assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport) + resp_payload = udp.payload.load + type_, length, magic = struct.unpack(">HHI", resp_payload[:8]) + tid = resp_payload[8:20] + data = resp_payload[20:] + assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) + assert length == 12, "expected length 12, got {}".format(length) + assert ( + magic == 0x2112A442 + ), "expected magic 0x2112a442, got 0x{:08x}".format(magic) + assert ( + tid == b"\x00" * 12 + ), "expected tid 0x000000000000000000000000, got {:x}".format(tid) + expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack( + ">HBBBB", sport, 192, 0, 0, 0 + ) + assert ( + data == expected_data + ), f"unexpected data {data!r} != {expected_data!r}" + + +@test +def test_ipv6_udp_stun(): + sports = [12345, 55555, 80, 43273] + dports = [80, 800, 8000, 3478] + payload = bytes.fromhex("000100002112a442000000000000000000000000") + for sport in sports: + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(payload) + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ipv6_checksum(resp) + assert UDP in resp + udp = resp[UDP] + assert udp.sport == dport + assert udp.dport == sport + resp_payload = udp.payload.load + type_, length, magic = struct.unpack(">HHI", resp_payload[:8]) + tid = resp_payload[8:20] + data = resp_payload[20:] + assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) + assert length == 24, "expected length 24, got {}".format(length) + assert ( + magic == 0x2112A442 + ), "expected magic 0x2112a442, got 0x{:08x}".format(magic) + assert ( + tid == b"\x00" * 12 + ), "expected tid 0x000000000000000000000000, got {:x}".format(tid) + expected_data = ( + bytes.fromhex("000100140002") + + struct.pack(">H", sport) + + inet_pton(AF_INET6, "2001:41d0::1234:5678") + ) + assert data == expected_data, "unexpected data: {}".format(data) + + +@test +def test_ipv4_udp_stun_change_port(): + sports = [12345, 55555, 80, 43273] + dports = [80, 800, 8000, 3478, 65535] + payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002") + for sport in sports: + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IP(dst=IPV4_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(payload) + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ip_checksum(resp) + assert UDP in resp, "no UDP layer found" + udp = resp[UDP] + assert ( + udp.sport == (dport + 1) % 2**16 + ), "expected answer from UDP/{}, got it from UDP/{}".format( + (dport + 1) % 2**16, udp.sport + ) + assert ( + udp.dport == sport + ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport) + resp_payload = udp.payload.load + type_, length = struct.unpack(">HH", resp_payload[:4]) + tid = resp_payload[4:20] + data = resp_payload[20:] + assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) + assert length == 12, "expected length 12, got {}".format(length) + assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), ( + "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid + ) + expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack( + ">HBBBB", sport, 192, 0, 0, 0 + ) + assert ( + data == expected_data + ), f"unexpected data {data!r} != {expected_data!r}" + + +@test +def test_ipv6_udp_stun_change_port(): + sports = [12345, 55555, 80, 43273] + dports = [80, 800, 8000, 3478, 65535] + payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002") + for sport in sports: + for dport in dports: + req = ( + Ether(dst=MAC_ADDR) + / IPv6(dst=IPV6_ADDR) + / UDP(sport=sport, dport=dport) + / Raw(payload) + ) + resp = srp1(req, timeout=1) + assert resp is not None, "expecting answer, got nothing" + check_ipv6_checksum(resp) + assert UDP in resp, "expecting UDP layer in answer, got nothing" + udp = resp[UDP] + assert ( + udp.sport == (dport + 1) % 2**16 + ), "expected answer from UDP/{}, got it from UDP/{}".format( + (dport + 1) % 2**16, udp.sport + ) + assert ( + udp.dport == sport + ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport) + resp_payload = udp.payload.load + type_, length = struct.unpack(">HH", resp_payload[:4]) + tid = resp_payload[4:20] + data = resp_payload[20:] + assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_) + assert length == 24, "expected length 12, got {}".format(length) + assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), ( + "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid + ) + expected_data = ( + bytes.fromhex("000100140002") + + struct.pack(">H", sport) + + inet_pton(AF_INET6, "2001:41d0::1234:5678") + ) + assert ( + data == expected_data + ), f"unexpected data {data!r} != {expected_data!r}" diff --git a/test/test_masscanned.py b/test/test_masscanned.py index 044a295..232e65a 100755 --- a/test/test_masscanned.py +++ b/test/test_masscanned.py @@ -18,7 +18,6 @@ import atexit import functools -import logging import os from signal import SIGINT import subprocess @@ -39,16 +38,6 @@ from src.all import test_all from src.conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR, OUTDIR -def setup_logs(): - ch = logging.StreamHandler() - ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s")) - ch.setLevel(logging.INFO) - log = logging.getLogger(__name__) - log.setLevel(logging.INFO) - log.addHandler(ch) - return log - - def cleanup_net(iface): global ipfile subprocess.check_call(["ip", "link", "delete", iface]) @@ -77,10 +66,10 @@ def cleanup_net(iface): def setup_net(iface): global IPV4_ADDR # create the interfaces pair + atexit.register(functools.partial(cleanup_net, f"{iface}a")) subprocess.check_call( ["ip", "link", "add", f"{iface}a", "type", "veth", "peer", f"{iface}b"] ) - atexit.register(functools.partial(cleanup_net, f"{iface}a")) for sub in "a", "b": subprocess.check_call(["ip", "link", "set", f"{iface}{sub}", "up"]) subprocess.check_call(["ip", "addr", "add", "dev", f"{iface}a", "192.0.0.0/31"]) @@ -110,7 +99,6 @@ def setup_net(iface): conf.route6.resync() -LOG = setup_logs() IFACE = "masscanned" setup_net(IFACE) TCPDUMP = bool(os.environ.get("USE_TCPDUMP")) From 5a2b2927d925be9b78d11d040aecd29b8d2cea58 Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Tue, 15 Feb 2022 15:27:25 +0100 Subject: [PATCH 2/2] Split ip46->ip/tcp --- test/src/all.py | 5 +-- test/src/core.py | 1 + test/src/tests/ip.py | 50 ++++++++++++++++++++++++++++++ test/src/tests/{ip46.py => tcp.py} | 30 +----------------- 4 files changed, 55 insertions(+), 31 deletions(-) create mode 100644 test/src/tests/ip.py rename test/src/tests/{ip46.py => tcp.py} (88%) diff --git a/test/src/all.py b/test/src/all.py index 96f9062..0a6bd61 100644 --- a/test/src/all.py +++ b/test/src/all.py @@ -18,7 +18,7 @@ import importlib import os # Export / other tests -from .core import test_all +from .core import test_all # noqa: F401 DEFAULT_TESTS = [ "arp", @@ -26,11 +26,12 @@ DEFAULT_TESTS = [ "http", "icmpv4", "icmpv6", - "ip46", + "ip", "rpc", "smb", "ssh", "stun", + "tcp", ] ENABLED_TESTS = DEFAULT_TESTS diff --git a/test/src/core.py b/test/src/core.py index a08e4d0..87f0844 100644 --- a/test/src/core.py +++ b/test/src/core.py @@ -36,6 +36,7 @@ LOG = setup_logs() TESTS = [] ERRORS = [] + # decorator to automatically add a function to tests def test(f): global ERRORS, TESTS diff --git a/test/src/tests/ip.py b/test/src/tests/ip.py new file mode 100644 index 0000000..f87fb75 --- /dev/null +++ b/test/src/tests/ip.py @@ -0,0 +1,50 @@ +# This file is part of masscanned. +# Copyright 2021 - The IVRE project +# +# Masscanned is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Masscanned is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Masscanned. If not, see . + +from scapy.layers.inet import IP, ICMP +from scapy.layers.l2 import Ether +from scapy.sendrecv import srp1 + +from ..conf import IPV4_ADDR, MAC_ADDR +from ..core import test, check_ip_checksum + + +@test +def test_ipv4_req(): + ##### IP ##### + ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0) + ip_repl = srp1(ip_req, timeout=1) + assert ip_repl is not None, "expecting answer, got nothing" + check_ip_checksum(ip_repl) + assert IP in ip_repl, "no IP layer in response" + ip_repl = ip_repl[IP] + assert ip_repl.id == 0, "IP identification unexpected" + + +@test +def test_eth_req_other_mac(): + #### ETH #### + ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0) + ip_repl = srp1(ip_req, timeout=1) + assert ip_repl is None, "responding to other MAC addresses" + + +@test +def test_ipv4_req_other_ip(): + ##### IP ##### + ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0) + ip_repl = srp1(ip_req, timeout=1) + assert ip_repl is None, "responding to other IP addresses" diff --git a/test/src/tests/ip46.py b/test/src/tests/tcp.py similarity index 88% rename from test/src/tests/ip46.py rename to test/src/tests/tcp.py index 62f3912..1b0d00e 100644 --- a/test/src/tests/ip46.py +++ b/test/src/tests/tcp.py @@ -14,7 +14,7 @@ # You should have received a copy of the GNU General Public License # along with Masscanned. If not, see . -from scapy.layers.inet import IP, ICMP, TCP +from scapy.layers.inet import IP, TCP from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether from scapy.packet import Raw @@ -25,34 +25,6 @@ from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR from ..core import test, check_ip_checksum, check_ipv6_checksum -@test -def test_ipv4_req(): - ##### IP ##### - ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0) - ip_repl = srp1(ip_req, timeout=1) - assert ip_repl is not None, "expecting answer, got nothing" - check_ip_checksum(ip_repl) - assert IP in ip_repl, "no IP layer in response" - ip_repl = ip_repl[IP] - assert ip_repl.id == 0, "IP identification unexpected" - - -@test -def test_eth_req_other_mac(): - #### ETH #### - ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0) - ip_repl = srp1(ip_req, timeout=1) - assert ip_repl is None, "responding to other MAC addresses" - - -@test -def test_ipv4_req_other_ip(): - ##### IP ##### - ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0) - ip_repl = srp1(ip_req, timeout=1) - assert ip_repl is None, "responding to other IP addresses" - - @test def test_tcp_syn(): ##### SYN-ACK #####