From 36def4b5aceafab77482c3a4997b84d5a8a31ed9 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Sat, 20 Jun 2026 10:07:14 -0400 Subject: [PATCH] Expanded SOCKS5 protocol support, refactored exception handling, updated documentation, added more tests. (#347) --- README.md | 4 +- src/ssh_audit/auditconf.py | 12 ++- src/ssh_audit/ssh_audit.py | 26 +++--- src/ssh_audit/ssh_socket.py | 170 +++++++++++++++++++----------------- src/ssh_audit/utils.py | 11 +++ ssh-audit.1 | 7 +- test/test_auditconf.py | 13 +-- test/test_socket.py | 5 -- 8 files changed, 137 insertions(+), 111 deletions(-) diff --git a/README.md b/README.md index f22fd18..f9e6d12 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ usage: ssh-audit.py [-h] [-4] [-6] [-b] [-c] [-d] [--conn-rate-test N[:max_rate]] [--dheat N[:kex[:e_len]]] [--get-hardening-guide platform] [--list-hardening-guides] [--lookup alg1[,alg2,...]] [--skip-rate-test] - [--threads N] + [--socks5 host:port] [--threads N] [host] # ssh-audit.py v3.4.0-dev, https://github.com/jtesta/ssh-audit @@ -128,6 +128,7 @@ optional arguments: --skip-rate-test skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable) + --socks5 host:port connect via a SOCKS5 proxy (implies --skip-rate-test) --threads N number of threads to use when scanning multiple targets (-T/--targets) (default: 32) ``` @@ -263,6 +264,7 @@ For convenience, a web front-end on top of the command-line tool is available at - When running against multiple hosts, now prints each target host regardless of output level. - Batch mode (`-b`) no longer automatically enables verbose mode, due to sometimes confusing results; users can still explicitly enable verbose mode using the `-v` flag. - Added UNIX server socket scanning (specify the target with `unix:///path/to/socket`). + - Added SOCKS5 proxy support (specify the proxy with `--socks5 host:port`); partial credit [MichaƂ Majchrowicz](https://github.com/sectroyer). - Updated built-in policy for Debian 12. - Added built-in policies for OpenSSH 10.0, 10.1, 10.2, and 10.3. - Added hardening guides and policies for Debian 13. diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index 0b19cd5..0729c40 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2026 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -67,7 +67,7 @@ class AuditConf: self.conn_rate_test_enabled = False self.conn_rate_test_threads = 0 self.conn_rate_test_target_rate = 0 - self.socks_proxy: Optional[str] = None # SOCKS5 proxy in "host:port" format + self.socks5_proxy: Optional[str] = None # SOCKS5 proxy in "host:port" format def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: @@ -96,7 +96,7 @@ class AuditConf: if value == -1.0: raise ValueError('invalid timeout: {}'.format(value)) valid = True - elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test', 'socks_proxy']: + elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test']: valid = True elif name == "threads": valid, num_threads = True, Utils.parse_int(value) @@ -186,6 +186,12 @@ class AuditConf: if not isinstance(value, int): valid = False + elif name == "socks5_proxy": + valid = True + if isinstance(value, str): + host, port = Utils.parse_host_and_port(value, 0) + if len(host) == 0 or port <= 0 or port >= 65535: + raise ValueError("SOCKS5 host must be in host:port format.") if valid: object.__setattr__(self, name, value) diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 9f00be9..67b90db 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -794,7 +794,7 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p parser.add_argument("--list-hardening-guides", action="store_true", dest="list_hardening_guides", default=False, help="list all official, built-in hardening guides for common systems. Their full names can then be passed to --get-hardening-guide. Add -v to this option to view hardening guide change logs and prior versions.") parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.") parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)") - parser.add_argument("--socks", action="store", dest="socks_proxy", metavar="host:port", type=str, default=None, help="connect via a SOCKS5 proxy (e.g. 127.0.0.1:1080)") + parser.add_argument("--socks5", action="store", dest="socks5_proxy", metavar="host:port", type=str, default=None, help="connect via a SOCKS5 proxy (implies --skip-rate-test)") parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)") @@ -818,23 +818,17 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p aconf.list_policies = argument.list_policies aconf.manual = argument.manual aconf.skip_rate_test = argument.skip_rate_test + aconf.socks5_proxy = argument.socks5_proxy oport = argument.oport - if argument.socks_proxy is not None: - # Validate format: must be "host:port" - socks_parts = argument.socks_proxy.rsplit(':', 1) - if len(socks_parts) != 2 or not socks_parts[1].isdigit(): - out.fail("--socks must be in host:port format (e.g. 127.0.0.1:1080)", write_now=True) - sys.exit(exitcodes.UNKNOWN_ERROR) - socks_port = int(socks_parts[1]) - if socks_port < 1 or socks_port > 65535: - out.fail("SOCKS proxy port must be between 1 and 65535", write_now=True) - sys.exit(exitcodes.UNKNOWN_ERROR) - aconf.socks_proxy = argument.socks_proxy - if argument.batch is True: aconf.batch = True + # Skip the rate test if we're using a SOCKS5 proxy. + if aconf.socks5_proxy is not None: + out.d("Disabling rate test since SOCKS5 proxy is set.", write_now=True) + aconf.skip_rate_test = True + # If one -j was given, turn on JSON output. If -jj was given, enable indentation. aconf.json = argument.json > 0 if argument.json > 1: @@ -901,8 +895,8 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p aconf.verbose = True out.verbose = True - except argparse.ArgumentError as err: - out.fail(str(err), write_now=True) + except (argparse.ArgumentError, ValueError) as err: + out.fail(f"Error: {str(err)}", write_now=True) parser.print_help() sys.exit(exitcodes.UNKNOWN_ERROR) @@ -1159,7 +1153,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, print_target: bool = False) -> in out.debug = aconf.debug out.level = aconf.level out.use_colors = aconf.colors - s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set, aconf.socks_proxy) + s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set, aconf.socks5_proxy) if aconf.client_audit: out.v("Listening for client connection on port %d..." % aconf.port, write_now=True) diff --git a/src/ssh_audit/ssh_socket.py b/src/ssh_audit/ssh_socket.py index e401e20..2d29bb3 100644 --- a/src/ssh_audit/ssh_socket.py +++ b/src/ssh_audit/ssh_socket.py @@ -51,7 +51,7 @@ class SSH_Socket(ReadBuf, WriteBuf): SM_BANNER_SENT = 1 - def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False, socks_proxy: Optional[str] = None) -> None: # pylint: disable=dangerous-default-value + def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False, socks5_proxy: Optional[str] = None) -> None: # pylint: disable=dangerous-default-value super(SSH_Socket, self).__init__() self.__outputbuffer = outputbuffer self.__sock: Optional[socket.socket] = None @@ -72,7 +72,7 @@ class SSH_Socket(ReadBuf, WriteBuf): self.__timeout_set = timeout_set self.client_host: Optional[str] = None self.client_port = None - self.__socks_proxy = socks_proxy # SOCKS5 proxy in "host:port" format, or None + self.__socks5_proxy = socks5_proxy # SOCKS5 proxy in "host:port" format, or None def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]: @@ -165,16 +165,16 @@ class SSH_Socket(ReadBuf, WriteBuf): try: # If we're connecting to a UNIX socket. if self.__host.startswith("unix://"): - if self.__socks_proxy is not None: - return '[exception] cannot use a SOCKS5 proxy with UNIX socket targets' s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.settimeout(self.__timeout) s.connect(self.__host[7:]) self.__sock = s return None - if self.__socks_proxy is not None: - return self._connect_via_socks5() + # If we're connecting through a SOCKS5 proxy. + if self.__socks5_proxy is not None: + self.__sock = self._connect_via_socks5() + return None # We're connecting to an Internet host. for af, addr in self._resolve(): @@ -188,6 +188,7 @@ class SSH_Socket(ReadBuf, WriteBuf): except socket.error as e: err = e self._close_socket(s) + if err is None: errm = 'host {} has no DNS records'.format(self.__host) else: @@ -196,87 +197,98 @@ class SSH_Socket(ReadBuf, WriteBuf): return '[exception] {}'.format(errm) - def _connect_via_socks5(self) -> Optional[str]: - '''Connect to the target host:port via a SOCKS5 proxy. Returns None on success, or an error string.''' - assert self.__socks_proxy is not None - proxy_parts = self.__socks_proxy.rsplit(':', 1) - proxy_host = proxy_parts[0] - proxy_port = int(proxy_parts[1]) - s = None - try: - self.__outputbuffer.d("Connecting to SOCKS5 proxy %s:%d..." % (proxy_host, proxy_port), write_now=True) - s = socket.create_connection((proxy_host, proxy_port), timeout=self.__timeout) + def _connect_via_socks5(self) -> socket.socket: + '''Connect to the target host:port via a SOCKS5 proxy. Returns a socket on success, or raises an exception.''' - # SOCKS5 greeting: version=5, nmethods=1, method=0 (no auth) - s.sendall(b'\x05\x01\x00') - resp = self._socks5_recv_exact(s, 2) - if resp is None: - raise socket.error("no response from SOCKS5 proxy during handshake") - if resp[0] != 5: - raise socket.error("SOCKS5 proxy returned unexpected version: {}".format(resp[0])) - if resp[1] == 0xff: - raise socket.error("SOCKS5 proxy rejected all authentication methods") - if resp[1] != 0: - raise socket.error("SOCKS5 proxy requires authentication (method {:d}), but only no-auth is supported".format(resp[1])) + def __socks5_recv_exact(s: socket.socket, n: int) -> Optional[bytes]: + '''Read exactly n bytes from socket s, returning None on EOF.''' + buf = b'' + while len(buf) < n: + chunk = s.recv(n - len(buf)) + if not chunk: + return None + buf += chunk + return buf - # SOCKS5 connect request: version=5, cmd=1 (connect), rsv=0, atyp=3 (domain name) - host_bytes = self.__host.encode('idna') - request = struct.pack('!BBBB', 5, 1, 0, 3) + struct.pack('!B', len(host_bytes)) + host_bytes + struct.pack('!H', self.__port) - self.__outputbuffer.d("Requesting SOCKS5 proxy to connect to %s:%d..." % (self.__host, self.__port), write_now=True) - s.sendall(request) + # Parse the "host:port" string into its parts. + proxy_host, proxy_port = Utils.parse_host_and_port(self.__socks5_proxy) if self.__socks5_proxy is not None else ("", 0) - # Read the fixed part of the response (4 bytes: ver, rep, rsv, atyp) - hdr = self._socks5_recv_exact(s, 4) - if hdr is None: - raise socket.error("no response from SOCKS5 proxy during connect") - if hdr[0] != 5: - raise socket.error("SOCKS5 proxy returned unexpected version in connect response: {}".format(hdr[0])) - if hdr[1] != 0: - socks5_errors = { - 1: "general SOCKS server failure", - 2: "connection not allowed by ruleset", - 3: "network unreachable", - 4: "host unreachable", - 5: "connection refused", - 6: "TTL expired", - 7: "command not supported", - 8: "address type not supported", - } - msg = socks5_errors.get(hdr[1], "unknown error {:d}".format(hdr[1])) - raise socket.error("SOCKS5 proxy connect failed: {}".format(msg)) + self.__outputbuffer.d("Connecting to SOCKS5 proxy %s:%d..." % (proxy_host, proxy_port), write_now=True) + s = socket.create_connection((proxy_host, proxy_port), timeout=self.__timeout) - # Read and discard the bound address from the response - atyp = hdr[3] - if atyp == 1: # IPv4 - self._socks5_recv_exact(s, 4 + 2) - elif atyp == 4: # IPv6 - self._socks5_recv_exact(s, 16 + 2) - elif atyp == 3: # domain name - alen_data = self._socks5_recv_exact(s, 1) - if alen_data is None: - raise socket.error("truncated SOCKS5 response") - self._socks5_recv_exact(s, alen_data[0] + 2) - else: - raise socket.error("SOCKS5 proxy returned unknown address type: {}".format(atyp)) + # SOCKS5 greeting: version=5, nmethods=1, method=0 (no auth) + s.sendall(b'\x05\x01\x00') + resp = __socks5_recv_exact(s, 2) + if resp is None: + raise socket.error("no response from SOCKS5 proxy during handshake") + if resp[0] != 5: + raise socket.error("SOCKS5 proxy returned unexpected version: {}".format(resp[0])) + if resp[1] == 0xff: + raise socket.error("SOCKS5 proxy rejected all authentication methods") + if resp[1] != 0: + raise socket.error("SOCKS5 proxy requires authentication (method {:d}), but only no-auth is supported".format(resp[1])) - self.__sock = s - return None + # Set the type and host encoding appropriately, depending on if we're sending a hostname, IPv4, or IPv6 address. The ATYP field is 3 when the client is sending a hostname. + atyp = 3 + _enc_host = self.__host.encode('idna') + dst_addr = struct.pack('!B', len(_enc_host)) + _enc_host + if Utils.is_ipv4_address(self.__host): + atyp = 1 + dst_addr = socket.inet_pton(socket.AF_INET, self.__host) + elif Utils.is_ipv6_address(self.__host): + atyp = 4 + dst_addr = socket.inet_pton(socket.AF_INET6, self.__host) - except socket.error as e: - self._close_socket(s) - return '[exception] cannot connect via SOCKS5 proxy to {} port {}: {}'.format(self.__host, self.__port, e) + # SOCKS5 connect request: version=5, cmd=1 (connect), rsv=0, atyp + request = struct.pack('!BBBB', 5, 1, 0, atyp) + dst_addr + struct.pack('!H', self.__port) + self.__outputbuffer.d("Requesting SOCKS5 proxy to connect to %s:%d..." % (self.__host, self.__port), write_now=True) + s.sendall(request) + + # Read the fixed part of the response (4 bytes: ver, rep, rsv, atyp) + hdr = __socks5_recv_exact(s, 4) + if hdr is None: + raise socket.error("no response from SOCKS5 proxy during connect") + + server_version = hdr[0] + reply = hdr[1] + # rsv = hdr[2] # Reserved, set to 0 as per RFC1928. + atyp = hdr[3] + + if server_version != 5: + raise socket.error("SOCKS5 proxy returned unexpected version in connect response: {}".format(hdr[0])) + + if reply != 0: + socks5_errors = { + 1: "general SOCKS server failure", + 2: "connection not allowed by ruleset", + 3: "network unreachable", + 4: "host unreachable", + 5: "connection refused", + 6: "TTL expired", + 7: "command not supported", + 8: "address type not supported", + } + + err = socks5_errors[reply] if reply in socks5_errors else f"unknown error: {reply}" + raise socket.error("SOCKS5 proxy connect failed: {}".format(err)) + + # Read and discard the bound address from the response + if atyp == 1: # IPv4 + __socks5_recv_exact(s, 4 + 2) + elif atyp == 4: # IPv6 + __socks5_recv_exact(s, 16 + 2) + elif atyp == 3: # domain name + alen_data = __socks5_recv_exact(s, 1) + if alen_data is None: + raise socket.error("truncated SOCKS5 response") + __socks5_recv_exact(s, alen_data[0] + 2) + else: + raise socket.error("SOCKS5 proxy returned unknown address type: {}".format(atyp)) + + self.__outputbuffer.d("Successfully established SOCKS5 connection.", write_now=True) + return s - @staticmethod - def _socks5_recv_exact(s: socket.socket, n: int) -> Optional[bytes]: - '''Read exactly n bytes from socket s, returning None on EOF.''' - buf = b'' - while len(buf) < n: - chunk = s.recv(n - len(buf)) - if not chunk: - return None - buf += chunk - return buf def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]: self.__outputbuffer.d('Getting banner...', write_now=True) diff --git a/src/ssh_audit/utils.py b/src/ssh_audit/utils.py index ff7760f..371941a 100644 --- a/src/ssh_audit/utils.py +++ b/src/ssh_audit/utils.py @@ -153,6 +153,17 @@ class Utils: return host, port + @staticmethod + def is_ipv4_address(address: str) -> bool: + '''Returns True if address is an IPv4 address, otherwise False.''' + is_ipv4 = True + try: + ipaddress.IPv4Address(address) + except ipaddress.AddressValueError: + is_ipv4 = False + + return is_ipv4 + @staticmethod def is_ipv6_address(address: str) -> bool: '''Returns True if address is an IPv6 address, otherwise False.''' diff --git a/ssh-audit.1 b/ssh-audit.1 index 5368f8e..24f60d5 100644 --- a/ssh-audit.1 +++ b/ssh-audit.1 @@ -1,4 +1,4 @@ -.TH SSH-AUDIT 1 "June 14, 2026" +.TH SSH-AUDIT 1 "June 20, 2026" .SH NAME \fBssh-audit\fP \- SSH server & client configuration auditor .SH SYNOPSIS @@ -141,6 +141,11 @@ Runs a policy audit against a target using the specified policy (see \fBPOLICY A .br Skips the connection rate test during standard audits. By default, a few dozen TCP connections are created with the target host to see if connection throttling is implemented (this can safely infer whether the target is vulnerable to the DHEat attack; see CVE-2002-20001). +.TP +.B \-\-socks5 host:port +.br +Connects through a SOCKS5 proxy. Rate tests are automatically skipped (i.e.: implies \-\-skip\-rate\-test). + .TP .B -t, \-\-timeout= .br diff --git a/test/test_auditconf.py b/test/test_auditconf.py index d787710..a720c9b 100644 --- a/test/test_auditconf.py +++ b/test/test_auditconf.py @@ -166,17 +166,18 @@ class TestAuditConf: with pytest.raises(SystemExit): conf = c('-l something localhost') - def test_audit_conf_process_commandline_socks_proxy(self): + def test_audit_conf_process_commandline_socks5_proxy(self): c = lambda x: self.process_commandline(self.OutputBuffer, x.split()) # noqa - conf = c('--socks 127.0.0.1:1080 localhost') - assert conf.socks_proxy == '127.0.0.1:1080' + assert c('--socks5 127.0.0.1:1080 localhost').socks5_proxy == '127.0.0.1:1080' + assert c('--socks5 [fe80::aaaa:bbbb:cccc:dddd]:1080 localhost').socks5_proxy == '[fe80::aaaa:bbbb:cccc:dddd]:1080' + assert c('--socks5 somehost.lol:1080 localhost').socks5_proxy == 'somehost.lol:1080' with pytest.raises(SystemExit): - c('--socks localhost localhost') + c('--socks5 localhost localhost') with pytest.raises(SystemExit): - c('--socks localhost:0 localhost') + c('--socks5 localhost:0 localhost') with pytest.raises(SystemExit): - c('--socks localhost:65536 localhost') + c('--socks5 localhost:65536 localhost') diff --git a/test/test_socket.py b/test/test_socket.py index 2044006..9569768 100644 --- a/test/test_socket.py +++ b/test/test_socket.py @@ -40,8 +40,3 @@ class TestSocket: s, e = sock.send_packet() assert s == -1 assert e == 'not connected' - - def test_socks_proxy_rejected_for_unix_socket_targets(self, virtual_socket): - sock = self.ssh_socket(self.OutputBuffer(), 'unix:///tmp/test.sock', 22, socks_proxy='127.0.0.1:1080') - err = sock.connect() - assert err == '[exception] cannot use a SOCKS5 proxy with UNIX socket targets'