mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-03 18:52:15 +01:00 
			
		
		
		
	Added recommendations and CVE information to JSON output (#122).
This commit is contained in:
		@@ -227,10 +227,12 @@ def output_compatibility(out: OutputBuffer, algs: Algorithms, client_audit: bool
 | 
			
		||||
        out.good('(gen) compatibility: ' + ', '.join(comp_text))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def output_security_sub(out: OutputBuffer, sub: str, software: Optional[Software], client_audit: bool, padlen: int) -> None:
 | 
			
		||||
def output_security_sub(out: OutputBuffer, sub: str, software: Optional[Software], client_audit: bool, padlen: int) -> List[Dict[str, Union[str, float]]]:
 | 
			
		||||
    ret: List[Dict[str, Union[str, float]]] = []
 | 
			
		||||
 | 
			
		||||
    secdb = VersionVulnerabilityDB.CVE if sub == 'cve' else VersionVulnerabilityDB.TXT
 | 
			
		||||
    if software is None or software.product not in secdb:
 | 
			
		||||
        return
 | 
			
		||||
        return ret
 | 
			
		||||
    for line in secdb[software.product]:
 | 
			
		||||
        vfrom: str = ''
 | 
			
		||||
        vtill: str = ''
 | 
			
		||||
@@ -258,17 +260,22 @@ def output_security_sub(out: OutputBuffer, sub: str, software: Optional[Software
 | 
			
		||||
            if cvss >= 8.0:
 | 
			
		||||
                out_func = out.fail
 | 
			
		||||
            out_func('(cve) {}{} -- (CVSSv2: {}) {}'.format(name, p, cvss, descr))
 | 
			
		||||
            ret.append({'name': name, 'cvssv2': cvss, 'description': descr})
 | 
			
		||||
        else:
 | 
			
		||||
            descr = line[4]
 | 
			
		||||
            out.fail('(sec) {}{} -- {}'.format(name, p, descr))
 | 
			
		||||
 | 
			
		||||
    return ret
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def output_security(out: OutputBuffer, banner: Optional[Banner], client_audit: bool, padlen: int, is_json_output: bool) -> List[Dict[str, Union[str, float]]]:
 | 
			
		||||
    cves = []
 | 
			
		||||
 | 
			
		||||
def output_security(out: OutputBuffer, banner: Optional[Banner], client_audit: bool, padlen: int, is_json_output: bool) -> None:
 | 
			
		||||
    with out:
 | 
			
		||||
        if banner is not None:
 | 
			
		||||
            software = Software.parse(banner)
 | 
			
		||||
            output_security_sub(out, 'cve', software, client_audit, padlen)
 | 
			
		||||
            output_security_sub(out, 'txt', software, client_audit, padlen)
 | 
			
		||||
            cves = output_security_sub(out, 'cve', software, client_audit, padlen)
 | 
			
		||||
            _ = output_security_sub(out, 'txt', software, client_audit, padlen)
 | 
			
		||||
            if banner.protocol[0] == 1:
 | 
			
		||||
                p = '' if out.batch else ' ' * (padlen - 14)
 | 
			
		||||
                out.fail('(sec) SSH v1 enabled{} -- SSH v1 can be exploited to recover plaintext passwords'.format(p))
 | 
			
		||||
@@ -277,6 +284,8 @@ def output_security(out: OutputBuffer, banner: Optional[Banner], client_audit: b
 | 
			
		||||
        out.flush_section()
 | 
			
		||||
        out.sep()
 | 
			
		||||
 | 
			
		||||
    return cves
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def output_fingerprints(out: OutputBuffer, algs: Algorithms, is_json_output: bool) -> None:
 | 
			
		||||
    with out:
 | 
			
		||||
@@ -349,40 +358,35 @@ def output_recommendations(out: OutputBuffer, algs: Algorithms, algorithm_recomm
 | 
			
		||||
            ret = False
 | 
			
		||||
        return ret
 | 
			
		||||
 | 
			
		||||
    for_server = True
 | 
			
		||||
    with out:
 | 
			
		||||
        software, alg_rec = algs.get_recommendations(software, for_server)
 | 
			
		||||
        for sshv in range(2, 0, -1):
 | 
			
		||||
            if sshv not in alg_rec:
 | 
			
		||||
                continue
 | 
			
		||||
            for alg_type in ['kex', 'key', 'enc', 'mac']:
 | 
			
		||||
                if alg_type not in alg_rec[sshv]:
 | 
			
		||||
                    continue
 | 
			
		||||
                for action in ['del', 'add', 'chg']:
 | 
			
		||||
                    if action not in alg_rec[sshv][alg_type]:
 | 
			
		||||
                        continue
 | 
			
		||||
                    for name in alg_rec[sshv][alg_type][action]:
 | 
			
		||||
        recommendations = get_algorithm_recommendations(algs, algorithm_recommendation_suppress_list, software, for_server=True)
 | 
			
		||||
 | 
			
		||||
                        # If this algorithm should be suppressed, skip it.
 | 
			
		||||
                        if name in algorithm_recommendation_suppress_list:
 | 
			
		||||
                            continue
 | 
			
		||||
        for level in recommendations:  # pylint: disable=consider-using-dict-items
 | 
			
		||||
            for action in recommendations[level]:
 | 
			
		||||
                for alg_type in recommendations[level][action]:
 | 
			
		||||
                    for alg_name_and_notes in recommendations[level][action][alg_type]:
 | 
			
		||||
                        name = alg_name_and_notes['name']
 | 
			
		||||
                        notes = alg_name_and_notes['notes']
 | 
			
		||||
 | 
			
		||||
                        p = '' if out.batch else ' ' * (padlen - len(name))
 | 
			
		||||
                        chg_additional_info = ''
 | 
			
		||||
 | 
			
		||||
                        if action == 'del':
 | 
			
		||||
                            an, sg, fn = 'remove', '-', out.warn
 | 
			
		||||
                            ret = False
 | 
			
		||||
                            if alg_rec[sshv][alg_type][action][name] >= 10:
 | 
			
		||||
                            if level == 'critical':
 | 
			
		||||
                                fn = out.fail
 | 
			
		||||
                        elif action == 'add':
 | 
			
		||||
                            an, sg, fn = 'append', '+', out.good
 | 
			
		||||
                        elif action == 'chg':
 | 
			
		||||
                            an, sg, fn = 'change', '!', out.fail
 | 
			
		||||
                            ret = False
 | 
			
		||||
                            chg_additional_info = ' (increase modulus size to 3072 bits or larger)'
 | 
			
		||||
                        b = '(SSH{})'.format(sshv) if sshv == 1 else ''
 | 
			
		||||
                        fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}'
 | 
			
		||||
                        fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b))
 | 
			
		||||
 | 
			
		||||
                        if notes != '':
 | 
			
		||||
                            notes = " (%s)" % notes
 | 
			
		||||
 | 
			
		||||
                        fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} '
 | 
			
		||||
                        fn(fm.format(sg, name, p, alg_type, an, notes))
 | 
			
		||||
 | 
			
		||||
    if not out.is_section_empty() and not is_json_output:
 | 
			
		||||
        if software is not None:
 | 
			
		||||
            title = '(for {})'.format(software.display(False))
 | 
			
		||||
@@ -491,7 +495,7 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header
 | 
			
		||||
        out.flush_section()
 | 
			
		||||
        out.sep()
 | 
			
		||||
    maxlen = algs.maxlen + 1
 | 
			
		||||
    output_security(out, banner, client_audit, maxlen, aconf.json)
 | 
			
		||||
    cves = output_security(out, banner, client_audit, maxlen, aconf.json)
 | 
			
		||||
    # Filled in by output_algorithms() with unidentified algs.
 | 
			
		||||
    unknown_algorithms: List[str] = []
 | 
			
		||||
    if pkm is not None:
 | 
			
		||||
@@ -521,7 +525,7 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header
 | 
			
		||||
    if aconf.json:
 | 
			
		||||
        out.reset()
 | 
			
		||||
        # Build & write the JSON struct.
 | 
			
		||||
        out.info(json.dumps(build_struct(aconf.host + ":" + str(aconf.port), banner, kex=kex, client_host=client_host), indent=4 if aconf.json_print_indent else None, sort_keys=True))
 | 
			
		||||
        out.info(json.dumps(build_struct(aconf.host + ":" + str(aconf.port), banner, cves, kex=kex, client_host=client_host, software=software, algorithms=algs, algorithm_recommendation_suppress_list=algorithm_recommendation_suppress_list), indent=4 if aconf.json_print_indent else None, sort_keys=True))
 | 
			
		||||
    elif len(unknown_algorithms) > 0:  # If we encountered any unknown algorithms, ask the user to report them.
 | 
			
		||||
        out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s.  Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
 | 
			
		||||
 | 
			
		||||
@@ -571,6 +575,55 @@ def evaluate_policy(out: OutputBuffer, aconf: AuditConf, banner: Optional['Banne
 | 
			
		||||
    return passed
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_algorithm_recommendations(algs: Optional[Algorithms], algorithm_recommendation_suppress_list: Optional[List[str]], software: Optional[Software], for_server: bool = True) -> Dict[str, Any]:
 | 
			
		||||
    '''Returns the algorithm recommendations.'''
 | 
			
		||||
    ret: Dict[str, Any] = {}
 | 
			
		||||
 | 
			
		||||
    if algs is None or software is None:
 | 
			
		||||
        return ret
 | 
			
		||||
 | 
			
		||||
    software, alg_rec = algs.get_recommendations(software, for_server)
 | 
			
		||||
    for sshv in range(2, 0, -1):
 | 
			
		||||
        if sshv not in alg_rec:
 | 
			
		||||
            continue
 | 
			
		||||
        for alg_type in ['kex', 'key', 'enc', 'mac']:
 | 
			
		||||
            if alg_type not in alg_rec[sshv]:
 | 
			
		||||
                continue
 | 
			
		||||
            for action in ['del', 'add', 'chg']:
 | 
			
		||||
                if action not in alg_rec[sshv][alg_type]:
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                for name in alg_rec[sshv][alg_type][action]:
 | 
			
		||||
 | 
			
		||||
                    # If this algorithm should be suppressed, skip it.
 | 
			
		||||
                    if algorithm_recommendation_suppress_list is not None and name in algorithm_recommendation_suppress_list:
 | 
			
		||||
                        continue
 | 
			
		||||
 | 
			
		||||
                    level = 'informational'
 | 
			
		||||
                    points = alg_rec[sshv][alg_type][action][name]
 | 
			
		||||
                    if points >= 10:
 | 
			
		||||
                        level = 'critical'
 | 
			
		||||
                    elif points >= 1:
 | 
			
		||||
                        level = 'warning'
 | 
			
		||||
 | 
			
		||||
                    if level not in ret:
 | 
			
		||||
                        ret[level] = {}
 | 
			
		||||
 | 
			
		||||
                    if action not in ret[level]:
 | 
			
		||||
                        ret[level][action] = {}
 | 
			
		||||
 | 
			
		||||
                    if alg_type not in ret[level][action]:
 | 
			
		||||
                        ret[level][action][alg_type] = []
 | 
			
		||||
 | 
			
		||||
                    notes = ''
 | 
			
		||||
                    if action == 'chg':
 | 
			
		||||
                        notes = 'increase modulus size to 3072 bits or larger'
 | 
			
		||||
 | 
			
		||||
                    ret[level][action][alg_type].append({'name': name, 'notes': notes})
 | 
			
		||||
 | 
			
		||||
    return ret
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def list_policies(out: OutputBuffer) -> None:
 | 
			
		||||
    '''Prints a list of server & client policies.'''
 | 
			
		||||
 | 
			
		||||
@@ -801,7 +854,7 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
 | 
			
		||||
    return aconf
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any:
 | 
			
		||||
def build_struct(target_host: str, banner: Optional['Banner'], cves: List[Dict[str, Union[str, float]]], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None, software: Optional[Software] = None, algorithms: Optional[Algorithms] = None, algorithm_recommendation_suppress_list: Optional[List[str]] = None) -> Any:  # pylint: disable=too-many-arguments
 | 
			
		||||
 | 
			
		||||
    banner_str = ''
 | 
			
		||||
    banner_protocol = None
 | 
			
		||||
@@ -907,6 +960,12 @@ def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SS
 | 
			
		||||
            'fp': pkm_fp,
 | 
			
		||||
        }]
 | 
			
		||||
 | 
			
		||||
    # Add in the CVE information.
 | 
			
		||||
    res['cves'] = cves
 | 
			
		||||
 | 
			
		||||
    # Add in the recommendations.
 | 
			
		||||
    res['recommendations'] = get_algorithm_recommendations(algorithms, algorithm_recommendation_suppress_list, software, for_server=True)
 | 
			
		||||
 | 
			
		||||
    return res
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user