Tests: use a veth pair of interfaces rather than a tap

This will allow the use of "regular" network tools and scanners (nc /
socat, Nmap, Masscan).
This commit is contained in:
Pierre Lalet 2021-12-19 18:45:45 +01:00
parent 719101110f
commit 951b5a0ba0
3 changed files with 157 additions and 101 deletions

View file

@ -16,6 +16,8 @@
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
import atexit
import functools
import logging
import os
import subprocess
@ -31,7 +33,6 @@ else:
HAS_IVRE = True
from scapy.config import conf
from scapy.interfaces import resolve_iface
from scapy.layers.tuntap import TunTapInterface
from src.all import test_all
from src.conf import IPV4_ADDR, IPV6_ADDR, MAC_ADDR, OUTDIR
@ -47,8 +48,65 @@ def setup_logs():
return log
def cleanup_net(iface):
subprocess.check_call(["ip", "link", "delete", iface])
subprocess.check_call(
[
"iptables",
"-D",
"INPUT",
"-i",
iface,
"-m",
"state",
"--state",
"ESTABLISHED",
"-j",
"ACCEPT",
]
)
subprocess.check_call(["iptables", "-D", "INPUT", "-i", iface, "-j", "DROP"])
def setup_net(iface):
global IPV4_ADDR
# create the interfaces pair
subprocess.check_call(
["ip", "link", "add", f"{iface}a", "type", "veth", "peer", f"{iface}b"]
)
atexit.register(functools.partial(cleanup_net, f"{iface}a"))
for sub in "a", "b":
subprocess.check_call(["ip", "link", "set", f"{iface}{sub}", "up"])
subprocess.check_call(["ip", "addr", "add", "dev", f"{iface}a", "192.0.0.0/31"])
subprocess.check_call(
["ip", "addr", "add", "dev", f"{iface}a", "2001:41d0::1234:5678/96"]
)
subprocess.check_call(["ip", "route", "add", "1.2.3.4/32", "via", IPV4_ADDR])
# prevent problems between raw scanners (Scapy, Nmap, Masscan) and
# the host IP stack
subprocess.check_call(
[
"iptables",
"-A",
"INPUT",
"-i",
f"{iface}a",
"-m",
"state",
"--state",
"ESTABLISHED",
"-j",
"ACCEPT",
]
)
subprocess.check_call(["iptables", "-A", "INPUT", "-i", f"{iface}a", "-j", "DROP"])
conf.route.resync()
conf.route6.resync()
LOG = setup_logs()
IFACE = "tap0"
IFACE = "masscanned"
setup_net(IFACE)
TCPDUMP = bool(os.environ.get("USE_TCPDUMP"))
if HAS_IVRE:
ZEEK_PASSIVERECON = bool(os.environ.get("USE_ZEEK"))
@ -63,21 +121,18 @@ with NamedTemporaryFile(delete=False, mode="w") as ipfile:
ipfile.write(f"{IPV6_ADDR}\n")
# create test interface
tap = TunTapInterface(IFACE)
conf.iface = resolve_iface(IFACE)
# set interface
subprocess.check_call(["ip", "addr", "add", "dev", IFACE, "192.0.0.0/31"])
subprocess.check_call(["ip", "addr", "add", "dev", IFACE, "2001:41d0::1234:5678/96"])
subprocess.check_call(["ip", "link", "set", IFACE, "up"])
subprocess.check_call(["ip", "route", "add", "1.2.3.4/32", "via", IPV4_ADDR])
conf.route.resync()
conf.route6.resync()
conf.iface = resolve_iface(f"{IFACE}a")
# start capture
if TCPDUMP:
tcpdump = subprocess.Popen(
["tcpdump", "-enli", IFACE, "-w", os.path.join(OUTDIR, "test_capture.pcap")]
[
"tcpdump",
"-enli",
f"{IFACE}a",
"-w",
os.path.join(OUTDIR, "test_capture.pcap"),
]
)
if ZEEK_PASSIVERECON:
zeek = subprocess.Popen(
@ -86,7 +141,7 @@ if ZEEK_PASSIVERECON:
"-C",
"-b",
"-i",
IFACE,
f"{IFACE}a",
os.path.join(
guess_prefix("zeek"),
"ivre",
@ -103,7 +158,7 @@ if ZEEK_PASSIVERECON:
)
if P0F:
p0f = subprocess.Popen(
["p0f", "-i", IFACE, "-o", os.path.join(OUTDIR, "p0f_log.txt")],
["p0f", "-i", f"{IFACE}a", "-o", os.path.join(OUTDIR, "p0f_log.txt")],
stdout=open(os.path.join(OUTDIR, "p0f.stdout"), "w"),
stderr=open(os.path.join(OUTDIR, "p0f.stderr"), "w"),
)
@ -113,7 +168,7 @@ masscanned = subprocess.Popen(
"./target/debug/masscanned",
"-vvvvv",
"-i",
IFACE,
f"{IFACE}b",
"-f",
ipfile.name,
"-a",
@ -128,7 +183,7 @@ masscanned = subprocess.Popen(
sleep(1)
try:
result = test_all(tap)
result = test_all()
except AssertionError:
result = -1