mirror of
https://github.com/jtesta/ssh-audit.git
synced 2024-11-16 13:35:39 +01:00
Implement new features: minimum output level and batch output.
This commit is contained in:
parent
ef8d727356
commit
5189c341f3
189
ssh-audit.py
189
ssh-audit.py
@ -24,38 +24,77 @@
|
||||
THE SOFTWARE.
|
||||
"""
|
||||
from __future__ import print_function
|
||||
import os, io, sys, socket, struct, random, errno
|
||||
import os, io, sys, socket, struct, random, errno, getopt
|
||||
|
||||
VERSION = 'v1.0.20160902'
|
||||
SSH_BANNER = 'SSH-2.0-OpenSSH_7.3'
|
||||
|
||||
def usage():
|
||||
def usage(err = None):
|
||||
p = os.path.basename(sys.argv[0])
|
||||
out.head('# {0} v1.0.20160812, moo@arthepsy.eu'.format(p))
|
||||
out.info('\nusage: {0} [-nv] host[:port]\n'.format(p))
|
||||
out.info(' -v verbose')
|
||||
out.info(' -n disable colors' + os.linesep)
|
||||
out.batch = False
|
||||
out.minlevel = 'info'
|
||||
out.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION))
|
||||
if err is not None:
|
||||
out.fail('\n' + err)
|
||||
out.info('\nusage: {0} [-bnv] [-l <level>] <host[:port]>\n'.format(p))
|
||||
out.info(' -h, --help print this help')
|
||||
out.info(' -b --batch batch output')
|
||||
out.info(' -n --no-colors disable colors')
|
||||
out.info(' -v --verbose verbose output')
|
||||
out.info(' -l, --level=<level> minimum output level (info|warn|fail)')
|
||||
out.sep()
|
||||
sys.exit(1)
|
||||
|
||||
class Output(object):
|
||||
colors = True
|
||||
verbose = False
|
||||
LEVELS = ['info', 'warn', 'fail']
|
||||
COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31}
|
||||
|
||||
def __init__(self):
|
||||
self.batch = False
|
||||
self.colors = True
|
||||
self.verbose = False
|
||||
self.__minlevel = 0
|
||||
|
||||
@property
|
||||
def minlevel(self):
|
||||
return self.__minlevel
|
||||
@minlevel.setter
|
||||
def minlevel(self, name):
|
||||
self.__minlevel = self.getlevel(name)
|
||||
def getlevel(self, name):
|
||||
cname = 'info' if name == 'good' else name
|
||||
if not cname in self.LEVELS:
|
||||
return sys.maxsize
|
||||
return self.LEVELS.index(cname)
|
||||
|
||||
_colors = {
|
||||
'head': 36,
|
||||
'good': 32,
|
||||
'fail': 31,
|
||||
'warn': 33,
|
||||
}
|
||||
def sep(self):
|
||||
print()
|
||||
if not self.batch:
|
||||
print()
|
||||
def _colorized(self, color):
|
||||
return lambda x: print(color + x + '\033[0m')
|
||||
return lambda x: print(u'{0}{1}\033[0m'.format(color, x))
|
||||
def __getattr__(self, name):
|
||||
if self.colors and os.name == 'posix' and name in self._colors:
|
||||
color = '\033[0;{0}m'.format(self._colors[name])
|
||||
if name == 'head' and self.batch:
|
||||
return lambda x: None
|
||||
if not self.getlevel(name) >= self.minlevel:
|
||||
return lambda x: None
|
||||
if self.colors and os.name == 'posix' and name in self.COLORS:
|
||||
color = u'\033[0;{0}m'.format(self.COLORS[name])
|
||||
return self._colorized(color)
|
||||
else:
|
||||
return lambda x: print(x)
|
||||
return lambda x: print(u'{0}'.format(x))
|
||||
|
||||
class OutputBuffer(list):
|
||||
def __enter__(self):
|
||||
self.__buf = io.StringIO()
|
||||
self.__stdout = sys.stdout
|
||||
sys.stdout = self.__buf
|
||||
return self
|
||||
def flush(self):
|
||||
for line in self:
|
||||
print(line)
|
||||
def __exit__(self, *args):
|
||||
self.extend(self.__buf.getvalue().splitlines())
|
||||
sys.stdout = self.__stdout
|
||||
|
||||
class KexParty(object):
|
||||
encryption = []
|
||||
@ -481,9 +520,14 @@ KEX_DB = {
|
||||
}
|
||||
}
|
||||
|
||||
def output_algorithms(alg_type, algorithms, maxlen=0):
|
||||
for algorithm in algorithms:
|
||||
output_algorithm(alg_type, algorithm, maxlen)
|
||||
def output_algorithms(title, alg_type, algorithms, maxlen=0):
|
||||
with OutputBuffer() as obuf:
|
||||
for algorithm in algorithms:
|
||||
output_algorithm(alg_type, algorithm, maxlen)
|
||||
if len(obuf) > 0:
|
||||
out.head('# ' + title)
|
||||
obuf.flush()
|
||||
out.sep()
|
||||
|
||||
def output_algorithm(alg_type, alg_name, alg_max_len=0):
|
||||
prefix = '(' + alg_type + ') '
|
||||
@ -512,7 +556,7 @@ def output_algorithm(alg_type, alg_name, alg_max_len=0):
|
||||
if first:
|
||||
if first and level == 'info':
|
||||
f = out.good
|
||||
f(prefix + alg_name + padding + ' -- ' + text)
|
||||
f(prefix + alg_name + padding +' -- ' + text)
|
||||
first = False
|
||||
else:
|
||||
if out.verbose:
|
||||
@ -531,44 +575,48 @@ def output_compatibility(kex, client=False):
|
||||
comp_text.append('{0} {1}'.format(sshd_name, v[0]))
|
||||
else:
|
||||
if v[1] < v[0]:
|
||||
comp_text.append('{0} {1}+ (some functionality from {2})'.format(sshd_name, v[0], v[1]))
|
||||
tfmt = '{0} {1}+ (some functionality from {2})'
|
||||
else:
|
||||
comp_text.append('{0} {1}-{2}'.format(sshd_name, v[0], v[1]))
|
||||
tfmt = '{0} {1}-{2}'
|
||||
comp_text.append(tfmt.format(sshd_name, v[0], v[1]))
|
||||
if len(comp_text) > 0:
|
||||
out.good('[info] compatibility: ' + ', '.join(comp_text))
|
||||
out.good('(gen) compatibility: ' + ', '.join(comp_text))
|
||||
|
||||
def output(banner, header, kex):
|
||||
if banner is not None or kex is not None:
|
||||
with OutputBuffer() as obuf:
|
||||
if len(header) > 0:
|
||||
out.info('(gen) header: ' + '\n'.join(header))
|
||||
if banner is not None:
|
||||
out.good('(gen) banner: ' + banner)
|
||||
if banner.startswith('SSH-1.99-'):
|
||||
out.fail('(gen) protocol SSH1 enabled')
|
||||
if kex is not None:
|
||||
output_compatibility(kex)
|
||||
compressions = [x for x in kex.server.compression if x != 'none']
|
||||
if len(compressions) > 0:
|
||||
cmptxt = 'enabled ({0})'.format(', '.join(compressions))
|
||||
else:
|
||||
cmptxt = 'disabled'
|
||||
out.good('(gen) compression is ' + cmptxt)
|
||||
if len(obuf) > 0:
|
||||
out.head('# general')
|
||||
if len(header) > 0:
|
||||
out.info('[info] header: ' + '\n'.join(header))
|
||||
if banner is not None:
|
||||
out.good('[info] banner: ' + banner)
|
||||
if banner.startswith('SSH-1.99-'):
|
||||
out.fail('[fail] protocol SSH1 enabled')
|
||||
obuf.flush()
|
||||
out.sep()
|
||||
if kex is None:
|
||||
return
|
||||
output_compatibility(kex)
|
||||
compressions = [x for x in kex.server.compression if x != 'none']
|
||||
if len(compressions) > 0:
|
||||
cmptxt = 'enabled ({0})'.format(', '.join(compressions))
|
||||
else:
|
||||
cmptxt = 'disabled'
|
||||
out.good('[info] compression is ' + cmptxt)
|
||||
ml = lambda l: max(len(i) for i in l)
|
||||
maxlen = max(ml(kex.kex_algorithms),
|
||||
ml(kex.key_algorithms),
|
||||
ml(kex.server.encryption),
|
||||
ml(kex.server.mac))
|
||||
out.head('\n# key exchange algorithms')
|
||||
output_algorithms('kex', kex.kex_algorithms, maxlen)
|
||||
out.head('\n# host-key algorithms')
|
||||
output_algorithms('key', kex.key_algorithms, maxlen)
|
||||
out.head('\n# encryption algorithms (ciphers)')
|
||||
output_algorithms('enc', kex.server.encryption, maxlen)
|
||||
out.head('\n# message authentication code algorithms')
|
||||
output_algorithms('mac', kex.server.mac, maxlen)
|
||||
out.sep()
|
||||
title, alg_type = 'key exchange algorithms', 'kex'
|
||||
output_algorithms(title, alg_type, kex.kex_algorithms, maxlen)
|
||||
title, alg_type = 'host-key algorithms', 'key'
|
||||
output_algorithms(title, alg_type, kex.key_algorithms, maxlen)
|
||||
title, alg_type = 'encryption algorithms (ciphers)', 'enc'
|
||||
output_algorithms(title, alg_type, kex.server.encryption, maxlen)
|
||||
title, alg_type = 'message authentication code algorithms', 'mac'
|
||||
output_algorithms(title, alg_type, kex.server.mac, maxlen)
|
||||
|
||||
|
||||
def parse_int(v):
|
||||
@ -578,20 +626,34 @@ def parse_int(v):
|
||||
return 0
|
||||
|
||||
def parse_args():
|
||||
host = None
|
||||
port = 22
|
||||
for arg in sys.argv[1:]:
|
||||
if arg.startswith('-'):
|
||||
arg = arg.lstrip('-')
|
||||
if arg == 'n': out.colors = False
|
||||
elif arg == 'v': out.verbose = True
|
||||
continue
|
||||
s = arg.split(':')
|
||||
host = s[0].strip()
|
||||
if len(s) > 1:
|
||||
port = parse_int(s[1])
|
||||
if not host or port <= 0:
|
||||
host, port = None, 22
|
||||
try:
|
||||
sopts = 'hbnvl:'
|
||||
lopts = ['help', 'batch', 'no-colors', 'verbose', 'level=']
|
||||
opts, args = getopt.getopt(sys.argv[1:], sopts, lopts)
|
||||
except getopt.GetoptError as err:
|
||||
usage(str(err))
|
||||
for o, a in opts:
|
||||
if o in ('-h', '--hep'):
|
||||
usage()
|
||||
elif o in ('-b', '--batch'):
|
||||
out.batch = True
|
||||
elif o in ('-n', '--no-colors'):
|
||||
out.colors = False
|
||||
elif o in ('-v', '--verbose'):
|
||||
out.verbose = True
|
||||
elif o in ('-l', '--level='):
|
||||
if a not in ('info', 'warn', 'fail'):
|
||||
usage('level ' + a + ' is not valid')
|
||||
out.minlevel = a
|
||||
if len(args) == 0:
|
||||
usage()
|
||||
s = args[0].split(':')
|
||||
host = s[0].strip()
|
||||
if len(s) > 1:
|
||||
port = parse_int(s[1])
|
||||
if not host or port <= 0:
|
||||
usage('port {0} is not valid'.format(port))
|
||||
return host, port
|
||||
|
||||
def main():
|
||||
@ -606,7 +668,8 @@ def main():
|
||||
if packet_type < 0:
|
||||
err = '[exception] error reading packet ({0})'.format(payload)
|
||||
elif packet_type != SSH.MSG_KEXINIT:
|
||||
err = '[exception] did not receive MSG_KEXINIT (20), instead received unknown message ({0})'.format(packet_type)
|
||||
err = '[exception] did not receive MSG_KEXINIT (20), ' + \
|
||||
'instead received unknown message ({0})'.format(packet_type)
|
||||
if err:
|
||||
output(banner, header, None)
|
||||
out.fail(err)
|
||||
|
Loading…
Reference in New Issue
Block a user