mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-04 11:12:15 +01:00 
			
		
		
		
	Fixed new pylint warnings.
This commit is contained in:
		@@ -1,7 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
   The MIT License (MIT)
 | 
			
		||||
 | 
			
		||||
   Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
 | 
			
		||||
 | 
			
		||||
   Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
			
		||||
@@ -55,7 +55,7 @@ class Algorithms:
 | 
			
		||||
        if self.ssh1kex is None:
 | 
			
		||||
            return None
 | 
			
		||||
        item = Algorithms.Item(1, SSH1_KexDB.ALGORITHMS)
 | 
			
		||||
        item.add('key', [u'ssh-rsa1'])
 | 
			
		||||
        item.add('key', ['ssh-rsa1'])
 | 
			
		||||
        item.add('enc', self.ssh1kex.supported_ciphers)
 | 
			
		||||
        item.add('aut', self.ssh1kex.supported_authentications)
 | 
			
		||||
        return item
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
   The MIT License (MIT)
 | 
			
		||||
 | 
			
		||||
   Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
 | 
			
		||||
 | 
			
		||||
   Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
			
		||||
@@ -33,11 +33,11 @@ class Fingerprint:
 | 
			
		||||
    @property
 | 
			
		||||
    def md5(self) -> str:
 | 
			
		||||
        h = hashlib.md5(self.__fpd).hexdigest()
 | 
			
		||||
        r = u':'.join(h[i:i + 2] for i in range(0, len(h), 2))
 | 
			
		||||
        return u'MD5:{}'.format(r)
 | 
			
		||||
        r = ':'.join(h[i:i + 2] for i in range(0, len(h), 2))
 | 
			
		||||
        return 'MD5:{}'.format(r)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def sha256(self) -> str:
 | 
			
		||||
        h = base64.b64encode(hashlib.sha256(self.__fpd).digest())
 | 
			
		||||
        r = h.decode('ascii').rstrip('=')
 | 
			
		||||
        return u'SHA256:{}'.format(r)
 | 
			
		||||
        return 'SHA256:{}'.format(r)
 | 
			
		||||
 
 | 
			
		||||
@@ -86,14 +86,14 @@ class GEXTest:
 | 
			
		||||
 | 
			
		||||
        # Check if the server supports any of the group-exchange
 | 
			
		||||
        # algorithms.  If so, test each one.
 | 
			
		||||
        for gex_alg in GEX_ALGS:
 | 
			
		||||
        for gex_alg, kex_group_class in GEX_ALGS.items():
 | 
			
		||||
            if gex_alg in kex.kex_algorithms:
 | 
			
		||||
                out.d('Preparing to perform DH group exchange using ' + gex_alg + '...', write_now=True)
 | 
			
		||||
 | 
			
		||||
                if GEXTest.reconnect(out, s, kex, gex_alg) is False:
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
                kex_group = GEX_ALGS[gex_alg]()
 | 
			
		||||
                kex_group = kex_group_class()
 | 
			
		||||
                smallest_modulus = -1
 | 
			
		||||
 | 
			
		||||
                # First try a range of weak sizes.
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
   The MIT License (MIT)
 | 
			
		||||
 | 
			
		||||
   Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
 | 
			
		||||
 | 
			
		||||
   Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
			
		||||
@@ -37,8 +37,8 @@ from ssh_audit.ssh_socket import SSH_Socket
 | 
			
		||||
 | 
			
		||||
class KexDH:  # pragma: nocover
 | 
			
		||||
    def __init__(self, kex_name: str, hash_alg: str, g: int, p: int) -> None:
 | 
			
		||||
        self.__kex_name = kex_name
 | 
			
		||||
        self.__hash_alg = hash_alg
 | 
			
		||||
        self.__kex_name = kex_name  # pylint: disable=unused-private-member
 | 
			
		||||
        self.__hash_alg = hash_alg  # pylint: disable=unused-private-member
 | 
			
		||||
        self.__g = 0
 | 
			
		||||
        self.__p = 0
 | 
			
		||||
        self.__q = 0
 | 
			
		||||
@@ -46,10 +46,10 @@ class KexDH:  # pragma: nocover
 | 
			
		||||
        self.__e = 0
 | 
			
		||||
        self.set_params(g, p)
 | 
			
		||||
 | 
			
		||||
        self.__ed25519_pubkey: Optional[bytes] = None
 | 
			
		||||
        self.__ed25519_pubkey: Optional[bytes] = None  # pylint: disable=unused-private-member
 | 
			
		||||
        self.__hostkey_type: Optional[bytes] = None
 | 
			
		||||
        self.__hostkey_e = 0
 | 
			
		||||
        self.__hostkey_n = 0
 | 
			
		||||
        self.__hostkey_e = 0  # pylint: disable=unused-private-member
 | 
			
		||||
        self.__hostkey_n = 0  # pylint: disable=unused-private-member
 | 
			
		||||
        self.__hostkey_n_len = 0  # Length of the host key modulus.
 | 
			
		||||
        self.__ca_n_len = 0  # Length of the CA key modulus (if hostkey is a cert).
 | 
			
		||||
 | 
			
		||||
@@ -121,11 +121,11 @@ class KexDH:  # pragma: nocover
 | 
			
		||||
 | 
			
		||||
        # The public key exponent.
 | 
			
		||||
        hostkey_e, hostkey_e_len, ptr = KexDH.__get_bytes(hostkey, ptr)
 | 
			
		||||
        self.__hostkey_e = int(binascii.hexlify(hostkey_e), 16)
 | 
			
		||||
        self.__hostkey_e = int(binascii.hexlify(hostkey_e), 16)  # pylint: disable=unused-private-member
 | 
			
		||||
 | 
			
		||||
        # Here is the modulus size & actual modulus of the host key public key.
 | 
			
		||||
        hostkey_n, self.__hostkey_n_len, ptr = KexDH.__get_bytes(hostkey, ptr)
 | 
			
		||||
        self.__hostkey_n = int(binascii.hexlify(hostkey_n), 16)
 | 
			
		||||
        self.__hostkey_n = int(binascii.hexlify(hostkey_n), 16)  # pylint: disable=unused-private-member
 | 
			
		||||
 | 
			
		||||
        # If this is an RSA certificate, continue parsing to extract the CA
 | 
			
		||||
        # key.
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
   The MIT License (MIT)
 | 
			
		||||
 | 
			
		||||
   Copyright (C) 2020 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
   Copyright (C) 2020-2021 Joe Testa (jtesta@positronsecurity.com)
 | 
			
		||||
 | 
			
		||||
   Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
			
		||||
   of this software and associated documentation files (the "Software"), to deal
 | 
			
		||||
@@ -117,7 +117,7 @@ class Policy:
 | 
			
		||||
 | 
			
		||||
        if policy_file is not None:
 | 
			
		||||
            try:
 | 
			
		||||
                with open(policy_file, "r") as f:
 | 
			
		||||
                with open(policy_file, "r", encoding='utf-8') as f:
 | 
			
		||||
                    policy_data = f.read()
 | 
			
		||||
            except FileNotFoundError:
 | 
			
		||||
                print("Error: policy file not found: %s" % policy_file)
 | 
			
		||||
@@ -425,8 +425,8 @@ macs = %s
 | 
			
		||||
        server_policy_names = []
 | 
			
		||||
        client_policy_names = []
 | 
			
		||||
 | 
			
		||||
        for policy_name in Policy.BUILTIN_POLICIES:
 | 
			
		||||
            if Policy.BUILTIN_POLICIES[policy_name]['server_policy']:
 | 
			
		||||
        for policy_name, policy in Policy.BUILTIN_POLICIES.items():
 | 
			
		||||
            if policy['server_policy']:
 | 
			
		||||
                server_policy_names.append(policy_name)
 | 
			
		||||
            else:
 | 
			
		||||
                client_policy_names.append(policy_name)
 | 
			
		||||
 
 | 
			
		||||
@@ -90,7 +90,7 @@ class SSH1_PublicKeyMessage:
 | 
			
		||||
    @property
 | 
			
		||||
    def supported_ciphers(self) -> List[str]:
 | 
			
		||||
        ciphers = []
 | 
			
		||||
        for i in range(len(SSH1.CIPHERS)):
 | 
			
		||||
        for i in range(len(SSH1.CIPHERS)):  # pylint: disable=consider-using-enumerate
 | 
			
		||||
            if self.__supported_ciphers_mask & (1 << i) != 0:
 | 
			
		||||
                ciphers.append(Utils.to_text(SSH1.CIPHERS[i]))
 | 
			
		||||
        return ciphers
 | 
			
		||||
 
 | 
			
		||||
@@ -563,7 +563,7 @@ def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH
 | 
			
		||||
    # Open with mode 'x' (creates the file, or fails if it already exist).
 | 
			
		||||
    succeeded = True
 | 
			
		||||
    try:
 | 
			
		||||
        with open(aconf.policy_file, 'x') as f:
 | 
			
		||||
        with open(aconf.policy_file, 'x', encoding='utf-8') as f:
 | 
			
		||||
            f.write(policy_data)
 | 
			
		||||
    except FileExistsError:
 | 
			
		||||
        succeeded = False
 | 
			
		||||
@@ -681,7 +681,7 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
 | 
			
		||||
 | 
			
		||||
    # If a file containing a list of targets was given, read it.
 | 
			
		||||
    if aconf.target_file is not None:
 | 
			
		||||
        with open(aconf.target_file, 'r') as f:
 | 
			
		||||
        with open(aconf.target_file, 'r', encoding='utf-8') as f:
 | 
			
		||||
            aconf.target_list = f.readlines()
 | 
			
		||||
 | 
			
		||||
        # Strip out whitespace from each line in target file, and skip empty lines.
 | 
			
		||||
@@ -869,10 +869,10 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
 | 
			
		||||
                if len(payload) > 0:
 | 
			
		||||
                    payload_txt = payload.decode('utf-8')
 | 
			
		||||
                else:
 | 
			
		||||
                    payload_txt = u'empty'
 | 
			
		||||
                    payload_txt = 'empty'
 | 
			
		||||
            except UnicodeDecodeError:
 | 
			
		||||
                payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1])
 | 
			
		||||
            if payload_txt == u'Protocol major versions differ.':
 | 
			
		||||
                payload_txt = '"{}"'.format(repr(payload).lstrip('b')[1:-1])
 | 
			
		||||
            if payload_txt == 'Protocol major versions differ.':
 | 
			
		||||
                if sshv == 2 and aconf.ssh1:
 | 
			
		||||
                    ret = audit(out, aconf, 1)
 | 
			
		||||
                    out.write()
 | 
			
		||||
@@ -972,8 +972,8 @@ def algorithm_lookup(out: OutputBuffer, alg_names: str) -> int:
 | 
			
		||||
    similar_algorithms = [
 | 
			
		||||
        alg_unknown + " --> (" + alg_type + ") " + alg_name
 | 
			
		||||
        for alg_unknown in algorithms_not_found
 | 
			
		||||
        for alg_type in adb
 | 
			
		||||
        for alg_name in adb[alg_type]
 | 
			
		||||
        for alg_type, alg_names in adb.items()
 | 
			
		||||
        for alg_name in alg_names
 | 
			
		||||
        # Perform a case-insensitive comparison using 'casefold'
 | 
			
		||||
        # and match substrings using the 'in' operator.
 | 
			
		||||
        if alg_unknown.casefold() in alg_name.casefold()
 | 
			
		||||
 
 | 
			
		||||
@@ -338,6 +338,6 @@ class SSH_Socket(ReadBuf, WriteBuf):
 | 
			
		||||
 | 
			
		||||
    def __cleanup(self) -> None:
 | 
			
		||||
        self._close_socket(self.__sock)
 | 
			
		||||
        for fd in self.__sock_map:
 | 
			
		||||
            self._close_socket(self.__sock_map[fd])
 | 
			
		||||
        for sock in self.__sock_map.values():
 | 
			
		||||
            self._close_socket(sock)
 | 
			
		||||
        self.__sock = None
 | 
			
		||||
 
 | 
			
		||||
@@ -54,7 +54,7 @@ class WriteBuf:
 | 
			
		||||
        return self.write(v)
 | 
			
		||||
 | 
			
		||||
    def write_list(self, v: List[str]) -> 'WriteBuf':
 | 
			
		||||
        return self.write_string(u','.join(v))
 | 
			
		||||
        return self.write_string(','.join(v))
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def _bitlength(cls, n: int) -> int:
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user