Now supports a list of targets with -T (#11).

This commit is contained in:
Joe Testa 2020-07-13 18:39:05 -04:00
parent 8e3f3c6044
commit 381ba1a660
2 changed files with 120 additions and 33 deletions

View File

@ -31,6 +31,7 @@ import getopt
import hashlib
import io
import json
import ipaddress
import os
import random
import re
@ -74,6 +75,7 @@ def usage(err: Optional[str] = None) -> None:
uout.info(' -6, --ipv6 enable IPv6 (order of precedence)')
uout.info(' -p, --port=<port> port to connect')
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
uout.info(' -T, --targets=<hosts.txt> a file containing a list of target hosts (one\n per line)')
uout.info('')
uout.info(' -b, --batch batch output')
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)')
@ -405,6 +407,8 @@ class AuditConf:
self.policy = None # type: Optional[Policy] # Policy object
self.timeout = 5.0
self.timeout_set = False # Set to True when the user explicitly sets it.
self.target_file = None # type: Optional[str]
self.target_list = [] # type: List[str]
def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None:
valid = False
@ -446,7 +450,7 @@ class AuditConf:
if value == -1.0:
raise ValueError('invalid timeout: {}'.format(value))
valid = True
elif name in ['policy_file', 'policy']:
elif name in ['policy_file', 'policy', 'target_file', 'target_list']:
valid = True
if valid:
@ -457,14 +461,15 @@ class AuditConf:
# pylint: disable=too-many-branches
aconf = cls()
try:
sopts = 'h1246M:p:P:jbcnvl:t:'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=']
sopts = 'h1246M:p:P:jbcnvl:t:T:'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=']
opts, args = getopt.gnu_getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(str(err))
aconf.ssh1, aconf.ssh2 = False, False
host = '' # type: str
oport = None
oport = None # type: Optional[str]
port = 0 # type: int
for o, a in opts:
if o in ('-h', '--help'):
usage_cb()
@ -501,33 +506,44 @@ class AuditConf:
aconf.policy_file = a
elif o in ('-P', '--policy'):
aconf.policy_file = a
if len(args) == 0 and aconf.client_audit is False:
elif o in ('-T', '--targets'):
aconf.target_file = a
if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None:
usage_cb()
if aconf.client_audit is False:
if aconf.client_audit is False and aconf.target_file is None:
if oport is not None:
host = args[0]
else:
mx = re.match(r'^\[([^\]]+)\](?::(.*))?$', args[0])
if mx is not None:
host, oport = mx.group(1), mx.group(2)
else:
s = args[0].split(':')
if len(s) > 2:
host, oport = args[0], '22'
else:
host, oport = s[0], s[1] if len(s) > 1 else '22'
if not host:
host, port = Utils.parse_host_and_port(args[0])
if not host and aconf.target_file is None:
usage_cb('host is empty')
elif oport is None:
oport = '2222'
port = utils.parse_int(oport)
if port <= 0 or port > 65535:
usage_cb('port {} is not valid'.format(oport))
if port == 0 and oport is None:
if aconf.client_audit: # The default port to listen on during a client audit is 2222.
port = 2222
else:
port = 22
if oport is not None:
port = utils.parse_int(oport)
if port <= 0 or port > 65535:
usage_cb('port {} is not valid'.format(oport))
aconf.host = host
aconf.port = port
if not (aconf.ssh1 or aconf.ssh2):
aconf.ssh1, aconf.ssh2 = True, True
# If a file containing a list of targets was given, read it.
if aconf.target_file is not None:
with open(aconf.target_file, 'r') as f:
aconf.target_list = f.readlines()
# Strip out whitespace from each line in target file.
aconf.target_list = [target.strip() for target in aconf.target_list]
# If a policy file was provided, validate it.
if (aconf.policy_file is not None) and (aconf.make_policy is False):
try:
@ -3123,13 +3139,32 @@ def output_info(software: Optional['SSH.Software'], client_audit: bool, any_prob
# Returns a PROGRAM_RETVAL_* flag to denote if any failures or warnings were encountered.
def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> int:
def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None, print_target: bool = False) -> int:
program_retval = PROGRAM_RETVAL_GOOD
client_audit = client_host is not None # If set, this is a client audit.
sshv = 1 if pkm is not None else 2
algs = SSH.Algorithms(pkm, kex)
with OutputBuffer() as obuf:
if print_target:
host = aconf.host
# Print the port if it's not the default of 22.
if aconf.port != 22:
# Check if this is an IPv6 address, as that is printed in a different format.
is_ipv6 = True
try:
ipaddress.IPv6Address(aconf.host)
except ipaddress.AddressValueError:
is_ipv6 = False
if is_ipv6:
host = '[%s]:%d' % (aconf.host, aconf.port)
else:
host = '%s:%d' % (aconf.host, aconf.port)
out.good('(gen) target: {}'. format(host))
if client_audit:
out.good('(gen) client IP: {}'.format(client_host))
if len(header) > 0:
@ -3186,9 +3221,8 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl
perfect_config = output_recommendations(algs, software, aconf.json, maxlen)
output_info(software, client_audit, not perfect_config, aconf.json)
# If the user requested JSON output, output that and return immediately.
if aconf.json:
print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True))
print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True), end='' if len(aconf.target_list) > 0 else "\n") # Print the JSON of the audit info. Skip the newline at the end if multiple targets were given (since each audit dump will go into its own list entry).
elif len(unknown_algorithms) > 0: # If we encountered any unknown algorithms, ask the user to report them.
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
@ -3334,6 +3368,27 @@ class Utils:
except Exception: # pylint: disable=bare-except
return -1.0
@staticmethod
def parse_host_and_port(host_and_port: str) -> Tuple[str, int]:
'''Parses a string into a tuple of its host and port. The port is 0 if not specified.'''
host = host_and_port
port = 0
mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port)
if mx is not None:
host = mx.group(1)
port_str = mx.group(2)
if port_str is not None:
port = int(port_str)
else:
s = host_and_port.split(':')
if len(s) == 2:
host = s[0]
if len(s[1]) > 0:
port = int(s[1])
return host, port
def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None, pkm: Optional['SSH1.PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any:
@ -3433,7 +3488,7 @@ def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = Non
# Returns one of the PROGRAM_RETVAL_* flags.
def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int:
program_retval = PROGRAM_RETVAL_GOOD
out.batch = aconf.batch
out.verbose = aconf.verbose
@ -3491,7 +3546,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
# This is a standard audit scan.
if (aconf.policy is None) and (aconf.make_policy is False):
program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex)
program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target)
# This is a policy test.
elif (aconf.policy is not None) and (aconf.make_policy is False):
@ -3512,8 +3567,42 @@ out = Output()
def main() -> int:
conf = AuditConf.from_cmdline(sys.argv[1:], usage)
return audit(conf)
aconf = AuditConf.from_cmdline(sys.argv[1:], usage)
# If multiple targets were specified...
if len(aconf.target_list) > 0:
ret = PROGRAM_RETVAL_GOOD
# If JSON output is desired, each target's results will be reported in its own list entry.
if aconf.json:
print('[', end='')
# Loop through each target in the list.
for i, target in enumerate(aconf.target_list):
aconf.host, port = Utils.parse_host_and_port(target)
if port == 0:
port = 22
aconf.port = port
new_ret = audit(aconf, print_target=True)
# Set the return value only if an unknown error occurred, a failure occurred, or if a warning occurred and the previous value was good.
if (new_ret == PROGRAM_RETVAL_UNKNOWN_ERROR) or (new_ret == PROGRAM_RETVAL_FAILURE) or ((new_ret == PROGRAM_RETVAL_WARNING) and (ret == PROGRAM_RETVAL_GOOD)):
ret = new_ret
# Don't print a delimiter after the last target was handled.
if i + 1 != len(aconf.target_list):
if aconf.json:
print(", ", end='')
else:
print(("-" * 80) + "\n")
if aconf.json:
print(']')
return ret
else:
return audit(aconf)
if __name__ == '__main__': # pragma: nocover

View File

@ -154,17 +154,15 @@ class TestAuditConf:
self._test_conf(conf, host='2001:4860:4860::8888', port=2222)
conf = c('-p 2222 2001:4860:4860::8888')
self._test_conf(conf, host='2001:4860:4860::8888', port=2222)
with pytest.raises(SystemExit):
conf = c('localhost:')
with pytest.raises(SystemExit):
with pytest.raises(ValueError):
conf = c('localhost:abc')
with pytest.raises(SystemExit):
conf = c('-p abc localhost')
with pytest.raises(SystemExit):
with pytest.raises(ValueError):
conf = c('localhost:-22')
with pytest.raises(SystemExit):
conf = c('-p -22 localhost')
with pytest.raises(SystemExit):
with pytest.raises(ValueError):
conf = c('localhost:99999')
with pytest.raises(SystemExit):
conf = c('-p 99999 localhost')