From 83bd049486d6d48ca89bbcdd21e36e63b8d628d2 Mon Sep 17 00:00:00 2001 From: thecliguy Date: Tue, 2 Mar 2021 16:06:40 +0000 Subject: [PATCH] Debug Logging and visibility of SSH Connection errors (#99) * Debug Logging and visibility of SSH Connection errors * Updated date in man page --- src/ssh_audit/auditconf.py | 3 ++- src/ssh_audit/gextest.py | 20 +++++++++++++------- src/ssh_audit/hostkeytest.py | 19 ++++++++++++------- src/ssh_audit/outputbuffer.py | 12 +++++++++++- src/ssh_audit/ssh_audit.py | 23 +++++++++++++++-------- src/ssh_audit/ssh_socket.py | 11 ++++++++--- ssh-audit.1 | 7 ++++++- 7 files changed, 67 insertions(+), 28 deletions(-) diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index 71b71ac..de0153a 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -57,10 +57,11 @@ class AuditConf: self.list_policies = False self.lookup = '' self.manual = False + self.debug = False def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: valid = False - if name in ['batch', 'client_audit', 'colors', 'json', 'list_policies', 'manual', 'make_policy', 'ssh1', 'ssh2', 'timeout_set', 'verbose']: + if name in ['batch', 'client_audit', 'colors', 'json', 'list_policies', 'manual', 'make_policy', 'ssh1', 'ssh2', 'timeout_set', 'verbose', 'debug']: valid, value = True, bool(value) elif name in ['ipv4', 'ipv6']: valid, value = True, bool(value) diff --git a/src/ssh_audit/gextest.py b/src/ssh_audit/gextest.py index d76f549..7f84fc6 100644 --- a/src/ssh_audit/gextest.py +++ b/src/ssh_audit/gextest.py @@ -30,6 +30,7 @@ from ssh_audit.kexdh import KexGroupExchange_SHA1, KexGroupExchange_SHA256 from ssh_audit.ssh2_kexdb import SSH2_KexDB from ssh_audit.ssh2_kex import SSH2_Kex from ssh_audit.ssh_socket import SSH_Socket +from ssh_audit.outputbuffer import OutputBuffer # Performs DH group exchanges to find what moduli are supported, and checks @@ -38,22 +39,24 @@ class GEXTest: # Creates a new connection to the server. Returns True on success, or False. @staticmethod - def reconnect(s: 'SSH_Socket', kex: 'SSH2_Kex', gex_alg: str) -> bool: + def reconnect(out: 'OutputBuffer', s: 'SSH_Socket', kex: 'SSH2_Kex', gex_alg: str) -> bool: if s.is_connected(): return True - err = s.connect() + err = s.connect(out) if err is not None: + out.v(err, write_now=True) return False - _, _, err = s.get_banner() + _, _, err = s.get_banner(out) if err is not None: + out.v(err, write_now=True) s.close() return False # Send our KEX using the specified group-exchange and most of the # server's own values. - s.send_kexinit(key_exchanges=[gex_alg], hostkeys=kex.key_algorithms, ciphers=kex.server.encryption, macs=kex.server.mac, compressions=kex.server.compression, languages=kex.server.languages) + s.send_kexinit(out, key_exchanges=[gex_alg], hostkeys=kex.key_algorithms, ciphers=kex.server.encryption, macs=kex.server.mac, compressions=kex.server.compression, languages=kex.server.languages) # Parse the server's KEX. _, payload = s.read_packet(2) @@ -63,7 +66,7 @@ class GEXTest: # Runs the DH moduli test against the specified target. @staticmethod - def run(s: 'SSH_Socket', kex: 'SSH2_Kex') -> None: + def run(out: 'OutputBuffer', s: 'SSH_Socket', kex: 'SSH2_Kex') -> None: GEX_ALGS = { 'diffie-hellman-group-exchange-sha1': KexGroupExchange_SHA1, 'diffie-hellman-group-exchange-sha256': KexGroupExchange_SHA256, @@ -79,8 +82,9 @@ class GEXTest: # algorithms. If so, test each one. for gex_alg in GEX_ALGS: if gex_alg in kex.kex_algorithms: + out.d('Preparing to perform DH group exchange using ' + gex_alg + '...', write_now=True) - if GEXTest.reconnect(s, kex, gex_alg) is False: + if GEXTest.reconnect(out, s, kex, gex_alg) is False: break kex_group = GEX_ALGS[gex_alg]() @@ -110,7 +114,9 @@ class GEXTest: if bits >= smallest_modulus > 0: break - if GEXTest.reconnect(s, kex, gex_alg) is False: + out.d('Preparing to perform DH group exchange using ' + gex_alg + ' with modulus size ' + str(bits) + '...', write_now=True) + + if GEXTest.reconnect(out, s, kex, gex_alg) is False: reconnect_failed = True break diff --git a/src/ssh_audit/hostkeytest.py b/src/ssh_audit/hostkeytest.py index 87b31fe..7a37055 100644 --- a/src/ssh_audit/hostkeytest.py +++ b/src/ssh_audit/hostkeytest.py @@ -30,6 +30,7 @@ from ssh_audit.kexdh import KexDH, KexGroup1, KexGroup14_SHA1, KexGroup14_SHA256 from ssh_audit.ssh2_kex import SSH2_Kex from ssh_audit.ssh2_kexdb import SSH2_KexDB from ssh_audit.ssh_socket import SSH_Socket +from ssh_audit.outputbuffer import OutputBuffer # Obtains host keys, checks their size, and derives their fingerprints. @@ -52,7 +53,7 @@ class HostKeyTest: } @staticmethod - def run(s: 'SSH_Socket', server_kex: 'SSH2_Kex') -> None: + def run(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex') -> None: KEX_TO_DHGROUP = { 'diffie-hellman-group1-sha1': KexGroup1, 'diffie-hellman-group14-sha1': KexGroup14_SHA1, @@ -80,10 +81,10 @@ class HostKeyTest: break if kex_str is not None and kex_group is not None: - HostKeyTest.perform_test(s, server_kex, kex_str, kex_group, HostKeyTest.HOST_KEY_TYPES) + HostKeyTest.perform_test(out, s, server_kex, kex_str, kex_group, HostKeyTest.HOST_KEY_TYPES) @staticmethod - def perform_test(s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None: + def perform_test(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None: hostkey_modulus_size = 0 ca_modulus_size = 0 @@ -101,22 +102,26 @@ class HostKeyTest: # If this host key type is supported by the server, we test it. if host_key_type in server_kex.key_algorithms: + out.d('Preparing to obtain ' + host_key_type + ' host key...', write_now=True) + cert = host_key_types[host_key_type]['cert'] variable_key_len = host_key_types[host_key_type]['variable_key_len'] # If the connection is closed, re-open it and get the kex again. if not s.is_connected(): - err = s.connect() + err = s.connect(out) if err is not None: + out.v(err, write_now=True) return - _, _, err = s.get_banner() + _, _, err = s.get_banner(out) if err is not None: + out.v(err, write_now=True) s.close() return # Send our KEX using the specified group-exchange and most of the server's own values. - s.send_kexinit(key_exchanges=[kex_str], hostkeys=[host_key_type], ciphers=server_kex.server.encryption, macs=server_kex.server.mac, compressions=server_kex.server.compression, languages=server_kex.server.languages) + s.send_kexinit(out, key_exchanges=[kex_str], hostkeys=[host_key_type], ciphers=server_kex.server.encryption, macs=server_kex.server.mac, compressions=server_kex.server.compression, languages=server_kex.server.languages) # Parse the server's KEX. _, payload = s.read_packet() @@ -125,8 +130,8 @@ class HostKeyTest: # Do the initial DH exchange. The server responds back # with the host key and its length. Bingo. We also get back the host key fingerprint. + kex_group.send_init(s) try: - kex_group.send_init(s) host_key = kex_group.recv_reply(s, variable_key_len) if host_key is not None: server_kex.set_host_key(host_key_type, host_key) diff --git a/src/ssh_audit/outputbuffer.py b/src/ssh_audit/outputbuffer.py index f82223c..38723e3 100644 --- a/src/ssh_audit/outputbuffer.py +++ b/src/ssh_audit/outputbuffer.py @@ -47,6 +47,7 @@ class OutputBuffer: self.section: List[str] = [] self.batch = False self.verbose = False + self.debug = False self.use_colors = True self.json = False self.__level = 0 @@ -167,7 +168,16 @@ class OutputBuffer: def v(self, s: str, write_now: bool = False) -> 'OutputBuffer': '''Prints a message if verbose output is enabled.''' - if self.verbose: + if self.verbose or self.debug: + self.info(s) + if write_now: + self.write() + + return self + + def d(self, s: str, write_now: bool = False) -> 'OutputBuffer': + '''Prints a message if verbose output is enabled.''' + if self.debug: self.info(s) if write_now: self.write() diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index eb08707..1382e61 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -84,6 +84,7 @@ def usage(err: Optional[str] = None) -> None: uout.info(' -6, --ipv6 enable IPv6 (order of precedence)') uout.info(' -b, --batch batch output') uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)') + uout.info(' -d, --debug debug output') uout.info(' -j, --json JSON output') uout.info(' -l, --level= minimum output level (info|warn|fail)') uout.info(' -L, --list-policies list all the official, built-in policies') @@ -575,8 +576,8 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. # pylint: disable=too-many-branches aconf = AuditConf() try: - sopts = 'h1246M:p:P:jbcnvl:t:T:Lm' - lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual'] + sopts = 'h1246M:p:P:jbcnvl:t:T:Lmd' + lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug'] opts, args = getopt.gnu_getopt(args, sopts, lopts) except getopt.GetoptError as err: usage_cb(str(err)) @@ -632,6 +633,9 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. aconf.lookup = a elif o in ('-m', '--manual'): aconf.manual = True + elif o in ('-d', '--debug'): + aconf.debug = True + out.debug = True if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False: usage_cb() @@ -813,15 +817,18 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print program_retval = exitcodes.GOOD out.batch = aconf.batch out.verbose = aconf.verbose + out.debug = aconf.debug out.level = aconf.level out.use_colors = aconf.colors s = SSH_Socket(aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set) + if aconf.client_audit: out.v("Listening for client connection on port %d..." % aconf.port, write_now=True) s.listen_and_accept() else: - out.v("Connecting to %s:%d..." % ('[%s]' % aconf.host if Utils.is_ipv6_address(aconf.host) else aconf.host, aconf.port), write_now=True) - err = s.connect() + out.v("Starting audit of %s:%d..." % ('[%s]' % aconf.host if Utils.is_ipv6_address(aconf.host) else aconf.host, aconf.port), write_now=True) + err = s.connect(out) + if err is not None: out.fail(err) @@ -835,14 +842,14 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print if sshv is None: sshv = 2 if aconf.ssh2 else 1 err = None - banner, header, err = s.get_banner(sshv) + banner, header, err = s.get_banner(out, sshv) if banner is None: if err is None: err = '[exception] did not receive banner.' else: err = '[exception] did not receive banner: {}'.format(err) if err is None: - s.send_kexinit() # Send the algorithms we support (except we don't since this isn't a real SSH connection). + s.send_kexinit(out) # Send the algorithms we support (except we don't since this isn't a real SSH connection). packet_type, payload = s.read_packet(sshv) if packet_type < 0: @@ -878,8 +885,8 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print elif sshv == 2: kex = SSH2_Kex.parse(payload) if aconf.client_audit is False: - HostKeyTest.run(s, kex) - GEXTest.run(s, kex) + HostKeyTest.run(out, s, kex) + GEXTest.run(out, s, kex) # This is a standard audit scan. if (aconf.policy is None) and (aconf.make_policy is False): diff --git a/src/ssh_audit/ssh_socket.py b/src/ssh_audit/ssh_socket.py index 28dfc50..35c2a9c 100644 --- a/src/ssh_audit/ssh_socket.py +++ b/src/ssh_audit/ssh_socket.py @@ -148,7 +148,7 @@ class SSH_Socket(ReadBuf, WriteBuf): c.settimeout(self.__timeout) self.__sock = c - def connect(self) -> Optional[str]: + def connect(self, out: 'OutputBuffer') -> Optional[str]: '''Returns None on success, or an error string.''' err = None for af, addr in self._resolve(): @@ -156,6 +156,7 @@ class SSH_Socket(ReadBuf, WriteBuf): try: s = socket.socket(af, socket.SOCK_STREAM) s.settimeout(self.__timeout) + out.d(("Connecting to %s:%d..." % ('[%s]' % addr[0] if Utils.is_ipv6_address(addr[0]) else addr[0], addr[1])), write_now=True) s.connect(addr) self.__sock = s return None @@ -169,7 +170,9 @@ class SSH_Socket(ReadBuf, WriteBuf): errm = 'cannot connect to {} port {}: {}'.format(*errt) return '[exception] {}'.format(errm) - def get_banner(self, sshv: int = 2) -> Tuple[Optional['Banner'], List[str], Optional[str]]: + def get_banner(self, out: 'OutputBuffer', sshv: int = 2) -> Tuple[Optional['Banner'], List[str], Optional[str]]: + out.d('Getting banner...', write_now=True) + if self.__sock is None: return self.__banner, self.__header, 'not connected' if self.__banner is not None: @@ -226,9 +229,11 @@ class SSH_Socket(ReadBuf, WriteBuf): return -1, str(e.args[-1]) # Send a KEXINIT with the lists of key exchanges, hostkeys, ciphers, MACs, compressions, and languages that we "support". - def send_kexinit(self, key_exchanges: List[str] = ['curve25519-sha256', 'curve25519-sha256@libssh.org', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521', 'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group14-sha256'], hostkeys: List[str] = ['rsa-sha2-512', 'rsa-sha2-256', 'ssh-rsa', 'ecdsa-sha2-nistp256', 'ssh-ed25519'], ciphers: List[str] = ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com'], macs: List[str] = ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'], compressions: List[str] = ['none', 'zlib@openssh.com'], languages: List[str] = ['']) -> None: # pylint: disable=dangerous-default-value + def send_kexinit(self, out: 'OutputBuffer', key_exchanges: List[str] = ['curve25519-sha256', 'curve25519-sha256@libssh.org', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521', 'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group14-sha256'], hostkeys: List[str] = ['rsa-sha2-512', 'rsa-sha2-256', 'ssh-rsa', 'ecdsa-sha2-nistp256', 'ssh-ed25519'], ciphers: List[str] = ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com'], macs: List[str] = ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'], compressions: List[str] = ['none', 'zlib@openssh.com'], languages: List[str] = ['']) -> None: # pylint: disable=dangerous-default-value '''Sends the list of supported host keys, key exchanges, ciphers, and MACs. Emulates OpenSSH v8.2.''' + out.d('KEX initialisation...', write_now=True) + kexparty = SSH2_KexParty(ciphers, macs, compressions, languages) kex = SSH2_Kex(os.urandom(16), key_exchanges, hostkeys, kexparty, kexparty, False, 0) diff --git a/ssh-audit.1 b/ssh-audit.1 index d7d0217..482f164 100644 --- a/ssh-audit.1 +++ b/ssh-audit.1 @@ -1,4 +1,4 @@ -.TH SSH-AUDIT 1 "February 7, 2021" +.TH SSH-AUDIT 1 "March 2, 2021" .SH NAME \fBssh-audit\fP \- SSH server & client configuration auditor .SH SYNOPSIS @@ -46,6 +46,11 @@ Enables grepable output. .br Starts a server on port 2222 to audit client software configuration. Use -p/--port= to change port and -t/--timeout= to change listen timeout. +.TP +.B -d, \-\-debug +.br +Enable debug output. + .TP .B -j, \-\-json .br