# Source code listing: nmap package
#
# File: NmapCommand.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (C) 2005 Insecure.Com LLC.
#
# Author: Adriano Monteiro Marques <py.adriano@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

# This file contains the definitions of two main classes:
# NmapCommand represents and runs an Nmap command line. CommandConstructor
# builds a command line string from textual option descriptions.

import sys
import os
import re
import threading
import unittest

from tempfile import mktemp
from types import StringTypes
# subprocess is required to launch Nmap. If it cannot be imported, re-raise
# the ImportError with a hint about the minimum Python version.
try:
    from subprocess import Popen, PIPE
except ImportError:
    # sys.exc_info() is used instead of "except ImportError, e" so that this
    # clause is accepted by every Python version.
    e = sys.exc_info()[1]
    # The translation function _ is only imported further down in this file,
    # so it must not be called here: doing so would raise a NameError that
    # hides the real problem. The hint is therefore left untranslated.
    raise ImportError(str(e) + ".\n" + "Python 2.4 or later is required.")

import zenmapCore.Paths
from zenmapCore.NmapOptions import NmapOptions
from zenmapCore.OptionsConf import options_file
from zenmapCore.UmitLogging import log
from zenmapCore.I18N import _
from zenmapCore.UmitConf import PathsConfig

# This variable is used in the call to Popen. It determines whether the
# subprocess invocation uses the shell or not. If it is False on Unix, the nmap
# process is started with execve and a list of arguments, which is what we want.
# (Indeed it fails when shell_state = True because it tries to exec
# ['sh', '-c', 'nmap', '-v', ...], which is wrong.) So normally we would want
# shell_state = False. But if shell_state = False on Windows, a big ugly black
# shell window opens whenever a scan is run, at least under py2exe. So we define
# shell_state = True on Windows only. Windows doesn't have exec, so it runs the
# command basically the same way regardless of shell_state.
shell_state = (sys.platform == "win32")

# The path to the nmap executable as used by Popen.
# The value is read from the nmap_command_path attribute of the paths
# configuration and is used as the location of the nmap executable.
nmap_paths = PathsConfig()
nmap_command_path = nmap_paths.nmap_command_path

# Record the platform and the configured executable path in the debug log
# when this module is imported.
log.debug(">>> Platform: %s" % sys.platform)
log.debug(">>> Nmap command path: %s" % nmap_command_path)

def split_quoted(s):
    """Like str.split, except that no splits occur inside quoted strings, and
    quoted strings are unquoted.

    s is the string to split. The return value is a list of tokens. A token is
    a maximal run of double-quoted sections and unquoted non-whitespace
    characters, so 'a "b c"z' yields ['a', 'b cz']. All double quote characters
    are removed from the returned tokens."""
    # A raw string keeps \s as a regular expression escape rather than an
    # (invalid) string escape.
    tokens = re.findall(r'((?:"[^"]*"|[^"\s]+)+)', s)
    return [token.replace('"', '') for token in tokens]

class NmapCommand(object):
    """This class represents an Nmap command line. It is responsible for
    starting, stopping, and returning the results from a command-line scan. A
    command line is represented as a string but it is split into a list of
    arguments for execution."""

    def __init__(self, command):
        """Initialize an Nmap command. This creates temporary files for
        redirecting the various types of output and sets the backing
        command-line string.

        command is the Nmap command line as a single string."""
        self.xml_output = self._create_temp_file()
        self.normal_output = self._create_temp_file()
        self.stdout_output = self._create_temp_file()
        self.stderr_output = self._create_temp_file()

        log.debug(">>> Created temporary files:")
        log.debug(">>> XML OUTPUT: %s" % self.xml_output)
        log.debug(">>> NORMAL OUTPUT: %s" % self.normal_output)
        log.debug(">>> STDOUT OUTPUT: %s" % self.stdout_output)
        log.debug(">>> STDERR OUTPUT: %s" % self.stderr_output)

        # The stdout/stderr file objects are opened by run_scan. They start
        # out as None so that close() is safe even if no scan was ever run.
        self._stdout_handler = None
        self._stderr_handler = None

        self.command_process = None

        self.command = command

    def _create_temp_file(self):
        """Create an empty temporary file and return its name."""
        # mkstemp creates the file itself, which avoids the window between
        # choosing a name with mktemp and creating the file afterwards. It is
        # imported here because the module only imports mktemp at the top.
        from tempfile import mkstemp
        fd, path = mkstemp()
        os.close(fd)
        return path

    def _read_file(self, filename):
        """Return the whole contents of the named file. The file is closed
        even if reading fails."""
        f = open(filename, "r")
        try:
            return f.read()
        finally:
            f.close()

    def _get_sanitized_command_list(self):
        """Remove comments from the command, add output options, and return the
        command split up into a list ready for execution."""
        # Remove comments from command.
        command = re.sub('#.*', '', self.command)

        # Split back into individual options, honoring double quotes.
        command_list = split_quoted(command)

        # Remove any output options the user may have given; we provide our own.
        # Each such option is removed together with the argument following it.
        i = 0
        while i < len(command_list):
            if re.match('-o[XGASN]$', command_list[i]):
                del command_list[i:i + 2]
            else:
                i += 1

        # Replace the executable name with the value of nmap_command_path. This
        # is done before our own options are appended so that an empty (or
        # comment-only) command does not get its -oX flag overwritten.
        if command_list:
            command_list[0] = nmap_command_path
        else:
            command_list.append(nmap_command_path)

        # Save the XML output and the normal output to temporary files.
        command_list.extend(['-oX', '%s' % self.xml_output,
                             '-oN', '%s' % self.normal_output])

        return command_list

    def close(self):
        """Close and remove temporary output files used by the command. This
        may be called whether or not run_scan was called. A file that cannot be
        removed is logged and does not prevent removal of the others."""
        for handler in (self._stdout_handler, self._stderr_handler):
            if handler is not None:
                handler.close()
        self._stdout_handler = None
        self._stderr_handler = None

        for filename in (self.xml_output, self.normal_output,
                         self.stdout_output, self.stderr_output):
            try:
                os.remove(filename)
            except OSError:
                e = sys.exc_info()[1]
                log.debug(">>> Could not remove temporary file %s: %s" % (filename, e))

    def kill(self):
        """Kill the nmap subprocess. Nothing is done if no scan has been
        started. This is best effort: a failure to kill the process is logged
        at debug level and otherwise ignored."""
        if self.command_process is None:
            log.debug(">>> No scan process to kill")
            return

        log.debug(">>> Killing scan process %s" % self.command_process.pid)

        try:
            if sys.platform != "win32":
                from signal import SIGKILL
                os.kill(self.command_process.pid, SIGKILL)
            else:
                # Not sure if this works. Must research a bit more about this
                # subprocess's method to see how it works.
                # In the meantime, this should not raise any exception because
                # we don't care if it killed the process as it never killed it anyway.
                from subprocess import TerminateProcess
                TerminateProcess(self.command_process._handle, 0)
        except Exception:
            e = sys.exc_info()[1]
            log.debug(">>> Could not kill scan process: %s" % e)

    def get_path(self):
        """Return a value for the PATH environment variable that is appropriate
        for the current platform. It will be the PATH from the environment plus
        possibly some platform-specific directories."""
        path_env = os.getenv("PATH")
        if path_env is None:
            search_paths = []
        else:
            search_paths = path_env.split(os.pathsep)
        # Append the extra directories, skipping ones that are already listed.
        for path in zenmapCore.Paths.get_extra_executable_search_paths():
            if path not in search_paths:
                search_paths.append(path)
        return os.pathsep.join(search_paths)

    def run_scan(self):
        """Run the command represented by this class. The subprocess's stdout
        and stderr are redirected to the temporary files created by __init__.
        If the subprocess cannot be started, the redirect files are closed
        again and the exception is re-raised."""
        command_list = self._get_sanitized_command_list()

        self._stdout_handler = open(self.stdout_output, "w+")
        self._stderr_handler = open(self.stderr_output, "w+")

        search_paths = self.get_path()
        env = dict(os.environ)
        env["PATH"] = search_paths
        log.debug("PATH=%s" % env["PATH"])

        log.debug("Running command: %s" % repr(command_list))

        try:
            self.command_process = Popen(command_list, bufsize=1,
                                         stdin=PIPE,
                                         stdout=self._stdout_handler.fileno(),
                                         stderr=self._stderr_handler.fileno(),
                                         shell=shell_state,
                                         env=env)
        except Exception:
            # Don't leak the redirect files when the process can't be started.
            self._stdout_handler.close()
            self._stderr_handler.close()
            self._stdout_handler = None
            self._stderr_handler = None
            raise

    def scan_state(self):
        """Return the current state of a running scan. A return value of True
        means the scan is running and a return value of False means the scan
        subprocess completed successfully. If the subprocess terminated with an
        error an exception is raised. The scan must have been started with
        run_scan before calling this method; otherwise an exception is
        raised."""
        if self.command_process is None:
            raise Exception("Scan is not running yet!")

        state = self.command_process.poll()

        if state is None:
            return True # True means that the process is still running
        elif state == 0:
            return False # False means that the process had a successful exit
        else:
            stderr = self.get_error()

            log.critical("An error occurred during the scan execution!")
            log.critical('%s' % stderr)
            log.critical("Command that raised the exception: '%s'" % " ".join(self._get_sanitized_command_list()))

            raise Exception("An error occurred during the scan execution!\n'%s'" % stderr)

    def get_output(self):
        """Return the stdout of the nmap subprocess."""
        return self._read_file(self.stdout_output)

    def get_output_file(self):
        """Return the name of the stdout output file."""
        return self.stdout_output

    def get_normal_output(self):
        """Return the normal (-oN) output of the nmap subprocess."""
        return self._read_file(self.normal_output)

    def get_normal_output_file(self):
        """Return the name of the normal (-oN) output file."""
        return self.normal_output

    def get_xml_output(self):
        """Return the XML (-oX) output of the nmap subprocess."""
        return self._read_file(self.xml_output)

    def get_xml_output_file(self):
        """Return the name of the XML (-oX) output file."""
        return self.xml_output

    def get_error(self):
        """Return the stderr output of the nmap subprocess."""
        return self._read_file(self.stderr_output)

class CommandConstructor(object):
    """This class builds a string representing an Nmap command line from textual
    option descriptions such as 'Aggressive Options' or 'UDP Scan'
    (corresponding to -A and -sU respectively). The name-to-option mapping is
    done by the NmapOptions class. Options are stored in a dict that maps the
    option name to a tuple containing its arguments and "level." The level is
    the degree of repetition for options like -v that can be given more than
    once."""

    def __init__(self, options=None):
        """Initialize a command line using the given options. The options are
        given as a dict mapping option names to arguments. If options is
        omitted the command line starts out with no options."""
        self.options = {}
        self.option_profile = NmapOptions(options_file)
        if options is not None:
            for name, args in options.items():
                self.add_option(name, args, False)

    def add_option(self, option_name, args=None, level=False):
        """Add an option to the command line. Only one of args and level can be
        defined. If both are defined, level takes precedence and args is
        ignored. Adding an option that is already present replaces it."""
        # Use a fresh list for every call: a shared default list would be
        # stored in self.options and handed out by get_options, so a change
        # made through one option would show up in all the others.
        if args is None:
            args = []
        self.options[option_name] = (args, level)

    def remove_option(self, option_name):
        """Remove an option from the command line. Removing an option that is
        not present does nothing."""
        self.options.pop(option_name, None)

    def get_command(self, target):
        """Return the constructed command line as a plain string. The string
        starts with nmap_command_path, followed by the options, followed by
        target."""
        parts = ['%s' % nmap_command_path]

        for option_name in self.options:
            option = self.option_profile.get_option(option_name)
            args, level = self.options[option_name]

            # A single string argument is treated as a one-element list.
            if isinstance(args, StringTypes):
                args = [args]

            if level:
                # Repeat the option level times. Each repetition is a separate
                # element so that no doubled space ends up in the result.
                parts.extend([option['option']] * level)
            elif args:
                # Only the first argument is substituted into the option.
                args = tuple(args)
                parts.append(option['option'] % args[0])
            else:
                parts.append(option['option'])

        parts.append(target)
        return ' '.join(parts)

    def get_options(self):
        """Return the options used in the command line, as a dict mapping
        options names to arguments. The level, if any, is discarded."""
        return dict([(name, value[0]) for name, value in self.options.items()])

00324 class SplitQuotedTest(unittest.TestCase):
    """A unittest class that tests the split_quoted function."""

    def test_split(self):
        self.assertEqual(split_quoted(''), [])
        self.assertEqual(split_quoted('a'), ['a'])
        self.assertEqual(split_quoted('a b c'), 'a b c'.split())

    def test_quotes(self):
        self.assertEqual(split_quoted('a "b" c'), ['a', 'b', 'c'])
        self.assertEqual(split_quoted('a "b c"'), ['a', 'b c'])
        self.assertEqual(split_quoted('a "b c""d e"'), ['a', 'b cd e'])
        self.assertEqual(split_quoted('a "b c"z"d e"'), ['a', 'b czd e'])

# Module test code: run the unit tests, then print a sample command line.
if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(SplitQuotedTest)
    unittest.TextTestRunner().run(suite)

    # This is an example of how CommandConstructor works. Nmap options are given
    # textual option descriptions. Adding an option twice keeps a single entry,
    # and a removed option does not appear in the result.
    constructor = CommandConstructor()
    constructor.add_option('Aggressive')
    constructor.add_option('Version detection')
    constructor.add_option('UDP Scan')
    constructor.add_option('Idle Scan', ['10.0.0.138'])
    constructor.add_option('UDP Scan')
    constructor.add_option('ACK scan')
    constructor.remove_option('Idle Scan')
    example_command = constructor.get_command('localhost')
    print(example_command)

# Generated by Doxygen 1.6.0