mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-06-22 10:43:41 +02:00
Added policy checks (#10).
This commit is contained in:
440
ssh-audit.py
440
ssh-audit.py
@ -23,6 +23,7 @@
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
"""
|
||||
from datetime import date
|
||||
import base64
|
||||
import binascii
|
||||
import errno
|
||||
@ -37,6 +38,7 @@ import select
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import traceback
|
||||
# pylint: disable=unused-import
|
||||
from typing import Dict, List, Set, Sequence, Tuple, Iterable
|
||||
from typing import Callable, Optional, Union, Any
|
||||
@ -57,27 +59,340 @@ def usage(err: Optional[str] = None) -> None:
|
||||
uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
|
||||
if err is not None and len(err) > 0:
|
||||
uout.fail('\n' + err)
|
||||
uout.info('usage: {} [-1246pbcnjvlt] <host>\n'.format(p))
|
||||
uout.info('usage: {0} [-h1246ptbcPjlnv] <host>\n'.format(p))
|
||||
uout.info(' -h, --help print this help')
|
||||
uout.info(' -1, --ssh1 force ssh version 1 only')
|
||||
uout.info(' -2, --ssh2 force ssh version 2 only')
|
||||
uout.info(' -4, --ipv4 enable IPv4 (order of precedence)')
|
||||
uout.info(' -6, --ipv6 enable IPv6 (order of precedence)')
|
||||
uout.info(' -p, --port=<port> port to connect')
|
||||
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
|
||||
uout.info('')
|
||||
uout.info(' -b, --batch batch output')
|
||||
uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)')
|
||||
uout.info(' -n, --no-colors disable colors')
|
||||
uout.info(' -M, --make-policy=<policy.txt> creates a policy based on the target server\n (i.e.: the target server has the ideal\n configuration that other servers should\n adhere to)')
|
||||
uout.info(' -P, --policy=<policy.txt> run a policy test using the specified policy')
|
||||
uout.info('')
|
||||
uout.info(' -j, --json JSON output')
|
||||
uout.info(' -v, --verbose verbose output')
|
||||
uout.info(' -l, --level=<level> minimum output level (info|warn|fail)')
|
||||
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
|
||||
uout.info(' -n, --no-colors disable colors')
|
||||
uout.info(' -v, --verbose verbose output')
|
||||
uout.sep()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# Validates policy files and performs policy testing
|
||||
class Policy:
|
||||
|
||||
def __init__(self, policy_file: str = None, policy_data: str = None) -> None:
|
||||
self._name = None # type: Optional[str]
|
||||
self._version = None # type: Optional[str]
|
||||
self._banner = None # type: Optional[str]
|
||||
self._header = None # type: Optional[str]
|
||||
self._compressions = None # type: Optional[List[str]]
|
||||
self._host_keys = None # type: Optional[List[str]]
|
||||
self._kex = None # type: Optional[List[str]]
|
||||
self._ciphers = None # type: Optional[List[str]]
|
||||
self._macs = None # type: Optional[List[str]]
|
||||
self._hostkey_sizes = None # type: Optional[Dict[str, int]]
|
||||
self._cakey_sizes = None # type: Optional[Dict[str, int]]
|
||||
self._dh_modulus_sizes = None # type: Optional[Dict[str, int]]
|
||||
|
||||
if (policy_file is None) and (policy_data is None):
|
||||
raise RuntimeError('policy_file and policy_data must not both be None.')
|
||||
elif (policy_file is not None) and (policy_data is not None):
|
||||
raise RuntimeError('policy_file and policy_data must not both be specified.')
|
||||
|
||||
if policy_file is not None:
|
||||
with open(policy_file, "r") as f:
|
||||
policy_data = f.read()
|
||||
|
||||
lines = []
|
||||
if policy_data is not None:
|
||||
lines = policy_data.split("\n")
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if (len(line) == 0) or line.startswith('#'):
|
||||
continue
|
||||
|
||||
key = None
|
||||
val = None
|
||||
try:
|
||||
key, val = line.split('=')
|
||||
except ValueError:
|
||||
raise ValueError("could not parse line: %s" % line)
|
||||
|
||||
key = key.strip()
|
||||
val = val.strip()
|
||||
|
||||
if key not in ['name', 'version', 'banner', 'header', 'compressions', 'host keys', 'key exchanges', 'ciphers', 'macs'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'):
|
||||
raise ValueError("invalid field found in policy: %s" % line)
|
||||
|
||||
if key in ['name', 'banner', 'header']:
|
||||
|
||||
# If the banner value is blank, set it to "" so that the code below handles it.
|
||||
if len(val) < 2:
|
||||
val = "\"\""
|
||||
|
||||
if (val[0] != '"') or (val[-1] != '"'):
|
||||
raise ValueError('the value for the %s field must be enclosed in quotes: %s' % (key, val))
|
||||
|
||||
# Remove the surrounding quotes, and unescape quotes & newlines.
|
||||
val = val[1:-1]. replace("\\\"", "\"").replace("\\n", "\n")
|
||||
|
||||
if key == 'name':
|
||||
self._name = val
|
||||
elif key == 'banner':
|
||||
self._banner = val
|
||||
else:
|
||||
self._header = val
|
||||
elif key == 'version':
|
||||
self._version = val
|
||||
elif key in ['compressions', 'host keys', 'key exchanges', 'ciphers', 'macs']:
|
||||
try:
|
||||
algs = val.split(',')
|
||||
except ValueError:
|
||||
# If the value has no commas, then set the algorithm list to just the value.
|
||||
algs = [val]
|
||||
|
||||
# Strip whitespace in each algorithm name.
|
||||
algs = [alg.strip() for alg in algs]
|
||||
|
||||
if key == 'compressions':
|
||||
self._compressions = algs
|
||||
elif key == 'host keys':
|
||||
self._host_keys = algs
|
||||
elif key == 'key exchanges':
|
||||
self._kex = algs
|
||||
elif key == 'ciphers':
|
||||
self._ciphers = algs
|
||||
elif key == 'macs':
|
||||
self._macs = algs
|
||||
elif key.startswith('hostkey_size_'):
|
||||
hostkey_type = key[13:]
|
||||
if self._hostkey_sizes is None:
|
||||
self._hostkey_sizes = {}
|
||||
self._hostkey_sizes[hostkey_type] = int(val)
|
||||
elif key.startswith('cakey_size_'):
|
||||
cakey_type = key[11:]
|
||||
if self._cakey_sizes is None:
|
||||
self._cakey_sizes = {}
|
||||
self._cakey_sizes[cakey_type] = int(val)
|
||||
elif key.startswith('dh_modulus_size_'):
|
||||
dh_modulus_type = key[16:]
|
||||
if self._dh_modulus_sizes is None:
|
||||
self._dh_modulus_sizes = {}
|
||||
self._dh_modulus_sizes[dh_modulus_type] = int(val)
|
||||
|
||||
|
||||
if self._name is None:
|
||||
raise ValueError('The policy does not have a name field.')
|
||||
if self._version is None:
|
||||
raise ValueError('The policy does not have a version field.')
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create(host: str, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> str:
|
||||
'''Creates a policy based on a server configuration. Returns a string.'''
|
||||
|
||||
today = date.today().strftime('%Y/%m/%d')
|
||||
compressions = None
|
||||
host_keys = None
|
||||
kex_algs = None
|
||||
ciphers = None
|
||||
macs = None
|
||||
rsa_hostkey_sizes_str = ''
|
||||
rsa_cakey_sizes_str = ''
|
||||
dh_modulus_sizes_str = ''
|
||||
|
||||
if kex is not None:
|
||||
if kex.server.compression is not None:
|
||||
compressions = ', '.join(kex.server.compression)
|
||||
if kex.key_algorithms is not None:
|
||||
host_keys = ', '.join(kex.key_algorithms)
|
||||
if kex.kex_algorithms is not None:
|
||||
kex_algs = ', '.join(kex.kex_algorithms)
|
||||
if kex.server.encryption is not None:
|
||||
ciphers = ', '.join(kex.server.encryption)
|
||||
if kex.server.mac is not None:
|
||||
macs = ', '.join(kex.server.mac)
|
||||
if kex.rsa_key_sizes():
|
||||
rsa_key_sizes_dict = kex.rsa_key_sizes()
|
||||
for host_key_type in sorted(rsa_key_sizes_dict):
|
||||
hostkey_size, cakey_size = rsa_key_sizes_dict[host_key_type]
|
||||
|
||||
rsa_hostkey_sizes_str = "%shostkey_size_%s = %d\n" % (rsa_hostkey_sizes_str, host_key_type, hostkey_size)
|
||||
if cakey_size != -1:
|
||||
rsa_cakey_sizes_str = "%scakey_size_%s = %d\n" % (rsa_cakey_sizes_str, host_key_type, cakey_size)
|
||||
|
||||
if len(rsa_hostkey_sizes_str) > 0:
|
||||
rsa_hostkey_sizes_str = "\n# RSA host key sizes.\n%s" % rsa_hostkey_sizes_str
|
||||
if len(rsa_cakey_sizes_str) > 0:
|
||||
rsa_cakey_sizes_str = "\n# RSA CA key sizes.\n%s" % rsa_cakey_sizes_str
|
||||
if kex.dh_modulus_sizes():
|
||||
dh_modulus_sizes_dict = kex.dh_modulus_sizes()
|
||||
for gex_type in sorted(dh_modulus_sizes_dict):
|
||||
modulus_size, _ = dh_modulus_sizes_dict[gex_type]
|
||||
dh_modulus_sizes_str = "%sdh_modulus_size_%s = %d\n" % (dh_modulus_sizes_str, gex_type, modulus_size)
|
||||
if len(dh_modulus_sizes_str) > 0:
|
||||
dh_modulus_sizes_str = "\n# Group exchange DH modulus sizes.\n%s" % dh_modulus_sizes_str
|
||||
|
||||
|
||||
policy_data = '''#
|
||||
# Custom policy based on %s (created on %s)
|
||||
#
|
||||
|
||||
# The name of this policy (displayed in the output during scans). Must be in quotes.
|
||||
name = "Custom Policy (based on %s on %s)"
|
||||
|
||||
# The version of this policy (displayed in the output during scans). Not parsed, and may be any value, including strings.
|
||||
version = 1
|
||||
|
||||
# The banner that must match exactly. Commented out to ignore banners, since minor variability in the banner is sometimes normal.
|
||||
# banner = "%s"
|
||||
|
||||
# The header that must match exactly. Commented out to ignore headers, since variability in the header is sometimes normal.
|
||||
# header = "%s"
|
||||
|
||||
# The compression options that must match exactly (order matters). Commented out to ignore by default.
|
||||
# compressions = %s
|
||||
%s%s%s
|
||||
# The host key types that must match exactly (order matters).
|
||||
host keys = %s
|
||||
|
||||
# The key exchange algorithms that must match exactly (order matters).
|
||||
key exchanges = %s
|
||||
|
||||
# The ciphers that must match exactly (order matters).
|
||||
ciphers = %s
|
||||
|
||||
# The MACs that must match exactly (order matters).
|
||||
macs = %s
|
||||
''' % (host, today, host, today, banner, header, compressions, rsa_hostkey_sizes_str, rsa_cakey_sizes_str, dh_modulus_sizes_str, host_keys, kex_algs, ciphers, macs)
|
||||
|
||||
return policy_data
|
||||
|
||||
|
||||
def evaluate(self, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]:
|
||||
'''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.'''
|
||||
|
||||
ret = True
|
||||
errors = []
|
||||
|
||||
banner_str = str(banner)
|
||||
if (self._banner is not None) and (banner_str != self._banner):
|
||||
ret = False
|
||||
errors.append('Banner did not match. Expected: [%s]; Actual: [%s]' % (self._banner, banner_str))
|
||||
|
||||
if (self._header is not None) and (header != self._header):
|
||||
ret = False
|
||||
errors.append('Header did not match. Expected: [%s]; Actual: [%s]' % (self._header, header))
|
||||
|
||||
# All subsequent tests require a valid kex, so end here if we don't have one.
|
||||
if kex is None:
|
||||
return ret, errors
|
||||
|
||||
if (self._compressions is not None) and (kex.server.compression != self._compressions):
|
||||
ret = False
|
||||
errors.append('Compression types did not match. Expected: %s; Actual: %s' % (self._compressions, kex.server.compression))
|
||||
|
||||
if (self._host_keys is not None) and (kex.key_algorithms != self._host_keys):
|
||||
ret = False
|
||||
errors.append('Host key types did not match. Expected: %s; Actual: %s' % (self._host_keys, kex.key_algorithms))
|
||||
|
||||
if self._hostkey_sizes is not None:
|
||||
hostkey_types = list(self._hostkey_sizes.keys())
|
||||
hostkey_types.sort() # Sorted to make testing output repeatable.
|
||||
for hostkey_type in hostkey_types:
|
||||
expected_hostkey_size = self._hostkey_sizes[hostkey_type]
|
||||
if hostkey_type in kex.rsa_key_sizes():
|
||||
actual_hostkey_size, actual_cakey_size = kex.rsa_key_sizes()[hostkey_type]
|
||||
if actual_hostkey_size != expected_hostkey_size:
|
||||
ret = False
|
||||
errors.append('RSA hostkey (%s) sizes did not match. Expected: %d; Actual: %d' % (hostkey_type, expected_hostkey_size, actual_hostkey_size))
|
||||
|
||||
if self._cakey_sizes is not None:
|
||||
hostkey_types = list(self._cakey_sizes.keys())
|
||||
hostkey_types.sort() # Sorted to make testing output repeatable.
|
||||
for hostkey_type in hostkey_types:
|
||||
expected_cakey_size = self._cakey_sizes[hostkey_type]
|
||||
if hostkey_type in kex.rsa_key_sizes():
|
||||
actual_hostkey_size, actual_cakey_size = kex.rsa_key_sizes()[hostkey_type]
|
||||
if actual_cakey_size != expected_cakey_size:
|
||||
ret = False
|
||||
errors.append('RSA CA key (%s) sizes did not match. Expected: %d; Actual: %d' % (hostkey_type, expected_cakey_size, actual_cakey_size))
|
||||
|
||||
if kex.kex_algorithms != self._kex:
|
||||
ret = False
|
||||
errors.append('Key exchanges did not match. Expected: %s; Actual: %s' % (self._kex, kex.kex_algorithms))
|
||||
|
||||
if (self._ciphers is not None) and (kex.server.encryption != self._ciphers):
|
||||
ret = False
|
||||
errors.append('Ciphers did not match. Expected: %s; Actual: %s' % (self._ciphers, kex.server.encryption))
|
||||
|
||||
if (self._macs is not None) and (kex.server.mac != self._macs):
|
||||
ret = False
|
||||
errors.append('MACs did not match. Expected: %s; Actual: %s' % (self._macs, kex.server.mac))
|
||||
|
||||
if self._dh_modulus_sizes is not None:
|
||||
dh_modulus_types = list(self._dh_modulus_sizes.keys())
|
||||
dh_modulus_types.sort() # Sorted to make testing output repeatable.
|
||||
for dh_modulus_type in dh_modulus_types:
|
||||
expected_dh_modulus_size = self._dh_modulus_sizes[dh_modulus_type]
|
||||
if dh_modulus_type in kex.dh_modulus_sizes():
|
||||
actual_dh_modulus_size, _ = kex.dh_modulus_sizes()[dh_modulus_type]
|
||||
if expected_dh_modulus_size != actual_dh_modulus_size:
|
||||
ret = False
|
||||
errors.append('Group exchange (%s) modulus sizes did not match. Expected: %d; Actual: %d' % (dh_modulus_type, expected_dh_modulus_size, actual_dh_modulus_size))
|
||||
|
||||
return ret, errors
|
||||
|
||||
|
||||
def get_name_and_version(self) -> str:
|
||||
'''Returns a string of this Policy's name and version.'''
|
||||
return '%s v%s' % (self._name, self._version)
|
||||
|
||||
|
||||
def __str__(self) -> str:
|
||||
undefined = '{undefined}'
|
||||
|
||||
name = undefined
|
||||
version = undefined
|
||||
banner = undefined
|
||||
header = undefined
|
||||
compressions_str = undefined
|
||||
host_keys_str = undefined
|
||||
kex_str = undefined
|
||||
ciphers_str = undefined
|
||||
macs_str = undefined
|
||||
|
||||
if self._name is not None:
|
||||
name = '[%s]' % self._name
|
||||
if self._version is not None:
|
||||
version = '[%s]' % self._version
|
||||
if self._banner is not None:
|
||||
banner = '[%s]' % self._banner
|
||||
if self._header is not None:
|
||||
header = '[%s]' % self._header
|
||||
|
||||
if self._compressions is not None:
|
||||
compressions_str = ', '.join(self._compressions)
|
||||
if self._host_keys is not None:
|
||||
host_keys_str = ', '.join(self._host_keys)
|
||||
if self._kex is not None:
|
||||
kex_str = ', '.join(self._kex)
|
||||
if self._ciphers is not None:
|
||||
ciphers_str = ', '.join(self._ciphers)
|
||||
if self._macs is not None:
|
||||
macs_str = ', '.join(self._macs)
|
||||
|
||||
return "Name: %s\nVersion: %s\nBanner: %s\nHeader: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, header, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str)
|
||||
|
||||
|
||||
class AuditConf:
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
def __init__(self, host: Optional[str] = None, port: int = 22) -> None:
|
||||
def __init__(self, host: str = '', port: int = 22) -> None:
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ssh1 = True
|
||||
@ -91,12 +406,15 @@ class AuditConf:
|
||||
self.ipvo = () # type: Sequence[int]
|
||||
self.ipv4 = False
|
||||
self.ipv6 = False
|
||||
self.make_policy = False # When True, creates a policy file from an audit scan.
|
||||
self.policy_file = None # type: Optional[str] # File system path to a policy
|
||||
self.policy = None # type: Optional[Policy] # Policy object
|
||||
self.timeout = 5.0
|
||||
self.timeout_set = False # Set to True when the user explicitly sets it.
|
||||
|
||||
def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None:
|
||||
valid = False
|
||||
if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json']:
|
||||
if name in ['ssh1', 'ssh2', 'batch', 'client_audit', 'colors', 'verbose', 'timeout_set', 'json', 'make_policy']:
|
||||
valid, value = True, bool(value)
|
||||
elif name in ['ipv4', 'ipv6']:
|
||||
valid = False
|
||||
@ -134,6 +452,9 @@ class AuditConf:
|
||||
if value == -1.0:
|
||||
raise ValueError('invalid timeout: {}'.format(value))
|
||||
valid = True
|
||||
elif name in ['policy_file', 'policy']:
|
||||
valid = True
|
||||
|
||||
if valid:
|
||||
object.__setattr__(self, name, value)
|
||||
|
||||
@ -142,13 +463,13 @@ class AuditConf:
|
||||
# pylint: disable=too-many-branches
|
||||
aconf = cls()
|
||||
try:
|
||||
sopts = 'h1246p:bcnjvl:t:'
|
||||
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'port=', 'json',
|
||||
'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=']
|
||||
sopts = 'h1246M:p:P:jbcnvl:t:'
|
||||
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=']
|
||||
opts, args = getopt.gnu_getopt(args, sopts, lopts)
|
||||
except getopt.GetoptError as err:
|
||||
usage_cb(str(err))
|
||||
aconf.ssh1, aconf.ssh2 = False, False
|
||||
host = '' # type: str
|
||||
oport = None
|
||||
for o, a in opts:
|
||||
if o in ('-h', '--help'):
|
||||
@ -181,11 +502,16 @@ class AuditConf:
|
||||
elif o in ('-t', '--timeout'):
|
||||
aconf.timeout = float(a)
|
||||
aconf.timeout_set = True
|
||||
elif o in ('-M', '--make-policy'):
|
||||
aconf.make_policy = True
|
||||
aconf.policy_file = a
|
||||
elif o in ('-P', '--policy'):
|
||||
aconf.policy_file = a
|
||||
if len(args) == 0 and aconf.client_audit is False:
|
||||
usage_cb()
|
||||
if aconf.client_audit is False:
|
||||
if oport is not None:
|
||||
host = args[0] # type: Optional[str]
|
||||
host = args[0]
|
||||
else:
|
||||
mx = re.match(r'^\[([^\]]+)\](?::(.*))?$', args[0])
|
||||
if mx is not None:
|
||||
@ -198,10 +524,8 @@ class AuditConf:
|
||||
host, oport = s[0], s[1] if len(s) > 1 else '22'
|
||||
if not host:
|
||||
usage_cb('host is empty')
|
||||
else:
|
||||
host = None
|
||||
if oport is None:
|
||||
oport = '2222'
|
||||
elif oport is None:
|
||||
oport = '2222'
|
||||
port = utils.parse_int(oport)
|
||||
if port <= 0 or port > 65535:
|
||||
usage_cb('port {} is not valid'.format(oport))
|
||||
@ -209,6 +533,15 @@ class AuditConf:
|
||||
aconf.port = port
|
||||
if not (aconf.ssh1 or aconf.ssh2):
|
||||
aconf.ssh1, aconf.ssh2 = True, True
|
||||
|
||||
# If a policy file was provided, validate it.
|
||||
if (aconf.policy_file is not None) and (aconf.make_policy is False):
|
||||
try:
|
||||
aconf.policy = Policy(policy_file=aconf.policy_file)
|
||||
except Exception as e:
|
||||
print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
|
||||
sys.exit(-1)
|
||||
|
||||
return aconf
|
||||
|
||||
|
||||
@ -2782,7 +3115,13 @@ def output_info(software: Optional['SSH.Software'], client_audit: bool, any_prob
|
||||
out.sep()
|
||||
|
||||
|
||||
def output(banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> None:
|
||||
def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None) -> None:
|
||||
|
||||
# If the user requested JSON output, output that and return immediately.
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True))
|
||||
return
|
||||
|
||||
client_audit = client_host is not None # If set, this is a client audit.
|
||||
sshv = 1 if pkm is not None else 2
|
||||
algs = SSH.Algorithms(pkm, kex)
|
||||
@ -2848,6 +3187,40 @@ def output(banner: Optional[SSH.Banner], header: List[str], client_host: Optiona
|
||||
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
|
||||
|
||||
|
||||
def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex'] = None) -> bool:
|
||||
|
||||
if aconf.policy is None:
|
||||
raise RuntimeError('Internal error: cannot evaluate against null Policy!')
|
||||
|
||||
passed, errors = aconf.policy.evaluate(banner, header, kex)
|
||||
if aconf.json:
|
||||
json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': errors}
|
||||
print(json.dumps(json_struct, sort_keys=True))
|
||||
else:
|
||||
print("Host: %s" % aconf.host)
|
||||
print("Policy: %s" % aconf.policy.get_name_and_version())
|
||||
print("Result: ", end='')
|
||||
if passed:
|
||||
out.good("✔ Passed")
|
||||
else:
|
||||
out.fail("❌ Failed!")
|
||||
out.warn("\nErrors:\n * %s" % '\n * '.join(errors))
|
||||
|
||||
return passed
|
||||
|
||||
|
||||
def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> None:
|
||||
policy_data = Policy.create(aconf.host, banner, header, kex)
|
||||
|
||||
if aconf.policy_file is None:
|
||||
raise RuntimeError('Internal error: cannot write policy file since filename is None!')
|
||||
|
||||
with open(aconf.policy_file, 'w') as f:
|
||||
f.write(policy_data)
|
||||
|
||||
print("Wrote policy to %s. Customize as necessary." % aconf.policy_file)
|
||||
|
||||
|
||||
class Utils:
|
||||
@classmethod
|
||||
def _type_err(cls, v: Any, target: str) -> TypeError:
|
||||
@ -3024,7 +3397,7 @@ def build_struct(banner, kex=None, pkm=None, client_host=None):
|
||||
return res
|
||||
|
||||
|
||||
def audit(aconf: AuditConf, sshv: Optional[int] = None) -> None:
|
||||
def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
|
||||
out.batch = aconf.batch
|
||||
out.verbose = aconf.verbose
|
||||
out.level = aconf.level
|
||||
@ -3055,8 +3428,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> None:
|
||||
payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1])
|
||||
if payload_txt == u'Protocol major versions differ.':
|
||||
if sshv == 2 and aconf.ssh1:
|
||||
audit(aconf, 1)
|
||||
return
|
||||
return audit(aconf, 1)
|
||||
err = '[exception] error reading packet ({})'.format(payload_txt)
|
||||
else:
|
||||
err_pair = None
|
||||
@ -3069,34 +3441,48 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> None:
|
||||
'instead received unknown message ({2})'
|
||||
err = fmt.format(err_pair[0], err_pair[1], packet_type)
|
||||
if err is not None:
|
||||
output(banner, header)
|
||||
output(aconf, banner, header)
|
||||
out.fail(err)
|
||||
sys.exit(1)
|
||||
return 1
|
||||
if sshv == 1:
|
||||
pkm = SSH1.PublicKeyMessage.parse(payload)
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, pkm=pkm), sort_keys=True))
|
||||
else:
|
||||
output(banner, header, pkm=pkm)
|
||||
output(aconf, banner, header, pkm=pkm)
|
||||
elif sshv == 2:
|
||||
kex = SSH2.Kex.parse(payload)
|
||||
if aconf.client_audit is False:
|
||||
SSH2.HostKeyTest.run(s, kex)
|
||||
SSH2.GEXTest.run(s, kex)
|
||||
if aconf.json:
|
||||
print(json.dumps(build_struct(banner, kex=kex, client_host=s.client_host), sort_keys=True))
|
||||
|
||||
# This is a standard audit scan.
|
||||
if (aconf.policy is None) and (aconf.make_policy is False):
|
||||
output(aconf, banner, header, client_host=s.client_host, kex=kex)
|
||||
|
||||
# This is a policy test.
|
||||
elif (aconf.policy is not None) and (aconf.make_policy is False):
|
||||
return 0 if evaluate_policy(aconf, banner, header, kex=kex) else 1
|
||||
|
||||
# A new policy should be made from this scan.
|
||||
elif (aconf.policy is None) and (aconf.make_policy is True):
|
||||
make_policy(aconf, banner, header, kex=kex)
|
||||
|
||||
else:
|
||||
output(banner, header, client_host=s.client_host, kex=kex)
|
||||
raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
utils = Utils()
|
||||
out = Output()
|
||||
|
||||
|
||||
def main() -> None: # printed text is still None
|
||||
def main() -> int:
|
||||
conf = AuditConf.from_cmdline(sys.argv[1:], usage)
|
||||
audit(conf)
|
||||
return audit(conf)
|
||||
|
||||
|
||||
if __name__ == '__main__': # pragma: nocover
|
||||
main()
|
||||
exit_code = main()
|
||||
sys.exit(exit_code)
|
||||
|
Reference in New Issue
Block a user