From 381ba1a660638f32785daf5a7433e9f50c666b24 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Mon, 13 Jul 2020 18:39:05 -0400 Subject: [PATCH] Now supports a list of targets with -T (#11). --- ssh-audit.py | 145 +++++++++++++++++++++++++++++++++-------- test/test_auditconf.py | 8 +-- 2 files changed, 120 insertions(+), 33 deletions(-) diff --git a/ssh-audit.py b/ssh-audit.py index 0154d78..7c5db57 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -31,6 +31,7 @@ import getopt import hashlib import io import json +import ipaddress import os import random import re @@ -74,6 +75,7 @@ def usage(err: Optional[str] = None) -> None: uout.info(' -6, --ipv6 enable IPv6 (order of precedence)') uout.info(' -p, --port= port to connect') uout.info(' -t, --timeout= timeout (in seconds) for connection and reading\n (default: 5)') + uout.info(' -T, --targets= a file containing a list of target hosts (one\n per line)') uout.info('') uout.info(' -b, --batch batch output') uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)') @@ -405,6 +407,8 @@ class AuditConf: self.policy = None # type: Optional[Policy] # Policy object self.timeout = 5.0 self.timeout_set = False # Set to True when the user explicitly sets it. + self.target_file = None # type: Optional[str] + self.target_list = [] # type: List[str] def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: valid = False @@ -446,7 +450,7 @@ class AuditConf: if value == -1.0: raise ValueError('invalid timeout: {}'.format(value)) valid = True - elif name in ['policy_file', 'policy']: + elif name in ['policy_file', 'policy', 'target_file', 'target_list']: valid = True if valid: @@ -457,14 +461,15 @@ class AuditConf: # pylint: disable=too-many-branches aconf = cls() try: - sopts = 'h1246M:p:P:jbcnvl:t:' - lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout='] + sopts = 'h1246M:p:P:jbcnvl:t:T:' + lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets='] opts, args = getopt.gnu_getopt(args, sopts, lopts) except getopt.GetoptError as err: usage_cb(str(err)) aconf.ssh1, aconf.ssh2 = False, False host = '' # type: str - oport = None + oport = None # type: Optional[str] + port = 0 # type: int for o, a in opts: if o in ('-h', '--help'): usage_cb() @@ -501,33 +506,44 @@ class AuditConf: aconf.policy_file = a elif o in ('-P', '--policy'): aconf.policy_file = a - if len(args) == 0 and aconf.client_audit is False: + elif o in ('-T', '--targets'): + aconf.target_file = a + + if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None: usage_cb() - if aconf.client_audit is False: + + if aconf.client_audit is False and aconf.target_file is None: if oport is not None: host = args[0] else: - mx = re.match(r'^\[([^\]]+)\](?::(.*))?$', args[0]) - if mx is not None: - host, oport = mx.group(1), mx.group(2) - else: - s = args[0].split(':') - if len(s) > 2: - host, oport = args[0], '22' - else: - host, oport = s[0], s[1] if len(s) > 1 else '22' - if not host: + host, port = Utils.parse_host_and_port(args[0]) + if not host and aconf.target_file is None: usage_cb('host is empty') - elif oport is None: - oport = '2222' - port = utils.parse_int(oport) - if port <= 0 or port > 65535: - usage_cb('port {} is not valid'.format(oport)) + + if port == 0 and oport is None: + if aconf.client_audit: # The default port to listen on during a client audit is 2222. + port = 2222 + else: + port = 22 + + if oport is not None: + port = utils.parse_int(oport) + if port <= 0 or port > 65535: + usage_cb('port {} is not valid'.format(oport)) + aconf.host = host aconf.port = port if not (aconf.ssh1 or aconf.ssh2): aconf.ssh1, aconf.ssh2 = True, True + # If a file containing a list of targets was given, read it. + if aconf.target_file is not None: + with open(aconf.target_file, 'r') as f: + aconf.target_list = f.readlines() + + # Strip out whitespace from each line in target file. + aconf.target_list = [target.strip() for target in aconf.target_list] + # If a policy file was provided, validate it. if (aconf.policy_file is not None) and (aconf.make_policy is False): try: @@ -3123,13 +3139,32 @@ def output_info(software: Optional['SSH.Software'], client_audit: bool, any_prob # Returns a PROGRAM_RETVAL_* flag to denote if any failures or warnings were encountered. -def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> int: +def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None, print_target: bool = False) -> int: program_retval = PROGRAM_RETVAL_GOOD client_audit = client_host is not None # If set, this is a client audit. sshv = 1 if pkm is not None else 2 algs = SSH.Algorithms(pkm, kex) with OutputBuffer() as obuf: + if print_target: + host = aconf.host + + # Print the port if it's not the default of 22. + if aconf.port != 22: + + # Check if this is an IPv6 address, as that is printed in a different format. + is_ipv6 = True + try: + ipaddress.IPv6Address(aconf.host) + except ipaddress.AddressValueError: + is_ipv6 = False + + if is_ipv6: + host = '[%s]:%d' % (aconf.host, aconf.port) + else: + host = '%s:%d' % (aconf.host, aconf.port) + + out.good('(gen) target: {}'. format(host)) if client_audit: out.good('(gen) client IP: {}'.format(client_host)) if len(header) > 0: @@ -3186,9 +3221,8 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl perfect_config = output_recommendations(algs, software, aconf.json, maxlen) output_info(software, client_audit, not perfect_config, aconf.json) - # If the user requested JSON output, output that and return immediately. if aconf.json: - print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True)) + print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True), end='' if len(aconf.target_list) > 0 else "\n") # Print the JSON of the audit info. Skip the newline at the end if multiple targets were given (since each audit dump will go into its own list entry). elif len(unknown_algorithms) > 0: # If we encountered any unknown algorithms, ask the user to report them. out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at .\n" % ','.join(unknown_algorithms)) @@ -3334,6 +3368,27 @@ class Utils: except Exception: # pylint: disable=bare-except return -1.0 + @staticmethod + def parse_host_and_port(host_and_port: str) -> Tuple[str, int]: + '''Parses a string into a tuple of its host and port. The port is 0 if not specified.''' + host = host_and_port + port = 0 + + mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port) + if mx is not None: + host = mx.group(1) + port_str = mx.group(2) + if port_str is not None: + port = int(port_str) + else: + s = host_and_port.split(':') + if len(s) == 2: + host = s[0] + if len(s[1]) > 0: + port = int(s[1]) + + return host, port + def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None, pkm: Optional['SSH1.PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: @@ -3433,7 +3488,7 @@ def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = Non # Returns one of the PROGRAM_RETVAL_* flags. -def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: +def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int: program_retval = PROGRAM_RETVAL_GOOD out.batch = aconf.batch out.verbose = aconf.verbose @@ -3491,7 +3546,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int: # This is a standard audit scan. if (aconf.policy is None) and (aconf.make_policy is False): - program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex) + program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target) # This is a policy test. elif (aconf.policy is not None) and (aconf.make_policy is False): @@ -3512,8 +3567,42 @@ out = Output() def main() -> int: - conf = AuditConf.from_cmdline(sys.argv[1:], usage) - return audit(conf) + aconf = AuditConf.from_cmdline(sys.argv[1:], usage) + + # If multiple targets were specified... + if len(aconf.target_list) > 0: + ret = PROGRAM_RETVAL_GOOD + + # If JSON output is desired, each target's results will be reported in its own list entry. + if aconf.json: + print('[', end='') + + # Loop through each target in the list. + for i, target in enumerate(aconf.target_list): + aconf.host, port = Utils.parse_host_and_port(target) + if port == 0: + port = 22 + aconf.port = port + + new_ret = audit(aconf, print_target=True) + + # Set the return value only if an unknown error occurred, a failure occurred, or if a warning occurred and the previous value was good. + if (new_ret == PROGRAM_RETVAL_UNKNOWN_ERROR) or (new_ret == PROGRAM_RETVAL_FAILURE) or ((new_ret == PROGRAM_RETVAL_WARNING) and (ret == PROGRAM_RETVAL_GOOD)): + ret = new_ret + + # Don't print a delimiter after the last target was handled. + if i + 1 != len(aconf.target_list): + if aconf.json: + print(", ", end='') + else: + print(("-" * 80) + "\n") + + if aconf.json: + print(']') + + return ret + else: + return audit(aconf) if __name__ == '__main__': # pragma: nocover diff --git a/test/test_auditconf.py b/test/test_auditconf.py index 4003dc6..bc2b2cd 100644 --- a/test/test_auditconf.py +++ b/test/test_auditconf.py @@ -154,17 +154,15 @@ class TestAuditConf: self._test_conf(conf, host='2001:4860:4860::8888', port=2222) conf = c('-p 2222 2001:4860:4860::8888') self._test_conf(conf, host='2001:4860:4860::8888', port=2222) - with pytest.raises(SystemExit): - conf = c('localhost:') - with pytest.raises(SystemExit): + with pytest.raises(ValueError): conf = c('localhost:abc') with pytest.raises(SystemExit): conf = c('-p abc localhost') - with pytest.raises(SystemExit): + with pytest.raises(ValueError): conf = c('localhost:-22') with pytest.raises(SystemExit): conf = c('-p -22 localhost') - with pytest.raises(SystemExit): + with pytest.raises(ValueError): conf = c('localhost:99999') with pytest.raises(SystemExit): conf = c('-p 99999 localhost')