ssh-audit/test/test_errors.py

174 lines
6.8 KiB
Python
Raw Permalink Normal View History

2016-10-25 16:19:08 +02:00
import socket
2016-11-02 12:18:03 +01:00
import errno
2016-10-25 16:19:08 +02:00
import pytest
2021-02-01 19:10:06 +01:00
from ssh_audit.outputbuffer import OutputBuffer
2016-10-25 16:19:08 +02:00
# pylint: disable=attribute-defined-outside-init
Remove some more Python 2 leftovers (#37) * Remove mypy job for Python 2 modified: tox.ini * Remove Python 2 compatibility import modified: ssh-audit.py * Remove compatibility import for BytesIO and StringIO This is no longer necessary, as support for Python 2 was dropped. modified: ssh-audit.py * Remove `text-type` compatibility layer ... as support for Python 2 was dropped already. modified: ssh-audit.py * Remove `binary-type` compatibility layer ... as support for Python 2 was dropped already. modified: ssh-audit.py * Remove try-except block for typing ... as since Python 3.5 it is included in the standard library. modified: ssh-audit.py * Move typing import to top of module modified: ssh-audit.py * Remove obsolete encoding declaration modified: ssh-audit.py * Apply pyupgrade on ssh-audit.py pyupgrade is a tool which updates Python code to modern syntax modified: ssh-audit.py * Remove Python 2 compatibility from conftest.py modified: test/conftest.py * Remove Python 2 compatibility from test_auditconf.py modified: test/test_auditconf.py * Remove Python 2 compatibility from test_banner.py modified: test/test_banner.py * Remove Python 2 compatibility from test_buffer.py modified: test/test_buffer.py * Remove Python 2 compatibility from test_errors.py modified: test/test_errors.py * Remove Python 2 compatibility from test_output.py modified: test/test_output.py * Remove Python 2 compatibility from test_resolve.py modified: test/test_resolve.py * Remove Python 2 compatibility from test_socket.py modified: test/test_socket.py * Remove Python 2 compatibility from test_software.py modified: test/test_software.py * Remove Python 2 compatibility from test_ssh_algorithm.py modified: test/test_ssh_algorithm.py * Remove Python 2 compatibility from test_ssh1.py modified: test/test_ssh1.py * Remove Python 2 compatibility from test_ssh2.py modified: test/test_ssh2.py * Remove Python 2 compatibility and Py2 only tests ... from test_utils.py. 
modified: test/test_utils.py * Remove Python 2 compatibility from test_version_compare.py modified: test/test_version_compare.py * Remove Python 2 job from appveyor config This was done blindly, as it is unclear whether appveyor runs at all. modified: .appveyor.yml
2020-06-15 23:05:31 +02:00
class TestErrors:
    """Error-path tests for ``audit``: connection and banner/packet failures.

    Every test drives ``audit`` through the ``virtual_socket`` fixture and
    inspects the lines captured by the ``output_spy`` fixture.
    """

    @pytest.fixture(autouse=True)
    def init(self, ssh_audit):
        """Bind the objects under test from the ``ssh_audit`` fixture."""
        self.AuditConf = ssh_audit.AuditConf
        self.OutputBuffer = ssh_audit.OutputBuffer
        self.audit = ssh_audit.audit

    def _conf(self):
        """Return an ``AuditConf`` for localhost:22 tuned for test runs.

        Colors are disabled, batch mode is enabled and the rate test is
        skipped.
        """
        conf = self.AuditConf('localhost', 22)
        conf.colors = False
        conf.batch = True
        conf.skip_rate_test = True
        return conf

    def _audit(self, spy, conf=None, exit_expected=False):
        """Run ``audit`` and return the output lines captured by *spy*.

        Args:
            spy: Output spy exposing ``begin()`` and ``flush()``.
            conf: Configuration to audit with; defaults to ``self._conf()``.
            exit_expected: When true, ``audit`` must raise ``SystemExit``;
                otherwise it must return a non-zero value.

        Returns:
            The list returned by ``spy.flush()``, with one trailing empty
            string removed when the list holds more than one entry.
        """
        if conf is None:
            conf = self._conf()
        spy.begin()

        out = OutputBuffer()
        if exit_expected:
            with pytest.raises(SystemExit):
                self.audit(out, conf)
        else:
            ret = self.audit(out, conf)
            assert ret != 0

        # Flush the buffered output so that the spy can capture it.
        out.write()
        lines = spy.flush()

        # If the last line is empty, delete it.
        if len(lines) > 1 and lines[-1] == '':
            del lines[-1]

        return lines

    def test_connection_unresolved(self, output_spy, virtual_socket):
        """An empty addrinfo entry for localhost#22 reports missing DNS records."""
        vsocket = virtual_socket
        vsocket.gsock.addrinfodata['localhost#22'] = []
        lines = self._audit(output_spy, exit_expected=True)
        assert len(lines) == 1
        assert 'has no DNS records' in lines[-1]

    def test_connection_refused(self, output_spy, virtual_socket):
        """An ECONNREFUSED error on connect is reported and exits."""
        vsocket = virtual_socket
        vsocket.errors['connect'] = socket.error(errno.ECONNREFUSED, 'Connection refused')
        lines = self._audit(output_spy, exit_expected=True)
        assert len(lines) == 1
        assert 'Connection refused' in lines[-1]

    def test_connection_timeout(self, output_spy, virtual_socket):
        """A timeout on connect is reported and exits."""
        vsocket = virtual_socket
        vsocket.errors['connect'] = socket.timeout('timed out')
        lines = self._audit(output_spy, exit_expected=True)
        assert len(lines) == 1
        assert 'timed out' in lines[-1]

    def test_recv_empty(self, output_spy, virtual_socket):
        """With no read data queued, the audit reports a missing banner."""
        lines = self._audit(output_spy)
        assert len(lines) == 1
        assert 'did not receive banner' in lines[-1]

    def test_recv_timeout(self, output_spy, virtual_socket):
        """A timeout as the first read result reports a missing banner."""
        vsocket = virtual_socket
        vsocket.rdata.append(socket.timeout('timed out'))
        lines = self._audit(output_spy)
        assert len(lines) == 1
        assert 'did not receive banner' in lines[-1]
        assert 'timed out' in lines[-1]

    def test_recv_retry_till_timeout(self, output_spy, virtual_socket):
        """EAGAIN/EWOULDBLOCK reads followed by a timeout report the timeout."""
        vsocket = virtual_socket
        vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
        vsocket.rdata.append(socket.error(errno.EWOULDBLOCK, 'Resource temporarily unavailable'))
        vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
        vsocket.rdata.append(socket.timeout('timed out'))
        lines = self._audit(output_spy)
        assert len(lines) == 1
        assert 'did not receive banner' in lines[-1]
        assert 'timed out' in lines[-1]

    def test_recv_retry_till_reset(self, output_spy, virtual_socket):
        """EAGAIN/EWOULDBLOCK reads followed by a reset report the reset."""
        vsocket = virtual_socket
        vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
        vsocket.rdata.append(socket.error(errno.EWOULDBLOCK, 'Resource temporarily unavailable'))
        vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
        vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
        lines = self._audit(output_spy)
        assert len(lines) == 1
        assert 'did not receive banner' in lines[-1]
        assert 'reset by peer' in lines[-1]

    def test_connection_closed_before_banner(self, output_spy, virtual_socket):
        """A reset as the first read result reports a missing banner."""
        vsocket = virtual_socket
        vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
        lines = self._audit(output_spy)
        assert len(lines) == 1
        assert 'did not receive banner' in lines[-1]
        assert 'reset by peer' in lines[-1]

    def test_connection_closed_after_header(self, output_spy, virtual_socket):
        """A reset after header lines, but before any banner, is reported."""
        vsocket = virtual_socket
        vsocket.rdata.append(b'header line 1\n')
        vsocket.rdata.append(b'\n')
        vsocket.rdata.append(b'header line 2\n')
        vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
        lines = self._audit(output_spy)
        assert len(lines) == 3
        assert 'did not receive banner' in lines[-1]
        assert 'reset by peer' in lines[-1]

    def test_connection_closed_after_banner(self, output_spy, virtual_socket):
        """A reset right after the banner is reported as a packet read error."""
        vsocket = virtual_socket
        vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
        # Use the symbolic constant rather than a literal errno: the numeric
        # value of ECONNRESET differs between platforms.
        vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
        lines = self._audit(output_spy)
        assert len(lines) == 2
        assert 'error reading packet' in lines[-1]
        assert 'reset by peer' in lines[-1]

    def test_empty_data_after_banner(self, output_spy, virtual_socket):
        """No data after the banner is reported as an empty packet read."""
        vsocket = virtual_socket
        vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
        lines = self._audit(output_spy)
        assert len(lines) == 2
        assert 'error reading packet' in lines[-1]
        assert 'empty' in lines[-1]

    def test_wrong_data_after_banner(self, output_spy, virtual_socket):
        """Unexpected data after the banner is echoed in the packet error."""
        vsocket = virtual_socket
        vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
        vsocket.rdata.append(b'xxx\n')
        lines = self._audit(output_spy)
        assert len(lines) == 2
        assert 'error reading packet' in lines[-1]
        assert 'xxx' in lines[-1]

    def test_non_ascii_banner(self, output_spy, virtual_socket):
        """A banner with non-ASCII bytes is shown with '?' plus an ASCII note."""
        vsocket = virtual_socket
        vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n')
        lines = self._audit(output_spy)
        assert len(lines) == 3
        assert 'error reading packet' in lines[-1]
        assert 'ASCII' in lines[-2]
        assert lines[-3].endswith('SSH-2.0-ssh-audit-test?')

    def test_nonutf8_data_after_banner(self, output_spy, virtual_socket):
        """Non-UTF-8 bytes after the banner appear escaped in the packet error."""
        vsocket = virtual_socket
        vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
        vsocket.rdata.append(b'\x81\xff\n')
        lines = self._audit(output_spy)
        assert len(lines) == 2
        assert 'error reading packet' in lines[-1]
        assert '\\x81\\xff' in lines[-1]

    def test_protocol_mismatch_by_conf(self, output_spy, virtual_socket):
        """An SSH-1.3 banner audited with ssh1 on / ssh2 off reports a mismatch."""
        vsocket = virtual_socket
        vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n')
        vsocket.rdata.append(b'Protocol major versions differ.\n')
        conf = self._conf()
        conf.ssh1, conf.ssh2 = True, False
        lines = self._audit(output_spy, conf)
        assert len(lines) == 4
        assert 'error reading packet' in lines[-1]
        assert 'major versions differ' in lines[-1]