mirror of
https://github.com/jtesta/ssh-audit.git
synced 2026-05-25 15:31:23 +02:00
Compare commits
4 Commits
c095efe65a
...
aae7762e36
| Author | SHA1 | Date | |
|---|---|---|---|
| aae7762e36 | |||
| ea117b203b | |||
| d8f8b7c57c | |||
| b2e621cafc |
@@ -217,6 +217,7 @@ For convenience, a web front-end on top of the command-line tool is available at
|
||||
- Added built-in policies for Ubuntu 24.04 LTS server and client, and OpenSSH 9.8.
|
||||
- Added IPv6 support for DHEat and connection rate tests.
|
||||
- Fixed crash when running with `-P` and `-T` options simultaneously.
|
||||
- Fixed host key tests from only reporting a key type at most once despite multiple hosts supporting it; credit [Daniel Lenski](https://github.com/dlenskiSB).
|
||||
|
||||
### v3.2.0 (2024-04-22)
|
||||
- Added implementation of the DHEat denial-of-service attack (see `--dheat` option; [CVE-2002-20001](https://nvd.nist.gov/vuln/detail/CVE-2002-20001)).
|
||||
|
||||
@@ -110,7 +110,7 @@ class GEXTest:
|
||||
# before continuing to issue reconnects.
|
||||
modulus_size_returned, reconnect_failed = GEXTest._send_init(out, s, kex_group, kex, gex_alg, bits_min, bits_pref, bits_max)
|
||||
if reconnect_failed:
|
||||
out.fail('Reconnect failed.')
|
||||
out.error('Reconnect failed.')
|
||||
return exitcodes.FAILURE
|
||||
|
||||
if modulus_size_returned > 0:
|
||||
|
||||
@@ -40,7 +40,7 @@ class HostKeyTest:
|
||||
# Tracks the RSA host key types. As of this writing, testing one in this family yields valid results for the rest.
|
||||
RSA_FAMILY = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512']
|
||||
|
||||
# Dict holding the host key types we should extract & parse. 'cert' is True to denote that a host key type handles certificates (thus requires additional parsing). 'variable_key_len' is True for host key types that can have variable sizes (True only for RSA types, as the rest are of fixed-size). After the host key type is fully parsed, the key 'parsed' is added with a value of True.
|
||||
# Dict holding the host key types we should extract & parse. 'cert' is True to denote that a host key type handles certificates (thus requires additional parsing). 'variable_key_len' is True for host key types that can have variable sizes (True only for RSA types, as the rest are of fixed-size).
|
||||
HOST_KEY_TYPES = {
|
||||
'ssh-rsa': {'cert': False, 'variable_key_len': True},
|
||||
'rsa-sha2-256': {'cert': False, 'variable_key_len': True},
|
||||
@@ -93,6 +93,7 @@ class HostKeyTest:
|
||||
def perform_test(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None:
|
||||
hostkey_modulus_size = 0
|
||||
ca_modulus_size = 0
|
||||
parsed_host_key_types = set()
|
||||
|
||||
# If the connection still exists, close it so we can test
|
||||
# using a clean slate (otherwise it may exist in a non-testable
|
||||
@@ -106,7 +107,7 @@ class HostKeyTest:
|
||||
key_warn_comments = []
|
||||
|
||||
# Skip those already handled (i.e.: those in the RSA family, as testing one tests them all).
|
||||
if 'parsed' in host_key_types[host_key_type] and host_key_types[host_key_type]['parsed']:
|
||||
if host_key_type in parsed_host_key_types:
|
||||
continue
|
||||
|
||||
# If this host key type is supported by the server, we test it.
|
||||
@@ -216,7 +217,7 @@ class HostKeyTest:
|
||||
# If this host key type is in the RSA family, then mark them all as parsed (since results in one are valid for them all).
|
||||
if host_key_type in HostKeyTest.RSA_FAMILY:
|
||||
for rsa_type in HostKeyTest.RSA_FAMILY:
|
||||
host_key_types[rsa_type]['parsed'] = True
|
||||
parsed_host_key_types.add(rsa_type)
|
||||
|
||||
# If the current key is a member of the RSA family, then populate all RSA family members with the same
|
||||
# failure and/or warning comments.
|
||||
@@ -228,7 +229,7 @@ class HostKeyTest:
|
||||
db['key'][rsa_type][2].extend(key_warn_comments)
|
||||
|
||||
else:
|
||||
host_key_types[host_key_type]['parsed'] = True
|
||||
parsed_host_key_types.add(host_key_type)
|
||||
db = SSH2_KexDB.get_db()
|
||||
while len(db['key'][host_key_type]) < 3:
|
||||
db['key'][host_key_type].append([])
|
||||
|
||||
@@ -54,6 +54,14 @@ class OutputBuffer:
|
||||
self.__is_color_supported = ('colorama' in sys.modules) or (os.name == 'posix')
|
||||
self.line_ended = True
|
||||
|
||||
def error(self, msg, line_ended=True):
|
||||
"""
|
||||
Writes an error message to stderr.
|
||||
"""
|
||||
end = '' if line_ended else '\n'
|
||||
sys.stderr.write(f'{msg}{end}')
|
||||
sys.stderr.flush()
|
||||
|
||||
def _print(self, level: str, s: str = '', line_ended: bool = True) -> None:
|
||||
'''Saves output to buffer (if in buffered mode), or immediately prints to stdout otherwise.'''
|
||||
|
||||
|
||||
@@ -88,7 +88,7 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
|
||||
p = os.path.basename(sys.argv[0])
|
||||
uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
|
||||
if err is not None and len(err) > 0:
|
||||
uout.fail(err + '\n')
|
||||
uout.error(err + '\n')
|
||||
retval = exitcodes.UNKNOWN_ERROR
|
||||
uout.info('usage: {0} [options] <host>\n'.format(p))
|
||||
uout.info(' -h, --help print this help')
|
||||
@@ -835,7 +835,7 @@ def list_policies(out: OutputBuffer, verbose: bool) -> None:
|
||||
|
||||
out.sep()
|
||||
if len(server_policy_names) == 0 and len(client_policy_names) == 0:
|
||||
out.fail("Error: no built-in policies found!")
|
||||
out.error("Error: no built-in policies found!")
|
||||
else:
|
||||
out.info("\nHint: Use -P and provide the full name of a policy to run a policy scan with.\n")
|
||||
out.info("Hint: Use -L -v to also see the change log for each policy.\n")
|
||||
@@ -1051,19 +1051,19 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
|
||||
try:
|
||||
aconf.policy = Policy(policy_file=aconf.policy_file, json_output=aconf.json)
|
||||
except Exception as e:
|
||||
out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
|
||||
out.error("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
|
||||
out.write()
|
||||
sys.exit(exitcodes.UNKNOWN_ERROR)
|
||||
|
||||
# If the user wants to do a client audit, but provided a server policy, terminate.
|
||||
if aconf.client_audit and aconf.policy.is_server_policy():
|
||||
out.fail("Error: client audit selected, but server policy provided.")
|
||||
out.error("Error: client audit selected, but server policy provided.")
|
||||
out.write()
|
||||
sys.exit(exitcodes.UNKNOWN_ERROR)
|
||||
|
||||
# If the user wants to do a server audit, but provided a client policy, terminate.
|
||||
if aconf.client_audit is False and aconf.policy.is_server_policy() is False:
|
||||
out.fail("Error: server audit selected, but client policy provided.")
|
||||
out.error("Error: server audit selected, but client policy provided.")
|
||||
out.write()
|
||||
sys.exit(exitcodes.UNKNOWN_ERROR)
|
||||
|
||||
@@ -1262,7 +1262,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
|
||||
err = s.connect()
|
||||
|
||||
if err is not None:
|
||||
out.fail(err)
|
||||
out.error(err)
|
||||
|
||||
# If we're running against multiple targets, return a connection error to the calling worker thread. Otherwise, write the error message to the console and exit.
|
||||
if len(aconf.target_list) > 0:
|
||||
@@ -1310,7 +1310,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
|
||||
err = fmt.format(err_pair[0], err_pair[1], packet_type)
|
||||
if err is not None:
|
||||
output(out, aconf, banner, header)
|
||||
out.fail(err)
|
||||
out.error(err)
|
||||
return exitcodes.CONNECTION_ERROR
|
||||
if sshv == 1:
|
||||
program_retval = output(out, aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload))
|
||||
@@ -1318,7 +1318,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
|
||||
try:
|
||||
kex = SSH2_Kex.parse(out, payload)
|
||||
except Exception:
|
||||
out.fail("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc()))
|
||||
out.error("Failed to parse server's kex. Stack trace:\n%s" % str(traceback.format_exc()))
|
||||
return exitcodes.CONNECTION_ERROR
|
||||
|
||||
if aconf.dheat is not None:
|
||||
|
||||
@@ -108,7 +108,8 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
||||
s.listen()
|
||||
self.__sock_map[s.fileno()] = s
|
||||
except Exception as e:
|
||||
print("Warning: failed to listen on any IPv4 interfaces: %s" % str(e))
|
||||
self.__outputbuffer.error("Warning: failed to listen on any IPv4 interfaces: %s" % str(e))
|
||||
sys.exit(exitcodes.CONNECTION_ERROR)
|
||||
|
||||
try:
|
||||
# Socket to listen on all IPv6 addresses.
|
||||
@@ -119,11 +120,12 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
||||
s.listen()
|
||||
self.__sock_map[s.fileno()] = s
|
||||
except Exception as e:
|
||||
print("Warning: failed to listen on any IPv6 interfaces: %s" % str(e))
|
||||
self.__outputbuffer.error("Warning: failed to listen on any IPv6 interfaces: %s" % str(e))
|
||||
sys.exit(exitcodes.CONNECTION_ERROR)
|
||||
|
||||
# If we failed to listen on any interfaces, terminate.
|
||||
if len(self.__sock_map.keys()) == 0:
|
||||
print("Error: failed to listen on any IPv4 and IPv6 interfaces!")
|
||||
self.__outputbuffer.error("Error: failed to listen on any IPv4 and IPv6 interfaces!")
|
||||
sys.exit(exitcodes.CONNECTION_ERROR)
|
||||
|
||||
# Wait for an incoming connection. If a timeout was explicitly
|
||||
@@ -141,7 +143,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
||||
break
|
||||
|
||||
if self.__timeout_set and time_elapsed >= self.__timeout:
|
||||
print("Timeout elapsed. Terminating...")
|
||||
self.__outputbuffer.error("Timeout elapsed. Terminating...")
|
||||
sys.exit(exitcodes.CONNECTION_ERROR)
|
||||
|
||||
# Accept the connection.
|
||||
@@ -275,7 +277,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
||||
payload_length = packet_length - padding_length - 1
|
||||
check_size = 4 + 1 + payload_length + padding_length
|
||||
if check_size % self.__block_size != 0:
|
||||
self.__outputbuffer.fail('[exception] invalid ssh packet (block size)').write()
|
||||
self.__outputbuffer.error('[exception] invalid ssh packet (block size)').write()
|
||||
sys.exit(exitcodes.CONNECTION_ERROR)
|
||||
self.ensure_read(payload_length)
|
||||
if sshv == 1:
|
||||
@@ -290,7 +292,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
|
||||
if sshv == 1:
|
||||
rcrc = SSH1.crc32(padding + payload)
|
||||
if crc != rcrc:
|
||||
self.__outputbuffer.fail('[exception] packet checksum CRC32 mismatch.').write()
|
||||
self.__outputbuffer.error('[exception] packet checksum CRC32 mismatch.').write()
|
||||
sys.exit(exitcodes.CONNECTION_ERROR)
|
||||
else:
|
||||
self.ensure_read(padding_length)
|
||||
|
||||
Reference in New Issue
Block a user