Added support for scanning servers over UNIX sockets. (#351)

This commit is contained in:
Joe Testa
2026-06-14 17:21:13 -04:00
parent abf5b8326a
commit 0ccb915f37
6 changed files with 70 additions and 20 deletions
+10 -2
View File
@@ -1,7 +1,7 @@
"""
The MIT License (MIT)
Copyright (C) 2023-2024 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2023-2026 Joe Testa (jtesta@positronsecurity.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -435,7 +435,11 @@ class DHEat:
s.setblocking(False)
# out.d("Creating socket (%u of %u already exist)..." % (len(socket_dict), concurrent_sockets), write_now=True)
ret = s.connect_ex((target_ip_address, aconf.port))
if target_address_family == socket.AF_UNIX:
ret = s.connect_ex(target_ip_address)
else:
ret = s.connect_ex((target_ip_address, aconf.port))
num_attempted_connections += 1
if ret in [0, errno.EINPROGRESS, errno.EWOULDBLOCK]:
socket_dict[s] = now
@@ -752,6 +756,10 @@ class DHEat:
def _resolve_hostname(host: str, ip_version_preference: List[int]) -> Tuple[int, str]:
'''Resolves a hostname to its IPv4 or IPv6 address, depending on user preference.'''
# First check if this is a UNIX socket.
if host.startswith("unix://"):
return int(socket.AF_UNIX), host[7:]
family = socket.AF_UNSPEC
if len(ip_version_preference) == 1:
family = socket.AF_INET if ip_version_preference[0] == 4 else socket.AF_INET6
+7 -5
View File
@@ -2,7 +2,7 @@
"""
The MIT License (MIT)
Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017-2026 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -915,19 +915,21 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p
Hardening_Guides.print_hardening_guide(out, argument.get_hardening_guide)
sys.exit(exitcodes.GOOD)
# If we're doing a server audit against a single target.
if aconf.client_audit is False and aconf.target_file is None:
if oport is not None:
host = argument.host
else:
host = argument.host
if oport is None:
host, port = Utils.parse_host_and_port(argument.host)
if not host and aconf.target_file is None:
out.fail("target host is not specified", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if oport is None and aconf.client_audit: # The default port to listen on during a client audit is 2222.
# For client audits, if an explicit port isn't set, default to 2222/tcp.
if aconf.client_audit and oport is None:
port = 2222
# If an explicit port was set by the user, parse it.
if oport is not None:
port = Utils.parse_int(oport)
if port < 1 or port > 65535:
+39 -7
View File
@@ -1,7 +1,7 @@
"""
The MIT License (MIT)
Copyright (C) 2017-2025 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017-2026 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -51,6 +51,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
SM_BANNER_SENT = 1
def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False) -> None: # pylint: disable=dangerous-default-value
super(SSH_Socket, self).__init__()
self.__outputbuffer = outputbuffer
@@ -73,6 +74,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
self.client_host: Optional[str] = None
self.client_port = None
def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
"""Resolves a hostname into a list of IPs
Raises
@@ -95,11 +97,13 @@ class SSH_Socket(ReadBuf, WriteBuf):
if socktype == socket.SOCK_STREAM:
yield af, addr
# Listens on a server socket and accepts one connection (used for
# auditing client connections).
def listen_and_accept(self) -> None:
'''Listens on a server socket and accepts one connection (used for auditing client connections).'''
try:
self.__outputbuffer.d(f"Listening on 0.0.0.0:{self.__port}...", write_now=True)
# Socket to listen on all IPv4 addresses.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -110,6 +114,8 @@ class SSH_Socket(ReadBuf, WriteBuf):
print("Warning: failed to listen on any IPv4 interfaces: %s" % str(e), file=sys.stderr)
try:
self.__outputbuffer.d(f"Listening on [::]:{self.__port}...", write_now=True)
# Socket to listen on all IPv6 addresses.
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -150,18 +156,31 @@ class SSH_Socket(ReadBuf, WriteBuf):
c.settimeout(self.__timeout)
self.__sock = c
def connect(self) -> Optional[str]:
'''Returns None on success, or an error string.'''
err = None
s = None
try:
for af, addr in self._resolve():
s = socket.socket(af, socket.SOCK_STREAM)
# If we're connecting to a UNIX socket.
if self.__host.startswith("unix://"):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(self.__timeout)
self.__outputbuffer.d(("Connecting to %s:%d..." % ('[%s]' % addr[0] if Utils.is_ipv6_address(addr[0]) else addr[0], addr[1])), write_now=True)
s.connect(addr)
s.connect(self.__host[7:])
self.__sock = s
return None
# We're connecting to an Internet host.
else:
for af, addr in self._resolve():
s = socket.socket(af, socket.SOCK_STREAM)
s.settimeout(self.__timeout)
self.__outputbuffer.d(("Connecting to %s:%d..." % ('[%s]' % addr[0] if Utils.is_ipv6_address(addr[0]) else addr[0], addr[1])), write_now=True)
s.connect(addr)
self.__sock = s
return None
except socket.error as e:
err = e
self._close_socket(s)
@@ -170,8 +189,10 @@ class SSH_Socket(ReadBuf, WriteBuf):
else:
errt = (self.__host, self.__port, err)
errm = 'cannot connect to {} port {}: {}'.format(*errt)
return '[exception] {}'.format(errm)
def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]:
self.__outputbuffer.d('Getting banner...', write_now=True)
@@ -201,6 +222,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
return self.__banner, self.__header, e
def recv(self, size: int = 2048) -> Tuple[int, Optional[str]]:
if self.__sock is None:
return -1, 'not connected'
@@ -221,6 +243,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
self._buf.seek(pos, 0)
return len(data), None
def send(self, data: bytes) -> Tuple[int, Optional[str]]:
if self.__sock is None:
return -1, 'not connected'
@@ -243,16 +266,19 @@ class SSH_Socket(ReadBuf, WriteBuf):
kex.write(self)
self.send_packet()
def send_banner(self, banner: str) -> None:
self.send(banner.encode() + b'\r\n')
self.__state = max(self.__state, self.SM_BANNER_SENT)
def ensure_read(self, size: int) -> None:
while self.unread_len < size:
s, e = self.recv()
if s < 0:
raise SSH_Socket.InsufficientReadException(e)
def read_packet(self) -> Tuple[int, bytes]:
try:
header = WriteBuf()
@@ -284,6 +310,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
e = ex.args[0].encode('utf-8')
return -1, e
def send_packet(self) -> Tuple[int, Optional[str]]:
payload = self.write_flush()
padding = -(len(payload) + 5) % 8
@@ -294,10 +321,12 @@ class SSH_Socket(ReadBuf, WriteBuf):
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
return self.send(data)
def is_connected(self) -> bool:
"""Returns true if this Socket is connected, False otherwise."""
return self.__sock is not None
def close(self) -> None:
self.__cleanup()
self.reset()
@@ -305,6 +334,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
self.__header = []
self.__banner = None
def _close_socket(self, s: Optional[socket.socket]) -> None:
try:
if s is not None:
@@ -313,9 +343,11 @@ class SSH_Socket(ReadBuf, WriteBuf):
except Exception:
pass
def __del__(self) -> None:
self.__cleanup()
def __cleanup(self) -> None:
self._close_socket(self.__sock)
for sock in self.__sock_map.values():
+5 -1
View File
@@ -1,7 +1,7 @@
"""
The MIT License (MIT)
Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017-2026 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -134,6 +134,10 @@ class Utils:
host = host_and_port
port = default_port
# If we have a UNIX socket path, do no further processing.
if host.startswith("unix://"):
return host, 1
mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port)
if mx is not None:
host = mx.group(1)