From 49bd2c96a867f5960d7ebb429c95e358aeafd7fe Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Tue, 7 Jul 2020 15:56:37 -0400 Subject: [PATCH] Added return values for standard scans. --- docker_test.sh | 76 ++++++++++++++++++++++----------------- ssh-audit.py | 98 ++++++++++++++++++++++++++++---------------------- 2 files changed, 100 insertions(+), 74 deletions(-) diff --git a/docker_test.sh b/docker_test.sh index 48d0d7e..03f72cd 100755 --- a/docker_test.sh +++ b/docker_test.sh @@ -28,6 +28,12 @@ GREEN="\033[0;32m" REDB="\033[1;31m" # Red + bold GREENB="\033[1;32m" # Green + bold +# Program return values. +PROGRAM_RETVAL_FAILURE=3 +PROGRAM_RETVAL_WARNING=2 +PROGRAM_RETVAL_CONNECTION_ERROR=1 +PROGRAM_RETVAL_GOOD=0 + # Returns 0 if current docker image exists. function check_if_docker_image_exists { @@ -353,8 +359,9 @@ function run_dropbear_test { dropbear_version=$1 test_number=$2 options=$3 + expected_retval=$4 - run_test 'Dropbear' $dropbear_version $test_number "$options" + run_test 'Dropbear' $dropbear_version $test_number "$options" $expected_retval } @@ -363,8 +370,9 @@ function run_dropbear_test { function run_openssh_test { openssh_version=$1 test_number=$2 + expected_retval=$3 - run_test 'OpenSSH' $openssh_version $test_number '' + run_test 'OpenSSH' $openssh_version $test_number '' $expected_retval } @@ -373,8 +381,9 @@ function run_openssh_test { function run_tinyssh_test { tinyssh_version=$1 test_number=$2 + expected_retval=$3 - run_test 'TinySSH' $tinyssh_version $test_number '' + run_test 'TinySSH' $tinyssh_version $test_number '' $expected_retval } @@ -383,6 +392,7 @@ function run_test { version=$2 test_number=$3 options=$4 + expected_retval=$5 server_exec= test_result_stdout= @@ -421,15 +431,17 @@ function run_test { fi ./ssh-audit.py localhost:2222 > $test_result_stdout - if [[ $? != 0 ]]; then - echo -e "${REDB}Failed to run ssh-audit.py! (exit code: $?)${CLR}" + actual_retval=$? + if [[ $actual_retval != $expected_retval ]]; then + echo -e "${REDB}Unexpected return value. Expected: ${expected_retval}; Actual: ${actual_retval}${CLR}" docker container stop -t 0 $cid > /dev/null exit 1 fi ./ssh-audit.py -j localhost:2222 > $test_result_json - if [[ $? != 0 ]]; then - echo -e "${REDB}Failed to run ssh-audit.py! (exit code: $?)${CLR}" + actual_retval=$? + if [[ $actual_retval != $expected_retval ]]; then + echo -e "${REDB}Unexpected return value. Expected: ${expected_retval}; Actual: ${actual_retval}${CLR}" docker container stop -t 0 $cid > /dev/null exit 1 fi @@ -560,53 +572,53 @@ TEST_RESULT_DIR=`mktemp -d /tmp/ssh-audit_test-results_XXXXXXXXXX` # Now run all the tests. echo -e "\nRunning tests..." -run_openssh_test '4.0p1' 'test1' +run_openssh_test '4.0p1' 'test1' $PROGRAM_RETVAL_FAILURE echo -run_openssh_test '5.6p1' 'test1' -run_openssh_test '5.6p1' 'test2' -run_openssh_test '5.6p1' 'test3' -run_openssh_test '5.6p1' 'test4' -run_openssh_test '5.6p1' 'test5' +run_openssh_test '5.6p1' 'test1' $PROGRAM_RETVAL_FAILURE +run_openssh_test '5.6p1' 'test2' $PROGRAM_RETVAL_FAILURE +run_openssh_test '5.6p1' 'test3' $PROGRAM_RETVAL_FAILURE +run_openssh_test '5.6p1' 'test4' $PROGRAM_RETVAL_FAILURE +run_openssh_test '5.6p1' 'test5' $PROGRAM_RETVAL_FAILURE echo -run_openssh_test '8.0p1' 'test1' -run_openssh_test '8.0p1' 'test2' -run_openssh_test '8.0p1' 'test3' +run_openssh_test '8.0p1' 'test1' $PROGRAM_RETVAL_FAILURE +run_openssh_test '8.0p1' 'test2' $PROGRAM_RETVAL_FAILURE +run_openssh_test '8.0p1' 'test3' $PROGRAM_RETVAL_GOOD echo -run_dropbear_test '2019.78' 'test1' '-r /etc/dropbear/dropbear_rsa_host_key_1024 -r /etc/dropbear/dropbear_dss_host_key -r /etc/dropbear/dropbear_ecdsa_host_key' +run_dropbear_test '2019.78' 'test1' '-r /etc/dropbear/dropbear_rsa_host_key_1024 -r /etc/dropbear/dropbear_dss_host_key -r /etc/dropbear/dropbear_ecdsa_host_key' 3 echo -run_tinyssh_test '20190101' 'test1' +run_tinyssh_test '20190101' 'test1' $PROGRAM_RETVAL_WARNING echo echo -run_policy_test 'config1' 'test1' '0' -run_policy_test 'config1' 'test2' '1' -run_policy_test 'config1' 'test3' '1' -run_policy_test 'config1' 'test4' '1' -run_policy_test 'config1' 'test5' '1' -run_policy_test 'config2' 'test6' '0' +run_policy_test 'config1' 'test1' $PROGRAM_RETVAL_GOOD +run_policy_test 'config1' 'test2' $PROGRAM_RETVAL_FAILURE +run_policy_test 'config1' 'test3' $PROGRAM_RETVAL_FAILURE +run_policy_test 'config1' 'test4' $PROGRAM_RETVAL_FAILURE +run_policy_test 'config1' 'test5' $PROGRAM_RETVAL_FAILURE +run_policy_test 'config2' 'test6' $PROGRAM_RETVAL_GOOD # Passing test with host key certificate and CA key certificates. -run_policy_test 'config3' 'test7' '0' +run_policy_test 'config3' 'test7' $PROGRAM_RETVAL_GOOD # Failing test with host key certificate and non-compliant CA key length. -run_policy_test 'config3' 'test8' '1' +run_policy_test 'config3' 'test8' $PROGRAM_RETVAL_FAILURE # Failing test with non-compliant host key certificate and CA key certificate. -run_policy_test 'config3' 'test9' '1' +run_policy_test 'config3' 'test9' $PROGRAM_RETVAL_FAILURE # Failing test with non-compliant host key certificate and non-compliant CA key certificate. -run_policy_test 'config3' 'test10' '1' +run_policy_test 'config3' 'test10' $PROGRAM_RETVAL_FAILURE # Passing test with host key size check. -run_policy_test 'config2' 'test11' '0' +run_policy_test 'config2' 'test11' $PROGRAM_RETVAL_GOOD # Failing test with non-compliant host key size check. -run_policy_test 'config2' 'test12' '1' +run_policy_test 'config2' 'test12' $PROGRAM_RETVAL_FAILURE # Passing test with DH modulus test. -run_policy_test 'config2' 'test13' '0' +run_policy_test 'config2' 'test13' $PROGRAM_RETVAL_GOOD # Failing test with DH modulus test. -run_policy_test 'config2' 'test14' '1' +run_policy_test 'config2' 'test14' $PROGRAM_RETVAL_FAILURE # The test functions above will terminate the script on failure, so if we reached here, diff --git a/ssh-audit.py b/ssh-audit.py index 8d320eb..fbd484b 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -46,6 +46,12 @@ from typing import Callable, Optional, Union, Any VERSION = 'v2.2.1-dev' SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate +# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively. +PROGRAM_RETVAL_FAILURE = 3 +PROGRAM_RETVAL_WARNING = 2 +PROGRAM_RETVAL_CONNECTION_ERROR = 1 +PROGRAM_RETVAL_GOOD = 0 + try: # pragma: nocover from colorama import init as colorama_init colorama_init(strip=False) # pragma: nocover @@ -2831,17 +2837,19 @@ class KexGroupExchange_SHA256(KexGroupExchange): super(KexGroupExchange_SHA256, self).__init__('KexGroupExchange_SHA256', 'sha256') -def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> None: +def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: # pylint: disable=too-many-arguments with OutputBuffer() as obuf: for algorithm in algorithms: - output_algorithm(alg_db, alg_type, algorithm, unknown_algs, maxlen, alg_sizes) - if len(obuf) > 0: + program_retval = output_algorithm(alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, alg_sizes) + if len(obuf) > 0 and not is_json_output: out.head('# ' + title) obuf.flush() out.sep() + return program_retval -def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> None: + +def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: prefix = '(' + alg_type + ') ' if alg_max_len == 0: alg_max_len = len(alg_name) @@ -2861,7 +2869,7 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al texts = [] if len(alg_name.strip()) == 0: - return + return program_retval alg_name_native = utils.to_text(alg_name) if alg_name_native in alg_db[alg_type]: alg_desc = alg_db[alg_type][alg_name_native] @@ -2887,6 +2895,11 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al alg_name = alg_name_with_size if alg_name_with_size is not None else alg_name first = True for level, text in texts: + if level == 'fail': + program_retval = PROGRAM_RETVAL_FAILURE + elif level == 'warn' and program_retval != PROGRAM_RETVAL_FAILURE: # If a failure was found previously, don't downgrade to warning. + program_retval = PROGRAM_RETVAL_WARNING + f = getattr(out, level) comment = (padding + ' -- [' + level + '] ' + text) if text != '' else '' if first: @@ -2901,6 +2914,8 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al comment = (padding + ' `- [' + level + '] ' + text) f(' ' * len(prefix + alg_name) + comment) + return program_retval + def output_compatibility(algs: SSH.Algorithms, client_audit: bool, for_server: bool = True) -> None: @@ -2968,19 +2983,19 @@ def output_security_sub(sub: str, software: Optional[SSH.Software], client_audit out.fail('(sec) {}{} -- {}'.format(name, p, descr)) -def output_security(banner: Optional[SSH.Banner], client_audit: bool, padlen: int) -> None: +def output_security(banner: Optional[SSH.Banner], client_audit: bool, padlen: int, is_json_output: bool) -> None: with OutputBuffer() as obuf: if banner is not None: software = SSH.Software.parse(banner) output_security_sub('cve', software, client_audit, padlen) output_security_sub('txt', software, client_audit, padlen) - if len(obuf) > 0: + if len(obuf) > 0 and not is_json_output: out.head('# security') obuf.flush() out.sep() -def output_fingerprints(algs: SSH.Algorithms, sha256: bool = True) -> None: +def output_fingerprints(algs: SSH.Algorithms, is_json_output: bool, sha256: bool = True) -> None: with OutputBuffer() as obuf: fps = [] if algs.ssh1kex is not None: @@ -3011,14 +3026,14 @@ def output_fingerprints(algs: SSH.Algorithms, sha256: bool = True) -> None: # p = '' if out.batch else ' ' * (padlen - len(name)) # out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) out.good('(fin) {}: {}'.format(name, fpo)) - if len(obuf) > 0: + if len(obuf) > 0 and not is_json_output: out.head('# fingerprints') obuf.flush() out.sep() # Returns True if no warnings or failures encountered in configuration. -def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software], padlen: int = 0) -> bool: +def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software], is_json_output: bool, padlen: int = 0) -> bool: ret = True # PuTTY's algorithms cannot be modified, so there's no point in issuing recommendations. @@ -3078,7 +3093,7 @@ def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software b = '(SSH{})'.format(sshv) if sshv == 1 else '' fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}' fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b)) - if len(obuf) > 0: + if len(obuf) > 0 and not is_json_output: if software is not None: title = '(for {})'.format(software.display(False)) else: @@ -3090,7 +3105,7 @@ def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software # Output additional information & notes. -def output_info(software: Optional['SSH.Software'], client_audit: bool, any_problems: bool) -> None: +def output_info(software: Optional['SSH.Software'], client_audit: bool, any_problems: bool, is_json_output: bool) -> None: with OutputBuffer() as obuf: # Tell user that PuTTY cannot be hardened at the protocol-level. if client_audit and (software is not None) and (software.product == SSH.Product.PuTTY): @@ -3100,19 +3115,16 @@ def output_info(software: Optional['SSH.Software'], client_audit: bool, any_prob if any_problems: out.warn('(nfo) For hardening guides on common OSes, please see: ') - if len(obuf) > 0: + if len(obuf) > 0 and not is_json_output: out.head('# additional info') obuf.flush() out.sep() -def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> None: - - # If the user requested JSON output, output that and return immediately. - if aconf.json: - print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True)) - return +# Returns a PROGRAM_RETVAL_* flag to denote if any failures or warnings were encountered. +def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> int: + program_retval = PROGRAM_RETVAL_GOOD client_audit = client_host is not None # If set, this is a client audit. sshv = 1 if pkm is not None else 2 algs = SSH.Algorithms(pkm, kex) @@ -3141,12 +3153,12 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl else: cmptxt = 'disabled' out.good('(gen) compression: {}'.format(cmptxt)) - if len(obuf) > 0: + if len(obuf) > 0 and not aconf.json: # Print output when it exists and JSON output isn't requested. out.head('# general') obuf.flush() out.sep() maxlen = algs.maxlen + 1 - output_security(banner, client_audit, maxlen) + output_security(banner, client_audit, maxlen, aconf.json) # Filled in by output_algorithms() with unidentified algs. unknown_algorithms = [] # type: List[str] if pkm is not None: @@ -3154,29 +3166,33 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl ciphers = pkm.supported_ciphers auths = pkm.supported_authentications title, atype = 'SSH1 host-key algorithms', 'key' - output_algorithms(title, adb, atype, ['ssh-rsa1'], unknown_algorithms, maxlen) + program_retval = output_algorithms(title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen) title, atype = 'SSH1 encryption algorithms (ciphers)', 'enc' - output_algorithms(title, adb, atype, ciphers, unknown_algorithms, maxlen) + program_retval = output_algorithms(title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen) title, atype = 'SSH1 authentication types', 'aut' - output_algorithms(title, adb, atype, auths, unknown_algorithms, maxlen) + program_retval = output_algorithms(title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen) if kex is not None: adb = SSH2.KexDB.ALGORITHMS title, atype = 'key exchange algorithms', 'kex' - output_algorithms(title, adb, atype, kex.kex_algorithms, unknown_algorithms, maxlen, kex.dh_modulus_sizes()) + program_retval = output_algorithms(title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.dh_modulus_sizes()) title, atype = 'host-key algorithms', 'key' - output_algorithms(title, adb, atype, kex.key_algorithms, unknown_algorithms, maxlen, kex.rsa_key_sizes()) + program_retval = output_algorithms(title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.rsa_key_sizes()) title, atype = 'encryption algorithms (ciphers)', 'enc' - output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, maxlen) + program_retval = output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, aconf.json, program_retval, maxlen) title, atype = 'message authentication code algorithms', 'mac' - output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen) - output_fingerprints(algs, True) - perfect_config = output_recommendations(algs, software, maxlen) - output_info(software, client_audit, not perfect_config) + program_retval = output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, aconf.json, program_retval, maxlen) + output_fingerprints(algs, aconf.json, True) + perfect_config = output_recommendations(algs, software, aconf.json, maxlen) + output_info(software, client_audit, not perfect_config, aconf.json) - # If we encountered any unknown algorithms, ask the user to report them. - if len(unknown_algorithms) > 0: + # If the user requested JSON output, output that and return immediately. + if aconf.json: + print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True)) + elif len(unknown_algorithms) > 0: # If we encountered any unknown algorithms, ask the user to report them. out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at .\n" % ','.join(unknown_algorithms)) + return program_retval + def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None) -> bool: @@ -3415,7 +3431,9 @@ def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = Non return res +# Returns one of the PROGRAM_RETVAL_* flags. def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: + program_retval = PROGRAM_RETVAL_GOOD out.batch = aconf.batch out.verbose = aconf.verbose out.level = aconf.level @@ -3461,13 +3479,9 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: if err is not None: output(aconf, banner, header) out.fail(err) - return 1 + return PROGRAM_RETVAL_CONNECTION_ERROR if sshv == 1: - pkm = SSH1.PublicKeyMessage.parse(payload) - if aconf.json: - print(json.dumps(build_struct(banner, pkm=pkm), sort_keys=True)) - else: - output(aconf, banner, header, pkm=pkm) + program_retval = output(aconf, banner, header, pkm=SSH1.PublicKeyMessage.parse(payload)) elif sshv == 2: kex = SSH2.Kex.parse(payload) if aconf.client_audit is False: @@ -3476,11 +3490,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: # This is a standard audit scan. if (aconf.policy is None) and (aconf.make_policy is False): - output(aconf, banner, header, client_host=s.client_host, kex=kex) + program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex) # This is a policy test. elif (aconf.policy is not None) and (aconf.make_policy is False): - return 0 if evaluate_policy(aconf, banner, kex=kex) else 1 + program_retval = PROGRAM_RETVAL_GOOD if evaluate_policy(aconf, banner, kex=kex) else PROGRAM_RETVAL_FAILURE # A new policy should be made from this scan. elif (aconf.policy is None) and (aconf.make_policy is True): @@ -3489,7 +3503,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: else: raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy)) - return 0 + return program_retval utils = Utils()