mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-06-23 11:04:31 +02:00
Added return values for standard scans.
This commit is contained in:
98
ssh-audit.py
98
ssh-audit.py
@ -46,6 +46,12 @@ from typing import Callable, Optional, Union, Any
|
||||
VERSION = 'v2.2.1-dev'
|
||||
SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate
|
||||
|
||||
# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively.
|
||||
PROGRAM_RETVAL_FAILURE = 3
|
||||
PROGRAM_RETVAL_WARNING = 2
|
||||
PROGRAM_RETVAL_CONNECTION_ERROR = 1
|
||||
PROGRAM_RETVAL_GOOD = 0
|
||||
|
||||
try: # pragma: nocover
|
||||
from colorama import init as colorama_init
|
||||
colorama_init(strip=False) # pragma: nocover
|
||||
@ -2831,17 +2837,19 @@ class KexGroupExchange_SHA256(KexGroupExchange):
|
||||
super(KexGroupExchange_SHA256, self).__init__('KexGroupExchange_SHA256', 'sha256')
|
||||
|
||||
|
||||
def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> None:
|
||||
def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: # pylint: disable=too-many-arguments
|
||||
with OutputBuffer() as obuf:
|
||||
for algorithm in algorithms:
|
||||
output_algorithm(alg_db, alg_type, algorithm, unknown_algs, maxlen, alg_sizes)
|
||||
if len(obuf) > 0:
|
||||
program_retval = output_algorithm(alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, alg_sizes)
|
||||
if len(obuf) > 0 and not is_json_output:
|
||||
out.head('# ' + title)
|
||||
obuf.flush()
|
||||
out.sep()
|
||||
|
||||
return program_retval
|
||||
|
||||
def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> None:
|
||||
|
||||
def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int:
|
||||
prefix = '(' + alg_type + ') '
|
||||
if alg_max_len == 0:
|
||||
alg_max_len = len(alg_name)
|
||||
@ -2861,7 +2869,7 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al
|
||||
|
||||
texts = []
|
||||
if len(alg_name.strip()) == 0:
|
||||
return
|
||||
return program_retval
|
||||
alg_name_native = utils.to_text(alg_name)
|
||||
if alg_name_native in alg_db[alg_type]:
|
||||
alg_desc = alg_db[alg_type][alg_name_native]
|
||||
@ -2887,6 +2895,11 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al
|
||||
alg_name = alg_name_with_size if alg_name_with_size is not None else alg_name
|
||||
first = True
|
||||
for level, text in texts:
|
||||
if level == 'fail':
|
||||
program_retval = PROGRAM_RETVAL_FAILURE
|
||||
elif level == 'warn' and program_retval != PROGRAM_RETVAL_FAILURE: # If a failure was found previously, don't downgrade to warning.
|
||||
program_retval = PROGRAM_RETVAL_WARNING
|
||||
|
||||
f = getattr(out, level)
|
||||
comment = (padding + ' -- [' + level + '] ' + text) if text != '' else ''
|
||||
if first:
|
||||
@ -2901,6 +2914,8 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al
|
||||
comment = (padding + ' `- [' + level + '] ' + text)
|
||||
f(' ' * len(prefix + alg_name) + comment)
|
||||
|
||||
return program_retval
|
||||
|
||||
|
||||
def output_compatibility(algs: SSH.Algorithms, client_audit: bool, for_server: bool = True) -> None:
|
||||
|
||||
@ -2968,19 +2983,19 @@ def output_security_sub(sub: str, software: Optional[SSH.Software], client_audit
|
||||
out.fail('(sec) {}{} -- {}'.format(name, p, descr))
|
||||
|
||||
|
||||
def output_security(banner: Optional[SSH.Banner], client_audit: bool, padlen: int) -> None:
|
||||
def output_security(banner: Optional[SSH.Banner], client_audit: bool, padlen: int, is_json_output: bool) -> None:
|
||||
with OutputBuffer() as obuf:
|
||||
if banner is not None:
|
||||
software = SSH.Software.parse(banner)
|
||||
output_security_sub('cve', software, client_audit, padlen)
|
||||
output_security_sub('txt', software, client_audit, padlen)
|
||||
if len(obuf) > 0:
|
||||
if len(obuf) > 0 and not is_json_output:
|
||||
out.head('# security')
|
||||
obuf.flush()
|
||||
out.sep()
|
||||
|
||||
|
||||
def output_fingerprints(algs: SSH.Algorithms, sha256: bool = True) -> None:
|
||||
def output_fingerprints(algs: SSH.Algorithms, is_json_output: bool, sha256: bool = True) -> None:
|
||||
with OutputBuffer() as obuf:
|
||||
fps = []
|
||||
if algs.ssh1kex is not None:
|
||||
@ -3011,14 +3026,14 @@ def output_fingerprints(algs: SSH.Algorithms, sha256: bool = True) -> None:
|
||||
# p = '' if out.batch else ' ' * (padlen - len(name))
|
||||
# out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo))
|
||||
out.good('(fin) {}: {}'.format(name, fpo))
|
||||
if len(obuf) > 0:
|
||||
if len(obuf) > 0 and not is_json_output:
|
||||
out.head('# fingerprints')
|
||||
obuf.flush()
|
||||
out.sep()
|
||||
|
||||
|
||||
# Returns True if no warnings or failures encountered in configuration.
|
||||
def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software], padlen: int = 0) -> bool:
|
||||
def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software], is_json_output: bool, padlen: int = 0) -> bool:
|
||||
|
||||
ret = True
|
||||
# PuTTY's algorithms cannot be modified, so there's no point in issuing recommendations.
|
||||
@ -3078,7 +3093,7 @@ def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software
|
||||
b = '(SSH{})'.format(sshv) if sshv == 1 else ''
|
||||
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}'
|
||||
fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b))
|
||||
if len(obuf) > 0:
|
||||
if len(obuf) > 0 and not is_json_output:
|
||||
if software is not None:
|
||||
title = '(for {})'.format(software.display(False))
|
||||
else:
|
||||
@ -3090,7 +3105,7 @@ def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software
|
||||
|
||||
|
||||
# Output additional information & notes.
|
||||
def output_info(software: Optional['SSH.Software'], client_audit: bool, any_problems: bool) -> None:
|
||||
def output_info(software: Optional['SSH.Software'], client_audit: bool, any_problems: bool, is_json_output: bool) -> None:
|
||||
with OutputBuffer() as obuf:
|
||||
# Tell user that PuTTY cannot be hardened at the protocol-level.
|
||||
if client_audit and (software is not None) and (software.product == SSH.Product.PuTTY):
|
||||
@ -3100,19 +3115,16 @@ def output_info(software: Optional['SSH.Software'], client_audit: bool, any_prob
|
||||
if any_problems:
|
||||
out.warn('(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>')
|
||||
|
||||
if len(obuf) > 0:
|
||||
if len(obuf) > 0 and not is_json_output:
|
||||
out.head('# additional info')
|
||||
obuf.flush()
|
||||
out.sep()
|
||||
|
||||
|
||||
def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> None:
|
||||
|
||||
# If the user requested JSON output, output that and return immediately.
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True))
|
||||
return
|
||||
# Returns a PROGRAM_RETVAL_* flag to denote if any failures or warnings were encountered.
|
||||
def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> int:
|
||||
|
||||
program_retval = PROGRAM_RETVAL_GOOD
|
||||
client_audit = client_host is not None # If set, this is a client audit.
|
||||
sshv = 1 if pkm is not None else 2
|
||||
algs = SSH.Algorithms(pkm, kex)
|
||||
@ -3141,12 +3153,12 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl
|
||||
else:
|
||||
cmptxt = 'disabled'
|
||||
out.good('(gen) compression: {}'.format(cmptxt))
|
||||
if len(obuf) > 0:
|
||||
if len(obuf) > 0 and not aconf.json: # Print output when it exists and JSON output isn't requested.
|
||||
out.head('# general')
|
||||
obuf.flush()
|
||||
out.sep()
|
||||
maxlen = algs.maxlen + 1
|
||||
output_security(banner, client_audit, maxlen)
|
||||
output_security(banner, client_audit, maxlen, aconf.json)
|
||||
# Filled in by output_algorithms() with unidentified algs.
|
||||
unknown_algorithms = [] # type: List[str]
|
||||
if pkm is not None:
|
||||
@ -3154,29 +3166,33 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl
|
||||
ciphers = pkm.supported_ciphers
|
||||
auths = pkm.supported_authentications
|
||||
title, atype = 'SSH1 host-key algorithms', 'key'
|
||||
output_algorithms(title, adb, atype, ['ssh-rsa1'], unknown_algorithms, maxlen)
|
||||
program_retval = output_algorithms(title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen)
|
||||
title, atype = 'SSH1 encryption algorithms (ciphers)', 'enc'
|
||||
output_algorithms(title, adb, atype, ciphers, unknown_algorithms, maxlen)
|
||||
program_retval = output_algorithms(title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen)
|
||||
title, atype = 'SSH1 authentication types', 'aut'
|
||||
output_algorithms(title, adb, atype, auths, unknown_algorithms, maxlen)
|
||||
program_retval = output_algorithms(title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen)
|
||||
if kex is not None:
|
||||
adb = SSH2.KexDB.ALGORITHMS
|
||||
title, atype = 'key exchange algorithms', 'kex'
|
||||
output_algorithms(title, adb, atype, kex.kex_algorithms, unknown_algorithms, maxlen, kex.dh_modulus_sizes())
|
||||
program_retval = output_algorithms(title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.dh_modulus_sizes())
|
||||
title, atype = 'host-key algorithms', 'key'
|
||||
output_algorithms(title, adb, atype, kex.key_algorithms, unknown_algorithms, maxlen, kex.rsa_key_sizes())
|
||||
program_retval = output_algorithms(title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.rsa_key_sizes())
|
||||
title, atype = 'encryption algorithms (ciphers)', 'enc'
|
||||
output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, maxlen)
|
||||
program_retval = output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, aconf.json, program_retval, maxlen)
|
||||
title, atype = 'message authentication code algorithms', 'mac'
|
||||
output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen)
|
||||
output_fingerprints(algs, True)
|
||||
perfect_config = output_recommendations(algs, software, maxlen)
|
||||
output_info(software, client_audit, not perfect_config)
|
||||
program_retval = output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, aconf.json, program_retval, maxlen)
|
||||
output_fingerprints(algs, aconf.json, True)
|
||||
perfect_config = output_recommendations(algs, software, aconf.json, maxlen)
|
||||
output_info(software, client_audit, not perfect_config, aconf.json)
|
||||
|
||||
# If we encountered any unknown algorithms, ask the user to report them.
|
||||
if len(unknown_algorithms) > 0:
|
||||
# If the user requested JSON output, output that and return immediately.
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True))
|
||||
elif len(unknown_algorithms) > 0: # If we encountered any unknown algorithms, ask the user to report them.
|
||||
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
|
||||
|
||||
return program_retval
|
||||
|
||||
|
||||
def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None) -> bool:
|
||||
|
||||
@ -3415,7 +3431,9 @@ def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = Non
|
||||
return res
|
||||
|
||||
|
||||
# Returns one of the PROGRAM_RETVAL_* flags.
|
||||
def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
|
||||
program_retval = PROGRAM_RETVAL_GOOD
|
||||
out.batch = aconf.batch
|
||||
out.verbose = aconf.verbose
|
||||
out.level = aconf.level
|
||||
@ -3461,13 +3479,9 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
|
||||
if err is not None:
|
||||
output(aconf, banner, header)
|
||||
out.fail(err)
|
||||
return 1
|
||||
return PROGRAM_RETVAL_CONNECTION_ERROR
|
||||
if sshv == 1:
|
||||
pkm = SSH1.PublicKeyMessage.parse(payload)
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, pkm=pkm), sort_keys=True))
|
||||
else:
|
||||
output(aconf, banner, header, pkm=pkm)
|
||||
program_retval = output(aconf, banner, header, pkm=SSH1.PublicKeyMessage.parse(payload))
|
||||
elif sshv == 2:
|
||||
kex = SSH2.Kex.parse(payload)
|
||||
if aconf.client_audit is False:
|
||||
@ -3476,11 +3490,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
|
||||
|
||||
# This is a standard audit scan.
|
||||
if (aconf.policy is None) and (aconf.make_policy is False):
|
||||
output(aconf, banner, header, client_host=s.client_host, kex=kex)
|
||||
program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex)
|
||||
|
||||
# This is a policy test.
|
||||
elif (aconf.policy is not None) and (aconf.make_policy is False):
|
||||
return 0 if evaluate_policy(aconf, banner, kex=kex) else 1
|
||||
program_retval = PROGRAM_RETVAL_GOOD if evaluate_policy(aconf, banner, kex=kex) else PROGRAM_RETVAL_FAILURE
|
||||
|
||||
# A new policy should be made from this scan.
|
||||
elif (aconf.policy is None) and (aconf.make_policy is True):
|
||||
@ -3489,7 +3503,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
|
||||
else:
|
||||
raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy))
|
||||
|
||||
return 0
|
||||
return program_retval
|
||||
|
||||
|
||||
utils = Utils()
|
||||
|
Reference in New Issue
Block a user