Proto/RPC: support RPC over UDP

This commit is contained in:
Pierre Lalet 2021-12-22 18:27:02 +01:00
parent f28e0770f6
commit 166f121d76
3 changed files with 122 additions and 53 deletions

View file

@ -17,6 +17,7 @@
import json
import logging
import os
import re
from socket import AF_INET6
from subprocess import check_call
import struct
@ -1223,40 +1224,45 @@ def test_ipv4_tcp_ghost():
@test
def test_rpc_nmap():
with NamedTemporaryFile(delete=False) as xml_result:
check_call(
[
"nmap",
"-n",
"-vv",
"-oX",
"-",
IPV4_ADDR,
"-sSV",
"-p",
"111",
"--script",
"rpcinfo,rpc-grind",
],
stdout=xml_result,
for scan in "SU":
with NamedTemporaryFile(delete=False) as xml_result:
check_call(
[
"nmap",
"-n",
"-vv",
"-oX",
"-",
IPV4_ADDR,
f"-s{scan}V",
"-p",
"111",
"--script",
"rpcinfo,rpc-grind",
],
stdout=xml_result,
)
with NamedTemporaryFile(delete=False, mode="w") as json_result:
DBNmap(output=json_result).store_scan(xml_result.name)
os.unlink(xml_result.name)
with open(json_result.name) as fdesc:
results = [json.loads(line) for line in fdesc]
os.unlink(json_result.name)
assert len(results) == 1, f"Expected 1 result, got {len(results)}"
result = results[0]
assert len(result["ports"]) == 1, f"Expected 1 port, got {len(result['ports'])}"
port = result["ports"][0]
assert port["port"] == 111 and port["protocol"] == (
"tcp" if scan == "S" else "udp"
)
with NamedTemporaryFile(delete=False, mode="w") as json_result:
DBNmap(output=json_result).store_scan(xml_result.name)
os.unlink(xml_result.name)
with open(json_result.name) as fdesc:
results = [json.loads(line) for line in fdesc]
os.unlink(json_result.name)
assert len(results) == 1, f"Expected 1 result, got {len(results)}"
result = results[0]
assert len(result["ports"]) == 1, f"Expected 1 port, got {len(result['ports'])}"
port = result["ports"][0]
assert port["port"] == 111 and port["protocol"] == "tcp"
assert port["service_name"] in {"rpcbind", "nfs"}
assert port["service_extrainfo"] in {"RPC #100000", "RPC #100003"}
assert len(port["scripts"]) == 1, f"Expected 1 script, got {len(port['scripts'])}"
script = port["scripts"][0]
assert script["id"] == "rpcinfo", "Expected rpcinfo script, not found"
assert len(script["rpcinfo"]) == 1
assert port["service_name"] in {"rpcbind", "nfs"}
assert port["service_extrainfo"] in {"RPC #100000", "RPC #100003"}
assert (
len(port["scripts"]) == 1
), f"Expected 1 script, got {len(port['scripts'])}"
script = port["scripts"][0]
assert script["id"] == "rpcinfo", "Expected rpcinfo script, not found"
assert len(script["rpcinfo"]) == 1
@test
@ -1276,6 +1282,17 @@ def test_rpcinfo():
for i in range(2, 5):
assert i in found, f"Missing version {i} in {found}"
os.unlink(rpcout.name)
with NamedTemporaryFile(delete=False) as rpcout:
check_call(["rpcinfo", "-u", IPV4_ADDR, "100000"], stdout=rpcout)
with open(rpcout.name) as fdesc:
found = []
expr = re.compile("^program 100000 version ([0-9]) ready and waiting$")
for line in fdesc:
found.append(int(expr.search(line.strip()).group(1)))
assert len(found) == 3, f"Expected three versions, got {found}"
for i in range(2, 5):
assert i in found, f"Missing version {i} in {found}"
os.unlink(rpcout.name)
def test_all():