mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-01-24 13:15:37 +01:00
Removed header processing from policy checks, as this did not function the way users would expect.
This commit is contained in:
parent
30f2b7690a
commit
01ec6b0b37
40
ssh-audit.py
40
ssh-audit.py
@ -88,7 +88,6 @@ class Policy:
|
||||
self._name = None # type: Optional[str]
|
||||
self._version = None # type: Optional[str]
|
||||
self._banner = None # type: Optional[str]
|
||||
self._header = None # type: Optional[str]
|
||||
self._compressions = None # type: Optional[List[str]]
|
||||
self._host_keys = None # type: Optional[List[str]]
|
||||
self._kex = None # type: Optional[List[str]]
|
||||
@ -126,10 +125,10 @@ class Policy:
|
||||
key = key.strip()
|
||||
val = val.strip()
|
||||
|
||||
if key not in ['name', 'version', 'banner', 'header', 'compressions', 'host keys', 'key exchanges', 'ciphers', 'macs'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'):
|
||||
if key not in ['name', 'version', 'banner', 'compressions', 'host keys', 'key exchanges', 'ciphers', 'macs'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'):
|
||||
raise ValueError("invalid field found in policy: %s" % line)
|
||||
|
||||
if key in ['name', 'banner', 'header']:
|
||||
if key in ['name', 'banner']:
|
||||
|
||||
# If the banner value is blank, set it to "" so that the code below handles it.
|
||||
if len(val) < 2:
|
||||
@ -145,8 +144,6 @@ class Policy:
|
||||
self._name = val
|
||||
elif key == 'banner':
|
||||
self._banner = val
|
||||
else:
|
||||
self._header = val
|
||||
elif key == 'version':
|
||||
self._version = val
|
||||
elif key in ['compressions', 'host keys', 'key exchanges', 'ciphers', 'macs']:
|
||||
@ -193,7 +190,7 @@ class Policy:
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create(host: str, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> str:
|
||||
def create(host: str, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> str:
|
||||
'''Creates a policy based on a server configuration. Returns a string.'''
|
||||
|
||||
today = date.today().strftime('%Y/%m/%d')
|
||||
@ -252,9 +249,6 @@ version = 1
|
||||
# The banner that must match exactly. Commented out to ignore banners, since minor variability in the banner is sometimes normal.
|
||||
# banner = "%s"
|
||||
|
||||
# The header that must match exactly. Commented out to ignore headers, since variability in the header is sometimes normal.
|
||||
# header = "%s"
|
||||
|
||||
# The compression options that must match exactly (order matters). Commented out to ignore by default.
|
||||
# compressions = %s
|
||||
%s%s%s
|
||||
@ -269,12 +263,12 @@ ciphers = %s
|
||||
|
||||
# The MACs that must match exactly (order matters).
|
||||
macs = %s
|
||||
''' % (host, today, host, today, banner, header, compressions, rsa_hostkey_sizes_str, rsa_cakey_sizes_str, dh_modulus_sizes_str, host_keys, kex_algs, ciphers, macs)
|
||||
''' % (host, today, host, today, banner, compressions, rsa_hostkey_sizes_str, rsa_cakey_sizes_str, dh_modulus_sizes_str, host_keys, kex_algs, ciphers, macs)
|
||||
|
||||
return policy_data
|
||||
|
||||
|
||||
def evaluate(self, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]:
|
||||
def evaluate(self, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]:
|
||||
'''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.'''
|
||||
|
||||
ret = True
|
||||
@ -285,13 +279,6 @@ macs = %s
|
||||
ret = False
|
||||
errors.append('Banner did not match. Expected: [%s]; Actual: [%s]' % (self._banner, banner_str))
|
||||
|
||||
actual_header = None
|
||||
if header is not None:
|
||||
actual_header = "\n".join(header)
|
||||
if (self._header is not None) and (actual_header is not None) and (actual_header != self._header):
|
||||
ret = False
|
||||
errors.append('Header did not match. Expected: [%s]; Actual: [%s]' % (self._header, actual_header))
|
||||
|
||||
# All subsequent tests require a valid kex, so end here if we don't have one.
|
||||
if kex is None:
|
||||
return ret, errors
|
||||
@ -363,7 +350,6 @@ macs = %s
|
||||
name = undefined
|
||||
version = undefined
|
||||
banner = undefined
|
||||
header = undefined
|
||||
compressions_str = undefined
|
||||
host_keys_str = undefined
|
||||
kex_str = undefined
|
||||
@ -376,8 +362,6 @@ macs = %s
|
||||
version = '[%s]' % self._version
|
||||
if self._banner is not None:
|
||||
banner = '[%s]' % self._banner
|
||||
if self._header is not None:
|
||||
header = '[%s]' % self._header
|
||||
|
||||
if self._compressions is not None:
|
||||
compressions_str = ', '.join(self._compressions)
|
||||
@ -390,7 +374,7 @@ macs = %s
|
||||
if self._macs is not None:
|
||||
macs_str = ', '.join(self._macs)
|
||||
|
||||
return "Name: %s\nVersion: %s\nBanner: %s\nHeader: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, header, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str)
|
||||
return "Name: %s\nVersion: %s\nBanner: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str)
|
||||
|
||||
|
||||
class AuditConf:
|
||||
@ -3187,12 +3171,12 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl
|
||||
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
|
||||
|
||||
|
||||
def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex'] = None) -> bool:
|
||||
def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None) -> bool:
|
||||
|
||||
if aconf.policy is None:
|
||||
raise RuntimeError('Internal error: cannot evaluate against null Policy!')
|
||||
|
||||
passed, errors = aconf.policy.evaluate(banner, header, kex)
|
||||
passed, errors = aconf.policy.evaluate(banner, kex)
|
||||
if aconf.json:
|
||||
json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': errors}
|
||||
print(json.dumps(json_struct, sort_keys=True))
|
||||
@ -3209,8 +3193,8 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: Op
|
||||
return passed
|
||||
|
||||
|
||||
def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> None:
|
||||
policy_data = Policy.create(aconf.host, banner, header, kex)
|
||||
def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> None:
|
||||
policy_data = Policy.create(aconf.host, banner, kex)
|
||||
|
||||
if aconf.policy_file is None:
|
||||
raise RuntimeError('Internal error: cannot write policy file since filename is None!')
|
||||
@ -3481,11 +3465,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
|
||||
|
||||
# This is a policy test.
|
||||
elif (aconf.policy is not None) and (aconf.make_policy is False):
|
||||
return 0 if evaluate_policy(aconf, banner, header, kex=kex) else 1
|
||||
return 0 if evaluate_policy(aconf, banner, kex=kex) else 1
|
||||
|
||||
# A new policy should be made from this scan.
|
||||
elif (aconf.policy is None) and (aconf.make_policy is True):
|
||||
make_policy(aconf, banner, header, kex=kex)
|
||||
make_policy(aconf, banner, kex=kex)
|
||||
|
||||
else:
|
||||
raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy))
|
||||
|
@ -45,7 +45,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
assert str(policy) == "Name: [Test Policy]\nVersion: [1]\nBanner: {undefined}\nHeader: {undefined}\nCompressions: comp_alg1\nHost Keys: key_alg1\nKey Exchanges: kex_alg1, kex_alg2\nCiphers: cipher_alg1, cipher_alg2, cipher_alg3\nMACs: mac_alg1, mac_alg2, mac_alg3"
|
||||
assert str(policy) == "Name: [Test Policy]\nVersion: [1]\nBanner: {undefined}\nCompressions: comp_alg1\nHost Keys: key_alg1\nKey Exchanges: kex_alg1, kex_alg2\nCiphers: cipher_alg1, cipher_alg2, cipher_alg3\nMACs: mac_alg1, mac_alg2, mac_alg3"
|
||||
|
||||
|
||||
def test_policy_invalid_1(self):
|
||||
@ -186,23 +186,23 @@ macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
'''Creates a policy from a kex and ensures it is generated exactly as expected.'''
|
||||
|
||||
kex = self._get_kex()
|
||||
pol_data = self.Policy.create('www.l0l.com', 'bannerX', 'headerX', kex)
|
||||
pol_data = self.Policy.create('www.l0l.com', 'bannerX', kex)
|
||||
|
||||
# Today's date is embedded in the policy, so filter it out to get repeatable results.
|
||||
pol_data = pol_data.replace(date.today().strftime('%Y/%m/%d'), '[todays date]')
|
||||
|
||||
# Instead of writing out the entire expected policy--line by line--just check that it has the expected hash.
|
||||
assert hashlib.sha256(pol_data.encode('ascii')).hexdigest() == 'e830fb9e5731995e5e4858b2b6d16704d7e5c2769d3a8d9acdd023a83ab337c5'
|
||||
assert hashlib.sha256(pol_data.encode('ascii')).hexdigest() == '1765f236d765b1741e7006f601babf0a8e1628326341a3a00b1026c7f85f48ce'
|
||||
|
||||
|
||||
def test_policy_evaluate_passing_1(self):
|
||||
'''Creates a policy and evaluates it against the same server'''
|
||||
|
||||
kex = self._get_kex()
|
||||
policy_data = self.Policy.create('www.l0l.com', None, None, kex)
|
||||
policy_data = self.Policy.create('www.l0l.com', None, kex)
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, kex)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', kex)
|
||||
assert ret is True
|
||||
assert len(errors) == 0
|
||||
|
||||
@ -220,7 +220,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex())
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex())
|
||||
assert ret is False
|
||||
assert len(errors) == 1
|
||||
assert errors[0].find('Banner did not match.') != -1
|
||||
@ -238,7 +238,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex())
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex())
|
||||
assert ret is False
|
||||
assert len(errors) == 1
|
||||
assert errors[0].find('Compression types did not match.') != -1
|
||||
@ -256,7 +256,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex())
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex())
|
||||
assert ret is False
|
||||
assert len(errors) == 1
|
||||
assert errors[0].find('Host key types did not match.') != -1
|
||||
@ -274,7 +274,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex())
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex())
|
||||
assert ret is False
|
||||
assert len(errors) == 1
|
||||
assert errors[0].find('Key exchanges did not match.') != -1
|
||||
@ -292,7 +292,7 @@ ciphers = cipher_alg1, XXXmismatched, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex())
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex())
|
||||
assert ret is False
|
||||
assert len(errors) == 1
|
||||
assert errors[0].find('Ciphers did not match.') != -1
|
||||
@ -310,7 +310,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, XXXmismatched, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex())
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex())
|
||||
assert ret is False
|
||||
assert len(errors) == 1
|
||||
assert errors[0].find('MACs did not match.') != -1
|
||||
@ -328,7 +328,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3
|
||||
macs = mac_alg1, mac_alg2, XXXmismatchedXXX, mac_alg3'''
|
||||
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex())
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex())
|
||||
assert ret is False
|
||||
assert len(errors) == 2
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user