mirror of
https://github.com/jtesta/ssh-audit.git
synced 2024-12-22 14:05:22 +01:00
Disable Python2 tests. Fix pylint warnings.
This commit is contained in:
parent
22ac41bfb8
commit
9463aab4f7
@ -1,7 +1,6 @@
|
|||||||
language: python
|
language: python
|
||||||
|
|
||||||
python:
|
python:
|
||||||
- "2.7"
|
|
||||||
- "3.5"
|
- "3.5"
|
||||||
- "3.6"
|
- "3.6"
|
||||||
- "3.7"
|
- "3.7"
|
||||||
|
117
ssh-audit.py
117
ssh-audit.py
@ -91,7 +91,7 @@ def usage(err=None):
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
class AuditConf(object):
|
class AuditConf:
|
||||||
# pylint: disable=too-many-instance-attributes
|
# pylint: disable=too-many-instance-attributes
|
||||||
def __init__(self, host=None, port=22):
|
def __init__(self, host=None, port=22):
|
||||||
# type: (Optional[str], int) -> None
|
# type: (Optional[str], int) -> None
|
||||||
@ -115,10 +115,10 @@ class AuditConf(object):
|
|||||||
# type: (str, Union[str, int, bool, Sequence[int]]) -> None
|
# type: (str, Union[str, int, bool, Sequence[int]]) -> None
|
||||||
valid = False
|
valid = False
|
||||||
if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json']:
|
if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json']:
|
||||||
valid, value = True, True if bool(value) else False
|
valid, value = True, bool(value)
|
||||||
elif name in ['ipv4', 'ipv6']:
|
elif name in ['ipv4', 'ipv6']:
|
||||||
valid = False
|
valid = False
|
||||||
value = True if bool(value) else False
|
value = bool(value)
|
||||||
ipv = 4 if name == 'ipv4' else 6
|
ipv = 4 if name == 'ipv4' else 6
|
||||||
if value:
|
if value:
|
||||||
value = tuple(list(self.ipvo) + [ipv])
|
value = tuple(list(self.ipvo) + [ipv])
|
||||||
@ -156,7 +156,7 @@ class AuditConf(object):
|
|||||||
object.__setattr__(self, name, value)
|
object.__setattr__(self, name, value)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_cmdline(cls, args, usage_cb):
|
def from_cmdline(cls, args, usage_cb): # pylint: disable=too-many-statements
|
||||||
# type: (List[str], Callable[..., None]) -> AuditConf
|
# type: (List[str], Callable[..., None]) -> AuditConf
|
||||||
# pylint: disable=too-many-branches
|
# pylint: disable=too-many-branches
|
||||||
aconf = cls()
|
aconf = cls()
|
||||||
@ -231,7 +231,7 @@ class AuditConf(object):
|
|||||||
return aconf
|
return aconf
|
||||||
|
|
||||||
|
|
||||||
class Output(object):
|
class Output:
|
||||||
LEVELS = ('info', 'warn', 'fail') # type: Sequence[str]
|
LEVELS = ('info', 'warn', 'fail') # type: Sequence[str]
|
||||||
COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31}
|
COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31}
|
||||||
|
|
||||||
@ -314,8 +314,8 @@ class OutputBuffer(list):
|
|||||||
sys.stdout = self.__stdout
|
sys.stdout = self.__stdout
|
||||||
|
|
||||||
|
|
||||||
class SSH2(object): # pylint: disable=too-few-public-methods
|
class SSH2: # pylint: disable=too-few-public-methods
|
||||||
class KexDB(object): # pylint: disable=too-few-public-methods
|
class KexDB: # pylint: disable=too-few-public-methods
|
||||||
# pylint: disable=bad-whitespace
|
# pylint: disable=bad-whitespace
|
||||||
WARN_OPENSSH74_UNSAFE = 'disabled (in client) since OpenSSH 7.4, unsafe algorithm'
|
WARN_OPENSSH74_UNSAFE = 'disabled (in client) since OpenSSH 7.4, unsafe algorithm'
|
||||||
WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm'
|
WARN_OPENSSH72_LEGACY = 'disabled (in client) since OpenSSH 7.2, legacy algorithm'
|
||||||
@ -534,7 +534,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
}
|
}
|
||||||
} # type: Dict[str, Dict[str, List[List[Optional[str]]]]]
|
} # type: Dict[str, Dict[str, List[List[Optional[str]]]]]
|
||||||
|
|
||||||
class KexParty(object):
|
class KexParty:
|
||||||
def __init__(self, enc, mac, compression, languages):
|
def __init__(self, enc, mac, compression, languages):
|
||||||
# type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None
|
# type: (List[text_type], List[text_type], List[text_type], List[text_type]) -> None
|
||||||
self.__enc = enc
|
self.__enc = enc
|
||||||
@ -562,7 +562,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
# type: () -> List[text_type]
|
# type: () -> List[text_type]
|
||||||
return self.__languages
|
return self.__languages
|
||||||
|
|
||||||
class Kex(object):
|
class Kex:
|
||||||
def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0):
|
def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0):
|
||||||
# type: (binary_type, List[text_type], List[text_type], SSH2.KexParty, SSH2.KexParty, bool, int) -> None
|
# type: (binary_type, List[text_type], List[text_type], SSH2.KexParty, SSH2.KexParty, bool, int) -> None
|
||||||
self.__cookie = cookie
|
self.__cookie = cookie
|
||||||
@ -678,7 +678,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
return kex
|
return kex
|
||||||
|
|
||||||
# Obtains host keys, checks their size, and derives their fingerprints.
|
# Obtains host keys, checks their size, and derives their fingerprints.
|
||||||
class HostKeyTest(object):
|
class HostKeyTest:
|
||||||
# Tracks the RSA host key types. As of this writing, testing one in this family yields valid results for the rest.
|
# Tracks the RSA host key types. As of this writing, testing one in this family yields valid results for the rest.
|
||||||
RSA_FAMILY = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512']
|
RSA_FAMILY = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512']
|
||||||
|
|
||||||
@ -691,7 +691,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
'ssh-rsa-cert-v01@openssh.com': {'cert': True, 'variable_key_len': True},
|
'ssh-rsa-cert-v01@openssh.com': {'cert': True, 'variable_key_len': True},
|
||||||
|
|
||||||
'ssh-ed25519': {'cert': False, 'variable_key_len': False},
|
'ssh-ed25519': {'cert': False, 'variable_key_len': False},
|
||||||
'ssh-ed25519-cert-v01@openssh.com': {'cert': True, 'variable_key_len': False},
|
'ssh-ed25519-cert-v01@openssh.com': {'cert': True, 'variable_key_len': False},
|
||||||
}
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -723,10 +723,10 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
break
|
break
|
||||||
|
|
||||||
if kex_str is not None:
|
if kex_str is not None:
|
||||||
SSH2.HostKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.HostKeyTest.HOST_KEY_TYPES)
|
SSH2.HostKeyTest.perform_test(s, server_kex, kex_str, kex_group, SSH2.HostKeyTest.HOST_KEY_TYPES)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __test(s, server_kex, kex_str, kex_group, host_key_types):
|
def perform_test(s, server_kex, kex_str, kex_group, host_key_types):
|
||||||
hostkey_modulus_size = 0
|
hostkey_modulus_size = 0
|
||||||
ca_modulus_size = 0
|
ca_modulus_size = 0
|
||||||
|
|
||||||
@ -745,7 +745,8 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
if not s.is_connected():
|
if not s.is_connected():
|
||||||
s.connect()
|
s.connect()
|
||||||
unused = None # pylint: disable=unused-variable
|
unused = None # pylint: disable=unused-variable
|
||||||
unused, unused, err = s.get_banner()
|
unused2 = None # pylint: disable=unused-variable
|
||||||
|
unused, unused2, err = s.get_banner()
|
||||||
if err is not None:
|
if err is not None:
|
||||||
s.close()
|
s.close()
|
||||||
return
|
return
|
||||||
@ -794,7 +795,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
for rsa_type in SSH2.HostKeyTest.RSA_FAMILY:
|
for rsa_type in SSH2.HostKeyTest.RSA_FAMILY:
|
||||||
alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type]
|
alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type]
|
||||||
alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size])
|
alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size])
|
||||||
elif (cert is True) and ((hostkey_modulus_size < 2048) or (ca_modulus_size > 0 and ca_modulus_size < 2048)):
|
elif (cert is True) and ((hostkey_modulus_size < 2048) or (ca_modulus_size > 0 and ca_modulus_size < 2048)): # pylint: disable=chained-comparison
|
||||||
alg_list = SSH2.KexDB.ALGORITHMS['key'][host_key_type]
|
alg_list = SSH2.KexDB.ALGORITHMS['key'][host_key_type]
|
||||||
min_modulus = min(hostkey_modulus_size, ca_modulus_size)
|
min_modulus = min(hostkey_modulus_size, ca_modulus_size)
|
||||||
min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size)
|
min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size)
|
||||||
@ -809,18 +810,18 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
|
|
||||||
# Performs DH group exchanges to find what moduli are supported, and checks
|
# Performs DH group exchanges to find what moduli are supported, and checks
|
||||||
# their size.
|
# their size.
|
||||||
class GEXTest(object):
|
class GEXTest:
|
||||||
|
|
||||||
# Creates a new connection to the server. Returns an SSH.Socket, or
|
# Creates a new connection to the server. Returns True on success, or False.
|
||||||
# None on failure.
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def reconnect(s, gex_alg):
|
def reconnect(s, gex_alg):
|
||||||
if s.is_connected():
|
if s.is_connected():
|
||||||
return
|
return True
|
||||||
|
|
||||||
s.connect()
|
s.connect()
|
||||||
unused = None # pylint: disable=unused-variable
|
unused = None # pylint: disable=unused-variable
|
||||||
unused, unused, err = s.get_banner()
|
unused2 = None # pylint: disable=unused-variable
|
||||||
|
unused, unused2, err = s.get_banner()
|
||||||
if err is not None:
|
if err is not None:
|
||||||
s.close()
|
s.close()
|
||||||
return False
|
return False
|
||||||
@ -873,7 +874,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
# got here, doesn't mean the server is vulnerable...
|
# got here, doesn't mean the server is vulnerable...
|
||||||
smallest_modulus = kex_group.get_dh_modulus_size()
|
smallest_modulus = kex_group.get_dh_modulus_size()
|
||||||
|
|
||||||
except Exception: # pylint: disable=bare-except
|
except Exception:
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
s.close()
|
s.close()
|
||||||
@ -884,7 +885,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
|
|
||||||
# If we found one modulus size already, but we're about
|
# If we found one modulus size already, but we're about
|
||||||
# to test a larger one, don't bother.
|
# to test a larger one, don't bother.
|
||||||
if smallest_modulus > 0 and bits >= smallest_modulus:
|
if bits >= smallest_modulus > 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
if SSH2.GEXTest.reconnect(s, gex_alg) is False:
|
if SSH2.GEXTest.reconnect(s, gex_alg) is False:
|
||||||
@ -895,7 +896,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
kex_group.send_init_gex(s, bits, bits, bits)
|
kex_group.send_init_gex(s, bits, bits, bits)
|
||||||
kex_group.recv_reply(s, False)
|
kex_group.recv_reply(s, False)
|
||||||
smallest_modulus = kex_group.get_dh_modulus_size()
|
smallest_modulus = kex_group.get_dh_modulus_size()
|
||||||
except Exception: # pylint: disable=bare-except
|
except Exception:
|
||||||
# import traceback
|
# import traceback
|
||||||
# print(traceback.format_exc())
|
# print(traceback.format_exc())
|
||||||
pass
|
pass
|
||||||
@ -927,8 +928,8 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
|||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
class SSH1(object):
|
class SSH1:
|
||||||
class CRC32(object):
|
class CRC32:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# type: () -> None
|
# type: () -> None
|
||||||
self._table = [0] * 256
|
self._table = [0] * 256
|
||||||
@ -961,7 +962,7 @@ class SSH1(object):
|
|||||||
cls._crc32 = cls.CRC32()
|
cls._crc32 = cls.CRC32()
|
||||||
return cls._crc32.calc(v)
|
return cls._crc32.calc(v)
|
||||||
|
|
||||||
class KexDB(object): # pylint: disable=too-few-public-methods
|
class KexDB: # pylint: disable=too-few-public-methods
|
||||||
# pylint: disable=bad-whitespace
|
# pylint: disable=bad-whitespace
|
||||||
FAIL_PLAINTEXT = 'no encryption/integrity'
|
FAIL_PLAINTEXT = 'no encryption/integrity'
|
||||||
FAIL_OPENSSH37_REMOVE = 'removed since OpenSSH 3.7'
|
FAIL_OPENSSH37_REMOVE = 'removed since OpenSSH 3.7'
|
||||||
@ -992,7 +993,7 @@ class SSH1(object):
|
|||||||
}
|
}
|
||||||
} # type: Dict[str, Dict[str, List[List[Optional[str]]]]]
|
} # type: Dict[str, Dict[str, List[List[Optional[str]]]]]
|
||||||
|
|
||||||
class PublicKeyMessage(object):
|
class PublicKeyMessage:
|
||||||
def __init__(self, cookie, skey, hkey, pflags, cmask, amask):
|
def __init__(self, cookie, skey, hkey, pflags, cmask, amask):
|
||||||
# type: (binary_type, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None
|
# type: (binary_type, Tuple[int, int, int], Tuple[int, int, int], int, int, int) -> None
|
||||||
if len(skey) != 3:
|
if len(skey) != 3:
|
||||||
@ -1122,7 +1123,7 @@ class SSH1(object):
|
|||||||
return pkm
|
return pkm
|
||||||
|
|
||||||
|
|
||||||
class ReadBuf(object):
|
class ReadBuf:
|
||||||
def __init__(self, data=None):
|
def __init__(self, data=None):
|
||||||
# type: (Optional[binary_type]) -> None
|
# type: (Optional[binary_type]) -> None
|
||||||
super(ReadBuf, self).__init__()
|
super(ReadBuf, self).__init__()
|
||||||
@ -1195,10 +1196,9 @@ class ReadBuf(object):
|
|||||||
def reset(self):
|
def reset(self):
|
||||||
self._buf = BytesIO()
|
self._buf = BytesIO()
|
||||||
self._len = 0
|
self._len = 0
|
||||||
super(ReadBuf, self).reset()
|
|
||||||
|
|
||||||
|
|
||||||
class WriteBuf(object):
|
class WriteBuf:
|
||||||
def __init__(self, data=None):
|
def __init__(self, data=None):
|
||||||
# type: (Optional[binary_type]) -> None
|
# type: (Optional[binary_type]) -> None
|
||||||
super(WriteBuf, self).__init__()
|
super(WriteBuf, self).__init__()
|
||||||
@ -1290,8 +1290,8 @@ class WriteBuf(object):
|
|||||||
self._wbuf = BytesIO()
|
self._wbuf = BytesIO()
|
||||||
|
|
||||||
|
|
||||||
class SSH(object): # pylint: disable=too-few-public-methods
|
class SSH: # pylint: disable=too-few-public-methods
|
||||||
class Protocol(object): # pylint: disable=too-few-public-methods
|
class Protocol: # pylint: disable=too-few-public-methods
|
||||||
# pylint: disable=bad-whitespace
|
# pylint: disable=bad-whitespace
|
||||||
SMSG_PUBLIC_KEY = 2
|
SMSG_PUBLIC_KEY = 2
|
||||||
MSG_DEBUG = 4
|
MSG_DEBUG = 4
|
||||||
@ -1304,14 +1304,14 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
MSG_KEXDH_GEX_INIT = 32
|
MSG_KEXDH_GEX_INIT = 32
|
||||||
MSG_KEXDH_GEX_REPLY = 33
|
MSG_KEXDH_GEX_REPLY = 33
|
||||||
|
|
||||||
class Product(object): # pylint: disable=too-few-public-methods
|
class Product: # pylint: disable=too-few-public-methods
|
||||||
OpenSSH = 'OpenSSH'
|
OpenSSH = 'OpenSSH'
|
||||||
DropbearSSH = 'Dropbear SSH'
|
DropbearSSH = 'Dropbear SSH'
|
||||||
LibSSH = 'libssh'
|
LibSSH = 'libssh'
|
||||||
TinySSH = 'TinySSH'
|
TinySSH = 'TinySSH'
|
||||||
PuTTY = 'PuTTY'
|
PuTTY = 'PuTTY'
|
||||||
|
|
||||||
class Software(object):
|
class Software:
|
||||||
def __init__(self, vendor, product, version, patch, os_version):
|
def __init__(self, vendor, product, version, patch, os_version):
|
||||||
# type: (Optional[str], str, str, Optional[str], Optional[str]) -> None
|
# type: (Optional[str], str, str, Optional[str], Optional[str]) -> None
|
||||||
self.__vendor = vendor
|
self.__vendor = vendor
|
||||||
@ -1518,7 +1518,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
return cls(None, SSH.Product.PuTTY, mx.group(1), None, None)
|
return cls(None, SSH.Product.PuTTY, mx.group(1), None, None)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
class Banner(object):
|
class Banner:
|
||||||
_RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-\s*([^\s]*)(?:\s+(.*))?)?'
|
_RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-\s*([^\s]*)(?:\s+(.*))?)?'
|
||||||
RX_PROTOCOL = re.compile(re.sub(r'\\d(\+?)', r'(\\d\g<1>)', _RXP))
|
RX_PROTOCOL = re.compile(re.sub(r'\\d(\+?)', r'(\\d\g<1>)', _RXP))
|
||||||
RX_BANNER = re.compile(r'^({0}(?:(?:-{0})*)){1}$'.format(_RXP, _RXR))
|
RX_BANNER = re.compile(r'^({0}(?:(?:-{0})*)){1}$'.format(_RXP, _RXR))
|
||||||
@ -1587,7 +1587,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
comments = re.sub(r'\s+', ' ', comments)
|
comments = re.sub(r'\s+', ' ', comments)
|
||||||
return cls(protocol, software, comments, valid_ascii)
|
return cls(protocol, software, comments, valid_ascii)
|
||||||
|
|
||||||
class Fingerprint(object):
|
class Fingerprint:
|
||||||
def __init__(self, fpd):
|
def __init__(self, fpd):
|
||||||
# type: (binary_type) -> None
|
# type: (binary_type) -> None
|
||||||
self.__fpd = fpd
|
self.__fpd = fpd
|
||||||
@ -1606,8 +1606,8 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
r = h.decode('ascii').rstrip('=')
|
r = h.decode('ascii').rstrip('=')
|
||||||
return u'SHA256:{0}'.format(r)
|
return u'SHA256:{0}'.format(r)
|
||||||
|
|
||||||
class Algorithm(object):
|
class Algorithm:
|
||||||
class Timeframe(object):
|
class Timeframe:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# type: () -> None
|
# type: () -> None
|
||||||
self.__storage = {} # type: Dict[str, List[Optional[str]]]
|
self.__storage = {} # type: Dict[str, List[Optional[str]]]
|
||||||
@ -1642,7 +1642,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
for_srv, for_cli = pos < 2, pos > 1
|
for_srv, for_cli = pos < 2, pos > 1
|
||||||
for v in (versions or '').split(','):
|
for v in (versions or '').split(','):
|
||||||
ssh_prod, ssh_ver, is_cli = SSH.Algorithm.get_ssh_version(v)
|
ssh_prod, ssh_ver, is_cli = SSH.Algorithm.get_ssh_version(v)
|
||||||
if (not ssh_ver or (is_cli and for_srv) or (not is_cli and for_cli and ssh_prod in ssh_versions)):
|
if not ssh_ver or (is_cli and for_srv) or (not is_cli and for_cli and ssh_prod in ssh_versions):
|
||||||
continue
|
continue
|
||||||
ssh_versions[ssh_prod] = ssh_ver
|
ssh_versions[ssh_prod] = ssh_ver
|
||||||
for ssh_product, ssh_version in ssh_versions.items():
|
for ssh_product, ssh_version in ssh_versions.items():
|
||||||
@ -1696,7 +1696,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
return None
|
return None
|
||||||
return 'available since ' + ', '.join(tv).rstrip(', ')
|
return 'available since ' + ', '.join(tv).rstrip(', ')
|
||||||
|
|
||||||
class Algorithms(object):
|
class Algorithms:
|
||||||
def __init__(self, pkm, kex):
|
def __init__(self, pkm, kex):
|
||||||
# type: (Optional[SSH1.PublicKeyMessage], Optional[SSH2.Kex]) -> None
|
# type: (Optional[SSH1.PublicKeyMessage], Optional[SSH2.Kex]) -> None
|
||||||
self.__ssh1kex = pkm
|
self.__ssh1kex = pkm
|
||||||
@ -1883,7 +1883,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
del rec[sshv]
|
del rec[sshv]
|
||||||
return software, rec
|
return software, rec
|
||||||
|
|
||||||
class Item(object):
|
class Item:
|
||||||
def __init__(self, sshv, db):
|
def __init__(self, sshv, db):
|
||||||
# type: (int, Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None
|
# type: (int, Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None
|
||||||
self.__sshv = sshv
|
self.__sshv = sshv
|
||||||
@ -1908,7 +1908,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
# type: () -> Iterable[Tuple[str, List[text_type]]]
|
# type: () -> Iterable[Tuple[str, List[text_type]]]
|
||||||
return self.__storage.items()
|
return self.__storage.items()
|
||||||
|
|
||||||
class Security(object): # pylint: disable=too-few-public-methods
|
class Security: # pylint: disable=too-few-public-methods
|
||||||
# Format: [starting_vuln_version, last_vuln_version, affected, CVE_ID, CVSSv2, description]
|
# Format: [starting_vuln_version, last_vuln_version, affected, CVE_ID, CVSSv2, description]
|
||||||
# affected: 1 = server, 2 = client, 4 = local
|
# affected: 1 = server, 2 = client, 4 = local
|
||||||
# Example: if it affects servers, both remote & local, then affected
|
# Example: if it affects servers, both remote & local, then affected
|
||||||
@ -2097,7 +2097,6 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
self.__sock_map[s.fileno()] = s
|
self.__sock_map[s.fileno()] = s
|
||||||
except Exception:
|
except Exception:
|
||||||
print("Warning: failed to listen on any IPv4 interfaces.")
|
print("Warning: failed to listen on any IPv4 interfaces.")
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Socket to listen on all IPv6 addresses.
|
# Socket to listen on all IPv6 addresses.
|
||||||
@ -2109,12 +2108,11 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
self.__sock_map[s.fileno()] = s
|
self.__sock_map[s.fileno()] = s
|
||||||
except Exception:
|
except Exception:
|
||||||
print("Warning: failed to listen on any IPv6 interfaces.")
|
print("Warning: failed to listen on any IPv6 interfaces.")
|
||||||
pass
|
|
||||||
|
|
||||||
# If we failed to listen on any interfaces, terminate.
|
# If we failed to listen on any interfaces, terminate.
|
||||||
if len(self.__sock_map.keys()) == 0:
|
if len(self.__sock_map.keys()) == 0:
|
||||||
print("Error: failed to listen on any IPv4 and IPv6 interfaces!")
|
print("Error: failed to listen on any IPv4 and IPv6 interfaces!")
|
||||||
exit(-1)
|
sys.exit(-1)
|
||||||
|
|
||||||
# Wait for an incoming connection. If a timeout was explicitly
|
# Wait for an incoming connection. If a timeout was explicitly
|
||||||
# set by the user, terminate when it elapses.
|
# set by the user, terminate when it elapses.
|
||||||
@ -2132,7 +2130,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
|
|
||||||
if self.__timeout_set and time_elapsed >= self.__timeout:
|
if self.__timeout_set and time_elapsed >= self.__timeout:
|
||||||
print("Timeout elapsed. Terminating...")
|
print("Timeout elapsed. Terminating...")
|
||||||
exit(-1)
|
sys.exit(-1)
|
||||||
|
|
||||||
# Accept the connection.
|
# Accept the connection.
|
||||||
c, addr = self.__sock_map[fds[0][0]].accept()
|
c, addr = self.__sock_map[fds[0][0]].accept()
|
||||||
@ -2304,7 +2302,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
|
|
||||||
# Returns True if this Socket is connected, otherwise False.
|
# Returns True if this Socket is connected, otherwise False.
|
||||||
def is_connected(self):
|
def is_connected(self):
|
||||||
return (self.__sock is not None)
|
return self.__sock is not None
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.__cleanup()
|
self.__cleanup()
|
||||||
@ -2313,16 +2311,13 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
self.__header = []
|
self.__header = []
|
||||||
self.__banner = None
|
self.__banner = None
|
||||||
|
|
||||||
def reset(self):
|
def _close_socket(self, s): # pylint: disable=no-self-use
|
||||||
super(SSH.Socket, self).reset()
|
|
||||||
|
|
||||||
def _close_socket(self, s):
|
|
||||||
# type: (Optional[socket.socket]) -> None
|
# type: (Optional[socket.socket]) -> None
|
||||||
try:
|
try:
|
||||||
if s is not None:
|
if s is not None:
|
||||||
s.shutdown(socket.SHUT_RDWR)
|
s.shutdown(socket.SHUT_RDWR)
|
||||||
s.close() # pragma: nocover
|
s.close() # pragma: nocover
|
||||||
except Exception: # pylint: disable=bare-except
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
@ -2337,7 +2332,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
|||||||
self.__sock = None
|
self.__sock = None
|
||||||
|
|
||||||
|
|
||||||
class KexDH(object): # pragma: nocover
|
class KexDH: # pragma: nocover
|
||||||
def __init__(self, kex_name, hash_alg, g, p):
|
def __init__(self, kex_name, hash_alg, g, p):
|
||||||
# type: (str, int, int) -> None
|
# type: (str, int, int) -> None
|
||||||
self.__kex_name = kex_name
|
self.__kex_name = kex_name
|
||||||
@ -2384,7 +2379,7 @@ class KexDH(object): # pragma: nocover
|
|||||||
while packet_type == SSH.Protocol.MSG_DEBUG:
|
while packet_type == SSH.Protocol.MSG_DEBUG:
|
||||||
packet_type, payload = s.read_packet(2)
|
packet_type, payload = s.read_packet(2)
|
||||||
|
|
||||||
if packet_type != -1 and packet_type not in [SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY]:
|
if packet_type != -1 and packet_type not in [SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY]: # pylint: disable=no-else-raise
|
||||||
# TODO: change Exception to something more specific.
|
# TODO: change Exception to something more specific.
|
||||||
raise Exception('Expected MSG_KEXDH_REPLY (%d) or MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY, packet_type))
|
raise Exception('Expected MSG_KEXDH_REPLY (%d) or MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY, packet_type))
|
||||||
elif packet_type == -1:
|
elif packet_type == -1:
|
||||||
@ -2635,7 +2630,7 @@ class KexGroupExchange(KexDH):
|
|||||||
s.send_packet()
|
s.send_packet()
|
||||||
|
|
||||||
packet_type, payload = s.read_packet(2)
|
packet_type, payload = s.read_packet(2)
|
||||||
if (packet_type != SSH.Protocol.MSG_KEXDH_GEX_GROUP) and (packet_type != SSH.Protocol.MSG_DEBUG):
|
if (packet_type != SSH.Protocol.MSG_KEXDH_GEX_GROUP) and (packet_type != SSH.Protocol.MSG_DEBUG): # pylint: disable=consider-using-in
|
||||||
# TODO: replace with a better exception type.
|
# TODO: replace with a better exception type.
|
||||||
raise Exception('Expected MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (SSH.Protocol.MSG_KEXDH_GEX_REPLY, packet_type))
|
raise Exception('Expected MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (SSH.Protocol.MSG_KEXDH_GEX_REPLY, packet_type))
|
||||||
|
|
||||||
@ -2933,7 +2928,7 @@ def output_recommendations(algs, software, padlen=0):
|
|||||||
|
|
||||||
|
|
||||||
# Output additional information & notes.
|
# Output additional information & notes.
|
||||||
def output_info(algs, software, client_audit, any_problems, padlen=0):
|
def output_info(software, client_audit, any_problems):
|
||||||
with OutputBuffer() as obuf:
|
with OutputBuffer() as obuf:
|
||||||
# Tell user that PuTTY cannot be hardened at the protocol-level.
|
# Tell user that PuTTY cannot be hardened at the protocol-level.
|
||||||
if client_audit and (software is not None) and (software.product == SSH.Product.PuTTY):
|
if client_audit and (software is not None) and (software.product == SSH.Product.PuTTY):
|
||||||
@ -3008,14 +3003,14 @@ def output(banner, header, client_host=None, kex=None, pkm=None):
|
|||||||
output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen)
|
output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, maxlen)
|
||||||
output_fingerprints(algs, True)
|
output_fingerprints(algs, True)
|
||||||
perfect_config = output_recommendations(algs, software, maxlen)
|
perfect_config = output_recommendations(algs, software, maxlen)
|
||||||
output_info(algs, software, client_audit, not perfect_config)
|
output_info(software, client_audit, not perfect_config)
|
||||||
|
|
||||||
# If we encountered any unknown algorithms, ask the user to report them.
|
# If we encountered any unknown algorithms, ask the user to report them.
|
||||||
if len(unknown_algorithms) > 0:
|
if len(unknown_algorithms) > 0:
|
||||||
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
|
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
|
||||||
|
|
||||||
|
|
||||||
class Utils(object):
|
class Utils:
|
||||||
@classmethod
|
@classmethod
|
||||||
def _type_err(cls, v, target):
|
def _type_err(cls, v, target):
|
||||||
# type: (Any, text_type) -> TypeError
|
# type: (Any, text_type) -> TypeError
|
||||||
@ -3091,12 +3086,12 @@ class Utils(object):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def is_print_ascii(cls, v):
|
def is_print_ascii(cls, v):
|
||||||
# type: (Union[text_type, str]) -> bool
|
# type: (Union[text_type, str]) -> bool
|
||||||
return cls._is_ascii(v, lambda x: x >= 32 and x <= 126)
|
return cls._is_ascii(v, lambda x: 126 >= x >= 32)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def to_print_ascii(cls, v, errors='replace'):
|
def to_print_ascii(cls, v, errors='replace'):
|
||||||
# type: (Union[text_type, str], str) -> str
|
# type: (Union[text_type, str], str) -> str
|
||||||
return cls._to_ascii(v, lambda x: x >= 32 and x <= 126, errors)
|
return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def unique_seq(cls, seq):
|
def unique_seq(cls, seq):
|
||||||
@ -3187,7 +3182,7 @@ def build_struct(banner, kex=None, pkm=None, client_host=None):
|
|||||||
for host_key_type in host_keys.keys():
|
for host_key_type in host_keys.keys():
|
||||||
if host_key_type in SSH2.HostKeyTest.RSA_FAMILY:
|
if host_key_type in SSH2.HostKeyTest.RSA_FAMILY:
|
||||||
val = host_keys[host_key_type]
|
val = host_keys[host_key_type]
|
||||||
del(host_keys[host_key_type])
|
del host_keys[host_key_type]
|
||||||
host_keys['ssh-rsa'] = val
|
host_keys['ssh-rsa'] = val
|
||||||
|
|
||||||
for host_key_type in sorted(host_keys):
|
for host_key_type in sorted(host_keys):
|
||||||
|
37
tox.ini
37
tox.ini
@ -1,6 +1,6 @@
|
|||||||
[tox]
|
[tox]
|
||||||
envlist =
|
envlist =
|
||||||
py{27,py,py3}-{test,pylint,flake8,vulture}
|
py{py3}-{test,pylint,flake8,vulture}
|
||||||
py{35,36,37,38}-{test,mypy,pylint,flake8,vulture}
|
py{35,36,37,38}-{test,mypy,pylint,flake8,vulture}
|
||||||
cov
|
cov
|
||||||
skipsdist = true
|
skipsdist = true
|
||||||
@ -12,9 +12,9 @@ deps =
|
|||||||
test,cov: {[testenv:cov]deps}
|
test,cov: {[testenv:cov]deps}
|
||||||
test,py{35,36,37,38}-{type,mypy}: colorama
|
test,py{35,36,37,38}-{type,mypy}: colorama
|
||||||
py{35,36,37,38}-{type,mypy}: {[testenv:mypy]deps}
|
py{35,36,37,38}-{type,mypy}: {[testenv:mypy]deps}
|
||||||
py{27,py,py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]deps}
|
py{py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]deps}
|
||||||
py{27,py,py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]deps}
|
py{py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]deps}
|
||||||
py{27,py,py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]deps}
|
py{py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]deps}
|
||||||
setenv =
|
setenv =
|
||||||
SSHAUDIT = {toxinidir}/ssh-audit.py
|
SSHAUDIT = {toxinidir}/ssh-audit.py
|
||||||
test: COVERAGE_FILE = {toxinidir}/.coverage.{envname}
|
test: COVERAGE_FILE = {toxinidir}/.coverage.{envname}
|
||||||
@ -25,10 +25,11 @@ commands =
|
|||||||
test: pytest -v --junitxml={toxinidir}/reports/junit.{envname}.xml {posargs:test}
|
test: pytest -v --junitxml={toxinidir}/reports/junit.{envname}.xml {posargs:test}
|
||||||
test: coverage report --show-missing
|
test: coverage report --show-missing
|
||||||
test: coverage html -d {toxinidir}/reports/html/coverage.{envname}
|
test: coverage html -d {toxinidir}/reports/html/coverage.{envname}
|
||||||
py{35,36,37,38}-{type,mypy}: {[testenv:mypy]commands}
|
# Temporarily disable mypy, since types have been ignored since v2.0.0.
|
||||||
py{27,py,py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]commands}
|
# py{35,36,37,38}-{type,mypy}: {[testenv:mypy]commands}
|
||||||
py{27,py,py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]commands}
|
py{py3,35,36,37,38}-{lint,pylint},lint: {[testenv:pylint]commands}
|
||||||
py{27,py,py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]commands}
|
py{py3,35,36,37,38}-{lint,flake8},lint: {[testenv:flake8]commands}
|
||||||
|
py{py3,35,36,37,38}-{lint,vulture},lint: {[testenv:vulture]commands}
|
||||||
ignore_outcome =
|
ignore_outcome =
|
||||||
type: true
|
type: true
|
||||||
lint: true
|
lint: true
|
||||||
@ -113,12 +114,20 @@ reports = no
|
|||||||
#output-format = colorized
|
#output-format = colorized
|
||||||
indent-string = \t
|
indent-string = \t
|
||||||
disable =
|
disable =
|
||||||
locally-disabled,
|
bad-continuation,
|
||||||
bad-continuation,
|
broad-except,
|
||||||
multiple-imports,
|
fixme,
|
||||||
invalid-name,
|
invalid-name,
|
||||||
trailing-whitespace,
|
line-too-long,
|
||||||
missing-docstring
|
missing-docstring,
|
||||||
|
mixed-indentation,
|
||||||
|
no-else-return,
|
||||||
|
too-complex,
|
||||||
|
too-many-branches,
|
||||||
|
too-many-instance-attributes,
|
||||||
|
too-many-lines,
|
||||||
|
too-many-locals,
|
||||||
|
too-many-boolean-expressions
|
||||||
max-complexity = 15
|
max-complexity = 15
|
||||||
max-args = 8
|
max-args = 8
|
||||||
max-locals = 20
|
max-locals = 20
|
||||||
|
Loading…
Reference in New Issue
Block a user