From 01ec6b0b37f5689f3bca6e450033d76ed5e6d250 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Wed, 1 Jul 2020 13:12:49 -0400 Subject: [PATCH] Removed header processing from policy checks, as this did not function the way users would expect. --- ssh-audit.py | 40 ++++++++++++---------------------------- test/test_policy.py | 24 ++++++++++++------------ 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/ssh-audit.py b/ssh-audit.py index 81ede73..fdbf0cf 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -88,7 +88,6 @@ class Policy: self._name = None # type: Optional[str] self._version = None # type: Optional[str] self._banner = None # type: Optional[str] - self._header = None # type: Optional[str] self._compressions = None # type: Optional[List[str]] self._host_keys = None # type: Optional[List[str]] self._kex = None # type: Optional[List[str]] @@ -126,10 +125,10 @@ class Policy: key = key.strip() val = val.strip() - if key not in ['name', 'version', 'banner', 'header', 'compressions', 'host keys', 'key exchanges', 'ciphers', 'macs'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'): + if key not in ['name', 'version', 'banner', 'compressions', 'host keys', 'key exchanges', 'ciphers', 'macs'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'): raise ValueError("invalid field found in policy: %s" % line) - if key in ['name', 'banner', 'header']: + if key in ['name', 'banner']: # If the banner value is blank, set it to "" so that the code below handles it. if len(val) < 2: @@ -145,8 +144,6 @@ class Policy: self._name = val elif key == 'banner': self._banner = val - else: - self._header = val elif key == 'version': self._version = val elif key in ['compressions', 'host keys', 'key exchanges', 'ciphers', 'macs']: @@ -193,7 +190,7 @@ class Policy: @staticmethod - def create(host: str, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> str: + def create(host: str, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> str: '''Creates a policy based on a server configuration. Returns a string.''' today = date.today().strftime('%Y/%m/%d') @@ -252,9 +249,6 @@ version = 1 # The banner that must match exactly. Commented out to ignore banners, since minor variability in the banner is sometimes normal. # banner = "%s" -# The header that must match exactly. Commented out to ignore headers, since variability in the header is sometimes normal. -# header = "%s" - # The compression options that must match exactly (order matters). Commented out to ignore by default. # compressions = %s %s%s%s @@ -269,12 +263,12 @@ ciphers = %s # The MACs that must match exactly (order matters). macs = %s -''' % (host, today, host, today, banner, header, compressions, rsa_hostkey_sizes_str, rsa_cakey_sizes_str, dh_modulus_sizes_str, host_keys, kex_algs, ciphers, macs) +''' % (host, today, host, today, banner, compressions, rsa_hostkey_sizes_str, rsa_cakey_sizes_str, dh_modulus_sizes_str, host_keys, kex_algs, ciphers, macs) return policy_data - def evaluate(self, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]: + def evaluate(self, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]: '''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.''' ret = True @@ -285,13 +279,6 @@ macs = %s ret = False errors.append('Banner did not match. Expected: [%s]; Actual: [%s]' % (self._banner, banner_str)) - actual_header = None - if header is not None: - actual_header = "\n".join(header) - if (self._header is not None) and (actual_header is not None) and (actual_header != self._header): - ret = False - errors.append('Header did not match. Expected: [%s]; Actual: [%s]' % (self._header, actual_header)) - # All subsequent tests require a valid kex, so end here if we don't have one. if kex is None: return ret, errors @@ -363,7 +350,6 @@ macs = %s name = undefined version = undefined banner = undefined - header = undefined compressions_str = undefined host_keys_str = undefined kex_str = undefined @@ -376,8 +362,6 @@ macs = %s version = '[%s]' % self._version if self._banner is not None: banner = '[%s]' % self._banner - if self._header is not None: - header = '[%s]' % self._header if self._compressions is not None: compressions_str = ', '.join(self._compressions) @@ -390,7 +374,7 @@ macs = %s if self._macs is not None: macs_str = ', '.join(self._macs) - return "Name: %s\nVersion: %s\nBanner: %s\nHeader: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, header, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str) + return "Name: %s\nVersion: %s\nBanner: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str) class AuditConf: @@ -3187,12 +3171,12 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at .\n" % ','.join(unknown_algorithms)) -def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex'] = None) -> bool: +def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None) -> bool: if aconf.policy is None: raise RuntimeError('Internal error: cannot evaluate against null Policy!') - passed, errors = aconf.policy.evaluate(banner, header, kex) + passed, errors = aconf.policy.evaluate(banner, kex) if aconf.json: json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': errors} print(json.dumps(json_struct, sort_keys=True)) @@ -3209,8 +3193,8 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: Op return passed -def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> None: - policy_data = Policy.create(aconf.host, banner, header, kex) +def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> None: + policy_data = Policy.create(aconf.host, banner, kex) if aconf.policy_file is None: raise RuntimeError('Internal error: cannot write policy file since filename is None!') @@ -3481,11 +3465,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: # This is a policy test. elif (aconf.policy is not None) and (aconf.make_policy is False): - return 0 if evaluate_policy(aconf, banner, header, kex=kex) else 1 + return 0 if evaluate_policy(aconf, banner, kex=kex) else 1 # A new policy should be made from this scan. elif (aconf.policy is None) and (aconf.make_policy is True): - make_policy(aconf, banner, header, kex=kex) + make_policy(aconf, banner, kex=kex) else: raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy)) diff --git a/test/test_policy.py b/test/test_policy.py index 665d472..4fab97c 100644 --- a/test/test_policy.py +++ b/test/test_policy.py @@ -45,7 +45,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, mac_alg3''' policy = self.Policy(policy_data=policy_data) - assert str(policy) == "Name: [Test Policy]\nVersion: [1]\nBanner: {undefined}\nHeader: {undefined}\nCompressions: comp_alg1\nHost Keys: key_alg1\nKey Exchanges: kex_alg1, kex_alg2\nCiphers: cipher_alg1, cipher_alg2, cipher_alg3\nMACs: mac_alg1, mac_alg2, mac_alg3" + assert str(policy) == "Name: [Test Policy]\nVersion: [1]\nBanner: {undefined}\nCompressions: comp_alg1\nHost Keys: key_alg1\nKey Exchanges: kex_alg1, kex_alg2\nCiphers: cipher_alg1, cipher_alg2, cipher_alg3\nMACs: mac_alg1, mac_alg2, mac_alg3" def test_policy_invalid_1(self): @@ -186,23 +186,23 @@ macs = mac_alg1, mac_alg2, mac_alg3''' '''Creates a policy from a kex and ensures it is generated exactly as expected.''' kex = self._get_kex() - pol_data = self.Policy.create('www.l0l.com', 'bannerX', 'headerX', kex) + pol_data = self.Policy.create('www.l0l.com', 'bannerX', kex) # Today's date is embedded in the policy, so filter it out to get repeatable results. pol_data = pol_data.replace(date.today().strftime('%Y/%m/%d'), '[todays date]') # Instead of writing out the entire expected policy--line by line--just check that it has the expected hash. - assert hashlib.sha256(pol_data.encode('ascii')).hexdigest() == 'e830fb9e5731995e5e4858b2b6d16704d7e5c2769d3a8d9acdd023a83ab337c5' + assert hashlib.sha256(pol_data.encode('ascii')).hexdigest() == '1765f236d765b1741e7006f601babf0a8e1628326341a3a00b1026c7f85f48ce' def test_policy_evaluate_passing_1(self): '''Creates a policy and evaluates it against the same server''' kex = self._get_kex() - policy_data = self.Policy.create('www.l0l.com', None, None, kex) + policy_data = self.Policy.create('www.l0l.com', None, kex) policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, kex) + ret, errors = policy.evaluate('SSH Server 1.0', kex) assert ret is True assert len(errors) == 0 @@ -220,7 +220,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, mac_alg3''' policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex()) + ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex()) assert ret is False assert len(errors) == 1 assert errors[0].find('Banner did not match.') != -1 @@ -238,7 +238,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, mac_alg3''' policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex()) + ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex()) assert ret is False assert len(errors) == 1 assert errors[0].find('Compression types did not match.') != -1 @@ -256,7 +256,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, mac_alg3''' policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex()) + ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex()) assert ret is False assert len(errors) == 1 assert errors[0].find('Host key types did not match.') != -1 @@ -274,7 +274,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, mac_alg3''' policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex()) + ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex()) assert ret is False assert len(errors) == 1 assert errors[0].find('Key exchanges did not match.') != -1 @@ -292,7 +292,7 @@ ciphers = cipher_alg1, XXXmismatched, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, mac_alg3''' policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex()) + ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex()) assert ret is False assert len(errors) == 1 assert errors[0].find('Ciphers did not match.') != -1 @@ -310,7 +310,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, XXXmismatched, mac_alg3''' policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex()) + ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex()) assert ret is False assert len(errors) == 1 assert errors[0].find('MACs did not match.') != -1 @@ -328,7 +328,7 @@ ciphers = cipher_alg1, cipher_alg2, cipher_alg3 macs = mac_alg1, mac_alg2, XXXmismatchedXXX, mac_alg3''' policy = self.Policy(policy_data=policy_data) - ret, errors = policy.evaluate('SSH Server 1.0', None, self._get_kex()) + ret, errors = policy.evaluate('SSH Server 1.0', self._get_kex()) assert ret is False assert len(errors) == 2