mirror of
https://github.com/jtesta/ssh-audit.git
synced 2026-06-24 05:19:41 +02:00
Expanded SOCKS5 protocol support, refactored exception handling, updated documentation, added more tests. (#347)
This commit is contained in:
@@ -48,7 +48,7 @@ usage: ssh-audit.py [-h] [-4] [-6] [-b] [-c] [-d]
|
|||||||
[--conn-rate-test N[:max_rate]] [--dheat N[:kex[:e_len]]]
|
[--conn-rate-test N[:max_rate]] [--dheat N[:kex[:e_len]]]
|
||||||
[--get-hardening-guide platform] [--list-hardening-guides]
|
[--get-hardening-guide platform] [--list-hardening-guides]
|
||||||
[--lookup alg1[,alg2,...]] [--skip-rate-test]
|
[--lookup alg1[,alg2,...]] [--skip-rate-test]
|
||||||
[--threads N]
|
[--socks5 host:port] [--threads N]
|
||||||
[host]
|
[host]
|
||||||
|
|
||||||
# ssh-audit.py v3.4.0-dev, https://github.com/jtesta/ssh-audit
|
# ssh-audit.py v3.4.0-dev, https://github.com/jtesta/ssh-audit
|
||||||
@@ -128,6 +128,7 @@ optional arguments:
|
|||||||
--skip-rate-test skip the connection rate test during standard audits
|
--skip-rate-test skip the connection rate test during standard audits
|
||||||
(used to safely infer whether the DHEat attack is
|
(used to safely infer whether the DHEat attack is
|
||||||
viable)
|
viable)
|
||||||
|
--socks5 host:port connect via a SOCKS5 proxy (implies --skip-rate-test)
|
||||||
--threads N number of threads to use when scanning multiple
|
--threads N number of threads to use when scanning multiple
|
||||||
targets (-T/--targets) (default: 32)
|
targets (-T/--targets) (default: 32)
|
||||||
```
|
```
|
||||||
@@ -263,6 +264,7 @@ For convenience, a web front-end on top of the command-line tool is available at
|
|||||||
- When running against multiple hosts, now prints each target host regardless of output level.
|
- When running against multiple hosts, now prints each target host regardless of output level.
|
||||||
- Batch mode (`-b`) no longer automatically enables verbose mode, due to sometimes confusing results; users can still explicitly enable verbose mode using the `-v` flag.
|
- Batch mode (`-b`) no longer automatically enables verbose mode, due to sometimes confusing results; users can still explicitly enable verbose mode using the `-v` flag.
|
||||||
- Added UNIX server socket scanning (specify the target with `unix:///path/to/socket`).
|
- Added UNIX server socket scanning (specify the target with `unix:///path/to/socket`).
|
||||||
|
- Added SOCKS5 proxy support (specify the proxy with `--socks5 host:port`); partial credit [Michał Majchrowicz](https://github.com/sectroyer).
|
||||||
- Updated built-in policy for Debian 12.
|
- Updated built-in policy for Debian 12.
|
||||||
- Added built-in policies for OpenSSH 10.0, 10.1, 10.2, and 10.3.
|
- Added built-in policies for OpenSSH 10.0, 10.1, 10.2, and 10.3.
|
||||||
- Added hardening guides and policies for Debian 13.
|
- Added hardening guides and policies for Debian 13.
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
"""
|
"""
|
||||||
The MIT License (MIT)
|
The MIT License (MIT)
|
||||||
|
|
||||||
Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com)
|
Copyright (C) 2017-2026 Joe Testa (jtesta@positronsecurity.com)
|
||||||
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
|
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
@@ -67,7 +67,7 @@ class AuditConf:
|
|||||||
self.conn_rate_test_enabled = False
|
self.conn_rate_test_enabled = False
|
||||||
self.conn_rate_test_threads = 0
|
self.conn_rate_test_threads = 0
|
||||||
self.conn_rate_test_target_rate = 0
|
self.conn_rate_test_target_rate = 0
|
||||||
self.socks_proxy: Optional[str] = None # SOCKS5 proxy in "host:port" format
|
self.socks5_proxy: Optional[str] = None # SOCKS5 proxy in "host:port" format
|
||||||
|
|
||||||
|
|
||||||
def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None:
|
def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None:
|
||||||
@@ -96,7 +96,7 @@ class AuditConf:
|
|||||||
if value == -1.0:
|
if value == -1.0:
|
||||||
raise ValueError('invalid timeout: {}'.format(value))
|
raise ValueError('invalid timeout: {}'.format(value))
|
||||||
valid = True
|
valid = True
|
||||||
elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test', 'socks_proxy']:
|
elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test']:
|
||||||
valid = True
|
valid = True
|
||||||
elif name == "threads":
|
elif name == "threads":
|
||||||
valid, num_threads = True, Utils.parse_int(value)
|
valid, num_threads = True, Utils.parse_int(value)
|
||||||
@@ -186,6 +186,12 @@ class AuditConf:
|
|||||||
if not isinstance(value, int):
|
if not isinstance(value, int):
|
||||||
valid = False
|
valid = False
|
||||||
|
|
||||||
|
elif name == "socks5_proxy":
|
||||||
|
valid = True
|
||||||
|
if isinstance(value, str):
|
||||||
|
host, port = Utils.parse_host_and_port(value, 0)
|
||||||
|
if len(host) == 0 or port <= 0 or port >= 65535:
|
||||||
|
raise ValueError("SOCKS5 host must be in host:port format.")
|
||||||
|
|
||||||
if valid:
|
if valid:
|
||||||
object.__setattr__(self, name, value)
|
object.__setattr__(self, name, value)
|
||||||
|
|||||||
+10
-16
@@ -794,7 +794,7 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
|
|||||||
parser.add_argument("--list-hardening-guides", action="store_true", dest="list_hardening_guides", default=False, help="list all official, built-in hardening guides for common systems. Their full names can then be passed to --get-hardening-guide. Add -v to this option to view hardening guide change logs and prior versions.")
|
parser.add_argument("--list-hardening-guides", action="store_true", dest="list_hardening_guides", default=False, help="list all official, built-in hardening guides for common systems. Their full names can then be passed to --get-hardening-guide. Add -v to this option to view hardening guide change logs and prior versions.")
|
||||||
parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.")
|
parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.")
|
||||||
parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)")
|
parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)")
|
||||||
parser.add_argument("--socks", action="store", dest="socks_proxy", metavar="host:port", type=str, default=None, help="connect via a SOCKS5 proxy (e.g. 127.0.0.1:1080)")
|
parser.add_argument("--socks5", action="store", dest="socks5_proxy", metavar="host:port", type=str, default=None, help="connect via a SOCKS5 proxy (implies --skip-rate-test)")
|
||||||
parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)")
|
parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)")
|
||||||
|
|
||||||
|
|
||||||
@@ -818,23 +818,17 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
|
|||||||
aconf.list_policies = argument.list_policies
|
aconf.list_policies = argument.list_policies
|
||||||
aconf.manual = argument.manual
|
aconf.manual = argument.manual
|
||||||
aconf.skip_rate_test = argument.skip_rate_test
|
aconf.skip_rate_test = argument.skip_rate_test
|
||||||
|
aconf.socks5_proxy = argument.socks5_proxy
|
||||||
oport = argument.oport
|
oport = argument.oport
|
||||||
|
|
||||||
if argument.socks_proxy is not None:
|
|
||||||
# Validate format: must be "host:port"
|
|
||||||
socks_parts = argument.socks_proxy.rsplit(':', 1)
|
|
||||||
if len(socks_parts) != 2 or not socks_parts[1].isdigit():
|
|
||||||
out.fail("--socks must be in host:port format (e.g. 127.0.0.1:1080)", write_now=True)
|
|
||||||
sys.exit(exitcodes.UNKNOWN_ERROR)
|
|
||||||
socks_port = int(socks_parts[1])
|
|
||||||
if socks_port < 1 or socks_port > 65535:
|
|
||||||
out.fail("SOCKS proxy port must be between 1 and 65535", write_now=True)
|
|
||||||
sys.exit(exitcodes.UNKNOWN_ERROR)
|
|
||||||
aconf.socks_proxy = argument.socks_proxy
|
|
||||||
|
|
||||||
if argument.batch is True:
|
if argument.batch is True:
|
||||||
aconf.batch = True
|
aconf.batch = True
|
||||||
|
|
||||||
|
# Skip the rate test if we're using a SOCKS5 proxy.
|
||||||
|
if aconf.socks5_proxy is not None:
|
||||||
|
out.d("Disabling rate test since SOCKS5 proxy is set.", write_now=True)
|
||||||
|
aconf.skip_rate_test = True
|
||||||
|
|
||||||
# If one -j was given, turn on JSON output. If -jj was given, enable indentation.
|
# If one -j was given, turn on JSON output. If -jj was given, enable indentation.
|
||||||
aconf.json = argument.json > 0
|
aconf.json = argument.json > 0
|
||||||
if argument.json > 1:
|
if argument.json > 1:
|
||||||
@@ -901,8 +895,8 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
|
|||||||
aconf.verbose = True
|
aconf.verbose = True
|
||||||
out.verbose = True
|
out.verbose = True
|
||||||
|
|
||||||
except argparse.ArgumentError as err:
|
except (argparse.ArgumentError, ValueError) as err:
|
||||||
out.fail(str(err), write_now=True)
|
out.fail(f"Error: {str(err)}", write_now=True)
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
sys.exit(exitcodes.UNKNOWN_ERROR)
|
sys.exit(exitcodes.UNKNOWN_ERROR)
|
||||||
|
|
||||||
@@ -1159,7 +1153,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, print_target: bool = False) -> in
|
|||||||
out.debug = aconf.debug
|
out.debug = aconf.debug
|
||||||
out.level = aconf.level
|
out.level = aconf.level
|
||||||
out.use_colors = aconf.colors
|
out.use_colors = aconf.colors
|
||||||
s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set, aconf.socks_proxy)
|
s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set, aconf.socks5_proxy)
|
||||||
|
|
||||||
if aconf.client_audit:
|
if aconf.client_audit:
|
||||||
out.v("Listening for client connection on port %d..." % aconf.port, write_now=True)
|
out.v("Listening for client connection on port %d..." % aconf.port, write_now=True)
|
||||||
|
|||||||
+91
-79
@@ -51,7 +51,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
|||||||
|
|
||||||
SM_BANNER_SENT = 1
|
SM_BANNER_SENT = 1
|
||||||
|
|
||||||
def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False, socks_proxy: Optional[str] = None) -> None: # pylint: disable=dangerous-default-value
|
def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False, socks5_proxy: Optional[str] = None) -> None: # pylint: disable=dangerous-default-value
|
||||||
super(SSH_Socket, self).__init__()
|
super(SSH_Socket, self).__init__()
|
||||||
self.__outputbuffer = outputbuffer
|
self.__outputbuffer = outputbuffer
|
||||||
self.__sock: Optional[socket.socket] = None
|
self.__sock: Optional[socket.socket] = None
|
||||||
@@ -72,7 +72,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
|||||||
self.__timeout_set = timeout_set
|
self.__timeout_set = timeout_set
|
||||||
self.client_host: Optional[str] = None
|
self.client_host: Optional[str] = None
|
||||||
self.client_port = None
|
self.client_port = None
|
||||||
self.__socks_proxy = socks_proxy # SOCKS5 proxy in "host:port" format, or None
|
self.__socks5_proxy = socks5_proxy # SOCKS5 proxy in "host:port" format, or None
|
||||||
|
|
||||||
|
|
||||||
def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
|
def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
|
||||||
@@ -165,16 +165,16 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
|||||||
try:
|
try:
|
||||||
# If we're connecting to a UNIX socket.
|
# If we're connecting to a UNIX socket.
|
||||||
if self.__host.startswith("unix://"):
|
if self.__host.startswith("unix://"):
|
||||||
if self.__socks_proxy is not None:
|
|
||||||
return '[exception] cannot use a SOCKS5 proxy with UNIX socket targets'
|
|
||||||
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
s.settimeout(self.__timeout)
|
s.settimeout(self.__timeout)
|
||||||
s.connect(self.__host[7:])
|
s.connect(self.__host[7:])
|
||||||
self.__sock = s
|
self.__sock = s
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if self.__socks_proxy is not None:
|
# If we're connecting through a SOCKS5 proxy.
|
||||||
return self._connect_via_socks5()
|
if self.__socks5_proxy is not None:
|
||||||
|
self.__sock = self._connect_via_socks5()
|
||||||
|
return None
|
||||||
|
|
||||||
# We're connecting to an Internet host.
|
# We're connecting to an Internet host.
|
||||||
for af, addr in self._resolve():
|
for af, addr in self._resolve():
|
||||||
@@ -188,6 +188,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
|||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
err = e
|
err = e
|
||||||
self._close_socket(s)
|
self._close_socket(s)
|
||||||
|
|
||||||
if err is None:
|
if err is None:
|
||||||
errm = 'host {} has no DNS records'.format(self.__host)
|
errm = 'host {} has no DNS records'.format(self.__host)
|
||||||
else:
|
else:
|
||||||
@@ -196,87 +197,98 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
|||||||
|
|
||||||
return '[exception] {}'.format(errm)
|
return '[exception] {}'.format(errm)
|
||||||
|
|
||||||
def _connect_via_socks5(self) -> Optional[str]:
|
|
||||||
'''Connect to the target host:port via a SOCKS5 proxy. Returns None on success, or an error string.'''
|
|
||||||
assert self.__socks_proxy is not None
|
|
||||||
proxy_parts = self.__socks_proxy.rsplit(':', 1)
|
|
||||||
proxy_host = proxy_parts[0]
|
|
||||||
proxy_port = int(proxy_parts[1])
|
|
||||||
|
|
||||||
s = None
|
def _connect_via_socks5(self) -> socket.socket:
|
||||||
try:
|
'''Connect to the target host:port via a SOCKS5 proxy. Returns a socket on success, or raises an exception.'''
|
||||||
self.__outputbuffer.d("Connecting to SOCKS5 proxy %s:%d..." % (proxy_host, proxy_port), write_now=True)
|
|
||||||
s = socket.create_connection((proxy_host, proxy_port), timeout=self.__timeout)
|
|
||||||
|
|
||||||
# SOCKS5 greeting: version=5, nmethods=1, method=0 (no auth)
|
def __socks5_recv_exact(s: socket.socket, n: int) -> Optional[bytes]:
|
||||||
s.sendall(b'\x05\x01\x00')
|
'''Read exactly n bytes from socket s, returning None on EOF.'''
|
||||||
resp = self._socks5_recv_exact(s, 2)
|
buf = b''
|
||||||
if resp is None:
|
while len(buf) < n:
|
||||||
raise socket.error("no response from SOCKS5 proxy during handshake")
|
chunk = s.recv(n - len(buf))
|
||||||
if resp[0] != 5:
|
if not chunk:
|
||||||
raise socket.error("SOCKS5 proxy returned unexpected version: {}".format(resp[0]))
|
return None
|
||||||
if resp[1] == 0xff:
|
buf += chunk
|
||||||
raise socket.error("SOCKS5 proxy rejected all authentication methods")
|
return buf
|
||||||
if resp[1] != 0:
|
|
||||||
raise socket.error("SOCKS5 proxy requires authentication (method {:d}), but only no-auth is supported".format(resp[1]))
|
|
||||||
|
|
||||||
# SOCKS5 connect request: version=5, cmd=1 (connect), rsv=0, atyp=3 (domain name)
|
# Parse the "host:port" string into its parts.
|
||||||
host_bytes = self.__host.encode('idna')
|
proxy_host, proxy_port = Utils.parse_host_and_port(self.__socks5_proxy) if self.__socks5_proxy is not None else ("", 0)
|
||||||
request = struct.pack('!BBBB', 5, 1, 0, 3) + struct.pack('!B', len(host_bytes)) + host_bytes + struct.pack('!H', self.__port)
|
|
||||||
self.__outputbuffer.d("Requesting SOCKS5 proxy to connect to %s:%d..." % (self.__host, self.__port), write_now=True)
|
|
||||||
s.sendall(request)
|
|
||||||
|
|
||||||
# Read the fixed part of the response (4 bytes: ver, rep, rsv, atyp)
|
self.__outputbuffer.d("Connecting to SOCKS5 proxy %s:%d..." % (proxy_host, proxy_port), write_now=True)
|
||||||
hdr = self._socks5_recv_exact(s, 4)
|
s = socket.create_connection((proxy_host, proxy_port), timeout=self.__timeout)
|
||||||
if hdr is None:
|
|
||||||
raise socket.error("no response from SOCKS5 proxy during connect")
|
|
||||||
if hdr[0] != 5:
|
|
||||||
raise socket.error("SOCKS5 proxy returned unexpected version in connect response: {}".format(hdr[0]))
|
|
||||||
if hdr[1] != 0:
|
|
||||||
socks5_errors = {
|
|
||||||
1: "general SOCKS server failure",
|
|
||||||
2: "connection not allowed by ruleset",
|
|
||||||
3: "network unreachable",
|
|
||||||
4: "host unreachable",
|
|
||||||
5: "connection refused",
|
|
||||||
6: "TTL expired",
|
|
||||||
7: "command not supported",
|
|
||||||
8: "address type not supported",
|
|
||||||
}
|
|
||||||
msg = socks5_errors.get(hdr[1], "unknown error {:d}".format(hdr[1]))
|
|
||||||
raise socket.error("SOCKS5 proxy connect failed: {}".format(msg))
|
|
||||||
|
|
||||||
# Read and discard the bound address from the response
|
# SOCKS5 greeting: version=5, nmethods=1, method=0 (no auth)
|
||||||
atyp = hdr[3]
|
s.sendall(b'\x05\x01\x00')
|
||||||
if atyp == 1: # IPv4
|
resp = __socks5_recv_exact(s, 2)
|
||||||
self._socks5_recv_exact(s, 4 + 2)
|
if resp is None:
|
||||||
elif atyp == 4: # IPv6
|
raise socket.error("no response from SOCKS5 proxy during handshake")
|
||||||
self._socks5_recv_exact(s, 16 + 2)
|
if resp[0] != 5:
|
||||||
elif atyp == 3: # domain name
|
raise socket.error("SOCKS5 proxy returned unexpected version: {}".format(resp[0]))
|
||||||
alen_data = self._socks5_recv_exact(s, 1)
|
if resp[1] == 0xff:
|
||||||
if alen_data is None:
|
raise socket.error("SOCKS5 proxy rejected all authentication methods")
|
||||||
raise socket.error("truncated SOCKS5 response")
|
if resp[1] != 0:
|
||||||
self._socks5_recv_exact(s, alen_data[0] + 2)
|
raise socket.error("SOCKS5 proxy requires authentication (method {:d}), but only no-auth is supported".format(resp[1]))
|
||||||
else:
|
|
||||||
raise socket.error("SOCKS5 proxy returned unknown address type: {}".format(atyp))
|
|
||||||
|
|
||||||
self.__sock = s
|
# Set the type and host encoding appropriately, depending on if we're sending a hostname, IPv4, or IPv6 address. The ATYP field is 3 when the client is sending a hostname.
|
||||||
return None
|
atyp = 3
|
||||||
|
_enc_host = self.__host.encode('idna')
|
||||||
|
dst_addr = struct.pack('!B', len(_enc_host)) + _enc_host
|
||||||
|
if Utils.is_ipv4_address(self.__host):
|
||||||
|
atyp = 1
|
||||||
|
dst_addr = socket.inet_pton(socket.AF_INET, self.__host)
|
||||||
|
elif Utils.is_ipv6_address(self.__host):
|
||||||
|
atyp = 4
|
||||||
|
dst_addr = socket.inet_pton(socket.AF_INET6, self.__host)
|
||||||
|
|
||||||
except socket.error as e:
|
# SOCKS5 connect request: version=5, cmd=1 (connect), rsv=0, atyp
|
||||||
self._close_socket(s)
|
request = struct.pack('!BBBB', 5, 1, 0, atyp) + dst_addr + struct.pack('!H', self.__port)
|
||||||
return '[exception] cannot connect via SOCKS5 proxy to {} port {}: {}'.format(self.__host, self.__port, e)
|
self.__outputbuffer.d("Requesting SOCKS5 proxy to connect to %s:%d..." % (self.__host, self.__port), write_now=True)
|
||||||
|
s.sendall(request)
|
||||||
|
|
||||||
|
# Read the fixed part of the response (4 bytes: ver, rep, rsv, atyp)
|
||||||
|
hdr = __socks5_recv_exact(s, 4)
|
||||||
|
if hdr is None:
|
||||||
|
raise socket.error("no response from SOCKS5 proxy during connect")
|
||||||
|
|
||||||
|
server_version = hdr[0]
|
||||||
|
reply = hdr[1]
|
||||||
|
# rsv = hdr[2] # Reserved, set to 0 as per RFC1928.
|
||||||
|
atyp = hdr[3]
|
||||||
|
|
||||||
|
if server_version != 5:
|
||||||
|
raise socket.error("SOCKS5 proxy returned unexpected version in connect response: {}".format(hdr[0]))
|
||||||
|
|
||||||
|
if reply != 0:
|
||||||
|
socks5_errors = {
|
||||||
|
1: "general SOCKS server failure",
|
||||||
|
2: "connection not allowed by ruleset",
|
||||||
|
3: "network unreachable",
|
||||||
|
4: "host unreachable",
|
||||||
|
5: "connection refused",
|
||||||
|
6: "TTL expired",
|
||||||
|
7: "command not supported",
|
||||||
|
8: "address type not supported",
|
||||||
|
}
|
||||||
|
|
||||||
|
err = socks5_errors[reply] if reply in socks5_errors else f"unknown error: {reply}"
|
||||||
|
raise socket.error("SOCKS5 proxy connect failed: {}".format(err))
|
||||||
|
|
||||||
|
# Read and discard the bound address from the response
|
||||||
|
if atyp == 1: # IPv4
|
||||||
|
__socks5_recv_exact(s, 4 + 2)
|
||||||
|
elif atyp == 4: # IPv6
|
||||||
|
__socks5_recv_exact(s, 16 + 2)
|
||||||
|
elif atyp == 3: # domain name
|
||||||
|
alen_data = __socks5_recv_exact(s, 1)
|
||||||
|
if alen_data is None:
|
||||||
|
raise socket.error("truncated SOCKS5 response")
|
||||||
|
__socks5_recv_exact(s, alen_data[0] + 2)
|
||||||
|
else:
|
||||||
|
raise socket.error("SOCKS5 proxy returned unknown address type: {}".format(atyp))
|
||||||
|
|
||||||
|
self.__outputbuffer.d("Successfully established SOCKS5 connection.", write_now=True)
|
||||||
|
return s
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _socks5_recv_exact(s: socket.socket, n: int) -> Optional[bytes]:
|
|
||||||
'''Read exactly n bytes from socket s, returning None on EOF.'''
|
|
||||||
buf = b''
|
|
||||||
while len(buf) < n:
|
|
||||||
chunk = s.recv(n - len(buf))
|
|
||||||
if not chunk:
|
|
||||||
return None
|
|
||||||
buf += chunk
|
|
||||||
return buf
|
|
||||||
|
|
||||||
def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]:
|
def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]:
|
||||||
self.__outputbuffer.d('Getting banner...', write_now=True)
|
self.__outputbuffer.d('Getting banner...', write_now=True)
|
||||||
|
|||||||
@@ -153,6 +153,17 @@ class Utils:
|
|||||||
|
|
||||||
return host, port
|
return host, port
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_ipv4_address(address: str) -> bool:
|
||||||
|
'''Returns True if address is an IPv4 address, otherwise False.'''
|
||||||
|
is_ipv4 = True
|
||||||
|
try:
|
||||||
|
ipaddress.IPv4Address(address)
|
||||||
|
except ipaddress.AddressValueError:
|
||||||
|
is_ipv4 = False
|
||||||
|
|
||||||
|
return is_ipv4
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_ipv6_address(address: str) -> bool:
|
def is_ipv6_address(address: str) -> bool:
|
||||||
'''Returns True if address is an IPv6 address, otherwise False.'''
|
'''Returns True if address is an IPv6 address, otherwise False.'''
|
||||||
|
|||||||
+6
-1
@@ -1,4 +1,4 @@
|
|||||||
.TH SSH-AUDIT 1 "June 14, 2026"
|
.TH SSH-AUDIT 1 "June 20, 2026"
|
||||||
.SH NAME
|
.SH NAME
|
||||||
\fBssh-audit\fP \- SSH server & client configuration auditor
|
\fBssh-audit\fP \- SSH server & client configuration auditor
|
||||||
.SH SYNOPSIS
|
.SH SYNOPSIS
|
||||||
@@ -141,6 +141,11 @@ Runs a policy audit against a target using the specified policy (see \fBPOLICY A
|
|||||||
.br
|
.br
|
||||||
Skips the connection rate test during standard audits. By default, a few dozen TCP connections are created with the target host to see if connection throttling is implemented (this can safely infer whether the target is vulnerable to the DHEat attack; see CVE-2002-20001).
|
Skips the connection rate test during standard audits. By default, a few dozen TCP connections are created with the target host to see if connection throttling is implemented (this can safely infer whether the target is vulnerable to the DHEat attack; see CVE-2002-20001).
|
||||||
|
|
||||||
|
.TP
|
||||||
|
.B \-\-socks5 host:port
|
||||||
|
.br
|
||||||
|
Connects through a SOCKS5 proxy. Rate tests are automatically skipped (i.e.: implies \-\-skip\-rate\-test).
|
||||||
|
|
||||||
.TP
|
.TP
|
||||||
.B -t, \-\-timeout=<secs>
|
.B -t, \-\-timeout=<secs>
|
||||||
.br
|
.br
|
||||||
|
|||||||
@@ -166,17 +166,18 @@ class TestAuditConf:
|
|||||||
with pytest.raises(SystemExit):
|
with pytest.raises(SystemExit):
|
||||||
conf = c('-l something localhost')
|
conf = c('-l something localhost')
|
||||||
|
|
||||||
def test_audit_conf_process_commandline_socks_proxy(self):
|
def test_audit_conf_process_commandline_socks5_proxy(self):
|
||||||
c = lambda x: self.process_commandline(self.OutputBuffer, x.split()) # noqa
|
c = lambda x: self.process_commandline(self.OutputBuffer, x.split()) # noqa
|
||||||
|
|
||||||
conf = c('--socks 127.0.0.1:1080 localhost')
|
assert c('--socks5 127.0.0.1:1080 localhost').socks5_proxy == '127.0.0.1:1080'
|
||||||
assert conf.socks_proxy == '127.0.0.1:1080'
|
assert c('--socks5 [fe80::aaaa:bbbb:cccc:dddd]:1080 localhost').socks5_proxy == '[fe80::aaaa:bbbb:cccc:dddd]:1080'
|
||||||
|
assert c('--socks5 somehost.lol:1080 localhost').socks5_proxy == 'somehost.lol:1080'
|
||||||
|
|
||||||
with pytest.raises(SystemExit):
|
with pytest.raises(SystemExit):
|
||||||
c('--socks localhost localhost')
|
c('--socks5 localhost localhost')
|
||||||
|
|
||||||
with pytest.raises(SystemExit):
|
with pytest.raises(SystemExit):
|
||||||
c('--socks localhost:0 localhost')
|
c('--socks5 localhost:0 localhost')
|
||||||
|
|
||||||
with pytest.raises(SystemExit):
|
with pytest.raises(SystemExit):
|
||||||
c('--socks localhost:65536 localhost')
|
c('--socks5 localhost:65536 localhost')
|
||||||
|
|||||||
@@ -40,8 +40,3 @@ class TestSocket:
|
|||||||
s, e = sock.send_packet()
|
s, e = sock.send_packet()
|
||||||
assert s == -1
|
assert s == -1
|
||||||
assert e == 'not connected'
|
assert e == 'not connected'
|
||||||
|
|
||||||
def test_socks_proxy_rejected_for_unix_socket_targets(self, virtual_socket):
|
|
||||||
sock = self.ssh_socket(self.OutputBuffer(), 'unix:///tmp/test.sock', 22, socks_proxy='127.0.0.1:1080')
|
|
||||||
err = sock.connect()
|
|
||||||
assert err == '[exception] cannot use a SOCKS5 proxy with UNIX socket targets'
|
|
||||||
|
|||||||
Reference in New Issue
Block a user