From 60de5e55cb9abd46f0b55839f932078513e93742 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Thu, 21 Jan 2021 10:20:48 -0500 Subject: [PATCH] Transformed comment type annotations to variable declaration annotations. --- README.md | 1 + src/ssh_audit/algorithms.py | 4 ++-- src/ssh_audit/auditconf.py | 10 ++++---- src/ssh_audit/kexdh.py | 4 ++-- src/ssh_audit/output.py | 2 +- src/ssh_audit/policy.py | 32 ++++++++++++------------- src/ssh_audit/readbuf.py | 4 ++-- src/ssh_audit/software.py | 2 +- src/ssh_audit/ssh1.py | 2 +- src/ssh_audit/ssh1_kexdb.py | 4 ++-- src/ssh_audit/ssh2_kex.py | 6 ++--- src/ssh_audit/ssh2_kexdb.py | 4 ++-- src/ssh_audit/ssh_audit.py | 32 ++++++++++++------------- src/ssh_audit/ssh_socket.py | 10 ++++---- src/ssh_audit/timeframe.py | 6 ++--- src/ssh_audit/utils.py | 2 +- src/ssh_audit/versionvulnerabilitydb.py | 8 +++---- 17 files changed, 67 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 7b634fc..630c592 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,7 @@ For convenience, a web front-end on top of the command-line tool is available at - Added version check for OpenSSH user enumeration (CVE-2018-15473). - Fixed crash when receiving unexpected response during host key test. - Fixed hang against older Cisco devices during host key test & gex test. + - Dropped support for Python 3.5 (which reached EOL in Sept. 2020). ### v2.3.1 (2020-10-28) - Now parses public key sizes for `rsa-sha2-256-cert-v01@openssh.com` and `rsa-sha2-512-cert-v01@openssh.com` host key types. diff --git a/src/ssh_audit/algorithms.py b/src/ssh_audit/algorithms.py index 6b4294b..c36275c 100644 --- a/src/ssh_audit/algorithms.py +++ b/src/ssh_audit/algorithms.py @@ -131,7 +131,7 @@ class Algorithms: # if version is not None: # software = SSH.Software(None, product, version, None, None) # break - rec = {} # type: Dict[int, Dict[str, Dict[str, Dict[str, int]]]] + rec: Dict[int, Dict[str, Dict[str, Dict[str, int]]]] = {} if software is None: unknown_software = True for alg_pair in self.values: @@ -206,7 +206,7 @@ class Algorithms: def __init__(self, sshv: int, db: Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None: self.__sshv = sshv self.__db = db - self.__storage = {} # type: Dict[str, List[str]] + self.__storage: Dict[str, List[str]] = {} @property def sshv(self) -> int: diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index 69e69ab..f55141c 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -43,16 +43,16 @@ class AuditConf: self.json = False self.verbose = False self.level = 'info' - self.ipvo = () # type: Sequence[int] + self.ipvo: Sequence[int] = () self.ipv4 = False self.ipv6 = False self.make_policy = False # When True, creates a policy file from an audit scan. - self.policy_file = None # type: Optional[str] # File system path to a policy - self.policy = None # type: Optional[Policy] # Policy object + self.policy_file: Optional[str] = None # File system path to a policy + self.policy: Optional[Policy] = None # Policy object self.timeout = 5.0 self.timeout_set = False # Set to True when the user explicitly sets it. - self.target_file = None # type: Optional[str] - self.target_list = [] # type: List[str] + self.target_file: Optional[str] = None + self.target_list: List[str] = [] self.list_policies = False self.lookup = '' diff --git a/src/ssh_audit/kexdh.py b/src/ssh_audit/kexdh.py index 6849794..0d10fc9 100644 --- a/src/ssh_audit/kexdh.py +++ b/src/ssh_audit/kexdh.py @@ -46,8 +46,8 @@ class KexDH: # pragma: nocover self.__e = 0 self.set_params(g, p) - self.__ed25519_pubkey = None # type: Optional[bytes] - self.__hostkey_type = None # type: Optional[bytes] + self.__ed25519_pubkey: Optional[bytes] = None + self.__hostkey_type: Optional[bytes] = None self.__hostkey_e = 0 self.__hostkey_n = 0 self.__hostkey_n_len = 0 # Length of the host key modulus. diff --git a/src/ssh_audit/output.py b/src/ssh_audit/output.py index cf74ebd..15e0d03 100644 --- a/src/ssh_audit/output.py +++ b/src/ssh_audit/output.py @@ -32,7 +32,7 @@ from ssh_audit.utils import Utils class Output: - LEVELS = ('info', 'warn', 'fail') # type: Sequence[str] + LEVELS: Sequence[str] = ('info', 'warn', 'fail') COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} # Use brighter colors on Windows for better readability. diff --git a/src/ssh_audit/policy.py b/src/ssh_audit/policy.py index c3a03be..e2358ef 100644 --- a/src/ssh_audit/policy.py +++ b/src/ssh_audit/policy.py @@ -36,7 +36,7 @@ from ssh_audit.banner import Banner # pylint: disable=unused-import class Policy: # Each field maps directly to a private member variable of the Policy class. - BUILTIN_POLICIES = { + BUILTIN_POLICIES: Dict[str, Dict[str, Union[Optional[str], Optional[List[str]], bool, Dict[str, int]]]] = { # Ubuntu Server policies @@ -74,25 +74,25 @@ class Policy: 'Hardened Ubuntu Client 20.04 LTS (version 2)': {'version': '2', 'banner': None, 'compressions': None, 'host_keys': ['ssh-ed25519', 'ssh-ed25519-cert-v01@openssh.com', 'sk-ssh-ed25519@openssh.com', 'sk-ssh-ed25519-cert-v01@openssh.com', 'rsa-sha2-256', 'rsa-sha2-256-cert-v01@openssh.com', 'rsa-sha2-512', 'rsa-sha2-512-cert-v01@openssh.com'], 'optional_host_keys': None, 'kex': ['curve25519-sha256', 'curve25519-sha256@libssh.org', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group-exchange-sha256', 'ext-info-c'], 'ciphers': ['chacha20-poly1305@openssh.com', 'aes256-gcm@openssh.com', 'aes128-gcm@openssh.com', 'aes256-ctr', 'aes192-ctr', 'aes128-ctr'], 'macs': ['hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'umac-128-etm@openssh.com'], 'hostkey_sizes': None, 'cakey_sizes': None, 'dh_modulus_sizes': None, 'server_policy': False}, - } # type: Dict[str, Dict[str, Union[Optional[str], Optional[List[str]], bool, Dict[str, int]]]] + } def __init__(self, policy_file: Optional[str] = None, policy_data: Optional[str] = None, manual_load: bool = False) -> None: - self._name = None # type: Optional[str] - self._version = None # type: Optional[str] - self._banner = None # type: Optional[str] - self._compressions = None # type: Optional[List[str]] - self._host_keys = None # type: Optional[List[str]] - self._optional_host_keys = None # type: Optional[List[str]] - self._kex = None # type: Optional[List[str]] - self._ciphers = None # type: Optional[List[str]] - self._macs = None # type: Optional[List[str]] - self._hostkey_sizes = None # type: Optional[Dict[str, int]] - self._cakey_sizes = None # type: Optional[Dict[str, int]] - self._dh_modulus_sizes = None # type: Optional[Dict[str, int]] + self._name: Optional[str] = None + self._version: Optional[str] = None + self._banner: Optional[str] = None + self._compressions: Optional[List[str]] = None + self._host_keys: Optional[List[str]] = None + self._optional_host_keys: Optional[List[str]] = None + self._kex: Optional[List[str]] = None + self._ciphers: Optional[List[str]] = None + self._macs: Optional[List[str]] = None + self._hostkey_sizes: Optional[Dict[str, int]] = None + self._cakey_sizes: Optional[Dict[str, int]] = None + self._dh_modulus_sizes: Optional[Dict[str, int]] = None self._server_policy = True - self._name_and_version = '' # type: str + self._name_and_version: str = '' # Ensure that only one mode was specified. num_modes = 0 @@ -305,7 +305,7 @@ macs = %s '''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.''' ret = True - errors = [] # type: List[Any] + errors: List[Any] = [] banner_str = str(banner) if (self._banner is not None) and (banner_str != self._banner): diff --git a/src/ssh_audit/readbuf.py b/src/ssh_audit/readbuf.py index e986bd4..c9405c4 100644 --- a/src/ssh_audit/readbuf.py +++ b/src/ssh_audit/readbuf.py @@ -43,14 +43,14 @@ class ReadBuf: return self._buf.read(size) def read_byte(self) -> int: - v = struct.unpack('B', self.read(1))[0] # type: int + v: int = struct.unpack('B', self.read(1))[0] return v def read_bool(self) -> bool: return self.read_byte() != 0 def read_int(self) -> int: - v = struct.unpack('>I', self.read(4))[0] # type: int + v: int = struct.unpack('>I', self.read(4))[0] return v def read_list(self) -> List[str]: diff --git a/src/ssh_audit/software.py b/src/ssh_audit/software.py index 3b1462e..f13ebeb 100644 --- a/src/ssh_audit/software.py +++ b/src/ssh_audit/software.py @@ -180,7 +180,7 @@ class Software: # pylint: disable=too-many-return-statements software = str(banner.software) mx = re.match(r'^dropbear_([\d\.]+\d+)(.*)', software) - v = None # type: Optional[str] + v: Optional[str] = None if mx is not None: patch = cls._fix_patch(mx.group(2)) v, p = 'Matt Johnston', Product.DropbearSSH diff --git a/src/ssh_audit/ssh1.py b/src/ssh_audit/ssh1.py index 8246147..f966b74 100644 --- a/src/ssh_audit/ssh1.py +++ b/src/ssh_audit/ssh1.py @@ -29,7 +29,7 @@ from ssh_audit.ssh1_crc32 import SSH1_CRC32 class SSH1: - _crc32 = None # type: Optional[SSH1_CRC32] + _crc32: Optional[SSH1_CRC32] = None CIPHERS = ['none', 'idea', 'des', '3des', 'tss', 'rc4', 'blowfish'] AUTHS = ['none', 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos'] diff --git a/src/ssh_audit/ssh1_kexdb.py b/src/ssh_audit/ssh1_kexdb.py index 2c8e53e..c9810ce 100644 --- a/src/ssh_audit/ssh1_kexdb.py +++ b/src/ssh_audit/ssh1_kexdb.py @@ -34,7 +34,7 @@ class SSH1_KexDB: # pylint: disable=too-few-public-methods FAIL_NA_UNSAFE = 'not implemented in OpenSSH (server), unsafe algorithm' TEXT_CIPHER_IDEA = 'cipher used by commercial SSH' - ALGORITHMS = { + ALGORITHMS: Dict[str, Dict[str, List[List[Optional[str]]]]] = { 'key': { 'ssh-rsa1': [['1.2.2']], }, @@ -55,4 +55,4 @@ class SSH1_KexDB: # pylint: disable=too-few-public-methods 'tis': [['1.2.2']], 'kerberos': [['1.2.2', '3.6'], [FAIL_OPENSSH37_REMOVE]], } - } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] + } diff --git a/src/ssh_audit/ssh2_kex.py b/src/ssh_audit/ssh2_kex.py index 84637f6..0c31f3d 100644 --- a/src/ssh_audit/ssh2_kex.py +++ b/src/ssh_audit/ssh2_kex.py @@ -41,9 +41,9 @@ class SSH2_Kex: self.__follows = follows self.__unused = unused - self.__rsa_key_sizes = {} # type: Dict[str, Tuple[int, int]] - self.__dh_modulus_sizes = {} # type: Dict[str, Tuple[int, int]] - self.__host_keys = {} # type: Dict[str, bytes] + self.__rsa_key_sizes: Dict[str, Tuple[int, int]] = {} + self.__dh_modulus_sizes: Dict[str, Tuple[int, int]] = {} + self.__host_keys: Dict[str, bytes] = {} @property def cookie(self) -> bytes: diff --git a/src/ssh_audit/ssh2_kexdb.py b/src/ssh_audit/ssh2_kexdb.py index 136be89..f013e67 100644 --- a/src/ssh_audit/ssh2_kexdb.py +++ b/src/ssh_audit/ssh2_kexdb.py @@ -59,7 +59,7 @@ class SSH2_KexDB: # pylint: disable=too-few-public-methods WARN_OBSOLETE = 'using obsolete algorithm' WARN_UNTRUSTED = 'using untrusted algorithm' - ALGORITHMS = { + ALGORITHMS: Dict[str, Dict[str, List[List[Optional[str]]]]] = { # Format: 'algorithm_name': [['version_first_appeared_in'], [reason_for_failure1, reason_for_failure2, ...], [warning1, warning2, ...]] 'kex': { 'diffie-hellman-group1-sha1': [['2.3.0,d0.28,l10.2', '6.6', '6.9'], [FAIL_1024BIT_MODULUS, FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_HASH_WEAK]], @@ -268,4 +268,4 @@ class SSH2_KexDB: # pylint: disable=too-few-public-methods 'chacha20-poly1305@openssh.com': [[]], # Despite the @openssh.com tag, this was never shipped as a MAC in OpenSSH (only as a cipher); it is only implemented as a MAC in Syncplify. 'crypticore-mac@ssh.com': [[], [FAIL_UNPROVEN]], } - } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] + } diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 57d948e..85aec2e 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -210,13 +210,13 @@ def output_security_sub(sub: str, software: Optional[Software], client_audit: bo if software is None or software.product not in secdb: return for line in secdb[software.product]: - vfrom = '' # type: str - vtill = '' # type: str + vfrom: str = '' + vtill: str = '' vfrom, vtill = line[0:2] if not software.between_versions(vfrom, vtill): continue - target = 0 # type: int - name = '' # type: str + target: int = 0 + name: str = '' target, name = line[2:4] is_server = target & 1 == 1 is_client = target & 2 == 2 @@ -227,8 +227,8 @@ def output_security_sub(sub: str, software: Optional[Software], client_audit: bo continue p = '' if out.batch else ' ' * (padlen - len(name)) if sub == 'cve': - cvss = 0.0 # type: float - descr = '' # type: str + cvss: float = 0.0 + descr: str = '' cvss, descr = line[4:6] # Critical CVSS scores (>= 8.0) are printed as a fail, otherwise they are printed as a warning. @@ -431,7 +431,7 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client maxlen = algs.maxlen + 1 output_security(banner, client_audit, maxlen, aconf.json) # Filled in by output_algorithms() with unidentified algs. - unknown_algorithms = [] # type: List[str] + unknown_algorithms: List[str] = [] if pkm is not None: adb = SSH1_KexDB.ALGORITHMS ciphers = pkm.supported_ciphers @@ -529,7 +529,7 @@ def list_policies() -> None: def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH2_Kex'], client_host: Optional[str]) -> None: # Set the source of this policy to the server host if this is a server audit, otherwise set it to the client address. - source = aconf.host # type: Optional[str] + source: Optional[str] = aconf.host if aconf.client_audit: source = client_host @@ -562,9 +562,9 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi except getopt.GetoptError as err: usage_cb(str(err)) aconf.ssh1, aconf.ssh2 = False, False - host = '' # type: str - oport = None # type: Optional[str] - port = 0 # type: int + host: str = '' + oport: Optional[str] = None + port: int = 0 for o, a in opts: if o in ('-h', '--help'): usage_cb() @@ -687,14 +687,14 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p banner_software = banner.software banner_comments = banner.comments - res = { + res: Any = { "banner": { "raw": banner_str, "protocol": banner_protocol, "software": banner_software, "comments": banner_comments, }, - } # type: Any + } if client_host is not None: res['client_ip'] = client_host if kex is not None: @@ -703,9 +703,9 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p res['kex'] = [] alg_sizes = kex.dh_modulus_sizes() for algorithm in kex.kex_algorithms: - entry = { + entry: Any = { 'algorithm': algorithm, - } # type: Any + } if algorithm in alg_sizes: hostkey_size, ca_size = alg_sizes[algorithm] entry['keysize'] = hostkey_size @@ -879,7 +879,7 @@ def algorithm_lookup(alg_names: str) -> int: for (outer_k, outer_v) in adb.items() } - unknown_algorithms = [] # type: List[str] + unknown_algorithms: List[str] = [] padding = len(max(algorithm_names, key=len)) for alg_type in alg_types: diff --git a/src/ssh_audit/ssh_socket.py b/src/ssh_audit/ssh_socket.py index 2727a08..730125c 100644 --- a/src/ssh_audit/ssh_socket.py +++ b/src/ssh_audit/ssh_socket.py @@ -54,12 +54,12 @@ class SSH_Socket(ReadBuf, WriteBuf): def __init__(self, host: Optional[str], port: int, ipvo: Optional[Sequence[int]] = None, timeout: Union[int, float] = 5, timeout_set: bool = False) -> None: super(SSH_Socket, self).__init__() - self.__sock = None # type: Optional[socket.socket] - self.__sock_map = {} # type: Dict[int, socket.socket] + self.__sock: Optional[socket.socket] = None + self.__sock_map: Dict[int, socket.socket] = {} self.__block_size = 8 self.__state = 0 - self.__header = [] # type: List[str] - self.__banner = None # type: Optional[Banner] + self.__header: List[str] = [] + self.__banner: Optional[Banner] = None if host is None: raise ValueError('undefined host') nport = Utils.parse_int(port) @@ -73,7 +73,7 @@ class SSH_Socket(ReadBuf, WriteBuf): self.__ipvo = () self.__timeout = timeout self.__timeout_set = timeout_set - self.client_host = None # type: Optional[str] + self.client_host: Optional[str] = None self.client_port = None def _resolve(self, ipvo: Sequence[int]) -> Iterable[Tuple[int, Tuple[Any, ...]]]: diff --git a/src/ssh_audit/timeframe.py b/src/ssh_audit/timeframe.py index d977d2f..cb98d9b 100644 --- a/src/ssh_audit/timeframe.py +++ b/src/ssh_audit/timeframe.py @@ -30,12 +30,12 @@ from ssh_audit.algorithm import Algorithm class Timeframe: def __init__(self) -> None: - self.__storage = {} # type: Dict[str, List[Optional[str]]] + self.__storage: Dict[str, List[Optional[str]]] = {} def __contains__(self, product: str) -> bool: return product in self.__storage - def __getitem__(self, product): # type: (str) -> Sequence[Optional[str]] + def __getitem__(self, product: str) -> Sequence[Optional[str]]: return tuple(self.__storage.get(product, [None] * 4)) def __str__(self) -> str: @@ -51,7 +51,7 @@ class Timeframe: return self[product][1 if bool(for_server) else 3] def _update(self, versions: Optional[str], pos: int) -> None: - ssh_versions = {} # type: Dict[str, str] + ssh_versions: Dict[str, str] = {} for_srv, for_cli = pos < 2, pos > 1 for v in (versions or '').split(','): ssh_prod, ssh_ver, is_cli = Algorithm.get_ssh_version(v) diff --git a/src/ssh_audit/utils.py b/src/ssh_audit/utils.py index 7028b0e..3469dd0 100644 --- a/src/ssh_audit/utils.py +++ b/src/ssh_audit/utils.py @@ -96,7 +96,7 @@ class Utils: @classmethod def unique_seq(cls, seq: Sequence[Any]) -> Sequence[Any]: - seen = set() # type: Set[Any] + seen: Set[Any] = set() def _seen_add(x: Any) -> bool: seen.add(x) diff --git a/src/ssh_audit/versionvulnerabilitydb.py b/src/ssh_audit/versionvulnerabilitydb.py index 625e455..7f02c71 100644 --- a/src/ssh_audit/versionvulnerabilitydb.py +++ b/src/ssh_audit/versionvulnerabilitydb.py @@ -33,7 +33,7 @@ class VersionVulnerabilityDB: # pylint: disable=too-few-public-methods # Example: if it affects servers, both remote & local, then affected # = 1. If it affects servers, but is a local issue only, # then affected = 1 + 4 = 5. - CVE = { + CVE: Dict[str, List[List[Any]]] = { 'Dropbear SSH': [ ['0.0', '2018.76', 1, 'CVE-2018-15599', 5.0, 'remote users may enumerate users on the system'], ['0.0', '2017.74', 5, 'CVE-2017-9079', 4.7, 'local users can read certain files as root'], @@ -140,12 +140,12 @@ class VersionVulnerabilityDB: # pylint: disable=too-few-public-methods ['0.0', '0.66', 2, 'CVE-2016-2563', 7.5, 'buffer overflow in SCP command-line utility'], ['0.0', '0.65', 2, 'CVE-2015-5309', 4.3, 'integer overflow in terminal-handling code'], ] - } # type: Dict[str, List[List[Any]]] - TXT = { + } + TXT: Dict[str, List[List[Any]]] = { 'Dropbear SSH': [ ['0.28', '0.34', 1, 'remote root exploit', 'remote format string buffer overflow exploit (exploit-db#387)']], 'libssh': [ ['0.3.3', '0.3.3', 1, 'null pointer check', 'missing null pointer check in "crypt_set_algorithms_server"'], ['0.3.3', '0.3.3', 1, 'integer overflow', 'integer overflow in "buffer_get_data"'], ['0.3.3', '0.3.3', 3, 'heap overflow', 'heap overflow in "packet_decrypt"']] - } # type: Dict[str, List[List[Any]]] + }