Update action plugin to use ansible-sign library

Signed-off-by: Rick Elrod <rick@elrod.me>
Rick Elrod 2022-08-16 03:59:38 -05:00
parent 8d71292d1a
commit 32bb603554
4 changed files with 157 additions and 364 deletions

View File

@@ -631,8 +631,10 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
added_update_fields = []
if not self.job_tags:
job_tags = ['update_{}'.format(self.scm_type), 'install_roles', 'install_collections']
if self.project.signature_validation:
job_tags.append('playbook_integrity')
if self.project.signature_validation_credential is not None:
credential_type = self.project.signature_validation_credential.credential_type.namespace
job_tags.append(f'validation_{credential_type}')
job_tags.append('validation_checksum_manifest')
self.job_tags = ','.join(job_tags)
added_update_fields.append('job_tags')
if self.scm_delete_on_update and 'delete' not in self.job_tags and self.job_type == 'check':
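To make the new tagging concrete, here is a rough sketch (not part of the commit) of the job_tags string this hunk builds for a git project whose signature validation credential has a credential_type.namespace of 'gpg_public_key' (an assumed value, chosen to match the validation_gpg_public_key tag used in the project_update playbook further down):

# Hypothetical illustration only; mirrors the appends in the hunk above.
job_tags = ['update_git', 'install_roles', 'install_collections']
credential_type = 'gpg_public_key'  # assumed credential_type.namespace
job_tags.append(f'validation_{credential_type}')   # -> 'validation_gpg_public_key'
job_tags.append('validation_checksum_manifest')
print(','.join(job_tags))
# update_git,install_roles,install_collections,validation_gpg_public_key,validation_checksum_manifest

These tags select the matching verification tasks in the project_update playbook shown in the last file of this commit.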

View File

@@ -2,391 +2,114 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
import git
import gnupg
import hashlib
import os
import tempfile
import traceback
from ansible.module_utils.basic import *
from ansible.plugins.action import ActionBase
from ansible.utils.display import Display
from ansible_sign.checksum import (
ChecksumFile,
ChecksumMismatch,
InvalidChecksumLine,
)
from ansible_sign.checksum.differ import DistlibManifestChecksumFileExistenceDiffer
from ansible_sign.signing import *
display = Display()
VALIDATION_TYPES = (
"checksum_manifest",
"gpg",
)
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
self._supports_check_mode = False
result = super(ActionModule, self).run(tmp, task_vars)
super(ActionModule, self).run(tmp, task_vars)
params = self._task.args
self.params = self._task.args
scm_type = params.get("scm_type")
if scm_type is None:
return {
"failed": True,
"msg": "scm_type not given, needed for checksum verification.",
}
differ_cls = SCM_FILE_EXISTENCE_CHECKER_MAP.get(scm_type)
if differ_cls is None:
return {
"failed": True,
"msg": "scm_type given is not supported.",
}
project_path = params.get("project_path")
if project_path is None:
self.project_path = self.params.get("project_path")
if self.project_path is None:
return {
"failed": True,
"msg": "No project path (project_path) was supplied.",
}
sig_type = params.get("sig_type")
if sig_type is None:
return {
"failed": True,
"msg": "No signature type (sig_type) was supplied. Try setting it to 'gpg'?",
}
validation_type = self.params.get("validation_type")
if validation_type is None or validation_type not in VALIDATION_TYPES:
return {"failed": True, "msg": "validation_type must be one of: " + ', '.join(VALIDATION_TYPES)}
validation_method = getattr(self, f"validate_{validation_type}")
return validation_method()
sig_verifier_cls = SIGNATURE_VALIDATION_MAP.get(sig_type)
if sig_verifier_cls is None:
return {
"failed": True,
"msg": f"sig_type given is not supported, must be one of: {', '.join(SIGNATURE_VALIDATION_MAP.keys())}",
}
# NOTE: We currently do checksum validation before we do signature
# verification. We should probably flip them because the former takes
# more work, and if the signature check fails, there's no sense
# in doing that extra work.
# The validator will assume that the directory containing the checksum
# file is the root of the project.
checksum_path = os.path.join(project_path, "sha256sum.txt")
validator = ChecksumFileValidator.from_path(checksum_path, differ=differ_cls)
# Do checksum validation
try:
validator.verify()
except InvalidChecksumLine as e:
return {
"failed": True,
"msg": f"Checksum file contained a line that could not be parsed: {e}",
}
except ChecksumMismatch as e:
return {
"failed": True,
"msg": f"Checksum validation failed: {e}",
}
# Do signature validation
verifier = sig_verifier_cls(params)
verification = verifier.verify()
result.update(verification)
return result
class ContentVerifier:
'''
Represents a way of performing content verification. It doesn't make any
assumptions about the kind of verification being done. The constructor takes
raw parameters from the action, and subclasses can make use of those as
needed to perform whatever kind of verification they want to.
'''
def __init__(self, params):
self.params = params
def verify(self):
'''
Does the actual verification.
'''
raise NotImplementedError("verify")
class GPGVerifier(ContentVerifier):
def __init__(self, params):
super(GPGVerifier, self).__init__(params)
self.pubkey = params.get("gpg_pubkey")
# We know this key exists, because we check for it before we do checksum
# validation.
self.project_path = params.get("project_path")
def verify(self):
if self.pubkey is None:
def validate_gpg(self):
gpg_pubkey = self.params.get("gpg_pubkey")
if gpg_pubkey is None:
return {
"failed": True,
"msg": "No GPG public key (gpg_pubkey) was supplied.",
}
sha256sum_path = os.path.join(self.project_path, "sha256sum.txt")
sha256sum_sig_path = f"{sha256sum_path}.sig"
signature_file = os.path.join(self.project_path, ".ansible-sign", "sha256sum.txt.sig")
manifest_file = os.path.join(self.project_path, ".ansible-sign", "sha256sum.txt")
if not os.path.exists(sha256sum_sig_path):
return {
"failed": True,
"msg": "Project path existed but did not contain 'sha256sum.txt.sig'.",
}
result = {}
with tempfile.TemporaryDirectory() as tmpdir:
gpg = gnupg.GPG(gnupghome=tmpdir)
import_result = gpg.import_keys(self.pubkey)
result["gpg_pubkeys_imported"] = import_result.count
result["gpg_fingerprints"] = import_result.fingerprints
with open(sha256sum_sig_path, 'rb') as sha256sum_sig:
verified = gpg.verify_file(sha256sum_sig, sha256sum_path)
if not verified:
for path in (signature_file, manifest_file):
if not os.path.exists(path):
return {
"failed": True,
"msg": "Verification of checksum file failed.",
"details": {
"gpg_stderr": verified.stderr,
},
"msg": f"Expected file not found: {path}",
}
with tempfile.TemporaryDirectory() as gpg_home:
gpg = gnupg.GPG(gnupghome=gpg_home)
gpg.import_keys(gpg_pubkey)
verifier = GPGVerifier(
manifest_path=manifest_file,
detached_signature_path=signature_file,
gpg_home=gpg_home,
)
result = verifier.verify()
return {
"failed": not result.success,
"msg": result.summary,
"gpg_details": result.extra_information,
}
def validate_checksum_manifest(self):
checksum = ChecksumFile(self.project_path, differ=DistlibManifestChecksumFileExistenceDiffer)
manifest_file = os.path.join(self.project_path, ".ansible-sign", "sha256sum.txt")
if not os.path.exists(manifest_file):
return {
"failed": False,
"msg": "Verification of checksum file succeeded.",
"details": {
"gpg_stderr": verified.stderr,
"gpg_fingerprint": verified.fingerprint,
"gpg_creation_date": verified.creation_date,
"gpg_status": verified.status,
"gpg_timestamp": verified.timestamp,
# "gpg_expiry": verified.expiry,
},
"failed": True,
"msg": f"Expected file not found: {path}",
}
checksum_file_contents = open(manifest_file, "r").read()
class ChecksumFileExistenceDiffer:
'''
When checking checksum files, it can be important to ensure not only that
the files listed have correct hashes, but that no files were added that
are not listed.
try:
manifest = checksum.parse(checksum_file_contents)
except InvalidChecksumLine as e:
return {
"failed": True,
"msg": f"Invalid line in checksum manifest: {e}",
}
This is particularly important in situations where files might get
"wildcard-included" -- whereby an extra file slipping in could present a
security risk.
try:
checksum.verify(manifest)
except ChecksumMismatch as e:
return {
"failed": True,
"msg": str(e),
}
This class, and subclasses of it, provide an implementation that
ChecksumFileValidator instances can use to list all "interesting" files that
should be listed in the checksum file.
'''
ignored_files = set(
[
"sha256sum.txt",
"sha256sum.txt.sig",
]
)
def __init__(self, root):
self.root = root
self.files = self.gather_files()
def gather_files(self):
return set()
def list_files(self):
return self.files - self.ignored_files
def compare_filelist(self, checksum_paths):
'''
Given a set of paths (from a checksum file), see if files have since
been added or removed from the root directory and any deeper
directories.
The given set of paths is used as the source of truth and additions
and deletions are listed with respect to it.
'''
out = {
"added": set(),
"removed": set(),
return {
"failed": False,
"msg": "Checksum manifest is valid.",
}
real_paths = self.list_files()
out["added"] = real_paths - checksum_paths
out["removed"] = checksum_paths - real_paths
return out
class GitChecksumFileExistenceDiffer(ChecksumFileExistenceDiffer):
'''
Use gitpython to walk the file tree of a git repository at the HEAD of the
branch that is currently checked out.
'''
def gather_files(self):
repo = git.Repo(self.root)
files = set()
stack = [repo.head.commit.tree]
while stack:
tree = stack.pop()
for blob in tree.blobs:
files.add(blob.path)
for inner in tree.trees:
stack.append(inner)
return set(files)
class InvalidChecksumLine(Exception):
pass
class NoDifferException(Exception):
pass
class ChecksumMismatch(Exception):
pass
class ChecksumFileValidator:
'''
Slurp a checksum file and check and compare its contents against a given
root directory.
We assume sha256 for now.
'''
def __init__(self, raw_checksum_file, root, differ=None):
self._raw_checksum_file = raw_checksum_file
self.root = root
if differ is not None:
self.differ = differ(root=root)
self.parsed_checksums = self._parse()
@classmethod
def from_path(cls, path, *args, **kwargs):
if not args and "root" not in kwargs:
kwargs["root"] = os.path.dirname(path)
return cls(open(path, 'r').read(), *args, **kwargs)
def _parse_bsd_style(self, line):
'''
Attempt to parse a BSD style checksum line, returning False if we
are unable to.
A BSD style line looks like this:
SHA256 (hello_world.yml) = f712979c4c5dfe739253908d122f5c87faa8b5de6f15ba7a1548ae028ff22d13
'''
# Each BSD style line is prefixed with 'SHA256 ('. Then, counting from the
# right (and assuming sha256 only, for now), the last 68 characters should
# be ') = ' followed by the 64-character digest.
if line.startswith('SHA256 (') and line[-68:-64] == ') = ':
# If both of those criteria match, we are pretty confident this
# is a BSD style line. From the right, split once at the = sign
# and parse out the path, and we are done. If the split
# doesn't work, or the sha isn't length 64, then assume it's
# not a BSD line, after all.
parts = line.rsplit(' = ', 1)
if len(parts) == 2 and len(parts[1]) == 64:
path = parts[0][8:-1]
shasum = parts[1]
return (path, shasum)
return False
def _parse_gnu_style(self, line):
'''
Attempt to parse a GNU style checksum line, returning False if
we are unable to.
A GNU style line looks like this:
f712979c4c5dfe739253908d122f5c87faa8b5de6f15ba7a1548ae028ff22d13 hello_world.yml
Or maybe like this:
f82da8b4f98a3d3125fbc98408911f65dbc8dc38c0f38e258ebe290a8ad3d3c0 *binary
'''
parts = line.split(' ', 1)
if len(parts) != 2 or len(parts[0]) != 64:
return False
if len(parts[1]) < 2 or parts[1][0] not in (' ', '*'):
return False
shasum = parts[0]
path = parts[1][1:]
return (path, shasum)
def _parse(self):
checksums = {}
for idx, line in enumerate(self._raw_checksum_file.splitlines()):
if not line.strip():
continue
parsed = self._parse_bsd_style(line)
if parsed is False:
parsed = self._parse_gnu_style(line)
if parsed is False:
raise InvalidChecksumLine(f"Unparsable checksum, line {idx + 1}: {line}")
path = parsed[0]
shasum = parsed[1]
if path in checksums:
raise InvalidChecksumLine(f"Duplicate path in checksum, line {idx + 1}: {line}")
checksums[path] = shasum
return checksums
def diff(self):
if self.differ is None:
raise NoDifferException("diff: No differ was associated with this instance")
paths = set(self.parsed_checksums.keys())
return self.differ.compare_filelist(paths)
def calculate_checksum(self, path):
fullpath = os.path.join(self.root, path)
shasum = hashlib.sha256()
with open(fullpath, 'rb') as f:
while True:
chunk = f.read(65536)
if not chunk:
break
shasum.update(chunk)
return shasum.hexdigest()
def calculate_checksums(self):
# This uses the paths found in the parsed checksum file.
# Just calling this is not enough in many cases; you want to ensure that
# the files in the checksum list are the same ones present in reality.
# diff() above does just that. Use that in combination with this method,
# or use verify() which does it for you.
out = {}
for path in self.parsed_checksums.keys():
shasum = self.calculate_checksum(path)
out[path] = shasum
return out
def verify(self):
# If there are any differences in existing paths, fail the check...
differences = self.diff()
if differences["added"] or differences["removed"]:
raise ChecksumMismatch(differences)
recalculated = self.calculate_checksums()
mismatches = set()
for parsed_path, parsed_checksum in self.parsed_checksums.items():
if recalculated[parsed_path] != parsed_checksum:
mismatches.add(parsed_path)
if mismatches:
raise ChecksumMismatch(f"Checksum mismatch: {', '.join(mismatches)}")
return True
SCM_FILE_EXISTENCE_CHECKER_MAP = {
"git": GitChecksumFileExistenceDiffer,
}
SIGNATURE_VALIDATION_MAP = {
"gpg": GPGVerifier,
}
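For readers who want to try the new flow outside of AWX, here is a minimal standalone sketch of the two validations the rewritten plugin now delegates to the ansible-sign library. It uses only the calls visible in the plugin above; the project path and public key are placeholders:

import os
import tempfile

import gnupg

from ansible_sign.checksum import ChecksumFile, ChecksumMismatch, InvalidChecksumLine
from ansible_sign.checksum.differ import DistlibManifestChecksumFileExistenceDiffer
from ansible_sign.signing import GPGVerifier

project_path = "/srv/projects/example"  # placeholder
manifest_file = os.path.join(project_path, ".ansible-sign", "sha256sum.txt")
signature_file = manifest_file + ".sig"
gpg_pubkey = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."  # placeholder key

# GPG verification of the detached signature, mirroring validate_gpg() above.
with tempfile.TemporaryDirectory() as gpg_home:
    gnupg.GPG(gnupghome=gpg_home).import_keys(gpg_pubkey)
    gpg_result = GPGVerifier(
        manifest_path=manifest_file,
        detached_signature_path=signature_file,
        gpg_home=gpg_home,
    ).verify()
    print(gpg_result.success, gpg_result.summary)

# Checksum manifest verification, mirroring validate_checksum_manifest() above.
checksum = ChecksumFile(project_path, differ=DistlibManifestChecksumFileExistenceDiffer)
try:
    manifest = checksum.parse(open(manifest_file, "r").read())
    checksum.verify(manifest)
except (InvalidChecksumLine, ChecksumMismatch) as exc:
    print(f"Checksum validation failed: {exc}")
else:
    print("Checksum manifest is valid.")

The two blocks correspond one-to-one with the tagged verification tasks that project_update.yml runs in the last file of this commit.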

View File

@@ -0,0 +1,65 @@
ANSIBLE_METADATA = {"metadata_version": "1.0", "status": ["stableinterface"], "supported_by": "community"}
DOCUMENTATION = """
---
module: playbook_integrity
short_description: verify that files within a project have not been tampered with.
description:
- Makes use of the 'ansible-sign' project as a library for ensuring that an
Ansible project has not been tampered with.
- There are multiple types of validation that this action plugin supports.
Currently these are GPG public/private key signing of a checksum manifest
file, and checking the checksum manifest file itself against the checksum
of each file being verified.
- In the future, other types of validation may be supported.
options:
project_path:
description:
- Directory of the project being verified. Expected to contain a
C(.ansible-sign) directory with a generated checksum manifest file and a
detached signature for it. These files are produced by the
C(ansible-sign) command-line utility.
required: true
validation_type:
description:
- Describes the kind of validation to perform on the project.
- I(validation_type=gpg) means that a GPG Public Key credential is being
used to verify the integrity of the checksum manifest (and therefore the
project).
- I(validation_type=checksum_manifest) means that the signed checksum manifest
is validated against all files in the project listed by its MANIFEST.in file.
Just running this plugin with I(validation_type=checksum_manifest) is
typically B(NOT) enough. It should also be run with a I(validation_type)
that ensures that the manifest file itself has not changed, such as
I(validation_type=gpg).
required: true
choices:
- gpg
- checksum_manifest
gpg_pubkey:
description:
- The GPG public key to validate the checksum manifest against. Must correspond
to the key that produced the detached signature in the project's
C(.ansible-sign) directory.
- Required when I(validation_type=gpg).
author:
- Ansible AWX Team
"""
EXAMPLES = """
- name: Verify project content using GPG signature
playbook_integrity:
project_path: /srv/projects/example
validation_type: gpg
gpg_pubkey: |
-----BEGIN PGP PUBLIC KEY BLOCK-----
mWINAFXMtjsACADIf/zJS0V3UO3c+KAUcpVAcChpliM31ICDWydfIfF3dzMzLcCd
Cj2kk1mPWtP/JHfk1V5czcWWWWGC2Tw4g4IS+LokAAuwk7VKTlI34eeMl8SiZCAI
[...]
- name: Verify project content against checksum manifest
playbook_integrity:
project_path: /srv/projects/example
validation_type: checksum_manifest
"""

View File

@@ -18,6 +18,7 @@
# galaxy_task_env: environment variables to use specifically for ansible-galaxy commands
# awx_version: Current running version of the awx or tower as a string
# awx_license_type: "open" for AWX; else presume Tower
# gpg_pubkey: the GPG public key to use for validation, when enabled
- hosts: localhost
gather_facts: false
@@ -156,22 +157,24 @@
- hosts: localhost
gather_facts: false
connection: local
name: Validate project content integrity
name: Perform project signature/checksum verification
tasks:
- block:
- name: verify project integrity
playbook_integrity:
project_path: "{{ project_path | quote }}"
sig_type: gpg # TODO: unhardcode
gpg_pubkey: "{{ gpg_pubkey }}"
scm_type: git # TODO: unhardcode
register: playbook_integrity_check
- name: show integrity check result
debug:
var: playbook_integrity_check
- name: Verify project content using GPG signature
playbook_integrity:
project_path: "{{ project_path | quote }}"
validation_type: gpg
gpg_pubkey: "{{ gpg_pubkey }}"
register: gpg_result
tags:
- playbook_integrity_gpg
- playbook_integrity
- validation_gpg_public_key
- name: Verify project content against checksum manifest
playbook_integrity:
project_path: "{{ project_path | quote }}"
validation_type: checksum_manifest
register: checksum_result
tags:
- validation_checksum_manifest
- hosts: localhost
gather_facts: false