From 11a902cb1457098ff023eeac5c2560bcfd2cc95a Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Sat, 26 Jul 2025 19:57:11 -0400 Subject: [PATCH] Removed SSHv1 support (#298). --- README.md | 8 +- src/ssh_audit/algorithms.py | 27 +--- src/ssh_audit/auditconf.py | 6 +- src/ssh_audit/gextest.py | 4 +- src/ssh_audit/kexdh.py | 10 +- src/ssh_audit/ssh1.py | 40 ------ src/ssh_audit/ssh1_crc32.py | 47 ------- src/ssh_audit/ssh1_kexdb.py | 84 ------------ src/ssh_audit/ssh1_publickeymessage.py | 144 -------------------- src/ssh_audit/ssh_audit.py | 154 +++++++--------------- src/ssh_audit/ssh_socket.py | 47 ++----- ssh-audit.1 | 12 +- test/test_auditconf.py | 12 +- test/test_errors.py | 1 - test/test_ssh1.py | 174 ------------------------- 15 files changed, 79 insertions(+), 691 deletions(-) delete mode 100644 src/ssh_audit/ssh1.py delete mode 100644 src/ssh_audit/ssh1_crc32.py delete mode 100644 src/ssh_audit/ssh1_kexdb.py delete mode 100644 src/ssh_audit/ssh1_publickeymessage.py delete mode 100644 test/test_ssh1.py diff --git a/README.md b/README.md index 913873e..4d7eb25 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,7 @@ - [ChangeLog](#changelog) ## Features -- SSH1 and SSH2 protocol server support; -- analyze SSH client configuration; +- analyze SSH both server and client configuration; - grab banner, recognize device or software and operating system, detect compression; - gather key-exchange, host-key, encryption and message authentication code algorithms; - output algorithm security information (available since, removed/disabled, unsafe/weak/legacy, etc); @@ -41,7 +40,7 @@ ## Usage ``` -usage: ssh-audit.py [-h] [-1] [-2] [-4] [-6] [-b] [-c] [-d] +usage: ssh-audit.py [-h] [-4] [-6] [-b] [-c] [-d] [-g / ] [-j] [-l {info,warn,fail}] [-L] [-M custom_policy.txt] [-m] [-n] [-P "Built-In Policy Name" / custom_policy.txt] [-p N] [-T targets.txt] [-t N] [-v] [--conn-rate-test N[:max_rate]] [--dheat N[:kex[:e_len]]] @@ -53,8 +52,6 @@ positional arguments: optional arguments: -h, --help show this help message and exit - -1, --ssh1 force ssh version 1 only - -2, --ssh2 force ssh version 2 only -4, --ipv4 enable IPv4 (order of precedence) -6, --ipv6 enable IPv6 (order of precedence) -b, --batch batch output @@ -218,6 +215,7 @@ For convenience, a web front-end on top of the command-line tool is available at ### v3.4.0-dev - Added warning to all key exchanges that do not include protections against quantum attacks due to the Harvest Now, Decrypt Later strategy (see https://en.wikipedia.org/wiki/Harvest_now,_decrypt_later). + - Removed SSHv1 support (rationale is documented in: https://github.com/jtesta/ssh-audit/issues/298). - Migrated from deprecated `getopt` module to `argparse`; partial credit [oam7575](https://github.com/oam7575). - When running against multiple hosts, now prints each target host regardless of output level. - Batch mode (`-b`) no longer automatically enables verbose mode, due to sometimes confusing results; users can still explicitly enable verbose mode using the `-v` flag. diff --git a/src/ssh_audit/algorithms.py b/src/ssh_audit/algorithms.py index 313e37e..7b0cac9 100644 --- a/src/ssh_audit/algorithms.py +++ b/src/ssh_audit/algorithms.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -29,8 +29,6 @@ from typing import Callable, Optional, Union, Any # noqa: F401 from ssh_audit.algorithm import Algorithm from ssh_audit.product import Product from ssh_audit.software import Software -from ssh_audit.ssh1_kexdb import SSH1_KexDB -from ssh_audit.ssh1_publickeymessage import SSH1_PublicKeyMessage from ssh_audit.ssh2_kex import SSH2_Kex from ssh_audit.ssh2_kexdb import SSH2_KexDB from ssh_audit.timeframe import Timeframe @@ -38,28 +36,13 @@ from ssh_audit.utils import Utils class Algorithms: - def __init__(self, pkm: Optional[SSH1_PublicKeyMessage], kex: Optional[SSH2_Kex]) -> None: - self.__ssh1kex = pkm + def __init__(self, kex: Optional[SSH2_Kex]) -> None: self.__ssh2kex = kex - @property - def ssh1kex(self) -> Optional[SSH1_PublicKeyMessage]: - return self.__ssh1kex - @property def ssh2kex(self) -> Optional[SSH2_Kex]: return self.__ssh2kex - @property - def ssh1(self) -> Optional['Algorithms.Item']: - if self.ssh1kex is None: - return None - item = Algorithms.Item(1, SSH1_KexDB.get_db()) - item.add('key', ['ssh-rsa1']) - item.add('enc', self.ssh1kex.supported_ciphers) - item.add('aut', self.ssh1kex.supported_authentications) - return item - @property def ssh2(self) -> Optional['Algorithms.Item']: if self.ssh2kex is None: @@ -73,7 +56,7 @@ class Algorithms: @property def values(self) -> Iterable['Algorithms.Item']: - for item in [self.ssh1, self.ssh2]: + for item in [self.ssh2]: if item is not None: yield item @@ -82,10 +65,6 @@ class Algorithms: def _ml(items: Sequence[str]) -> int: return max(len(i) for i in items) maxlen = 0 - if self.ssh1kex is not None: - maxlen = max(_ml(self.ssh1kex.supported_ciphers), - _ml(self.ssh1kex.supported_authentications), - maxlen) if self.ssh2kex is not None: maxlen = max(_ml(self.ssh2kex.kex_algorithms), _ml(self.ssh2kex.key_algorithms), diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index bb3437c..2d126a4 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2024 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -35,8 +35,6 @@ class AuditConf: def __init__(self, host: str = '', port: int = 22) -> None: self.host = host self.port = port - self.ssh1 = True - self.ssh2 = True self.batch = False self.client_audit = False self.colors = True @@ -73,7 +71,7 @@ class AuditConf: def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: valid = False - if name in ['batch', 'client_audit', 'colors', 'json', 'json_print_indent', 'list_policies', 'manual', 'make_policy', 'ssh1', 'ssh2', 'timeout_set', 'verbose', 'debug', 'skip_rate_test']: + if name in ['batch', 'client_audit', 'colors', 'json', 'json_print_indent', 'list_policies', 'manual', 'make_policy', 'timeout_set', 'verbose', 'debug', 'skip_rate_test']: valid, value = True, bool(value) elif name in ['ipv4', 'ipv6']: valid, value = True, bool(value) diff --git a/src/ssh_audit/gextest.py b/src/ssh_audit/gextest.py index 3ce86d1..4494c0f 100644 --- a/src/ssh_audit/gextest.py +++ b/src/ssh_audit/gextest.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2023 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -64,7 +64,7 @@ class GEXTest: try: # Parse the server's KEX. - _, payload = s.read_packet(2) + _, payload = s.read_packet() SSH2_Kex.parse(out, payload) except (KexDHException, struct.error): out.v("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc()), write_now=True) diff --git a/src/ssh_audit/kexdh.py b/src/ssh_audit/kexdh.py index 7335334..b791c55 100644 --- a/src/ssh_audit/kexdh.py +++ b/src/ssh_audit/kexdh.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2023 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -88,11 +88,11 @@ class KexDH: # pragma: nocover self.__ca_key_type = '' self.__ca_n_len = 0 - packet_type, payload = s.read_packet(2) + packet_type, payload = s.read_packet() # Skip any & all MSG_DEBUG messages. while packet_type == Protocol.MSG_DEBUG: - packet_type, payload = s.read_packet(2) + packet_type, payload = s.read_packet() if packet_type != -1 and packet_type not in [Protocol.MSG_KEXDH_REPLY, Protocol.MSG_KEXDH_GEX_REPLY]: # pylint: disable=no-else-raise raise KexDHException('Expected MSG_KEXDH_REPLY (%d) or MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (Protocol.MSG_KEXDH_REPLY, Protocol.MSG_KEXDH_GEX_REPLY, packet_type)) @@ -380,13 +380,13 @@ class KexGroupExchange(KexDH): s.write_int(maxbits) s.send_packet() - packet_type, payload = s.read_packet(2) + packet_type, payload = s.read_packet() if packet_type not in [Protocol.MSG_KEXDH_GEX_GROUP, Protocol.MSG_DEBUG]: raise KexDHException('Expected MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (Protocol.MSG_KEXDH_GEX_REPLY, packet_type)) # Skip any & all MSG_DEBUG messages. while packet_type == Protocol.MSG_DEBUG: - packet_type, payload = s.read_packet(2) + packet_type, payload = s.read_packet() try: # Parse the modulus (p) and generator (g) values from the server. diff --git a/src/ssh_audit/ssh1.py b/src/ssh_audit/ssh1.py deleted file mode 100644 index f966b74..0000000 --- a/src/ssh_audit/ssh1.py +++ /dev/null @@ -1,40 +0,0 @@ -""" - The MIT License (MIT) - - Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. -""" -# pylint: disable=unused-import -from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 -from typing import Callable, Optional, Union, Any # noqa: F401 - -from ssh_audit.ssh1_crc32 import SSH1_CRC32 - - -class SSH1: - _crc32: Optional[SSH1_CRC32] = None - CIPHERS = ['none', 'idea', 'des', '3des', 'tss', 'rc4', 'blowfish'] - AUTHS = ['none', 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos'] - - @classmethod - def crc32(cls, v: bytes) -> int: - if cls._crc32 is None: - cls._crc32 = SSH1_CRC32() - return cls._crc32.calc(v) diff --git a/src/ssh_audit/ssh1_crc32.py b/src/ssh_audit/ssh1_crc32.py deleted file mode 100644 index 55089ed..0000000 --- a/src/ssh_audit/ssh1_crc32.py +++ /dev/null @@ -1,47 +0,0 @@ -""" - The MIT License (MIT) - - Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. -""" -# pylint: disable=unused-import -from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 -from typing import Callable, Optional, Union, Any # noqa: F401 - - -class SSH1_CRC32: - def __init__(self) -> None: - self._table = [0] * 256 - for i in range(256): - crc = 0 - n = i - for _ in range(8): - x = (crc ^ n) & 1 - crc = (crc >> 1) ^ (x * 0xedb88320) - n = n >> 1 - self._table[i] = crc - - def calc(self, v: bytes) -> int: - crc, length = 0, len(v) - for i in range(length): - n = ord(v[i:i + 1]) - n = n ^ (crc & 0xff) - crc = (crc >> 8) ^ self._table[n] - return crc diff --git a/src/ssh_audit/ssh1_kexdb.py b/src/ssh_audit/ssh1_kexdb.py deleted file mode 100644 index 3a7de1b..0000000 --- a/src/ssh_audit/ssh1_kexdb.py +++ /dev/null @@ -1,84 +0,0 @@ -""" - The MIT License (MIT) - - Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. -""" -# pylint: disable=unused-import -import copy -import threading - -from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 -from typing import Callable, Optional, Union, Any # noqa: F401 - - -class SSH1_KexDB: # pylint: disable=too-few-public-methods - - FAIL_PLAINTEXT = 'no encryption/integrity' - FAIL_OPENSSH37_REMOVE = 'removed since OpenSSH 3.7' - FAIL_NA_BROKEN = 'not implemented in OpenSSH, broken algorithm' - FAIL_NA_UNSAFE = 'not implemented in OpenSSH (server), unsafe algorithm' - TEXT_CIPHER_IDEA = 'cipher used by commercial SSH' - - DB_PER_THREAD: Dict[int, Dict[str, Dict[str, List[List[Optional[str]]]]]] = {} - - MASTER_DB: Dict[str, Dict[str, List[List[Optional[str]]]]] = { - 'key': { - 'ssh-rsa1': [['1.2.2']], - }, - 'enc': { - 'none': [['1.2.2'], [FAIL_PLAINTEXT]], - 'idea': [[None], [], [], [TEXT_CIPHER_IDEA]], - 'des': [['2.3.0C'], [FAIL_NA_UNSAFE]], - '3des': [['1.2.2']], - 'tss': [[''], [FAIL_NA_BROKEN]], - 'rc4': [[], [FAIL_NA_BROKEN]], - 'blowfish': [['1.2.2']], - }, - 'aut': { - 'rhosts': [['1.2.2', '3.6'], [FAIL_OPENSSH37_REMOVE]], - 'rsa': [['1.2.2']], - 'password': [['1.2.2']], - 'rhosts_rsa': [['1.2.2']], - 'tis': [['1.2.2']], - 'kerberos': [['1.2.2', '3.6'], [FAIL_OPENSSH37_REMOVE]], - } - } - - - @staticmethod - def get_db() -> Dict[str, Dict[str, List[List[Optional[str]]]]]: - '''Returns a copy of the MASTER_DB that is private to the calling thread. This prevents multiple threads from polluting the results of other threads.''' - calling_thread_id = threading.get_ident() - - if calling_thread_id not in SSH1_KexDB.DB_PER_THREAD: - SSH1_KexDB.DB_PER_THREAD[calling_thread_id] = copy.deepcopy(SSH1_KexDB.MASTER_DB) - - return SSH1_KexDB.DB_PER_THREAD[calling_thread_id] - - - @staticmethod - def thread_exit() -> None: - '''Deletes the calling thread's copy of the MASTER_DB. This is needed because, in rare circumstances, a terminated thread's ID can be re-used by new threads.''' - - calling_thread_id = threading.get_ident() - - if calling_thread_id in SSH1_KexDB.DB_PER_THREAD: - del SSH1_KexDB.DB_PER_THREAD[calling_thread_id] diff --git a/src/ssh_audit/ssh1_publickeymessage.py b/src/ssh_audit/ssh1_publickeymessage.py deleted file mode 100644 index 67c4dec..0000000 --- a/src/ssh_audit/ssh1_publickeymessage.py +++ /dev/null @@ -1,144 +0,0 @@ -""" - The MIT License (MIT) - - Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. -""" -# pylint: disable=unused-import -from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 -from typing import Callable, Optional, Union, Any # noqa: F401 - -from ssh_audit.ssh1 import SSH1 -from ssh_audit.readbuf import ReadBuf -from ssh_audit.utils import Utils -from ssh_audit.writebuf import WriteBuf - - -class SSH1_PublicKeyMessage: - def __init__(self, cookie: bytes, skey: Tuple[int, int, int], hkey: Tuple[int, int, int], pflags: int, cmask: int, amask: int) -> None: - if len(skey) != 3: - raise ValueError('invalid server key pair: {}'.format(skey)) - if len(hkey) != 3: - raise ValueError('invalid host key pair: {}'.format(hkey)) - self.__cookie = cookie - self.__server_key = skey - self.__host_key = hkey - self.__protocol_flags = pflags - self.__supported_ciphers_mask = cmask - self.__supported_authentications_mask = amask - - @property - def cookie(self) -> bytes: - return self.__cookie - - @property - def server_key_bits(self) -> int: - return self.__server_key[0] - - @property - def server_key_public_exponent(self) -> int: - return self.__server_key[1] - - @property - def server_key_public_modulus(self) -> int: - return self.__server_key[2] - - @property - def host_key_bits(self) -> int: - return self.__host_key[0] - - @property - def host_key_public_exponent(self) -> int: - return self.__host_key[1] - - @property - def host_key_public_modulus(self) -> int: - return self.__host_key[2] - - @property - def host_key_fingerprint_data(self) -> bytes: - # pylint: disable=protected-access - mod = WriteBuf._create_mpint(self.host_key_public_modulus, False) - e = WriteBuf._create_mpint(self.host_key_public_exponent, False) - return mod + e - - @property - def protocol_flags(self) -> int: - return self.__protocol_flags - - @property - def supported_ciphers_mask(self) -> int: - return self.__supported_ciphers_mask - - @property - def supported_ciphers(self) -> List[str]: - ciphers = [] - for i in range(len(SSH1.CIPHERS)): # pylint: disable=consider-using-enumerate - if self.__supported_ciphers_mask & (1 << i) != 0: - ciphers.append(Utils.to_text(SSH1.CIPHERS[i])) - return ciphers - - @property - def supported_authentications_mask(self) -> int: - return self.__supported_authentications_mask - - @property - def supported_authentications(self) -> List[str]: - auths = [] - for i in range(1, len(SSH1.AUTHS)): - if self.__supported_authentications_mask & (1 << i) != 0: - auths.append(Utils.to_text(SSH1.AUTHS[i])) - return auths - - def write(self, wbuf: 'WriteBuf') -> None: - wbuf.write(self.cookie) - wbuf.write_int(self.server_key_bits) - wbuf.write_mpint1(self.server_key_public_exponent) - wbuf.write_mpint1(self.server_key_public_modulus) - wbuf.write_int(self.host_key_bits) - wbuf.write_mpint1(self.host_key_public_exponent) - wbuf.write_mpint1(self.host_key_public_modulus) - wbuf.write_int(self.protocol_flags) - wbuf.write_int(self.supported_ciphers_mask) - wbuf.write_int(self.supported_authentications_mask) - - @property - def payload(self) -> bytes: - wbuf = WriteBuf() - self.write(wbuf) - return wbuf.write_flush() - - @classmethod - def parse(cls, payload: bytes) -> 'SSH1_PublicKeyMessage': - buf = ReadBuf(payload) - cookie = buf.read(8) - server_key_bits = buf.read_int() - server_key_exponent = buf.read_mpint1() - server_key_modulus = buf.read_mpint1() - skey = (server_key_bits, server_key_exponent, server_key_modulus) - host_key_bits = buf.read_int() - host_key_exponent = buf.read_mpint1() - host_key_modulus = buf.read_mpint1() - hkey = (host_key_bits, host_key_exponent, host_key_modulus) - pflags = buf.read_int() - cmask = buf.read_int() - amask = buf.read_int() - pkm = cls(cookie, skey, hkey, pflags, cmask, amask) - return pkm diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index cf2d5aa..0ed06e6 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -2,7 +2,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2024 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -56,8 +56,6 @@ from ssh_audit.policy import Policy from ssh_audit.product import Product from ssh_audit.protocol import Protocol from ssh_audit.software import Software -from ssh_audit.ssh1_kexdb import SSH1_KexDB -from ssh_audit.ssh1_publickeymessage import SSH1_PublicKeyMessage from ssh_audit.ssh2_kex import SSH2_Kex from ssh_audit.ssh2_kexdb import SSH2_KexDB from ssh_audit.ssh_socket import SSH_Socket @@ -234,11 +232,6 @@ def output_security(out: OutputBuffer, banner: Optional[Banner], padlen: int, is def output_fingerprints(out: OutputBuffer, algs: Algorithms, is_json_output: bool) -> None: with out: fps = {} - if algs.ssh1kex is not None: - name = 'ssh-rsa1' - fp = Fingerprint(algs.ssh1kex.host_key_fingerprint_data) - # bits = algs.ssh1kex.host_key_bits - fps[name] = fp if algs.ssh2kex is not None: host_keys = algs.ssh2kex.host_keys() for host_key_type in algs.ssh2kex.host_keys(): @@ -509,12 +502,11 @@ def post_process_findings(banner: Optional[Banner], algs: Algorithms, client_aud # Returns a exitcodes.* flag to denote if any failures or warnings were encountered. -def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False, dh_rate_test_notes: str = "") -> int: +def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, print_target: bool = False, dh_rate_test_notes: str = "") -> int: program_retval = exitcodes.GOOD client_audit = client_host is not None # If set, this is a client audit. - sshv = 1 if pkm is not None else 2 - algs = Algorithms(pkm, kex) + algs = Algorithms(kex) # Perform post-processing on the findings to make final adjustments before outputting the results. algorithm_recommendation_suppress_list, additional_notes = post_process_findings(banner, algs, client_audit, dh_rate_test_notes) @@ -539,7 +531,7 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header out.info('(gen) header: ' + '\n'.join(header)) if banner is not None: banner_line = '(gen) banner: {}'.format(banner) - if sshv == 1 or banner.protocol[0] == 1: + if banner.protocol[0] == 1: out.fail(banner_line) out.fail('(gen) protocol SSH1 enabled') else: @@ -571,18 +563,6 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header # Filled in by output_algorithms() with unidentified algs. unknown_algorithms: List[str] = [] - # SSHv1 - if pkm is not None: - adb = SSH1_KexDB.get_db() - ciphers = pkm.supported_ciphers - auths = pkm.supported_authentications - title, atype = 'SSH1 host-key algorithms', 'key' - program_retval = output_algorithms(out, title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen) - title, atype = 'SSH1 encryption algorithms (ciphers)', 'enc' - program_retval = output_algorithms(out, title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen) - title, atype = 'SSH1 authentication types', 'aut' - program_retval = output_algorithms(out, title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen) - # SSHv2 if kex is not None: adb = SSH2_KexDB.get_db() @@ -782,15 +762,12 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p aconf.colors = enable_colors out.use_colors = enable_colors - aconf.ssh1, aconf.ssh2 = False, False host: str = '' port: int = 22 parser = argparse.ArgumentParser(description="# {} {}, https://github.com/jtesta/ssh-audit".format(os.path.basename(sys.argv[0]), VERSION), allow_abbrev=False) # Add short options to the parser - parser.add_argument("-1", "--ssh1", action="store_true", dest="ssh1", default=False, help="force ssh version 1 only") - parser.add_argument("-2", "--ssh2", action="store_true", dest="ssh2", default=False, help="force ssh version 2 only") parser.add_argument("-4", "--ipv4", action="store_true", dest="ipv4", default=False, help="enable IPv4 (order of precedence)") parser.add_argument("-6", "--ipv6", action="store_true", dest="ipv6", default=False, help="enable IPv6 (order of precedence)") parser.add_argument("-b", "--batch", action="store_true", dest="batch", default=False, help="batch output") @@ -836,8 +813,6 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p aconf.list_policies = argument.list_policies aconf.manual = argument.manual aconf.skip_rate_test = argument.skip_rate_test - aconf.ssh1 = argument.ssh1 - aconf.ssh2 = argument.ssh2 oport = argument.oport if argument.batch is True: @@ -950,9 +925,6 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p aconf.host = host aconf.port = port - if not (aconf.ssh1 or aconf.ssh2): - aconf.ssh1, aconf.ssh2 = True, True - # If a file containing a list of targets was given, read it. if aconf.target_file is not None: try: @@ -994,7 +966,7 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p return aconf -def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None, software: Optional[Software] = None, algorithms: Optional[Algorithms] = None, algorithm_recommendation_suppress_list: Optional[List[str]] = None, additional_notes: List[str] = []) -> Any: # pylint: disable=dangerous-default-value +def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, client_host: Optional[str] = None, software: Optional[Software] = None, algorithms: Optional[Algorithms] = None, algorithm_recommendation_suppress_list: Optional[List[str]] = None, additional_notes: List[str] = []) -> Any: # pylint: disable=dangerous-default-value def fetch_notes(algorithm: str, alg_type: str) -> Dict[str, List[Optional[str]]]: '''Returns a dictionary containing the messages in the "fail", "warn", and "info" levels for this algorithm.''' @@ -1139,22 +1111,6 @@ def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SS 'hash_alg': 'MD5', 'hash': fp.md5[4:] }) - else: - pkm_supported_ciphers = None - pkm_supported_authentications = None - pkm_fp = None - if pkm is not None: - pkm_supported_ciphers = pkm.supported_ciphers - pkm_supported_authentications = pkm.supported_authentications - pkm_fp = Fingerprint(pkm.host_key_fingerprint_data).sha256 - - res['key'] = ['ssh-rsa1'] - res['enc'] = pkm_supported_ciphers - res['aut'] = pkm_supported_authentications - res['fingerprints'] = [{ - 'type': 'ssh-rsa1', - 'fp': pkm_fp, - }] # Historically, CVE information was returned. Now we'll just return an empty dictionary so as to not break any legacy clients. res['cves'] = [] @@ -1169,7 +1125,7 @@ def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SS # Returns one of the exitcodes.* flags. -def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int: +def audit(out: OutputBuffer, aconf: AuditConf, print_target: bool = False) -> int: program_retval = exitcodes.GOOD out.batch = aconf.batch out.verbose = aconf.verbose @@ -1195,10 +1151,8 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print out.write() sys.exit(exitcodes.CONNECTION_ERROR) - if sshv is None: - sshv = 2 if aconf.ssh2 else 1 err = None - banner, header, err = s.get_banner(sshv) + banner, header, err = s.get_banner() if banner is None: if err is None: err = '[exception] did not receive banner.' @@ -1207,7 +1161,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print if err is None: s.send_kexinit() # Send the algorithms we support (except we don't since this isn't a real SSH connection). - packet_type, payload = s.read_packet(sshv) + packet_type, payload = s.read_packet() if packet_type < 0: try: if len(payload) > 0: @@ -1216,17 +1170,10 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print payload_txt = 'empty' except UnicodeDecodeError: payload_txt = '"{}"'.format(repr(payload).lstrip('b')[1:-1]) - if payload_txt == 'Protocol major versions differ.': - if sshv == 2 and aconf.ssh1: - ret = audit(out, aconf, 1) - out.write() - return ret err = '[exception] error reading packet ({})'.format(payload_txt) else: err_pair = None - if sshv == 1 and packet_type != Protocol.SMSG_PUBLIC_KEY: - err_pair = ('SMSG_PUBLIC_KEY', Protocol.SMSG_PUBLIC_KEY) - elif sshv == 2 and packet_type != Protocol.MSG_KEXINIT: + if packet_type != Protocol.MSG_KEXINIT: err_pair = ('MSG_KEXINIT', Protocol.MSG_KEXINIT) if err_pair is not None: fmt = '[exception] did not receive {0} ({1}), ' + \ @@ -1236,52 +1183,50 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print output(out, aconf, banner, header) out.fail(err) return exitcodes.CONNECTION_ERROR - if sshv == 1: - program_retval = output(out, aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload)) - elif sshv == 2: - try: - kex = SSH2_Kex.parse(out, payload) - out.d(str(kex)) - except Exception: - out.fail("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc())) - return exitcodes.CONNECTION_ERROR - if aconf.dheat is not None: - DHEat(out, aconf, banner, kex).run() - return exitcodes.GOOD - elif aconf.conn_rate_test_enabled: - DHEat.dh_rate_test(out, aconf, kex, 0, 0, 0) - return exitcodes.GOOD + try: + kex = SSH2_Kex.parse(out, payload) + out.d(str(kex)) + except Exception: + out.fail("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc())) + return exitcodes.CONNECTION_ERROR - dh_rate_test_notes = "" - if aconf.client_audit is False: - HostKeyTest.run(out, s, kex) - if aconf.gex_test != '': - return run_gex_granular_modulus_size_test(out, s, kex, aconf) - else: - GEXTest.run(out, s, banner, kex) - - # Skip the rate test if the user specified "--skip-rate-test". - if aconf.skip_rate_test: - out.d("Skipping rate test due to --skip-rate-test option.") - else: - # Try to open many TCP connections against the server if any Diffie-Hellman key exchanges are present; this tests potential vulnerability to the DHEat DOS attack. Use 3 concurrent sockets over at most 1.5 seconds to open at most 38 connections (stops if 1.5 seconds elapse, or 38 connections are opened--whichever comes first). If more than 25 connections per second were observed, flag the DH algorithms with a warning about the DHEat DOS vuln. - dh_rate_test_notes = DHEat.dh_rate_test(out, aconf, kex, 1.5, 38, 3) - - # This is a standard audit scan. - if (aconf.policy is None) and (aconf.make_policy is False): - program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target, dh_rate_test_notes=dh_rate_test_notes) - - # This is a policy test. - elif (aconf.policy is not None) and (aconf.make_policy is False): - program_retval = exitcodes.GOOD if evaluate_policy(out, aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE - - # A new policy should be made from this scan. - elif (aconf.policy is None) and (aconf.make_policy is True): - make_policy(aconf, banner, kex, s.client_host) + if aconf.dheat is not None: + DHEat(out, aconf, banner, kex).run() + return exitcodes.GOOD + elif aconf.conn_rate_test_enabled: + DHEat.dh_rate_test(out, aconf, kex, 0, 0, 0) + return exitcodes.GOOD + dh_rate_test_notes = "" + if aconf.client_audit is False: + HostKeyTest.run(out, s, kex) + if aconf.gex_test != '': + return run_gex_granular_modulus_size_test(out, s, kex, aconf) else: - raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy)) + GEXTest.run(out, s, banner, kex) + + # Skip the rate test if the user specified "--skip-rate-test". + if aconf.skip_rate_test: + out.d("Skipping rate test due to --skip-rate-test option.") + else: + # Try to open many TCP connections against the server if any Diffie-Hellman key exchanges are present; this tests potential vulnerability to the DHEat DOS attack. Use 3 concurrent sockets over at most 1.5 seconds to open at most 38 connections (stops if 1.5 seconds elapse, or 38 connections are opened--whichever comes first). If more than 25 connections per second were observed, flag the DH algorithms with a warning about the DHEat DOS vuln. + dh_rate_test_notes = DHEat.dh_rate_test(out, aconf, kex, 1.5, 38, 3) + + # This is a standard audit scan. + if (aconf.policy is None) and (aconf.make_policy is False): + program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target, dh_rate_test_notes=dh_rate_test_notes) + + # This is a policy test. + elif (aconf.policy is not None) and (aconf.make_policy is False): + program_retval = exitcodes.GOOD if evaluate_policy(out, aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE + + # A new policy should be made from this scan. + elif (aconf.policy is None) and (aconf.make_policy is True): + make_policy(aconf, banner, kex, s.client_host) + + else: + raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy)) return program_retval @@ -1548,7 +1493,6 @@ def main() -> int: print(']') # Send notification that this thread is exiting. This deletes the thread's local copy of the algorithm databases. - SSH1_KexDB.thread_exit() SSH2_KexDB.thread_exit() else: # Just a scan against a single target. diff --git a/src/ssh_audit/ssh_socket.py b/src/ssh_audit/ssh_socket.py index 400bd50..97a0e94 100644 --- a/src/ssh_audit/ssh_socket.py +++ b/src/ssh_audit/ssh_socket.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -39,7 +39,6 @@ from ssh_audit.globals import SSH_HEADER from ssh_audit.outputbuffer import OutputBuffer from ssh_audit.protocol import Protocol from ssh_audit.readbuf import ReadBuf -from ssh_audit.ssh1 import SSH1 from ssh_audit.ssh2_kex import SSH2_Kex from ssh_audit.ssh2_kexparty import SSH2_KexParty from ssh_audit.utils import Utils @@ -173,7 +172,7 @@ class SSH_Socket(ReadBuf, WriteBuf): errm = 'cannot connect to {} port {}: {}'.format(*errt) return '[exception] {}'.format(errm) - def get_banner(self, sshv: int = 2) -> Tuple[Optional['Banner'], List[str], Optional[str]]: + def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]: self.__outputbuffer.d('Getting banner...', write_now=True) if self.__sock is None: @@ -181,7 +180,7 @@ class SSH_Socket(ReadBuf, WriteBuf): if self.__banner is not None: return self.__banner, self.__header, None - banner = SSH_HEADER.format('1.5' if sshv == 1 else '2.0') + banner = SSH_HEADER.format('2.0') if self.__state < self.SM_BANNER_SENT: self.send_banner(banner) @@ -254,47 +253,27 @@ class SSH_Socket(ReadBuf, WriteBuf): if s < 0: raise SSH_Socket.InsufficientReadException(e) - def read_packet(self, sshv: int = 2) -> Tuple[int, bytes]: + def read_packet(self) -> Tuple[int, bytes]: try: header = WriteBuf() self.ensure_read(4) packet_length = self.read_int() header.write_int(packet_length) # XXX: validate length - if sshv == 1: - padding_length = 8 - packet_length % 8 - self.ensure_read(padding_length) - padding = self.read(padding_length) - header.write(padding) - payload_length = packet_length - check_size = padding_length + payload_length - else: - self.ensure_read(1) - padding_length = self.read_byte() - header.write_byte(padding_length) - payload_length = packet_length - padding_length - 1 - check_size = 4 + 1 + payload_length + padding_length + self.ensure_read(1) + padding_length = self.read_byte() + header.write_byte(padding_length) + payload_length = packet_length - padding_length - 1 + check_size = 4 + 1 + payload_length + padding_length if check_size % self.__block_size != 0: self.__outputbuffer.fail('[exception] invalid ssh packet (block size)').write() sys.exit(exitcodes.CONNECTION_ERROR) self.ensure_read(payload_length) - if sshv == 1: - payload = self.read(payload_length - 4) - header.write(payload) - crc = self.read_int() - header.write_int(crc) - else: - payload = self.read(payload_length) - header.write(payload) + payload = self.read(payload_length) + header.write(payload) packet_type = ord(payload[0:1]) - if sshv == 1: - rcrc = SSH1.crc32(padding + payload) - if crc != rcrc: - self.__outputbuffer.fail('[exception] packet checksum CRC32 mismatch.').write() - sys.exit(exitcodes.CONNECTION_ERROR) - else: - self.ensure_read(padding_length) - padding = self.read(padding_length) + self.ensure_read(padding_length) + _ = self.read(padding_length) payload = payload[1:] return packet_type, payload except SSH_Socket.InsufficientReadException as ex: diff --git a/ssh-audit.1 b/ssh-audit.1 index 0968c15..a3ff5f9 100644 --- a/ssh-audit.1 +++ b/ssh-audit.1 @@ -1,4 +1,4 @@ -.TH SSH-AUDIT 1 "September 24, 2024" +.TH SSH-AUDIT 1 "July 26, 2025" .SH NAME \fBssh-audit\fP \- SSH server & client configuration auditor .SH SYNOPSIS @@ -16,16 +16,6 @@ See for official hardening guides for common platf .br Print short summary of options. -.TP -.B -1, \-\-ssh1 -.br -Only perform an audit using SSH protocol version 1. - -.TP -.B -2, \-\-ssh2 -.br -Only perform an audit using SSH protocol version 2. - .TP .B -4, \-\-ipv4 .br diff --git a/test/test_auditconf.py b/test/test_auditconf.py index 1f389b9..3868b01 100644 --- a/test/test_auditconf.py +++ b/test/test_auditconf.py @@ -15,8 +15,6 @@ class TestAuditConf: options = { 'host': '', 'port': 22, - 'ssh1': True, - 'ssh2': True, 'batch': False, 'colors': True, 'verbose': False, @@ -28,8 +26,6 @@ class TestAuditConf: options[k] = v assert conf.host == options['host'] assert conf.port == options['port'] - assert conf.ssh1 is options['ssh1'] - assert conf.ssh2 is options['ssh2'] assert conf.batch is options['batch'] assert conf.colors is options['colors'] assert conf.verbose is options['verbose'] @@ -43,7 +39,7 @@ class TestAuditConf: def test_audit_conf_booleans(self): conf = self.AuditConf() - for p in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']: + for p in ['batch', 'colors', 'verbose']: for v in [True, 1]: setattr(conf, p, v) assert getattr(conf, p) is True @@ -147,12 +143,6 @@ class TestAuditConf: conf = c('localhost:99999') with pytest.raises(SystemExit): conf = c('-p 99999 localhost') - conf = c('-1 localhost') - self._test_conf(conf, host='localhost', ssh1=True, ssh2=False) - conf = c('-2 localhost') - self._test_conf(conf, host='localhost', ssh1=False, ssh2=True) - conf = c('-12 localhost') - self._test_conf(conf, host='localhost', ssh1=True, ssh2=True) conf = c('-4 localhost') self._test_conf(conf, host='localhost', ipv4=True, ipv6=False, ipvo=(4,)) conf = c('-6 localhost') diff --git a/test/test_errors.py b/test/test_errors.py index ad56a29..8744305 100644 --- a/test/test_errors.py +++ b/test/test_errors.py @@ -166,7 +166,6 @@ class TestErrors: vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n') vsocket.rdata.append(b'Protocol major versions differ.\n') conf = self._conf() - conf.ssh1, conf.ssh2 = True, False lines = self._audit(output_spy, conf) assert len(lines) == 4 assert 'error reading packet' in lines[-1] diff --git a/test/test_ssh1.py b/test/test_ssh1.py deleted file mode 100644 index bdc9b1b..0000000 --- a/test/test_ssh1.py +++ /dev/null @@ -1,174 +0,0 @@ -import struct -import pytest - -from ssh_audit.auditconf import AuditConf -from ssh_audit.fingerprint import Fingerprint -from ssh_audit.outputbuffer import OutputBuffer -from ssh_audit.protocol import Protocol -from ssh_audit.readbuf import ReadBuf -from ssh_audit.ssh1 import SSH1 -from ssh_audit.ssh1_publickeymessage import SSH1_PublicKeyMessage -from ssh_audit.ssh_audit import audit -from ssh_audit.writebuf import WriteBuf - - -# pylint: disable=line-too-long,attribute-defined-outside-init -class TestSSH1: - @pytest.fixture(autouse=True) - def init(self, ssh_audit): - self.OutputBuffer = OutputBuffer - self.protocol = Protocol - self.ssh1 = SSH1 - self.PublicKeyMessage = SSH1_PublicKeyMessage - self.rbuf = ReadBuf - self.wbuf = WriteBuf - self.audit = audit - self.AuditConf = AuditConf - self.fingerprint = Fingerprint - - def _conf(self): - conf = self.AuditConf('localhost', 22) - conf.colors = False - conf.batch = True - conf.verbose = True - conf.ssh1 = True - conf.ssh2 = False - conf.skip_rate_test = True - return conf - - def _create_ssh1_packet(self, payload, valid_crc=True): - padding = -(len(payload) + 4) % 8 - plen = len(payload) + 4 - pad_bytes = b'\x00' * padding - cksum = self.ssh1.crc32(pad_bytes + payload) if valid_crc else 0 - data = struct.pack('>I', plen) + pad_bytes + payload + struct.pack('>I', cksum) - return data - - @classmethod - def _server_key(cls): - return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551) - - @classmethod - def _host_key(cls): - return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b) - - def _pkm_payload(self): - w = self.wbuf() - w.write(b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff') - b, e, m = self._server_key() - w.write_int(b).write_mpint1(e).write_mpint1(m) - b, e, m = self._host_key() - w.write_int(b).write_mpint1(e).write_mpint1(m) - w.write_int(2) - w.write_int(72) - w.write_int(36) - return w.write_flush() - - def test_crc32(self): - assert self.ssh1.crc32(b'') == 0x00 - assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808 - - def test_fingerprint(self): - # pylint: disable=protected-access - b, e, m = self._host_key() - fpd = self.wbuf._create_mpint(m, False) - fpd += self.wbuf._create_mpint(e, False) - fp = self.fingerprint(fpd) - assert b == 2048 - assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' - assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' - - def _assert_pkm_keys(self, pkm, skey, hkey): - b, e, m = skey - assert pkm.server_key_bits == b - assert pkm.server_key_public_exponent == e - assert pkm.server_key_public_modulus == m - b, e, m = hkey - assert pkm.host_key_bits == b - assert pkm.host_key_public_exponent == e - assert pkm.host_key_public_modulus == m - - def _assert_pkm_fields(self, pkm, skey, hkey): - assert pkm is not None - assert pkm.cookie == b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' - self._assert_pkm_keys(pkm, skey, hkey) - assert pkm.protocol_flags == 2 - assert pkm.supported_ciphers_mask == 72 - assert pkm.supported_ciphers == ['3des', 'blowfish'] - assert pkm.supported_authentications_mask == 36 - assert pkm.supported_authentications == ['rsa', 'tis'] - fp = self.fingerprint(pkm.host_key_fingerprint_data) - assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' - assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' - - def test_pkm_init(self): - cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' - pflags, cmask, amask = 2, 72, 36 - skey, hkey = self._server_key(), self._host_key() - pkm = self.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask) - self._assert_pkm_fields(pkm, skey, hkey) - for skey2 in ([], [0], [0, 1], [0, 1, 2, 3]): - with pytest.raises(ValueError): - pkm = self.PublicKeyMessage(cookie, skey2, hkey, pflags, cmask, amask) - for hkey2 in ([], [0], [0, 1], [0, 1, 2, 3]): - with pytest.raises(ValueError): - print(hkey2) - pkm = self.PublicKeyMessage(cookie, skey, hkey2, pflags, cmask, amask) - - def test_pkm_read(self): - pkm = self.PublicKeyMessage.parse(self._pkm_payload()) - self._assert_pkm_fields(pkm, self._server_key(), self._host_key()) - - def test_pkm_payload(self): - cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' - skey, hkey = self._server_key(), self._host_key() - pflags, cmask, amask = 2, 72, 36 - pkm1 = self.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask) - pkm2 = self.PublicKeyMessage.parse(self._pkm_payload()) - assert pkm1.payload == pkm2.payload - - def test_ssh1_server_simple(self, output_spy, virtual_socket): - vsocket = virtual_socket - w = self.wbuf() - w.write_byte(self.protocol.SMSG_PUBLIC_KEY) - w.write(self._pkm_payload()) - vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') - vsocket.rdata.append(self._create_ssh1_packet(w.write_flush())) - output_spy.begin() - out = self.OutputBuffer() - self.audit(out, self._conf()) - out.write() - lines = output_spy.flush() - assert len(lines) == 13 - - def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket): - vsocket = virtual_socket - w = self.wbuf() - w.write_byte(self.protocol.SMSG_PUBLIC_KEY + 1) - w.write(self._pkm_payload()) - vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') - vsocket.rdata.append(self._create_ssh1_packet(w.write_flush())) - output_spy.begin() - out = self.OutputBuffer() - ret = self.audit(out, self._conf()) - out.write() - assert ret != 0 - lines = output_spy.flush() - assert len(lines) == 6 - assert 'unknown message' in lines[-1] - - def test_ssh1_server_invalid_checksum(self, output_spy, virtual_socket): - vsocket = virtual_socket - w = self.wbuf() - w.write_byte(self.protocol.SMSG_PUBLIC_KEY + 1) - w.write(self._pkm_payload()) - vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') - vsocket.rdata.append(self._create_ssh1_packet(w.write_flush(), False)) - output_spy.begin() - out = self.OutputBuffer() - with pytest.raises(SystemExit): - self.audit(out, self._conf()) - out.write() - lines = output_spy.flush() - assert len(lines) == 3 - assert ('checksum' in lines[0]) or ('checksum' in lines[1]) or ('checksum' in lines[2])