Added support for mixed host key/CA key types (i.e.: RSA host keys signed by ED25519 CAs) (#120).

This commit is contained in:
Joe Testa
2023-04-25 09:17:32 -04:00
parent 4f31304b66
commit 263267c5ad
34 changed files with 556 additions and 308 deletions

View File

@@ -34,7 +34,7 @@ import traceback
# pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
from typing import Callable, Optional, Union, Any # noqa: F401
from typing import cast, Callable, Optional, Union, Any # noqa: F401
from ssh_audit.globals import SNAP_PACKAGE
from ssh_audit.globals import SNAP_PERMISSIONS_ERROR
@@ -107,10 +107,10 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
sys.exit(retval)
def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: # pylint: disable=too-many-arguments
def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, host_keys: Optional[Dict[str, Dict[str, Union[bytes, str, int]]]] = None, dh_modulus_sizes: Optional[Dict[str, int]] = None) -> int: # pylint: disable=too-many-arguments
with out:
for algorithm in algorithms:
program_retval = output_algorithm(out, alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, alg_sizes)
program_retval = output_algorithm(out, alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, host_keys=host_keys, dh_modulus_sizes=dh_modulus_sizes)
if not out.is_section_empty() and not is_json_output:
out.head('# ' + title)
out.flush_section()
@@ -119,7 +119,7 @@ def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str,
return program_retval
def output_algorithm(out: OutputBuffer, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int:
def output_algorithm(out: OutputBuffer, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, host_keys: Optional[Dict[str, Dict[str, Union[bytes, str, int]]]] = None, dh_modulus_sizes: Optional[Dict[str, int]] = None) -> int: # pylint: disable=too-many-arguments
prefix = '(' + alg_type + ') '
if alg_max_len == 0:
alg_max_len = len(alg_name)
@@ -128,13 +128,23 @@ def output_algorithm(out: OutputBuffer, alg_db: Dict[str, Dict[str, List[List[Op
# If this is an RSA host key or DH GEX, append the size to its name and fix
# the padding.
alg_name_with_size = None
if (alg_sizes is not None) and (alg_name in alg_sizes):
hostkey_size, ca_size = alg_sizes[alg_name]
if ca_size > 0:
alg_name_with_size = '%s (%d-bit cert/%d-bit CA)' % (alg_name, hostkey_size, ca_size)
if (dh_modulus_sizes is not None) and (alg_name in dh_modulus_sizes):
alg_name_with_size = '%s (%u-bit)' % (alg_name, dh_modulus_sizes[alg_name])
padding = padding[0:-11]
elif (host_keys is not None) and (alg_name in host_keys):
hostkey_size = cast(int, host_keys[alg_name]['hostkey_size'])
ca_key_type = cast(str, host_keys[alg_name]['ca_key_type'])
ca_key_size = cast(int, host_keys[alg_name]['ca_key_size'])
# If this is an RSA variant, just print "RSA".
if ca_key_type in HostKeyTest.RSA_FAMILY:
ca_key_type = "RSA"
if len(ca_key_type) > 0 and ca_key_size > 0:
alg_name_with_size = '%s (%u-bit cert/%u-bit %s CA)' % (alg_name, hostkey_size, ca_key_size, ca_key_type)
padding = padding[0:-15]
else:
alg_name_with_size = '%s (%d-bit)' % (alg_name, hostkey_size)
elif alg_name in HostKeyTest.RSA_FAMILY:
alg_name_with_size = '%s (%u-bit)' % (alg_name, hostkey_size)
padding = padding[0:-11]
# If this is a kex algorithm and starts with 'gss-', then normalize its name (i.e.: 'gss-gex-sha1-vz8J1E9PzLr8b1K+0remTg==' => 'gss-gex-sha1-*'). The base64 field can vary, so we'll convert it to the wildcard that our database uses and we'll just resume doing a straight match like all other algorithm names.
@@ -289,36 +299,36 @@ def output_security(out: OutputBuffer, banner: Optional[Banner], client_audit: b
def output_fingerprints(out: OutputBuffer, algs: Algorithms, is_json_output: bool) -> None:
with out:
fps = []
fps = {}
if algs.ssh1kex is not None:
name = 'ssh-rsa1'
fp = Fingerprint(algs.ssh1kex.host_key_fingerprint_data)
# bits = algs.ssh1kex.host_key_bits
fps.append((name, fp))
fps[name] = fp
if algs.ssh2kex is not None:
host_keys = algs.ssh2kex.host_keys()
for host_key_type in algs.ssh2kex.host_keys():
if host_keys[host_key_type] is None:
continue
fp = Fingerprint(host_keys[host_key_type])
fp = Fingerprint(cast(bytes, host_keys[host_key_type]['raw_hostkey_bytes']))
# Workaround for Python's order-indifference in dicts. We might get a random RSA type (ssh-rsa, rsa-sha2-256, or rsa-sha2-512), so running the tool against the same server three times may give three different host key types here. So if we have any RSA type, we will simply hard-code it to 'ssh-rsa'.
if host_key_type in HostKeyTest.RSA_FAMILY:
host_key_type = 'ssh-rsa'
# Skip over certificate host types (or we would return invalid fingerprints).
# Skip over certificate host types (or we would return invalid fingerprints), and only add one fingerprint in the RSA family.
if '-cert-' not in host_key_type:
fps.append((host_key_type, fp))
fps[host_key_type] = fp
# Similarly, the host keys can be processed in random order due to Python's order-indifference in dicts. So we sort this list before printing; this makes automated testing possible.
fps = sorted(fps)
for fpp in fps:
name, fp = fpp
out.good('(fin) {}: {}'.format(name, fp.sha256))
fp_types = sorted(fps.keys())
for fp_type in fp_types:
fp = fps[fp_type]
out.good('(fin) {}: {}'.format(fp_type, fp.sha256))
# Output the MD5 hash too if verbose mode is enabled.
if out.verbose:
out.info('(fin) {}: {} -- [info] do not rely on MD5 fingerprints for server identification; it is insecure for this use case'.format(name, fp.md5))
out.info('(fin) {}: {} -- [info] do not rely on MD5 fingerprints for server identification; it is insecure for this use case'.format(fp_type, fp.md5))
if not out.is_section_empty() and not is_json_output:
out.head('# fingerprints')
@@ -422,7 +432,7 @@ def post_process_findings(banner: Optional[Banner], algs: Algorithms) -> List[st
algorithm_recommendation_suppress_list = []
# If the server is OpenSSH, and the diffie-hellman-group-exchange-sha256 key exchange was found with modulus size 2048, add a note regarding the bug that causes the server to support 2048-bit moduli no matter the configuration.
if (algs.ssh2kex is not None and 'diffie-hellman-group-exchange-sha256' in algs.ssh2kex.kex_algorithms and 'diffie-hellman-group-exchange-sha256' in algs.ssh2kex.dh_modulus_sizes() and algs.ssh2kex.dh_modulus_sizes()['diffie-hellman-group-exchange-sha256'][0] == 2048) and (banner is not None and banner.software is not None and banner.software.find('OpenSSH') != -1):
if (algs.ssh2kex is not None and 'diffie-hellman-group-exchange-sha256' in algs.ssh2kex.kex_algorithms and 'diffie-hellman-group-exchange-sha256' in algs.ssh2kex.dh_modulus_sizes() and algs.ssh2kex.dh_modulus_sizes()['diffie-hellman-group-exchange-sha256'] == 2048) and (banner is not None and banner.software is not None and banner.software.find('OpenSSH') != -1):
# Ensure a list for notes exists.
while len(SSH2_KexDB.ALGORITHMS['kex']['diffie-hellman-group-exchange-sha256']) < 4:
@@ -498,6 +508,8 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header
cves = output_security(out, banner, client_audit, maxlen, aconf.json)
# Filled in by output_algorithms() with unidentified algs.
unknown_algorithms: List[str] = []
# SSHv1
if pkm is not None:
adb = SSH1_KexDB.ALGORITHMS
ciphers = pkm.supported_ciphers
@@ -508,16 +520,19 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header
program_retval = output_algorithms(out, title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen)
title, atype = 'SSH1 authentication types', 'aut'
program_retval = output_algorithms(out, title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen)
# SSHv2
if kex is not None:
adb = SSH2_KexDB.ALGORITHMS
title, atype = 'key exchange algorithms', 'kex'
program_retval = output_algorithms(out, title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.dh_modulus_sizes())
program_retval = output_algorithms(out, title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, dh_modulus_sizes=kex.dh_modulus_sizes())
title, atype = 'host-key algorithms', 'key'
program_retval = output_algorithms(out, title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.rsa_key_sizes())
program_retval = output_algorithms(out, title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, host_keys=kex.host_keys())
title, atype = 'encryption algorithms (ciphers)', 'enc'
program_retval = output_algorithms(out, title, adb, atype, kex.server.encryption, unknown_algorithms, aconf.json, program_retval, maxlen)
title, atype = 'message authentication code algorithms', 'mac'
program_retval = output_algorithms(out, title, adb, atype, kex.server.mac, unknown_algorithms, aconf.json, program_retval, maxlen)
output_fingerprints(out, algs, aconf.json)
perfect_config = output_recommendations(out, algs, algorithm_recommendation_suppress_list, software, aconf.json, maxlen)
output_info(out, software, client_audit, not perfect_config, aconf.json)
@@ -830,10 +845,10 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
if (aconf.policy_file is not None) and (aconf.make_policy is False):
# First, see if this is a built-in policy name. If not, assume a file path was provided, and try to load it from disk.
aconf.policy = Policy.load_builtin_policy(aconf.policy_file)
aconf.policy = Policy.load_builtin_policy(aconf.policy_file, json_output=aconf.json)
if aconf.policy is None:
try:
aconf.policy = Policy(policy_file=aconf.policy_file)
aconf.policy = Policy(policy_file=aconf.policy_file, json_output=aconf.json)
except Exception as e:
out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
out.write()
@@ -885,28 +900,37 @@ def build_struct(target_host: str, banner: Optional['Banner'], cves: List[Dict[s
res['compression'] = kex.server.compression
res['kex'] = []
alg_sizes = kex.dh_modulus_sizes()
dh_alg_sizes = kex.dh_modulus_sizes()
for algorithm in kex.kex_algorithms:
entry: Any = {
'algorithm': algorithm,
}
if algorithm in alg_sizes:
hostkey_size, ca_size = alg_sizes[algorithm]
if algorithm in dh_alg_sizes:
hostkey_size = dh_alg_sizes[algorithm]
entry['keysize'] = hostkey_size
if ca_size > 0:
entry['casize'] = ca_size
res['kex'].append(entry)
res['key'] = []
alg_sizes = kex.rsa_key_sizes()
host_keys = kex.host_keys()
for algorithm in kex.key_algorithms:
entry = {
'algorithm': algorithm,
}
if algorithm in alg_sizes:
hostkey_size, ca_size = alg_sizes[algorithm]
entry['keysize'] = hostkey_size
if algorithm in host_keys:
hostkey_info = host_keys[algorithm]
hostkey_size = cast(int, hostkey_info['hostkey_size'])
ca_type = ''
ca_size = 0
if 'ca_key_type' in hostkey_info:
ca_type = cast(str, hostkey_info['ca_key_type'])
if 'ca_key_size' in hostkey_info:
ca_size = cast(int, hostkey_info['ca_key_size'])
if algorithm in HostKeyTest.RSA_FAMILY or algorithm.startswith('ssh-rsa-cert-v0'):
entry['keysize'] = hostkey_size
if ca_size > 0:
entry['ca_algorithm'] = ca_type
entry['casize'] = ca_size
res['key'].append(entry)
@@ -926,7 +950,7 @@ def build_struct(target_host: str, banner: Optional['Banner'], cves: List[Dict[s
if host_keys[host_key_type] is None:
continue
fp = Fingerprint(host_keys[host_key_type])
fp = Fingerprint(cast(bytes, host_keys[host_key_type]['raw_hostkey_bytes']))
# Skip over certificate host types (or we would return invalid fingerprints).
if '-cert-' in host_key_type:
@@ -1041,7 +1065,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
program_retval = output(out, aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload))
elif sshv == 2:
try:
kex = SSH2_Kex.parse(payload)
kex = SSH2_Kex.parse(out, payload)
except Exception:
out.fail("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc()))
return exitcodes.CONNECTION_ERROR