Logo Search packages:      
Sourcecode: nmap version File versions  Download package

NetworkInventory.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# ***********************IMPORTANT NMAP LICENSE TERMS************************
# *                                                                         *
# * The Nmap Security Scanner is (C) 1996-2009 Insecure.Com LLC. Nmap is    *
# * also a registered trademark of Insecure.Com LLC.  This program is free  *
# * software; you may redistribute and/or modify it under the terms of the  *
# * GNU General Public License as published by the Free Software            *
# * Foundation; Version 2 with the clarifications and exceptions described  *
# * below.  This guarantees your right to use, modify, and redistribute     *
# * this software under certain conditions.  If you wish to embed Nmap      *
# * technology into proprietary software, we sell alternative licenses      *
# * (contact sales@insecure.com).  Dozens of software vendors already       *
# * license Nmap technology such as host discovery, port scanning, OS       *
# * detection, and version detection.                                       *
# *                                                                         *
# * Note that the GPL places important restrictions on "derived works", yet *
# * it does not provide a detailed definition of that term.  To avoid       *
# * misunderstandings, we consider an application to constitute a           *
# * "derivative work" for the purpose of this license if it does any of the *
# * following:                                                              *
# * o Integrates source code from Nmap                                      *
# * o Reads or includes Nmap copyrighted data files, such as                *
# *   nmap-os-db or nmap-service-probes.                                    *
# * o Executes Nmap and parses the results (as opposed to typical shell or  *
# *   execution-menu apps, which simply display raw Nmap output and so are  *
# *   not derivative works.)                                                * 
# * o Integrates/includes/aggregates Nmap into a proprietary executable     *
# *   installer, such as those produced by InstallShield.                   *
# * o Links to a library or executes a program that does any of the above   *
# *                                                                         *
# * The term "Nmap" should be taken to also include any portions or derived *
# * works of Nmap.  This list is not exclusive, but is meant to clarify our *
# * interpretation of derived works with some common examples.  Our         *
# * interpretation applies only to Nmap--we don't speak for other people's  *
# * GPL works.                                                              *
# *                                                                         *
# * If you have any questions about the GPL licensing restrictions on using *
# * Nmap in non-GPL works, we would be happy to help.  As mentioned above,  *
# * we also offer alternative license to integrate Nmap into proprietary    *
# * applications and appliances.  These contracts have been sold to dozens  *
# * of software vendors, and generally include a perpetual license as well  *
# * as providing for priority support and updates as well as helping to     *
# * fund the continued development of Nmap technology.  Please email        *
# * sales@insecure.com for further information.                             *
# *                                                                         *
# * As a special exception to the GPL terms, Insecure.Com LLC grants        *
# * permission to link the code of this program with any version of the     *
# * OpenSSL library which is distributed under a license identical to that  *
# * listed in the included COPYING.OpenSSL file, and distribute linked      *
# * combinations including the two. You must obey the GNU GPL in all        *
# * respects for all of the code used other than OpenSSL.  If you modify    *
# * this file, you may extend this exception to your version of the file,   *
# * but you are not obligated to do so.                                     *
# *                                                                         *
# * If you received these files with a written license agreement or         *
# * contract stating terms other than the terms above, then that            *
# * alternative license agreement takes precedence over these comments.     *
# *                                                                         *
# * Source is provided to this software because we believe users have a     *
# * right to know exactly what a program is going to do before they run it. *
# * This also allows you to audit the software for security holes (none     *
# * have been found so far).                                                *
# *                                                                         *
# * Source code also allows you to port Nmap to new platforms, fix bugs,    *
# * and add new features.  You are highly encouraged to send your changes   *
# * to nmap-dev@insecure.org for possible incorporation into the main       *
# * distribution.  By sending these changes to Fyodor or one of the         *
# * Insecure.Org development mailing lists, it is assumed that you are      *
# * offering the Nmap Project (Insecure.Com LLC) the unlimited,             *
# * non-exclusive right to reuse, modify, and relicense the code.  Nmap     *
# * will always be available Open Source, but this is important because the *
# * inability to relicense code has caused devastating problems for other   *
# * Free Software projects (such as KDE and NASM).  We also occasionally    *
# * relicense the code to third parties as discussed above.  If you wish to *
# * specify special license conditions of your contributions, just say so   *
# * when you send them.                                                     *
# *                                                                         *
# * This program is distributed in the hope that it will be useful, but     *
# * WITHOUT ANY WARRANTY; without even the implied warranty of              *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU       *
# * General Public License v2.0 for more details at                         *
# * http://www.gnu.org/licenses/gpl-2.0.html , or in the COPYING file       *
# * included with Nmap.                                                     *
# *                                                                         *
# ***************************************************************************/

import os
import re
import unittest
import zenmapCore
import zenmapCore.NmapParser
from zenmapGUI.SearchGUI import SearchParser
from SearchResult import HostSearch

00097 class NetworkInventory(object):
    """This class acts as a container for aggregated scans. It is also
    responsible for opening/saving the aggregation from/to persistent storage."""
    def __init__(self, filename=None):
        # A list of all scans that make up this inventory
        self.scans = []
        
        # A dictionary mapping parsed scans to filenames they were loaded from
        self.filenames = {}
        
        # A dictionary mapping IP addresses into HostInfo objects
        self.hosts = {}
        
        if filename != None:
            self.open_from_file(filename)
    
00113     def add_scan(self, scan, filename=None):
        """Adds a scan to the list of scans. The object passed as an argument
        should be a parsed nmap result."""
        from time import localtime
        
        for host in scan.get_hosts():
            addr = ""
            if host.ipv6 is not None:
                # This is an IPv6 host, so we add the IPv6 address to the map
                addr = host.ipv6["addr"]
            elif host.ip is not None:
                # IPv4
                addr = host.ip["addr"]
            
            if addr not in self.hosts:
                # Add this host to the hosts dictionary, mapped by IP address
                self.hosts[addr] = host.make_clone()
            else:
                # This host is already present in the host list, so we need to update its
                # info with the info held in the current host object
                old_host = self.hosts[addr]
                # We need to find old_host's scan date
                old_date = localtime(0)
                for old_scan in self.scans:
                    if old_host in old_scan.get_hosts():
                        old_date = old_scan.get_date()
                new_date = scan.get_date()
                self._update_host_info(old_host, host, old_date, new_date)
        
        self.scans.append(scan)
        
        if filename != None:
            basename = os.path.basename(filename)
            
            if basename in self.filenames.values():
                # We need to generate a new filename, since this basename already exists
                base = basename
                ext = "xml"
                try:
                    base, ext = basename.rsplit(".", 1)
                except ValueError:
                    pass
                
                counter = 2
                while basename in self.filenames.values():
                    basename = "%s %s.%s" % (base, counter, ext)
                    counter += 1
            
            self.filenames[scan] = basename
                    
00163     def remove_scan(self, scan):
        """Removes a scan and any host information it contained from the inventory."""
        # Note: If a scan is passed in that isn't in the inventory then this
        # method will throw a ValueError Exception and will not finish
        # Remove the scan from our scan list
        self.scans.remove(scan)
        
        # Clear the host dictionary
        self.hosts = {}
        
        # Remember the scan list
        scans = self.scans
        
        # Empty it
        self.scans = []
        
        # Delete the filename entry, if any
        if scan in self.filenames:
            del self.filenames[scan]
        
        # For each scan in the remembered list, append it to the scan list and update
        # the host list accordingly
        for scan in scans:
            self.add_scan(scan)
    
00188     def _update_host_info(self, old_host, new_host, old_date, new_date):
        """This function is called when a host needs to be added to the hosts
        dictionary, but another HostInfo object for that host already exists
        in the dictionary (from a previous scan). In that case, we need to
        update the original HostInfo object so that it holds information from
        both scans."""
        
        # Ports
        for new_port in new_host.ports:
            # Check if new_port is already present in old_host's ports
            for old_port in old_host.ports:
                if old_port["portid"] == new_port["portid"]:
                    # We update old_host's port information to reflect the latest known port state
                    if old_date < new_date:
                        index = old_host.ports.index(old_port)
                        old_host.ports[index] = new_port
                    # Finished processing this new_port, we jump to the next one
                    break
            else:
                # This new_port isn't present in old_host, so we simply append it to
                # old_host's port info
                old_host.ports.append(new_port)
        
        # extraports, ipidsequence, state, tcpsequence, tcptssequence, uptime
        if old_date < new_date:
            old_host.extraports = new_host.extraports
            old_host.ipidsequence = new_host.ipidsequence
            old_host.state = new_host.state
            old_host.tcpsequence = new_host.tcpsequence
            old_host.tcptssequence = new_host.tcptssequence
            old_host.uptime = new_host.uptime
        
        # Comment
        if old_host.comment == "":
            old_host.comment = new_host.comment
        elif new_host.comment != "":
            old_host.comment = "%s\n\n%s" % (old_host.comment, new_host.comment)
        
        # Hostnames
        # Replace old_host's hostname with new_host's if old_host has no
        # hostname or new_host's is newer.
        if len(new_host.hostnames) > 0 and \
           (len(old_host.hostnames) == 0 or old_date < new_date):
            old_host.hostnames = new_host.hostnames
        
        # MAC address
        # If there was no MAC address set in old_host, set it to whatever is in new_host.mac.
        # Do the same if both hosts have a MAC address set, but new_host's address is newer.
        if old_host.mac is None or \
           (old_host.mac is not None and new_host.mac is not None and old_date < new_date):
            old_host.mac = new_host.mac
        
        # OS detection fields
        # Replace old_host's OS detection fields with new_host's if old_host has no
        # OS detection info or new_host's info is newer.
        if len(new_host.osclasses) > 0 and (len(old_host.osclasses) == 0 or old_date < new_date):
            old_host.osclasses = new_host.osclasses
            old_host.osmatches = new_host.osmatches
            old_host.ports_used = new_host.ports_used
        
        # Traceroute information
        if len(new_host.trace) > 0 and (len(old_host.trace) == 0 or old_date < new_date):
            old_host.trace = new_host.trace
    
    def get_scans(self):
        return self.scans
    
    def get_hosts(self):
        return self.hosts.values()

    def get_hosts_up(self):
        return filter(lambda h: h.get_state() == 'up', self.hosts.values())

    def get_hosts_down(self):
        return filter(lambda h: h.get_state() == 'down', self.hosts.values())
    
00264     def open_from_file(self, path):
        """Loads a scan from the given file."""
        from zenmapCore.NmapParser import NmapParser
        
        parsed = NmapParser(path)
        parsed.parse()
        self.add_scan(parsed, path)
    
00272     def open_from_dir(self, path):
        """Loads all scans from the given directory into the network inventory."""
        from zenmapCore.NmapParser import NmapParser
        
        for filename in os.listdir(path):
            fullpath = os.path.join(path, filename)
            if os.path.isdir(fullpath):
                continue
            parsed = NmapParser(fullpath)
            parsed.parse()
            self.add_scan(parsed, filename=fullpath)
    
00284     def save_to_file(self, path, index):
        """Saves the scan with the given list index into a file with a given path."""
        f = open(path, 'w')
        self.get_scans()[index].write_xml(f)
        self.filenames[self.get_scans()[index]] = f
        f.close()
    
00291     def _generate_filenames(self, path):
        """Generates filenames for all scans that don't already have a filename."""
        # The directory must not contain filenames other than those in the self.filenames dictionary
        for filename in os.listdir(path):
            if os.path.basename(filename) not in self.filenames.values():
                raise Exception("The destination directory contains a file (%s) that's not a part "
                                "of the current inventory. The inventory will not be saved." %
                                os.path.basename(filename))
        
        for scan in self.scans:
            if scan in self.filenames:
                # This scan already has a filename
                continue
            
            date = "%04d%02d%02d%02d%02d" % (scan.date[0], scan.date[1], scan.date[2],
                                             scan.date[3], scan.date[4])
            if scan.profile_name != "":
                filename = "%s on %s" % (scan.profile_name, scan.target)
            else:
                filename = scan.nmap_command
            
            # Prepend the date
            filename = "%s %s" % (date, filename)
            
            # Sanitize the filename
            for char in ["\"", "'", "/", "\\", "?", "*", ":", ";"]:
                if char in filename:
                    filename = filename.replace(char, "_")
            
            # Filename length check
            # (http://en.wikipedia.org/wiki/Filename#Comparison_of_file_name_limitations)
            if len(filename) > 250:
                filename = filename[:250]
            
            # TODO: Filename security checks?
            
            # Try to open the file in append mode. If file.tell() returns a greater-than-zero
            # value, this means that the file already exists and has some data in it, so we
            # choose another filename until we successfully open a zero-length file.
            filename_full = filename + ".xml"
            counter = 2
            while filename_full in self.filenames.values():
                # There's already a scan with this filename, so we generate a new name by appending
                # the counter value before the file extension.
                filename_full = "%s %s.xml" % (filename, str(counter))
                counter += 1
            
            # Add the filename to the list of saved filenames
            self.filenames[scan] = filename_full
    
00341     def save_to_dir(self, path):
        """Saves all scans in the inventory into a given directory and returns
        a list of (full-path) filenames that were used to save the scans."""
        self._generate_filenames(path)
        
        for scan, filename in self.filenames.iteritems():
            f = open(os.path.join(path, filename), "w")
            scan.write_xml(f)
            f.close()
        
        return self.filenames.values()
    
    def open_from_db(self, id):
        pass
    
    def save_to_db(self):
        # For now, this saves each scan making up the inventory separately in
        # the database.
        from time import time
        from tempfile import mktemp
        from zenmapCore.UmitDB import Scans

        filename = mktemp()

        for parsed in self.get_scans():
            f = open(filename, "w")
            parsed.write_xml(f)
            f.close()

            scan = Scans(scan_name = parsed.scan_name,
                         nmap_xml_output = open(filename).read(),
                         date = time())

class FilteredNetworkInventory(NetworkInventory):
    def __init__(self, filename = None):
        NetworkInventory.__init__(self, filename)

        # A dictionary listing host filtering criteria
        self.search_dict = {}
        self.filtered_hosts = []
        search_keywords = dict()
        search_keywords["target"] = "target"
        search_keywords["t"] = "target"
        search_keywords["inroute"] = "in_route"
        search_keywords["ir"] = "in_route"
        search_keywords["hostname"] = "hostname"
        search_keywords["service"] = "service"
        search_keywords["s"] = "service"
        search_keywords["os"] = "os"
        search_keywords["open"] = "open"
        search_keywords["op"] = "open"
        search_keywords["closed"] = "closed"
        search_keywords["cp"] = "closed"
        search_keywords["filtered"] = "filtered"
        search_keywords["fp"] = "filtered"
        search_keywords["unfiltered"] = "unfiltered"
        search_keywords["ufp"] = "unfiltered"
        search_keywords["open|filtered"] = "open_filtered"
        search_keywords["ofp"] = "open_filtered"
        search_keywords["closed|filtered"] = "closed_filtered"
        search_keywords["cfp"] = "closed_filtered"
        search_keywords[""] = ""
        self.search_parser = SearchParser(self, search_keywords)

    # FIXME: This method doesn't do anything.  We just need to support
    # the type of interface that SearchParser expects in order to use it.
    # Perhaps, we will eventually refactor the SearchParser a little bit
    # more?
    def init_search_dirs(self, junk):
        pass

    def get_hosts(self):
        if len(self.search_dict) > 0:
            return self.filtered_hosts
        else:
            return NetworkInventory.get_hosts(self)
    def get_hosts_up(self):
        if len(self.search_dict) > 0:
            return filter(lambda h: h.get_state() == 'up', self.filtered_hosts)
        else:
            return NetworkInventory.get_hosts_up(self)
    def get_hosts_down(self):
        if len(self.search_dict) > 0:
            return filter(lambda h: h.get_state() == 'down', self.filtered_hosts)
        else:
            return NetworkInventory.get_hosts_down(self)
    def get_total_host_count(self):
        return len(self.hosts)
    def _match_all_args(self, host, operator, args):
        """A helper function that calls the matching function for the given
        operator and each of its arguments."""
        for arg in args:
            if not self.__getattribute__("match_%s" % operator)(host, arg):
                # No match for this operator
                return False
        else:
            # if the operator is not supported, pretend its true
            # All arguments for this operator produced a match
            return True
    def get_host_count(self):
        return len(self.network_inventory.hosts)
    def match_keyword(self, host, keyword):
        return self.match_os(host, keyword) or\
               self.match_target(host, keyword) or\
               self.match_service(host, keyword)
    def match_target(self, host, name):
        return HostSearch.match_target(host, name)
    def match_in_route(self, host, hop):
        hops = host.get_trace().get('hops', [])
        return hop in hops
    def match_hostname(self, host, hostname):
        return HostSearch.match_hostname(host, hostname)
    def match_service(self, host, service):
        return HostSearch.match_service(host, service)
    def match_os(self, host, os):
        return HostSearch.match_os(host, os)
    def match_open(self, host, portno):
        host_ports = host.get_ports()
        return HostSearch.match_port(host_ports, portno, "open")
    def match_closed(self, host, portno):
        host_ports = host.get_ports()
        return HostSearch.match_port(host_ports, portno, "closed")
    def match_filtered(self, host, portno):
        host_ports = host.get_ports()
        return HostSearch.match_port(host_ports, portno, "filtered")
    def match_unfiltered(self, host, portno):
        host_ports = host.get_ports()
        return HostSearch.match_port(host_ports, portno, "unfiltered")
    def match_open_filtered(self, host, portno):
        host_ports = host.get_ports()
        return HostSearch.match_port(host_ports, portno, "open|filtered")
    def match_closed_filtered(self, host, portno):
        host_ports = host.get_ports()
        return HostSearch.match_port(host_ports, portno, "closed|filtered")
    def apply_filter(self, filter_text):
        self.filter_text = filter_text.lower()
        self.search_parser.update(self.filter_text)
        self.filtered_hosts = []
        for hostname, host in self.hosts.iteritems():
            # For each host in this scan
            # Test each given operator against the current host
            for operator, args in self.search_dict.iteritems():
                if not self._match_all_args(host, operator, args):
                    # No match => we discard this scan_result
                    break
            else:
                # All operator-matching functions have returned True, so this
                # host satisfies all conditions
                self.filtered_hosts.append(host)

class NetworkInventoryTest(unittest.TestCase):
    def test_no_external_modification(self):
        """Test that HostInfo objects passed into the inventory are not modified
        during aggregation."""
        scan_1 = zenmapCore.NmapParser.ParserBasics()
        host_a = zenmapCore.NmapParser.HostInfo()
        host_a.hostnames = ["a"]
        host_a.set_state('up')
        scan_1.start = "1000000000"
        scan_1.nmap["hosts"] = [host_a]

        scan_2 = zenmapCore.NmapParser.ParserBasics()
        host_b = zenmapCore.NmapParser.HostInfo()
        host_b.hostnames = ["b"]
        host_b.set_state('up')
        scan_2.start = "1000000001"
        scan_2.nmap["hosts"] = [host_b]

        inv = NetworkInventory()
        inv.add_scan(scan_1)
        inv.add_scan(scan_2)

        self.assertEqual(host_a.hostnames, ["a"])
        self.assertEqual(host_b.hostnames, ["b"])
        self.assertEqual(scan_1.nmap["hosts"], [host_a])
        self.assertEqual(scan_2.nmap["hosts"], [host_b])
        self.assertEqual(inv.get_hosts_up()[0].hostnames, ["b"])
    def test_cancel_and_remove_scan(self):
        """Test that canceling and removing a scan does not blow away the inventory hosts"""
        added_ips = ['10.0.0.1','10.0.0.2']
        removed_ips = ['10.0.0.3']
        scan_1 = zenmapCore.NmapParser.ParserBasics()
        host_a = zenmapCore.NmapParser.HostInfo()
        host_a.hostnames = ["a"]
        host_a.set_ip({'addr':added_ips[0]})
        scan_1.start = "1000000000"
        scan_1.nmap["hosts"] = [host_a]

        scan_2 = zenmapCore.NmapParser.ParserBasics()
        host_b = zenmapCore.NmapParser.HostInfo()
        host_b.hostnames = ["b"]
        host_b.set_ip({'addr':added_ips[1]})
        scan_2.start = "1000000001"
        scan_2.nmap["hosts"] = [host_b]

        scan_3 = zenmapCore.NmapParser.ParserBasics()
        host_c = zenmapCore.NmapParser.HostInfo()
        host_c.hostnames = ["b"]
        host_c.set_ip({'addr':removed_ips[0]})
        scan_3.start = "1000000001"
        scan_3.nmap["hosts"] = [host_c]

        inv = NetworkInventory()
        inv.add_scan(scan_1)
        inv.add_scan(scan_2)
        try:
            inv.remove_scan(scan_3)
        except:
            pass
        self.assertEqual(added_ips, inv.hosts.keys())
        self.assertEqual(host_a.hostnames, ["a"])
        self.assertEqual(host_b.hostnames, ["b"])

class FilteredNetworkInventoryTest(unittest.TestCase):
    def test_filter(self):
        """Test that the filter still works after moving code to the """
        """HostSearch class"""
        from zenmapCore.NmapParser import NmapParser
        inv = FilteredNetworkInventory()
        scan = NmapParser()
        scan.parse_file("test/xml_test9.xml")
        filter_text = "open:22 os:linux service:openssh"
        inv.add_scan(scan)
        inv.apply_filter(filter_text)
        assert(len(inv.get_hosts()) == 2)

if __name__ == "__main__":
    unittest.main()
    if False:
        
        scan1 = NmapParser("/home/ndwi/scanz/neobee_1.xml")
        scan1.parse()
        scan2 = NmapParser("/home/ndwi/scanz/scanme_nmap_org.usr")
        scan2.parse()
        
        inventory1 = NetworkInventory()
        inventory1.add_scan(scan1)
        inventory1.add_scan(scan2)
        
        for host in inventory1.get_hosts():
            print "%s" % host.ip["addr"],
            #if len(host.hostnames) > 0:
            #    print "[%s]:" % host.hostnames[0]["hostname"]
            #else:
            #    print ":"
            #for port in host.ports:
            #    print "  %s: %s" % (port["portid"], port["port_state"])
            #print "  OS matches: %s" % host.osmatches
            #print "  OS classes: %s" % host.osclasses
            #print "  Ports used: %s" % host.ports_used
            #print "  Trace: %s" % host.trace
            #if "hops" in host.trace:
            #    print "         (%d)" % len(host.trace["hops"])
        
        inventory1.remove_scan(scan2)
        print
        for host in inventory1.get_hosts():
            print "%s" % host.ip["addr"],
        
        inventory1.add_scan(scan2)
        print
        for host in inventory1.get_hosts():
            print "%s" % host.ip["addr"],
        
        dir = "/home/ndwi/scanz/top01"
        inventory1.save_to_dir(dir)
        
        inventory2 = NetworkInventory()
        inventory2.open_from_dir(dir)
        
        print
        for host in inventory2.get_hosts():
            print "%s" % host.ip["addr"],

Generated by  Doxygen 1.6.0   Back to index