Merge pull request #43 from gpotter2/refactor-test-suite

Refactor test suite
This commit is contained in:
_Frky 2022-02-16 11:40:49 +01:00 committed by GitHub
commit e28ea53b5d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1649 additions and 1300 deletions

View file

@ -87,7 +87,7 @@ jobs:
run: sudo pip install -U flake8 black run: sudo pip install -U flake8 black
- name: Install packages for tests - name: Install packages for tests
run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind smbclient
- name: Run black - name: Run black
run: black -t py36 --check test/test_masscanned.py test/src/ run: black -t py36 --check test/test_masscanned.py test/src/

View file

@ -290,6 +290,14 @@ tcpdump: pcap_loop: The interface disappeared
0 packets dropped by kernel 0 packets dropped by kernel
``` ```
You can also chose what tests to run using the `TESTS` environment variable
```
TESTS=smb ./test/test_masscanned.py
INFO test_smb1_network_req.............................OK
INFO test_smb2_network_req.............................OK
INFO Ran 2 tests with 1 errors
```
## Logging ## Logging
### Console Logger ### Console Logger

View file

@ -20,7 +20,6 @@ use std::convert::TryInto;
use std::time::SystemTime; use std::time::SystemTime;
use crate::client::ClientInfo; use crate::client::ClientInfo;
use crate::logger::MetaLogger;
use crate::Masscanned; use crate::Masscanned;
// NBTSession + SMB Header // NBTSession + SMB Header
@ -990,6 +989,8 @@ pub fn repl_smb2<'a>(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::logger::MetaLogger;
use itertools::assert_equal; use itertools::assert_equal;
use pnet::util::MacAddr; use pnet::util::MacAddr;
use std::str::FromStr; use std::str::FromStr;

File diff suppressed because it is too large Load diff

99
test/src/core.py Normal file
View file

@ -0,0 +1,99 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
import logging
from scapy.compat import raw
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6
def setup_logs():
log = logging.getLogger()
log.setLevel(logging.DEBUG)
if not log.handlers:
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
ch.setLevel(logging.DEBUG)
log.addHandler(ch)
return log
LOG = setup_logs()
TESTS = []
ERRORS = []
# decorator to automatically add a function to tests
def test(f):
global ERRORS, TESTS
OK = "\033[1mOK\033[0m"
KO = "\033[1m\033[1;%dmKO\033[0m" % 31
fname = f.__name__.ljust(50, ".")
def w():
try:
f()
LOG.info("{}{}".format(fname, OK))
except AssertionError as e:
LOG.error("{}{}: {}".format(fname, KO, e))
ERRORS.append(fname)
TESTS.append(w)
return w
def test_all():
global ERRORS, TESTS
# execute tests
for t in TESTS:
t()
LOG.info(f"\033[1mRan {len(TESTS)} tests with {len(ERRORS)} errors\033[0m")
return len(ERRORS)
def multicast(ip6):
a, b = ip6.split(":")[-2:]
mac = ["33", "33", "ff"]
if len(a) == 4:
mac.append(a[2:])
else:
mac.append("00")
if len(b) >= 2:
mac.append(b[:2])
else:
mac.append("00")
if len(b) >= 4:
mac.append(b[2:])
else:
mac.append("00")
return ":".join(mac)
def check_ip_checksum(pkt):
assert IP in pkt, "no IP layer found"
ip_pkt = pkt[IP]
chksum = ip_pkt.chksum
del ip_pkt.chksum
assert IP(raw(ip_pkt)).chksum == chksum, "bad IPv4 checksum"
def check_ipv6_checksum(pkt):
assert IPv6 in pkt, "no IP layer found"
ip_pkt = pkt[IPv6]
chksum = ip_pkt.chksum
del ip_pkt.chksum
assert IPv6(raw(ip_pkt)).chksum == chksum, "bad IPv6 checksum"

View file

@ -0,0 +1,15 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.

51
test/src/tests/arp.py Normal file
View file

@ -0,0 +1,51 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from scapy.layers.l2 import Ether, ARP, ETHER_BROADCAST
from scapy.sendrecv import srp1
from ..conf import IPV4_ADDR, MAC_ADDR
from ..core import test
@test
def test_arp_req():
##### ARP #####
arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst=IPV4_ADDR)
arp_repl = srp1(arp_req, timeout=1)
assert arp_repl is not None, "expecting answer, got nothing"
assert ARP in arp_repl, "no ARP layer found"
arp_repl = arp_repl[ARP]
# check answer
## op is "is-at"
assert arp_repl.op == 2, "unexpected ARP op: {}".format(arp_repl.op)
## answer for the requested IP
assert arp_repl.psrc == arp_req.pdst, "unexpected ARP psrc: {}".format(
arp_repl.psrc
)
assert arp_repl.pdst == arp_req.psrc, "unexpected ARP pdst: {}".format(
arp_repl.pdst
)
## answer is expected MAC address
assert arp_repl.hwsrc == MAC_ADDR, "unexpected ARP hwsrc: {}".format(arp_repl.hwsrc)
@test
def test_arp_req_other_ip():
##### ARP #####
arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst="1.2.3.4")
arp_repl = srp1(arp_req, timeout=1)
assert arp_repl is None, "responding to ARP requests for other IP addresses"

87
test/src/tests/ghost.py Normal file
View file

@ -0,0 +1,87 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
import struct
import zlib
from scapy.compat import raw
from scapy.layers.inet import IP, TCP
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.sendrecv import srp1
from scapy.volatile import RandInt
from ..conf import IPV4_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum
@test
def test_ipv4_tcp_ghost():
sport = 37184
dports = [22, 23874]
for dport in dports:
seq_init = int(RandInt())
banner = b"Gh0st\xad\x00\x00\x00\xe0\x00\x00\x00x\x9cKS``\x98\xc3\xc0\xc0\xc0\x06\xc4\x8c@\xbcQ\x96\x81\x81\tH\x07\xa7\x16\x95e&\xa7*\x04$&g+\x182\x94\xf6\xb000\xac\xa8rc\x00\x01\x11\xa0\x82\x1f\\`&\x83\xc7K7\x86\x19\xe5n\x0c9\x95n\x0c;\x84\x0f3\xac\xe8sch\xa8^\xcf4'J\x97\xa9\x82\xe30\xc3\x91h]&\x90\xf8\xce\x97S\xcbA4L?2=\xe1\xc4\x92\x86\x0b@\xf5`\x0cT\x1f\xae\xaf]\nr\x0b\x03#\xa3\xdc\x02~\x06\x86\x03+\x18m\xc2=\xfdtC,C\xfdL<<==\\\x9d\x19\x88\x00\xe5 \x02\x00T\xf5+\\"
syn = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ip_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA"
ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
/ Raw(banner)
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
tcp = resp[TCP]
assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
data = raw(tcp.payload)
assert data, "expecting payload, got none"
assert data.startswith(b"Gh0st"), "unexpected banner: %r" % tcp.payload.load
data_len, uncompressed_len = struct.unpack("<II", data[5:13])
assert len(data) == data_len, "invalid Ghost payload: %r" % data
assert len(zlib.decompress(data[13:])) == uncompressed_len, (
"invalid Ghost payload: %r" % data
)

339
test/src/tests/http.py Normal file
View file

@ -0,0 +1,339 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from scapy.layers.inet import IP, TCP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.sendrecv import srp1
from scapy.volatile import RandInt
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum, check_ipv6_checksum
@test
def test_ipv4_tcp_http():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
seq_init = int(RandInt())
syn = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ip_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
/ Raw("GET / HTTP/1.1\r\n\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
tcp = resp[TCP]
assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
@test
def test_ipv4_tcp_http_incomplete():
sport = 24595
dports = [80, 443, 5000, 53228]
for dport in dports:
seq_init = int(RandInt())
syn = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ip_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
# purposedly incomplete request (missing additionnal ending \r\n)
/ Raw("GET / HTTP/1.1\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting an answer, got none"
check_ip_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
tcp = resp[TCP]
assert tcp.flags == "A", "expecting TCP flag A, got {}".format(tcp.flags)
@test
def test_ipv6_tcp_http():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
seq_init = int(RandInt())
syn = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ipv6_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA"
ack = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
/ Raw("GET / HTTP/1.1\r\n\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
tcp = resp[TCP]
assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
@test
def test_ipv4_udp_http():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw("GET / HTTP/1.1\r\n\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert UDP in resp
udp = resp[UDP]
assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
@test
def test_ipv6_udp_http():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw("GET / HTTP/1.1\r\n\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert UDP in resp
udp = resp[UDP]
assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
@test
def test_ipv4_tcp_http_ko():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
seq_init = int(RandInt())
syn = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ip_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA"
ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
/ Raw(bytes.fromhex("4f5054494f4e53"))
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
assert "P" not in resp[TCP].flags
assert len(resp[TCP].payload) == 0
@test
def test_ipv4_udp_http_ko():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(bytes.fromhex("4f5054494f4e53"))
)
resp = srp1(req, timeout=1)
assert resp is None, "expecting no answer, got one"
@test
def test_ipv6_tcp_http_ko():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
seq_init = int(RandInt())
syn = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ipv6_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA"
ack = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
/ Raw(bytes.fromhex("4f5054494f4e53"))
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
assert "P" not in resp[TCP].flags
assert len(resp[TCP].payload) == 0
@test
def test_ipv6_udp_http_ko():
sport = 24592
dports = [80, 443, 5000, 53228]
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(bytes.fromhex("4f5054494f4e53"))
)
resp = srp1(req, timeout=1)
assert resp is None, "expecting no answer, got one"

45
test/src/tests/icmpv4.py Normal file
View file

@ -0,0 +1,45 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from scapy.layers.inet import IP, ICMP
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.sendrecv import srp1
from ..conf import IPV4_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum
@test
def test_icmpv4_echo_req():
##### ICMPv4 #####
icmp_req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ ICMP(type=8, code=0)
/ Raw("idrinkwaytoomuchcoffee")
)
icmp_repl = srp1(icmp_req, timeout=1)
assert icmp_repl is not None, "expecting answer, got nothing"
check_ip_checksum(icmp_repl)
assert ICMP in icmp_repl
icmp_repl = icmp_repl[ICMP]
# check answer
## type is "echo-reply"
assert icmp_repl.type == 0
assert icmp_repl.code == 0
## data is the same as sent
assert icmp_repl.load == icmp_req.load

87
test/src/tests/icmpv6.py Normal file
View file

@ -0,0 +1,87 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from scapy.layers.inet6 import (
ICMPv6EchoReply,
ICMPv6EchoRequest,
ICMPv6NDOptDstLLAddr,
ICMPv6ND_NA,
ICMPv6ND_NS,
IPv6,
)
from scapy.layers.l2 import Ether
from scapy.sendrecv import srp1
from ..conf import IPV6_ADDR, MAC_ADDR
from ..core import test, multicast
@test
def test_icmpv6_neighbor_solicitation():
##### IPv6 Neighbor Solicitation #####
for mac in [
"ff:ff:ff:ff:ff:ff",
"33:33:00:00:00:01",
MAC_ADDR,
multicast(IPV6_ADDR),
]:
nd_ns = Ether(dst=mac) / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
nd_na = srp1(nd_ns, timeout=1)
assert nd_na is not None, "expecting answer, got nothing"
assert ICMPv6ND_NA in nd_na
nd_na = nd_na[ICMPv6ND_NA]
# check answer content
assert nd_na.code == 0
assert nd_na.R == 0
assert nd_na.S == 1
assert nd_na.O == 1 # noqa: E741
assert nd_na.tgt == IPV6_ADDR
# check ND Option
assert nd_na.haslayer(ICMPv6NDOptDstLLAddr)
assert nd_na.getlayer(ICMPv6NDOptDstLLAddr).lladdr == MAC_ADDR
for mac in ["00:00:00:00:00:00", "33:33:33:00:00:01"]:
nd_ns = Ether(dst="ff:ff:ff:ff:ff:ff") / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
nd_na = srp1(nd_ns, timeout=1)
assert nd_na is not None, "expecting no answer, got one"
@test
def test_icmpv6_neighbor_solicitation_other_ip():
##### IPv6 Neighbor Solicitation #####
nd_ns = (
Ether(dst="ff:ff:ff:ff:ff:ff")
/ IPv6()
/ ICMPv6ND_NS(tgt="2020:4141:3030:2020::bdbd")
)
nd_na = srp1(nd_ns, timeout=1)
assert nd_na is None, "responding to ND_NS for other IP addresses"
@test
def test_icmpv6_echo_req():
##### IPv6 Ping #####
echo_req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ ICMPv6EchoRequest(data="waytoomanynapkins")
)
echo_repl = srp1(echo_req, timeout=1)
assert echo_repl is not None, "expecting answer, got nothing"
assert ICMPv6EchoReply in echo_repl
echo_repl = echo_repl[ICMPv6EchoReply]
# check answer content
assert echo_repl.code == 0
assert echo_repl.data == echo_req.data

50
test/src/tests/ip.py Normal file
View file

@ -0,0 +1,50 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from scapy.layers.inet import IP, ICMP
from scapy.layers.l2 import Ether
from scapy.sendrecv import srp1
from ..conf import IPV4_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum
@test
def test_ipv4_req():
##### IP #####
ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0)
ip_repl = srp1(ip_req, timeout=1)
assert ip_repl is not None, "expecting answer, got nothing"
check_ip_checksum(ip_repl)
assert IP in ip_repl, "no IP layer in response"
ip_repl = ip_repl[IP]
assert ip_repl.id == 0, "IP identification unexpected"
@test
def test_eth_req_other_mac():
#### ETH ####
ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0)
ip_repl = srp1(ip_req, timeout=1)
assert ip_repl is None, "responding to other MAC addresses"
@test
def test_ipv4_req_other_ip():
##### IP #####
ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0)
ip_repl = srp1(ip_req, timeout=1)
assert ip_repl is None, "responding to other IP addresses"

99
test/src/tests/rpc.py Normal file
View file

@ -0,0 +1,99 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from subprocess import check_call
from tempfile import NamedTemporaryFile
import json
import os
import re
from ivre.db import DBNmap
from ..conf import IPV4_ADDR
from ..core import test
@test
def test_rpc_nmap():
for scan in "SU":
with NamedTemporaryFile(delete=False) as xml_result:
check_call(
[
"nmap",
"-n",
"-vv",
"-oX",
"-",
IPV4_ADDR,
f"-s{scan}V",
"-p",
"111",
"--script",
"rpcinfo,rpc-grind",
],
stdout=xml_result,
)
with NamedTemporaryFile(delete=False, mode="w") as json_result:
DBNmap(output=json_result).store_scan(xml_result.name)
os.unlink(xml_result.name)
with open(json_result.name) as fdesc:
results = [json.loads(line) for line in fdesc]
os.unlink(json_result.name)
assert len(results) == 1, f"Expected 1 result, got {len(results)}"
result = results[0]
assert len(result["ports"]) == 1, f"Expected 1 port, got {len(result['ports'])}"
port = result["ports"][0]
assert port["port"] == 111 and port["protocol"] == (
"tcp" if scan == "S" else "udp"
)
assert port["service_name"] in {"rpcbind", "nfs"}
assert port["service_extrainfo"] in {"RPC #100000", "RPC #100003"}
assert (
len(port["scripts"]) == 1
), f"Expected 1 script, got {len(port['scripts'])}"
script = port["scripts"][0]
assert script["id"] == "rpcinfo", "Expected rpcinfo script, not found"
assert len(script["rpcinfo"]) == 1
@test
def test_rpcinfo():
with NamedTemporaryFile(delete=False) as rpcout:
check_call(["rpcinfo", "-p", IPV4_ADDR], stdout=rpcout)
with open(rpcout.name) as fdesc:
found = []
for line in fdesc:
line = line.split()
if line[0] == "program":
# header
continue
assert line[0] == "100000", f"Expected program 100000, got {line[0]}"
found.append(int(line[1]))
assert len(found) == 3, f"Expected three versions, got {found}"
for i in range(2, 5):
assert i in found, f"Missing version {i} in {found}"
os.unlink(rpcout.name)
with NamedTemporaryFile(delete=False) as rpcout:
check_call(["rpcinfo", "-u", IPV4_ADDR, "100000"], stdout=rpcout)
with open(rpcout.name) as fdesc:
found = []
expr = re.compile("^program 100000 version ([0-9]) ready and waiting$")
for line in fdesc:
found.append(int(expr.search(line.strip()).group(1)))
assert len(found) == 3, f"Expected three versions, got {found}"
for i in range(2, 5):
assert i in found, f"Missing version {i} in {found}"
os.unlink(rpcout.name)

65
test/src/tests/smb.py Normal file
View file

@ -0,0 +1,65 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
import subprocess
from ..core import test
from ..conf import IPV4_ADDR
@test
def test_smb1_network_req():
proc = subprocess.Popen(
[
"smbclient",
"-U ''",
"-N",
"-d 6",
"-t 1",
"-L",
IPV4_ADDR,
"--option=client min protocol=NT1",
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
out, _ = proc.communicate()
assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out
assert "session request ok" in out, "\n" + out
assert f"negotiated dialect[NT1] against server[{IPV4_ADDR}]" in out, "\n" + out
@test
def test_smb2_network_req():
proc = subprocess.Popen(
[
"smbclient",
"-U ''",
"-N",
"-d 5",
"-t 1",
"-L",
IPV4_ADDR,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
out, _ = proc.communicate()
assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out
assert "session request ok" in out, "\n" + out
assert f"negotiated dialect[SMB2_02] against server[{IPV4_ADDR}]" in out, "\n" + out

217
test/src/tests/ssh.py Normal file
View file

@ -0,0 +1,217 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from scapy.layers.inet import IP, TCP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.sendrecv import srp1
from scapy.volatile import RandInt
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum, check_ipv6_checksum
@test
def test_ipv4_tcp_ssh():
sport = 37183
dports = [22, 80, 2222, 2022, 23874, 50000]
for i, dport in enumerate(dports):
seq_init = int(RandInt())
banner = [
b"SSH-2.0-AsyncSSH_2.1.0",
b"SSH-2.0-PuTTY",
b"SSH-2.0-libssh2_1.4.3",
b"SSH-2.0-Go",
b"SSH-2.0-PUTTY",
][i % 5]
syn = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ip_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA"
ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
/ Raw(banner + b"\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
tcp = resp[TCP]
assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
assert len(tcp.payload) > 0, "expecting payload, got none"
assert tcp.payload.load.startswith(b"SSH-2.0-"), (
"unexpected banner: %r" % tcp.payload.load
)
assert tcp.payload.load.endswith(b"\r\n"), (
"unexpected banner: %r" % tcp.payload.load
)
@test
def test_ipv4_udp_ssh():
sport = 37183
dports = [22, 80, 2222, 2022, 23874, 50000]
for i, dport in enumerate(dports):
banner = [
b"SSH-2.0-AsyncSSH_2.1.0",
b"SSH-2.0-PuTTY",
b"SSH-2.0-libssh2_1.4.3",
b"SSH-2.0-Go",
b"SSH-2.0-PUTTY",
][i % 5]
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(banner + b"\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert UDP in resp
udp = resp[UDP]
assert len(udp.payload) > 0, "expecting payload, got none"
assert udp.payload.load.startswith(b"SSH-2.0-"), (
"unexpected banner: %r" % udp.payload.load
)
assert udp.payload.load.endswith(b"\r\n"), (
"unexpected banner: %r" % udp.payload.load
)
@test
def test_ipv6_tcp_ssh():
sport = 37183
dports = [22, 80, 2222, 2022, 23874, 50000]
for i, dport in enumerate(dports):
seq_init = int(RandInt())
banner = [
b"SSH-2.0-AsyncSSH_2.1.0",
b"SSH-2.0-PuTTY",
b"SSH-2.0-libssh2_1.4.3",
b"SSH-2.0-Go",
b"SSH-2.0-PUTTY",
][i % 5]
syn = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ipv6_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA"
ack = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(
flags="A",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
)
_ = srp1(ack, timeout=1)
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(
flags="PA",
sport=sport,
dport=dport,
seq=seq_init + 1,
ack=syn_ack.seq + 1,
)
/ Raw(banner + b"\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
tcp = resp[TCP]
assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
assert len(tcp.payload) > 0, "expecting payload, got none"
assert tcp.payload.load.startswith(b"SSH-2.0-"), (
"unexpected banner: %r" % tcp.payload.load
)
assert tcp.payload.load.endswith(b"\r\n"), (
"unexpected banner: %r" % tcp.payload.load
)
@test
def test_ipv6_udp_ssh():
sport = 37183
dports = [22, 80, 2222, 2022, 23874, 50000]
for i, dport in enumerate(dports):
banner = [
b"SSH-2.0-AsyncSSH_2.1.0",
b"SSH-2.0-PuTTY",
b"SSH-2.0-libssh2_1.4.3",
b"SSH-2.0-Go",
b"SSH-2.0-PUTTY",
][i % 5]
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(banner + b"\r\n")
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert UDP in resp
udp = resp[UDP]
assert len(udp.payload) > 0, "expecting payload, got none"
assert udp.payload.load.startswith(b"SSH-2.0-"), (
"unexpected banner: %r" % udp.payload.load
)
assert udp.payload.load.endswith(b"\r\n"), (
"unexpected banner: %r" % udp.payload.load
)

196
test/src/tests/stun.py Normal file
View file

@ -0,0 +1,196 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from socket import AF_INET6
import struct
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.pton_ntop import inet_pton
from scapy.sendrecv import srp1
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum, check_ipv6_checksum
@test
def test_ipv4_udp_stun():
sports = [12345, 55555, 80, 43273]
dports = [80, 800, 8000, 3478]
payload = bytes.fromhex("000100002112a442000000000000000000000000")
for sport in sports:
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(payload)
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert UDP in resp, "no UDP layer found"
udp = resp[UDP]
assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport)
assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
resp_payload = udp.payload.load
type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
tid = resp_payload[8:20]
data = resp_payload[20:]
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
assert length == 12, "expected length 12, got {}".format(length)
assert (
magic == 0x2112A442
), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
assert (
tid == b"\x00" * 12
), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
">HBBBB", sport, 192, 0, 0, 0
)
assert (
data == expected_data
), f"unexpected data {data!r} != {expected_data!r}"
@test
def test_ipv6_udp_stun():
sports = [12345, 55555, 80, 43273]
dports = [80, 800, 8000, 3478]
payload = bytes.fromhex("000100002112a442000000000000000000000000")
for sport in sports:
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(payload)
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert UDP in resp
udp = resp[UDP]
assert udp.sport == dport
assert udp.dport == sport
resp_payload = udp.payload.load
type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
tid = resp_payload[8:20]
data = resp_payload[20:]
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
assert length == 24, "expected length 24, got {}".format(length)
assert (
magic == 0x2112A442
), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
assert (
tid == b"\x00" * 12
), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
expected_data = (
bytes.fromhex("000100140002")
+ struct.pack(">H", sport)
+ inet_pton(AF_INET6, "2001:41d0::1234:5678")
)
assert data == expected_data, "unexpected data: {}".format(data)
@test
def test_ipv4_udp_stun_change_port():
sports = [12345, 55555, 80, 43273]
dports = [80, 800, 8000, 3478, 65535]
payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
for sport in sports:
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(payload)
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ip_checksum(resp)
assert UDP in resp, "no UDP layer found"
udp = resp[UDP]
assert (
udp.sport == (dport + 1) % 2**16
), "expected answer from UDP/{}, got it from UDP/{}".format(
(dport + 1) % 2**16, udp.sport
)
assert (
udp.dport == sport
), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
resp_payload = udp.payload.load
type_, length = struct.unpack(">HH", resp_payload[:4])
tid = resp_payload[4:20]
data = resp_payload[20:]
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
assert length == 12, "expected length 12, got {}".format(length)
assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
"expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
)
expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
">HBBBB", sport, 192, 0, 0, 0
)
assert (
data == expected_data
), f"unexpected data {data!r} != {expected_data!r}"
@test
def test_ipv6_udp_stun_change_port():
sports = [12345, 55555, 80, 43273]
dports = [80, 800, 8000, 3478, 65535]
payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
for sport in sports:
for dport in dports:
req = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ UDP(sport=sport, dport=dport)
/ Raw(payload)
)
resp = srp1(req, timeout=1)
assert resp is not None, "expecting answer, got nothing"
check_ipv6_checksum(resp)
assert UDP in resp, "expecting UDP layer in answer, got nothing"
udp = resp[UDP]
assert (
udp.sport == (dport + 1) % 2**16
), "expected answer from UDP/{}, got it from UDP/{}".format(
(dport + 1) % 2**16, udp.sport
)
assert (
udp.dport == sport
), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
resp_payload = udp.payload.load
type_, length = struct.unpack(">HH", resp_payload[:4])
tid = resp_payload[4:20]
data = resp_payload[20:]
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
assert length == 24, "expected length 12, got {}".format(length)
assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
"expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
)
expected_data = (
bytes.fromhex("000100140002")
+ struct.pack(">H", sport)
+ inet_pton(AF_INET6, "2001:41d0::1234:5678")
)
assert (
data == expected_data
), f"unexpected data {data!r} != {expected_data!r}"

263
test/src/tests/tcp.py Normal file
View file

@ -0,0 +1,263 @@
# This file is part of masscanned.
# Copyright 2021 - The IVRE project
#
# Masscanned is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
from scapy.layers.inet import IP, TCP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.sendrecv import srp1
from scapy.volatile import RandInt
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
from ..core import test, check_ip_checksum, check_ipv6_checksum
@test
def test_tcp_syn():
##### SYN-ACK #####
# test a list of ports, randomly generated once
ports_to_test = [
1152,
2003,
2193,
3709,
4054,
6605,
6737,
6875,
7320,
8898,
9513,
9738,
10623,
10723,
11253,
12125,
12189,
12873,
14648,
14659,
16242,
16243,
17209,
17492,
17667,
17838,
18081,
18682,
18790,
19124,
19288,
19558,
19628,
19789,
20093,
21014,
21459,
21740,
24070,
24312,
24576,
26939,
27136,
27165,
27361,
29971,
31088,
33011,
33068,
34990,
35093,
35958,
36626,
36789,
37130,
37238,
37256,
37697,
37890,
38958,
42131,
43864,
44420,
44655,
44868,
45157,
46213,
46497,
46955,
49049,
49067,
49452,
49480,
50498,
50945,
51181,
52890,
53301,
53407,
53417,
53980,
55827,
56483,
58552,
58713,
58836,
59362,
59560,
60534,
60555,
60660,
61615,
62402,
62533,
62941,
63240,
63339,
63616,
64380,
65438,
]
for p in ports_to_test:
seq_init = int(RandInt())
syn = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="S", dport=p, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ip_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
syn_ack.ack,
seq_init + 1,
)
@test
def test_ipv4_tcp_psh_ack():
##### PSH-ACK #####
sport = 26695
port = 445
seq_init = int(RandInt())
# send PSH-ACK first
psh_ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
/ Raw("payload")
)
syn_ack = srp1(psh_ack, timeout=1)
assert syn_ack is None, "no answer expected, got one"
# test the anti-injection mechanism
seq_init = int(RandInt())
syn = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="S", sport=sport, dport=port, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ip_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
syn_ack.ack,
seq_init + 1,
)
ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="A", dport=port)
# should fail because no ack given
psh_ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
)
ack = srp1(psh_ack, timeout=1)
assert ack is None, "no answer expected, got one"
# should get an answer this time
psh_ack = (
Ether(dst=MAC_ADDR)
/ IP(dst=IPV4_ADDR)
/ TCP(
flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
)
)
ack = srp1(psh_ack, timeout=1)
assert ack is not None, "expecting answer, got nothing"
check_ip_checksum(ack)
assert TCP in ack, "expecting TCP, got %r" % ack.summary()
ack = ack[TCP]
assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags
@test
def test_ipv6_tcp_psh_ack():
##### PSH-ACK #####
sport = 26695
port = 445
seq_init = int(RandInt())
# send PSH-ACK first
psh_ack = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
/ Raw("payload")
)
syn_ack = srp1(psh_ack, timeout=1)
assert syn_ack is None, "no answer expected, got one"
# test the anti-injection mechanism
syn = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(flags="S", sport=sport, dport=port, seq=seq_init)
)
syn_ack = srp1(syn, timeout=1)
assert syn_ack is not None, "expecting answer, got nothing"
check_ipv6_checksum(syn_ack)
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
syn_ack = syn_ack[TCP]
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
syn_ack.ack,
seq_init + 1,
)
ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="A", dport=port)
# should fail because no ack given
psh_ack = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
)
ack = srp1(psh_ack, timeout=1)
assert ack is None, "no answer expected, got one"
# should get an answer this time
psh_ack = (
Ether(dst=MAC_ADDR)
/ IPv6(dst=IPV6_ADDR)
/ TCP(
flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
)
)
ack = srp1(psh_ack, timeout=1)
assert ack is not None, "expecting answer, got nothing"
check_ipv6_checksum(ack)
assert TCP in ack, "expecting TCP, got %r" % ack.summary()
ack = ack[TCP]
assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags

View file

@ -18,7 +18,6 @@
import atexit import atexit
import functools import functools
import logging
import os import os
from signal import SIGINT from signal import SIGINT
import subprocess import subprocess
@ -39,16 +38,6 @@ from src.all import test_all
from src.conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR, OUTDIR from src.conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR, OUTDIR
def setup_logs():
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
ch.setLevel(logging.INFO)
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(ch)
return log
def cleanup_net(iface): def cleanup_net(iface):
global ipfile global ipfile
subprocess.check_call(["ip", "link", "delete", iface]) subprocess.check_call(["ip", "link", "delete", iface])
@ -77,10 +66,10 @@ def cleanup_net(iface):
def setup_net(iface): def setup_net(iface):
global IPV4_ADDR global IPV4_ADDR
# create the interfaces pair # create the interfaces pair
atexit.register(functools.partial(cleanup_net, f"{iface}a"))
subprocess.check_call( subprocess.check_call(
["ip", "link", "add", f"{iface}a", "type", "veth", "peer", f"{iface}b"] ["ip", "link", "add", f"{iface}a", "type", "veth", "peer", f"{iface}b"]
) )
atexit.register(functools.partial(cleanup_net, f"{iface}a"))
for sub in "a", "b": for sub in "a", "b":
subprocess.check_call(["ip", "link", "set", f"{iface}{sub}", "up"]) subprocess.check_call(["ip", "link", "set", f"{iface}{sub}", "up"])
subprocess.check_call(["ip", "addr", "add", "dev", f"{iface}a", "192.0.0.0/31"]) subprocess.check_call(["ip", "addr", "add", "dev", f"{iface}a", "192.0.0.0/31"])
@ -110,7 +99,6 @@ def setup_net(iface):
conf.route6.resync() conf.route6.resync()
LOG = setup_logs()
IFACE = "masscanned" IFACE = "masscanned"
setup_net(IFACE) setup_net(IFACE)
TCPDUMP = bool(os.environ.get("USE_TCPDUMP")) TCPDUMP = bool(os.environ.get("USE_TCPDUMP"))