mirror of
https://github.com/ivre/masscanned.git
synced 2025-10-02 06:38:21 +00:00
Clean-up Python test script
This commit is contained in:
parent
12aa60b848
commit
9fb050188d
1 changed files with 44 additions and 31 deletions
|
@ -16,56 +16,68 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with Masscanned. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from scapy.all import *
|
||||
from time import sleep
|
||||
from tempfile import _get_candidate_names as gen_tmp_filename
|
||||
from tempfile import gettempdir
|
||||
import subprocess
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from time import sleep
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from scapy.config import conf
|
||||
from scapy.interfaces import resolve_iface
|
||||
from scapy.layers.tuntap import TunTapInterface
|
||||
|
||||
from src.all import test_all
|
||||
from src.conf import *
|
||||
|
||||
# if args in CLI, they are passed to masscanned
|
||||
if len(sys.argv) > 1:
|
||||
args = " ".join(sys.argv[1:])
|
||||
else:
|
||||
args = ""
|
||||
|
||||
fmt = logging.Formatter("%(levelname)s\t%(message)s")
|
||||
def setup_logs():
|
||||
ch = logging.StreamHandler()
|
||||
ch.setFormatter(fmt)
|
||||
ch.setFormatter(logging.Formatter("%(levelname)s\t%(message)s"))
|
||||
ch.setLevel(logging.INFO)
|
||||
LOG = logging.getLogger(__name__)
|
||||
LOG.setLevel(logging.INFO)
|
||||
LOG.addHandler(ch)
|
||||
log = logging.getLogger(__name__)
|
||||
log.setLevel(logging.INFO)
|
||||
log.addHandler(ch)
|
||||
return log
|
||||
|
||||
LOG = setup_logs()
|
||||
IFACE = "tap0"
|
||||
|
||||
conf.verb = 0
|
||||
|
||||
# prepare configuration file for masscanned
|
||||
ipfile = os.path.join(gettempdir(), next(gen_tmp_filename()))
|
||||
with open(ipfile, "w") as f:
|
||||
f.write("{}\n".format(IPV4_ADDR))
|
||||
f.write("{}\n".format(IPV6_ADDR))
|
||||
with NamedTemporaryFile(delete=False, mode="w") as ipfile:
|
||||
ipfile.write(f"{IPV4_ADDR}\n")
|
||||
ipfile.write(f"{IPV6_ADDR}\n")
|
||||
|
||||
# create test interface
|
||||
tap = TunTapInterface(IFACE)
|
||||
conf.iface = resolve_iface(IFACE)
|
||||
|
||||
# set interface
|
||||
subprocess.run("ip a a dev {} 192.0.0.2".format(conf.iface), shell=True)
|
||||
subprocess.run("ip link set {} up".format(conf.iface), shell=True)
|
||||
subprocess.check_call(["ip", "addr", "add", "dev", IFACE, "192.0.0.2"])
|
||||
subprocess.check_call(["ip", "link", "set", IFACE, "up"])
|
||||
|
||||
# start capture
|
||||
tcpdump = subprocess.Popen("tcpdump -enli {} -w {}".format(conf.iface, os.path.join(OUTDIR, "test_capture.pcap")), shell=True,
|
||||
stdin=None, stdout=None, stderr=None, close_fds=True)
|
||||
tcpdump = subprocess.Popen(
|
||||
["tcpdump", "-enli", IFACE, "-w", os.path.join(OUTDIR, "test_capture.pcap")]
|
||||
)
|
||||
# run masscanned
|
||||
masscanned = subprocess.Popen("RUST_BACKTRACE=1 ./target/debug/masscanned -vvvvv -i {} -f {} -a {} {}".format(conf.iface, ipfile, MAC_ADDR, args), shell=True,
|
||||
stdin=None, stdout=open("test/res/masscanned.stdout", "w"), stderr=open("test/res/masscanned.stderr", "w"), close_fds=True)
|
||||
masscanned = subprocess.Popen(
|
||||
[
|
||||
"./target/debug/masscanned",
|
||||
"-vvvvv",
|
||||
"-i",
|
||||
IFACE,
|
||||
"-f",
|
||||
ipfile.name,
|
||||
"-a",
|
||||
MAC_ADDR,
|
||||
]
|
||||
# if args in CLI, they are passed to masscanned
|
||||
+ sys.argv[1:],
|
||||
env=dict(os.environ, RUST_BACKTRACE="1"),
|
||||
stdout=open("test/res/masscanned.stdout", "w"),
|
||||
stderr=open("test/res/masscanned.stderr", "w"),
|
||||
)
|
||||
sleep(1)
|
||||
|
||||
try:
|
||||
|
@ -75,7 +87,8 @@ except AssertionError:
|
|||
|
||||
# terminate masscanned
|
||||
masscanned.kill()
|
||||
masscanned.wait()
|
||||
# terminate capture
|
||||
sleep(2)
|
||||
tcpdump.kill()
|
||||
tcpdump.wait()
|
||||
sys.exit(result)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue