mirror of
synced 2025-03-12 14:45:25 +01:00
Transformed comment type annotations to variable declaration annotations.
This commit is contained in:
@ -161,6 +161,7 @@ For convenience, a web front-end on top of the command-line tool is available at
- Added version check for OpenSSH user enumeration (CVE-2018-15473).
- Fixed crash when receiving unexpected response during host key test.
- Fixed hang against older Cisco devices during host key test & gex test.
- Dropped support for Python 3.5 (which reached EOL in Sept. 2020).
### v2.3.1 (2020-10-28)
- Now parses public key sizes for `rsa-sha2-256-cert-v01@openssh.com` and `rsa-sha2-512-cert-v01@openssh.com` host key types.
@ -131,7 +131,7 @@ class Algorithms:
# if version is not None:
# software = SSH.Software(None, product, version, None, None)
# break
rec = {} # type: Dict[int, Dict[str, Dict[str, Dict[str, int]]]]
rec: Dict[int, Dict[str, Dict[str, Dict[str, int]]]] = {}
if software is None:
unknown_software = True
for alg_pair in self.values:
@ -206,7 +206,7 @@ class Algorithms:
def __init__(self, sshv: int, db: Dict[str, Dict[str, List[List[Optional[str]]]]]) -> None:
self.__sshv = sshv
self.__db = db
self.__storage = {} # type: Dict[str, List[str]]
self.__storage: Dict[str, List[str]] = {}
def sshv(self) -> int:
@ -43,16 +43,16 @@ class AuditConf:
self.json = False
self.verbose = False
self.level = 'info'
self.ipvo = () # type: Sequence[int]
self.ipvo: Sequence[int] = ()
self.ipv4 = False
self.ipv6 = False
self.make_policy = False # When True, creates a policy file from an audit scan.
self.policy_file = None # type: Optional[str] # File system path to a policy
self.policy = None # type: Optional[Policy] # Policy object
self.policy_file: Optional[str] = None # File system path to a policy
self.policy: Optional[Policy] = None # Policy object
self.timeout = 5.0
self.timeout_set = False # Set to True when the user explicitly sets it.
self.target_file = None # type: Optional[str]
self.target_list = [] # type: List[str]
self.target_file: Optional[str] = None
self.target_list: List[str] = []
self.list_policies = False
self.lookup = ''
@ -46,8 +46,8 @@ class KexDH: # pragma: nocover
self.__e = 0
self.set_params(g, p)
self.__ed25519_pubkey = None # type: Optional[bytes]
self.__hostkey_type = None # type: Optional[bytes]
self.__ed25519_pubkey: Optional[bytes] = None
self.__hostkey_type: Optional[bytes] = None
self.__hostkey_e = 0
self.__hostkey_n = 0
self.__hostkey_n_len = 0 # Length of the host key modulus.
@ -32,7 +32,7 @@ from ssh_audit.utils import Utils
class Output:
LEVELS = ('info', 'warn', 'fail') # type: Sequence[str]
LEVELS: Sequence[str] = ('info', 'warn', 'fail')
COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31}
# Use brighter colors on Windows for better readability.
@ -36,7 +36,7 @@ from ssh_audit.banner import Banner # pylint: disable=unused-import
class Policy:
# Each field maps directly to a private member variable of the Policy class.
BUILTIN_POLICIES: Dict[str, Dict[str, Union[Optional[str], Optional[List[str]], bool, Dict[str, int]]]] = {
# Ubuntu Server policies
@ -74,25 +74,25 @@ class Policy:
'Hardened Ubuntu Client 20.04 LTS (version 2)': {'version': '2', 'banner': None, 'compressions': None, 'host_keys': ['ssh-ed25519', 'ssh-ed25519-cert-v01@openssh.com', 'sk-ssh-ed25519@openssh.com', 'sk-ssh-ed25519-cert-v01@openssh.com', 'rsa-sha2-256', 'rsa-sha2-256-cert-v01@openssh.com', 'rsa-sha2-512', 'rsa-sha2-512-cert-v01@openssh.com'], 'optional_host_keys': None, 'kex': ['curve25519-sha256', 'curve25519-sha256@libssh.org', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group-exchange-sha256', 'ext-info-c'], 'ciphers': ['chacha20-poly1305@openssh.com', 'aes256-gcm@openssh.com', 'aes128-gcm@openssh.com', 'aes256-ctr', 'aes192-ctr', 'aes128-ctr'], 'macs': ['hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'umac-128-etm@openssh.com'], 'hostkey_sizes': None, 'cakey_sizes': None, 'dh_modulus_sizes': None, 'server_policy': False},
} # type: Dict[str, Dict[str, Union[Optional[str], Optional[List[str]], bool, Dict[str, int]]]]
def __init__(self, policy_file: Optional[str] = None, policy_data: Optional[str] = None, manual_load: bool = False) -> None:
self._name = None # type: Optional[str]
self._version = None # type: Optional[str]
self._banner = None # type: Optional[str]
self._compressions = None # type: Optional[List[str]]
self._host_keys = None # type: Optional[List[str]]
self._optional_host_keys = None # type: Optional[List[str]]
self._kex = None # type: Optional[List[str]]
self._ciphers = None # type: Optional[List[str]]
self._macs = None # type: Optional[List[str]]
self._hostkey_sizes = None # type: Optional[Dict[str, int]]
self._cakey_sizes = None # type: Optional[Dict[str, int]]
self._dh_modulus_sizes = None # type: Optional[Dict[str, int]]
self._name: Optional[str] = None
self._version: Optional[str] = None
self._banner: Optional[str] = None
self._compressions: Optional[List[str]] = None
self._host_keys: Optional[List[str]] = None
self._optional_host_keys: Optional[List[str]] = None
self._kex: Optional[List[str]] = None
self._ciphers: Optional[List[str]] = None
self._macs: Optional[List[str]] = None
self._hostkey_sizes: Optional[Dict[str, int]] = None
self._cakey_sizes: Optional[Dict[str, int]] = None
self._dh_modulus_sizes: Optional[Dict[str, int]] = None
self._server_policy = True
self._name_and_version = '' # type: str
self._name_and_version: str = ''
# Ensure that only one mode was specified.
num_modes = 0
@ -305,7 +305,7 @@ macs = %s
'''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.'''
ret = True
errors = [] # type: List[Any]
errors: List[Any] = []
banner_str = str(banner)
if (self._banner is not None) and (banner_str != self._banner):
@ -43,14 +43,14 @@ class ReadBuf:
return self._buf.read(size)
def read_byte(self) -> int:
v = struct.unpack('B', self.read(1))[0] # type: int
v: int = struct.unpack('B', self.read(1))[0]
return v
def read_bool(self) -> bool:
return self.read_byte() != 0
def read_int(self) -> int:
v = struct.unpack('>I', self.read(4))[0] # type: int
v: int = struct.unpack('>I', self.read(4))[0]
return v
def read_list(self) -> List[str]:
@ -180,7 +180,7 @@ class Software:
# pylint: disable=too-many-return-statements
software = str(banner.software)
mx = re.match(r'^dropbear_([\d\.]+\d+)(.*)', software)
v = None # type: Optional[str]
v: Optional[str] = None
if mx is not None:
patch = cls._fix_patch(mx.group(2))
v, p = 'Matt Johnston', Product.DropbearSSH
@ -29,7 +29,7 @@ from ssh_audit.ssh1_crc32 import SSH1_CRC32
class SSH1:
_crc32 = None # type: Optional[SSH1_CRC32]
_crc32: Optional[SSH1_CRC32] = None
CIPHERS = ['none', 'idea', 'des', '3des', 'tss', 'rc4', 'blowfish']
AUTHS = ['none', 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos']
@ -34,7 +34,7 @@ class SSH1_KexDB: # pylint: disable=too-few-public-methods
FAIL_NA_UNSAFE = 'not implemented in OpenSSH (server), unsafe algorithm'
TEXT_CIPHER_IDEA = 'cipher used by commercial SSH'
ALGORITHMS: Dict[str, Dict[str, List[List[Optional[str]]]]] = {
'key': {
'ssh-rsa1': [['1.2.2']],
@ -55,4 +55,4 @@ class SSH1_KexDB: # pylint: disable=too-few-public-methods
'tis': [['1.2.2']],
'kerberos': [['1.2.2', '3.6'], [FAIL_OPENSSH37_REMOVE]],
} # type: Dict[str, Dict[str, List[List[Optional[str]]]]]
@ -41,9 +41,9 @@ class SSH2_Kex:
self.__follows = follows
self.__unused = unused
self.__rsa_key_sizes = {} # type: Dict[str, Tuple[int, int]]
self.__dh_modulus_sizes = {} # type: Dict[str, Tuple[int, int]]
self.__host_keys = {} # type: Dict[str, bytes]
self.__rsa_key_sizes: Dict[str, Tuple[int, int]] = {}
self.__dh_modulus_sizes: Dict[str, Tuple[int, int]] = {}
self.__host_keys: Dict[str, bytes] = {}
def cookie(self) -> bytes:
@ -59,7 +59,7 @@ class SSH2_KexDB: # pylint: disable=too-few-public-methods
WARN_OBSOLETE = 'using obsolete algorithm'
WARN_UNTRUSTED = 'using untrusted algorithm'
ALGORITHMS: Dict[str, Dict[str, List[List[Optional[str]]]]] = {
# Format: 'algorithm_name': [['version_first_appeared_in'], [reason_for_failure1, reason_for_failure2, ...], [warning1, warning2, ...]]
'kex': {
'diffie-hellman-group1-sha1': [['2.3.0,d0.28,l10.2', '6.6', '6.9'], [FAIL_1024BIT_MODULUS, FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_HASH_WEAK]],
@ -268,4 +268,4 @@ class SSH2_KexDB: # pylint: disable=too-few-public-methods
'chacha20-poly1305@openssh.com': [[]], # Despite the @openssh.com tag, this was never shipped as a MAC in OpenSSH (only as a cipher); it is only implemented as a MAC in Syncplify.
'crypticore-mac@ssh.com': [[], [FAIL_UNPROVEN]],
} # type: Dict[str, Dict[str, List[List[Optional[str]]]]]
@ -210,13 +210,13 @@ def output_security_sub(sub: str, software: Optional[Software], client_audit: bo
if software is None or software.product not in secdb:
for line in secdb[software.product]:
vfrom = '' # type: str
vtill = '' # type: str
vfrom: str = ''
vtill: str = ''
vfrom, vtill = line[0:2]
if not software.between_versions(vfrom, vtill):
target = 0 # type: int
name = '' # type: str
target: int = 0
name: str = ''
target, name = line[2:4]
is_server = target & 1 == 1
is_client = target & 2 == 2
@ -227,8 +227,8 @@ def output_security_sub(sub: str, software: Optional[Software], client_audit: bo
p = '' if out.batch else ' ' * (padlen - len(name))
if sub == 'cve':
cvss = 0.0 # type: float
descr = '' # type: str
cvss: float = 0.0
descr: str = ''
cvss, descr = line[4:6]
# Critical CVSS scores (>= 8.0) are printed as a fail, otherwise they are printed as a warning.
@ -431,7 +431,7 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client
maxlen = algs.maxlen + 1
output_security(banner, client_audit, maxlen, aconf.json)
# Filled in by output_algorithms() with unidentified algs.
unknown_algorithms = [] # type: List[str]
unknown_algorithms: List[str] = []
if pkm is not None:
ciphers = pkm.supported_ciphers
@ -529,7 +529,7 @@ def list_policies() -> None:
def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH2_Kex'], client_host: Optional[str]) -> None:
# Set the source of this policy to the server host if this is a server audit, otherwise set it to the client address.
source = aconf.host # type: Optional[str]
source: Optional[str] = aconf.host
if aconf.client_audit:
source = client_host
@ -562,9 +562,9 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi
except getopt.GetoptError as err:
aconf.ssh1, aconf.ssh2 = False, False
host = '' # type: str
oport = None # type: Optional[str]
port = 0 # type: int
host: str = ''
oport: Optional[str] = None
port: int = 0
for o, a in opts:
if o in ('-h', '--help'):
@ -687,14 +687,14 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p
banner_software = banner.software
banner_comments = banner.comments
res = {
res: Any = {
"banner": {
"raw": banner_str,
"protocol": banner_protocol,
"software": banner_software,
"comments": banner_comments,
} # type: Any
if client_host is not None:
res['client_ip'] = client_host
if kex is not None:
@ -703,9 +703,9 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p
res['kex'] = []
alg_sizes = kex.dh_modulus_sizes()
for algorithm in kex.kex_algorithms:
entry = {
entry: Any = {
'algorithm': algorithm,
} # type: Any
if algorithm in alg_sizes:
hostkey_size, ca_size = alg_sizes[algorithm]
entry['keysize'] = hostkey_size
@ -879,7 +879,7 @@ def algorithm_lookup(alg_names: str) -> int:
for (outer_k, outer_v) in adb.items()
unknown_algorithms = [] # type: List[str]
unknown_algorithms: List[str] = []
padding = len(max(algorithm_names, key=len))
for alg_type in alg_types:
@ -54,12 +54,12 @@ class SSH_Socket(ReadBuf, WriteBuf):
def __init__(self, host: Optional[str], port: int, ipvo: Optional[Sequence[int]] = None, timeout: Union[int, float] = 5, timeout_set: bool = False) -> None:
super(SSH_Socket, self).__init__()
self.__sock = None # type: Optional[socket.socket]
self.__sock_map = {} # type: Dict[int, socket.socket]
self.__sock: Optional[socket.socket] = None
self.__sock_map: Dict[int, socket.socket] = {}
self.__block_size = 8
self.__state = 0
self.__header = [] # type: List[str]
self.__banner = None # type: Optional[Banner]
self.__header: List[str] = []
self.__banner: Optional[Banner] = None
if host is None:
raise ValueError('undefined host')
nport = Utils.parse_int(port)
@ -73,7 +73,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
self.__ipvo = ()
self.__timeout = timeout
self.__timeout_set = timeout_set
self.client_host = None # type: Optional[str]
self.client_host: Optional[str] = None
self.client_port = None
def _resolve(self, ipvo: Sequence[int]) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
@ -30,12 +30,12 @@ from ssh_audit.algorithm import Algorithm
class Timeframe:
def __init__(self) -> None:
self.__storage = {} # type: Dict[str, List[Optional[str]]]
self.__storage: Dict[str, List[Optional[str]]] = {}
def __contains__(self, product: str) -> bool:
return product in self.__storage
def __getitem__(self, product): # type: (str) -> Sequence[Optional[str]]
def __getitem__(self, product: str) -> Sequence[Optional[str]]:
return tuple(self.__storage.get(product, [None] * 4))
def __str__(self) -> str:
@ -51,7 +51,7 @@ class Timeframe:
return self[product][1 if bool(for_server) else 3]
def _update(self, versions: Optional[str], pos: int) -> None:
ssh_versions = {} # type: Dict[str, str]
ssh_versions: Dict[str, str] = {}
for_srv, for_cli = pos < 2, pos > 1
for v in (versions or '').split(','):
ssh_prod, ssh_ver, is_cli = Algorithm.get_ssh_version(v)
@ -96,7 +96,7 @@ class Utils:
def unique_seq(cls, seq: Sequence[Any]) -> Sequence[Any]:
seen = set() # type: Set[Any]
seen: Set[Any] = set()
def _seen_add(x: Any) -> bool:
@ -33,7 +33,7 @@ class VersionVulnerabilityDB: # pylint: disable=too-few-public-methods
# Example: if it affects servers, both remote & local, then affected
# = 1. If it affects servers, but is a local issue only,
# then affected = 1 + 4 = 5.
CVE = {
CVE: Dict[str, List[List[Any]]] = {
'Dropbear SSH': [
['0.0', '2018.76', 1, 'CVE-2018-15599', 5.0, 'remote users may enumerate users on the system'],
['0.0', '2017.74', 5, 'CVE-2017-9079', 4.7, 'local users can read certain files as root'],
@ -140,12 +140,12 @@ class VersionVulnerabilityDB: # pylint: disable=too-few-public-methods
['0.0', '0.66', 2, 'CVE-2016-2563', 7.5, 'buffer overflow in SCP command-line utility'],
['0.0', '0.65', 2, 'CVE-2015-5309', 4.3, 'integer overflow in terminal-handling code'],
} # type: Dict[str, List[List[Any]]]
TXT = {
TXT: Dict[str, List[List[Any]]] = {
'Dropbear SSH': [
['0.28', '0.34', 1, 'remote root exploit', 'remote format string buffer overflow exploit (exploit-db#387)']],
'libssh': [
['0.3.3', '0.3.3', 1, 'null pointer check', 'missing null pointer check in "crypt_set_algorithms_server"'],
['0.3.3', '0.3.3', 1, 'integer overflow', 'integer overflow in "buffer_get_data"'],
['0.3.3', '0.3.3', 3, 'heap overflow', 'heap overflow in "packet_decrypt"']]
} # type: Dict[str, List[List[Any]]]
Reference in New Issue
Block a user