From 5189c341f3aad8b19b7f7ab824451c35cf2c17d4 Mon Sep 17 00:00:00 2001 From: Andris Raugulis Date: Fri, 2 Sep 2016 16:25:57 +0300 Subject: [PATCH] Implement new features: minimum output level and batch output. --- ssh-audit.py | 189 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 126 insertions(+), 63 deletions(-) diff --git a/ssh-audit.py b/ssh-audit.py index f6bef68..5cd1972 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -24,38 +24,77 @@ THE SOFTWARE. """ from __future__ import print_function -import os, io, sys, socket, struct, random, errno +import os, io, sys, socket, struct, random, errno, getopt +VERSION = 'v1.0.20160902' SSH_BANNER = 'SSH-2.0-OpenSSH_7.3' -def usage(): +def usage(err = None): p = os.path.basename(sys.argv[0]) - out.head('# {0} v1.0.20160812, moo@arthepsy.eu'.format(p)) - out.info('\nusage: {0} [-nv] host[:port]\n'.format(p)) - out.info(' -v verbose') - out.info(' -n disable colors' + os.linesep) + out.batch = False + out.minlevel = 'info' + out.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION)) + if err is not None: + out.fail('\n' + err) + out.info('\nusage: {0} [-bnv] [-l ] \n'.format(p)) + out.info(' -h, --help print this help') + out.info(' -b --batch batch output') + out.info(' -n --no-colors disable colors') + out.info(' -v --verbose verbose output') + out.info(' -l, --level= minimum output level (info|warn|fail)') + out.sep() sys.exit(1) class Output(object): - colors = True - verbose = False + LEVELS = ['info', 'warn', 'fail'] + COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} + + def __init__(self): + self.batch = False + self.colors = True + self.verbose = False + self.__minlevel = 0 + + @property + def minlevel(self): + return self.__minlevel + @minlevel.setter + def minlevel(self, name): + self.__minlevel = self.getlevel(name) + def getlevel(self, name): + cname = 'info' if name == 'good' else name + if not cname in self.LEVELS: + return sys.maxsize + return self.LEVELS.index(cname) - _colors = { - 'head': 36, - 'good': 32, - 'fail': 31, - 'warn': 33, - } def sep(self): - print() + if not self.batch: + print() def _colorized(self, color): - return lambda x: print(color + x + '\033[0m') + return lambda x: print(u'{0}{1}\033[0m'.format(color, x)) def __getattr__(self, name): - if self.colors and os.name == 'posix' and name in self._colors: - color = '\033[0;{0}m'.format(self._colors[name]) + if name == 'head' and self.batch: + return lambda x: None + if not self.getlevel(name) >= self.minlevel: + return lambda x: None + if self.colors and os.name == 'posix' and name in self.COLORS: + color = u'\033[0;{0}m'.format(self.COLORS[name]) return self._colorized(color) else: - return lambda x: print(x) + return lambda x: print(u'{0}'.format(x)) + +class OutputBuffer(list): + def __enter__(self): + self.__buf = io.StringIO() + self.__stdout = sys.stdout + sys.stdout = self.__buf + return self + def flush(self): + for line in self: + print(line) + def __exit__(self, *args): + self.extend(self.__buf.getvalue().splitlines()) + sys.stdout = self.__stdout class KexParty(object): encryption = [] @@ -481,9 +520,14 @@ KEX_DB = { } } -def output_algorithms(alg_type, algorithms, maxlen=0): - for algorithm in algorithms: - output_algorithm(alg_type, algorithm, maxlen) +def output_algorithms(title, alg_type, algorithms, maxlen=0): + with OutputBuffer() as obuf: + for algorithm in algorithms: + output_algorithm(alg_type, algorithm, maxlen) + if len(obuf) > 0: + out.head('# ' + title) + obuf.flush() + out.sep() def output_algorithm(alg_type, alg_name, alg_max_len=0): prefix = '(' + alg_type + ') ' @@ -512,7 +556,7 @@ def output_algorithm(alg_type, alg_name, alg_max_len=0): if first: if first and level == 'info': f = out.good - f(prefix + alg_name + padding + ' -- ' + text) + f(prefix + alg_name + padding +' -- ' + text) first = False else: if out.verbose: @@ -531,44 +575,48 @@ def output_compatibility(kex, client=False): comp_text.append('{0} {1}'.format(sshd_name, v[0])) else: if v[1] < v[0]: - comp_text.append('{0} {1}+ (some functionality from {2})'.format(sshd_name, v[0], v[1])) + tfmt = '{0} {1}+ (some functionality from {2})' else: - comp_text.append('{0} {1}-{2}'.format(sshd_name, v[0], v[1])) + tfmt = '{0} {1}-{2}' + comp_text.append(tfmt.format(sshd_name, v[0], v[1])) if len(comp_text) > 0: - out.good('[info] compatibility: ' + ', '.join(comp_text)) + out.good('(gen) compatibility: ' + ', '.join(comp_text)) def output(banner, header, kex): - if banner is not None or kex is not None: + with OutputBuffer() as obuf: + if len(header) > 0: + out.info('(gen) header: ' + '\n'.join(header)) + if banner is not None: + out.good('(gen) banner: ' + banner) + if banner.startswith('SSH-1.99-'): + out.fail('(gen) protocol SSH1 enabled') + if kex is not None: + output_compatibility(kex) + compressions = [x for x in kex.server.compression if x != 'none'] + if len(compressions) > 0: + cmptxt = 'enabled ({0})'.format(', '.join(compressions)) + else: + cmptxt = 'disabled' + out.good('(gen) compression is ' + cmptxt) + if len(obuf) > 0: out.head('# general') - if len(header) > 0: - out.info('[info] header: ' + '\n'.join(header)) - if banner is not None: - out.good('[info] banner: ' + banner) - if banner.startswith('SSH-1.99-'): - out.fail('[fail] protocol SSH1 enabled') + obuf.flush() + out.sep() if kex is None: return - output_compatibility(kex) - compressions = [x for x in kex.server.compression if x != 'none'] - if len(compressions) > 0: - cmptxt = 'enabled ({0})'.format(', '.join(compressions)) - else: - cmptxt = 'disabled' - out.good('[info] compression is ' + cmptxt) ml = lambda l: max(len(i) for i in l) maxlen = max(ml(kex.kex_algorithms), ml(kex.key_algorithms), ml(kex.server.encryption), ml(kex.server.mac)) - out.head('\n# key exchange algorithms') - output_algorithms('kex', kex.kex_algorithms, maxlen) - out.head('\n# host-key algorithms') - output_algorithms('key', kex.key_algorithms, maxlen) - out.head('\n# encryption algorithms (ciphers)') - output_algorithms('enc', kex.server.encryption, maxlen) - out.head('\n# message authentication code algorithms') - output_algorithms('mac', kex.server.mac, maxlen) - out.sep() + title, alg_type = 'key exchange algorithms', 'kex' + output_algorithms(title, alg_type, kex.kex_algorithms, maxlen) + title, alg_type = 'host-key algorithms', 'key' + output_algorithms(title, alg_type, kex.key_algorithms, maxlen) + title, alg_type = 'encryption algorithms (ciphers)', 'enc' + output_algorithms(title, alg_type, kex.server.encryption, maxlen) + title, alg_type = 'message authentication code algorithms', 'mac' + output_algorithms(title, alg_type, kex.server.mac, maxlen) def parse_int(v): @@ -578,20 +626,34 @@ def parse_int(v): return 0 def parse_args(): - host = None - port = 22 - for arg in sys.argv[1:]: - if arg.startswith('-'): - arg = arg.lstrip('-') - if arg == 'n': out.colors = False - elif arg == 'v': out.verbose = True - continue - s = arg.split(':') - host = s[0].strip() - if len(s) > 1: - port = parse_int(s[1]) - if not host or port <= 0: + host, port = None, 22 + try: + sopts = 'hbnvl:' + lopts = ['help', 'batch', 'no-colors', 'verbose', 'level='] + opts, args = getopt.getopt(sys.argv[1:], sopts, lopts) + except getopt.GetoptError as err: + usage(str(err)) + for o, a in opts: + if o in ('-h', '--hep'): + usage() + elif o in ('-b', '--batch'): + out.batch = True + elif o in ('-n', '--no-colors'): + out.colors = False + elif o in ('-v', '--verbose'): + out.verbose = True + elif o in ('-l', '--level='): + if a not in ('info', 'warn', 'fail'): + usage('level ' + a + ' is not valid') + out.minlevel = a + if len(args) == 0: usage() + s = args[0].split(':') + host = s[0].strip() + if len(s) > 1: + port = parse_int(s[1]) + if not host or port <= 0: + usage('port {0} is not valid'.format(port)) return host, port def main(): @@ -606,7 +668,8 @@ def main(): if packet_type < 0: err = '[exception] error reading packet ({0})'.format(payload) elif packet_type != SSH.MSG_KEXINIT: - err = '[exception] did not receive MSG_KEXINIT (20), instead received unknown message ({0})'.format(packet_type) + err = '[exception] did not receive MSG_KEXINIT (20), ' + \ + 'instead received unknown message ({0})'.format(packet_type) if err: output(banner, header, None) out.fail(err)