mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-11-03 18:52:15 +01:00 
			
		
		
		
	Fixed docker tests.
This commit is contained in:
		@@ -1,2 +1,16 @@
 | 
			
		||||
import sys
 | 
			
		||||
import traceback
 | 
			
		||||
 | 
			
		||||
from ssh_audit.ssh_audit import main
 | 
			
		||||
main()
 | 
			
		||||
from ssh_audit import exitcodes
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
exit_code = exitcodes.GOOD
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    exit_code = main()
 | 
			
		||||
except Exception:
 | 
			
		||||
    exit_code = exitcodes.UNKNOWN_ERROR
 | 
			
		||||
    print(traceback.format_exc())
 | 
			
		||||
 | 
			
		||||
sys.exit(exit_code)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										6
									
								
								src/ssh_audit/exitcodes.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								src/ssh_audit/exitcodes.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,6 @@
 | 
			
		||||
# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively.
 | 
			
		||||
FAILURE = 3
 | 
			
		||||
WARNING = 2
 | 
			
		||||
CONNECTION_ERROR = 1
 | 
			
		||||
GOOD = 0
 | 
			
		||||
UNKNOWN_ERROR = -1
 | 
			
		||||
@@ -44,16 +44,13 @@ import traceback
 | 
			
		||||
from typing import Dict, List, Set, Sequence, Tuple, Iterable
 | 
			
		||||
from typing import Callable, Optional, Union, Any
 | 
			
		||||
 | 
			
		||||
from ssh_audit import exitcodes
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
VERSION = 'v2.3.1-dev'
 | 
			
		||||
SSH_HEADER = 'SSH-{0}-OpenSSH_8.2'  # SSH software to impersonate
 | 
			
		||||
GITHUB_ISSUES_URL = 'https://github.com/jtesta/ssh-audit/issues'  # The URL to the Github issues tracker.
 | 
			
		||||
 | 
			
		||||
# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively.
 | 
			
		||||
PROGRAM_RETVAL_FAILURE = 3
 | 
			
		||||
PROGRAM_RETVAL_WARNING = 2
 | 
			
		||||
PROGRAM_RETVAL_CONNECTION_ERROR = 1
 | 
			
		||||
PROGRAM_RETVAL_GOOD = 0
 | 
			
		||||
PROGRAM_RETVAL_UNKNOWN_ERROR = -1
 | 
			
		||||
 | 
			
		||||
try:  # pragma: nocover
 | 
			
		||||
    from colorama import init as colorama_init
 | 
			
		||||
@@ -63,13 +60,13 @@ except ImportError:  # pragma: nocover
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def usage(err: Optional[str] = None) -> None:
 | 
			
		||||
    retval = PROGRAM_RETVAL_GOOD
 | 
			
		||||
    retval = exitcodes.GOOD
 | 
			
		||||
    uout = Output()
 | 
			
		||||
    p = os.path.basename(sys.argv[0])
 | 
			
		||||
    uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
 | 
			
		||||
    if err is not None and len(err) > 0:
 | 
			
		||||
        uout.fail('\n' + err)
 | 
			
		||||
        retval = PROGRAM_RETVAL_UNKNOWN_ERROR
 | 
			
		||||
        retval = exitcodes.UNKNOWN_ERROR
 | 
			
		||||
    uout.info('usage: {0} [options] <host>\n'.format(p))
 | 
			
		||||
    uout.info('   -h,  --help             print this help')
 | 
			
		||||
    uout.info('   -1,  --ssh1             force ssh version 1 only')
 | 
			
		||||
@@ -727,7 +724,7 @@ class AuditConf:
 | 
			
		||||
 | 
			
		||||
        if aconf.list_policies:
 | 
			
		||||
            list_policies()
 | 
			
		||||
            sys.exit(PROGRAM_RETVAL_GOOD)
 | 
			
		||||
            sys.exit(exitcodes.GOOD)
 | 
			
		||||
 | 
			
		||||
        if aconf.client_audit is False and aconf.target_file is None:
 | 
			
		||||
            if oport is not None:
 | 
			
		||||
@@ -767,17 +764,17 @@ class AuditConf:
 | 
			
		||||
                aconf.policy = Policy(policy_file=aconf.policy_file)
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
 | 
			
		||||
                sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR)
 | 
			
		||||
                sys.exit(exitcodes.UNKNOWN_ERROR)
 | 
			
		||||
 | 
			
		||||
            # If the user wants to do a client audit, but provided a server policy, terminate.
 | 
			
		||||
            if aconf.client_audit and aconf.policy.is_server_policy():
 | 
			
		||||
                print("Error: client audit selected, but server policy provided.")
 | 
			
		||||
                sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR)
 | 
			
		||||
                sys.exit(exitcodes.UNKNOWN_ERROR)
 | 
			
		||||
 | 
			
		||||
            # If the user wants to do a server audit, but provided a client policy, terminate.
 | 
			
		||||
            if aconf.client_audit is False and aconf.policy.is_server_policy() is False:
 | 
			
		||||
                print("Error: server audit selected, but client policy provided.")
 | 
			
		||||
                sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR)
 | 
			
		||||
                sys.exit(exitcodes.UNKNOWN_ERROR)
 | 
			
		||||
 | 
			
		||||
        return aconf
 | 
			
		||||
 | 
			
		||||
@@ -2548,7 +2545,7 @@ class SSH:  # pylint: disable=too-few-public-methods
 | 
			
		||||
                        yield af, addr
 | 
			
		||||
            except socket.error as e:
 | 
			
		||||
                out.fail('[exception] {}'.format(e))
 | 
			
		||||
                sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
 | 
			
		||||
                sys.exit(exitcodes.CONNECTION_ERROR)
 | 
			
		||||
 | 
			
		||||
        # Listens on a server socket and accepts one connection (used for
 | 
			
		||||
        # auditing client connections).
 | 
			
		||||
@@ -2578,7 +2575,7 @@ class SSH:  # pylint: disable=too-few-public-methods
 | 
			
		||||
            # If we failed to listen on any interfaces, terminate.
 | 
			
		||||
            if len(self.__sock_map.keys()) == 0:
 | 
			
		||||
                print("Error: failed to listen on any IPv4 and IPv6 interfaces!")
 | 
			
		||||
                sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
 | 
			
		||||
                sys.exit(exitcodes.CONNECTION_ERROR)
 | 
			
		||||
 | 
			
		||||
            # Wait for an incoming connection.  If a timeout was explicitly
 | 
			
		||||
            # set by the user, terminate when it elapses.
 | 
			
		||||
@@ -2596,7 +2593,7 @@ class SSH:  # pylint: disable=too-few-public-methods
 | 
			
		||||
 | 
			
		||||
                if self.__timeout_set and time_elapsed >= self.__timeout:
 | 
			
		||||
                    print("Timeout elapsed.  Terminating...")
 | 
			
		||||
                    sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
 | 
			
		||||
                    sys.exit(exitcodes.CONNECTION_ERROR)
 | 
			
		||||
 | 
			
		||||
            # Accept the connection.
 | 
			
		||||
            c, addr = self.__sock_map[fds[0][0]].accept()
 | 
			
		||||
@@ -2732,7 +2729,7 @@ class SSH:  # pylint: disable=too-few-public-methods
 | 
			
		||||
                    check_size = 4 + 1 + payload_length + padding_length
 | 
			
		||||
                if check_size % self.__block_size != 0:
 | 
			
		||||
                    out.fail('[exception] invalid ssh packet (block size)')
 | 
			
		||||
                    sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
 | 
			
		||||
                    sys.exit(exitcodes.CONNECTION_ERROR)
 | 
			
		||||
                self.ensure_read(payload_length)
 | 
			
		||||
                if sshv == 1:
 | 
			
		||||
                    payload = self.read(payload_length - 4)
 | 
			
		||||
@@ -2747,7 +2744,7 @@ class SSH:  # pylint: disable=too-few-public-methods
 | 
			
		||||
                    rcrc = SSH1.crc32(padding + payload)
 | 
			
		||||
                    if crc != rcrc:
 | 
			
		||||
                        out.fail('[exception] packet checksum CRC32 mismatch.')
 | 
			
		||||
                        sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
 | 
			
		||||
                        sys.exit(exitcodes.CONNECTION_ERROR)
 | 
			
		||||
                else:
 | 
			
		||||
                    self.ensure_read(padding_length)
 | 
			
		||||
                    padding = self.read(padding_length)
 | 
			
		||||
@@ -3189,9 +3186,9 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al
 | 
			
		||||
    first = True
 | 
			
		||||
    for level, text in texts:
 | 
			
		||||
        if level == 'fail':
 | 
			
		||||
            program_retval = PROGRAM_RETVAL_FAILURE
 | 
			
		||||
        elif level == 'warn' and program_retval != PROGRAM_RETVAL_FAILURE:  # If a failure was found previously, don't downgrade to warning.
 | 
			
		||||
            program_retval = PROGRAM_RETVAL_WARNING
 | 
			
		||||
            program_retval = exitcodes.FAILURE
 | 
			
		||||
        elif level == 'warn' and program_retval != exitcodes.FAILURE:  # If a failure was found previously, don't downgrade to warning.
 | 
			
		||||
            program_retval = exitcodes.WARNING
 | 
			
		||||
 | 
			
		||||
        f = getattr(out, level)
 | 
			
		||||
        comment = (padding + ' -- [' + level + '] ' + text) if text != '' else ''
 | 
			
		||||
@@ -3414,10 +3411,10 @@ def output_info(software: Optional['SSH.Software'], client_audit: bool, any_prob
 | 
			
		||||
        out.sep()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Returns a PROGRAM_RETVAL_* flag to denote if any failures or warnings were encountered.
 | 
			
		||||
# Returns a exitcodes.* flag to denote if any failures or warnings were encountered.
 | 
			
		||||
def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None, print_target: bool = False) -> int:
 | 
			
		||||
 | 
			
		||||
    program_retval = PROGRAM_RETVAL_GOOD
 | 
			
		||||
    program_retval = exitcodes.GOOD
 | 
			
		||||
    client_audit = client_host is not None  # If set, this is a client audit.
 | 
			
		||||
    sshv = 1 if pkm is not None else 2
 | 
			
		||||
    algs = SSH.Algorithms(pkm, kex)
 | 
			
		||||
@@ -3554,7 +3551,7 @@ def list_policies() -> None:
 | 
			
		||||
        print("\nsys.argv[0]: %s" % sys.argv[0])
 | 
			
		||||
        print("__file__: %s" % __file__)
 | 
			
		||||
        print("policies_dir: %s" % policies_dir)
 | 
			
		||||
        sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR)
 | 
			
		||||
        sys.exit(exitcodes.UNKNOWN_ERROR)
 | 
			
		||||
 | 
			
		||||
    # Get a list of all the files in the policies sub-directory.
 | 
			
		||||
    files = []
 | 
			
		||||
@@ -3717,9 +3714,9 @@ def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = Non
 | 
			
		||||
    return res
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Returns one of the PROGRAM_RETVAL_* flags.
 | 
			
		||||
# Returns one of the exitcodes.* flags.
 | 
			
		||||
def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int:
 | 
			
		||||
    program_retval = PROGRAM_RETVAL_GOOD
 | 
			
		||||
    program_retval = exitcodes.GOOD
 | 
			
		||||
    out.batch = aconf.batch
 | 
			
		||||
    out.verbose = aconf.verbose
 | 
			
		||||
    out.level = aconf.level
 | 
			
		||||
@@ -3731,7 +3728,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
 | 
			
		||||
        err = s.connect()
 | 
			
		||||
        if err is not None:
 | 
			
		||||
            out.fail(err)
 | 
			
		||||
            sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
 | 
			
		||||
            sys.exit(exitcodes.CONNECTION_ERROR)
 | 
			
		||||
 | 
			
		||||
    if sshv is None:
 | 
			
		||||
        sshv = 2 if aconf.ssh2 else 1
 | 
			
		||||
@@ -3771,7 +3768,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
 | 
			
		||||
    if err is not None:
 | 
			
		||||
        output(aconf, banner, header)
 | 
			
		||||
        out.fail(err)
 | 
			
		||||
        return PROGRAM_RETVAL_CONNECTION_ERROR
 | 
			
		||||
        return exitcodes.CONNECTION_ERROR
 | 
			
		||||
    if sshv == 1:
 | 
			
		||||
        program_retval = output(aconf, banner, header, pkm=SSH1.PublicKeyMessage.parse(payload))
 | 
			
		||||
    elif sshv == 2:
 | 
			
		||||
@@ -3786,7 +3783,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
 | 
			
		||||
 | 
			
		||||
        # This is a policy test.
 | 
			
		||||
        elif (aconf.policy is not None) and (aconf.make_policy is False):
 | 
			
		||||
            program_retval = PROGRAM_RETVAL_GOOD if evaluate_policy(aconf, banner, s.client_host, kex=kex) else PROGRAM_RETVAL_FAILURE
 | 
			
		||||
            program_retval = exitcodes.GOOD if evaluate_policy(aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE
 | 
			
		||||
 | 
			
		||||
        # A new policy should be made from this scan.
 | 
			
		||||
        elif (aconf.policy is None) and (aconf.make_policy is True):
 | 
			
		||||
@@ -3799,8 +3796,8 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def algorithm_lookup(alg_names: str) -> int:
 | 
			
		||||
    '''Looks up a comma-separated list of algorithms and outputs their security properties.  Returns a PROGRAM_RETVAL_* flag.'''
 | 
			
		||||
    retval = PROGRAM_RETVAL_GOOD
 | 
			
		||||
    '''Looks up a comma-separated list of algorithms and outputs their security properties.  Returns an exitcodes.* flag.'''
 | 
			
		||||
    retval = exitcodes.GOOD
 | 
			
		||||
    alg_types = {
 | 
			
		||||
        'kex': 'key exchange algorithms',
 | 
			
		||||
        'key': 'host-key algorithms',
 | 
			
		||||
@@ -3845,7 +3842,7 @@ def algorithm_lookup(alg_names: str) -> int:
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    if len(algorithms_not_found) > 0:
 | 
			
		||||
        retval = PROGRAM_RETVAL_FAILURE
 | 
			
		||||
        retval = exitcodes.FAILURE
 | 
			
		||||
        out.head('# unknown algorithms')
 | 
			
		||||
        for algorithm_not_found in algorithms_not_found:
 | 
			
		||||
            out.fail(algorithm_not_found)
 | 
			
		||||
@@ -3866,7 +3863,7 @@ def main() -> int:
 | 
			
		||||
 | 
			
		||||
    # If multiple targets were specified...
 | 
			
		||||
    if len(aconf.target_list) > 0:
 | 
			
		||||
        ret = PROGRAM_RETVAL_GOOD
 | 
			
		||||
        ret = exitcodes.GOOD
 | 
			
		||||
 | 
			
		||||
        # If JSON output is desired, each target's results will be reported in its own list entry.
 | 
			
		||||
        if aconf.json:
 | 
			
		||||
@@ -3882,7 +3879,7 @@ def main() -> int:
 | 
			
		||||
            new_ret = audit(aconf, print_target=True)
 | 
			
		||||
 | 
			
		||||
            # Set the return value only if an unknown error occurred, a failure occurred, or if a warning occurred and the previous value was good.
 | 
			
		||||
            if (new_ret == PROGRAM_RETVAL_UNKNOWN_ERROR) or (new_ret == PROGRAM_RETVAL_FAILURE) or ((new_ret == PROGRAM_RETVAL_WARNING) and (ret == PROGRAM_RETVAL_GOOD)):
 | 
			
		||||
            if (new_ret == exitcodes.UNKNOWN_ERROR) or (new_ret == exitcodes.FAILURE) or ((new_ret == exitcodes.WARNING) and (ret == exitcodes.GOOD)):
 | 
			
		||||
                ret = new_ret
 | 
			
		||||
 | 
			
		||||
            # Don't print a delimiter after the last target was handled.
 | 
			
		||||
@@ -3901,12 +3898,12 @@ def main() -> int:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':  # pragma: nocover
 | 
			
		||||
    exit_code = PROGRAM_RETVAL_GOOD
 | 
			
		||||
    exit_code = exitcodes.GOOD
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        exit_code = main()
 | 
			
		||||
    except Exception:
 | 
			
		||||
        exit_code = PROGRAM_RETVAL_UNKNOWN_ERROR
 | 
			
		||||
        exit_code = exitcodes.UNKNOWN_ERROR
 | 
			
		||||
        print(traceback.format_exc())
 | 
			
		||||
 | 
			
		||||
    sys.exit(exit_code)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user