diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 294678c..97fdfe2 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -27,9 +27,12 @@ from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root +from nmap3.utils import get_nmap_path, user_is_root, communicate_with_progress from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re +from typing import Callable, Optional +import tempfile +import os __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.9.3' @@ -46,10 +49,8 @@ class Nmap(object): def __init__(self, path:str=''): """ Module initialization - :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ - self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 @@ -110,7 +111,7 @@ def nmap_version(self): return version_data # Unique method for repetitive tasks - Use of 'target' variable instead of 'host' or 'subnet' - no need to make difference between 2 strings that are used for the same purpose - def scan_command(self, target, arg, args=None, timeout=None): + def scan_command(self, target, arg, args=None, timeout=None, progress_callback=None): self.target = target command_args = "{target} {default}".format(target=target, default=arg) @@ -119,7 +120,7 @@ def scan_command(self, target, arg, args=None, timeout=None): scancommand += " {0}".format(args) scan_shlex = shlex.split(scancommand) - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) file_name=re.search(r'(\-oX|-oN-|oG)\s+[a-zA-Z-_0-9]{1,100}\.[a-zA-Z]+',scancommand) if file_name: file_name=scancommand[file_name.start():file_name.end()].split(" ")[0] @@ -127,7 +128,7 @@ def scan_command(self, target, arg, args=None, timeout=None): xml_root = self.get_xml_et(output) return xml_root - def scan_top_ports(self, target, default=10, args=None, timeout=None): + def scan_top_ports(self, target, default=10, args=None, timeout=None, progress_callback=None): """ Perform nmap's top ports scan @@ -150,7 +151,7 @@ def scan_top_ports(self, target, default=10, args=None, timeout=None): scan_shlex = shlex.split(scan_command) # Run the command and get the output - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) if not output: # Probaby and error was raise raise ValueError("Unable to perform requested command") @@ -160,7 +161,7 @@ def scan_top_ports(self, target, default=10, args=None, timeout=None): self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args=None, timeout=None): + def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args=None, timeout=None, progress_callback=None): """ Perform nmap scan using the dns-brute script @@ -179,7 +180,7 @@ def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args dns_brute_shlex = shlex.split(dns_brute_command) # prepare it for popen # Run the command and get the output - output = self.run_command(dns_brute_shlex, timeout=timeout) + output = self.run_command(dns_brute_shlex, timeout=timeout, progress_callback=progress_callback) # Begin parsing the xml response xml_root = self.get_xml_et(output) @@ -200,42 +201,42 @@ def nmap_version_detection(self, target, arg="-sV", args=None, timeout=None): # Using of basic options for stealth scan @user_is_root - def nmap_stealth_scan(self, target, arg="-Pn -sZ", args=None): + def nmap_stealth_scan(self, target, arg="-Pn -sZ", args=None, progress_callback=None): """ nmap -oX - nmmapper.com -Pn -sZ """ - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=None) self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - def nmap_detect_firewall(self, target, arg="-sA", args=None): # requires root + def nmap_detect_firewall(self, target, arg="-sA", args=None, progress_callback=None): # requires root """ nmap -oX - nmmapper.com -sA @ TODO """ - return self.scan_command(target=target, arg=arg, args=args) + return self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) # TODO @user_is_root - def nmap_os_detection(self, target, arg="-O", args=None): # requires root + def nmap_os_detection(self, target, arg="-O", args=None, progress_callback=None): # requires root """ nmap -oX - nmmapper.com -O NOTE: Requires root """ - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) results = self.parser.os_identifier_parser(xml_root) return results - def nmap_subnet_scan(self, target, arg="-p-", args=None): # requires root + def nmap_subnet_scan(self, target, arg="-p-", args=None, progress_callback=None): # requires root """ nmap -oX - nmmapper.com -p- NOTE: Requires root """ - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_list_scan(self, target, arg="-sL", args=None): # requires root + def nmap_list_scan(self, target, arg="-sL", args=None, progress_callback=None): # requires root """ The list scan is a degenerate form of target discovery that simply lists each target of the network(s) specified, without sending any packets to the target targets. @@ -243,36 +244,83 @@ def nmap_list_scan(self, target, arg="-sL", args=None): # requires root NOTE: /usr/bin/nmap -oX - 192.168.178.1/24 -sL """ self.target = target - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def run_command(self, cmd, timeout=None): - """ - Runs the nmap command using popen + def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: Optional[Callable[[str], None]] | None = None): + """ + Runs the nmap command using popen. + + Parameters + ---------- + cmd : list[str] + The command to run, e.g. ['/usr/bin/nmap', '-oX', '-', 'nmmapper.com', '--top-ports', '10']. + timeout : int | None + Timeout in seconds for the subprocess. If None, waits until finished. + progress_callback : callable[[str], None] | None + Optional callback function called with the current scan progress. + + Example + ------- + def my_progress_callback(progress: str): + print(progress) + + nmap = Nmap() + nmap.run_command( + cmd=["/usr/bin/nmap", "-oX", "-", "example.com", "--top-ports", "10"], + timeout=60, + progress_callback=my_progress_callback + ) + """ + + xml_path = None + if progress_callback: + #get file to store xml output if progress_callback is used + with tempfile.NamedTemporaryFile(mode="w+", suffix=".xml", delete=False) as tmp: + xml_path = tmp.name + + #tell nmap to print status every 1 second + if "--stats-every" not in cmd: + cmd += ["--stats-every", "1s"] + + #tell nmap to write xml to xml_path + if "-oX" in cmd: + index = cmd.index("-oX") + cmd[index+1] = xml_path + else: + cmd += ["-oX", xml_path] - @param: cmd--> the command we want run eg /usr/bin/nmap -oX - nmmapper.com --top-ports 10 - @param: timeout--> command subprocess timeout in seconds. - """ sub_proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE + stderr=subprocess.PIPE, + text=True ) + try: - output, errs = sub_proc.communicate(timeout=timeout) + if(progress_callback): + output, errs = communicate_with_progress( + sub_proc=sub_proc, + xml_path=xml_path, + timeout=timeout, + progress_callback=progress_callback + ) + else: + output, errs = sub_proc.communicate(timeout=timeout) except Exception as e: sub_proc.kill() raise (e) - else: - if 0 != sub_proc.returncode: - raise NmapExecutionError( - 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ - + errs.decode('utf8') - ) - # Response is bytes so decode the output and return - return output.decode('utf8').strip() - + finally: + if(xml_path): + os.remove(xml_path) + + if 0 != sub_proc.returncode: + raise NmapExecutionError( + 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ + + errs + ) + return output def get_xml_et(self, command_output): """ @@ -323,7 +371,7 @@ def __init__(self, path:str=''): self.parser = NmapCommandParser(None) # Unique method for repetitive tasks - Use of 'target' variable instead of 'host' or 'subnet' - no need to make difference between 2 strings that are used for the same purpose. Creating a scan template as a switcher - def scan_command(self, scan_type, target, args, timeout=None): + def scan_command(self, scan_type, target, args, timeout=None, progress_callback=None): def tpl(i): scan_template = { 1: self.fin_scan, @@ -348,7 +396,7 @@ def tpl(i): scan_shlex = shlex.split(scan_type_command) # Use the ping scan parser - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) xml_root = self.get_xml_et(output) return xml_root @@ -356,30 +404,30 @@ def tpl(i): @user_is_root - def nmap_fin_scan(self, target, args=None): + def nmap_fin_scan(self, target, args=None, progress_callback=None): """ Perform scan using nmap's fin scan @cmd nmap -sF 192.168.178.1 """ - xml_root = self.scan_command(self.fin_scan, target=target, args=args) + xml_root = self.scan_command(self.fin_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @user_is_root - def nmap_syn_scan(self, target, args=None): + def nmap_syn_scan(self, target, args=None, progress_callback=None): """ Perform syn scan on this given target @cmd nmap -sS 192.168.178.1 """ - xml_root = self.scan_command(self.sync_scan, target=target, args=args) + xml_root = self.scan_command(self.sync_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_tcp_scan(self, target, args=None): + def nmap_tcp_scan(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @@ -387,12 +435,12 @@ def nmap_tcp_scan(self, target, args=None): """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.tcp_connt, target=target, args=args) + xml_root = self.scan_command(self.tcp_connt, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @user_is_root - def nmap_udp_scan(self, target, args=None): + def nmap_udp_scan(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @@ -401,37 +449,37 @@ def nmap_udp_scan(self, target, args=None): if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.udp_scan, target=target, args=args) + xml_root = self.scan_command(self.udp_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_ping_scan(self, target, args=None): + def nmap_ping_scan(self, target, args=None, progress_callback=None): """ Scan target using nmaps' ping scan @cmd nmap -sP 192.168.178.1 """ - xml_root = self.scan_command(self.ping_scan, target=target, args=args) + xml_root = self.scan_command(self.ping_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_idle_scan(self, target, args=None): + def nmap_idle_scan(self, target, args=None, progress_callback=None): """ Using nmap idle_scan @cmd nmap -sL 192.168.178.1 """ - xml_root = self.scan_command(self.idle_scan, target=target, args=args) + xml_root = self.scan_command(self.idle_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_ip_scan(self, target, args=None): + def nmap_ip_scan(self, target, args=None, progress_callback=None): """ Using nmap ip_scan @cmd nmap -sO 192.168.178.1 """ - xml_root = self.scan_command(self.ip_scan, target=target, args=args) + xml_root = self.scan_command(self.ip_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @@ -454,7 +502,7 @@ def __init__(self, path:str=''): self.disable_dns = "-n" self.parser = NmapCommandParser(None) - def scan_command(self, scan_type, target, args, timeout=None): + def scan_command(self, scan_type, target, args, timeout=None, progress_callback=None): def tpl(i): scan_template = { 1: self.port_scan_only, @@ -476,23 +524,23 @@ def tpl(i): scan_shlex = shlex.split(scan_type_command) # Use the ping scan parser - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) xml_root = self.get_xml_et(output) return xml_root raise Exception("Something went wrong") - def nmap_portscan_only(self, target, args=None): + def nmap_portscan_only(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @cmd nmap -Pn 192.168.178.1 """ - xml_root = self.scan_command(self.port_scan_only, target=target, args=args) + xml_root = self.scan_command(self.port_scan_only, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_no_portscan(self, target, args=None): + def nmap_no_portscan(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @@ -500,29 +548,29 @@ def nmap_no_portscan(self, target, args=None): """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.no_port_scan, target=target, args=args) + xml_root = self.scan_command(self.no_port_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_arp_discovery(self, target, args=None): + def nmap_arp_discovery(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @cmd nmap -PR 192.168.178.1 """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.arp_discovery, target=target, args=args) + xml_root = self.scan_command(self.arp_discovery, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_disable_dns(self, target, args=None): + def nmap_disable_dns(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @cmd nmap -n 192.168.178.1 """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.disable_dns, target=target, args=args) + xml_root = self.scan_command(self.disable_dns, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @@ -553,7 +601,7 @@ async def run_command(self, cmd, timeout=None): # Response is bytes so decode the output and return return data.decode('utf8').strip() - async def scan_command(self, target, arg, args=None, timeout=None): + async def scan_command(self, target, arg, args=None, timeout=None, progress_callback=None): self.target = target command_args = "{target} {default}".format(target=target, default=arg) @@ -561,7 +609,7 @@ async def scan_command(self, target, arg, args=None, timeout=None): if (args): scancommand += " {0}".format(args) - output = await self.run_command(scancommand, timeout=timeout) + output = await self.run_command(scancommand, timeout=timeout, progress_callback=progress_callback) xml_root = self.get_xml_et(output) return xml_root diff --git a/nmap3/utils.py b/nmap3/utils.py index 499de60..3414300 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -24,6 +24,11 @@ import os import ctypes import functools +import re +import threading +import queue +from typing import Optional, Callable +import time from nmap3.exceptions import NmapNotInstalledError @@ -100,3 +105,81 @@ async def wrapped(*args, **kwargs): return {"error":True, "msg":"Nmap has not been install on this system yet!"} return wrapped return wrapper + +def communicate_with_progress( + sub_proc: subprocess.Popen, + xml_path: str, + timeout: int = None, + progress_callback: Optional[Callable[[str], None]] = None +) -> tuple[str, str]: + """ + Reads stdout and stderr from a subprocess, optionally reporting progress. + + Parameters + ---------- + sub_proc : subprocess.Popen + The subprocess object to communicate with. + xml_path : str + Path to xml_file for io + timeout : int | None, optional + Timeout in seconds for the subprocess. If None, waits until finished. + progress_callback : Callable[[float], None] | None, optional + A callback function that is called with the current scan progress + and details as a string. + + Returns + ------- + Tuple[str, str] + A tuple containing xml and stderr output. + """ + + stdout_queue = queue.Queue() + stderr_queue = queue.Queue() + + def reader(pipe, q): + for line in pipe: + q.put(line) + pipe.close() + + #start threads to read stdout and stderr + t_out = threading.Thread(target=reader, args=(sub_proc.stdout, stdout_queue)) + t_err = threading.Thread(target=reader, args=(sub_proc.stderr, stderr_queue)) + t_out.start() + t_err.start() + + output = "" + errs = "" + start_time = time.time() + + while True: + + #Check timeout + if timeout is not None and (time.time() - start_time) > timeout: + sub_proc.kill() + errs += f"\nProcess killed after exceeding timeout of {timeout} seconds.\n" + break + + if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): + sub_proc.kill() + break # finished + + #Process stdout + while not stdout_queue.empty(): + line = stdout_queue.get_nowait() + if progress_callback: + #grab the progress line from stdout and pass to progress_callback + match = re.search(r'(\d+(?:\.\d+)?)% done', line) + if match: + progress_callback(line.strip()) + + #Process stderr + while not stderr_queue.empty(): + line = stderr_queue.get_nowait() + errs += line + + time.sleep(0.05) # prevent busy-loop + + with open(xml_path) as f: + output = f.read() + + return output, errs \ No newline at end of file