Moved built-in policies from external files to internal database. (#75)

This commit is contained in:
Joe Testa
2020-10-19 17:27:37 -04:00
parent 2a7b9292bb
commit 046c866da4
53 changed files with 256 additions and 437 deletions

View File

@@ -33,7 +33,7 @@ import traceback
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
from typing import Callable, Optional, Union, Any # noqa: F401
from ssh_audit.globals import GITHUB_ISSUES_URL, VERSION
from ssh_audit.globals import VERSION
from ssh_audit.algorithm import Algorithm
from ssh_audit.algorithms import Algorithms
from ssh_audit.auditconf import AuditConf
@@ -508,55 +508,22 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: O
def list_policies() -> None:
'''Prints a list of server & client policies.'''
# Get a list of all the files in the policies sub-directory, relative to the path of this script.
installed_dir = os.path.dirname(os.path.abspath(__file__))
policies_dir = os.path.join(installed_dir, 'policies')
server_policy_names, client_policy_names = Policy.list_builtin_policies()
# If the path is not a directory, print a useful error and exit.
if not os.path.isdir(policies_dir):
print("Error: could not find policies directory. Please report this full output to <%s>:" % GITHUB_ISSUES_URL)
print("\nsys.argv[0]: %s" % sys.argv[0])
print("__file__: %s" % __file__)
print("policies_dir: %s" % policies_dir)
sys.exit(exitcodes.UNKNOWN_ERROR)
# Get a list of all the files in the policies sub-directory.
files = []
for f in os.listdir(policies_dir):
files.append(f)
files.sort() # Now the files will be in order, like 'ubuntu_client_16_04.txt', 'ubuntu_client_18_04.txt', 'ubuntu_client_20_04.txt', ...
server_policies_summary = []
client_policies_summary = []
for f in files:
# Load each policy, and generate a short summary from its name and absolute file path.
policy_file = os.path.join(policies_dir, f)
policy = Policy(policy_file=policy_file)
policy_summary = "Name: %s\nPolicy path: %s" % (policy.get_name_and_version(), policy_file)
# We will print the server policies separately from thee client policies...
if policy.is_server_policy():
server_policies_summary.append(policy_summary)
else:
client_policies_summary.append(policy_summary)
if len(server_policies_summary) > 0:
if len(server_policy_names) > 0:
out.head('\nServer policies:\n')
print("\n\n".join(server_policies_summary))
print()
print(" * \"%s\"" % "\"\n * \"".join(server_policy_names))
if len(client_policies_summary) > 0:
if len(client_policy_names) > 0:
out.head('\nClient policies:\n')
print("\n\n".join(client_policies_summary))
print()
print(" * \"%s\"" % "\"\n * \"".join(client_policy_names))
if len(server_policies_summary) == 0 and len(client_policies_summary) == 0:
print("Error: no built-in policies found in %s." % policies_dir)
if len(server_policy_names) == 0 and len(client_policy_names) == 0:
print("Error: no built-in policies found!")
else:
print("\nHint: Use -P and provide the path to a policy to run a policy scan.\n")
print("\nHint: Use -P and provide the full name of a policy to run a policy scan with.\n")
def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH2_Kex'], client_host: Optional[str]) -> None:
@@ -685,11 +652,15 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi
# If a policy file was provided, validate it.
if (aconf.policy_file is not None) and (aconf.make_policy is False):
try:
aconf.policy = Policy(policy_file=aconf.policy_file)
except Exception as e:
print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
sys.exit(exitcodes.UNKNOWN_ERROR)
# First, see if this is a built-in policy name. If not, assume a file path was provided, and try to load it from disk.
aconf.policy = Policy.load_builtin_policy(aconf.policy_file)
if aconf.policy is None:
try:
aconf.policy = Policy(policy_file=aconf.policy_file)
except Exception as e:
print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
sys.exit(exitcodes.UNKNOWN_ERROR)
# If the user wants to do a client audit, but provided a server policy, terminate.
if aconf.client_audit and aconf.policy.is_server_policy():