From af663da83811ad7c40bcd5ca411ad27ceafbc134 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Thu, 22 Aug 2019 15:47:37 -0400 Subject: [PATCH] Now SHA256 fingerprints are displayed for RSA and ED25519 host keys. Fixes #2. --- ssh-audit.py | 209 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 128 insertions(+), 81 deletions(-) diff --git a/ssh-audit.py b/ssh-audit.py index 9be04dc..5f138e0 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -272,7 +272,10 @@ class OutputBuffer(list): sys.stdout = self.__buf return self - def flush(self): + def flush(self, sort_lines=False): + # Lines must be sorted in some cases to ensure consistent testing. + if sort_lines: + self.sort() # type: () -> None for line in self: print(line) @@ -322,7 +325,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods 'diffie-hellman-group18-sha512': [['7.3']], 'diffie-hellman-group-exchange-sha1': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]], 'diffie-hellman-group-exchange-sha256': [['4.4']], - 'diffie-hellman-group-exchange-sha256@ssh.com': [['4.4']], + 'diffie-hellman-group-exchange-sha256@ssh.com': [[]], 'ecdh-sha2-nistp256': [['5.7,d2013.62,l10.6.0'], [WARN_CURVES_WEAK]], 'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], 'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], @@ -466,6 +469,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods self.__rsa_key_sizes = {} self.__dh_modulus_sizes = {} + self.__host_keys = {} @property def cookie(self): @@ -516,6 +520,12 @@ class SSH2(object): # pylint: disable=too-few-public-methods def dh_modulus_sizes(self): return self.__dh_modulus_sizes + def set_host_key(self, key_type, hostkey): + self.__host_keys[key_type] = hostkey + + def host_keys(self): + return self.__host_keys + def write(self, wbuf): # type: (WriteBuf) -> None wbuf.write(self.cookie) @@ -561,10 +571,22 @@ class SSH2(object): # pylint: disable=too-few-public-methods kex = cls(cookie, kex_algs, key_algs, cli, srv, follows, unused) return kex - # Obtains RSA host keys and checks their size. - class RSAKeyTest(object): - RSA_TYPES = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'] - RSA_CA_TYPES = ['ssh-rsa-cert-v01@openssh.com'] + # Obtains host keys, checks their size, and derives their fingerprints. + class HostKeyTest(object): + # Tracks the RSA host key types. As of this writing, testing one in this family yields valid results for the rest. + RSA_FAMILY = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'] + + # Dict holding the host key types we should extract & parse. 'cert' is True to denote that a host key type handles certificates (thus requires additional parsing). 'variable_key_len' is True for host key types that can have variable sizes (True only for RSA types, as the rest are of fixed-size). After the host key type is fully parsed, the key 'parsed' is added with a value of True. + HOST_KEY_TYPES = { + 'ssh-rsa': {'cert': False, 'variable_key_len': True}, + 'rsa-sha2-256': {'cert': False, 'variable_key_len': True}, + 'rsa-sha2-512': {'cert': False, 'variable_key_len': True}, + + 'ssh-rsa-cert-v01@openssh.com': {'cert': True, 'variable_key_len': True}, + + 'ssh-ed25519': {'cert': False, 'variable_key_len': False}, + 'ssh-ed25519-cert-v01@openssh.com': {'cert': True, 'variable_key_len': False}, + } @staticmethod def run(s, server_kex): @@ -595,90 +617,90 @@ class SSH2(object): # pylint: disable=too-few-public-methods break if kex_str is not None: - SSH2.RSAKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.RSAKeyTest.RSA_TYPES) - SSH2.RSAKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.RSAKeyTest.RSA_CA_TYPES, ca=True) + SSH2.HostKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.HostKeyTest.HOST_KEY_TYPES) @staticmethod - def __test(s, server_kex, kex_str, kex_group, rsa_types, ca=False): - # If the server supports one of the RSA types, extract its key size. + def __test(s, server_kex, kex_str, kex_group, host_key_types): hostkey_modulus_size = 0 ca_modulus_size = 0 - ran_test = False - # If the connection is closed, re-open it and get the kex again. - if not s.is_connected(): - s.connect() - unused = None # pylint: disable=unused-variable - unused, unused, err = s.get_banner() - if err is not None: - s.close() - return + # For each host key type... + for host_key_type in host_key_types: + # Skip those already handled (i.e.: those in the RSA family, as testing one tests them all). + if 'parsed' in host_key_types[host_key_type] and host_key_types[host_key_type]['parsed']: + continue - # Parse the server's initial KEX. - packet_type = 0 # pylint: disable=unused-variable - packet_type, payload = s.read_packet() - SSH2.Kex.parse(payload) + # If this host key type is supported by the server, we test it. + if host_key_type in server_kex.key_algorithms: + cert = host_key_types[host_key_type]['cert'] + variable_key_len = host_key_types[host_key_type]['variable_key_len'] + # If the connection is closed, re-open it and get the kex again. + if not s.is_connected(): + s.connect() + unused = None # pylint: disable=unused-variable + unused, unused, err = s.get_banner() + if err is not None: + s.close() + return - for rsa_type in rsa_types: - if rsa_type in server_kex.key_algorithms: - ran_test = True + # Parse the server's initial KEX. + packet_type = 0 # pylint: disable=unused-variable + packet_type, payload = s.read_packet() + SSH2.Kex.parse(payload) # Send the server our KEXINIT message, using only our - # selected kex and RSA type. Send the server's own + # selected kex and host key type. Send the server's own # list of ciphers and MACs back to it (this doesn't # matter, really). - client_kex = SSH2.Kex(os.urandom(16), [kex_str], [rsa_type], server_kex.client, server_kex.server, 0, 0) + client_kex = SSH2.Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, 0, 0) s.write_byte(SSH.Protocol.MSG_KEXINIT) client_kex.write(s) s.send_packet() # Do the initial DH exchange. The server responds back - # with the host key and its length. Bingo. + # with the host key and its length. Bingo. We also get back the host key fingerprint. kex_group.send_init(s) - kex_group.recv_reply(s) + host_key = kex_group.recv_reply(s, variable_key_len) + server_kex.set_host_key(host_key_type, host_key) hostkey_modulus_size = kex_group.get_hostkey_size() ca_modulus_size = kex_group.get_ca_size() - # If we're not working with the CA types, we only need to - # test one RSA key, since the others will all be the same. - if ca is False: - break + # Close the socket, as the connection has + # been put in a state that later tests can't use. + s.close() - if hostkey_modulus_size > 0 or ca_modulus_size > 0: - # Set the hostkey size for all RSA key types since 'ssh-rsa', - # 'rsa-sha2-256', etc. are all using the same host key. - # Note, however, that this may change in the future. - if ca is False: - for rsa_type in rsa_types: - server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size) - else: - server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size, ca_modulus_size) + # If the host key modulus or CA modulus was successfully parsed, check to see that its a safe size. + if hostkey_modulus_size > 0 or ca_modulus_size > 0: + # Set the hostkey size for all RSA key types since 'ssh-rsa', + # 'rsa-sha2-256', etc. are all using the same host key. + # Note, however, that this may change in the future. + if cert is False and host_key_type in SSH2.HostKeyTest.RSA_FAMILY: + for rsa_type in SSH2.HostKeyTest.RSA_FAMILY: + server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size) + elif cert is True: + server_kex.set_rsa_key_size(host_key_type, hostkey_modulus_size, ca_modulus_size) - # Keys smaller than 2048 result in a failure. - fail = False - if hostkey_modulus_size < 2048 or (ca_modulus_size < 2048 and ca_modulus_size > 0): - fail = True + # Keys smaller than 2048 result in a failure. Update the database accordingly. + if (cert is False) and (hostkey_modulus_size < 2048): + for rsa_type in SSH2.HostKeyTest.RSA_FAMILY: + alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type] + alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size]) + elif (cert is True) and ((hostkey_modulus_size < 2048) or (ca_modulus_size > 0 and ca_modulus_size < 2048)): + alg_list = SSH2.KexDB.ALGORITHMS['key'][host_key_type] + min_modulus = min(hostkey_modulus_size, ca_modulus_size) + min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size) + alg_list.append(['using small %d-bit modulus' % min_modulus]) - # If this is a bad key size, update the database accordingly. - if fail: - if ca is False: - for rsa_type in SSH2.RSAKeyTest.RSA_TYPES: - alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type] - alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size]) + # If this host key type is in the RSA family, then mark them all as parsed (since results in one are valid for them all). + if host_key_type in SSH2.HostKeyTest.RSA_FAMILY: + for rsa_type in SSH2.HostKeyTest.RSA_FAMILY: + host_key_types[rsa_type]['parsed'] = True else: - alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type] + host_key_types[host_key_type]['parsed'] = True - min_modulus = min(hostkey_modulus_size, ca_modulus_size) - min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size) - alg_list.append(['using small %d-bit modulus' % min_modulus]) - - # If we ran any tests, close the socket, as the connection has - # been put in a state that later tests can't use. - if ran_test: - s.close() # Performs DH group exchanges to find what moduli are supported, and checks # their size. @@ -733,21 +755,21 @@ class SSH2(object): # pylint: disable=too-few-public-methods if SSH2.GEXTest.reconnect(s, gex_alg) is False: break - kex_group = GEX_ALGS[gex_alg]() smallest_modulus = -1 # First try a range of weak sizes. try: kex_group.send_init_gex(s, 512, 1024, 1536) - kex_group.recv_reply(s) + kex_group.recv_reply(s, False) # Its been observed that servers will return a group # larger than the requested max. So just because we # got here, doesn't mean the server is vulnerable... smallest_modulus = kex_group.get_dh_modulus_size() - except Exception: # pylint: disable=bare-except - x = 1 # pylint: disable=unused-variable + + except Exception as e: # pylint: disable=bare-except + pass finally: s.close() @@ -766,10 +788,12 @@ class SSH2(object): # pylint: disable=too-few-public-methods try: kex_group.send_init_gex(s, bits, bits, bits) - kex_group.recv_reply(s) + kex_group.recv_reply(s, False) smallest_modulus = kex_group.get_dh_modulus_size() - except Exception: # pylint: disable=bare-except - x = 1 # pylint: disable=unused-variable + except Exception as e: # pylint: disable=bare-except + #import traceback + #print(traceback.format_exc()) + pass finally: # The server is in a state that is not re-testable, # so there's nothing else to do with this open @@ -2132,6 +2156,7 @@ class KexDH(object): # pragma: nocover self.__f = 0 self.__h_sig = 0 + def set_params(self, g, p): self.__g = g self.__p = p @@ -2150,8 +2175,9 @@ class KexDH(object): # pragma: nocover s.send_packet() # Parse a KEXDH_REPLY or KEXDH_GEX_REPLY message from the server. This - # Contains the host key, among other things. - def recv_reply(self, s): + # contains the host key, among other things. Function returns the host + # key blob (from which the fingerprint can be calculated). + def recv_reply(self, s, parse_host_key_size=True): packet_type, payload = s.read_packet(2) if packet_type != -1 and packet_type not in [SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY]: # TODO: change Exception to something more specific. @@ -2160,7 +2186,7 @@ class KexDH(object): # pragma: nocover # A connection error occurred. We can't parse anything, so just # return. The host key modulus (and perhaps certificate modulus) # will remain at length 0. - return + return None hostkey_len = f_len = h_sig_len = 0 # pylint: disable=unused-variable hostkey_type_len = hostkey_e_len = 0 # pylint: disable=unused-variable @@ -2178,6 +2204,11 @@ class KexDH(object): # pragma: nocover # Get the host key blob, F, and signature. ptr = 0 hostkey, hostkey_len, ptr = KexDH.__get_bytes(payload, ptr) + + # If we are not supposed to parse the host key size (i.e.: it is a type that is of fixed size such as ed25519), then stop here. + if not parse_host_key_size: + return hostkey + self.__f, f_len, ptr = KexDH.__get_bytes(payload, ptr) self.__h_sig, h_sig_len, ptr = KexDH.__get_bytes(payload, ptr) @@ -2254,6 +2285,7 @@ class KexDH(object): # pragma: nocover # CA's modulus. Bingo. ca_key_n, self.__ca_n_len, ptr = KexDH.__get_bytes(ca_key, ptr) + return hostkey @staticmethod def __get_bytes(buf, ptr): @@ -2639,20 +2671,35 @@ def output_security(banner, padlen): out.sep() -def output_fingerprint(algs, sha256=True, padlen=0): +def output_fingerprints(algs, sha256=True): # type: (SSH.Algorithms, bool, int) -> None with OutputBuffer() as obuf: fps = [] if algs.ssh1kex is not None: name = 'ssh-rsa1' fp = SSH.Fingerprint(algs.ssh1kex.host_key_fingerprint_data) - bits = algs.ssh1kex.host_key_bits - fps.append((name, fp, bits)) + #bits = algs.ssh1kex.host_key_bits + fps.append((name, fp)) + if algs.ssh2kex is not None: + host_keys = algs.ssh2kex.host_keys() + for host_key_type in algs.ssh2kex.host_keys(): + fp = SSH.Fingerprint(host_keys[host_key_type]) + + # Workaround for Python's order-indifference in dicts. We might get a random RSA type (ssh-rsa, rsa-sha2-256, or rsa-sha2-512), so running the tool against the same server three times may give three different host key types here. So if we have any RSA type, we will simply hard-code it to 'ssh-rsa'. + if host_key_type in SSH2.HostKeyTest.RSA_FAMILY: + host_key_type = 'ssh-rsa' + + # Skip over certificate host types (or we would return invalid fingerprints). + if '-cert-' not in host_key_type: + fps.append((host_key_type, fp)) + # Similarly, the host keys can be processed in random order due to Python's order-indifference in dicts. So we sort this list before printing; this makes automated testing possible. + fps = sorted(fps) for fpp in fps: - name, fp, bits = fpp + name, fp = fpp fpo = fp.sha256 if sha256 else fp.md5 - p = '' if out.batch else ' ' * (padlen - len(name)) - out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) + #p = '' if out.batch else ' ' * (padlen - len(name)) + #out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) + out.good('(fin) {0}: {1}'.format(name, fpo)) if len(obuf) > 0: out.head('# fingerprints') obuf.flush() @@ -2694,7 +2741,7 @@ def output_recommendations(algs, software, padlen=0): else: title = '' out.head('# algorithm recommendations {0}'.format(title)) - obuf.flush() + obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing). out.sep() @@ -2752,8 +2799,8 @@ def output(banner, header, kex=None, pkm=None): output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, maxlen) title, atype = 'message authentication code algorithms', 'mac' output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen) + output_fingerprints(algs, True) output_recommendations(algs, software, maxlen) - output_fingerprint(algs, True, maxlen) # If we encountered any unknown algorithms, ask the user to report them. if len(unknown_algorithms) > 0: @@ -2933,7 +2980,7 @@ def audit(aconf, sshv=None): output(banner, header, pkm=pkm) elif sshv == 2: kex = SSH2.Kex.parse(payload) - SSH2.RSAKeyTest.run(s, kex) + SSH2.HostKeyTest.run(s, kex) SSH2.GEXTest.run(s, kex) output(banner, header, kex=kex)