diff --git a/.appveyor.yml b/.appveyor.yml index 3368e05..f1c0cae 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -8,8 +8,6 @@ branches: environment: matrix: - - PYTHON: "C:\\Python27" - - PYTHON: "C:\\Python27-x64" - PYTHON: "C:\\Python35" - PYTHON: "C:\\Python35-x64" - PYTHON: "C:\\Python36" diff --git a/ssh-audit.py b/ssh-audit.py index c93d710..db7ec10 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- """ The MIT License (MIT) @@ -24,7 +23,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ -from __future__ import print_function import base64 import binascii import errno @@ -39,26 +37,13 @@ import select import socket import struct import sys - +# pylint: disable=unused-import +from typing import Dict, List, Set, Sequence, Tuple, Iterable +from typing import Callable, Optional, Union, Any VERSION = 'v2.2.1-dev' SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate -if sys.version_info >= (3,): # pragma: nocover - StringIO, BytesIO = io.StringIO, io.BytesIO - text_type = str - binary_type = bytes -else: # pragma: nocover - import StringIO as _StringIO # pylint: disable=import-error - StringIO = BytesIO = _StringIO.StringIO - text_type = unicode # pylint: disable=undefined-variable # noqa: F821 - binary_type = str -try: # pragma: nocover - # pylint: disable=unused-import - from typing import Dict, List, Set, Sequence, Tuple, Iterable - from typing import Callable, Optional, Union, Any -except ImportError: # pragma: nocover - pass try: # pragma: nocover from colorama import init as colorama_init colorama_init(strip=False) # pragma: nocover @@ -70,10 +55,10 @@ def usage(err=None): # type: (Optional[str]) -> None uout = Output() p = os.path.basename(sys.argv[0]) - uout.head('# {0} {1}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION)) + uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION)) if err is not None and len(err) > 0: uout.fail('\n' + err) - uout.info('usage: {0} [-1246pbcnjvlt] \n'.format(p)) + uout.info('usage: {} [-1246pbcnjvlt] \n'.format(p)) uout.info(' -h, --help print this help') uout.info(' -1, --ssh1 force ssh version 1 only') uout.info(' -2, --ssh2 force ssh version 2 only') @@ -139,18 +124,18 @@ class AuditConf: elif name == 'port': valid, port = True, utils.parse_int(value) if port < 1 or port > 65535: - raise ValueError('invalid port: {0}'.format(value)) + raise ValueError('invalid port: {}'.format(value)) value = port elif name in ['level']: if value not in ('info', 'warn', 'fail'): - raise ValueError('invalid level: {0}'.format(value)) + raise ValueError('invalid level: {}'.format(value)) valid = True elif name == 'host': valid = True elif name == 'timeout': value = utils.parse_float(value) if value == -1.0: - raise ValueError('invalid timeout: {0}'.format(value)) + raise ValueError('invalid timeout: {}'.format(value)) valid = True if valid: object.__setattr__(self, name, value) @@ -195,7 +180,7 @@ class AuditConf: aconf.verbose = True elif o in ('-l', '--level'): if a not in ('info', 'warn', 'fail'): - usage_cb('level {0} is not valid'.format(a)) + usage_cb('level {} is not valid'.format(a)) aconf.level = a elif o in ('-t', '--timeout'): aconf.timeout = float(a) @@ -223,7 +208,7 @@ class AuditConf: oport = '2222' port = utils.parse_int(oport) if port <= 0 or port > 65535: - usage_cb('port {0} is not valid'.format(oport)) + usage_cb('port {} is not valid'.format(oport)) aconf.host = host aconf.port = port if not (aconf.ssh1 or aconf.ssh2): @@ -275,27 +260,27 @@ class Output: @staticmethod def _colorized(color): - # type: (str) -> Callable[[text_type], None] - return lambda x: print(u'{0}{1}\033[0m'.format(color, x)) + # type: (str) -> Callable[[str], None] + return lambda x: print(u'{}{}\033[0m'.format(color, x)) def __getattr__(self, name): - # type: (str) -> Callable[[text_type], None] + # type: (str) -> Callable[[str], None] if name == 'head' and self.batch: return lambda x: None if not self.get_level(name) >= self.__level: return lambda x: None if self.use_colors and self.colors_supported and name in self.COLORS: - color = '\033[0;{0}m'.format(self.COLORS[name]) + color = '\033[0;{}m'.format(self.COLORS[name]) return self._colorized(color) else: - return lambda x: print(u'{0}'.format(x)) + return lambda x: print(u'{}'.format(x)) class OutputBuffer(list): def __enter__(self): # type: () -> OutputBuffer # pylint: disable=attribute-defined-outside-init - self.__buf = StringIO() + self.__buf = io.StringIO() self.__stdout = sys.stdout sys.stdout = self.__buf return self @@ -536,7 +521,7 @@ class SSH2: # pylint: disable=too-few-public-methods class KexParty: def __init__(self, enc, mac, compression, languages): - # type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None + # type: (List[str], List[str], List[str], List[str]) -> None self.__enc = enc self.__mac = mac self.__compression = compression @@ -544,27 +529,27 @@ class SSH2: # pylint: disable=too-few-public-methods @property def encryption(self): - # type: () -> List[text_type] + # type: () -> List[str] return self.__enc @property def mac(self): - # type: () -> List[text_type] + # type: () -> List[str] return self.__mac @property def compression(self): - # type: () -> List[text_type] + # type: () -> List[str] return self.__compression @property def languages(self): - # type: () -> List[text_type] + # type: () -> List[str] return self.__languages class Kex: def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0): - # type: (binary_type, List[text_type], List[text_type], SSH2.KexParty, SSH2.KexParty, bool, int) -> None + # type: (bytes, List[str], List[str], SSH2.KexParty, SSH2.KexParty, bool, int) -> None self.__cookie = cookie self.__kex_algs = kex_algs self.__key_algs = key_algs @@ -579,17 +564,17 @@ class SSH2: # pylint: disable=too-few-public-methods @property def cookie(self): - # type: () -> binary_type + # type: () -> bytes return self.__cookie @property def kex_algorithms(self): - # type: () -> List[text_type] + # type: () -> List[str] return self.__kex_algs @property def key_algorithms(self): - # type: () -> List[text_type] + # type: () -> List[str] return self.__key_algs # client_to_server @@ -650,14 +635,14 @@ class SSH2: # pylint: disable=too-few-public-methods @property def payload(self): - # type: () -> binary_type + # type: () -> bytes wbuf = WriteBuf() self.write(wbuf) return wbuf.write_flush() @classmethod def parse(cls, payload): - # type: (binary_type) -> SSH2.Kex + # type: (bytes) -> SSH2.Kex buf = ReadBuf(payload) cookie = buf.read(16) kex_algs = buf.read_list() @@ -943,7 +928,7 @@ class SSH1: self._table[i] = crc def calc(self, v): - # type: (binary_type) -> int + # type: (bytes) -> int crc, length = 0, len(v) for i in range(length): n = ord(v[i:i + 1]) @@ -957,7 +942,7 @@ class SSH1: @classmethod def crc32(cls, v): - # type: (binary_type) -> int + # type: (bytes) -> int if cls._crc32 is None: cls._crc32 = cls.CRC32() return cls._crc32.calc(v) @@ -995,11 +980,11 @@ class SSH1: class PublicKeyMessage: def __init__(self, cookie, skey, hkey, pflags, cmask, amask): - # type: (binary_type, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None + # type: (bytes, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None if len(skey) != 3: - raise ValueError('invalid server key pair: {0}'.format(skey)) + raise ValueError('invalid server key pair: {}'.format(skey)) if len(hkey) != 3: - raise ValueError('invalid host key pair: {0}'.format(hkey)) + raise ValueError('invalid host key pair: {}'.format(hkey)) self.__cookie = cookie self.__server_key = skey self.__host_key = hkey @@ -1009,7 +994,7 @@ class SSH1: @property def cookie(self): - # type: () -> binary_type + # type: () -> bytes return self.__cookie @property @@ -1044,7 +1029,7 @@ class SSH1: @property def host_key_fingerprint_data(self): - # type: () -> binary_type + # type: () -> bytes # pylint: disable=protected-access mod = WriteBuf._create_mpint(self.host_key_public_modulus, False) e = WriteBuf._create_mpint(self.host_key_public_exponent, False) @@ -1062,7 +1047,7 @@ class SSH1: @property def supported_ciphers(self): - # type: () -> List[text_type] + # type: () -> List[str] ciphers = [] for i in range(len(SSH1.CIPHERS)): if self.__supported_ciphers_mask & (1 << i) != 0: @@ -1076,7 +1061,7 @@ class SSH1: @property def supported_authentications(self): - # type: () -> List[text_type] + # type: () -> List[str] auths = [] for i in range(1, len(SSH1.AUTHS)): if self.__supported_authentications_mask & (1 << i) != 0: @@ -1098,14 +1083,14 @@ class SSH1: @property def payload(self): - # type: () -> binary_type + # type: () -> bytes wbuf = WriteBuf() self.write(wbuf) return wbuf.write_flush() @classmethod def parse(cls, payload): - # type: (binary_type) -> SSH1.PublicKeyMessage + # type: (bytes) -> SSH1.PublicKeyMessage buf = ReadBuf(payload) cookie = buf.read(8) server_key_bits = buf.read_int() @@ -1125,9 +1110,9 @@ class SSH1: class ReadBuf: def __init__(self, data=None): - # type: (Optional[binary_type]) -> None + # type: (Optional[bytes]) -> None super(ReadBuf, self).__init__() - self._buf = BytesIO(data) if data is not None else BytesIO() + self._buf = io.BytesIO(data) if data is not None else io.BytesIO() self._len = len(data) if data is not None else 0 @property @@ -1136,7 +1121,7 @@ class ReadBuf: return self._len - self._buf.tell() def read(self, size): - # type: (int) -> binary_type + # type: (int) -> bytes return self._buf.read(size) def read_byte(self): @@ -1154,18 +1139,18 @@ class ReadBuf: return v def read_list(self): - # type: () -> List[text_type] + # type: () -> List[str] list_size = self.read_int() return self.read(list_size).decode('utf-8', 'replace').split(',') def read_string(self): - # type: () -> binary_type + # type: () -> bytes n = self.read_int() return self.read(n) @classmethod def _parse_mpint(cls, v, pad, f): - # type: (binary_type, binary_type, str) -> int + # type: (bytes, bytes, str) -> int r = 0 if len(v) % 4 != 0: v = pad * (4 - (len(v) % 4)) + v @@ -1190,22 +1175,22 @@ class ReadBuf: return self._parse_mpint(v, pad, f) def read_line(self): - # type: () -> text_type + # type: () -> str return self._buf.readline().rstrip().decode('utf-8', 'replace') def reset(self): - self._buf = BytesIO() + self._buf = io.BytesIO() self._len = 0 class WriteBuf: def __init__(self, data=None): - # type: (Optional[binary_type]) -> None + # type: (Optional[bytes]) -> None super(WriteBuf, self).__init__() - self._wbuf = BytesIO(data) if data is not None else BytesIO() + self._wbuf = io.BytesIO(data) if data is not None else io.BytesIO() def write(self, data): - # type: (binary_type) -> WriteBuf + # type: (bytes) -> WriteBuf self._wbuf.write(data) return self @@ -1222,14 +1207,14 @@ class WriteBuf: return self.write(struct.pack('>I', v)) def write_string(self, v): - # type: (Union[binary_type, text_type]) -> WriteBuf + # type: (Union[bytes, str]) -> WriteBuf if not isinstance(v, bytes): v = bytes(bytearray(v, 'utf-8')) self.write_int(len(v)) return self.write(v) def write_list(self, v): - # type: (List[text_type]) -> WriteBuf + # type: (List[str]) -> WriteBuf return self.write_string(u','.join(v)) @classmethod @@ -1242,12 +1227,12 @@ class WriteBuf: @classmethod def _create_mpint(cls, n, signed=True, bits=None): - # type: (int, bool, Optional[int]) -> binary_type + # type: (int, bool, Optional[int]) -> bytes if bits is None: bits = cls._bitlength(n) length = bits // 8 + (1 if n != 0 else 0) ql = (length + 7) // 8 - fmt, v2 = '>{0}Q'.format(ql), [0] * ql + fmt, v2 = '>{}Q'.format(ql), [0] * ql for i in range(ql): v2[ql - i - 1] = n & 0xffffffffffffffff n >>= 64 @@ -1273,21 +1258,21 @@ class WriteBuf: return self.write_string(data) def write_line(self, v): - # type: (Union[binary_type, str]) -> WriteBuf + # type: (Union[bytes, str]) -> WriteBuf if not isinstance(v, bytes): v = bytes(bytearray(v, 'utf-8')) v += b'\r\n' return self.write(v) def write_flush(self): - # type: () -> binary_type + # type: () -> bytes payload = self._wbuf.getvalue() self._wbuf.truncate(0) self._wbuf.seek(0) return payload def reset(self): - self._wbuf = BytesIO() + self._wbuf = io.BytesIO() class SSH: # pylint: disable=too-few-public-methods @@ -1346,12 +1331,12 @@ class SSH: # pylint: disable=too-few-public-methods return self.__os def compare_version(self, other): - # type: (Union[None, SSH.Software, text_type]) -> int + # type: (Union[None, SSH.Software, str]) -> int # pylint: disable=too-many-branches if other is None: return 1 if isinstance(other, SSH.Software): - other = '{0}{1}'.format(other.version, other.patch or '') + other = '{}{}'.format(other.version, other.patch or '') else: other = str(other) mx = re.match(r'^([\d\.]+\d+)(.*)$', other) @@ -1366,9 +1351,9 @@ class SSH: # pylint: disable=too-few-public-methods spatch = self.patch or '' if self.product == SSH.Product.DropbearSSH: if not re.match(r'^test\d.*$', opatch): - opatch = 'z{0}'.format(opatch) + opatch = 'z{}'.format(opatch) if not re.match(r'^test\d.*$', spatch): - spatch = 'z{0}'.format(spatch) + spatch = 'z{}'.format(spatch) elif self.product == SSH.Product.OpenSSH: mx1 = re.match(r'^p\d(.*)', opatch) mx2 = re.match(r'^p\d(.*)', spatch) @@ -1393,10 +1378,10 @@ class SSH: # pylint: disable=too-few-public-methods def display(self, full=True): # type: (bool) -> str - r = '{0} '.format(self.vendor) if bool(self.vendor) else '' + r = '{} '.format(self.vendor) if bool(self.vendor) else '' r += self.product if bool(self.version): - r += ' {0}'.format(self.version) + r += ' {}'.format(self.version) if full: patch = self.patch or '' if self.product == SSH.Product.OpenSSH: @@ -1405,9 +1390,9 @@ class SSH: # pylint: disable=too-few-public-methods r += mx.group(1) patch = mx.group(2).strip() if bool(patch): - r += ' ({0})'.format(patch) + r += ' ({})'.format(patch) if bool(self.os): - r += ' running on {0}'.format(self.os) + r += ' running on {}'.format(self.os) return r def __str__(self): @@ -1416,15 +1401,15 @@ class SSH: # pylint: disable=too-few-public-methods def __repr__(self): # type: () -> str - r = 'vendor={0}, '.format(self.vendor) if bool(self.vendor) else '' - r += 'product={0}'.format(self.product) + r = 'vendor={}, '.format(self.vendor) if bool(self.vendor) else '' + r += 'product={}'.format(self.product) if bool(self.version): - r += ', version={0}'.format(self.version) + r += ', version={}'.format(self.version) if bool(self.patch): - r += ', patch={0}'.format(self.patch) + r += ', patch={}'.format(self.patch) if bool(self.os): - r += ', os={0}'.format(self.os) - return '<{0}({1})>'.format(self.__class__.__name__, r) + r += ', os={}'.format(self.os) + return '<{}({})>'.format(self.__class__.__name__, r) @staticmethod def _fix_patch(patch): @@ -1435,7 +1420,7 @@ class SSH: # pylint: disable=too-few-public-methods def _fix_date(d): # type: (str) -> Optional[str] if d is not None and len(d) == 8: - return '{0}-{1}-{2}'.format(d[:4], d[4:6], d[6:8]) + return '{}-{}-{}'.format(d[:4], d[4:6], d[6:8]) else: return None @@ -1447,19 +1432,19 @@ class SSH: # pylint: disable=too-few-public-methods mx = re.match(r'^NetBSD(?:_Secure_Shell)?(?:[\s-]+(\d{8})(.*))?$', c) if bool(mx): d = cls._fix_date(mx.group(1)) - return 'NetBSD' if d is None else 'NetBSD ({0})'.format(d) + return 'NetBSD' if d is None else 'NetBSD ({})'.format(d) mx = re.match(r'^FreeBSD(?:\slocalisations)?[\s-]+(\d{8})(.*)$', c) if not bool(mx): mx = re.match(r'^[^@]+@FreeBSD\.org[\s-]+(\d{8})(.*)$', c) if bool(mx): d = cls._fix_date(mx.group(1)) - return 'FreeBSD' if d is None else 'FreeBSD ({0})'.format(d) + return 'FreeBSD' if d is None else 'FreeBSD ({})'.format(d) w = ['RemotelyAnywhere', 'DesktopAuthority', 'RemoteSupportManager'] for win_soft in w: mx = re.match(r'^in ' + win_soft + r' ([\d\.]+\d)$', c) if bool(mx): ver = mx.group(1) - return 'Microsoft Windows ({0} {1})'.format(win_soft, ver) + return 'Microsoft Windows ({} {})'.format(win_soft, ver) generic = ['NetBSD', 'FreeBSD'] for g in generic: if c.startswith(g) or c.endswith(g): @@ -1552,26 +1537,26 @@ class SSH: # pylint: disable=too-few-public-methods def __str__(self): # type: () -> str - r = 'SSH-{0}.{1}'.format(self.protocol[0], self.protocol[1]) + r = 'SSH-{}.{}'.format(self.protocol[0], self.protocol[1]) if self.software is not None: - r += '-{0}'.format(self.software) + r += '-{}'.format(self.software) if bool(self.comments): - r += ' {0}'.format(self.comments) + r += ' {}'.format(self.comments) return r def __repr__(self): # type: () -> str - p = '{0}.{1}'.format(self.protocol[0], self.protocol[1]) - r = 'protocol={0}'.format(p) + p = '{}.{}'.format(self.protocol[0], self.protocol[1]) + r = 'protocol={}'.format(p) if self.software is not None: - r += ', software={0}'.format(self.software) + r += ', software={}'.format(self.software) if bool(self.comments): - r += ', comments={0}'.format(self.comments) - return '<{0}({1})>'.format(self.__class__.__name__, r) + r += ', comments={}'.format(self.comments) + return '<{}({})>'.format(self.__class__.__name__, r) @classmethod def parse(cls, banner): - # type: (text_type) -> Optional[SSH.Banner] + # type: (str) -> Optional[SSH.Banner] valid_ascii = utils.is_print_ascii(banner) ascii_banner = utils.to_print_ascii(banner) mx = cls.RX_BANNER.match(ascii_banner) @@ -1589,22 +1574,22 @@ class SSH: # pylint: disable=too-few-public-methods class Fingerprint: def __init__(self, fpd): - # type: (binary_type) -> None + # type: (bytes) -> None self.__fpd = fpd @property def md5(self): - # type: () -> text_type + # type: () -> str h = hashlib.md5(self.__fpd).hexdigest() r = u':'.join(h[i:i + 2] for i in range(0, len(h), 2)) - return u'MD5:{0}'.format(r) + return u'MD5:{}'.format(r) @property def sha256(self): - # type: () -> text_type + # type: () -> str h = base64.b64encode(hashlib.sha256(self.__fpd).digest()) r = h.decode('ascii').rstrip('=') - return u'SHA256:{0}'.format(r) + return u'SHA256:{}'.format(r) class Algorithm: class Timeframe: @@ -1679,7 +1664,7 @@ class SSH: # pylint: disable=too-few-public-methods @classmethod def get_since_text(cls, versions): - # type: (List[Optional[str]]) -> Optional[text_type] + # type: (List[Optional[str]]) -> Optional[str] tv = [] if len(versions) == 0 or versions[0] is None: return None @@ -1690,8 +1675,8 @@ class SSH: # pylint: disable=too-few-public-methods if ssh_prod in [SSH.Product.LibSSH]: continue if is_cli: - ssh_ver = '{0} (client only)'.format(ssh_ver) - tv.append('{0} {1}'.format(ssh_prod, ssh_ver)) + ssh_ver = '{} (client only)'.format(ssh_ver) + tv.append('{} {}'.format(ssh_prod, ssh_ver)) if len(tv) == 0: return None return 'available since ' + ', '.join(tv).rstrip(', ') @@ -1746,7 +1731,7 @@ class SSH: # pylint: disable=too-few-public-methods def maxlen(self): # type: () -> int def _ml(items): - # type: (Sequence[text_type]) -> int + # type: (Sequence[str]) -> int return max(len(i) for i in items) maxlen = 0 if self.ssh1kex is not None: @@ -1888,7 +1873,7 @@ class SSH: # pylint: disable=too-few-public-methods # type: (int, Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None self.__sshv = sshv self.__db = db - self.__storage = {} # type: Dict[str, List[text_type]] + self.__storage = {} # type: Dict[str, List[str]] @property def sshv(self): @@ -1901,11 +1886,11 @@ class SSH: # pylint: disable=too-few-public-methods return self.__db def add(self, key, value): - # type: (str, List[text_type]) -> None + # type: (str, List[str]) -> None self.__storage[key] = value def items(self): - # type: () -> Iterable[Tuple[str, List[text_type]]] + # type: () -> Iterable[Tuple[str, List[str]]] return self.__storage.items() class Security: # pylint: disable=too-few-public-methods @@ -2043,13 +2028,13 @@ class SSH: # pylint: disable=too-few-public-methods self.__sock_map = {} self.__block_size = 8 self.__state = 0 - self.__header = [] # type: List[text_type] + self.__header = [] # type: List[str] self.__banner = None # type: Optional[SSH.Banner] if host is None: raise ValueError('undefined host') nport = utils.parse_int(port) if nport < 1 or nport > 65535: - raise ValueError('invalid port: {0}'.format(port)) + raise ValueError('invalid port: {}'.format(port)) self.__host = host self.__port = nport if ipvo is not None: @@ -2081,7 +2066,7 @@ class SSH: # pylint: disable=too-few-public-methods if not check or socktype == socket.SOCK_STREAM: yield af, addr except socket.error as e: - out.fail('[exception] {0}'.format(e)) + out.fail('[exception] {}'.format(e)) sys.exit(1) # Listens on a server socket and accepts one connection (used for @@ -2154,15 +2139,15 @@ class SSH: # pylint: disable=too-few-public-methods err = e self._close_socket(s) if err is None: - errm = 'host {0} has no DNS records'.format(self.__host) + errm = 'host {} has no DNS records'.format(self.__host) else: errt = (self.__host, self.__port, err) - errm = 'cannot connect to {0} port {1}: {2}'.format(*errt) - out.fail('[exception] {0}'.format(errm)) + errm = 'cannot connect to {} port {}: {}'.format(*errt) + out.fail('[exception] {}'.format(errm)) sys.exit(1) def get_banner(self, sshv=2): - # type: (int) -> Tuple[Optional[SSH.Banner], List[text_type], Optional[str]] + # type: (int) -> Tuple[Optional[SSH.Banner], List[str], Optional[str]] if self.__sock is None: return self.__banner, self.__header, 'not connected' banner = SSH_HEADER.format('1.5' if sshv == 1 else '2.0') @@ -2214,7 +2199,7 @@ class SSH: # pylint: disable=too-few-public-methods return len(data), None def send(self, data): - # type: (binary_type) -> Tuple[int, Optional[str]] + # type: (bytes) -> Tuple[int, Optional[str]] if self.__sock is None: return -1, 'not connected' try: @@ -2238,7 +2223,7 @@ class SSH: # pylint: disable=too-few-public-methods raise SSH.Socket.InsufficientReadException(e) def read_packet(self, sshv=2): - # type: (int) -> Tuple[int, binary_type] + # type: (int) -> Tuple[int, bytes] try: header = WriteBuf() self.ensure_read(4) @@ -2669,7 +2654,7 @@ class KexGroupExchange_SHA256(KexGroupExchange): def output_algorithms(title, alg_db, alg_type, algorithms, unknown_algs, maxlen=0, alg_sizes=None): - # type: (str, Dict[str, Dict[str, List[List[Optional[str]]]]], str, List[text_type], int) -> None + # type: (str, Dict[str, Dict[str, List[List[Optional[str]]]]], str, List[str], int) -> None with OutputBuffer() as obuf: for algorithm in algorithms: output_algorithm(alg_db, alg_type, algorithm, unknown_algs, maxlen, alg_sizes) @@ -2680,7 +2665,7 @@ def output_algorithms(title, alg_db, alg_type, algorithms, unknown_algs, maxlen= def output_algorithm(alg_db, alg_type, alg_name, unknown_algs, alg_max_len=0, alg_sizes=None): - # type: (Dict[str, Dict[str, List[List[Optional[str]]]]], str, text_type, int) -> None + # type: (Dict[str, Dict[str, List[List[Optional[str]]]]], str, str, int) -> None prefix = '(' + alg_type + ') ' if alg_max_len == 0: alg_max_len = len(alg_name) @@ -2758,9 +2743,9 @@ def output_compatibility(algs, client_audit, for_server=True): if v_from is None: continue if v_till is None: - comp_text.append('{0} {1}+'.format(ssh_prod, v_from)) + comp_text.append('{} {}+'.format(ssh_prod, v_from)) elif v_from == v_till: - comp_text.append('{0} {1}'.format(ssh_prod, v_from)) + comp_text.append('{} {}'.format(ssh_prod, v_from)) else: software = SSH.Software(None, ssh_prod, v_from, None, None) if software.compare_version(v_till) > 0: @@ -2797,10 +2782,10 @@ def output_security_sub(sub, software, client_audit, padlen): out_func = out.warn if cvss >= 8.0: out_func = out.fail - out_func('(cve) {0}{1} -- (CVSSv2: {2}) {3}'.format(name, p, cvss, descr)) + out_func('(cve) {}{} -- (CVSSv2: {}) {}'.format(name, p, cvss, descr)) else: descr = line[4] - out.fail('(sec) {0}{1} -- {2}'.format(name, p, descr)) + out.fail('(sec) {}{} -- {}'.format(name, p, descr)) def output_security(banner, client_audit, padlen): @@ -2847,7 +2832,7 @@ def output_fingerprints(algs, sha256=True): fpo = fp.sha256 if sha256 else fp.md5 # p = '' if out.batch else ' ' * (padlen - len(name)) # out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) - out.good('(fin) {0}: {1}'.format(name, fpo)) + out.good('(fin) {}: {}'.format(name, fpo)) if len(obuf) > 0: out.head('# fingerprints') obuf.flush() @@ -2913,15 +2898,15 @@ def output_recommendations(algs, software, padlen=0): an, sg, fn = 'change', '!', out.fail ret = False chg_additional_info = ' (increase modulus size to 2048 bits or larger)' - b = '(SSH{0})'.format(sshv) if sshv == 1 else '' + b = '(SSH{})'.format(sshv) if sshv == 1 else '' fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}' fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b)) if len(obuf) > 0: if software is not None: - title = '(for {0})'.format(software.display(False)) + title = '(for {})'.format(software.display(False)) else: title = '' - out.head('# algorithm recommendations {0}'.format(title)) + out.head('# algorithm recommendations {}'.format(title)) obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing). out.sep() return ret @@ -2945,17 +2930,17 @@ def output_info(software, client_audit, any_problems): def output(banner, header, client_host=None, kex=None, pkm=None): - # type: (Optional[SSH.Banner], List[text_type], Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> None + # type: (Optional[SSH.Banner], List[str], Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> None client_audit = client_host is not None # If set, this is a client audit. sshv = 1 if pkm is not None else 2 algs = SSH.Algorithms(pkm, kex) with OutputBuffer() as obuf: if client_audit: - out.good('(gen) client IP: {0}'.format(client_host)) + out.good('(gen) client IP: {}'.format(client_host)) if len(header) > 0: out.info('(gen) header: ' + '\n'.join(header)) if banner is not None: - out.good('(gen) banner: {0}'.format(banner)) + out.good('(gen) banner: {}'.format(banner)) if not banner.valid_ascii: # NOTE: RFC 4253, Section 4.2 out.warn('(gen) banner contains non-printable ASCII') @@ -2963,17 +2948,17 @@ def output(banner, header, client_host=None, kex=None, pkm=None): out.fail('(gen) protocol SSH1 enabled') software = SSH.Software.parse(banner) if software is not None: - out.good('(gen) software: {0}'.format(software)) + out.good('(gen) software: {}'.format(software)) else: software = None output_compatibility(algs, client_audit) if kex is not None: compressions = [x for x in kex.server.compression if x != 'none'] if len(compressions) > 0: - cmptxt = 'enabled ({0})'.format(', '.join(compressions)) + cmptxt = 'enabled ({})'.format(', '.join(compressions)) else: cmptxt = 'disabled' - out.good('(gen) compression: {0}'.format(cmptxt)) + out.good('(gen) compression: {}'.format(cmptxt)) if len(obuf) > 0: out.head('# general') obuf.flush() @@ -3013,43 +2998,41 @@ def output(banner, header, client_host=None, kex=None, pkm=None): class Utils: @classmethod def _type_err(cls, v, target): - # type: (Any, text_type) -> TypeError - return TypeError('cannot convert {0} to {1}'.format(type(v), target)) + # type: (Any, str) -> TypeError + return TypeError('cannot convert {} to {}'.format(type(v), target)) @classmethod def to_bytes(cls, v, enc='utf-8'): - # type: (Union[binary_type, text_type], str) -> binary_type - if isinstance(v, binary_type): + # type: (Union[bytes, str], str) -> bytes + if isinstance(v, bytes): return v - elif isinstance(v, text_type): + elif isinstance(v, str): return v.encode(enc) raise cls._type_err(v, 'bytes') @classmethod def to_utext(cls, v, enc='utf-8'): - # type: (Union[text_type, binary_type], str) -> text_type - if isinstance(v, text_type): + # type: (Union[str, bytes], str) -> str + if isinstance(v, str): return v - elif isinstance(v, binary_type): + elif isinstance(v, bytes): return v.decode(enc) raise cls._type_err(v, 'unicode text') @classmethod def to_ntext(cls, v, enc='utf-8'): - # type: (Union[text_type, binary_type], str) -> str + # type: (Union[str, bytes], str) -> str if isinstance(v, str): return v - elif isinstance(v, text_type): - return v.encode(enc) # PY2 only - elif isinstance(v, binary_type): - return v.decode(enc) # PY3 only + elif isinstance(v, bytes): + return v.decode(enc) raise cls._type_err(v, 'native text') @classmethod def _is_ascii(cls, v, char_filter=lambda x: x <= 127): - # type: (Union[text_type, str], Callable[[int], bool]) -> bool + # type: (str, Callable[[int], bool]) -> bool r = False - if isinstance(v, (text_type, str)): + if isinstance(v, str): for c in v: i = cls.ctoi(c) if not char_filter(i): @@ -3059,8 +3042,8 @@ class Utils: @classmethod def _to_ascii(cls, v, char_filter=lambda x: x <= 127, errors='replace'): - # type: (Union[text_type, str], Callable[[int], bool], str) -> str - if isinstance(v, (text_type, str)): + # type: (str, Callable[[int], bool], str) -> str + if isinstance(v, str): r = bytearray() for c in v: i = cls.ctoi(c) @@ -3075,22 +3058,22 @@ class Utils: @classmethod def is_ascii(cls, v): - # type: (Union[text_type, str]) -> bool + # type: (str) -> bool return cls._is_ascii(v) @classmethod def to_ascii(cls, v, errors='replace'): - # type: (Union[text_type, str], str) -> str + # type: (str, str) -> str return cls._to_ascii(v, errors=errors) @classmethod def is_print_ascii(cls, v): - # type: (Union[text_type, str]) -> bool + # type: (str) -> bool return cls._is_ascii(v, lambda x: 126 >= x >= 32) @classmethod def to_print_ascii(cls, v, errors='replace'): - # type: (Union[text_type, str], str) -> str + # type: (str, str) -> str return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors) @classmethod @@ -3110,8 +3093,8 @@ class Utils: @classmethod def ctoi(cls, c): - # type: (Union[text_type, str, int]) -> int - if isinstance(c, (text_type, str)): + # type: (Union[str, int]) -> int + if isinstance(c, str): return ord(c[0]) else: return c @@ -3230,7 +3213,7 @@ def audit(aconf, sshv=None): if err is None: err = '[exception] did not receive banner.' else: - err = '[exception] did not receive banner: {0}'.format(err) + err = '[exception] did not receive banner: {}'.format(err) if err is None: packet_type, payload = s.read_packet(sshv) if packet_type < 0: @@ -3240,12 +3223,12 @@ def audit(aconf, sshv=None): else: payload_txt = u'empty' except UnicodeDecodeError: - payload_txt = u'"{0}"'.format(repr(payload).lstrip('b')[1:-1]) + payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1]) if payload_txt == u'Protocol major versions differ.': if sshv == 2 and aconf.ssh1: audit(aconf, 1) return - err = '[exception] error reading packet ({0})'.format(payload_txt) + err = '[exception] error reading packet ({})'.format(payload_txt) else: err_pair = None if sshv == 1 and packet_type != SSH.Protocol.SMSG_PUBLIC_KEY: diff --git a/test/conftest.py b/test/conftest.py index 22b01f9..a451984 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import os import io import sys @@ -7,13 +5,6 @@ import socket import pytest -if sys.version_info[0] == 2: - import StringIO # pylint: disable=import-error - StringIO = StringIO.StringIO -else: - StringIO = io.StringIO - - @pytest.fixture(scope='module') def ssh_audit(): __rdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..') @@ -24,7 +15,7 @@ def ssh_audit(): # pylint: disable=attribute-defined-outside-init class _OutputSpy(list): def begin(self): - self.__out = StringIO() + self.__out = io.StringIO() self.__old_stdout = sys.stdout sys.stdout = self.__out @@ -40,7 +31,7 @@ def output_spy(): return _OutputSpy() -class _VirtualGlobalSocket(object): +class _VirtualGlobalSocket: def __init__(self, vsocket): self.vsocket = vsocket self.addrinfodata = {} @@ -59,7 +50,7 @@ class _VirtualGlobalSocket(object): return self.vsocket def getaddrinfo(self, host, port, family=0, socktype=0, proto=0, flags=0): - key = '{0}#{1}'.format(host, port) + key = '{}#{}'.format(host, port) if key in self.addrinfodata: data = self.addrinfodata[key] if isinstance(data, Exception): @@ -75,7 +66,7 @@ class _VirtualGlobalSocket(object): return [] -class _VirtualSocket(object): +class _VirtualSocket: def __init__(self): self.sock_address = ('127.0.0.1', 0) self.peer_address = None @@ -108,7 +99,7 @@ class _VirtualSocket(object): def getpeername(self): if self.peer_address is None or not self._connected: - raise socket.error(57, 'Socket is not connected') + raise OSError(57, 'Socket is not connected') return self.peer_address def getsockname(self): @@ -131,7 +122,7 @@ class _VirtualSocket(object): def recv(self, bufsize, flags=0): # pylint: disable=unused-argument if not self._connected: - raise socket.error(54, 'Connection reset by peer') + raise OSError(54, 'Connection reset by peer') if not len(self.rdata) > 0: return b'' data = self.rdata.pop(0) @@ -141,7 +132,7 @@ class _VirtualSocket(object): def send(self, data): if self.peer_address is None or not self._connected: - raise socket.error(32, 'Broken pipe') + raise OSError(32, 'Broken pipe') self._check_err('send') self.sdata.append(data) diff --git a/test/test_auditconf.py b/test/test_auditconf.py index fc29e77..a41c4f4 100644 --- a/test/test_auditconf.py +++ b/test/test_auditconf.py @@ -1,10 +1,8 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import pytest # pylint: disable=attribute-defined-outside-init -class TestAuditConf(object): +class TestAuditConf: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf diff --git a/test/test_banner.py b/test/test_banner.py index 2ba6ec2..7a02630 100644 --- a/test/test_banner.py +++ b/test/test_banner.py @@ -1,10 +1,8 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import pytest # pylint: disable=line-too-long,attribute-defined-outside-init -class TestBanner(object): +class TestBanner: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH diff --git a/test/test_buffer.py b/test/test_buffer.py index 62bb621..b4efc33 100644 --- a/test/test_buffer.py +++ b/test/test_buffer.py @@ -1,11 +1,9 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import re import pytest # pylint: disable=attribute-defined-outside-init,bad-whitespace -class TestBuffer(object): +class TestBuffer: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.rbuf = ssh_audit.ReadBuf @@ -61,7 +59,7 @@ class TestBuffer(object): def test_string(self): w = lambda x: self.wbuf().write_string(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_string() # noqa - tc = [(u'abc1', '00 00 00 04 61 62 63 31'), + tc = [('abc1', '00 00 00 04 61 62 63 31'), (b'abc2', '00 00 00 04 61 62 63 32')] for p in tc: v = p[0] @@ -87,7 +85,7 @@ class TestBuffer(object): def test_line(self): w = lambda x: self.wbuf().write_line(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_line() # noqa - tc = [(u'example line', '65 78 61 6d 70 6c 65 20 6c 69 6e 65 0d 0a')] + tc = [('example line', '65 78 61 6d 70 6c 65 20 6c 69 6e 65 0d 0a')] for p in tc: assert w(p[0]) == self._b(p[1]) assert r(self._b(p[1])) == p[0] diff --git a/test/test_errors.py b/test/test_errors.py index acbb561..d99f405 100644 --- a/test/test_errors.py +++ b/test/test_errors.py @@ -1,12 +1,10 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import socket import errno import pytest # pylint: disable=attribute-defined-outside-init -class TestErrors(object): +class TestErrors: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf diff --git a/test/test_output.py b/test/test_output.py index 6301263..ac08a52 100644 --- a/test/test_output.py +++ b/test/test_output.py @@ -1,11 +1,8 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import print_function import pytest # pylint: disable=attribute-defined-outside-init -class TestOutput(object): +class TestOutput: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.Output = ssh_audit.Output @@ -25,17 +22,17 @@ class TestOutput(object): def test_output_buffer_no_flush(self, output_spy): output_spy.begin() with self.OutputBuffer(): - print(u'abc') + print('abc') assert output_spy.flush() == [] def test_output_buffer_flush(self, output_spy): output_spy.begin() with self.OutputBuffer() as obuf: - print(u'abc') + print('abc') print() - print(u'def') + print('def') obuf.flush() - assert output_spy.flush() == [u'abc', u'', u'def'] + assert output_spy.flush() == ['abc', '', 'def'] def test_output_defaults(self): out = self.Output() @@ -50,38 +47,38 @@ class TestOutput(object): out.use_colors = False output_spy.begin() out.info('info color') - assert output_spy.flush() == [u'info color'] + assert output_spy.flush() == ['info color'] output_spy.begin() out.head('head color') - assert output_spy.flush() == [u'head color'] + assert output_spy.flush() == ['head color'] output_spy.begin() out.good('good color') - assert output_spy.flush() == [u'good color'] + assert output_spy.flush() == ['good color'] output_spy.begin() out.warn('warn color') - assert output_spy.flush() == [u'warn color'] + assert output_spy.flush() == ['warn color'] output_spy.begin() out.fail('fail color') - assert output_spy.flush() == [u'fail color'] + assert output_spy.flush() == ['fail color'] if not out.colors_supported: return # test with colors out.use_colors = True output_spy.begin() out.info('info color') - assert output_spy.flush() == [u'info color'] + assert output_spy.flush() == ['info color'] output_spy.begin() out.head('head color') - assert output_spy.flush() == [u'\x1b[0;36mhead color\x1b[0m'] + assert output_spy.flush() == ['\x1b[0;36mhead color\x1b[0m'] output_spy.begin() out.good('good color') - assert output_spy.flush() == [u'\x1b[0;32mgood color\x1b[0m'] + assert output_spy.flush() == ['\x1b[0;32mgood color\x1b[0m'] output_spy.begin() out.warn('warn color') - assert output_spy.flush() == [u'\x1b[0;33mwarn color\x1b[0m'] + assert output_spy.flush() == ['\x1b[0;33mwarn color\x1b[0m'] output_spy.begin() out.fail('fail color') - assert output_spy.flush() == [u'\x1b[0;31mfail color\x1b[0m'] + assert output_spy.flush() == ['\x1b[0;31mfail color\x1b[0m'] def test_output_sep(self, output_spy): out = self.Output() @@ -89,7 +86,7 @@ class TestOutput(object): out.sep() out.sep() out.sep() - assert output_spy.flush() == [u'', u'', u''] + assert output_spy.flush() == ['', '', ''] def test_output_levels(self): out = self.Output() diff --git a/test/test_resolve.py b/test/test_resolve.py index 0f5471a..1b49675 100644 --- a/test/test_resolve.py +++ b/test/test_resolve.py @@ -1,11 +1,9 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import socket import pytest # pylint: disable=attribute-defined-outside-init,protected-access -class TestResolve(object): +class TestResolve: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf diff --git a/test/test_socket.py b/test/test_socket.py index 3f10a6e..c605c42 100644 --- a/test/test_socket.py +++ b/test/test_socket.py @@ -1,10 +1,8 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import pytest # pylint: disable=attribute-defined-outside-init -class TestSocket(object): +class TestSocket: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH diff --git a/test/test_software.py b/test/test_software.py index 53d1732..dd352b4 100644 --- a/test/test_software.py +++ b/test/test_software.py @@ -1,10 +1,8 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import pytest # pylint: disable=line-too-long,attribute-defined-outside-init -class TestSoftware(object): +class TestSoftware: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH diff --git a/test/test_ssh1.py b/test/test_ssh1.py index d865ff4..1364153 100644 --- a/test/test_ssh1.py +++ b/test/test_ssh1.py @@ -1,11 +1,9 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import struct import pytest # pylint: disable=line-too-long,attribute-defined-outside-init -class TestSSH1(object): +class TestSSH1: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH diff --git a/test/test_ssh2.py b/test/test_ssh2.py index 5e4e8e1..19b9c1e 100644 --- a/test/test_ssh2.py +++ b/test/test_ssh2.py @@ -1,12 +1,10 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import os import struct import pytest # pylint: disable=line-too-long,attribute-defined-outside-init -class TestSSH2(object): +class TestSSH2: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH @@ -38,16 +36,16 @@ class TestSSH2(object): def _kex_payload(self): w = self.wbuf() w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff') - w.write_list([u'bogus_kex1', u'bogus_kex2']) # We use a bogus kex, otherwise the host key tests will kick off and fail. - w.write_list([u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519']) - w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']) - w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']) - w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']) - w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']) - w.write_list([u'none', u'zlib@openssh.com']) - w.write_list([u'none', u'zlib@openssh.com']) - w.write_list([u'']) - w.write_list([u'']) + w.write_list(['bogus_kex1', 'bogus_kex2']) # We use a bogus kex, otherwise the host key tests will kick off and fail. + w.write_list(['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519']) + w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']) + w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']) + w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']) + w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']) + w.write_list(['none', 'zlib@openssh.com']) + w.write_list(['none', 'zlib@openssh.com']) + w.write_list(['']) + w.write_list(['']) w.write_byte(False) w.write_int(0) return w.write_flush() @@ -56,18 +54,18 @@ class TestSSH2(object): kex = self.ssh2.Kex.parse(self._kex_payload()) assert kex is not None assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff' - assert kex.kex_algorithms == [u'bogus_kex1', u'bogus_kex2'] - assert kex.key_algorithms == [u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519'] + assert kex.kex_algorithms == ['bogus_kex1', 'bogus_kex2'] + assert kex.key_algorithms == ['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519'] assert kex.client is not None assert kex.server is not None - assert kex.client.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'] - assert kex.server.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'] - assert kex.client.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'] - assert kex.server.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'] - assert kex.client.compression == [u'none', u'zlib@openssh.com'] - assert kex.server.compression == [u'none', u'zlib@openssh.com'] - assert kex.client.languages == [u''] - assert kex.server.languages == [u''] + assert kex.client.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'] + assert kex.server.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'] + assert kex.client.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] + assert kex.server.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] + assert kex.client.compression == ['none', 'zlib@openssh.com'] + assert kex.server.compression == ['none', 'zlib@openssh.com'] + assert kex.client.languages == [''] + assert kex.server.languages == [''] assert kex.follows is False assert kex.unused == 0 diff --git a/test/test_ssh_algorithm.py b/test/test_ssh_algorithm.py index 38905e0..a59c365 100644 --- a/test/test_ssh_algorithm.py +++ b/test/test_ssh_algorithm.py @@ -1,10 +1,8 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import pytest # pylint: disable=attribute-defined-outside-init -class TestSSHAlgorithm(object): +class TestSSHAlgorithm: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH diff --git a/test/test_utils.py b/test/test_utils.py index c97687e..6bf48d3 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,202 +1,60 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import sys import pytest # pylint: disable=attribute-defined-outside-init -class TestUtils(object): +class TestUtils: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.utils = ssh_audit.Utils - self.PY3 = sys.version_info >= (3,) - def test_to_bytes_py2(self): - if self.PY3: - return - # binary_type (native str, bytes as str) - assert self.utils.to_bytes('fran\xc3\xa7ais') == 'fran\xc3\xa7ais' - assert self.utils.to_bytes(b'fran\xc3\xa7ais') == 'fran\xc3\xa7ais' - # text_type (unicode) - assert self.utils.to_bytes(u'fran\xe7ais') == 'fran\xc3\xa7ais' - # other - with pytest.raises(TypeError): - self.utils.to_bytes(123) - - def test_to_bytes_py3(self): - if not self.PY3: - return - # binary_type (bytes) + def test_to_bytes(self): assert self.utils.to_bytes(b'fran\xc3\xa7ais') == b'fran\xc3\xa7ais' - # text_type (native str as unicode, unicode) assert self.utils.to_bytes('fran\xe7ais') == b'fran\xc3\xa7ais' - assert self.utils.to_bytes(u'fran\xe7ais') == b'fran\xc3\xa7ais' # other with pytest.raises(TypeError): self.utils.to_bytes(123) - def test_to_utext_py2(self): - if self.PY3: - return - # binary_type (native str, bytes as str) - assert self.utils.to_utext('fran\xc3\xa7ais') == u'fran\xe7ais' - assert self.utils.to_utext(b'fran\xc3\xa7ais') == u'fran\xe7ais' - # text_type (unicode) - assert self.utils.to_utext(u'fran\xe7ais') == u'fran\xe7ais' - # other - with pytest.raises(TypeError): - self.utils.to_utext(123) - - def test_to_utext_py3(self): - if not self.PY3: - return - # binary_type (bytes) - assert self.utils.to_utext(b'fran\xc3\xa7ais') == u'fran\xe7ais' - # text_type (native str as unicode, unicode) + def test_to_utext(self): + assert self.utils.to_utext(b'fran\xc3\xa7ais') == 'fran\xe7ais' assert self.utils.to_utext('fran\xe7ais') == 'fran\xe7ais' - assert self.utils.to_utext(u'fran\xe7ais') == u'fran\xe7ais' # other with pytest.raises(TypeError): self.utils.to_utext(123) - def test_to_ntext_py2(self): - if self.PY3: - return - # str (native str, bytes as str) + def test_to_ntext(self): assert self.utils.to_ntext('fran\xc3\xa7ais') == 'fran\xc3\xa7ais' - assert self.utils.to_ntext(b'fran\xc3\xa7ais') == 'fran\xc3\xa7ais' - # text_type (unicode) - assert self.utils.to_ntext(u'fran\xe7ais') == 'fran\xc3\xa7ais' - # other - with pytest.raises(TypeError): - self.utils.to_ntext(123) - - def test_to_ntext_py3(self): - if not self.PY3: - return - # str (native str) - assert self.utils.to_ntext('fran\xc3\xa7ais') == 'fran\xc3\xa7ais' - assert self.utils.to_ntext(u'fran\xe7ais') == 'fran\xe7ais' - # binary_type (bytes) assert self.utils.to_ntext(b'fran\xc3\xa7ais') == 'fran\xe7ais' # other with pytest.raises(TypeError): self.utils.to_ntext(123) - def test_is_ascii_py2(self): - if self.PY3: - return - # text_type (unicode) - assert self.utils.is_ascii(u'francais') is True - assert self.utils.is_ascii(u'fran\xe7ais') is False - # str + def test_is_ascii(self): assert self.utils.is_ascii('francais') is True - assert self.utils.is_ascii('fran\xc3\xa7ais') is False - # other - assert self.utils.is_ascii(123) is False - - def test_is_ascii_py3(self): - if not self.PY3: - return - # text_type (str) - assert self.utils.is_ascii('francais') is True - assert self.utils.is_ascii(u'francais') is True assert self.utils.is_ascii('fran\xe7ais') is False - assert self.utils.is_ascii(u'fran\xe7ais') is False # other assert self.utils.is_ascii(123) is False - def test_to_ascii_py2(self): - if self.PY3: - return - # text_type (unicode) - assert self.utils.to_ascii(u'francais') == 'francais' - assert self.utils.to_ascii(u'fran\xe7ais') == 'fran?ais' - assert self.utils.to_ascii(u'fran\xe7ais', 'ignore') == 'franais' - # str + def test_to_ascii(self): assert self.utils.to_ascii('francais') == 'francais' - assert self.utils.to_ascii('fran\xc3\xa7ais') == 'fran??ais' - assert self.utils.to_ascii('fran\xc3\xa7ais', 'ignore') == 'franais' - with pytest.raises(TypeError): - self.utils.to_ascii(123) - - def test_to_ascii_py3(self): - if not self.PY3: - return - # text_type (str) - assert self.utils.to_ascii('francais') == 'francais' - assert self.utils.to_ascii(u'francais') == 'francais' assert self.utils.to_ascii('fran\xe7ais') == 'fran?ais' assert self.utils.to_ascii('fran\xe7ais', 'ignore') == 'franais' - assert self.utils.to_ascii(u'fran\xe7ais') == 'fran?ais' - assert self.utils.to_ascii(u'fran\xe7ais', 'ignore') == 'franais' with pytest.raises(TypeError): self.utils.to_ascii(123) - def test_is_print_ascii_py2(self): - if self.PY3: - return - # text_type (unicode) - assert self.utils.is_print_ascii(u'francais') is True - assert self.utils.is_print_ascii(u'francais\n') is False - assert self.utils.is_print_ascii(u'fran\xe7ais') is False - assert self.utils.is_print_ascii(u'fran\xe7ais\n') is False - # str + def test_is_print_ascii(self): assert self.utils.is_print_ascii('francais') is True assert self.utils.is_print_ascii('francais\n') is False - assert self.utils.is_print_ascii('fran\xc3\xa7ais') is False - # other - assert self.utils.is_print_ascii(123) is False - - def test_is_print_ascii_py3(self): - if not self.PY3: - return - # text_type (str) - assert self.utils.is_print_ascii('francais') is True - assert self.utils.is_print_ascii('francais\n') is False - assert self.utils.is_print_ascii(u'francais') is True - assert self.utils.is_print_ascii(u'francais\n') is False assert self.utils.is_print_ascii('fran\xe7ais') is False - assert self.utils.is_print_ascii(u'fran\xe7ais') is False # other assert self.utils.is_print_ascii(123) is False - def test_to_print_ascii_py2(self): - if self.PY3: - return - # text_type (unicode) - assert self.utils.to_print_ascii(u'francais') == 'francais' - assert self.utils.to_print_ascii(u'francais\n') == 'francais?' - assert self.utils.to_print_ascii(u'fran\xe7ais') == 'fran?ais' - assert self.utils.to_print_ascii(u'fran\xe7ais\n') == 'fran?ais?' - assert self.utils.to_print_ascii(u'fran\xe7ais', 'ignore') == 'franais' - assert self.utils.to_print_ascii(u'fran\xe7ais\n', 'ignore') == 'franais' - # str + def test_to_print_ascii(self): assert self.utils.to_print_ascii('francais') == 'francais' assert self.utils.to_print_ascii('francais\n') == 'francais?' - assert self.utils.to_print_ascii('fran\xc3\xa7ais') == 'fran??ais' - assert self.utils.to_print_ascii('fran\xc3\xa7ais\n') == 'fran??ais?' - assert self.utils.to_print_ascii('fran\xc3\xa7ais', 'ignore') == 'franais' - assert self.utils.to_print_ascii('fran\xc3\xa7ais\n', 'ignore') == 'franais' - with pytest.raises(TypeError): - self.utils.to_print_ascii(123) - - def test_to_print_ascii_py3(self): - if not self.PY3: - return - # text_type (str) - assert self.utils.to_print_ascii('francais') == 'francais' - assert self.utils.to_print_ascii('francais\n') == 'francais?' - assert self.utils.to_print_ascii(u'francais') == 'francais' - assert self.utils.to_print_ascii(u'francais\n') == 'francais?' assert self.utils.to_print_ascii('fran\xe7ais') == 'fran?ais' assert self.utils.to_print_ascii('fran\xe7ais\n') == 'fran?ais?' assert self.utils.to_print_ascii('fran\xe7ais', 'ignore') == 'franais' assert self.utils.to_print_ascii('fran\xe7ais\n', 'ignore') == 'franais' - assert self.utils.to_print_ascii(u'fran\xe7ais') == 'fran?ais' - assert self.utils.to_print_ascii(u'fran\xe7ais\n') == 'fran?ais?' - assert self.utils.to_print_ascii(u'fran\xe7ais', 'ignore') == 'franais' - assert self.utils.to_print_ascii(u'fran\xe7ais\n', 'ignore') == 'franais' with pytest.raises(TypeError): self.utils.to_print_ascii(123) diff --git a/test/test_version_compare.py b/test/test_version_compare.py index dd225cf..8410345 100644 --- a/test/test_version_compare.py +++ b/test/test_version_compare.py @@ -1,24 +1,22 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- import pytest # pylint: disable=attribute-defined-outside-init -class TestVersionCompare(object): +class TestVersionCompare: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH def get_dropbear_software(self, v): - b = self.ssh.Banner.parse('SSH-2.0-dropbear_{0}'.format(v)) + b = self.ssh.Banner.parse('SSH-2.0-dropbear_{}'.format(v)) return self.ssh.Software.parse(b) def get_openssh_software(self, v): - b = self.ssh.Banner.parse('SSH-2.0-OpenSSH_{0}'.format(v)) + b = self.ssh.Banner.parse('SSH-2.0-OpenSSH_{}'.format(v)) return self.ssh.Software.parse(b) def get_libssh_software(self, v): - b = self.ssh.Banner.parse('SSH-2.0-libssh-{0}'.format(v)) + b = self.ssh.Banner.parse('SSH-2.0-libssh-{}'.format(v)) return self.ssh.Software.parse(b) def test_dropbear_compare_version_pre_years(self): @@ -84,27 +82,27 @@ class TestVersionCompare(object): def test_dropbear_compare_version_sequential(self): versions = [] for i in range(28, 44): - versions.append('0.{0}'.format(i)) + versions.append('0.{}'.format(i)) for i in range(1, 5): - versions.append('0.44test{0}'.format(i)) + versions.append('0.44test{}'.format(i)) for i in range(44, 49): - versions.append('0.{0}'.format(i)) + versions.append('0.{}'.format(i)) versions.append('0.48.1') for i in range(49, 54): - versions.append('0.{0}'.format(i)) + versions.append('0.{}'.format(i)) versions.append('0.53.1') for v in ['2011.54', '2012.55']: versions.append(v) for i in range(56, 61): - versions.append('2013.{0}'.format(i)) + versions.append('2013.{}'.format(i)) for v in ['2013.61test', '2013.62']: versions.append(v) for i in range(63, 67): - versions.append('2014.{0}'.format(i)) + versions.append('2014.{}'.format(i)) for i in range(67, 72): - versions.append('2015.{0}'.format(i)) + versions.append('2015.{}'.format(i)) for i in range(72, 75): - versions.append('2016.{0}'.format(i)) + versions.append('2016.{}'.format(i)) length = len(versions) for i in range(length): v = versions[i] @@ -151,19 +149,19 @@ class TestVersionCompare(object): for v in ['3.0', '3.0.1', '3.0.2', '3.1', '3.2.2', '3.2.3']: versions.append(v) for i in range(3, 7): - versions.append('3.{0}'.format(i)) + versions.append('3.{}'.format(i)) for v in ['3.6.1', '3.7.0', '3.7.1']: versions.append(v) for i in range(8, 10): - versions.append('3.{0}'.format(i)) + versions.append('3.{}'.format(i)) for i in range(0, 10): - versions.append('4.{0}'.format(i)) + versions.append('4.{}'.format(i)) for i in range(0, 10): - versions.append('5.{0}'.format(i)) + versions.append('5.{}'.format(i)) for i in range(0, 10): - versions.append('6.{0}'.format(i)) + versions.append('6.{}'.format(i)) for i in range(0, 4): - versions.append('7.{0}'.format(i)) + versions.append('7.{}'.format(i)) length = len(versions) for i in range(length): v = versions[i] @@ -193,15 +191,15 @@ class TestVersionCompare(object): for v in ['0.2', '0.3']: versions.append(v) for i in range(1, 5): - versions.append('0.3.{0}'.format(i)) + versions.append('0.3.{}'.format(i)) for i in range(0, 9): - versions.append('0.4.{0}'.format(i)) + versions.append('0.4.{}'.format(i)) for i in range(0, 6): - versions.append('0.5.{0}'.format(i)) + versions.append('0.5.{}'.format(i)) for i in range(0, 6): - versions.append('0.6.{0}'.format(i)) + versions.append('0.6.{}'.format(i)) for i in range(0, 5): - versions.append('0.7.{0}'.format(i)) + versions.append('0.7.{}'.format(i)) length = len(versions) for i in range(length): v = versions[i] diff --git a/tox.ini b/tox.ini index bd692ae..5a49af8 100644 --- a/tox.ini +++ b/tox.ini @@ -57,13 +57,6 @@ commands = --config-file {toxinidir}/tox.ini \ --html-report {env:MYPYHTML}.py3.{envname} \ {posargs:{env:SSHAUDIT}} - -mypy \ - -2 \ - --no-warn-incomplete-stub \ - --show-error-context \ - --config-file {toxinidir}/tox.ini \ - --html-report {env:MYPYHTML}.py2.{envname} \ - {posargs:{env:SSHAUDIT}} [testenv:pylint] deps =