Fix pylint reported issues and disable unnecessary ones.

This commit is contained in:
Andris Raugulis 2016-10-20 20:00:51 +03:00
parent a5f1cd9197
commit 5b3b630623

View File

@ -33,33 +33,33 @@ if sys.version_info >= (3,):
text_type = str
binary_type = bytes
else:
import StringIO as _StringIO
import StringIO as _StringIO # pylint: disable=import-error
StringIO = BytesIO = _StringIO.StringIO
text_type = unicode # pylint: disable=undefined-variable
binary_type = str
try:
# pylint: disable=unused-import
from typing import List, Tuple, Optional, Callable, Union, Any
except:
except ImportError:
pass
def usage(err=None):
# type: (Optional[str]) -> None
out = Output()
uout = Output()
p = os.path.basename(sys.argv[0])
out.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION))
uout.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION))
if err is not None:
out.fail('\n' + err)
out.info('\nusage: {0} [-12bnv] [-l <level>] <host[:port]>\n'.format(p))
out.info(' -h, --help print this help')
out.info(' -1, --ssh1 force ssh version 1 only')
out.info(' -2, --ssh2 force ssh version 2 only')
out.info(' -b, --batch batch output')
out.info(' -n, --no-colors disable colors')
out.info(' -v, --verbose verbose output')
out.info(' -l, --level=<level> minimum output level (info|warn|fail)')
out.sep()
uout.fail('\n' + err)
uout.info('\nusage: {0} [-12bnv] [-l <level>] <host[:port]>\n'.format(p))
uout.info(' -h, --help print this help')
uout.info(' -1, --ssh1 force ssh version 1 only')
uout.info(' -2, --ssh2 force ssh version 2 only')
uout.info(' -b, --batch batch output')
uout.info(' -n, --no-colors disable colors')
uout.info(' -v, --verbose verbose output')
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
uout.sep()
sys.exit(1)
@ -97,7 +97,8 @@ class AuditConf(object):
@classmethod
def from_cmdline(cls, args, usage_cb):
# type: (List[str], Callable[..., None]) -> AuditConf
conf = cls()
# pylint: disable=too-many-branches
aconf = cls()
try:
sopts = 'h12bnvl:'
lopts = ['help', 'ssh1', 'ssh2', 'batch',
@ -105,25 +106,25 @@ class AuditConf(object):
opts, args = getopt.getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(str(err))
conf.ssh1, conf.ssh2 = False, False
aconf.ssh1, aconf.ssh2 = False, False
for o, a in opts:
if o in ('-h', '--help'):
usage_cb()
elif o in ('-1', '--ssh1'):
conf.ssh1 = True
aconf.ssh1 = True
elif o in ('-2', '--ssh2'):
conf.ssh2 = True
aconf.ssh2 = True
elif o in ('-b', '--batch'):
conf.batch = True
conf.verbose = True
aconf.batch = True
aconf.verbose = True
elif o in ('-n', '--no-colors'):
conf.colors = False
aconf.colors = False
elif o in ('-v', '--verbose'):
conf.verbose = True
aconf.verbose = True
elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'):
usage_cb('level {0} is not valid'.format(a))
conf.minlevel = a
aconf.minlevel = a
if len(args) == 0:
usage_cb()
s = args[0].split(':')
@ -134,11 +135,11 @@ class AuditConf(object):
usage_cb('host is empty')
if port <= 0 or port > 65535:
usage_cb('port {0} is not valid'.format(s[1]))
conf.host = host
conf.port = port
if not (conf.ssh1 or conf.ssh2):
conf.ssh1, conf.ssh2 = True, True
return conf
aconf.host = host
aconf.port = port
if not (aconf.ssh1 or aconf.ssh2):
aconf.ssh1, aconf.ssh2 = True, True
return aconf
class Output(object):
@ -195,9 +196,9 @@ class Output(object):
class OutputBuffer(list):
# pylint: disable=attribute-defined-outside-init
def __enter__(self):
# type: () -> OutputBuffer
# pylint: disable=attribute-defined-outside-init
self.__buf = StringIO()
self.__stdout = sys.stdout
sys.stdout = self.__buf
@ -214,7 +215,7 @@ class OutputBuffer(list):
sys.stdout = self.__stdout
class SSH2(object):
class SSH2(object): # pylint: disable=too-few-public-methods
class KexParty(object):
def __init__(self, enc, mac, compression, languages):
# type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None
@ -345,7 +346,7 @@ class SSH1(object):
for i in range(256):
crc = 0
n = i
for j in range(8):
for _ in range(8):
x = (crc ^ n) & 1
crc = (crc >> 1) ^ (x * 0xedb88320)
n = n >> 1
@ -371,7 +372,8 @@ class SSH1(object):
cls._crc32 = cls.CRC32()
return cls._crc32.calc(v)
class KexDB(object):
class KexDB(object): # pylint: disable=too-few-public-methods
# pylint: disable=bad-whitespace
FAIL_PLAINTEXT = 'no encryption/integrity'
FAIL_OPENSSH37_REMOVE = 'removed since OpenSSH 3.7'
FAIL_NA_BROKEN = 'not implemented in OpenSSH, broken algorithm'
@ -451,6 +453,7 @@ class SSH1(object):
@property
def host_key_fingerprint_data(self):
# type: () -> binary_type
# pylint: disable=protected-access
mod = WriteBuf._create_mpint(self.host_key_public_modulus, False)
e = WriteBuf._create_mpint(self.host_key_public_exponent, False)
return mod + e
@ -686,27 +689,28 @@ class WriteBuf(object):
return payload
class SSH(object):
class Protocol(object):
class SSH(object): # pylint: disable=too-few-public-methods
class Protocol(object): # pylint: disable=too-few-public-methods
# pylint: disable=bad-whitespace
SMSG_PUBLIC_KEY = 2
MSG_KEXINIT = 20
MSG_NEWKEYS = 21
MSG_KEXDH_INIT = 30
MSG_KEXDH_REPLY = 32
class Product(object):
class Product(object): # pylint: disable=too-few-public-methods
OpenSSH = 'OpenSSH'
DropbearSSH = 'Dropbear SSH'
LibSSH = 'libssh'
class Software(object):
def __init__(self, vendor, product, version, patch, os):
def __init__(self, vendor, product, version, patch, os_version):
# type: (Optional[str], str, str, Optional[str], Optional[str]) -> None
self.__vendor = vendor
self.__product = product
self.__version = version
self.__patch = patch
self.__os = os
self.__os = os_version
@property
def vendor(self):
@ -735,6 +739,7 @@ class SSH(object):
def compare_version(self, other):
# type: (Union[None, SSH.Software, text_type]) -> int
# pylint: disable=too-many-branches
if other is None:
return 1
if isinstance(other, SSH.Software):
@ -780,22 +785,22 @@ class SSH(object):
def display(self, full=True):
# type: (bool) -> str
out = '{0} '.format(self.vendor) if self.vendor else ''
out += self.product
r = '{0} '.format(self.vendor) if self.vendor else ''
r += self.product
if self.version:
out += ' {0}'.format(self.version)
r += ' {0}'.format(self.version)
if full:
patch = self.patch or ''
if self.product == SSH.Product.OpenSSH:
mx = re.match(r'^(p\d)(.*)$', patch)
if mx is not None:
out += mx.group(1)
r += mx.group(1)
patch = mx.group(2).strip()
if patch:
out += ' ({0})'.format(patch)
r += ' ({0})'.format(patch)
if self.os:
out += ' running on {0}'.format(self.os)
return out
r += ' running on {0}'.format(self.os)
return r
def __str__(self):
# type: () -> str
@ -803,18 +808,18 @@ class SSH(object):
def __repr__(self):
# type: () -> str
out = 'vendor={0}'.format(self.vendor) if self.vendor else ''
r = 'vendor={0}'.format(self.vendor) if self.vendor else ''
if self.product:
if self.vendor:
out += ', '
out += 'product={0}'.format(self.product)
r += ', '
r += 'product={0}'.format(self.product)
if self.version:
out += ', version={0}'.format(self.version)
r += ', version={0}'.format(self.version)
if self.patch:
out += ', patch={0}'.format(self.patch)
r += ', patch={0}'.format(self.patch)
if self.os:
out += ', os={0}'.format(self.os)
return '<{0}({1})>'.format(self.__class__.__name__, out)
r += ', os={0}'.format(self.os)
return '<{0}({1})>'.format(self.__class__.__name__, r)
@staticmethod
def _fix_patch(patch):
@ -830,7 +835,7 @@ class SSH(object):
return None
@classmethod
def _extract_os(cls, c):
def _extract_os_version(cls, c):
# type: (Optional[str]) -> str
if c is None:
return None
@ -859,6 +864,7 @@ class SSH(object):
@classmethod
def parse(cls, banner):
# type: (SSH.Banner) -> SSH.Software
# pylint: disable=too-many-return-statements
software = str(banner.software)
mx = re.match(r'^dropbear_([\d\.]+\d+)(.*)', software)
if mx:
@ -871,14 +877,14 @@ class SSH(object):
patch = cls._fix_patch(mx.group(2))
v, p = 'OpenBSD', SSH.Product.OpenSSH
v = None
os = cls._extract_os(banner.comments)
return cls(v, p, mx.group(1), patch, os)
os_version = cls._extract_os_version(banner.comments)
return cls(v, p, mx.group(1), patch, os_version)
mx = re.match(r'^libssh-([\d\.]+\d+)(.*)', software)
if mx:
patch = cls._fix_patch(mx.group(2))
v, p = None, SSH.Product.LibSSH
os = cls._extract_os(banner.comments)
return cls(v, p, mx.group(1), patch, os)
os_version = cls._extract_os_version(banner.comments)
return cls(v, p, mx.group(1), patch, os_version)
mx = re.match(r'^RomSShell_([\d\.]+\d+)(.*)', software)
if mx:
patch = cls._fix_patch(mx.group(2))
@ -928,22 +934,22 @@ class SSH(object):
def __str__(self):
# type: () -> str
out = 'SSH-{0}.{1}'.format(self.protocol[0], self.protocol[1])
r = 'SSH-{0}.{1}'.format(self.protocol[0], self.protocol[1])
if self.software is not None:
out += '-{0}'.format(self.software)
r += '-{0}'.format(self.software)
if self.comments:
out += ' {0}'.format(self.comments)
return out
r += ' {0}'.format(self.comments)
return r
def __repr__(self):
# type: () -> str
p = '{0}.{1}'.format(self.protocol[0], self.protocol[1])
out = 'protocol={0}'.format(p)
r = 'protocol={0}'.format(p)
if self.software:
out += ', software={0}'.format(self.software)
r += ', software={0}'.format(self.software)
if self.comments:
out += ', comments={0}'.format(self.comments)
return '<{0}({1})>'.format(self.__class__.__name__, out)
r += ', comments={0}'.format(self.comments)
return '<{0}({1})>'.format(self.__class__.__name__, r)
@classmethod
def parse(cls, banner):
@ -982,7 +988,8 @@ class SSH(object):
r = h.decode('ascii').rstrip('=')
return u'SHA256:{0}'.format(r)
class Security(object):
class Security(object): # pylint: disable=too-few-public-methods
# pylint: disable=bad-whitespace
CVE = {
'Dropbear SSH': [
['0.44', '2015.71', 1, 'CVE-2016-3116', 5.5, 'bypass command restrictions via xauth command injection'],
@ -1031,7 +1038,7 @@ class SSH(object):
try:
self.__sock = socket.create_connection((host, port), cto)
self.__sock.settimeout(rto)
except Exception as e:
except Exception as e: # pylint: disable=broad-except
out.fail('[fail] {0}'.format(e))
sys.exit(1)
@ -1184,7 +1191,7 @@ class SSH(object):
try:
self.__sock.shutdown(socket.SHUT_RDWR)
self.__sock.close()
except:
except: # pylint: disable=bare-except
pass
@ -1236,7 +1243,8 @@ class KexGroup14(KexDH):
super(KexGroup14, self).__init__('sha1', 2, p)
class KexDB(object):
class KexDB(object): # pylint: disable=too-few-public-methods
# pylint: disable=bad-whitespace
WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm'
FAIL_OPENSSH70_LEGACY = 'removed since OpenSSH 7.0, legacy algorithm'
FAIL_OPENSSH70_WEAK = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm'
@ -1397,7 +1405,7 @@ def get_ssh_timeframe(alg_pairs, for_server=True):
# type: (List[Tuple[int, Dict[str, Dict[str, List[List[str]]]], List[Tuple[str, List[text_type]]]]], bool) -> Dict[str, List[Optional[str]]]
timeframe = {} # type: Dict[str, List[Optional[str]]]
for alg_pair in alg_pairs:
sshv, alg_db = alg_pair[0], alg_pair[1]
alg_db = alg_pair[1]
for alg_set in alg_pair[2]:
alg_type, alg_list = alg_set
for alg_name in alg_list:
@ -1448,6 +1456,7 @@ def get_alg_pairs(kex, pkm):
def get_alg_recommendations(software, kex, pkm, for_server=True):
# type: (SSH.Software, SSH2.Kex, SSH1.PublicKeyMessage, bool) -> Tuple[SSH.Software, Dict[int, Dict[str, Dict[str, Dict[str, int]]]]]
# pylint: disable=too-many-locals,too-many-statements
alg_pairs = get_alg_pairs(kex, pkm)
vproducts = [SSH.Product.OpenSSH,
SSH.Product.DropbearSSH,
@ -1833,19 +1842,19 @@ class Utils(object):
# type: (Any) -> int
try:
return int(v)
except:
except: # pylint: disable=bare-except
return 0
def audit(conf, sshv=None):
def audit(aconf, sshv=None):
# type: (AuditConf, Optional[int]) -> None
out.batch = conf.batch
out.colors = conf.colors
out.verbose = conf.verbose
out.minlevel = conf.minlevel
s = SSH.Socket(conf.host, conf.port)
out.batch = aconf.batch
out.colors = aconf.colors
out.verbose = aconf.verbose
out.minlevel = aconf.minlevel
s = SSH.Socket(aconf.host, aconf.port)
if sshv is None:
sshv = 2 if conf.ssh2 else 1
sshv = 2 if aconf.ssh2 else 1
err = None
banner, header = s.get_banner(sshv)
if banner is None:
@ -1858,8 +1867,8 @@ def audit(conf, sshv=None):
except UnicodeDecodeError:
payload_txt = u'"{0}"'.format(repr(payload).lstrip('b')[1:-1])
if payload_txt == u'Protocol major versions differ.':
if sshv == 2 and conf.ssh1:
audit(conf, 1)
if sshv == 2 and aconf.ssh1:
audit(aconf, 1)
return
err = '[exception] error reading packet ({0})'.format(payload_txt)
else: