mirror of
https://github.com/jtesta/ssh-audit.git
synced 2024-11-16 13:35:39 +01:00
Add --json output option
This commit is contained in:
parent
1ec13c653e
commit
b6c64d296b
@ -33,6 +33,7 @@ usage: ssh-audit.py [-1246pbcnvlt] <host>
|
||||
software config (use -p to change port;
|
||||
use -t to change timeout)
|
||||
-n, --no-colors disable colors
|
||||
-j, --json JSON output
|
||||
-v, --verbose verbose output
|
||||
-l, --level=<level> minimum output level (info|warn|fail)
|
||||
-t, --timeout=<secs> timeout (in seconds) for connection and reading
|
||||
|
95
ssh-audit.py
95
ssh-audit.py
@ -25,7 +25,7 @@
|
||||
THE SOFTWARE.
|
||||
"""
|
||||
from __future__ import print_function
|
||||
import base64, binascii, errno, hashlib, getopt, io, os, random, re, select, socket, struct, sys
|
||||
import base64, binascii, errno, hashlib, getopt, io, os, random, re, select, socket, struct, sys, json
|
||||
|
||||
VERSION = 'v2.1.0-dev'
|
||||
SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate
|
||||
@ -62,7 +62,7 @@ def usage(err=None):
|
||||
uout.head('# {0} {1}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
|
||||
if err is not None and len(err) > 0:
|
||||
uout.fail('\n' + err)
|
||||
uout.info('usage: {0} [-1246pbcnvlt] <host>\n'.format(p))
|
||||
uout.info('usage: {0} [-1246pbcnjvlt] <host>\n'.format(p))
|
||||
uout.info(' -h, --help print this help')
|
||||
uout.info(' -1, --ssh1 force ssh version 1 only')
|
||||
uout.info(' -2, --ssh2 force ssh version 2 only')
|
||||
@ -72,6 +72,7 @@ def usage(err=None):
|
||||
uout.info(' -b, --batch batch output')
|
||||
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)')
|
||||
uout.info(' -n, --no-colors disable colors')
|
||||
uout.info(' -j, --json JSON output')
|
||||
uout.info(' -v, --verbose verbose output')
|
||||
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
|
||||
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
|
||||
@ -90,6 +91,7 @@ class AuditConf(object):
|
||||
self.batch = False
|
||||
self.client_audit = False
|
||||
self.colors = True
|
||||
self.json = False
|
||||
self.verbose = False
|
||||
self.level = 'info'
|
||||
self.ipvo = () # type: Sequence[int]
|
||||
@ -101,7 +103,7 @@ class AuditConf(object):
|
||||
def __setattr__(self, name, value):
|
||||
# type: (str, Union[str, int, bool, Sequence[int]]) -> None
|
||||
valid = False
|
||||
if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set']:
|
||||
if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json']:
|
||||
valid, value = True, True if bool(value) else False
|
||||
elif name in ['ipv4', 'ipv6']:
|
||||
valid = False
|
||||
@ -148,8 +150,8 @@ class AuditConf(object):
|
||||
# pylint: disable=too-many-branches
|
||||
aconf = cls()
|
||||
try:
|
||||
sopts = 'h1246p:bcnvl:t:'
|
||||
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port',
|
||||
sopts = 'h1246p:bcnjvl:t:'
|
||||
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port', 'json',
|
||||
'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=']
|
||||
opts, args = getopt.gnu_getopt(args, sopts, lopts)
|
||||
except getopt.GetoptError as err:
|
||||
@ -176,6 +178,8 @@ class AuditConf(object):
|
||||
aconf.client_audit = True
|
||||
elif o in ('-n', '--no-colors'):
|
||||
aconf.colors = False
|
||||
elif o in ('-j', '--json'):
|
||||
aconf.json = True
|
||||
elif o in ('-v', '--verbose'):
|
||||
aconf.verbose = True
|
||||
elif o in ('-l', '--level'):
|
||||
@ -225,6 +229,7 @@ class Output(object):
|
||||
self.batch = False
|
||||
self.verbose = False
|
||||
self.use_colors = True
|
||||
self.json = False
|
||||
self.__level = 0
|
||||
self.__colsupport = 'colorama' in sys.modules or os.name == 'posix'
|
||||
|
||||
@ -3137,6 +3142,76 @@ class Utils(object):
|
||||
except: # pylint: disable=bare-except
|
||||
return -1.0
|
||||
|
||||
def build_struct(banner, kex=None, pkm=None):
|
||||
res = {
|
||||
"banner": {
|
||||
"raw": str(banner),
|
||||
"protocol": banner.protocol,
|
||||
"software": banner.software,
|
||||
"comments": banner.comments,
|
||||
},
|
||||
}
|
||||
if kex is not None:
|
||||
res['compression'] = kex.server.compression
|
||||
|
||||
res['kex'] = []
|
||||
alg_sizes = kex.dh_modulus_sizes()
|
||||
for algorithm in kex.kex_algorithms:
|
||||
entry = {
|
||||
'algorithm': algorithm,
|
||||
}
|
||||
if (alg_sizes is not None) and (algorithm in alg_sizes):
|
||||
hostkey_size, ca_size = alg_sizes[algorithm]
|
||||
entry['keysize'] = hostkey_size
|
||||
if ca_size > 0:
|
||||
entry['casize'] = ca_size
|
||||
res['kex'].append(entry)
|
||||
|
||||
res['key'] = []
|
||||
alg_sizes = kex.rsa_key_sizes()
|
||||
for algorithm in kex.key_algorithms:
|
||||
entry = {
|
||||
'algorithm': algorithm,
|
||||
}
|
||||
if (alg_sizes is not None) and (algorithm in alg_sizes):
|
||||
hostkey_size, ca_size = alg_sizes[algorithm]
|
||||
entry['keysize'] = hostkey_size
|
||||
if ca_size > 0:
|
||||
entry['casize'] = ca_size
|
||||
res['key'].append(entry)
|
||||
|
||||
res['enc'] = kex.server.encryption
|
||||
res['mac'] = kex.server.mac
|
||||
res['fingerprints'] = []
|
||||
host_keys = kex.host_keys()
|
||||
for host_key_type in host_keys:
|
||||
if host_keys[host_key_type] is None:
|
||||
continue
|
||||
|
||||
fp = SSH.Fingerprint(host_keys[host_key_type])
|
||||
|
||||
# Workaround for Python's order-indifference in dicts. We might get a random RSA type (ssh-rsa, rsa-sha2-256, or rsa-sha2-512), so running the tool against the same server three times may give three different host key types here. So if we have any RSA type, we will simply hard-code it to 'ssh-rsa'.
|
||||
if host_key_type in SSH2.HostKeyTest.RSA_FAMILY:
|
||||
host_key_type = 'ssh-rsa'
|
||||
|
||||
# Skip over certificate host types (or we would return invalid fingerprints).
|
||||
if '-cert-' in host_key_type:
|
||||
continue
|
||||
entry = {
|
||||
'type': host_key_type,
|
||||
'fp': fp.sha256,
|
||||
}
|
||||
res['fingerprints'].append(entry)
|
||||
else:
|
||||
res['key'] = ['ssh-rsa1']
|
||||
res['enc'] = pkm.supported_ciphers
|
||||
res['aut'] = pkm.supported_authentications
|
||||
res['fingerprints'] = [{
|
||||
'type': 'ssh-rsa1',
|
||||
'fp': SSH.Fingerprint(pkm.host_key_fingerprint_data).sha256,
|
||||
}]
|
||||
|
||||
return res
|
||||
|
||||
def audit(aconf, sshv=None):
|
||||
# type: (AuditConf, Optional[int]) -> None
|
||||
@ -3189,13 +3264,19 @@ def audit(aconf, sshv=None):
|
||||
sys.exit(1)
|
||||
if sshv == 1:
|
||||
pkm = SSH1.PublicKeyMessage.parse(payload)
|
||||
output(banner, header, pkm=pkm)
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, pkm=pkm)))
|
||||
else:
|
||||
output(banner, header, pkm=pkm)
|
||||
elif sshv == 2:
|
||||
kex = SSH2.Kex.parse(payload)
|
||||
if aconf.client_audit is False:
|
||||
SSH2.HostKeyTest.run(s, kex)
|
||||
SSH2.GEXTest.run(s, kex)
|
||||
output(banner, header, client_audit=aconf.client_audit, kex=kex)
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, kex=kex)))
|
||||
else:
|
||||
output(banner, header, client_audit=aconf.client_audit, kex=kex)
|
||||
|
||||
|
||||
utils = Utils()
|
||||
|
Loading…
Reference in New Issue
Block a user