mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-03 18:52:15 +01:00 
			
		
		
		
	Added IPv6 support for DHEat and connection rate tests. (#269)
This commit is contained in:
		@@ -160,6 +160,11 @@ class DHEat:
 | 
			
		||||
        # The SSH2_Kex object that we recieved from the server in a prior connection.  We'll use it as a template to craft our own kex.
 | 
			
		||||
        self.kex = kex
 | 
			
		||||
 | 
			
		||||
        # Resolve the target to an IP address depending on the user preferences (IPv4 or IPv6).
 | 
			
		||||
        self.debug("Resolving target %s..." % self.target)
 | 
			
		||||
        self.target_address_family, self.target_ip_address = DHEat._resolve_hostname(self.target, aconf.ip_version_preference)
 | 
			
		||||
        self.debug("Resolved %s to %s (address family %u)" % (self.target, self.target_ip_address, self.target_address_family))
 | 
			
		||||
 | 
			
		||||
        # The connection and read timeouts.
 | 
			
		||||
        self.connect_timeout = aconf.timeout
 | 
			
		||||
        self.read_timeout = aconf.timeout
 | 
			
		||||
@@ -324,6 +329,11 @@ class DHEat:
 | 
			
		||||
            print("\n%sUnfortunately, this feature is not currently functional under Windows.%s  This should get fixed in a future release.  See: <https://github.com/jtesta/ssh-audit/issues/261>" % (DHEat.YELLOWB, DHEat.CLEAR))
 | 
			
		||||
            return ""
 | 
			
		||||
 | 
			
		||||
        # Resolve the target into an IP address
 | 
			
		||||
        out.d("Resolving target %s..." % aconf.host)
 | 
			
		||||
        target_address_family, target_ip_address = DHEat._resolve_hostname(aconf.host, aconf.ip_version_preference)
 | 
			
		||||
        out.d("Resolved %s to %s (address family %u)" % (aconf.host, target_ip_address, target_address_family))
 | 
			
		||||
 | 
			
		||||
        spinner = ["-", "\\", "|", "/"]
 | 
			
		||||
        spinner_index = 0
 | 
			
		||||
 | 
			
		||||
@@ -349,7 +359,7 @@ class DHEat:
 | 
			
		||||
                rate_str = " at a max rate of %s%u%s connections per second" % (DHEat.WHITEB, aconf.conn_rate_test_target_rate, DHEat.CLEAR)
 | 
			
		||||
 | 
			
		||||
            print()
 | 
			
		||||
            print("Performing non-disruptive rate test against %s[%s]:%u%s with %s%u%s concurrent sockets%s.  No Diffie-Hellman requests will be sent." % (DHEat.WHITEB, aconf.host, aconf.port, DHEat.CLEAR, DHEat.WHITEB, concurrent_sockets, DHEat.CLEAR, rate_str))
 | 
			
		||||
            print("Performing non-disruptive rate test against %s[%s]:%u%s with %s%u%s concurrent sockets%s.  No Diffie-Hellman requests will be sent." % (DHEat.WHITEB, target_ip_address, aconf.port, DHEat.CLEAR, DHEat.WHITEB, concurrent_sockets, DHEat.CLEAR, rate_str))
 | 
			
		||||
            print()
 | 
			
		||||
 | 
			
		||||
            # Make room for the multi-line output.
 | 
			
		||||
@@ -426,11 +436,11 @@ class DHEat:
 | 
			
		||||
 | 
			
		||||
            # Open new sockets until we've hit the number of concurrent sockets, or if we exceeded the number of maximum connections.
 | 
			
		||||
            while (len(socket_dict) < concurrent_sockets) and (len(socket_dict) + num_opened_connections < max_connections):
 | 
			
		||||
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
                s = socket.socket(target_address_family, socket.SOCK_STREAM)
 | 
			
		||||
                s.setblocking(False)
 | 
			
		||||
 | 
			
		||||
                # out.d("Creating socket (%u of %u already exist)..." % (len(socket_dict), concurrent_sockets), write_now=True)
 | 
			
		||||
                ret = s.connect_ex((aconf.host, aconf.port))
 | 
			
		||||
                ret = s.connect_ex((target_ip_address, aconf.port))
 | 
			
		||||
                num_attempted_connections += 1
 | 
			
		||||
                if ret in [0, 115]:  # Check if connection is successful or EINPROGRESS.
 | 
			
		||||
                    socket_dict[s] = now
 | 
			
		||||
@@ -743,6 +753,22 @@ class DHEat:
 | 
			
		||||
        print()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _resolve_hostname(host: str, ip_version_preference: List[int]) -> Tuple[int, str]:
 | 
			
		||||
        '''Resolves a hostname to its IPv4 or IPv6 address, depending on user preference.'''
 | 
			
		||||
 | 
			
		||||
        family = socket.AF_UNSPEC
 | 
			
		||||
        if len(ip_version_preference) == 1:
 | 
			
		||||
            family = socket.AF_INET if ip_version_preference[0] == 4 else socket.AF_INET6
 | 
			
		||||
 | 
			
		||||
        r = socket.getaddrinfo(host, 0, family, socket.SOCK_STREAM)
 | 
			
		||||
        for address_family, socktype, _, _, addr in r:
 | 
			
		||||
            if socktype == socket.SOCK_STREAM:
 | 
			
		||||
                return address_family, addr[0]
 | 
			
		||||
 | 
			
		||||
        return -1, ''
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def _run(self) -> bool:
 | 
			
		||||
        '''Where all the magic happens.'''
 | 
			
		||||
 | 
			
		||||
@@ -751,7 +777,7 @@ class DHEat:
 | 
			
		||||
        if sys.platform == "win32":
 | 
			
		||||
            self.output("%sWARNING:%s this feature has not been thoroughly tested on Windows.  It may perform worse than on UNIX OSes." % (self.YELLOWB, self.CLEAR))
 | 
			
		||||
 | 
			
		||||
        self.output("Running DHEat test against %s[%s]:%u%s with %s%u%s concurrent sockets..." % (self.WHITEB, self.target, self.port, self.CLEAR, self.WHITEB, self.concurrent_connections, self.CLEAR))
 | 
			
		||||
        self.output("Running DHEat test against %s[%s]:%u%s with %s%u%s concurrent sockets..." % (self.WHITEB, self.target_ip_address, self.port, self.CLEAR, self.WHITEB, self.concurrent_connections, self.CLEAR))
 | 
			
		||||
 | 
			
		||||
        # If the user didn't specify an exact kex algorithm to test, check our prioritized list against what the server supports.  Larger p-values (such as group18: 8192-bits) cause the most strain on the server.
 | 
			
		||||
        chosen_alg = ""
 | 
			
		||||
@@ -894,7 +920,8 @@ class DHEat:
 | 
			
		||||
 | 
			
		||||
        # Copy variables from the object (which might exist in another process?).  This might cut down on inter-process overhead.
 | 
			
		||||
        connect_timeout = self.connect_timeout
 | 
			
		||||
        target = self.target
 | 
			
		||||
        target_ip_address = self.target_ip_address
 | 
			
		||||
        target_address_family = self.target_address_family
 | 
			
		||||
        port = self.port
 | 
			
		||||
 | 
			
		||||
        # Determine if we are attacking with a GEX.
 | 
			
		||||
@@ -945,17 +972,17 @@ class DHEat:
 | 
			
		||||
                num_socket_exceptions = 0
 | 
			
		||||
                num_openssh_throttled_connections = 0
 | 
			
		||||
 | 
			
		||||
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
            s = socket.socket(target_address_family, socket.SOCK_STREAM)
 | 
			
		||||
            s.settimeout(connect_timeout)
 | 
			
		||||
 | 
			
		||||
            # Loop until a successful TCP connection is made.
 | 
			
		||||
            connected = False
 | 
			
		||||
            while not connected:
 | 
			
		||||
 | 
			
		||||
                # self.debug("Connecting to %s:%d" % (self.target, self.port))
 | 
			
		||||
                # self.debug("Connecting to %s:%d" % (self.target_ip_address, self.port))
 | 
			
		||||
                try:
 | 
			
		||||
                    num_attempted_tcp_connections += 1
 | 
			
		||||
                    s.connect((target, port))
 | 
			
		||||
                    s.connect((target_ip_address, port))
 | 
			
		||||
                    connected = True
 | 
			
		||||
                except OSError as e:
 | 
			
		||||
                    self.debug("Failed to connect: %s" % str(e))
 | 
			
		||||
 
 | 
			
		||||
@@ -421,6 +421,8 @@ def output_recommendations(out: OutputBuffer, algs: Algorithms, algorithm_recomm
 | 
			
		||||
 | 
			
		||||
                        fn = level_to_output[level]
 | 
			
		||||
 | 
			
		||||
                        an = '?'
 | 
			
		||||
                        sg = '?'
 | 
			
		||||
                        if action == 'del':
 | 
			
		||||
                            an, sg = 'remove', '-'
 | 
			
		||||
                            ret = False
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user