mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-04 03:02:15 +01:00 
			
		
		
		
	Now SHA256 fingerprints are displayed for RSA and ED25519 host keys. Fixes #2.
This commit is contained in:
		
							
								
								
									
										209
									
								
								ssh-audit.py
									
									
									
									
									
								
							
							
						
						
									
										209
									
								
								ssh-audit.py
									
									
									
									
									
								
							@@ -272,7 +272,10 @@ class OutputBuffer(list):
 | 
				
			|||||||
		sys.stdout = self.__buf
 | 
							sys.stdout = self.__buf
 | 
				
			||||||
		return self
 | 
							return self
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	def flush(self):
 | 
						def flush(self, sort_lines=False):
 | 
				
			||||||
 | 
							# Lines must be sorted in some cases to ensure consistent testing.
 | 
				
			||||||
 | 
							if sort_lines:
 | 
				
			||||||
 | 
								self.sort()
 | 
				
			||||||
		# type: () -> None
 | 
							# type: () -> None
 | 
				
			||||||
		for line in self:
 | 
							for line in self:
 | 
				
			||||||
			print(line)
 | 
								print(line)
 | 
				
			||||||
@@ -322,7 +325,7 @@ class SSH2(object):  # pylint: disable=too-few-public-methods
 | 
				
			|||||||
				'diffie-hellman-group18-sha512': [['7.3']],
 | 
									'diffie-hellman-group18-sha512': [['7.3']],
 | 
				
			||||||
				'diffie-hellman-group-exchange-sha1': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]],
 | 
									'diffie-hellman-group-exchange-sha1': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]],
 | 
				
			||||||
				'diffie-hellman-group-exchange-sha256': [['4.4']],
 | 
									'diffie-hellman-group-exchange-sha256': [['4.4']],
 | 
				
			||||||
				'diffie-hellman-group-exchange-sha256@ssh.com': [['4.4']],
 | 
									'diffie-hellman-group-exchange-sha256@ssh.com': [[]],
 | 
				
			||||||
				'ecdh-sha2-nistp256': [['5.7,d2013.62,l10.6.0'], [WARN_CURVES_WEAK]],
 | 
									'ecdh-sha2-nistp256': [['5.7,d2013.62,l10.6.0'], [WARN_CURVES_WEAK]],
 | 
				
			||||||
				'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]],
 | 
									'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]],
 | 
				
			||||||
				'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]],
 | 
									'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]],
 | 
				
			||||||
@@ -466,6 +469,7 @@ class SSH2(object):  # pylint: disable=too-few-public-methods
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
			self.__rsa_key_sizes = {}
 | 
								self.__rsa_key_sizes = {}
 | 
				
			||||||
			self.__dh_modulus_sizes = {}
 | 
								self.__dh_modulus_sizes = {}
 | 
				
			||||||
 | 
								self.__host_keys = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		@property
 | 
							@property
 | 
				
			||||||
		def cookie(self):
 | 
							def cookie(self):
 | 
				
			||||||
@@ -516,6 +520,12 @@ class SSH2(object):  # pylint: disable=too-few-public-methods
 | 
				
			|||||||
		def dh_modulus_sizes(self):
 | 
							def dh_modulus_sizes(self):
 | 
				
			||||||
			return self.__dh_modulus_sizes
 | 
								return self.__dh_modulus_sizes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							def set_host_key(self, key_type, hostkey):
 | 
				
			||||||
 | 
								self.__host_keys[key_type] = hostkey
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							def host_keys(self):
 | 
				
			||||||
 | 
								return self.__host_keys
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		def write(self, wbuf):
 | 
							def write(self, wbuf):
 | 
				
			||||||
			# type: (WriteBuf) -> None
 | 
								# type: (WriteBuf) -> None
 | 
				
			||||||
			wbuf.write(self.cookie)
 | 
								wbuf.write(self.cookie)
 | 
				
			||||||
@@ -561,10 +571,22 @@ class SSH2(object):  # pylint: disable=too-few-public-methods
 | 
				
			|||||||
			kex = cls(cookie, kex_algs, key_algs, cli, srv, follows, unused)
 | 
								kex = cls(cookie, kex_algs, key_algs, cli, srv, follows, unused)
 | 
				
			||||||
			return kex
 | 
								return kex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	# Obtains RSA host keys and checks their size.
 | 
						# Obtains host keys, checks their size, and derives their fingerprints.
 | 
				
			||||||
	class RSAKeyTest(object):
 | 
						class HostKeyTest(object):
 | 
				
			||||||
		RSA_TYPES = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512']
 | 
							# Tracks the RSA host key types.  As of this writing, testing one in this family yields valid results for the rest.
 | 
				
			||||||
		RSA_CA_TYPES = ['ssh-rsa-cert-v01@openssh.com']
 | 
							RSA_FAMILY = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							# Dict holding the host key types we should extract & parse.  'cert' is True to denote that a host key type handles certificates (thus requires additional parsing).  'variable_key_len' is True for host key types that can have variable sizes (True only for RSA types, as the rest are of fixed-size).  After the host key type is fully parsed, the key 'parsed' is added with a value of True.
 | 
				
			||||||
 | 
							HOST_KEY_TYPES = {
 | 
				
			||||||
 | 
								'ssh-rsa':      {'cert': False, 'variable_key_len': True},
 | 
				
			||||||
 | 
								'rsa-sha2-256': {'cert': False, 'variable_key_len': True},
 | 
				
			||||||
 | 
								'rsa-sha2-512': {'cert': False, 'variable_key_len': True},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								'ssh-rsa-cert-v01@openssh.com':     {'cert': True, 'variable_key_len': True},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								'ssh-ed25519':                      {'cert': False, 'variable_key_len': False},
 | 
				
			||||||
 | 
								'ssh-ed25519-cert-v01@openssh.com': {'cert': True,  'variable_key_len': False},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		@staticmethod
 | 
							@staticmethod
 | 
				
			||||||
		def run(s, server_kex):
 | 
							def run(s, server_kex):
 | 
				
			||||||
@@ -595,90 +617,90 @@ class SSH2(object):  # pylint: disable=too-few-public-methods
 | 
				
			|||||||
					break
 | 
										break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if kex_str is not None:
 | 
								if kex_str is not None:
 | 
				
			||||||
				SSH2.RSAKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.RSAKeyTest.RSA_TYPES)
 | 
									SSH2.HostKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.HostKeyTest.HOST_KEY_TYPES)
 | 
				
			||||||
				SSH2.RSAKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.RSAKeyTest.RSA_CA_TYPES, ca=True)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		@staticmethod
 | 
							@staticmethod
 | 
				
			||||||
		def __test(s, server_kex, kex_str, kex_group, rsa_types, ca=False):
 | 
							def __test(s, server_kex, kex_str, kex_group, host_key_types):
 | 
				
			||||||
			# If the server supports one of the RSA types, extract its key size.
 | 
					 | 
				
			||||||
			hostkey_modulus_size = 0
 | 
								hostkey_modulus_size = 0
 | 
				
			||||||
			ca_modulus_size = 0
 | 
								ca_modulus_size = 0
 | 
				
			||||||
			ran_test = False
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
			# If the connection is closed, re-open it and get the kex again.
 | 
								# For each host key type...
 | 
				
			||||||
			if not s.is_connected():
 | 
								for host_key_type in host_key_types:
 | 
				
			||||||
				s.connect()
 | 
									# Skip those already handled (i.e.: those in the RSA family, as testing one tests them all).
 | 
				
			||||||
				unused = None # pylint: disable=unused-variable
 | 
									if 'parsed' in host_key_types[host_key_type] and host_key_types[host_key_type]['parsed']:
 | 
				
			||||||
				unused, unused, err = s.get_banner()
 | 
										continue
 | 
				
			||||||
				if err is not None:
 | 
					 | 
				
			||||||
					s.close()
 | 
					 | 
				
			||||||
					return
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
				# Parse the server's initial KEX.
 | 
									# If this host key type is supported by the server, we test it.
 | 
				
			||||||
				packet_type = 0 # pylint: disable=unused-variable
 | 
									if host_key_type in server_kex.key_algorithms:
 | 
				
			||||||
				packet_type, payload = s.read_packet()
 | 
										cert = host_key_types[host_key_type]['cert']
 | 
				
			||||||
				SSH2.Kex.parse(payload)
 | 
										variable_key_len = host_key_types[host_key_type]['variable_key_len']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										# If the connection is closed, re-open it and get the kex again.
 | 
				
			||||||
 | 
										if not s.is_connected():
 | 
				
			||||||
 | 
											s.connect()
 | 
				
			||||||
 | 
											unused = None # pylint: disable=unused-variable
 | 
				
			||||||
 | 
											unused, unused, err = s.get_banner()
 | 
				
			||||||
 | 
											if err is not None:
 | 
				
			||||||
 | 
												s.close()
 | 
				
			||||||
 | 
												return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			for rsa_type in rsa_types:
 | 
											# Parse the server's initial KEX.
 | 
				
			||||||
				if rsa_type in server_kex.key_algorithms:
 | 
											packet_type = 0 # pylint: disable=unused-variable
 | 
				
			||||||
					ran_test = True
 | 
											packet_type, payload = s.read_packet()
 | 
				
			||||||
 | 
											SSH2.Kex.parse(payload)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					# Send the server our KEXINIT message, using only our
 | 
										# Send the server our KEXINIT message, using only our
 | 
				
			||||||
					# selected kex and RSA type.  Send the server's own
 | 
										# selected kex and host key type.  Send the server's own
 | 
				
			||||||
					# list of ciphers and MACs back to it (this doesn't
 | 
										# list of ciphers and MACs back to it (this doesn't
 | 
				
			||||||
					# matter, really).
 | 
										# matter, really).
 | 
				
			||||||
					client_kex = SSH2.Kex(os.urandom(16), [kex_str], [rsa_type], server_kex.client, server_kex.server, 0, 0)
 | 
										client_kex = SSH2.Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, 0, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					s.write_byte(SSH.Protocol.MSG_KEXINIT)
 | 
										s.write_byte(SSH.Protocol.MSG_KEXINIT)
 | 
				
			||||||
					client_kex.write(s)
 | 
										client_kex.write(s)
 | 
				
			||||||
					s.send_packet()
 | 
										s.send_packet()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					# Do the initial DH exchange.  The server responds back
 | 
										# Do the initial DH exchange.  The server responds back
 | 
				
			||||||
					# with the host key and its length.  Bingo.
 | 
										# with the host key and its length.  Bingo.  We also get back the host key fingerprint.
 | 
				
			||||||
					kex_group.send_init(s)
 | 
										kex_group.send_init(s)
 | 
				
			||||||
					kex_group.recv_reply(s)
 | 
										host_key = kex_group.recv_reply(s, variable_key_len)
 | 
				
			||||||
 | 
										server_kex.set_host_key(host_key_type, host_key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					hostkey_modulus_size = kex_group.get_hostkey_size()
 | 
										hostkey_modulus_size = kex_group.get_hostkey_size()
 | 
				
			||||||
					ca_modulus_size = kex_group.get_ca_size()
 | 
										ca_modulus_size = kex_group.get_ca_size()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					# If we're not working with the CA types, we only need to
 | 
										# Close the socket, as the connection has
 | 
				
			||||||
					# test one RSA key, since the others will all be the same.
 | 
										# been put in a state that later tests can't use.
 | 
				
			||||||
					if ca is False:
 | 
										s.close()
 | 
				
			||||||
						break
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if hostkey_modulus_size > 0 or ca_modulus_size > 0:
 | 
										# If the host key modulus or CA modulus was successfully parsed, check to see that its a safe size.
 | 
				
			||||||
				# Set the hostkey size for all RSA key types since 'ssh-rsa',
 | 
										if hostkey_modulus_size > 0 or ca_modulus_size > 0:
 | 
				
			||||||
				# 'rsa-sha2-256', etc. are all using the same host key.
 | 
											# Set the hostkey size for all RSA key types since 'ssh-rsa',
 | 
				
			||||||
				# Note, however, that this may change in the future.
 | 
											# 'rsa-sha2-256', etc. are all using the same host key.
 | 
				
			||||||
				if ca is False:
 | 
											# Note, however, that this may change in the future.
 | 
				
			||||||
					for rsa_type in rsa_types:
 | 
											if cert is False and host_key_type in SSH2.HostKeyTest.RSA_FAMILY:
 | 
				
			||||||
						server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size)
 | 
												for rsa_type in SSH2.HostKeyTest.RSA_FAMILY:
 | 
				
			||||||
				else:
 | 
													server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size)
 | 
				
			||||||
					server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size, ca_modulus_size)
 | 
											elif cert is True:
 | 
				
			||||||
 | 
												server_kex.set_rsa_key_size(host_key_type, hostkey_modulus_size, ca_modulus_size)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				# Keys smaller than 2048 result in a failure.
 | 
											# Keys smaller than 2048 result in a failure.  Update the database accordingly.
 | 
				
			||||||
				fail = False
 | 
											if (cert is False) and (hostkey_modulus_size < 2048):
 | 
				
			||||||
				if hostkey_modulus_size < 2048 or (ca_modulus_size < 2048 and ca_modulus_size > 0):
 | 
												for rsa_type in SSH2.HostKeyTest.RSA_FAMILY:
 | 
				
			||||||
					fail = True
 | 
													alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type]
 | 
				
			||||||
 | 
													alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size])
 | 
				
			||||||
 | 
											elif (cert is True) and ((hostkey_modulus_size < 2048) or (ca_modulus_size > 0 and ca_modulus_size < 2048)):
 | 
				
			||||||
 | 
												alg_list = SSH2.KexDB.ALGORITHMS['key'][host_key_type]
 | 
				
			||||||
 | 
												min_modulus = min(hostkey_modulus_size, ca_modulus_size)
 | 
				
			||||||
 | 
												min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size)
 | 
				
			||||||
 | 
												alg_list.append(['using small %d-bit modulus' % min_modulus])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				# If this is a bad key size, update the database accordingly.
 | 
										# If this host key type is in the RSA family, then mark them all as parsed (since results in one are valid for them all).
 | 
				
			||||||
				if fail:
 | 
										if host_key_type in SSH2.HostKeyTest.RSA_FAMILY:
 | 
				
			||||||
					if ca is False:
 | 
											for rsa_type in SSH2.HostKeyTest.RSA_FAMILY:
 | 
				
			||||||
						for rsa_type in SSH2.RSAKeyTest.RSA_TYPES:
 | 
												host_key_types[rsa_type]['parsed'] = True
 | 
				
			||||||
							alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type]
 | 
					 | 
				
			||||||
							alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size])
 | 
					 | 
				
			||||||
					else:
 | 
										else:
 | 
				
			||||||
						alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type]
 | 
											host_key_types[host_key_type]['parsed'] = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
						min_modulus = min(hostkey_modulus_size, ca_modulus_size)
 | 
					 | 
				
			||||||
						min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size)
 | 
					 | 
				
			||||||
						alg_list.append(['using small %d-bit modulus' % min_modulus])
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			# If we ran any tests, close the socket, as the connection has
 | 
					 | 
				
			||||||
			# been put in a state that later tests can't use.
 | 
					 | 
				
			||||||
			if ran_test:
 | 
					 | 
				
			||||||
				s.close()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	# Performs DH group exchanges to find what moduli are supported, and checks
 | 
						# Performs DH group exchanges to find what moduli are supported, and checks
 | 
				
			||||||
	# their size.
 | 
						# their size.
 | 
				
			||||||
@@ -733,21 +755,21 @@ class SSH2(object):  # pylint: disable=too-few-public-methods
 | 
				
			|||||||
					if SSH2.GEXTest.reconnect(s, gex_alg) is False:
 | 
										if SSH2.GEXTest.reconnect(s, gex_alg) is False:
 | 
				
			||||||
						break
 | 
											break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
					kex_group = GEX_ALGS[gex_alg]()
 | 
										kex_group = GEX_ALGS[gex_alg]()
 | 
				
			||||||
					smallest_modulus = -1
 | 
										smallest_modulus = -1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					# First try a range of weak sizes.
 | 
										# First try a range of weak sizes.
 | 
				
			||||||
					try:
 | 
										try:
 | 
				
			||||||
						kex_group.send_init_gex(s, 512, 1024, 1536)
 | 
											kex_group.send_init_gex(s, 512, 1024, 1536)
 | 
				
			||||||
						kex_group.recv_reply(s)
 | 
											kex_group.recv_reply(s, False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
						# Its been observed that servers will return a group
 | 
											# Its been observed that servers will return a group
 | 
				
			||||||
						# larger than the requested max.  So just because we
 | 
											# larger than the requested max.  So just because we
 | 
				
			||||||
						# got here, doesn't mean the server is vulnerable...
 | 
											# got here, doesn't mean the server is vulnerable...
 | 
				
			||||||
						smallest_modulus = kex_group.get_dh_modulus_size()
 | 
											smallest_modulus = kex_group.get_dh_modulus_size()
 | 
				
			||||||
					except Exception: # pylint: disable=bare-except
 | 
					
 | 
				
			||||||
						x = 1 # pylint: disable=unused-variable
 | 
										except Exception as e: # pylint: disable=bare-except
 | 
				
			||||||
 | 
											pass
 | 
				
			||||||
					finally:
 | 
										finally:
 | 
				
			||||||
						s.close()
 | 
											s.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -766,10 +788,12 @@ class SSH2(object):  # pylint: disable=too-few-public-methods
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
						try:
 | 
											try:
 | 
				
			||||||
							kex_group.send_init_gex(s, bits, bits, bits)
 | 
												kex_group.send_init_gex(s, bits, bits, bits)
 | 
				
			||||||
							kex_group.recv_reply(s)
 | 
												kex_group.recv_reply(s, False)
 | 
				
			||||||
							smallest_modulus = kex_group.get_dh_modulus_size()
 | 
												smallest_modulus = kex_group.get_dh_modulus_size()
 | 
				
			||||||
						except Exception: # pylint: disable=bare-except
 | 
											except Exception as e: # pylint: disable=bare-except
 | 
				
			||||||
							x = 1 # pylint: disable=unused-variable
 | 
												#import traceback
 | 
				
			||||||
 | 
												#print(traceback.format_exc())
 | 
				
			||||||
 | 
												pass
 | 
				
			||||||
						finally:
 | 
											finally:
 | 
				
			||||||
							# The server is in a state that is not re-testable,
 | 
												# The server is in a state that is not re-testable,
 | 
				
			||||||
							# so there's nothing else to do with this open
 | 
												# so there's nothing else to do with this open
 | 
				
			||||||
@@ -2132,6 +2156,7 @@ class KexDH(object):  # pragma: nocover
 | 
				
			|||||||
		self.__f = 0
 | 
							self.__f = 0
 | 
				
			||||||
		self.__h_sig = 0
 | 
							self.__h_sig = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	def set_params(self, g, p):
 | 
						def set_params(self, g, p):
 | 
				
			||||||
		self.__g = g
 | 
							self.__g = g
 | 
				
			||||||
		self.__p = p
 | 
							self.__p = p
 | 
				
			||||||
@@ -2150,8 +2175,9 @@ class KexDH(object):  # pragma: nocover
 | 
				
			|||||||
		s.send_packet()
 | 
							s.send_packet()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	# Parse a KEXDH_REPLY or KEXDH_GEX_REPLY message from the server.  This
 | 
						# Parse a KEXDH_REPLY or KEXDH_GEX_REPLY message from the server.  This
 | 
				
			||||||
	# Contains the host key, among other things.
 | 
						# contains the host key, among other things.  Function returns the host
 | 
				
			||||||
	def recv_reply(self, s):
 | 
						# key blob (from which the fingerprint can be calculated).
 | 
				
			||||||
 | 
						def recv_reply(self, s, parse_host_key_size=True):
 | 
				
			||||||
		packet_type, payload = s.read_packet(2)
 | 
							packet_type, payload = s.read_packet(2)
 | 
				
			||||||
		if packet_type != -1 and packet_type not in [SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY]:
 | 
							if packet_type != -1 and packet_type not in [SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY]:
 | 
				
			||||||
			# TODO: change Exception to something more specific.
 | 
								# TODO: change Exception to something more specific.
 | 
				
			||||||
@@ -2160,7 +2186,7 @@ class KexDH(object):  # pragma: nocover
 | 
				
			|||||||
			# A connection error occurred.  We can't parse anything, so just
 | 
								# A connection error occurred.  We can't parse anything, so just
 | 
				
			||||||
			# return.  The host key modulus (and perhaps certificate modulus)
 | 
								# return.  The host key modulus (and perhaps certificate modulus)
 | 
				
			||||||
			# will remain at length 0.
 | 
								# will remain at length 0.
 | 
				
			||||||
			return
 | 
								return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		hostkey_len = f_len = h_sig_len = 0  # pylint: disable=unused-variable
 | 
							hostkey_len = f_len = h_sig_len = 0  # pylint: disable=unused-variable
 | 
				
			||||||
		hostkey_type_len = hostkey_e_len = 0 # pylint: disable=unused-variable
 | 
							hostkey_type_len = hostkey_e_len = 0 # pylint: disable=unused-variable
 | 
				
			||||||
@@ -2178,6 +2204,11 @@ class KexDH(object):  # pragma: nocover
 | 
				
			|||||||
		# Get the host key blob, F, and signature.
 | 
							# Get the host key blob, F, and signature.
 | 
				
			||||||
		ptr = 0
 | 
							ptr = 0
 | 
				
			||||||
		hostkey, hostkey_len, ptr = KexDH.__get_bytes(payload, ptr)
 | 
							hostkey, hostkey_len, ptr = KexDH.__get_bytes(payload, ptr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							# If we are not supposed to parse the host key size (i.e.: it is a type that is of fixed size such as ed25519), then stop here.
 | 
				
			||||||
 | 
							if not parse_host_key_size:
 | 
				
			||||||
 | 
								return hostkey
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		self.__f, f_len, ptr = KexDH.__get_bytes(payload, ptr)
 | 
							self.__f, f_len, ptr = KexDH.__get_bytes(payload, ptr)
 | 
				
			||||||
		self.__h_sig, h_sig_len, ptr = KexDH.__get_bytes(payload, ptr)
 | 
							self.__h_sig, h_sig_len, ptr = KexDH.__get_bytes(payload, ptr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -2254,6 +2285,7 @@ class KexDH(object):  # pragma: nocover
 | 
				
			|||||||
				# CA's modulus.  Bingo.
 | 
									# CA's modulus.  Bingo.
 | 
				
			||||||
				ca_key_n, self.__ca_n_len, ptr = KexDH.__get_bytes(ca_key, ptr)
 | 
									ca_key_n, self.__ca_n_len, ptr = KexDH.__get_bytes(ca_key, ptr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return hostkey
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@staticmethod
 | 
						@staticmethod
 | 
				
			||||||
	def __get_bytes(buf, ptr):
 | 
						def __get_bytes(buf, ptr):
 | 
				
			||||||
@@ -2639,20 +2671,35 @@ def output_security(banner, padlen):
 | 
				
			|||||||
		out.sep()
 | 
							out.sep()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def output_fingerprint(algs, sha256=True, padlen=0):
 | 
					def output_fingerprints(algs, sha256=True):
 | 
				
			||||||
	# type: (SSH.Algorithms, bool, int) -> None
 | 
						# type: (SSH.Algorithms, bool, int) -> None
 | 
				
			||||||
	with OutputBuffer() as obuf:
 | 
						with OutputBuffer() as obuf:
 | 
				
			||||||
		fps = []
 | 
							fps = []
 | 
				
			||||||
		if algs.ssh1kex is not None:
 | 
							if algs.ssh1kex is not None:
 | 
				
			||||||
			name = 'ssh-rsa1'
 | 
								name = 'ssh-rsa1'
 | 
				
			||||||
			fp = SSH.Fingerprint(algs.ssh1kex.host_key_fingerprint_data)
 | 
								fp = SSH.Fingerprint(algs.ssh1kex.host_key_fingerprint_data)
 | 
				
			||||||
			bits = algs.ssh1kex.host_key_bits
 | 
								#bits = algs.ssh1kex.host_key_bits
 | 
				
			||||||
			fps.append((name, fp, bits))
 | 
								fps.append((name, fp))
 | 
				
			||||||
 | 
							if algs.ssh2kex is not None:
 | 
				
			||||||
 | 
								host_keys = algs.ssh2kex.host_keys()
 | 
				
			||||||
 | 
								for host_key_type in algs.ssh2kex.host_keys():
 | 
				
			||||||
 | 
									fp = SSH.Fingerprint(host_keys[host_key_type])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									# Workaround for Python's order-indifference in dicts.  We might get a random RSA type (ssh-rsa, rsa-sha2-256, or rsa-sha2-512), so running the tool against the same server three times may give three different host key types here.  So if we have any RSA type, we will simply hard-code it to 'ssh-rsa'.
 | 
				
			||||||
 | 
									if host_key_type in SSH2.HostKeyTest.RSA_FAMILY:
 | 
				
			||||||
 | 
										host_key_type = 'ssh-rsa'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									# Skip over certificate host types (or we would return invalid fingerprints).
 | 
				
			||||||
 | 
									if '-cert-' not in host_key_type:
 | 
				
			||||||
 | 
										fps.append((host_key_type, fp))
 | 
				
			||||||
 | 
							# Similarly, the host keys can be processed in random order due to Python's order-indifference in dicts.  So we sort this list before printing; this makes automated testing possible.
 | 
				
			||||||
 | 
							fps = sorted(fps)
 | 
				
			||||||
		for fpp in fps:
 | 
							for fpp in fps:
 | 
				
			||||||
			name, fp, bits = fpp
 | 
								name, fp = fpp
 | 
				
			||||||
			fpo = fp.sha256 if sha256 else fp.md5
 | 
								fpo = fp.sha256 if sha256 else fp.md5
 | 
				
			||||||
			p = '' if out.batch else ' ' * (padlen - len(name))
 | 
								#p = '' if out.batch else ' ' * (padlen - len(name))
 | 
				
			||||||
			out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo))
 | 
								#out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo))
 | 
				
			||||||
 | 
								out.good('(fin) {0}: {1}'.format(name, fpo))
 | 
				
			||||||
	if len(obuf) > 0:
 | 
						if len(obuf) > 0:
 | 
				
			||||||
		out.head('# fingerprints')
 | 
							out.head('# fingerprints')
 | 
				
			||||||
		obuf.flush()
 | 
							obuf.flush()
 | 
				
			||||||
@@ -2694,7 +2741,7 @@ def output_recommendations(algs, software, padlen=0):
 | 
				
			|||||||
		else:
 | 
							else:
 | 
				
			||||||
			title = ''
 | 
								title = ''
 | 
				
			||||||
		out.head('# algorithm recommendations {0}'.format(title))
 | 
							out.head('# algorithm recommendations {0}'.format(title))
 | 
				
			||||||
		obuf.flush()
 | 
							obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing).
 | 
				
			||||||
		out.sep()
 | 
							out.sep()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -2752,8 +2799,8 @@ def output(banner, header, kex=None, pkm=None):
 | 
				
			|||||||
		output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, maxlen)
 | 
							output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, maxlen)
 | 
				
			||||||
		title, atype = 'message authentication code algorithms', 'mac'
 | 
							title, atype = 'message authentication code algorithms', 'mac'
 | 
				
			||||||
		output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen)
 | 
							output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen)
 | 
				
			||||||
 | 
						output_fingerprints(algs, True)
 | 
				
			||||||
	output_recommendations(algs, software, maxlen)
 | 
						output_recommendations(algs, software, maxlen)
 | 
				
			||||||
	output_fingerprint(algs, True, maxlen)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	# If we encountered any unknown algorithms, ask the user to report them.
 | 
						# If we encountered any unknown algorithms, ask the user to report them.
 | 
				
			||||||
	if len(unknown_algorithms) > 0:
 | 
						if len(unknown_algorithms) > 0:
 | 
				
			||||||
@@ -2933,7 +2980,7 @@ def audit(aconf, sshv=None):
 | 
				
			|||||||
		output(banner, header, pkm=pkm)
 | 
							output(banner, header, pkm=pkm)
 | 
				
			||||||
	elif sshv == 2:
 | 
						elif sshv == 2:
 | 
				
			||||||
		kex = SSH2.Kex.parse(payload)
 | 
							kex = SSH2.Kex.parse(payload)
 | 
				
			||||||
		SSH2.RSAKeyTest.run(s, kex)
 | 
							SSH2.HostKeyTest.run(s, kex)
 | 
				
			||||||
		SSH2.GEXTest.run(s, kex)
 | 
							SSH2.GEXTest.run(s, kex)
 | 
				
			||||||
		output(banner, header, kex=kex)
 | 
							output(banner, header, kex=kex)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user