DH GEX Modulus Size Testing

This commit is contained in:
Adam Russell
2022-02-16 21:01:22 +00:00
committed by Joe Testa
parent 0a6ac5de54
commit 5ac0ffa8f1
4 changed files with 243 additions and 11 deletions

View File

@@ -87,6 +87,7 @@ def usage(err: Optional[str] = None) -> None:
uout.info(' -b, --batch batch output')
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)')
uout.info(' -d, --debug debug output')
uout.info(' -g, --gex-test=<n[,n,...] | min:pref:max[,min:pref:max,...] | n-n[:step]> dh gex modulus size test')
uout.info(' -j, --json JSON output (use -jj to enable indents)')
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
uout.info(' -L, --list-policies list all the official, built-in policies')
@@ -589,8 +590,8 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
# pylint: disable=too-many-branches
aconf = AuditConf()
try:
sopts = 'h1246M:p:P:jbcnvl:t:T:Lmd'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug']
sopts = 'h1246M:p:P:jbcnvl:t:T:Lmdg:'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test=']
opts, args = getopt.gnu_getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(str(err))
@@ -652,6 +653,10 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
elif o in ('-d', '--debug'):
aconf.debug = True
out.debug = True
elif o in ('-g', '--gex-test'):
if not((any(re.search(regex_str, a) for regex_str in get_permitted_syntax_for_gex_test().values()))):
usage_cb('{} {} is not valid'.format(o, a))
aconf.gex_test = a
if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False:
usage_cb()
@@ -922,7 +927,11 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
if aconf.client_audit is False:
HostKeyTest.run(out, s, kex)
GEXTest.run(out, s, kex)
if aconf.gex_test != '':
program_retval = invoke_modulus_size_test(out, s, kex, aconf)
return program_retval
else:
GEXTest.run(out, s, kex)
# This is a standard audit scan.
if (aconf.policy is None) and (aconf.make_policy is False):
@@ -1061,6 +1070,91 @@ def windows_manual(out: OutputBuffer) -> int:
return retval
def get_permitted_syntax_for_gex_test() -> Dict[str, str]:
syntax = {
'RANGE': r'^\d+-\d+(:\d+)?$',
'LIST_WITHOUT_MIN_PREF_MAX': r'^\d+(,\d+)*$',
'LIST_WITH_MIN_PREF_MAX': r'^\d+:\d+:\d+(,\d+:\d+:\d+)*$'
}
return syntax
def invoke_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex', aconf: AuditConf) -> int:
'''Extracts the user specified modulus sizes and submits them for testing against the target target. Returns an exitcodes.* flag.'''
permitted_syntax = get_permitted_syntax_for_gex_test()
if not((any(re.search(regex_str, aconf.gex_test) for regex_str in permitted_syntax.values()))):
out.fail("Invalid syntax.")
return exitcodes.FAILURE
mod_dict: Dict[str, List[int]] = {}
# Range syntax.
if re.search(permitted_syntax['RANGE'], aconf.gex_test):
extracted_digits = re.findall(r'\d+', aconf.gex_test)
bits_left_bound = int(extracted_digits[0])
bits_right_bound = int(extracted_digits[1])
if (len(extracted_digits)) == 3:
bits_step = int(extracted_digits[2])
else:
bits_step = 1
if bits_step <= 0:
out.fail("Step value must be greater than zero.")
return exitcodes.FAILURE
if all(x < 0 for x in (bits_left_bound, bits_right_bound)):
out.fail("Start and end values cannot be negative.")
return exitcodes.FAILURE
# If the left value is greater than the right value, then the sequence
# operates from right to left.
if bits_left_bound <= bits_right_bound:
bits_in_range_to_test = range(bits_left_bound, bits_right_bound + 1, bits_step)
else:
bits_in_range_to_test = range(bits_left_bound, bits_right_bound - 1, -abs(bits_step))
out.v("A separate test will be performed against each of the following modulus sizes: " + ", ".join([str(x) for x in bits_in_range_to_test]) + ".", write_now=True)
for i_bits in bits_in_range_to_test:
program_retval = GEXTest.modulus_size_test(out, s, kex, i_bits, i_bits, i_bits, mod_dict)
if program_retval != exitcodes.GOOD:
return program_retval
# Two variations of list syntax.
if re.search(permitted_syntax['LIST_WITHOUT_MIN_PREF_MAX'], aconf.gex_test):
bits_in_list_to_test = aconf.gex_test.split(',')
out.v("A separate test will be performed against each of the following modulus sizes: " + ", ".join([str(x) for x in bits_in_list_to_test]) + ".", write_now=True)
for s_bits in bits_in_list_to_test:
program_retval = GEXTest.modulus_size_test(out, s, kex, int(s_bits), int(s_bits), int(s_bits), mod_dict)
if program_retval != exitcodes.GOOD:
return program_retval
if re.search(permitted_syntax['LIST_WITH_MIN_PREF_MAX'], aconf.gex_test):
sets_of_min_pref_max = aconf.gex_test.split(',')
out.v("A separate test will be performed against each of the following sets of 'min:pref:max' modulus sizes: " + ', '.join(sets_of_min_pref_max), write_now=True)
for set_of_min_pref_max in sets_of_min_pref_max:
bits_in_list_to_test = set_of_min_pref_max.split(':')
program_retval = GEXTest.modulus_size_test(out, s, kex, int(bits_in_list_to_test[0]), int(bits_in_list_to_test[1]), int(bits_in_list_to_test[2]), mod_dict)
if program_retval != exitcodes.GOOD:
return program_retval
if mod_dict:
if aconf.json:
json_struct = {'dh-gex-modulus-size': mod_dict}
out.info(json.dumps(json_struct, indent=4 if aconf.json_print_indent else None, sort_keys=True))
else:
out.head('# diffie-hellman group exchange modulus size')
max_key_len = len(max(mod_dict, key=len))
for key, value in mod_dict.items():
padding = (max_key_len - len(key)) + 1
out.info(key + " " * padding + '--> ' + ', '.join(map(str, value)))
return program_retval
def main() -> int:
out = OutputBuffer()
aconf = process_commandline(out, sys.argv[1:], usage)