From 5ac0ffa8f1619ae50b07948acbb051ddccb748f3 Mon Sep 17 00:00:00 2001 From: Adam Russell Date: Wed, 16 Feb 2022 21:01:22 +0000 Subject: [PATCH] DH GEX Modulus Size Testing --- src/ssh_audit/auditconf.py | 3 +- src/ssh_audit/gextest.py | 86 +++++++++++++++++++++++++++++-- src/ssh_audit/ssh_audit.py | 100 +++++++++++++++++++++++++++++++++++-- ssh-audit.1 | 65 +++++++++++++++++++++++- 4 files changed, 243 insertions(+), 11 deletions(-) diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index 1d51240..e2994a6 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -59,6 +59,7 @@ class AuditConf: self.lookup = '' self.manual = False self.debug = False + self.gex_test = '' def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: valid = False @@ -86,7 +87,7 @@ class AuditConf: if value == -1.0: raise ValueError('invalid timeout: {}'.format(value)) valid = True - elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list']: + elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test']: valid = True elif name == "threads": valid, num_threads = True, Utils.parse_int(value) diff --git a/src/ssh_audit/gextest.py b/src/ssh_audit/gextest.py index f82a856..b8e203c 100644 --- a/src/ssh_audit/gextest.py +++ b/src/ssh_audit/gextest.py @@ -33,6 +33,7 @@ from ssh_audit.ssh2_kexdb import SSH2_KexDB from ssh_audit.ssh2_kex import SSH2_Kex from ssh_audit.ssh_socket import SSH_Socket from ssh_audit.outputbuffer import OutputBuffer +from ssh_audit import exitcodes # Performs DH group exchanges to find what moduli are supported, and checks @@ -70,6 +71,76 @@ class GEXTest: return True + # Tests for modulus size in bits against the specified target. + @staticmethod + def modulus_size_test(out: 'OutputBuffer', s: 'SSH_Socket', kex: 'SSH2_Kex', bits_min: int, bits_pref: int, bits_max: int, modulus_dict: Dict[str, List[int]]) -> int: + ''' + Tests for modulus size in bits against the target target. + Builds a dictionary, where a key represents a DH algorithm name and the + values are the modulus sizes (in bits) that have been returned by the + target server. + Returns an exitcodes.* flag. + ''' + + retval = exitcodes.GOOD + + out.d("Starting modulus_size_test...") + out.d("Bits Min: " + str(bits_min)) + out.d("Bits Pref: " + str(bits_pref)) + out.d("Bits Max: " + str(bits_max)) + + if all(x < 0 for x in (bits_min, bits_pref, bits_max)): + out.fail("min, pref and max values cannot be negative.") + return exitcodes.FAILURE + + GEX_ALGS = { + 'diffie-hellman-group-exchange-sha1': KexGroupExchange_SHA1, + 'diffie-hellman-group-exchange-sha256': KexGroupExchange_SHA256, + } + + # Check if the server supports any of the group-exchange + # algorithms. If so, test each one. + for gex_alg in GEX_ALGS: + if gex_alg not in kex.kex_algorithms: + out.d('Server does not support the algorithm "' + gex_alg + '".', write_now=True) + else: + kex_group = GEX_ALGS[gex_alg]() + out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with min, pref and max modulus sizes of ' + str(bits_min) + ' bits, ' + str(bits_pref) + ' bits and ' + str(bits_max) + ' bits...', write_now=True) + + # It has been observed that reconnecting to some SSH servers + # multiple times in quick succession can eventually result + # in a "connection reset by peer" error. It may be possible + # to recover from such an error by sleeping for some time + # before continuing to issue reconnects. + if GEXTest.reconnect(out, s, kex, gex_alg) is False: + out.fail('Reconnect failed.') + return exitcodes.FAILURE + try: + modulus_size_returned = None + kex_group.send_init_gex(s, bits_min, bits_pref, bits_max) + kex_group.recv_reply(s, False) + modulus_size_returned = kex_group.get_dh_modulus_size() + out.d('Modulus size returned by server: ' + str(modulus_size_returned) + ' bits', write_now=True) + except Exception as e: + out.d('[exception] ' + str(e), write_now=True) + # import traceback + # print(traceback.format_exc()) + pass + finally: + # The server is in a state that is not re-testable, + # so there's nothing else to do with this open + # connection. + s.close() + + if modulus_size_returned is not None: + if gex_alg in modulus_dict: + if modulus_size_returned not in modulus_dict[gex_alg]: + modulus_dict[gex_alg].append(modulus_size_returned) + else: + modulus_dict[gex_alg] = [modulus_size_returned] + + return retval + # Runs the DH moduli test against the specified target. @staticmethod def run(out: 'OutputBuffer', s: 'SSH_Socket', kex: 'SSH2_Kex') -> None: @@ -88,7 +159,8 @@ class GEXTest: # algorithms. If so, test each one. for gex_alg, kex_group_class in GEX_ALGS.items(): if gex_alg in kex.kex_algorithms: - out.d('Preparing to perform DH group exchange using ' + gex_alg + '...', write_now=True) + weak_sizes = 512, 1024, 1536 + out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with min, pref and max modulus sizes of ' + str(weak_sizes[0]) + ' bits, ' + str(weak_sizes[1]) + ' bits and ' + str(weak_sizes[2]) + ' bits...', write_now=True) if GEXTest.reconnect(out, s, kex, gex_alg) is False: break @@ -98,15 +170,17 @@ class GEXTest: # First try a range of weak sizes. try: - kex_group.send_init_gex(s, 512, 1024, 1536) + kex_group.send_init_gex(s, weak_sizes[0], weak_sizes[1], weak_sizes[2]) kex_group.recv_reply(s, False) # Its been observed that servers will return a group # larger than the requested max. So just because we # got here, doesn't mean the server is vulnerable... smallest_modulus = kex_group.get_dh_modulus_size() + out.d('Modulus size returned by server: ' + str(smallest_modulus) + ' bits', write_now=True) - except Exception: + except Exception as e: + out.d('[exception] ' + str(e), write_now=True) pass finally: s.close() @@ -120,7 +194,7 @@ class GEXTest: if bits >= smallest_modulus > 0: break - out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with modulus size ' + str(bits) + '...', write_now=True) + out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with min, pref and max modulus sizes of ' + str(bits) + ' bits...', write_now=True) if GEXTest.reconnect(out, s, kex, gex_alg) is False: reconnect_failed = True @@ -130,7 +204,9 @@ class GEXTest: kex_group.send_init_gex(s, bits, bits, bits) kex_group.recv_reply(s, False) smallest_modulus = kex_group.get_dh_modulus_size() - except Exception: + out.d('Modulus size returned by server: ' + str(smallest_modulus) + ' bits', write_now=True) + except Exception as e: + out.d('[exception] ' + str(e), write_now=True) # import traceback # print(traceback.format_exc()) pass diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 6e5bad7..547d6f4 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -87,6 +87,7 @@ def usage(err: Optional[str] = None) -> None: uout.info(' -b, --batch batch output') uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)') uout.info(' -d, --debug debug output') + uout.info(' -g, --gex-test= dh gex modulus size test') uout.info(' -j, --json JSON output (use -jj to enable indents)') uout.info(' -l, --level= minimum output level (info|warn|fail)') uout.info(' -L, --list-policies list all the official, built-in policies') @@ -589,8 +590,8 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. # pylint: disable=too-many-branches aconf = AuditConf() try: - sopts = 'h1246M:p:P:jbcnvl:t:T:Lmd' - lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug'] + sopts = 'h1246M:p:P:jbcnvl:t:T:Lmdg:' + lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test='] opts, args = getopt.gnu_getopt(args, sopts, lopts) except getopt.GetoptError as err: usage_cb(str(err)) @@ -652,6 +653,10 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. elif o in ('-d', '--debug'): aconf.debug = True out.debug = True + elif o in ('-g', '--gex-test'): + if not((any(re.search(regex_str, a) for regex_str in get_permitted_syntax_for_gex_test().values()))): + usage_cb('{} {} is not valid'.format(o, a)) + aconf.gex_test = a if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False: usage_cb() @@ -922,7 +927,11 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print if aconf.client_audit is False: HostKeyTest.run(out, s, kex) - GEXTest.run(out, s, kex) + if aconf.gex_test != '': + program_retval = invoke_modulus_size_test(out, s, kex, aconf) + return program_retval + else: + GEXTest.run(out, s, kex) # This is a standard audit scan. if (aconf.policy is None) and (aconf.make_policy is False): @@ -1061,6 +1070,91 @@ def windows_manual(out: OutputBuffer) -> int: return retval +def get_permitted_syntax_for_gex_test() -> Dict[str, str]: + syntax = { + 'RANGE': r'^\d+-\d+(:\d+)?$', + 'LIST_WITHOUT_MIN_PREF_MAX': r'^\d+(,\d+)*$', + 'LIST_WITH_MIN_PREF_MAX': r'^\d+:\d+:\d+(,\d+:\d+:\d+)*$' + } + return syntax + + +def invoke_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex', aconf: AuditConf) -> int: + '''Extracts the user specified modulus sizes and submits them for testing against the target target. Returns an exitcodes.* flag.''' + + permitted_syntax = get_permitted_syntax_for_gex_test() + + if not((any(re.search(regex_str, aconf.gex_test) for regex_str in permitted_syntax.values()))): + out.fail("Invalid syntax.") + return exitcodes.FAILURE + + mod_dict: Dict[str, List[int]] = {} + + # Range syntax. + if re.search(permitted_syntax['RANGE'], aconf.gex_test): + extracted_digits = re.findall(r'\d+', aconf.gex_test) + bits_left_bound = int(extracted_digits[0]) + bits_right_bound = int(extracted_digits[1]) + + if (len(extracted_digits)) == 3: + bits_step = int(extracted_digits[2]) + else: + bits_step = 1 + + if bits_step <= 0: + out.fail("Step value must be greater than zero.") + return exitcodes.FAILURE + + if all(x < 0 for x in (bits_left_bound, bits_right_bound)): + out.fail("Start and end values cannot be negative.") + return exitcodes.FAILURE + + # If the left value is greater than the right value, then the sequence + # operates from right to left. + if bits_left_bound <= bits_right_bound: + bits_in_range_to_test = range(bits_left_bound, bits_right_bound + 1, bits_step) + else: + bits_in_range_to_test = range(bits_left_bound, bits_right_bound - 1, -abs(bits_step)) + + out.v("A separate test will be performed against each of the following modulus sizes: " + ", ".join([str(x) for x in bits_in_range_to_test]) + ".", write_now=True) + + for i_bits in bits_in_range_to_test: + program_retval = GEXTest.modulus_size_test(out, s, kex, i_bits, i_bits, i_bits, mod_dict) + if program_retval != exitcodes.GOOD: + return program_retval + + # Two variations of list syntax. + if re.search(permitted_syntax['LIST_WITHOUT_MIN_PREF_MAX'], aconf.gex_test): + bits_in_list_to_test = aconf.gex_test.split(',') + out.v("A separate test will be performed against each of the following modulus sizes: " + ", ".join([str(x) for x in bits_in_list_to_test]) + ".", write_now=True) + for s_bits in bits_in_list_to_test: + program_retval = GEXTest.modulus_size_test(out, s, kex, int(s_bits), int(s_bits), int(s_bits), mod_dict) + if program_retval != exitcodes.GOOD: + return program_retval + if re.search(permitted_syntax['LIST_WITH_MIN_PREF_MAX'], aconf.gex_test): + sets_of_min_pref_max = aconf.gex_test.split(',') + out.v("A separate test will be performed against each of the following sets of 'min:pref:max' modulus sizes: " + ', '.join(sets_of_min_pref_max), write_now=True) + for set_of_min_pref_max in sets_of_min_pref_max: + bits_in_list_to_test = set_of_min_pref_max.split(':') + program_retval = GEXTest.modulus_size_test(out, s, kex, int(bits_in_list_to_test[0]), int(bits_in_list_to_test[1]), int(bits_in_list_to_test[2]), mod_dict) + if program_retval != exitcodes.GOOD: + return program_retval + + if mod_dict: + if aconf.json: + json_struct = {'dh-gex-modulus-size': mod_dict} + out.info(json.dumps(json_struct, indent=4 if aconf.json_print_indent else None, sort_keys=True)) + else: + out.head('# diffie-hellman group exchange modulus size') + max_key_len = len(max(mod_dict, key=len)) + + for key, value in mod_dict.items(): + padding = (max_key_len - len(key)) + 1 + out.info(key + " " * padding + '--> ' + ', '.join(map(str, value))) + + return program_retval + + def main() -> int: out = OutputBuffer() aconf = process_commandline(out, sys.argv[1:], usage) diff --git a/ssh-audit.1 b/ssh-audit.1 index 8428516..bcb0f58 100644 --- a/ssh-audit.1 +++ b/ssh-audit.1 @@ -1,4 +1,4 @@ -.TH SSH-AUDIT 1 "March 2, 2021" +.TH SSH-AUDIT 1 "February 13, 2022" .SH NAME \fBssh-audit\fP \- SSH server & client configuration auditor .SH SYNOPSIS @@ -51,6 +51,43 @@ Starts a server on port 2222 to audit client software configuration. Use -p/--p .br Enable debug output. +.TP +.B -g, \-\-gex-test= +.br +Runs a Diffie-Hellman Group Exchange modulus size test against a server. + +Diffie-Hellman requires the client and server to agree on a generator value and +a modulus value. In the "Group Exchange" implementation of Diffie-Hellman, the +client specifies the size of the modulus in bits by providing the server with +minimum, preferred and maximum values. The server then finds a group that best +matches the client's request, returning the corresponding generator and modulus. +For a full explanation of this process see RFC 4419 and its successors. + +This test acts as a client by providing an SSH server with the size of a modulus +and then obtains the size of the modulus returned by the server. + +Three types of syntax are supported: + + 1. + + A comma delimited list of modulus sizes. + A test is performed against each value in the list where it acts as the minimum, preferred and maximum modulus size. + + + 2. + + A set of three colon delimited values denoting minimum, preferred and maximum modulus size. + A test is performed against each set. + Multiple sets can specified as a comma separated list. + + 3. + + A range of modulus sizes with an optional step value. Step defaults to 1 if omitted. + If the left value is greater than the right value, then the sequence operates from right to left. + A test is performed against each value in the range where it acts as the minimum, preferred and maximum modulus size. + +Duplicates are excluded from the return value. + .TP .B -j, \-\-json .br @@ -130,7 +167,7 @@ When the -P/--policy option is used, \fBssh-audit\fP performs a policy audit. T Policy auditing is helpful for ensuring a group of related servers are properly hardened to an exact specification. .PP -The set of official built-in policies can be viewed with -L/--list-policies. Multiple servers can be audited with -T/--targets=. Custom policies can be made from an ideal target server with -M/--make-policy=. +The set of official built-in policies can be viewed with -L/--list-policies. Multiple servers can be audited with -T/--targets=. Custom policies can be made from an preferred target server with -M/--make-policy=. .SH EXAMPLES @@ -219,6 +256,30 @@ ssh-audit -M new_policy.txt targetserver .fi .RE +.LP +To run a Diffie-Hellman Group Exchange modulus size test using the values 2000 bits, 3000 bits, 4000 bits and 5000 bits: +.RS +.nf +ssh-audit targetserver --gex-test=2000,3000,4000,5000 +.fi +.RE + +.LP +To run a Diffie-Hellman Group Exchange modulus size test where 2048 bits is the minimum, 3072 bits is the preferred and 5000 bits is the maximum: +.RS +.nf +ssh-audit targetserver --gex-test=2048:3072:5000 +.fi +.RE + +.LP +To run a Diffie-Hellman Group Exchange modulus size test from 0 bits to 5120 bits in increments of 1024 bits: +.RS +.nf +ssh-audit targetserver --gex-test=0-5120:1024 +.fi +.RE + .SH RETURN VALUES When a successful connection is made and all algorithms are rated as "good", \fBssh-audit\fP returns 0. Other possible return values are: