mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-04 03:02:15 +01:00 
			
		
		
		
	Send KEX before reading server's KEX during host key and GEX tests; this prevents deadlock against certain server implementations.
This commit is contained in:
		@@ -1,7 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
   The MIT License (MIT)
 | 
			
		||||
 | 
			
		||||
   Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
 | 
			
		||||
   Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
			
		||||
   of this software and associated documentation files (the "Software"), to deal
 | 
			
		||||
@@ -40,7 +40,7 @@ class GEXTest:
 | 
			
		||||
 | 
			
		||||
    # Creates a new connection to the server.  Returns True on success, or False.
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def reconnect(s: 'SSH_Socket', gex_alg: str) -> bool:
 | 
			
		||||
    def reconnect(s: 'SSH_Socket', kex: 'SSH2_Kex', gex_alg: str) -> bool:
 | 
			
		||||
        if s.is_connected():
 | 
			
		||||
            return True
 | 
			
		||||
 | 
			
		||||
@@ -48,24 +48,22 @@ class GEXTest:
 | 
			
		||||
        if err is not None:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        unused = None  # pylint: disable=unused-variable
 | 
			
		||||
        unused2 = None  # pylint: disable=unused-variable
 | 
			
		||||
        unused, unused2, err = s.get_banner()
 | 
			
		||||
        _, _, err = s.get_banner()
 | 
			
		||||
        if err is not None:
 | 
			
		||||
            s.close()
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        # Parse the server's initial KEX.
 | 
			
		||||
        packet_type = 0  # pylint: disable=unused-variable
 | 
			
		||||
        packet_type, payload = s.read_packet(2)
 | 
			
		||||
        kex = SSH2_Kex.parse(payload)
 | 
			
		||||
 | 
			
		||||
        # Send our KEX using the specified group-exchange and most of the
 | 
			
		||||
        # server's own values.
 | 
			
		||||
        client_kex = SSH2_Kex(os.urandom(16), [gex_alg], kex.key_algorithms, kex.client, kex.server, False, 0)
 | 
			
		||||
        s.write_byte(Protocol.MSG_KEXINIT)
 | 
			
		||||
        client_kex.write(s)
 | 
			
		||||
        s.send_packet()
 | 
			
		||||
 | 
			
		||||
        # Parse the server's KEX.
 | 
			
		||||
        _, payload = s.read_packet(2)
 | 
			
		||||
        SSH2_Kex.parse(payload)
 | 
			
		||||
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    # Runs the DH moduli test against the specified target.
 | 
			
		||||
@@ -87,7 +85,7 @@ class GEXTest:
 | 
			
		||||
        for gex_alg in GEX_ALGS:
 | 
			
		||||
            if gex_alg in kex.kex_algorithms:
 | 
			
		||||
 | 
			
		||||
                if GEXTest.reconnect(s, gex_alg) is False:
 | 
			
		||||
                if GEXTest.reconnect(s, kex, gex_alg) is False:
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
                kex_group = GEX_ALGS[gex_alg]()
 | 
			
		||||
@@ -117,7 +115,7 @@ class GEXTest:
 | 
			
		||||
                    if bits >= smallest_modulus > 0:
 | 
			
		||||
                        break
 | 
			
		||||
 | 
			
		||||
                    if GEXTest.reconnect(s, gex_alg) is False:
 | 
			
		||||
                    if GEXTest.reconnect(s, kex, gex_alg) is False:
 | 
			
		||||
                        reconnect_failed = True
 | 
			
		||||
                        break
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
   The MIT License (MIT)
 | 
			
		||||
 | 
			
		||||
   Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
 | 
			
		||||
   Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
			
		||||
   of this software and associated documentation files (the "Software"), to deal
 | 
			
		||||
@@ -117,20 +117,16 @@ class HostKeyTest:
 | 
			
		||||
                        s.close()
 | 
			
		||||
                        return
 | 
			
		||||
 | 
			
		||||
                    # Parse the server's initial KEX.
 | 
			
		||||
                    packet_type = 0  # pylint: disable=unused-variable
 | 
			
		||||
                    packet_type, payload = s.read_packet()
 | 
			
		||||
                    # Send our KEX using the specified group-exchange and most of the server's own values.
 | 
			
		||||
                    client_kex = SSH2_Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, False, 0)
 | 
			
		||||
                    s.write_byte(Protocol.MSG_KEXINIT)
 | 
			
		||||
                    client_kex.write(s)
 | 
			
		||||
                    s.send_packet()
 | 
			
		||||
 | 
			
		||||
                    # Parse the server's KEX.
 | 
			
		||||
                    _, payload = s.read_packet()
 | 
			
		||||
                    SSH2_Kex.parse(payload)
 | 
			
		||||
 | 
			
		||||
                # Send the server our KEXINIT message, using only our
 | 
			
		||||
                # selected kex and host key type.  Send the server's own
 | 
			
		||||
                # list of ciphers and MACs back to it (this doesn't
 | 
			
		||||
                # matter, really).
 | 
			
		||||
                client_kex = SSH2_Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, False, 0)
 | 
			
		||||
 | 
			
		||||
                s.write_byte(Protocol.MSG_KEXINIT)
 | 
			
		||||
                client_kex.write(s)
 | 
			
		||||
                s.send_packet()
 | 
			
		||||
 | 
			
		||||
                # Do the initial DH exchange.  The server responds back
 | 
			
		||||
                # with the host key and its length.  Bingo.  We also get back the host key fingerprint.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user