From 9463aab4f76306d6c4169a4ecbd6c67a0d1bb5eb Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Sat, 13 Jun 2020 11:27:01 -0400 Subject: [PATCH] Disable Python2 tests. Fix pylint warnings. --- .travis.yml | 1 - ssh-audit.py | 117 ++++++++++++++++++++++++--------------------------- tox.ini | 37 ++++++++++------ 3 files changed, 79 insertions(+), 76 deletions(-) diff --git a/.travis.yml b/.travis.yml index 14b61bf..4423249 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: python python: - - "2.7" - "3.5" - "3.6" - "3.7" diff --git a/ssh-audit.py b/ssh-audit.py index 1874a1e..c93d710 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -91,7 +91,7 @@ def usage(err=None): sys.exit(1) -class AuditConf(object): +class AuditConf: # pylint: disable=too-many-instance-attributes def __init__(self, host=None, port=22): # type: (Optional[str], int) -> None @@ -115,10 +115,10 @@ class AuditConf(object): # type: (str, Union[str, int, bool, Sequence[int]]) -> None valid = False if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json']: - valid, value = True, True if bool(value) else False + valid, value = True, bool(value) elif name in ['ipv4', 'ipv6']: valid = False - value = True if bool(value) else False + value = bool(value) ipv = 4 if name == 'ipv4' else 6 if value: value = tuple(list(self.ipvo) + [ipv]) @@ -156,7 +156,7 @@ class AuditConf(object): object.__setattr__(self, name, value) @classmethod - def from_cmdline(cls, args, usage_cb): + def from_cmdline(cls, args, usage_cb): # pylint: disable=too-many-statements # type: (List[str], Callable[..., None]) -> AuditConf # pylint: disable=too-many-branches aconf = cls() @@ -231,7 +231,7 @@ class AuditConf(object): return aconf -class Output(object): +class Output: LEVELS = ('info', 'warn', 'fail') # type: Sequence[str] COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} @@ -314,8 +314,8 @@ class OutputBuffer(list): sys.stdout = self.__stdout -class SSH2(object): # pylint: disable=too-few-public-methods - class KexDB(object): # pylint: disable=too-few-public-methods +class SSH2: # pylint: disable=too-few-public-methods + class KexDB: # pylint: disable=too-few-public-methods # pylint: disable=bad-whitespace WARN_OPENSSH74_UNSAFE = 'disabled (in client) since OpenSSH 7.4, unsafe algorithm' WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm' @@ -534,7 +534,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods } } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] - class KexParty(object): + class KexParty: def __init__(self, enc, mac, compression, languages): # type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None self.__enc = enc @@ -562,7 +562,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods # type: () -> List[text_type] return self.__languages - class Kex(object): + class Kex: def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0): # type: (binary_type, List[text_type], List[text_type], SSH2.KexParty, SSH2.KexParty, bool, int) -> None self.__cookie = cookie @@ -678,7 +678,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods return kex # Obtains host keys, checks their size, and derives their fingerprints. - class HostKeyTest(object): + class HostKeyTest: # Tracks the RSA host key types. As of this writing, testing one in this family yields valid results for the rest. RSA_FAMILY = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'] @@ -691,7 +691,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'ssh-rsa-cert-v01@openssh.com': {'cert': True, 'variable_key_len': True}, 'ssh-ed25519': {'cert': False, 'variable_key_len': False}, - 'ssh-ed25519-cert-v01@openssh.com': {'cert': True, 'variable_key_len': False}, + 'ssh-ed25519-cert-v01@openssh.com': {'cert': True, 'variable_key_len': False}, } @staticmethod @@ -723,10 +723,10 @@ class SSH2(object): # pylint: disable=too-few-public-methods break if kex_str is not None: - SSH2.HostKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.HostKeyTest.HOST_KEY_TYPES) + SSH2.HostKeyTest.perform_test(s, server_kex, kex_str, kex_group, SSH2.HostKeyTest.HOST_KEY_TYPES) @staticmethod - def __test(s, server_kex, kex_str, kex_group, host_key_types): + def perform_test(s, server_kex, kex_str, kex_group, host_key_types): hostkey_modulus_size = 0 ca_modulus_size = 0 @@ -745,7 +745,8 @@ class SSH2(object): # pylint: disable=too-few-public-methods if not s.is_connected(): s.connect() unused = None # pylint: disable=unused-variable - unused, unused, err = s.get_banner() + unused2 = None # pylint: disable=unused-variable + unused, unused2, err = s.get_banner() if err is not None: s.close() return @@ -794,7 +795,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods for rsa_type in SSH2.HostKeyTest.RSA_FAMILY: alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type] alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size]) - elif (cert is True) and ((hostkey_modulus_size < 2048) or (ca_modulus_size > 0 and ca_modulus_size < 2048)): + elif (cert is True) and ((hostkey_modulus_size < 2048) or (ca_modulus_size > 0 and ca_modulus_size < 2048)): # pylint: disable=chained-comparison alg_list = SSH2.KexDB.ALGORITHMS['key'][host_key_type] min_modulus = min(hostkey_modulus_size, ca_modulus_size) min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size) @@ -809,18 +810,18 @@ class SSH2(object): # pylint: disable=too-few-public-methods # Performs DH group exchanges to find what moduli are supported, and checks # their size. - class GEXTest(object): + class GEXTest: - # Creates a new connection to the server. Returns an SSH.Socket, or - # None on failure. + # Creates a new connection to the server. Returns True on success, or False. @staticmethod def reconnect(s, gex_alg): if s.is_connected(): - return + return True s.connect() unused = None # pylint: disable=unused-variable - unused, unused, err = s.get_banner() + unused2 = None # pylint: disable=unused-variable + unused, unused2, err = s.get_banner() if err is not None: s.close() return False @@ -873,7 +874,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods # got here, doesn't mean the server is vulnerable... smallest_modulus = kex_group.get_dh_modulus_size() - except Exception: # pylint: disable=bare-except + except Exception: pass finally: s.close() @@ -884,7 +885,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods # If we found one modulus size already, but we're about # to test a larger one, don't bother. - if smallest_modulus > 0 and bits >= smallest_modulus: + if bits >= smallest_modulus > 0: break if SSH2.GEXTest.reconnect(s, gex_alg) is False: @@ -895,7 +896,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods kex_group.send_init_gex(s, bits, bits, bits) kex_group.recv_reply(s, False) smallest_modulus = kex_group.get_dh_modulus_size() - except Exception: # pylint: disable=bare-except + except Exception: # import traceback # print(traceback.format_exc()) pass @@ -927,8 +928,8 @@ class SSH2(object): # pylint: disable=too-few-public-methods break -class SSH1(object): - class CRC32(object): +class SSH1: + class CRC32: def __init__(self): # type: () -> None self._table = [0] * 256 @@ -961,7 +962,7 @@ class SSH1(object): cls._crc32 = cls.CRC32() return cls._crc32.calc(v) - class KexDB(object): # pylint: disable=too-few-public-methods + class KexDB: # pylint: disable=too-few-public-methods # pylint: disable=bad-whitespace FAIL_PLAINTEXT = 'no encryption/integrity' FAIL_OPENSSH37_REMOVE = 'removed since OpenSSH 3.7' @@ -992,7 +993,7 @@ class SSH1(object): } } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] - class PublicKeyMessage(object): + class PublicKeyMessage: def __init__(self, cookie, skey, hkey, pflags, cmask, amask): # type: (binary_type, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None if len(skey) != 3: @@ -1122,7 +1123,7 @@ class SSH1(object): return pkm -class ReadBuf(object): +class ReadBuf: def __init__(self, data=None): # type: (Optional[binary_type]) -> None super(ReadBuf, self).__init__() @@ -1195,10 +1196,9 @@ class ReadBuf(object): def reset(self): self._buf = BytesIO() self._len = 0 - super(ReadBuf, self).reset() -class WriteBuf(object): +class WriteBuf: def __init__(self, data=None): # type: (Optional[binary_type]) -> None super(WriteBuf, self).__init__() @@ -1290,8 +1290,8 @@ class WriteBuf(object): self._wbuf = BytesIO() -class SSH(object): # pylint: disable=too-few-public-methods - class Protocol(object): # pylint: disable=too-few-public-methods +class SSH: # pylint: disable=too-few-public-methods + class Protocol: # pylint: disable=too-few-public-methods # pylint: disable=bad-whitespace SMSG_PUBLIC_KEY = 2 MSG_DEBUG = 4 @@ -1304,14 +1304,14 @@ class SSH(object): # pylint: disable=too-few-public-methods MSG_KEXDH_GEX_INIT = 32 MSG_KEXDH_GEX_REPLY = 33 - class Product(object): # pylint: disable=too-few-public-methods + class Product: # pylint: disable=too-few-public-methods OpenSSH = 'OpenSSH' DropbearSSH = 'Dropbear SSH' LibSSH = 'libssh' TinySSH = 'TinySSH' PuTTY = 'PuTTY' - class Software(object): + class Software: def __init__(self, vendor, product, version, patch, os_version): # type: (Optional[str], str, str, Optional[str], Optional[str]) -> None self.__vendor = vendor @@ -1518,7 +1518,7 @@ class SSH(object): # pylint: disable=too-few-public-methods return cls(None, SSH.Product.PuTTY, mx.group(1), None, None) return None - class Banner(object): + class Banner: _RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-\s*([^\s]*)(?:\s+(.*))?)?' RX_PROTOCOL = re.compile(re.sub(r'\\d(\+?)', r'(\\d\g<1>)', _RXP)) RX_BANNER = re.compile(r'^({0}(?:(?:-{0})*)){1}$'.format(_RXP, _RXR)) @@ -1587,7 +1587,7 @@ class SSH(object): # pylint: disable=too-few-public-methods comments = re.sub(r'\s+', ' ', comments) return cls(protocol, software, comments, valid_ascii) - class Fingerprint(object): + class Fingerprint: def __init__(self, fpd): # type: (binary_type) -> None self.__fpd = fpd @@ -1606,8 +1606,8 @@ class SSH(object): # pylint: disable=too-few-public-methods r = h.decode('ascii').rstrip('=') return u'SHA256:{0}'.format(r) - class Algorithm(object): - class Timeframe(object): + class Algorithm: + class Timeframe: def __init__(self): # type: () -> None self.__storage = {} # type: Dict[str, List[Optional[str]]] @@ -1642,7 +1642,7 @@ class SSH(object): # pylint: disable=too-few-public-methods for_srv, for_cli = pos < 2, pos > 1 for v in (versions or '').split(','): ssh_prod, ssh_ver, is_cli = SSH.Algorithm.get_ssh_version(v) - if (not ssh_ver or (is_cli and for_srv) or (not is_cli and for_cli and ssh_prod in ssh_versions)): + if not ssh_ver or (is_cli and for_srv) or (not is_cli and for_cli and ssh_prod in ssh_versions): continue ssh_versions[ssh_prod] = ssh_ver for ssh_product, ssh_version in ssh_versions.items(): @@ -1696,7 +1696,7 @@ class SSH(object): # pylint: disable=too-few-public-methods return None return 'available since ' + ', '.join(tv).rstrip(', ') - class Algorithms(object): + class Algorithms: def __init__(self, pkm, kex): # type: (Optional[SSH1.PublicKeyMessage], Optional[SSH2.Kex]) -> None self.__ssh1kex = pkm @@ -1883,7 +1883,7 @@ class SSH(object): # pylint: disable=too-few-public-methods del rec[sshv] return software, rec - class Item(object): + class Item: def __init__(self, sshv, db): # type: (int, Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None self.__sshv = sshv @@ -1908,7 +1908,7 @@ class SSH(object): # pylint: disable=too-few-public-methods # type: () -> Iterable[Tuple[str, List[text_type]]] return self.__storage.items() - class Security(object): # pylint: disable=too-few-public-methods + class Security: # pylint: disable=too-few-public-methods # Format: [starting_vuln_version, last_vuln_version, affected, CVE_ID, CVSSv2, description] # affected: 1 = server, 2 = client, 4 = local # Example: if it affects servers, both remote & local, then affected @@ -2097,7 +2097,6 @@ class SSH(object): # pylint: disable=too-few-public-methods self.__sock_map[s.fileno()] = s except Exception: print("Warning: failed to listen on any IPv4 interfaces.") - pass try: # Socket to listen on all IPv6 addresses. @@ -2109,12 +2108,11 @@ class SSH(object): # pylint: disable=too-few-public-methods self.__sock_map[s.fileno()] = s except Exception: print("Warning: failed to listen on any IPv6 interfaces.") - pass # If we failed to listen on any interfaces, terminate. if len(self.__sock_map.keys()) == 0: print("Error: failed to listen on any IPv4 and IPv6 interfaces!") - exit(-1) + sys.exit(-1) # Wait for an incoming connection. If a timeout was explicitly # set by the user, terminate when it elapses. @@ -2132,7 +2130,7 @@ class SSH(object): # pylint: disable=too-few-public-methods if self.__timeout_set and time_elapsed >= self.__timeout: print("Timeout elapsed. Terminating...") - exit(-1) + sys.exit(-1) # Accept the connection. c, addr = self.__sock_map[fds[0][0]].accept() @@ -2304,7 +2302,7 @@ class SSH(object): # pylint: disable=too-few-public-methods # Returns True if this Socket is connected, otherwise False. def is_connected(self): - return (self.__sock is not None) + return self.__sock is not None def close(self): self.__cleanup() @@ -2313,16 +2311,13 @@ class SSH(object): # pylint: disable=too-few-public-methods self.__header = [] self.__banner = None - def reset(self): - super(SSH.Socket, self).reset() - - def _close_socket(self, s): + def _close_socket(self, s): # pylint: disable=no-self-use # type: (Optional[socket.socket]) -> None try: if s is not None: s.shutdown(socket.SHUT_RDWR) s.close() # pragma: nocover - except Exception: # pylint: disable=bare-except + except Exception: pass def __del__(self): @@ -2337,7 +2332,7 @@ class SSH(object): # pylint: disable=too-few-public-methods self.__sock = None -class KexDH(object): # pragma: nocover +class KexDH: # pragma: nocover def __init__(self, kex_name, hash_alg, g, p): # type: (str, int, int) -> None self.__kex_name = kex_name @@ -2384,7 +2379,7 @@ class KexDH(object): # pragma: nocover while packet_type == SSH.Protocol.MSG_DEBUG: packet_type, payload = s.read_packet(2) - if packet_type != -1 and packet_type not in [SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY]: + if packet_type != -1 and packet_type not in [SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY]: # pylint: disable=no-else-raise # TODO: change Exception to something more specific. raise Exception('Expected MSG_KEXDH_REPLY (%d) or MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY, packet_type)) elif packet_type == -1: @@ -2635,7 +2630,7 @@ class KexGroupExchange(KexDH): s.send_packet() packet_type, payload = s.read_packet(2) - if (packet_type != SSH.Protocol.MSG_KEXDH_GEX_GROUP) and (packet_type != SSH.Protocol.MSG_DEBUG): + if (packet_type != SSH.Protocol.MSG_KEXDH_GEX_GROUP) and (packet_type != SSH.Protocol.MSG_DEBUG): # pylint: disable=consider-using-in # TODO: replace with a better exception type. raise Exception('Expected MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (SSH.Protocol.MSG_KEXDH_GEX_REPLY, packet_type)) @@ -2933,7 +2928,7 @@ def output_recommendations(algs, software, padlen=0): # Output additional information & notes. -def output_info(algs, software, client_audit, any_problems, padlen=0): +def output_info(software, client_audit, any_problems): with OutputBuffer() as obuf: # Tell user that PuTTY cannot be hardened at the protocol-level. if client_audit and (software is not None) and (software.product == SSH.Product.PuTTY): @@ -3008,14 +3003,14 @@ def output(banner, header, client_host=None, kex=None, pkm=None): output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen) output_fingerprints(algs, True) perfect_config = output_recommendations(algs, software, maxlen) - output_info(algs, software, client_audit, not perfect_config) + output_info(software, client_audit, not perfect_config) # If we encountered any unknown algorithms, ask the user to report them. if len(unknown_algorithms) > 0: out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at .\n" % ','.join(unknown_algorithms)) -class Utils(object): +class Utils: @classmethod def _type_err(cls, v, target): # type: (Any, text_type) -> TypeError @@ -3091,12 +3086,12 @@ class Utils(object): @classmethod def is_print_ascii(cls, v): # type: (Union[text_type, str]) -> bool - return cls._is_ascii(v, lambda x: x >= 32 and x <= 126) + return cls._is_ascii(v, lambda x: 126 >= x >= 32) @classmethod def to_print_ascii(cls, v, errors='replace'): # type: (Union[text_type, str], str) -> str - return cls._to_ascii(v, lambda x: x >= 32 and x <= 126, errors) + return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors) @classmethod def unique_seq(cls, seq): @@ -3187,7 +3182,7 @@ def build_struct(banner, kex=None, pkm=None, client_host=None): for host_key_type in host_keys.keys(): if host_key_type in SSH2.HostKeyTest.RSA_FAMILY: val = host_keys[host_key_type] - del(host_keys[host_key_type]) + del host_keys[host_key_type] host_keys['ssh-rsa'] = val for host_key_type in sorted(host_keys): diff --git a/tox.ini b/tox.ini index d94f34b..bd692ae 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] envlist = - py{27,py,py3}-{test,pylint,flake8,vulture} + py{py3}-{test,pylint,flake8,vulture} py{35,36,37,38}-{test,mypy,pylint,flake8,vulture} cov skipsdist = true @@ -12,9 +12,9 @@ deps = test,cov: {[testenv:cov]deps} test,py{35,36,37,38}-{type,mypy}: colorama py{35,36,37,38}-{type,mypy}: {[testenv:mypy]deps} - py{27,py,py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]deps} - py{27,py,py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]deps} - py{27,py,py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]deps} + py{py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]deps} + py{py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]deps} + py{py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]deps} setenv = SSHAUDIT = {toxinidir}/ssh-audit.py test: COVERAGE_FILE = {toxinidir}/.coverage.{envname} @@ -25,10 +25,11 @@ commands = test: pytest -v --junitxml={toxinidir}/reports/junit.{envname}.xml {posargs:test} test: coverage report --show-missing test: coverage html -d {toxinidir}/reports/html/coverage.{envname} - py{35,36,37,38}-{type,mypy}: {[testenv:mypy]commands} - py{27,py,py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]commands} - py{27,py,py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]commands} - py{27,py,py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]commands} +# Temporarily disable mypy, since types have been ignored since v2.0.0. +# py{35,36,37,38}-{type,mypy}: {[testenv:mypy]commands} + py{py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]commands} + py{py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]commands} + py{py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]commands} ignore_outcome = type: true lint: true @@ -113,12 +114,20 @@ reports = no #output-format = colorized indent-string = \t disable = - locally-disabled, - bad-continuation, - multiple-imports, - invalid-name, - trailing-whitespace, - missing-docstring + bad-continuation, + broad-except, + fixme, + invalid-name, + line-too-long, + missing-docstring, + mixed-indentation, + no-else-return, + too-complex, + too-many-branches, + too-many-instance-attributes, + too-many-lines, + too-many-locals, + too-many-boolean-expressions max-complexity = 15 max-args = 8 max-locals = 20