mirror of
https://github.com/ivre/masscanned.git
synced 2025-10-02 06:38:21 +00:00
Test suite refactor
This commit is contained in:
parent
b356e52a93
commit
beefcc9185
17 changed files with 1625 additions and 1300 deletions
2
.github/workflows/test.yml
vendored
2
.github/workflows/test.yml
vendored
|
@ -87,7 +87,7 @@ jobs:
|
|||
run: sudo pip install -U flake8 black
|
||||
|
||||
- name: Install packages for tests
|
||||
run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind
|
||||
run: sudo apt-get -q update && sudo apt-get -qy install nmap rpcbind smbclient
|
||||
|
||||
- name: Run black
|
||||
run: black -t py36 --check test/test_masscanned.py test/src/
|
||||
|
|
|
@ -290,6 +290,14 @@ tcpdump: pcap_loop: The interface disappeared
|
|||
0 packets dropped by kernel
|
||||
```
|
||||
|
||||
You can also chose what tests to run using the `TESTS` environment variable
|
||||
```
|
||||
TESTS=smb ./test/test_masscanned.py
|
||||
INFO test_smb1_network_req.............................OK
|
||||
INFO test_smb2_network_req.............................OK
|
||||
INFO Ran 2 tests with 1 errors
|
||||
```
|
||||
|
||||
## Logging
|
||||
|
||||
### Console Logger
|
||||
|
|
|
@ -20,7 +20,6 @@ use std::convert::TryInto;
|
|||
use std::time::SystemTime;
|
||||
|
||||
use crate::client::ClientInfo;
|
||||
use crate::logger::MetaLogger;
|
||||
use crate::Masscanned;
|
||||
|
||||
// NBTSession + SMB Header
|
||||
|
@ -990,6 +989,8 @@ pub fn repl_smb2<'a>(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::logger::MetaLogger;
|
||||
|
||||
use itertools::assert_equal;
|
||||
use pnet::util::MacAddr;
|
||||
use std::str::FromStr;
|
||||
|
|
1300
test/src/all.py
1300
test/src/all.py
File diff suppressed because it is too large
Load diff
98
test/src/core.py
Normal file
98
test/src/core.py
Normal file
|
@ -0,0 +1,98 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import logging
|
||||
|
||||
from scapy.compat import raw
|
||||
from scapy.layers.inet import IP
|
||||
from scapy.layers.inet6 import IPv6
|
||||
|
||||
|
||||
def setup_logs():
|
||||
log = logging.getLogger()
|
||||
log.setLevel(logging.DEBUG)
|
||||
if not log.handlers:
|
||||
ch = logging.StreamHandler()
|
||||
ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
|
||||
ch.setLevel(logging.DEBUG)
|
||||
log.addHandler(ch)
|
||||
return log
|
||||
|
||||
|
||||
LOG = setup_logs()
|
||||
TESTS = []
|
||||
ERRORS = []
|
||||
|
||||
# decorator to automatically add a function to tests
|
||||
def test(f):
|
||||
global ERRORS, TESTS
|
||||
OK = "\033[1mOK\033[0m"
|
||||
KO = "\033[1m\033[1;%dmKO\033[0m" % 31
|
||||
fname = f.__name__.ljust(50, ".")
|
||||
|
||||
def w():
|
||||
try:
|
||||
f()
|
||||
LOG.info("{}{}".format(fname, OK))
|
||||
except AssertionError as e:
|
||||
LOG.error("{}{}: {}".format(fname, KO, e))
|
||||
ERRORS.append(fname)
|
||||
|
||||
TESTS.append(w)
|
||||
return w
|
||||
|
||||
|
||||
def test_all():
|
||||
global ERRORS, TESTS
|
||||
# execute tests
|
||||
for t in TESTS:
|
||||
t()
|
||||
LOG.info(f"\033[1mRan {len(TESTS)} tests with {len(ERRORS)} errors\033[0m")
|
||||
return len(ERRORS)
|
||||
|
||||
|
||||
def multicast(ip6):
|
||||
a, b = ip6.split(":")[-2:]
|
||||
mac = ["33", "33", "ff"]
|
||||
if len(a) == 4:
|
||||
mac.append(a[2:])
|
||||
else:
|
||||
mac.append("00")
|
||||
if len(b) >= 2:
|
||||
mac.append(b[:2])
|
||||
else:
|
||||
mac.append("00")
|
||||
if len(b) >= 4:
|
||||
mac.append(b[2:])
|
||||
else:
|
||||
mac.append("00")
|
||||
return ":".join(mac)
|
||||
|
||||
|
||||
def check_ip_checksum(pkt):
|
||||
assert IP in pkt, "no IP layer found"
|
||||
ip_pkt = pkt[IP]
|
||||
chksum = ip_pkt.chksum
|
||||
del ip_pkt.chksum
|
||||
assert IP(raw(ip_pkt)).chksum == chksum, "bad IPv4 checksum"
|
||||
|
||||
|
||||
def check_ipv6_checksum(pkt):
|
||||
assert IPv6 in pkt, "no IP layer found"
|
||||
ip_pkt = pkt[IPv6]
|
||||
chksum = ip_pkt.chksum
|
||||
del ip_pkt.chksum
|
||||
assert IPv6(raw(ip_pkt)).chksum == chksum, "bad IPv6 checksum"
|
15
test/src/tests/__init__.py
Normal file
15
test/src/tests/__init__.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
51
test/src/tests/arp.py
Normal file
51
test/src/tests/arp.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from scapy.layers.l2 import Ether, ARP, ETHER_BROADCAST
|
||||
from scapy.sendrecv import srp1
|
||||
|
||||
from ..conf import IPV4_ADDR, MAC_ADDR
|
||||
from ..core import test
|
||||
|
||||
|
||||
@test
|
||||
def test_arp_req():
|
||||
##### ARP #####
|
||||
arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst=IPV4_ADDR)
|
||||
arp_repl = srp1(arp_req, timeout=1)
|
||||
assert arp_repl is not None, "expecting answer, got nothing"
|
||||
assert ARP in arp_repl, "no ARP layer found"
|
||||
arp_repl = arp_repl[ARP]
|
||||
# check answer
|
||||
## op is "is-at"
|
||||
assert arp_repl.op == 2, "unexpected ARP op: {}".format(arp_repl.op)
|
||||
## answer for the requested IP
|
||||
assert arp_repl.psrc == arp_req.pdst, "unexpected ARP psrc: {}".format(
|
||||
arp_repl.psrc
|
||||
)
|
||||
assert arp_repl.pdst == arp_req.psrc, "unexpected ARP pdst: {}".format(
|
||||
arp_repl.pdst
|
||||
)
|
||||
## answer is expected MAC address
|
||||
assert arp_repl.hwsrc == MAC_ADDR, "unexpected ARP hwsrc: {}".format(arp_repl.hwsrc)
|
||||
|
||||
|
||||
@test
|
||||
def test_arp_req_other_ip():
|
||||
##### ARP #####
|
||||
arp_req = Ether(dst=ETHER_BROADCAST) / ARP(pdst="1.2.3.4")
|
||||
arp_repl = srp1(arp_req, timeout=1)
|
||||
assert arp_repl is None, "responding to ARP requests for other IP addresses"
|
87
test/src/tests/ghost.py
Normal file
87
test/src/tests/ghost.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import struct
|
||||
import zlib
|
||||
|
||||
from scapy.compat import raw
|
||||
from scapy.layers.inet import IP, TCP
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.packet import Raw
|
||||
from scapy.sendrecv import srp1
|
||||
from scapy.volatile import RandInt
|
||||
|
||||
from ..conf import IPV4_ADDR, MAC_ADDR
|
||||
from ..core import test, check_ip_checksum
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_tcp_ghost():
|
||||
sport = 37184
|
||||
dports = [22, 23874]
|
||||
for dport in dports:
|
||||
seq_init = int(RandInt())
|
||||
banner = b"Gh0st\xad\x00\x00\x00\xe0\x00\x00\x00x\x9cKS``\x98\xc3\xc0\xc0\xc0\x06\xc4\x8c@\xbcQ\x96\x81\x81\tH\x07\xa7\x16\x95e&\xa7*\x04$&g+\x182\x94\xf6\xb000\xac\xa8rc\x00\x01\x11\xa0\x82\x1f\\`&\x83\xc7K7\x86\x19\xe5n\x0c9\x95n\x0c;\x84\x0f3\xac\xe8sch\xa8^\xcf4'J\x97\xa9\x82\xe30\xc3\x91h]&\x90\xf8\xce\x97S\xcbA4L?2=\xe1\xc4\x92\x86\x0b@\xf5`\x0cT\x1f\xae\xaf]\nr\x0b\x03#\xa3\xdc\x02~\x06\x86\x03+\x18m\xc2=\xfdtC,C\xfdL<<==\\\x9d\x19\x88\x00\xe5 \x02\x00T\xf5+\\"
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA"
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
/ Raw(banner)
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
tcp = resp[TCP]
|
||||
assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
|
||||
assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
|
||||
data = raw(tcp.payload)
|
||||
assert data, "expecting payload, got none"
|
||||
assert data.startswith(b"Gh0st"), "unexpected banner: %r" % tcp.payload.load
|
||||
data_len, uncompressed_len = struct.unpack("<II", data[5:13])
|
||||
assert len(data) == data_len, "invalid Ghost payload: %r" % data
|
||||
assert len(zlib.decompress(data[13:])) == uncompressed_len, (
|
||||
"invalid Ghost payload: %r" % data
|
||||
)
|
339
test/src/tests/http.py
Normal file
339
test/src/tests/http.py
Normal file
|
@ -0,0 +1,339 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from scapy.layers.inet import IP, TCP, UDP
|
||||
from scapy.layers.inet6 import IPv6
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.packet import Raw
|
||||
from scapy.sendrecv import srp1
|
||||
from scapy.volatile import RandInt
|
||||
|
||||
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
|
||||
from ..core import test, check_ip_checksum, check_ipv6_checksum
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_tcp_http():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
seq_init = int(RandInt())
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
/ Raw("GET / HTTP/1.1\r\n\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
tcp = resp[TCP]
|
||||
assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_tcp_http_incomplete():
|
||||
sport = 24595
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
seq_init = int(RandInt())
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
# purposedly incomplete request (missing additionnal ending \r\n)
|
||||
/ Raw("GET / HTTP/1.1\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting an answer, got none"
|
||||
check_ip_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
tcp = resp[TCP]
|
||||
assert tcp.flags == "A", "expecting TCP flag A, got {}".format(tcp.flags)
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_tcp_http():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
seq_init = int(RandInt())
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA"
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
/ Raw("GET / HTTP/1.1\r\n\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
tcp = resp[TCP]
|
||||
assert tcp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_udp_http():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw("GET / HTTP/1.1\r\n\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert UDP in resp
|
||||
udp = resp[UDP]
|
||||
assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_udp_http():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw("GET / HTTP/1.1\r\n\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(resp)
|
||||
assert UDP in resp
|
||||
udp = resp[UDP]
|
||||
assert udp.payload.load.startswith(b"HTTP/1.1 401 Unauthorized\n")
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_tcp_http_ko():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
seq_init = int(RandInt())
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA"
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
/ Raw(bytes.fromhex("4f5054494f4e53"))
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
assert "P" not in resp[TCP].flags
|
||||
assert len(resp[TCP].payload) == 0
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_udp_http_ko():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(bytes.fromhex("4f5054494f4e53"))
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is None, "expecting no answer, got one"
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_tcp_http_ko():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
seq_init = int(RandInt())
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA"
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
/ Raw(bytes.fromhex("4f5054494f4e53"))
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
assert "P" not in resp[TCP].flags
|
||||
assert len(resp[TCP].payload) == 0
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_udp_http_ko():
|
||||
sport = 24592
|
||||
dports = [80, 443, 5000, 53228]
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(bytes.fromhex("4f5054494f4e53"))
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is None, "expecting no answer, got one"
|
45
test/src/tests/icmpv4.py
Normal file
45
test/src/tests/icmpv4.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from scapy.layers.inet import IP, ICMP
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.packet import Raw
|
||||
from scapy.sendrecv import srp1
|
||||
|
||||
from ..conf import IPV4_ADDR, MAC_ADDR
|
||||
from ..core import test, check_ip_checksum
|
||||
|
||||
|
||||
@test
|
||||
def test_icmpv4_echo_req():
|
||||
##### ICMPv4 #####
|
||||
icmp_req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ ICMP(type=8, code=0)
|
||||
/ Raw("idrinkwaytoomuchcoffee")
|
||||
)
|
||||
icmp_repl = srp1(icmp_req, timeout=1)
|
||||
assert icmp_repl is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(icmp_repl)
|
||||
assert ICMP in icmp_repl
|
||||
icmp_repl = icmp_repl[ICMP]
|
||||
# check answer
|
||||
## type is "echo-reply"
|
||||
assert icmp_repl.type == 0
|
||||
assert icmp_repl.code == 0
|
||||
## data is the same as sent
|
||||
assert icmp_repl.load == icmp_req.load
|
87
test/src/tests/icmpv6.py
Normal file
87
test/src/tests/icmpv6.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from scapy.layers.inet6 import (
|
||||
ICMPv6EchoReply,
|
||||
ICMPv6EchoRequest,
|
||||
ICMPv6NDOptDstLLAddr,
|
||||
ICMPv6ND_NA,
|
||||
ICMPv6ND_NS,
|
||||
IPv6,
|
||||
)
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.sendrecv import srp1
|
||||
|
||||
from ..conf import IPV6_ADDR, MAC_ADDR
|
||||
from ..core import test, multicast
|
||||
|
||||
|
||||
@test
|
||||
def test_icmpv6_neighbor_solicitation():
|
||||
##### IPv6 Neighbor Solicitation #####
|
||||
for mac in [
|
||||
"ff:ff:ff:ff:ff:ff",
|
||||
"33:33:00:00:00:01",
|
||||
MAC_ADDR,
|
||||
multicast(IPV6_ADDR),
|
||||
]:
|
||||
nd_ns = Ether(dst=mac) / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
|
||||
nd_na = srp1(nd_ns, timeout=1)
|
||||
assert nd_na is not None, "expecting answer, got nothing"
|
||||
assert ICMPv6ND_NA in nd_na
|
||||
nd_na = nd_na[ICMPv6ND_NA]
|
||||
# check answer content
|
||||
assert nd_na.code == 0
|
||||
assert nd_na.R == 0
|
||||
assert nd_na.S == 1
|
||||
assert nd_na.O == 1 # noqa: E741
|
||||
assert nd_na.tgt == IPV6_ADDR
|
||||
# check ND Option
|
||||
assert nd_na.haslayer(ICMPv6NDOptDstLLAddr)
|
||||
assert nd_na.getlayer(ICMPv6NDOptDstLLAddr).lladdr == MAC_ADDR
|
||||
for mac in ["00:00:00:00:00:00", "33:33:33:00:00:01"]:
|
||||
nd_ns = Ether(dst="ff:ff:ff:ff:ff:ff") / IPv6() / ICMPv6ND_NS(tgt=IPV6_ADDR)
|
||||
nd_na = srp1(nd_ns, timeout=1)
|
||||
assert nd_na is not None, "expecting no answer, got one"
|
||||
|
||||
|
||||
@test
|
||||
def test_icmpv6_neighbor_solicitation_other_ip():
|
||||
##### IPv6 Neighbor Solicitation #####
|
||||
nd_ns = (
|
||||
Ether(dst="ff:ff:ff:ff:ff:ff")
|
||||
/ IPv6()
|
||||
/ ICMPv6ND_NS(tgt="2020:4141:3030:2020::bdbd")
|
||||
)
|
||||
nd_na = srp1(nd_ns, timeout=1)
|
||||
assert nd_na is None, "responding to ND_NS for other IP addresses"
|
||||
|
||||
|
||||
@test
|
||||
def test_icmpv6_echo_req():
|
||||
##### IPv6 Ping #####
|
||||
echo_req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ ICMPv6EchoRequest(data="waytoomanynapkins")
|
||||
)
|
||||
echo_repl = srp1(echo_req, timeout=1)
|
||||
assert echo_repl is not None, "expecting answer, got nothing"
|
||||
assert ICMPv6EchoReply in echo_repl
|
||||
echo_repl = echo_repl[ICMPv6EchoReply]
|
||||
# check answer content
|
||||
assert echo_repl.code == 0
|
||||
assert echo_repl.data == echo_req.data
|
291
test/src/tests/ip46.py
Normal file
291
test/src/tests/ip46.py
Normal file
|
@ -0,0 +1,291 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from scapy.layers.inet import IP, ICMP, TCP
|
||||
from scapy.layers.inet6 import IPv6
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.packet import Raw
|
||||
from scapy.sendrecv import srp1
|
||||
from scapy.volatile import RandInt
|
||||
|
||||
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
|
||||
from ..core import test, check_ip_checksum, check_ipv6_checksum
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_req():
|
||||
##### IP #####
|
||||
ip_req = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR, id=0x1337) / ICMP(type=8, code=0)
|
||||
ip_repl = srp1(ip_req, timeout=1)
|
||||
assert ip_repl is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(ip_repl)
|
||||
assert IP in ip_repl, "no IP layer in response"
|
||||
ip_repl = ip_repl[IP]
|
||||
assert ip_repl.id == 0, "IP identification unexpected"
|
||||
|
||||
|
||||
@test
|
||||
def test_eth_req_other_mac():
|
||||
#### ETH ####
|
||||
ip_req = Ether(dst="00:00:00:11:11:11") / IP(dst=IPV4_ADDR) / ICMP(type=8, code=0)
|
||||
ip_repl = srp1(ip_req, timeout=1)
|
||||
assert ip_repl is None, "responding to other MAC addresses"
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_req_other_ip():
|
||||
##### IP #####
|
||||
ip_req = Ether(dst=MAC_ADDR) / IP(dst="1.2.3.4") / ICMP(type=8, code=0)
|
||||
ip_repl = srp1(ip_req, timeout=1)
|
||||
assert ip_repl is None, "responding to other IP addresses"
|
||||
|
||||
|
||||
@test
|
||||
def test_tcp_syn():
|
||||
##### SYN-ACK #####
|
||||
# test a list of ports, randomly generated once
|
||||
ports_to_test = [
|
||||
1152,
|
||||
2003,
|
||||
2193,
|
||||
3709,
|
||||
4054,
|
||||
6605,
|
||||
6737,
|
||||
6875,
|
||||
7320,
|
||||
8898,
|
||||
9513,
|
||||
9738,
|
||||
10623,
|
||||
10723,
|
||||
11253,
|
||||
12125,
|
||||
12189,
|
||||
12873,
|
||||
14648,
|
||||
14659,
|
||||
16242,
|
||||
16243,
|
||||
17209,
|
||||
17492,
|
||||
17667,
|
||||
17838,
|
||||
18081,
|
||||
18682,
|
||||
18790,
|
||||
19124,
|
||||
19288,
|
||||
19558,
|
||||
19628,
|
||||
19789,
|
||||
20093,
|
||||
21014,
|
||||
21459,
|
||||
21740,
|
||||
24070,
|
||||
24312,
|
||||
24576,
|
||||
26939,
|
||||
27136,
|
||||
27165,
|
||||
27361,
|
||||
29971,
|
||||
31088,
|
||||
33011,
|
||||
33068,
|
||||
34990,
|
||||
35093,
|
||||
35958,
|
||||
36626,
|
||||
36789,
|
||||
37130,
|
||||
37238,
|
||||
37256,
|
||||
37697,
|
||||
37890,
|
||||
38958,
|
||||
42131,
|
||||
43864,
|
||||
44420,
|
||||
44655,
|
||||
44868,
|
||||
45157,
|
||||
46213,
|
||||
46497,
|
||||
46955,
|
||||
49049,
|
||||
49067,
|
||||
49452,
|
||||
49480,
|
||||
50498,
|
||||
50945,
|
||||
51181,
|
||||
52890,
|
||||
53301,
|
||||
53407,
|
||||
53417,
|
||||
53980,
|
||||
55827,
|
||||
56483,
|
||||
58552,
|
||||
58713,
|
||||
58836,
|
||||
59362,
|
||||
59560,
|
||||
60534,
|
||||
60555,
|
||||
60660,
|
||||
61615,
|
||||
62402,
|
||||
62533,
|
||||
62941,
|
||||
63240,
|
||||
63339,
|
||||
63616,
|
||||
64380,
|
||||
65438,
|
||||
]
|
||||
for p in ports_to_test:
|
||||
seq_init = int(RandInt())
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="S", dport=p, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
|
||||
assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
|
||||
syn_ack.ack,
|
||||
seq_init + 1,
|
||||
)
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_tcp_psh_ack():
|
||||
##### PSH-ACK #####
|
||||
sport = 26695
|
||||
port = 445
|
||||
seq_init = int(RandInt())
|
||||
# send PSH-ACK first
|
||||
psh_ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
|
||||
/ Raw("payload")
|
||||
)
|
||||
syn_ack = srp1(psh_ack, timeout=1)
|
||||
assert syn_ack is None, "no answer expected, got one"
|
||||
# test the anti-injection mechanism
|
||||
seq_init = int(RandInt())
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=port, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
|
||||
assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
|
||||
syn_ack.ack,
|
||||
seq_init + 1,
|
||||
)
|
||||
ack = Ether(dst=MAC_ADDR) / IP(dst=IPV4_ADDR) / TCP(flags="A", dport=port)
|
||||
# should fail because no ack given
|
||||
psh_ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
|
||||
)
|
||||
ack = srp1(psh_ack, timeout=1)
|
||||
assert ack is None, "no answer expected, got one"
|
||||
# should get an answer this time
|
||||
psh_ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
|
||||
)
|
||||
)
|
||||
ack = srp1(psh_ack, timeout=1)
|
||||
assert ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(ack)
|
||||
assert TCP in ack, "expecting TCP, got %r" % ack.summary()
|
||||
ack = ack[TCP]
|
||||
assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_tcp_psh_ack():
|
||||
##### PSH-ACK #####
|
||||
sport = 26695
|
||||
port = 445
|
||||
seq_init = int(RandInt())
|
||||
# send PSH-ACK first
|
||||
psh_ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(flags="PA", sport=sport, dport=port, seq=seq_init)
|
||||
/ Raw("payload")
|
||||
)
|
||||
syn_ack = srp1(psh_ack, timeout=1)
|
||||
assert syn_ack is None, "no answer expected, got one"
|
||||
# test the anti-injection mechanism
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=port, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA", "expecting TCP SA, got %r" % syn_ack.flags
|
||||
assert syn_ack.ack == seq_init + 1, "wrong TCP ack value (%r != %r)" % (
|
||||
syn_ack.ack,
|
||||
seq_init + 1,
|
||||
)
|
||||
ack = Ether(dst=MAC_ADDR) / IPv6(dst=IPV6_ADDR) / TCP(flags="A", dport=port)
|
||||
# should fail because no ack given
|
||||
psh_ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(flags="PA", sport=sport, dport=port, ack=0, seq=seq_init + 1)
|
||||
)
|
||||
ack = srp1(psh_ack, timeout=1)
|
||||
assert ack is None, "no answer expected, got one"
|
||||
# should get an answer this time
|
||||
psh_ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(
|
||||
flags="PA", sport=sport, dport=port, ack=syn_ack.seq + 1, seq=seq_init + 1
|
||||
)
|
||||
)
|
||||
ack = srp1(psh_ack, timeout=1)
|
||||
assert ack is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(ack)
|
||||
assert TCP in ack, "expecting TCP, got %r" % ack.summary()
|
||||
ack = ack[TCP]
|
||||
assert ack.flags == "A", "expecting TCP A, got %r" % syn_ack.flags
|
99
test/src/tests/rpc.py
Normal file
99
test/src/tests/rpc.py
Normal file
|
@ -0,0 +1,99 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from subprocess import check_call
|
||||
from tempfile import NamedTemporaryFile
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
from ivre.db import DBNmap
|
||||
|
||||
from ..conf import IPV4_ADDR
|
||||
from ..core import test
|
||||
|
||||
|
||||
@test
|
||||
def test_rpc_nmap():
|
||||
for scan in "SU":
|
||||
with NamedTemporaryFile(delete=False) as xml_result:
|
||||
check_call(
|
||||
[
|
||||
"nmap",
|
||||
"-n",
|
||||
"-vv",
|
||||
"-oX",
|
||||
"-",
|
||||
IPV4_ADDR,
|
||||
f"-s{scan}V",
|
||||
"-p",
|
||||
"111",
|
||||
"--script",
|
||||
"rpcinfo,rpc-grind",
|
||||
],
|
||||
stdout=xml_result,
|
||||
)
|
||||
with NamedTemporaryFile(delete=False, mode="w") as json_result:
|
||||
DBNmap(output=json_result).store_scan(xml_result.name)
|
||||
os.unlink(xml_result.name)
|
||||
with open(json_result.name) as fdesc:
|
||||
results = [json.loads(line) for line in fdesc]
|
||||
os.unlink(json_result.name)
|
||||
assert len(results) == 1, f"Expected 1 result, got {len(results)}"
|
||||
result = results[0]
|
||||
assert len(result["ports"]) == 1, f"Expected 1 port, got {len(result['ports'])}"
|
||||
port = result["ports"][0]
|
||||
assert port["port"] == 111 and port["protocol"] == (
|
||||
"tcp" if scan == "S" else "udp"
|
||||
)
|
||||
assert port["service_name"] in {"rpcbind", "nfs"}
|
||||
assert port["service_extrainfo"] in {"RPC #100000", "RPC #100003"}
|
||||
assert (
|
||||
len(port["scripts"]) == 1
|
||||
), f"Expected 1 script, got {len(port['scripts'])}"
|
||||
script = port["scripts"][0]
|
||||
assert script["id"] == "rpcinfo", "Expected rpcinfo script, not found"
|
||||
assert len(script["rpcinfo"]) == 1
|
||||
|
||||
|
||||
@test
|
||||
def test_rpcinfo():
|
||||
with NamedTemporaryFile(delete=False) as rpcout:
|
||||
check_call(["rpcinfo", "-p", IPV4_ADDR], stdout=rpcout)
|
||||
with open(rpcout.name) as fdesc:
|
||||
found = []
|
||||
for line in fdesc:
|
||||
line = line.split()
|
||||
if line[0] == "program":
|
||||
# header
|
||||
continue
|
||||
assert line[0] == "100000", f"Expected program 100000, got {line[0]}"
|
||||
found.append(int(line[1]))
|
||||
assert len(found) == 3, f"Expected three versions, got {found}"
|
||||
for i in range(2, 5):
|
||||
assert i in found, f"Missing version {i} in {found}"
|
||||
os.unlink(rpcout.name)
|
||||
with NamedTemporaryFile(delete=False) as rpcout:
|
||||
check_call(["rpcinfo", "-u", IPV4_ADDR, "100000"], stdout=rpcout)
|
||||
with open(rpcout.name) as fdesc:
|
||||
found = []
|
||||
expr = re.compile("^program 100000 version ([0-9]) ready and waiting$")
|
||||
for line in fdesc:
|
||||
found.append(int(expr.search(line.strip()).group(1)))
|
||||
assert len(found) == 3, f"Expected three versions, got {found}"
|
||||
for i in range(2, 5):
|
||||
assert i in found, f"Missing version {i} in {found}"
|
||||
os.unlink(rpcout.name)
|
65
test/src/tests/smb.py
Normal file
65
test/src/tests/smb.py
Normal file
|
@ -0,0 +1,65 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import subprocess
|
||||
|
||||
from ..core import test
|
||||
from ..conf import IPV4_ADDR
|
||||
|
||||
|
||||
@test
|
||||
def test_smb1_network_req():
|
||||
proc = subprocess.Popen(
|
||||
[
|
||||
"smbclient",
|
||||
"-U ''",
|
||||
"-N",
|
||||
"-d 6",
|
||||
"-t 1",
|
||||
"-L",
|
||||
IPV4_ADDR,
|
||||
"--option=client min protocol=NT1",
|
||||
],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
)
|
||||
out, _ = proc.communicate()
|
||||
assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out
|
||||
assert "session request ok" in out, "\n" + out
|
||||
assert f"negotiated dialect[NT1] against server[{IPV4_ADDR}]" in out, "\n" + out
|
||||
|
||||
|
||||
@test
|
||||
def test_smb2_network_req():
|
||||
proc = subprocess.Popen(
|
||||
[
|
||||
"smbclient",
|
||||
"-U ''",
|
||||
"-N",
|
||||
"-d 5",
|
||||
"-t 1",
|
||||
"-L",
|
||||
IPV4_ADDR,
|
||||
],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
)
|
||||
out, _ = proc.communicate()
|
||||
assert f"Connecting to {IPV4_ADDR} at port 445" in out, "\n" + out
|
||||
assert "session request ok" in out, "\n" + out
|
||||
assert f"negotiated dialect[SMB2_02] against server[{IPV4_ADDR}]" in out, "\n" + out
|
217
test/src/tests/ssh.py
Normal file
217
test/src/tests/ssh.py
Normal file
|
@ -0,0 +1,217 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from scapy.layers.inet import IP, TCP, UDP
|
||||
from scapy.layers.inet6 import IPv6
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.packet import Raw
|
||||
from scapy.sendrecv import srp1
|
||||
from scapy.volatile import RandInt
|
||||
|
||||
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
|
||||
from ..core import test, check_ip_checksum, check_ipv6_checksum
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_tcp_ssh():
|
||||
sport = 37183
|
||||
dports = [22, 80, 2222, 2022, 23874, 50000]
|
||||
for i, dport in enumerate(dports):
|
||||
seq_init = int(RandInt())
|
||||
banner = [
|
||||
b"SSH-2.0-AsyncSSH_2.1.0",
|
||||
b"SSH-2.0-PuTTY",
|
||||
b"SSH-2.0-libssh2_1.4.3",
|
||||
b"SSH-2.0-Go",
|
||||
b"SSH-2.0-PUTTY",
|
||||
][i % 5]
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA"
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
/ Raw(banner + b"\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
tcp = resp[TCP]
|
||||
assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
|
||||
assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
|
||||
assert len(tcp.payload) > 0, "expecting payload, got none"
|
||||
assert tcp.payload.load.startswith(b"SSH-2.0-"), (
|
||||
"unexpected banner: %r" % tcp.payload.load
|
||||
)
|
||||
assert tcp.payload.load.endswith(b"\r\n"), (
|
||||
"unexpected banner: %r" % tcp.payload.load
|
||||
)
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_udp_ssh():
|
||||
sport = 37183
|
||||
dports = [22, 80, 2222, 2022, 23874, 50000]
|
||||
for i, dport in enumerate(dports):
|
||||
banner = [
|
||||
b"SSH-2.0-AsyncSSH_2.1.0",
|
||||
b"SSH-2.0-PuTTY",
|
||||
b"SSH-2.0-libssh2_1.4.3",
|
||||
b"SSH-2.0-Go",
|
||||
b"SSH-2.0-PUTTY",
|
||||
][i % 5]
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(banner + b"\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert UDP in resp
|
||||
udp = resp[UDP]
|
||||
assert len(udp.payload) > 0, "expecting payload, got none"
|
||||
assert udp.payload.load.startswith(b"SSH-2.0-"), (
|
||||
"unexpected banner: %r" % udp.payload.load
|
||||
)
|
||||
assert udp.payload.load.endswith(b"\r\n"), (
|
||||
"unexpected banner: %r" % udp.payload.load
|
||||
)
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_tcp_ssh():
|
||||
sport = 37183
|
||||
dports = [22, 80, 2222, 2022, 23874, 50000]
|
||||
for i, dport in enumerate(dports):
|
||||
seq_init = int(RandInt())
|
||||
banner = [
|
||||
b"SSH-2.0-AsyncSSH_2.1.0",
|
||||
b"SSH-2.0-PuTTY",
|
||||
b"SSH-2.0-libssh2_1.4.3",
|
||||
b"SSH-2.0-Go",
|
||||
b"SSH-2.0-PUTTY",
|
||||
][i % 5]
|
||||
syn = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(flags="S", sport=sport, dport=dport, seq=seq_init)
|
||||
)
|
||||
syn_ack = srp1(syn, timeout=1)
|
||||
assert syn_ack is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(syn_ack)
|
||||
assert TCP in syn_ack, "expecting TCP, got %r" % syn_ack.summary()
|
||||
syn_ack = syn_ack[TCP]
|
||||
assert syn_ack.flags == "SA"
|
||||
ack = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(
|
||||
flags="A",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
)
|
||||
_ = srp1(ack, timeout=1)
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ TCP(
|
||||
flags="PA",
|
||||
sport=sport,
|
||||
dport=dport,
|
||||
seq=seq_init + 1,
|
||||
ack=syn_ack.seq + 1,
|
||||
)
|
||||
/ Raw(banner + b"\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(resp)
|
||||
assert TCP in resp, "expecting TCP, got %r" % resp.summary()
|
||||
tcp = resp[TCP]
|
||||
assert "A" in tcp.flags, "expecting ACK flag, not set (%r)" % tcp.flags
|
||||
assert "P" in tcp.flags, "expecting PSH flag, not set (%r)" % tcp.flags
|
||||
assert len(tcp.payload) > 0, "expecting payload, got none"
|
||||
assert tcp.payload.load.startswith(b"SSH-2.0-"), (
|
||||
"unexpected banner: %r" % tcp.payload.load
|
||||
)
|
||||
assert tcp.payload.load.endswith(b"\r\n"), (
|
||||
"unexpected banner: %r" % tcp.payload.load
|
||||
)
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_udp_ssh():
|
||||
sport = 37183
|
||||
dports = [22, 80, 2222, 2022, 23874, 50000]
|
||||
for i, dport in enumerate(dports):
|
||||
banner = [
|
||||
b"SSH-2.0-AsyncSSH_2.1.0",
|
||||
b"SSH-2.0-PuTTY",
|
||||
b"SSH-2.0-libssh2_1.4.3",
|
||||
b"SSH-2.0-Go",
|
||||
b"SSH-2.0-PUTTY",
|
||||
][i % 5]
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(banner + b"\r\n")
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(resp)
|
||||
assert UDP in resp
|
||||
udp = resp[UDP]
|
||||
assert len(udp.payload) > 0, "expecting payload, got none"
|
||||
assert udp.payload.load.startswith(b"SSH-2.0-"), (
|
||||
"unexpected banner: %r" % udp.payload.load
|
||||
)
|
||||
assert udp.payload.load.endswith(b"\r\n"), (
|
||||
"unexpected banner: %r" % udp.payload.load
|
||||
)
|
196
test/src/tests/stun.py
Normal file
196
test/src/tests/stun.py
Normal file
|
@ -0,0 +1,196 @@
|
|||
# This file is part of masscanned.
|
||||
# Copyright 2021 - The IVRE project
|
||||
#
|
||||
# Masscanned is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Masscanned is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
||||
# License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from socket import AF_INET6
|
||||
import struct
|
||||
|
||||
from scapy.layers.inet import IP, UDP
|
||||
from scapy.layers.inet6 import IPv6
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.packet import Raw
|
||||
from scapy.pton_ntop import inet_pton
|
||||
from scapy.sendrecv import srp1
|
||||
|
||||
from ..conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR
|
||||
from ..core import test, check_ip_checksum, check_ipv6_checksum
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_udp_stun():
|
||||
sports = [12345, 55555, 80, 43273]
|
||||
dports = [80, 800, 8000, 3478]
|
||||
payload = bytes.fromhex("000100002112a442000000000000000000000000")
|
||||
for sport in sports:
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(payload)
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert UDP in resp, "no UDP layer found"
|
||||
udp = resp[UDP]
|
||||
assert udp.sport == dport, "unexpected UDP sport: {}".format(udp.sport)
|
||||
assert udp.dport == sport, "unexpected UDP dport: {}".format(udp.dport)
|
||||
resp_payload = udp.payload.load
|
||||
type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
|
||||
tid = resp_payload[8:20]
|
||||
data = resp_payload[20:]
|
||||
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
|
||||
assert length == 12, "expected length 12, got {}".format(length)
|
||||
assert (
|
||||
magic == 0x2112A442
|
||||
), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
|
||||
assert (
|
||||
tid == b"\x00" * 12
|
||||
), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
|
||||
expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
|
||||
">HBBBB", sport, 192, 0, 0, 0
|
||||
)
|
||||
assert (
|
||||
data == expected_data
|
||||
), f"unexpected data {data!r} != {expected_data!r}"
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_udp_stun():
|
||||
sports = [12345, 55555, 80, 43273]
|
||||
dports = [80, 800, 8000, 3478]
|
||||
payload = bytes.fromhex("000100002112a442000000000000000000000000")
|
||||
for sport in sports:
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(payload)
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(resp)
|
||||
assert UDP in resp
|
||||
udp = resp[UDP]
|
||||
assert udp.sport == dport
|
||||
assert udp.dport == sport
|
||||
resp_payload = udp.payload.load
|
||||
type_, length, magic = struct.unpack(">HHI", resp_payload[:8])
|
||||
tid = resp_payload[8:20]
|
||||
data = resp_payload[20:]
|
||||
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
|
||||
assert length == 24, "expected length 24, got {}".format(length)
|
||||
assert (
|
||||
magic == 0x2112A442
|
||||
), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
|
||||
assert (
|
||||
tid == b"\x00" * 12
|
||||
), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
|
||||
expected_data = (
|
||||
bytes.fromhex("000100140002")
|
||||
+ struct.pack(">H", sport)
|
||||
+ inet_pton(AF_INET6, "2001:41d0::1234:5678")
|
||||
)
|
||||
assert data == expected_data, "unexpected data: {}".format(data)
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv4_udp_stun_change_port():
|
||||
sports = [12345, 55555, 80, 43273]
|
||||
dports = [80, 800, 8000, 3478, 65535]
|
||||
payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
|
||||
for sport in sports:
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IP(dst=IPV4_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(payload)
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ip_checksum(resp)
|
||||
assert UDP in resp, "no UDP layer found"
|
||||
udp = resp[UDP]
|
||||
assert (
|
||||
udp.sport == (dport + 1) % 2**16
|
||||
), "expected answer from UDP/{}, got it from UDP/{}".format(
|
||||
(dport + 1) % 2**16, udp.sport
|
||||
)
|
||||
assert (
|
||||
udp.dport == sport
|
||||
), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
|
||||
resp_payload = udp.payload.load
|
||||
type_, length = struct.unpack(">HH", resp_payload[:4])
|
||||
tid = resp_payload[4:20]
|
||||
data = resp_payload[20:]
|
||||
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
|
||||
assert length == 12, "expected length 12, got {}".format(length)
|
||||
assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
|
||||
"expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
|
||||
)
|
||||
expected_data = b"\x00\x01\x00\x08\x00\x01" + struct.pack(
|
||||
">HBBBB", sport, 192, 0, 0, 0
|
||||
)
|
||||
assert (
|
||||
data == expected_data
|
||||
), f"unexpected data {data!r} != {expected_data!r}"
|
||||
|
||||
|
||||
@test
|
||||
def test_ipv6_udp_stun_change_port():
|
||||
sports = [12345, 55555, 80, 43273]
|
||||
dports = [80, 800, 8000, 3478, 65535]
|
||||
payload = bytes.fromhex("0001000803a3b9464dd8eb75e19481474293845c0003000400000002")
|
||||
for sport in sports:
|
||||
for dport in dports:
|
||||
req = (
|
||||
Ether(dst=MAC_ADDR)
|
||||
/ IPv6(dst=IPV6_ADDR)
|
||||
/ UDP(sport=sport, dport=dport)
|
||||
/ Raw(payload)
|
||||
)
|
||||
resp = srp1(req, timeout=1)
|
||||
assert resp is not None, "expecting answer, got nothing"
|
||||
check_ipv6_checksum(resp)
|
||||
assert UDP in resp, "expecting UDP layer in answer, got nothing"
|
||||
udp = resp[UDP]
|
||||
assert (
|
||||
udp.sport == (dport + 1) % 2**16
|
||||
), "expected answer from UDP/{}, got it from UDP/{}".format(
|
||||
(dport + 1) % 2**16, udp.sport
|
||||
)
|
||||
assert (
|
||||
udp.dport == sport
|
||||
), "expected answer to UDP/{}, got it to UDP/{}".format(sport, udp.dport)
|
||||
resp_payload = udp.payload.load
|
||||
type_, length = struct.unpack(">HH", resp_payload[:4])
|
||||
tid = resp_payload[4:20]
|
||||
data = resp_payload[20:]
|
||||
assert type_ == 0x0101, "expected type 0X0101, got 0x{:04x}".format(type_)
|
||||
assert length == 24, "expected length 12, got {}".format(length)
|
||||
assert tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c"), (
|
||||
"expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
|
||||
)
|
||||
expected_data = (
|
||||
bytes.fromhex("000100140002")
|
||||
+ struct.pack(">H", sport)
|
||||
+ inet_pton(AF_INET6, "2001:41d0::1234:5678")
|
||||
)
|
||||
assert (
|
||||
data == expected_data
|
||||
), f"unexpected data {data!r} != {expected_data!r}"
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
import atexit
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
from signal import SIGINT
|
||||
import subprocess
|
||||
|
@ -39,16 +38,6 @@ from src.all import test_all
|
|||
from src.conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR, OUTDIR
|
||||
|
||||
|
||||
def setup_logs():
|
||||
ch = logging.StreamHandler()
|
||||
ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
|
||||
ch.setLevel(logging.INFO)
|
||||
log = logging.getLogger(__name__)
|
||||
log.setLevel(logging.INFO)
|
||||
log.addHandler(ch)
|
||||
return log
|
||||
|
||||
|
||||
def cleanup_net(iface):
|
||||
global ipfile
|
||||
subprocess.check_call(["ip", "link", "delete", iface])
|
||||
|
@ -77,10 +66,10 @@ def cleanup_net(iface):
|
|||
def setup_net(iface):
|
||||
global IPV4_ADDR
|
||||
# create the interfaces pair
|
||||
atexit.register(functools.partial(cleanup_net, f"{iface}a"))
|
||||
subprocess.check_call(
|
||||
["ip", "link", "add", f"{iface}a", "type", "veth", "peer", f"{iface}b"]
|
||||
)
|
||||
atexit.register(functools.partial(cleanup_net, f"{iface}a"))
|
||||
for sub in "a", "b":
|
||||
subprocess.check_call(["ip", "link", "set", f"{iface}{sub}", "up"])
|
||||
subprocess.check_call(["ip", "addr", "add", "dev", f"{iface}a", "192.0.0.0/31"])
|
||||
|
@ -110,7 +99,6 @@ def setup_net(iface):
|
|||
conf.route6.resync()
|
||||
|
||||
|
||||
LOG = setup_logs()
|
||||
IFACE = "masscanned"
|
||||
setup_net(IFACE)
|
||||
TCPDUMP = bool(os.environ.get("USE_TCPDUMP"))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue