Remove some more Python 2 leftovers (#37)

* Remove mypy job for Python 2

modified:   tox.ini

* Remove Python 2 compatibility import

modified:   ssh-audit.py

* Remove compatibility import for BytesIO and StringIO

This is no longer necessary, as support for Python 2 was dropped.

modified:   ssh-audit.py

* Remove `text-type` compatibility layer

... as support for Python 2 was dropped already.

modified:   ssh-audit.py

* Remove `binary-type` compatibility layer

... as support for Python 2 was dropped already.

modified:   ssh-audit.py

* Remove try-except block for typing

... as since Python 3.5 it is included in the standard library.

modified:   ssh-audit.py

* Move typing import to top of module

modified:   ssh-audit.py

* Remove obsolete encoding declaration

modified:   ssh-audit.py

* Apply pyupgrade on ssh-audit.py

pyupgrade is a tool which updates Python code to modern syntax

modified:   ssh-audit.py

* Remove Python 2 compatibility from conftest.py

modified:   test/conftest.py

* Remove Python 2 compatibility from test_auditconf.py

modified:   test/test_auditconf.py

* Remove Python 2 compatibility from test_banner.py

modified:   test/test_banner.py

* Remove Python 2 compatibility from test_buffer.py

modified:   test/test_buffer.py

* Remove Python 2 compatibility from test_errors.py

modified:   test/test_errors.py

* Remove Python 2 compatibility from test_output.py

modified:   test/test_output.py

* Remove Python 2 compatibility from test_resolve.py

modified:   test/test_resolve.py

* Remove Python 2 compatibility from test_socket.py

modified:   test/test_socket.py

* Remove Python 2 compatibility from test_software.py

modified:   test/test_software.py

* Remove Python 2 compatibility from test_ssh_algorithm.py

modified:   test/test_ssh_algorithm.py

* Remove Python 2 compatibility from test_ssh1.py

modified:   test/test_ssh1.py

* Remove Python 2 compatibility from test_ssh2.py

modified:   test/test_ssh2.py

* Remove Python 2 compatibility and Py2 only tests

... from test_utils.py.

modified:   test/test_utils.py

* Remove Python 2 compatibility from test_version_compare.py

modified:   test/test_version_compare.py

* Remove Python 2 job from appveyor config

This was done blindly, as it is unclear whether appveyor runs at all.

modified:   .appveyor.yml
This commit is contained in:
Jürgen Gmach
2020-06-15 23:05:31 +02:00
committed by GitHub
parent 42fecf83e6
commit ec1dda8d7f
17 changed files with 231 additions and 433 deletions

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The MIT License (MIT)
@ -24,7 +23,6 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from __future__ import print_function
import base64
import binascii
import errno
@ -39,26 +37,13 @@ import select
import socket
import struct
import sys
# pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable
from typing import Callable, Optional, Union, Any
VERSION = 'v2.2.1-dev'
SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate
if sys.version_info >= (3,): # pragma: nocover
StringIO, BytesIO = io.StringIO, io.BytesIO
text_type = str
binary_type = bytes
else: # pragma: nocover
import StringIO as _StringIO # pylint: disable=import-error
StringIO = BytesIO = _StringIO.StringIO
text_type = unicode # pylint: disable=undefined-variable # noqa: F821
binary_type = str
try: # pragma: nocover
# pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable
from typing import Callable, Optional, Union, Any
except ImportError: # pragma: nocover
pass
try: # pragma: nocover
from colorama import init as colorama_init
colorama_init(strip=False) # pragma: nocover
@ -70,10 +55,10 @@ def usage(err=None):
# type: (Optional[str]) -> None
uout = Output()
p = os.path.basename(sys.argv[0])
uout.head('# {0} {1}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
if err is not None and len(err) > 0:
uout.fail('\n' + err)
uout.info('usage: {0} [-1246pbcnjvlt] <host>\n'.format(p))
uout.info('usage: {} [-1246pbcnjvlt] <host>\n'.format(p))
uout.info(' -h, --help print this help')
uout.info(' -1, --ssh1 force ssh version 1 only')
uout.info(' -2, --ssh2 force ssh version 2 only')
@ -139,18 +124,18 @@ class AuditConf:
elif name == 'port':
valid, port = True, utils.parse_int(value)
if port < 1 or port > 65535:
raise ValueError('invalid port: {0}'.format(value))
raise ValueError('invalid port: {}'.format(value))
value = port
elif name in ['level']:
if value not in ('info', 'warn', 'fail'):
raise ValueError('invalid level: {0}'.format(value))
raise ValueError('invalid level: {}'.format(value))
valid = True
elif name == 'host':
valid = True
elif name == 'timeout':
value = utils.parse_float(value)
if value == -1.0:
raise ValueError('invalid timeout: {0}'.format(value))
raise ValueError('invalid timeout: {}'.format(value))
valid = True
if valid:
object.__setattr__(self, name, value)
@ -195,7 +180,7 @@ class AuditConf:
aconf.verbose = True
elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'):
usage_cb('level {0} is not valid'.format(a))
usage_cb('level {} is not valid'.format(a))
aconf.level = a
elif o in ('-t', '--timeout'):
aconf.timeout = float(a)
@ -223,7 +208,7 @@ class AuditConf:
oport = '2222'
port = utils.parse_int(oport)
if port <= 0 or port > 65535:
usage_cb('port {0} is not valid'.format(oport))
usage_cb('port {} is not valid'.format(oport))
aconf.host = host
aconf.port = port
if not (aconf.ssh1 or aconf.ssh2):
@ -275,27 +260,27 @@ class Output:
@staticmethod
def _colorized(color):
# type: (str) -> Callable[[text_type], None]
return lambda x: print(u'{0}{1}\033[0m'.format(color, x))
# type: (str) -> Callable[[str], None]
return lambda x: print(u'{}{}\033[0m'.format(color, x))
def __getattr__(self, name):
# type: (str) -> Callable[[text_type], None]
# type: (str) -> Callable[[str], None]
if name == 'head' and self.batch:
return lambda x: None
if not self.get_level(name) >= self.__level:
return lambda x: None
if self.use_colors and self.colors_supported and name in self.COLORS:
color = '\033[0;{0}m'.format(self.COLORS[name])
color = '\033[0;{}m'.format(self.COLORS[name])
return self._colorized(color)
else:
return lambda x: print(u'{0}'.format(x))
return lambda x: print(u'{}'.format(x))
class OutputBuffer(list):
def __enter__(self):
# type: () -> OutputBuffer
# pylint: disable=attribute-defined-outside-init
self.__buf = StringIO()
self.__buf = io.StringIO()
self.__stdout = sys.stdout
sys.stdout = self.__buf
return self
@ -536,7 +521,7 @@ class SSH2: # pylint: disable=too-few-public-methods
class KexParty:
def __init__(self, enc, mac, compression, languages):
# type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None
# type: (List[str], List[str], List[str], List[str]) -> None
self.__enc = enc
self.__mac = mac
self.__compression = compression
@ -544,27 +529,27 @@ class SSH2: # pylint: disable=too-few-public-methods
@property
def encryption(self):
# type: () -> List[text_type]
# type: () -> List[str]
return self.__enc
@property
def mac(self):
# type: () -> List[text_type]
# type: () -> List[str]
return self.__mac
@property
def compression(self):
# type: () -> List[text_type]
# type: () -> List[str]
return self.__compression
@property
def languages(self):
# type: () -> List[text_type]
# type: () -> List[str]
return self.__languages
class Kex:
def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0):
# type: (binary_type, List[text_type], List[text_type], SSH2.KexParty, SSH2.KexParty, bool, int) -> None
# type: (bytes, List[str], List[str], SSH2.KexParty, SSH2.KexParty, bool, int) -> None
self.__cookie = cookie
self.__kex_algs = kex_algs
self.__key_algs = key_algs
@ -579,17 +564,17 @@ class SSH2: # pylint: disable=too-few-public-methods
@property
def cookie(self):
# type: () -> binary_type
# type: () -> bytes
return self.__cookie
@property
def kex_algorithms(self):
# type: () -> List[text_type]
# type: () -> List[str]
return self.__kex_algs
@property
def key_algorithms(self):
# type: () -> List[text_type]
# type: () -> List[str]
return self.__key_algs
# client_to_server
@ -650,14 +635,14 @@ class SSH2: # pylint: disable=too-few-public-methods
@property
def payload(self):
# type: () -> binary_type
# type: () -> bytes
wbuf = WriteBuf()
self.write(wbuf)
return wbuf.write_flush()
@classmethod
def parse(cls, payload):
# type: (binary_type) -> SSH2.Kex
# type: (bytes) -> SSH2.Kex
buf = ReadBuf(payload)
cookie = buf.read(16)
kex_algs = buf.read_list()
@ -943,7 +928,7 @@ class SSH1:
self._table[i] = crc
def calc(self, v):
# type: (binary_type) -> int
# type: (bytes) -> int
crc, length = 0, len(v)
for i in range(length):
n = ord(v[i:i + 1])
@ -957,7 +942,7 @@ class SSH1:
@classmethod
def crc32(cls, v):
# type: (binary_type) -> int
# type: (bytes) -> int
if cls._crc32 is None:
cls._crc32 = cls.CRC32()
return cls._crc32.calc(v)
@ -995,11 +980,11 @@ class SSH1:
class PublicKeyMessage:
def __init__(self, cookie, skey, hkey, pflags, cmask, amask):
# type: (binary_type, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None
# type: (bytes, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None
if len(skey) != 3:
raise ValueError('invalid server key pair: {0}'.format(skey))
raise ValueError('invalid server key pair: {}'.format(skey))
if len(hkey) != 3:
raise ValueError('invalid host key pair: {0}'.format(hkey))
raise ValueError('invalid host key pair: {}'.format(hkey))
self.__cookie = cookie
self.__server_key = skey
self.__host_key = hkey
@ -1009,7 +994,7 @@ class SSH1:
@property
def cookie(self):
# type: () -> binary_type
# type: () -> bytes
return self.__cookie
@property
@ -1044,7 +1029,7 @@ class SSH1:
@property
def host_key_fingerprint_data(self):
# type: () -> binary_type
# type: () -> bytes
# pylint: disable=protected-access
mod = WriteBuf._create_mpint(self.host_key_public_modulus, False)
e = WriteBuf._create_mpint(self.host_key_public_exponent, False)
@ -1062,7 +1047,7 @@ class SSH1:
@property
def supported_ciphers(self):
# type: () -> List[text_type]
# type: () -> List[str]
ciphers = []
for i in range(len(SSH1.CIPHERS)):
if self.__supported_ciphers_mask & (1 << i) != 0:
@ -1076,7 +1061,7 @@ class SSH1:
@property
def supported_authentications(self):
# type: () -> List[text_type]
# type: () -> List[str]
auths = []
for i in range(1, len(SSH1.AUTHS)):
if self.__supported_authentications_mask & (1 << i) != 0:
@ -1098,14 +1083,14 @@ class SSH1:
@property
def payload(self):
# type: () -> binary_type
# type: () -> bytes
wbuf = WriteBuf()
self.write(wbuf)
return wbuf.write_flush()
@classmethod
def parse(cls, payload):
# type: (binary_type) -> SSH1.PublicKeyMessage
# type: (bytes) -> SSH1.PublicKeyMessage
buf = ReadBuf(payload)
cookie = buf.read(8)
server_key_bits = buf.read_int()
@ -1125,9 +1110,9 @@ class SSH1:
class ReadBuf:
def __init__(self, data=None):
# type: (Optional[binary_type]) -> None
# type: (Optional[bytes]) -> None
super(ReadBuf, self).__init__()
self._buf = BytesIO(data) if data is not None else BytesIO()
self._buf = io.BytesIO(data) if data is not None else io.BytesIO()
self._len = len(data) if data is not None else 0
@property
@ -1136,7 +1121,7 @@ class ReadBuf:
return self._len - self._buf.tell()
def read(self, size):
# type: (int) -> binary_type
# type: (int) -> bytes
return self._buf.read(size)
def read_byte(self):
@ -1154,18 +1139,18 @@ class ReadBuf:
return v
def read_list(self):
# type: () -> List[text_type]
# type: () -> List[str]
list_size = self.read_int()
return self.read(list_size).decode('utf-8', 'replace').split(',')
def read_string(self):
# type: () -> binary_type
# type: () -> bytes
n = self.read_int()
return self.read(n)
@classmethod
def _parse_mpint(cls, v, pad, f):
# type: (binary_type, binary_type, str) -> int
# type: (bytes, bytes, str) -> int
r = 0
if len(v) % 4 != 0:
v = pad * (4 - (len(v) % 4)) + v
@ -1190,22 +1175,22 @@ class ReadBuf:
return self._parse_mpint(v, pad, f)
def read_line(self):
# type: () -> text_type
# type: () -> str
return self._buf.readline().rstrip().decode('utf-8', 'replace')
def reset(self):
self._buf = BytesIO()
self._buf = io.BytesIO()
self._len = 0
class WriteBuf:
def __init__(self, data=None):
# type: (Optional[binary_type]) -> None
# type: (Optional[bytes]) -> None
super(WriteBuf, self).__init__()
self._wbuf = BytesIO(data) if data is not None else BytesIO()
self._wbuf = io.BytesIO(data) if data is not None else io.BytesIO()
def write(self, data):
# type: (binary_type) -> WriteBuf
# type: (bytes) -> WriteBuf
self._wbuf.write(data)
return self
@ -1222,14 +1207,14 @@ class WriteBuf:
return self.write(struct.pack('>I', v))
def write_string(self, v):
# type: (Union[binary_type, text_type]) -> WriteBuf
# type: (Union[bytes, str]) -> WriteBuf
if not isinstance(v, bytes):
v = bytes(bytearray(v, 'utf-8'))
self.write_int(len(v))
return self.write(v)
def write_list(self, v):
# type: (List[text_type]) -> WriteBuf
# type: (List[str]) -> WriteBuf
return self.write_string(u','.join(v))
@classmethod
@ -1242,12 +1227,12 @@ class WriteBuf:
@classmethod
def _create_mpint(cls, n, signed=True, bits=None):
# type: (int, bool, Optional[int]) -> binary_type
# type: (int, bool, Optional[int]) -> bytes
if bits is None:
bits = cls._bitlength(n)
length = bits // 8 + (1 if n != 0 else 0)
ql = (length + 7) // 8
fmt, v2 = '>{0}Q'.format(ql), [0] * ql
fmt, v2 = '>{}Q'.format(ql), [0] * ql
for i in range(ql):
v2[ql - i - 1] = n & 0xffffffffffffffff
n >>= 64
@ -1273,21 +1258,21 @@ class WriteBuf:
return self.write_string(data)
def write_line(self, v):
# type: (Union[binary_type, str]) -> WriteBuf
# type: (Union[bytes, str]) -> WriteBuf
if not isinstance(v, bytes):
v = bytes(bytearray(v, 'utf-8'))
v += b'\r\n'
return self.write(v)
def write_flush(self):
# type: () -> binary_type
# type: () -> bytes
payload = self._wbuf.getvalue()
self._wbuf.truncate(0)
self._wbuf.seek(0)
return payload
def reset(self):
self._wbuf = BytesIO()
self._wbuf = io.BytesIO()
class SSH: # pylint: disable=too-few-public-methods
@ -1346,12 +1331,12 @@ class SSH: # pylint: disable=too-few-public-methods
return self.__os
def compare_version(self, other):
# type: (Union[None, SSH.Software, text_type]) -> int
# type: (Union[None, SSH.Software, str]) -> int
# pylint: disable=too-many-branches
if other is None:
return 1
if isinstance(other, SSH.Software):
other = '{0}{1}'.format(other.version, other.patch or '')
other = '{}{}'.format(other.version, other.patch or '')
else:
other = str(other)
mx = re.match(r'^([\d\.]+\d+)(.*)$', other)
@ -1366,9 +1351,9 @@ class SSH: # pylint: disable=too-few-public-methods
spatch = self.patch or ''
if self.product == SSH.Product.DropbearSSH:
if not re.match(r'^test\d.*$', opatch):
opatch = 'z{0}'.format(opatch)
opatch = 'z{}'.format(opatch)
if not re.match(r'^test\d.*$', spatch):
spatch = 'z{0}'.format(spatch)
spatch = 'z{}'.format(spatch)
elif self.product == SSH.Product.OpenSSH:
mx1 = re.match(r'^p\d(.*)', opatch)
mx2 = re.match(r'^p\d(.*)', spatch)
@ -1393,10 +1378,10 @@ class SSH: # pylint: disable=too-few-public-methods
def display(self, full=True):
# type: (bool) -> str
r = '{0} '.format(self.vendor) if bool(self.vendor) else ''
r = '{} '.format(self.vendor) if bool(self.vendor) else ''
r += self.product
if bool(self.version):
r += ' {0}'.format(self.version)
r += ' {}'.format(self.version)
if full:
patch = self.patch or ''
if self.product == SSH.Product.OpenSSH:
@ -1405,9 +1390,9 @@ class SSH: # pylint: disable=too-few-public-methods
r += mx.group(1)
patch = mx.group(2).strip()
if bool(patch):
r += ' ({0})'.format(patch)
r += ' ({})'.format(patch)
if bool(self.os):
r += ' running on {0}'.format(self.os)
r += ' running on {}'.format(self.os)
return r
def __str__(self):
@ -1416,15 +1401,15 @@ class SSH: # pylint: disable=too-few-public-methods
def __repr__(self):
# type: () -> str
r = 'vendor={0}, '.format(self.vendor) if bool(self.vendor) else ''
r += 'product={0}'.format(self.product)
r = 'vendor={}, '.format(self.vendor) if bool(self.vendor) else ''
r += 'product={}'.format(self.product)
if bool(self.version):
r += ', version={0}'.format(self.version)
r += ', version={}'.format(self.version)
if bool(self.patch):
r += ', patch={0}'.format(self.patch)
r += ', patch={}'.format(self.patch)
if bool(self.os):
r += ', os={0}'.format(self.os)
return '<{0}({1})>'.format(self.__class__.__name__, r)
r += ', os={}'.format(self.os)
return '<{}({})>'.format(self.__class__.__name__, r)
@staticmethod
def _fix_patch(patch):
@ -1435,7 +1420,7 @@ class SSH: # pylint: disable=too-few-public-methods
def _fix_date(d):
# type: (str) -> Optional[str]
if d is not None and len(d) == 8:
return '{0}-{1}-{2}'.format(d[:4], d[4:6], d[6:8])
return '{}-{}-{}'.format(d[:4], d[4:6], d[6:8])
else:
return None
@ -1447,19 +1432,19 @@ class SSH: # pylint: disable=too-few-public-methods
mx = re.match(r'^NetBSD(?:_Secure_Shell)?(?:[\s-]+(\d{8})(.*))?$', c)
if bool(mx):
d = cls._fix_date(mx.group(1))
return 'NetBSD' if d is None else 'NetBSD ({0})'.format(d)
return 'NetBSD' if d is None else 'NetBSD ({})'.format(d)
mx = re.match(r'^FreeBSD(?:\slocalisations)?[\s-]+(\d{8})(.*)$', c)
if not bool(mx):
mx = re.match(r'^[^@]+@FreeBSD\.org[\s-]+(\d{8})(.*)$', c)
if bool(mx):
d = cls._fix_date(mx.group(1))
return 'FreeBSD' if d is None else 'FreeBSD ({0})'.format(d)
return 'FreeBSD' if d is None else 'FreeBSD ({})'.format(d)
w = ['RemotelyAnywhere', 'DesktopAuthority', 'RemoteSupportManager']
for win_soft in w:
mx = re.match(r'^in ' + win_soft + r' ([\d\.]+\d)$', c)
if bool(mx):
ver = mx.group(1)
return 'Microsoft Windows ({0} {1})'.format(win_soft, ver)
return 'Microsoft Windows ({} {})'.format(win_soft, ver)
generic = ['NetBSD', 'FreeBSD']
for g in generic:
if c.startswith(g) or c.endswith(g):
@ -1552,26 +1537,26 @@ class SSH: # pylint: disable=too-few-public-methods
def __str__(self):
# type: () -> str
r = 'SSH-{0}.{1}'.format(self.protocol[0], self.protocol[1])
r = 'SSH-{}.{}'.format(self.protocol[0], self.protocol[1])
if self.software is not None:
r += '-{0}'.format(self.software)
r += '-{}'.format(self.software)
if bool(self.comments):
r += ' {0}'.format(self.comments)
r += ' {}'.format(self.comments)
return r
def __repr__(self):
# type: () -> str
p = '{0}.{1}'.format(self.protocol[0], self.protocol[1])
r = 'protocol={0}'.format(p)
p = '{}.{}'.format(self.protocol[0], self.protocol[1])
r = 'protocol={}'.format(p)
if self.software is not None:
r += ', software={0}'.format(self.software)
r += ', software={}'.format(self.software)
if bool(self.comments):
r += ', comments={0}'.format(self.comments)
return '<{0}({1})>'.format(self.__class__.__name__, r)
r += ', comments={}'.format(self.comments)
return '<{}({})>'.format(self.__class__.__name__, r)
@classmethod
def parse(cls, banner):
# type: (text_type) -> Optional[SSH.Banner]
# type: (str) -> Optional[SSH.Banner]
valid_ascii = utils.is_print_ascii(banner)
ascii_banner = utils.to_print_ascii(banner)
mx = cls.RX_BANNER.match(ascii_banner)
@ -1589,22 +1574,22 @@ class SSH: # pylint: disable=too-few-public-methods
class Fingerprint:
def __init__(self, fpd):
# type: (binary_type) -> None
# type: (bytes) -> None
self.__fpd = fpd
@property
def md5(self):
# type: () -> text_type
# type: () -> str
h = hashlib.md5(self.__fpd).hexdigest()
r = u':'.join(h[i:i + 2] for i in range(0, len(h), 2))
return u'MD5:{0}'.format(r)
return u'MD5:{}'.format(r)
@property
def sha256(self):
# type: () -> text_type
# type: () -> str
h = base64.b64encode(hashlib.sha256(self.__fpd).digest())
r = h.decode('ascii').rstrip('=')
return u'SHA256:{0}'.format(r)
return u'SHA256:{}'.format(r)
class Algorithm:
class Timeframe:
@ -1679,7 +1664,7 @@ class SSH: # pylint: disable=too-few-public-methods
@classmethod
def get_since_text(cls, versions):
# type: (List[Optional[str]]) -> Optional[text_type]
# type: (List[Optional[str]]) -> Optional[str]
tv = []
if len(versions) == 0 or versions[0] is None:
return None
@ -1690,8 +1675,8 @@ class SSH: # pylint: disable=too-few-public-methods
if ssh_prod in [SSH.Product.LibSSH]:
continue
if is_cli:
ssh_ver = '{0} (client only)'.format(ssh_ver)
tv.append('{0} {1}'.format(ssh_prod, ssh_ver))
ssh_ver = '{} (client only)'.format(ssh_ver)
tv.append('{} {}'.format(ssh_prod, ssh_ver))
if len(tv) == 0:
return None
return 'available since ' + ', '.join(tv).rstrip(', ')
@ -1746,7 +1731,7 @@ class SSH: # pylint: disable=too-few-public-methods
def maxlen(self):
# type: () -> int
def _ml(items):
# type: (Sequence[text_type]) -> int
# type: (Sequence[str]) -> int
return max(len(i) for i in items)
maxlen = 0
if self.ssh1kex is not None:
@ -1888,7 +1873,7 @@ class SSH: # pylint: disable=too-few-public-methods
# type: (int, Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None
self.__sshv = sshv
self.__db = db
self.__storage = {} # type: Dict[str, List[text_type]]
self.__storage = {} # type: Dict[str, List[str]]
@property
def sshv(self):
@ -1901,11 +1886,11 @@ class SSH: # pylint: disable=too-few-public-methods
return self.__db
def add(self, key, value):
# type: (str, List[text_type]) -> None
# type: (str, List[str]) -> None
self.__storage[key] = value
def items(self):
# type: () -> Iterable[Tuple[str, List[text_type]]]
# type: () -> Iterable[Tuple[str, List[str]]]
return self.__storage.items()
class Security: # pylint: disable=too-few-public-methods
@ -2043,13 +2028,13 @@ class SSH: # pylint: disable=too-few-public-methods
self.__sock_map = {}
self.__block_size = 8
self.__state = 0
self.__header = [] # type: List[text_type]
self.__header = [] # type: List[str]
self.__banner = None # type: Optional[SSH.Banner]
if host is None:
raise ValueError('undefined host')
nport = utils.parse_int(port)
if nport < 1 or nport > 65535:
raise ValueError('invalid port: {0}'.format(port))
raise ValueError('invalid port: {}'.format(port))
self.__host = host
self.__port = nport
if ipvo is not None:
@ -2081,7 +2066,7 @@ class SSH: # pylint: disable=too-few-public-methods
if not check or socktype == socket.SOCK_STREAM:
yield af, addr
except socket.error as e:
out.fail('[exception] {0}'.format(e))
out.fail('[exception] {}'.format(e))
sys.exit(1)
# Listens on a server socket and accepts one connection (used for
@ -2154,15 +2139,15 @@ class SSH: # pylint: disable=too-few-public-methods
err = e
self._close_socket(s)
if err is None:
errm = 'host {0} has no DNS records'.format(self.__host)
errm = 'host {} has no DNS records'.format(self.__host)
else:
errt = (self.__host, self.__port, err)
errm = 'cannot connect to {0} port {1}: {2}'.format(*errt)
out.fail('[exception] {0}'.format(errm))
errm = 'cannot connect to {} port {}: {}'.format(*errt)
out.fail('[exception] {}'.format(errm))
sys.exit(1)
def get_banner(self, sshv=2):
# type: (int) -> Tuple[Optional[SSH.Banner], List[text_type], Optional[str]]
# type: (int) -> Tuple[Optional[SSH.Banner], List[str], Optional[str]]
if self.__sock is None:
return self.__banner, self.__header, 'not connected'
banner = SSH_HEADER.format('1.5' if sshv == 1 else '2.0')
@ -2214,7 +2199,7 @@ class SSH: # pylint: disable=too-few-public-methods
return len(data), None
def send(self, data):
# type: (binary_type) -> Tuple[int, Optional[str]]
# type: (bytes) -> Tuple[int, Optional[str]]
if self.__sock is None:
return -1, 'not connected'
try:
@ -2238,7 +2223,7 @@ class SSH: # pylint: disable=too-few-public-methods
raise SSH.Socket.InsufficientReadException(e)
def read_packet(self, sshv=2):
# type: (int) -> Tuple[int, binary_type]
# type: (int) -> Tuple[int, bytes]
try:
header = WriteBuf()
self.ensure_read(4)
@ -2669,7 +2654,7 @@ class KexGroupExchange_SHA256(KexGroupExchange):
def output_algorithms(title, alg_db, alg_type, algorithms, unknown_algs, maxlen=0, alg_sizes=None):
# type: (str, Dict[str, Dict[str, List[List[Optional[str]]]]], str, List[text_type], int) -> None
# type: (str, Dict[str, Dict[str, List[List[Optional[str]]]]], str, List[str], int) -> None
with OutputBuffer() as obuf:
for algorithm in algorithms:
output_algorithm(alg_db, alg_type, algorithm, unknown_algs, maxlen, alg_sizes)
@ -2680,7 +2665,7 @@ def output_algorithms(title, alg_db, alg_type, algorithms, unknown_algs, maxlen=
def output_algorithm(alg_db, alg_type, alg_name, unknown_algs, alg_max_len=0, alg_sizes=None):
# type: (Dict[str, Dict[str, List[List[Optional[str]]]]], str, text_type, int) -> None
# type: (Dict[str, Dict[str, List[List[Optional[str]]]]], str, str, int) -> None
prefix = '(' + alg_type + ') '
if alg_max_len == 0:
alg_max_len = len(alg_name)
@ -2758,9 +2743,9 @@ def output_compatibility(algs, client_audit, for_server=True):
if v_from is None:
continue
if v_till is None:
comp_text.append('{0} {1}+'.format(ssh_prod, v_from))
comp_text.append('{} {}+'.format(ssh_prod, v_from))
elif v_from == v_till:
comp_text.append('{0} {1}'.format(ssh_prod, v_from))
comp_text.append('{} {}'.format(ssh_prod, v_from))
else:
software = SSH.Software(None, ssh_prod, v_from, None, None)
if software.compare_version(v_till) > 0:
@ -2797,10 +2782,10 @@ def output_security_sub(sub, software, client_audit, padlen):
out_func = out.warn
if cvss >= 8.0:
out_func = out.fail
out_func('(cve) {0}{1} -- (CVSSv2: {2}) {3}'.format(name, p, cvss, descr))
out_func('(cve) {}{} -- (CVSSv2: {}) {}'.format(name, p, cvss, descr))
else:
descr = line[4]
out.fail('(sec) {0}{1} -- {2}'.format(name, p, descr))
out.fail('(sec) {}{} -- {}'.format(name, p, descr))
def output_security(banner, client_audit, padlen):
@ -2847,7 +2832,7 @@ def output_fingerprints(algs, sha256=True):
fpo = fp.sha256 if sha256 else fp.md5
# p = '' if out.batch else ' ' * (padlen - len(name))
# out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo))
out.good('(fin) {0}: {1}'.format(name, fpo))
out.good('(fin) {}: {}'.format(name, fpo))
if len(obuf) > 0:
out.head('# fingerprints')
obuf.flush()
@ -2913,15 +2898,15 @@ def output_recommendations(algs, software, padlen=0):
an, sg, fn = 'change', '!', out.fail
ret = False
chg_additional_info = ' (increase modulus size to 2048 bits or larger)'
b = '(SSH{0})'.format(sshv) if sshv == 1 else ''
b = '(SSH{})'.format(sshv) if sshv == 1 else ''
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}'
fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b))
if len(obuf) > 0:
if software is not None:
title = '(for {0})'.format(software.display(False))
title = '(for {})'.format(software.display(False))
else:
title = ''
out.head('# algorithm recommendations {0}'.format(title))
out.head('# algorithm recommendations {}'.format(title))
obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing).
out.sep()
return ret
@ -2945,17 +2930,17 @@ def output_info(software, client_audit, any_problems):
def output(banner, header, client_host=None, kex=None, pkm=None):
# type: (Optional[SSH.Banner], List[text_type], Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> None
# type: (Optional[SSH.Banner], List[str], Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> None
client_audit = client_host is not None # If set, this is a client audit.
sshv = 1 if pkm is not None else 2
algs = SSH.Algorithms(pkm, kex)
with OutputBuffer() as obuf:
if client_audit:
out.good('(gen) client IP: {0}'.format(client_host))
out.good('(gen) client IP: {}'.format(client_host))
if len(header) > 0:
out.info('(gen) header: ' + '\n'.join(header))
if banner is not None:
out.good('(gen) banner: {0}'.format(banner))
out.good('(gen) banner: {}'.format(banner))
if not banner.valid_ascii:
# NOTE: RFC 4253, Section 4.2
out.warn('(gen) banner contains non-printable ASCII')
@ -2963,17 +2948,17 @@ def output(banner, header, client_host=None, kex=None, pkm=None):
out.fail('(gen) protocol SSH1 enabled')
software = SSH.Software.parse(banner)
if software is not None:
out.good('(gen) software: {0}'.format(software))
out.good('(gen) software: {}'.format(software))
else:
software = None
output_compatibility(algs, client_audit)
if kex is not None:
compressions = [x for x in kex.server.compression if x != 'none']
if len(compressions) > 0:
cmptxt = 'enabled ({0})'.format(', '.join(compressions))
cmptxt = 'enabled ({})'.format(', '.join(compressions))
else:
cmptxt = 'disabled'
out.good('(gen) compression: {0}'.format(cmptxt))
out.good('(gen) compression: {}'.format(cmptxt))
if len(obuf) > 0:
out.head('# general')
obuf.flush()
@ -3013,43 +2998,41 @@ def output(banner, header, client_host=None, kex=None, pkm=None):
class Utils:
@classmethod
def _type_err(cls, v, target):
# type: (Any, text_type) -> TypeError
return TypeError('cannot convert {0} to {1}'.format(type(v), target))
# type: (Any, str) -> TypeError
return TypeError('cannot convert {} to {}'.format(type(v), target))
@classmethod
def to_bytes(cls, v, enc='utf-8'):
# type: (Union[binary_type, text_type], str) -> binary_type
if isinstance(v, binary_type):
# type: (Union[bytes, str], str) -> bytes
if isinstance(v, bytes):
return v
elif isinstance(v, text_type):
elif isinstance(v, str):
return v.encode(enc)
raise cls._type_err(v, 'bytes')
@classmethod
def to_utext(cls, v, enc='utf-8'):
# type: (Union[text_type, binary_type], str) -> text_type
if isinstance(v, text_type):
# type: (Union[str, bytes], str) -> str
if isinstance(v, str):
return v
elif isinstance(v, binary_type):
elif isinstance(v, bytes):
return v.decode(enc)
raise cls._type_err(v, 'unicode text')
@classmethod
def to_ntext(cls, v, enc='utf-8'):
# type: (Union[text_type, binary_type], str) -> str
# type: (Union[str, bytes], str) -> str
if isinstance(v, str):
return v
elif isinstance(v, text_type):
return v.encode(enc) # PY2 only
elif isinstance(v, binary_type):
return v.decode(enc) # PY3 only
elif isinstance(v, bytes):
return v.decode(enc)
raise cls._type_err(v, 'native text')
@classmethod
def _is_ascii(cls, v, char_filter=lambda x: x <= 127):
# type: (Union[text_type, str], Callable[[int], bool]) -> bool
# type: (str, Callable[[int], bool]) -> bool
r = False
if isinstance(v, (text_type, str)):
if isinstance(v, str):
for c in v:
i = cls.ctoi(c)
if not char_filter(i):
@ -3059,8 +3042,8 @@ class Utils:
@classmethod
def _to_ascii(cls, v, char_filter=lambda x: x <= 127, errors='replace'):
# type: (Union[text_type, str], Callable[[int], bool], str) -> str
if isinstance(v, (text_type, str)):
# type: (str, Callable[[int], bool], str) -> str
if isinstance(v, str):
r = bytearray()
for c in v:
i = cls.ctoi(c)
@ -3075,22 +3058,22 @@ class Utils:
@classmethod
def is_ascii(cls, v):
# type: (Union[text_type, str]) -> bool
# type: (str) -> bool
return cls._is_ascii(v)
@classmethod
def to_ascii(cls, v, errors='replace'):
# type: (Union[text_type, str], str) -> str
# type: (str, str) -> str
return cls._to_ascii(v, errors=errors)
@classmethod
def is_print_ascii(cls, v):
# type: (Union[text_type, str]) -> bool
# type: (str) -> bool
return cls._is_ascii(v, lambda x: 126 >= x >= 32)
@classmethod
def to_print_ascii(cls, v, errors='replace'):
# type: (Union[text_type, str], str) -> str
# type: (str, str) -> str
return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors)
@classmethod
@ -3110,8 +3093,8 @@ class Utils:
@classmethod
def ctoi(cls, c):
# type: (Union[text_type, str, int]) -> int
if isinstance(c, (text_type, str)):
# type: (Union[str, int]) -> int
if isinstance(c, str):
return ord(c[0])
else:
return c
@ -3230,7 +3213,7 @@ def audit(aconf, sshv=None):
if err is None:
err = '[exception] did not receive banner.'
else:
err = '[exception] did not receive banner: {0}'.format(err)
err = '[exception] did not receive banner: {}'.format(err)
if err is None:
packet_type, payload = s.read_packet(sshv)
if packet_type < 0:
@ -3240,12 +3223,12 @@ def audit(aconf, sshv=None):
else:
payload_txt = u'empty'
except UnicodeDecodeError:
payload_txt = u'"{0}"'.format(repr(payload).lstrip('b')[1:-1])
payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1])
if payload_txt == u'Protocol major versions differ.':
if sshv == 2 and aconf.ssh1:
audit(aconf, 1)
return
err = '[exception] error reading packet ({0})'.format(payload_txt)
err = '[exception] error reading packet ({})'.format(payload_txt)
else:
err_pair = None
if sshv == 1 and packet_type != SSH.Protocol.SMSG_PUBLIC_KEY: