Additional cleanups after merging #304.

This commit is contained in:
Joe Testa 2024-11-22 12:28:02 -05:00
parent 45abc3aaf4
commit a01baadfa8
5 changed files with 172 additions and 235 deletions

104
README.md
View File

@ -41,64 +41,61 @@
## Usage
```
usage: ssh-audit.py [options] <host>
usage: ssh-audit.py [-h] [-1] [-2] [-4] [-6] [-b] [-c] [-d]
[-g <min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>] [-j] [-l {info,warn,fail}] [-L]
[-M custom_policy.txt] [-m] [-n] [-P "Built-In Policy Name" / custom_policy.txt] [-p N]
[-T targets.txt] [-t N] [-v] [--conn-rate-test N[:max_rate]] [--dheat N[:kex[:e_len]]]
[--lookup alg1[,alg2,...]] [--skip-rate-test] [--threads N]
[host]
-h, --help print this help
positional arguments:
host target hostname or IPv4/IPv6 address
optional arguments:
-h, --help show this help message and exit
-1, --ssh1 force ssh version 1 only
-2, --ssh2 force ssh version 2 only
-4, --ipv4 enable IPv4 (order of precedence)
-6, --ipv6 enable IPv6 (order of precedence)
-b, --batch batch output
-c, --client-audit starts a server on port 2222 to audit client
software config (use -p to change port;
use -t to change timeout)
--conn-rate-test=N[:max_rate] perform a connection rate test (useful
for collecting metrics related to
susceptibility of the DHEat vuln).
Testing is conducted with N concurrent
sockets with an optional maximum rate
of connections per second.
-d, --debug Enable debug output.
--dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack
(CVE-2002-20001) against the target using N
concurrent sockets. Optionally, a specific
key exchange algorithm can be specified
instead of allowing it to be automatically
chosen. Additionally, a small length of
the fake e value sent to the server can
be chosen for a more efficient attack (such
as 4).
-g, --gex-test=<x[,y,...]> dh gex modulus size test
<min1:pref1:max1[,min2:pref2:max2,...]>
<x-y[:step]>
-j, --json JSON output (use -jj to enable indents)
-l, --level=<level> minimum output level (info|warn|fail)
-L, --list-policies list all the official, built-in policies. Use with -v
to view policy change logs.
--lookup=<alg1,alg2,...> looks up an algorithm(s) without
connecting to a server
-m, --manual print the man page (Docker, PyPI, Snap, and Windows
builds only)
-M, --make-policy=<policy.txt> creates a policy based on the target server
(i.e.: the target server has the ideal
configuration that other servers should
adhere to)
-n, --no-colors disable colors
-p, --port=<port> port to connect
-P, --policy=<"policy name" | policy.txt> run a policy test using the
specified policy
--skip-rate-test skip the connection rate test during standard audits
(used to safely infer whether the DHEat attack
is viable)
-t, --timeout=<secs> timeout (in seconds) for connection and reading
(default: 5)
-T, --targets=<hosts.txt> a file containing a list of target hosts (one
per line, format HOST[:PORT]). Use -p/--port
to set the default port for all hosts. Use
--threads to control concurrent scans.
--threads=<threads> number of threads to use when scanning multiple
targets (-T/--targets) (default: 32)
-v, --verbose verbose output
-c, --client-audit starts a server on port 2222 to audit client software config (use -p to change port; use -t
to change timeout)
-d, --debug enable debugging output
-g <min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>, --gex-test <min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>
conducts a very customized Diffie-Hellman GEX modulus size test. Tests an array of minimum,
preferred, and maximum values, or a range of values with an optional incremental step amount
-j, --json enable JSON output (use -jj to enable indentation for better readability)
-l {info,warn,fail}, --level {info,warn,fail}
minimum output level (default: info)
-L, --list-policies list all the official, built-in policies. Combine with -v to view policy change logs
-M custom_policy.txt, --make-policy custom_policy.txt
creates a policy based on the target server (i.e.: the target server has the ideal
configuration that other servers should adhere to), and stores it in the file path specified
-m, --manual print the man page (Docker, PyPI, Snap, and Windows builds only)
-n, --no-colors disable colors (automatic when the NO_COLOR environment variable is set)
-P "Built-In Policy Name" / custom_policy.txt, --policy "Built-In Policy Name" / custom_policy.txt
run a policy test using the specified policy (use -L to see built-in policies, or specify
filesystem path to custom policy created by -M)
-p N, --port N the TCP port to connect to (or to listen on when -c is used)
-T targets.txt, --targets targets.txt
a file containing a list of target hosts (one per line, format HOST[:PORT]). Use -p/--port
to set the default port for all hosts. Use --threads to control concurrent scans
-t N, --timeout N timeout (in seconds) for connection and reading (default: 5)
-v, --verbose enable verbose output
--conn-rate-test N[:max_rate]
perform a connection rate test (useful for collecting metrics related to susceptibility of
the DHEat vuln). Testing is conducted with N concurrent sockets with an optional maximum
rate of connections per second
--dheat N[:kex[:e_len]]
continuously perform the DHEat DoS attack (CVE-2002-20001) against the target using N
concurrent sockets. Optionally, a specific key exchange algorithm can be specified instead
of allowing it to be automatically chosen. Additionally, a small length of the fake e value
sent to the server can be chosen for a more efficient attack (such as 4).
--lookup alg1[,alg2,...]
looks up an algorithm(s) without connecting to a server.
--skip-rate-test skip the connection rate test during standard audits (used to safely infer whether the DHEat
attack is viable)
--threads N number of threads to use when scanning multiple targets (-T/--targets) (default: 32)
```
* if both IPv4 and IPv6 are used, order of precedence can be set by using either `-46` or `-64`.
* batch flag `-b` will output sections without header and without empty lines (implies verbose flag).
@ -219,6 +216,9 @@ For convenience, a web front-end on top of the command-line tool is available at
## ChangeLog
### v3.4.0-dev
- Migrated from deprecated `getopt` module to `argparse`; partial credit [oam7575](https://github.com/oam7575).
### v3.3.0 (2024-10-15)
- Added Python 3.13 support.
- Added built-in policies for Ubuntu 24.04 LTS server & client, OpenSSH 9.8, and OpenSSH 9.9.

View File

@ -145,8 +145,10 @@ class OutputBuffer:
self._print('head', s, line_ended)
return self
def fail(self, s: str, line_ended: bool = True) -> 'OutputBuffer':
def fail(self, s: str, line_ended: bool = True, write_now: bool = False) -> 'OutputBuffer':
self._print('fail', s, line_ended)
if write_now:
self.write()
return self
def warn(self, s: str, line_ended: bool = True) -> 'OutputBuffer':

View File

@ -83,63 +83,6 @@ if sys.platform == 'win32':
# no_idna_workaround = True
def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
retval = exitcodes.GOOD
p = os.path.basename(sys.argv[0])
uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
if err is not None and len(err) > 0:
uout.fail(err + '\n')
retval = exitcodes.UNKNOWN_ERROR
uout.info('usage: {0} [options] -ip <host>\n'.format(p))
uout.info(' -h, --help print this help')
uout.info(' -1, --ssh1 force ssh version 1 only')
uout.info(' -2, --ssh2 force ssh version 2 only')
uout.info(' -4, --ipv4 enable IPv4 (order of precedence)')
uout.info(' -6, --ipv6 enable IPv6 (order of precedence)')
uout.info(' -b, --batch batch output')
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)')
uout.info(' --conn-rate-test=N[:max_rate] perform a connection rate test (useful')
uout.info(' for collecting metrics related to')
uout.info(' susceptibility of the DHEat vuln).')
uout.info(' Testing is conducted with N concurrent')
uout.info(' sockets with an optional maximum rate')
uout.info(' of connections per second.')
uout.info(' -d, --debug debug output')
uout.info(' --dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack')
uout.info(' (CVE-2002-20001) against the target using N')
uout.info(' concurrent sockets. Optionally, a specific')
uout.info(' key exchange algorithm can be specified')
uout.info(' instead of allowing it to be automatically')
uout.info(' chosen. Additionally, a small length of')
uout.info(' the fake e value sent to the server can')
uout.info(' be chosen for a more efficient attack (such')
uout.info(' as 4).')
uout.info(' -g, --gex-test=<x[,y,...]> dh gex modulus size test')
uout.info(' <min1:pref1:max1[,min2:pref2:max2,...]>')
uout.info(' <x-y[:step]>')
uout.info(' --hostname hostname of target to scan')
uout.info(' -ip, --ip-address ip address of target to scan')
uout.info(' -j, --json JSON output (use -jj to enable indents)')
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v')
uout.info(' to view policy change logs.')
uout.info(' --lookup=<alg1,alg2,...> looks up an algorithm(s) without\n connecting to a server')
uout.info(' -M, --make-policy=<policy.txt> creates a policy based on the target server\n (i.e.: the target server has the ideal\n configuration that other servers should\n adhere to)')
uout.info(' -m, --manual print the man page (Docker, PyPI, Snap, and Windows\n builds only)')
uout.info(' -n, --no-colors disable colors (automatic when the NO_COLOR')
uout.info(' environment variable is set)')
uout.info(' -p, --port=<port> port to connect')
uout.info(' -P, --policy=<policy.txt> run a policy test using the specified policy')
uout.info(' --skip-rate-test skip the connection rate test during standard audits\n (used to safely infer whether the DHEat attack\n is viable)')
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
uout.info(' -T, --targets=<hosts.txt> a file containing a list of target hosts (one\n per line, format HOST[:PORT]). Use -p/--port\n to set the default port for all hosts. Use\n --threads to control concurrent scans.')
uout.info(' --threads=<threads> number of threads to use when scanning multiple\n targets (-T/--targets) (default: 32)')
uout.info(' -v, --verbose verbose output')
uout.sep()
uout.write()
sys.exit(retval)
def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, host_keys: Optional[Dict[str, Dict[str, Union[bytes, str, int]]]] = None, dh_modulus_sizes: Optional[Dict[str, int]] = None) -> int: # pylint: disable=too-many-arguments
with out:
for algorithm in algorithms:
@ -374,7 +317,7 @@ def output_recommendations(out: OutputBuffer, algs: Algorithms, algorithm_recomm
notes = " (%s)" % notes
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} '
fn(fm.format(sg, name, p, alg_type, an, notes))
fn(fm.format(sg, name, p, alg_type, an, notes)) # type: ignore[operator]
if not out.is_section_empty() and not is_json_output:
if software is not None:
@ -826,7 +769,7 @@ def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH
print(err)
def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[..., None]) -> 'AuditConf': # pylint: disable=too-many-statements
def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # pylint: disable=too-many-statements
# pylint: disable=too-many-branches
aconf = AuditConf()
@ -841,117 +784,88 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
aconf.ssh1, aconf.ssh2 = False, False
host: str = ''
port: int = 22
parser = argparse.ArgumentParser(prog='SSH Audit Tool', description='SSH Audit Tool', add_help=False, allow_abbrev=False)
parser = argparse.ArgumentParser(description="# {} {}, https://github.com/jtesta/ssh-audit".format(os.path.basename(sys.argv[0]), VERSION), allow_abbrev=False)
# Add short options to the parser
parser.add_argument('-1', '--ssh1', action="store_true", dest='ssh1', default=None)
parser.add_argument('-2', '--ssh2', action="store_true", dest='ssh2', default=None)
parser.add_argument('-4', '--ipv4', action="store_true", dest='ipv4', default=None)
parser.add_argument('-6', '--ipv6', action="store_true", dest='ipv6', default=None)
parser.add_argument('-b', '--batch', action="store_true", dest='batch', default=None)
parser.add_argument('-c', '--client-audit', action="store_true", dest='client_audit', default=None)
parser.add_argument('-d', '--debug', action="store_true", dest='debug', default=None)
parser.add_argument('-g', '--gex-test', action="store", dest='gex_test', default=None)
parser.add_argument('-h', '--help', action="store_true", dest='help', default=None)
parser.add_argument('-ip', '--ip-address', '--hostname', action="store", dest='host', type=str)
parser.add_argument('-j', '--json', action="store_true", dest='json', default=None)
parser.add_argument('-jj', '--json-indent', action="store_true", dest='json_indent', default=None)
parser.add_argument('-l', '--level', action="store", dest='level', type=str, default='info')
parser.add_argument('-L', '--list-policies', action="store_true", dest='list_policies', default=None)
parser.add_argument('-M', '--make-policy', action="store", dest='make_policy', default=None)
parser.add_argument('-m', '--manual', action="store_true", dest='manual', default=None)
parser.add_argument('-n', '--no-colors', action="store_true", dest='no_colors', default=None)
parser.add_argument('-P', '--policy', action="store", dest='policy', default=None)
parser.add_argument('-p', '--port', action="store", dest='port', default='22', type=int)
parser.add_argument('-T', '--targets', action="store", dest='targets', default=None)
parser.add_argument('-t', '--timeout', action="store", dest='timeout', default='5', type=int)
parser.add_argument('-v', '--verbose', action="store_true", dest='verbose', default=None)
parser.add_argument("-1", "--ssh1", action="store_true", dest="ssh1", default=False, help="force ssh version 1 only")
parser.add_argument("-2", "--ssh2", action="store_true", dest="ssh2", default=False, help="force ssh version 2 only")
parser.add_argument("-4", "--ipv4", action="store_true", dest="ipv4", default=False, help="enable IPv4 (order of precedence)")
parser.add_argument("-6", "--ipv6", action="store_true", dest="ipv6", default=False, help="enable IPv6 (order of precedence)")
parser.add_argument("-b", "--batch", action="store_true", dest="batch", default=False, help="batch output")
parser.add_argument("-c", "--client-audit", action="store_true", dest="client_audit", default=False, help="starts a server on port 2222 to audit client software config (use -p to change port; use -t to change timeout)")
parser.add_argument("-d", "--debug", action="store_true", dest="debug", default=False, help="enable debugging output")
parser.add_argument("-g", "--gex-test", action="store", dest="gex_test", metavar="<min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>", type=str, default=None, help="conducts a very customized Diffie-Hellman GEX modulus size test. Tests an array of minimum, preferred, and maximum values, or a range of values with an optional incremental step amount")
parser.add_argument("-j", "--json", action="count", dest="json", default=0, help="enable JSON output (use -jj to enable indentation for better readability)")
parser.add_argument("-l", "--level", action="store", dest="level", type=str, choices=["info", "warn", "fail"], default="info", help="minimum output level (default: %(default)s)")
parser.add_argument("-L", "--list-policies", action="store_true", dest="list_policies", default=False, help="list all the official, built-in policies. Combine with -v to view policy change logs")
parser.add_argument("-M", "--make-policy", action="store", dest="make_policy", metavar="custom_policy.txt", type=str, default=None, help="creates a policy based on the target server (i.e.: the target server has the ideal configuration that other servers should adhere to), and stores it in the file path specified")
parser.add_argument("-m", "--manual", action="store_true", dest="manual", default=False, help="print the man page (Docker, PyPI, Snap, and Windows builds only)")
parser.add_argument("-n", "--no-colors", action="store_true", dest="no_colors", default=False, help="disable colors (automatic when the NO_COLOR environment variable is set)")
parser.add_argument("-P", "--policy", action="store", dest="policy", metavar="\"Built-In Policy Name\" / custom_policy.txt", type=str, default=None, help="run a policy test using the specified policy (use -L to see built-in policies, or specify filesystem path to custom policy created by -M)")
parser.add_argument("-p", "--port", action="store", dest="oport", metavar="N", type=int, default=None, help="the TCP port to connect to (or to listen on when -c is used)")
parser.add_argument("-T", "--targets", action="store", dest="targets", metavar="targets.txt", type=str, default=None, help="a file containing a list of target hosts (one per line, format HOST[:PORT]). Use -p/--port to set the default port for all hosts. Use --threads to control concurrent scans")
parser.add_argument("-t", "--timeout", action="store", dest="timeout", metavar="N", type=int, default=5, help="timeout (in seconds) for connection and reading (default: %(default)s)")
parser.add_argument("-v", "--verbose", action="store_true", dest="verbose", default=False, help="enable verbose output")
# Add long options to the parser
parser.add_argument('--conn-rate-test', action="store", dest='conn_rate_test', default='0', type=int)
parser.add_argument('--dheat', action="store", dest='dheat', default='0', type=int)
parser.add_argument('--lookup', action="store", dest='lookup', default=None)
parser.add_argument('--skip-rate-test', action="store_true", dest='skip_rate_test', default=None)
parser.add_argument('--threads', action="store", dest='threads', default='32', type=int)
parser.add_argument("--conn-rate-test", action="store", dest="conn_rate_test", metavar="N[:max_rate]", type=str, default=None, help="perform a connection rate test (useful for collecting metrics related to susceptibility of the DHEat vuln). Testing is conducted with N concurrent sockets with an optional maximum rate of connections per second")
parser.add_argument("--dheat", action="store", dest="dheat", metavar="N[:kex[:e_len]]", type=str, default=None, help="continuously perform the DHEat DoS attack (CVE-2002-20001) against the target using N concurrent sockets. Optionally, a specific key exchange algorithm can be specified instead of allowing it to be automatically chosen. Additionally, a small length of the fake e value sent to the server can be chosen for a more efficient attack (such as 4).")
parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.")
parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)")
parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)")
# The mandatory target option. Or rather, mandatory when -L, -T, or --lookup are not used.
parser.add_argument("host", nargs="?", action="store", type=str, default="", help="target hostname or IPv4/IPv6 address")
# If no arguments were given, print the help and exit.
if len(args) < 1:
parser.print_help()
sys.exit(exitcodes.UNKNOWN_ERROR)
oport: Optional[int] = None
try:
argument = parser.parse_args()
argument = parser.parse_args(args=args)
if argument.help is True:
usage_cb(out)
aconf.host = argument.host
host = argument.host
port = argument.port
aconf.ssh1 = argument.ssh1
aconf.ssh2 = argument.ssh2
# Set simple flags.
aconf.client_audit = argument.client_audit
aconf.ipv4 = argument.ipv4
aconf.ipv6 = argument.ipv6
aconf.json = argument.json
if argument.json_indent is True:
setattr(argument, 'json', True)
aconf.json = argument.json
aconf.json_print_indent = argument.json_indent
aconf.level = argument.level
aconf.list_policies = argument.list_policies
aconf.manual = argument.manual
aconf.skip_rate_test = argument.skip_rate_test
aconf.ssh1 = argument.ssh1
aconf.ssh2 = argument.ssh2
oport = argument.oport
if argument.batch is True:
aconf.batch = True
aconf.verbose = True
aconf.client_audit = argument.client_audit
# If one -j was given, turn on JSON output. If -jj was given, enable indentation.
aconf.json = argument.json > 0
if argument.json > 1:
aconf.json_print_indent = True
ttime = argument.timeout
if ttime != 5:
aconf.timeout = float(argument.timeout)
aconf.timeout_set = True
if argument.verbose is True:
aconf.verbose = True
out.verbose = True
# Get error level regex
err_level = argument.level
if err_level in ["info", "warn", "fail"]:
aconf.level = str(argument.level)
else:
usage_cb(out, 'Error level : {} is not valid'.format(err_level))
if getattr(argument, 'make_policy') is True:
aconf.make_policy = True
aconf.policy_file = argument.make_policy
if getattr(argument, 'policy') is True:
aconf.policy_file = argument.policy
if getattr(argument, 'targets') is True:
aconf.target_file = argument.targets
if argument.threads != 32:
aconf.threads = argument.threads
if getattr(argument, 'list_policies') is True:
aconf.list_policies = True
if getattr(argument, 'lookup') is True:
aconf.lookup = argument.lookup
if getattr(argument, 'manual') is True:
aconf.manual = True
else:
aconf.manual = False
if argument.conn_rate_test is not None:
aconf.conn_rate_test = argument.conn_rate_test
if argument.debug is True:
aconf.debug = True
out.debug = True
if getattr(argument, 'gex_test') is True:
if argument.dheat is not None:
aconf.dheat = argument.dheat
if argument.gex_test is not None:
dh_gex = argument.gex_test
permitted_syntax = get_permitted_syntax_for_gex_test()
if not any(re.search(regex_str, dh_gex) for regex_str in permitted_syntax.values()):
usage_cb(out, '{} is not valid'.format(dh_gex))
out.fail('{} is not valid'.format(dh_gex), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if re.search(permitted_syntax['RANGE'], dh_gex):
extracted_digits = re.findall(r'\d+', dh_gex)
@ -963,55 +877,80 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
bits_step = int(extracted_digits[2])
if bits_step <= 0:
usage_cb(out, '{} {} is not valid'.format(dh_gex, bits_step))
out.fail('the step field cannot be 0 or less: {}'.format(bits_step), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if all(x < 0 for x in (bits_left_bound, bits_right_bound)):
usage_cb(out, '{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound))
out.fail('{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
aconf.gex_test = argument.gex_test
aconf.gex_test = dh_gex
if argument.lookup is not None:
aconf.lookup = argument.lookup
if int(argument.dheat) > 0:
aconf.dheat = argument.dheat
if argument.make_policy is not None:
aconf.make_policy = True
aconf.policy_file = argument.make_policy
aconf.skip_rate_test = argument.skip_rate_test
if argument.policy is not None:
aconf.policy_file = argument.policy
if int(argument.conn_rate_test) > 0:
aconf.conn_rate_test = argument.conn_rate_test
if argument.targets is not None:
aconf.target_file = argument.targets
if argument.threads is not None:
aconf.threads = argument.threads
if argument.timeout is not None:
aconf.timeout = float(argument.timeout)
aconf.timeout_set = True
if argument.verbose is True:
aconf.verbose = True
out.verbose = True
except argparse.ArgumentError as err:
usage_cb(out, str(err))
out.fail(str(err), write_now=True)
parser.print_help()
sys.exit(exitcodes.UNKNOWN_ERROR)
if argument.host is None and argument.client_audit is None and argument.targets is None and argument.list_policies is None and argument.lookup is None and argument.manual is None:
usage_cb(out)
if argument.host == "" and argument.client_audit is False and argument.targets is None and argument.list_policies is False and argument.lookup is None and argument.manual is False:
out.fail("target host must be specified, unless -c, -m, -L, -T, or --lookup are used", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if aconf.manual:
return aconf
if aconf.lookup != '':
if aconf.lookup != "":
return aconf
if aconf.list_policies:
list_policies(out, aconf.verbose)
sys.exit(exitcodes.GOOD)
if aconf.client_audit is None and aconf.target_file is None:
if aconf.client_audit is False and aconf.target_file is None:
if oport is not None:
host = argument.host
port = argument.port
else:
host, port = Utils.parse_host_and_port(argument.host)
if argument.host is None and aconf.target_file is None:
usage_cb(out, 'host is empty')
if not host and aconf.target_file is None:
out.fail("target host is not specified", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if aconf.client_audit is True: # The default port to listen on during a client audit is 2222.
if oport is None and aconf.client_audit: # The default port to listen on during a client audit is 2222.
port = 2222
if argument.port != 22:
port = Utils.parse_int(argument.port)
if port <= 0 or port > 65535:
usage_cb(out, 'port {} is not valid'.format(argument.port))
if oport is not None:
port = Utils.parse_int(oport)
if port < 1 or port > 65535:
out.fail("port must be greater than 0 and less than 65535: {}".format(oport), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
aconf.host = host
aconf.port = port
if not (aconf.ssh1 or aconf.ssh2):
aconf.ssh1, aconf.ssh2 = True, True
@ -1040,20 +979,17 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
try:
aconf.policy = Policy(policy_file=aconf.policy_file, json_output=aconf.json)
except Exception as e:
out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
out.write()
out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
# If the user wants to do a client audit, but provided a server policy, terminate.
if aconf.client_audit and aconf.policy.is_server_policy():
out.fail("Error: client audit selected, but server policy provided.")
out.write()
out.fail("Error: client audit selected, but server policy provided.", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
# If the user wants to do a server audit, but provided a client policy, terminate.
if aconf.client_audit is False and aconf.policy.is_server_policy() is False:
out.fail("Error: server audit selected, but client policy provided.")
out.write()
out.fail("Error: server audit selected, but client policy provided.", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
return aconf
@ -1543,7 +1479,7 @@ def run_gex_granular_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex:
def main() -> int:
out = OutputBuffer()
aconf = process_commandline(out, sys.argv[1:], usage)
aconf = process_commandline(out, sys.argv[1:])
# If we're on Windows, but the colorama module could not be imported, print a warning if we're in verbose mode.
if (sys.platform == 'win32') and ('colorama' not in sys.modules):

View File

@ -129,7 +129,7 @@ class Utils:
return -1.0
@staticmethod
def parse_host_and_port(host_and_port: str, default_port: int = 0) -> Tuple[str, int]:
def parse_host_and_port(host_and_port: str, default_port: int = 22) -> Tuple[str, int]:
'''Parses a string into a tuple of its host and port. The port is 0 if not specified.'''
host = host_and_port
port = default_port

View File

@ -8,7 +8,6 @@ class TestAuditConf:
def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf
self.OutputBuffer = ssh_audit.OutputBuffer()
self.usage = ssh_audit.usage
self.process_commandline = process_commandline
@staticmethod
@ -107,7 +106,7 @@ class TestAuditConf:
def test_audit_conf_process_commandline(self):
# pylint: disable=too-many-statements
c = lambda x: self.process_commandline(self.OutputBuffer, x.split(), self.usage) # noqa
c = lambda x: self.process_commandline(self.OutputBuffer, x.split()) # noqa
with pytest.raises(SystemExit):
conf = c('')
with pytest.raises(SystemExit):