From 9a409e835ea4c1789a13f9011c2d9fa293e515ad Mon Sep 17 00:00:00 2001 From: Andris Raugulis Date: Thu, 3 Nov 2016 19:10:49 +0200 Subject: [PATCH] Refactor outer functions within classes. Use mypy strict optional checks and fix them. Use better comparison for compatiblity output. Add initial socket tests. --- ssh-audit.py | 796 ++++++++++++++++++++++++-------------------- test/mypy-py2.sh | 2 +- test/mypy-py3.sh | 2 +- test/test_socket.py | 41 +++ 4 files changed, 473 insertions(+), 368 deletions(-) create mode 100644 test/test_socket.py diff --git a/ssh-audit.py b/ssh-audit.py index 958d995..0d86e03 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -267,6 +267,117 @@ class OutputBuffer(list): class SSH2(object): # pylint: disable=too-few-public-methods + class KexDB(object): # pylint: disable=too-few-public-methods + # pylint: disable=bad-whitespace + WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm' + FAIL_OPENSSH70_LEGACY = 'removed since OpenSSH 7.0, legacy algorithm' + FAIL_OPENSSH70_WEAK = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm' + FAIL_OPENSSH70_LOGJAM = 'disabled (in client) since OpenSSH 7.0, logjam attack' + INFO_OPENSSH69_CHACHA = 'default cipher since OpenSSH 6.9.' + FAIL_OPENSSH67_UNSAFE = 'removed (in server) since OpenSSH 6.7, unsafe algorithm' + FAIL_OPENSSH61_REMOVE = 'removed since OpenSSH 6.1, removed from specification' + FAIL_OPENSSH31_REMOVE = 'removed since OpenSSH 3.1' + FAIL_DBEAR67_DISABLED = 'disabled since Dropbear SSH 2015.67' + FAIL_DBEAR53_DISABLED = 'disabled since Dropbear SSH 0.53' + FAIL_PLAINTEXT = 'no encryption/integrity' + WARN_CURVES_WEAK = 'using weak elliptic curves' + WARN_RNDSIG_KEY = 'using weak random number generator could reveal the key' + WARN_MODULUS_SIZE = 'using small 1024-bit modulus' + WARN_MODULUS_CUSTOM = 'using custom size modulus (possibly weak)' + WARN_HASH_WEAK = 'using weak hashing algorithm' + WARN_CIPHER_MODE = 'using weak cipher mode' + WARN_BLOCK_SIZE = 'using small 64-bit block size' + WARN_CIPHER_WEAK = 'using weak cipher' + WARN_ENCRYPT_AND_MAC = 'using encrypt-and-MAC mode' + WARN_TAG_SIZE = 'using small 64-bit tag size' + + ALGORITHMS = { + 'kex': { + 'diffie-hellman-group1-sha1': [['2.3.0,d0.28,l10.2', '6.6', '6.9'], [FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]], + 'diffie-hellman-group14-sha1': [['3.9,d0.53,l10.6.0'], [], [WARN_HASH_WEAK]], + 'diffie-hellman-group14-sha256': [['7.3,d2016.73']], + 'diffie-hellman-group16-sha512': [['7.3,d2016.73']], + 'diffie-hellman-group18-sha512': [['7.3']], + 'diffie-hellman-group-exchange-sha1': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]], + 'diffie-hellman-group-exchange-sha256': [['4.4'], [], [WARN_MODULUS_CUSTOM]], + 'ecdh-sha2-nistp256': [['5.7,d2013.62,l10.6.0'], [WARN_CURVES_WEAK]], + 'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], + 'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], + 'curve25519-sha256@libssh.org': [['6.5,d2013.62,l10.6.0']], + 'kexguess2@matt.ucc.asn.au': [['d2013.57']], + }, + 'key': { + 'rsa-sha2-256': [['7.2']], + 'rsa-sha2-512': [['7.2']], + 'ssh-ed25519': [['6.5,l10.7.0']], + 'ssh-ed25519-cert-v01@openssh.com': [['6.5']], + 'ssh-rsa': [['2.5.0,d0.28,l10.2']], + 'ssh-dss': [['2.1.0,d0.28,l10.2', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp256': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp384': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp521': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ssh-rsa-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], []], + 'ssh-dss-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], + 'ssh-rsa-cert-v01@openssh.com': [['5.6']], + 'ssh-dss-cert-v01@openssh.com': [['5.6', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp256-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp384-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp521-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + }, + 'enc': { + 'none': [['1.2.2,d2013.56,l10.2'], [FAIL_PLAINTEXT]], + '3des-cbc': [['1.2.2,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_WEAK, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], + '3des-ctr': [['d0.52']], + 'blowfish-cbc': [['1.2.2,d0.28,l10.2', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], + 'twofish-cbc': [['d0.28', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], + 'twofish128-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], + 'twofish256-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], + 'twofish128-ctr': [['d2015.68']], + 'twofish256-ctr': [['d2015.68']], + 'cast128-cbc': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], + 'arcfour': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], + 'arcfour128': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], + 'arcfour256': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], + 'aes128-cbc': [['2.3.0,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], + 'aes192-cbc': [['2.3.0,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], + 'aes256-cbc': [['2.3.0,d0.47,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], + 'rijndael128-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], + 'rijndael192-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], + 'rijndael256-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], + 'rijndael-cbc@lysator.liu.se': [['2.3.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE]], + 'aes128-ctr': [['3.7,d0.52,l10.4.1']], + 'aes192-ctr': [['3.7,l10.4.1']], + 'aes256-ctr': [['3.7,d0.52,l10.4.1']], + 'aes128-gcm@openssh.com': [['6.2']], + 'aes256-gcm@openssh.com': [['6.2']], + 'chacha20-poly1305@openssh.com': [['6.5'], [], [], [INFO_OPENSSH69_CHACHA]], + }, + 'mac': { + 'none': [['d2013.56'], [FAIL_PLAINTEXT]], + 'hmac-sha1': [['2.1.0,d0.28,l10.2'], [], [WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], + 'hmac-sha1-96': [['2.5.0,d0.47', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], + 'hmac-sha2-256': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]], + 'hmac-sha2-256-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]], + 'hmac-sha2-512': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]], + 'hmac-sha2-512-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]], + 'hmac-md5': [['2.1.0,d0.28', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], + 'hmac-md5-96': [['2.5.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], + 'hmac-ripemd160': [['2.5.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC]], + 'hmac-ripemd160@openssh.com': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC]], + 'umac-64@openssh.com': [['4.7'], [], [WARN_ENCRYPT_AND_MAC, WARN_TAG_SIZE]], + 'umac-128@openssh.com': [['6.2'], [], [WARN_ENCRYPT_AND_MAC]], + 'hmac-sha1-etm@openssh.com': [['6.2'], [], [WARN_HASH_WEAK]], + 'hmac-sha1-96-etm@openssh.com': [['6.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]], + 'hmac-sha2-256-etm@openssh.com': [['6.2']], + 'hmac-sha2-512-etm@openssh.com': [['6.2']], + 'hmac-md5-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_HASH_WEAK]], + 'hmac-md5-96-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_HASH_WEAK]], + 'hmac-ripemd160-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY]], + 'umac-64-etm@openssh.com': [['6.2'], [], [WARN_TAG_SIZE]], + 'umac-128-etm@openssh.com': [['6.2']], + } + } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] + class KexParty(object): def __init__(self, enc, mac, compression, languages): # type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None @@ -414,7 +525,7 @@ class SSH1(object): _crc32 = None # type: Optional[SSH1.CRC32] CIPHERS = ['none', 'idea', 'des', '3des', 'tss', 'rc4', 'blowfish'] - AUTHS = [None, 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos'] + AUTHS = ['none', 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos'] @classmethod def crc32(cls, v): @@ -452,7 +563,7 @@ class SSH1(object): 'tis': [['1.2.2']], 'kerberos': [['1.2.2', '3.6'], [FAIL_OPENSSH37_REMOVE]], } - } # type: Dict[str, Dict[str, List[List[str]]]] + } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] class PublicKeyMessage(object): def __init__(self, cookie, skey, hkey, pflags, cmask, amask): @@ -887,7 +998,7 @@ class SSH(object): # pylint: disable=too-few-public-methods @classmethod def _extract_os_version(cls, c): - # type: (Optional[str]) -> str + # type: (Optional[str]) -> Optional[str] if c is None: return None mx = re.match(r'^NetBSD(?:_Secure_Shell)?(?:[\s-]+(\d{8})(.*))?$', c) @@ -914,10 +1025,11 @@ class SSH(object): # pylint: disable=too-few-public-methods @classmethod def parse(cls, banner): - # type: (SSH.Banner) -> SSH.Software + # type: (SSH.Banner) -> Optional[SSH.Software] # pylint: disable=too-many-return-statements software = str(banner.software) mx = re.match(r'^dropbear_([\d\.]+\d+)(.*)', software) + v = None # type: Optional[str] if mx: patch = cls._fix_patch(mx.group(2)) v, p = 'Matt Johnston', SSH.Product.DropbearSSH @@ -957,7 +1069,7 @@ class SSH(object): # pylint: disable=too-few-public-methods RX_BANNER = re.compile(r'^({0}(?:(?:-{0})*)){1}$'.format(_RXP, _RXR)) def __init__(self, protocol, software, comments, valid_ascii): - # type: (Tuple[int, int], str, str, bool) -> None + # type: (Tuple[int, int], Optional[str], Optional[str], bool) -> None self.__protocol = protocol self.__software = software self.__comments = comments @@ -970,12 +1082,12 @@ class SSH(object): # pylint: disable=too-few-public-methods @property def software(self): - # type: () -> str + # type: () -> Optional[str] return self.__software @property def comments(self): - # type: () -> str + # type: () -> Optional[str] return self.__comments @property @@ -1039,6 +1151,268 @@ class SSH(object): # pylint: disable=too-few-public-methods r = h.decode('ascii').rstrip('=') return u'SHA256:{0}'.format(r) + class Algorithm(object): + @staticmethod + def get_ssh_version(version_desc): + # type: (str) -> Tuple[str, str] + if version_desc.startswith('d'): + return (SSH.Product.DropbearSSH, version_desc[1:]) + elif version_desc.startswith('l1'): + return (SSH.Product.LibSSH, version_desc[2:]) + else: + return (SSH.Product.OpenSSH, version_desc) + + @classmethod + def get_timeframe(cls, versions, for_server=True, result=None): + # type: (List[Optional[str]], bool, Optional[Dict[str, List[Optional[str]]]]) -> Dict[str, List[Optional[str]]] + result = result or {} + vlen = len(versions) + for i in range(3): + if i > vlen - 1: + if i == 2 and vlen > 1: + cversions = versions[1] + else: + continue + else: + cversions = versions[i] + if cversions is None: + continue + for v in cversions.split(','): + ssh_prefix, ssh_version = cls.get_ssh_version(v) + if not ssh_version: + continue + if ssh_version.endswith('C'): + if for_server: + continue + ssh_version = ssh_version[:-1] + if ssh_prefix not in result: + result[ssh_prefix] = [None, None, None] + prev, push = result[ssh_prefix][i], False + if prev is None: + push = True + elif i == 0 and prev < ssh_version: + push = True + elif i > 0 and prev > ssh_version: + push = True + if push: + result[ssh_prefix][i] = ssh_version + return result + + @classmethod + def get_since_text(cls, versions): + # type: (List[Optional[str]]) -> Optional[text_type] + tv = [] + if len(versions) == 0 or versions[0] is None: + return None + for v in versions[0].split(','): + ssh_prefix, ssh_version = cls.get_ssh_version(v) + if not ssh_version: + continue + if ssh_prefix in [SSH.Product.LibSSH]: + continue + if ssh_version.endswith('C'): + ssh_version = '{0} (client only)'.format(ssh_version[:-1]) + tv.append('{0} {1}'.format(ssh_prefix, ssh_version)) + if len(tv) == 0: + return None + return 'available since ' + ', '.join(tv).rstrip(', ') + + class Algorithms(object): + def __init__(self, pkm, kex): + # type: (Optional[SSH1.PublicKeyMessage], Optional[SSH2.Kex]) -> None + self.__ssh1kex = pkm + self.__ssh2kex = kex + + @property + def ssh1kex(self): + # type: () -> Optional[SSH1.PublicKeyMessage] + return self.__ssh1kex + + @property + def ssh2kex(self): + # type: () -> Optional[SSH2.Kex] + return self.__ssh2kex + + @property + def ssh1(self): + # type: () -> Optional[SSH.Algorithms.Item] + if self.ssh1kex is None: + return None + item = SSH.Algorithms.Item(1, SSH1.KexDB.ALGORITHMS) + item.add('key', [u'ssh-rsa1']) + item.add('enc', self.ssh1kex.supported_ciphers) + item.add('aut', self.ssh1kex.supported_authentications) + return item + + @property + def ssh2(self): + # type: () -> Optional[SSH.Algorithms.Item] + if self.ssh2kex is None: + return None + item = SSH.Algorithms.Item(2, SSH2.KexDB.ALGORITHMS) + item.add('kex', self.ssh2kex.kex_algorithms) + item.add('key', self.ssh2kex.key_algorithms) + item.add('enc', self.ssh2kex.server.encryption) + item.add('mac', self.ssh2kex.server.mac) + return item + + @property + def values(self): + # type: () -> Iterable[SSH.Algorithms.Item] + for item in [self.ssh1, self.ssh2]: + if item is not None: + yield item + + @property + def maxlen(self): + # type: () -> int + ml, maxlen = lambda l: max(len(i) for i in l), 0 + if self.ssh1kex is not None: + maxlen = max(ml(self.ssh1kex.supported_ciphers), + ml(self.ssh1kex.supported_authentications), + maxlen) + if self.ssh2kex is not None: + maxlen = max(ml(self.ssh2kex.kex_algorithms), + ml(self.ssh2kex.key_algorithms), + ml(self.ssh2kex.server.encryption), + ml(self.ssh2kex.server.mac), + maxlen) + return maxlen + + def get_ssh_timeframe(self, for_server=True): + # type: (bool) -> Dict[str, List[Optional[str]]] + r = {} # type: Dict[str, List[Optional[str]]] + for alg_pair in self.values: + alg_db = alg_pair.db + for alg_type, alg_list in alg_pair.items(): + for alg_name in alg_list: + alg_name_native = utils.to_ntext(alg_name) + alg_desc = alg_db[alg_type].get(alg_name_native) + if alg_desc is None: + continue + versions = alg_desc[0] + r = SSH.Algorithm.get_timeframe(versions, for_server, r) + return r + + def get_recommendations(self, software, for_server=True): + # type: (Optional[SSH.Software], bool) -> Tuple[Optional[SSH.Software], Dict[int, Dict[str, Dict[str, Dict[str, int]]]]] + # pylint: disable=too-many-locals,too-many-statements + vproducts = [SSH.Product.OpenSSH, + SSH.Product.DropbearSSH, + SSH.Product.LibSSH] + if software is not None: + if software.product not in vproducts: + software = None + if software is None: + ssh_timeframe = self.get_ssh_timeframe(for_server) + for product in vproducts: + if product not in ssh_timeframe: + continue + version = ssh_timeframe[product][0] + if version is not None: + software = SSH.Software(None, product, version, None, None) + break + rec = {} # type: Dict[int, Dict[str, Dict[str, Dict[str, int]]]] + if software is None: + return software, rec + for alg_pair in self.values: + sshv, alg_db = alg_pair.sshv, alg_pair.db + rec[sshv] = {} + for alg_type, alg_list in alg_pair.items(): + if alg_type == 'aut': + continue + rec[sshv][alg_type] = {'add': {}, 'del': {}} + for n, alg_desc in alg_db[alg_type].items(): + if alg_type == 'key' and '-cert-' in n: + continue + versions = alg_desc[0] + if len(versions) == 0 or versions[0] is None: + continue + matches = False + for v in versions[0].split(','): + ssh_prefix, ssh_version = SSH.Algorithm.get_ssh_version(v) + if not ssh_version: + continue + if ssh_prefix != software.product: + continue + if ssh_version.endswith('C'): + if for_server: + continue + ssh_version = ssh_version[:-1] + if software.compare_version(ssh_version) < 0: + continue + matches = True + break + if not matches: + continue + adl, faults = len(alg_desc), 0 + for i in range(1, 3): + if not adl > i: + continue + fc = len(alg_desc[i]) + if fc > 0: + faults += pow(10, 2 - i) * fc + if n not in alg_list: + if faults > 0: + continue + rec[sshv][alg_type]['add'][n] = 0 + else: + if faults == 0: + continue + if n == 'diffie-hellman-group-exchange-sha256': + if software.compare_version('7.3') < 0: + continue + rec[sshv][alg_type]['del'][n] = faults + add_count = len(rec[sshv][alg_type]['add']) + del_count = len(rec[sshv][alg_type]['del']) + new_alg_count = len(alg_list) + add_count - del_count + if new_alg_count < 1 and del_count > 0: + mf = min(rec[sshv][alg_type]['del'].values()) + new_del = {} + for k, cf in rec[sshv][alg_type]['del'].items(): + if cf != mf: + new_del[k] = cf + if del_count != len(new_del): + rec[sshv][alg_type]['del'] = new_del + new_alg_count += del_count - len(new_del) + if new_alg_count < 1: + del rec[sshv][alg_type] + else: + if add_count == 0: + del rec[sshv][alg_type]['add'] + if del_count == 0: + del rec[sshv][alg_type]['del'] + if len(rec[sshv][alg_type]) == 0: + del rec[sshv][alg_type] + if len(rec[sshv]) == 0: + del rec[sshv] + return software, rec + + class Item(object): + def __init__(self, sshv, db): + # type: (int, Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None + self.__sshv = sshv + self.__db = db + self.__storage = {} # type: Dict[str, List[text_type]] + + @property + def sshv(self): + # type: () -> int + return self.__sshv + + @property + def db(self): + # type: () -> Dict[str, Dict[str, List[List[Optional[str]]]]] + return self.__db + + def add(self, key, value): + # type: (str, List[text_type]) -> None + self.__storage[key] = value + + def items(self): + # type: () -> Iterable[Tuple[str, List[text_type]]] + return self.__storage.items() + class Security(object): # pylint: disable=too-few-public-methods # pylint: disable=bad-whitespace CVE = { @@ -1080,19 +1454,20 @@ class SSH(object): # pylint: disable=too-few-public-methods SM_BANNER_SENT = 1 def __init__(self, host, port): - # type: (str, int) -> None + # type: (Optional[str], int) -> None super(SSH.Socket, self).__init__() + self.__sock = None # type: Optional[socket.socket] self.__block_size = 8 self.__state = 0 self.__header = [] # type: List[text_type] self.__banner = None # type: Optional[SSH.Banner] + if host is None: + raise ValueError('undefined host') + nport = utils.parse_int(port) + if nport < 1 or nport > 65535: + raise ValueError('invalid port: {0}'.format(port)) self.__host = host - self.__port = port - self.__sock = None # type: socket.socket - - def __enter__(self): - # type: () -> SSH.Socket - return self + self.__port = nport def _resolve(self, ipvo): # type: (Sequence[int]) -> Iterable[Tuple[int, Tuple[Any, ...]]] @@ -1142,6 +1517,8 @@ class SSH(object): # pylint: disable=too-few-public-methods def get_banner(self, sshv=2): # type: (int) -> Tuple[Optional[SSH.Banner], List[text_type], Optional[str]] + if self.__sock is None: + return self.__banner, self.__header, 'not connected' banner = 'SSH-{0}-OpenSSH_7.3'.format('1.5' if sshv == 1 else '2.0') rto = self.__sock.gettimeout() self.__sock.settimeout(0.7) @@ -1171,6 +1548,8 @@ class SSH(object): # pylint: disable=too-few-public-methods def recv(self, size=2048): # type: (int) -> Tuple[int, Optional[str]] + if self.__sock is None: + return (-1, 'not connected') try: data = self.__sock.recv(size) except socket.timeout: @@ -1190,6 +1569,8 @@ class SSH(object): # pylint: disable=too-few-public-methods def send(self, data): # type: (binary_type) -> Tuple[int, Optional[str]] + if self.__sock is None: + return (-1, 'not connected') try: self.__sock.send(data) return (0, None) @@ -1278,7 +1659,7 @@ class SSH(object): # pylint: disable=too-few-public-methods try: if s is not None: s.shutdown(socket.SHUT_RDWR) - s.close() + s.close() # pragma: nocover except: # pylint: disable=bare-except pass @@ -1286,24 +1667,20 @@ class SSH(object): # pylint: disable=too-few-public-methods # type: () -> None self.__cleanup() - def __exit__(self, *args): - # type: (*Any) -> None - self.__cleanup() - def __cleanup(self): # type: () -> None self._close_socket(self.__sock) -class KexDH(object): +class KexDH(object): # pragma: nocover def __init__(self, alg, g, p): # type: (str, int, int) -> None self.__alg = alg self.__g = g self.__p = p self.__q = (self.__p - 1) // 2 - self.__x = None # type: Optional[int] - self.__e = None # type: Optional[int] + self.__x = 0 + self.__e = 0 def send_init(self, s): # type: (SSH.Socket) -> None @@ -1315,7 +1692,7 @@ class KexDH(object): s.send_packet() -class KexGroup1(KexDH): +class KexGroup1(KexDH): # pragma: nocover def __init__(self): # type: () -> None # rfc2409: second oakley group @@ -1327,7 +1704,7 @@ class KexGroup1(KexDH): super(KexGroup1, self).__init__('sha1', 2, p) -class KexGroup14(KexDH): +class KexGroup14(KexDH): # pragma: nocover def __init__(self): # type: () -> None # rfc3526: 2048-bit modp group @@ -1343,316 +1720,9 @@ class KexGroup14(KexDH): super(KexGroup14, self).__init__('sha1', 2, p) -class KexDB(object): # pylint: disable=too-few-public-methods - # pylint: disable=bad-whitespace - WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm' - FAIL_OPENSSH70_LEGACY = 'removed since OpenSSH 7.0, legacy algorithm' - FAIL_OPENSSH70_WEAK = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm' - FAIL_OPENSSH70_LOGJAM = 'disabled (in client) since OpenSSH 7.0, logjam attack' - INFO_OPENSSH69_CHACHA = 'default cipher since OpenSSH 6.9.' - FAIL_OPENSSH67_UNSAFE = 'removed (in server) since OpenSSH 6.7, unsafe algorithm' - FAIL_OPENSSH61_REMOVE = 'removed since OpenSSH 6.1, removed from specification' - FAIL_OPENSSH31_REMOVE = 'removed since OpenSSH 3.1' - FAIL_DBEAR67_DISABLED = 'disabled since Dropbear SSH 2015.67' - FAIL_DBEAR53_DISABLED = 'disabled since Dropbear SSH 0.53' - FAIL_PLAINTEXT = 'no encryption/integrity' - WARN_CURVES_WEAK = 'using weak elliptic curves' - WARN_RNDSIG_KEY = 'using weak random number generator could reveal the key' - WARN_MODULUS_SIZE = 'using small 1024-bit modulus' - WARN_MODULUS_CUSTOM = 'using custom size modulus (possibly weak)' - WARN_HASH_WEAK = 'using weak hashing algorithm' - WARN_CIPHER_MODE = 'using weak cipher mode' - WARN_BLOCK_SIZE = 'using small 64-bit block size' - WARN_CIPHER_WEAK = 'using weak cipher' - WARN_ENCRYPT_AND_MAC = 'using encrypt-and-MAC mode' - WARN_TAG_SIZE = 'using small 64-bit tag size' - - ALGORITHMS = { - 'kex': { - 'diffie-hellman-group1-sha1': [['2.3.0,d0.28,l10.2', '6.6', '6.9'], [FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]], - 'diffie-hellman-group14-sha1': [['3.9,d0.53,l10.6.0'], [], [WARN_HASH_WEAK]], - 'diffie-hellman-group14-sha256': [['7.3,d2016.73']], - 'diffie-hellman-group16-sha512': [['7.3,d2016.73']], - 'diffie-hellman-group18-sha512': [['7.3']], - 'diffie-hellman-group-exchange-sha1': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]], - 'diffie-hellman-group-exchange-sha256': [['4.4'], [], [WARN_MODULUS_CUSTOM]], - 'ecdh-sha2-nistp256': [['5.7,d2013.62,l10.6.0'], [WARN_CURVES_WEAK]], - 'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], - 'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], - 'curve25519-sha256@libssh.org': [['6.5,d2013.62,l10.6.0']], - 'kexguess2@matt.ucc.asn.au': [['d2013.57']], - }, - 'key': { - 'rsa-sha2-256': [['7.2']], - 'rsa-sha2-512': [['7.2']], - 'ssh-ed25519': [['6.5,l10.7.0']], - 'ssh-ed25519-cert-v01@openssh.com': [['6.5']], - 'ssh-rsa': [['2.5.0,d0.28,l10.2']], - 'ssh-dss': [['2.1.0,d0.28,l10.2', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp256': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp384': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp521': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ssh-rsa-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], []], - 'ssh-dss-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], - 'ssh-rsa-cert-v01@openssh.com': [['5.6']], - 'ssh-dss-cert-v01@openssh.com': [['5.6', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp256-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp384-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp521-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - }, - 'enc': { - 'none': [['1.2.2,d2013.56,l10.2'], [FAIL_PLAINTEXT]], - '3des-cbc': [['1.2.2,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_WEAK, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], - '3des-ctr': [['d0.52']], - 'blowfish-cbc': [['1.2.2,d0.28,l10.2', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], - 'twofish-cbc': [['d0.28', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], - 'twofish128-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], - 'twofish256-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], - 'twofish128-ctr': [['d2015.68']], - 'twofish256-ctr': [['d2015.68']], - 'cast128-cbc': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], - 'arcfour': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], - 'arcfour128': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], - 'arcfour256': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], - 'aes128-cbc': [['2.3.0,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], - 'aes192-cbc': [['2.3.0,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], - 'aes256-cbc': [['2.3.0,d0.47,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], - 'rijndael128-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], - 'rijndael192-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], - 'rijndael256-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], - 'rijndael-cbc@lysator.liu.se': [['2.3.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE]], - 'aes128-ctr': [['3.7,d0.52,l10.4.1']], - 'aes192-ctr': [['3.7,l10.4.1']], - 'aes256-ctr': [['3.7,d0.52,l10.4.1']], - 'aes128-gcm@openssh.com': [['6.2']], - 'aes256-gcm@openssh.com': [['6.2']], - 'chacha20-poly1305@openssh.com': [['6.5'], [], [], [INFO_OPENSSH69_CHACHA]], - }, - 'mac': { - 'none': [['d2013.56'], [FAIL_PLAINTEXT]], - 'hmac-sha1': [['2.1.0,d0.28,l10.2'], [], [WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], - 'hmac-sha1-96': [['2.5.0,d0.47', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], - 'hmac-sha2-256': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]], - 'hmac-sha2-256-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]], - 'hmac-sha2-512': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]], - 'hmac-sha2-512-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]], - 'hmac-md5': [['2.1.0,d0.28', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], - 'hmac-md5-96': [['2.5.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], - 'hmac-ripemd160': [['2.5.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC]], - 'hmac-ripemd160@openssh.com': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC]], - 'umac-64@openssh.com': [['4.7'], [], [WARN_ENCRYPT_AND_MAC, WARN_TAG_SIZE]], - 'umac-128@openssh.com': [['6.2'], [], [WARN_ENCRYPT_AND_MAC]], - 'hmac-sha1-etm@openssh.com': [['6.2'], [], [WARN_HASH_WEAK]], - 'hmac-sha1-96-etm@openssh.com': [['6.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]], - 'hmac-sha2-256-etm@openssh.com': [['6.2']], - 'hmac-sha2-512-etm@openssh.com': [['6.2']], - 'hmac-md5-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_HASH_WEAK]], - 'hmac-md5-96-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_HASH_WEAK]], - 'hmac-ripemd160-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY]], - 'umac-64-etm@openssh.com': [['6.2'], [], [WARN_TAG_SIZE]], - 'umac-128-etm@openssh.com': [['6.2']], - } - } # type: Dict[str, Dict[str, List[List[str]]]] - - -def get_ssh_version(version_desc): - # type: (str) -> Tuple[str, str] - if version_desc.startswith('d'): - return (SSH.Product.DropbearSSH, version_desc[1:]) - elif version_desc.startswith('l1'): - return (SSH.Product.LibSSH, version_desc[2:]) - else: - return (SSH.Product.OpenSSH, version_desc) - - -def get_alg_timeframe(versions, for_server=True, result=None): - # type: (List[str], bool, Optional[Dict[str, List[Optional[str]]]]) -> Dict[str, List[Optional[str]]] - result = result or {} - vlen = len(versions) - for i in range(3): - if i > vlen - 1: - if i == 2 and vlen > 1: - cversions = versions[1] - else: - continue - else: - cversions = versions[i] - if cversions is None: - continue - for v in cversions.split(','): - ssh_prefix, ssh_version = get_ssh_version(v) - if not ssh_version: - continue - if ssh_version.endswith('C'): - if for_server: - continue - ssh_version = ssh_version[:-1] - if ssh_prefix not in result: - result[ssh_prefix] = [None, None, None] - prev, push = result[ssh_prefix][i], False - if prev is None: - push = True - elif i == 0 and prev < ssh_version: - push = True - elif i > 0 and prev > ssh_version: - push = True - if push: - result[ssh_prefix][i] = ssh_version - return result - - -def get_ssh_timeframe(alg_pairs, for_server=True): - # type: (List[Tuple[int, Dict[str, Dict[str, List[List[str]]]], List[Tuple[str, List[text_type]]]]], bool) -> Dict[str, List[Optional[str]]] - timeframe = {} # type: Dict[str, List[Optional[str]]] - for alg_pair in alg_pairs: - alg_db = alg_pair[1] - for alg_set in alg_pair[2]: - alg_type, alg_list = alg_set - for alg_name in alg_list: - alg_name_native = utils.to_ntext(alg_name) - alg_desc = alg_db[alg_type].get(alg_name_native) - if alg_desc is None: - continue - versions = alg_desc[0] - timeframe = get_alg_timeframe(versions, for_server, timeframe) - return timeframe - - -def get_alg_since_text(versions): - # type: (List[str]) -> text_type - tv = [] - if len(versions) == 0 or versions[0] is None: - return None - for v in versions[0].split(','): - ssh_prefix, ssh_version = get_ssh_version(v) - if not ssh_version: - continue - if ssh_prefix in [SSH.Product.LibSSH]: - continue - if ssh_version.endswith('C'): - ssh_version = '{0} (client only)'.format(ssh_version[:-1]) - tv.append('{0} {1}'.format(ssh_prefix, ssh_version)) - if len(tv) == 0: - return None - return 'available since ' + ', '.join(tv).rstrip(', ') - - -def get_alg_pairs(kex, pkm): - # type: (Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> List[Tuple[int, Dict[str, Dict[str, List[List[str]]]], List[Tuple[str, List[text_type]]]]] - alg_pairs = [] - if pkm is not None: - alg_pairs.append((1, SSH1.KexDB.ALGORITHMS, - [('key', [u'ssh-rsa1']), - ('enc', pkm.supported_ciphers), - ('aut', pkm.supported_authentications)])) - if kex is not None: - alg_pairs.append((2, KexDB.ALGORITHMS, - [('kex', kex.kex_algorithms), - ('key', kex.key_algorithms), - ('enc', kex.server.encryption), - ('mac', kex.server.mac)])) - return alg_pairs - - -def get_alg_recommendations(software, kex, pkm, for_server=True): - # type: (SSH.Software, SSH2.Kex, SSH1.PublicKeyMessage, bool) -> Tuple[SSH.Software, Dict[int, Dict[str, Dict[str, Dict[str, int]]]]] - # pylint: disable=too-many-locals,too-many-statements - alg_pairs = get_alg_pairs(kex, pkm) - vproducts = [SSH.Product.OpenSSH, - SSH.Product.DropbearSSH, - SSH.Product.LibSSH] - if software is not None: - if software.product not in vproducts: - software = None - if software is None: - ssh_timeframe = get_ssh_timeframe(alg_pairs, for_server) - for product in vproducts: - if product not in ssh_timeframe: - continue - version = ssh_timeframe[product][0] - if version is not None: - software = SSH.Software(None, product, version, None, None) - break - rec = {} # type: Dict[int, Dict[str, Dict[str, Dict[str, int]]]] - if software is None: - return software, rec - for alg_pair in alg_pairs: - sshv, alg_db = alg_pair[0], alg_pair[1] - rec[sshv] = {} - for alg_set in alg_pair[2]: - alg_type, alg_list = alg_set - if alg_type == 'aut': - continue - rec[sshv][alg_type] = {'add': {}, 'del': {}} - for n, alg_desc in alg_db[alg_type].items(): - if alg_type == 'key' and '-cert-' in n: - continue - versions = alg_desc[0] - if len(versions) == 0 or versions[0] is None: - continue - matches = False - for v in versions[0].split(','): - ssh_prefix, ssh_version = get_ssh_version(v) - if not ssh_version: - continue - if ssh_prefix != software.product: - continue - if ssh_version.endswith('C'): - if for_server: - continue - ssh_version = ssh_version[:-1] - if software.compare_version(ssh_version) < 0: - continue - matches = True - break - if not matches: - continue - adl, faults = len(alg_desc), 0 - for i in range(1, 3): - if not adl > i: - continue - fc = len(alg_desc[i]) - if fc > 0: - faults += pow(10, 2 - i) * fc - if n not in alg_list: - if faults > 0: - continue - rec[sshv][alg_type]['add'][n] = 0 - else: - if faults == 0: - continue - if n == 'diffie-hellman-group-exchange-sha256': - if software.compare_version('7.3') < 0: - continue - rec[sshv][alg_type]['del'][n] = faults - add_count = len(rec[sshv][alg_type]['add']) - del_count = len(rec[sshv][alg_type]['del']) - new_alg_count = len(alg_list) + add_count - del_count - if new_alg_count < 1 and del_count > 0: - mf = min(rec[sshv][alg_type]['del'].values()) - new_del = {} - for k, cf in rec[sshv][alg_type]['del'].items(): - if cf != mf: - new_del[k] = cf - if del_count != len(new_del): - rec[sshv][alg_type]['del'] = new_del - new_alg_count += del_count - len(new_del) - if new_alg_count < 1: - del rec[sshv][alg_type] - else: - if add_count == 0: - del rec[sshv][alg_type]['add'] - if del_count == 0: - del rec[sshv][alg_type]['del'] - if len(rec[sshv][alg_type]) == 0: - del rec[sshv][alg_type] - if len(rec[sshv]) == 0: - del rec[sshv] - return software, rec - def output_algorithms(title, alg_db, alg_type, algorithms, maxlen=0): - # type: (str, Dict[str, Dict[str, List[List[str]]]], str, List[text_type], int) -> None + # type: (str, Dict[str, Dict[str, List[List[Optional[str]]]]], str, List[text_type], int) -> None with OutputBuffer() as obuf: for algorithm in algorithms: output_algorithm(alg_db, alg_type, algorithm, maxlen) @@ -1663,7 +1733,7 @@ def output_algorithms(title, alg_db, alg_type, algorithms, maxlen=0): def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0): - # type: (Dict[str, Dict[str, List[List[str]]]], str, text_type, int) -> None + # type: (Dict[str, Dict[str, List[List[Optional[str]]]]], str, text_type, int) -> None prefix = '(' + alg_type + ') ' if alg_max_len == 0: alg_max_len = len(alg_name) @@ -1678,12 +1748,14 @@ def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0): for idx, level in enumerate(['fail', 'warn', 'info']): if level == 'info': versions = alg_desc[0] - since_text = get_alg_since_text(versions) + since_text = SSH.Algorithm.get_since_text(versions) if since_text: texts.append((level, since_text)) idx = idx + 1 if ldesc > idx: for t in alg_desc[idx]: + if t is None: + continue texts.append((level, t)) if len(texts) == 0: texts.append(('info', '')) @@ -1705,22 +1777,24 @@ def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0): f(' ' * len(prefix + alg_name) + padding + ' `- ' + text) -def output_compatibility(kex, pkm, for_server=True): - # type: (Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage], bool) -> None - alg_pairs = get_alg_pairs(kex, pkm) - ssh_timeframe = get_ssh_timeframe(alg_pairs, for_server) +def output_compatibility(algs, for_server=True): + # type: (SSH.Algorithms, bool) -> None + ssh_timeframe = algs.get_ssh_timeframe(for_server) vp = 1 if for_server else 2 comp_text = [] for sshd_name in [SSH.Product.OpenSSH, SSH.Product.DropbearSSH]: if sshd_name not in ssh_timeframe: continue v = ssh_timeframe[sshd_name] + if v[0] is None: + continue if v[vp] is None: comp_text.append('{0} {1}+'.format(sshd_name, v[0])) elif v[0] == v[vp]: comp_text.append('{0} {1}'.format(sshd_name, v[0])) else: - if v[vp] < v[0]: + software = SSH.Software(None, sshd_name, v[0], None, None) + if software.compare_version(v[vp]) > 0: tfmt = '{0} {1}+ (some functionality from {2})' else: tfmt = '{0} {1}-{2}' @@ -1730,7 +1804,7 @@ def output_compatibility(kex, pkm, for_server=True): def output_security_sub(sub, software, padlen): - # type: (str, SSH.Software, int) -> None + # type: (str, Optional[SSH.Software], int) -> None secdb = SSH.Security.CVE if sub == 'cve' else SSH.Security.TXT if software is None or software.product not in secdb: return @@ -1753,9 +1827,9 @@ def output_security_sub(sub, software, padlen): def output_security(banner, padlen): - # type: (SSH.Banner, int) -> None + # type: (Optional[SSH.Banner], int) -> None with OutputBuffer() as obuf: - if banner: + if banner is not None: software = SSH.Software.parse(banner) output_security_sub('cve', software, padlen) output_security_sub('txt', software, padlen) @@ -1765,14 +1839,14 @@ def output_security(banner, padlen): out.sep() -def output_fingerprint(kex, pkm, sha256=True, padlen=0): - # type: (Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage], bool, int) -> None +def output_fingerprint(algs, sha256=True, padlen=0): + # type: (SSH.Algorithms, bool, int) -> None with OutputBuffer() as obuf: fps = [] - if pkm is not None: + if algs.ssh1kex is not None: name = 'ssh-rsa1' - fp = SSH.Fingerprint(pkm.host_key_fingerprint_data) - bits = pkm.host_key_bits + fp = SSH.Fingerprint(algs.ssh1kex.host_key_fingerprint_data) + bits = algs.ssh1kex.host_key_bits fps.append((name, fp, bits)) for fpp in fps: name, fp, bits = fpp @@ -1785,11 +1859,11 @@ def output_fingerprint(kex, pkm, sha256=True, padlen=0): out.sep() -def output_recommendations(software, kex, pkm, padlen=0): - # type: (SSH.Software, SSH2.Kex, SSH1.PublicKeyMessage, int) -> None +def output_recommendations(algs, software, padlen=0): + # type: (SSH.Algorithms, Optional[SSH.Software], int) -> None for_server = True with OutputBuffer() as obuf: - software, alg_rec = get_alg_recommendations(software, kex, pkm, for_server) + software, alg_rec = algs.get_recommendations(software, for_server) for sshv in range(2, 0, -1): if sshv not in alg_rec: continue @@ -1820,6 +1894,7 @@ def output_recommendations(software, kex, pkm, padlen=0): def output(banner, header, kex=None, pkm=None): # type: (Optional[SSH.Banner], List[text_type], Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> None sshv = 1 if pkm else 2 + algs = SSH.Algorithms(pkm, kex) with OutputBuffer() as obuf: if len(header) > 0: out.info('(gen) header: ' + '\n'.join(header)) @@ -1835,7 +1910,7 @@ def output(banner, header, kex=None, pkm=None): out.good('(gen) software: {0}'.format(software)) else: software = None - output_compatibility(kex, pkm) + output_compatibility(algs) if kex is not None: compressions = [x for x in kex.server.compression if x != 'none'] if len(compressions) > 0: @@ -1847,18 +1922,7 @@ def output(banner, header, kex=None, pkm=None): out.head('# general') obuf.flush() out.sep() - ml, maxlen = lambda l: max(len(i) for i in l), 0 - if pkm is not None: - maxlen = max(ml(pkm.supported_ciphers), - ml(pkm.supported_authentications), - maxlen) - if kex is not None: - maxlen = max(ml(kex.kex_algorithms), - ml(kex.key_algorithms), - ml(kex.server.encryption), - ml(kex.server.mac), - maxlen) - maxlen += 1 + maxlen = algs.maxlen + 1 output_security(banner, maxlen) if pkm is not None: adb = SSH1.KexDB.ALGORITHMS @@ -1871,7 +1935,7 @@ def output(banner, header, kex=None, pkm=None): title, atype = 'SSH1 authentication types', 'aut' output_algorithms(title, adb, atype, auths, maxlen) if kex is not None: - adb = KexDB.ALGORITHMS + adb = SSH2.KexDB.ALGORITHMS title, atype = 'key exchange algorithms', 'kex' output_algorithms(title, adb, atype, kex.kex_algorithms, maxlen) title, atype = 'host-key algorithms', 'key' @@ -1880,8 +1944,8 @@ def output(banner, header, kex=None, pkm=None): output_algorithms(title, adb, atype, kex.server.encryption, maxlen) title, atype = 'message authentication code algorithms', 'mac' output_algorithms(title, adb, atype, kex.server.mac, maxlen) - output_recommendations(software, kex, pkm, maxlen) - output_fingerprint(kex, pkm, True, maxlen) + output_recommendations(algs, software, maxlen) + output_fingerprint(algs, True, maxlen) class Utils(object): diff --git a/test/mypy-py2.sh b/test/mypy-py2.sh index f8e9244..766eb59 100755 --- a/test/mypy-py2.sh +++ b/test/mypy-py2.sh @@ -7,4 +7,4 @@ if [ $? -ne 0 ]; then fi _htmldir="${_cdir}/../html/mypy-py2" mkdir -p "${_htmldir}" -mypy --python-version 2.7 --config-file "${_cdir}/mypy.ini" --html-report "${_htmldir}" "${_cdir}/../ssh-audit.py" +mypy --python-version 2.7 --strict-optional --config-file "${_cdir}/mypy.ini" --html-report "${_htmldir}" "${_cdir}/../ssh-audit.py" diff --git a/test/mypy-py3.sh b/test/mypy-py3.sh index 0d2dfe5..c77ca4b 100755 --- a/test/mypy-py3.sh +++ b/test/mypy-py3.sh @@ -7,4 +7,4 @@ if [ $? -ne 0 ]; then fi _htmldir="${_cdir}/../html/mypy-py3" mkdir -p "${_htmldir}" -mypy --python-version 3.5 --config-file "${_cdir}/mypy.ini" --html-report "${_htmldir}" "${_cdir}/../ssh-audit.py" +mypy --python-version 3.5 --strict-optional --config-file "${_cdir}/mypy.ini" --html-report "${_htmldir}" "${_cdir}/../ssh-audit.py" diff --git a/test/test_socket.py b/test/test_socket.py new file mode 100644 index 0000000..d5c27fc --- /dev/null +++ b/test/test_socket.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import socket +import pytest + + +# pylint: disable=attribute-defined-outside-init +class TestSocket(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.ssh = ssh_audit.SSH + + def test_invalid_host(self, virtual_socket): + with pytest.raises(ValueError): + s = self.ssh.Socket(None, 22) + + def test_invalid_port(self, virtual_socket): + with pytest.raises(ValueError): + s = self.ssh.Socket('localhost', 'abc') + with pytest.raises(ValueError): + s = self.ssh.Socket('localhost', -1) + with pytest.raises(ValueError): + s = self.ssh.Socket('localhost', 0) + with pytest.raises(ValueError): + s = self.ssh.Socket('localhost', 65536) + + def test_not_connected_socket(self, virtual_socket): + sock = self.ssh.Socket('localhost', 22) + banner, header, err = sock.get_banner() + assert banner is None + assert len(header) == 0 + assert err == 'not connected' + s, e = sock.recv() + assert s == -1 + assert e == 'not connected' + s, e = sock.send('nothing') + assert s == -1 + assert e == 'not connected' + s, e = sock.send_packet() + assert s == -1 + assert e == 'not connected'