From b6c64d296b3a218b3f4c2014dcbee3560f8cc61c Mon Sep 17 00:00:00 2001 From: Andreas Jaggi Date: Thu, 7 Nov 2019 22:08:09 +0100 Subject: [PATCH] Add --json output option --- README.md | 1 + ssh-audit.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 3e51905..440b4bd 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ usage: ssh-audit.py [-1246pbcnvlt] software config (use -p to change port; use -t to change timeout) -n, --no-colors disable colors + -j, --json JSON output -v, --verbose verbose output -l, --level= minimum output level (info|warn|fail) -t, --timeout= timeout (in seconds) for connection and reading diff --git a/ssh-audit.py b/ssh-audit.py index b6e58ed..7871cc2 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -25,7 +25,7 @@ THE SOFTWARE. """ from __future__ import print_function -import base64, binascii, errno, hashlib, getopt, io, os, random, re, select, socket, struct, sys +import base64, binascii, errno, hashlib, getopt, io, os, random, re, select, socket, struct, sys, json VERSION = 'v2.1.0-dev' SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate @@ -62,7 +62,7 @@ def usage(err=None): uout.head('# {0} {1}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION)) if err is not None and len(err) > 0: uout.fail('\n' + err) - uout.info('usage: {0} [-1246pbcnvlt] \n'.format(p)) + uout.info('usage: {0} [-1246pbcnjvlt] \n'.format(p)) uout.info(' -h, --help print this help') uout.info(' -1, --ssh1 force ssh version 1 only') uout.info(' -2, --ssh2 force ssh version 2 only') @@ -72,6 +72,7 @@ def usage(err=None): uout.info(' -b, --batch batch output') uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)') uout.info(' -n, --no-colors disable colors') + uout.info(' -j, --json JSON output') uout.info(' -v, --verbose verbose output') uout.info(' -l, --level= minimum output level (info|warn|fail)') uout.info(' -t, --timeout= timeout (in seconds) for connection and reading\n (default: 5)') @@ -90,6 +91,7 @@ class AuditConf(object): self.batch = False self.client_audit = False self.colors = True + self.json = False self.verbose = False self.level = 'info' self.ipvo = () # type: Sequence[int] @@ -101,7 +103,7 @@ class AuditConf(object): def __setattr__(self, name, value): # type: (str, Union[str, int, bool, Sequence[int]]) -> None valid = False - if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set']: + if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json']: valid, value = True, True if bool(value) else False elif name in ['ipv4', 'ipv6']: valid = False @@ -148,8 +150,8 @@ class AuditConf(object): # pylint: disable=too-many-branches aconf = cls() try: - sopts = 'h1246p:bcnvl:t:' - lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port', + sopts = 'h1246p:bcnjvl:t:' + lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout='] opts, args = getopt.gnu_getopt(args, sopts, lopts) except getopt.GetoptError as err: @@ -176,6 +178,8 @@ class AuditConf(object): aconf.client_audit = True elif o in ('-n', '--no-colors'): aconf.colors = False + elif o in ('-j', '--json'): + aconf.json = True elif o in ('-v', '--verbose'): aconf.verbose = True elif o in ('-l', '--level'): @@ -225,6 +229,7 @@ class Output(object): self.batch = False self.verbose = False self.use_colors = True + self.json = False self.__level = 0 self.__colsupport = 'colorama' in sys.modules or os.name == 'posix' @@ -3137,6 +3142,76 @@ class Utils(object): except: # pylint: disable=bare-except return -1.0 +def build_struct(banner, kex=None, pkm=None): + res = { + "banner": { + "raw": str(banner), + "protocol": banner.protocol, + "software": banner.software, + "comments": banner.comments, + }, + } + if kex is not None: + res['compression'] = kex.server.compression + + res['kex'] = [] + alg_sizes = kex.dh_modulus_sizes() + for algorithm in kex.kex_algorithms: + entry = { + 'algorithm': algorithm, + } + if (alg_sizes is not None) and (algorithm in alg_sizes): + hostkey_size, ca_size = alg_sizes[algorithm] + entry['keysize'] = hostkey_size + if ca_size > 0: + entry['casize'] = ca_size + res['kex'].append(entry) + + res['key'] = [] + alg_sizes = kex.rsa_key_sizes() + for algorithm in kex.key_algorithms: + entry = { + 'algorithm': algorithm, + } + if (alg_sizes is not None) and (algorithm in alg_sizes): + hostkey_size, ca_size = alg_sizes[algorithm] + entry['keysize'] = hostkey_size + if ca_size > 0: + entry['casize'] = ca_size + res['key'].append(entry) + + res['enc'] = kex.server.encryption + res['mac'] = kex.server.mac + res['fingerprints'] = [] + host_keys = kex.host_keys() + for host_key_type in host_keys: + if host_keys[host_key_type] is None: + continue + + fp = SSH.Fingerprint(host_keys[host_key_type]) + + # Workaround for Python's order-indifference in dicts. We might get a random RSA type (ssh-rsa, rsa-sha2-256, or rsa-sha2-512), so running the tool against the same server three times may give three different host key types here. So if we have any RSA type, we will simply hard-code it to 'ssh-rsa'. + if host_key_type in SSH2.HostKeyTest.RSA_FAMILY: + host_key_type = 'ssh-rsa' + + # Skip over certificate host types (or we would return invalid fingerprints). + if '-cert-' in host_key_type: + continue + entry = { + 'type': host_key_type, + 'fp': fp.sha256, + } + res['fingerprints'].append(entry) + else: + res['key'] = ['ssh-rsa1'] + res['enc'] = pkm.supported_ciphers + res['aut'] = pkm.supported_authentications + res['fingerprints'] = [{ + 'type': 'ssh-rsa1', + 'fp': SSH.Fingerprint(pkm.host_key_fingerprint_data).sha256, + }] + + return res def audit(aconf, sshv=None): # type: (AuditConf, Optional[int]) -> None @@ -3189,13 +3264,19 @@ def audit(aconf, sshv=None): sys.exit(1) if sshv == 1: pkm = SSH1.PublicKeyMessage.parse(payload) - output(banner, header, pkm=pkm) + if aconf.json: + print(json.dumps(build_struct(banner, pkm=pkm))) + else: + output(banner, header, pkm=pkm) elif sshv == 2: kex = SSH2.Kex.parse(payload) if aconf.client_audit is False: SSH2.HostKeyTest.run(s, kex) SSH2.GEXTest.run(s, kex) - output(banner, header, client_audit=aconf.client_audit, kex=kex) + if aconf.json: + print(json.dumps(build_struct(banner, kex=kex))) + else: + output(banner, header, client_audit=aconf.client_audit, kex=kex) utils = Utils()