mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-01-24 13:15:37 +01:00
Re-enable mypy options (#43)
* Convert type comments to annotations Notes: - variable annotations are only possible for Python 3.6 and upwards - class names as a result of a function have to be quoted cf https://www.python.org/dev/peps/pep-0563/#enabling-the-future-behavior-in-python-3-7 This is ongoing work for #32 modified: ssh-audit.py * Do not use variable annotation ... as this feature works only for Python 3.6 and above only. modified: ssh-audit.py * Re-enable strict_optional `None` is a valid return type for mypy, even when you specify a certain type. `strict_optional` makes sure that only the annotated return type is actually returned. modified: tox.ini * Re-enable `warn_unused_ignores` Quote from mypy docs: This flag will make mypy report an error whenever your code uses a `# type: ignore` comment on a line that is not actually generating an error message. modified: tox.ini * Re-enable `warn_return_any` Quote from the documenation: "This flag causes mypy to generate a warning when returning a value with type Any from a function declared with a non-Any return type." modified: tox.ini * Re-enable `warn_redundant_casts` Quote from the documentation: "This flag will make mypy report an error whenever your code uses an unnecessary cast that can safely be removed." modified: tox.ini * Remove `warn_incomplete_stub` ... as the documentation says "This flag is mainly intended to be used by people who want contribute to typeshed and would like a convenient way to find gaps and omissions." modified: tox.ini * Re-enable `disallow_subclassing_any` Quote from the documentation: "This flag reports an error whenever a class subclasses a value of type Any." modified: tox.ini * Re-enable `follow_imports` ... and set it to `normal`. For more information, see https://mypy.readthedocs.io/en/latest/running_mypy.html#follow-imports modified: tox.ini * Re-enable `ignore_missing_imports` Quote from the documentation: "This flag makes mypy ignore all missing imports. It is equivalent to adding # type: ignore comments to all unresolved imports within your codebase." modified: tox.ini * Fix arguments for Kex initialization `follows` has to be a boolean, but an int was provided. This worked, as in Python boolean is a subtype of int. modified: ssh-audit.py * Do not uncomment `check_untyped_defs` yet modified: tox.ini * Change KexDH.__ed25519_pubkey's default type It was initialized with 0 (int), and later it gets set with bytes. Now, it gets initialized with None, and thus gets the type Optional[bytes]. Optional means None or the named type. modified: ssh-audit.py * Fix whitespace modified: tox.ini * Add type annotation for main function modified: ssh-audit.py * Add type annotation for KexDH.set_params modified: ssh-audit.py * Add type annotation for Kex.set_rsa_key_size modified: ssh-audit.py * Add type annotation for Kex.rsa_key_sizes modified: ssh-audit.py * Add type annotation for Kex.set_dh_modulus_size modified: ssh-audit.py * Add type annotation to Kex.dh_modulus_sizes modified: ssh-audit.py * Add type annotation for Kex.set_host_key modified: ssh-audit.py * Add type annotation for Kex.host_keys modified: ssh-audit.py * Add type annotation for HostKeyTest.run modified: ssh-audit.py * Add static typing to HostKeyTest.perform_test This revealed a small oversight in the guard protecting the call to perform_test. modified: ssh-audit.py * Add type annotation for GexTest.reconnect modified: ssh-audit.py * Add type annotation for GexTest.run modified: ssh-audit.py * Add type annotation for ReadBuf.reset modified: ssh-audit.py * Add type annoation for WriteBuf.reset modified: ssh-audit.py * Add type annotation to Socket.listen_and_accept modified: ssh-audit.py * Move comment for is_connected into docstring. modified: ssh-audit.py * Add type annotation for Socket.is_connected modified: ssh-audit.py * Add type annotation for Socket.close modified: ssh-audit.py * Do not commit breakpoint modified: ssh-audit.py * Add annotations for KexDH key size handling modified: ssh-audit.py * Add type annotation for KexDH.get_ca_size modified: ssh-audit.py * Add type annotation to output_info modified: ssh-audit.py * Add type annotation for KexDH.__get_bytes modified: ssh-audit.py * Add type annotation to KexGroup14.__init__ modified: ssh-audit.py * Add type annotation for KexGroup14_SHA256.__init__ modified: ssh-audit.py * Add type annotation for KexGroup16_SHA512.__init__ modified: ssh-audit.py * Add type annotation for KexGroup18_SHA512.__init__ modified: ssh-audit.py * Add type annotation for KexCurve25519_SHA256.__init__ modified: ssh-audit.py * Add type annotation for KexNISTP256.__init__ modified: ssh-audit.py * Add type annotations to several init methods modified: ssh-audit.py * Add type annotataion for KexGroupExchange.send_init_gex modified: ssh-audit.py * Add type annotation for KexGroupExchange.__init__ modified: ssh-audit.py * Add type annotation to KexCurve25519_SHA256.send_init modified: ssh-audit.py * Add type annotation for KexNISTP256.sent_init modified: ssh-audit.py * Add type annotation for KexNISTP384.send_init modified: ssh-audit.py * Add type annotation for KexNISTP521.send_init modified: ssh-audit.py * Add type annotation for KexGroupExchange.send_init modified: ssh-audit.py * Add type annotation to KexDH.get_dh_modulus_size modified: ssh-audit.py * Delete unused variables KexDH.__f and f_len __f was initialized as int, then assigned to bytes, but never used. f_len assigned an int, but not all. modified: ssh-audit.py * Delete unused variables KexDH.__h_sig and h_sig_len modified: ssh-audit.py * Add type annotation for KexDH.__hostkey_type modified: ssh-audit.py
This commit is contained in:
parent
a75be9ab41
commit
da31c19d38
104
ssh-audit.py
104
ssh-audit.py
@ -571,22 +571,22 @@ class SSH2: # pylint: disable=too-few-public-methods
|
||||
def unused(self) -> int:
|
||||
return self.__unused
|
||||
|
||||
def set_rsa_key_size(self, rsa_type, hostkey_size, ca_size=-1):
|
||||
def set_rsa_key_size(self, rsa_type: str, hostkey_size: int, ca_size: int = -1) -> None:
|
||||
self.__rsa_key_sizes[rsa_type] = (hostkey_size, ca_size)
|
||||
|
||||
def rsa_key_sizes(self):
|
||||
def rsa_key_sizes(self) -> Dict[str, Tuple[int, int]]:
|
||||
return self.__rsa_key_sizes
|
||||
|
||||
def set_dh_modulus_size(self, gex_alg, modulus_size):
|
||||
def set_dh_modulus_size(self, gex_alg: str, modulus_size: int) -> None:
|
||||
self.__dh_modulus_sizes[gex_alg] = (modulus_size, -1)
|
||||
|
||||
def dh_modulus_sizes(self):
|
||||
def dh_modulus_sizes(self) -> Dict[str, Tuple[int, int]]:
|
||||
return self.__dh_modulus_sizes
|
||||
|
||||
def set_host_key(self, key_type, hostkey):
|
||||
def set_host_key(self, key_type: str, hostkey: bytes) -> None:
|
||||
self.__host_keys[key_type] = hostkey
|
||||
|
||||
def host_keys(self):
|
||||
def host_keys(self) -> Dict[str, bytes]:
|
||||
return self.__host_keys
|
||||
|
||||
def write(self, wbuf: 'WriteBuf') -> None:
|
||||
@ -649,7 +649,7 @@ class SSH2: # pylint: disable=too-few-public-methods
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def run(s, server_kex):
|
||||
def run(s: 'SSH.Socket', server_kex: 'SSH2.Kex') -> None:
|
||||
KEX_TO_DHGROUP = {
|
||||
'diffie-hellman-group1-sha1': KexGroup1,
|
||||
'diffie-hellman-group14-sha1': KexGroup14_SHA1,
|
||||
@ -676,11 +676,11 @@ class SSH2: # pylint: disable=too-few-public-methods
|
||||
kex_group = KEX_TO_DHGROUP[kex_str]()
|
||||
break
|
||||
|
||||
if kex_str is not None:
|
||||
if kex_str is not None and kex_group is not None:
|
||||
SSH2.HostKeyTest.perform_test(s, server_kex, kex_str, kex_group, SSH2.HostKeyTest.HOST_KEY_TYPES)
|
||||
|
||||
@staticmethod
|
||||
def perform_test(s, server_kex, kex_str, kex_group, host_key_types):
|
||||
def perform_test(s: 'SSH.Socket', server_kex: 'SSH2.Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None:
|
||||
hostkey_modulus_size = 0
|
||||
ca_modulus_size = 0
|
||||
|
||||
@ -714,7 +714,7 @@ class SSH2: # pylint: disable=too-few-public-methods
|
||||
# selected kex and host key type. Send the server's own
|
||||
# list of ciphers and MACs back to it (this doesn't
|
||||
# matter, really).
|
||||
client_kex = SSH2.Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, 0, 0)
|
||||
client_kex = SSH2.Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, False, 0)
|
||||
|
||||
s.write_byte(SSH.Protocol.MSG_KEXINIT)
|
||||
client_kex.write(s)
|
||||
@ -768,7 +768,7 @@ class SSH2: # pylint: disable=too-few-public-methods
|
||||
|
||||
# Creates a new connection to the server. Returns True on success, or False.
|
||||
@staticmethod
|
||||
def reconnect(s, gex_alg):
|
||||
def reconnect(s: 'SSH.Socket', gex_alg: str) -> bool:
|
||||
if s.is_connected():
|
||||
return True
|
||||
|
||||
@ -787,7 +787,7 @@ class SSH2: # pylint: disable=too-few-public-methods
|
||||
|
||||
# Send our KEX using the specified group-exchange and most of the
|
||||
# server's own values.
|
||||
client_kex = SSH2.Kex(os.urandom(16), [gex_alg], kex.key_algorithms, kex.client, kex.server, 0, 0)
|
||||
client_kex = SSH2.Kex(os.urandom(16), [gex_alg], kex.key_algorithms, kex.client, kex.server, False, 0)
|
||||
s.write_byte(SSH.Protocol.MSG_KEXINIT)
|
||||
client_kex.write(s)
|
||||
s.send_packet()
|
||||
@ -795,7 +795,7 @@ class SSH2: # pylint: disable=too-few-public-methods
|
||||
|
||||
# Runs the DH moduli test against the specified target.
|
||||
@staticmethod
|
||||
def run(s, kex):
|
||||
def run(s: 'SSH.Socket', kex: 'SSH2.Kex') -> None:
|
||||
GEX_ALGS = {
|
||||
'diffie-hellman-group-exchange-sha1': KexGroupExchange_SHA1,
|
||||
'diffie-hellman-group-exchange-sha256': KexGroupExchange_SHA256,
|
||||
@ -1115,7 +1115,7 @@ class ReadBuf:
|
||||
def read_line(self) -> str:
|
||||
return self._buf.readline().rstrip().decode('utf-8', 'replace')
|
||||
|
||||
def reset(self):
|
||||
def reset(self) -> None:
|
||||
self._buf = io.BytesIO()
|
||||
self._len = 0
|
||||
|
||||
@ -1195,7 +1195,7 @@ class WriteBuf:
|
||||
self._wbuf.seek(0)
|
||||
return payload
|
||||
|
||||
def reset(self):
|
||||
def reset(self) -> None:
|
||||
self._wbuf = io.BytesIO()
|
||||
|
||||
|
||||
@ -1941,7 +1941,7 @@ class SSH: # pylint: disable=too-few-public-methods
|
||||
|
||||
# Listens on a server socket and accepts one connection (used for
|
||||
# auditing client connections).
|
||||
def listen_and_accept(self):
|
||||
def listen_and_accept(self) -> None:
|
||||
|
||||
try:
|
||||
# Socket to listen on all IPv4 addresses.
|
||||
@ -2147,11 +2147,11 @@ class SSH: # pylint: disable=too-few-public-methods
|
||||
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
|
||||
return self.send(data)
|
||||
|
||||
# Returns True if this Socket is connected, otherwise False.
|
||||
def is_connected(self):
|
||||
def is_connected(self) -> bool:
|
||||
"""Returns true if this Socket is connected, False otherwise."""
|
||||
return self.__sock is not None
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
self.__cleanup()
|
||||
self.reset()
|
||||
self.__state = 0
|
||||
@ -2187,16 +2187,14 @@ class KexDH: # pragma: nocover
|
||||
self.__e = 0
|
||||
self.set_params(g, p)
|
||||
|
||||
self.__ed25519_pubkey = 0
|
||||
self.__hostkey_type = None
|
||||
self.__ed25519_pubkey = None # type: Optional[bytes]
|
||||
self.__hostkey_type = None # type: Optional[bytes]
|
||||
self.__hostkey_e = 0
|
||||
self.__hostkey_n = 0
|
||||
self.__hostkey_n_len = 0 # Length of the host key modulus.
|
||||
self.__ca_n_len = 0 # Length of the CA key modulus (if hostkey is a cert).
|
||||
self.__f = 0
|
||||
self.__h_sig = 0
|
||||
|
||||
def set_params(self, g, p):
|
||||
def set_params(self, g: int, p: int) -> None:
|
||||
self.__g = g
|
||||
self.__p = p
|
||||
self.__q = (self.__p - 1) // 2
|
||||
@ -2230,7 +2228,7 @@ class KexDH: # pragma: nocover
|
||||
# will remain at length 0.
|
||||
return None
|
||||
|
||||
hostkey_len = f_len = h_sig_len = 0 # pylint: disable=unused-variable
|
||||
hostkey_len = 0 # pylint: disable=unused-variable
|
||||
hostkey_type_len = hostkey_e_len = 0 # pylint: disable=unused-variable
|
||||
key_id_len = principles_len = 0 # pylint: disable=unused-variable
|
||||
critical_options_len = extensions_len = 0 # pylint: disable=unused-variable
|
||||
@ -2250,8 +2248,8 @@ class KexDH: # pragma: nocover
|
||||
if not parse_host_key_size:
|
||||
return hostkey
|
||||
|
||||
self.__f, f_len, ptr = KexDH.__get_bytes(payload, ptr)
|
||||
self.__h_sig, h_sig_len, ptr = KexDH.__get_bytes(payload, ptr)
|
||||
_, _, ptr = KexDH.__get_bytes(payload, ptr)
|
||||
_, _, ptr = KexDH.__get_bytes(payload, ptr)
|
||||
|
||||
# Now pick apart the host key blob.
|
||||
# Get the host key type (i.e.: 'ssh-rsa', 'ssh-ed25519', etc).
|
||||
@ -2327,7 +2325,7 @@ class KexDH: # pragma: nocover
|
||||
return hostkey
|
||||
|
||||
@staticmethod
|
||||
def __get_bytes(buf, ptr):
|
||||
def __get_bytes(buf: bytes, ptr: int) -> Tuple[bytes, int, int]:
|
||||
num_bytes = struct.unpack('>I', buf[ptr:ptr + 4])[0]
|
||||
ptr += 4
|
||||
return buf[ptr:ptr + num_bytes], num_bytes, ptr + num_bytes
|
||||
@ -2335,7 +2333,7 @@ class KexDH: # pragma: nocover
|
||||
# Converts a modulus length in bytes to its size in bits, after some
|
||||
# possible adjustments.
|
||||
@staticmethod
|
||||
def __adjust_key_size(size):
|
||||
def __adjust_key_size(size: int) -> int:
|
||||
size = size * 8
|
||||
# Actual keys are observed to be about 8 bits bigger than expected
|
||||
# (i.e.: 1024-bit keys have a 1032-bit modulus). Check if this is
|
||||
@ -2346,15 +2344,15 @@ class KexDH: # pragma: nocover
|
||||
return size
|
||||
|
||||
# Returns the size of the hostkey, in bits.
|
||||
def get_hostkey_size(self):
|
||||
def get_hostkey_size(self) -> int:
|
||||
return KexDH.__adjust_key_size(self.__hostkey_n_len)
|
||||
|
||||
# Returns the size of the CA key, in bits.
|
||||
def get_ca_size(self):
|
||||
def get_ca_size(self) -> int:
|
||||
return KexDH.__adjust_key_size(self.__ca_n_len)
|
||||
|
||||
# Returns the size of the DH modulus, in bits.
|
||||
def get_dh_modulus_size(self):
|
||||
def get_dh_modulus_size(self) -> int:
|
||||
# -2 to account for the '0b' prefix in the string.
|
||||
return len(bin(self.__p)) - 2
|
||||
|
||||
@ -2374,36 +2372,36 @@ class KexGroup14(KexDH): # pragma: nocover
|
||||
|
||||
|
||||
class KexGroup14_SHA1(KexGroup14):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexGroup14_SHA1, self).__init__('sha1')
|
||||
|
||||
|
||||
class KexGroup14_SHA256(KexGroup14):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexGroup14_SHA256, self).__init__('sha256')
|
||||
|
||||
|
||||
class KexGroup16_SHA512(KexDH):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
# rfc3526: 4096-bit modp group
|
||||
p = int('ffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca18217c32905e462e36ce3be39e772c180e86039b2783a2ec07a28fb5c55df06f4c52c9de2bcbf6955817183995497cea956ae515d2261898fa051015728e5a8aaac42dad33170d04507a33a85521abdf1cba64ecfb850458dbef0a8aea71575d060c7db3970f85a6e1e4c7abf5ae8cdb0933d71e8c94e04a25619dcee3d2261ad2ee6bf12ffa06d98a0864d87602733ec86a64521f2b18177b200cbbe117577a615d6c770988c0bad946e208e24fa074e5ab3143db5bfce0fd108e4b82d120a92108011a723c12a787e6d788719a10bdba5b2699c327186af4e23c1a946834b6150bda2583e9ca2ad44ce8dbbbc2db04de8ef92e8efc141fbecaa6287c59474e6bc05d99b2964fa090c3a2233ba186515be7ed1f612970cee2d7afb81bdd762170481cd0069127d5b05aa993b4ea988d8fddc186ffb7dc90a6c08f4df435c934063199ffffffffffffffff', 16)
|
||||
super(KexGroup16_SHA512, self).__init__('KexGroup16_SHA512', 'sha512', 2, p)
|
||||
|
||||
|
||||
class KexGroup18_SHA512(KexDH):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
# rfc3526: 8192-bit modp group
|
||||
p = int('ffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca18217c32905e462e36ce3be39e772c180e86039b2783a2ec07a28fb5c55df06f4c52c9de2bcbf6955817183995497cea956ae515d2261898fa051015728e5a8aaac42dad33170d04507a33a85521abdf1cba64ecfb850458dbef0a8aea71575d060c7db3970f85a6e1e4c7abf5ae8cdb0933d71e8c94e04a25619dcee3d2261ad2ee6bf12ffa06d98a0864d87602733ec86a64521f2b18177b200cbbe117577a615d6c770988c0bad946e208e24fa074e5ab3143db5bfce0fd108e4b82d120a92108011a723c12a787e6d788719a10bdba5b2699c327186af4e23c1a946834b6150bda2583e9ca2ad44ce8dbbbc2db04de8ef92e8efc141fbecaa6287c59474e6bc05d99b2964fa090c3a2233ba186515be7ed1f612970cee2d7afb81bdd762170481cd0069127d5b05aa993b4ea988d8fddc186ffb7dc90a6c08f4df435c93402849236c3fab4d27c7026c1d4dcb2602646dec9751e763dba37bdf8ff9406ad9e530ee5db382f413001aeb06a53ed9027d831179727b0865a8918da3edbebcf9b14ed44ce6cbaced4bb1bdb7f1447e6cc254b332051512bd7af426fb8f401378cd2bf5983ca01c64b92ecf032ea15d1721d03f482d7ce6e74fef6d55e702f46980c82b5a84031900b1c9e59e7c97fbec7e8f323a97a7e36cc88be0f1d45b7ff585ac54bd407b22b4154aacc8f6d7ebf48e1d814cc5ed20f8037e0a79715eef29be32806a1d58bb7c5da76f550aa3d8a1fbff0eb19ccb1a313d55cda56c9ec2ef29632387fe8d76e3c0468043e8f663f4860ee12bf2d5b0b7474d6e694f91e6dbe115974a3926f12fee5e438777cb6a932df8cd8bec4d073b931ba3bc832b68d9dd300741fa7bf8afc47ed2576f6936ba424663aab639c5ae4f5683423b4742bf1c978238f16cbe39d652de3fdb8befc848ad922222e04a4037c0713eb57a81a23f0c73473fc646cea306b4bcbc8862f8385ddfa9d4b7fa2c087e879683303ed5bdd3a062b3cf5b3a278a66d2a13f83f44f82ddf310ee074ab6a364597e899a0255dc164f31cc50846851df9ab48195ded7ea1b1d510bd7ee74d73faf36bc31ecfa268359046f4eb879f924009438b481c6cd7889a002ed5ee382bc9190da6fc026e479558e4475677e9aa9e3050e2765694dfc81f56e880b96e7160c980dd98edd3dfffffffffffffffff', 16)
|
||||
super(KexGroup18_SHA512, self).__init__('KexGroup18_SHA512', 'sha512', 2, p)
|
||||
|
||||
|
||||
class KexCurve25519_SHA256(KexDH):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexCurve25519_SHA256, self).__init__('KexCurve25519_SHA256', 'sha256', 0, 0)
|
||||
|
||||
# To start an ED25519 kex, we simply send a random 256-bit number as the
|
||||
# public key.
|
||||
def send_init(self, s, init_msg=SSH.Protocol.MSG_KEXDH_INIT):
|
||||
def send_init(self, s: 'SSH.Socket', init_msg: int = SSH.Protocol.MSG_KEXDH_INIT) -> None:
|
||||
self.__ed25519_pubkey = os.urandom(32)
|
||||
s.write_byte(init_msg)
|
||||
s.write_string(self.__ed25519_pubkey)
|
||||
@ -2411,7 +2409,7 @@ class KexCurve25519_SHA256(KexDH):
|
||||
|
||||
|
||||
class KexNISTP256(KexDH):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexNISTP256, self).__init__('KexNISTP256', 'sha256', 0, 0)
|
||||
|
||||
# Because the server checks that the value sent here is valid (i.e.: it lies
|
||||
@ -2419,39 +2417,39 @@ class KexNISTP256(KexDH):
|
||||
# or import an elliptic curve library in order to randomly generate a
|
||||
# valid elliptic point each time. Hence, we will simply send a static
|
||||
# value, which is enough for us to extract the server's host key.
|
||||
def send_init(self, s, init_msg=SSH.Protocol.MSG_KEXDH_INIT):
|
||||
def send_init(self, s: 'SSH.Socket', init_msg: int = SSH.Protocol.MSG_KEXDH_INIT) -> None:
|
||||
s.write_byte(init_msg)
|
||||
s.write_string(b'\x04\x0b\x60\x44\x9f\x8a\x11\x9e\xc7\x81\x0c\xa9\x98\xfc\xb7\x90\xaa\x6b\x26\x8c\x12\x4a\xc0\x09\xbb\xdf\xc4\x2c\x4c\x2c\x99\xb6\xe1\x71\xa0\xd4\xb3\x62\x47\x74\xb3\x39\x0c\xf2\x88\x4a\x84\x6b\x3b\x15\x77\xa5\x77\xd2\xa9\xc9\x94\xf9\xd5\x66\x19\xcd\x02\x34\xd1')
|
||||
s.send_packet()
|
||||
|
||||
|
||||
class KexNISTP384(KexDH):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexNISTP384, self).__init__('KexNISTP384', 'sha256', 0, 0)
|
||||
|
||||
# See comment for KexNISTP256.send_init().
|
||||
def send_init(self, s, init_msg=SSH.Protocol.MSG_KEXDH_INIT):
|
||||
def send_init(self, s: 'SSH.Socket', init_msg: int = SSH.Protocol.MSG_KEXDH_INIT) -> None:
|
||||
s.write_byte(init_msg)
|
||||
s.write_string(b'\x04\xe2\x9b\x84\xce\xa1\x39\x50\xfe\x1e\xa3\x18\x70\x1c\xe2\x7a\xe4\xb5\x6f\xdf\x93\x9f\xd4\xf4\x08\xcc\x9b\x02\x10\xa4\xca\x77\x9c\x2e\x51\x44\x1d\x50\x7a\x65\x4e\x7e\x2f\x10\x2d\x2d\x4a\x32\xc9\x8e\x18\x75\x90\x6c\x19\x10\xda\xcc\xa8\xe9\xf4\xc4\x3a\x53\x80\x35\xf4\x97\x9c\x04\x16\xf9\x5a\xdc\xcc\x05\x94\x29\xfa\xc4\xd6\x87\x4e\x13\x21\xdb\x3d\x12\xac\xbd\x20\x3b\x60\xff\xe6\x58\x42')
|
||||
s.send_packet()
|
||||
|
||||
|
||||
class KexNISTP521(KexDH):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexNISTP521, self).__init__('KexNISTP521', 'sha256', 0, 0)
|
||||
|
||||
# See comment for KexNISTP256.send_init().
|
||||
def send_init(self, s, init_msg=SSH.Protocol.MSG_KEXDH_INIT):
|
||||
def send_init(self, s: 'SSH.Socket', init_msg: int = SSH.Protocol.MSG_KEXDH_INIT) -> None:
|
||||
s.write_byte(init_msg)
|
||||
s.write_string(b'\x04\x01\x02\x90\x29\xe9\x8f\xa8\x04\xaf\x1c\x00\xf9\xc6\x29\xc0\x39\x74\x8e\xea\x47\x7e\x7c\xf7\x15\x6e\x43\x3b\x59\x13\x53\x43\xb0\xae\x0b\xe7\xe6\x7c\x55\x73\x52\xa5\x2a\xc1\x42\xde\xfc\xf4\x1f\x8b\x5a\x8d\xfa\xcd\x0a\x65\x77\xa8\xce\x68\xd2\xc6\x26\xb5\x3f\xee\x4b\x01\x7b\xd2\x96\x23\x69\x53\xc7\x01\xe1\x0d\x39\xe9\x87\x49\x3b\xc8\xec\xda\x0c\xf9\xca\xad\x89\x42\x36\x6f\x93\x78\x78\x31\x55\x51\x09\x51\xc0\x96\xd7\xea\x61\xbf\xc2\x44\x08\x80\x43\xed\xc6\xbb\xfb\x94\xbd\xf8\xdf\x2b\xd8\x0b\x2e\x29\x1b\x8c\xc4\x8a\x04\x2d\x3a')
|
||||
s.send_packet()
|
||||
|
||||
|
||||
class KexGroupExchange(KexDH):
|
||||
def __init__(self, classname, hash_alg):
|
||||
def __init__(self, classname: str, hash_alg: str) -> None:
|
||||
super(KexGroupExchange, self).__init__(classname, hash_alg, 0, 0)
|
||||
|
||||
def send_init(self, s, init_msg=SSH.Protocol.MSG_KEXDH_GEX_REQUEST):
|
||||
def send_init(self, s: 'SSH.Socket', init_msg: int = SSH.Protocol.MSG_KEXDH_GEX_REQUEST) -> None:
|
||||
self.send_init_gex(s)
|
||||
|
||||
# The group exchange starts with sending a message to the server with
|
||||
@ -2459,7 +2457,7 @@ class KexGroupExchange(KexDH):
|
||||
# The server responds with a generator and prime modulus that matches that,
|
||||
# then the handshake continues on like a normal DH handshake (except the
|
||||
# SSH message types differ).
|
||||
def send_init_gex(self, s, minbits=1024, prefbits=2048, maxbits=8192):
|
||||
def send_init_gex(self, s: 'SSH.Socket', minbits: int = 1024, prefbits: int = 2048, maxbits: int = 8192) -> None:
|
||||
|
||||
# Send the initial group exchange request. Tell the server what range
|
||||
# of modulus sizes we will accept, along with our preference.
|
||||
@ -2499,16 +2497,16 @@ class KexGroupExchange(KexDH):
|
||||
|
||||
|
||||
class KexGroupExchange_SHA1(KexGroupExchange):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexGroupExchange_SHA1, self).__init__('KexGroupExchange_SHA1', 'sha1')
|
||||
|
||||
|
||||
class KexGroupExchange_SHA256(KexGroupExchange):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super(KexGroupExchange_SHA256, self).__init__('KexGroupExchange_SHA256', 'sha256')
|
||||
|
||||
|
||||
def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], maxlen: int = 0, alg_sizes: Optional[Dict[str, Iterable[int]]] = None) -> None:
|
||||
def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> None:
|
||||
with OutputBuffer() as obuf:
|
||||
for algorithm in algorithms:
|
||||
output_algorithm(alg_db, alg_type, algorithm, unknown_algs, maxlen, alg_sizes)
|
||||
@ -2518,7 +2516,7 @@ def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional
|
||||
out.sep()
|
||||
|
||||
|
||||
def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Iterable[int]]] = None) -> None:
|
||||
def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> None:
|
||||
prefix = '(' + alg_type + ') '
|
||||
if alg_max_len == 0:
|
||||
alg_max_len = len(alg_name)
|
||||
@ -2767,7 +2765,7 @@ def output_recommendations(algs: SSH.Algorithms, software: Optional[SSH.Software
|
||||
|
||||
|
||||
# Output additional information & notes.
|
||||
def output_info(software, client_audit, any_problems):
|
||||
def output_info(software: Optional['SSH.Software'], client_audit: bool, any_problems: bool) -> None:
|
||||
with OutputBuffer() as obuf:
|
||||
# Tell user that PuTTY cannot be hardened at the protocol-level.
|
||||
if client_audit and (software is not None) and (software.product == SSH.Product.PuTTY):
|
||||
@ -3094,7 +3092,7 @@ utils = Utils()
|
||||
out = Output()
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None: # printed text is still None
|
||||
conf = AuditConf.from_cmdline(sys.argv[1:], usage)
|
||||
audit(conf)
|
||||
|
||||
|
15
tox.ini
15
tox.ini
@ -88,17 +88,16 @@ commands =
|
||||
|
||||
|
||||
[mypy]
|
||||
; ignore_missing_imports = False
|
||||
; follow_imports = error
|
||||
ignore_missing_imports = False
|
||||
follow_imports = normal
|
||||
; disallow_untyped_calls = True
|
||||
; disallow_untyped_defs = True
|
||||
; check_untyped_defs = True
|
||||
; disallow_subclassing_any = True
|
||||
; warn_incomplete_stub = True
|
||||
; warn_redundant_casts = True
|
||||
; warn_return_any = True
|
||||
; warn_unused_ignores = True
|
||||
; strict_optional = True
|
||||
disallow_subclassing_any = True
|
||||
warn_redundant_casts = True
|
||||
warn_return_any = True
|
||||
warn_unused_ignores = True
|
||||
strict_optional = True
|
||||
|
||||
[pylint]
|
||||
reports = no
|
||||
|
Loading…
x
Reference in New Issue
Block a user