diff --git a/ssh-audit.py b/ssh-audit.py index ba4fc92..e28b8bd 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -29,7 +29,8 @@ import os, io, sys, socket, struct, random, errno, getopt VERSION = 'v1.0.20160902' SSH_BANNER = 'SSH-2.0-OpenSSH_7.3' -def usage(err = None): + +def usage(err=None): p = os.path.basename(sys.argv[0]) out.batch = False out.minlevel = 'info' @@ -45,6 +46,7 @@ def usage(err = None): out.sep() sys.exit(1) + class Output(object): LEVELS = ['info', 'warn', 'fail'] COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} @@ -58,20 +60,24 @@ class Output(object): @property def minlevel(self): return self.__minlevel + @minlevel.setter def minlevel(self, name): self.__minlevel = self.getlevel(name) + def getlevel(self, name): cname = 'info' if name == 'good' else name - if not cname in self.LEVELS: + if cname not in self.LEVELS: return sys.maxsize return self.LEVELS.index(cname) def sep(self): if not self.batch: print() + def _colorized(self, color): return lambda x: print(u'{0}{1}\033[0m'.format(color, x)) + def __getattr__(self, name): if name == 'head' and self.batch: return lambda x: None @@ -83,25 +89,30 @@ class Output(object): else: return lambda x: print(u'{0}'.format(x)) + class OutputBuffer(list): def __enter__(self): self.__buf = io.StringIO() self.__stdout = sys.stdout sys.stdout = self.__buf return self + def flush(self): for line in self: print(line) + def __exit__(self, *args): self.extend(self.__buf.getvalue().splitlines()) sys.stdout = self.__stdout + class KexParty(object): encryption = [] mac = [] compression = [] languages = [] + class Kex(object): cookie = None kex_algorithms = [] @@ -130,8 +141,9 @@ class Kex(object): kex.unused = buf.read_int() return kex + class ReadBuf(object): - def __init__(self, data = None): + def __init__(self, data=None): super(ReadBuf, self).__init__() self._buf = io.BytesIO(data) if data else io.BytesIO() self._len = len(data) if data else 0 @@ -156,8 +168,9 @@ class ReadBuf(object): list_size = self.read_int() return self.read(list_size).decode().split(',') + class WriteBuf(object): - def __init__(self, data = None): + def __init__(self, data=None): super(WriteBuf, self).__init__() self._wbuf = io.BytesIO(data) if data else io.BytesIO() @@ -185,10 +198,11 @@ class WriteBuf(object): def write_mpint(self, v): length = v.bit_length() // 8 + 1 - data = [(v >> i*8) & 0xff for i in reversed(range(length))] + data = [(v >> i * 8) & 0xff for i in reversed(range(length))] data = bytes(bytearray(data)) self.write_string(data) + class SSH(object): MSG_KEXINIT = 20 MSG_NEWKEYS = 21 @@ -198,7 +212,7 @@ class SSH(object): class Socket(ReadBuf, WriteBuf): SM_BANNER_SENT = 1 - def __init__(self, host, port, cto = 3.0, rto = 5.0): + def __init__(self, host, port, cto=3.0, rto=5.0): self.__block_size = 8 self.__state = 0 self.__header = [] @@ -231,7 +245,7 @@ class SSH(object): self.__header.append(line) return self.__banner, self.__header - def recv(self, size = 2048): + def recv(self, size=2048): try: data = self.__sock.recv(size) except socket.timeout as e: @@ -257,7 +271,7 @@ class SSH(object): return (-1, e) self.__sock.send(data) - def send_banner(self, banner = SSH_BANNER): + def send_banner(self, banner=SSH_BANNER): self.send(banner.encode() + b'\r\n') if self.__state < self.SM_BANNER_SENT: self.__state = self.SM_BANNER_SENT @@ -314,6 +328,7 @@ class SSH(object): except: pass + class KexDH(object): def __init__(self, alg, g, p): self.__alg = alg @@ -330,6 +345,7 @@ class KexDH(object): s.write_mpint(self.__e) s.send_packet() + class KexGroup1(KexDH): def __init__(self): # rfc2409: second oakley group @@ -340,6 +356,7 @@ class KexGroup1(KexDH): 'ffffffffffffffff', 16) super(KexGroup1, self).__init__('sha1', 2, p) + class KexGroup14(KexDH): def __init__(self): # rfc3526: 2048-bit modp group @@ -354,12 +371,14 @@ class KexGroup14(KexDH): '15728e5a8aacaa68ffffffffffffffff', 16) super(KexGroup14, self).__init__('sha1', 2, p) + def get_ssh_version(version_desc): if version_desc.startswith('d'): return ('Dropbear SSH', version_desc[1:]) else: return ('OpenSSH', version_desc) + def get_alg_since_text(alg_desc): tv = [] versions = alg_desc[0] @@ -368,7 +387,8 @@ def get_alg_since_text(alg_desc): tv.append('{0} {1}'.format(ssh_prefix, ssh_version)) return 'available since ' + ', '.join(tv).rstrip(', ') -def get_alg_timeframe(alg_desc, result = {}): + +def get_alg_timeframe(alg_desc, result={}): versions = alg_desc[0] vlen = len(versions) for i in range(3): @@ -383,7 +403,7 @@ def get_alg_timeframe(alg_desc, result = {}): continue for v in cversions.split(','): ssh_prefix, ssh_version = get_ssh_version(v) - if not ssh_prefix in result: + if ssh_prefix not in result: result[ssh_prefix] = [None, None, None] prev, push = result[ssh_prefix][i], False if prev is None: @@ -396,9 +416,10 @@ def get_alg_timeframe(alg_desc, result = {}): result[ssh_prefix][i] = ssh_version return result + def get_ssh_timeframe(kex): alg_timeframe = {} - algs = {'kex': kex.kex_algorithms, + algs = {'kex': kex.kex_algorithms, 'key': kex.key_algorithms, 'enc': kex.server.encryption, 'mac': kex.server.mac} @@ -459,7 +480,7 @@ KEX_DB = { 'ecdsa-sha2-nistp384': [['5.7,d2013.62'], [TEXT_CURVES_WEAK], [TEXT_RNDSIG_KEY]], 'ecdsa-sha2-nistp521': [['5.7,d2013.62'], [TEXT_CURVES_WEAK], [TEXT_RNDSIG_KEY]], 'ssh-rsa-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], []], - 'ssh-dss-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], [TEXT_MODULUS_SIZE, TEXT_RNDSIG_KEY]], + 'ssh-dss-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], [TEXT_MODULUS_SIZE, TEXT_RNDSIG_KEY]], 'ssh-rsa-cert-v01@openssh.com': [['5.6']], 'ssh-dss-cert-v01@openssh.com': [['5.6', '6.9'], [FAIL_OPENSSH70_WEAK], [TEXT_MODULUS_SIZE, TEXT_RNDSIG_KEY]], 'ecdsa-sha2-nistp256-cert-v01@openssh.com': [['5.7'], [TEXT_CURVES_WEAK], [TEXT_RNDSIG_KEY]], @@ -470,7 +491,7 @@ KEX_DB = { 'none': [['1.2.2,d2013.56'], [FAIL_PLAINTEXT]], '3des-cbc': [['1.2.2,d0.28', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [TEXT_CIPHER_WEAK, TEXT_CIPHER_MODE, TEXT_BLOCK_SIZE]], '3des-ctr': [['d0.52']], - 'blowfish-cbc': [['1.2.2,d0.28', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, TEXT_CIPHER_MODE, TEXT_BLOCK_SIZE]], + 'blowfish-cbc': [['1.2.2,d0.28', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, TEXT_CIPHER_MODE, TEXT_BLOCK_SIZE]], 'twofish-cbc': [['d0.28', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [TEXT_CIPHER_MODE]], 'twofish128-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [TEXT_CIPHER_MODE]], 'twofish256-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [TEXT_CIPHER_MODE]], @@ -520,6 +541,7 @@ KEX_DB = { } } + def output_algorithms(title, alg_type, algorithms, maxlen=0): with OutputBuffer() as obuf: for algorithm in algorithms: @@ -529,6 +551,7 @@ def output_algorithms(title, alg_type, algorithms, maxlen=0): obuf.flush() out.sep() + def output_algorithm(alg_type, alg_name, alg_max_len=0): prefix = '(' + alg_type + ') ' if alg_max_len == 0: @@ -543,7 +566,7 @@ def output_algorithm(alg_type, alg_name, alg_max_len=0): texts.append((level, get_alg_since_text(alg_desc))) idx = idx + 1 if ldesc > idx: - for t in alg_desc[idx]: + for t in alg_desc[idx]: texts.append((level, t)) if len(texts) == 0: texts.append(('info', '')) @@ -556,7 +579,7 @@ def output_algorithm(alg_type, alg_name, alg_max_len=0): if first: if first and level == 'info': f = out.good - f(prefix + alg_name + padding +' -- ' + text) + f(prefix + alg_name + padding + ' -- ' + text) first = False else: if out.verbose: @@ -564,6 +587,7 @@ def output_algorithm(alg_type, alg_name, alg_max_len=0): else: f(' ' * len(prefix + alg_name) + padding + ' `- ' + text) + def output_compatibility(kex, client=False): ssh_timeframe = get_ssh_timeframe(kex) cp = 2 if client else 1 @@ -582,6 +606,7 @@ def output_compatibility(kex, client=False): if len(comp_text) > 0: out.good('(gen) compatibility: ' + ', '.join(comp_text)) + def output(banner, header, kex): with OutputBuffer() as obuf: if len(header) > 0: @@ -625,6 +650,7 @@ def parse_int(v): except: return 0 + def parse_args(): host, port = None, 22 try: @@ -642,7 +668,7 @@ def parse_args(): elif o in ('-n', '--no-colors'): out.colors = False elif o in ('-v', '--verbose'): - out.verbose = True + out.verbose = True elif o in ('-l', '--level'): if a not in ('info', 'warn', 'fail'): usage('level ' + a + ' is not valid') @@ -657,6 +683,7 @@ def parse_args(): usage('port {0} is not valid'.format(port)) return host, port + def main(): host, port = parse_args() s = SSH.Socket(host, port) @@ -678,6 +705,7 @@ def main(): kex = Kex.parse(payload) output(banner, header, kex) + if __name__ == '__main__': out = Output() main()