mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-04 03:02:15 +01:00 
			
		
		
		
	Implement compatibility checker (based on returned algorithms).
This commit is contained in:
		
							
								
								
									
										42
									
								
								ssh-audit.py
									
									
									
									
									
								
							
							
						
						
									
										42
									
								
								ssh-audit.py
									
									
									
									
									
								
							@@ -283,16 +283,43 @@ class KexGroup14(KexDH):
 | 
				
			|||||||
		        '15728e5a8aacaa68ffffffffffffffff', 16)
 | 
							        '15728e5a8aacaa68ffffffffffffffff', 16)
 | 
				
			||||||
		super(KexGroup14, self).__init__('sha1', 2, p)
 | 
							super(KexGroup14, self).__init__('sha1', 2, p)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_ssh_version(version_desc):
 | 
				
			||||||
 | 
						if version_desc.startswith('d'):
 | 
				
			||||||
 | 
							return ('Dropbear SSH', version_desc[1:])
 | 
				
			||||||
 | 
						else:
 | 
				
			||||||
 | 
							return ('OpenSSH', version_desc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def get_ssh_ver(versions):
 | 
					def get_alg_since_text(versions):
 | 
				
			||||||
	tv = []
 | 
						tv = []
 | 
				
			||||||
	for v in versions.split(','):
 | 
						for v in versions.split(','):
 | 
				
			||||||
		if v.startswith('d'):
 | 
							ssh_prefix, ssh_version = get_ssh_version(v)
 | 
				
			||||||
			tv.append('Dropbear SSH {0}'.format(v[1:]))
 | 
							tv.append('{0} {1}'.format(ssh_prefix, ssh_version))
 | 
				
			||||||
		else:
 | 
					 | 
				
			||||||
			tv.append('OpenSSH {0}'.format(v))
 | 
					 | 
				
			||||||
	return 'available since ' + ', '.join(tv).rstrip(', ')
 | 
						return 'available since ' + ', '.join(tv).rstrip(', ')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_alg_timeframe(alg_desc, result = {}):
 | 
				
			||||||
 | 
						vsince = alg_desc[0]
 | 
				
			||||||
 | 
						for v in vsince.split(','):
 | 
				
			||||||
 | 
							ssh_prefix, ssh_version = get_ssh_version(v)
 | 
				
			||||||
 | 
							if not ssh_prefix in result:
 | 
				
			||||||
 | 
								result[ssh_prefix] = [None, None]
 | 
				
			||||||
 | 
							if result[ssh_prefix][0] is None or result[ssh_prefix][0] < ssh_version:
 | 
				
			||||||
 | 
								result[ssh_prefix][0] = ssh_version
 | 
				
			||||||
 | 
						return result
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_ssh_timeframe(kex):
 | 
				
			||||||
 | 
						alg_timeframe = {}
 | 
				
			||||||
 | 
						algs = {'kex': kex.kex_algorithms, 
 | 
				
			||||||
 | 
						        'key': kex.key_algorithms,
 | 
				
			||||||
 | 
						        'enc': kex.server.encryption,
 | 
				
			||||||
 | 
						        'mac': kex.server.mac}
 | 
				
			||||||
 | 
						for alg_type, alg_list in algs.items():
 | 
				
			||||||
 | 
							for alg_name in alg_list:
 | 
				
			||||||
 | 
								alg_desc = KEX_DB[alg_type].get(alg_name)
 | 
				
			||||||
 | 
								if alg_desc is None:
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								alg_timeframe = get_alg_timeframe(alg_desc, alg_timeframe)
 | 
				
			||||||
 | 
						return alg_timeframe
 | 
				
			||||||
 | 
					
 | 
				
			||||||
WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm'
 | 
					WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm'
 | 
				
			||||||
FAIL_OPENSSH70_LEGACY = 'removed since OpenSSH 7.0, legacy algorithm'
 | 
					FAIL_OPENSSH70_LEGACY = 'removed since OpenSSH 7.0, legacy algorithm'
 | 
				
			||||||
FAIL_OPENSSH70_WEAK   = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm'
 | 
					FAIL_OPENSSH70_WEAK   = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm'
 | 
				
			||||||
@@ -418,7 +445,7 @@ def output_algorithm(alg_type, alg_name, alg_max_len=0):
 | 
				
			|||||||
		ldesc = len(alg_desc)
 | 
							ldesc = len(alg_desc)
 | 
				
			||||||
		for idx, level in enumerate(['fail', 'warn', 'info']):
 | 
							for idx, level in enumerate(['fail', 'warn', 'info']):
 | 
				
			||||||
			if level == 'info':
 | 
								if level == 'info':
 | 
				
			||||||
				texts.append((level, get_ssh_ver(alg_desc[0])))
 | 
									texts.append((level, get_alg_since_text(alg_desc[0])))
 | 
				
			||||||
			idx = idx + 1
 | 
								idx = idx + 1
 | 
				
			||||||
			if ldesc > idx:
 | 
								if ldesc > idx:
 | 
				
			||||||
				for t in alg_desc[idx]: 
 | 
									for t in alg_desc[idx]: 
 | 
				
			||||||
@@ -449,6 +476,9 @@ def output(banner, kex):
 | 
				
			|||||||
		out.fail('[fail] protocol SSH1 enabled')
 | 
							out.fail('[fail] protocol SSH1 enabled')
 | 
				
			||||||
	if kex is None:
 | 
						if kex is None:
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
 | 
						alg_timeframe = get_ssh_timeframe(kex)
 | 
				
			||||||
 | 
						guess_text = ', '.join('{} {}'.format(k, ' - '.join([v[0] or '...', v[1] or '...'])) for k, v in alg_timeframe.items())
 | 
				
			||||||
 | 
						out.good('[info] compatibility: ' + guess_text) 
 | 
				
			||||||
	compressions = [x for x in kex.server.compression if x != 'none']
 | 
						compressions = [x for x in kex.server.compression if x != 'none']
 | 
				
			||||||
	if len(compressions) > 0:
 | 
						if len(compressions) > 0:
 | 
				
			||||||
		cmptxt = 'enabled ({0})'.format(', '.join(compressions))
 | 
							cmptxt = 'enabled ({0})'.format(', '.join(compressions))
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user