Added client audit functionality. (#3)

This commit is contained in:
Joe Testa 2019-09-27 18:14:36 -04:00
parent 08677d65b1
commit fd3a1f7d41
10 changed files with 182 additions and 43 deletions

View File

@ -28,6 +28,7 @@ from __future__ import print_function
import binascii, os, io, sys, socket, struct, random, errno, getopt, re, hashlib, base64
VERSION = 'v2.1.0-dev'
SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate
if sys.version_info.major < 3:
print("\n!!!! NOTE: Python 2 is being considered for deprecation. If you have a good reason to need continued Python 2 support, please e-mail jtesta@positronsecurity.com with your rationale.\n\n")
@ -69,10 +70,11 @@ def usage(err=None):
uout.info(' -6, --ipv6 enable IPv6 (order of precedence)')
uout.info(' -p, --port=<port> port to connect')
uout.info(' -b, --batch batch output')
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port)')
uout.info(' -n, --no-colors disable colors')
uout.info(' -v, --verbose verbose output')
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
uout.sep()
sys.exit(1)
@ -86,6 +88,7 @@ class AuditConf(object):
self.ssh1 = True
self.ssh2 = True
self.batch = False
self.client_audit = False
self.colors = True
self.verbose = False
self.level = 'info'
@ -97,7 +100,7 @@ class AuditConf(object):
def __setattr__(self, name, value):
# type: (str, Union[str, int, bool, Sequence[int]]) -> None
valid = False
if name in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']:
if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose']:
valid, value = True, True if bool(value) else False
elif name in ['ipv4', 'ipv6']:
valid = False
@ -144,9 +147,9 @@ class AuditConf(object):
# pylint: disable=too-many-branches
aconf = cls()
try:
sopts = 'h1246p:bnvl:t:'
sopts = 'h1246p:bcnvl:t:'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port',
'batch', 'no-colors', 'verbose', 'level=', 'timeout=']
'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=']
opts, args = getopt.getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(str(err))
@ -168,6 +171,8 @@ class AuditConf(object):
elif o in ('-b', '--batch'):
aconf.batch = True
aconf.verbose = True
elif o in ('-c', '--client-audit'):
aconf.client_audit = True
elif o in ('-n', '--no-colors'):
aconf.colors = False
elif o in ('-v', '--verbose'):
@ -178,23 +183,28 @@ class AuditConf(object):
aconf.level = a
elif o in ('-t', '--timeout'):
aconf.timeout = float(a)
if len(args) == 0:
if len(args) == 0 and aconf.client_audit == False:
usage_cb()
if oport is not None:
host = args[0]
else:
mx = re.match(r'^\[([^\]]+)\](?::(.*))?$', args[0])
if bool(mx):
host, oport = mx.group(1), mx.group(2)
if aconf.client_audit == False:
if oport is not None:
host = args[0]
else:
s = args[0].split(':')
if len(s) > 2:
host, oport = args[0], '22'
mx = re.match(r'^\[([^\]]+)\](?::(.*))?$', args[0])
if bool(mx):
host, oport = mx.group(1), mx.group(2)
else:
host, oport = s[0], s[1] if len(s) > 1 else '22'
s = args[0].split(':')
if len(s) > 2:
host, oport = args[0], '22'
else:
host, oport = s[0], s[1] if len(s) > 1 else '22'
if not host:
usage_cb('host is empty')
else:
host = None
if oport is None:
oport = '2222'
port = utils.parse_int(oport)
if not host:
usage_cb('host is empty')
if port <= 0 or port > 65535:
usage_cb('port {0} is not valid'.format(oport))
aconf.host = host
@ -343,6 +353,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
'rsa1024-sha1': [[], [], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]],
'rsa2048-sha256': [[]],
'sntrup4591761x25519-sha512@tinyssh.org': [['8.0'], [], [WARN_EXPERIMENTAL]],
'ext-info-c': [[]], # Extension negotiation (RFC 8308)
},
'key': {
'rsa-sha2-256': [['7.2']],
@ -1217,6 +1228,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
DropbearSSH = 'Dropbear SSH'
LibSSH = 'libssh'
TinySSH = 'TinySSH'
PuTTY = 'PuTTY'
class Software(object):
def __init__(self, vendor, product, version, patch, os_version):
@ -1420,6 +1432,9 @@ class SSH(object): # pylint: disable=too-few-public-methods
mx = re.match(r'^tinyssh_(.*)', software)
if bool(mx):
return cls(None, SSH.Product.TinySSH, mx.group(1), None, None)
mx = re.match(r'^PuTTY_Release_(.*)', software)
if bool(mx):
return cls(None, SSH.Product.PuTTY, mx.group(1), None, None)
return None
class Banner(object):
@ -1910,7 +1925,19 @@ class SSH(object): # pylint: disable=too-few-public-methods
['3.0', '3.0p1', 1, 'CVE-2001-1507', 7.5, 'bypass authentication'],
['1.2.3', '3.0.1p1', 5, 'CVE-2001-0872', 7.2, 'privilege escalation via crafted environment variables'],
['1.2.3', '2.1.1', 1, 'CVE-2001-0361', 4.0, 'recover plaintext from ciphertext'],
['1.2', '2.1', 1, 'CVE-2000-0525', 10.0, 'execute arbitrary code (improper privileges)']]
['1.2', '2.1', 1, 'CVE-2000-0525', 10.0, 'execute arbitrary code (improper privileges)']],
'PuTTY': [
['0.0', '0.71', 2, 'CVE-2019-XXXX', 5.0, 'undefined vulnerability in obsolete SSHv1 protocol handling'],
['0.0', '0.71', 6, 'CVE-2019-XXXX', 5.0, 'local privilege escalation in Pageant'],
['0.0', '0.70', 2, 'CVE-2019-9898', 7.5, 'potential recycling of random numbers'],
['0.0', '0.70', 2, 'CVE-2019-9897', 5.0, 'multiple denial-of-service issues from writing to the terminal'],
['0.0', '0.70', 6, 'CVE-2019-9896', 4.6, 'local application hijacking through malicious Windows help file'],
['0.0', '0.70', 2, 'CVE-2019-9894', 6.4, 'buffer overflow in RSA key exchange'],
['0.0', '0.69', 6, 'CVE-2016-6167', 4.4, 'local application hijacking through untrusted DLL loading'],
['0.0', '0.67', 2, 'CVE-2017-6542', 7.5, 'buffer overflow in UNIX client that can result in privilege escalation or denial-of-service'],
['0.0', '0.66', 2, 'CVE-2016-2563', 7.5, 'buffer overflow in SCP command-line utility'],
['0.0', '0.65', 2, 'CVE-2015-5309', 4.3, 'integer overflow in terminal-handling code'],
]
} # type: Dict[str, List[List[Any]]]
TXT = {
'Dropbear SSH': [
@ -1931,12 +1958,13 @@ class SSH(object): # pylint: disable=too-few-public-methods
# type: (Optional[str], int) -> None
super(SSH.Socket, self).__init__()
self.__sock = None # type: Optional[socket.socket]
self.__sock_server = None
self.__block_size = 8
self.__state = 0
self.__header = [] # type: List[text_type]
self.__banner = None # type: Optional[SSH.Banner]
if host is None:
raise ValueError('undefined host')
# if host is None:
# raise ValueError('undefined host')
nport = utils.parse_int(port)
if nport < 1 or nport > 65535:
raise ValueError('invalid port: {0}'.format(port))
@ -1971,7 +1999,26 @@ class SSH(object): # pylint: disable=too-few-public-methods
except socket.error as e:
out.fail('[exception] {0}'.format(e))
sys.exit(1)
# Listens on a server socket and accepts one connection (used for
# auditing client connections).
def listen_and_accept(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.__sock_server = s
# TODO: listen on IPv6 address if necessary.
s.bind(('0.0.0.0', self.__port))
s.listen()
c, addr = s.accept()
self.client_host = addr[0]
self.client_port = addr[1]
c.settimeout(self.__timeout)
self.__sock = c
def connect(self):
# type: () -> None
err = None
@ -1998,15 +2045,15 @@ class SSH(object): # pylint: disable=too-few-public-methods
# type: (int) -> Tuple[Optional[SSH.Banner], List[text_type], Optional[str]]
if self.__sock is None:
return self.__banner, self.__header, 'not connected'
banner = 'SSH-{0}-OpenSSH_7.4'.format('1.5' if sshv == 1 else '2.0')
rto = self.__sock.gettimeout()
self.__sock.settimeout(0.7)
s, e = self.recv()
self.__sock.settimeout(rto)
if s < 0:
return self.__banner, self.__header, e
banner = SSH_HEADER.format('1.5' if sshv == 1 else '2.0')
if self.__state < self.SM_BANNER_SENT:
self.send_banner(banner)
# rto = self.__sock.gettimeout()
# self.__sock.settimeout(0.7)
s, e = self.recv()
# self.__sock.settimeout(rto)
if s < 0:
return self.__banner, self.__header, e
e = None
while self.__banner is None:
if not s > 0:
@ -2163,6 +2210,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
def __cleanup(self):
# type: () -> None
self._close_socket(self.__sock)
self._close_socket(self.__sock_server)
self.__sock = None
@ -2643,8 +2691,13 @@ def output_algorithm(alg_db, alg_type, alg_name, unknown_algs, alg_max_len=0, al
f(' ' * len(prefix + alg_name) + comment)
def output_compatibility(algs, for_server=True):
def output_compatibility(algs, client_audit, for_server=True):
# type: (SSH.Algorithms, bool) -> None
# Don't output any compatibility info if we're doing a client audit.
if client_audit:
return
ssh_timeframe = algs.get_ssh_timeframe(for_server)
comp_text = []
for ssh_prod in [SSH.Product.OpenSSH, SSH.Product.DropbearSSH]:
@ -2669,7 +2722,7 @@ def output_compatibility(algs, for_server=True):
out.good('(gen) compatibility: ' + ', '.join(comp_text))
def output_security_sub(sub, software, padlen):
def output_security_sub(sub, software, client_audit, padlen):
# type: (str, Optional[SSH.Software], int) -> None
secdb = SSH.Security.CVE if sub == 'cve' else SSH.Security.TXT
if software is None or software.product not in secdb:
@ -2680,9 +2733,11 @@ def output_security_sub(sub, software, padlen):
continue
target, name = line[2:4] # type: int, str
is_server = target & 1 == 1
# is_client = target & 2 == 2
is_client = target & 2 == 2
# is_local = target & 4 == 4
if not is_server:
# If this security entry applies only to servers, but we're testing a client, then skip it. Similarly, skip entries that apply only to clients, but we're testing a server.
if (is_server and not is_client and client_audit) or (is_client and not is_server and not client_audit):
continue
p = '' if out.batch else ' ' * (padlen - len(name))
if sub == 'cve':
@ -2698,13 +2753,13 @@ def output_security_sub(sub, software, padlen):
out.fail('(sec) {0}{1} -- {2}'.format(name, p, descr))
def output_security(banner, padlen):
def output_security(banner, client_audit, padlen):
# type: (Optional[SSH.Banner], int) -> None
with OutputBuffer() as obuf:
if banner is not None:
software = SSH.Software.parse(banner)
output_security_sub('cve', software, padlen)
output_security_sub('txt', software, padlen)
output_security_sub('cve', software, client_audit, padlen)
output_security_sub('txt', software, client_audit, padlen)
if len(obuf) > 0:
out.head('# security')
obuf.flush()
@ -2749,8 +2804,39 @@ def output_fingerprints(algs, sha256=True):
out.sep()
# Returns True if no warnings or failures encountered in configuration.
def output_recommendations(algs, software, padlen=0):
# type: (SSH.Algorithms, Optional[SSH.Software], int) -> None
ret = True
# PuTTY's algorithms cannot be modified, so there's no point in issuing recommendations.
if software.product == SSH.Product.PuTTY:
max_vuln_version = 0.0
max_cvssv2_severity = 0.0
# Search the CVE database for the most recent vulnerable version and the max CVSSv2 score.
for cve_list in SSH.Security.CVE['PuTTY']:
vuln_version = float(cve_list[1])
cvssv2_severity = cve_list[4]
if vuln_version > max_vuln_version:
max_vuln_version = vuln_version
if cvssv2_severity > max_cvssv2_severity:
max_cvssv2_severity = cvssv2_severity
fn = out.warn
if max_cvssv2_severity > 8.0:
fn = out.fail
# Assuming that PuTTY versions will always increment by 0.01, we can calculate the first safe version by adding 0.01 to the latest vulnerable version.
current_version = float(software.version)
upgrade_to_version = max_vuln_version + 0.01
if current_version < upgrade_to_version:
out.head('# recommendations')
fn('(rec) Upgrade to PuTTY v%.2f' % upgrade_to_version)
out.sep()
ret = False
return ret
for_server = True
with OutputBuffer() as obuf:
software, alg_rec = algs.get_recommendations(software, for_server)
@ -2768,12 +2854,14 @@ def output_recommendations(algs, software, padlen=0):
chg_additional_info = ''
if action == 'del':
an, sg, fn = 'remove', '-', out.warn
ret = False
if alg_rec[sshv][alg_type][action][name] >= 10:
fn = out.fail
elif action == 'add':
an, sg, fn = 'append', '+', out.good
elif action == 'chg':
an, sg, fn = 'change', '!', out.fail
ret = False
chg_additional_info = ' (increase modulus size to 2048 bits or larger)'
b = '(SSH{0})'.format(sshv) if sshv == 1 else ''
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}'
@ -2786,9 +2874,27 @@ def output_recommendations(algs, software, padlen=0):
out.head('# algorithm recommendations {0}'.format(title))
obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing).
out.sep()
return ret
def output(banner, header, kex=None, pkm=None):
# Output additional information & notes.
def output_info(algs, software, client_audit, any_problems, padlen=0):
with OutputBuffer() as obuf:
# Tell user that PuTTY cannot be hardened at the protocol-level.
if client_audit and (software.product == SSH.Product.PuTTY):
out.warn('(nfo) PuTTY does not have the option of restricting any algorithms during the SSH handshake.')
# If any warnings or failures were given, print a link to the hardening guides.
if any_problems:
out.warn('(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>')
if len(obuf) > 0:
out.head('# additional info')
obuf.flush()
out.sep()
def output(banner, header, client_audit=False, kex=None, pkm=None):
# type: (Optional[SSH.Banner], List[text_type], Optional[SSH2.Kex], Optional[SSH1.PublicKeyMessage]) -> None
sshv = 1 if pkm is not None else 2
algs = SSH.Algorithms(pkm, kex)
@ -2807,7 +2913,7 @@ def output(banner, header, kex=None, pkm=None):
out.good('(gen) software: {0}'.format(software))
else:
software = None
output_compatibility(algs)
output_compatibility(algs, client_audit)
if kex is not None:
compressions = [x for x in kex.server.compression if x != 'none']
if len(compressions) > 0:
@ -2820,7 +2926,7 @@ def output(banner, header, kex=None, pkm=None):
obuf.flush()
out.sep()
maxlen = algs.maxlen + 1
output_security(banner, maxlen)
output_security(banner, client_audit, maxlen)
unknown_algorithms = [] # Filled in by output_algorithms() with unidentified algs.
if pkm is not None:
adb = SSH1.KexDB.ALGORITHMS
@ -2843,7 +2949,9 @@ def output(banner, header, kex=None, pkm=None):
title, atype = 'message authentication code algorithms', 'mac'
output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen)
output_fingerprints(algs, True)
output_recommendations(algs, software, maxlen)
perfect_config = output_recommendations(algs, software, maxlen)
output_info(algs, software, client_audit, not perfect_config)
# If we encountered any unknown algorithms, ask the user to report them.
if len(unknown_algorithms) > 0:
@ -2979,7 +3087,10 @@ def audit(aconf, sshv=None):
out.level = aconf.level
out.use_colors = aconf.colors
s = SSH.Socket(aconf.host, aconf.port, aconf.ipvo, aconf.timeout)
s.connect()
if aconf.client_audit:
s.listen_and_accept()
else:
s.connect()
if sshv is None:
sshv = 2 if aconf.ssh2 else 1
err = None
@ -3023,9 +3134,10 @@ def audit(aconf, sshv=None):
output(banner, header, pkm=pkm)
elif sshv == 2:
kex = SSH2.Kex.parse(payload)
SSH2.HostKeyTest.run(s, kex)
SSH2.GEXTest.run(s, kex)
output(banner, header, kex=kex)
if aconf.client_audit is False:
SSH2.HostKeyTest.run(s, kex)
SSH2.GEXTest.run(s, kex)
output(banner, header, client_audit=aconf.client_audit, kex=kex)
utils = Utils()

View File

@ -80,3 +80,6 @@
(rec) -diffie-hellman-group14-sha1 -- kex algorithm to remove 
(rec) -hmac-sha1 -- mac algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -134,3 +134,6 @@
(rec) -rijndael-cbc@lysator.liu.se -- enc algorithm to remove 
(rec) -ssh-dss -- key algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -143,3 +143,6 @@
(rec) -ssh-dss -- key algorithm to remove 
(rec) -diffie-hellman-group14-sha1 -- kex algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -141,3 +141,6 @@
(rec) -rijndael-cbc@lysator.liu.se -- enc algorithm to remove 
(rec) -diffie-hellman-group14-sha1 -- kex algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -141,3 +141,6 @@
(rec) -rijndael-cbc@lysator.liu.se -- enc algorithm to remove 
(rec) -diffie-hellman-group14-sha1 -- kex algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -139,3 +139,6 @@
(rec) -rijndael-cbc@lysator.liu.se -- enc algorithm to remove 
(rec) -diffie-hellman-group14-sha1 -- kex algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -137,3 +137,6 @@
(rec) -rijndael-cbc@lysator.liu.se -- enc algorithm to remove 
(rec) -diffie-hellman-group14-sha1 -- kex algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -77,3 +77,6 @@
(rec) -umac-64-etm@openssh.com -- mac algorithm to remove 
(rec) -umac-64@openssh.com -- mac algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>

View File

@ -73,3 +73,6 @@
(rec) -umac-64-etm@openssh.com -- mac algorithm to remove 
(rec) -umac-64@openssh.com -- mac algorithm to remove 
# additional info
(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>