diff --git a/src/ssh_audit/policy.py b/src/ssh_audit/policy.py index c61b80f..2463c42 100644 --- a/src/ssh_audit/policy.py +++ b/src/ssh_audit/policy.py @@ -58,6 +58,7 @@ class Policy: self._hostkey_sizes: Optional[Dict[str, Dict[str, Union[int, str, bytes]]]] = None self._dh_modulus_sizes: Optional[Dict[str, int]] = None self._server_policy = True + self._allow_algorithm_subset_and_reordering = False self._name_and_version: str = '' @@ -116,7 +117,7 @@ class Policy: key = key.strip() val = val.strip() - if key not in ['name', 'version', 'banner', 'compressions', 'host keys', 'optional host keys', 'allowed host keys', 'key exchanges', 'allowed key exchanges', 'ciphers', 'allowed ciphers', 'macs', 'allowed macs', 'client policy', 'host_key_sizes', 'dh_modulus_sizes'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'): + if key not in ['name', 'version', 'banner', 'compressions', 'host keys', 'optional host keys', 'allowed host keys', 'key exchanges', 'allowed key exchanges', 'ciphers', 'allowed ciphers', 'macs', 'allowed macs', 'client policy', 'host_key_sizes', 'dh_modulus_sizes', 'allow_algorithm_subset_and_reordering'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'): raise ValueError("invalid field found in policy: %s" % line) if key in ['name', 'banner']: @@ -217,7 +218,8 @@ class Policy: elif key.startswith('client policy') and val.lower() == 'true': self._server_policy = False - + elif key == 'allow_algorithm_subset_and_reordering' and val.lower() == 'true': + self._allow_algorithm_subset_and_reordering = True if self._name is None: raise ValueError('The policy does not have a name field.') @@ -357,9 +359,7 @@ macs = %s if self._optional_host_keys is not None: pruned_host_keys = [x for x in kex.key_algorithms if x not in self._optional_host_keys] - - - # Checking Hostkeys + # Checking allowed Hostkeys hostkey_error = False if self._allowed_host_keys is not None: for hostkey_t in kex.key_algorithms: @@ -368,15 +368,20 @@ macs = %s ret = False hostkey_error = True + # Checking required Hostkeys if self._host_keys is not None: - for hostkey_t in self._host_keys: - if hostkey_t not in kex.key_algorithms: - ret = False - if not hostkey_error: - self._append_error(errors, 'Host keys', self._host_keys, None, self._optional_host_keys, kex.key_algorithms) + if self._allow_algorithm_subset_and_reordering: + for hostkey_t in self._host_keys: + if hostkey_t not in kex.key_algorithms: + ret = False + if not hostkey_error: + self._append_error(errors, 'Host keys', self._host_keys, None, self._optional_host_keys, kex.key_algorithms) + break + elif pruned_host_keys != self._host_keys: + ret = False + self._append_error(errors, 'Host keys', self._host_keys, None, self._optional_host_keys, kex.key_algorithms) # Checking Host Key Sizes - if self._hostkey_sizes is not None: hostkey_types = list(self._hostkey_sizes.keys()) hostkey_types.sort() # Sorted to make testing output repeatable. @@ -405,7 +410,7 @@ macs = %s ret = False self._append_error(errors, 'CA signature size (%s)' % actual_ca_key_type, [str(expected_ca_key_size)], None, None, [str(actual_ca_key_size)]) - # Checking KEX + # Checking allowed KEX kex_error = False if self._allowed_kex is not None: for kex_t in kex.kex_algorithms: @@ -415,14 +420,20 @@ macs = %s kex_error = True break + # Checking required KEX if self._kex is not None: - for kex_t in self._kex: - if kex_t not in kex.kex_algorithms: - ret = False - if not kex_error: - self._append_error(errors, 'Key exchanges', self._kex, None, None, kex.kex_algorithms) + if self._allow_algorithm_subset_and_reordering: + for kex_t in self._kex: + if kex_t not in kex.kex_algorithms: + ret = False + if not kex_error: + self._append_error(errors, 'Key exchanges', self._kex, None, None, kex.kex_algorithms) + break + elif kex.kex_algorithms != self._kex: # Requires perfect match + ret = False + self._append_error(errors, 'Key exchanges', self._kex, None, None, kex.kex_algorithms) - # Checking Ciphers + # Checking allowed Ciphers cipher_error = False if self._allowed_ciphers is not None: for cipher_t in kex.server.encryption: @@ -432,14 +443,20 @@ macs = %s cipher_error = True break + # Checking required Ciphers if self._ciphers is not None: - for cipher_t in self._ciphers: - if cipher_t not in kex.server.encryption: - ret = False - if not cipher_error: - self._append_error(errors, 'Ciphers', self._ciphers, None, None, kex.server.encryption) + if self._allow_algorithm_subset_and_reordering: + for cipher_t in self._ciphers: + if cipher_t not in kex.server.encryption: + ret = False + if not cipher_error: + self._append_error(errors, 'Ciphers', self._ciphers, None, None, kex.server.encryption) + break + elif kex.server.encryption != self._ciphers: # Requires perfect match + ret = False + self._append_error(errors, 'Ciphers', self._ciphers, None, None, kex.server.encryption) - # Checking MACs + # Checking allowed MACs mac_error = False if self._allowed_macs is not None: for mac_t in kex.server.mac: @@ -449,12 +466,18 @@ macs = %s self._append_error(errors, 'MACs', self._macs, self._allowed_macs, None, kex.server.mac) break + # Checking required MACs if self._macs is not None: - for mac_t in self._macs: - if mac_t not in kex.server.mac: - ret = False - if not mac_error: - self._append_error(errors, 'MACs', self._macs, None, None, kex.server.mac) + if self._allow_algorithm_subset_and_reordering: + for mac_t in self._macs: + if mac_t not in kex.server.mac: + ret = False + if not mac_error: + self._append_error(errors, 'MACs', self._macs, None, None, kex.server.mac) + break + elif kex.server.mac != self._macs: # Requires perfect match + ret = False + self._append_error(errors, 'MACs', self._macs, None, None, kex.server.mac) if self._dh_modulus_sizes is not None: dh_modulus_types = list(self._dh_modulus_sizes.keys())