From 246a41d46f89caa44ad5a280718910dd2812b7fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Gmach?= Date: Tue, 9 Jun 2020 23:54:07 +0200 Subject: [PATCH] Flake8 fixes (#35) * Apply Flake8 also on `setup.py` modified: tox.ini * Fix W605 - invalid escape syntax modified: packages/setup.py modified: tox.ini * Update comment about Flake8: W504 W503 and W504 are mutual exclusive - so we have to keep one of them. modified: tox.ini * Fix F841 - variable assigned but never used modified: ssh-audit.py modified: tox.ini * Fix E741 - ambiguous variable name 'l' modified: ssh-audit.py modified: tox.ini * Fix E712 - comparison to False should be 'if cond is False' ... and not 'if conf == False'. modified: ssh-audit.py modified: tox.ini * Fix E711 - comparison to None should be 'if cond is not None' ... and not 'if cond != None'. modified: ssh-audit.py modified: tox.ini * Fix E305 - expected 2 blank lines ... after class or function definition, found 1. modified: ssh-audit.py modified: tox.ini * Fix E303 - too many blank lines modified: ssh-audit.py modified: tox.ini * Fix E303 - too many blank lines modified: ssh-audit.py modified: tox.ini * Fix E301 - expected 1 blank line, found 0 No code change necessary, probably fixed by another commit. modified: tox.ini * Fix E265 - block comment should start with '# ' There is lots of commented out code, which usually should be just deleted. I will keep it for now, as I am not yet very familiar with the code base. modified: ssh-audit.py modified: tox.ini * Fix E261 - at least two spaces before inline comment modified: ssh-audit.py modified: tox.ini * Fix E251 - unexpected spaces around keyword / parameter equals modified: packages/setup.py modified: tox.ini * Fix E231 - missing whitespace after ',' No code change necessary, probably fixed by previous commit. modified: tox.ini * Fix E226 - missing whitespace around arithmetic operator modified: ssh-audit.py modified: tox.ini * Fix W293 - blank line contains whitespace modified: ssh-audit.py modified: tox.ini * Fix E221 - multiple spaces before operator modified: ssh-audit.py modified: tox.ini * Update comment about Flake 8 E241 Lots of data is formatted as tables, so this warning is disabled for a good reason. modified: tox.ini * Fix E401 - multiple imports on one line modified: ssh-audit.py modified: tox.ini * Do not ignore Flake8 warning F401 ... as there were no errors in source code anyway. modified: tox.ini * Fix F821 - undefined name modified: ssh-audit.py modified: tox.ini * Reformat ignore section for Flake8 modified: tox.ini * Flake8 test suite modified: test/conftest.py modified: test/test_auditconf.py modified: test/test_banner.py modified: test/test_buffer.py modified: test/test_errors.py modified: test/test_output.py modified: test/test_resolve.py modified: test/test_socket.py modified: test/test_software.py modified: test/test_ssh1.py modified: test/test_ssh2.py modified: test/test_ssh_algorithm.py modified: test/test_utils.py modified: test/test_version_compare.py modified: tox.ini --- packages/setup.py | 26 +- ssh-audit.py | 520 ++++++++++++++++++----------------- test/conftest.py | 32 +-- test/test_auditconf.py | 14 +- test/test_banner.py | 12 +- test/test_buffer.py | 26 +- test/test_errors.py | 35 ++- test/test_output.py | 20 +- test/test_resolve.py | 20 +- test/test_socket.py | 17 +- test/test_software.py | 16 +- test/test_ssh1.py | 32 +-- test/test_ssh2.py | 19 +- test/test_ssh_algorithm.py | 26 +- test/test_utils.py | 34 +-- test/test_version_compare.py | 44 +-- tox.ini | 51 +--- 17 files changed, 459 insertions(+), 485 deletions(-) diff --git a/packages/setup.py b/packages/setup.py index c0e8740..c4daba9 100644 --- a/packages/setup.py +++ b/packages/setup.py @@ -5,7 +5,7 @@ import re from setuptools import setup -version = re.search('^VERSION\s*=\s*\'v(\d\.\d\.\d)\'', open('sshaudit/sshaudit.py').read(), re.M).group(1) +version = re.search(r'^VERSION\s*=\s*\'v(\d\.\d\.\d)\'', open('sshaudit/sshaudit.py').read(), re.M).group(1) print("\n\nPackaging ssh-audit v%s...\n\n" % version) with open("sshaudit/README.md", "rb") as f: @@ -13,20 +13,20 @@ with open("sshaudit/README.md", "rb") as f: setup( - name = "ssh-audit", - packages = ["sshaudit"], - license = 'MIT', - entry_points = { + name="ssh-audit", + packages=["sshaudit"], + license='MIT', + entry_points={ "console_scripts": ['ssh-audit = sshaudit.sshaudit:main'] }, - version = version, - description = "An SSH server & client configuration security auditing tool", - long_description = long_descr, - long_description_content_type = "text/markdown", - author = "Joe Testa", - author_email = "jtesta@positronsecurity.com", - url = "https://github.com/jtesta/ssh-audit", - classifiers = [ + version=version, + description="An SSH server & client configuration security auditing tool", + long_description=long_descr, + long_description_content_type="text/markdown", + author="Joe Testa", + author_email="jtesta@positronsecurity.com", + url="https://github.com/jtesta/ssh-audit", + classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Information Technology", "Intended Audience :: System Administrators", diff --git a/ssh-audit.py b/ssh-audit.py index 7ee752d..8528d89 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -2,20 +2,20 @@ # -*- coding: utf-8 -*- """ The MIT License (MIT) - + Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) - + Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - + The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. - + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -25,11 +25,24 @@ THE SOFTWARE. """ from __future__ import print_function -import base64, binascii, errno, hashlib, getopt, io, os, random, re, select, socket, struct, sys, json +import base64 +import binascii +import errno +import getopt +import hashlib +import io +import json +import os +import random +import re +import select +import socket +import struct +import sys VERSION = 'v2.2.1-dev' -SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate +SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate if sys.version_info.major < 3: print("\n!!!! NOTE: Python 2 is being considered for deprecation. If you have a good reason to need continued Python 2 support, please e-mail jtesta@positronsecurity.com with your rationale.\n\n") @@ -41,7 +54,7 @@ if sys.version_info >= (3,): # pragma: nocover else: # pragma: nocover import StringIO as _StringIO # pylint: disable=import-error StringIO = BytesIO = _StringIO.StringIO - text_type = unicode # pylint: disable=undefined-variable + text_type = unicode # pylint: disable=undefined-variable # noqa: F821 binary_type = str try: # pragma: nocover # pylint: disable=unused-import @@ -99,7 +112,7 @@ class AuditConf(object): self.ipv4 = False self.ipv6 = False self.timeout = 5.0 - self.timeout_set = False # Set to True when the user explicitly sets it. + self.timeout_set = False # Set to True when the user explicitly sets it. def __setattr__(self, name, value): # type: (str, Union[str, int, bool, Sequence[int]]) -> None @@ -144,7 +157,7 @@ class AuditConf(object): valid = True if valid: object.__setattr__(self, name, value) - + @classmethod def from_cmdline(cls, args, usage_cb): # type: (List[str], Callable[..., None]) -> AuditConf @@ -190,9 +203,9 @@ class AuditConf(object): elif o in ('-t', '--timeout'): aconf.timeout = float(a) aconf.timeout_set = True - if len(args) == 0 and aconf.client_audit == False: + if len(args) == 0 and aconf.client_audit is False: usage_cb() - if aconf.client_audit == False: + if aconf.client_audit is False: if oport is not None: host = args[0] else: @@ -224,7 +237,7 @@ class AuditConf(object): class Output(object): LEVELS = ('info', 'warn', 'fail') # type: Sequence[str] COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} - + def __init__(self): # type: () -> None self.batch = False @@ -233,41 +246,41 @@ class Output(object): self.json = False self.__level = 0 self.__colsupport = 'colorama' in sys.modules or os.name == 'posix' - + @property def level(self): # type: () -> str if self.__level < len(self.LEVELS): return self.LEVELS[self.__level] return 'unknown' - + @level.setter def level(self, name): # type: (str) -> None self.__level = self.get_level(name) - + def get_level(self, name): # type: (str) -> int cname = 'info' if name == 'good' else name if cname not in self.LEVELS: return sys.maxsize return self.LEVELS.index(cname) - + def sep(self): # type: () -> None if not self.batch: print() - + @property def colors_supported(self): # type: () -> bool return self.__colsupport - + @staticmethod def _colorized(color): # type: (str) -> Callable[[text_type], None] return lambda x: print(u'{0}{1}\033[0m'.format(color, x)) - + def __getattr__(self, name): # type: (str) -> Callable[[text_type], None] if name == 'head' and self.batch: @@ -289,7 +302,7 @@ class OutputBuffer(list): self.__stdout = sys.stdout sys.stdout = self.__buf return self - + def flush(self, sort_lines=False): # type: () -> None # Lines must be sorted in some cases to ensure consistent testing. @@ -297,7 +310,7 @@ class OutputBuffer(list): self.sort() for line in self: print(line) - + def __exit__(self, *args): # type: (*Any) -> None self.extend(self.__buf.getvalue().splitlines()) @@ -310,7 +323,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods WARN_OPENSSH74_UNSAFE = 'disabled (in client) since OpenSSH 7.4, unsafe algorithm' WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm' FAIL_OPENSSH70_LEGACY = 'removed since OpenSSH 7.0, legacy algorithm' - FAIL_OPENSSH70_WEAK = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm' + FAIL_OPENSSH70_WEAK = 'removed (in server) and disabled (in client) since OpenSSH 7.0, weak algorithm' FAIL_OPENSSH70_LOGJAM = 'disabled (in client) since OpenSSH 7.0, logjam attack' INFO_OPENSSH69_CHACHA = 'default cipher since OpenSSH 6.9.' FAIL_OPENSSH67_UNSAFE = 'removed (in server) since OpenSSH 6.7, unsafe algorithm' @@ -319,22 +332,22 @@ class SSH2(object): # pylint: disable=too-few-public-methods FAIL_DBEAR67_DISABLED = 'disabled since Dropbear SSH 2015.67' FAIL_DBEAR53_DISABLED = 'disabled since Dropbear SSH 0.53' FAIL_DEPRECATED_CIPHER = 'deprecated cipher' - FAIL_WEAK_CIPHER = 'using weak cipher' - FAIL_WEAK_ALGORITHM = 'using weak/obsolete algorithm' - FAIL_PLAINTEXT = 'no encryption/integrity' - FAIL_DEPRECATED_MAC = 'deprecated MAC' - WARN_CURVES_WEAK = 'using weak elliptic curves' - WARN_RNDSIG_KEY = 'using weak random number generator could reveal the key' - WARN_MODULUS_SIZE = 'using small 1024-bit modulus' - WARN_HASH_WEAK = 'using weak hashing algorithm' - WARN_CIPHER_MODE = 'using weak cipher mode' - WARN_BLOCK_SIZE = 'using small 64-bit block size' - WARN_CIPHER_WEAK = 'using weak cipher' - WARN_ENCRYPT_AND_MAC = 'using encrypt-and-MAC mode' - WARN_TAG_SIZE = 'using small 64-bit tag size' - WARN_TAG_SIZE_96 = 'using small 96-bit tag size' - WARN_EXPERIMENTAL = 'using experimental algorithm' - + FAIL_WEAK_CIPHER = 'using weak cipher' + FAIL_WEAK_ALGORITHM = 'using weak/obsolete algorithm' + FAIL_PLAINTEXT = 'no encryption/integrity' + FAIL_DEPRECATED_MAC = 'deprecated MAC' + WARN_CURVES_WEAK = 'using weak elliptic curves' + WARN_RNDSIG_KEY = 'using weak random number generator could reveal the key' + WARN_MODULUS_SIZE = 'using small 1024-bit modulus' + WARN_HASH_WEAK = 'using weak hashing algorithm' + WARN_CIPHER_MODE = 'using weak cipher mode' + WARN_BLOCK_SIZE = 'using small 64-bit block size' + WARN_CIPHER_WEAK = 'using weak cipher' + WARN_ENCRYPT_AND_MAC = 'using encrypt-and-MAC mode' + WARN_TAG_SIZE = 'using small 64-bit tag size' + WARN_TAG_SIZE_96 = 'using small 96-bit tag size' + WARN_EXPERIMENTAL = 'using experimental algorithm' + ALGORITHMS = { # Format: 'algorithm_name': [['version_first_appeared_in'], [reason_for_failure1, reason_for_failure2, ...], [warning1, warning2, ...]] 'kex': { @@ -378,7 +391,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], 'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], 'ecdh-sha2-nistt571': [[], [WARN_CURVES_WEAK]], - 'ecdh-sha2-1.3.132.0.10': [[]], # ECDH over secp256k1 (i.e.: the Bitcoin curve) + 'ecdh-sha2-1.3.132.0.10': [[]], # ECDH over secp256k1 (i.e.: the Bitcoin curve) 'curve25519-sha256@libssh.org': [['6.5,d2013.62,l10.6.0']], 'curve25519-sha256': [['7.4,d2018.76']], 'curve448-sha512': [[]], @@ -386,8 +399,8 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'rsa1024-sha1': [[], [], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]], 'rsa2048-sha256': [[]], 'sntrup4591761x25519-sha512@tinyssh.org': [['8.0'], [], [WARN_EXPERIMENTAL]], - 'ext-info-c': [[]], # Extension negotiation (RFC 8308) - 'ext-info-s': [[]], # Extension negotiation (RFC 8308) + 'ext-info-c': [[]], # Extension negotiation (RFC 8308) + 'ext-info-s': [[]], # Extension negotiation (RFC 8308) }, 'key': { 'ssh-rsa1': [[], [FAIL_WEAK_ALGORITHM]], @@ -400,7 +413,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'ecdsa-sha2-nistp256': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], 'ecdsa-sha2-nistp384': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], 'ecdsa-sha2-nistp521': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ecdsa-sha2-1.3.132.0.10': [[], [], [WARN_RNDSIG_KEY]], # ECDSA over secp256k1 (i.e.: the Bitcoin curve) + 'ecdsa-sha2-1.3.132.0.10': [[], [], [WARN_RNDSIG_KEY]], # ECDSA over secp256k1 (i.e.: the Bitcoin curve) 'x509v3-sign-dss': [[], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], 'x509v3-sign-rsa': [[], [], [WARN_HASH_WEAK]], 'x509v3-sign-rsa-sha256@ssh.com': [[]], @@ -416,7 +429,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'rsa-sha2-256-cert-v01@openssh.com': [['7.8']], 'rsa-sha2-512-cert-v01@openssh.com': [['7.8']], 'ssh-rsa-sha256@ssh.com': [[]], - 'ecdsa-sha2-1.3.132.0.10': [[], [], [WARN_RNDSIG_KEY]], # ECDSA over secp256k1 (i.e.: the Bitcoin curve) + 'ecdsa-sha2-1.3.132.0.10': [[], [], [WARN_RNDSIG_KEY]], # ECDSA over secp256k1 (i.e.: the Bitcoin curve) 'sk-ecdsa-sha2-nistp256-cert-v01@openssh.com': [['8.2'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], 'sk-ecdsa-sha2-nistp256@openssh.com': [['8.2'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], 'sk-ssh-ed25519-cert-v01@openssh.com': [['8.2']], @@ -508,23 +521,23 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'umac-128@openssh.com': [['6.2'], [], [WARN_ENCRYPT_AND_MAC]], 'hmac-sha1-etm@openssh.com': [['6.2'], [], [WARN_HASH_WEAK]], 'hmac-sha1-96-etm@openssh.com': [['6.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]], - 'hmac-sha2-256-96-etm@openssh.com': [[], [], [WARN_TAG_SIZE_96]], # Despite the @openssh.com tag, it doesn't appear that this was ever shipped with OpenSSH; it is only implemented in AsyncSSH (?). - 'hmac-sha2-512-96-etm@openssh.com': [[], [], [WARN_TAG_SIZE_96]], # Despite the @openssh.com tag, it doesn't appear that this was ever shipped with OpenSSH; it is only implemented in AsyncSSH (?). + 'hmac-sha2-256-96-etm@openssh.com': [[], [], [WARN_TAG_SIZE_96]], # Despite the @openssh.com tag, it doesn't appear that this was ever shipped with OpenSSH; it is only implemented in AsyncSSH (?). + 'hmac-sha2-512-96-etm@openssh.com': [[], [], [WARN_TAG_SIZE_96]], # Despite the @openssh.com tag, it doesn't appear that this was ever shipped with OpenSSH; it is only implemented in AsyncSSH (?). 'hmac-sha2-256-etm@openssh.com': [['6.2']], 'hmac-sha2-512-etm@openssh.com': [['6.2']], 'hmac-md5-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_HASH_WEAK]], 'hmac-md5-96-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_HASH_WEAK]], 'hmac-ripemd160-etm@openssh.com': [['6.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY]], - 'umac-32@openssh.com': [[], [], [WARN_ENCRYPT_AND_MAC, WARN_TAG_SIZE]], # Despite having the @openssh.com suffix, this may never have shipped with OpenSSH (!). + 'umac-32@openssh.com': [[], [], [WARN_ENCRYPT_AND_MAC, WARN_TAG_SIZE]], # Despite having the @openssh.com suffix, this may never have shipped with OpenSSH (!). 'umac-64-etm@openssh.com': [['6.2'], [], [WARN_TAG_SIZE]], - 'umac-96@openssh.com': [[], [], [WARN_ENCRYPT_AND_MAC]], # Despite having the @openssh.com suffix, this may never have shipped with OpenSSH (!). + 'umac-96@openssh.com': [[], [], [WARN_ENCRYPT_AND_MAC]], # Despite having the @openssh.com suffix, this may never have shipped with OpenSSH (!). 'umac-128-etm@openssh.com': [['6.2']], 'aes128-gcm': [[]], 'aes256-gcm': [[]], - 'chacha20-poly1305@openssh.com': [[]], # Despite the @openssh.com tag, this was never shipped as a MAC in OpenSSH (only as a cipher); it is only implemented as a MAC in Syncplify. + 'chacha20-poly1305@openssh.com': [[]], # Despite the @openssh.com tag, this was never shipped as a MAC in OpenSSH (only as a cipher); it is only implemented as a MAC in Syncplify. } } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] - + class KexParty(object): def __init__(self, enc, mac, compression, languages): # type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None @@ -532,27 +545,27 @@ class SSH2(object): # pylint: disable=too-few-public-methods self.__mac = mac self.__compression = compression self.__languages = languages - + @property def encryption(self): # type: () -> List[text_type] return self.__enc - + @property def mac(self): # type: () -> List[text_type] return self.__mac - + @property def compression(self): # type: () -> List[text_type] return self.__compression - + @property def languages(self): # type: () -> List[text_type] return self.__languages - + class Kex(object): def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0): # type: (binary_type, List[text_type], List[text_type], SSH2.KexParty, SSH2.KexParty, bool, int) -> None @@ -572,34 +585,34 @@ class SSH2(object): # pylint: disable=too-few-public-methods def cookie(self): # type: () -> binary_type return self.__cookie - + @property def kex_algorithms(self): # type: () -> List[text_type] return self.__kex_algs - + @property def key_algorithms(self): # type: () -> List[text_type] return self.__key_algs - + # client_to_server @property def client(self): # type: () -> SSH2.KexParty return self.__client - + # server_to_client @property def server(self): # type: () -> SSH2.KexParty return self.__server - + @property def follows(self): # type: () -> bool return self.__follows - + @property def unused(self): # type: () -> int @@ -638,14 +651,14 @@ class SSH2(object): # pylint: disable=too-few-public-methods wbuf.write_list(self.server.languages) wbuf.write_bool(self.follows) wbuf.write_int(self.__unused) - + @property def payload(self): # type: () -> binary_type wbuf = WriteBuf() self.write(wbuf) return wbuf.write_flush() - + @classmethod def parse(cls, payload): # type: (binary_type) -> SSH2.Kex @@ -700,7 +713,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'ecdh-sha2-nistp256': KexNISTP256, 'ecdh-sha2-nistp384': KexNISTP384, 'ecdh-sha2-nistp521': KexNISTP521, - #'kexguess2@matt.ucc.asn.au': ??? + # 'kexguess2@matt.ucc.asn.au': ??? } # Pick the first kex algorithm that the server supports, which we @@ -735,14 +748,14 @@ class SSH2(object): # pylint: disable=too-few-public-methods # If the connection is closed, re-open it and get the kex again. if not s.is_connected(): s.connect() - unused = None # pylint: disable=unused-variable + unused = None # pylint: disable=unused-variable unused, unused, err = s.get_banner() if err is not None: s.close() return # Parse the server's initial KEX. - packet_type = 0 # pylint: disable=unused-variable + packet_type = 0 # pylint: disable=unused-variable packet_type, payload = s.read_packet() SSH2.Kex.parse(payload) @@ -798,7 +811,6 @@ class SSH2(object): # pylint: disable=too-few-public-methods else: host_key_types[host_key_type]['parsed'] = True - # Performs DH group exchanges to find what moduli are supported, and checks # their size. class GEXTest(object): @@ -811,14 +823,14 @@ class SSH2(object): # pylint: disable=too-few-public-methods return s.connect() - unused = None # pylint: disable=unused-variable + unused = None # pylint: disable=unused-variable unused, unused, err = s.get_banner() if err is not None: s.close() return False # Parse the server's initial KEX. - packet_type = 0 # pylint: disable=unused-variable + packet_type = 0 # pylint: disable=unused-variable packet_type, payload = s.read_packet(2) kex = SSH2.Kex.parse(payload) @@ -865,7 +877,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods # got here, doesn't mean the server is vulnerable... smallest_modulus = kex_group.get_dh_modulus_size() - except Exception as e: # pylint: disable=bare-except + except Exception: # pylint: disable=bare-except pass finally: s.close() @@ -887,9 +899,9 @@ class SSH2(object): # pylint: disable=too-few-public-methods kex_group.send_init_gex(s, bits, bits, bits) kex_group.recv_reply(s, False) smallest_modulus = kex_group.get_dh_modulus_size() - except Exception as e: # pylint: disable=bare-except - #import traceback - #print(traceback.format_exc()) + except Exception: # pylint: disable=bare-except + # import traceback + # print(traceback.format_exc()) pass finally: # The server is in a state that is not re-testable, @@ -897,7 +909,6 @@ class SSH2(object): # pylint: disable=too-few-public-methods # connection. s.close() - if smallest_modulus > 0: kex.set_dh_modulus_size(gex_alg, smallest_modulus) @@ -919,6 +930,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods if reconnect_failed: break + class SSH1(object): class CRC32(object): def __init__(self): @@ -932,35 +944,35 @@ class SSH1(object): crc = (crc >> 1) ^ (x * 0xedb88320) n = n >> 1 self._table[i] = crc - + def calc(self, v): # type: (binary_type) -> int - crc, l = 0, len(v) - for i in range(l): + crc, length = 0, len(v) + for i in range(length): n = ord(v[i:i + 1]) n = n ^ (crc & 0xff) crc = (crc >> 8) ^ self._table[n] return crc - + _crc32 = None # type: Optional[SSH1.CRC32] CIPHERS = ['none', 'idea', 'des', '3des', 'tss', 'rc4', 'blowfish'] AUTHS = ['none', 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos'] - + @classmethod def crc32(cls, v): # type: (binary_type) -> int if cls._crc32 is None: cls._crc32 = cls.CRC32() return cls._crc32.calc(v) - + class KexDB(object): # pylint: disable=too-few-public-methods # pylint: disable=bad-whitespace - FAIL_PLAINTEXT = 'no encryption/integrity' + FAIL_PLAINTEXT = 'no encryption/integrity' FAIL_OPENSSH37_REMOVE = 'removed since OpenSSH 3.7' - FAIL_NA_BROKEN = 'not implemented in OpenSSH, broken algorithm' - FAIL_NA_UNSAFE = 'not implemented in OpenSSH (server), unsafe algorithm' - TEXT_CIPHER_IDEA = 'cipher used by commercial SSH' - + FAIL_NA_BROKEN = 'not implemented in OpenSSH, broken algorithm' + FAIL_NA_UNSAFE = 'not implemented in OpenSSH (server), unsafe algorithm' + TEXT_CIPHER_IDEA = 'cipher used by commercial SSH' + ALGORITHMS = { 'key': { 'ssh-rsa1': [['1.2.2']], @@ -983,7 +995,7 @@ class SSH1(object): 'kerberos': [['1.2.2', '3.6'], [FAIL_OPENSSH37_REMOVE]], } } # type: Dict[str, Dict[str, List[List[Optional[str]]]]] - + class PublicKeyMessage(object): def __init__(self, cookie, skey, hkey, pflags, cmask, amask): # type: (binary_type, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None @@ -997,42 +1009,42 @@ class SSH1(object): self.__protocol_flags = pflags self.__supported_ciphers_mask = cmask self.__supported_authentications_mask = amask - + @property def cookie(self): # type: () -> binary_type return self.__cookie - + @property def server_key_bits(self): # type: () -> int return self.__server_key[0] - + @property def server_key_public_exponent(self): # type: () -> int return self.__server_key[1] - + @property def server_key_public_modulus(self): # type: () -> int return self.__server_key[2] - + @property def host_key_bits(self): # type: () -> int return self.__host_key[0] - + @property def host_key_public_exponent(self): # type: () -> int return self.__host_key[1] - + @property def host_key_public_modulus(self): # type: () -> int return self.__host_key[2] - + @property def host_key_fingerprint_data(self): # type: () -> binary_type @@ -1040,17 +1052,17 @@ class SSH1(object): mod = WriteBuf._create_mpint(self.host_key_public_modulus, False) e = WriteBuf._create_mpint(self.host_key_public_exponent, False) return mod + e - + @property def protocol_flags(self): # type: () -> int return self.__protocol_flags - + @property def supported_ciphers_mask(self): # type: () -> int return self.__supported_ciphers_mask - + @property def supported_ciphers(self): # type: () -> List[text_type] @@ -1059,12 +1071,12 @@ class SSH1(object): if self.__supported_ciphers_mask & (1 << i) != 0: ciphers.append(utils.to_utext(SSH1.CIPHERS[i])) return ciphers - + @property def supported_authentications_mask(self): # type: () -> int return self.__supported_authentications_mask - + @property def supported_authentications(self): # type: () -> List[text_type] @@ -1073,7 +1085,7 @@ class SSH1(object): if self.__supported_authentications_mask & (1 << i) != 0: auths.append(utils.to_utext(SSH1.AUTHS[i])) return auths - + def write(self, wbuf): # type: (WriteBuf) -> None wbuf.write(self.cookie) @@ -1086,14 +1098,14 @@ class SSH1(object): wbuf.write_int(self.protocol_flags) wbuf.write_int(self.supported_ciphers_mask) wbuf.write_int(self.supported_authentications_mask) - + @property def payload(self): # type: () -> binary_type wbuf = WriteBuf() self.write(wbuf) return wbuf.write_flush() - + @classmethod def parse(cls, payload): # type: (binary_type) -> SSH1.PublicKeyMessage @@ -1120,40 +1132,40 @@ class ReadBuf(object): super(ReadBuf, self).__init__() self._buf = BytesIO(data) if data is not None else BytesIO() self._len = len(data) if data is not None else 0 - + @property def unread_len(self): # type: () -> int return self._len - self._buf.tell() - + def read(self, size): # type: (int) -> binary_type return self._buf.read(size) - + def read_byte(self): # type: () -> int v = struct.unpack('B', self.read(1))[0] # type: int return v - + def read_bool(self): # type: () -> bool return self.read_byte() != 0 - + def read_int(self): # type: () -> int v = struct.unpack('>I', self.read(4))[0] # type: int return v - + def read_list(self): # type: () -> List[text_type] list_size = self.read_int() return self.read(list_size).decode('utf-8', 'replace').split(',') - + def read_string(self): # type: () -> binary_type n = self.read_int() return self.read(n) - + @classmethod def _parse_mpint(cls, v, pad, f): # type: (binary_type, binary_type, str) -> int @@ -1163,14 +1175,14 @@ class ReadBuf(object): for i in range(0, len(v), 4): r = (r << 32) | struct.unpack(f, v[i:i + 4])[0] return r - + def read_mpint1(self): # type: () -> int # NOTE: Data Type Enc @ http://www.snailbook.com/docs/protocol-1.5.txt bits = struct.unpack('>H', self.read(2))[0] n = (bits + 7) // 8 return self._parse_mpint(self.read(n), b'\x00', '>I') - + def read_mpint2(self): # type: () -> int # NOTE: Section 5 @ https://www.ietf.org/rfc/rfc4251.txt @@ -1179,7 +1191,7 @@ class ReadBuf(object): return 0 pad, f = (b'\xff', '>i') if ord(v[0:1]) & 0x80 != 0 else (b'\x00', '>I') return self._parse_mpint(v, pad, f) - + def read_line(self): # type: () -> text_type return self._buf.readline().rstrip().decode('utf-8', 'replace') @@ -1189,40 +1201,41 @@ class ReadBuf(object): self._len = 0 super(ReadBuf, self).reset() + class WriteBuf(object): def __init__(self, data=None): # type: (Optional[binary_type]) -> None super(WriteBuf, self).__init__() self._wbuf = BytesIO(data) if data is not None else BytesIO() - + def write(self, data): # type: (binary_type) -> WriteBuf self._wbuf.write(data) return self - + def write_byte(self, v): # type: (int) -> WriteBuf return self.write(struct.pack('B', v)) - + def write_bool(self, v): # type: (bool) -> WriteBuf return self.write_byte(1 if v else 0) - + def write_int(self, v): # type: (int) -> WriteBuf return self.write(struct.pack('>I', v)) - + def write_string(self, v): # type: (Union[binary_type, text_type]) -> WriteBuf if not isinstance(v, bytes): v = bytes(bytearray(v, 'utf-8')) self.write_int(len(v)) return self.write(v) - + def write_list(self, v): # type: (List[text_type]) -> WriteBuf return self.write_string(u','.join(v)) - + @classmethod def _bitlength(cls, n): # type: (int) -> int @@ -1230,7 +1243,7 @@ class WriteBuf(object): return n.bit_length() except AttributeError: return len(bin(n)) - (2 if n > 0 else 3) - + @classmethod def _create_mpint(cls, n, signed=True, bits=None): # type: (int, bool, Optional[int]) -> binary_type @@ -1248,7 +1261,7 @@ class WriteBuf(object): elif data.startswith(b'\xff\x80'): data = data[1:] return data - + def write_mpint1(self, n): # type: (int) -> WriteBuf # NOTE: Data Type Enc @ http://www.snailbook.com/docs/protocol-1.5.txt @@ -1256,20 +1269,20 @@ class WriteBuf(object): data = self._create_mpint(n, False, bits) self.write(struct.pack('>H', bits)) return self.write(data) - + def write_mpint2(self, n): # type: (int) -> WriteBuf # NOTE: Section 5 @ https://www.ietf.org/rfc/rfc4251.txt data = self._create_mpint(n) return self.write_string(data) - + def write_line(self, v): # type: (Union[binary_type, str]) -> WriteBuf if not isinstance(v, bytes): v = bytes(bytearray(v, 'utf-8')) v += b'\r\n' return self.write(v) - + def write_flush(self): # type: () -> binary_type payload = self._wbuf.getvalue() @@ -1285,23 +1298,23 @@ class SSH(object): # pylint: disable=too-few-public-methods class Protocol(object): # pylint: disable=too-few-public-methods # pylint: disable=bad-whitespace SMSG_PUBLIC_KEY = 2 - MSG_DEBUG = 4 - MSG_KEXINIT = 20 - MSG_NEWKEYS = 21 - MSG_KEXDH_INIT = 30 + MSG_DEBUG = 4 + MSG_KEXINIT = 20 + MSG_NEWKEYS = 21 + MSG_KEXDH_INIT = 30 MSG_KEXDH_REPLY = 31 MSG_KEXDH_GEX_REQUEST = 34 - MSG_KEXDH_GEX_GROUP = 31 - MSG_KEXDH_GEX_INIT = 32 - MSG_KEXDH_GEX_REPLY = 33 - + MSG_KEXDH_GEX_GROUP = 31 + MSG_KEXDH_GEX_INIT = 32 + MSG_KEXDH_GEX_REPLY = 33 + class Product(object): # pylint: disable=too-few-public-methods OpenSSH = 'OpenSSH' DropbearSSH = 'Dropbear SSH' LibSSH = 'libssh' TinySSH = 'TinySSH' PuTTY = 'PuTTY' - + class Software(object): def __init__(self, vendor, product, version, patch, os_version): # type: (Optional[str], str, str, Optional[str], Optional[str]) -> None @@ -1310,32 +1323,32 @@ class SSH(object): # pylint: disable=too-few-public-methods self.__version = version self.__patch = patch self.__os = os_version - + @property def vendor(self): # type: () -> Optional[str] return self.__vendor - + @property def product(self): # type: () -> str return self.__product - + @property def version(self): # type: () -> str return self.__version - + @property def patch(self): # type: () -> Optional[str] return self.__patch - + @property def os(self): # type: () -> Optional[str] return self.__os - + def compare_version(self, other): # type: (Union[None, SSH.Software, text_type]) -> int # pylint: disable=too-many-branches @@ -1373,7 +1386,7 @@ class SSH(object): # pylint: disable=too-few-public-methods elif spatch > opatch: return 1 return 0 - + def between_versions(self, vfrom, vtill): # type: (str, str) -> bool if bool(vfrom) and self.compare_version(vfrom) < 0: @@ -1381,7 +1394,7 @@ class SSH(object): # pylint: disable=too-few-public-methods if bool(vtill) and self.compare_version(vtill) > 0: return False return True - + def display(self, full=True): # type: (bool) -> str r = '{0} '.format(self.vendor) if bool(self.vendor) else '' @@ -1400,11 +1413,11 @@ class SSH(object): # pylint: disable=too-few-public-methods if bool(self.os): r += ' running on {0}'.format(self.os) return r - + def __str__(self): # type: () -> str return self.display() - + def __repr__(self): # type: () -> str r = 'vendor={0}, '.format(self.vendor) if bool(self.vendor) else '' @@ -1416,12 +1429,12 @@ class SSH(object): # pylint: disable=too-few-public-methods if bool(self.os): r += ', os={0}'.format(self.os) return '<{0}({1})>'.format(self.__class__.__name__, r) - + @staticmethod def _fix_patch(patch): # type: (str) -> Optional[str] return re.sub(r'^[-_\.]+', '', patch) or None - + @staticmethod def _fix_date(d): # type: (str) -> Optional[str] @@ -1429,7 +1442,7 @@ class SSH(object): # pylint: disable=too-few-public-methods return '{0}-{1}-{2}'.format(d[:4], d[4:6], d[6:8]) else: return None - + @classmethod def _extract_os_version(cls, c): # type: (Optional[str]) -> Optional[str] @@ -1456,7 +1469,7 @@ class SSH(object): # pylint: disable=too-few-public-methods if c.startswith(g) or c.endswith(g): return g return None - + @classmethod def parse(cls, banner): # type: (SSH.Banner) -> Optional[SSH.Software] @@ -1508,39 +1521,39 @@ class SSH(object): # pylint: disable=too-few-public-methods if bool(mx): return cls(None, SSH.Product.PuTTY, mx.group(1), None, None) return None - + class Banner(object): _RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-\s*([^\s]*)(?:\s+(.*))?)?' RX_PROTOCOL = re.compile(re.sub(r'\\d(\+?)', r'(\\d\g<1>)', _RXP)) RX_BANNER = re.compile(r'^({0}(?:(?:-{0})*)){1}$'.format(_RXP, _RXR)) - + def __init__(self, protocol, software, comments, valid_ascii): # type: (Tuple[int, int], Optional[str], Optional[str], bool) -> None self.__protocol = protocol self.__software = software self.__comments = comments self.__valid_ascii = valid_ascii - + @property def protocol(self): # type: () -> Tuple[int, int] return self.__protocol - + @property def software(self): # type: () -> Optional[str] return self.__software - + @property def comments(self): # type: () -> Optional[str] return self.__comments - + @property def valid_ascii(self): # type: () -> bool return self.__valid_ascii - + def __str__(self): # type: () -> str r = 'SSH-{0}.{1}'.format(self.protocol[0], self.protocol[1]) @@ -1549,7 +1562,7 @@ class SSH(object): # pylint: disable=too-few-public-methods if bool(self.comments): r += ' {0}'.format(self.comments) return r - + def __repr__(self): # type: () -> str p = '{0}.{1}'.format(self.protocol[0], self.protocol[1]) @@ -1559,7 +1572,7 @@ class SSH(object): # pylint: disable=too-few-public-methods if bool(self.comments): r += ', comments={0}'.format(self.comments) return '<{0}({1})>'.format(self.__class__.__name__, r) - + @classmethod def parse(cls, banner): # type: (text_type) -> Optional[SSH.Banner] @@ -1577,56 +1590,56 @@ class SSH(object): # pylint: disable=too-few-public-methods if comments is not None: comments = re.sub(r'\s+', ' ', comments) return cls(protocol, software, comments, valid_ascii) - + class Fingerprint(object): def __init__(self, fpd): # type: (binary_type) -> None self.__fpd = fpd - + @property def md5(self): # type: () -> text_type h = hashlib.md5(self.__fpd).hexdigest() r = u':'.join(h[i:i + 2] for i in range(0, len(h), 2)) return u'MD5:{0}'.format(r) - + @property def sha256(self): # type: () -> text_type h = base64.b64encode(hashlib.sha256(self.__fpd).digest()) r = h.decode('ascii').rstrip('=') return u'SHA256:{0}'.format(r) - + class Algorithm(object): class Timeframe(object): def __init__(self): # type: () -> None self.__storage = {} # type: Dict[str, List[Optional[str]]] - + def __contains__(self, product): # type: (str) -> bool return product in self.__storage - + def __getitem__(self, product): # type: (str) -> Sequence[Optional[str]] - return tuple(self.__storage.get(product, [None]*4)) - + return tuple(self.__storage.get(product, [None] * 4)) + def __str__(self): # type: () -> str return self.__storage.__str__() - + def __repr__(self): # type: () -> str return self.__str__() - + def get_from(self, product, for_server=True): # type: (str, bool) -> Optional[str] return self[product][0 if bool(for_server) else 2] - + def get_till(self, product, for_server=True): # type: (str, bool) -> Optional[str] return self[product][1 if bool(for_server) else 3] - + def _update(self, versions, pos): # type: (Optional[str], int) -> None ssh_versions = {} # type: Dict[str, str] @@ -1640,13 +1653,13 @@ class SSH(object): # pylint: disable=too-few-public-methods ssh_versions[ssh_prod] = ssh_ver for ssh_product, ssh_version in ssh_versions.items(): if ssh_product not in self.__storage: - self.__storage[ssh_product] = [None]*4 + self.__storage[ssh_product] = [None] * 4 prev = self[ssh_product][pos] if (prev is None or (prev < ssh_version and pos % 2 == 0) or (prev > ssh_version and pos % 2 == 1)): self.__storage[ssh_product][pos] = ssh_version - + def update(self, versions, for_server=None): # type: (List[Optional[str]], Optional[bool]) -> SSH.Algorithm.Timeframe for_cli = for_server is None or for_server is False @@ -1658,7 +1671,7 @@ class SSH(object): # pylint: disable=too-few-public-methods if for_cli and (i % 2 == 0 or vlen == 2): self._update(versions[i], 3 - 0**i) return self - + @staticmethod def get_ssh_version(version_desc): # type: (str) -> Tuple[str, str, bool] @@ -1671,7 +1684,7 @@ class SSH(object): # pylint: disable=too-few-public-methods return SSH.Product.LibSSH, version_desc[2:], is_client else: return SSH.Product.OpenSSH, version_desc, is_client - + @classmethod def get_since_text(cls, versions): # type: (List[Optional[str]]) -> Optional[text_type] @@ -1690,23 +1703,23 @@ class SSH(object): # pylint: disable=too-few-public-methods if len(tv) == 0: return None return 'available since ' + ', '.join(tv).rstrip(', ') - + class Algorithms(object): def __init__(self, pkm, kex): # type: (Optional[SSH1.PublicKeyMessage], Optional[SSH2.Kex]) -> None self.__ssh1kex = pkm self.__ssh2kex = kex - + @property def ssh1kex(self): # type: () -> Optional[SSH1.PublicKeyMessage] return self.__ssh1kex - + @property def ssh2kex(self): # type: () -> Optional[SSH2.Kex] return self.__ssh2kex - + @property def ssh1(self): # type: () -> Optional[SSH.Algorithms.Item] @@ -1717,7 +1730,7 @@ class SSH(object): # pylint: disable=too-few-public-methods item.add('enc', self.ssh1kex.supported_ciphers) item.add('aut', self.ssh1kex.supported_authentications) return item - + @property def ssh2(self): # type: () -> Optional[SSH.Algorithms.Item] @@ -1729,14 +1742,14 @@ class SSH(object): # pylint: disable=too-few-public-methods item.add('enc', self.ssh2kex.server.encryption) item.add('mac', self.ssh2kex.server.mac) return item - + @property def values(self): # type: () -> Iterable[SSH.Algorithms.Item] for item in [self.ssh1, self.ssh2]: if item is not None: yield item - + @property def maxlen(self): # type: () -> int @@ -1755,7 +1768,7 @@ class SSH(object): # pylint: disable=too-few-public-methods _ml(self.ssh2kex.server.mac), maxlen) return maxlen - + def get_ssh_timeframe(self, for_server=None): # type: (Optional[bool]) -> SSH.Algorithm.Timeframe timeframe = SSH.Algorithm.Timeframe() @@ -1770,7 +1783,7 @@ class SSH(object): # pylint: disable=too-few-public-methods versions = alg_desc[0] timeframe.update(versions, for_server) return timeframe - + def get_recommendations(self, software, for_server=True): # type: (Optional[SSH.Software], bool) -> Tuple[Optional[SSH.Software], Dict[int, Dict[str, Dict[str, Dict[str, int]]]]] # pylint: disable=too-many-locals,too-many-statements @@ -1783,19 +1796,18 @@ class SSH(object): # pylint: disable=too-few-public-methods if software is not None: if software.product not in vproducts: unknown_software = True -# -# The code below is commented out because it would try to guess what the server is, -# usually resulting in wild & incorrect recommendations. -# -# if software is None: -# ssh_timeframe = self.get_ssh_timeframe(for_server) -# for product in vproducts: -# if product not in ssh_timeframe: -# continue -# version = ssh_timeframe.get_from(product, for_server) -# if version is not None: -# software = SSH.Software(None, product, version, None, None) -# break + + # The code below is commented out because it would try to guess what the server is, + # usually resulting in wild & incorrect recommendations. + # if software is None: + # ssh_timeframe = self.get_ssh_timeframe(for_server) + # for product in vproducts: + # if product not in ssh_timeframe: + # continue + # version = ssh_timeframe.get_from(product, for_server) + # if version is not None: + # software = SSH.Software(None, product, version, None, None) + # break rec = {} # type: Dict[int, Dict[str, Dict[str, Dict[str, int]]]] if software is None: unknown_software = True @@ -1878,32 +1890,32 @@ class SSH(object): # pylint: disable=too-few-public-methods if len(rec[sshv]) == 0: del rec[sshv] return software, rec - + class Item(object): def __init__(self, sshv, db): # type: (int, Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None self.__sshv = sshv self.__db = db self.__storage = {} # type: Dict[str, List[text_type]] - + @property def sshv(self): # type: () -> int return self.__sshv - + @property def db(self): # type: () -> Dict[str, Dict[str, List[List[Optional[str]]]]] return self.__db - + def add(self, key, value): # type: (str, List[text_type]) -> None self.__storage[key] = value - + def items(self): # type: () -> Iterable[Tuple[str, List[text_type]]] return self.__storage.items() - + class Security(object): # pylint: disable=too-few-public-methods # Format: [starting_vuln_version, last_vuln_version, affected, CVE_ID, CVSSv2, description] # affected: 1 = server, 2 = client, 4 = local @@ -2025,13 +2037,13 @@ class SSH(object): # pylint: disable=too-few-public-methods ['0.3.3', '0.3.3', 1, 'integer overflow', 'integer overflow in "buffer_get_data"'], ['0.3.3', '0.3.3', 3, 'heap overflow', 'heap overflow in "packet_decrypt"']] } # type: Dict[str, List[List[Any]]] - + class Socket(ReadBuf, WriteBuf): class InsufficientReadException(Exception): pass - + SM_BANNER_SENT = 1 - + def __init__(self, host, port, ipvo=None, timeout=5, timeout_set=False): # type: (Optional[str], int) -> None super(SSH.Socket, self).__init__() @@ -2057,7 +2069,6 @@ class SSH(object): # pylint: disable=too-few-public-methods self.client_host = None self.client_port = None - def _resolve(self, ipvo): # type: (Sequence[int]) -> Iterable[Tuple[int, Tuple[Any, ...]]] ipvo = tuple([x for x in utils.unique_seq(ipvo) if x in (4, 6)]) @@ -2081,7 +2092,6 @@ class SSH(object): # pylint: disable=too-few-public-methods out.fail('[exception] {0}'.format(e)) sys.exit(1) - # Listens on a server socket and accepts one connection (used for # auditing client connections). def listen_and_accept(self): @@ -2093,7 +2103,7 @@ class SSH(object): # pylint: disable=too-few-public-methods s.bind(('0.0.0.0', self.__port)) s.listen() self.__sock_map[s.fileno()] = s - except Exception as e: + except Exception: print("Warning: failed to listen on any IPv4 interfaces.") pass @@ -2105,7 +2115,7 @@ class SSH(object): # pylint: disable=too-few-public-methods s.bind(('::', self.__port)) s.listen() self.__sock_map[s.fileno()] = s - except Exception as e: + except Exception: print("Warning: failed to listen on any IPv6 interfaces.") pass @@ -2139,7 +2149,6 @@ class SSH(object): # pylint: disable=too-few-public-methods c.settimeout(self.__timeout) self.__sock = c - def connect(self): # type: () -> None err = None @@ -2161,7 +2170,7 @@ class SSH(object): # pylint: disable=too-few-public-methods errm = 'cannot connect to {0} port {1}: {2}'.format(*errt) out.fail('[exception] {0}'.format(errm)) sys.exit(1) - + def get_banner(self, sshv=2): # type: (int) -> Tuple[Optional[SSH.Banner], List[text_type], Optional[str]] if self.__sock is None: @@ -2169,10 +2178,10 @@ class SSH(object): # pylint: disable=too-few-public-methods banner = SSH_HEADER.format('1.5' if sshv == 1 else '2.0') if self.__state < self.SM_BANNER_SENT: self.send_banner(banner) -# rto = self.__sock.gettimeout() -# self.__sock.settimeout(0.7) + # rto = self.__sock.gettimeout() + # self.__sock.settimeout(0.7) s, e = self.recv() -# self.__sock.settimeout(rto) + # self.__sock.settimeout(rto) if s < 0: return self.__banner, self.__header, e e = None @@ -2192,7 +2201,7 @@ class SSH(object): # pylint: disable=too-few-public-methods self.__header.append(line) s = 0 return self.__banner, self.__header, e - + def recv(self, size=2048): # type: (int) -> Tuple[int, Optional[str]] if self.__sock is None: @@ -2213,7 +2222,7 @@ class SSH(object): # pylint: disable=too-few-public-methods self._len += len(data) self._buf.seek(pos, 0) return len(data), None - + def send(self, data): # type: (binary_type) -> Tuple[int, Optional[str]] if self.__sock is None: @@ -2224,20 +2233,20 @@ class SSH(object): # pylint: disable=too-few-public-methods except socket.error as e: return -1, str(e.args[-1]) self.__sock.send(data) - + def send_banner(self, banner): # type: (str) -> None self.send(banner.encode() + b'\r\n') if self.__state < self.SM_BANNER_SENT: self.__state = self.SM_BANNER_SENT - + def ensure_read(self, size): # type: (int) -> None while self.unread_len < size: s, e = self.recv() if s < 0: raise SSH.Socket.InsufficientReadException(e) - + def read_packet(self, sshv=2): # type: (int) -> Tuple[int, binary_type] try: @@ -2289,7 +2298,7 @@ class SSH(object): # pylint: disable=too-few-public-methods else: e = ex.args[0].encode('utf-8') return -1, e - + def send_packet(self): # type: () -> Tuple[int, Optional[str]] payload = self.write_flush() @@ -2323,11 +2332,11 @@ class SSH(object): # pylint: disable=too-few-public-methods s.close() # pragma: nocover except: # pylint: disable=bare-except pass - + def __del__(self): # type: () -> None self.__cleanup() - + def __cleanup(self): # type: () -> None self._close_socket(self.__sock) @@ -2352,12 +2361,11 @@ class KexDH(object): # pragma: nocover self.__hostkey_type = None self.__hostkey_e = 0 self.__hostkey_n = 0 - self.__hostkey_n_len = 0 # Length of the host key modulus. - self.__ca_n_len = 0 # Length of the CA key modulus (if hostkey is a cert). + self.__hostkey_n_len = 0 # Length of the host key modulus. + self.__ca_n_len = 0 # Length of the CA key modulus (if hostkey is a cert). self.__f = 0 self.__h_sig = 0 - def set_params(self, g, p): self.__g = g self.__p = p @@ -2365,7 +2373,6 @@ class KexDH(object): # pragma: nocover self.__x = 0 self.__e = 0 - def send_init(self, s, init_msg=SSH.Protocol.MSG_KEXDH_INIT): # type: (SSH.Socket) -> None r = random.SystemRandom() @@ -2395,17 +2402,16 @@ class KexDH(object): # pragma: nocover return None hostkey_len = f_len = h_sig_len = 0 # pylint: disable=unused-variable - hostkey_type_len = hostkey_e_len = 0 # pylint: disable=unused-variable - key_id_len = principles_len = 0 # pylint: disable=unused-variable - critical_options_len = extensions_len = 0 # pylint: disable=unused-variable - nonce_len = ca_key_len = ca_key_type_len = 0 # pylint: disable=unused-variable + hostkey_type_len = hostkey_e_len = 0 # pylint: disable=unused-variable + key_id_len = principles_len = 0 # pylint: disable=unused-variable + critical_options_len = extensions_len = 0 # pylint: disable=unused-variable + nonce_len = ca_key_len = ca_key_type_len = 0 # pylint: disable=unused-variable ca_key_len = ca_key_type_len = ca_key_e_len = 0 # pylint: disable=unused-variable - key_id = principles = None # pylint: disable=unused-variable - critical_options = extensions = None # pylint: disable=unused-variable - valid_after = valid_before = None # pylint: disable=unused-variable + key_id = principles = None # pylint: disable=unused-variable + critical_options = extensions = None # pylint: disable=unused-variable nonce = ca_key = ca_key_type = None # pylint: disable=unused-variable - ca_key_e = ca_key_n = None # pylint: disable=unused-variable + ca_key_e = ca_key_n = None # pylint: disable=unused-variable # Get the host key blob, F, and signature. ptr = 0 @@ -2455,12 +2461,10 @@ class KexDH(object): # pragma: nocover # The principles, which are... I don't know what. principles, principles_len, ptr = KexDH.__get_bytes(hostkey, ptr) - # The timestamp that this certificate is valid after. - valid_after = hostkey[ptr:ptr + 8] + # Skip over the timestamp that this certificate is valid after. ptr += 8 - # The timestamp that this certificate is valid before. - valid_before = hostkey[ptr:ptr + 8] + # Skip over the timestamp that this certificate is valid before. ptr += 8 # TODO: validate the principles, and time range. @@ -2895,7 +2899,7 @@ def output_fingerprints(algs, sha256=True): if algs.ssh1kex is not None: name = 'ssh-rsa1' fp = SSH.Fingerprint(algs.ssh1kex.host_key_fingerprint_data) - #bits = algs.ssh1kex.host_key_bits + # bits = algs.ssh1kex.host_key_bits fps.append((name, fp)) if algs.ssh2kex is not None: host_keys = algs.ssh2kex.host_keys() @@ -2917,8 +2921,8 @@ def output_fingerprints(algs, sha256=True): for fpp in fps: name, fp = fpp fpo = fp.sha256 if sha256 else fp.md5 - #p = '' if out.batch else ' ' * (padlen - len(name)) - #out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) + # p = '' if out.batch else ' ' * (padlen - len(name)) + # out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) out.good('(fin) {0}: {1}'.format(name, fpo)) if len(obuf) > 0: out.head('# fingerprints') @@ -2994,7 +2998,7 @@ def output_recommendations(algs, software, padlen=0): else: title = '' out.head('# algorithm recommendations {0}'.format(title)) - obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing). + obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing). out.sep() return ret @@ -3018,7 +3022,7 @@ def output_info(algs, software, client_audit, any_problems, padlen=0): def output(banner, header, client_host=None, kex=None, pkm=None): # type: (Optional[SSH.Banner], List[text_type], Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> None - client_audit = (client_host != None) # If set, this is a client audit. + client_audit = client_host is not None # If set, this is a client audit. sshv = 1 if pkm is not None else 2 algs = SSH.Algorithms(pkm, kex) with OutputBuffer() as obuf: @@ -3077,17 +3081,17 @@ def output(banner, header, client_host=None, kex=None, pkm=None): perfect_config = output_recommendations(algs, software, maxlen) output_info(algs, software, client_audit, not perfect_config) - # If we encountered any unknown algorithms, ask the user to report them. if len(unknown_algorithms) > 0: out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at .\n" % ','.join(unknown_algorithms)) + class Utils(object): @classmethod def _type_err(cls, v, target): # type: (Any, text_type) -> TypeError return TypeError('cannot convert {0} to {1}'.format(type(v), target)) - + @classmethod def to_bytes(cls, v, enc='utf-8'): # type: (Union[binary_type, text_type], str) -> binary_type @@ -3096,7 +3100,7 @@ class Utils(object): elif isinstance(v, text_type): return v.encode(enc) raise cls._type_err(v, 'bytes') - + @classmethod def to_utext(cls, v, enc='utf-8'): # type: (Union[text_type, binary_type], str) -> text_type @@ -3105,7 +3109,7 @@ class Utils(object): elif isinstance(v, binary_type): return v.decode(enc) raise cls._type_err(v, 'unicode text') - + @classmethod def to_ntext(cls, v, enc='utf-8'): # type: (Union[text_type, binary_type], str) -> str @@ -3116,7 +3120,7 @@ class Utils(object): elif isinstance(v, binary_type): return v.decode(enc) # PY3 only raise cls._type_err(v, 'native text') - + @classmethod def _is_ascii(cls, v, char_filter=lambda x: x <= 127): # type: (Union[text_type, str], Callable[[int], bool]) -> bool @@ -3128,7 +3132,7 @@ class Utils(object): return r r = True return r - + @classmethod def _to_ascii(cls, v, char_filter=lambda x: x <= 127, errors='replace'): # type: (Union[text_type, str], Callable[[int], bool], str) -> str @@ -3144,42 +3148,42 @@ class Utils(object): r.append(63) return cls.to_ntext(r.decode('ascii')) raise cls._type_err(v, 'ascii') - + @classmethod def is_ascii(cls, v): # type: (Union[text_type, str]) -> bool return cls._is_ascii(v) - + @classmethod def to_ascii(cls, v, errors='replace'): # type: (Union[text_type, str], str) -> str return cls._to_ascii(v, errors=errors) - + @classmethod def is_print_ascii(cls, v): # type: (Union[text_type, str]) -> bool return cls._is_ascii(v, lambda x: x >= 32 and x <= 126) - + @classmethod def to_print_ascii(cls, v, errors='replace'): # type: (Union[text_type, str], str) -> str return cls._to_ascii(v, lambda x: x >= 32 and x <= 126, errors) - + @classmethod def unique_seq(cls, seq): # type: (Sequence[Any]) -> Sequence[Any] seen = set() # type: Set[Any] - + def _seen_add(x): # type: (Any) -> bool seen.add(x) return False - + if isinstance(seq, tuple): return tuple(x for x in seq if x not in seen and not _seen_add(x)) else: return [x for x in seq if x not in seen and not _seen_add(x)] - + @classmethod def ctoi(cls, c): # type: (Union[text_type, str, int]) -> int @@ -3187,7 +3191,7 @@ class Utils(object): return ord(c[0]) else: return c - + @staticmethod def parse_int(v): # type: (Any) -> int @@ -3204,6 +3208,7 @@ class Utils(object): except: # pylint: disable=bare-except return -1.0 + def build_struct(banner, kex=None, pkm=None, client_host=None): res = { "banner": { @@ -3281,6 +3286,7 @@ def build_struct(banner, kex=None, pkm=None, client_host=None): return res + def audit(aconf, sshv=None): # type: (AuditConf, Optional[int]) -> None out.batch = aconf.batch @@ -3350,9 +3356,11 @@ def audit(aconf, sshv=None): utils = Utils() out = Output() + def main(): conf = AuditConf.from_cmdline(sys.argv[1:], usage) audit(conf) + if __name__ == '__main__': # pragma: nocover main() diff --git a/test/conftest.py b/test/conftest.py index 0bc4124..fe441e1 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -27,7 +27,7 @@ class _OutputSpy(list): self.__out = StringIO() self.__old_stdout = sys.stdout sys.stdout = self.__out - + def flush(self): lines = self.__out.getvalue().splitlines() sys.stdout = self.__old_stdout @@ -44,12 +44,12 @@ class _VirtualGlobalSocket(object): def __init__(self, vsocket): self.vsocket = vsocket self.addrinfodata = {} - + # pylint: disable=unused-argument def create_connection(self, address, timeout=0, source_address=None): # pylint: disable=protected-access return self.vsocket._connect(address, True) - + # pylint: disable=unused-argument def socket(self, family=socket.AF_INET, @@ -57,7 +57,7 @@ class _VirtualGlobalSocket(object): proto=0, fileno=None): return self.vsocket - + def getaddrinfo(self, host, port, family=0, socktype=0, proto=0, flags=0): key = '{0}#{1}'.format(host, port) if key in self.addrinfodata: @@ -85,41 +85,41 @@ class _VirtualSocket(object): self.sdata = [] self.errors = {} self.gsock = _VirtualGlobalSocket(self) - + def _check_err(self, method): method_error = self.errors.get(method) if method_error: raise method_error - + def connect(self, address): return self._connect(address, False) - + def _connect(self, address, ret=True): self.peer_address = address self._connected = True self._check_err('connect') return self if ret else None - + def settimeout(self, timeout): self.timeout = timeout - + def gettimeout(self): return self.timeout - + def getpeername(self): if self.peer_address is None or not self._connected: raise socket.error(57, 'Socket is not connected') return self.peer_address - + def getsockname(self): return self.sock_address - + def bind(self, address): self.sock_address = address - + def listen(self, backlog): pass - + def accept(self): # pylint: disable=protected-access conn = _VirtualSocket() @@ -127,7 +127,7 @@ class _VirtualSocket(object): conn.peer_address = ('127.0.0.1', 0) conn._connected = True return conn, conn.peer_address - + def recv(self, bufsize, flags=0): # pylint: disable=unused-argument if not self._connected: @@ -138,7 +138,7 @@ class _VirtualSocket(object): if isinstance(data, Exception): raise data return data - + def send(self, data): if self.peer_address is None or not self._connected: raise socket.error(32, 'Broken pipe') diff --git a/test/test_auditconf.py b/test/test_auditconf.py index a901299..67f06f4 100644 --- a/test/test_auditconf.py +++ b/test/test_auditconf.py @@ -9,7 +9,7 @@ class TestAuditConf(object): def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf self.usage = ssh_audit.usage - + @staticmethod def _test_conf(conf, **kwargs): options = { @@ -38,11 +38,11 @@ class TestAuditConf(object): assert conf.ipv4 == options['ipv4'] assert conf.ipv6 == options['ipv6'] assert conf.ipvo == options['ipvo'] - + def test_audit_conf_defaults(self): conf = self.AuditConf() self._test_conf(conf) - + def test_audit_conf_booleans(self): conf = self.AuditConf() for p in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']: @@ -52,7 +52,7 @@ class TestAuditConf(object): for v in [False, 0]: setattr(conf, p, v) assert getattr(conf, p) is False - + def test_audit_conf_port(self): conf = self.AuditConf() for port in [22, 2222]: @@ -62,7 +62,7 @@ class TestAuditConf(object): with pytest.raises(ValueError) as excinfo: conf.port = port excinfo.match(r'.*invalid port.*') - + def test_audit_conf_ipvo(self): # ipv4-only conf = self.AuditConf() @@ -114,7 +114,7 @@ class TestAuditConf(object): assert conf.ipvo == (4, 6) conf.ipvo = (4, 4, 4, 6, 6) assert conf.ipvo == (4, 6) - + def test_audit_conf_level(self): conf = self.AuditConf() for level in ['info', 'warn', 'fail']: @@ -124,7 +124,7 @@ class TestAuditConf(object): with pytest.raises(ValueError) as excinfo: conf.level = level excinfo.match(r'.*invalid level.*') - + def test_audit_conf_cmdline(self): # pylint: disable=too-many-statements c = lambda x: self.AuditConf.from_cmdline(x.split(), self.usage) # noqa diff --git a/test/test_banner.py b/test/test_banner.py index ca93a53..cb85074 100644 --- a/test/test_banner.py +++ b/test/test_banner.py @@ -8,7 +8,7 @@ class TestBanner(object): @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH - + def test_simple_banners(self): banner = lambda x: self.ssh.Banner.parse(x) # noqa b = banner('SSH-2.0-OpenSSH_7.3') @@ -26,12 +26,12 @@ class TestBanner(object): assert b.software == 'Cisco-1.25' assert b.comments is None assert str(b) == 'SSH-1.5-Cisco-1.25' - + def test_invalid_banners(self): b = lambda x: self.ssh.Banner.parse(x) # noqa assert b('Something') is None assert b('SSH-XXX-OpenSSH_7.3') is None - + def test_banners_with_spaces(self): b = lambda x: self.ssh.Banner.parse(x) # noqa s = 'SSH-2.0-OpenSSH_4.3p2' @@ -42,7 +42,7 @@ class TestBanner(object): assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu')) == s assert str(b('SSH-2.0-OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s - + def test_banners_without_software(self): b = lambda x: self.ssh.Banner.parse(x) # noqa assert b('SSH-2.0').protocol == (2, 0) @@ -53,13 +53,13 @@ class TestBanner(object): assert b('SSH-2.0-').software == '' assert b('SSH-2.0-').comments is None assert str(b('SSH-2.0-')) == 'SSH-2.0-' - + def test_banners_with_comments(self): b = lambda x: self.ssh.Banner.parse(x) # noqa assert repr(b('SSH-2.0-OpenSSH_7.2p2 Ubuntu-1')) == '' assert repr(b('SSH-1.99-OpenSSH_3.4p1 Debian 1:3.4p1-1.woody.3')) == '' assert repr(b('SSH-1.5-1.3.7 F-SECURE SSH')) == '' - + def test_banners_with_multiple_protocols(self): b = lambda x: self.ssh.Banner.parse(x) # noqa assert str(b('SSH-1.99-SSH-1.99-OpenSSH_3.6.1p2')) == 'SSH-1.99-OpenSSH_3.6.1p2' diff --git a/test/test_buffer.py b/test/test_buffer.py index 1e457bc..bf9eaec 100644 --- a/test/test_buffer.py +++ b/test/test_buffer.py @@ -11,13 +11,13 @@ class TestBuffer(object): self.rbuf = ssh_audit.ReadBuf self.wbuf = ssh_audit.WriteBuf self.utf8rchar = b'\xef\xbf\xbd' - + @classmethod def _b(cls, v): v = re.sub(r'\s', '', v) data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)] return bytes(bytearray(data)) - + def test_unread(self): w = self.wbuf().write_byte(1).write_int(2).write_flush() r = self.rbuf(w) @@ -26,7 +26,7 @@ class TestBuffer(object): assert r.unread_len == 4 r.read_int() assert r.unread_len == 0 - + def test_byte(self): w = lambda x: self.wbuf().write_byte(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_byte() # noqa @@ -37,7 +37,7 @@ class TestBuffer(object): for p in tc: assert w(p[0]) == self._b(p[1]) assert r(self._b(p[1])) == p[0] - + def test_bool(self): w = lambda x: self.wbuf().write_bool(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_bool() # noqa @@ -46,7 +46,7 @@ class TestBuffer(object): for p in tc: assert w(p[0]) == self._b(p[1]) assert r(self._b(p[1])) == p[0] - + def test_int(self): w = lambda x: self.wbuf().write_int(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_int() # noqa @@ -57,7 +57,7 @@ class TestBuffer(object): for p in tc: assert w(p[0]) == self._b(p[1]) assert r(self._b(p[1])) == p[0] - + def test_string(self): w = lambda x: self.wbuf().write_string(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_string() # noqa @@ -69,7 +69,7 @@ class TestBuffer(object): if not isinstance(v, bytes): v = bytes(bytearray(v, 'utf-8')) assert r(self._b(p[1])) == v - + def test_list(self): w = lambda x: self.wbuf().write_list(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_list() # noqa @@ -77,13 +77,13 @@ class TestBuffer(object): for p in tc: assert w(p[0]) == self._b(p[1]) assert r(self._b(p[1])) == p[0] - + def test_list_nonutf8(self): r = lambda x: self.rbuf(x).read_list() # noqa src = self._b('00 00 00 04 de ad be ef') dst = [(b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')] assert r(src) == dst - + def test_line(self): w = lambda x: self.wbuf().write_line(x).write_flush() # noqa r = lambda x: self.rbuf(x).read_line() # noqa @@ -91,13 +91,13 @@ class TestBuffer(object): for p in tc: assert w(p[0]) == self._b(p[1]) assert r(self._b(p[1])) == p[0] - + def test_line_nonutf8(self): r = lambda x: self.rbuf(x).read_line() # noqa src = self._b('de ad be af') dst = (b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8') assert r(src) == dst - + def test_bitlen(self): # pylint: disable=protected-access class Py26Int(int): @@ -105,7 +105,7 @@ class TestBuffer(object): raise AttributeError assert self.wbuf._bitlength(42) == 6 assert self.wbuf._bitlength(Py26Int(42)) == 6 - + def test_mpint1(self): mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush() # noqa mpint1r = lambda x: self.rbuf(x).read_mpint1() # noqa @@ -116,7 +116,7 @@ class TestBuffer(object): for p in tc: assert mpint1w(p[0]) == self._b(p[1]) assert mpint1r(self._b(p[1])) == p[0] - + def test_mpint2(self): mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush() # noqa mpint2r = lambda x: self.rbuf(x).read_mpint2() # noqa diff --git a/test/test_errors.py b/test/test_errors.py index abf720e..aedd835 100644 --- a/test/test_errors.py +++ b/test/test_errors.py @@ -11,13 +11,13 @@ class TestErrors(object): def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf self.audit = ssh_audit.audit - + def _conf(self): conf = self.AuditConf('localhost', 22) conf.colors = False conf.batch = True return conf - + def _audit(self, spy, conf=None, sysexit=True): if conf is None: conf = self._conf() @@ -29,34 +29,33 @@ class TestErrors(object): self.audit(conf) lines = spy.flush() return lines - + def test_connection_unresolved(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.gsock.addrinfodata['localhost#22'] = [] lines = self._audit(output_spy) assert len(lines) == 1 assert 'has no DNS records' in lines[-1] - + def test_connection_refused(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.errors['connect'] = socket.error(errno.ECONNREFUSED, 'Connection refused') lines = self._audit(output_spy) assert len(lines) == 1 assert 'Connection refused' in lines[-1] - + def test_connection_timeout(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.errors['connect'] = socket.timeout('timed out') lines = self._audit(output_spy) assert len(lines) == 1 assert 'timed out' in lines[-1] - + def test_recv_empty(self, output_spy, virtual_socket): - vsocket = virtual_socket lines = self._audit(output_spy) assert len(lines) == 1 assert 'did not receive banner' in lines[-1] - + def test_recv_timeout(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(socket.timeout('timed out')) @@ -64,7 +63,7 @@ class TestErrors(object): assert len(lines) == 1 assert 'did not receive banner' in lines[-1] assert 'timed out' in lines[-1] - + def test_recv_retry_till_timeout(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) @@ -75,7 +74,7 @@ class TestErrors(object): assert len(lines) == 1 assert 'did not receive banner' in lines[-1] assert 'timed out' in lines[-1] - + def test_recv_retry_till_reset(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) @@ -86,7 +85,7 @@ class TestErrors(object): assert len(lines) == 1 assert 'did not receive banner' in lines[-1] assert 'reset by peer' in lines[-1] - + def test_connection_closed_before_banner(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer')) @@ -94,7 +93,7 @@ class TestErrors(object): assert len(lines) == 1 assert 'did not receive banner' in lines[-1] assert 'reset by peer' in lines[-1] - + def test_connection_closed_after_header(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(b'header line 1\n') @@ -105,7 +104,7 @@ class TestErrors(object): assert len(lines) == 3 assert 'did not receive banner' in lines[-1] assert 'reset by peer' in lines[-1] - + def test_connection_closed_after_banner(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') @@ -114,7 +113,7 @@ class TestErrors(object): assert len(lines) == 2 assert 'error reading packet' in lines[-1] assert 'reset by peer' in lines[-1] - + def test_empty_data_after_banner(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') @@ -122,7 +121,7 @@ class TestErrors(object): assert len(lines) == 2 assert 'error reading packet' in lines[-1] assert 'empty' in lines[-1] - + def test_wrong_data_after_banner(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') @@ -131,7 +130,7 @@ class TestErrors(object): assert len(lines) == 2 assert 'error reading packet' in lines[-1] assert 'xxx' in lines[-1] - + def test_non_ascii_banner(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n') @@ -140,7 +139,7 @@ class TestErrors(object): assert 'error reading packet' in lines[-1] assert 'ASCII' in lines[-2] assert lines[-3].endswith('SSH-2.0-ssh-audit-test?') - + def test_nonutf8_data_after_banner(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') @@ -149,7 +148,7 @@ class TestErrors(object): assert len(lines) == 2 assert 'error reading packet' in lines[-1] assert '\\x81\\xff' in lines[-1] - + def test_protocol_mismatch_by_conf(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n') diff --git a/test/test_output.py b/test/test_output.py index 3ac6f06..3214616 100644 --- a/test/test_output.py +++ b/test/test_output.py @@ -10,7 +10,7 @@ class TestOutput(object): def init(self, ssh_audit): self.Output = ssh_audit.Output self.OutputBuffer = ssh_audit.OutputBuffer - + def test_output_buffer_no_lines(self, output_spy): output_spy.begin() with self.OutputBuffer() as obuf: @@ -21,13 +21,13 @@ class TestOutput(object): pass obuf.flush() assert output_spy.flush() == [] - + def test_output_buffer_no_flush(self, output_spy): output_spy.begin() with self.OutputBuffer(): print(u'abc') assert output_spy.flush() == [] - + def test_output_buffer_flush(self, output_spy): output_spy.begin() with self.OutputBuffer() as obuf: @@ -36,14 +36,14 @@ class TestOutput(object): print(u'def') obuf.flush() assert output_spy.flush() == [u'abc', u'', u'def'] - + def test_output_defaults(self): out = self.Output() # default: on assert out.batch is False assert out.use_colors is True assert out.level == 'info' - + def test_output_colors(self, output_spy): out = self.Output() # test without colors @@ -82,7 +82,7 @@ class TestOutput(object): output_spy.begin() out.fail('fail color') assert output_spy.flush() == [u'\x1b[0;31mfail color\x1b[0m'] - + def test_output_sep(self, output_spy): out = self.Output() output_spy.begin() @@ -90,7 +90,7 @@ class TestOutput(object): out.sep() out.sep() assert output_spy.flush() == [u'', u'', u''] - + def test_output_levels(self): out = self.Output() assert out.get_level('info') == 0 @@ -98,7 +98,7 @@ class TestOutput(object): assert out.get_level('warn') == 1 assert out.get_level('fail') == 2 assert out.get_level('unknown') > 2 - + def test_output_level_property(self): out = self.Output() out.level = 'info' @@ -111,7 +111,7 @@ class TestOutput(object): assert out.level == 'fail' out.level = 'invalid level' assert out.level == 'unknown' - + def test_output_level(self, output_spy): out = self.Output() # visible: all @@ -150,7 +150,7 @@ class TestOutput(object): out.warn('warn color') out.fail('fail color') assert len(output_spy.flush()) == 1 - + def test_output_batch(self, output_spy): out = self.Output() # visible: all diff --git a/test/test_resolve.py b/test/test_resolve.py index 8fcddf6..35dfd11 100644 --- a/test/test_resolve.py +++ b/test/test_resolve.py @@ -11,13 +11,13 @@ class TestResolve(object): self.AuditConf = ssh_audit.AuditConf self.audit = ssh_audit.audit self.ssh = ssh_audit.SSH - + def _conf(self): conf = self.AuditConf('localhost', 22) conf.colors = False conf.batch = True return conf - + def test_resolve_error(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.gsock.addrinfodata['localhost#22'] = socket.gaierror(8, 'hostname nor servname provided, or not known') @@ -25,11 +25,11 @@ class TestResolve(object): conf = self._conf() output_spy.begin() with pytest.raises(SystemExit): - r = list(s._resolve(conf.ipvo)) + list(s._resolve(conf.ipvo)) lines = output_spy.flush() assert len(lines) == 1 assert 'hostname nor servname provided' in lines[-1] - + def test_resolve_hostname_without_records(self, output_spy, virtual_socket): vsocket = virtual_socket vsocket.gsock.addrinfodata['localhost#22'] = [] @@ -38,36 +38,32 @@ class TestResolve(object): output_spy.begin() r = list(s._resolve(conf.ipvo)) assert len(r) == 0 - + def test_resolve_ipv4(self, virtual_socket): - vsocket = virtual_socket conf = self._conf() conf.ipv4 = True s = self.ssh.Socket('localhost', 22) r = list(s._resolve(conf.ipvo)) assert len(r) == 1 assert r[0] == (socket.AF_INET, ('127.0.0.1', 22)) - + def test_resolve_ipv6(self, virtual_socket): - vsocket = virtual_socket s = self.ssh.Socket('localhost', 22) conf = self._conf() conf.ipv6 = True r = list(s._resolve(conf.ipvo)) assert len(r) == 1 assert r[0] == (socket.AF_INET6, ('::1', 22)) - + def test_resolve_ipv46_both(self, virtual_socket): - vsocket = virtual_socket s = self.ssh.Socket('localhost', 22) conf = self._conf() r = list(s._resolve(conf.ipvo)) assert len(r) == 2 assert r[0] == (socket.AF_INET, ('127.0.0.1', 22)) assert r[1] == (socket.AF_INET6, ('::1', 22)) - + def test_resolve_ipv46_order(self, virtual_socket): - vsocket = virtual_socket s = self.ssh.Socket('localhost', 22) conf = self._conf() conf.ipv4 = True diff --git a/test/test_socket.py b/test/test_socket.py index d5c27fc..cd12470 100644 --- a/test/test_socket.py +++ b/test/test_socket.py @@ -1,6 +1,5 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import socket import pytest @@ -9,21 +8,21 @@ class TestSocket(object): @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH - + def test_invalid_host(self, virtual_socket): with pytest.raises(ValueError): - s = self.ssh.Socket(None, 22) - + self.ssh.Socket(None, 22) + def test_invalid_port(self, virtual_socket): with pytest.raises(ValueError): - s = self.ssh.Socket('localhost', 'abc') + self.ssh.Socket('localhost', 'abc') with pytest.raises(ValueError): - s = self.ssh.Socket('localhost', -1) + self.ssh.Socket('localhost', -1) with pytest.raises(ValueError): - s = self.ssh.Socket('localhost', 0) + self.ssh.Socket('localhost', 0) with pytest.raises(ValueError): - s = self.ssh.Socket('localhost', 65536) - + self.ssh.Socket('localhost', 65536) + def test_not_connected_socket(self, virtual_socket): sock = self.ssh.Socket('localhost', 22) banner, header, err = sock.get_banner() diff --git a/test/test_software.py b/test/test_software.py index 4785041..b505507 100644 --- a/test/test_software.py +++ b/test/test_software.py @@ -8,13 +8,13 @@ class TestSoftware(object): @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH - + def test_unknown_software(self): ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa assert ps('SSH-1.5') is None assert ps('SSH-1.99-AlfaMegaServer') is None assert ps('SSH-2.0-BetaMegaServer 0.0.1') is None - + def test_openssh_software(self): # pylint: disable=too-many-statements ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa @@ -102,7 +102,7 @@ class TestSoftware(object): assert s.display(True) == str(s) assert s.display(False) == 'OpenSSH 5.9' assert repr(s) == '' - + def test_dropbear_software(self): ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa # common @@ -153,7 +153,7 @@ class TestSoftware(object): assert s.display(True) == str(s) assert s.display(False) == 'Dropbear SSH 2014.66' assert repr(s) == '' - + def test_libssh_software(self): ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa # common @@ -179,7 +179,7 @@ class TestSoftware(object): assert s.display(True) == str(s) assert s.display(False) == str(s) assert repr(s) == '' - + def test_romsshell_software(self): ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa # common @@ -194,7 +194,7 @@ class TestSoftware(object): assert s.display(True) == str(s) assert s.display(False) == str(s) assert repr(s) == '' - + def test_hp_ilo_software(self): ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa # common @@ -209,7 +209,7 @@ class TestSoftware(object): assert s.display(True) == str(s) assert s.display(False) == str(s) assert repr(s) == '' - + def test_cisco_software(self): ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa # common @@ -224,7 +224,7 @@ class TestSoftware(object): assert s.display(True) == str(s) assert s.display(False) == str(s) assert repr(s) == '' - + def test_software_os(self): ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa # unknown diff --git a/test/test_ssh1.py b/test/test_ssh1.py index f18e4be..6ccd601 100644 --- a/test/test_ssh1.py +++ b/test/test_ssh1.py @@ -14,7 +14,7 @@ class TestSSH1(object): self.wbuf = ssh_audit.WriteBuf self.audit = ssh_audit.audit self.AuditConf = ssh_audit.AuditConf - + def _conf(self): conf = self.AuditConf('localhost', 22) conf.colors = False @@ -23,7 +23,7 @@ class TestSSH1(object): conf.ssh1 = True conf.ssh2 = False return conf - + def _create_ssh1_packet(self, payload, valid_crc=True): padding = -(len(payload) + 4) % 8 plen = len(payload) + 4 @@ -31,15 +31,15 @@ class TestSSH1(object): cksum = self.ssh1.crc32(pad_bytes + payload) if valid_crc else 0 data = struct.pack('>I', plen) + pad_bytes + payload + struct.pack('>I', cksum) return data - + @classmethod def _server_key(cls): return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551) - + @classmethod def _host_key(cls): return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b) - + def _pkm_payload(self): w = self.wbuf() w.write(b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff') @@ -51,11 +51,11 @@ class TestSSH1(object): w.write_int(72) w.write_int(36) return w.write_flush() - + def test_crc32(self): assert self.ssh1.crc32(b'') == 0x00 assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808 - + def test_fingerprint(self): # pylint: disable=protected-access b, e, m = self._host_key() @@ -65,7 +65,7 @@ class TestSSH1(object): assert b == 2048 assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' - + def _assert_pkm_keys(self, pkm, skey, hkey): b, e, m = skey assert pkm.server_key_bits == b @@ -75,7 +75,7 @@ class TestSSH1(object): assert pkm.host_key_bits == b assert pkm.host_key_public_exponent == e assert pkm.host_key_public_modulus == m - + def _assert_pkm_fields(self, pkm, skey, hkey): assert pkm is not None assert pkm.cookie == b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' @@ -88,25 +88,25 @@ class TestSSH1(object): fp = self.ssh.Fingerprint(pkm.host_key_fingerprint_data) assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' - + def test_pkm_init(self): cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' pflags, cmask, amask = 2, 72, 36 skey, hkey = self._server_key(), self._host_key() pkm = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask) self._assert_pkm_fields(pkm, skey, hkey) - for skey2 in ([], [0], [0,1], [0,1,2,3]): + for skey2 in ([], [0], [0, 1], [0, 1, 2, 3]): with pytest.raises(ValueError): pkm = self.ssh1.PublicKeyMessage(cookie, skey2, hkey, pflags, cmask, amask) - for hkey2 in ([], [0], [0,1], [0,1,2,3]): + for hkey2 in ([], [0], [0, 1], [0, 1, 2, 3]): with pytest.raises(ValueError): print(hkey2) pkm = self.ssh1.PublicKeyMessage(cookie, skey, hkey2, pflags, cmask, amask) - + def test_pkm_read(self): pkm = self.ssh1.PublicKeyMessage.parse(self._pkm_payload()) self._assert_pkm_fields(pkm, self._server_key(), self._host_key()) - + def test_pkm_payload(self): cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' skey, hkey = self._server_key(), self._host_key() @@ -114,7 +114,7 @@ class TestSSH1(object): pkm1 = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask) pkm2 = self.ssh1.PublicKeyMessage.parse(self._pkm_payload()) assert pkm1.payload == pkm2.payload - + def test_ssh1_server_simple(self, output_spy, virtual_socket): vsocket = virtual_socket w = self.wbuf() @@ -126,7 +126,7 @@ class TestSSH1(object): self.audit(self._conf()) lines = output_spy.flush() assert len(lines) == 13 - + def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket): vsocket = virtual_socket w = self.wbuf() diff --git a/test/test_ssh2.py b/test/test_ssh2.py index 24024e4..44c6f5c 100644 --- a/test/test_ssh2.py +++ b/test/test_ssh2.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import struct, os +import os +import struct import pytest @@ -14,7 +15,7 @@ class TestSSH2(object): self.wbuf = ssh_audit.WriteBuf self.audit = ssh_audit.audit self.AuditConf = ssh_audit.AuditConf - + def _conf(self): conf = self.AuditConf('localhost', 22) conf.colors = False @@ -23,7 +24,7 @@ class TestSSH2(object): conf.ssh1 = False conf.ssh2 = True return conf - + @classmethod def _create_ssh2_packet(cls, payload): padding = -(len(payload) + 5) % 8 @@ -33,7 +34,7 @@ class TestSSH2(object): pad_bytes = b'\x00' * padding data = struct.pack('>Ib', plen, padding) + payload + pad_bytes return data - + def _kex_payload(self): w = self.wbuf() w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff') @@ -50,7 +51,7 @@ class TestSSH2(object): w.write_byte(False) w.write_int(0) return w.write_flush() - + def test_kex_read(self): kex = self.ssh2.Kex.parse(self._kex_payload()) assert kex is not None @@ -69,7 +70,7 @@ class TestSSH2(object): assert kex.server.languages == [u''] assert kex.follows is False assert kex.unused == 0 - + def _get_empty_kex(self, cookie=None): kex_algs, key_algs = [], [] enc, mac, compression, languages = [], [], ['none'], [] @@ -80,7 +81,7 @@ class TestSSH2(object): cookie = os.urandom(16) kex = self.ssh2.Kex(cookie, kex_algs, key_algs, cli, srv, 0) return kex - + def _get_kex_variat1(self): cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff' kex = self._get_empty_kex(cookie) @@ -123,12 +124,12 @@ class TestSSH2(object): continue kex.client.compression.append(a) return kex - + def test_key_payload(self): kex1 = self._get_kex_variat1() kex2 = self.ssh2.Kex.parse(self._kex_payload()) assert kex1.payload == kex2.payload - + @pytest.mark.skip(reason="Temporarily skip this test to have a working test suite!") def test_ssh2_server_simple(self, output_spy, virtual_socket): vsocket = virtual_socket diff --git a/test/test_ssh_algorithm.py b/test/test_ssh_algorithm.py index 5e03529..3572f39 100644 --- a/test/test_ssh_algorithm.py +++ b/test/test_ssh_algorithm.py @@ -8,24 +8,24 @@ class TestSSHAlgorithm(object): @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH - + def _tf(self, v, s=None): return self.ssh.Algorithm.Timeframe().update(v, s) - + def test_get_ssh_version(self): def ver(v): return self.ssh.Algorithm.get_ssh_version(v) - + assert ver('7.5') == ('OpenSSH', '7.5', False) assert ver('7.5C') == ('OpenSSH', '7.5', True) assert ver('d2016.74') == ('Dropbear SSH', '2016.74', False) assert ver('l10.7.4') == ('libssh', '0.7.4', False) assert ver('')[1] == '' - + def test_get_since_text(self): def gst(v): return self.ssh.Algorithm.get_since_text(v) - + assert gst(['7.5']) == 'available since OpenSSH 7.5' assert gst(['7.5C']) == 'available since OpenSSH 7.5 (client only)' assert gst(['7.5,']) == 'available since OpenSSH 7.5' @@ -33,12 +33,12 @@ class TestSSHAlgorithm(object): assert gst(['7.5,d2016.73']) == 'available since OpenSSH 7.5, Dropbear SSH 2016.73' assert gst(['l10.7.4']) is None assert gst([]) is None - + def test_timeframe_creation(self): # pylint: disable=line-too-long,too-many-statements def cmp_tf(v, s, r): assert str(self._tf(v, s)) == str(r) - + cmp_tf(['6.2'], None, {'OpenSSH': ['6.2', None, '6.2', None]}) cmp_tf(['6.2'], True, {'OpenSSH': ['6.2', None, None, None]}) cmp_tf(['6.2'], False, {'OpenSSH': [None, None, '6.2', None]}) @@ -57,7 +57,7 @@ class TestSSHAlgorithm(object): cmp_tf(['6.2C,6.3'], None, {'OpenSSH': ['6.3', None, '6.2', None]}) cmp_tf(['6.2C,6.3'], True, {'OpenSSH': ['6.3', None, None, None]}) cmp_tf(['6.2C,6.3'], False, {'OpenSSH': [None, None, '6.2', None]}) - + cmp_tf(['6.2', '6.6'], None, {'OpenSSH': ['6.2', '6.6', '6.2', '6.6']}) cmp_tf(['6.2', '6.6'], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.2', '6.6'], False, {'OpenSSH': [None, None, '6.2', '6.6']}) @@ -76,7 +76,7 @@ class TestSSHAlgorithm(object): cmp_tf(['6.2C,6.3', '6.6'], None, {'OpenSSH': ['6.3', '6.6', '6.2', '6.6']}) cmp_tf(['6.2C,6.3', '6.6'], True, {'OpenSSH': ['6.3', '6.6', None, None]}) cmp_tf(['6.2C,6.3', '6.6'], False, {'OpenSSH': [None, None, '6.2', '6.6']}) - + cmp_tf(['6.2', '6.6', None], None, {'OpenSSH': ['6.2', '6.6', '6.2', None]}) cmp_tf(['6.2', '6.6', None], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.2', '6.6', None], False, {'OpenSSH': [None, None, '6.2', None]}) @@ -95,7 +95,7 @@ class TestSSHAlgorithm(object): cmp_tf(['6.3C,6.2', '6.6', None], None, {'OpenSSH': ['6.2', '6.6', '6.3', None]}) cmp_tf(['6.3C,6.2', '6.6', None], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.3C,6.2', '6.6', None], False, {'OpenSSH': [None, None, '6.3', None]}) - + cmp_tf(['6.2', '6.6', '7.1'], None, {'OpenSSH': ['6.2', '6.6', '6.2', '7.1']}) cmp_tf(['6.2', '6.6', '7.1'], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.2', '6.6', '7.1'], False, {'OpenSSH': [None, None, '6.2', '7.1']}) @@ -111,7 +111,7 @@ class TestSSHAlgorithm(object): cmp_tf(['6.3C,6.2', '6.6', '7.1'], None, {'OpenSSH': ['6.2', '6.6', '6.3', '7.1']}) cmp_tf(['6.3C,6.2', '6.6', '7.1'], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.3C,6.2', '6.6', '7.1'], False, {'OpenSSH': [None, None, '6.3', '7.1']}) - + tf1 = self._tf(['6.1,d2016.72,6.2C', '6.6,d2016.73', '7.1,d2016.74']) tf2 = self._tf(['d2016.72,6.2C,6.1', 'd2016.73,6.6', 'd2016.74,7.1']) tf3 = self._tf(['d2016.72,6.2C,6.1', '6.6,d2016.73', '7.1,d2016.74']) @@ -123,7 +123,7 @@ class TestSSHAlgorithm(object): assert dv in str(tf1) and dv in str(tf2) and dv in str(tf3) assert ov in repr(tf1) and ov in repr(tf2) and ov in repr(tf3) assert dv in repr(tf1) and dv in repr(tf2) and dv in repr(tf3) - + def test_timeframe_object(self): tf = self._tf(['6.1,6.2C', '6.6', '7.1']) assert 'OpenSSH' in tf @@ -138,7 +138,7 @@ class TestSSHAlgorithm(object): assert tf.get_till('OpenSSH', True) == '6.6' assert tf.get_from('OpenSSH', False) == '6.2' assert tf.get_till('OpenSSH', False) == '7.1' - + tf = self._tf(['6.1,d2016.72,6.2C', '6.6,d2016.73', '7.1,d2016.74']) assert 'OpenSSH' in tf assert 'Dropbear SSH' in tf diff --git a/test/test_utils.py b/test/test_utils.py index 2a83bd8..78fd109 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -10,7 +10,7 @@ class TestUtils(object): def init(self, ssh_audit): self.utils = ssh_audit.Utils self.PY3 = sys.version_info >= (3,) - + def test_to_bytes_py2(self): if self.PY3: return @@ -22,7 +22,7 @@ class TestUtils(object): # other with pytest.raises(TypeError): self.utils.to_bytes(123) - + def test_to_bytes_py3(self): if not self.PY3: return @@ -34,7 +34,7 @@ class TestUtils(object): # other with pytest.raises(TypeError): self.utils.to_bytes(123) - + def test_to_utext_py2(self): if self.PY3: return @@ -46,7 +46,7 @@ class TestUtils(object): # other with pytest.raises(TypeError): self.utils.to_utext(123) - + def test_to_utext_py3(self): if not self.PY3: return @@ -58,7 +58,7 @@ class TestUtils(object): # other with pytest.raises(TypeError): self.utils.to_utext(123) - + def test_to_ntext_py2(self): if self.PY3: return @@ -70,7 +70,7 @@ class TestUtils(object): # other with pytest.raises(TypeError): self.utils.to_ntext(123) - + def test_to_ntext_py3(self): if not self.PY3: return @@ -82,7 +82,7 @@ class TestUtils(object): # other with pytest.raises(TypeError): self.utils.to_ntext(123) - + def test_is_ascii_py2(self): if self.PY3: return @@ -94,7 +94,7 @@ class TestUtils(object): assert self.utils.is_ascii('fran\xc3\xa7ais') is False # other assert self.utils.is_ascii(123) is False - + def test_is_ascii_py3(self): if not self.PY3: return @@ -105,7 +105,7 @@ class TestUtils(object): assert self.utils.is_ascii(u'fran\xe7ais') is False # other assert self.utils.is_ascii(123) is False - + def test_to_ascii_py2(self): if self.PY3: return @@ -119,7 +119,7 @@ class TestUtils(object): assert self.utils.to_ascii('fran\xc3\xa7ais', 'ignore') == 'franais' with pytest.raises(TypeError): self.utils.to_ascii(123) - + def test_to_ascii_py3(self): if not self.PY3: return @@ -132,7 +132,7 @@ class TestUtils(object): assert self.utils.to_ascii(u'fran\xe7ais', 'ignore') == 'franais' with pytest.raises(TypeError): self.utils.to_ascii(123) - + def test_is_print_ascii_py2(self): if self.PY3: return @@ -147,7 +147,7 @@ class TestUtils(object): assert self.utils.is_print_ascii('fran\xc3\xa7ais') is False # other assert self.utils.is_print_ascii(123) is False - + def test_is_print_ascii_py3(self): if not self.PY3: return @@ -160,7 +160,7 @@ class TestUtils(object): assert self.utils.is_print_ascii(u'fran\xe7ais') is False # other assert self.utils.is_print_ascii(123) is False - + def test_to_print_ascii_py2(self): if self.PY3: return @@ -180,7 +180,7 @@ class TestUtils(object): assert self.utils.to_print_ascii('fran\xc3\xa7ais\n', 'ignore') == 'franais' with pytest.raises(TypeError): self.utils.to_print_ascii(123) - + def test_to_print_ascii_py3(self): if not self.PY3: return @@ -199,18 +199,18 @@ class TestUtils(object): assert self.utils.to_print_ascii(u'fran\xe7ais\n', 'ignore') == 'franais' with pytest.raises(TypeError): self.utils.to_print_ascii(123) - + def test_ctoi(self): assert self.utils.ctoi(123) == 123 assert self.utils.ctoi('ABC') == 65 - + def test_parse_int(self): assert self.utils.parse_int(123) == 123 assert self.utils.parse_int('123') == 123 assert self.utils.parse_int(-123) == -123 assert self.utils.parse_int('-123') == -123 assert self.utils.parse_int('abc') == 0 - + def test_unique_seq(self): assert self.utils.unique_seq((1, 2, 2, 3, 3, 3)) == (1, 2, 3) assert self.utils.unique_seq((3, 3, 3, 2, 2, 1)) == (3, 2, 1) diff --git a/test/test_version_compare.py b/test/test_version_compare.py index b5c4a1f..b87179f 100644 --- a/test/test_version_compare.py +++ b/test/test_version_compare.py @@ -8,19 +8,19 @@ class TestVersionCompare(object): @pytest.fixture(autouse=True) def init(self, ssh_audit): self.ssh = ssh_audit.SSH - + def get_dropbear_software(self, v): b = self.ssh.Banner.parse('SSH-2.0-dropbear_{0}'.format(v)) return self.ssh.Software.parse(b) - + def get_openssh_software(self, v): b = self.ssh.Banner.parse('SSH-2.0-OpenSSH_{0}'.format(v)) return self.ssh.Software.parse(b) - + def get_libssh_software(self, v): b = self.ssh.Banner.parse('SSH-2.0-libssh-{0}'.format(v)) return self.ssh.Software.parse(b) - + def test_dropbear_compare_version_pre_years(self): s = self.get_dropbear_software('0.44') assert s.compare_version(None) == 1 @@ -32,7 +32,7 @@ class TestVersionCompare(object): assert s.between_versions('0.43', '0.45') assert s.between_versions('0.43', '0.43') is False assert s.between_versions('0.45', '0.43') is False - + def test_dropbear_compare_version_with_years(self): s = self.get_dropbear_software('2015.71') assert s.compare_version(None) == 1 @@ -44,7 +44,7 @@ class TestVersionCompare(object): assert s.between_versions('2014.66', '2016.74') assert s.between_versions('2014.66', '2015.69') is False assert s.between_versions('2016.74', '2014.66') is False - + def test_dropbear_compare_version_mixed(self): s = self.get_dropbear_software('0.53.1') assert s.compare_version(None) == 1 @@ -56,7 +56,7 @@ class TestVersionCompare(object): assert s.between_versions('0.53', '2011.54') assert s.between_versions('0.53', '0.53') is False assert s.between_versions('2011.54', '0.53') is False - + def test_dropbear_compare_version_patchlevel(self): s1 = self.get_dropbear_software('0.44') s2 = self.get_dropbear_software('0.44test3') @@ -80,7 +80,7 @@ class TestVersionCompare(object): assert s2.between_versions('0.44', '0.43') is False assert s1.compare_version(s2) > 0 assert s2.compare_version(s1) < 0 - + def test_dropbear_compare_version_sequential(self): versions = [] for i in range(28, 44): @@ -105,18 +105,18 @@ class TestVersionCompare(object): versions.append('2015.{0}'.format(i)) for i in range(72, 75): versions.append('2016.{0}'.format(i)) - l = len(versions) - for i in range(l): + length = len(versions) + for i in range(length): v = versions[i] s = self.get_dropbear_software(v) assert s.compare_version(v) == 0 if i - 1 >= 0: vbefore = versions[i - 1] assert s.compare_version(vbefore) > 0 - if i + 1 < l: + if i + 1 < length: vnext = versions[i + 1] assert s.compare_version(vnext) < 0 - + def test_openssh_compare_version_simple(self): s = self.get_openssh_software('3.7.1') assert s.compare_version(None) == 1 @@ -128,7 +128,7 @@ class TestVersionCompare(object): assert s.between_versions('3.7', '3.8') assert s.between_versions('3.6', '3.7') is False assert s.between_versions('3.8', '3.7') is False - + def test_openssh_compare_version_patchlevel(self): s1 = self.get_openssh_software('2.1.1') s2 = self.get_openssh_software('2.1.1p2') @@ -141,7 +141,7 @@ class TestVersionCompare(object): assert s2.compare_version('2.1.1p3') < 0 assert s1.compare_version(s2) == 0 assert s2.compare_version(s1) == 0 - + def test_openbsd_compare_version_sequential(self): versions = [] for v in ['1.2.3', '2.1.0', '2.1.1', '2.2.0', '2.3.0']: @@ -164,18 +164,18 @@ class TestVersionCompare(object): versions.append('6.{0}'.format(i)) for i in range(0, 4): versions.append('7.{0}'.format(i)) - l = len(versions) - for i in range(l): + length = len(versions) + for i in range(length): v = versions[i] s = self.get_openssh_software(v) assert s.compare_version(v) == 0 if i - 1 >= 0: vbefore = versions[i - 1] assert s.compare_version(vbefore) > 0 - if i + 1 < l: + if i + 1 < length: vnext = versions[i + 1] assert s.compare_version(vnext) < 0 - + def test_libssh_compare_version_simple(self): s = self.get_libssh_software('0.3') assert s.compare_version(None) == 1 @@ -187,7 +187,7 @@ class TestVersionCompare(object): assert s.between_versions('0.2', '0.3.1') assert s.between_versions('0.1', '0.2') is False assert s.between_versions('0.3.1', '0.2') is False - + def test_libssh_compare_version_sequential(self): versions = [] for v in ['0.2', '0.3']: @@ -202,14 +202,14 @@ class TestVersionCompare(object): versions.append('0.6.{0}'.format(i)) for i in range(0, 5): versions.append('0.7.{0}'.format(i)) - l = len(versions) - for i in range(l): + length = len(versions) + for i in range(length): v = versions[i] s = self.get_libssh_software(v) assert s.compare_version(v) == 0 if i - 1 >= 0: vbefore = versions[i - 1] assert s.compare_version(vbefore) > 0 - if i + 1 < l: + if i + 1 < length: vnext = versions[i + 1] assert s.compare_version(vnext) < 0 diff --git a/tox.ini b/tox.ini index bc21a99..6100fde 100644 --- a/tox.ini +++ b/tox.ini @@ -80,7 +80,7 @@ commands = deps = flake8 commands = - flake8 {posargs:{env:SSHAUDIT}} + flake8 {posargs:{env:SSHAUDIT} {toxinidir}/packages/setup.py {toxinidir}/test} --statistics [testenv:vulture] deps = @@ -137,42 +137,13 @@ max-module-lines = 2500 [flake8] ignore = - # indentation contains tabs - W191, - # blank line contains whitespace - W293, - # indentation contains mixed spaces and tabs - E101, - # multiple spaces before operator - E221, - # multiple spaces after operator - E241, - # multiple imports on one line - E401, - # line too long - E501, - # module imported but unused - F401, - # undefined name - F821, - # these exceptions should be handled one by one - E117, # over-indented - E126, # continuation line over-indented for hanging indent - E128, # continuation line under-indented for visual indent - E226, # missing whitespace around arithmetic operator - E231, # missing whitespace after ',' - E251, # unexpected spaces around keyword / parameter equals - E261, # at least two spaces before inline comment - E265, # block comment should start with '# ' - E301, # expected 1 blank line, found 0 - E302, # expected 2 blank lines, found 1 - E303, # too many blank lines (2) - E305, # expected 2 blank lines after class or function definition, found 1 - E711, # comparison to None should be 'if cond is not None:' - E712, # comparison to False should be 'if cond is False:' or 'if not cond:' - E722, # do not use bare 'except' - E741, # ambiguous variable name 'l' - F601, # dictionary key 'ecdsa-sha2-1.3.132.0.10' repeated with different values - F841, # local variable 'e' is assigned to but never used - W504, # line break after binary operator - W605, # invalid escape sequence '\s' + W191, # indentation contains tabs + E101, # indentation contains mixed spaces and tabs + E241, # multiple spaces after operator; should be kept for tabular data + E501, # line too long + E117, # over-indented + E126, # continuation line over-indented for hanging indent + E128, # continuation line under-indented for visual indent + E722, # do not use bare 'except' + F601, # dictionary key 'ecdsa-sha2-1.3.132.0.10' repeated with different values + W504, # line break after binary operator; this (or W503) has to stay