Add --socks argument for scanning via SOCKS5 proxy (#347)

* Add --socks argument for scanning via SOCKS5 proxy

Implement SOCKS5 proxy support without external dependencies:
- Add socks_proxy field to AuditConf
- Add --socks host:port CLI argument with input validation
- Implement SOCKS5 handshake (no-auth, domain-name addressing) in
  SSH_Socket._connect_via_socks5()

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Michał Majchrowicz <sectroyer@gmail.com>

* Add SOCKS proxy regression tests

---------

Signed-off-by: Michał Majchrowicz <sectroyer@gmail.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Michał Majchrowicz
2026-06-20 15:57:26 +02:00
committed by GitHub
parent 3bd2dd95a9
commit f369689cd1
5 changed files with 132 additions and 13 deletions
+2 -1
View File
@@ -67,6 +67,7 @@ class AuditConf:
self.conn_rate_test_enabled = False self.conn_rate_test_enabled = False
self.conn_rate_test_threads = 0 self.conn_rate_test_threads = 0
self.conn_rate_test_target_rate = 0 self.conn_rate_test_target_rate = 0
self.socks_proxy: Optional[str] = None # SOCKS5 proxy in "host:port" format
def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None:
@@ -95,7 +96,7 @@ class AuditConf:
if value == -1.0: if value == -1.0:
raise ValueError('invalid timeout: {}'.format(value)) raise ValueError('invalid timeout: {}'.format(value))
valid = True valid = True
elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test']: elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test', 'socks_proxy']:
valid = True valid = True
elif name == "threads": elif name == "threads":
valid, num_threads = True, Utils.parse_int(value) valid, num_threads = True, Utils.parse_int(value)
+14 -1
View File
@@ -794,6 +794,7 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
parser.add_argument("--list-hardening-guides", action="store_true", dest="list_hardening_guides", default=False, help="list all official, built-in hardening guides for common systems. Their full names can then be passed to --get-hardening-guide. Add -v to this option to view hardening guide change logs and prior versions.") parser.add_argument("--list-hardening-guides", action="store_true", dest="list_hardening_guides", default=False, help="list all official, built-in hardening guides for common systems. Their full names can then be passed to --get-hardening-guide. Add -v to this option to view hardening guide change logs and prior versions.")
parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.") parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.")
parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)") parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)")
parser.add_argument("--socks", action="store", dest="socks_proxy", metavar="host:port", type=str, default=None, help="connect via a SOCKS5 proxy (e.g. 127.0.0.1:1080)")
parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)") parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)")
@@ -819,6 +820,18 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
aconf.skip_rate_test = argument.skip_rate_test aconf.skip_rate_test = argument.skip_rate_test
oport = argument.oport oport = argument.oport
if argument.socks_proxy is not None:
# Validate format: must be "host:port"
socks_parts = argument.socks_proxy.rsplit(':', 1)
if len(socks_parts) != 2 or not socks_parts[1].isdigit():
out.fail("--socks must be in host:port format (e.g. 127.0.0.1:1080)", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
socks_port = int(socks_parts[1])
if socks_port < 1 or socks_port > 65535:
out.fail("SOCKS proxy port must be between 1 and 65535", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
aconf.socks_proxy = argument.socks_proxy
if argument.batch is True: if argument.batch is True:
aconf.batch = True aconf.batch = True
@@ -1146,7 +1159,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, print_target: bool = False) -> in
out.debug = aconf.debug out.debug = aconf.debug
out.level = aconf.level out.level = aconf.level
out.use_colors = aconf.colors out.use_colors = aconf.colors
s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set) s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set, aconf.socks_proxy)
if aconf.client_audit: if aconf.client_audit:
out.v("Listening for client connection on port %d..." % aconf.port, write_now=True) out.v("Listening for client connection on port %d..." % aconf.port, write_now=True)
+89 -4
View File
@@ -51,8 +51,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
SM_BANNER_SENT = 1 SM_BANNER_SENT = 1
def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False, socks_proxy: Optional[str] = None) -> None: # pylint: disable=dangerous-default-value
def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False) -> None: # pylint: disable=dangerous-default-value
super(SSH_Socket, self).__init__() super(SSH_Socket, self).__init__()
self.__outputbuffer = outputbuffer self.__outputbuffer = outputbuffer
self.__sock: Optional[socket.socket] = None self.__sock: Optional[socket.socket] = None
@@ -73,6 +72,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
self.__timeout_set = timeout_set self.__timeout_set = timeout_set
self.client_host: Optional[str] = None self.client_host: Optional[str] = None
self.client_port = None self.client_port = None
self.__socks_proxy = socks_proxy # SOCKS5 proxy in "host:port" format, or None
def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]: def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
@@ -161,18 +161,22 @@ class SSH_Socket(ReadBuf, WriteBuf):
'''Returns None on success, or an error string.''' '''Returns None on success, or an error string.'''
err = None err = None
s = None s = None
try:
try:
# If we're connecting to a UNIX socket. # If we're connecting to a UNIX socket.
if self.__host.startswith("unix://"): if self.__host.startswith("unix://"):
if self.__socks_proxy is not None:
return '[exception] cannot use a SOCKS5 proxy with UNIX socket targets'
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(self.__timeout) s.settimeout(self.__timeout)
s.connect(self.__host[7:]) s.connect(self.__host[7:])
self.__sock = s self.__sock = s
return None return None
if self.__socks_proxy is not None:
return self._connect_via_socks5()
# We're connecting to an Internet host. # We're connecting to an Internet host.
else:
for af, addr in self._resolve(): for af, addr in self._resolve():
s = socket.socket(af, socket.SOCK_STREAM) s = socket.socket(af, socket.SOCK_STREAM)
s.settimeout(self.__timeout) s.settimeout(self.__timeout)
@@ -192,6 +196,87 @@ class SSH_Socket(ReadBuf, WriteBuf):
return '[exception] {}'.format(errm) return '[exception] {}'.format(errm)
def _connect_via_socks5(self) -> Optional[str]:
'''Connect to the target host:port via a SOCKS5 proxy. Returns None on success, or an error string.'''
assert self.__socks_proxy is not None
proxy_parts = self.__socks_proxy.rsplit(':', 1)
proxy_host = proxy_parts[0]
proxy_port = int(proxy_parts[1])
s = None
try:
self.__outputbuffer.d("Connecting to SOCKS5 proxy %s:%d..." % (proxy_host, proxy_port), write_now=True)
s = socket.create_connection((proxy_host, proxy_port), timeout=self.__timeout)
# SOCKS5 greeting: version=5, nmethods=1, method=0 (no auth)
s.sendall(b'\x05\x01\x00')
resp = self._socks5_recv_exact(s, 2)
if resp is None:
raise socket.error("no response from SOCKS5 proxy during handshake")
if resp[0] != 5:
raise socket.error("SOCKS5 proxy returned unexpected version: {}".format(resp[0]))
if resp[1] == 0xff:
raise socket.error("SOCKS5 proxy rejected all authentication methods")
if resp[1] != 0:
raise socket.error("SOCKS5 proxy requires authentication (method {:d}), but only no-auth is supported".format(resp[1]))
# SOCKS5 connect request: version=5, cmd=1 (connect), rsv=0, atyp=3 (domain name)
host_bytes = self.__host.encode('idna')
request = struct.pack('!BBBB', 5, 1, 0, 3) + struct.pack('!B', len(host_bytes)) + host_bytes + struct.pack('!H', self.__port)
self.__outputbuffer.d("Requesting SOCKS5 proxy to connect to %s:%d..." % (self.__host, self.__port), write_now=True)
s.sendall(request)
# Read the fixed part of the response (4 bytes: ver, rep, rsv, atyp)
hdr = self._socks5_recv_exact(s, 4)
if hdr is None:
raise socket.error("no response from SOCKS5 proxy during connect")
if hdr[0] != 5:
raise socket.error("SOCKS5 proxy returned unexpected version in connect response: {}".format(hdr[0]))
if hdr[1] != 0:
socks5_errors = {
1: "general SOCKS server failure",
2: "connection not allowed by ruleset",
3: "network unreachable",
4: "host unreachable",
5: "connection refused",
6: "TTL expired",
7: "command not supported",
8: "address type not supported",
}
msg = socks5_errors.get(hdr[1], "unknown error {:d}".format(hdr[1]))
raise socket.error("SOCKS5 proxy connect failed: {}".format(msg))
# Read and discard the bound address from the response
atyp = hdr[3]
if atyp == 1: # IPv4
self._socks5_recv_exact(s, 4 + 2)
elif atyp == 4: # IPv6
self._socks5_recv_exact(s, 16 + 2)
elif atyp == 3: # domain name
alen_data = self._socks5_recv_exact(s, 1)
if alen_data is None:
raise socket.error("truncated SOCKS5 response")
self._socks5_recv_exact(s, alen_data[0] + 2)
else:
raise socket.error("SOCKS5 proxy returned unknown address type: {}".format(atyp))
self.__sock = s
return None
except socket.error as e:
self._close_socket(s)
return '[exception] cannot connect via SOCKS5 proxy to {} port {}: {}'.format(self.__host, self.__port, e)
@staticmethod
def _socks5_recv_exact(s: socket.socket, n: int) -> Optional[bytes]:
'''Read exactly n bytes from socket s, returning None on EOF.'''
buf = b''
while len(buf) < n:
chunk = s.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]: def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]:
self.__outputbuffer.d('Getting banner...', write_now=True) self.__outputbuffer.d('Getting banner...', write_now=True)
+15
View File
@@ -165,3 +165,18 @@ class TestAuditConf:
self._test_conf(conf, host='localhost', level='fail') self._test_conf(conf, host='localhost', level='fail')
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
conf = c('-l something localhost') conf = c('-l something localhost')
def test_audit_conf_process_commandline_socks_proxy(self):
c = lambda x: self.process_commandline(self.OutputBuffer, x.split()) # noqa
conf = c('--socks 127.0.0.1:1080 localhost')
assert conf.socks_proxy == '127.0.0.1:1080'
with pytest.raises(SystemExit):
c('--socks localhost localhost')
with pytest.raises(SystemExit):
c('--socks localhost:0 localhost')
with pytest.raises(SystemExit):
c('--socks localhost:65536 localhost')
+5
View File
@@ -40,3 +40,8 @@ class TestSocket:
s, e = sock.send_packet() s, e = sock.send_packet()
assert s == -1 assert s == -1
assert e == 'not connected' assert e == 'not connected'
def test_socks_proxy_rejected_for_unix_socket_targets(self, virtual_socket):
sock = self.ssh_socket(self.OutputBuffer(), 'unix:///tmp/test.sock', 22, socks_proxy='127.0.0.1:1080')
err = sock.connect()
assert err == '[exception] cannot use a SOCKS5 proxy with UNIX socket targets'