From 30f2b7690a066457ae806dc6694e6003b78420b6 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Wed, 1 Jul 2020 13:00:44 -0400 Subject: [PATCH] Enabled the following mypy options: check_untyped_defs, disallow_untyped_defs, disallow_untyped_calls, disallow_incomplete_defs, disallow_untyped_decorators, disallow_untyped_decorators, strict_equality, and strict. --- ssh-audit.py | 102 ++++++++++++++++++++++++++++++--------------------- tox.ini | 11 ++++-- 2 files changed, 68 insertions(+), 45 deletions(-) diff --git a/ssh-audit.py b/ssh-audit.py index eb6c583..81ede73 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -84,7 +84,7 @@ def usage(err: Optional[str] = None) -> None: # Validates policy files and performs policy testing class Policy: - def __init__(self, policy_file: str = None, policy_data: str = None) -> None: + def __init__(self, policy_file: Optional[str] = None, policy_data: Optional[str] = None) -> None: self._name = None # type: Optional[str] self._version = None # type: Optional[str] self._banner = None # type: Optional[str] @@ -274,7 +274,7 @@ macs = %s return policy_data - def evaluate(self, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]: + def evaluate(self, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]: '''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.''' ret = True @@ -285,9 +285,12 @@ macs = %s ret = False errors.append('Banner did not match. Expected: [%s]; Actual: [%s]' % (self._banner, banner_str)) - if (self._header is not None) and (header != self._header): + actual_header = None + if header is not None: + actual_header = "\n".join(header) + if (self._header is not None) and (actual_header is not None) and (actual_header != self._header): ret = False - errors.append('Header did not match. Expected: [%s]; Actual: [%s]' % (self._header, header)) + errors.append('Header did not match. Expected: [%s]; Actual: [%s]' % (self._header, actual_header)) # All subsequent tests require a valid kex, so end here if we don't have one. if kex is None: @@ -597,7 +600,7 @@ class Output: return lambda x: print(u'{}'.format(x)) -class OutputBuffer(list): +class OutputBuffer(List[str]): def __enter__(self) -> 'OutputBuffer': # pylint: disable=attribute-defined-outside-init self.__buf = io.StringIO() @@ -608,12 +611,12 @@ class OutputBuffer(list): def flush(self, sort_lines: bool = False) -> None: # Lines must be sorted in some cases to ensure consistent testing. if sort_lines: - self.sort() - for line in self: + self.sort() # pylint: disable=no-member + for line in self: # pylint: disable=not-an-iterable print(line) def __exit__(self, *args: Any) -> None: - self.extend(self.__buf.getvalue().splitlines()) + self.extend(self.__buf.getvalue().splitlines()) # pylint: disable=no-member sys.stdout = self.__stdout @@ -1664,7 +1667,7 @@ class SSH: # pylint: disable=too-few-public-methods return re.sub(r'^[-_\.]+', '', patch) or None @staticmethod - def _fix_date(d: str) -> Optional[str]: + def _fix_date(d: Optional[str]) -> Optional[str]: if d is not None and len(d) == 8: return '{}-{}-{}'.format(d[:4], d[4:6], d[6:8]) else: @@ -2353,31 +2356,28 @@ class SSH: # pylint: disable=too-few-public-methods def get_banner(self, sshv: int = 2) -> Tuple[Optional['SSH.Banner'], List[str], Optional[str]]: if self.__sock is None: return self.__banner, self.__header, 'not connected' + if self.__banner is not None: + return self.__banner, self.__header, None + banner = SSH_HEADER.format('1.5' if sshv == 1 else '2.0') if self.__state < self.SM_BANNER_SENT: self.send_banner(banner) - # rto = self.__sock.gettimeout() - # self.__sock.settimeout(0.7) - s, e = self.recv() - # self.__sock.settimeout(rto) - if s < 0: - return self.__banner, self.__header, e + + s = 0 e = None - while self.__banner is None: - if not s > 0: - s, e = self.recv() - if s < 0: - break - while self.__banner is None and self.unread_len > 0: + while s >= 0: + s, e = self.recv() + if s < 0: + continue + while self.unread_len > 0: line = self.read_line() if len(line.strip()) == 0: continue - if self.__banner is None: - self.__banner = SSH.Banner.parse(line) - if self.__banner is not None: - continue + self.__banner = SSH.Banner.parse(line) + if self.__banner is not None: + return self.__banner, self.__header, None self.__header.append(line) - s = 0 + return self.__banner, self.__header, e def recv(self, size: int = 2048) -> Tuple[int, Optional[str]]: @@ -2408,7 +2408,6 @@ class SSH: # pylint: disable=too-few-public-methods return 0, None except socket.error as e: return -1, str(e.args[-1]) - self.__sock.send(data) def send_banner(self, banner: str) -> None: self.send(banner.encode() + b'\r\n') @@ -2547,7 +2546,7 @@ class KexDH: # pragma: nocover # Parse a KEXDH_REPLY or KEXDH_GEX_REPLY message from the server. This # contains the host key, among other things. Function returns the host # key blob (from which the fingerprint can be calculated). - def recv_reply(self, s, parse_host_key_size=True): + def recv_reply(self, s: 'SSH.Socket', parse_host_key_size: bool = True) -> Optional[bytes]: packet_type, payload = s.read_packet(2) # Skip any & all MSG_DEBUG messages. @@ -3188,7 +3187,7 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at .\n" % ','.join(unknown_algorithms)) -def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex'] = None) -> bool: +def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex'] = None) -> bool: if aconf.policy is None: raise RuntimeError('Internal error: cannot evaluate against null Policy!') @@ -3320,15 +3319,26 @@ class Utils: return -1.0 -def build_struct(banner, kex=None, pkm=None, client_host=None): +def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None, pkm: Optional['SSH1.PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: + + banner_str = '' + banner_protocol = None + banner_software = None + banner_comments = None + if banner is not None: + banner_str = str(banner) + banner_protocol = banner.protocol + banner_software = banner.software + banner_comments = banner.comments + res = { "banner": { - "raw": str(banner), - "protocol": banner.protocol, - "software": banner.software, - "comments": banner.comments, + "raw": banner_str, + "protocol": banner_protocol, + "software": banner_software, + "comments": banner_comments, }, - } + } # type: Any if client_host is not None: res['client_ip'] = client_host if kex is not None: @@ -3339,8 +3349,8 @@ def build_struct(banner, kex=None, pkm=None, client_host=None): for algorithm in kex.kex_algorithms: entry = { 'algorithm': algorithm, - } - if (alg_sizes is not None) and (algorithm in alg_sizes): + } # type: Any + if algorithm in alg_sizes: hostkey_size, ca_size = alg_sizes[algorithm] entry['keysize'] = hostkey_size if ca_size > 0: @@ -3353,7 +3363,7 @@ def build_struct(banner, kex=None, pkm=None, client_host=None): entry = { 'algorithm': algorithm, } - if (alg_sizes is not None) and (algorithm in alg_sizes): + if algorithm in alg_sizes: hostkey_size, ca_size = alg_sizes[algorithm] entry['keysize'] = hostkey_size if ca_size > 0: @@ -3387,12 +3397,20 @@ def build_struct(banner, kex=None, pkm=None, client_host=None): } res['fingerprints'].append(entry) else: + pkm_supported_ciphers = None + pkm_supported_authentications = None + pkm_fp = None + if pkm is not None: + pkm_supported_ciphers = pkm.supported_ciphers + pkm_supported_authentications = pkm.supported_authentications + pkm_fp = SSH.Fingerprint(pkm.host_key_fingerprint_data).sha256 + res['key'] = ['ssh-rsa1'] - res['enc'] = pkm.supported_ciphers - res['aut'] = pkm.supported_authentications + res['enc'] = pkm_supported_ciphers + res['aut'] = pkm_supported_authentications res['fingerprints'] = [{ 'type': 'ssh-rsa1', - 'fp': SSH.Fingerprint(pkm.host_key_fingerprint_data).sha256, + 'fp': pkm_fp, }] return res @@ -3421,7 +3439,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: packet_type, payload = s.read_packet(sshv) if packet_type < 0: try: - if payload is not None and len(payload) > 0: + if len(payload) > 0: payload_txt = payload.decode('utf-8') else: payload_txt = u'empty' diff --git a/tox.ini b/tox.ini index ba99c8b..05e9179 100644 --- a/tox.ini +++ b/tox.ini @@ -90,14 +90,19 @@ commands = [mypy] ignore_missing_imports = False follow_imports = normal -; disallow_untyped_calls = True -; disallow_untyped_defs = True -; check_untyped_defs = True +disallow_incomplete_defs = True +disallow_untyped_calls = True +disallow_untyped_decorators = True +disallow_untyped_defs = True +check_untyped_defs = True disallow_subclassing_any = True warn_redundant_casts = True warn_return_any = True +warn_unreachable = True warn_unused_ignores = True strict_optional = True +strict_equality = True +strict = True [pylint] reports = no