diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..5c16b63
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,82 @@
+# This file is part of masscanned.
+# Copyright 2021 - The IVRE project
+#
+# Masscanned is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Masscanned is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Masscanned. If not, see .
+
+name: Build masscanned
+
+on: [push, pull_request]
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ steps:
+
+ - name: Git checkout
+ uses: actions/checkout@v2
+
+ - name: Get Rust toolchain
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: stable
+
+ - name: Run cargo build
+ uses: actions-rs/cargo@v1
+ with:
+ command: build
+
+ - name: Run cargo test
+ uses: actions-rs/cargo@v1
+ with:
+ command: test
+
+ - name: Create build archive
+ run: tar cf masscanned.tar target/debug/masscanned
+
+ - name: Upload binary
+ uses: actions/upload-artifact@v2
+ with:
+ name: masscanned.tar
+ path: masscanned.tar
+
+ test:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+
+ - name: Git checkout
+ uses: actions/checkout@v2
+
+ - name: Get binary
+ uses: actions/download-artifact@v2
+ with:
+ name: masscanned.tar
+
+ - name: Extract build archive
+ run: tar xf masscanned.tar
+
+ - name: Use Python
+ uses: actions/setup-python@v2
+ with:
+ python-version: 3.9
+
+ - name: Install dependencies
+ run: sudo pip install -r test/requirements.txt
+
+ - name: Run tests
+ run: sudo python test/test_masscanned.py
+
+ - name: Display logs
+ run: echo STDOUT; cat test/res/masscanned.stdout && echo && echo STDERR && cat test/res/masscanned.stderr
+ if: failure()
diff --git a/.gitignore b/.gitignore
index f07cd65..3feaee7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,8 @@ Cargo.lock
# Vim temporary files
*.swp
*.swo
+# Emacs temporary files
+*~
*__pycache__*
test/res/*
diff --git a/README.md b/README.md
index 0dd352b..95d2a9f 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,5 @@
+[](https://github.com/ivre/masscanned/actions/workflows/test.yml?branch=master)
+
# Masscanned
**Masscanned** (name inspired, of course, by [masscan](https://github.com/robertdavidgraham/masscan))
diff --git a/test/requirements.txt b/test/requirements.txt
new file mode 100644
index 0000000..ebda39b
--- /dev/null
+++ b/test/requirements.txt
@@ -0,0 +1,2 @@
+scapy
+requests
diff --git a/test/src/all.py b/test/src/all.py
index e031228..bd121ed 100644
--- a/test/src/all.py
+++ b/test/src/all.py
@@ -29,20 +29,22 @@ LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
LOG.addHandler(ch)
-tests = list()
+tests = []
+errors = []
# decorator to automatically add a function to tests
def test(f):
+ global errors, tests
OK = "\033[1mOK\033[0m"
KO = "\033[1m\033[1;%dmKO\033[0m" % 31
- global tests
fname = f.__name__.ljust(50, '.')
def w(iface):
try:
f(iface)
LOG.info("{}{}".format(fname, OK))
except AssertionError as e:
- LOG.info("{}{}: {}".format(fname, KO, e))
+ LOG.error("{}{}: {}".format(fname, KO, e))
+ errors.append(fname)
tests.append(w)
return w
@@ -80,7 +82,7 @@ def check_ipv6_checksum(pkt):
@test
def test_arp_req(iface):
##### ARP #####
- arp_req = Ether()/ARP(psrc='192.0.0.2', pdst=IPV4_ADDR)
+ arp_req = Ether(dst=ETHER_BROADCAST)/ARP(psrc='192.0.0.2', pdst=IPV4_ADDR)
arp_repl = iface.sr1(arp_req, timeout=1)
assert(arp_repl is not None), "expecting answer, got nothing"
assert(ARP in arp_repl), "no ARP layer found"
@@ -425,7 +427,8 @@ def test_ipv4_udp_stun(iface):
assert(length == 12), "expected length 12, got {}".format(length)
assert(magic == 0x2112a442), "expected magic 0x2112a442, got 0x{:08x}".format(magic)
assert(tid == b'\x00' * 12), "expected tid 0x000000000000000000000000, got {:x}".format(tid)
- assert(data == bytes.fromhex("000100080001") + struct.pack(">H", sport) + bytes.fromhex("00000000")), "unexpected data"
+ assert(data[:8] == bytes.fromhex("000100080001") + struct.pack(">H", sport)), f"unexpected data {data!r}"
+ assert(len(data) == 12), f"unexpected data {data!r}"
@test
def test_ipv6_udp_stun(iface):
@@ -474,7 +477,8 @@ def test_ipv4_udp_stun_change_port(iface):
assert(type_ == 0x0101), "expected type 0X0101, got 0x{:04x}".format(type_)
assert(length == 12), "expected length 12, got {}".format(length)
assert(tid == bytes.fromhex("03a3b9464dd8eb75e19481474293845c")), "expected tid 0x03a3b9464dd8eb75e19481474293845c, got %r" % tid
- assert(data == bytes.fromhex("000100080001") + struct.pack(">H", sport) + bytes.fromhex("00000000")), "unexpected data"
+ assert(data[:8] == bytes.fromhex("000100080001") + struct.pack(">H", sport)), f"unexpected data {data!r}"
+ assert(len(data) == 12), f"unexpected data {data!r}"
@test
def test_ipv6_udp_stun_change_port(iface):
@@ -591,3 +595,4 @@ def test_all(iface):
# execute tests
for t in tests:
t(iface)
+ return len(errors)
diff --git a/test/test_masscanned.py b/test/test_masscanned.py
index 7384005..49a99ef 100755
--- a/test/test_masscanned.py
+++ b/test/test_masscanned.py
@@ -16,63 +16,79 @@
# You should have received a copy of the GNU General Public License
# along with Masscanned. If not, see .
-from scapy.all import *
-from time import sleep
-from tempfile import _get_candidate_names as gen_tmp_filename
-from tempfile import gettempdir
-import subprocess
import logging
-import sys
import os
+import subprocess
+import sys
+from time import sleep
+from tempfile import NamedTemporaryFile
+
+from scapy.config import conf
+from scapy.interfaces import resolve_iface
+from scapy.layers.tuntap import TunTapInterface
from src.all import test_all
from src.conf import *
-# if args in CLI, they are passed to masscanned
-if len(sys.argv) > 1:
- args = " ".join(sys.argv[1:])
-else:
- args = ""
+def setup_logs():
+ ch = logging.StreamHandler()
+ ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
+ ch.setLevel(logging.INFO)
+ log = logging.getLogger(__name__)
+ log.setLevel(logging.INFO)
+ log.addHandler(ch)
+ return log
-fmt = logging.Formatter("%(levelname)s\t%(message)s")
-ch = logging.StreamHandler()
-ch.setFormatter(fmt)
-ch.setLevel(logging.INFO)
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.INFO)
-LOG.addHandler(ch)
-
-conf.iface = 'tap0'
+LOG = setup_logs()
+IFACE = "tap0"
conf.verb = 0
# prepare configuration file for masscanned
-ipfile = os.path.join(gettempdir(), next(gen_tmp_filename()))
-with open(ipfile, "w") as f:
- f.write("{}\n".format(IPV4_ADDR))
- f.write("{}\n".format(IPV6_ADDR))
+with NamedTemporaryFile(delete=False, mode="w") as ipfile:
+ ipfile.write(f"{IPV4_ADDR}\n")
+ ipfile.write(f"{IPV6_ADDR}\n")
# create test interface
-tap = TunTapInterface(resolve_iface(conf.iface))
+tap = TunTapInterface(IFACE)
+conf.iface = resolve_iface(IFACE)
# set interface
-subprocess.run("ip a a dev {} 192.0.0.2".format(conf.iface), shell=True)
-subprocess.run("ip link set {} up".format(conf.iface), shell=True)
+subprocess.check_call(["ip", "addr", "add", "dev", IFACE, "192.0.0.2"])
+subprocess.check_call(["ip", "link", "set", IFACE, "up"])
# start capture
-tcpdump = subprocess.Popen("tcpdump -enli {} -w {}".format(conf.iface, os.path.join(OUTDIR, "test_capture.pcap")), shell=True,
- stdin=None, stdout=None, stderr=None, close_fds=True)
+tcpdump = subprocess.Popen(
+ ["tcpdump", "-enli", IFACE, "-w", os.path.join(OUTDIR, "test_capture.pcap")]
+)
# run masscanned
-masscanned = subprocess.Popen("RUST_BACKTRACE=1 ./target/debug/masscanned -vvvvv -i {} -f {} -a {} {}".format(conf.iface, ipfile, MAC_ADDR, args), shell=True,
- stdin=None, stdout=open("test/res/masscanned.stdout", "w"), stderr=open("test/res/masscanned.stderr", "w"), close_fds=True)
+masscanned = subprocess.Popen(
+ [
+ "./target/debug/masscanned",
+ "-vvvvv",
+ "-i",
+ IFACE,
+ "-f",
+ ipfile.name,
+ "-a",
+ MAC_ADDR,
+ ]
+ # if args in CLI, they are passed to masscanned
+ + sys.argv[1:],
+ env=dict(os.environ, RUST_BACKTRACE="1"),
+ stdout=open("test/res/masscanned.stdout", "w"),
+ stderr=open("test/res/masscanned.stderr", "w"),
+)
sleep(1)
try:
- test_all(tap)
+ result = test_all(tap)
except AssertionError:
- pass
+ result = -1
# terminate masscanned
masscanned.kill()
+masscanned.wait()
# terminate capture
-sleep(2)
tcpdump.kill()
+tcpdump.wait()
+sys.exit(result)