mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-04 03:02:15 +01:00 
			
		
		
		
	Added -L option to list built-in policies.
This commit is contained in:
		
							
								
								
									
										70
									
								
								ssh-audit.py
									
									
									
									
									
								
							
							
						
						
									
										70
									
								
								ssh-audit.py
									
									
									
									
									
								
							@@ -46,6 +46,7 @@ from typing import Callable, Optional, Union, Any
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
VERSION = 'v2.2.1-dev'
 | 
					VERSION = 'v2.2.1-dev'
 | 
				
			||||||
SSH_HEADER = 'SSH-{0}-OpenSSH_8.0'  # SSH software to impersonate
 | 
					SSH_HEADER = 'SSH-{0}-OpenSSH_8.0'  # SSH software to impersonate
 | 
				
			||||||
 | 
					GITHUB_ISSUES_URL = 'https://github.com/jtesta/ssh-audit/issues'  # The URL to the Github issues tracker.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively.
 | 
					# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively.
 | 
				
			||||||
PROGRAM_RETVAL_FAILURE = 3
 | 
					PROGRAM_RETVAL_FAILURE = 3
 | 
				
			||||||
@@ -67,7 +68,7 @@ def usage(err: Optional[str] = None) -> None:
 | 
				
			|||||||
    uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
 | 
					    uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
 | 
				
			||||||
    if err is not None and len(err) > 0:
 | 
					    if err is not None and len(err) > 0:
 | 
				
			||||||
        uout.fail('\n' + err)
 | 
					        uout.fail('\n' + err)
 | 
				
			||||||
    uout.info('usage: {0} [-h1246ptbcPjlnv] <host>\n'.format(p))
 | 
					    uout.info('usage: {0} [-h1246ptbcMPjlLnv] <host>\n'.format(p))
 | 
				
			||||||
    uout.info('   -h,  --help             print this help')
 | 
					    uout.info('   -h,  --help             print this help')
 | 
				
			||||||
    uout.info('   -1,  --ssh1             force ssh version 1 only')
 | 
					    uout.info('   -1,  --ssh1             force ssh version 1 only')
 | 
				
			||||||
    uout.info('   -2,  --ssh2             force ssh version 2 only')
 | 
					    uout.info('   -2,  --ssh2             force ssh version 2 only')
 | 
				
			||||||
@@ -84,6 +85,7 @@ def usage(err: Optional[str] = None) -> None:
 | 
				
			|||||||
    uout.info('')
 | 
					    uout.info('')
 | 
				
			||||||
    uout.info('   -j,  --json             JSON output')
 | 
					    uout.info('   -j,  --json             JSON output')
 | 
				
			||||||
    uout.info('   -l,  --level=<level>    minimum output level (info|warn|fail)')
 | 
					    uout.info('   -l,  --level=<level>    minimum output level (info|warn|fail)')
 | 
				
			||||||
 | 
					    uout.info('   -L,  --list-policies    list all the official, built-in policies')
 | 
				
			||||||
    uout.info('   -n,  --no-colors        disable colors')
 | 
					    uout.info('   -n,  --no-colors        disable colors')
 | 
				
			||||||
    uout.info('   -v,  --verbose          verbose output')
 | 
					    uout.info('   -v,  --verbose          verbose output')
 | 
				
			||||||
    uout.sep()
 | 
					    uout.sep()
 | 
				
			||||||
@@ -421,10 +423,11 @@ class AuditConf:
 | 
				
			|||||||
        self.timeout_set = False  # Set to True when the user explicitly sets it.
 | 
					        self.timeout_set = False  # Set to True when the user explicitly sets it.
 | 
				
			||||||
        self.target_file = None  # type: Optional[str]
 | 
					        self.target_file = None  # type: Optional[str]
 | 
				
			||||||
        self.target_list = []  # type: List[str]
 | 
					        self.target_list = []  # type: List[str]
 | 
				
			||||||
 | 
					        self.list_policies = False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None:
 | 
					    def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None:
 | 
				
			||||||
        valid = False
 | 
					        valid = False
 | 
				
			||||||
        if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json', 'make_policy']:
 | 
					        if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json', 'make_policy', 'list_policies']:
 | 
				
			||||||
            valid, value = True, bool(value)
 | 
					            valid, value = True, bool(value)
 | 
				
			||||||
        elif name in ['ipv4', 'ipv6']:
 | 
					        elif name in ['ipv4', 'ipv6']:
 | 
				
			||||||
            valid = False
 | 
					            valid = False
 | 
				
			||||||
@@ -473,8 +476,8 @@ class AuditConf:
 | 
				
			|||||||
        # pylint: disable=too-many-branches
 | 
					        # pylint: disable=too-many-branches
 | 
				
			||||||
        aconf = cls()
 | 
					        aconf = cls()
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            sopts = 'h1246M:p:P:jbcnvl:t:T:'
 | 
					            sopts = 'h1246M:p:P:jbcnvl:t:T:L'
 | 
				
			||||||
            lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=']
 | 
					            lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies']
 | 
				
			||||||
            opts, args = getopt.gnu_getopt(args, sopts, lopts)
 | 
					            opts, args = getopt.gnu_getopt(args, sopts, lopts)
 | 
				
			||||||
        except getopt.GetoptError as err:
 | 
					        except getopt.GetoptError as err:
 | 
				
			||||||
            usage_cb(str(err))
 | 
					            usage_cb(str(err))
 | 
				
			||||||
@@ -520,10 +523,16 @@ class AuditConf:
 | 
				
			|||||||
                aconf.policy_file = a
 | 
					                aconf.policy_file = a
 | 
				
			||||||
            elif o in ('-T', '--targets'):
 | 
					            elif o in ('-T', '--targets'):
 | 
				
			||||||
                aconf.target_file = a
 | 
					                aconf.target_file = a
 | 
				
			||||||
 | 
					            elif o in ('-L', '--list-policies'):
 | 
				
			||||||
 | 
					                aconf.list_policies = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None:
 | 
					        if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False:
 | 
				
			||||||
            usage_cb()
 | 
					            usage_cb()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if aconf.list_policies:
 | 
				
			||||||
 | 
					            list_policies()
 | 
				
			||||||
 | 
					            sys.exit(PROGRAM_RETVAL_GOOD)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if aconf.client_audit is False and aconf.target_file is None:
 | 
					        if aconf.client_audit is False and aconf.target_file is None:
 | 
				
			||||||
            if oport is not None:
 | 
					            if oport is not None:
 | 
				
			||||||
                host = args[0]
 | 
					                host = args[0]
 | 
				
			||||||
@@ -3282,6 +3291,57 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], client_hos
 | 
				
			|||||||
    return passed
 | 
					    return passed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def list_policies() -> None:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Get a list of all the files in the policies sub-directory, relative to the path of this script.
 | 
				
			||||||
 | 
					    installed_dir = os.path.dirname(os.path.abspath(__file__))
 | 
				
			||||||
 | 
					    policies_dir = os.path.join(installed_dir, 'policies')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # If the path is not a directory, print a useful error and exit.
 | 
				
			||||||
 | 
					    if not os.path.isdir(policies_dir):
 | 
				
			||||||
 | 
					        print("Error: could not find policies directory.  Please report this full output to <%s>:" % GITHUB_ISSUES_URL)
 | 
				
			||||||
 | 
					        print("\n__file__: %s" % __file__)
 | 
				
			||||||
 | 
					        print("policies_dir: %s" % policies_dir)
 | 
				
			||||||
 | 
					        sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Get a list of all the files in the policies sub-directory.
 | 
				
			||||||
 | 
					    files = []
 | 
				
			||||||
 | 
					    for f in os.listdir(policies_dir):
 | 
				
			||||||
 | 
					        files.append(f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    files.sort()  # Now the files will be in order, like 'ubuntu_client_16_04.txt', 'ubuntu_client_18_04.txt', 'ubuntu_client_20_04.txt', ...
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    server_policies_summary = []
 | 
				
			||||||
 | 
					    client_policies_summary = []
 | 
				
			||||||
 | 
					    for f in files:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Load each policy, and generate a short summary from its name and absolute file path.
 | 
				
			||||||
 | 
					        policy_file = os.path.join(policies_dir, f)
 | 
				
			||||||
 | 
					        policy = Policy(policy_file=policy_file)
 | 
				
			||||||
 | 
					        policy_summary = "Name:        %s\nPolicy path: %s" % (policy.get_name_and_version(), policy_file)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # We will print the server policies separately from thee client policies...
 | 
				
			||||||
 | 
					        if policy.is_server_policy():
 | 
				
			||||||
 | 
					            server_policies_summary.append(policy_summary)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            client_policies_summary.append(policy_summary)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if len(server_policies_summary) > 0:
 | 
				
			||||||
 | 
					        out.head('\nServer policies:\n')
 | 
				
			||||||
 | 
					        print("\n\n".join(server_policies_summary))
 | 
				
			||||||
 | 
					        print()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if len(client_policies_summary) > 0:
 | 
				
			||||||
 | 
					        out.head('\nClient policies:\n')
 | 
				
			||||||
 | 
					        print("\n\n".join(client_policies_summary))
 | 
				
			||||||
 | 
					        print()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if len(server_policies_summary) == 0 and len(client_policies_summary) == 0:
 | 
				
			||||||
 | 
					        print("Error: no built-in policies found in %s." % policies_dir)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        print("\nHint: Use -P and provide the path to a policy to run a policy scan.\n")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'], client_host: Optional[str]) -> None:
 | 
					def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'], client_host: Optional[str]) -> None:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Set the source of this policy to the server host if this is a server audit, otherwise set it to the client address.
 | 
					    # Set the source of this policy to the server host if this is a server audit, otherwise set it to the client address.
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user