Specify error when couldn't get banner. Test for timeout and retry cases.

This commit is contained in:
Andris Raugulis
2016-11-02 13:00:24 +02:00
parent dd3ca9688e
commit 44c1d4827c
2 changed files with 78 additions and 44 deletions

View File

@ -17,46 +17,91 @@ class TestErrors(object):
conf.batch = True
return conf
def _audit(self, spy, conf=None, sysexit=True):
if conf is None:
conf = self._conf()
spy.begin()
if sysexit:
with pytest.raises(SystemExit):
self.audit(conf)
else:
self.audit(conf)
lines = spy.flush()
return lines
def test_connection_refused(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.errors['connect'] = socket.error(61, 'Connection refused')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 1
assert 'Connection refused' in lines[-1]
def test_connection_timeout(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.errors['connect'] = socket.timeout('timed out')
lines = self._audit(output_spy)
assert len(lines) == 1
assert 'timed out' in lines[-1]
def test_recv_empty(self, output_spy, virtual_socket):
vsocket = virtual_socket
lines = self._audit(output_spy)
assert len(lines) == 1
assert 'did not receive banner' in lines[-1]
def test_recv_timeout(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(socket.timeout('timed out'))
lines = self._audit(output_spy)
assert len(lines) == 1
assert 'did not receive banner' in lines[-1]
assert 'timed out' in lines[-1]
def test_recv_retry_till_timeout(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(socket.error(35, 'Resource temporarily unavailable'))
vsocket.rdata.append(socket.error(35, 'Resource temporarily unavailable'))
vsocket.rdata.append(socket.error(35, 'Resource temporarily unavailable'))
vsocket.rdata.append(socket.timeout('timed out'))
lines = self._audit(output_spy)
assert len(lines) == 1
assert 'did not receive banner' in lines[-1]
assert 'timed out' in lines[-1]
def test_recv_retry_till_reset(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(socket.error(35, 'Resource temporarily unavailable'))
vsocket.rdata.append(socket.error(35, 'Resource temporarily unavailable'))
vsocket.rdata.append(socket.error(35, 'Resource temporarily unavailable'))
vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
lines = self._audit(output_spy)
assert len(lines) == 1
assert 'did not receive banner' in lines[-1]
assert 'reset by peer' in lines[-1]
def test_connection_closed_before_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 1
assert 'did not receive banner' in lines[-1]
assert 'reset by peer' in lines[-1]
def test_connection_closed_after_header(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'header line 1\n')
vsocket.rdata.append(b'header line 2\n')
vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 3
assert 'did not receive banner' in lines[-1]
assert 'reset by peer' in lines[-1]
def test_connection_closed_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert 'reset by peer' in lines[-1]
@ -64,10 +109,7 @@ class TestErrors(object):
def test_empty_data_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert 'empty' in lines[-1]
@ -76,10 +118,7 @@ class TestErrors(object):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
vsocket.rdata.append(b'xxx\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert 'xxx' in lines[-1]
@ -87,10 +126,7 @@ class TestErrors(object):
def test_non_ascii_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 3
assert 'error reading packet' in lines[-1]
assert 'ASCII' in lines[-2]
@ -100,10 +136,7 @@ class TestErrors(object):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
vsocket.rdata.append(b'\x81\xff\n')
output_spy.begin()
with pytest.raises(SystemExit):
self.audit(self._conf())
lines = output_spy.flush()
lines = self._audit(output_spy)
assert len(lines) == 2
assert 'error reading packet' in lines[-1]
assert '\\x81\\xff' in lines[-1]
@ -112,12 +145,9 @@ class TestErrors(object):
vsocket = virtual_socket
vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n')
vsocket.rdata.append(b'Protocol major versions differ.\n')
output_spy.begin()
with pytest.raises(SystemExit):
conf = self._conf()
conf.ssh1, conf.ssh2 = True, False
self.audit(conf)
lines = output_spy.flush()
conf = self._conf()
conf.ssh1, conf.ssh2 = True, False
lines = self._audit(output_spy, conf)
assert len(lines) == 3
assert 'error reading packet' in lines[-1]
assert 'major versions differ' in lines[-1]