From 5a52dcace755d06c5a3f8d71801b9ff47058908c Mon Sep 17 00:00:00 2001 From: Pierre Lalet Date: Sun, 19 Dec 2021 23:25:46 +0100 Subject: [PATCH] Tests: add Nmap test for RPC --- .github/workflows/test.yml | 3 +++ test/requirements.txt | 1 + test/src/all.py | 43 ++++++++++++++++++++++++++++++++++++++ test/test_masscanned.py | 5 +++++ 4 files changed, 52 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 41f2ffe..0fb10e8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -83,6 +83,9 @@ jobs: - name: Install linting tools run: sudo pip install -U flake8 black + - name: Install Nmap + run: sudo apt-get -q update && sudo apt-get -qy install nmap + - name: Run black run: black -t py36 --check test/test_masscanned.py test/src/ diff --git a/test/requirements.txt b/test/requirements.txt index ebda39b..fd8f3e8 100644 --- a/test/requirements.txt +++ b/test/requirements.txt @@ -1,2 +1,3 @@ +ivre scapy requests diff --git a/test/src/all.py b/test/src/all.py index c4b740e..f8f140a 100644 --- a/test/src/all.py +++ b/test/src/all.py @@ -14,11 +14,16 @@ # You should have received a copy of the GNU General Public License # along with Masscanned. If not, see . +import json import logging +import os from socket import AF_INET6 +from subprocess import check_call import struct +from tempfile import NamedTemporaryFile import zlib +from ivre.db import DBNmap from scapy.compat import raw from scapy.data import ETHER_BROADCAST from scapy.layers.inet import ICMP, IP, TCP, UDP @@ -1166,6 +1171,44 @@ def test_ipv4_tcp_ghost(): ) +@test +def test_rpc_nmap(): + with NamedTemporaryFile(delete=False) as xml_result: + check_call( + [ + "nmap", + "-n", + "-vv", + "-oX", + "-", + IPV4_ADDR, + "-sSV", + "-p", + "111", + "--script", + "rpcinfo,rpc-grind", + ], + stdout=xml_result, + ) + with NamedTemporaryFile(delete=False, mode="w") as json_result: + DBNmap(output=json_result).store_scan(xml_result.name) + os.unlink(xml_result.name) + with open(json_result.name) as fdesc: + results = [json.loads(line) for line in fdesc] + os.unlink(json_result.name) + assert len(results) == 1, f"Expected 1 result, got {len(results)}" + result = results[0] + assert len(result["ports"]) == 1, f"Expected 1 port, got {len(result['ports'])}" + port = result["ports"][0] + assert port["port"] == 111 and port["protocol"] == "tcp" + assert port["service_name"] in {"rpcbind", "nfs"} + assert port["service_extrainfo"] in {"RPC #100000", "RPC #100003"} + assert len(port["scripts"]) == 1, f"Expected 1 script, got {len(port['scripts'])}" + script = port["scripts"][0] + assert script["id"] == "rpcinfo", "Expected rpcinfo script, not found" + assert len(script["rpcinfo"]) == 1 + + def test_all(): global TESTS # execute tests diff --git a/test/test_masscanned.py b/test/test_masscanned.py index de4548a..54fee57 100755 --- a/test/test_masscanned.py +++ b/test/test_masscanned.py @@ -49,6 +49,7 @@ def setup_logs(): def cleanup_net(iface): + global ipfile subprocess.check_call(["ip", "link", "delete", iface]) subprocess.check_call( [ @@ -66,6 +67,10 @@ def cleanup_net(iface): ] ) subprocess.check_call(["iptables", "-D", "INPUT", "-i", iface, "-j", "DROP"]) + try: + os.unlink(ipfile.name) + except NameError: + pass def setup_net(iface):