Allow JTs to specify and prompt for SCM branch

Copy project folder each job run
  change cwd to private_data_dir, from proj
  do not add cwd to show_paths if it is
  a subdirectory of private_data_dir, which
  is already shown

Pass the job private_data_dir to the local
  project sync, and also add that directory
  to the project sync show paths

Add GitPython dep and use for job sync logic
  use this to manage shallow clone from desired
  commit, and to map branch to commit,
  and to assess necessity of project sync

Start on some validation change, but not all
  allow arbitrary playbooks with custom branch
This commit is contained in:
AlanCoding
2019-06-04 15:26:14 -04:00
parent 28e3625066
commit ac86dc4fb9
12 changed files with 367 additions and 93 deletions

View File

@@ -20,6 +20,8 @@ from distutils.dir_util import copy_tree
from distutils.version import LooseVersion as Version
import yaml
import fcntl
from pathlib import Path
from uuid import uuid4
try:
import psutil
except Exception:
@@ -41,6 +43,10 @@ from django.core.exceptions import ObjectDoesNotExist
# Django-CRUM
from crum import impersonate
# GitPython
import git
from gitdb.exc import BadName as BadGitName
# Runner
import ansible_runner
@@ -694,9 +700,12 @@ class BaseTask(object):
model = None
event_model = None
abstract = True
cleanup_paths = []
proot_show_paths = []
def __init__(self, *args, **kwargs):
super(BaseTask, self).__init__(*args, **kwargs)
self.cleanup_paths = []
def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
given fields.
@@ -769,9 +778,11 @@ class BaseTask(object):
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
if settings.AWX_CLEANUP_PATHS:
self.cleanup_paths.append(path)
# Ansible Runner requires that this directory exists.
# Specifically, when using process isolation
os.mkdir(os.path.join(path, 'project'))
runner_project_folder = os.path.join(path, 'project')
if not os.path.exists(runner_project_folder):
# Ansible Runner requires that this directory exists.
# Specifically, when using process isolation
os.mkdir(runner_project_folder)
return path
def build_private_data_files(self, instance, private_data_dir):
@@ -860,7 +871,10 @@ class BaseTask(object):
'''
process_isolation_params = dict()
if self.should_use_proot(instance):
show_paths = self.proot_show_paths + [private_data_dir, cwd] + \
local_paths = [private_data_dir]
if cwd != private_data_dir and Path(private_data_dir) not in Path(cwd).parents:
local_paths.append(cwd)
show_paths = self.proot_show_paths + local_paths + \
settings.AWX_PROOT_SHOW_PATHS
# Help the user out by including the collections path inside the bubblewrap environment
@@ -1030,7 +1044,7 @@ class BaseTask(object):
expect_passwords[k] = passwords.get(v, '') or ''
return expect_passwords
def pre_run_hook(self, instance):
def pre_run_hook(self, instance, private_data_dir):
'''
Hook for any steps to run before the job/task starts
'''
@@ -1157,7 +1171,8 @@ class BaseTask(object):
try:
isolated = self.instance.is_isolated()
self.instance.send_notification_templates("running")
self.pre_run_hook(self.instance)
private_data_dir = self.build_private_data_dir(self.instance)
self.pre_run_hook(self.instance, private_data_dir)
if self.instance.cancel_flag:
self.instance = self.update_model(self.instance.pk, status='canceled')
if self.instance.status != 'running':
@@ -1173,7 +1188,6 @@ class BaseTask(object):
# store a record of the venv used at runtime
if hasattr(self.instance, 'custom_virtualenv'):
self.update_model(pk, custom_virtualenv=getattr(self.instance, 'ansible_virtualenv_path', settings.ANSIBLE_VENV_PATH))
private_data_dir = self.build_private_data_dir(self.instance)
# Fetch "cached" fact data from prior runs and put on the disk
# where ansible expects to find it
@@ -1256,9 +1270,6 @@ class BaseTask(object):
module_args = ansible_runner.utils.args2cmdline(
params.get('module_args'),
)
else:
# otherwise, it's a playbook, so copy the project dir
copy_tree(cwd, os.path.join(private_data_dir, 'project'))
shutil.move(
params.pop('inventory'),
os.path.join(private_data_dir, 'inventory')
@@ -1532,15 +1543,10 @@ class RunJob(BaseTask):
return args
def build_cwd(self, job, private_data_dir):
cwd = job.project.get_project_path()
if not cwd:
root = settings.PROJECTS_ROOT
raise RuntimeError('project local_path %s cannot be found in %s' %
(job.project.local_path, root))
return cwd
return os.path.join(private_data_dir, 'project')
def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
return os.path.join(job.playbook)
return job.playbook
def build_extra_vars_file(self, job, private_data_dir):
# Define special extra_vars for AWX, combine with job.extra_vars.
@@ -1587,39 +1593,117 @@ class RunJob(BaseTask):
'''
return getattr(settings, 'AWX_PROOT_ENABLED', False)
def pre_run_hook(self, job):
def copy_folders(self, project_path, galaxy_install_path, private_data_dir):
if project_path is None:
raise RuntimeError('project does not supply a valid path')
elif not os.path.exists(project_path):
raise RuntimeError('project path %s cannot be found' % project_path)
runner_project_folder = os.path.join(private_data_dir, 'project')
copy_tree(project_path, runner_project_folder)
if galaxy_install_path:
galaxy_run_path = os.path.join(private_data_dir, 'project', 'roles')
copy_tree(galaxy_install_path, galaxy_run_path)
def pre_run_hook(self, job, private_data_dir):
if job.inventory is None:
error = _('Job could not start because it does not have a valid inventory.')
self.update_model(job.pk, status='failed', job_explanation=error)
raise RuntimeError(error)
if job.project and job.project.scm_type:
elif job.project is None:
error = _('Job could not start because it does not have a valid project.')
self.update_model(job.pk, status='failed', job_explanation=error)
raise RuntimeError(error)
elif job.project.status in ('error', 'failed'):
msg = _(
'The project revision for this job template is unknown due to a failed update.'
)
job = self.update_model(job.pk, status='failed', job_explanation=msg)
raise RuntimeError(msg)
galaxy_install_path = None
git_repo = None
project_path = job.project.get_project_path(check_if_exists=False)
job_revision = job.project.scm_revision
needs_sync = True
if not job.project.scm_type:
# manual projects are not synced, user has responsibility for that
needs_sync = False
elif not os.path.exists(project_path):
logger.debug('Performing fresh clone of {} on this instance.'.format(job.project))
needs_sync = True
elif job.project.scm_type == 'git':
git_repo = git.Repo(project_path)
if job.scm_branch and job.scm_branch != job.project.scm_branch and git_repo:
try:
commit = git_repo.commit(job.scm_branch)
job_revision = commit.hexsha
logger.info('Skipping project sync for {} because commit is locally available'.format(job.log_format))
needs_sync = False # requested commit is already locally available
except (ValueError, BadGitName):
pass
else:
if git_repo.head.commit.hexsha == job.project.scm_revision:
logger.info('Source tree for for {} is already up to date'.format(job.log_format))
needs_sync = False
# Galaxy requirements are not supported for manual projects
if not needs_sync and job.project.scm_type:
# see if we need a sync because of presence of roles
galaxy_req_path = os.path.join(project_path, 'roles', 'requirements.yml')
if os.path.exists(galaxy_req_path):
logger.debug('Running project sync for {} because of galaxy role requirements.'.format(job.log_format))
needs_sync = True
if needs_sync:
pu_ig = job.instance_group
pu_en = job.execution_node
if job.is_isolated() is True:
pu_ig = pu_ig.controller
pu_en = settings.CLUSTER_HOST_ID
if job.project.status in ('error', 'failed'):
msg = _(
'The project revision for this job template is unknown due to a failed update.'
)
job = self.update_model(job.pk, status='failed', job_explanation=msg)
raise RuntimeError(msg)
local_project_sync = job.project.create_project_update(
_eager_fields=dict(
launch_type="sync",
job_type='run',
status='running',
instance_group = pu_ig,
execution_node=pu_en,
celery_task_id=job.celery_task_id))
sync_metafields = dict(
launch_type="sync",
job_type='run',
status='running',
instance_group = pu_ig,
execution_node=pu_en,
celery_task_id=job.celery_task_id
)
if job.scm_branch and job.scm_branch != job.project.scm_branch:
sync_metafields['scm_branch'] = job.scm_branch
local_project_sync = job.project.create_project_update(_eager_fields=sync_metafields)
# save the associated job before calling run() so that a
# cancel() call on the job can cancel the project update
job = self.update_model(job.pk, project_update=local_project_sync)
# Save the roles from galaxy to a temporary directory to be moved later
# at this point, the project folder has not yet been coppied into the temporary directory
galaxy_install_path = tempfile.mkdtemp(prefix='tmp_roles_', dir=private_data_dir)
os.chmod(galaxy_install_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
project_update_task = local_project_sync._get_task_class()
try:
project_update_task().run(local_project_sync.id)
job = self.update_model(job.pk, scm_revision=job.project.scm_revision)
sync_task = project_update_task(roles_destination=galaxy_install_path)
sync_task.run(local_project_sync.id)
# if job overrided the branch, we need to find the revision that will be ran
if job.scm_branch and job.scm_branch != job.project.scm_branch:
# TODO: handle case of non-git
if job.project.scm_type == 'git':
git_repo = git.Repo(project_path)
try:
commit = git_repo.commit(job.scm_branch)
job_revision = commit.hexsha
logger.debug('Evaluated {} to be a valid commit for {}'.format(job.scm_branch, job.log_format))
except (ValueError, BadGitName):
# not a commit, see if it is a ref
try:
user_branch = getattr(git_repo.refs, job.scm_branch)
job_revision = user_branch.commit.hexsha
logger.debug('Evaluated {} to be a valid ref for {}'.format(job.scm_branch, job.log_format))
except git.exc.NoSuchPathError as exc:
raise RuntimeError('Could not find specified version {}, error: {}'.format(
job.scm_branch, exc
))
else:
job_revision = sync_task.updated_revision
job = self.update_model(job.pk, scm_revision=job_revision)
except Exception:
local_project_sync.refresh_from_db()
if local_project_sync.status != 'canceled':
@@ -1627,6 +1711,31 @@ class RunJob(BaseTask):
job_explanation=('Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' %
('project_update', local_project_sync.name, local_project_sync.id)))
raise
else:
# Case where a local sync is not needed, meaning that local tree is
# up-to-date with project, job is running project current version
if job_revision:
job = self.update_model(job.pk, scm_revision=job_revision)
# copy the project directory
runner_project_folder = os.path.join(private_data_dir, 'project')
if job.project.scm_type == 'git':
git_repo = git.Repo(project_path)
if not os.path.exists(runner_project_folder):
os.mkdir(runner_project_folder)
tmp_branch_name = 'awx_internal/{}'.format(uuid4())
# always clone based on specific job revision
source_branch = git_repo.create_head(tmp_branch_name, job.scm_revision)
git_repo.clone(runner_project_folder, branch=source_branch, depth=1, single_branch=True)
# force option is necessary because remote refs are not counted, although no information is lost
git_repo.delete_head(tmp_branch_name, force=True)
else:
copy_tree(project_path, runner_project_folder)
if galaxy_install_path and os.listdir(galaxy_install_path):
logger.debug('Copying galaxy roles for {} to tmp directory'.format(job.log_format))
galaxy_run_path = os.path.join(private_data_dir, 'project', 'roles')
copy_tree(galaxy_install_path, galaxy_run_path)
if job.inventory.kind == 'smart':
# cache smart inventory memberships so that the host_filter query is not
# ran inside of the event saving code
@@ -1663,7 +1772,23 @@ class RunProjectUpdate(BaseTask):
@property
def proot_show_paths(self):
return [settings.PROJECTS_ROOT]
show_paths = [settings.PROJECTS_ROOT]
if self.roles_destination:
show_paths.append(self.roles_destination)
return show_paths
def __init__(self, *args, roles_destination=None, **kwargs):
super(RunProjectUpdate, self).__init__(*args, **kwargs)
self.updated_revision = None
self.roles_destination = roles_destination
def event_handler(self, event_data):
super(RunProjectUpdate, self).event_handler(event_data)
returned_data = event_data.get('event_data', {})
if returned_data.get('task_action', '') == 'set_fact':
returned_facts = returned_data.get('res', {}).get('ansible_facts', {})
if 'scm_version' in returned_facts:
self.updated_revision = returned_facts['scm_version']
def build_private_data(self, project_update, private_data_dir):
'''
@@ -1678,9 +1803,6 @@ class RunProjectUpdate(BaseTask):
}
}
'''
handle, self.revision_path = tempfile.mkstemp(dir=settings.PROJECTS_ROOT)
if settings.AWX_CLEANUP_PATHS:
self.cleanup_paths.append(self.revision_path)
private_data = {'credentials': {}}
if project_update.credential:
credential = project_update.credential
@@ -1781,7 +1903,7 @@ class RunProjectUpdate(BaseTask):
scm_url, extra_vars_new = self._build_scm_url_extra_vars(project_update)
extra_vars.update(extra_vars_new)
if project_update.project.scm_revision and project_update.job_type == 'run':
if project_update.project.scm_revision and project_update.job_type == 'run' and not project_update.project.allow_override:
scm_branch = project_update.project.scm_revision
else:
scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
@@ -1796,17 +1918,21 @@ class RunProjectUpdate(BaseTask):
'scm_clean': project_update.scm_clean,
'scm_delete_on_update': project_update.scm_delete_on_update if project_update.job_type == 'check' else False,
'scm_full_checkout': True if project_update.job_type == 'run' else False,
'scm_revision_output': self.revision_path,
'scm_revision': project_update.project.scm_revision,
'roles_enabled': getattr(settings, 'AWX_ROLES_ENABLED', True)
'roles_enabled': getattr(settings, 'AWX_ROLES_ENABLED', True) if project_update.job_type != 'check' else False
})
if project_update.project.allow_override:
# If branch is override-able, do extra fetch for all branches
# coming feature TODO: obtain custom refspec from user for PR refs and the like
extra_vars['git_refspec'] = 'refs/heads/*:refs/remotes/origin/*'
if self.roles_destination:
extra_vars['roles_destination'] = self.roles_destination
self._write_extra_vars_file(private_data_dir, extra_vars)
def build_cwd(self, project_update, private_data_dir):
return self.get_path_to('..', 'playbooks')
def build_playbook_path_relative_to_cwd(self, project_update, private_data_dir):
self.build_cwd(project_update, private_data_dir)
return os.path.join('project_update.yml')
def get_password_prompts(self, passwords={}):
@@ -1920,7 +2046,7 @@ class RunProjectUpdate(BaseTask):
'{} spent {} waiting to acquire lock for local source tree '
'for path {}.'.format(instance.log_format, waiting_time, lock_path))
def pre_run_hook(self, instance):
def pre_run_hook(self, instance, private_data_dir):
# re-create root project folder if a natural disaster has destroyed it
if not os.path.exists(settings.PROJECTS_ROOT):
os.mkdir(settings.PROJECTS_ROOT)
@@ -1930,10 +2056,8 @@ class RunProjectUpdate(BaseTask):
self.release_lock(instance)
p = instance.project
if instance.job_type == 'check' and status not in ('failed', 'canceled',):
fd = open(self.revision_path, 'r')
lines = fd.readlines()
if lines:
p.scm_revision = lines[0].strip()
if self.updated_revision:
p.scm_revision = self.updated_revision
else:
logger.info("{} Could not find scm revision in check".format(instance.log_format))
p.playbook_files = p.playbooks
@@ -2159,11 +2283,12 @@ class RunInventoryUpdate(BaseTask):
# All credentials not used by inventory source injector
return inventory_update.get_extra_credentials()
def pre_run_hook(self, inventory_update):
def pre_run_hook(self, inventory_update, private_data_dir):
source_project = None
if inventory_update.inventory_source:
source_project = inventory_update.inventory_source.source_project
if (inventory_update.source=='scm' and inventory_update.launch_type!='scm' and source_project):
# In project sync, pulling galaxy roles is not needed
local_project_sync = source_project.create_project_update(
_eager_fields=dict(
launch_type="sync",