Merge branch 'develop'

This commit is contained in:
Andris Raugulis 2016-10-26 19:00:44 +03:00
commit 8c24fc01e8
19 changed files with 1159 additions and 247 deletions

3
.gitignore vendored
View File

@ -1,2 +1,5 @@
*~
*.pyc
html/
venv/
.cache/

View File

@ -1,5 +1,6 @@
# ssh-audit
[![build status](https://api.travis-ci.org/arthepsy/ssh-audit.svg)](https://travis-ci.org/arthepsy/ssh-audit)
[![build status](https://api.travis-ci.org/arthepsy/ssh-audit.svg)](https://travis-ci.org/arthepsy/ssh-audit)
[![coverage status](https://coveralls.io/repos/github/arthepsy/ssh-audit/badge.svg)](https://coveralls.io/github/arthepsy/ssh-audit)
**ssh-audit** is a tool for ssh server auditing.
## Features
@ -15,16 +16,20 @@
## Usage
```
usage: ssh-audit.py [-bnv] [-l <level>] <host[:port]>
usage: ssh-audit.py [-1246pbnvl] <host>
-1, --ssh1 force ssh version 1 only
-2, --ssh2 force ssh version 2 only
-4, --ipv4 enable IPv4 (order of precedence)
-6, --ipv6 enable IPv6 (order of precedence)
-p, --port=<port> port to connect
-b, --batch batch output
-n, --no-colors disable colors
-v, --verbose verbose output
-l, --level=<level> minimum output level (info|warn|fail)
```
* if both IPv4 and IPv6 are used, order of precedence can be set by using either `-46` or `-64`.
* batch flag `-b` will output sections without header and without empty lines (implies verbose flag).
* verbose flag `-v` will prefix each line with section type and algorithm name.
@ -32,6 +37,13 @@ usage: ssh-audit.py [-bnv] [-l <level>] <host[:port]>
![screenshot](https://cloud.githubusercontent.com/assets/7356025/19233757/3e09b168-8ef0-11e6-91b4-e880bacd0b8a.png)
## ChangeLog
### v1.x.x (2016-xx-xx)
- implement options to allow specify IPv4/IPv6 usage and order of precedence
- implement option to specify remote port (old behavior kept for compatibility)
- add colors support for Microsoft Windows via optional colorama dependency
- fix encoding and decoding issues, add tests, do not crash on encoding errors
- use mypy-lang for static type checking and verify all code
### v1.6.0 (2016-10-14)
- implement algorithm recommendations section (based on recognized software)
- implement full libssh support (version history, algorithms, security, etc)

File diff suppressed because it is too large Load Diff

View File

@ -1,10 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest, os, sys, io
import os
import io
import sys
import socket
import pytest
if sys.version_info[0] == 2:
import StringIO
import StringIO # pylint: disable=import-error
StringIO = StringIO.StringIO
else:
StringIO = io.StringIO
@ -17,6 +21,7 @@ def ssh_audit():
return __import__('ssh-audit')
# pylint: disable=attribute-defined-outside-init
class _OutputSpy(list):
def begin(self):
self.__out = StringIO()
@ -33,3 +38,93 @@ class _OutputSpy(list):
@pytest.fixture(scope='module')
def output_spy():
return _OutputSpy()
class _VirtualSocket(object):
def __init__(self):
self.sock_address = ('127.0.0.1', 0)
self.peer_address = None
self._connected = False
self.timeout = -1.0
self.rdata = []
self.sdata = []
self.errors = {}
def _check_err(self, method):
method_error = self.errors.get(method)
if method_error:
raise method_error
def connect(self, address):
return self._connect(address, False)
def _connect(self, address, ret=True):
self.peer_address = address
self._connected = True
self._check_err('connect')
return self if ret else None
def settimeout(self, timeout):
self.timeout = timeout
def gettimeout(self):
return self.timeout
def getpeername(self):
if self.peer_address is None or not self._connected:
raise socket.error(57, 'Socket is not connected')
return self.peer_address
def getsockname(self):
return self.sock_address
def bind(self, address):
self.sock_address = address
def listen(self, backlog):
pass
def accept(self):
# pylint: disable=protected-access
conn = _VirtualSocket()
conn.sock_address = self.sock_address
conn.peer_address = ('127.0.0.1', 0)
conn._connected = True
return conn, conn.peer_address
def recv(self, bufsize, flags=0):
# pylint: disable=unused-argument
if not self._connected:
raise socket.error(54, 'Connection reset by peer')
if not len(self.rdata) > 0:
return b''
data = self.rdata.pop(0)
if isinstance(data, Exception):
raise data
return data
def send(self, data):
if self.peer_address is None or not self._connected:
raise socket.error(32, 'Broken pipe')
self._check_err('send')
self.sdata.append(data)
@pytest.fixture()
def virtual_socket(monkeypatch):
vsocket = _VirtualSocket()
# pylint: disable=unused-argument
def _socket(family=socket.AF_INET,
socktype=socket.SOCK_STREAM,
proto=0,
fileno=None):
return vsocket
def _cc(address, timeout=0, source_address=None):
# pylint: disable=protected-access
return vsocket._connect(address, True)
monkeypatch.setattr(socket, 'create_connection', _cc)
monkeypatch.setattr(socket, 'socket', _socket)
return vsocket

10
test/coverage.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
_cdir=$(cd -- "$(dirname "$0")" && pwd)
type py.test > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "err: py.test (Python testing framework) not found."
exit 1
fi
cd -- "${_cdir}/.."
mkdir -p html
py.test -v --cov-report=html:html/coverage --cov=ssh-audit test

10
test/mypy-py2.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
_cdir=$(cd -- "$(dirname "$0")" && pwd)
type mypy > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "err: mypy (Optional Static Typing for Python) not found."
exit 1
fi
_htmldir="${_cdir}/../html/mypy-py2"
mkdir -p "${_htmldir}"
mypy --python-version 2.7 --config-file "${_cdir}/mypy.ini" --html-report "${_htmldir}" "${_cdir}/../ssh-audit.py"

10
test/mypy-py3.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
_cdir=$(cd -- "$(dirname "$0")" && pwd)
type mypy > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "err: mypy (Optional Static Typing for Python) not found."
exit 1
fi
_htmldir="${_cdir}/../html/mypy-py3"
mkdir -p "${_htmldir}"
mypy --python-version 3.5 --config-file "${_cdir}/mypy.ini" --html-report "${_htmldir}" "${_cdir}/../ssh-audit.py"

9
test/mypy.ini Normal file
View File

@ -0,0 +1,9 @@
[mypy]
silent_imports = True
disallow_untyped_calls = True
disallow_untyped_defs = True
check_untyped_defs = True
disallow-subclassing-any = True
warn-incomplete-stub = True
warn-redundant-casts = True

View File

@ -5,4 +5,9 @@ if [ $? -ne 0 ]; then
echo "err: prospector (Python Static Analysis) not found."
exit 1
fi
prospector --profile-path "${_cdir}" -P prospector "${_cdir}/../ssh-audit.py"
if [ X"$1" == X"" ]; then
_file="${_cdir}/../ssh-audit.py"
else
_file="$1"
fi
prospector -E --profile-path "${_cdir}" -P prospector "${_file}"

View File

@ -1,9 +1,42 @@
inherits:
- strictness_veryhigh
strictness: veryhigh
doc-warnings: false
pylint:
disable:
- multiple-imports
- invalid-name
- trailing-whitespace
options:
max-args: 8 # default: 5
max-locals: 20 # default: 15
max-returns: 6
max-branches: 15 # default: 12
max-statements: 60 # default: 50
max-parents: 7
max-attributes: 8 # default: 7
min-public-methods: 1 # default: 2
max-public-methods: 20
max-bool-expr: 5
max-nested-blocks: 6 # default: 5
max-line-length: 80 # default: 100
ignore-long-lines: ^\s*(#\s+type:\s+.*|[A-Z0-9_]+\s+=\s+.*|('.*':\s+)?\[.*\],?)$
max-module-lines: 2500 # default: 10000
pep8:
disable:
- W191
- W293
- E501
- E221
- W191 # indentation contains tabs
- W293 # blank line contains whitespace
- E101 # indentation contains mixed spaces and tabs
- E401 # multiple imports on one line
- E501 # line too long
- E221 # multiple spaces before operator
pyflakes:
disable:
- F401 # module imported but unused
- F821 # undefined name
mccabe:
options:
max-complexity: 15

View File

@ -3,13 +3,15 @@
import pytest
# pylint: disable=attribute-defined-outside-init
class TestAuditConf(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf
self.usage = ssh_audit.usage
def _test_conf(self, conf, **kwargs):
@classmethod
def _test_conf(cls, conf, **kwargs):
options = {
'host': None,
'port': 22,
@ -18,7 +20,10 @@ class TestAuditConf(object):
'batch': False,
'colors': True,
'verbose': False,
'minlevel': 'info'
'minlevel': 'info',
'ipv4': True,
'ipv6': True,
'ipvo': ()
}
for k, v in kwargs.items():
options[k] = v
@ -30,6 +35,9 @@ class TestAuditConf(object):
assert conf.colors is options['colors']
assert conf.verbose is options['verbose']
assert conf.minlevel == options['minlevel']
assert conf.ipv4 == options['ipv4']
assert conf.ipv6 == options['ipv6']
assert conf.ipvo == options['ipvo']
def test_audit_conf_defaults(self):
conf = self.AuditConf()
@ -55,6 +63,58 @@ class TestAuditConf(object):
conf.port = port
excinfo.match(r'.*invalid port.*')
def test_audit_conf_ipvo(self):
# ipv4-only
conf = self.AuditConf()
conf.ipv4 = True
assert conf.ipv4 is True
assert conf.ipv6 is False
assert conf.ipvo == (4,)
# ipv6-only
conf = self.AuditConf()
conf.ipv6 = True
assert conf.ipv4 is False
assert conf.ipv6 is True
assert conf.ipvo == (6,)
# ipv4-only (by removing ipv6)
conf = self.AuditConf()
conf.ipv6 = False
assert conf.ipv4 is True
assert conf.ipv6 is False
assert conf.ipvo == (4, )
# ipv6-only (by removing ipv4)
conf = self.AuditConf()
conf.ipv4 = False
assert conf.ipv4 is False
assert conf.ipv6 is True
assert conf.ipvo == (6, )
# ipv4-preferred
conf = self.AuditConf()
conf.ipv4 = True
conf.ipv6 = True
assert conf.ipv4 is True
assert conf.ipv6 is True
assert conf.ipvo == (4, 6)
# ipv6-preferred
conf = self.AuditConf()
conf.ipv6 = True
conf.ipv4 = True
assert conf.ipv4 is True
assert conf.ipv6 is True
assert conf.ipvo == (6, 4)
# ipvo empty
conf = self.AuditConf()
conf.ipvo = ()
assert conf.ipv4 is True
assert conf.ipv6 is True
assert conf.ipvo == ()
# ipvo validation
conf = self.AuditConf()
conf.ipvo = (1, 2, 3, 4, 5, 6)
assert conf.ipvo == (4, 6)
conf.ipvo = (4, 4, 4, 6, 6)
assert conf.ipvo == (4, 6)
def test_audit_conf_minlevel(self):
conf = self.AuditConf()
for level in ['info', 'warn', 'fail']:
@ -66,7 +126,8 @@ class TestAuditConf(object):
excinfo.match(r'.*invalid level.*')
def test_audit_conf_cmdline(self):
c = lambda x: self.AuditConf.from_cmdline(x.split(), self.usage)
# pylint: disable=too-many-statements
c = lambda x: self.AuditConf.from_cmdline(x.split(), self.usage) # noqa
with pytest.raises(SystemExit):
conf = c('')
with pytest.raises(SystemExit):
@ -85,20 +146,36 @@ class TestAuditConf(object):
self._test_conf(conf, host='github.com')
conf = c('localhost:2222')
self._test_conf(conf, host='localhost', port=2222)
conf = c('-p 2222 localhost')
self._test_conf(conf, host='localhost', port=2222)
with pytest.raises(SystemExit):
conf = c('localhost:')
with pytest.raises(SystemExit):
conf = c('localhost:abc')
with pytest.raises(SystemExit):
conf = c('-p abc localhost')
with pytest.raises(SystemExit):
conf = c('localhost:-22')
with pytest.raises(SystemExit):
conf = c('-p -22 localhost')
with pytest.raises(SystemExit):
conf = c('localhost:99999')
with pytest.raises(SystemExit):
conf = c('-p 99999 localhost')
conf = c('-1 localhost')
self._test_conf(conf, host='localhost', ssh1=True, ssh2=False)
conf = c('-2 localhost')
self._test_conf(conf, host='localhost', ssh1=False, ssh2=True)
conf = c('-12 localhost')
self._test_conf(conf, host='localhost', ssh1=True, ssh2=True)
conf = c('-4 localhost')
self._test_conf(conf, host='localhost', ipv4=True, ipv6=False, ipvo=(4,))
conf = c('-6 localhost')
self._test_conf(conf, host='localhost', ipv4=False, ipv6=True, ipvo=(6,))
conf = c('-46 localhost')
self._test_conf(conf, host='localhost', ipv4=True, ipv6=True, ipvo=(4, 6))
conf = c('-64 localhost')
self._test_conf(conf, host='localhost', ipv4=True, ipv6=True, ipvo=(6, 4))
conf = c('-b localhost')
self._test_conf(conf, host='localhost', batch=True, verbose=True)
conf = c('-n localhost')

View File

@ -3,13 +3,14 @@
import pytest
# pylint: disable=line-too-long,attribute-defined-outside-init
class TestBanner(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.ssh = ssh_audit.SSH
def test_simple_banners(self):
banner = lambda x: self.ssh.Banner.parse(x)
banner = lambda x: self.ssh.Banner.parse(x) # noqa
b = banner('SSH-2.0-OpenSSH_7.3')
assert b.protocol == (2, 0)
assert b.software == 'OpenSSH_7.3'
@ -27,12 +28,12 @@ class TestBanner(object):
assert str(b) == 'SSH-1.5-Cisco-1.25'
def test_invalid_banners(self):
b = lambda x: self.ssh.Banner.parse(x)
b = lambda x: self.ssh.Banner.parse(x) # noqa
assert b('Something') is None
assert b('SSH-XXX-OpenSSH_7.3') is None
def test_banners_with_spaces(self):
b = lambda x: self.ssh.Banner.parse(x)
b = lambda x: self.ssh.Banner.parse(x) # noqa
s = 'SSH-2.0-OpenSSH_4.3p2'
assert str(b('SSH-2.0-OpenSSH_4.3p2 ')) == s
assert str(b('SSH-2.0- OpenSSH_4.3p2')) == s
@ -43,7 +44,7 @@ class TestBanner(object):
assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s
def test_banners_without_software(self):
b = lambda x: self.ssh.Banner.parse(x)
b = lambda x: self.ssh.Banner.parse(x) # noqa
assert b('SSH-2.0').protocol == (2, 0)
assert b('SSH-2.0').software is None
assert b('SSH-2.0').comments is None
@ -54,13 +55,13 @@ class TestBanner(object):
assert str(b('SSH-2.0-')) == 'SSH-2.0-'
def test_banners_with_comments(self):
b = lambda x: self.ssh.Banner.parse(x)
b = lambda x: self.ssh.Banner.parse(x) # noqa
assert repr(b('SSH-2.0-OpenSSH_7.2p2 Ubuntu-1')) == '<Banner(protocol=2.0, software=OpenSSH_7.2p2, comments=Ubuntu-1)>'
assert repr(b('SSH-1.99-OpenSSH_3.4p1 Debian 1:3.4p1-1.woody.3')) == '<Banner(protocol=1.99, software=OpenSSH_3.4p1, comments=Debian 1:3.4p1-1.woody.3)>'
assert repr(b('SSH-1.5-1.3.7 F-SECURE SSH')) == '<Banner(protocol=1.5, software=1.3.7, comments=F-SECURE SSH)>'
def test_banners_with_multiple_protocols(self):
b = lambda x: self.ssh.Banner.parse(x)
b = lambda x: self.ssh.Banner.parse(x) # noqa
assert str(b('SSH-1.99-SSH-1.99-OpenSSH_3.6.1p2')) == 'SSH-1.99-OpenSSH_3.6.1p2'
assert str(b('SSH-2.0-SSH-2.0-OpenSSH_4.3p2 Debian-9')) == 'SSH-2.0-OpenSSH_4.3p2 Debian-9'
assert str(b('SSH-1.99-SSH-2.0-dropbear_0.5')) == 'SSH-1.99-dropbear_0.5'

View File

@ -1,16 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import re
import pytest
# pylint: disable=attribute-defined-outside-init,bad-whitespace
class TestBuffer(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf
self.utf8rchar = b'\xef\xbf\xbd'
def _b(self, v):
@classmethod
def _b(cls, v):
v = re.sub(r'\s', '', v)
data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)]
return bytes(bytearray(data))
@ -25,8 +28,8 @@ class TestBuffer(object):
assert r.unread_len == 0
def test_byte(self):
w = lambda x: self.wbuf().write_byte(x).write_flush()
r = lambda x: self.rbuf(x).read_byte()
w = lambda x: self.wbuf().write_byte(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_byte() # noqa
tc = [(0x00, '00'),
(0x01, '01'),
(0x10, '10'),
@ -36,8 +39,8 @@ class TestBuffer(object):
assert r(self._b(p[1])) == p[0]
def test_bool(self):
w = lambda x: self.wbuf().write_bool(x).write_flush()
r = lambda x: self.rbuf(x).read_bool()
w = lambda x: self.wbuf().write_bool(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_bool() # noqa
tc = [(True, '01'),
(False, '00')]
for p in tc:
@ -45,8 +48,8 @@ class TestBuffer(object):
assert r(self._b(p[1])) == p[0]
def test_int(self):
w = lambda x: self.wbuf().write_int(x).write_flush()
r = lambda x: self.rbuf(x).read_int()
w = lambda x: self.wbuf().write_int(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_int() # noqa
tc = [(0x00, '00 00 00 00'),
(0x01, '00 00 00 01'),
(0xabcd, '00 00 ab cd'),
@ -56,8 +59,8 @@ class TestBuffer(object):
assert r(self._b(p[1])) == p[0]
def test_string(self):
w = lambda x: self.wbuf().write_string(x).write_flush()
r = lambda x: self.rbuf(x).read_string()
w = lambda x: self.wbuf().write_string(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_string() # noqa
tc = [(u'abc1', '00 00 00 04 61 62 63 31'),
(b'abc2', '00 00 00 04 61 62 63 32')]
for p in tc:
@ -68,22 +71,35 @@ class TestBuffer(object):
assert r(self._b(p[1])) == v
def test_list(self):
w = lambda x: self.wbuf().write_list(x).write_flush()
r = lambda x: self.rbuf(x).read_list()
w = lambda x: self.wbuf().write_list(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_list() # noqa
tc = [(['d', 'ef', 'ault'], '00 00 00 09 64 2c 65 66 2c 61 75 6c 74')]
for p in tc:
assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0]
def test_list_nonutf8(self):
r = lambda x: self.rbuf(x).read_list() # noqa
src = self._b('00 00 00 04 de ad be ef')
dst = [(b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')]
assert r(src) == dst
def test_line(self):
w = lambda x: self.wbuf().write_line(x).write_flush()
r = lambda x: self.rbuf(x).read_line()
w = lambda x: self.wbuf().write_line(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_line() # noqa
tc = [(u'example line', '65 78 61 6d 70 6c 65 20 6c 69 6e 65 0d 0a')]
for p in tc:
assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0]
def test_line_nonutf8(self):
r = lambda x: self.rbuf(x).read_line() # noqa
src = self._b('de ad be af')
dst = (b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')
assert r(src) == dst
def test_bitlen(self):
# pylint: disable=protected-access
class Py26Int(int):
def bit_length(self):
raise AttributeError
@ -91,8 +107,8 @@ class TestBuffer(object):
assert self.wbuf._bitlength(Py26Int(42)) == 6
def test_mpint1(self):
mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush()
mpint1r = lambda x: self.rbuf(x).read_mpint1()
mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush() # noqa
mpint1r = lambda x: self.rbuf(x).read_mpint1() # noqa
tc = [(0x0, '00 00'),
(0x1234, '00 0d 12 34'),
(0x12345, '00 11 01 23 45'),
@ -102,8 +118,8 @@ class TestBuffer(object):
assert mpint1r(self._b(p[1])) == p[0]
def test_mpint2(self):
mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush()
mpint2r = lambda x: self.rbuf(x).read_mpint2()
mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush() # noqa
mpint2r = lambda x: self.rbuf(x).read_mpint2() # noqa
tc = [(0x0, '00 00 00 00'),
(0x80, '00 00 00 02 00 80'),
(0x9a378f9b2e332a7, '00 00 00 08 09 a3 78 f9 b2 e3 32 a7'),

123
test/test_errors.py Normal file
View File

@ -0,0 +1,123 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import pytest
# pylint: disable=attribute-defined-outside-init
class TestErrors(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf
self.audit = ssh_audit.audit
def _conf(self):
conf = self.AuditConf('localhost', 22)
conf.colors = False
conf.batch = True
return conf
def test_connection_refused(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.errors['connect'] = socket.error(61, 'Connection refused')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 1
assert 'Connection refused' in lines[-1]
def test_connection_closed_before_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 1
assert 'did not receive banner' in lines[-1]
def test_connection_closed_after_header(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'header line 1\n')
vsocket.rdata.append(b'header line 2\n')
vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 3
assert 'did not receive banner' in lines[-1]
def test_connection_closed_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert 'reset by peer' in lines[-1]
def test_empty_data_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert 'empty' in lines[-1]
def test_wrong_data_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
vsocket.rdata.append(b'xxx\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert 'xxx' in lines[-1]
def test_non_ascii_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 3
assert 'error reading packet' in lines[-1]
assert 'ASCII' in lines[-2]
assert lines[-3].endswith('SSH-2.0-ssh-audit-test?')
def test_nonutf8_data_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
vsocket.rdata.append(b'\x81\xff\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert '\\x81\\xff' in lines[-1]
def test_protocol_mismatch_by_conf(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n')
vsocket.rdata.append(b'Protocol major versions differ.\n')
output_spy.begin()
with pytest.raises(SystemExit):
conf = self._conf()
conf.ssh1, conf.ssh2 = True, False
self.audit(conf)
lines = output_spy.flush()
assert len(lines) == 3
assert 'error reading packet' in lines[-1]
assert 'major versions differ' in lines[-1]

View File

@ -1,9 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import pytest, io, sys
import pytest
# pylint: disable=attribute-defined-outside-init
class TestOutput(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
@ -23,7 +24,7 @@ class TestOutput(object):
def test_output_buffer_no_flush(self, output_spy):
output_spy.begin()
with self.OutputBuffer() as obuf:
with self.OutputBuffer():
print(u'abc')
assert output_spy.flush() == []
@ -62,6 +63,8 @@ class TestOutput(object):
output_spy.begin()
out.fail('fail color')
assert output_spy.flush() == [u'fail color']
if not out.colors_supported:
return
# test with colors
out.colors = True
output_spy.begin()

View File

@ -3,19 +3,21 @@
import pytest
# pylint: disable=line-too-long,attribute-defined-outside-init
class TestSoftware(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.ssh = ssh_audit.SSH
def test_unknown_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
assert ps('SSH-1.5') is None
assert ps('SSH-1.99-AlfaMegaServer') is None
assert ps('SSH-2.0-BetaMegaServer 0.0.1') is None
def test_openssh_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# pylint: disable=too-many-statements
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common
s = ps('SSH-2.0-OpenSSH_7.3')
assert s.vendor is None
@ -102,7 +104,7 @@ class TestSoftware(object):
assert repr(s) == '<Software(product=OpenSSH, version=5.9, patch=CASPUR)>'
def test_dropbear_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common
s = ps('SSH-2.0-dropbear_2016.74')
assert s.vendor is None
@ -153,7 +155,7 @@ class TestSoftware(object):
assert repr(s) == '<Software(product=Dropbear SSH, version=2014.66, patch=agbn_1)>'
def test_libssh_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common
s = ps('SSH-2.0-libssh-0.2')
assert s.vendor is None
@ -179,7 +181,7 @@ class TestSoftware(object):
assert repr(s) == '<Software(product=libssh, version=0.7.3)>'
def test_romsshell_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common
s = ps('SSH-2.0-RomSShell_5.40')
assert s.vendor == 'Allegro Software'
@ -194,7 +196,7 @@ class TestSoftware(object):
assert repr(s) == '<Software(vendor=Allegro Software, product=RomSShell, version=5.40)>'
def test_hp_ilo_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common
s = ps('SSH-2.0-mpSSH_0.2.1')
assert s.vendor == 'HP'
@ -209,7 +211,7 @@ class TestSoftware(object):
assert repr(s) == '<Software(vendor=HP, product=iLO (Integrated Lights-Out) sshd, version=0.2.1)>'
def test_cisco_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common
s = ps('SSH-1.5-Cisco-1.25')
assert s.vendor == 'Cisco'
@ -224,7 +226,7 @@ class TestSoftware(object):
assert repr(s) == '<Software(vendor=Cisco, product=IOS/PIX sshd, version=1.25)>'
def test_software_os(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# unknown
s = ps('SSH-2.0-OpenSSH_3.7.1 MegaOperatingSystem 123')
assert s.os is None

View File

@ -1,8 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import struct
import pytest
# pylint: disable=line-too-long,attribute-defined-outside-init
class TestSSH1(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
@ -10,15 +12,32 @@ class TestSSH1(object):
self.ssh1 = ssh_audit.SSH1
self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf
self.audit = ssh_audit.audit
self.AuditConf = ssh_audit.AuditConf
def test_crc32(self):
assert self.ssh1.crc32(b'') == 0x00
assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808
def _conf(self):
conf = self.AuditConf('localhost', 22)
conf.colors = False
conf.batch = True
conf.verbose = True
conf.ssh1 = True
conf.ssh2 = False
return conf
def _server_key(self):
def _create_ssh1_packet(self, payload, valid_crc=True):
padding = -(len(payload) + 4) % 8
plen = len(payload) + 4
pad_bytes = b'\x00' * padding
cksum = self.ssh1.crc32(pad_bytes + payload) if valid_crc else 0
data = struct.pack('>I', plen) + pad_bytes + payload + struct.pack('>I', cksum)
return data
@classmethod
def _server_key(cls):
return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551)
def _host_key(self):
@classmethod
def _host_key(cls):
return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b)
def _pkm_payload(self):
@ -33,11 +52,17 @@ class TestSSH1(object):
w.write_int(36)
return w.write_flush()
def test_crc32(self):
assert self.ssh1.crc32(b'') == 0x00
assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808
def test_fingerprint(self):
# pylint: disable=protected-access
b, e, m = self._host_key()
fpd = self.wbuf._create_mpint(m, False)
fpd += self.wbuf._create_mpint(e, False)
fp = self.ssh.Fingerprint(fpd)
assert b == 2048
assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96'
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
@ -63,7 +88,7 @@ class TestSSH1(object):
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
def test_pkm_payload(self):
cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
skey = self._server_key()
hkey = self._host_key()
pflags = 2
@ -72,3 +97,43 @@ class TestSSH1(object):
pkm1 = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask)
pkm2 = self.ssh1.PublicKeyMessage.parse(self._pkm_payload())
assert pkm1.payload == pkm2.payload
def test_ssh1_server_simple(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.ssh.Protocol.SMSG_PUBLIC_KEY)
w.write(self._pkm_payload())
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush()))
output_spy.begin()
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 10
def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.ssh.Protocol.SMSG_PUBLIC_KEY + 1)
w.write(self._pkm_payload())
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush()))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 4
assert 'unknown message' in lines[-1]
def test_ssh1_server_invalid_checksum(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.ssh.Protocol.SMSG_PUBLIC_KEY + 1)
w.write(self._pkm_payload())
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush(), False))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 1
assert 'checksum' in lines[-1]

View File

@ -1,8 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import struct, os
import pytest
# pylint: disable=line-too-long,attribute-defined-outside-init
class TestSSH2(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
@ -10,6 +12,27 @@ class TestSSH2(object):
self.ssh2 = ssh_audit.SSH2
self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf
self.audit = ssh_audit.audit
self.AuditConf = ssh_audit.AuditConf
def _conf(self):
conf = self.AuditConf('localhost', 22)
conf.colors = False
conf.batch = True
conf.verbose = True
conf.ssh1 = False
conf.ssh2 = True
return conf
@classmethod
def _create_ssh2_packet(cls, payload):
padding = -(len(payload) + 5) % 8
if padding < 4:
padding += 8
plen = len(payload) + padding + 1
pad_bytes = b'\x00' * padding
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
return data
def _kex_payload(self):
w = self.wbuf()
@ -46,3 +69,87 @@ class TestSSH2(object):
assert kex.server.languages == [u'']
assert kex.follows is False
assert kex.unused == 0
def _get_empty_kex(self, cookie=None):
kex_algs, key_algs = [], []
enc, mac, compression, languages = [], [], ['none'], []
cli = self.ssh2.KexParty(enc, mac, compression, languages)
enc, mac, compression, languages = [], [], ['none'], []
srv = self.ssh2.KexParty(enc, mac, compression, languages)
if cookie is None:
cookie = os.urandom(16)
kex = self.ssh2.Kex(cookie, kex_algs, key_algs, cli, srv, 0)
return kex
def _get_kex_variat1(self):
cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
kex = self._get_empty_kex(cookie)
kex.kex_algorithms.append('curve25519-sha256@libssh.org')
kex.kex_algorithms.append('ecdh-sha2-nistp256')
kex.kex_algorithms.append('ecdh-sha2-nistp384')
kex.kex_algorithms.append('ecdh-sha2-nistp521')
kex.kex_algorithms.append('diffie-hellman-group-exchange-sha256')
kex.kex_algorithms.append('diffie-hellman-group14-sha1')
kex.key_algorithms.append('ssh-rsa')
kex.key_algorithms.append('rsa-sha2-512')
kex.key_algorithms.append('rsa-sha2-256')
kex.key_algorithms.append('ssh-ed25519')
kex.server.encryption.append('chacha20-poly1305@openssh.com')
kex.server.encryption.append('aes128-ctr')
kex.server.encryption.append('aes192-ctr')
kex.server.encryption.append('aes256-ctr')
kex.server.encryption.append('aes128-gcm@openssh.com')
kex.server.encryption.append('aes256-gcm@openssh.com')
kex.server.encryption.append('aes128-cbc')
kex.server.encryption.append('aes192-cbc')
kex.server.encryption.append('aes256-cbc')
kex.server.mac.append('umac-64-etm@openssh.com')
kex.server.mac.append('umac-128-etm@openssh.com')
kex.server.mac.append('hmac-sha2-256-etm@openssh.com')
kex.server.mac.append('hmac-sha2-512-etm@openssh.com')
kex.server.mac.append('hmac-sha1-etm@openssh.com')
kex.server.mac.append('umac-64@openssh.com')
kex.server.mac.append('umac-128@openssh.com')
kex.server.mac.append('hmac-sha2-256')
kex.server.mac.append('hmac-sha2-512')
kex.server.mac.append('hmac-sha1')
kex.server.compression.append('zlib@openssh.com')
for a in kex.server.encryption:
kex.client.encryption.append(a)
for a in kex.server.mac:
kex.client.mac.append(a)
for a in kex.server.compression:
if a == 'none':
continue
kex.client.compression.append(a)
return kex
def test_key_payload(self):
kex1 = self._get_kex_variat1()
kex2 = self.ssh2.Kex.parse(self._kex_payload())
assert kex1.payload == kex2.payload
def test_ssh2_server_simple(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.ssh.Protocol.MSG_KEXINIT)
w.write(self._kex_payload())
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
output_spy.begin()
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 72
def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.ssh.Protocol.MSG_KEXINIT + 1)
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
assert len(lines) == 3
assert 'unknown message' in lines[-1]

View File

@ -3,6 +3,7 @@
import pytest
# pylint: disable=attribute-defined-outside-init
class TestVersionCompare(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):