Removed SSHv1 support (#298).

This commit is contained in:
Joe Testa
2025-07-26 19:57:11 -04:00
parent b456bb31b9
commit 11a902cb14
15 changed files with 79 additions and 691 deletions

View File

@@ -2,7 +2,7 @@
"""
The MIT License (MIT)
Copyright (C) 2017-2024 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -56,8 +56,6 @@ from ssh_audit.policy import Policy
from ssh_audit.product import Product
from ssh_audit.protocol import Protocol
from ssh_audit.software import Software
from ssh_audit.ssh1_kexdb import SSH1_KexDB
from ssh_audit.ssh1_publickeymessage import SSH1_PublicKeyMessage
from ssh_audit.ssh2_kex import SSH2_Kex
from ssh_audit.ssh2_kexdb import SSH2_KexDB
from ssh_audit.ssh_socket import SSH_Socket
@@ -234,11 +232,6 @@ def output_security(out: OutputBuffer, banner: Optional[Banner], padlen: int, is
def output_fingerprints(out: OutputBuffer, algs: Algorithms, is_json_output: bool) -> None:
with out:
fps = {}
if algs.ssh1kex is not None:
name = 'ssh-rsa1'
fp = Fingerprint(algs.ssh1kex.host_key_fingerprint_data)
# bits = algs.ssh1kex.host_key_bits
fps[name] = fp
if algs.ssh2kex is not None:
host_keys = algs.ssh2kex.host_keys()
for host_key_type in algs.ssh2kex.host_keys():
@@ -509,12 +502,11 @@ def post_process_findings(banner: Optional[Banner], algs: Algorithms, client_aud
# Returns a exitcodes.* flag to denote if any failures or warnings were encountered.
def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False, dh_rate_test_notes: str = "") -> int:
def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, print_target: bool = False, dh_rate_test_notes: str = "") -> int:
program_retval = exitcodes.GOOD
client_audit = client_host is not None # If set, this is a client audit.
sshv = 1 if pkm is not None else 2
algs = Algorithms(pkm, kex)
algs = Algorithms(kex)
# Perform post-processing on the findings to make final adjustments before outputting the results.
algorithm_recommendation_suppress_list, additional_notes = post_process_findings(banner, algs, client_audit, dh_rate_test_notes)
@@ -539,7 +531,7 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header
out.info('(gen) header: ' + '\n'.join(header))
if banner is not None:
banner_line = '(gen) banner: {}'.format(banner)
if sshv == 1 or banner.protocol[0] == 1:
if banner.protocol[0] == 1:
out.fail(banner_line)
out.fail('(gen) protocol SSH1 enabled')
else:
@@ -571,18 +563,6 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header
# Filled in by output_algorithms() with unidentified algs.
unknown_algorithms: List[str] = []
# SSHv1
if pkm is not None:
adb = SSH1_KexDB.get_db()
ciphers = pkm.supported_ciphers
auths = pkm.supported_authentications
title, atype = 'SSH1 host-key algorithms', 'key'
program_retval = output_algorithms(out, title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen)
title, atype = 'SSH1 encryption algorithms (ciphers)', 'enc'
program_retval = output_algorithms(out, title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen)
title, atype = 'SSH1 authentication types', 'aut'
program_retval = output_algorithms(out, title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen)
# SSHv2
if kex is not None:
adb = SSH2_KexDB.get_db()
@@ -782,15 +762,12 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
aconf.colors = enable_colors
out.use_colors = enable_colors
aconf.ssh1, aconf.ssh2 = False, False
host: str = ''
port: int = 22
parser = argparse.ArgumentParser(description="# {} {}, https://github.com/jtesta/ssh-audit".format(os.path.basename(sys.argv[0]), VERSION), allow_abbrev=False)
# Add short options to the parser
parser.add_argument("-1", "--ssh1", action="store_true", dest="ssh1", default=False, help="force ssh version 1 only")
parser.add_argument("-2", "--ssh2", action="store_true", dest="ssh2", default=False, help="force ssh version 2 only")
parser.add_argument("-4", "--ipv4", action="store_true", dest="ipv4", default=False, help="enable IPv4 (order of precedence)")
parser.add_argument("-6", "--ipv6", action="store_true", dest="ipv6", default=False, help="enable IPv6 (order of precedence)")
parser.add_argument("-b", "--batch", action="store_true", dest="batch", default=False, help="batch output")
@@ -836,8 +813,6 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
aconf.list_policies = argument.list_policies
aconf.manual = argument.manual
aconf.skip_rate_test = argument.skip_rate_test
aconf.ssh1 = argument.ssh1
aconf.ssh2 = argument.ssh2
oport = argument.oport
if argument.batch is True:
@@ -950,9 +925,6 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
aconf.host = host
aconf.port = port
if not (aconf.ssh1 or aconf.ssh2):
aconf.ssh1, aconf.ssh2 = True, True
# If a file containing a list of targets was given, read it.
if aconf.target_file is not None:
try:
@@ -994,7 +966,7 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
return aconf
def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None, software: Optional[Software] = None, algorithms: Optional[Algorithms] = None, algorithm_recommendation_suppress_list: Optional[List[str]] = None, additional_notes: List[str] = []) -> Any: # pylint: disable=dangerous-default-value
def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, client_host: Optional[str] = None, software: Optional[Software] = None, algorithms: Optional[Algorithms] = None, algorithm_recommendation_suppress_list: Optional[List[str]] = None, additional_notes: List[str] = []) -> Any: # pylint: disable=dangerous-default-value
def fetch_notes(algorithm: str, alg_type: str) -> Dict[str, List[Optional[str]]]:
'''Returns a dictionary containing the messages in the "fail", "warn", and "info" levels for this algorithm.'''
@@ -1139,22 +1111,6 @@ def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SS
'hash_alg': 'MD5',
'hash': fp.md5[4:]
})
else:
pkm_supported_ciphers = None
pkm_supported_authentications = None
pkm_fp = None
if pkm is not None:
pkm_supported_ciphers = pkm.supported_ciphers
pkm_supported_authentications = pkm.supported_authentications
pkm_fp = Fingerprint(pkm.host_key_fingerprint_data).sha256
res['key'] = ['ssh-rsa1']
res['enc'] = pkm_supported_ciphers
res['aut'] = pkm_supported_authentications
res['fingerprints'] = [{
'type': 'ssh-rsa1',
'fp': pkm_fp,
}]
# Historically, CVE information was returned. Now we'll just return an empty dictionary so as to not break any legacy clients.
res['cves'] = []
@@ -1169,7 +1125,7 @@ def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SS
# Returns one of the exitcodes.* flags.
def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int:
def audit(out: OutputBuffer, aconf: AuditConf, print_target: bool = False) -> int:
program_retval = exitcodes.GOOD
out.batch = aconf.batch
out.verbose = aconf.verbose
@@ -1195,10 +1151,8 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
out.write()
sys.exit(exitcodes.CONNECTION_ERROR)
if sshv is None:
sshv = 2 if aconf.ssh2 else 1
err = None
banner, header, err = s.get_banner(sshv)
banner, header, err = s.get_banner()
if banner is None:
if err is None:
err = '[exception] did not receive banner.'
@@ -1207,7 +1161,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
if err is None:
s.send_kexinit() # Send the algorithms we support (except we don't since this isn't a real SSH connection).
packet_type, payload = s.read_packet(sshv)
packet_type, payload = s.read_packet()
if packet_type < 0:
try:
if len(payload) > 0:
@@ -1216,17 +1170,10 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
payload_txt = 'empty'
except UnicodeDecodeError:
payload_txt = '"{}"'.format(repr(payload).lstrip('b')[1:-1])
if payload_txt == 'Protocol major versions differ.':
if sshv == 2 and aconf.ssh1:
ret = audit(out, aconf, 1)
out.write()
return ret
err = '[exception] error reading packet ({})'.format(payload_txt)
else:
err_pair = None
if sshv == 1 and packet_type != Protocol.SMSG_PUBLIC_KEY:
err_pair = ('SMSG_PUBLIC_KEY', Protocol.SMSG_PUBLIC_KEY)
elif sshv == 2 and packet_type != Protocol.MSG_KEXINIT:
if packet_type != Protocol.MSG_KEXINIT:
err_pair = ('MSG_KEXINIT', Protocol.MSG_KEXINIT)
if err_pair is not None:
fmt = '[exception] did not receive {0} ({1}), ' + \
@@ -1236,52 +1183,50 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
output(out, aconf, banner, header)
out.fail(err)
return exitcodes.CONNECTION_ERROR
if sshv == 1:
program_retval = output(out, aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload))
elif sshv == 2:
try:
kex = SSH2_Kex.parse(out, payload)
out.d(str(kex))
except Exception:
out.fail("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc()))
return exitcodes.CONNECTION_ERROR
if aconf.dheat is not None:
DHEat(out, aconf, banner, kex).run()
return exitcodes.GOOD
elif aconf.conn_rate_test_enabled:
DHEat.dh_rate_test(out, aconf, kex, 0, 0, 0)
return exitcodes.GOOD
try:
kex = SSH2_Kex.parse(out, payload)
out.d(str(kex))
except Exception:
out.fail("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc()))
return exitcodes.CONNECTION_ERROR
dh_rate_test_notes = ""
if aconf.client_audit is False:
HostKeyTest.run(out, s, kex)
if aconf.gex_test != '':
return run_gex_granular_modulus_size_test(out, s, kex, aconf)
else:
GEXTest.run(out, s, banner, kex)
# Skip the rate test if the user specified "--skip-rate-test".
if aconf.skip_rate_test:
out.d("Skipping rate test due to --skip-rate-test option.")
else:
# Try to open many TCP connections against the server if any Diffie-Hellman key exchanges are present; this tests potential vulnerability to the DHEat DOS attack. Use 3 concurrent sockets over at most 1.5 seconds to open at most 38 connections (stops if 1.5 seconds elapse, or 38 connections are opened--whichever comes first). If more than 25 connections per second were observed, flag the DH algorithms with a warning about the DHEat DOS vuln.
dh_rate_test_notes = DHEat.dh_rate_test(out, aconf, kex, 1.5, 38, 3)
# This is a standard audit scan.
if (aconf.policy is None) and (aconf.make_policy is False):
program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target, dh_rate_test_notes=dh_rate_test_notes)
# This is a policy test.
elif (aconf.policy is not None) and (aconf.make_policy is False):
program_retval = exitcodes.GOOD if evaluate_policy(out, aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE
# A new policy should be made from this scan.
elif (aconf.policy is None) and (aconf.make_policy is True):
make_policy(aconf, banner, kex, s.client_host)
if aconf.dheat is not None:
DHEat(out, aconf, banner, kex).run()
return exitcodes.GOOD
elif aconf.conn_rate_test_enabled:
DHEat.dh_rate_test(out, aconf, kex, 0, 0, 0)
return exitcodes.GOOD
dh_rate_test_notes = ""
if aconf.client_audit is False:
HostKeyTest.run(out, s, kex)
if aconf.gex_test != '':
return run_gex_granular_modulus_size_test(out, s, kex, aconf)
else:
raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy))
GEXTest.run(out, s, banner, kex)
# Skip the rate test if the user specified "--skip-rate-test".
if aconf.skip_rate_test:
out.d("Skipping rate test due to --skip-rate-test option.")
else:
# Try to open many TCP connections against the server if any Diffie-Hellman key exchanges are present; this tests potential vulnerability to the DHEat DOS attack. Use 3 concurrent sockets over at most 1.5 seconds to open at most 38 connections (stops if 1.5 seconds elapse, or 38 connections are opened--whichever comes first). If more than 25 connections per second were observed, flag the DH algorithms with a warning about the DHEat DOS vuln.
dh_rate_test_notes = DHEat.dh_rate_test(out, aconf, kex, 1.5, 38, 3)
# This is a standard audit scan.
if (aconf.policy is None) and (aconf.make_policy is False):
program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target, dh_rate_test_notes=dh_rate_test_notes)
# This is a policy test.
elif (aconf.policy is not None) and (aconf.make_policy is False):
program_retval = exitcodes.GOOD if evaluate_policy(out, aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE
# A new policy should be made from this scan.
elif (aconf.policy is None) and (aconf.make_policy is True):
make_policy(aconf, banner, kex, s.client_host)
else:
raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy))
return program_retval
@@ -1548,7 +1493,6 @@ def main() -> int:
print(']')
# Send notification that this thread is exiting. This deletes the thread's local copy of the algorithm databases.
SSH1_KexDB.thread_exit()
SSH2_KexDB.thread_exit()
else: # Just a scan against a single target.