From 5b3b63062370482cfc79e465a65fb32471e3850c Mon Sep 17 00:00:00 2001 From: Andris Raugulis Date: Thu, 20 Oct 2016 20:00:51 +0300 Subject: [PATCH] Fix pylint reported issues and disable unnecessary ones. --- ssh-audit.py | 167 +++++++++++++++++++++++++++------------------------ 1 file changed, 88 insertions(+), 79 deletions(-) diff --git a/ssh-audit.py b/ssh-audit.py index e1f82df..b88e512 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -33,33 +33,33 @@ if sys.version_info >= (3,): text_type = str binary_type = bytes else: - import StringIO as _StringIO + import StringIO as _StringIO # pylint: disable=import-error StringIO = BytesIO = _StringIO.StringIO text_type = unicode # pylint: disable=undefined-variable binary_type = str try: # pylint: disable=unused-import from typing import List, Tuple, Optional, Callable, Union, Any -except: +except ImportError: pass def usage(err=None): # type: (Optional[str]) -> None - out = Output() + uout = Output() p = os.path.basename(sys.argv[0]) - out.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION)) + uout.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION)) if err is not None: - out.fail('\n' + err) - out.info('\nusage: {0} [-12bnv] [-l ] \n'.format(p)) - out.info(' -h, --help print this help') - out.info(' -1, --ssh1 force ssh version 1 only') - out.info(' -2, --ssh2 force ssh version 2 only') - out.info(' -b, --batch batch output') - out.info(' -n, --no-colors disable colors') - out.info(' -v, --verbose verbose output') - out.info(' -l, --level= minimum output level (info|warn|fail)') - out.sep() + uout.fail('\n' + err) + uout.info('\nusage: {0} [-12bnv] [-l ] \n'.format(p)) + uout.info(' -h, --help print this help') + uout.info(' -1, --ssh1 force ssh version 1 only') + uout.info(' -2, --ssh2 force ssh version 2 only') + uout.info(' -b, --batch batch output') + uout.info(' -n, --no-colors disable colors') + uout.info(' -v, --verbose verbose output') + uout.info(' -l, --level= minimum output level (info|warn|fail)') + uout.sep() sys.exit(1) @@ -97,7 +97,8 @@ class AuditConf(object): @classmethod def from_cmdline(cls, args, usage_cb): # type: (List[str], Callable[..., None]) -> AuditConf - conf = cls() + # pylint: disable=too-many-branches + aconf = cls() try: sopts = 'h12bnvl:' lopts = ['help', 'ssh1', 'ssh2', 'batch', @@ -105,25 +106,25 @@ class AuditConf(object): opts, args = getopt.getopt(args, sopts, lopts) except getopt.GetoptError as err: usage_cb(str(err)) - conf.ssh1, conf.ssh2 = False, False + aconf.ssh1, aconf.ssh2 = False, False for o, a in opts: if o in ('-h', '--help'): usage_cb() elif o in ('-1', '--ssh1'): - conf.ssh1 = True + aconf.ssh1 = True elif o in ('-2', '--ssh2'): - conf.ssh2 = True + aconf.ssh2 = True elif o in ('-b', '--batch'): - conf.batch = True - conf.verbose = True + aconf.batch = True + aconf.verbose = True elif o in ('-n', '--no-colors'): - conf.colors = False + aconf.colors = False elif o in ('-v', '--verbose'): - conf.verbose = True + aconf.verbose = True elif o in ('-l', '--level'): if a not in ('info', 'warn', 'fail'): usage_cb('level {0} is not valid'.format(a)) - conf.minlevel = a + aconf.minlevel = a if len(args) == 0: usage_cb() s = args[0].split(':') @@ -134,11 +135,11 @@ class AuditConf(object): usage_cb('host is empty') if port <= 0 or port > 65535: usage_cb('port {0} is not valid'.format(s[1])) - conf.host = host - conf.port = port - if not (conf.ssh1 or conf.ssh2): - conf.ssh1, conf.ssh2 = True, True - return conf + aconf.host = host + aconf.port = port + if not (aconf.ssh1 or aconf.ssh2): + aconf.ssh1, aconf.ssh2 = True, True + return aconf class Output(object): @@ -195,9 +196,9 @@ class Output(object): class OutputBuffer(list): - # pylint: disable=attribute-defined-outside-init def __enter__(self): # type: () -> OutputBuffer + # pylint: disable=attribute-defined-outside-init self.__buf = StringIO() self.__stdout = sys.stdout sys.stdout = self.__buf @@ -214,7 +215,7 @@ class OutputBuffer(list): sys.stdout = self.__stdout -class SSH2(object): +class SSH2(object): # pylint: disable=too-few-public-methods class KexParty(object): def __init__(self, enc, mac, compression, languages): # type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None @@ -345,7 +346,7 @@ class SSH1(object): for i in range(256): crc = 0 n = i - for j in range(8): + for _ in range(8): x = (crc ^ n) & 1 crc = (crc >> 1) ^ (x * 0xedb88320) n = n >> 1 @@ -371,7 +372,8 @@ class SSH1(object): cls._crc32 = cls.CRC32() return cls._crc32.calc(v) - class KexDB(object): + class KexDB(object): # pylint: disable=too-few-public-methods + # pylint: disable=bad-whitespace FAIL_PLAINTEXT = 'no encryption/integrity' FAIL_OPENSSH37_REMOVE = 'removed since OpenSSH 3.7' FAIL_NA_BROKEN = 'not implemented in OpenSSH, broken algorithm' @@ -451,6 +453,7 @@ class SSH1(object): @property def host_key_fingerprint_data(self): # type: () -> binary_type + # pylint: disable=protected-access mod = WriteBuf._create_mpint(self.host_key_public_modulus, False) e = WriteBuf._create_mpint(self.host_key_public_exponent, False) return mod + e @@ -686,27 +689,28 @@ class WriteBuf(object): return payload -class SSH(object): - class Protocol(object): +class SSH(object): # pylint: disable=too-few-public-methods + class Protocol(object): # pylint: disable=too-few-public-methods + # pylint: disable=bad-whitespace SMSG_PUBLIC_KEY = 2 MSG_KEXINIT = 20 MSG_NEWKEYS = 21 MSG_KEXDH_INIT = 30 MSG_KEXDH_REPLY = 32 - class Product(object): + class Product(object): # pylint: disable=too-few-public-methods OpenSSH = 'OpenSSH' DropbearSSH = 'Dropbear SSH' LibSSH = 'libssh' class Software(object): - def __init__(self, vendor, product, version, patch, os): + def __init__(self, vendor, product, version, patch, os_version): # type: (Optional[str], str, str, Optional[str], Optional[str]) -> None self.__vendor = vendor self.__product = product self.__version = version self.__patch = patch - self.__os = os + self.__os = os_version @property def vendor(self): @@ -735,6 +739,7 @@ class SSH(object): def compare_version(self, other): # type: (Union[None, SSH.Software, text_type]) -> int + # pylint: disable=too-many-branches if other is None: return 1 if isinstance(other, SSH.Software): @@ -780,22 +785,22 @@ class SSH(object): def display(self, full=True): # type: (bool) -> str - out = '{0} '.format(self.vendor) if self.vendor else '' - out += self.product + r = '{0} '.format(self.vendor) if self.vendor else '' + r += self.product if self.version: - out += ' {0}'.format(self.version) + r += ' {0}'.format(self.version) if full: patch = self.patch or '' if self.product == SSH.Product.OpenSSH: mx = re.match(r'^(p\d)(.*)$', patch) if mx is not None: - out += mx.group(1) + r += mx.group(1) patch = mx.group(2).strip() if patch: - out += ' ({0})'.format(patch) + r += ' ({0})'.format(patch) if self.os: - out += ' running on {0}'.format(self.os) - return out + r += ' running on {0}'.format(self.os) + return r def __str__(self): # type: () -> str @@ -803,18 +808,18 @@ class SSH(object): def __repr__(self): # type: () -> str - out = 'vendor={0}'.format(self.vendor) if self.vendor else '' + r = 'vendor={0}'.format(self.vendor) if self.vendor else '' if self.product: if self.vendor: - out += ', ' - out += 'product={0}'.format(self.product) + r += ', ' + r += 'product={0}'.format(self.product) if self.version: - out += ', version={0}'.format(self.version) + r += ', version={0}'.format(self.version) if self.patch: - out += ', patch={0}'.format(self.patch) + r += ', patch={0}'.format(self.patch) if self.os: - out += ', os={0}'.format(self.os) - return '<{0}({1})>'.format(self.__class__.__name__, out) + r += ', os={0}'.format(self.os) + return '<{0}({1})>'.format(self.__class__.__name__, r) @staticmethod def _fix_patch(patch): @@ -830,7 +835,7 @@ class SSH(object): return None @classmethod - def _extract_os(cls, c): + def _extract_os_version(cls, c): # type: (Optional[str]) -> str if c is None: return None @@ -859,6 +864,7 @@ class SSH(object): @classmethod def parse(cls, banner): # type: (SSH.Banner) -> SSH.Software + # pylint: disable=too-many-return-statements software = str(banner.software) mx = re.match(r'^dropbear_([\d\.]+\d+)(.*)', software) if mx: @@ -871,14 +877,14 @@ class SSH(object): patch = cls._fix_patch(mx.group(2)) v, p = 'OpenBSD', SSH.Product.OpenSSH v = None - os = cls._extract_os(banner.comments) - return cls(v, p, mx.group(1), patch, os) + os_version = cls._extract_os_version(banner.comments) + return cls(v, p, mx.group(1), patch, os_version) mx = re.match(r'^libssh-([\d\.]+\d+)(.*)', software) if mx: patch = cls._fix_patch(mx.group(2)) v, p = None, SSH.Product.LibSSH - os = cls._extract_os(banner.comments) - return cls(v, p, mx.group(1), patch, os) + os_version = cls._extract_os_version(banner.comments) + return cls(v, p, mx.group(1), patch, os_version) mx = re.match(r'^RomSShell_([\d\.]+\d+)(.*)', software) if mx: patch = cls._fix_patch(mx.group(2)) @@ -928,22 +934,22 @@ class SSH(object): def __str__(self): # type: () -> str - out = 'SSH-{0}.{1}'.format(self.protocol[0], self.protocol[1]) + r = 'SSH-{0}.{1}'.format(self.protocol[0], self.protocol[1]) if self.software is not None: - out += '-{0}'.format(self.software) + r += '-{0}'.format(self.software) if self.comments: - out += ' {0}'.format(self.comments) - return out + r += ' {0}'.format(self.comments) + return r def __repr__(self): # type: () -> str p = '{0}.{1}'.format(self.protocol[0], self.protocol[1]) - out = 'protocol={0}'.format(p) + r = 'protocol={0}'.format(p) if self.software: - out += ', software={0}'.format(self.software) + r += ', software={0}'.format(self.software) if self.comments: - out += ', comments={0}'.format(self.comments) - return '<{0}({1})>'.format(self.__class__.__name__, out) + r += ', comments={0}'.format(self.comments) + return '<{0}({1})>'.format(self.__class__.__name__, r) @classmethod def parse(cls, banner): @@ -982,7 +988,8 @@ class SSH(object): r = h.decode('ascii').rstrip('=') return u'SHA256:{0}'.format(r) - class Security(object): + class Security(object): # pylint: disable=too-few-public-methods + # pylint: disable=bad-whitespace CVE = { 'Dropbear SSH': [ ['0.44', '2015.71', 1, 'CVE-2016-3116', 5.5, 'bypass command restrictions via xauth command injection'], @@ -1031,7 +1038,7 @@ class SSH(object): try: self.__sock = socket.create_connection((host, port), cto) self.__sock.settimeout(rto) - except Exception as e: + except Exception as e: # pylint: disable=broad-except out.fail('[fail] {0}'.format(e)) sys.exit(1) @@ -1184,7 +1191,7 @@ class SSH(object): try: self.__sock.shutdown(socket.SHUT_RDWR) self.__sock.close() - except: + except: # pylint: disable=bare-except pass @@ -1236,7 +1243,8 @@ class KexGroup14(KexDH): super(KexGroup14, self).__init__('sha1', 2, p) -class KexDB(object): +class KexDB(object): # pylint: disable=too-few-public-methods + # pylint: disable=bad-whitespace WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm' FAIL_OPENSSH70_LEGACY = 'removed since OpenSSH 7.0, legacy algorithm' FAIL_OPENSSH70_WEAK = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm' @@ -1397,7 +1405,7 @@ def get_ssh_timeframe(alg_pairs, for_server=True): # type: (List[Tuple[int, Dict[str, Dict[str, List[List[str]]]], List[Tuple[str, List[text_type]]]]], bool) -> Dict[str, List[Optional[str]]] timeframe = {} # type: Dict[str, List[Optional[str]]] for alg_pair in alg_pairs: - sshv, alg_db = alg_pair[0], alg_pair[1] + alg_db = alg_pair[1] for alg_set in alg_pair[2]: alg_type, alg_list = alg_set for alg_name in alg_list: @@ -1448,6 +1456,7 @@ def get_alg_pairs(kex, pkm): def get_alg_recommendations(software, kex, pkm, for_server=True): # type: (SSH.Software, SSH2.Kex, SSH1.PublicKeyMessage, bool) -> Tuple[SSH.Software, Dict[int, Dict[str, Dict[str, Dict[str, int]]]]] + # pylint: disable=too-many-locals,too-many-statements alg_pairs = get_alg_pairs(kex, pkm) vproducts = [SSH.Product.OpenSSH, SSH.Product.DropbearSSH, @@ -1833,19 +1842,19 @@ class Utils(object): # type: (Any) -> int try: return int(v) - except: + except: # pylint: disable=bare-except return 0 -def audit(conf, sshv=None): +def audit(aconf, sshv=None): # type: (AuditConf, Optional[int]) -> None - out.batch = conf.batch - out.colors = conf.colors - out.verbose = conf.verbose - out.minlevel = conf.minlevel - s = SSH.Socket(conf.host, conf.port) + out.batch = aconf.batch + out.colors = aconf.colors + out.verbose = aconf.verbose + out.minlevel = aconf.minlevel + s = SSH.Socket(aconf.host, aconf.port) if sshv is None: - sshv = 2 if conf.ssh2 else 1 + sshv = 2 if aconf.ssh2 else 1 err = None banner, header = s.get_banner(sshv) if banner is None: @@ -1858,8 +1867,8 @@ def audit(conf, sshv=None): except UnicodeDecodeError: payload_txt = u'"{0}"'.format(repr(payload).lstrip('b')[1:-1]) if payload_txt == u'Protocol major versions differ.': - if sshv == 2 and conf.ssh1: - audit(conf, 1) + if sshv == 2 and aconf.ssh1: + audit(aconf, 1) return err = '[exception] error reading packet ({0})'.format(payload_txt) else: