mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-03 18:52:15 +01:00 
			
		
		
		
	@@ -33,6 +33,7 @@ usage: ssh-audit.py [-1246pbcnvlt] <host>
 | 
			
		||||
                               software config (use -p to change port;
 | 
			
		||||
                               use -t to change timeout)
 | 
			
		||||
   -n,  --no-colors        disable colors
 | 
			
		||||
   -j,  --json             JSON output
 | 
			
		||||
   -v,  --verbose          verbose output
 | 
			
		||||
   -l,  --level=<level>    minimum output level (info|warn|fail)
 | 
			
		||||
   -t,  --timeout=<secs>   timeout (in seconds) for connection and reading
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										91
									
								
								ssh-audit.py
									
									
									
									
									
								
							
							
						
						
									
										91
									
								
								ssh-audit.py
									
									
									
									
									
								
							@@ -25,7 +25,7 @@
 | 
			
		||||
   THE SOFTWARE.
 | 
			
		||||
"""
 | 
			
		||||
from __future__ import print_function
 | 
			
		||||
import base64, binascii, errno, hashlib, getopt, io, os, random, re, select, socket, struct, sys
 | 
			
		||||
import base64, binascii, errno, hashlib, getopt, io, os, random, re, select, socket, struct, sys, json
 | 
			
		||||
 | 
			
		||||
VERSION = 'v2.1.0-dev'
 | 
			
		||||
SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate
 | 
			
		||||
@@ -62,7 +62,7 @@ def usage(err=None):
 | 
			
		||||
	uout.head('# {0} {1}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
 | 
			
		||||
	if err is not None and len(err) > 0:
 | 
			
		||||
		uout.fail('\n' + err)
 | 
			
		||||
	uout.info('usage: {0} [-1246pbcnvlt] <host>\n'.format(p))
 | 
			
		||||
	uout.info('usage: {0} [-1246pbcnjvlt] <host>\n'.format(p))
 | 
			
		||||
	uout.info('   -h,  --help             print this help')
 | 
			
		||||
	uout.info('   -1,  --ssh1             force ssh version 1 only')
 | 
			
		||||
	uout.info('   -2,  --ssh2             force ssh version 2 only')
 | 
			
		||||
@@ -72,6 +72,7 @@ def usage(err=None):
 | 
			
		||||
	uout.info('   -b,  --batch            batch output')
 | 
			
		||||
	uout.info('   -c,  --client-audit     starts a server on port 2222 to audit client\n                               software config (use -p to change port;\n                               use -t to change timeout)')
 | 
			
		||||
	uout.info('   -n,  --no-colors        disable colors')
 | 
			
		||||
	uout.info('   -j,  --json             JSON output')
 | 
			
		||||
	uout.info('   -v,  --verbose          verbose output')
 | 
			
		||||
	uout.info('   -l,  --level=<level>    minimum output level (info|warn|fail)')
 | 
			
		||||
	uout.info('   -t,  --timeout=<secs>   timeout (in seconds) for connection and reading\n                               (default: 5)')
 | 
			
		||||
@@ -90,6 +91,7 @@ class AuditConf(object):
 | 
			
		||||
		self.batch = False
 | 
			
		||||
		self.client_audit = False
 | 
			
		||||
		self.colors = True
 | 
			
		||||
		self.json = False
 | 
			
		||||
		self.verbose = False
 | 
			
		||||
		self.level = 'info'
 | 
			
		||||
		self.ipvo = ()  # type: Sequence[int]
 | 
			
		||||
@@ -101,7 +103,7 @@ class AuditConf(object):
 | 
			
		||||
	def __setattr__(self, name, value):
 | 
			
		||||
		# type: (str, Union[str, int, bool, Sequence[int]]) -> None
 | 
			
		||||
		valid = False
 | 
			
		||||
		if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set']:
 | 
			
		||||
		if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json']:
 | 
			
		||||
			valid, value = True, True if bool(value) else False
 | 
			
		||||
		elif name in ['ipv4', 'ipv6']:
 | 
			
		||||
			valid = False
 | 
			
		||||
@@ -148,8 +150,8 @@ class AuditConf(object):
 | 
			
		||||
		# pylint: disable=too-many-branches
 | 
			
		||||
		aconf = cls()
 | 
			
		||||
		try:
 | 
			
		||||
			sopts = 'h1246p:bcnvl:t:'
 | 
			
		||||
			lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port',
 | 
			
		||||
			sopts = 'h1246p:bcnjvl:t:'
 | 
			
		||||
			lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port', 'json',
 | 
			
		||||
			         'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=']
 | 
			
		||||
			opts, args = getopt.gnu_getopt(args, sopts, lopts)
 | 
			
		||||
		except getopt.GetoptError as err:
 | 
			
		||||
@@ -176,6 +178,8 @@ class AuditConf(object):
 | 
			
		||||
				aconf.client_audit = True
 | 
			
		||||
			elif o in ('-n', '--no-colors'):
 | 
			
		||||
				aconf.colors = False
 | 
			
		||||
			elif o in ('-j', '--json'):
 | 
			
		||||
				aconf.json = True
 | 
			
		||||
			elif o in ('-v', '--verbose'):
 | 
			
		||||
				aconf.verbose = True
 | 
			
		||||
			elif o in ('-l', '--level'):
 | 
			
		||||
@@ -225,6 +229,7 @@ class Output(object):
 | 
			
		||||
		self.batch = False
 | 
			
		||||
		self.verbose = False
 | 
			
		||||
		self.use_colors = True
 | 
			
		||||
		self.json = False
 | 
			
		||||
		self.__level = 0
 | 
			
		||||
		self.__colsupport = 'colorama' in sys.modules or os.name == 'posix'
 | 
			
		||||
	
 | 
			
		||||
@@ -3137,6 +3142,76 @@ class Utils(object):
 | 
			
		||||
		except:  # pylint: disable=bare-except
 | 
			
		||||
			return -1.0
 | 
			
		||||
 | 
			
		||||
def build_struct(banner, kex=None, pkm=None):
 | 
			
		||||
	res = {
 | 
			
		||||
		"banner": {
 | 
			
		||||
			"raw": str(banner),
 | 
			
		||||
			"protocol": banner.protocol,
 | 
			
		||||
			"software": banner.software,
 | 
			
		||||
			"comments": banner.comments,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	if kex is not None:
 | 
			
		||||
		res['compression'] = kex.server.compression
 | 
			
		||||
 | 
			
		||||
		res['kex'] = []
 | 
			
		||||
		alg_sizes = kex.dh_modulus_sizes()
 | 
			
		||||
		for algorithm in kex.kex_algorithms:
 | 
			
		||||
			entry = {
 | 
			
		||||
				'algorithm': algorithm,
 | 
			
		||||
			}
 | 
			
		||||
			if (alg_sizes is not None) and (algorithm in alg_sizes):
 | 
			
		||||
				hostkey_size, ca_size = alg_sizes[algorithm]
 | 
			
		||||
				entry['keysize'] = hostkey_size
 | 
			
		||||
				if ca_size > 0:
 | 
			
		||||
					entry['casize'] = ca_size
 | 
			
		||||
			res['kex'].append(entry)
 | 
			
		||||
 | 
			
		||||
		res['key'] = []
 | 
			
		||||
		alg_sizes = kex.rsa_key_sizes()
 | 
			
		||||
		for algorithm in kex.key_algorithms:
 | 
			
		||||
			entry = {
 | 
			
		||||
				'algorithm': algorithm,
 | 
			
		||||
			}
 | 
			
		||||
			if (alg_sizes is not None) and (algorithm in alg_sizes):
 | 
			
		||||
				hostkey_size, ca_size = alg_sizes[algorithm]
 | 
			
		||||
				entry['keysize'] = hostkey_size
 | 
			
		||||
				if ca_size > 0:
 | 
			
		||||
					entry['casize'] = ca_size
 | 
			
		||||
			res['key'].append(entry)
 | 
			
		||||
 | 
			
		||||
		res['enc'] = kex.server.encryption
 | 
			
		||||
		res['mac'] = kex.server.mac
 | 
			
		||||
		res['fingerprints'] = []
 | 
			
		||||
		host_keys = kex.host_keys()
 | 
			
		||||
		for host_key_type in host_keys:
 | 
			
		||||
			if host_keys[host_key_type] is None:
 | 
			
		||||
				continue
 | 
			
		||||
 | 
			
		||||
			fp = SSH.Fingerprint(host_keys[host_key_type])
 | 
			
		||||
 | 
			
		||||
			# Workaround for Python's order-indifference in dicts.  We might get a random RSA type (ssh-rsa, rsa-sha2-256, or rsa-sha2-512), so running the tool against the same server three times may give three different host key types here.  So if we have any RSA type, we will simply hard-code it to 'ssh-rsa'.
 | 
			
		||||
			if host_key_type in SSH2.HostKeyTest.RSA_FAMILY:
 | 
			
		||||
				host_key_type = 'ssh-rsa'
 | 
			
		||||
 | 
			
		||||
			# Skip over certificate host types (or we would return invalid fingerprints).
 | 
			
		||||
			if '-cert-' in host_key_type:
 | 
			
		||||
				continue
 | 
			
		||||
			entry = {
 | 
			
		||||
				'type': host_key_type,
 | 
			
		||||
				'fp': fp.sha256,
 | 
			
		||||
			}
 | 
			
		||||
			res['fingerprints'].append(entry)
 | 
			
		||||
	else:
 | 
			
		||||
		res['key'] = ['ssh-rsa1']
 | 
			
		||||
		res['enc'] = pkm.supported_ciphers
 | 
			
		||||
		res['aut'] = pkm.supported_authentications
 | 
			
		||||
		res['fingerprints'] = [{
 | 
			
		||||
			'type': 'ssh-rsa1',
 | 
			
		||||
			'fp': SSH.Fingerprint(pkm.host_key_fingerprint_data).sha256,
 | 
			
		||||
		}]
 | 
			
		||||
 | 
			
		||||
	return res
 | 
			
		||||
 | 
			
		||||
def audit(aconf, sshv=None):
 | 
			
		||||
	# type: (AuditConf, Optional[int]) -> None
 | 
			
		||||
@@ -3189,12 +3264,18 @@ def audit(aconf, sshv=None):
 | 
			
		||||
		sys.exit(1)
 | 
			
		||||
	if sshv == 1:
 | 
			
		||||
		pkm = SSH1.PublicKeyMessage.parse(payload)
 | 
			
		||||
		if aconf.json:
 | 
			
		||||
			print(json.dumps(build_struct(banner, pkm=pkm)))
 | 
			
		||||
		else:
 | 
			
		||||
			output(banner, header, pkm=pkm)
 | 
			
		||||
	elif sshv == 2:
 | 
			
		||||
		kex = SSH2.Kex.parse(payload)
 | 
			
		||||
		if aconf.client_audit is False:
 | 
			
		||||
			SSH2.HostKeyTest.run(s, kex)
 | 
			
		||||
			SSH2.GEXTest.run(s, kex)
 | 
			
		||||
		if aconf.json:
 | 
			
		||||
			print(json.dumps(build_struct(banner, kex=kex)))
 | 
			
		||||
		else:
 | 
			
		||||
                        output(banner, header, client_audit=aconf.client_audit, kex=kex)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user