mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 10:00:01 -03:30
Merge pull request #12356 from AlanCoding/copytree_neo
Replace git shallow clone with shutil.copytree
This commit is contained in:
commit
fad5934c1e
@ -1,6 +1,5 @@
|
||||
# Python
|
||||
from collections import OrderedDict
|
||||
from distutils.dir_util import copy_tree
|
||||
import errno
|
||||
import functools
|
||||
import fcntl
|
||||
@ -15,7 +14,6 @@ import tempfile
|
||||
import traceback
|
||||
import time
|
||||
import urllib.parse as urlparse
|
||||
from uuid import uuid4
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
@ -211,14 +209,22 @@ class BaseTask(object):
|
||||
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
||||
if settings.AWX_CLEANUP_PATHS:
|
||||
self.cleanup_paths.append(path)
|
||||
# Ansible runner requires that project exists,
|
||||
# and we will write files in the other folders without pre-creating the folder
|
||||
for subfolder in ('project', 'inventory', 'env'):
|
||||
# We will write files in these folders later
|
||||
for subfolder in ('inventory', 'env'):
|
||||
runner_subfolder = os.path.join(path, subfolder)
|
||||
if not os.path.exists(runner_subfolder):
|
||||
os.mkdir(runner_subfolder)
|
||||
return path
|
||||
|
||||
def build_project_dir(self, instance, private_data_dir):
|
||||
"""
|
||||
Create the ansible-runner project subdirectory. In many cases this is the source checkout.
|
||||
In cases that do not even need the source checkout, we create an empty dir to be the workdir.
|
||||
"""
|
||||
project_dir = os.path.join(private_data_dir, 'project')
|
||||
if not os.path.exists(project_dir):
|
||||
os.mkdir(project_dir)
|
||||
|
||||
def build_private_data_files(self, instance, private_data_dir):
|
||||
"""
|
||||
Creates temporary files containing the private data.
|
||||
@ -354,6 +360,52 @@ class BaseTask(object):
|
||||
expect_passwords[k] = passwords.get(v, '') or ''
|
||||
return expect_passwords
|
||||
|
||||
def release_lock(self, project):
|
||||
try:
|
||||
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
|
||||
except IOError as e:
|
||||
logger.error("I/O error({0}) while trying to release lock file [{1}]: {2}".format(e.errno, project.get_lock_file(), e.strerror))
|
||||
os.close(self.lock_fd)
|
||||
raise
|
||||
|
||||
os.close(self.lock_fd)
|
||||
self.lock_fd = None
|
||||
|
||||
def acquire_lock(self, project, unified_job_id=None):
|
||||
if not os.path.exists(settings.PROJECTS_ROOT):
|
||||
os.mkdir(settings.PROJECTS_ROOT)
|
||||
|
||||
lock_path = project.get_lock_file()
|
||||
if lock_path is None:
|
||||
# If from migration or someone blanked local_path for any other reason, recoverable by save
|
||||
project.save()
|
||||
lock_path = project.get_lock_file()
|
||||
if lock_path is None:
|
||||
raise RuntimeError(u'Invalid lock file path')
|
||||
|
||||
try:
|
||||
self.lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT)
|
||||
except OSError as e:
|
||||
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
||||
raise
|
||||
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
break
|
||||
except IOError as e:
|
||||
if e.errno not in (errno.EAGAIN, errno.EACCES):
|
||||
os.close(self.lock_fd)
|
||||
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
||||
raise
|
||||
else:
|
||||
time.sleep(1.0)
|
||||
waiting_time = time.time() - start_time
|
||||
|
||||
if waiting_time > 1.0:
|
||||
logger.info(f'Job {unified_job_id} waited {waiting_time} to acquire lock for local source tree for path {lock_path}.')
|
||||
|
||||
def pre_run_hook(self, instance, private_data_dir):
|
||||
"""
|
||||
Hook for any steps to run before the job/task starts
|
||||
@ -424,6 +476,7 @@ class BaseTask(object):
|
||||
self.instance.send_notification_templates("running")
|
||||
private_data_dir = self.build_private_data_dir(self.instance)
|
||||
self.pre_run_hook(self.instance, private_data_dir)
|
||||
self.build_project_dir(self.instance, private_data_dir)
|
||||
self.instance.log_lifecycle("preparing_playbook")
|
||||
if self.instance.cancel_flag or signal_callback():
|
||||
self.instance = self.update_model(self.instance.pk, status='canceled')
|
||||
@ -593,8 +646,144 @@ class BaseTask(object):
|
||||
raise AwxTaskError.TaskError(self.instance, rc)
|
||||
|
||||
|
||||
class SourceControlMixin(BaseTask):
|
||||
"""Utility methods for tasks that run use content from source control"""
|
||||
|
||||
def get_sync_needs(self, project, scm_branch=None):
|
||||
project_path = project.get_project_path(check_if_exists=False)
|
||||
job_revision = project.scm_revision
|
||||
sync_needs = []
|
||||
source_update_tag = 'update_{}'.format(project.scm_type)
|
||||
branch_override = bool(scm_branch and scm_branch != project.scm_branch)
|
||||
# TODO: skip syncs for inventory updates. Now, UI needs a link added so clients can link to project
|
||||
# source_project is only a field on inventory sources.
|
||||
if isinstance(self.instance, InventoryUpdate):
|
||||
sync_needs.append(source_update_tag)
|
||||
elif not project.scm_type:
|
||||
pass # manual projects are not synced, user has responsibility for that
|
||||
elif not os.path.exists(project_path):
|
||||
logger.debug(f'Performing fresh clone of {project.id} for unified job {self.instance.id} on this instance.')
|
||||
sync_needs.append(source_update_tag)
|
||||
elif project.scm_type == 'git' and project.scm_revision and (not branch_override):
|
||||
try:
|
||||
git_repo = git.Repo(project_path)
|
||||
|
||||
if job_revision == git_repo.head.commit.hexsha:
|
||||
logger.debug(f'Skipping project sync for {self.instance.id} because commit is locally available')
|
||||
else:
|
||||
sync_needs.append(source_update_tag)
|
||||
except (ValueError, BadGitName, git.exc.InvalidGitRepositoryError):
|
||||
logger.debug(f'Needed commit for {self.instance.id} not in local source tree, will sync with remote')
|
||||
sync_needs.append(source_update_tag)
|
||||
else:
|
||||
logger.debug(f'Project not available locally, {self.instance.id} will sync with remote')
|
||||
sync_needs.append(source_update_tag)
|
||||
|
||||
has_cache = os.path.exists(os.path.join(project.get_cache_path(), project.cache_id))
|
||||
# Galaxy requirements are not supported for manual projects
|
||||
if project.scm_type and ((not has_cache) or branch_override):
|
||||
sync_needs.extend(['install_roles', 'install_collections'])
|
||||
|
||||
return sync_needs
|
||||
|
||||
def spawn_project_sync(self, project, sync_needs, scm_branch=None):
|
||||
pu_ig = self.instance.instance_group
|
||||
pu_en = Instance.objects.me().hostname
|
||||
|
||||
sync_metafields = dict(
|
||||
launch_type="sync",
|
||||
job_type='run',
|
||||
job_tags=','.join(sync_needs),
|
||||
status='running',
|
||||
instance_group=pu_ig,
|
||||
execution_node=pu_en,
|
||||
controller_node=pu_en,
|
||||
celery_task_id=self.instance.celery_task_id,
|
||||
)
|
||||
if scm_branch and scm_branch != project.scm_branch:
|
||||
sync_metafields['scm_branch'] = scm_branch
|
||||
sync_metafields['scm_clean'] = True # to accomidate force pushes
|
||||
if 'update_' not in sync_metafields['job_tags']:
|
||||
sync_metafields['scm_revision'] = project.scm_revision
|
||||
local_project_sync = project.create_project_update(_eager_fields=sync_metafields)
|
||||
local_project_sync.log_lifecycle("controller_node_chosen")
|
||||
local_project_sync.log_lifecycle("execution_node_chosen")
|
||||
create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
|
||||
return local_project_sync
|
||||
|
||||
def sync_and_copy_without_lock(self, project, private_data_dir, scm_branch=None):
|
||||
sync_needs = self.get_sync_needs(project, scm_branch=scm_branch)
|
||||
|
||||
if sync_needs:
|
||||
local_project_sync = self.spawn_project_sync(project, sync_needs, scm_branch=scm_branch)
|
||||
# save the associated job before calling run() so that a
|
||||
# cancel() call on the job can cancel the project update
|
||||
if isinstance(self.instance, Job):
|
||||
self.instance = self.update_model(self.instance.pk, project_update=local_project_sync)
|
||||
else:
|
||||
self.instance = self.update_model(self.instance.pk, source_project_update=local_project_sync)
|
||||
|
||||
try:
|
||||
# the job private_data_dir is passed so sync can download roles and collections there
|
||||
sync_task = RunProjectUpdate(job_private_data_dir=private_data_dir)
|
||||
sync_task.run(local_project_sync.id)
|
||||
local_project_sync.refresh_from_db()
|
||||
if isinstance(self.instance, Job):
|
||||
self.instance = self.update_model(self.instance.pk, scm_revision=local_project_sync.scm_revision)
|
||||
except Exception:
|
||||
local_project_sync.refresh_from_db()
|
||||
if local_project_sync.status != 'canceled':
|
||||
self.instance = self.update_model(
|
||||
self.instance.pk,
|
||||
status='failed',
|
||||
job_explanation=(
|
||||
'Previous Task Failed: {"job_type": "project_update", '
|
||||
f'"job_name": "{local_project_sync.name}", "job_id": "{local_project_sync.id}"}}'
|
||||
),
|
||||
)
|
||||
raise
|
||||
self.instance.refresh_from_db()
|
||||
if self.instance.cancel_flag:
|
||||
return
|
||||
else:
|
||||
# Case where a local sync is not needed, meaning that local tree is
|
||||
# up-to-date with project, job is running project current version
|
||||
if isinstance(self.instance, Job):
|
||||
self.instance = self.update_model(self.instance.pk, scm_revision=project.scm_revision)
|
||||
# Project update does not copy the folder, so copy here
|
||||
RunProjectUpdate.make_local_copy(project, private_data_dir)
|
||||
|
||||
def sync_and_copy(self, project, private_data_dir, scm_branch=None):
|
||||
self.acquire_lock(project, self.instance.id)
|
||||
|
||||
try:
|
||||
original_branch = None
|
||||
project_path = project.get_project_path(check_if_exists=False)
|
||||
if project.scm_type == 'git' and (scm_branch and scm_branch != project.scm_branch):
|
||||
if os.path.exists(project_path):
|
||||
git_repo = git.Repo(project_path)
|
||||
if git_repo.head.is_detached:
|
||||
original_branch = git_repo.head.commit
|
||||
else:
|
||||
original_branch = git_repo.active_branch
|
||||
|
||||
return self.sync_and_copy_without_lock(project, private_data_dir, scm_branch=scm_branch)
|
||||
finally:
|
||||
# We have made the copy so we can set the tree back to its normal state
|
||||
if original_branch:
|
||||
# for git project syncs, non-default branches can be problems
|
||||
# restore to branch the repo was on before this run
|
||||
try:
|
||||
original_branch.checkout()
|
||||
except Exception:
|
||||
# this could have failed due to dirty tree, but difficult to predict all cases
|
||||
logger.exception(f'Failed to restore project repo to prior state after {self.instance.id}')
|
||||
|
||||
self.release_lock(project)
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
class RunJob(BaseTask):
|
||||
class RunJob(SourceControlMixin, BaseTask):
|
||||
"""
|
||||
Run a job using ansible-playbook.
|
||||
"""
|
||||
@ -863,98 +1052,14 @@ class RunJob(BaseTask):
|
||||
job = self.update_model(job.pk, status='failed', job_explanation=msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
project_path = job.project.get_project_path(check_if_exists=False)
|
||||
job_revision = job.project.scm_revision
|
||||
sync_needs = []
|
||||
source_update_tag = 'update_{}'.format(job.project.scm_type)
|
||||
branch_override = bool(job.scm_branch and job.scm_branch != job.project.scm_branch)
|
||||
if not job.project.scm_type:
|
||||
pass # manual projects are not synced, user has responsibility for that
|
||||
elif not os.path.exists(project_path):
|
||||
logger.debug('Performing fresh clone of {} on this instance.'.format(job.project))
|
||||
sync_needs.append(source_update_tag)
|
||||
elif job.project.scm_type == 'git' and job.project.scm_revision and (not branch_override):
|
||||
try:
|
||||
git_repo = git.Repo(project_path)
|
||||
|
||||
if job_revision == git_repo.head.commit.hexsha:
|
||||
logger.debug('Skipping project sync for {} because commit is locally available'.format(job.log_format))
|
||||
else:
|
||||
sync_needs.append(source_update_tag)
|
||||
except (ValueError, BadGitName, git.exc.InvalidGitRepositoryError):
|
||||
logger.debug('Needed commit for {} not in local source tree, will sync with remote'.format(job.log_format))
|
||||
sync_needs.append(source_update_tag)
|
||||
else:
|
||||
logger.debug('Project not available locally, {} will sync with remote'.format(job.log_format))
|
||||
sync_needs.append(source_update_tag)
|
||||
|
||||
has_cache = os.path.exists(os.path.join(job.project.get_cache_path(), job.project.cache_id))
|
||||
# Galaxy requirements are not supported for manual projects
|
||||
if job.project.scm_type and ((not has_cache) or branch_override):
|
||||
sync_needs.extend(['install_roles', 'install_collections'])
|
||||
|
||||
if sync_needs:
|
||||
pu_ig = job.instance_group
|
||||
pu_en = Instance.objects.me().hostname
|
||||
|
||||
sync_metafields = dict(
|
||||
launch_type="sync",
|
||||
job_type='run',
|
||||
job_tags=','.join(sync_needs),
|
||||
status='running',
|
||||
instance_group=pu_ig,
|
||||
execution_node=pu_en,
|
||||
controller_node=pu_en,
|
||||
celery_task_id=job.celery_task_id,
|
||||
)
|
||||
if branch_override:
|
||||
sync_metafields['scm_branch'] = job.scm_branch
|
||||
sync_metafields['scm_clean'] = True # to accomidate force pushes
|
||||
if 'update_' not in sync_metafields['job_tags']:
|
||||
sync_metafields['scm_revision'] = job_revision
|
||||
local_project_sync = job.project.create_project_update(_eager_fields=sync_metafields)
|
||||
local_project_sync.log_lifecycle("controller_node_chosen")
|
||||
local_project_sync.log_lifecycle("execution_node_chosen")
|
||||
create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
|
||||
# save the associated job before calling run() so that a
|
||||
# cancel() call on the job can cancel the project update
|
||||
job = self.update_model(job.pk, project_update=local_project_sync)
|
||||
|
||||
project_update_task = local_project_sync._get_task_class()
|
||||
try:
|
||||
# the job private_data_dir is passed so sync can download roles and collections there
|
||||
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
||||
sync_task.run(local_project_sync.id)
|
||||
local_project_sync.refresh_from_db()
|
||||
job = self.update_model(job.pk, scm_revision=local_project_sync.scm_revision)
|
||||
except Exception:
|
||||
local_project_sync.refresh_from_db()
|
||||
if local_project_sync.status != 'canceled':
|
||||
job = self.update_model(
|
||||
job.pk,
|
||||
status='failed',
|
||||
job_explanation=(
|
||||
'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}'
|
||||
% ('project_update', local_project_sync.name, local_project_sync.id)
|
||||
),
|
||||
)
|
||||
raise
|
||||
job.refresh_from_db()
|
||||
if job.cancel_flag:
|
||||
return
|
||||
else:
|
||||
# Case where a local sync is not needed, meaning that local tree is
|
||||
# up-to-date with project, job is running project current version
|
||||
if job_revision:
|
||||
job = self.update_model(job.pk, scm_revision=job_revision)
|
||||
# Project update does not copy the folder, so copy here
|
||||
RunProjectUpdate.make_local_copy(job.project, private_data_dir, scm_revision=job_revision)
|
||||
|
||||
if job.inventory.kind == 'smart':
|
||||
# cache smart inventory memberships so that the host_filter query is not
|
||||
# ran inside of the event saving code
|
||||
update_smart_memberships_for_inventory(job.inventory)
|
||||
|
||||
def build_project_dir(self, job, private_data_dir):
|
||||
self.sync_and_copy(job.project, private_data_dir, scm_branch=job.scm_branch)
|
||||
|
||||
def final_run_hook(self, job, status, private_data_dir, fact_modification_times):
|
||||
super(RunJob, self).final_run_hook(job, status, private_data_dir, fact_modification_times)
|
||||
if not private_data_dir:
|
||||
@ -986,7 +1091,6 @@ class RunProjectUpdate(BaseTask):
|
||||
|
||||
def __init__(self, *args, job_private_data_dir=None, **kwargs):
|
||||
super(RunProjectUpdate, self).__init__(*args, **kwargs)
|
||||
self.original_branch = None
|
||||
self.job_private_data_dir = job_private_data_dir
|
||||
|
||||
def build_private_data(self, project_update, private_data_dir):
|
||||
@ -1173,74 +1277,17 @@ class RunProjectUpdate(BaseTask):
|
||||
d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes'
|
||||
return d
|
||||
|
||||
def release_lock(self, instance):
|
||||
try:
|
||||
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
|
||||
except IOError as e:
|
||||
logger.error("I/O error({0}) while trying to release lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
|
||||
os.close(self.lock_fd)
|
||||
raise
|
||||
|
||||
os.close(self.lock_fd)
|
||||
self.lock_fd = None
|
||||
|
||||
'''
|
||||
Note: We don't support blocking=False
|
||||
'''
|
||||
|
||||
def acquire_lock(self, instance, blocking=True):
|
||||
lock_path = instance.get_lock_file()
|
||||
if lock_path is None:
|
||||
# If from migration or someone blanked local_path for any other reason, recoverable by save
|
||||
instance.save()
|
||||
lock_path = instance.get_lock_file()
|
||||
if lock_path is None:
|
||||
raise RuntimeError(u'Invalid lock file path')
|
||||
|
||||
try:
|
||||
self.lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT)
|
||||
except OSError as e:
|
||||
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
||||
raise
|
||||
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
instance.refresh_from_db(fields=['cancel_flag'])
|
||||
if instance.cancel_flag:
|
||||
logger.debug("ProjectUpdate({0}) was canceled".format(instance.pk))
|
||||
return
|
||||
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
break
|
||||
except IOError as e:
|
||||
if e.errno not in (errno.EAGAIN, errno.EACCES):
|
||||
os.close(self.lock_fd)
|
||||
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
||||
raise
|
||||
else:
|
||||
time.sleep(1.0)
|
||||
waiting_time = time.time() - start_time
|
||||
|
||||
if waiting_time > 1.0:
|
||||
logger.info('{} spent {} waiting to acquire lock for local source tree ' 'for path {}.'.format(instance.log_format, waiting_time, lock_path))
|
||||
|
||||
def pre_run_hook(self, instance, private_data_dir):
|
||||
super(RunProjectUpdate, self).pre_run_hook(instance, private_data_dir)
|
||||
# re-create root project folder if a natural disaster has destroyed it
|
||||
if not os.path.exists(settings.PROJECTS_ROOT):
|
||||
os.mkdir(settings.PROJECTS_ROOT)
|
||||
project_path = instance.project.get_project_path(check_if_exists=False)
|
||||
|
||||
self.acquire_lock(instance)
|
||||
|
||||
self.original_branch = None
|
||||
if instance.scm_type == 'git' and instance.branch_override:
|
||||
if os.path.exists(project_path):
|
||||
git_repo = git.Repo(project_path)
|
||||
if git_repo.head.is_detached:
|
||||
self.original_branch = git_repo.head.commit
|
||||
else:
|
||||
self.original_branch = git_repo.active_branch
|
||||
instance.refresh_from_db(fields=['cancel_flag'])
|
||||
if instance.cancel_flag:
|
||||
logger.debug("ProjectUpdate({0}) was canceled".format(instance.pk))
|
||||
return
|
||||
if instance.launch_type != 'sync':
|
||||
self.acquire_lock(instance.project, instance.id)
|
||||
|
||||
if not os.path.exists(project_path):
|
||||
os.makedirs(project_path) # used as container mount
|
||||
@ -1251,11 +1298,12 @@ class RunProjectUpdate(BaseTask):
|
||||
shutil.rmtree(stage_path)
|
||||
os.makedirs(stage_path) # presence of empty cache indicates lack of roles or collections
|
||||
|
||||
def build_project_dir(self, instance, private_data_dir):
|
||||
# the project update playbook is not in a git repo, but uses a vendoring directory
|
||||
# to be consistent with the ansible-runner model,
|
||||
# that is moved into the runner project folder here
|
||||
awx_playbooks = self.get_path_to('../../', 'playbooks')
|
||||
copy_tree(awx_playbooks, os.path.join(private_data_dir, 'project'))
|
||||
shutil.copytree(awx_playbooks, os.path.join(private_data_dir, 'project'))
|
||||
|
||||
@staticmethod
|
||||
def clear_project_cache(cache_dir, keep_value):
|
||||
@ -1272,50 +1320,18 @@ class RunProjectUpdate(BaseTask):
|
||||
logger.warning(f"Could not remove cache directory {old_path}")
|
||||
|
||||
@staticmethod
|
||||
def make_local_copy(p, job_private_data_dir, scm_revision=None):
|
||||
def make_local_copy(project, job_private_data_dir):
|
||||
"""Copy project content (roles and collections) to a job private_data_dir
|
||||
|
||||
:param object p: Either a project or a project update
|
||||
:param object project: Either a project or a project update
|
||||
:param str job_private_data_dir: The root of the target ansible-runner folder
|
||||
:param str scm_revision: For branch_override cases, the git revision to copy
|
||||
"""
|
||||
project_path = p.get_project_path(check_if_exists=False)
|
||||
project_path = project.get_project_path(check_if_exists=False)
|
||||
destination_folder = os.path.join(job_private_data_dir, 'project')
|
||||
if not scm_revision:
|
||||
scm_revision = p.scm_revision
|
||||
|
||||
if p.scm_type == 'git':
|
||||
git_repo = git.Repo(project_path)
|
||||
if not os.path.exists(destination_folder):
|
||||
os.mkdir(destination_folder, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
|
||||
tmp_branch_name = 'awx_internal/{}'.format(uuid4())
|
||||
# always clone based on specific job revision
|
||||
if not p.scm_revision:
|
||||
raise RuntimeError('Unexpectedly could not determine a revision to run from project.')
|
||||
source_branch = git_repo.create_head(tmp_branch_name, p.scm_revision)
|
||||
# git clone must take file:// syntax for source repo or else options like depth will be ignored
|
||||
source_as_uri = Path(project_path).as_uri()
|
||||
git.Repo.clone_from(
|
||||
source_as_uri,
|
||||
destination_folder,
|
||||
branch=source_branch,
|
||||
depth=1,
|
||||
single_branch=True, # shallow, do not copy full history
|
||||
)
|
||||
# submodules copied in loop because shallow copies from local HEADs are ideal
|
||||
# and no git clone submodule options are compatible with minimum requirements
|
||||
for submodule in git_repo.submodules:
|
||||
subrepo_path = os.path.abspath(os.path.join(project_path, submodule.path))
|
||||
subrepo_destination_folder = os.path.abspath(os.path.join(destination_folder, submodule.path))
|
||||
subrepo_uri = Path(subrepo_path).as_uri()
|
||||
git.Repo.clone_from(subrepo_uri, subrepo_destination_folder, depth=1, single_branch=True)
|
||||
# force option is necessary because remote refs are not counted, although no information is lost
|
||||
git_repo.delete_head(tmp_branch_name, force=True)
|
||||
else:
|
||||
copy_tree(project_path, destination_folder, preserve_symlinks=1)
|
||||
shutil.copytree(project_path, destination_folder, ignore=shutil.ignore_patterns('.git'), symlinks=True)
|
||||
|
||||
# copy over the roles and collection cache to job folder
|
||||
cache_path = os.path.join(p.get_cache_path(), p.cache_id)
|
||||
cache_path = os.path.join(project.get_cache_path(), project.cache_id)
|
||||
subfolders = []
|
||||
if settings.AWX_COLLECTIONS_ENABLED:
|
||||
subfolders.append('requirements_collections')
|
||||
@ -1325,8 +1341,8 @@ class RunProjectUpdate(BaseTask):
|
||||
cache_subpath = os.path.join(cache_path, subfolder)
|
||||
if os.path.exists(cache_subpath):
|
||||
dest_subpath = os.path.join(job_private_data_dir, subfolder)
|
||||
copy_tree(cache_subpath, dest_subpath, preserve_symlinks=1)
|
||||
logger.debug('{0} {1} prepared {2} from cache'.format(type(p).__name__, p.pk, dest_subpath))
|
||||
shutil.copytree(cache_subpath, dest_subpath, symlinks=True)
|
||||
logger.debug('{0} {1} prepared {2} from cache'.format(type(project).__name__, project.pk, dest_subpath))
|
||||
|
||||
def post_run_hook(self, instance, status):
|
||||
super(RunProjectUpdate, self).post_run_hook(instance, status)
|
||||
@ -1356,23 +1372,13 @@ class RunProjectUpdate(BaseTask):
|
||||
if self.job_private_data_dir:
|
||||
if status == 'successful':
|
||||
# copy project folder before resetting to default branch
|
||||
# because some git-tree-specific resources (like submodules) might matter
|
||||
self.make_local_copy(instance, self.job_private_data_dir)
|
||||
if self.original_branch:
|
||||
# for git project syncs, non-default branches can be problems
|
||||
# restore to branch the repo was on before this run
|
||||
try:
|
||||
self.original_branch.checkout()
|
||||
except Exception:
|
||||
# this could have failed due to dirty tree, but difficult to predict all cases
|
||||
logger.exception('Failed to restore project repo to prior state after {}'.format(instance.log_format))
|
||||
finally:
|
||||
self.release_lock(instance)
|
||||
if instance.launch_type != 'sync':
|
||||
self.release_lock(instance.project)
|
||||
|
||||
p = instance.project
|
||||
if instance.job_type == 'check' and status not in (
|
||||
'failed',
|
||||
'canceled',
|
||||
):
|
||||
if instance.job_type == 'check' and status not in ('failed', 'canceled'):
|
||||
if self.runner_callback.playbook_new_revision:
|
||||
p.scm_revision = self.runner_callback.playbook_new_revision
|
||||
else:
|
||||
@ -1400,7 +1406,7 @@ class RunProjectUpdate(BaseTask):
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
class RunInventoryUpdate(BaseTask):
|
||||
class RunInventoryUpdate(SourceControlMixin, BaseTask):
|
||||
|
||||
model = InventoryUpdate
|
||||
event_model = InventoryUpdateEvent
|
||||
@ -1556,54 +1562,18 @@ class RunInventoryUpdate(BaseTask):
|
||||
# All credentials not used by inventory source injector
|
||||
return inventory_update.get_extra_credentials()
|
||||
|
||||
def pre_run_hook(self, inventory_update, private_data_dir):
|
||||
super(RunInventoryUpdate, self).pre_run_hook(inventory_update, private_data_dir)
|
||||
def build_project_dir(self, inventory_update, private_data_dir):
|
||||
source_project = None
|
||||
if inventory_update.inventory_source:
|
||||
source_project = inventory_update.inventory_source.source_project
|
||||
if inventory_update.source == 'scm' and source_project and source_project.scm_type: # never ever update manual projects
|
||||
|
||||
# Check if the content cache exists, so that we do not unnecessarily re-download roles
|
||||
sync_needs = ['update_{}'.format(source_project.scm_type)]
|
||||
has_cache = os.path.exists(os.path.join(source_project.get_cache_path(), source_project.cache_id))
|
||||
# Galaxy requirements are not supported for manual projects
|
||||
if not has_cache:
|
||||
sync_needs.extend(['install_roles', 'install_collections'])
|
||||
|
||||
local_project_sync = source_project.create_project_update(
|
||||
_eager_fields=dict(
|
||||
launch_type="sync",
|
||||
job_type='run',
|
||||
job_tags=','.join(sync_needs),
|
||||
status='running',
|
||||
execution_node=Instance.objects.me().hostname,
|
||||
controller_node=Instance.objects.me().hostname,
|
||||
instance_group=inventory_update.instance_group,
|
||||
celery_task_id=inventory_update.celery_task_id,
|
||||
)
|
||||
)
|
||||
local_project_sync.log_lifecycle("controller_node_chosen")
|
||||
local_project_sync.log_lifecycle("execution_node_chosen")
|
||||
create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
|
||||
# associate the inventory update before calling run() so that a
|
||||
# cancel() call on the inventory update can cancel the project update
|
||||
local_project_sync.scm_inventory_updates.add(inventory_update)
|
||||
|
||||
project_update_task = local_project_sync._get_task_class()
|
||||
try:
|
||||
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
||||
sync_task.run(local_project_sync.id)
|
||||
local_project_sync.refresh_from_db()
|
||||
except Exception:
|
||||
inventory_update = self.update_model(
|
||||
inventory_update.pk,
|
||||
status='failed',
|
||||
job_explanation=(
|
||||
'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}'
|
||||
% ('project_update', local_project_sync.name, local_project_sync.id)
|
||||
),
|
||||
)
|
||||
raise
|
||||
if inventory_update.source == 'scm':
|
||||
if not source_project:
|
||||
raise RuntimeError('Could not find project to run SCM inventory update from.')
|
||||
self.sync_and_copy(source_project, private_data_dir)
|
||||
else:
|
||||
# If source is not SCM make an empty project directory, content is built inside inventory folder
|
||||
super(RunInventoryUpdate, self).build_project_dir(inventory_update, private_data_dir)
|
||||
|
||||
def post_run_hook(self, inventory_update, status):
|
||||
super(RunInventoryUpdate, self).post_run_hook(inventory_update, status)
|
||||
|
||||
@ -474,7 +474,7 @@ class TestGenericRun:
|
||||
task.model.objects.get = mock.Mock(return_value=job)
|
||||
task.build_private_data_files = mock.Mock(side_effect=OSError())
|
||||
|
||||
with mock.patch('awx.main.tasks.jobs.copy_tree'):
|
||||
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
|
||||
with pytest.raises(Exception):
|
||||
task.run(1)
|
||||
|
||||
@ -496,7 +496,7 @@ class TestGenericRun:
|
||||
task.model.objects.get = mock.Mock(return_value=job)
|
||||
task.build_private_data_files = mock.Mock()
|
||||
|
||||
with mock.patch('awx.main.tasks.jobs.copy_tree'):
|
||||
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
|
||||
with pytest.raises(Exception):
|
||||
task.run(1)
|
||||
|
||||
@ -1946,7 +1946,7 @@ def test_job_run_no_ee(mock_me):
|
||||
task.update_model = mock.Mock(return_value=job)
|
||||
task.model.objects.get = mock.Mock(return_value=job)
|
||||
|
||||
with mock.patch('awx.main.tasks.jobs.copy_tree'):
|
||||
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
|
||||
with pytest.raises(RuntimeError) as e:
|
||||
task.pre_run_hook(job, private_data_dir)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user