From 1ba4c7c7ca437e2b84a69cfed74ffb5b4e90d3e4 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Wed, 20 Jan 2021 15:27:38 -0500 Subject: [PATCH] Send KEX before reading server's KEX during host key and GEX tests; this prevents deadlock against certain server implementations. --- docker_test.sh | 1 + src/ssh_audit/gextest.py | 22 ++++++++++------------ src/ssh_audit/hostkeytest.py | 22 +++++++++------------- 3 files changed, 20 insertions(+), 25 deletions(-) diff --git a/docker_test.sh b/docker_test.sh index f736528..4019a6f 100755 --- a/docker_test.sh +++ b/docker_test.sh @@ -443,6 +443,7 @@ function run_test { fi cid=`docker run -d -p 2222:22 ${IMAGE_NAME}:${IMAGE_VERSION} ${server_exec}` + #echo "Running: docker run -d -p 2222:22 ${IMAGE_NAME}:${IMAGE_VERSION} ${server_exec}" if [[ $? != 0 ]]; then echo -e "${REDB}Failed to run docker image! (exit code: $?)${CLR}" exit 1 diff --git a/src/ssh_audit/gextest.py b/src/ssh_audit/gextest.py index f6a7c12..8e0c0e9 100644 --- a/src/ssh_audit/gextest.py +++ b/src/ssh_audit/gextest.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -40,7 +40,7 @@ class GEXTest: # Creates a new connection to the server. Returns True on success, or False. @staticmethod - def reconnect(s: 'SSH_Socket', gex_alg: str) -> bool: + def reconnect(s: 'SSH_Socket', kex: 'SSH2_Kex', gex_alg: str) -> bool: if s.is_connected(): return True @@ -48,24 +48,22 @@ class GEXTest: if err is not None: return False - unused = None # pylint: disable=unused-variable - unused2 = None # pylint: disable=unused-variable - unused, unused2, err = s.get_banner() + _, _, err = s.get_banner() if err is not None: s.close() return False - # Parse the server's initial KEX. - packet_type = 0 # pylint: disable=unused-variable - packet_type, payload = s.read_packet(2) - kex = SSH2_Kex.parse(payload) - # Send our KEX using the specified group-exchange and most of the # server's own values. client_kex = SSH2_Kex(os.urandom(16), [gex_alg], kex.key_algorithms, kex.client, kex.server, False, 0) s.write_byte(Protocol.MSG_KEXINIT) client_kex.write(s) s.send_packet() + + # Parse the server's KEX. + _, payload = s.read_packet(2) + SSH2_Kex.parse(payload) + return True # Runs the DH moduli test against the specified target. @@ -87,7 +85,7 @@ class GEXTest: for gex_alg in GEX_ALGS: if gex_alg in kex.kex_algorithms: - if GEXTest.reconnect(s, gex_alg) is False: + if GEXTest.reconnect(s, kex, gex_alg) is False: break kex_group = GEX_ALGS[gex_alg]() @@ -117,7 +115,7 @@ class GEXTest: if bits >= smallest_modulus > 0: break - if GEXTest.reconnect(s, gex_alg) is False: + if GEXTest.reconnect(s, kex, gex_alg) is False: reconnect_failed = True break diff --git a/src/ssh_audit/hostkeytest.py b/src/ssh_audit/hostkeytest.py index c220c3f..d8035fd 100644 --- a/src/ssh_audit/hostkeytest.py +++ b/src/ssh_audit/hostkeytest.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -117,20 +117,16 @@ class HostKeyTest: s.close() return - # Parse the server's initial KEX. - packet_type = 0 # pylint: disable=unused-variable - packet_type, payload = s.read_packet() + # Send our KEX using the specified group-exchange and most of the server's own values. + client_kex = SSH2_Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, False, 0) + s.write_byte(Protocol.MSG_KEXINIT) + client_kex.write(s) + s.send_packet() + + # Parse the server's KEX. + _, payload = s.read_packet() SSH2_Kex.parse(payload) - # Send the server our KEXINIT message, using only our - # selected kex and host key type. Send the server's own - # list of ciphers and MACs back to it (this doesn't - # matter, really). - client_kex = SSH2_Kex(os.urandom(16), [kex_str], [host_key_type], server_kex.client, server_kex.server, False, 0) - - s.write_byte(Protocol.MSG_KEXINIT) - client_kex.write(s) - s.send_packet() # Do the initial DH exchange. The server responds back # with the host key and its length. Bingo. We also get back the host key fingerprint.