From 1123ac718c674597ca84c05da3c0cbd4050f39d1 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Tue, 11 Aug 2020 20:11:42 -0400 Subject: [PATCH] Send peer a list of supported algorithms after the banner exchange. Fixes not only the weird case of an ssh-audit client hanging against an ssh-audit server, but perhaps some real-world hangs as well. --- ssh-audit.py | 51 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/ssh-audit.py b/ssh-audit.py index 70ed274..b7ac667 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -45,7 +45,7 @@ from typing import Dict, List, Set, Sequence, Tuple, Iterable from typing import Callable, Optional, Union, Any VERSION = 'v2.2.1-dev' -SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate +SSH_HEADER = 'SSH-{0}-OpenSSH_8.2' # SSH software to impersonate GITHUB_ISSUES_URL = 'https://github.com/jtesta/ssh-audit/issues' # The URL to the Github issues tracker. # The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively. @@ -1133,6 +1133,12 @@ class SSH2: # pylint: disable=too-few-public-methods hostkey_modulus_size = 0 ca_modulus_size = 0 + # If the connection still exists, close it so we can test + # using a clean slate (otherwise it may exist in a non-testable + # state). + if s.is_connected(): + s.close() + # For each host key type... for host_key_type in host_key_types: # Skip those already handled (i.e.: those in the RSA family, as testing one tests them all). @@ -1146,7 +1152,10 @@ class SSH2: # pylint: disable=too-few-public-methods # If the connection is closed, re-open it and get the kex again. if not s.is_connected(): - s.connect() + err = s.connect() + if err is not None: + return + unused = None # pylint: disable=unused-variable unused2 = None # pylint: disable=unused-variable unused, unused2, err = s.get_banner() @@ -1222,7 +1231,10 @@ class SSH2: # pylint: disable=too-few-public-methods if s.is_connected(): return True - s.connect() + err = s.connect() + if err is not None: + return False + unused = None # pylint: disable=unused-variable unused2 = None # pylint: disable=unused-variable unused, unused2, err = s.get_banner() @@ -2445,7 +2457,8 @@ class SSH: # pylint: disable=too-few-public-methods c.settimeout(self.__timeout) self.__sock = c - def connect(self) -> None: + def connect(self) -> Optional[str]: + '''Returns None on success, or an error string.''' err = None for af, addr in self._resolve(self.__ipvo): s = None @@ -2454,7 +2467,7 @@ class SSH: # pylint: disable=too-few-public-methods s.settimeout(self.__timeout) s.connect(addr) self.__sock = s - return + return None except socket.error as e: err = e self._close_socket(s) @@ -2463,8 +2476,7 @@ class SSH: # pylint: disable=too-few-public-methods else: errt = (self.__host, self.__port, err) errm = 'cannot connect to {} port {}: {}'.format(*errt) - out.fail('[exception] {}'.format(errm)) - sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + return '[exception] {}'.format(errm) def get_banner(self, sshv: int = 2) -> Tuple[Optional['SSH.Banner'], List[str], Optional[str]]: if self.__sock is None: @@ -2522,6 +2534,23 @@ class SSH: # pylint: disable=too-few-public-methods except socket.error as e: return -1, str(e.args[-1]) + def send_algorithms(self) -> None: + '''Sends the list of supported host keys, key exchanges, ciphers, and MACs. Emulates OpenSSH v8.2.''' + + key_exchanges = ['curve25519-sha256', 'curve25519-sha256@libssh.org', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521', 'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group14-sha256'] + hostkeys = ['rsa-sha2-512', 'rsa-sha2-256', 'ssh-rsa', 'ecdsa-sha2-nistp256', 'ssh-ed25519'] + ciphers = ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com'] + macs = ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] + compressions = ['none', 'zlib@openssh.com'] + languages = [''] + + kexparty = SSH2.KexParty(ciphers, macs, compressions, languages) + kex = SSH2.Kex(os.urandom(16), key_exchanges, hostkeys, kexparty, kexparty, False, 0) + + self.write_byte(SSH.Protocol.MSG_KEXINIT) + kex.write(self) + self.send_packet() + def send_banner(self, banner: str) -> None: self.send(banner.encode() + b'\r\n') if self.__state < self.SM_BANNER_SENT: @@ -3659,7 +3688,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal if aconf.client_audit: s.listen_and_accept() else: - s.connect() + err = s.connect() + if err is not None: + out.fail(err) + sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + if sshv is None: sshv = 2 if aconf.ssh2 else 1 err = None @@ -3670,6 +3703,8 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal else: err = '[exception] did not receive banner: {}'.format(err) if err is None: + s.send_algorithms() # Send the algorithms we support (except we don't since this isn't a real SSH connection). + packet_type, payload = s.read_packet(sshv) if packet_type < 0: try: