Argparse v3 - RC1 (#304)

* Argparse v3 - RC1

* Argparse v3 - RC1

Argparse v3 RC1 - post feedback

Argparse v3 - RC2
This commit is contained in:
oam7575 2024-11-23 04:26:20 +11:00 committed by GitHub
parent 99c64787d9
commit 45abc3aaf4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -23,9 +23,9 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import argparse
import concurrent.futures import concurrent.futures
import copy import copy
import getopt # pylint: disable=deprecated-module
import json import json
import multiprocessing import multiprocessing
import os import os
@ -33,6 +33,7 @@ import re
import sys import sys
import traceback import traceback
# pylint: disable=unused-import # pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
from typing import cast, Callable, Optional, Union, Any # noqa: F401 from typing import cast, Callable, Optional, Union, Any # noqa: F401
@ -89,7 +90,7 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
if err is not None and len(err) > 0: if err is not None and len(err) > 0:
uout.fail(err + '\n') uout.fail(err + '\n')
retval = exitcodes.UNKNOWN_ERROR retval = exitcodes.UNKNOWN_ERROR
uout.info('usage: {0} [options] <host>\n'.format(p)) uout.info('usage: {0} [options] -ip <host>\n'.format(p))
uout.info(' -h, --help print this help') uout.info(' -h, --help print this help')
uout.info(' -1, --ssh1 force ssh version 1 only') uout.info(' -1, --ssh1 force ssh version 1 only')
uout.info(' -2, --ssh2 force ssh version 2 only') uout.info(' -2, --ssh2 force ssh version 2 only')
@ -116,6 +117,8 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
uout.info(' -g, --gex-test=<x[,y,...]> dh gex modulus size test') uout.info(' -g, --gex-test=<x[,y,...]> dh gex modulus size test')
uout.info(' <min1:pref1:max1[,min2:pref2:max2,...]>') uout.info(' <min1:pref1:max1[,min2:pref2:max2,...]>')
uout.info(' <x-y[:step]>') uout.info(' <x-y[:step]>')
uout.info(' --hostname hostname of target to scan')
uout.info(' -ip, --ip-address ip address of target to scan')
uout.info(' -j, --json JSON output (use -jj to enable indents)') uout.info(' -j, --json JSON output (use -jj to enable indents)')
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)') uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v') uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v')
@ -836,82 +839,122 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
aconf.colors = enable_colors aconf.colors = enable_colors
out.use_colors = enable_colors out.use_colors = enable_colors
try:
sopts = 'h1246M:p:P:jbcnvl:t:T:Lmdg:'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test=', 'dheat=', 'skip-rate-test', 'conn-rate-test=']
opts, args = getopt.gnu_getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(out, str(err))
aconf.ssh1, aconf.ssh2 = False, False aconf.ssh1, aconf.ssh2 = False, False
host: str = '' host: str = ''
oport: Optional[str] = None
port: int = 0 parser = argparse.ArgumentParser(prog='SSH Audit Tool', description='SSH Audit Tool', add_help=False, allow_abbrev=False)
for o, a in opts:
if o in ('-h', '--help'): # Add short options to the parser
parser.add_argument('-1', '--ssh1', action="store_true", dest='ssh1', default=None)
parser.add_argument('-2', '--ssh2', action="store_true", dest='ssh2', default=None)
parser.add_argument('-4', '--ipv4', action="store_true", dest='ipv4', default=None)
parser.add_argument('-6', '--ipv6', action="store_true", dest='ipv6', default=None)
parser.add_argument('-b', '--batch', action="store_true", dest='batch', default=None)
parser.add_argument('-c', '--client-audit', action="store_true", dest='client_audit', default=None)
parser.add_argument('-d', '--debug', action="store_true", dest='debug', default=None)
parser.add_argument('-g', '--gex-test', action="store", dest='gex_test', default=None)
parser.add_argument('-h', '--help', action="store_true", dest='help', default=None)
parser.add_argument('-ip', '--ip-address', '--hostname', action="store", dest='host', type=str)
parser.add_argument('-j', '--json', action="store_true", dest='json', default=None)
parser.add_argument('-jj', '--json-indent', action="store_true", dest='json_indent', default=None)
parser.add_argument('-l', '--level', action="store", dest='level', type=str, default='info')
parser.add_argument('-L', '--list-policies', action="store_true", dest='list_policies', default=None)
parser.add_argument('-M', '--make-policy', action="store", dest='make_policy', default=None)
parser.add_argument('-m', '--manual', action="store_true", dest='manual', default=None)
parser.add_argument('-n', '--no-colors', action="store_true", dest='no_colors', default=None)
parser.add_argument('-P', '--policy', action="store", dest='policy', default=None)
parser.add_argument('-p', '--port', action="store", dest='port', default='22', type=int)
parser.add_argument('-T', '--targets', action="store", dest='targets', default=None)
parser.add_argument('-t', '--timeout', action="store", dest='timeout', default='5', type=int)
parser.add_argument('-v', '--verbose', action="store_true", dest='verbose', default=None)
# Add long options to the parser
parser.add_argument('--conn-rate-test', action="store", dest='conn_rate_test', default='0', type=int)
parser.add_argument('--dheat', action="store", dest='dheat', default='0', type=int)
parser.add_argument('--lookup', action="store", dest='lookup', default=None)
parser.add_argument('--skip-rate-test', action="store_true", dest='skip_rate_test', default=None)
parser.add_argument('--threads', action="store", dest='threads', default='32', type=int)
try:
argument = parser.parse_args()
if argument.help is True:
usage_cb(out) usage_cb(out)
elif o in ('-1', '--ssh1'):
aconf.ssh1 = True aconf.host = argument.host
elif o in ('-2', '--ssh2'): host = argument.host
aconf.ssh2 = True port = argument.port
elif o in ('-4', '--ipv4'): aconf.ssh1 = argument.ssh1
aconf.ipv4 = True aconf.ssh2 = argument.ssh2
elif o in ('-6', '--ipv6'): aconf.ipv4 = argument.ipv4
aconf.ipv6 = True aconf.ipv6 = argument.ipv6
elif o in ('-p', '--port'):
oport = a aconf.json = argument.json
elif o in ('-b', '--batch'): if argument.json_indent is True:
setattr(argument, 'json', True)
aconf.json = argument.json
aconf.json_print_indent = argument.json_indent
if argument.batch is True:
aconf.batch = True aconf.batch = True
aconf.verbose = True aconf.verbose = True
elif o in ('-c', '--client-audit'):
aconf.client_audit = True aconf.client_audit = argument.client_audit
elif o in ('-j', '--json'):
if aconf.json: # If specified twice, enable indent printing. ttime = argument.timeout
aconf.json_print_indent = True if ttime != 5:
else: aconf.timeout = float(argument.timeout)
aconf.json = True aconf.timeout_set = True
elif o in ('-v', '--verbose'):
if argument.verbose is True:
aconf.verbose = True aconf.verbose = True
out.verbose = True out.verbose = True
elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'):
usage_cb(out, 'level {} is not valid'.format(a))
aconf.level = a
elif o in ('-t', '--timeout'):
aconf.timeout = float(a)
aconf.timeout_set = True
elif o in ('-M', '--make-policy'):
aconf.make_policy = True
aconf.policy_file = a
elif o in ('-P', '--policy'):
aconf.policy_file = a
elif o in ('-T', '--targets'):
aconf.target_file = a
# If we're on Windows, and we can't use the idna workaround, force only one thread to be used (otherwise a crash would occur). # Get error level regex
# if no_idna_workaround: err_level = argument.level
# print("\nWARNING: the idna module was not found on this system, thus only single-threaded scanning will be done (this is a workaround for this Windows-specific crash: https://github.com/python/cpython/issues/73474). Multi-threaded scanning can be enabled by installing the idna module (pip install idna).\n") if err_level in ["info", "warn", "fail"]:
# aconf.threads = 1 aconf.level = str(argument.level)
elif o == '--threads': else:
aconf.threads = int(a) usage_cb(out, 'Error level : {} is not valid'.format(err_level))
# if no_idna_workaround:
# aconf.threads = 1 if getattr(argument, 'make_policy') is True:
elif o in ('-L', '--list-policies'): aconf.make_policy = True
aconf.policy_file = argument.make_policy
if getattr(argument, 'policy') is True:
aconf.policy_file = argument.policy
if getattr(argument, 'targets') is True:
aconf.target_file = argument.targets
if argument.threads != 32:
aconf.threads = argument.threads
if getattr(argument, 'list_policies') is True:
aconf.list_policies = True aconf.list_policies = True
elif o == '--lookup':
aconf.lookup = a if getattr(argument, 'lookup') is True:
elif o in ('-m', '--manual'): aconf.lookup = argument.lookup
if getattr(argument, 'manual') is True:
aconf.manual = True aconf.manual = True
elif o in ('-d', '--debug'): else:
aconf.manual = False
if argument.debug is True:
aconf.debug = True aconf.debug = True
out.debug = True out.debug = True
elif o in ('-g', '--gex-test'):
if getattr(argument, 'gex_test') is True:
dh_gex = argument.gex_test
permitted_syntax = get_permitted_syntax_for_gex_test() permitted_syntax = get_permitted_syntax_for_gex_test()
if not any(re.search(regex_str, a) for regex_str in permitted_syntax.values()): if not any(re.search(regex_str, dh_gex) for regex_str in permitted_syntax.values()):
usage_cb(out, '{} {} is not valid'.format(o, a)) usage_cb(out, '{} is not valid'.format(dh_gex))
if re.search(permitted_syntax['RANGE'], a): if re.search(permitted_syntax['RANGE'], dh_gex):
extracted_digits = re.findall(r'\d+', a) extracted_digits = re.findall(r'\d+', dh_gex)
bits_left_bound = int(extracted_digits[0]) bits_left_bound = int(extracted_digits[0])
bits_right_bound = int(extracted_digits[1]) bits_right_bound = int(extracted_digits[1])
@ -920,21 +963,26 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
bits_step = int(extracted_digits[2]) bits_step = int(extracted_digits[2])
if bits_step <= 0: if bits_step <= 0:
usage_cb(out, '{} {} is not valid'.format(o, bits_step)) usage_cb(out, '{} {} is not valid'.format(dh_gex, bits_step))
if all(x < 0 for x in (bits_left_bound, bits_right_bound)): if all(x < 0 for x in (bits_left_bound, bits_right_bound)):
usage_cb(out, '{} {} {} is not valid'.format(o, bits_left_bound, bits_right_bound)) usage_cb(out, '{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound))
aconf.gex_test = a aconf.gex_test = argument.gex_test
elif o == '--dheat':
aconf.dheat = a
elif o == '--skip-rate-test':
aconf.skip_rate_test = True
elif o == '--conn-rate-test':
aconf.conn_rate_test = a
if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False: if int(argument.dheat) > 0:
aconf.dheat = argument.dheat
aconf.skip_rate_test = argument.skip_rate_test
if int(argument.conn_rate_test) > 0:
aconf.conn_rate_test = argument.conn_rate_test
except argparse.ArgumentError as err:
usage_cb(out, str(err))
if argument.host is None and argument.client_audit is None and argument.targets is None and argument.list_policies is None and argument.lookup is None and argument.manual is None:
usage_cb(out) usage_cb(out)
if aconf.manual: if aconf.manual:
@ -947,24 +995,20 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
list_policies(out, aconf.verbose) list_policies(out, aconf.verbose)
sys.exit(exitcodes.GOOD) sys.exit(exitcodes.GOOD)
if aconf.client_audit is False and aconf.target_file is None: if aconf.client_audit is None and aconf.target_file is None:
if oport is not None: host = argument.host
host = args[0] port = argument.port
else:
host, port = Utils.parse_host_and_port(args[0]) if argument.host is None and aconf.target_file is None:
if not host and aconf.target_file is None:
usage_cb(out, 'host is empty') usage_cb(out, 'host is empty')
if port == 0 and oport is None: if aconf.client_audit is True: # The default port to listen on during a client audit is 2222.
if aconf.client_audit: # The default port to listen on during a client audit is 2222.
port = 2222 port = 2222
else:
port = 22
if oport is not None: if argument.port != 22:
port = Utils.parse_int(oport) port = Utils.parse_int(argument.port)
if port <= 0 or port > 65535: if port <= 0 or port > 65535:
usage_cb(out, 'port {} is not valid'.format(oport)) usage_cb(out, 'port {} is not valid'.format(argument.port))
aconf.host = host aconf.host = host
aconf.port = port aconf.port = port