Argparse v3 - RC1

This commit is contained in:
OAM7575
2024-10-20 10:31:30 +11:00
parent 99c64787d9
commit c5f4ae74ce
+134 -77
View File
@@ -23,9 +23,9 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import argparse
import concurrent.futures import concurrent.futures
import copy import copy
import getopt # pylint: disable=deprecated-module
import json import json
import multiprocessing import multiprocessing
import os import os
@@ -33,6 +33,7 @@ import re
import sys import sys
import traceback import traceback
# pylint: disable=unused-import # pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
from typing import cast, Callable, Optional, Union, Any # noqa: F401 from typing import cast, Callable, Optional, Union, Any # noqa: F401
@@ -89,7 +90,7 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
if err is not None and len(err) > 0: if err is not None and len(err) > 0:
uout.fail(err + '\n') uout.fail(err + '\n')
retval = exitcodes.UNKNOWN_ERROR retval = exitcodes.UNKNOWN_ERROR
uout.info('usage: {0} [options] <host>\n'.format(p)) uout.info('usage: {0} [options] -ip <host>\n'.format(p))
uout.info(' -h, --help print this help') uout.info(' -h, --help print this help')
uout.info(' -1, --ssh1 force ssh version 1 only') uout.info(' -1, --ssh1 force ssh version 1 only')
uout.info(' -2, --ssh2 force ssh version 2 only') uout.info(' -2, --ssh2 force ssh version 2 only')
@@ -116,6 +117,8 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None:
uout.info(' -g, --gex-test=<x[,y,...]> dh gex modulus size test') uout.info(' -g, --gex-test=<x[,y,...]> dh gex modulus size test')
uout.info(' <min1:pref1:max1[,min2:pref2:max2,...]>') uout.info(' <min1:pref1:max1[,min2:pref2:max2,...]>')
uout.info(' <x-y[:step]>') uout.info(' <x-y[:step]>')
uout.info(' --hostname hostname of target to scan')
uout.info(' -ip, --ip-address ip address of target to scan')
uout.info(' -j, --json JSON output (use -jj to enable indents)') uout.info(' -j, --json JSON output (use -jj to enable indents)')
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)') uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v') uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v')
@@ -836,82 +839,129 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
aconf.colors = enable_colors aconf.colors = enable_colors
out.use_colors = enable_colors out.use_colors = enable_colors
try:
sopts = 'h1246M:p:P:jbcnvl:t:T:Lmdg:'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test=', 'dheat=', 'skip-rate-test', 'conn-rate-test=']
opts, args = getopt.gnu_getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(out, str(err))
aconf.ssh1, aconf.ssh2 = False, False aconf.ssh1, aconf.ssh2 = False, False
host: str = '' host: str = ''
oport: Optional[str] = None oport: Optional[str] = None
port: int = 0
for o, a in opts: parser = argparse.ArgumentParser(description='SSH Audit Tool', add_help=False)
if o in ('-h', '--help'):
# Add short options to the parser
parser.add_argument('-1', '--ssh1', action="store_true", dest='ssh1', default=None)
parser.add_argument('-2', '--ssh2', action="store_true", dest='ssh2', default=None)
parser.add_argument('-4', '--ipv4', action="store_true", dest='ipv4', default=None)
parser.add_argument('-6', '--ipv6', action="store_true", dest='ipv6', default=None)
parser.add_argument('-b', '--batch', action="store_true", dest='batch', default=None)
parser.add_argument('-c', '--client-audit', action="store_true", dest='client_audit', default=None)
parser.add_argument('-d', '--debug', action="store_true", dest='debug', default=None)
parser.add_argument('-g', '--gex-test', action="store", dest='gex_test', default=None)
parser.add_argument('-h', '--help', action="store_true", dest='help', default=None)
parser.add_argument('-ip', '--ip-address', '--hostname', action="store", dest='host', type=str)
parser.add_argument('-j', '--json', action="store_true", dest='json', default=None)
parser.add_argument('-jj', '--json-indent', action="store_true", dest='json_indent', default=None)
parser.add_argument('-l', '--level', action="store", dest='level', type=str, default='info')
parser.add_argument('-L', '--list-policies', action="store_true", dest='list_policies', default=None)
parser.add_argument('-M', '--make-policy', action="store", dest='make_policy', default=None)
parser.add_argument('-m', '--manual', action="store_true", dest='manual', default=None)
parser.add_argument('-P', '--policy', action="store", dest='policy', default=None)
parser.add_argument('-p', '--port', action="store", dest='port', default='22', type=int)
parser.add_argument('-T', '--targets', action="store", dest='targets', default=None)
parser.add_argument('-t', '--timeout', action="store", dest='timeout', default='5', type=int)
parser.add_argument('-v', '--verbose', action="store_true", dest='verbose', default=None)
# Add long options to the parser
parser.add_argument('--conn-rate-test', action="store", dest='conn_rate_test', default='0', type=int)
parser.add_argument('--dheat', action="store", dest='dheat', default='0', type=int)
parser.add_argument('--lookup', action="store", dest='lookup', default=None)
parser.add_argument('--no-colors', action="store_true", dest='no_colors', default=None)
parser.add_argument('--skip-rate-test', action="store_true", dest='skip_rate_test', default=None)
parser.add_argument('--threads', action="store", dest='threads', default='1', type=int)
try:
namespace = parser.parse_args()
if namespace.help is True:
usage_cb(out) usage_cb(out)
elif o in ('-1', '--ssh1'):
aconf.ssh1 = True aconf.host = namespace.host
elif o in ('-2', '--ssh2'): host = namespace.host
aconf.ssh2 = True port = namespace.port
elif o in ('-4', '--ipv4'): aconf.ssh1 = namespace.ssh1
aconf.ipv4 = True aconf.ssh2 = namespace.ssh2
elif o in ('-6', '--ipv6'): aconf.ipv4 = namespace.ipv4
aconf.ipv6 = True aconf.ipv6 = namespace.ipv6
elif o in ('-p', '--port'):
oport = a aconf.json = namespace.json
elif o in ('-b', '--batch'): if namespace.json_indent is True:
setattr(namespace, 'json', True)
aconf.json = namespace.json
aconf.json_print_indent = namespace.json_indent
if namespace.batch is True:
aconf.batch = True aconf.batch = True
aconf.verbose = True aconf.verbose = True
elif o in ('-c', '--client-audit'):
aconf.client_audit = True aconf.client_audit = namespace.client_audit
elif o in ('-j', '--json'):
if aconf.json: # If specified twice, enable indent printing. ttime = namespace.timeout
aconf.json_print_indent = True if ttime != 5:
else: aconf.timeout = float(namespace.timeout)
aconf.json = True aconf.timeout_set = True
elif o in ('-v', '--verbose'):
if namespace.verbose is True:
aconf.verbose = True aconf.verbose = True
out.verbose = True out.verbose = True
elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'):
usage_cb(out, 'level {} is not valid'.format(a))
aconf.level = a
elif o in ('-t', '--timeout'):
aconf.timeout = float(a)
aconf.timeout_set = True
elif o in ('-M', '--make-policy'):
aconf.make_policy = True
aconf.policy_file = a
elif o in ('-P', '--policy'):
aconf.policy_file = a
elif o in ('-T', '--targets'):
aconf.target_file = a
# If we're on Windows, and we can't use the idna workaround, force only one thread to be used (otherwise a crash would occur). # Get error level regex
# if no_idna_workaround: err_level = (namespace.level)
# print("\nWARNING: the idna module was not found on this system, thus only single-threaded scanning will be done (this is a workaround for this Windows-specific crash: https://github.com/python/cpython/issues/73474). Multi-threaded scanning can be enabled by installing the idna module (pip install idna).\n") pattern = re.compile(r'info|warn|fail')
# aconf.threads = 1 if pattern.match(err_level):
elif o == '--threads': aconf.level = str(namespace.level)
aconf.threads = int(a) else:
# if no_idna_workaround: usage_cb(out, 'Error level : {} is not valid'.format(err_level))
# aconf.threads = 1
elif o in ('-L', '--list-policies'): if getattr(namespace, 'make_policy') is True:
aconf.make_policy = True
aconf.policy_file = namespace.make_policy
if getattr(namespace, 'policy') is True:
aconf.policy_file = namespace.policy
if getattr(namespace, 'targets') is True:
aconf.target_file = namespace.targets
if os.name == 'nt':
aconf.threads = '1'
if os.name == 'posix':
aconf.threads = int(namespace.threads)
else:
aconf.threads = '1'
if getattr(namespace, 'list_policies') is True:
aconf.list_policies = True aconf.list_policies = True
elif o == '--lookup':
aconf.lookup = a if getattr(namespace, 'lookup') is True:
elif o in ('-m', '--manual'): aconf.lookup = namespace.lookup
if getattr(namespace, 'manual') is True:
aconf.manual = True aconf.manual = True
elif o in ('-d', '--debug'): else:
aconf.manual = False
if namespace.debug == True:
aconf.debug = True aconf.debug = True
out.debug = True out.debug = True
elif o in ('-g', '--gex-test'):
if getattr(namespace, 'gex_test') == True:
dh_gex = namespace.gex_test
permitted_syntax = get_permitted_syntax_for_gex_test() permitted_syntax = get_permitted_syntax_for_gex_test()
if not any(re.search(regex_str, a) for regex_str in permitted_syntax.values()): if not any(re.search(regex_str, dh_gex) for regex_str in permitted_syntax.values()):
usage_cb(out, '{} {} is not valid'.format(o, a)) usage_cb(out, '{} is not valid'.format(dh_gex))
if re.search(permitted_syntax['RANGE'], a): if re.search(permitted_syntax['RANGE'], dh_gex):
extracted_digits = re.findall(r'\d+', a) extracted_digits = re.findall(r'\d+', dh_gex)
bits_left_bound = int(extracted_digits[0]) bits_left_bound = int(extracted_digits[0])
bits_right_bound = int(extracted_digits[1]) bits_right_bound = int(extracted_digits[1])
@@ -920,21 +970,26 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
bits_step = int(extracted_digits[2]) bits_step = int(extracted_digits[2])
if bits_step <= 0: if bits_step <= 0:
usage_cb(out, '{} {} is not valid'.format(o, bits_step)) usage_cb(out, '{} {} is not valid'.format(dh_gex, bits_step))
if all(x < 0 for x in (bits_left_bound, bits_right_bound)): if all(x < 0 for x in (bits_left_bound, bits_right_bound)):
usage_cb(out, '{} {} {} is not valid'.format(o, bits_left_bound, bits_right_bound)) usage_cb(out, '{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound))
aconf.gex_test = a aconf.gex_test = namespace.gex_test
elif o == '--dheat':
aconf.dheat = a
elif o == '--skip-rate-test':
aconf.skip_rate_test = True
elif o == '--conn-rate-test':
aconf.conn_rate_test = a
if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False: if int(namespace.dheat) > 0:
aconf.dheat = int(namespace.dheat)
aconf.skip_rate_test = namespace.skip_rate_test
if int(namespace.conn_rate_test) > 0:
aconf.conn_rate_test = int(namespace.conn_rate_test)
except argparse.ArgumentError as err:
usage_cb(out, str(err))
if namespace.host is None and namespace.client_audit is None and namespace.targets is None and namespace.list_policies is None and namespace.lookup is None and namespace.manual is None:
usage_cb(out) usage_cb(out)
if aconf.manual: if aconf.manual:
@@ -947,19 +1002,21 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[.
list_policies(out, aconf.verbose) list_policies(out, aconf.verbose)
sys.exit(exitcodes.GOOD) sys.exit(exitcodes.GOOD)
if aconf.client_audit is False and aconf.target_file is None: if aconf.client_audit is None and aconf.target_file is None:
if oport is not None: if oport is not None:
host = args[0] host = host
else: else:
host, port = Utils.parse_host_and_port(args[0]) host = host
if not host and aconf.target_file is None: port = port
#if not host and aconf.target_file is None:
if host is None and aconf.target_file is None:
usage_cb(out, 'host is empty') usage_cb(out, 'host is empty')
if port == 0 and oport is None: if port == 0 and oport is None:
if aconf.client_audit: # The default port to listen on during a client audit is 2222. if aconf.client_audit: # The default port to listen on during a client audit is 2222.
port = 2222 port = 2222
else: else:
port = 22 port = port
if oport is not None: if oport is not None:
port = Utils.parse_int(oport) port = Utils.parse_int(oport)