mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-06-23 11:04:31 +02:00
Fixed pylint errors, consolidated error checking for granular GEX tests, renamed functions for better readability.
This commit is contained in:
@ -21,13 +21,12 @@
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
"""
|
||||
import traceback
|
||||
|
||||
# pylint: disable=unused-import
|
||||
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
|
||||
from typing import Callable, Optional, Union, Any # noqa: F401
|
||||
|
||||
import traceback
|
||||
|
||||
from ssh_audit.kexdh import KexGroupExchange_SHA1, KexGroupExchange_SHA256
|
||||
from ssh_audit.ssh2_kexdb import SSH2_KexDB
|
||||
from ssh_audit.ssh2_kex import SSH2_Kex
|
||||
@ -71,11 +70,10 @@ class GEXTest:
|
||||
|
||||
return True
|
||||
|
||||
# Tests for modulus size in bits against the specified target.
|
||||
@staticmethod
|
||||
def modulus_size_test(out: 'OutputBuffer', s: 'SSH_Socket', kex: 'SSH2_Kex', bits_min: int, bits_pref: int, bits_max: int, modulus_dict: Dict[str, List[int]]) -> int:
|
||||
def granular_modulus_size_test(out: 'OutputBuffer', s: 'SSH_Socket', kex: 'SSH2_Kex', bits_min: int, bits_pref: int, bits_max: int, modulus_dict: Dict[str, List[int]]) -> int:
|
||||
'''
|
||||
Tests for modulus size in bits against the target target.
|
||||
Tests for granular modulus sizes.
|
||||
Builds a dictionary, where a key represents a DH algorithm name and the
|
||||
values are the modulus sizes (in bits) that have been returned by the
|
||||
target server.
|
||||
@ -89,10 +87,6 @@ class GEXTest:
|
||||
out.d("Bits Pref: " + str(bits_pref))
|
||||
out.d("Bits Max: " + str(bits_max))
|
||||
|
||||
if all(x < 0 for x in (bits_min, bits_pref, bits_max)):
|
||||
out.fail("min, pref and max values cannot be negative.")
|
||||
return exitcodes.FAILURE
|
||||
|
||||
GEX_ALGS = {
|
||||
'diffie-hellman-group-exchange-sha1': KexGroupExchange_SHA1,
|
||||
'diffie-hellman-group-exchange-sha256': KexGroupExchange_SHA256,
|
||||
@ -100,11 +94,11 @@ class GEXTest:
|
||||
|
||||
# Check if the server supports any of the group-exchange
|
||||
# algorithms. If so, test each one.
|
||||
for gex_alg in GEX_ALGS:
|
||||
for gex_alg, kex_group_class in GEX_ALGS.items():
|
||||
if gex_alg not in kex.kex_algorithms:
|
||||
out.d('Server does not support the algorithm "' + gex_alg + '".', write_now=True)
|
||||
else:
|
||||
kex_group = GEX_ALGS[gex_alg]()
|
||||
kex_group = kex_group_class()
|
||||
out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with min, pref and max modulus sizes of ' + str(bits_min) + ' bits, ' + str(bits_pref) + ' bits and ' + str(bits_max) + ' bits...', write_now=True)
|
||||
|
||||
# It has been observed that reconnecting to some SSH servers
|
||||
@ -121,11 +115,8 @@ class GEXTest:
|
||||
kex_group.recv_reply(s, False)
|
||||
modulus_size_returned = kex_group.get_dh_modulus_size()
|
||||
out.d('Modulus size returned by server: ' + str(modulus_size_returned) + ' bits', write_now=True)
|
||||
except Exception as e:
|
||||
out.d('[exception] ' + str(e), write_now=True)
|
||||
# import traceback
|
||||
# print(traceback.format_exc())
|
||||
pass
|
||||
except Exception:
|
||||
out.d('[exception] ' + str(traceback.format_exc()), write_now=True)
|
||||
finally:
|
||||
# The server is in a state that is not re-testable,
|
||||
# so there's nothing else to do with this open
|
||||
@ -159,8 +150,7 @@ class GEXTest:
|
||||
# algorithms. If so, test each one.
|
||||
for gex_alg, kex_group_class in GEX_ALGS.items():
|
||||
if gex_alg in kex.kex_algorithms:
|
||||
weak_sizes = 512, 1024, 1536
|
||||
out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with min, pref and max modulus sizes of ' + str(weak_sizes[0]) + ' bits, ' + str(weak_sizes[1]) + ' bits and ' + str(weak_sizes[2]) + ' bits...', write_now=True)
|
||||
out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with min, pref and max modulus sizes of 512 bits, 1024 bits and 1536 bits...', write_now=True)
|
||||
|
||||
if GEXTest.reconnect(out, s, kex, gex_alg) is False:
|
||||
break
|
||||
@ -170,7 +160,7 @@ class GEXTest:
|
||||
|
||||
# First try a range of weak sizes.
|
||||
try:
|
||||
kex_group.send_init_gex(s, weak_sizes[0], weak_sizes[1], weak_sizes[2])
|
||||
kex_group.send_init_gex(s, 512, 1024, 1536)
|
||||
kex_group.recv_reply(s, False)
|
||||
|
||||
# Its been observed that servers will return a group
|
||||
@ -179,9 +169,8 @@ class GEXTest:
|
||||
smallest_modulus = kex_group.get_dh_modulus_size()
|
||||
out.d('Modulus size returned by server: ' + str(smallest_modulus) + ' bits', write_now=True)
|
||||
|
||||
except Exception as e:
|
||||
out.d('[exception] ' + str(e), write_now=True)
|
||||
pass
|
||||
except Exception:
|
||||
out.d('[exception] ' + str(traceback.format_exc()), write_now=True)
|
||||
finally:
|
||||
s.close()
|
||||
|
||||
@ -205,11 +194,8 @@ class GEXTest:
|
||||
kex_group.recv_reply(s, False)
|
||||
smallest_modulus = kex_group.get_dh_modulus_size()
|
||||
out.d('Modulus size returned by server: ' + str(smallest_modulus) + ' bits', write_now=True)
|
||||
except Exception as e:
|
||||
out.d('[exception] ' + str(e), write_now=True)
|
||||
# import traceback
|
||||
# print(traceback.format_exc())
|
||||
pass
|
||||
except Exception:
|
||||
out.d('[exception] ' + str(traceback.format_exc()), write_now=True)
|
||||
finally:
|
||||
# The server is in a state that is not re-testable,
|
||||
# so there's nothing else to do with this open
|
||||
|
@ -87,7 +87,7 @@ def usage(err: Optional[str] = None) -> None:
|
||||
uout.info(' -b, --batch batch output')
|
||||
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)')
|
||||
uout.info(' -d, --debug debug output')
|
||||
uout.info(' -g, --gex-test=<n[,n,...] | min:pref:max[,min:pref:max,...] | n-n[:step]> dh gex modulus size test')
|
||||
uout.info(' -g, --gex-test=<x[,y,...] | min:pref:max[,min:pref:max,...] | x-y[:step]> dh gex modulus size test')
|
||||
uout.info(' -j, --json JSON output (use -jj to enable indents)')
|
||||
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
|
||||
uout.info(' -L, --list-policies list all the official, built-in policies')
|
||||
@ -654,10 +654,29 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
|
||||
aconf.debug = True
|
||||
out.debug = True
|
||||
elif o in ('-g', '--gex-test'):
|
||||
if not((any(re.search(regex_str, a) for regex_str in get_permitted_syntax_for_gex_test().values()))):
|
||||
permitted_syntax = get_permitted_syntax_for_gex_test()
|
||||
|
||||
if not any(re.search(regex_str, a) for regex_str in permitted_syntax.values()):
|
||||
usage_cb('{} {} is not valid'.format(o, a))
|
||||
|
||||
if re.search(permitted_syntax['RANGE'], a):
|
||||
extracted_digits = re.findall(r'\d+', a)
|
||||
bits_left_bound = int(extracted_digits[0])
|
||||
bits_right_bound = int(extracted_digits[1])
|
||||
|
||||
bits_step = 1
|
||||
if (len(extracted_digits)) == 3:
|
||||
bits_step = int(extracted_digits[2])
|
||||
|
||||
if bits_step <= 0:
|
||||
usage_cb('{} {} is not valid'.format(o, bits_step))
|
||||
|
||||
if all(x < 0 for x in (bits_left_bound, bits_right_bound)):
|
||||
usage_cb('{} {} {} is not valid'.format(o, bits_left_bound, bits_right_bound))
|
||||
|
||||
aconf.gex_test = a
|
||||
|
||||
|
||||
if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False:
|
||||
usage_cb()
|
||||
|
||||
@ -928,8 +947,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print
|
||||
if aconf.client_audit is False:
|
||||
HostKeyTest.run(out, s, kex)
|
||||
if aconf.gex_test != '':
|
||||
program_retval = invoke_modulus_size_test(out, s, kex, aconf)
|
||||
return program_retval
|
||||
return run_gex_granular_modulus_size_test(out, s, kex, aconf)
|
||||
else:
|
||||
GEXTest.run(out, s, kex)
|
||||
|
||||
@ -1079,15 +1097,11 @@ def get_permitted_syntax_for_gex_test() -> Dict[str, str]:
|
||||
return syntax
|
||||
|
||||
|
||||
def invoke_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex', aconf: AuditConf) -> int:
|
||||
def run_gex_granular_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex', aconf: AuditConf) -> int:
|
||||
'''Extracts the user specified modulus sizes and submits them for testing against the target server. Returns an exitcodes.* flag.'''
|
||||
|
||||
permitted_syntax = get_permitted_syntax_for_gex_test()
|
||||
|
||||
if not((any(re.search(regex_str, aconf.gex_test) for regex_str in permitted_syntax.values()))):
|
||||
out.fail("Invalid syntax.")
|
||||
return exitcodes.FAILURE
|
||||
|
||||
mod_dict: Dict[str, List[int]] = {}
|
||||
|
||||
# Range syntax.
|
||||
@ -1096,18 +1110,9 @@ def invoke_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex'
|
||||
bits_left_bound = int(extracted_digits[0])
|
||||
bits_right_bound = int(extracted_digits[1])
|
||||
|
||||
bits_step = 1
|
||||
if (len(extracted_digits)) == 3:
|
||||
bits_step = int(extracted_digits[2])
|
||||
else:
|
||||
bits_step = 1
|
||||
|
||||
if bits_step <= 0:
|
||||
out.fail("Step value must be greater than zero.")
|
||||
return exitcodes.FAILURE
|
||||
|
||||
if all(x < 0 for x in (bits_left_bound, bits_right_bound)):
|
||||
out.fail("Start and end values cannot be negative.")
|
||||
return exitcodes.FAILURE
|
||||
|
||||
# If the left value is greater than the right value, then the sequence
|
||||
# operates from right to left.
|
||||
@ -1119,7 +1124,7 @@ def invoke_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex'
|
||||
out.v("A separate test will be performed against each of the following modulus sizes: " + ", ".join([str(x) for x in bits_in_range_to_test]) + ".", write_now=True)
|
||||
|
||||
for i_bits in bits_in_range_to_test:
|
||||
program_retval = GEXTest.modulus_size_test(out, s, kex, i_bits, i_bits, i_bits, mod_dict)
|
||||
program_retval = GEXTest.granular_modulus_size_test(out, s, kex, i_bits, i_bits, i_bits, mod_dict)
|
||||
if program_retval != exitcodes.GOOD:
|
||||
return program_retval
|
||||
|
||||
@ -1128,15 +1133,16 @@ def invoke_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex'
|
||||
bits_in_list_to_test = aconf.gex_test.split(',')
|
||||
out.v("A separate test will be performed against each of the following modulus sizes: " + ", ".join([str(x) for x in bits_in_list_to_test]) + ".", write_now=True)
|
||||
for s_bits in bits_in_list_to_test:
|
||||
program_retval = GEXTest.modulus_size_test(out, s, kex, int(s_bits), int(s_bits), int(s_bits), mod_dict)
|
||||
program_retval = GEXTest.granular_modulus_size_test(out, s, kex, int(s_bits), int(s_bits), int(s_bits), mod_dict)
|
||||
if program_retval != exitcodes.GOOD:
|
||||
return program_retval
|
||||
|
||||
if re.search(permitted_syntax['LIST_WITH_MIN_PREF_MAX'], aconf.gex_test):
|
||||
sets_of_min_pref_max = aconf.gex_test.split(',')
|
||||
out.v("A separate test will be performed against each of the following sets of 'min:pref:max' modulus sizes: " + ', '.join(sets_of_min_pref_max), write_now=True)
|
||||
for set_of_min_pref_max in sets_of_min_pref_max:
|
||||
bits_in_list_to_test = set_of_min_pref_max.split(':')
|
||||
program_retval = GEXTest.modulus_size_test(out, s, kex, int(bits_in_list_to_test[0]), int(bits_in_list_to_test[1]), int(bits_in_list_to_test[2]), mod_dict)
|
||||
program_retval = GEXTest.granular_modulus_size_test(out, s, kex, int(bits_in_list_to_test[0]), int(bits_in_list_to_test[1]), int(bits_in_list_to_test[2]), mod_dict)
|
||||
if program_retval != exitcodes.GOOD:
|
||||
return program_retval
|
||||
|
||||
@ -1150,7 +1156,7 @@ def invoke_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: 'SSH2_Kex'
|
||||
|
||||
for key, value in mod_dict.items():
|
||||
padding = (max_key_len - len(key)) + 1
|
||||
out.info(key + " " * padding + '--> ' + ', '.join(map(str, value)))
|
||||
out.info(key + " " * padding + '--> ' + ', '.join([str(i) for i in value]))
|
||||
|
||||
return program_retval
|
||||
|
||||
|
Reference in New Issue
Block a user