diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 6ac573d..05c8984 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -87,7 +87,7 @@ jobs:
run: sudo pip install -U flake8 black
- name: Install packages for tests
- run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind
+ run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind smbclient
- name: Run black
run: black -t py36 --check test/test_masscanned.py test/src/
diff --git a/README.md b/README.md
index 41574c3..f3ebf81 100644
--- a/README.md
+++ b/README.md
@@ -290,6 +290,14 @@ tcpdump: pcap_loop: The interface disappeared
0 packets dropped by kernel
```
+You can also chose what tests to run using the `TESTS` environment variable
+```
+TESTS=smb ./test/test_masscanned.py
+INFO test_smb1_network_req.............................OK
+INFO test_smb2_network_req.............................OK
+INFO Ran 2 tests with 1 errors
+```
+
## Logging
### Console Logger
diff --git a/src/proto/smb.rs b/src/proto/smb.rs
index 0e1ad4b..dcd5993 100644
--- a/src/proto/smb.rs
+++ b/src/proto/smb.rs
@@ -20,7 +20,6 @@ use std::convert::TryInto;
use std::time::SystemTime;
use crate::client::ClientInfo;
-use crate::logger::MetaLogger;
use crate::Masscanned;
// NBTSession + SMB Header
@@ -990,6 +989,8 @@ pub fn repl_smb2<'a>(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::logger::MetaLogger;
+
use itertools::assert_equal;
use pnet::util::MacAddr;
use std::str::FromStr;
diff --git a/test/src/all.py b/test/src/all.py
index e11d7d4..96f9062 100644
--- a/test/src/all.py
+++ b/test/src/all.py
@@ -14,1290 +14,28 @@
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see .
-import json
-import logging
+import importlib
import os
-import re
-from socket import AF_INET6
-from subprocess import check_call
-import struct
-from tempfile import NamedTemporaryFile
-import zlib
-from ivre.db import DBNmap
-from scapy.compat import raw
-from scapy.data import ETHER_BROADCAST
-from scapy.layers.inet import ICMP, IP, TCP, UDP
-from scapy.layers.inet6 import (
- ICMPv6EchoReply,
- ICMPv6EchoRequest,
- ICMPv6ND_NA,
- ICMPv6ND_NS,
- ICMPv6NDOptDstLLAddr,
- IPv6,
-)
-from scapy.layers.l2 import ARP, Ether
-from scapy.pton_ntop import inet_pton
-from scapy.packet import Raw
-from scapy.sendrecv import srp1
-from scapy.volatile import RandInt
-
-from .conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
-
-
-def setup_logs():
- ch = logging.StreamHandler()
- ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
- ch.setLevel(logging.DEBUG)
- log = logging.getLogger(__name__)
- log.setLevel(logging.DEBUG)
- log.addHandler(ch)
- return log
-
-
-LOG = setup_logs()
-TESTS = []
-ERRORS = []
-
-
-# decorator to automatically add a function to tests
-def test(f):
- global ERRORS, TESTS
- OK = "\033[1mOK\033[0m"
- KO = "\033[1m\033[1;%dmKO\033[0m" % 31
- fname = f.__name__.ljust(50, ".")
-
- def w():
- try:
- f()
- LOG.info("{}{}".format(fname, OK))
- except AssertionError as e:
- LOG.error("{}{}: {}".format(fname, KO, e))
- ERRORS.append(fname)
-
- TESTS.append(w)
- return w
-
-
-def multicast(ip6):
- a, b = ip6.split(":")[-2:]
- mac = ["33", "33", "ff"]
- if len(a) == 4:
- mac.append(a[2:])
- else:
- mac.append("00")
- if len(b) >= 2:
- mac.append(b[:2])
- else:
- mac.append("00")
- if len(b) >= 4:
- mac.append(b[2:])
- else:
- mac.append("00")
- return ":".join(mac)
-
-
-def check_ip_checksum(pkt):
- assert IP in pkt, "no IP layer found"
- ip_pkt = pkt[IP]
- chksum = ip_pkt.chksum
- del ip_pkt.chksum
- assert IP(raw(ip_pkt)).chksum == chksum, "bad IPv4 checksum"
-
-
-def check_ipv6_checksum(pkt):
- assert IPv6 in pkt, "no IP layer found"
- ip_pkt = pkt[IPv6]
- chksum = ip_pkt.chksum
- del ip_pkt.chksum
- assert IPv6(raw(ip_pkt)).chksum == chksum, "bad IPv6 checksum"
-
-
-@test
-def test_arp_req():
- ##### ARP #####
- arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst=IPV4_ADDR)
- arp_repl = srp1(arp_req, timeout=1)
- assert arp_repl is not None, "expecting answer, got nothing"
- assert ARP in arp_repl, "no ARP layer found"
- arp_repl = arp_repl[ARP]
- # check answer
- ## op is "is-at"
- assert arp_repl.op == 2, "unexpected ARP op: {}".format(arp_repl.op)
- ## answer for the requested IP
- assert arp_repl.psrc == arp_req.pdst, "unexpected ARP psrc: {}".format(
- arp_repl.psrc
- )
- assert arp_repl.pdst == arp_req.psrc, "unexpected ARP pdst: {}".format(
- arp_repl.pdst
- )
- ## answer is expected MAC address
- assert arp_repl.hwsrc == MAC_ADDR, "unexpected ARP hwsrc: {}".format(arp_repl.hwsrc)
-
-
-@test
-def test_arp_req_other_ip():
- ##### ARP #####
- arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst="1.2.3.4")
- arp_repl = srp1(arp_req, timeout=1)
- assert arp_repl is None, "responding to ARP requests for other IP addresses"
-
-
-@test
-def test_ipv4_req():
- ##### IP #####
- ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0)
- ip_repl = srp1(ip_req, timeout=1)
- assert ip_repl is not None, "expecting answer, got nothing"
- check_ip_checksum(ip_repl)
- assert IP in ip_repl, "no IP layer in response"
- ip_repl = ip_repl[IP]
- assert ip_repl.id == 0, "IP identification unexpected"
-
-
-@test
-def test_eth_req_other_mac():
- #### ETH ####
- ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0)
- ip_repl = srp1(ip_req, timeout=1)
- assert ip_repl is None, "responding to other MAC addresses"
-
-
-@test
-def test_ipv4_req_other_ip():
- ##### IP #####
- ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0)
- ip_repl = srp1(ip_req, timeout=1)
- assert ip_repl is None, "responding to other IP addresses"
-
-
-@test
-def test_icmpv4_echo_req():
- ##### ICMPv4 #####
- icmp_req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / ICMP(type=8, code=0)
- / Raw("idrinkwaytoomuchcoffee")
- )
- icmp_repl = srp1(icmp_req, timeout=1)
- assert icmp_repl is not None, "expecting answer, got nothing"
- check_ip_checksum(icmp_repl)
- assert ICMP in icmp_repl
- icmp_repl = icmp_repl[ICMP]
- # check answer
- ## type is "echo-reply"
- assert icmp_repl.type == 0
- assert icmp_repl.code == 0
- ## data is the same as sent
- assert icmp_repl.load == icmp_req.load
-
-
-@test
-def test_icmpv6_neighbor_solicitation():
- ##### IPv6 Neighbor Solicitation #####
- for mac in [
- "ff:ff:ff:ff:ff:ff",
- "33:33:00:00:00:01",
- MAC_ADDR,
- multicast(IPV6_ADDR),
- ]:
- nd_ns = Ether(dst=mac) / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
- nd_na = srp1(nd_ns, timeout=1)
- assert nd_na is not None, "expecting answer, got nothing"
- assert ICMPv6ND_NA in nd_na
- nd_na = nd_na[ICMPv6ND_NA]
- # check answer content
- assert nd_na.code == 0
- assert nd_na.R == 0
- assert nd_na.S == 1
- assert nd_na.O == 1 # noqa: E741
- assert nd_na.tgt == IPV6_ADDR
- # check ND Option
- assert nd_na.haslayer(ICMPv6NDOptDstLLAddr)
- assert nd_na.getlayer(ICMPv6NDOptDstLLAddr).lladdr == MAC_ADDR
- for mac in ["00:00:00:00:00:00", "33:33:33:00:00:01"]:
- nd_ns = Ether(dst="ff:ff:ff:ff:ff:ff") / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
- nd_na = srp1(nd_ns, timeout=1)
- assert nd_na is not None, "expecting no answer, got one"
-
-
-@test
-def test_icmpv6_neighbor_solicitation_other_ip():
- ##### IPv6 Neighbor Solicitation #####
- nd_ns = (
- Ether(dst="ff:ff:ff:ff:ff:ff")
- / IPv6()
- / ICMPv6ND_NS(tgt="2020:4141:3030:2020::bdbd")
- )
- nd_na = srp1(nd_ns, timeout=1)
- assert nd_na is None, "responding to ND_NS for other IP addresses"
-
-
-@test
-def test_icmpv6_echo_req():
- ##### IPv6 Ping #####
- echo_req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / ICMPv6EchoRequest(data="waytoomanynapkins")
- )
- echo_repl = srp1(echo_req, timeout=1)
- assert echo_repl is not None, "expecting answer, got nothing"
- assert ICMPv6EchoReply in echo_repl
- echo_repl = echo_repl[ICMPv6EchoReply]
- # check answer content
- assert echo_repl.code == 0
- assert echo_repl.data == echo_req.data
-
-
-@test
-def test_tcp_syn():
- ##### SYN-ACK #####
- # test a list of ports, randomly generated once
- ports_to_test = [
- 1152,
- 2003,
- 2193,
- 3709,
- 4054,
- 6605,
- 6737,
- 6875,
- 7320,
- 8898,
- 9513,
- 9738,
- 10623,
- 10723,
- 11253,
- 12125,
- 12189,
- 12873,
- 14648,
- 14659,
- 16242,
- 16243,
- 17209,
- 17492,
- 17667,
- 17838,
- 18081,
- 18682,
- 18790,
- 19124,
- 19288,
- 19558,
- 19628,
- 19789,
- 20093,
- 21014,
- 21459,
- 21740,
- 24070,
- 24312,
- 24576,
- 26939,
- 27136,
- 27165,
- 27361,
- 29971,
- 31088,
- 33011,
- 33068,
- 34990,
- 35093,
- 35958,
- 36626,
- 36789,
- 37130,
- 37238,
- 37256,
- 37697,
- 37890,
- 38958,
- 42131,
- 43864,
- 44420,
- 44655,
- 44868,
- 45157,
- 46213,
- 46497,
- 46955,
- 49049,
- 49067,
- 49452,
- 49480,
- 50498,
- 50945,
- 51181,
- 52890,
- 53301,
- 53407,
- 53417,
- 53980,
- 55827,
- 56483,
- 58552,
- 58713,
- 58836,
- 59362,
- 59560,
- 60534,
- 60555,
- 60660,
- 61615,
- 62402,
- 62533,
- 62941,
- 63240,
- 63339,
- 63616,
- 64380,
- 65438,
- ]
- for p in ports_to_test:
- seq_init = int(RandInt())
- syn = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="S", dport=p, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ip_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
- assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
- syn_ack.ack,
- seq_init + 1,
- )
-
-
-@test
-def test_ipv4_tcp_psh_ack():
- ##### PSH-ACK #####
- sport = 26695
- port = 445
- seq_init = int(RandInt())
- # send PSH-ACK first
- psh_ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
- / Raw("payload")
- )
- syn_ack = srp1(psh_ack, timeout=1)
- assert syn_ack is None, "no answer expected, got one"
- # test the anti-injection mechanism
- seq_init = int(RandInt())
- syn = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="S", sport=sport, dport=port, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ip_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
- assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
- syn_ack.ack,
- seq_init + 1,
- )
- ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="A", dport=port)
- # should fail because no ack given
- psh_ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
- )
- ack = srp1(psh_ack, timeout=1)
- assert ack is None, "no answer expected, got one"
- # should get an answer this time
- psh_ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
- )
- )
- ack = srp1(psh_ack, timeout=1)
- assert ack is not None, "expecting answer, got nothing"
- check_ip_checksum(ack)
- assert TCP in ack, "expecting TCP, got %r" % ack.summary()
- ack = ack[TCP]
- assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags
-
-
-@test
-def test_ipv6_tcp_psh_ack():
- ##### PSH-ACK #####
- sport = 26695
- port = 445
- seq_init = int(RandInt())
- # send PSH-ACK first
- psh_ack = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
- / Raw("payload")
- )
- syn_ack = srp1(psh_ack, timeout=1)
- assert syn_ack is None, "no answer expected, got one"
- # test the anti-injection mechanism
- syn = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(flags="S", sport=sport, dport=port, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ipv6_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
- assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
- syn_ack.ack,
- seq_init + 1,
- )
- ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="A", dport=port)
- # should fail because no ack given
- psh_ack = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
- )
- ack = srp1(psh_ack, timeout=1)
- assert ack is None, "no answer expected, got one"
- # should get an answer this time
- psh_ack = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(
- flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
- )
- )
- ack = srp1(psh_ack, timeout=1)
- assert ack is not None, "expecting answer, got nothing"
- check_ipv6_checksum(ack)
- assert TCP in ack, "expecting TCP, got %r" % ack.summary()
- ack = ack[TCP]
- assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags
-
-
-@test
-def test_ipv4_tcp_http():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- seq_init = int(RandInt())
- syn = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ip_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
- ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- / Raw("GET / HTTP/1.1\r\n\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- tcp = resp[TCP]
- assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
-
-
-@test
-def test_ipv4_tcp_http_incomplete():
- sport = 24595
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- seq_init = int(RandInt())
- syn = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ip_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
- ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- # purposedly incomplete request (missing additionnal ending \r\n)
- / Raw("GET / HTTP/1.1\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting an answer, got none"
- check_ip_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- tcp = resp[TCP]
- assert tcp.flags == "A", "expecting TCP flag A, got {}".format(tcp.flags)
-
-
-@test
-def test_ipv6_tcp_http():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- seq_init = int(RandInt())
- syn = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ipv6_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA"
- ack = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- / Raw("GET / HTTP/1.1\r\n\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ipv6_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- tcp = resp[TCP]
- assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
-
-
-@test
-def test_ipv4_udp_http():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw("GET / HTTP/1.1\r\n\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert UDP in resp
- udp = resp[UDP]
- assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
-
-
-@test
-def test_ipv6_udp_http():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw("GET / HTTP/1.1\r\n\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ipv6_checksum(resp)
- assert UDP in resp
- udp = resp[UDP]
- assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
-
-
-@test
-def test_ipv4_tcp_http_ko():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- seq_init = int(RandInt())
- syn = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ip_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA"
- ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- / Raw(bytes.fromhex("4f5054494f4e53"))
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- assert "P" not in resp[TCP].flags
- assert len(resp[TCP].payload) == 0
-
-
-@test
-def test_ipv4_udp_http_ko():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(bytes.fromhex("4f5054494f4e53"))
- )
- resp = srp1(req, timeout=1)
- assert resp is None, "expecting no answer, got one"
-
-
-@test
-def test_ipv6_tcp_http_ko():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- seq_init = int(RandInt())
- syn = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ipv6_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA"
- ack = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- / Raw(bytes.fromhex("4f5054494f4e53"))
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ipv6_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- assert "P" not in resp[TCP].flags
- assert len(resp[TCP].payload) == 0
-
-
-@test
-def test_ipv6_udp_http_ko():
- sport = 24592
- dports = [80, 443, 5000, 53228]
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(bytes.fromhex("4f5054494f4e53"))
- )
- resp = srp1(req, timeout=1)
- assert resp is None, "expecting no answer, got one"
-
-
-@test
-def test_ipv4_udp_stun():
- sports = [12345, 55555, 80, 43273]
- dports = [80, 800, 8000, 3478]
- payload = bytes.fromhex("000100002112a442000000000000000000000000")
- for sport in sports:
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(payload)
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert UDP in resp, "no UDP layer found"
- udp = resp[UDP]
- assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport)
- assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
- resp_payload = udp.payload.load
- type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
- tid = resp_payload[8:20]
- data = resp_payload[20:]
- assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
- assert length == 12, "expected length 12, got {}".format(length)
- assert (
- magic == 0x2112A442
- ), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
- assert (
- tid == b"\x00" * 12
- ), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
- expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
- ">HBBBB", sport, 192, 0, 0, 0
- )
- assert (
- data == expected_data
- ), f"unexpected data {data!r} != {expected_data!r}"
-
-
-@test
-def test_ipv6_udp_stun():
- sports = [12345, 55555, 80, 43273]
- dports = [80, 800, 8000, 3478]
- payload = bytes.fromhex("000100002112a442000000000000000000000000")
- for sport in sports:
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(payload)
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ipv6_checksum(resp)
- assert UDP in resp
- udp = resp[UDP]
- assert udp.sport == dport
- assert udp.dport == sport
- resp_payload = udp.payload.load
- type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
- tid = resp_payload[8:20]
- data = resp_payload[20:]
- assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
- assert length == 24, "expected length 24, got {}".format(length)
- assert (
- magic == 0x2112A442
- ), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
- assert (
- tid == b"\x00" * 12
- ), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
- expected_data = (
- bytes.fromhex("000100140002")
- + struct.pack(">H", sport)
- + inet_pton(AF_INET6, "2001:41d0::1234:5678")
- )
- assert data == expected_data, "unexpected data: {}".format(data)
-
-
-@test
-def test_ipv4_udp_stun_change_port():
- sports = [12345, 55555, 80, 43273]
- dports = [80, 800, 8000, 3478, 65535]
- payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
- for sport in sports:
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(payload)
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert UDP in resp, "no UDP layer found"
- udp = resp[UDP]
- assert (
- udp.sport == (dport + 1) % 2**16
- ), "expected answer from UDP/{}, got it from UDP/{}".format(
- (dport + 1) % 2**16, udp.sport
- )
- assert (
- udp.dport == sport
- ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
- resp_payload = udp.payload.load
- type_, length = struct.unpack(">HH", resp_payload[:4])
- tid = resp_payload[4:20]
- data = resp_payload[20:]
- assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
- assert length == 12, "expected length 12, got {}".format(length)
- assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
- "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
- )
- expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
- ">HBBBB", sport, 192, 0, 0, 0
- )
- assert (
- data == expected_data
- ), f"unexpected data {data!r} != {expected_data!r}"
-
-
-@test
-def test_ipv6_udp_stun_change_port():
- sports = [12345, 55555, 80, 43273]
- dports = [80, 800, 8000, 3478, 65535]
- payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
- for sport in sports:
- for dport in dports:
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(payload)
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ipv6_checksum(resp)
- assert UDP in resp, "expecting UDP layer in answer, got nothing"
- udp = resp[UDP]
- assert (
- udp.sport == (dport + 1) % 2**16
- ), "expected answer from UDP/{}, got it from UDP/{}".format(
- (dport + 1) % 2**16, udp.sport
- )
- assert (
- udp.dport == sport
- ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
- resp_payload = udp.payload.load
- type_, length = struct.unpack(">HH", resp_payload[:4])
- tid = resp_payload[4:20]
- data = resp_payload[20:]
- assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
- assert length == 24, "expected length 12, got {}".format(length)
- assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
- "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
- )
- expected_data = (
- bytes.fromhex("000100140002")
- + struct.pack(">H", sport)
- + inet_pton(AF_INET6, "2001:41d0::1234:5678")
- )
- assert (
- data == expected_data
- ), f"unexpected data {data!r} != {expected_data!r}"
-
-
-@test
-def test_ipv4_tcp_ssh():
- sport = 37183
- dports = [22, 80, 2222, 2022, 23874, 50000]
- for i, dport in enumerate(dports):
- seq_init = int(RandInt())
- banner = [
- b"SSH-2.0-AsyncSSH_2.1.0",
- b"SSH-2.0-PuTTY",
- b"SSH-2.0-libssh2_1.4.3",
- b"SSH-2.0-Go",
- b"SSH-2.0-PUTTY",
- ][i % 5]
- syn = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ip_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA"
- ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- / Raw(banner + b"\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- tcp = resp[TCP]
- assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
- assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
- assert len(tcp.payload) > 0, "expecting payload, got none"
- assert tcp.payload.load.startswith(b"SSH-2.0-"), (
- "unexpected banner: %r" % tcp.payload.load
- )
- assert tcp.payload.load.endswith(b"\r\n"), (
- "unexpected banner: %r" % tcp.payload.load
- )
-
-
-@test
-def test_ipv4_udp_ssh():
- sport = 37183
- dports = [22, 80, 2222, 2022, 23874, 50000]
- for i, dport in enumerate(dports):
- banner = [
- b"SSH-2.0-AsyncSSH_2.1.0",
- b"SSH-2.0-PuTTY",
- b"SSH-2.0-libssh2_1.4.3",
- b"SSH-2.0-Go",
- b"SSH-2.0-PUTTY",
- ][i % 5]
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(banner + b"\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert UDP in resp
- udp = resp[UDP]
- assert len(udp.payload) > 0, "expecting payload, got none"
- assert udp.payload.load.startswith(b"SSH-2.0-"), (
- "unexpected banner: %r" % udp.payload.load
- )
- assert udp.payload.load.endswith(b"\r\n"), (
- "unexpected banner: %r" % udp.payload.load
- )
-
-
-@test
-def test_ipv6_tcp_ssh():
- sport = 37183
- dports = [22, 80, 2222, 2022, 23874, 50000]
- for i, dport in enumerate(dports):
- seq_init = int(RandInt())
- banner = [
- b"SSH-2.0-AsyncSSH_2.1.0",
- b"SSH-2.0-PuTTY",
- b"SSH-2.0-libssh2_1.4.3",
- b"SSH-2.0-Go",
- b"SSH-2.0-PUTTY",
- ][i % 5]
- syn = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ipv6_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA"
- ack = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- / Raw(banner + b"\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ipv6_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- tcp = resp[TCP]
- assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
- assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
- assert len(tcp.payload) > 0, "expecting payload, got none"
- assert tcp.payload.load.startswith(b"SSH-2.0-"), (
- "unexpected banner: %r" % tcp.payload.load
- )
- assert tcp.payload.load.endswith(b"\r\n"), (
- "unexpected banner: %r" % tcp.payload.load
- )
-
-
-@test
-def test_ipv6_udp_ssh():
- sport = 37183
- dports = [22, 80, 2222, 2022, 23874, 50000]
- for i, dport in enumerate(dports):
- banner = [
- b"SSH-2.0-AsyncSSH_2.1.0",
- b"SSH-2.0-PuTTY",
- b"SSH-2.0-libssh2_1.4.3",
- b"SSH-2.0-Go",
- b"SSH-2.0-PUTTY",
- ][i % 5]
- req = (
- Ether(dst=MAC_ADDR)
- / IPv6(dst=IPV6_ADDR)
- / UDP(sport=sport, dport=dport)
- / Raw(banner + b"\r\n")
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ipv6_checksum(resp)
- assert UDP in resp
- udp = resp[UDP]
- assert len(udp.payload) > 0, "expecting payload, got none"
- assert udp.payload.load.startswith(b"SSH-2.0-"), (
- "unexpected banner: %r" % udp.payload.load
- )
- assert udp.payload.load.endswith(b"\r\n"), (
- "unexpected banner: %r" % udp.payload.load
- )
-
-
-@test
-def test_ipv4_tcp_ghost():
- sport = 37184
- dports = [22, 23874]
- for dport in dports:
- seq_init = int(RandInt())
- banner = b"Gh0st\xad\x00\x00\x00\xe0\x00\x00\x00x\x9cKS``\x98\xc3\xc0\xc0\xc0\x06\xc4\x8c@\xbcQ\x96\x81\x81\tH\x07\xa7\x16\x95e&\xa7*\x04$&g+\x182\x94\xf6\xb000\xac\xa8rc\x00\x01\x11\xa0\x82\x1f\\`&\x83\xc7K7\x86\x19\xe5n\x0c9\x95n\x0c;\x84\x0f3\xac\xe8sch\xa8^\xcf4'J\x97\xa9\x82\xe30\xc3\x91h]&\x90\xf8\xce\x97S\xcbA4L?2=\xe1\xc4\x92\x86\x0b@\xf5`\x0cT\x1f\xae\xaf]\nr\x0b\x03#\xa3\xdc\x02~\x06\x86\x03+\x18m\xc2=\xfdtC,C\xfdL<<==\\\x9d\x19\x88\x00\xe5 \x02\x00T\xf5+\\"
- syn = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
- )
- syn_ack = srp1(syn, timeout=1)
- assert syn_ack is not None, "expecting answer, got nothing"
- check_ip_checksum(syn_ack)
- assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
- syn_ack = syn_ack[TCP]
- assert syn_ack.flags == "SA"
- ack = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="A",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- )
- _ = srp1(ack, timeout=1)
- req = (
- Ether(dst=MAC_ADDR)
- / IP(dst=IPV4_ADDR)
- / TCP(
- flags="PA",
- sport=sport,
- dport=dport,
- seq=seq_init + 1,
- ack=syn_ack.seq + 1,
- )
- / Raw(banner)
- )
- resp = srp1(req, timeout=1)
- assert resp is not None, "expecting answer, got nothing"
- check_ip_checksum(resp)
- assert TCP in resp, "expecting TCP, got %r" % resp.summary()
- tcp = resp[TCP]
- assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
- assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
- data = raw(tcp.payload)
- assert data, "expecting payload, got none"
- assert data.startswith(b"Gh0st"), "unexpected banner: %r" % tcp.payload.load
- data_len, uncompressed_len = struct.unpack(".
+
+import logging
+
+from scapy.compat import raw
+from scapy.layers.inet import IP
+from scapy.layers.inet6 import IPv6
+
+
+def setup_logs():
+ log = logging.getLogger()
+ log.setLevel(logging.DEBUG)
+ if not log.handlers:
+ ch = logging.StreamHandler()
+ ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
+ ch.setLevel(logging.DEBUG)
+ log.addHandler(ch)
+ return log
+
+
+LOG = setup_logs()
+TESTS = []
+ERRORS = []
+
+# decorator to automatically add a function to tests
+def test(f):
+ global ERRORS, TESTS
+ OK = "\033[1mOK\033[0m"
+ KO = "\033[1m\033[1;%dmKO\033[0m" % 31
+ fname = f.__name__.ljust(50, ".")
+
+ def w():
+ try:
+ f()
+ LOG.info("{}{}".format(fname, OK))
+ except AssertionError as e:
+ LOG.error("{}{}: {}".format(fname, KO, e))
+ ERRORS.append(fname)
+
+ TESTS.append(w)
+ return w
+
+
+def test_all():
+ global ERRORS, TESTS
+ # execute tests
+ for t in TESTS:
+ t()
+ LOG.info(f"\033[1mRan {len(TESTS)} tests with {len(ERRORS)} errors\033[0m")
+ return len(ERRORS)
+
+
+def multicast(ip6):
+ a, b = ip6.split(":")[-2:]
+ mac = ["33", "33", "ff"]
+ if len(a) == 4:
+ mac.append(a[2:])
+ else:
+ mac.append("00")
+ if len(b) >= 2:
+ mac.append(b[:2])
+ else:
+ mac.append("00")
+ if len(b) >= 4:
+ mac.append(b[2:])
+ else:
+ mac.append("00")
+ return ":".join(mac)
+
+
+def check_ip_checksum(pkt):
+ assert IP in pkt, "no IP layer found"
+ ip_pkt = pkt[IP]
+ chksum = ip_pkt.chksum
+ del ip_pkt.chksum
+ assert IP(raw(ip_pkt)).chksum == chksum, "bad IPv4 checksum"
+
+
+def check_ipv6_checksum(pkt):
+ assert IPv6 in pkt, "no IP layer found"
+ ip_pkt = pkt[IPv6]
+ chksum = ip_pkt.chksum
+ del ip_pkt.chksum
+ assert IPv6(raw(ip_pkt)).chksum == chksum, "bad IPv6 checksum"
diff --git a/test/src/tests/__init__.py b/test/src/tests/__init__.py
new file mode 100644
index 0000000..831f613
--- /dev/null
+++ b/test/src/tests/__init__.py
@@ -0,0 +1,15 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
diff --git a/test/src/tests/arp.py b/test/src/tests/arp.py
new file mode 100644
index 0000000..4f65cb7
--- /dev/null
+++ b/test/src/tests/arp.py
@@ -0,0 +1,51 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+from scapy.layers.l2 import Ether, ARP, ETHER_BROADCAST
+from scapy.sendrecv import srp1
+
+from ..conf import IPV4_ADDR, MAC_ADDR
+from ..core import test
+
+
+@test
+def test_arp_req():
+ ##### ARP #####
+ arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst=IPV4_ADDR)
+ arp_repl = srp1(arp_req, timeout=1)
+ assert arp_repl is not None, "expecting answer, got nothing"
+ assert ARP in arp_repl, "no ARP layer found"
+ arp_repl = arp_repl[ARP]
+ # check answer
+ ## op is "is-at"
+ assert arp_repl.op == 2, "unexpected ARP op: {}".format(arp_repl.op)
+ ## answer for the requested IP
+ assert arp_repl.psrc == arp_req.pdst, "unexpected ARP psrc: {}".format(
+ arp_repl.psrc
+ )
+ assert arp_repl.pdst == arp_req.psrc, "unexpected ARP pdst: {}".format(
+ arp_repl.pdst
+ )
+ ## answer is expected MAC address
+ assert arp_repl.hwsrc == MAC_ADDR, "unexpected ARP hwsrc: {}".format(arp_repl.hwsrc)
+
+
+@test
+def test_arp_req_other_ip():
+ ##### ARP #####
+ arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst="1.2.3.4")
+ arp_repl = srp1(arp_req, timeout=1)
+ assert arp_repl is None, "responding to ARP requests for other IP addresses"
diff --git a/test/src/tests/ghost.py b/test/src/tests/ghost.py
new file mode 100644
index 0000000..303e58e
--- /dev/null
+++ b/test/src/tests/ghost.py
@@ -0,0 +1,87 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+import struct
+import zlib
+
+from scapy.compat import raw
+from scapy.layers.inet import IP, TCP
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.sendrecv import srp1
+from scapy.volatile import RandInt
+
+from ..conf import IPV4_ADDR, MAC_ADDR
+from ..core import test, check_ip_checksum
+
+
+@test
+def test_ipv4_tcp_ghost():
+ sport = 37184
+ dports = [22, 23874]
+ for dport in dports:
+ seq_init = int(RandInt())
+ banner = b"Gh0st\xad\x00\x00\x00\xe0\x00\x00\x00x\x9cKS``\x98\xc3\xc0\xc0\xc0\x06\xc4\x8c@\xbcQ\x96\x81\x81\tH\x07\xa7\x16\x95e&\xa7*\x04$&g+\x182\x94\xf6\xb000\xac\xa8rc\x00\x01\x11\xa0\x82\x1f\\`&\x83\xc7K7\x86\x19\xe5n\x0c9\x95n\x0c;\x84\x0f3\xac\xe8sch\xa8^\xcf4'J\x97\xa9\x82\xe30\xc3\x91h]&\x90\xf8\xce\x97S\xcbA4L?2=\xe1\xc4\x92\x86\x0b@\xf5`\x0cT\x1f\xae\xaf]\nr\x0b\x03#\xa3\xdc\x02~\x06\x86\x03+\x18m\xc2=\xfdtC,C\xfdL<<==\\\x9d\x19\x88\x00\xe5 \x02\x00T\xf5+\\"
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA"
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ / Raw(banner)
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ tcp = resp[TCP]
+ assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
+ assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
+ data = raw(tcp.payload)
+ assert data, "expecting payload, got none"
+ assert data.startswith(b"Gh0st"), "unexpected banner: %r" % tcp.payload.load
+ data_len, uncompressed_len = struct.unpack(".
+
+from scapy.layers.inet import IP, TCP, UDP
+from scapy.layers.inet6 import IPv6
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.sendrecv import srp1
+from scapy.volatile import RandInt
+
+from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
+from ..core import test, check_ip_checksum, check_ipv6_checksum
+
+
+@test
+def test_ipv4_tcp_http():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ seq_init = int(RandInt())
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ / Raw("GET / HTTP/1.1\r\n\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ tcp = resp[TCP]
+ assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
+
+
+@test
+def test_ipv4_tcp_http_incomplete():
+ sport = 24595
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ seq_init = int(RandInt())
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ # purposedly incomplete request (missing additionnal ending \r\n)
+ / Raw("GET / HTTP/1.1\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting an answer, got none"
+ check_ip_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ tcp = resp[TCP]
+ assert tcp.flags == "A", "expecting TCP flag A, got {}".format(tcp.flags)
+
+
+@test
+def test_ipv6_tcp_http():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ seq_init = int(RandInt())
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA"
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ / Raw("GET / HTTP/1.1\r\n\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ tcp = resp[TCP]
+ assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
+
+
+@test
+def test_ipv4_udp_http():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw("GET / HTTP/1.1\r\n\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert UDP in resp
+ udp = resp[UDP]
+ assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
+
+
+@test
+def test_ipv6_udp_http():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw("GET / HTTP/1.1\r\n\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(resp)
+ assert UDP in resp
+ udp = resp[UDP]
+ assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
+
+
+@test
+def test_ipv4_tcp_http_ko():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ seq_init = int(RandInt())
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA"
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ / Raw(bytes.fromhex("4f5054494f4e53"))
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ assert "P" not in resp[TCP].flags
+ assert len(resp[TCP].payload) == 0
+
+
+@test
+def test_ipv4_udp_http_ko():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(bytes.fromhex("4f5054494f4e53"))
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is None, "expecting no answer, got one"
+
+
+@test
+def test_ipv6_tcp_http_ko():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ seq_init = int(RandInt())
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA"
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ / Raw(bytes.fromhex("4f5054494f4e53"))
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ assert "P" not in resp[TCP].flags
+ assert len(resp[TCP].payload) == 0
+
+
+@test
+def test_ipv6_udp_http_ko():
+ sport = 24592
+ dports = [80, 443, 5000, 53228]
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(bytes.fromhex("4f5054494f4e53"))
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is None, "expecting no answer, got one"
diff --git a/test/src/tests/icmpv4.py b/test/src/tests/icmpv4.py
new file mode 100644
index 0000000..7722148
--- /dev/null
+++ b/test/src/tests/icmpv4.py
@@ -0,0 +1,45 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+from scapy.layers.inet import IP, ICMP
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.sendrecv import srp1
+
+from ..conf import IPV4_ADDR, MAC_ADDR
+from ..core import test, check_ip_checksum
+
+
+@test
+def test_icmpv4_echo_req():
+ ##### ICMPv4 #####
+ icmp_req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / ICMP(type=8, code=0)
+ / Raw("idrinkwaytoomuchcoffee")
+ )
+ icmp_repl = srp1(icmp_req, timeout=1)
+ assert icmp_repl is not None, "expecting answer, got nothing"
+ check_ip_checksum(icmp_repl)
+ assert ICMP in icmp_repl
+ icmp_repl = icmp_repl[ICMP]
+ # check answer
+ ## type is "echo-reply"
+ assert icmp_repl.type == 0
+ assert icmp_repl.code == 0
+ ## data is the same as sent
+ assert icmp_repl.load == icmp_req.load
diff --git a/test/src/tests/icmpv6.py b/test/src/tests/icmpv6.py
new file mode 100644
index 0000000..13a8817
--- /dev/null
+++ b/test/src/tests/icmpv6.py
@@ -0,0 +1,87 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+from scapy.layers.inet6 import (
+ ICMPv6EchoReply,
+ ICMPv6EchoRequest,
+ ICMPv6NDOptDstLLAddr,
+ ICMPv6ND_NA,
+ ICMPv6ND_NS,
+ IPv6,
+)
+from scapy.layers.l2 import Ether
+from scapy.sendrecv import srp1
+
+from ..conf import IPV6_ADDR, MAC_ADDR
+from ..core import test, multicast
+
+
+@test
+def test_icmpv6_neighbor_solicitation():
+ ##### IPv6 Neighbor Solicitation #####
+ for mac in [
+ "ff:ff:ff:ff:ff:ff",
+ "33:33:00:00:00:01",
+ MAC_ADDR,
+ multicast(IPV6_ADDR),
+ ]:
+ nd_ns = Ether(dst=mac) / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
+ nd_na = srp1(nd_ns, timeout=1)
+ assert nd_na is not None, "expecting answer, got nothing"
+ assert ICMPv6ND_NA in nd_na
+ nd_na = nd_na[ICMPv6ND_NA]
+ # check answer content
+ assert nd_na.code == 0
+ assert nd_na.R == 0
+ assert nd_na.S == 1
+ assert nd_na.O == 1 # noqa: E741
+ assert nd_na.tgt == IPV6_ADDR
+ # check ND Option
+ assert nd_na.haslayer(ICMPv6NDOptDstLLAddr)
+ assert nd_na.getlayer(ICMPv6NDOptDstLLAddr).lladdr == MAC_ADDR
+ for mac in ["00:00:00:00:00:00", "33:33:33:00:00:01"]:
+ nd_ns = Ether(dst="ff:ff:ff:ff:ff:ff") / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
+ nd_na = srp1(nd_ns, timeout=1)
+ assert nd_na is not None, "expecting no answer, got one"
+
+
+@test
+def test_icmpv6_neighbor_solicitation_other_ip():
+ ##### IPv6 Neighbor Solicitation #####
+ nd_ns = (
+ Ether(dst="ff:ff:ff:ff:ff:ff")
+ / IPv6()
+ / ICMPv6ND_NS(tgt="2020:4141:3030:2020::bdbd")
+ )
+ nd_na = srp1(nd_ns, timeout=1)
+ assert nd_na is None, "responding to ND_NS for other IP addresses"
+
+
+@test
+def test_icmpv6_echo_req():
+ ##### IPv6 Ping #####
+ echo_req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / ICMPv6EchoRequest(data="waytoomanynapkins")
+ )
+ echo_repl = srp1(echo_req, timeout=1)
+ assert echo_repl is not None, "expecting answer, got nothing"
+ assert ICMPv6EchoReply in echo_repl
+ echo_repl = echo_repl[ICMPv6EchoReply]
+ # check answer content
+ assert echo_repl.code == 0
+ assert echo_repl.data == echo_req.data
diff --git a/test/src/tests/ip46.py b/test/src/tests/ip46.py
new file mode 100644
index 0000000..62f3912
--- /dev/null
+++ b/test/src/tests/ip46.py
@@ -0,0 +1,291 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+from scapy.layers.inet import IP, ICMP, TCP
+from scapy.layers.inet6 import IPv6
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.sendrecv import srp1
+from scapy.volatile import RandInt
+
+from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
+from ..core import test, check_ip_checksum, check_ipv6_checksum
+
+
+@test
+def test_ipv4_req():
+ ##### IP #####
+ ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0)
+ ip_repl = srp1(ip_req, timeout=1)
+ assert ip_repl is not None, "expecting answer, got nothing"
+ check_ip_checksum(ip_repl)
+ assert IP in ip_repl, "no IP layer in response"
+ ip_repl = ip_repl[IP]
+ assert ip_repl.id == 0, "IP identification unexpected"
+
+
+@test
+def test_eth_req_other_mac():
+ #### ETH ####
+ ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0)
+ ip_repl = srp1(ip_req, timeout=1)
+ assert ip_repl is None, "responding to other MAC addresses"
+
+
+@test
+def test_ipv4_req_other_ip():
+ ##### IP #####
+ ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0)
+ ip_repl = srp1(ip_req, timeout=1)
+ assert ip_repl is None, "responding to other IP addresses"
+
+
+@test
+def test_tcp_syn():
+ ##### SYN-ACK #####
+ # test a list of ports, randomly generated once
+ ports_to_test = [
+ 1152,
+ 2003,
+ 2193,
+ 3709,
+ 4054,
+ 6605,
+ 6737,
+ 6875,
+ 7320,
+ 8898,
+ 9513,
+ 9738,
+ 10623,
+ 10723,
+ 11253,
+ 12125,
+ 12189,
+ 12873,
+ 14648,
+ 14659,
+ 16242,
+ 16243,
+ 17209,
+ 17492,
+ 17667,
+ 17838,
+ 18081,
+ 18682,
+ 18790,
+ 19124,
+ 19288,
+ 19558,
+ 19628,
+ 19789,
+ 20093,
+ 21014,
+ 21459,
+ 21740,
+ 24070,
+ 24312,
+ 24576,
+ 26939,
+ 27136,
+ 27165,
+ 27361,
+ 29971,
+ 31088,
+ 33011,
+ 33068,
+ 34990,
+ 35093,
+ 35958,
+ 36626,
+ 36789,
+ 37130,
+ 37238,
+ 37256,
+ 37697,
+ 37890,
+ 38958,
+ 42131,
+ 43864,
+ 44420,
+ 44655,
+ 44868,
+ 45157,
+ 46213,
+ 46497,
+ 46955,
+ 49049,
+ 49067,
+ 49452,
+ 49480,
+ 50498,
+ 50945,
+ 51181,
+ 52890,
+ 53301,
+ 53407,
+ 53417,
+ 53980,
+ 55827,
+ 56483,
+ 58552,
+ 58713,
+ 58836,
+ 59362,
+ 59560,
+ 60534,
+ 60555,
+ 60660,
+ 61615,
+ 62402,
+ 62533,
+ 62941,
+ 63240,
+ 63339,
+ 63616,
+ 64380,
+ 65438,
+ ]
+ for p in ports_to_test:
+ seq_init = int(RandInt())
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="S", dport=p, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
+ assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
+ syn_ack.ack,
+ seq_init + 1,
+ )
+
+
+@test
+def test_ipv4_tcp_psh_ack():
+ ##### PSH-ACK #####
+ sport = 26695
+ port = 445
+ seq_init = int(RandInt())
+ # send PSH-ACK first
+ psh_ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
+ / Raw("payload")
+ )
+ syn_ack = srp1(psh_ack, timeout=1)
+ assert syn_ack is None, "no answer expected, got one"
+ # test the anti-injection mechanism
+ seq_init = int(RandInt())
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="S", sport=sport, dport=port, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
+ assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
+ syn_ack.ack,
+ seq_init + 1,
+ )
+ ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="A", dport=port)
+ # should fail because no ack given
+ psh_ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
+ )
+ ack = srp1(psh_ack, timeout=1)
+ assert ack is None, "no answer expected, got one"
+ # should get an answer this time
+ psh_ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
+ )
+ )
+ ack = srp1(psh_ack, timeout=1)
+ assert ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(ack)
+ assert TCP in ack, "expecting TCP, got %r" % ack.summary()
+ ack = ack[TCP]
+ assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags
+
+
+@test
+def test_ipv6_tcp_psh_ack():
+ ##### PSH-ACK #####
+ sport = 26695
+ port = 445
+ seq_init = int(RandInt())
+ # send PSH-ACK first
+ psh_ack = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
+ / Raw("payload")
+ )
+ syn_ack = srp1(psh_ack, timeout=1)
+ assert syn_ack is None, "no answer expected, got one"
+ # test the anti-injection mechanism
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(flags="S", sport=sport, dport=port, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
+ assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
+ syn_ack.ack,
+ seq_init + 1,
+ )
+ ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="A", dport=port)
+ # should fail because no ack given
+ psh_ack = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
+ )
+ ack = srp1(psh_ack, timeout=1)
+ assert ack is None, "no answer expected, got one"
+ # should get an answer this time
+ psh_ack = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(
+ flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
+ )
+ )
+ ack = srp1(psh_ack, timeout=1)
+ assert ack is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(ack)
+ assert TCP in ack, "expecting TCP, got %r" % ack.summary()
+ ack = ack[TCP]
+ assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags
diff --git a/test/src/tests/rpc.py b/test/src/tests/rpc.py
new file mode 100644
index 0000000..62e42a1
--- /dev/null
+++ b/test/src/tests/rpc.py
@@ -0,0 +1,99 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+from subprocess import check_call
+from tempfile import NamedTemporaryFile
+import json
+import os
+import re
+
+from ivre.db import DBNmap
+
+from ..conf import IPV4_ADDR
+from ..core import test
+
+
+@test
+def test_rpc_nmap():
+ for scan in "SU":
+ with NamedTemporaryFile(delete=False) as xml_result:
+ check_call(
+ [
+ "nmap",
+ "-n",
+ "-vv",
+ "-oX",
+ "-",
+ IPV4_ADDR,
+ f"-s{scan}V",
+ "-p",
+ "111",
+ "--script",
+ "rpcinfo,rpc-grind",
+ ],
+ stdout=xml_result,
+ )
+ with NamedTemporaryFile(delete=False, mode="w") as json_result:
+ DBNmap(output=json_result).store_scan(xml_result.name)
+ os.unlink(xml_result.name)
+ with open(json_result.name) as fdesc:
+ results = [json.loads(line) for line in fdesc]
+ os.unlink(json_result.name)
+ assert len(results) == 1, f"Expected 1 result, got {len(results)}"
+ result = results[0]
+ assert len(result["ports"]) == 1, f"Expected 1 port, got {len(result['ports'])}"
+ port = result["ports"][0]
+ assert port["port"] == 111 and port["protocol"] == (
+ "tcp" if scan == "S" else "udp"
+ )
+ assert port["service_name"] in {"rpcbind", "nfs"}
+ assert port["service_extrainfo"] in {"RPC #100000", "RPC #100003"}
+ assert (
+ len(port["scripts"]) == 1
+ ), f"Expected 1 script, got {len(port['scripts'])}"
+ script = port["scripts"][0]
+ assert script["id"] == "rpcinfo", "Expected rpcinfo script, not found"
+ assert len(script["rpcinfo"]) == 1
+
+
+@test
+def test_rpcinfo():
+ with NamedTemporaryFile(delete=False) as rpcout:
+ check_call(["rpcinfo", "-p", IPV4_ADDR], stdout=rpcout)
+ with open(rpcout.name) as fdesc:
+ found = []
+ for line in fdesc:
+ line = line.split()
+ if line[0] == "program":
+ # header
+ continue
+ assert line[0] == "100000", f"Expected program 100000, got {line[0]}"
+ found.append(int(line[1]))
+ assert len(found) == 3, f"Expected three versions, got {found}"
+ for i in range(2, 5):
+ assert i in found, f"Missing version {i} in {found}"
+ os.unlink(rpcout.name)
+ with NamedTemporaryFile(delete=False) as rpcout:
+ check_call(["rpcinfo", "-u", IPV4_ADDR, "100000"], stdout=rpcout)
+ with open(rpcout.name) as fdesc:
+ found = []
+ expr = re.compile("^program 100000 version ([0-9]) ready and waiting$")
+ for line in fdesc:
+ found.append(int(expr.search(line.strip()).group(1)))
+ assert len(found) == 3, f"Expected three versions, got {found}"
+ for i in range(2, 5):
+ assert i in found, f"Missing version {i} in {found}"
+ os.unlink(rpcout.name)
diff --git a/test/src/tests/smb.py b/test/src/tests/smb.py
new file mode 100644
index 0000000..3ce421d
--- /dev/null
+++ b/test/src/tests/smb.py
@@ -0,0 +1,65 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+import subprocess
+
+from ..core import test
+from ..conf import IPV4_ADDR
+
+
+@test
+def test_smb1_network_req():
+ proc = subprocess.Popen(
+ [
+ "smbclient",
+ "-U ''",
+ "-N",
+ "-d 6",
+ "-t 1",
+ "-L",
+ IPV4_ADDR,
+ "--option=client min protocol=NT1",
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ )
+ out, _ = proc.communicate()
+ assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out
+ assert "session request ok" in out, "\n" + out
+ assert f"negotiated dialect[NT1] against server[{IPV4_ADDR}]" in out, "\n" + out
+
+
+@test
+def test_smb2_network_req():
+ proc = subprocess.Popen(
+ [
+ "smbclient",
+ "-U ''",
+ "-N",
+ "-d 5",
+ "-t 1",
+ "-L",
+ IPV4_ADDR,
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ )
+ out, _ = proc.communicate()
+ assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out
+ assert "session request ok" in out, "\n" + out
+ assert f"negotiated dialect[SMB2_02] against server[{IPV4_ADDR}]" in out, "\n" + out
diff --git a/test/src/tests/ssh.py b/test/src/tests/ssh.py
new file mode 100644
index 0000000..31501e4
--- /dev/null
+++ b/test/src/tests/ssh.py
@@ -0,0 +1,217 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+from scapy.layers.inet import IP, TCP, UDP
+from scapy.layers.inet6 import IPv6
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.sendrecv import srp1
+from scapy.volatile import RandInt
+
+from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
+from ..core import test, check_ip_checksum, check_ipv6_checksum
+
+
+@test
+def test_ipv4_tcp_ssh():
+ sport = 37183
+ dports = [22, 80, 2222, 2022, 23874, 50000]
+ for i, dport in enumerate(dports):
+ seq_init = int(RandInt())
+ banner = [
+ b"SSH-2.0-AsyncSSH_2.1.0",
+ b"SSH-2.0-PuTTY",
+ b"SSH-2.0-libssh2_1.4.3",
+ b"SSH-2.0-Go",
+ b"SSH-2.0-PUTTY",
+ ][i % 5]
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ip_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA"
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ / Raw(banner + b"\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ tcp = resp[TCP]
+ assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
+ assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
+ assert len(tcp.payload) > 0, "expecting payload, got none"
+ assert tcp.payload.load.startswith(b"SSH-2.0-"), (
+ "unexpected banner: %r" % tcp.payload.load
+ )
+ assert tcp.payload.load.endswith(b"\r\n"), (
+ "unexpected banner: %r" % tcp.payload.load
+ )
+
+
+@test
+def test_ipv4_udp_ssh():
+ sport = 37183
+ dports = [22, 80, 2222, 2022, 23874, 50000]
+ for i, dport in enumerate(dports):
+ banner = [
+ b"SSH-2.0-AsyncSSH_2.1.0",
+ b"SSH-2.0-PuTTY",
+ b"SSH-2.0-libssh2_1.4.3",
+ b"SSH-2.0-Go",
+ b"SSH-2.0-PUTTY",
+ ][i % 5]
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(banner + b"\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert UDP in resp
+ udp = resp[UDP]
+ assert len(udp.payload) > 0, "expecting payload, got none"
+ assert udp.payload.load.startswith(b"SSH-2.0-"), (
+ "unexpected banner: %r" % udp.payload.load
+ )
+ assert udp.payload.load.endswith(b"\r\n"), (
+ "unexpected banner: %r" % udp.payload.load
+ )
+
+
+@test
+def test_ipv6_tcp_ssh():
+ sport = 37183
+ dports = [22, 80, 2222, 2022, 23874, 50000]
+ for i, dport in enumerate(dports):
+ seq_init = int(RandInt())
+ banner = [
+ b"SSH-2.0-AsyncSSH_2.1.0",
+ b"SSH-2.0-PuTTY",
+ b"SSH-2.0-libssh2_1.4.3",
+ b"SSH-2.0-Go",
+ b"SSH-2.0-PUTTY",
+ ][i % 5]
+ syn = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
+ )
+ syn_ack = srp1(syn, timeout=1)
+ assert syn_ack is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(syn_ack)
+ assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
+ syn_ack = syn_ack[TCP]
+ assert syn_ack.flags == "SA"
+ ack = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(
+ flags="A",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ )
+ _ = srp1(ack, timeout=1)
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / TCP(
+ flags="PA",
+ sport=sport,
+ dport=dport,
+ seq=seq_init + 1,
+ ack=syn_ack.seq + 1,
+ )
+ / Raw(banner + b"\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(resp)
+ assert TCP in resp, "expecting TCP, got %r" % resp.summary()
+ tcp = resp[TCP]
+ assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
+ assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
+ assert len(tcp.payload) > 0, "expecting payload, got none"
+ assert tcp.payload.load.startswith(b"SSH-2.0-"), (
+ "unexpected banner: %r" % tcp.payload.load
+ )
+ assert tcp.payload.load.endswith(b"\r\n"), (
+ "unexpected banner: %r" % tcp.payload.load
+ )
+
+
+@test
+def test_ipv6_udp_ssh():
+ sport = 37183
+ dports = [22, 80, 2222, 2022, 23874, 50000]
+ for i, dport in enumerate(dports):
+ banner = [
+ b"SSH-2.0-AsyncSSH_2.1.0",
+ b"SSH-2.0-PuTTY",
+ b"SSH-2.0-libssh2_1.4.3",
+ b"SSH-2.0-Go",
+ b"SSH-2.0-PUTTY",
+ ][i % 5]
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(banner + b"\r\n")
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(resp)
+ assert UDP in resp
+ udp = resp[UDP]
+ assert len(udp.payload) > 0, "expecting payload, got none"
+ assert udp.payload.load.startswith(b"SSH-2.0-"), (
+ "unexpected banner: %r" % udp.payload.load
+ )
+ assert udp.payload.load.endswith(b"\r\n"), (
+ "unexpected banner: %r" % udp.payload.load
+ )
diff --git a/test/src/tests/stun.py b/test/src/tests/stun.py
new file mode 100644
index 0000000..4062ec9
--- /dev/null
+++ b/test/src/tests/stun.py
@@ -0,0 +1,196 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+from socket import AF_INET6
+import struct
+
+from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.pton_ntop import inet_pton
+from scapy.sendrecv import srp1
+
+from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
+from ..core import test, check_ip_checksum, check_ipv6_checksum
+
+
+@test
+def test_ipv4_udp_stun():
+ sports = [12345, 55555, 80, 43273]
+ dports = [80, 800, 8000, 3478]
+ payload = bytes.fromhex("000100002112a442000000000000000000000000")
+ for sport in sports:
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(payload)
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert UDP in resp, "no UDP layer found"
+ udp = resp[UDP]
+ assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport)
+ assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
+ resp_payload = udp.payload.load
+ type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
+ tid = resp_payload[8:20]
+ data = resp_payload[20:]
+ assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
+ assert length == 12, "expected length 12, got {}".format(length)
+ assert (
+ magic == 0x2112A442
+ ), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
+ assert (
+ tid == b"\x00" * 12
+ ), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
+ expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
+ ">HBBBB", sport, 192, 0, 0, 0
+ )
+ assert (
+ data == expected_data
+ ), f"unexpected data {data!r} != {expected_data!r}"
+
+
+@test
+def test_ipv6_udp_stun():
+ sports = [12345, 55555, 80, 43273]
+ dports = [80, 800, 8000, 3478]
+ payload = bytes.fromhex("000100002112a442000000000000000000000000")
+ for sport in sports:
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(payload)
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(resp)
+ assert UDP in resp
+ udp = resp[UDP]
+ assert udp.sport == dport
+ assert udp.dport == sport
+ resp_payload = udp.payload.load
+ type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
+ tid = resp_payload[8:20]
+ data = resp_payload[20:]
+ assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
+ assert length == 24, "expected length 24, got {}".format(length)
+ assert (
+ magic == 0x2112A442
+ ), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
+ assert (
+ tid == b"\x00" * 12
+ ), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
+ expected_data = (
+ bytes.fromhex("000100140002")
+ + struct.pack(">H", sport)
+ + inet_pton(AF_INET6, "2001:41d0::1234:5678")
+ )
+ assert data == expected_data, "unexpected data: {}".format(data)
+
+
+@test
+def test_ipv4_udp_stun_change_port():
+ sports = [12345, 55555, 80, 43273]
+ dports = [80, 800, 8000, 3478, 65535]
+ payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
+ for sport in sports:
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IP(dst=IPV4_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(payload)
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ip_checksum(resp)
+ assert UDP in resp, "no UDP layer found"
+ udp = resp[UDP]
+ assert (
+ udp.sport == (dport + 1) % 2**16
+ ), "expected answer from UDP/{}, got it from UDP/{}".format(
+ (dport + 1) % 2**16, udp.sport
+ )
+ assert (
+ udp.dport == sport
+ ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
+ resp_payload = udp.payload.load
+ type_, length = struct.unpack(">HH", resp_payload[:4])
+ tid = resp_payload[4:20]
+ data = resp_payload[20:]
+ assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
+ assert length == 12, "expected length 12, got {}".format(length)
+ assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
+ "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
+ )
+ expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
+ ">HBBBB", sport, 192, 0, 0, 0
+ )
+ assert (
+ data == expected_data
+ ), f"unexpected data {data!r} != {expected_data!r}"
+
+
+@test
+def test_ipv6_udp_stun_change_port():
+ sports = [12345, 55555, 80, 43273]
+ dports = [80, 800, 8000, 3478, 65535]
+ payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
+ for sport in sports:
+ for dport in dports:
+ req = (
+ Ether(dst=MAC_ADDR)
+ / IPv6(dst=IPV6_ADDR)
+ / UDP(sport=sport, dport=dport)
+ / Raw(payload)
+ )
+ resp = srp1(req, timeout=1)
+ assert resp is not None, "expecting answer, got nothing"
+ check_ipv6_checksum(resp)
+ assert UDP in resp, "expecting UDP layer in answer, got nothing"
+ udp = resp[UDP]
+ assert (
+ udp.sport == (dport + 1) % 2**16
+ ), "expected answer from UDP/{}, got it from UDP/{}".format(
+ (dport + 1) % 2**16, udp.sport
+ )
+ assert (
+ udp.dport == sport
+ ), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
+ resp_payload = udp.payload.load
+ type_, length = struct.unpack(">HH", resp_payload[:4])
+ tid = resp_payload[4:20]
+ data = resp_payload[20:]
+ assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
+ assert length == 24, "expected length 12, got {}".format(length)
+ assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
+ "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
+ )
+ expected_data = (
+ bytes.fromhex("000100140002")
+ + struct.pack(">H", sport)
+ + inet_pton(AF_INET6, "2001:41d0::1234:5678")
+ )
+ assert (
+ data == expected_data
+ ), f"unexpected data {data!r} != {expected_data!r}"
diff --git a/test/test_masscanned.py b/test/test_masscanned.py
index 044a295..232e65a 100755
--- a/test/test_masscanned.py
+++ b/test/test_masscanned.py
@@ -18,7 +18,6 @@
import atexit
import functools
-import logging
import os
from signal import SIGINT
import subprocess
@@ -39,16 +38,6 @@ from src.all import test_all
from src.conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR, OUTDIR
-def setup_logs():
- ch = logging.StreamHandler()
- ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
- ch.setLevel(logging.INFO)
- log = logging.getLogger(__name__)
- log.setLevel(logging.INFO)
- log.addHandler(ch)
- return log
-
-
def cleanup_net(iface):
global ipfile
subprocess.check_call(["ip", "link", "delete", iface])
@@ -77,10 +66,10 @@ def cleanup_net(iface):
def setup_net(iface):
global IPV4_ADDR
# create the interfaces pair
+ atexit.register(functools.partial(cleanup_net, f"{iface}a"))
subprocess.check_call(
["ip", "link", "add", f"{iface}a", "type", "veth", "peer", f"{iface}b"]
)
- atexit.register(functools.partial(cleanup_net, f"{iface}a"))
for sub in "a", "b":
subprocess.check_call(["ip", "link", "set", f"{iface}{sub}", "up"])
subprocess.check_call(["ip", "addr", "add", "dev", f"{iface}a", "192.0.0.0/31"])
@@ -110,7 +99,6 @@ def setup_net(iface):
conf.route6.resync()
-LOG = setup_logs()
IFACE = "masscanned"
setup_net(IFACE)
TCPDUMP = bool(os.environ.get("USE_TCPDUMP"))