mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-06-23 11:04:31 +02:00
Converted tab indents to spaces.
This commit is contained in:
@ -7,151 +7,151 @@ import pytest
|
||||
|
||||
# pylint: disable=line-too-long,attribute-defined-outside-init
|
||||
class TestSSH2(object):
|
||||
@pytest.fixture(autouse=True)
|
||||
def init(self, ssh_audit):
|
||||
self.ssh = ssh_audit.SSH
|
||||
self.ssh2 = ssh_audit.SSH2
|
||||
self.rbuf = ssh_audit.ReadBuf
|
||||
self.wbuf = ssh_audit.WriteBuf
|
||||
self.audit = ssh_audit.audit
|
||||
self.AuditConf = ssh_audit.AuditConf
|
||||
@pytest.fixture(autouse=True)
|
||||
def init(self, ssh_audit):
|
||||
self.ssh = ssh_audit.SSH
|
||||
self.ssh2 = ssh_audit.SSH2
|
||||
self.rbuf = ssh_audit.ReadBuf
|
||||
self.wbuf = ssh_audit.WriteBuf
|
||||
self.audit = ssh_audit.audit
|
||||
self.AuditConf = ssh_audit.AuditConf
|
||||
|
||||
def _conf(self):
|
||||
conf = self.AuditConf('localhost', 22)
|
||||
conf.colors = False
|
||||
conf.batch = True
|
||||
conf.verbose = True
|
||||
conf.ssh1 = False
|
||||
conf.ssh2 = True
|
||||
return conf
|
||||
def _conf(self):
|
||||
conf = self.AuditConf('localhost', 22)
|
||||
conf.colors = False
|
||||
conf.batch = True
|
||||
conf.verbose = True
|
||||
conf.ssh1 = False
|
||||
conf.ssh2 = True
|
||||
return conf
|
||||
|
||||
@classmethod
|
||||
def _create_ssh2_packet(cls, payload):
|
||||
padding = -(len(payload) + 5) % 8
|
||||
if padding < 4:
|
||||
padding += 8
|
||||
plen = len(payload) + padding + 1
|
||||
pad_bytes = b'\x00' * padding
|
||||
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
|
||||
return data
|
||||
@classmethod
|
||||
def _create_ssh2_packet(cls, payload):
|
||||
padding = -(len(payload) + 5) % 8
|
||||
if padding < 4:
|
||||
padding += 8
|
||||
plen = len(payload) + padding + 1
|
||||
pad_bytes = b'\x00' * padding
|
||||
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
|
||||
return data
|
||||
|
||||
def _kex_payload(self):
|
||||
w = self.wbuf()
|
||||
w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
|
||||
w.write_list([u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1'])
|
||||
w.write_list([u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519'])
|
||||
w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'])
|
||||
w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'])
|
||||
w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'])
|
||||
w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'])
|
||||
w.write_list([u'none', u'zlib@openssh.com'])
|
||||
w.write_list([u'none', u'zlib@openssh.com'])
|
||||
w.write_list([u''])
|
||||
w.write_list([u''])
|
||||
w.write_byte(False)
|
||||
w.write_int(0)
|
||||
return w.write_flush()
|
||||
def _kex_payload(self):
|
||||
w = self.wbuf()
|
||||
w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
|
||||
w.write_list([u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1'])
|
||||
w.write_list([u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519'])
|
||||
w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'])
|
||||
w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'])
|
||||
w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'])
|
||||
w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'])
|
||||
w.write_list([u'none', u'zlib@openssh.com'])
|
||||
w.write_list([u'none', u'zlib@openssh.com'])
|
||||
w.write_list([u''])
|
||||
w.write_list([u''])
|
||||
w.write_byte(False)
|
||||
w.write_int(0)
|
||||
return w.write_flush()
|
||||
|
||||
def test_kex_read(self):
|
||||
kex = self.ssh2.Kex.parse(self._kex_payload())
|
||||
assert kex is not None
|
||||
assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
|
||||
assert kex.kex_algorithms == [u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1']
|
||||
assert kex.key_algorithms == [u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519']
|
||||
assert kex.client is not None
|
||||
assert kex.server is not None
|
||||
assert kex.client.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']
|
||||
assert kex.server.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']
|
||||
assert kex.client.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']
|
||||
assert kex.server.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']
|
||||
assert kex.client.compression == [u'none', u'zlib@openssh.com']
|
||||
assert kex.server.compression == [u'none', u'zlib@openssh.com']
|
||||
assert kex.client.languages == [u'']
|
||||
assert kex.server.languages == [u'']
|
||||
assert kex.follows is False
|
||||
assert kex.unused == 0
|
||||
def test_kex_read(self):
|
||||
kex = self.ssh2.Kex.parse(self._kex_payload())
|
||||
assert kex is not None
|
||||
assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
|
||||
assert kex.kex_algorithms == [u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1']
|
||||
assert kex.key_algorithms == [u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519']
|
||||
assert kex.client is not None
|
||||
assert kex.server is not None
|
||||
assert kex.client.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']
|
||||
assert kex.server.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']
|
||||
assert kex.client.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']
|
||||
assert kex.server.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']
|
||||
assert kex.client.compression == [u'none', u'zlib@openssh.com']
|
||||
assert kex.server.compression == [u'none', u'zlib@openssh.com']
|
||||
assert kex.client.languages == [u'']
|
||||
assert kex.server.languages == [u'']
|
||||
assert kex.follows is False
|
||||
assert kex.unused == 0
|
||||
|
||||
def _get_empty_kex(self, cookie=None):
|
||||
kex_algs, key_algs = [], []
|
||||
enc, mac, compression, languages = [], [], ['none'], []
|
||||
cli = self.ssh2.KexParty(enc, mac, compression, languages)
|
||||
enc, mac, compression, languages = [], [], ['none'], []
|
||||
srv = self.ssh2.KexParty(enc, mac, compression, languages)
|
||||
if cookie is None:
|
||||
cookie = os.urandom(16)
|
||||
kex = self.ssh2.Kex(cookie, kex_algs, key_algs, cli, srv, 0)
|
||||
return kex
|
||||
def _get_empty_kex(self, cookie=None):
|
||||
kex_algs, key_algs = [], []
|
||||
enc, mac, compression, languages = [], [], ['none'], []
|
||||
cli = self.ssh2.KexParty(enc, mac, compression, languages)
|
||||
enc, mac, compression, languages = [], [], ['none'], []
|
||||
srv = self.ssh2.KexParty(enc, mac, compression, languages)
|
||||
if cookie is None:
|
||||
cookie = os.urandom(16)
|
||||
kex = self.ssh2.Kex(cookie, kex_algs, key_algs, cli, srv, 0)
|
||||
return kex
|
||||
|
||||
def _get_kex_variat1(self):
|
||||
cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
|
||||
kex = self._get_empty_kex(cookie)
|
||||
kex.kex_algorithms.append('curve25519-sha256@libssh.org')
|
||||
kex.kex_algorithms.append('ecdh-sha2-nistp256')
|
||||
kex.kex_algorithms.append('ecdh-sha2-nistp384')
|
||||
kex.kex_algorithms.append('ecdh-sha2-nistp521')
|
||||
kex.kex_algorithms.append('diffie-hellman-group-exchange-sha256')
|
||||
kex.kex_algorithms.append('diffie-hellman-group14-sha1')
|
||||
kex.key_algorithms.append('ssh-rsa')
|
||||
kex.key_algorithms.append('rsa-sha2-512')
|
||||
kex.key_algorithms.append('rsa-sha2-256')
|
||||
kex.key_algorithms.append('ssh-ed25519')
|
||||
kex.server.encryption.append('chacha20-poly1305@openssh.com')
|
||||
kex.server.encryption.append('aes128-ctr')
|
||||
kex.server.encryption.append('aes192-ctr')
|
||||
kex.server.encryption.append('aes256-ctr')
|
||||
kex.server.encryption.append('aes128-gcm@openssh.com')
|
||||
kex.server.encryption.append('aes256-gcm@openssh.com')
|
||||
kex.server.encryption.append('aes128-cbc')
|
||||
kex.server.encryption.append('aes192-cbc')
|
||||
kex.server.encryption.append('aes256-cbc')
|
||||
kex.server.mac.append('umac-64-etm@openssh.com')
|
||||
kex.server.mac.append('umac-128-etm@openssh.com')
|
||||
kex.server.mac.append('hmac-sha2-256-etm@openssh.com')
|
||||
kex.server.mac.append('hmac-sha2-512-etm@openssh.com')
|
||||
kex.server.mac.append('hmac-sha1-etm@openssh.com')
|
||||
kex.server.mac.append('umac-64@openssh.com')
|
||||
kex.server.mac.append('umac-128@openssh.com')
|
||||
kex.server.mac.append('hmac-sha2-256')
|
||||
kex.server.mac.append('hmac-sha2-512')
|
||||
kex.server.mac.append('hmac-sha1')
|
||||
kex.server.compression.append('zlib@openssh.com')
|
||||
for a in kex.server.encryption:
|
||||
kex.client.encryption.append(a)
|
||||
for a in kex.server.mac:
|
||||
kex.client.mac.append(a)
|
||||
for a in kex.server.compression:
|
||||
if a == 'none':
|
||||
continue
|
||||
kex.client.compression.append(a)
|
||||
return kex
|
||||
def _get_kex_variat1(self):
|
||||
cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
|
||||
kex = self._get_empty_kex(cookie)
|
||||
kex.kex_algorithms.append('curve25519-sha256@libssh.org')
|
||||
kex.kex_algorithms.append('ecdh-sha2-nistp256')
|
||||
kex.kex_algorithms.append('ecdh-sha2-nistp384')
|
||||
kex.kex_algorithms.append('ecdh-sha2-nistp521')
|
||||
kex.kex_algorithms.append('diffie-hellman-group-exchange-sha256')
|
||||
kex.kex_algorithms.append('diffie-hellman-group14-sha1')
|
||||
kex.key_algorithms.append('ssh-rsa')
|
||||
kex.key_algorithms.append('rsa-sha2-512')
|
||||
kex.key_algorithms.append('rsa-sha2-256')
|
||||
kex.key_algorithms.append('ssh-ed25519')
|
||||
kex.server.encryption.append('chacha20-poly1305@openssh.com')
|
||||
kex.server.encryption.append('aes128-ctr')
|
||||
kex.server.encryption.append('aes192-ctr')
|
||||
kex.server.encryption.append('aes256-ctr')
|
||||
kex.server.encryption.append('aes128-gcm@openssh.com')
|
||||
kex.server.encryption.append('aes256-gcm@openssh.com')
|
||||
kex.server.encryption.append('aes128-cbc')
|
||||
kex.server.encryption.append('aes192-cbc')
|
||||
kex.server.encryption.append('aes256-cbc')
|
||||
kex.server.mac.append('umac-64-etm@openssh.com')
|
||||
kex.server.mac.append('umac-128-etm@openssh.com')
|
||||
kex.server.mac.append('hmac-sha2-256-etm@openssh.com')
|
||||
kex.server.mac.append('hmac-sha2-512-etm@openssh.com')
|
||||
kex.server.mac.append('hmac-sha1-etm@openssh.com')
|
||||
kex.server.mac.append('umac-64@openssh.com')
|
||||
kex.server.mac.append('umac-128@openssh.com')
|
||||
kex.server.mac.append('hmac-sha2-256')
|
||||
kex.server.mac.append('hmac-sha2-512')
|
||||
kex.server.mac.append('hmac-sha1')
|
||||
kex.server.compression.append('zlib@openssh.com')
|
||||
for a in kex.server.encryption:
|
||||
kex.client.encryption.append(a)
|
||||
for a in kex.server.mac:
|
||||
kex.client.mac.append(a)
|
||||
for a in kex.server.compression:
|
||||
if a == 'none':
|
||||
continue
|
||||
kex.client.compression.append(a)
|
||||
return kex
|
||||
|
||||
def test_key_payload(self):
|
||||
kex1 = self._get_kex_variat1()
|
||||
kex2 = self.ssh2.Kex.parse(self._kex_payload())
|
||||
assert kex1.payload == kex2.payload
|
||||
def test_key_payload(self):
|
||||
kex1 = self._get_kex_variat1()
|
||||
kex2 = self.ssh2.Kex.parse(self._kex_payload())
|
||||
assert kex1.payload == kex2.payload
|
||||
|
||||
@pytest.mark.skip(reason="Temporarily skip this test to have a working test suite!")
|
||||
def test_ssh2_server_simple(self, output_spy, virtual_socket):
|
||||
vsocket = virtual_socket
|
||||
w = self.wbuf()
|
||||
w.write_byte(self.ssh.Protocol.MSG_KEXINIT)
|
||||
w.write(self._kex_payload())
|
||||
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
|
||||
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
|
||||
output_spy.begin()
|
||||
self.audit(self._conf())
|
||||
lines = output_spy.flush()
|
||||
assert len(lines) == 72
|
||||
@pytest.mark.skip(reason="Temporarily skip this test to have a working test suite!")
|
||||
def test_ssh2_server_simple(self, output_spy, virtual_socket):
|
||||
vsocket = virtual_socket
|
||||
w = self.wbuf()
|
||||
w.write_byte(self.ssh.Protocol.MSG_KEXINIT)
|
||||
w.write(self._kex_payload())
|
||||
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
|
||||
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
|
||||
output_spy.begin()
|
||||
self.audit(self._conf())
|
||||
lines = output_spy.flush()
|
||||
assert len(lines) == 72
|
||||
|
||||
def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket):
|
||||
vsocket = virtual_socket
|
||||
w = self.wbuf()
|
||||
w.write_byte(self.ssh.Protocol.MSG_KEXINIT + 1)
|
||||
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
|
||||
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
|
||||
output_spy.begin()
|
||||
with pytest.raises(SystemExit):
|
||||
self.audit(self._conf())
|
||||
lines = output_spy.flush()
|
||||
assert len(lines) == 3
|
||||
assert 'unknown message' in lines[-1]
|
||||
def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket):
|
||||
vsocket = virtual_socket
|
||||
w = self.wbuf()
|
||||
w.write_byte(self.ssh.Protocol.MSG_KEXINIT + 1)
|
||||
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
|
||||
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
|
||||
output_spy.begin()
|
||||
with pytest.raises(SystemExit):
|
||||
self.audit(self._conf())
|
||||
lines = output_spy.flush()
|
||||
assert len(lines) == 3
|
||||
assert 'unknown message' in lines[-1]
|
||||
|
Reference in New Issue
Block a user