diff --git a/.travis.yml b/.travis.yml index 35a036f..f1ee663 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,11 @@ python: - pypy - pypy3 install: - - pip install pytest + - pip install --upgrade pytest + - pip install --upgrade pytest-cov + - pip install --upgrade coveralls script: - - py.test -v test + - py.test --cov-report= --cov=ssh-audit -v test +after_success: + - coveralls diff --git a/README.md b/README.md index 359bd09..7830cc5 100644 --- a/README.md +++ b/README.md @@ -7,10 +7,11 @@ - grab banner, recognize device or software and operating system, detect compression; - gather key-exchange, host-key, encryption and message authentication code algorithms; - output algorithm information (available since, removed/disabled, unsafe/weak/legacy, etc); +- output algorithm recommendations (append or remove based on recognized software version); - output security information (related issues, assigned CVE list, etc); - analyze SSH version compatibility based on algorithm information; -- historical information from OpenSSH and Dropbear SSH; -- no dependencies, compatible with Python2 and Python3; +- historical information from OpenSSH, Dropbear SSH and libssh; +- no dependencies, compatible with Python 2.6+, Python 3.x and PyPy; ## Usage ``` @@ -28,9 +29,17 @@ usage: ssh-audit.py [-bnv] [-l ] * verbose flag `-v` will prefix each line with section type and algorithm name. ### example -![screenshot](https://cloud.githubusercontent.com/assets/7356025/17623665/da5281c8-60a9-11e6-9582-13f9971c22e0.png) +![screenshot](https://cloud.githubusercontent.com/assets/7356025/19233757/3e09b168-8ef0-11e6-91b4-e880bacd0b8a.png) ## ChangeLog +### v1.6.0 (2016-10-14) + - implement algorithm recommendations section (based on recognized software) + - implement full libssh support (version history, algorithms, security, etc) + - fix SSH-1.99 banner recognition and version comparison functionality + - do not output empty algorithms (happens for misconfigured servers) + - make consistent output for Python 3.x versions + - add a lot more tests (conf, banner, software, SSH1/SSH2, output, etc) + - use Travis CI to test for multiple Python versions (2.6-3.5, pypy, pypy3) ### v1.5.0 (2016-09-20) - create security section for related security information diff --git a/ssh-audit.py b/ssh-audit.py index dcb3208..422abc3 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -26,13 +26,12 @@ from __future__ import print_function import os, io, sys, socket, struct, random, errno, getopt, re, hashlib, base64 -VERSION = 'v1.5.0' +VERSION = 'v1.5.1.dev' def usage(err=None): + out = Output() p = os.path.basename(sys.argv[0]) - out.batch = False - out.minlevel = 'info' out.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION)) if err is not None: out.fail('\n' + err) @@ -49,43 +48,78 @@ def usage(err=None): class AuditConf(object): - def __init__(self): - self.__host = None - self.__port = 22 - self.__ssh1 = False - self.__ssh2 = False + def __init__(self, host=None, port=22): + self.host = host + self.port = port + self.ssh1 = True + self.ssh2 = True + self.batch = False + self.colors = True + self.verbose = False + self.minlevel = 'info' - @property - def host(self): - return self.__host + def __setattr__(self, name, value): + valid = False + if name in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']: + valid, value = True, True if value else False + elif name == 'port': + valid, port = True, utils.parse_int(value) + if port < 1 or port > 65535: + raise ValueError('invalid port: {0}'.format(value)) + value = port + elif name in ['minlevel']: + if value not in ('info', 'warn', 'fail'): + raise ValueError('invalid level: {0}'.format(value)) + valid = True + elif name == 'host': + valid = True + if valid: + object.__setattr__(self, name, value) - @host.setter - def host(self, v): - self.__host = v - - @property - def port(self): - return self.__port - - @port.setter - def port(self, v): - self.__port = v - - @property - def ssh1(self): - return self.__ssh1 - - @ssh1.setter - def ssh1(self, v): - self.__ssh1 = v - - @property - def ssh2(self): - return self.__ssh2 - - @ssh2.setter - def ssh2(self, v): - self.__ssh2 = v + @classmethod + def from_cmdline(cls, args, usage_cb): + conf = cls() + try: + sopts = 'h12bnvl:' + lopts = ['help', 'ssh1', 'ssh2', 'batch', + 'no-colors', 'verbose', 'level='] + opts, args = getopt.getopt(args, sopts, lopts) + except getopt.GetoptError as err: + usage_cb(str(err)) + conf.ssh1, conf.ssh2 = False, False + for o, a in opts: + if o in ('-h', '--help'): + usage_cb() + elif o in ('-1', '--ssh1'): + conf.ssh1 = True + elif o in ('-2', '--ssh2'): + conf.ssh2 = True + elif o in ('-b', '--batch'): + conf.batch = True + conf.verbose = True + elif o in ('-n', '--no-colors'): + conf.colors = False + elif o in ('-v', '--verbose'): + conf.verbose = True + elif o in ('-l', '--level'): + if a not in ('info', 'warn', 'fail'): + usage_cb('level {0} is not valid'.format(a)) + conf.minlevel = a + if len(args) == 0: + usage_cb() + s = args[0].split(':') + host, port = s[0].strip(), 22 + if len(s) > 1: + port = utils.parse_int(s[1]) + if not host: + usage_cb('host is empty') + if port <= 0 or port > 65535: + usage_cb('port {0} is not valid'.format(s[1])) + conf.host = host + conf.port = port + if not (conf.ssh1 or conf.ssh2): + conf.ssh1, conf.ssh2 = True, True + return conf class Output(object): @@ -100,7 +134,9 @@ class Output(object): @property def minlevel(self): - return self.__minlevel + if self.__minlevel < len(self.LEVELS): + return self.LEVELS[self.__minlevel] + return 'unknown' @minlevel.setter def minlevel(self, name): @@ -122,7 +158,7 @@ class Output(object): def __getattr__(self, name): if name == 'head' and self.batch: return lambda x: None - if not self.getlevel(name) >= self.minlevel: + if not self.getlevel(name) >= self.__minlevel: return lambda x: None if self.colors and os.name == 'posix' and name in self.COLORS: color = u'\033[0;{0}m'.format(self.COLORS[name]) @@ -133,7 +169,7 @@ class Output(object): class OutputBuffer(list): def __enter__(self): - self.__buf = io.StringIO() + self.__buf = utils.StringIO() self.__stdout = sys.stdout sys.stdout = self.__buf return self @@ -147,40 +183,111 @@ class OutputBuffer(list): sys.stdout = self.__stdout -class KexParty(object): - encryption = [] - mac = [] - compression = [] - languages = [] - - -class Kex(object): - cookie = None - kex_algorithms = [] - key_algorithms = [] - server = KexParty() - client = KexParty() - follows = False - unused = 0 +class SSH2(object): + class KexParty(object): + def __init__(self, enc, mac, compression, languages): + self.__enc = enc + self.__mac = mac + self.__compression = compression + self.__languages = languages + + @property + def encryption(self): + return self.__enc + + @property + def mac(self): + return self.__mac + + @property + def compression(self): + return self.__compression + + @property + def languages(self): + return self.__languages - @classmethod - def parse(cls, payload): - kex = cls() - buf = ReadBuf(payload) - kex.cookie = buf.read(16) - kex.kex_algorithms = buf.read_list() - kex.key_algorithms = buf.read_list() - kex.client.encryption = buf.read_list() - kex.server.encryption = buf.read_list() - kex.client.mac = buf.read_list() - kex.server.mac = buf.read_list() - kex.client.compression = buf.read_list() - kex.server.compression = buf.read_list() - kex.client.languages = buf.read_list() - kex.server.languages = buf.read_list() - kex.follows = buf.read_bool() - kex.unused = buf.read_int() - return kex + class Kex(object): + def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0): + self.__cookie = cookie + self.__kex_algs = kex_algs + self.__key_algs = key_algs + self.__client = cli + self.__server = srv + self.__follows = follows + self.__unused = unused + + @property + def cookie(self): + return self.__cookie + + @property + def kex_algorithms(self): + return self.__kex_algs + + @property + def key_algorithms(self): + return self.__key_algs + + # client_to_server + @property + def client(self): + return self.__client + + # server_to_client + @property + def server(self): + return self.__server + + @property + def follows(self): + return self.__follows + + @property + def unused(self): + return self.__unused + + def write(self, wbuf): + wbuf.write(self.cookie) + wbuf.write_list(self.kex_algorithms) + wbuf.write_list(self.key_algorithms) + wbuf.write_list(self.client.encryption) + wbuf.write_list(self.server.encryption) + wbuf.write_list(self.client.mac) + wbuf.write_list(self.server.mac) + wbuf.write_list(self.client.compression) + wbuf.write_list(self.server.compression) + wbuf.write_list(self.client.languages) + wbuf.write_list(self.server.languages) + wbuf.write_bool(self.follows) + wbuf.write_int(self.__unused) + + @property + def payload(self): + wbuf = WriteBuf() + self.write(wbuf) + return wbuf.write_flush() + + @classmethod + def parse(cls, payload): + buf = ReadBuf(payload) + cookie = buf.read(16) + kex_algs = buf.read_list() + key_algs = buf.read_list() + cli_enc = buf.read_list() + srv_enc = buf.read_list() + cli_mac = buf.read_list() + srv_mac = buf.read_list() + cli_compression = buf.read_list() + srv_compression = buf.read_list() + cli_languages = buf.read_list() + srv_languages = buf.read_list() + follows = buf.read_bool() + unused = buf.read_int() + cli = SSH2.KexParty(cli_enc, cli_mac, cli_compression, cli_languages) + srv = SSH2.KexParty(srv_enc, srv_mac, srv_compression, srv_languages) + kex = cls(cookie, kex_algs, key_algs, cli, srv, follows, unused) + return kex class SSH1(object): @@ -204,12 +311,14 @@ class SSH1(object): crc = (crc >> 8) ^ self._table[n] return crc - _crc32 = CRC32() + _crc32 = None CIPHERS = ['none', 'idea', 'des', '3des', 'tss', 'rc4', 'blowfish'] AUTHS = [None, 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos'] @classmethod def crc32(cls, v): + if cls._crc32 is None: + cls._crc32 = cls.CRC32() return cls._crc32.calc(v) class KexDB(object): @@ -315,6 +424,24 @@ class SSH1(object): auths.append(SSH1.AUTHS[i]) return auths + def write(self, wbuf): + wbuf.write(self.cookie) + wbuf.write_int(self.server_key_bits) + wbuf.write_mpint1(self.server_key_public_exponent) + wbuf.write_mpint1(self.server_key_public_modulus) + wbuf.write_int(self.host_key_bits) + wbuf.write_mpint1(self.host_key_public_exponent) + wbuf.write_mpint1(self.host_key_public_modulus) + wbuf.write_int(self.protocol_flags) + wbuf.write_int(self.supported_ciphers_mask) + wbuf.write_int(self.supported_authentications_mask) + + @property + def payload(self): + wbuf = WriteBuf() + self.write(wbuf) + return wbuf.write_flush() + @classmethod def parse(cls, payload): buf = ReadBuf(payload) @@ -337,7 +464,7 @@ class SSH1(object): class ReadBuf(object): def __init__(self, data=None): super(ReadBuf, self).__init__() - self._buf = io.BytesIO(data) if data else io.BytesIO() + self._buf = utils.BytesIO(data) if data else utils.BytesIO() self._len = len(data) if data else 0 @property @@ -416,7 +543,7 @@ class WriteBuf(object): return self.write(v) def write_list(self, v): - self.write_string(u','.join(v)) + return self.write_string(u','.join(v)) @classmethod def _bitlength(cls, n): @@ -454,6 +581,12 @@ class WriteBuf(object): data = self._create_mpint(n) return self.write_string(data) + def write_line(self, v): + if not isinstance(v, bytes): + v = bytes(bytearray(v, 'utf-8')) + v += b'\r\n' + return self.write(v) + def write_flush(self): payload = self._wbuf.getvalue() self._wbuf.truncate(0) @@ -472,6 +605,7 @@ class SSH(object): class Product(object): OpenSSH = 'OpenSSH' DropbearSSH = 'Dropbear SSH' + LibSSH = 'libssh' class Software(object): def __init__(self, vendor, product, version, patch, os): @@ -505,7 +639,7 @@ class SSH(object): if other is None: return 1 if isinstance(other, self.__class__): - other = '{0}{1}'.format(other.version, other.patch) + other = '{0}{1}'.format(other.version, other.patch or '') else: other = str(other) mx = re.match(r'^([\d\.]+\d+)(.*)$', other) @@ -517,15 +651,15 @@ class SSH(object): return -1 elif self.version > oversion: return 1 - spatch = self.patch + spatch = self.patch or '' if self.product == SSH.Product.DropbearSSH: if not re.match(r'^test\d.*$', opatch): opatch = 'z{0}'.format(opatch) - if not re.match(r'^test\d.*$', self.patch): - spatch = 'z{0}'.format(self.patch) + if not re.match(r'^test\d.*$', spatch): + spatch = 'z{0}'.format(spatch) elif self.product == SSH.Product.OpenSSH: mx1 = re.match(r'^p\d(.*)', opatch) - mx2 = re.match(r'^p\d(.*)', self.patch) + mx2 = re.match(r'^p\d(.*)', spatch) if not (mx1 and mx2): if mx1: opatch = mx1.group(1) @@ -544,25 +678,29 @@ class SSH(object): return False return True - def __str__(self): + def display(self, full=True): out = '{0} '.format(self.vendor) if self.vendor else '' out += self.product if self.version: out += ' {0}'.format(self.version) - patch = self.patch - if self.product == SSH.Product.OpenSSH: - mx = re.match('^(p\d)(.*)$', self.patch) - if mx is not None: - out += mx.group(1) - patch = mx.group(2).strip() - if patch: - out += ' ({0})'.format(self.patch) - if self.os: - out += ' running on {0}'.format(self.os) + if full: + patch = self.patch or '' + if self.product == SSH.Product.OpenSSH: + mx = re.match('^(p\d)(.*)$', patch) + if mx is not None: + out += mx.group(1) + patch = mx.group(2).strip() + if patch: + out += ' ({0})'.format(patch) + if self.os: + out += ' running on {0}'.format(self.os) return out + def __str__(self): + return self.display() + def __repr__(self): - out = 'vendor={0} '.format(self.vendor) if self.vendor else '' + out = 'vendor={0}'.format(self.vendor) if self.vendor else '' if self.product: if self.vendor: out += ', ' @@ -577,7 +715,7 @@ class SSH(object): @staticmethod def _fix_patch(patch): - return re.sub(r'^[-_\.]+', '', patch) + return re.sub(r'^[-_\.]+', '', patch) or None @staticmethod def _fix_date(d): @@ -628,6 +766,12 @@ class SSH(object): v = None os = cls._extract_os(banner.comments) return cls(v, p, mx.group(1), patch, os) + mx = re.match(r'^libssh-([\d\.]+\d+)(.*)', software) + if mx: + patch = cls._fix_patch(mx.group(2)) + v, p = None, SSH.Product.LibSSH + os = cls._extract_os(banner.comments) + return cls(v, p, mx.group(1), patch, os) mx = re.match(r'^RomSShell_([\d\.]+\d+)(.*)', software) if mx: patch = cls._fix_patch(mx.group(2)) @@ -644,8 +788,8 @@ class SSH(object): return None class Banner(object): - _RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-([^\s]*)(?:\s+(.*))?)?' - RX_PROTOCOL = re.compile(_RXP.replace('\d', '(\d)')) + _RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-\s*([^\s]*)(?:\s+(.*))?)?' + RX_PROTOCOL = re.compile(re.sub(r'\\d(\+?)', '(\\d\g<1>)', _RXP)) RX_BANNER = re.compile(r'^({0}(?:(?:-{0})*)){1}$'.format(_RXP, _RXR)) def __init__(self, protocol, software, comments): @@ -693,6 +837,8 @@ class SSH(object): if software is None and (mx.group(2) or '').startswith('-'): software = '' comments = (mx.group(4) or '').strip() or None + if comments is not None: + comments = re.sub('\s+', ' ', comments) return cls(protocol, software, comments) class Fingerprint(object): @@ -714,21 +860,34 @@ class SSH(object): class Security(object): CVE = { 'Dropbear SSH': [ - ['0.44', '2015.71', 1, 'CVE-2016-3116', 5.5, 'bypass command restrictions via xauth command injection.'], - ['0.28', '2013.58', 1, 'CVE-2013-4434', 5.0, 'discover valid usernames through different time delays.'], - ['0.28', '2013.58', 1, 'CVE-2013-4421', 5.0, 'cause DoS (memory consumption) via a compressed packet.'], - ['0.52', '2011.54', 1, 'CVE-2012-0920', 7.1, 'execute arbitrary code or bypass command restrictions.'], - ['0.40', '0.48.1', 1, 'CVE-2007-1099', 7.5, 'conduct a MitM attack (no warning for hostkey mismatch).'], - ['0.28', '0.47', 1, 'CVE-2006-1206', 7.5, 'cause DoS (slot exhaustion) via large number of connections.'], - ['0.39', '0.47', 1, 'CVE-2006-0225', 4.6, 'execute arbitrary commands via scp with crafted filenames.'], - ['0.28', '0.46', 1, 'CVE-2005-4178', 6.5, 'execute arbitrary code via buffer overflow vulnerability.'], - ['0.28', '0.42', 1, 'CVE-2004-2486', 7.5, 'execute arbitrary code via DSS verification code.'], - ] + ['0.44', '2015.71', 1, 'CVE-2016-3116', 5.5, 'bypass command restrictions via xauth command injection'], + ['0.28', '2013.58', 1, 'CVE-2013-4434', 5.0, 'discover valid usernames through different time delays'], + ['0.28', '2013.58', 1, 'CVE-2013-4421', 5.0, 'cause DoS (memory consumption) via a compressed packet'], + ['0.52', '2011.54', 1, 'CVE-2012-0920', 7.1, 'execute arbitrary code or bypass command restrictions'], + ['0.40', '0.48.1', 1, 'CVE-2007-1099', 7.5, 'conduct a MitM attack (no warning for hostkey mismatch)'], + ['0.28', '0.47', 1, 'CVE-2006-1206', 7.5, 'cause DoS (slot exhaustion) via large number of connections'], + ['0.39', '0.47', 1, 'CVE-2006-0225', 4.6, 'execute arbitrary commands via scp with crafted filenames'], + ['0.28', '0.46', 1, 'CVE-2005-4178', 6.5, 'execute arbitrary code via buffer overflow vulnerability'], + ['0.28', '0.42', 1, 'CVE-2004-2486', 7.5, 'execute arbitrary code via DSS verification code']], + 'libssh': [ + ['0.1', '0.7.2', 1, 'CVE-2016-0739', 4.3, 'conduct a MitM attack (weakness in DH key generation)'], + ['0.5.1', '0.6.4', 1, 'CVE-2015-3146', 5.0, 'cause DoS via kex packets (null pointer dereference)'], + ['0.5.1', '0.6.3', 1, 'CVE-2014-8132', 5.0, 'cause DoS via kex init packet (dangling pointer)'], + ['0.4.7', '0.6.2', 1, 'CVE-2014-0017', 1.9, 'leak data via PRNG state reuse on forking servers'], + ['0.4.7', '0.5.3', 1, 'CVE-2013-0176', 4.3, 'cause DoS via kex packet (null pointer dereference)'], + ['0.4.7', '0.5.2', 1, 'CVE-2012-6063', 7.5, 'cause DoS or execute arbitrary code via sftp (double free)'], + ['0.4.7', '0.5.2', 1, 'CVE-2012-4562', 7.5, 'cause DoS or execute arbitrary code (overflow check)'], + ['0.4.7', '0.5.2', 1, 'CVE-2012-4561', 5.0, 'cause DoS via unspecified vectors (invalid pointer)'], + ['0.4.7', '0.5.2', 1, 'CVE-2012-4560', 7.5, 'cause DoS or execute arbitrary code (buffer overflow)'], + ['0.4.7', '0.5.2', 1, 'CVE-2012-4559', 6.8, 'cause DoS or execute arbitrary code (double free)']] } TXT = { 'Dropbear SSH': [ - ['0.28', '0.34', 1, 'remote root exploit', 'remote format string buffer overflow exploit (exploit-db#387).'], - ] + ['0.28', '0.34', 1, 'remote root exploit', 'remote format string buffer overflow exploit (exploit-db#387)']], + 'libssh': [ + ['0.3.3', '0.3.3', 1, 'null pointer check', 'missing null pointer check in "crypt_set_algorithms_server"'], + ['0.3.3', '0.3.3', 1, 'integer overflow', 'integer overflow in "buffer_get_data"'], + ['0.3.3', '0.3.3', 3, 'heap overflow', 'heap overflow in "packet_decrypt"']] } class Socket(ReadBuf, WriteBuf): @@ -960,29 +1119,29 @@ class KexDB(object): ALGORITHMS = { 'kex': { - 'diffie-hellman-group1-sha1': [['2.3.0,d0.28', '6.6', '6.9'], [FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]], - 'diffie-hellman-group14-sha1': [['3.9,d0.53'], [], [WARN_HASH_WEAK]], + 'diffie-hellman-group1-sha1': [['2.3.0,d0.28,l10.2', '6.6', '6.9'], [FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]], + 'diffie-hellman-group14-sha1': [['3.9,d0.53,l10.6.0'], [], [WARN_HASH_WEAK]], 'diffie-hellman-group14-sha256': [['7.3,d2016.73']], 'diffie-hellman-group16-sha512': [['7.3,d2016.73']], 'diffie-hellman-group18-sha512': [['7.3']], 'diffie-hellman-group-exchange-sha1': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]], 'diffie-hellman-group-exchange-sha256': [['4.4'], [], [WARN_MODULUS_CUSTOM]], - 'ecdh-sha2-nistp256': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], + 'ecdh-sha2-nistp256': [['5.7,d2013.62,l10.6.0'], [WARN_CURVES_WEAK]], 'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], 'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]], - 'curve25519-sha256@libssh.org': [['6.5,d2013.62']], + 'curve25519-sha256@libssh.org': [['6.5,d2013.62,l10.6.0']], 'kexguess2@matt.ucc.asn.au': [['d2013.57']], }, 'key': { 'rsa-sha2-256': [['7.2']], 'rsa-sha2-512': [['7.2']], - 'ssh-ed25519': [['6.5']], + 'ssh-ed25519': [['6.5,l10.7.0']], 'ssh-ed25519-cert-v01@openssh.com': [['6.5']], - 'ssh-rsa': [['2.5.0,d0.28']], - 'ssh-dss': [['2.1.0,d0.28', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp256': [['5.7,d2013.62'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], - 'ecdsa-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ssh-rsa': [['2.5.0,d0.28,l10.2']], + 'ssh-dss': [['2.1.0,d0.28,l10.2', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp256': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp384': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], + 'ecdsa-sha2-nistp521': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], 'ssh-rsa-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], []], 'ssh-dss-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]], 'ssh-rsa-cert-v01@openssh.com': [['5.6']], @@ -992,10 +1151,10 @@ class KexDB(object): 'ecdsa-sha2-nistp521-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]], }, 'enc': { - 'none': [['1.2.2,d2013.56'], [FAIL_PLAINTEXT]], - '3des-cbc': [['1.2.2,d0.28', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_WEAK, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], + 'none': [['1.2.2,d2013.56,l10.2'], [FAIL_PLAINTEXT]], + '3des-cbc': [['1.2.2,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_WEAK, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], '3des-ctr': [['d0.52']], - 'blowfish-cbc': [['1.2.2,d0.28', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], + 'blowfish-cbc': [['1.2.2,d0.28,l10.2', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]], 'twofish-cbc': [['d0.28', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], 'twofish128-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], 'twofish256-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]], @@ -1005,27 +1164,27 @@ class KexDB(object): 'arcfour': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], 'arcfour128': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], 'arcfour256': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]], - 'aes128-cbc': [['2.3.0,d0.28', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], - 'aes192-cbc': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], - 'aes256-cbc': [['2.3.0,d0.47', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], + 'aes128-cbc': [['2.3.0,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], + 'aes192-cbc': [['2.3.0,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], + 'aes256-cbc': [['2.3.0,d0.47,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]], 'rijndael128-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], 'rijndael192-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], 'rijndael256-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]], 'rijndael-cbc@lysator.liu.se': [['2.3.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE]], - 'aes128-ctr': [['3.7,d0.52']], - 'aes192-ctr': [['3.7']], - 'aes256-ctr': [['3.7,d0.52']], + 'aes128-ctr': [['3.7,d0.52,l10.4.1']], + 'aes192-ctr': [['3.7,l10.4.1']], + 'aes256-ctr': [['3.7,d0.52,l10.4.1']], 'aes128-gcm@openssh.com': [['6.2']], 'aes256-gcm@openssh.com': [['6.2']], 'chacha20-poly1305@openssh.com': [['6.5'], [], [], [INFO_OPENSSH69_CHACHA]], }, 'mac': { 'none': [['d2013.56'], [FAIL_PLAINTEXT]], - 'hmac-sha1': [['2.1.0,d0.28'], [], [WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], + 'hmac-sha1': [['2.1.0,d0.28,l10.2'], [], [WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], 'hmac-sha1-96': [['2.5.0,d0.47', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], - 'hmac-sha2-256': [['5.9,d2013.56'], [], [WARN_ENCRYPT_AND_MAC]], + 'hmac-sha2-256': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]], 'hmac-sha2-256-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]], - 'hmac-sha2-512': [['5.9,d2013.56'], [], [WARN_ENCRYPT_AND_MAC]], + 'hmac-sha2-512': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]], 'hmac-sha2-512-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]], 'hmac-md5': [['2.1.0,d0.28', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], 'hmac-md5-96': [['2.5.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]], @@ -1049,6 +1208,8 @@ class KexDB(object): def get_ssh_version(version_desc): if version_desc.startswith('d'): return (SSH.Product.DropbearSSH, version_desc[1:]) + elif version_desc.startswith('l1'): + return (SSH.Product.LibSSH, version_desc[2:]) else: return (SSH.Product.OpenSSH, version_desc) @@ -1091,8 +1252,10 @@ def get_alg_timeframe(alg_desc, for_server=True, result={}): def get_ssh_timeframe(alg_pairs, for_server=True): timeframe = {} for alg_pair in alg_pairs: - alg_db, algs = alg_pair - for alg_type, alg_list in algs.items(): + sshv, alg_db = alg_pair[0] + alg_sets = alg_pair[1:] + for alg_set in alg_sets: + alg_type, alg_list = alg_set for alg_name in alg_list: alg_desc = alg_db[alg_type].get(alg_name) if alg_desc is None: @@ -1110,6 +1273,8 @@ def get_alg_since_text(alg_desc): ssh_prefix, ssh_version = get_ssh_version(v) if not ssh_version: continue + if ssh_prefix in [SSH.Product.LibSSH]: + continue if ssh_version.endswith('C'): ssh_version = '{0} (client only)'.format(ssh_version[:-1]) tv.append('{0} {1}'.format(ssh_prefix, ssh_version)) @@ -1118,6 +1283,117 @@ def get_alg_since_text(alg_desc): return 'available since ' + ', '.join(tv).rstrip(', ') +def get_alg_pairs(kex, pkm): + alg_pairs = [] + if pkm is not None: + alg_pairs.append(((1, SSH1.KexDB.ALGORITHMS), + ('key', ['ssh-rsa1']), + ('enc', pkm.supported_ciphers), + ('aut', pkm.supported_authentications))) + if kex is not None: + alg_pairs.append(((2, KexDB.ALGORITHMS), + ('kex', kex.kex_algorithms), + ('key', kex.key_algorithms), + ('enc', kex.server.encryption), + ('mac', kex.server.mac))) + return alg_pairs + + +def get_alg_recommendations(software, kex, pkm, for_server=True): + alg_pairs = get_alg_pairs(kex, pkm) + vproducts = [SSH.Product.OpenSSH, + SSH.Product.DropbearSSH, + SSH.Product.LibSSH] + if software is not None: + if software.product not in vproducts: + software = None + if software is None: + ssh_timeframe = get_ssh_timeframe(alg_pairs, for_server) + for product in vproducts: + if product not in ssh_timeframe: + continue + version = ssh_timeframe[product][0] + if version is not None: + software = SSH.Software(None, product, version, None, None) + break + rec = {'.software': software} + if software is None: + return rec + for alg_pair in alg_pairs: + sshv, alg_db = alg_pair[0] + alg_sets = alg_pair[1:] + rec[sshv] = {} + for alg_set in alg_sets: + alg_type, alg_list = alg_set + if alg_type == 'aut': + continue + rec[sshv][alg_type] = {'add': [], 'del': {}} + for n, alg_desc in alg_db[alg_type].items(): + if alg_type == 'key' and '-cert-' in n: + continue + versions = alg_desc[0] + if len(versions) == 0 or versions[0] is None: + continue + matches = False + for v in versions[0].split(','): + ssh_prefix, ssh_version = get_ssh_version(v) + if not ssh_version: + continue + if ssh_prefix != software.product: + continue + if ssh_version.endswith('C'): + if for_server: + continue + ssh_version = ssh_version[:-1] + if software.compare_version(ssh_version) < 0: + continue + matches = True + break + if not matches: + continue + adl, faults = len(alg_desc), 0 + for i in range(1, 3): + if not adl > i: + continue + fc = len(alg_desc[i]) + if fc > 0: + faults += pow(10, 2 - i) * fc + if n not in alg_list: + if faults > 0: + continue + rec[sshv][alg_type]['add'].append(n) + else: + if faults == 0: + continue + if n == 'diffie-hellman-group-exchange-sha256': + if software.compare_version('7.3') < 0: + continue + rec[sshv][alg_type]['del'][n] = faults + add_count = len(rec[sshv][alg_type]['add']) + del_count = len(rec[sshv][alg_type]['del']) + new_alg_count = len(alg_list) + add_count - del_count + if new_alg_count < 1 and del_count > 0: + mf, new_del = min(rec[sshv][alg_type]['del'].values()), {} + for k, v in rec[sshv][alg_type]['del'].items(): + if v != mf: + new_del[k] = v + if del_count != len(new_del): + rec[sshv][alg_type]['del'] = new_del + new_alg_count += del_count - len(new_del) + if new_alg_count < 1: + del rec[sshv][alg_type] + else: + if add_count == 0: + del rec[sshv][alg_type]['add'] + if del_count == 0: + del rec[sshv][alg_type]['del'] + if len(rec[sshv][alg_type]) == 0: + del rec[sshv][alg_type] + if len(rec[sshv]) == 0: + del rec[sshv] + return rec + + def output_algorithms(title, alg_db, alg_type, algorithms, maxlen=0): with OutputBuffer() as obuf: for algorithm in algorithms: @@ -1134,6 +1410,8 @@ def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0): alg_max_len = len(alg_name) padding = '' if out.batch else ' ' * (alg_max_len - len(alg_name)) texts = [] + if len(alg_name.strip()) == 0: + return if alg_name in alg_db[alg_type]: alg_desc = alg_db[alg_type][alg_name] ldesc = len(alg_desc) @@ -1167,18 +1445,7 @@ def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0): def output_compatibility(kex, pkm, for_server=True): - alg_pairs = [] - if pkm is not None: - alg_pairs.append((SSH1.KexDB.ALGORITHMS, - {'key': ['ssh-rsa1'], - 'enc': pkm.supported_ciphers, - 'aut': pkm.supported_authentications})) - if kex is not None: - alg_pairs.append((KexDB.ALGORITHMS, - {'kex': kex.kex_algorithms, - 'key': kex.key_algorithms, - 'enc': kex.server.encryption, - 'mac': kex.server.mac})) + alg_pairs = get_alg_pairs(kex, pkm) ssh_timeframe = get_ssh_timeframe(alg_pairs, for_server) vp = 1 if for_server else 2 comp_text = [] @@ -1210,7 +1477,8 @@ def output_security_sub(sub, software, padlen): continue target, name = line[2:4] is_server, is_client = target & 1 == 1, target & 2 == 2 - if is_client: + is_local = target & 4 == 4 + if not is_server: continue p = '' if out.batch else ' ' * (padlen - len(name)) if sub == 'cve': @@ -1252,6 +1520,38 @@ def output_fingerprint(kex, pkm, sha256=True, padlen=0): out.sep() +def output_recommendations(software, kex, pkm, padlen=0): + for_server = True + with OutputBuffer() as obuf: + alg_rec = get_alg_recommendations(software, kex, pkm, for_server) + software = alg_rec['.software'] + for sshv in range(2, 0, -1): + if sshv not in alg_rec: + continue + for alg_type in ['kex', 'key', 'enc', 'mac']: + if alg_type not in alg_rec[sshv]: + continue + for action in ['del', 'add']: + if action not in alg_rec[sshv][alg_type]: + continue + for name in alg_rec[sshv][alg_type][action]: + p = '' if out.batch else ' ' * (padlen - len(name)) + if action == 'del': + an, sg, fn = 'remove', '-', out.warn + if alg_rec[sshv][alg_type][action][name] >= 10: + fn = out.fail + else: + an, sg, fn = 'append', '+', out.good + b = '(SSH{0})'.format(sshv) if sshv == 1 else '' + fm = '(rec) {0}{1}{2}-- {3} algorithm to {4} {5}' + fn(fm.format(sg, name, p, alg_type, an, b)) + if len(obuf) > 0: + title = '(for {0})'.format(software.display(False)) if software else '' + out.head('# algorithm recommendations {0}'.format(title)) + obuf.flush() + out.sep() + + def output(banner, header, kex=None, pkm=None): sshv = 1 if pkm else 2 with OutputBuffer() as obuf: @@ -1264,6 +1564,8 @@ def output(banner, header, kex=None, pkm=None): software = SSH.Software.parse(banner) if software is not None: out.good('(gen) software: {0}'.format(software)) + else: + software = None output_compatibility(kex, pkm) if kex is not None: compressions = [x for x in kex.server.compression if x != 'none'] @@ -1287,6 +1589,7 @@ def output(banner, header, kex=None, pkm=None): ml(kex.server.encryption), ml(kex.server.mac), maxlen) + maxlen += 1 output_security(banner, maxlen) if pkm is not None: adb = SSH1.KexDB.ALGORITHMS @@ -1308,59 +1611,36 @@ def output(banner, header, kex=None, pkm=None): output_algorithms(title, adb, atype, kex.server.encryption, maxlen) title, atype = 'message authentication code algorithms', 'mac' output_algorithms(title, adb, atype, kex.server.mac, maxlen) + output_recommendations(software, kex, pkm, maxlen) output_fingerprint(kex, pkm, True, maxlen) -def parse_int(v): - try: - return int(v) - except: - return 0 - - -def parse_args(): - conf = AuditConf() - try: - sopts = 'h12bnvl:' - lopts = ['help', 'ssh1', 'ssh2', 'batch', 'no-colors', 'verbose', 'level='] - opts, args = getopt.getopt(sys.argv[1:], sopts, lopts) - except getopt.GetoptError as err: - usage(str(err)) - for o, a in opts: - if o in ('-h', '--help'): - usage() - elif o in ('-1', '--ssh1'): - conf.ssh1 = True - elif o in ('-2', '--ssh2'): - conf.ssh2 = True - elif o in ('-b', '--batch'): - out.batch = True - out.verbose = True - elif o in ('-n', '--no-colors'): - out.colors = False - elif o in ('-v', '--verbose'): - out.verbose = True - elif o in ('-l', '--level'): - if a not in ('info', 'warn', 'fail'): - usage('level ' + a + ' is not valid') - out.minlevel = a - if len(args) == 0: - usage() - s = args[0].split(':') - host, port = s[0].strip(), 22 - if len(s) > 1: - port = parse_int(s[1]) - if not host or port <= 0: - usage('port {0} is not valid'.format(port)) - conf.host = host - conf.port = port - if not (conf.ssh1 or conf.ssh2): - conf.ssh1 = True - conf.ssh2 = True - return conf +class Utils(object): + PY2 = sys.version_info[0] == 2 + + @classmethod + def wrap(cls): + o = cls() + if cls.PY2: + import StringIO + o.StringIO = o.BytesIO = StringIO.StringIO + else: + o.StringIO, o.BytesIO = io.StringIO, io.BytesIO + return o + + @staticmethod + def parse_int(v): + try: + return int(v) + except: + return 0 def audit(conf, sshv=None): + out.batch = conf.batch + out.colors = conf.colors + out.verbose = conf.verbose + out.minlevel = conf.minlevel s = SSH.Socket(conf.host, conf.port) if sshv is None: sshv = 2 if conf.ssh2 else 1 @@ -1371,7 +1651,8 @@ def audit(conf, sshv=None): if err is None: packet_type, payload = s.read_packet(sshv) if packet_type < 0: - if payload == b'Protocol major versions differ.': + payload = str(payload).decode('utf-8') + if payload == u'Protocol major versions differ.': if sshv == 2 and conf.ssh1: audit(conf, 1) return @@ -1393,11 +1674,12 @@ def audit(conf, sshv=None): pkm = SSH1.PublicKeyMessage.parse(payload) output(banner, header, pkm=pkm) elif sshv == 2: - kex = Kex.parse(payload) + kex = SSH2.Kex.parse(payload) output(banner, header, kex=kex) +utils = Utils.wrap() +out = Output() if __name__ == '__main__': - out = Output() - conf = parse_args() + conf = AuditConf.from_cmdline(sys.argv[1:], usage) audit(conf) diff --git a/test/conftest.py b/test/conftest.py index cf81150..33e9bb4 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,10 +1,35 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os, sys -import pytest +import pytest, os, sys, io + + +if sys.version_info[0] == 2: + import StringIO + StringIO = StringIO.StringIO +else: + StringIO = io.StringIO + @pytest.fixture(scope='module') def ssh_audit(): __rdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..') sys.path.append(os.path.abspath(__rdir)) return __import__('ssh-audit') + + +class _OutputSpy(list): + def begin(self): + self.__out = StringIO() + self.__old_stdout = sys.stdout + sys.stdout = self.__out + + def flush(self): + lines = self.__out.getvalue().splitlines() + sys.stdout = self.__old_stdout + self.__out = None + return lines + + +@pytest.fixture(scope='module') +def output_spy(): + return _OutputSpy() diff --git a/test/test_auditconf.py b/test/test_auditconf.py new file mode 100644 index 0000000..b4f42f4 --- /dev/null +++ b/test/test_auditconf.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import pytest + + +class TestAuditConf(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.AuditConf = ssh_audit.AuditConf + self.usage = ssh_audit.usage + + def _test_conf(self, conf, **kwargs): + options = { + 'host': None, + 'port': 22, + 'ssh1': True, + 'ssh2': True, + 'batch': False, + 'colors': True, + 'verbose': False, + 'minlevel': 'info' + } + for k, v in kwargs.items(): + options[k] = v + assert conf.host == options['host'] + assert conf.port == options['port'] + assert conf.ssh1 is options['ssh1'] + assert conf.ssh2 is options['ssh2'] + assert conf.batch is options['batch'] + assert conf.colors is options['colors'] + assert conf.verbose is options['verbose'] + assert conf.minlevel == options['minlevel'] + + def test_audit_conf_defaults(self): + conf = self.AuditConf() + self._test_conf(conf) + + def test_audit_conf_booleans(self): + conf = self.AuditConf() + for p in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']: + for v in [True, 1]: + setattr(conf, p, v) + assert getattr(conf, p) is True + for v in [False, 0]: + setattr(conf, p, v) + assert getattr(conf, p) is False + + def test_audit_conf_port(self): + conf = self.AuditConf() + for port in [22, 2222]: + conf.port = port + assert conf.port == port + for port in [-1, 0, 65536, 99999]: + with pytest.raises(ValueError) as excinfo: + conf.port = port + excinfo.match(r'.*invalid port.*') + + def test_audit_conf_minlevel(self): + conf = self.AuditConf() + for level in ['info', 'warn', 'fail']: + conf.minlevel = level + assert conf.minlevel == level + for level in ['head', 'good', 'unknown', None]: + with pytest.raises(ValueError) as excinfo: + conf.minlevel = level + excinfo.match(r'.*invalid level.*') + + def test_audit_conf_cmdline(self): + c = lambda x: self.AuditConf.from_cmdline(x.split(), self.usage) + with pytest.raises(SystemExit): + conf = c('') + with pytest.raises(SystemExit): + conf = c('-x') + with pytest.raises(SystemExit): + conf = c('-h') + with pytest.raises(SystemExit): + conf = c('--help') + with pytest.raises(SystemExit): + conf = c(':') + with pytest.raises(SystemExit): + conf = c(':22') + conf = c('localhost') + self._test_conf(conf, host='localhost') + conf = c('github.com') + self._test_conf(conf, host='github.com') + conf = c('localhost:2222') + self._test_conf(conf, host='localhost', port=2222) + with pytest.raises(SystemExit): + conf = c('localhost:') + with pytest.raises(SystemExit): + conf = c('localhost:abc') + with pytest.raises(SystemExit): + conf = c('localhost:-22') + with pytest.raises(SystemExit): + conf = c('localhost:99999') + conf = c('-1 localhost') + self._test_conf(conf, host='localhost', ssh1=True, ssh2=False) + conf = c('-2 localhost') + self._test_conf(conf, host='localhost', ssh1=False, ssh2=True) + conf = c('-12 localhost') + self._test_conf(conf, host='localhost', ssh1=True, ssh2=True) + conf = c('-b localhost') + self._test_conf(conf, host='localhost', batch=True, verbose=True) + conf = c('-n localhost') + self._test_conf(conf, host='localhost', colors=False) + conf = c('-v localhost') + self._test_conf(conf, host='localhost', verbose=True) + conf = c('-l info localhost') + self._test_conf(conf, host='localhost', minlevel='info') + conf = c('-l warn localhost') + self._test_conf(conf, host='localhost', minlevel='warn') + conf = c('-l fail localhost') + self._test_conf(conf, host='localhost', minlevel='fail') + with pytest.raises(SystemExit): + conf = c('-l something localhost') diff --git a/test/test_banner.py b/test/test_banner.py new file mode 100644 index 0000000..b2d9991 --- /dev/null +++ b/test/test_banner.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import pytest + + +class TestBanner(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.ssh = ssh_audit.SSH + + def test_simple_banners(self): + banner = lambda x: self.ssh.Banner.parse(x) + b = banner('SSH-2.0-OpenSSH_7.3') + assert b.protocol == (2, 0) + assert b.software == 'OpenSSH_7.3' + assert b.comments is None + assert str(b) == 'SSH-2.0-OpenSSH_7.3' + b = banner('SSH-1.99-Sun_SSH_1.1.3') + assert b.protocol == (1, 99) + assert b.software == 'Sun_SSH_1.1.3' + assert b.comments is None + assert str(b) == 'SSH-1.99-Sun_SSH_1.1.3' + b = banner('SSH-1.5-Cisco-1.25') + assert b.protocol == (1, 5) + assert b.software == 'Cisco-1.25' + assert b.comments is None + assert str(b) == 'SSH-1.5-Cisco-1.25' + + def test_invalid_banners(self): + b = lambda x: self.ssh.Banner.parse(x) + assert b('Something') is None + assert b('SSH-XXX-OpenSSH_7.3') is None + + def test_banners_with_spaces(self): + b = lambda x: self.ssh.Banner.parse(x) + s = 'SSH-2.0-OpenSSH_4.3p2' + assert str(b('SSH-2.0-OpenSSH_4.3p2 ')) == s + assert str(b('SSH-2.0- OpenSSH_4.3p2')) == s + assert str(b('SSH-2.0- OpenSSH_4.3p2 ')) == s + s = 'SSH-2.0-OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu' + assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu')) == s + assert str(b('SSH-2.0-OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s + assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s + + def test_banners_without_software(self): + b = lambda x: self.ssh.Banner.parse(x) + assert b('SSH-2.0').protocol == (2, 0) + assert b('SSH-2.0').software is None + assert b('SSH-2.0').comments is None + assert str(b('SSH-2.0')) == 'SSH-2.0' + assert b('SSH-2.0-').protocol == (2, 0) + assert b('SSH-2.0-').software == '' + assert b('SSH-2.0-').comments is None + assert str(b('SSH-2.0-')) == 'SSH-2.0-' + + def test_banners_with_comments(self): + b = lambda x: self.ssh.Banner.parse(x) + assert repr(b('SSH-2.0-OpenSSH_7.2p2 Ubuntu-1')) == '' + assert repr(b('SSH-1.99-OpenSSH_3.4p1 Debian 1:3.4p1-1.woody.3')) == '' + assert repr(b('SSH-1.5-1.3.7 F-SECURE SSH')) == '' + + def test_banners_with_multiple_protocols(self): + b = lambda x: self.ssh.Banner.parse(x) + assert str(b('SSH-1.99-SSH-1.99-OpenSSH_3.6.1p2')) == 'SSH-1.99-OpenSSH_3.6.1p2' + assert str(b('SSH-2.0-SSH-2.0-OpenSSH_4.3p2 Debian-9')) == 'SSH-2.0-OpenSSH_4.3p2 Debian-9' + assert str(b('SSH-1.99-SSH-2.0-dropbear_0.5')) == 'SSH-1.99-dropbear_0.5' + assert str(b('SSH-2.0-SSH-1.99-OpenSSH_4.2p1 SSH Secure Shell (non-commercial)')) == 'SSH-1.99-OpenSSH_4.2p1 SSH Secure Shell (non-commercial)' + assert str(b('SSH-1.99-SSH-1.99-SSH-1.99-OpenSSH_3.9p1')) == 'SSH-1.99-OpenSSH_3.9p1' diff --git a/test/test_buffer.py b/test/test_buffer.py new file mode 100644 index 0000000..968e3f7 --- /dev/null +++ b/test/test_buffer.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import pytest +import re + + +class TestBuffer(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.rbuf = ssh_audit.ReadBuf + self.wbuf = ssh_audit.WriteBuf + + def _b(self, v): + v = re.sub(r'\s', '', v) + data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)] + return bytes(bytearray(data)) + + def test_unread(self): + w = self.wbuf().write_byte(1).write_int(2).write_flush() + r = self.rbuf(w) + assert r.unread_len == 5 + r.read_byte() + assert r.unread_len == 4 + r.read_int() + assert r.unread_len == 0 + + def test_byte(self): + w = lambda x: self.wbuf().write_byte(x).write_flush() + r = lambda x: self.rbuf(x).read_byte() + tc = [(0x00, '00'), + (0x01, '01'), + (0x10, '10'), + (0xff, 'ff')] + for p in tc: + assert w(p[0]) == self._b(p[1]) + assert r(self._b(p[1])) == p[0] + + def test_bool(self): + w = lambda x: self.wbuf().write_bool(x).write_flush() + r = lambda x: self.rbuf(x).read_bool() + tc = [(True, '01'), + (False, '00')] + for p in tc: + assert w(p[0]) == self._b(p[1]) + assert r(self._b(p[1])) == p[0] + + def test_int(self): + w = lambda x: self.wbuf().write_int(x).write_flush() + r = lambda x: self.rbuf(x).read_int() + tc = [(0x00, '00 00 00 00'), + (0x01, '00 00 00 01'), + (0xabcd, '00 00 ab cd'), + (0xffffffff, 'ff ff ff ff')] + for p in tc: + assert w(p[0]) == self._b(p[1]) + assert r(self._b(p[1])) == p[0] + + def test_string(self): + w = lambda x: self.wbuf().write_string(x).write_flush() + r = lambda x: self.rbuf(x).read_string() + tc = [(u'abc1', '00 00 00 04 61 62 63 31'), + (b'abc2', '00 00 00 04 61 62 63 32')] + for p in tc: + v = p[0] + assert w(v) == self._b(p[1]) + if not isinstance(v, bytes): + v = bytes(bytearray(v, 'utf-8')) + assert r(self._b(p[1])) == v + + def test_list(self): + w = lambda x: self.wbuf().write_list(x).write_flush() + r = lambda x: self.rbuf(x).read_list() + tc = [(['d', 'ef', 'ault'], '00 00 00 09 64 2c 65 66 2c 61 75 6c 74')] + for p in tc: + assert w(p[0]) == self._b(p[1]) + assert r(self._b(p[1])) == p[0] + + def test_line(self): + w = lambda x: self.wbuf().write_line(x).write_flush() + r = lambda x: self.rbuf(x).read_line() + tc = [(u'example line', '65 78 61 6d 70 6c 65 20 6c 69 6e 65 0d 0a')] + for p in tc: + assert w(p[0]) == self._b(p[1]) + assert r(self._b(p[1])) == p[0] + + def test_bitlen(self): + class Py26Int(int): + def bit_length(self): + raise AttributeError + assert self.wbuf._bitlength(42) == 6 + assert self.wbuf._bitlength(Py26Int(42)) == 6 + + def test_mpint1(self): + mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush() + mpint1r = lambda x: self.rbuf(x).read_mpint1() + tc = [(0x0, '00 00'), + (0x1234, '00 0d 12 34'), + (0x12345, '00 11 01 23 45'), + (0xdeadbeef, '00 20 de ad be ef')] + for p in tc: + assert mpint1w(p[0]) == self._b(p[1]) + assert mpint1r(self._b(p[1])) == p[0] + + def test_mpint2(self): + mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush() + mpint2r = lambda x: self.rbuf(x).read_mpint2() + tc = [(0x0, '00 00 00 00'), + (0x80, '00 00 00 02 00 80'), + (0x9a378f9b2e332a7, '00 00 00 08 09 a3 78 f9 b2 e3 32 a7'), + (-0x1234, '00 00 00 02 ed cc'), + (-0xdeadbeef, '00 00 00 05 ff 21 52 41 11'), + (-0x8000, '00 00 00 02 80 00'), + (-0x80, '00 00 00 01 80')] + for p in tc: + assert mpint2w(p[0]) == self._b(p[1]) + assert mpint2r(self._b(p[1])) == p[0] + assert mpint2r(self._b('00 00 00 02 ff 80')) == -0x80 diff --git a/test/test_output.py b/test/test_output.py new file mode 100644 index 0000000..8ba3645 --- /dev/null +++ b/test/test_output.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from __future__ import print_function +import pytest, io, sys + + +class TestOutput(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.Output = ssh_audit.Output + self.OutputBuffer = ssh_audit.OutputBuffer + + def test_output_buffer_no_lines(self, output_spy): + output_spy.begin() + with self.OutputBuffer() as obuf: + pass + assert output_spy.flush() == [] + output_spy.begin() + with self.OutputBuffer() as obuf: + pass + obuf.flush() + assert output_spy.flush() == [] + + def test_output_buffer_no_flush(self, output_spy): + output_spy.begin() + with self.OutputBuffer() as obuf: + print(u'abc') + assert output_spy.flush() == [] + + def test_output_buffer_flush(self, output_spy): + output_spy.begin() + with self.OutputBuffer() as obuf: + print(u'abc') + print() + print(u'def') + obuf.flush() + assert output_spy.flush() == [u'abc', u'', u'def'] + + def test_output_defaults(self): + out = self.Output() + # default: on + assert out.batch is False + assert out.colors is True + assert out.minlevel == 'info' + + def test_output_colors(self, output_spy): + out = self.Output() + # test without colors + out.colors = False + output_spy.begin() + out.info('info color') + assert output_spy.flush() == [u'info color'] + output_spy.begin() + out.head('head color') + assert output_spy.flush() == [u'head color'] + output_spy.begin() + out.good('good color') + assert output_spy.flush() == [u'good color'] + output_spy.begin() + out.warn('warn color') + assert output_spy.flush() == [u'warn color'] + output_spy.begin() + out.fail('fail color') + assert output_spy.flush() == [u'fail color'] + # test with colors + out.colors = True + output_spy.begin() + out.info('info color') + assert output_spy.flush() == [u'info color'] + output_spy.begin() + out.head('head color') + assert output_spy.flush() == [u'\x1b[0;36mhead color\x1b[0m'] + output_spy.begin() + out.good('good color') + assert output_spy.flush() == [u'\x1b[0;32mgood color\x1b[0m'] + output_spy.begin() + out.warn('warn color') + assert output_spy.flush() == [u'\x1b[0;33mwarn color\x1b[0m'] + output_spy.begin() + out.fail('fail color') + assert output_spy.flush() == [u'\x1b[0;31mfail color\x1b[0m'] + + def test_output_sep(self, output_spy): + out = self.Output() + output_spy.begin() + out.sep() + out.sep() + out.sep() + assert output_spy.flush() == [u'', u'', u''] + + def test_output_levels(self): + out = self.Output() + assert out.getlevel('info') == 0 + assert out.getlevel('good') == 0 + assert out.getlevel('warn') == 1 + assert out.getlevel('fail') == 2 + assert out.getlevel('unknown') > 2 + + def test_output_minlevel_property(self): + out = self.Output() + out.minlevel = 'info' + assert out.minlevel == 'info' + out.minlevel = 'good' + assert out.minlevel == 'info' + out.minlevel = 'warn' + assert out.minlevel == 'warn' + out.minlevel = 'fail' + assert out.minlevel == 'fail' + out.minlevel = 'invalid level' + assert out.minlevel == 'unknown' + + def test_output_minlevel(self, output_spy): + out = self.Output() + # visible: all + out.minlevel = 'info' + output_spy.begin() + out.info('info color') + out.head('head color') + out.good('good color') + out.warn('warn color') + out.fail('fail color') + assert len(output_spy.flush()) == 5 + # visible: head, warn, fail + out.minlevel = 'warn' + output_spy.begin() + out.info('info color') + out.head('head color') + out.good('good color') + out.warn('warn color') + out.fail('fail color') + assert len(output_spy.flush()) == 3 + # visible: head, fail + out.minlevel = 'fail' + output_spy.begin() + out.info('info color') + out.head('head color') + out.good('good color') + out.warn('warn color') + out.fail('fail color') + assert len(output_spy.flush()) == 2 + # visible: head + out.minlevel = 'invalid level' + output_spy.begin() + out.info('info color') + out.head('head color') + out.good('good color') + out.warn('warn color') + out.fail('fail color') + assert len(output_spy.flush()) == 1 + + def test_output_batch(self, output_spy): + out = self.Output() + # visible: all + output_spy.begin() + out.minlevel = 'info' + out.batch = False + out.info('info color') + out.head('head color') + out.good('good color') + out.warn('warn color') + out.fail('fail color') + assert len(output_spy.flush()) == 5 + # visible: all except head + output_spy.begin() + out.minlevel = 'info' + out.batch = True + out.info('info color') + out.head('head color') + out.good('good color') + out.warn('warn color') + out.fail('fail color') + assert len(output_spy.flush()) == 4 diff --git a/test/test_protocol.py b/test/test_protocol.py deleted file mode 100644 index 78c4291..0000000 --- a/test/test_protocol.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import pytest -import re - - -class TestProtocol(object): - @pytest.fixture(autouse=True) - def init(self, ssh_audit): - self.rbuf = ssh_audit.ReadBuf - self.wbuf = ssh_audit.WriteBuf - - def _b(self, v): - v = re.sub(r'\s', '', v) - data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)] - return bytes(bytearray(data)) - - def test_mpint1(self): - mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush() - mpint1r = lambda x: self.rbuf(x).read_mpint1() - tc = [(0x0, '00 00'), - (0x1234, '00 0d 12 34'), - (0x12345, '00 11 01 23 45'), - (0xdeadbeef, '00 20 de ad be ef')] - for p in tc: - assert mpint1w(p[0]) == self._b(p[1]) - assert mpint1r(self._b(p[1])) == p[0] - - def test_mpint2(self): - mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush() - mpint2r = lambda x: self.rbuf(x).read_mpint2() - tc = [(0x0, '00 00 00 00'), - (0x80, '00 00 00 02 00 80'), - (0x9a378f9b2e332a7, '00 00 00 08 09 a3 78 f9 b2 e3 32 a7'), - (-0x1234, '00 00 00 02 ed cc'), - (-0xdeadbeef, '00 00 00 05 ff 21 52 41 11'), - (-0x8000, '00 00 00 02 80 00'), - (-0x80, '00 00 00 01 80')] - for p in tc: - assert mpint2w(p[0]) == self._b(p[1]) - assert mpint2r(self._b(p[1])) == p[0] - assert mpint2r(self._b('00 00 00 02 ff 80')) == -0x80 diff --git a/test/test_software.py b/test/test_software.py new file mode 100644 index 0000000..20eca18 --- /dev/null +++ b/test/test_software.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import pytest + + +class TestSoftware(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.ssh = ssh_audit.SSH + + def test_unknown_software(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + assert ps('SSH-1.5') is None + assert ps('SSH-1.99-AlfaMegaServer') is None + assert ps('SSH-2.0-BetaMegaServer 0.0.1') is None + + def test_openssh_software(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + # common + s = ps('SSH-2.0-OpenSSH_7.3') + assert s.vendor is None + assert s.product == 'OpenSSH' + assert s.version == '7.3' + assert s.patch is None + assert s.os is None + assert str(s) == 'OpenSSH 7.3' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + # common, portable + s = ps('SSH-2.0-OpenSSH_7.2p1') + assert s.vendor is None + assert s.product == 'OpenSSH' + assert s.version == '7.2' + assert s.patch == 'p1' + assert s.os is None + assert str(s) == 'OpenSSH 7.2p1' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'OpenSSH 7.2' + assert repr(s) == '' + # dot instead of underline + s = ps('SSH-2.0-OpenSSH.6.6') + assert s.vendor is None + assert s.product == 'OpenSSH' + assert s.version == '6.6' + assert s.patch is None + assert s.os is None + assert str(s) == 'OpenSSH 6.6' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + # dash instead of underline + s = ps('SSH-2.0-OpenSSH-3.9p1') + assert s.vendor is None + assert s.product == 'OpenSSH' + assert s.version == '3.9' + assert s.patch == 'p1' + assert s.os is None + assert str(s) == 'OpenSSH 3.9p1' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'OpenSSH 3.9' + assert repr(s) == '' + # patch prefix with dash + s = ps('SSH-2.0-OpenSSH_7.2-hpn14v5') + assert s.vendor is None + assert s.product == 'OpenSSH' + assert s.version == '7.2' + assert s.patch == 'hpn14v5' + assert s.os is None + assert str(s) == 'OpenSSH 7.2 (hpn14v5)' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'OpenSSH 7.2' + assert repr(s) == '' + # patch prefix with underline + s = ps('SSH-1.5-OpenSSH_6.6.1_hpn13v11') + assert s.vendor is None + assert s.product == 'OpenSSH' + assert s.version == '6.6.1' + assert s.patch == 'hpn13v11' + assert s.os is None + assert str(s) == 'OpenSSH 6.6.1 (hpn13v11)' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'OpenSSH 6.6.1' + assert repr(s) == '' + # patch prefix with dot + s = ps('SSH-2.0-OpenSSH_5.9.CASPUR') + assert s.vendor is None + assert s.product == 'OpenSSH' + assert s.version == '5.9' + assert s.patch == 'CASPUR' + assert s.os is None + assert str(s) == 'OpenSSH 5.9 (CASPUR)' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'OpenSSH 5.9' + assert repr(s) == '' + + def test_dropbear_software(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + # common + s = ps('SSH-2.0-dropbear_2016.74') + assert s.vendor is None + assert s.product == 'Dropbear SSH' + assert s.version == '2016.74' + assert s.patch is None + assert s.os is None + assert str(s) == 'Dropbear SSH 2016.74' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + # common, patch + s = ps('SSH-2.0-dropbear_0.44test4') + assert s.vendor is None + assert s.product == 'Dropbear SSH' + assert s.version == '0.44' + assert s.patch == 'test4' + assert s.os is None + assert str(s) == 'Dropbear SSH 0.44 (test4)' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'Dropbear SSH 0.44' + assert repr(s) == '' + # patch prefix with dash + s = ps('SSH-2.0-dropbear_0.44-Freesco-p49') + assert s.vendor is None + assert s.product == 'Dropbear SSH' + assert s.version == '0.44' + assert s.patch == 'Freesco-p49' + assert s.os is None + assert str(s) == 'Dropbear SSH 0.44 (Freesco-p49)' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'Dropbear SSH 0.44' + assert repr(s) == '' + # patch prefix with underline + s = ps('SSH-2.0-dropbear_2014.66_agbn_1') + assert s.vendor is None + assert s.product == 'Dropbear SSH' + assert s.version == '2014.66' + assert s.patch == 'agbn_1' + assert s.os is None + assert str(s) == 'Dropbear SSH 2014.66 (agbn_1)' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == 'Dropbear SSH 2014.66' + assert repr(s) == '' + + def test_libssh_software(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + # common + s = ps('SSH-2.0-libssh-0.2') + assert s.vendor is None + assert s.product == 'libssh' + assert s.version == '0.2' + assert s.patch is None + assert s.os is None + assert str(s) == 'libssh 0.2' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + s = ps('SSH-2.0-libssh-0.7.3') + assert s.vendor is None + assert s.product == 'libssh' + assert s.version == '0.7.3' + assert s.patch is None + assert s.os is None + assert str(s) == 'libssh 0.7.3' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + + def test_romsshell_software(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + # common + s = ps('SSH-2.0-RomSShell_5.40') + assert s.vendor == 'Allegro Software' + assert s.product == 'RomSShell' + assert s.version == '5.40' + assert s.patch is None + assert s.os is None + assert str(s) == 'Allegro Software RomSShell 5.40' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + + def test_hp_ilo_software(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + # common + s = ps('SSH-2.0-mpSSH_0.2.1') + assert s.vendor == 'HP' + assert s.product == 'iLO (Integrated Lights-Out) sshd' + assert s.version == '0.2.1' + assert s.patch is None + assert s.os is None + assert str(s) == 'HP iLO (Integrated Lights-Out) sshd 0.2.1' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + + def test_cisco_software(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + # common + s = ps('SSH-1.5-Cisco-1.25') + assert s.vendor == 'Cisco' + assert s.product == 'IOS/PIX sshd' + assert s.version == '1.25' + assert s.patch is None + assert s.os is None + assert str(s) == 'Cisco IOS/PIX sshd 1.25' + assert str(s) == s.display() + assert s.display(True) == str(s) + assert s.display(False) == str(s) + assert repr(s) == '' + + def test_sofware_os(self): + ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) + # unknown + s = ps('SSH-2.0-OpenSSH_3.7.1 MegaOperatingSystem 123') + assert s.os is None + # NetBSD + s = ps('SSH-1.99-OpenSSH_2.5.1 NetBSD_Secure_Shell-20010614') + assert s.os == 'NetBSD (2001-06-14)' + assert str(s) == 'OpenSSH 2.5.1 running on NetBSD (2001-06-14)' + assert repr(s) == '' + s = ps('SSH-1.99-OpenSSH_5.0 NetBSD_Secure_Shell-20080403+-hpn13v1') + assert s.os == 'NetBSD (2008-04-03)' + assert str(s) == 'OpenSSH 5.0 running on NetBSD (2008-04-03)' + assert repr(s) == '' + s = ps('SSH-2.0-OpenSSH_6.6.1_hpn13v11 NetBSD-20100308') + assert s.os == 'NetBSD (2010-03-08)' + assert str(s) == 'OpenSSH 6.6.1 (hpn13v11) running on NetBSD (2010-03-08)' + assert repr(s) == '' + s = ps('SSH-2.0-OpenSSH_4.4 NetBSD') + assert s.os == 'NetBSD' + assert str(s) == 'OpenSSH 4.4 running on NetBSD' + assert repr(s) == '' + s = ps('SSH-2.0-OpenSSH_3.0.2 NetBSD Secure Shell') + assert s.os == 'NetBSD' + assert str(s) == 'OpenSSH 3.0.2 running on NetBSD' + assert repr(s) == '' + # FreeBSD + s = ps('SSH-2.0-OpenSSH_7.2 FreeBSD-20160310') + assert s.os == 'FreeBSD (2016-03-10)' + assert str(s) == 'OpenSSH 7.2 running on FreeBSD (2016-03-10)' + assert repr(s) == '' + s = ps('SSH-1.99-OpenSSH_2.9 FreeBSD localisations 20020307') + assert s.os == 'FreeBSD (2002-03-07)' + assert str(s) == 'OpenSSH 2.9 running on FreeBSD (2002-03-07)' + assert repr(s) == '' + s = ps('SSH-2.0-OpenSSH_2.3.0 green@FreeBSD.org 20010321') + assert s.os == 'FreeBSD (2001-03-21)' + assert str(s) == 'OpenSSH 2.3.0 running on FreeBSD (2001-03-21)' + assert repr(s) == '' + s = ps('SSH-1.99-OpenSSH_4.4p1 FreeBSD-openssh-portable-overwrite-base-4.4.p1_1,1') + assert s.os == 'FreeBSD' + assert str(s) == 'OpenSSH 4.4p1 running on FreeBSD' + assert repr(s) == '' + s = ps('SSH-2.0-OpenSSH_7.2-OVH-rescue FreeBSD') + assert s.os == 'FreeBSD' + assert str(s) == 'OpenSSH 7.2 (OVH-rescue) running on FreeBSD' + assert repr(s) == '' + # Windows + s = ps('SSH-2.0-OpenSSH_3.7.1 in RemotelyAnywhere 5.21.422') + assert s.os == 'Microsoft Windows (RemotelyAnywhere 5.21.422)' + assert str(s) == 'OpenSSH 3.7.1 running on Microsoft Windows (RemotelyAnywhere 5.21.422)' + assert repr(s) == '' + s = ps('SSH-2.0-OpenSSH_3.8 in DesktopAuthority 7.1.091') + assert s.os == 'Microsoft Windows (DesktopAuthority 7.1.091)' + assert str(s) == 'OpenSSH 3.8 running on Microsoft Windows (DesktopAuthority 7.1.091)' + assert repr(s) == '' + s = ps('SSH-2.0-OpenSSH_3.8 in RemoteSupportManager 1.0.023') + assert s.os == 'Microsoft Windows (RemoteSupportManager 1.0.023)' + assert str(s) == 'OpenSSH 3.8 running on Microsoft Windows (RemoteSupportManager 1.0.023)' + assert repr(s) == '' diff --git a/test/test_ssh1.py b/test/test_ssh1.py new file mode 100644 index 0000000..b9eec53 --- /dev/null +++ b/test/test_ssh1.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import pytest + + +class TestSSH1(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.ssh = ssh_audit.SSH + self.ssh1 = ssh_audit.SSH1 + self.rbuf = ssh_audit.ReadBuf + self.wbuf = ssh_audit.WriteBuf + + def test_crc32(self): + assert self.ssh1.crc32(b'') == 0x00 + assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808 + + def _server_key(self): + return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551) + + def _host_key(self): + return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b) + + def _pkm_payload(self): + w = self.wbuf() + w.write(b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff') + b, e, m = self._server_key() + w.write_int(b).write_mpint1(e).write_mpint1(m) + b, e, m = self._host_key() + w.write_int(b).write_mpint1(e).write_mpint1(m) + w.write_int(2) + w.write_int(72) + w.write_int(36) + return w.write_flush() + + def test_fingerprint(self): + b, e, m = self._host_key() + fpd = self.wbuf._create_mpint(m, False) + fpd += self.wbuf._create_mpint(e, False) + fp = self.ssh.Fingerprint(fpd) + assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' + assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' + + def test_pkm_read(self): + pkm = self.ssh1.PublicKeyMessage.parse(self._pkm_payload()) + assert pkm is not None + assert pkm.cookie == b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' + b, e, m = self._server_key() + assert pkm.server_key_bits == b + assert pkm.server_key_public_exponent == e + assert pkm.server_key_public_modulus == m + b, e, m = self._host_key() + assert pkm.host_key_bits == b + assert pkm.host_key_public_exponent == e + assert pkm.host_key_public_modulus == m + fp = self.ssh.Fingerprint(pkm.host_key_fingerprint_data) + assert pkm.protocol_flags == 2 + assert pkm.supported_ciphers_mask == 72 + assert pkm.supported_ciphers == ['3des', 'blowfish'] + assert pkm.supported_authentications_mask == 36 + assert pkm.supported_authentications == ['rsa', 'tis'] + assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' + assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' + + def test_pkm_payload(self): + cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' + skey = self._server_key() + hkey = self._host_key() + pflags = 2 + cmask = 72 + amask = 36 + pkm1 = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask) + pkm2 = self.ssh1.PublicKeyMessage.parse(self._pkm_payload()) + assert pkm1.payload == pkm2.payload diff --git a/test/test_ssh2.py b/test/test_ssh2.py new file mode 100644 index 0000000..a9cc425 --- /dev/null +++ b/test/test_ssh2.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import pytest + + +class TestSSH2(object): + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.ssh = ssh_audit.SSH + self.ssh2 = ssh_audit.SSH2 + self.rbuf = ssh_audit.ReadBuf + self.wbuf = ssh_audit.WriteBuf + + def _kex_payload(self): + w = self.wbuf() + w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff') + w.write_list([u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1']) + w.write_list([u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519']) + w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']) + w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']) + w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']) + w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']) + w.write_list([u'none', u'zlib@openssh.com']) + w.write_list([u'none', u'zlib@openssh.com']) + w.write_list([u'']) + w.write_list([u'']) + w.write_byte(False) + w.write_int(0) + return w.write_flush() + + def test_kex_read(self): + kex = self.ssh2.Kex.parse(self._kex_payload()) + assert kex is not None + assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff' + assert kex.kex_algorithms == [u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1'] + assert kex.key_algorithms == [u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519'] + assert kex.client is not None + assert kex.server is not None + assert kex.client.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'] + assert kex.server.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'] + assert kex.client.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'] + assert kex.server.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'] + assert kex.client.compression == [u'none', u'zlib@openssh.com'] + assert kex.server.compression == [u'none', u'zlib@openssh.com'] + assert kex.client.languages == [u''] + assert kex.server.languages == [u''] + assert kex.follows is False + assert kex.unused == 0 diff --git a/test/test_version_compare.py b/test/test_version_compare.py index 8829f68..2f74310 100644 --- a/test/test_version_compare.py +++ b/test/test_version_compare.py @@ -2,10 +2,11 @@ # -*- coding: utf-8 -*- import pytest + class TestVersionCompare(object): @pytest.fixture(autouse=True) def init(self, ssh_audit): - self.ssh = ssh_audit.SSH + self.ssh = ssh_audit.SSH def get_dropbear_software(self, v): b = self.ssh.Banner.parse('SSH-2.0-dropbear_{0}'.format(v)) @@ -15,34 +16,69 @@ class TestVersionCompare(object): b = self.ssh.Banner.parse('SSH-2.0-OpenSSH_{0}'.format(v)) return self.ssh.Software.parse(b) + def get_libssh_software(self, v): + b = self.ssh.Banner.parse('SSH-2.0-libssh-{0}'.format(v)) + return self.ssh.Software.parse(b) + def test_dropbear_compare_version_pre_years(self): s = self.get_dropbear_software('0.44') + assert s.compare_version(None) == 1 + assert s.compare_version('') == 1 assert s.compare_version('0.43') > 0 assert s.compare_version('0.44') == 0 + assert s.compare_version(s) == 0 assert s.compare_version('0.45') < 0 - assert s.between_versions('0.43', '0.45') == True + assert s.between_versions('0.43', '0.45') + assert s.between_versions('0.43', '0.43') is False + assert s.between_versions('0.45', '0.43') is False def test_dropbear_compare_version_with_years(self): s = self.get_dropbear_software('2015.71') - assert s.compare_version('2014.67') > 0 + assert s.compare_version(None) == 1 + assert s.compare_version('') == 1 + assert s.compare_version('2014.66') > 0 assert s.compare_version('2015.71') == 0 + assert s.compare_version(s) == 0 assert s.compare_version('2016.74') < 0 - assert s.between_versions('2014.67', '2016.74') == True + assert s.between_versions('2014.66', '2016.74') + assert s.between_versions('2014.66', '2015.69') is False + assert s.between_versions('2016.74', '2014.66') is False def test_dropbear_compare_version_mixed(self): s = self.get_dropbear_software('0.53.1') + assert s.compare_version(None) == 1 + assert s.compare_version('') == 1 assert s.compare_version('0.53') > 0 assert s.compare_version('0.53.1') == 0 + assert s.compare_version(s) == 0 assert s.compare_version('2011.54') < 0 - assert s.between_versions('0.53', '2011.54') == True + assert s.between_versions('0.53', '2011.54') + assert s.between_versions('0.53', '0.53') is False + assert s.between_versions('2011.54', '0.53') is False def test_dropbear_compare_version_patchlevel(self): s1 = self.get_dropbear_software('0.44') s2 = self.get_dropbear_software('0.44test3') + assert s1.compare_version(None) == 1 + assert s1.compare_version('') == 1 + assert s1.compare_version('0.44') == 0 + assert s1.compare_version(s1) == 0 assert s1.compare_version('0.43') > 0 assert s1.compare_version('0.44test4') > 0 + assert s1.between_versions('0.44test4', '0.45') + assert s1.between_versions('0.43', '0.44test4') is False + assert s1.between_versions('0.45', '0.44test4') is False + assert s2.compare_version(None) == 1 + assert s2.compare_version('') == 1 + assert s2.compare_version('0.44test3') == 0 + assert s2.compare_version(s2) == 0 assert s2.compare_version('0.44') < 0 assert s2.compare_version('0.44test4') < 0 + assert s2.between_versions('0.43', '0.44') + assert s2.between_versions('0.43', '0.44test2') is False + assert s2.between_versions('0.44', '0.43') is False + assert s1.compare_version(s2) > 0 + assert s2.compare_version(s1) < 0 def test_dropbear_compare_version_sequential(self): versions = [] @@ -82,20 +118,28 @@ class TestVersionCompare(object): def test_openssh_compare_version_simple(self): s = self.get_openssh_software('3.7.1') + assert s.compare_version(None) == 1 + assert s.compare_version('') == 1 assert s.compare_version('3.7') > 0 assert s.compare_version('3.7.1') == 0 + assert s.compare_version(s) == 0 assert s.compare_version('3.8') < 0 - assert s.between_versions('3.7', '3.8') == True - - + assert s.between_versions('3.7', '3.8') + assert s.between_versions('3.6', '3.7') is False + assert s.between_versions('3.8', '3.7') is False + def test_openssh_compare_version_patchlevel(self): s1 = self.get_openssh_software('2.1.1') s2 = self.get_openssh_software('2.1.1p2') + assert s1.compare_version(s1) == 0 + assert s2.compare_version(s2) == 0 assert s1.compare_version('2.1.1p1') == 0 assert s1.compare_version('2.1.1p2') == 0 assert s2.compare_version('2.1.1') == 0 assert s2.compare_version('2.1.1p1') > 0 assert s2.compare_version('2.1.1p3') < 0 + assert s1.compare_version(s2) == 0 + assert s2.compare_version(s1) == 0 def test_openbsd_compare_version_sequential(self): versions = [] @@ -130,3 +174,41 @@ class TestVersionCompare(object): if i + 1 < l: vnext = versions[i + 1] assert s.compare_version(vnext) < 0 + + def test_libssh_compare_version_simple(self): + s = self.get_libssh_software('0.3') + assert s.compare_version(None) == 1 + assert s.compare_version('') == 1 + assert s.compare_version('0.2') > 0 + assert s.compare_version('0.3') == 0 + assert s.compare_version(s) == 0 + assert s.compare_version('0.3.1') < 0 + assert s.between_versions('0.2', '0.3.1') + assert s.between_versions('0.1', '0.2') is False + assert s.between_versions('0.3.1', '0.2') is False + + def test_libssh_compare_version_sequential(self): + versions = [] + for v in ['0.2', '0.3']: + versions.append(v) + for i in range(1, 5): + versions.append('0.3.{0}'.format(i)) + for i in range(0, 9): + versions.append('0.4.{0}'.format(i)) + for i in range(0, 6): + versions.append('0.5.{0}'.format(i)) + for i in range(0, 6): + versions.append('0.6.{0}'.format(i)) + for i in range(0, 4): + versions.append('0.7.{0}'.format(i)) + l = len(versions) + for i in range(l): + v = versions[i] + s = self.get_libssh_software(v) + assert s.compare_version(v) == 0 + if i - 1 >= 0: + vbefore = versions[i - 1] + assert s.compare_version(vbefore) > 0 + if i + 1 < l: + vnext = versions[i + 1] + assert s.compare_version(vnext) < 0