mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-06-23 11:04:31 +02:00
Added more structure to JSON result when policy errors are found.
This commit is contained in:
76
ssh-audit.py
76
ssh-audit.py
@ -204,6 +204,15 @@ class Policy:
|
||||
raise ValueError('The policy does not have a version field.')
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _append_error(errors: List[Any], mismatched_field: str, expected_required: Optional[List[str]], expected_optional: Optional[List[str]], actual: List[str]) -> None:
|
||||
if expected_required is None:
|
||||
expected_required = ['']
|
||||
if expected_optional is None:
|
||||
expected_optional = ['']
|
||||
errors.append({'mismatched_field': mismatched_field, 'expected_required': expected_required, 'expected_optional': expected_optional, 'actual': actual})
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create(source: Optional[str], banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'], client_audit: bool) -> str:
|
||||
'''Creates a policy based on a server configuration. Returns a string.'''
|
||||
@ -290,24 +299,24 @@ macs = %s
|
||||
return policy_data
|
||||
|
||||
|
||||
def evaluate(self, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]:
|
||||
def evaluate(self, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[Dict[str, str]], str]:
|
||||
'''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.'''
|
||||
|
||||
ret = True
|
||||
errors = []
|
||||
errors = [] # type: List[Any]
|
||||
|
||||
banner_str = str(banner)
|
||||
if (self._banner is not None) and (banner_str != self._banner):
|
||||
ret = False
|
||||
errors.append('Banner did not match. Expected: [%s]; Actual: [%s]' % (self._banner, banner_str))
|
||||
self._append_error(errors, 'Banner', [self._banner], None, [banner_str])
|
||||
|
||||
# All subsequent tests require a valid kex, so end here if we don't have one.
|
||||
if kex is None:
|
||||
return ret, errors
|
||||
return ret, errors, self._get_error_str(errors)
|
||||
|
||||
if (self._compressions is not None) and (kex.server.compression != self._compressions):
|
||||
ret = False
|
||||
errors.append('Compression types did not match. Expected: %s; Actual: %s' % (self._compressions, kex.server.compression))
|
||||
self._append_error(errors, 'Compression', self._compressions, None, kex.server.compression)
|
||||
|
||||
# If a list of optional host keys was given in the policy, remove any of its entries from the list retrieved from the server. This allows us to do an exact comparison with the expected list below.
|
||||
pruned_host_keys = kex.key_algorithms
|
||||
@ -316,7 +325,7 @@ macs = %s
|
||||
|
||||
if (self._host_keys is not None) and (pruned_host_keys != self._host_keys):
|
||||
ret = False
|
||||
errors.append('Host key types did not match. Expected (required): %s; Expected (optional): %s; Actual: %s' % (self._host_keys, self._optional_host_keys, kex.key_algorithms))
|
||||
self._append_error(errors, 'Host keys', self._host_keys, self._optional_host_keys, kex.key_algorithms)
|
||||
|
||||
if self._hostkey_sizes is not None:
|
||||
hostkey_types = list(self._hostkey_sizes.keys())
|
||||
@ -327,7 +336,7 @@ macs = %s
|
||||
actual_hostkey_size, actual_cakey_size = kex.rsa_key_sizes()[hostkey_type]
|
||||
if actual_hostkey_size != expected_hostkey_size:
|
||||
ret = False
|
||||
errors.append('RSA hostkey (%s) sizes did not match. Expected: %d; Actual: %d' % (hostkey_type, expected_hostkey_size, actual_hostkey_size))
|
||||
self._append_error(errors, 'RSA host key (%s) sizes' % hostkey_type, [str(expected_hostkey_size)], None, [str(actual_hostkey_size)])
|
||||
|
||||
if self._cakey_sizes is not None:
|
||||
hostkey_types = list(self._cakey_sizes.keys())
|
||||
@ -338,19 +347,19 @@ macs = %s
|
||||
actual_hostkey_size, actual_cakey_size = kex.rsa_key_sizes()[hostkey_type]
|
||||
if actual_cakey_size != expected_cakey_size:
|
||||
ret = False
|
||||
errors.append('RSA CA key (%s) sizes did not match. Expected: %d; Actual: %d' % (hostkey_type, expected_cakey_size, actual_cakey_size))
|
||||
self._append_error(errors, 'RSA CA key (%s) sizes' % hostkey_type, [str(expected_cakey_size)], None, [str(actual_cakey_size)])
|
||||
|
||||
if kex.kex_algorithms != self._kex:
|
||||
ret = False
|
||||
errors.append('Key exchanges did not match. Expected: %s; Actual: %s' % (self._kex, kex.kex_algorithms))
|
||||
self._append_error(errors, 'Key exchanges', self._kex, None, kex.kex_algorithms)
|
||||
|
||||
if (self._ciphers is not None) and (kex.server.encryption != self._ciphers):
|
||||
ret = False
|
||||
errors.append('Ciphers did not match. Expected: %s; Actual: %s' % (self._ciphers, kex.server.encryption))
|
||||
self._append_error(errors, 'Ciphers', self._ciphers, None, kex.server.encryption)
|
||||
|
||||
if (self._macs is not None) and (kex.server.mac != self._macs):
|
||||
ret = False
|
||||
errors.append('MACs did not match. Expected: %s; Actual: %s' % (self._macs, kex.server.mac))
|
||||
self._append_error(errors, 'MACs', self._macs, None, kex.server.mac)
|
||||
|
||||
if self._dh_modulus_sizes is not None:
|
||||
dh_modulus_types = list(self._dh_modulus_sizes.keys())
|
||||
@ -361,9 +370,32 @@ macs = %s
|
||||
actual_dh_modulus_size, _ = kex.dh_modulus_sizes()[dh_modulus_type]
|
||||
if expected_dh_modulus_size != actual_dh_modulus_size:
|
||||
ret = False
|
||||
errors.append('Group exchange (%s) modulus sizes did not match. Expected: %d; Actual: %d' % (dh_modulus_type, expected_dh_modulus_size, actual_dh_modulus_size))
|
||||
self._append_error(errors, 'Group exchange (%s) modulus sizes' % dh_modulus_type, [str(expected_dh_modulus_size)], None, [str(actual_dh_modulus_size)])
|
||||
|
||||
return ret, errors
|
||||
return ret, errors, self._get_error_str(errors)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _get_error_str(errors: List[Any]) -> str:
|
||||
'''Transforms an error struct to a flat string of error messages.'''
|
||||
|
||||
error_list = []
|
||||
for e in errors:
|
||||
e_str = "%s did not match. " % e['mismatched_field']
|
||||
if ('expected_optional' in e) and (e['expected_optional'] != ['']):
|
||||
e_str += "Expected (required): %s; Expected (optional): %s" % (Policy._normalize_error_field(e['expected_required']), Policy._normalize_error_field(e['expected_optional']))
|
||||
else:
|
||||
e_str += "Expected: %s" % Policy._normalize_error_field(e['expected_required'])
|
||||
e_str += "; Actual: %s" % Policy._normalize_error_field(e['actual'])
|
||||
error_list.append(e_str)
|
||||
|
||||
error_list.sort() # To ensure repeatable results for testing.
|
||||
|
||||
error_str = ''
|
||||
if len(error_list) > 0:
|
||||
error_str = " * %s" % '\n * '.join(error_list)
|
||||
|
||||
return error_str
|
||||
|
||||
|
||||
def get_name_and_version(self) -> str:
|
||||
@ -376,6 +408,18 @@ macs = %s
|
||||
return self._server_policy
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _normalize_error_field(field: List[str]) -> Any:
|
||||
'''If field is an array with a string parsable as an integer, return that integer. Otherwise, return the field unmodified.'''
|
||||
if len(field) == 1:
|
||||
try:
|
||||
return int(field[0])
|
||||
except ValueError:
|
||||
return field
|
||||
else:
|
||||
return field
|
||||
|
||||
|
||||
def __str__(self) -> str:
|
||||
undefined = '{undefined}'
|
||||
|
||||
@ -3280,9 +3324,9 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], client_hos
|
||||
if aconf.policy is None:
|
||||
raise RuntimeError('Internal error: cannot evaluate against null Policy!')
|
||||
|
||||
passed, errors = aconf.policy.evaluate(banner, kex)
|
||||
passed, error_struct, error_str = aconf.policy.evaluate(banner, kex)
|
||||
if aconf.json:
|
||||
json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': errors}
|
||||
json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': error_struct}
|
||||
print(json.dumps(json_struct, sort_keys=True))
|
||||
else:
|
||||
spacing = ''
|
||||
@ -3297,7 +3341,7 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], client_hos
|
||||
out.good("✔ Passed")
|
||||
else:
|
||||
out.fail("❌ Failed!")
|
||||
out.warn("\nErrors:\n * %s" % '\n * '.join(errors))
|
||||
out.warn("\nErrors:\n%s" % error_str)
|
||||
|
||||
return passed
|
||||
|
||||
|
Reference in New Issue
Block a user