2 Commits

Author SHA1 Message Date
Joe Testa a01baadfa8 Additional cleanups after merging #304. 2024-11-22 12:28:02 -05:00
oam7575 45abc3aaf4 Argparse v3 - RC1 (#304)
* Argparse v3 - RC1

* Argparse v3 - RC1

Argparse v3 RC1 - post feedback

Argparse v3 - RC2
2024-11-22 12:26:20 -05:00
5 changed files with 196 additions and 215 deletions
+52 -52
View File
@@ -41,64 +41,61 @@
## Usage ## Usage
``` ```
usage: ssh-audit.py [options] <host> usage: ssh-audit.py [-h] [-1] [-2] [-4] [-6] [-b] [-c] [-d]
[-g <min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>] [-j] [-l {info,warn,fail}] [-L]
[-M custom_policy.txt] [-m] [-n] [-P "Built-In Policy Name" / custom_policy.txt] [-p N]
[-T targets.txt] [-t N] [-v] [--conn-rate-test N[:max_rate]] [--dheat N[:kex[:e_len]]]
[--lookup alg1[,alg2,...]] [--skip-rate-test] [--threads N]
[host]
-h, --help print this help positional arguments:
host target hostname or IPv4/IPv6 address
optional arguments:
-h, --help show this help message and exit
-1, --ssh1 force ssh version 1 only -1, --ssh1 force ssh version 1 only
-2, --ssh2 force ssh version 2 only -2, --ssh2 force ssh version 2 only
-4, --ipv4 enable IPv4 (order of precedence) -4, --ipv4 enable IPv4 (order of precedence)
-6, --ipv6 enable IPv6 (order of precedence) -6, --ipv6 enable IPv6 (order of precedence)
-b, --batch batch output -b, --batch batch output
-c, --client-audit starts a server on port 2222 to audit client -c, --client-audit starts a server on port 2222 to audit client software config (use -p to change port; use -t
software config (use -p to change port; to change timeout)
use -t to change timeout) -d, --debug enable debugging output
--conn-rate-test=N[:max_rate] perform a connection rate test (useful -g <min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>, --gex-test <min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>
for collecting metrics related to conducts a very customized Diffie-Hellman GEX modulus size test. Tests an array of minimum,
susceptibility of the DHEat vuln). preferred, and maximum values, or a range of values with an optional incremental step amount
Testing is conducted with N concurrent -j, --json enable JSON output (use -jj to enable indentation for better readability)
sockets with an optional maximum rate -l {info,warn,fail}, --level {info,warn,fail}
of connections per second. minimum output level (default: info)
-d, --debug Enable debug output. -L, --list-policies list all the official, built-in policies. Combine with -v to view policy change logs
--dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack -M custom_policy.txt, --make-policy custom_policy.txt
(CVE-2002-20001) against the target using N creates a policy based on the target server (i.e.: the target server has the ideal
concurrent sockets. Optionally, a specific configuration that other servers should adhere to), and stores it in the file path specified
key exchange algorithm can be specified -m, --manual print the man page (Docker, PyPI, Snap, and Windows builds only)
instead of allowing it to be automatically -n, --no-colors disable colors (automatic when the NO_COLOR environment variable is set)
chosen. Additionally, a small length of -P "Built-In Policy Name" / custom_policy.txt, --policy "Built-In Policy Name" / custom_policy.txt
the fake e value sent to the server can run a policy test using the specified policy (use -L to see built-in policies, or specify
be chosen for a more efficient attack (such filesystem path to custom policy created by -M)
as 4). -p N, --port N the TCP port to connect to (or to listen on when -c is used)
-g, --gex-test=<x[,y,...]> dh gex modulus size test -T targets.txt, --targets targets.txt
<min1:pref1:max1[,min2:pref2:max2,...]> a file containing a list of target hosts (one per line, format HOST[:PORT]). Use -p/--port
<x-y[:step]> to set the default port for all hosts. Use --threads to control concurrent scans
-j, --json JSON output (use -jj to enable indents) -t N, --timeout N timeout (in seconds) for connection and reading (default: 5)
-l, --level=<level> minimum output level (info|warn|fail) -v, --verbose enable verbose output
-L, --list-policies list all the official, built-in policies. Use with -v --conn-rate-test N[:max_rate]
to view policy change logs. perform a connection rate test (useful for collecting metrics related to susceptibility of
--lookup=<alg1,alg2,...> looks up an algorithm(s) without the DHEat vuln). Testing is conducted with N concurrent sockets with an optional maximum
connecting to a server rate of connections per second
-m, --manual print the man page (Docker, PyPI, Snap, and Windows --dheat N[:kex[:e_len]]
builds only) continuously perform the DHEat DoS attack (CVE-2002-20001) against the target using N
-M, --make-policy=<policy.txt> creates a policy based on the target server concurrent sockets. Optionally, a specific key exchange algorithm can be specified instead
(i.e.: the target server has the ideal of allowing it to be automatically chosen. Additionally, a small length of the fake e value
configuration that other servers should sent to the server can be chosen for a more efficient attack (such as 4).
adhere to) --lookup alg1[,alg2,...]
-n, --no-colors disable colors looks up an algorithm(s) without connecting to a server.
-p, --port=<port> port to connect --skip-rate-test skip the connection rate test during standard audits (used to safely infer whether the DHEat
-P, --policy=<"policy name" | policy.txt> run a policy test using the attack is viable)
specified policy --threads N number of threads to use when scanning multiple targets (-T/--targets) (default: 32)
--skip-rate-test skip the connection rate test during standard audits
(used to safely infer whether the DHEat attack
is viable)
-t, --timeout=<secs> timeout (in seconds) for connection and reading
(default: 5)
-T, --targets=<hosts.txt> a file containing a list of target hosts (one
per line, format HOST[:PORT]). Use -p/--port
to set the default port for all hosts. Use
--threads to control concurrent scans.
--threads=<threads> number of threads to use when scanning multiple
targets (-T/--targets) (default: 32)
-v, --verbose verbose output
``` ```
* if both IPv4 and IPv6 are used, order of precedence can be set by using either `-46` or `-64`. * if both IPv4 and IPv6 are used, order of precedence can be set by using either `-46` or `-64`.
* batch flag `-b` will output sections without header and without empty lines (implies verbose flag). * batch flag `-b` will output sections without header and without empty lines (implies verbose flag).
@@ -219,6 +216,9 @@ For convenience, a web front-end on top of the command-line tool is available at
## ChangeLog ## ChangeLog
### v3.4.0-dev
- Migrated from deprecated `getopt` module to `argparse`; partial credit [oam7575](https://github.com/oam7575).
### v3.3.0 (2024-10-15) ### v3.3.0 (2024-10-15)
- Added Python 3.13 support. - Added Python 3.13 support.
- Added built-in policies for Ubuntu 24.04 LTS server & client, OpenSSH 9.8, and OpenSSH 9.9. - Added built-in policies for Ubuntu 24.04 LTS server & client, OpenSSH 9.8, and OpenSSH 9.9.
+3 -1
View File
@@ -145,8 +145,10 @@ class OutputBuffer:
self._print('head', s, line_ended) self._print('head', s, line_ended)
return self return self
def fail(self, s: str, line_ended: bool = True) -> 'OutputBuffer': def fail(self, s: str, line_ended: bool = True, write_now: bool = False) -> 'OutputBuffer':
self._print('fail', s, line_ended) self._print('fail', s, line_ended)
if write_now:
self.write()
return self return self
def warn(self, s: str, line_ended: bool = True) -> 'OutputBuffer': def warn(self, s: str, line_ended: bool = True) -> 'OutputBuffer':
+133 -153
View File
@@ -23,9 +23,9 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import argparse
import concurrent.futures import concurrent.futures
import copy import copy
import getopt # pylint: disable=deprecated-module
import json import json
import multiprocessing import multiprocessing
import os import os
@@ -33,6 +33,7 @@ import re
import sys import sys
import traceback import traceback
# pylint: disable=unused-import # pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
from typing import cast, Callable, Optional, Union, Any # noqa: F401 from typing import cast, Callable, Optional, Union, Any # noqa: F401
@@ -82,61 +83,6 @@ if sys.platform == 'win32':
# no_idna_workaround = True # no_idna_workaround = True
def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
retval = exitcodes.GOOD
p = os.path.basename(sys.argv[0])
uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
if err is not None and len(err) > 0:
uout.fail(err + '\n')
retval = exitcodes.UNKNOWN_ERROR
uout.info('usage: {0} [options] <host>\n'.format(p))
uout.info(' -h, --help print this help')
uout.info(' -1, --ssh1 force ssh version 1 only')
uout.info(' -2, --ssh2 force ssh version 2 only')
uout.info(' -4, --ipv4 enable IPv4 (order of precedence)')
uout.info(' -6, --ipv6 enable IPv6 (order of precedence)')
uout.info(' -b, --batch batch output')
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)')
uout.info(' --conn-rate-test=N[:max_rate] perform a connection rate test (useful')
uout.info(' for collecting metrics related to')
uout.info(' susceptibility of the DHEat vuln).')
uout.info(' Testing is conducted with N concurrent')
uout.info(' sockets with an optional maximum rate')
uout.info(' of connections per second.')
uout.info(' -d, --debug debug output')
uout.info(' --dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack')
uout.info(' (CVE-2002-20001) against the target using N')
uout.info(' concurrent sockets. Optionally, a specific')
uout.info(' key exchange algorithm can be specified')
uout.info(' instead of allowing it to be automatically')
uout.info(' chosen. Additionally, a small length of')
uout.info(' the fake e value sent to the server can')
uout.info(' be chosen for a more efficient attack (such')
uout.info(' as 4).')
uout.info(' -g, --gex-test=<x[,y,...]> dh gex modulus size test')
uout.info(' <min1:pref1:max1[,min2:pref2:max2,...]>')
uout.info(' <x-y[:step]>')
uout.info(' -j, --json JSON output (use -jj to enable indents)')
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v')
uout.info(' to view policy change logs.')
uout.info(' --lookup=<alg1,alg2,...> looks up an algorithm(s) without\n connecting to a server')
uout.info(' -M, --make-policy=<policy.txt> creates a policy based on the target server\n (i.e.: the target server has the ideal\n configuration that other servers should\n adhere to)')
uout.info(' -m, --manual print the man page (Docker, PyPI, Snap, and Windows\n builds only)')
uout.info(' -n, --no-colors disable colors (automatic when the NO_COLOR')
uout.info(' environment variable is set)')
uout.info(' -p, --port=<port> port to connect')
uout.info(' -P, --policy=<policy.txt> run a policy test using the specified policy')
uout.info(' --skip-rate-test skip the connection rate test during standard audits\n (used to safely infer whether the DHEat attack\n is viable)')
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
uout.info(' -T, --targets=<hosts.txt> a file containing a list of target hosts (one\n per line, format HOST[:PORT]). Use -p/--port\n to set the default port for all hosts. Use\n --threads to control concurrent scans.')
uout.info(' --threads=<threads> number of threads to use when scanning multiple\n targets (-T/--targets) (default: 32)')
uout.info(' -v, --verbose verbose output')
uout.sep()
uout.write()
sys.exit(retval)
def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, host_keys: Optional[Dict[str, Dict[str, Union[bytes, str, int]]]] = None, dh_modulus_sizes: Optional[Dict[str, int]] = None) -> int: # pylint: disable=too-many-arguments def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, host_keys: Optional[Dict[str, Dict[str, Union[bytes, str, int]]]] = None, dh_modulus_sizes: Optional[Dict[str, int]] = None) -> int: # pylint: disable=too-many-arguments
with out: with out:
for algorithm in algorithms: for algorithm in algorithms:
@@ -371,7 +317,7 @@ def output_recommendations(out: OutputBuffer, algs: Algorithms, algorithm_recomm
notes = " (%s)" % notes notes = " (%s)" % notes
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} ' fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} '
fn(fm.format(sg, name, p, alg_type, an, notes)) fn(fm.format(sg, name, p, alg_type, an, notes)) # type: ignore[operator]
if not out.is_section_empty() and not is_json_output: if not out.is_section_empty() and not is_json_output:
if software is not None: if software is not None:
@@ -823,7 +769,7 @@ def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH
print(err) print(err)
def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[..., None]) -> 'AuditConf': # pylint: disable=too-many-statements def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # pylint: disable=too-many-statements
# pylint: disable=too-many-branches # pylint: disable=too-many-branches
aconf = AuditConf() aconf = AuditConf()
@@ -836,82 +782,93 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
aconf.colors = enable_colors aconf.colors = enable_colors
out.use_colors = enable_colors out.use_colors = enable_colors
try:
sopts = 'h1246M:p:P:jbcnvl:t:T:Lmdg:'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test=', 'dheat=', 'skip-rate-test', 'conn-rate-test=']
opts, args = getopt.gnu_getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(out, str(err))
aconf.ssh1, aconf.ssh2 = False, False aconf.ssh1, aconf.ssh2 = False, False
host: str = '' host: str = ''
oport: Optional[str] = None port: int = 22
port: int = 0
for o, a in opts: parser = argparse.ArgumentParser(description="# {} {}, https://github.com/jtesta/ssh-audit".format(os.path.basename(sys.argv[0]), VERSION), allow_abbrev=False)
if o in ('-h', '--help'):
usage_cb(out) # Add short options to the parser
elif o in ('-1', '--ssh1'): parser.add_argument("-1", "--ssh1", action="store_true", dest="ssh1", default=False, help="force ssh version 1 only")
aconf.ssh1 = True parser.add_argument("-2", "--ssh2", action="store_true", dest="ssh2", default=False, help="force ssh version 2 only")
elif o in ('-2', '--ssh2'): parser.add_argument("-4", "--ipv4", action="store_true", dest="ipv4", default=False, help="enable IPv4 (order of precedence)")
aconf.ssh2 = True parser.add_argument("-6", "--ipv6", action="store_true", dest="ipv6", default=False, help="enable IPv6 (order of precedence)")
elif o in ('-4', '--ipv4'): parser.add_argument("-b", "--batch", action="store_true", dest="batch", default=False, help="batch output")
aconf.ipv4 = True parser.add_argument("-c", "--client-audit", action="store_true", dest="client_audit", default=False, help="starts a server on port 2222 to audit client software config (use -p to change port; use -t to change timeout)")
elif o in ('-6', '--ipv6'): parser.add_argument("-d", "--debug", action="store_true", dest="debug", default=False, help="enable debugging output")
aconf.ipv6 = True parser.add_argument("-g", "--gex-test", action="store", dest="gex_test", metavar="<min1:pref1:max1[,min2:pref2:max2,...]> / <x-y[:step]>", type=str, default=None, help="conducts a very customized Diffie-Hellman GEX modulus size test. Tests an array of minimum, preferred, and maximum values, or a range of values with an optional incremental step amount")
elif o in ('-p', '--port'): parser.add_argument("-j", "--json", action="count", dest="json", default=0, help="enable JSON output (use -jj to enable indentation for better readability)")
oport = a parser.add_argument("-l", "--level", action="store", dest="level", type=str, choices=["info", "warn", "fail"], default="info", help="minimum output level (default: %(default)s)")
elif o in ('-b', '--batch'): parser.add_argument("-L", "--list-policies", action="store_true", dest="list_policies", default=False, help="list all the official, built-in policies. Combine with -v to view policy change logs")
parser.add_argument("-M", "--make-policy", action="store", dest="make_policy", metavar="custom_policy.txt", type=str, default=None, help="creates a policy based on the target server (i.e.: the target server has the ideal configuration that other servers should adhere to), and stores it in the file path specified")
parser.add_argument("-m", "--manual", action="store_true", dest="manual", default=False, help="print the man page (Docker, PyPI, Snap, and Windows builds only)")
parser.add_argument("-n", "--no-colors", action="store_true", dest="no_colors", default=False, help="disable colors (automatic when the NO_COLOR environment variable is set)")
parser.add_argument("-P", "--policy", action="store", dest="policy", metavar="\"Built-In Policy Name\" / custom_policy.txt", type=str, default=None, help="run a policy test using the specified policy (use -L to see built-in policies, or specify filesystem path to custom policy created by -M)")
parser.add_argument("-p", "--port", action="store", dest="oport", metavar="N", type=int, default=None, help="the TCP port to connect to (or to listen on when -c is used)")
parser.add_argument("-T", "--targets", action="store", dest="targets", metavar="targets.txt", type=str, default=None, help="a file containing a list of target hosts (one per line, format HOST[:PORT]). Use -p/--port to set the default port for all hosts. Use --threads to control concurrent scans")
parser.add_argument("-t", "--timeout", action="store", dest="timeout", metavar="N", type=int, default=5, help="timeout (in seconds) for connection and reading (default: %(default)s)")
parser.add_argument("-v", "--verbose", action="store_true", dest="verbose", default=False, help="enable verbose output")
# Add long options to the parser
parser.add_argument("--conn-rate-test", action="store", dest="conn_rate_test", metavar="N[:max_rate]", type=str, default=None, help="perform a connection rate test (useful for collecting metrics related to susceptibility of the DHEat vuln). Testing is conducted with N concurrent sockets with an optional maximum rate of connections per second")
parser.add_argument("--dheat", action="store", dest="dheat", metavar="N[:kex[:e_len]]", type=str, default=None, help="continuously perform the DHEat DoS attack (CVE-2002-20001) against the target using N concurrent sockets. Optionally, a specific key exchange algorithm can be specified instead of allowing it to be automatically chosen. Additionally, a small length of the fake e value sent to the server can be chosen for a more efficient attack (such as 4).")
parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.")
parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)")
parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)")
# The mandatory target option. Or rather, mandatory when -L, -T, or --lookup are not used.
parser.add_argument("host", nargs="?", action="store", type=str, default="", help="target hostname or IPv4/IPv6 address")
# If no arguments were given, print the help and exit.
if len(args) < 1:
parser.print_help()
sys.exit(exitcodes.UNKNOWN_ERROR)
oport: Optional[int] = None
try:
argument = parser.parse_args(args=args)
# Set simple flags.
aconf.client_audit = argument.client_audit
aconf.ipv4 = argument.ipv4
aconf.ipv6 = argument.ipv6
aconf.level = argument.level
aconf.list_policies = argument.list_policies
aconf.manual = argument.manual
aconf.skip_rate_test = argument.skip_rate_test
aconf.ssh1 = argument.ssh1
aconf.ssh2 = argument.ssh2
oport = argument.oport
if argument.batch is True:
aconf.batch = True aconf.batch = True
aconf.verbose = True aconf.verbose = True
elif o in ('-c', '--client-audit'):
aconf.client_audit = True
elif o in ('-j', '--json'):
if aconf.json: # If specified twice, enable indent printing.
aconf.json_print_indent = True
else:
aconf.json = True
elif o in ('-v', '--verbose'):
aconf.verbose = True
out.verbose = True
elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'):
usage_cb(out, 'level {} is not valid'.format(a))
aconf.level = a
elif o in ('-t', '--timeout'):
aconf.timeout = float(a)
aconf.timeout_set = True
elif o in ('-M', '--make-policy'):
aconf.make_policy = True
aconf.policy_file = a
elif o in ('-P', '--policy'):
aconf.policy_file = a
elif o in ('-T', '--targets'):
aconf.target_file = a
# If we're on Windows, and we can't use the idna workaround, force only one thread to be used (otherwise a crash would occur). # If one -j was given, turn on JSON output. If -jj was given, enable indentation.
# if no_idna_workaround: aconf.json = argument.json > 0
# print("\nWARNING: the idna module was not found on this system, thus only single-threaded scanning will be done (this is a workaround for this Windows-specific crash: https://github.com/python/cpython/issues/73474). Multi-threaded scanning can be enabled by installing the idna module (pip install idna).\n") if argument.json > 1:
# aconf.threads = 1 aconf.json_print_indent = True
elif o == '--threads':
aconf.threads = int(a) if argument.conn_rate_test is not None:
# if no_idna_workaround: aconf.conn_rate_test = argument.conn_rate_test
# aconf.threads = 1
elif o in ('-L', '--list-policies'): if argument.debug is True:
aconf.list_policies = True
elif o == '--lookup':
aconf.lookup = a
elif o in ('-m', '--manual'):
aconf.manual = True
elif o in ('-d', '--debug'):
aconf.debug = True aconf.debug = True
out.debug = True out.debug = True
elif o in ('-g', '--gex-test'):
if argument.dheat is not None:
aconf.dheat = argument.dheat
if argument.gex_test is not None:
dh_gex = argument.gex_test
permitted_syntax = get_permitted_syntax_for_gex_test() permitted_syntax = get_permitted_syntax_for_gex_test()
if not any(re.search(regex_str, a) for regex_str in permitted_syntax.values()): if not any(re.search(regex_str, dh_gex) for regex_str in permitted_syntax.values()):
usage_cb(out, '{} {} is not valid'.format(o, a)) out.fail('{} is not valid'.format(dh_gex), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if re.search(permitted_syntax['RANGE'], a): if re.search(permitted_syntax['RANGE'], dh_gex):
extracted_digits = re.findall(r'\d+', a) extracted_digits = re.findall(r'\d+', dh_gex)
bits_left_bound = int(extracted_digits[0]) bits_left_bound = int(extracted_digits[0])
bits_right_bound = int(extracted_digits[1]) bits_right_bound = int(extracted_digits[1])
@@ -920,27 +877,52 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
bits_step = int(extracted_digits[2]) bits_step = int(extracted_digits[2])
if bits_step <= 0: if bits_step <= 0:
usage_cb(out, '{} {} is not valid'.format(o, bits_step)) out.fail('the step field cannot be 0 or less: {}'.format(bits_step), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if all(x < 0 for x in (bits_left_bound, bits_right_bound)): if all(x < 0 for x in (bits_left_bound, bits_right_bound)):
usage_cb(out, '{} {} {} is not valid'.format(o, bits_left_bound, bits_right_bound)) out.fail('{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
aconf.gex_test = a aconf.gex_test = dh_gex
elif o == '--dheat':
aconf.dheat = a
elif o == '--skip-rate-test':
aconf.skip_rate_test = True
elif o == '--conn-rate-test':
aconf.conn_rate_test = a
if argument.lookup is not None:
aconf.lookup = argument.lookup
if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False: if argument.make_policy is not None:
usage_cb(out) aconf.make_policy = True
aconf.policy_file = argument.make_policy
if argument.policy is not None:
aconf.policy_file = argument.policy
if argument.targets is not None:
aconf.target_file = argument.targets
if argument.threads is not None:
aconf.threads = argument.threads
if argument.timeout is not None:
aconf.timeout = float(argument.timeout)
aconf.timeout_set = True
if argument.verbose is True:
aconf.verbose = True
out.verbose = True
except argparse.ArgumentError as err:
out.fail(str(err), write_now=True)
parser.print_help()
sys.exit(exitcodes.UNKNOWN_ERROR)
if argument.host == "" and argument.client_audit is False and argument.targets is None and argument.list_policies is False and argument.lookup is None and argument.manual is False:
out.fail("target host must be specified, unless -c, -m, -L, -T, or --lookup are used", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if aconf.manual: if aconf.manual:
return aconf return aconf
if aconf.lookup != '': if aconf.lookup != "":
return aconf return aconf
if aconf.list_policies: if aconf.list_policies:
@@ -949,25 +931,26 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
if aconf.client_audit is False and aconf.target_file is None: if aconf.client_audit is False and aconf.target_file is None:
if oport is not None: if oport is not None:
host = args[0] host = argument.host
else: else:
host, port = Utils.parse_host_and_port(args[0]) host, port = Utils.parse_host_and_port(argument.host)
if not host and aconf.target_file is None:
usage_cb(out, 'host is empty')
if port == 0 and oport is None: if not host and aconf.target_file is None:
if aconf.client_audit: # The default port to listen on during a client audit is 2222. out.fail("target host is not specified", write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
if oport is None and aconf.client_audit: # The default port to listen on during a client audit is 2222.
port = 2222 port = 2222
else:
port = 22
if oport is not None: if oport is not None:
port = Utils.parse_int(oport) port = Utils.parse_int(oport)
if port <= 0 or port > 65535: if port < 1 or port > 65535:
usage_cb(out, 'port {} is not valid'.format(oport)) out.fail("port must be greater than 0 and less than 65535: {}".format(oport), write_now=True)
sys.exit(exitcodes.UNKNOWN_ERROR)
aconf.host = host aconf.host = host
aconf.port = port aconf.port = port
if not (aconf.ssh1 or aconf.ssh2): if not (aconf.ssh1 or aconf.ssh2):
aconf.ssh1, aconf.ssh2 = True, True aconf.ssh1, aconf.ssh2 = True, True
@@ -996,20 +979,17 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
try: try:
aconf.policy = Policy(policy_file=aconf.policy_file, json_output=aconf.json) aconf.policy = Policy(policy_file=aconf.policy_file, json_output=aconf.json)
except Exception as e: except Exception as e:
out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc())) out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()), write_now=True)
out.write()
sys.exit(exitcodes.UNKNOWN_ERROR) sys.exit(exitcodes.UNKNOWN_ERROR)
# If the user wants to do a client audit, but provided a server policy, terminate. # If the user wants to do a client audit, but provided a server policy, terminate.
if aconf.client_audit and aconf.policy.is_server_policy(): if aconf.client_audit and aconf.policy.is_server_policy():
out.fail("Error: client audit selected, but server policy provided.") out.fail("Error: client audit selected, but server policy provided.", write_now=True)
out.write()
sys.exit(exitcodes.UNKNOWN_ERROR) sys.exit(exitcodes.UNKNOWN_ERROR)
# If the user wants to do a server audit, but provided a client policy, terminate. # If the user wants to do a server audit, but provided a client policy, terminate.
if aconf.client_audit is False and aconf.policy.is_server_policy() is False: if aconf.client_audit is False and aconf.policy.is_server_policy() is False:
out.fail("Error: server audit selected, but client policy provided.") out.fail("Error: server audit selected, but client policy provided.", write_now=True)
out.write()
sys.exit(exitcodes.UNKNOWN_ERROR) sys.exit(exitcodes.UNKNOWN_ERROR)
return aconf return aconf
@@ -1499,7 +1479,7 @@ def run_gex_granular_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex:
def main() -> int: def main() -> int:
out = OutputBuffer() out = OutputBuffer()
aconf = process_commandline(out, sys.argv[1:], usage) aconf = process_commandline(out, sys.argv[1:])
# If we're on Windows, but the colorama module could not be imported, print a warning if we're in verbose mode. # If we're on Windows, but the colorama module could not be imported, print a warning if we're in verbose mode.
if (sys.platform == 'win32') and ('colorama' not in sys.modules): if (sys.platform == 'win32') and ('colorama' not in sys.modules):
+1 -1
View File
@@ -129,7 +129,7 @@ class Utils:
return -1.0 return -1.0
@staticmethod @staticmethod
def parse_host_and_port(host_and_port: str, default_port: int = 0) -> Tuple[str, int]: def parse_host_and_port(host_and_port: str, default_port: int = 22) -> Tuple[str, int]:
'''Parses a string into a tuple of its host and port. The port is 0 if not specified.''' '''Parses a string into a tuple of its host and port. The port is 0 if not specified.'''
host = host_and_port host = host_and_port
port = default_port port = default_port
+1 -2
View File
@@ -8,7 +8,6 @@ class TestAuditConf:
def init(self, ssh_audit): def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
self.OutputBuffer = ssh_audit.OutputBuffer() self.OutputBuffer = ssh_audit.OutputBuffer()
self.usage = ssh_audit.usage
self.process_commandline = process_commandline self.process_commandline = process_commandline
@staticmethod @staticmethod
@@ -107,7 +106,7 @@ class TestAuditConf:
def test_audit_conf_process_commandline(self): def test_audit_conf_process_commandline(self):
# pylint: disable=too-many-statements # pylint: disable=too-many-statements
c = lambda x: self.process_commandline(self.OutputBuffer, x.split(), self.usage) # noqa c = lambda x: self.process_commandline(self.OutputBuffer, x.split()) # noqa
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
conf = c('') conf = c('')
with pytest.raises(SystemExit): with pytest.raises(SystemExit):