mirror of
https://github.com/ansible/awx.git
synced 2026-02-26 15:36:04 -03:30
Replace git shallow clone with shutil.copytree
Introduce build_project_dir method the base method will create an empty project dir for workdir Share code between job and inventory tasks with new mixin combine rest of pre_run_hook logic structure to hold lock for entire sync process force sync to run for inventory updates due to UI issues Remove reference to removed scm_last_revision field
This commit is contained in:
@@ -1,6 +1,5 @@
|
|||||||
# Python
|
# Python
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from distutils.dir_util import copy_tree
|
|
||||||
import errno
|
import errno
|
||||||
import functools
|
import functools
|
||||||
import fcntl
|
import fcntl
|
||||||
@@ -15,7 +14,6 @@ import tempfile
|
|||||||
import traceback
|
import traceback
|
||||||
import time
|
import time
|
||||||
import urllib.parse as urlparse
|
import urllib.parse as urlparse
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -211,14 +209,22 @@ class BaseTask(object):
|
|||||||
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
||||||
if settings.AWX_CLEANUP_PATHS:
|
if settings.AWX_CLEANUP_PATHS:
|
||||||
self.cleanup_paths.append(path)
|
self.cleanup_paths.append(path)
|
||||||
# Ansible runner requires that project exists,
|
# We will write files in these folders later
|
||||||
# and we will write files in the other folders without pre-creating the folder
|
for subfolder in ('inventory', 'env'):
|
||||||
for subfolder in ('project', 'inventory', 'env'):
|
|
||||||
runner_subfolder = os.path.join(path, subfolder)
|
runner_subfolder = os.path.join(path, subfolder)
|
||||||
if not os.path.exists(runner_subfolder):
|
if not os.path.exists(runner_subfolder):
|
||||||
os.mkdir(runner_subfolder)
|
os.mkdir(runner_subfolder)
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
def build_project_dir(self, instance, private_data_dir):
|
||||||
|
"""
|
||||||
|
Create the ansible-runner project subdirectory. In many cases this is the source checkout.
|
||||||
|
In cases that do not even need the source checkout, we create an empty dir to be the workdir.
|
||||||
|
"""
|
||||||
|
project_dir = os.path.join(private_data_dir, 'project')
|
||||||
|
if not os.path.exists(project_dir):
|
||||||
|
os.mkdir(project_dir)
|
||||||
|
|
||||||
def build_private_data_files(self, instance, private_data_dir):
|
def build_private_data_files(self, instance, private_data_dir):
|
||||||
"""
|
"""
|
||||||
Creates temporary files containing the private data.
|
Creates temporary files containing the private data.
|
||||||
@@ -354,6 +360,52 @@ class BaseTask(object):
|
|||||||
expect_passwords[k] = passwords.get(v, '') or ''
|
expect_passwords[k] = passwords.get(v, '') or ''
|
||||||
return expect_passwords
|
return expect_passwords
|
||||||
|
|
||||||
|
def release_lock(self, project):
|
||||||
|
try:
|
||||||
|
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
|
||||||
|
except IOError as e:
|
||||||
|
logger.error("I/O error({0}) while trying to release lock file [{1}]: {2}".format(e.errno, project.get_lock_file(), e.strerror))
|
||||||
|
os.close(self.lock_fd)
|
||||||
|
raise
|
||||||
|
|
||||||
|
os.close(self.lock_fd)
|
||||||
|
self.lock_fd = None
|
||||||
|
|
||||||
|
def acquire_lock(self, project, unified_job_id=None):
|
||||||
|
if not os.path.exists(settings.PROJECTS_ROOT):
|
||||||
|
os.mkdir(settings.PROJECTS_ROOT)
|
||||||
|
|
||||||
|
lock_path = project.get_lock_file()
|
||||||
|
if lock_path is None:
|
||||||
|
# If from migration or someone blanked local_path for any other reason, recoverable by save
|
||||||
|
project.save()
|
||||||
|
lock_path = project.get_lock_file()
|
||||||
|
if lock_path is None:
|
||||||
|
raise RuntimeError(u'Invalid lock file path')
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT)
|
||||||
|
except OSError as e:
|
||||||
|
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
||||||
|
raise
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
break
|
||||||
|
except IOError as e:
|
||||||
|
if e.errno not in (errno.EAGAIN, errno.EACCES):
|
||||||
|
os.close(self.lock_fd)
|
||||||
|
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
time.sleep(1.0)
|
||||||
|
waiting_time = time.time() - start_time
|
||||||
|
|
||||||
|
if waiting_time > 1.0:
|
||||||
|
logger.info(f'Job {unified_job_id} waited {waiting_time} to acquire lock for local source tree for path {lock_path}.')
|
||||||
|
|
||||||
def pre_run_hook(self, instance, private_data_dir):
|
def pre_run_hook(self, instance, private_data_dir):
|
||||||
"""
|
"""
|
||||||
Hook for any steps to run before the job/task starts
|
Hook for any steps to run before the job/task starts
|
||||||
@@ -424,6 +476,7 @@ class BaseTask(object):
|
|||||||
self.instance.send_notification_templates("running")
|
self.instance.send_notification_templates("running")
|
||||||
private_data_dir = self.build_private_data_dir(self.instance)
|
private_data_dir = self.build_private_data_dir(self.instance)
|
||||||
self.pre_run_hook(self.instance, private_data_dir)
|
self.pre_run_hook(self.instance, private_data_dir)
|
||||||
|
self.build_project_dir(self.instance, private_data_dir)
|
||||||
self.instance.log_lifecycle("preparing_playbook")
|
self.instance.log_lifecycle("preparing_playbook")
|
||||||
if self.instance.cancel_flag or signal_callback():
|
if self.instance.cancel_flag or signal_callback():
|
||||||
self.instance = self.update_model(self.instance.pk, status='canceled')
|
self.instance = self.update_model(self.instance.pk, status='canceled')
|
||||||
@@ -593,8 +646,144 @@ class BaseTask(object):
|
|||||||
raise AwxTaskError.TaskError(self.instance, rc)
|
raise AwxTaskError.TaskError(self.instance, rc)
|
||||||
|
|
||||||
|
|
||||||
|
class SourceControlMixin(BaseTask):
|
||||||
|
"""Utility methods for tasks that run use content from source control"""
|
||||||
|
|
||||||
|
def get_sync_needs(self, project, scm_branch=None):
|
||||||
|
project_path = project.get_project_path(check_if_exists=False)
|
||||||
|
job_revision = project.scm_revision
|
||||||
|
sync_needs = []
|
||||||
|
source_update_tag = 'update_{}'.format(project.scm_type)
|
||||||
|
branch_override = bool(scm_branch and scm_branch != project.scm_branch)
|
||||||
|
# TODO: skip syncs for inventory updates. Now, UI needs a link added so clients can link to project
|
||||||
|
# source_project is only a field on inventory sources.
|
||||||
|
if isinstance(self.instance, InventoryUpdate):
|
||||||
|
sync_needs.append(source_update_tag)
|
||||||
|
elif not project.scm_type:
|
||||||
|
pass # manual projects are not synced, user has responsibility for that
|
||||||
|
elif not os.path.exists(project_path):
|
||||||
|
logger.debug(f'Performing fresh clone of {project.id} for unified job {self.instance.id} on this instance.')
|
||||||
|
sync_needs.append(source_update_tag)
|
||||||
|
elif project.scm_type == 'git' and project.scm_revision and (not branch_override):
|
||||||
|
try:
|
||||||
|
git_repo = git.Repo(project_path)
|
||||||
|
|
||||||
|
if job_revision == git_repo.head.commit.hexsha:
|
||||||
|
logger.debug(f'Skipping project sync for {self.instance.id} because commit is locally available')
|
||||||
|
else:
|
||||||
|
sync_needs.append(source_update_tag)
|
||||||
|
except (ValueError, BadGitName, git.exc.InvalidGitRepositoryError):
|
||||||
|
logger.debug(f'Needed commit for {self.instance.id} not in local source tree, will sync with remote')
|
||||||
|
sync_needs.append(source_update_tag)
|
||||||
|
else:
|
||||||
|
logger.debug(f'Project not available locally, {self.instance.id} will sync with remote')
|
||||||
|
sync_needs.append(source_update_tag)
|
||||||
|
|
||||||
|
has_cache = os.path.exists(os.path.join(project.get_cache_path(), project.cache_id))
|
||||||
|
# Galaxy requirements are not supported for manual projects
|
||||||
|
if project.scm_type and ((not has_cache) or branch_override):
|
||||||
|
sync_needs.extend(['install_roles', 'install_collections'])
|
||||||
|
|
||||||
|
return sync_needs
|
||||||
|
|
||||||
|
def spawn_project_sync(self, project, sync_needs, scm_branch=None):
|
||||||
|
pu_ig = self.instance.instance_group
|
||||||
|
pu_en = Instance.objects.me().hostname
|
||||||
|
|
||||||
|
sync_metafields = dict(
|
||||||
|
launch_type="sync",
|
||||||
|
job_type='run',
|
||||||
|
job_tags=','.join(sync_needs),
|
||||||
|
status='running',
|
||||||
|
instance_group=pu_ig,
|
||||||
|
execution_node=pu_en,
|
||||||
|
controller_node=pu_en,
|
||||||
|
celery_task_id=self.instance.celery_task_id,
|
||||||
|
)
|
||||||
|
if scm_branch and scm_branch != project.scm_branch:
|
||||||
|
sync_metafields['scm_branch'] = scm_branch
|
||||||
|
sync_metafields['scm_clean'] = True # to accomidate force pushes
|
||||||
|
if 'update_' not in sync_metafields['job_tags']:
|
||||||
|
sync_metafields['scm_revision'] = project.scm_revision
|
||||||
|
local_project_sync = project.create_project_update(_eager_fields=sync_metafields)
|
||||||
|
local_project_sync.log_lifecycle("controller_node_chosen")
|
||||||
|
local_project_sync.log_lifecycle("execution_node_chosen")
|
||||||
|
create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
|
||||||
|
return local_project_sync
|
||||||
|
|
||||||
|
def sync_and_copy_without_lock(self, project, private_data_dir, scm_branch=None):
|
||||||
|
sync_needs = self.get_sync_needs(project, scm_branch=scm_branch)
|
||||||
|
|
||||||
|
if sync_needs:
|
||||||
|
local_project_sync = self.spawn_project_sync(project, sync_needs, scm_branch=scm_branch)
|
||||||
|
# save the associated job before calling run() so that a
|
||||||
|
# cancel() call on the job can cancel the project update
|
||||||
|
if isinstance(self.instance, Job):
|
||||||
|
self.instance = self.update_model(self.instance.pk, project_update=local_project_sync)
|
||||||
|
else:
|
||||||
|
self.instance = self.update_model(self.instance.pk, source_project_update=local_project_sync)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# the job private_data_dir is passed so sync can download roles and collections there
|
||||||
|
sync_task = RunProjectUpdate(job_private_data_dir=private_data_dir)
|
||||||
|
sync_task.run(local_project_sync.id)
|
||||||
|
local_project_sync.refresh_from_db()
|
||||||
|
if isinstance(self.instance, Job):
|
||||||
|
self.instance = self.update_model(self.instance.pk, scm_revision=local_project_sync.scm_revision)
|
||||||
|
except Exception:
|
||||||
|
local_project_sync.refresh_from_db()
|
||||||
|
if local_project_sync.status != 'canceled':
|
||||||
|
self.instance = self.update_model(
|
||||||
|
self.instance.pk,
|
||||||
|
status='failed',
|
||||||
|
job_explanation=(
|
||||||
|
'Previous Task Failed: {"job_type": "project_update", '
|
||||||
|
f'"job_name": "{local_project_sync.name}", "job_id": "{local_project_sync.id}"}}'
|
||||||
|
),
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
self.instance.refresh_from_db()
|
||||||
|
if self.instance.cancel_flag:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# Case where a local sync is not needed, meaning that local tree is
|
||||||
|
# up-to-date with project, job is running project current version
|
||||||
|
if isinstance(self.instance, Job):
|
||||||
|
self.instance = self.update_model(self.instance.pk, scm_revision=project.scm_revision)
|
||||||
|
# Project update does not copy the folder, so copy here
|
||||||
|
RunProjectUpdate.make_local_copy(project, private_data_dir)
|
||||||
|
|
||||||
|
def sync_and_copy(self, project, private_data_dir, scm_branch=None):
|
||||||
|
self.acquire_lock(project, self.instance.id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
original_branch = None
|
||||||
|
project_path = project.get_project_path(check_if_exists=False)
|
||||||
|
if project.scm_type == 'git' and (scm_branch and scm_branch != project.scm_branch):
|
||||||
|
if os.path.exists(project_path):
|
||||||
|
git_repo = git.Repo(project_path)
|
||||||
|
if git_repo.head.is_detached:
|
||||||
|
original_branch = git_repo.head.commit
|
||||||
|
else:
|
||||||
|
original_branch = git_repo.active_branch
|
||||||
|
|
||||||
|
return self.sync_and_copy_without_lock(project, private_data_dir, scm_branch=scm_branch)
|
||||||
|
finally:
|
||||||
|
# We have made the copy so we can set the tree back to its normal state
|
||||||
|
if original_branch:
|
||||||
|
# for git project syncs, non-default branches can be problems
|
||||||
|
# restore to branch the repo was on before this run
|
||||||
|
try:
|
||||||
|
original_branch.checkout()
|
||||||
|
except Exception:
|
||||||
|
# this could have failed due to dirty tree, but difficult to predict all cases
|
||||||
|
logger.exception(f'Failed to restore project repo to prior state after {self.instance.id}')
|
||||||
|
|
||||||
|
self.release_lock(project)
|
||||||
|
|
||||||
|
|
||||||
@task(queue=get_local_queuename)
|
@task(queue=get_local_queuename)
|
||||||
class RunJob(BaseTask):
|
class RunJob(SourceControlMixin, BaseTask):
|
||||||
"""
|
"""
|
||||||
Run a job using ansible-playbook.
|
Run a job using ansible-playbook.
|
||||||
"""
|
"""
|
||||||
@@ -863,98 +1052,14 @@ class RunJob(BaseTask):
|
|||||||
job = self.update_model(job.pk, status='failed', job_explanation=msg)
|
job = self.update_model(job.pk, status='failed', job_explanation=msg)
|
||||||
raise RuntimeError(msg)
|
raise RuntimeError(msg)
|
||||||
|
|
||||||
project_path = job.project.get_project_path(check_if_exists=False)
|
|
||||||
job_revision = job.project.scm_revision
|
|
||||||
sync_needs = []
|
|
||||||
source_update_tag = 'update_{}'.format(job.project.scm_type)
|
|
||||||
branch_override = bool(job.scm_branch and job.scm_branch != job.project.scm_branch)
|
|
||||||
if not job.project.scm_type:
|
|
||||||
pass # manual projects are not synced, user has responsibility for that
|
|
||||||
elif not os.path.exists(project_path):
|
|
||||||
logger.debug('Performing fresh clone of {} on this instance.'.format(job.project))
|
|
||||||
sync_needs.append(source_update_tag)
|
|
||||||
elif job.project.scm_type == 'git' and job.project.scm_revision and (not branch_override):
|
|
||||||
try:
|
|
||||||
git_repo = git.Repo(project_path)
|
|
||||||
|
|
||||||
if job_revision == git_repo.head.commit.hexsha:
|
|
||||||
logger.debug('Skipping project sync for {} because commit is locally available'.format(job.log_format))
|
|
||||||
else:
|
|
||||||
sync_needs.append(source_update_tag)
|
|
||||||
except (ValueError, BadGitName, git.exc.InvalidGitRepositoryError):
|
|
||||||
logger.debug('Needed commit for {} not in local source tree, will sync with remote'.format(job.log_format))
|
|
||||||
sync_needs.append(source_update_tag)
|
|
||||||
else:
|
|
||||||
logger.debug('Project not available locally, {} will sync with remote'.format(job.log_format))
|
|
||||||
sync_needs.append(source_update_tag)
|
|
||||||
|
|
||||||
has_cache = os.path.exists(os.path.join(job.project.get_cache_path(), job.project.cache_id))
|
|
||||||
# Galaxy requirements are not supported for manual projects
|
|
||||||
if job.project.scm_type and ((not has_cache) or branch_override):
|
|
||||||
sync_needs.extend(['install_roles', 'install_collections'])
|
|
||||||
|
|
||||||
if sync_needs:
|
|
||||||
pu_ig = job.instance_group
|
|
||||||
pu_en = Instance.objects.me().hostname
|
|
||||||
|
|
||||||
sync_metafields = dict(
|
|
||||||
launch_type="sync",
|
|
||||||
job_type='run',
|
|
||||||
job_tags=','.join(sync_needs),
|
|
||||||
status='running',
|
|
||||||
instance_group=pu_ig,
|
|
||||||
execution_node=pu_en,
|
|
||||||
controller_node=pu_en,
|
|
||||||
celery_task_id=job.celery_task_id,
|
|
||||||
)
|
|
||||||
if branch_override:
|
|
||||||
sync_metafields['scm_branch'] = job.scm_branch
|
|
||||||
sync_metafields['scm_clean'] = True # to accomidate force pushes
|
|
||||||
if 'update_' not in sync_metafields['job_tags']:
|
|
||||||
sync_metafields['scm_revision'] = job_revision
|
|
||||||
local_project_sync = job.project.create_project_update(_eager_fields=sync_metafields)
|
|
||||||
local_project_sync.log_lifecycle("controller_node_chosen")
|
|
||||||
local_project_sync.log_lifecycle("execution_node_chosen")
|
|
||||||
create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
|
|
||||||
# save the associated job before calling run() so that a
|
|
||||||
# cancel() call on the job can cancel the project update
|
|
||||||
job = self.update_model(job.pk, project_update=local_project_sync)
|
|
||||||
|
|
||||||
project_update_task = local_project_sync._get_task_class()
|
|
||||||
try:
|
|
||||||
# the job private_data_dir is passed so sync can download roles and collections there
|
|
||||||
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
|
||||||
sync_task.run(local_project_sync.id)
|
|
||||||
local_project_sync.refresh_from_db()
|
|
||||||
job = self.update_model(job.pk, scm_revision=local_project_sync.scm_revision)
|
|
||||||
except Exception:
|
|
||||||
local_project_sync.refresh_from_db()
|
|
||||||
if local_project_sync.status != 'canceled':
|
|
||||||
job = self.update_model(
|
|
||||||
job.pk,
|
|
||||||
status='failed',
|
|
||||||
job_explanation=(
|
|
||||||
'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}'
|
|
||||||
% ('project_update', local_project_sync.name, local_project_sync.id)
|
|
||||||
),
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
job.refresh_from_db()
|
|
||||||
if job.cancel_flag:
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
# Case where a local sync is not needed, meaning that local tree is
|
|
||||||
# up-to-date with project, job is running project current version
|
|
||||||
if job_revision:
|
|
||||||
job = self.update_model(job.pk, scm_revision=job_revision)
|
|
||||||
# Project update does not copy the folder, so copy here
|
|
||||||
RunProjectUpdate.make_local_copy(job.project, private_data_dir, scm_revision=job_revision)
|
|
||||||
|
|
||||||
if job.inventory.kind == 'smart':
|
if job.inventory.kind == 'smart':
|
||||||
# cache smart inventory memberships so that the host_filter query is not
|
# cache smart inventory memberships so that the host_filter query is not
|
||||||
# ran inside of the event saving code
|
# ran inside of the event saving code
|
||||||
update_smart_memberships_for_inventory(job.inventory)
|
update_smart_memberships_for_inventory(job.inventory)
|
||||||
|
|
||||||
|
def build_project_dir(self, job, private_data_dir):
|
||||||
|
self.sync_and_copy(job.project, private_data_dir, scm_branch=job.scm_branch)
|
||||||
|
|
||||||
def final_run_hook(self, job, status, private_data_dir, fact_modification_times):
|
def final_run_hook(self, job, status, private_data_dir, fact_modification_times):
|
||||||
super(RunJob, self).final_run_hook(job, status, private_data_dir, fact_modification_times)
|
super(RunJob, self).final_run_hook(job, status, private_data_dir, fact_modification_times)
|
||||||
if not private_data_dir:
|
if not private_data_dir:
|
||||||
@@ -986,7 +1091,6 @@ class RunProjectUpdate(BaseTask):
|
|||||||
|
|
||||||
def __init__(self, *args, job_private_data_dir=None, **kwargs):
|
def __init__(self, *args, job_private_data_dir=None, **kwargs):
|
||||||
super(RunProjectUpdate, self).__init__(*args, **kwargs)
|
super(RunProjectUpdate, self).__init__(*args, **kwargs)
|
||||||
self.original_branch = None
|
|
||||||
self.job_private_data_dir = job_private_data_dir
|
self.job_private_data_dir = job_private_data_dir
|
||||||
|
|
||||||
def build_private_data(self, project_update, private_data_dir):
|
def build_private_data(self, project_update, private_data_dir):
|
||||||
@@ -1173,74 +1277,17 @@ class RunProjectUpdate(BaseTask):
|
|||||||
d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes'
|
d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes'
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def release_lock(self, instance):
|
|
||||||
try:
|
|
||||||
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
|
|
||||||
except IOError as e:
|
|
||||||
logger.error("I/O error({0}) while trying to release lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
|
|
||||||
os.close(self.lock_fd)
|
|
||||||
raise
|
|
||||||
|
|
||||||
os.close(self.lock_fd)
|
|
||||||
self.lock_fd = None
|
|
||||||
|
|
||||||
'''
|
|
||||||
Note: We don't support blocking=False
|
|
||||||
'''
|
|
||||||
|
|
||||||
def acquire_lock(self, instance, blocking=True):
|
|
||||||
lock_path = instance.get_lock_file()
|
|
||||||
if lock_path is None:
|
|
||||||
# If from migration or someone blanked local_path for any other reason, recoverable by save
|
|
||||||
instance.save()
|
|
||||||
lock_path = instance.get_lock_file()
|
|
||||||
if lock_path is None:
|
|
||||||
raise RuntimeError(u'Invalid lock file path')
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT)
|
|
||||||
except OSError as e:
|
|
||||||
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
|
||||||
raise
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
instance.refresh_from_db(fields=['cancel_flag'])
|
|
||||||
if instance.cancel_flag:
|
|
||||||
logger.debug("ProjectUpdate({0}) was canceled".format(instance.pk))
|
|
||||||
return
|
|
||||||
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
||||||
break
|
|
||||||
except IOError as e:
|
|
||||||
if e.errno not in (errno.EAGAIN, errno.EACCES):
|
|
||||||
os.close(self.lock_fd)
|
|
||||||
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
time.sleep(1.0)
|
|
||||||
waiting_time = time.time() - start_time
|
|
||||||
|
|
||||||
if waiting_time > 1.0:
|
|
||||||
logger.info('{} spent {} waiting to acquire lock for local source tree ' 'for path {}.'.format(instance.log_format, waiting_time, lock_path))
|
|
||||||
|
|
||||||
def pre_run_hook(self, instance, private_data_dir):
|
def pre_run_hook(self, instance, private_data_dir):
|
||||||
super(RunProjectUpdate, self).pre_run_hook(instance, private_data_dir)
|
super(RunProjectUpdate, self).pre_run_hook(instance, private_data_dir)
|
||||||
# re-create root project folder if a natural disaster has destroyed it
|
# re-create root project folder if a natural disaster has destroyed it
|
||||||
if not os.path.exists(settings.PROJECTS_ROOT):
|
|
||||||
os.mkdir(settings.PROJECTS_ROOT)
|
|
||||||
project_path = instance.project.get_project_path(check_if_exists=False)
|
project_path = instance.project.get_project_path(check_if_exists=False)
|
||||||
|
|
||||||
self.acquire_lock(instance)
|
instance.refresh_from_db(fields=['cancel_flag'])
|
||||||
|
if instance.cancel_flag:
|
||||||
self.original_branch = None
|
logger.debug("ProjectUpdate({0}) was canceled".format(instance.pk))
|
||||||
if instance.scm_type == 'git' and instance.branch_override:
|
return
|
||||||
if os.path.exists(project_path):
|
if instance.launch_type != 'sync':
|
||||||
git_repo = git.Repo(project_path)
|
self.acquire_lock(instance.project, instance.id)
|
||||||
if git_repo.head.is_detached:
|
|
||||||
self.original_branch = git_repo.head.commit
|
|
||||||
else:
|
|
||||||
self.original_branch = git_repo.active_branch
|
|
||||||
|
|
||||||
if not os.path.exists(project_path):
|
if not os.path.exists(project_path):
|
||||||
os.makedirs(project_path) # used as container mount
|
os.makedirs(project_path) # used as container mount
|
||||||
@@ -1251,11 +1298,12 @@ class RunProjectUpdate(BaseTask):
|
|||||||
shutil.rmtree(stage_path)
|
shutil.rmtree(stage_path)
|
||||||
os.makedirs(stage_path) # presence of empty cache indicates lack of roles or collections
|
os.makedirs(stage_path) # presence of empty cache indicates lack of roles or collections
|
||||||
|
|
||||||
|
def build_project_dir(self, instance, private_data_dir):
|
||||||
# the project update playbook is not in a git repo, but uses a vendoring directory
|
# the project update playbook is not in a git repo, but uses a vendoring directory
|
||||||
# to be consistent with the ansible-runner model,
|
# to be consistent with the ansible-runner model,
|
||||||
# that is moved into the runner project folder here
|
# that is moved into the runner project folder here
|
||||||
awx_playbooks = self.get_path_to('../../', 'playbooks')
|
awx_playbooks = self.get_path_to('../../', 'playbooks')
|
||||||
copy_tree(awx_playbooks, os.path.join(private_data_dir, 'project'))
|
shutil.copytree(awx_playbooks, os.path.join(private_data_dir, 'project'))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def clear_project_cache(cache_dir, keep_value):
|
def clear_project_cache(cache_dir, keep_value):
|
||||||
@@ -1272,50 +1320,18 @@ class RunProjectUpdate(BaseTask):
|
|||||||
logger.warning(f"Could not remove cache directory {old_path}")
|
logger.warning(f"Could not remove cache directory {old_path}")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_local_copy(p, job_private_data_dir, scm_revision=None):
|
def make_local_copy(project, job_private_data_dir):
|
||||||
"""Copy project content (roles and collections) to a job private_data_dir
|
"""Copy project content (roles and collections) to a job private_data_dir
|
||||||
|
|
||||||
:param object p: Either a project or a project update
|
:param object project: Either a project or a project update
|
||||||
:param str job_private_data_dir: The root of the target ansible-runner folder
|
:param str job_private_data_dir: The root of the target ansible-runner folder
|
||||||
:param str scm_revision: For branch_override cases, the git revision to copy
|
|
||||||
"""
|
"""
|
||||||
project_path = p.get_project_path(check_if_exists=False)
|
project_path = project.get_project_path(check_if_exists=False)
|
||||||
destination_folder = os.path.join(job_private_data_dir, 'project')
|
destination_folder = os.path.join(job_private_data_dir, 'project')
|
||||||
if not scm_revision:
|
shutil.copytree(project_path, destination_folder, ignore=shutil.ignore_patterns('.git'), symlinks=True)
|
||||||
scm_revision = p.scm_revision
|
|
||||||
|
|
||||||
if p.scm_type == 'git':
|
|
||||||
git_repo = git.Repo(project_path)
|
|
||||||
if not os.path.exists(destination_folder):
|
|
||||||
os.mkdir(destination_folder, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
|
|
||||||
tmp_branch_name = 'awx_internal/{}'.format(uuid4())
|
|
||||||
# always clone based on specific job revision
|
|
||||||
if not p.scm_revision:
|
|
||||||
raise RuntimeError('Unexpectedly could not determine a revision to run from project.')
|
|
||||||
source_branch = git_repo.create_head(tmp_branch_name, p.scm_revision)
|
|
||||||
# git clone must take file:// syntax for source repo or else options like depth will be ignored
|
|
||||||
source_as_uri = Path(project_path).as_uri()
|
|
||||||
git.Repo.clone_from(
|
|
||||||
source_as_uri,
|
|
||||||
destination_folder,
|
|
||||||
branch=source_branch,
|
|
||||||
depth=1,
|
|
||||||
single_branch=True, # shallow, do not copy full history
|
|
||||||
)
|
|
||||||
# submodules copied in loop because shallow copies from local HEADs are ideal
|
|
||||||
# and no git clone submodule options are compatible with minimum requirements
|
|
||||||
for submodule in git_repo.submodules:
|
|
||||||
subrepo_path = os.path.abspath(os.path.join(project_path, submodule.path))
|
|
||||||
subrepo_destination_folder = os.path.abspath(os.path.join(destination_folder, submodule.path))
|
|
||||||
subrepo_uri = Path(subrepo_path).as_uri()
|
|
||||||
git.Repo.clone_from(subrepo_uri, subrepo_destination_folder, depth=1, single_branch=True)
|
|
||||||
# force option is necessary because remote refs are not counted, although no information is lost
|
|
||||||
git_repo.delete_head(tmp_branch_name, force=True)
|
|
||||||
else:
|
|
||||||
copy_tree(project_path, destination_folder, preserve_symlinks=1)
|
|
||||||
|
|
||||||
# copy over the roles and collection cache to job folder
|
# copy over the roles and collection cache to job folder
|
||||||
cache_path = os.path.join(p.get_cache_path(), p.cache_id)
|
cache_path = os.path.join(project.get_cache_path(), project.cache_id)
|
||||||
subfolders = []
|
subfolders = []
|
||||||
if settings.AWX_COLLECTIONS_ENABLED:
|
if settings.AWX_COLLECTIONS_ENABLED:
|
||||||
subfolders.append('requirements_collections')
|
subfolders.append('requirements_collections')
|
||||||
@@ -1325,8 +1341,8 @@ class RunProjectUpdate(BaseTask):
|
|||||||
cache_subpath = os.path.join(cache_path, subfolder)
|
cache_subpath = os.path.join(cache_path, subfolder)
|
||||||
if os.path.exists(cache_subpath):
|
if os.path.exists(cache_subpath):
|
||||||
dest_subpath = os.path.join(job_private_data_dir, subfolder)
|
dest_subpath = os.path.join(job_private_data_dir, subfolder)
|
||||||
copy_tree(cache_subpath, dest_subpath, preserve_symlinks=1)
|
shutil.copytree(cache_subpath, dest_subpath, symlinks=True)
|
||||||
logger.debug('{0} {1} prepared {2} from cache'.format(type(p).__name__, p.pk, dest_subpath))
|
logger.debug('{0} {1} prepared {2} from cache'.format(type(project).__name__, project.pk, dest_subpath))
|
||||||
|
|
||||||
def post_run_hook(self, instance, status):
|
def post_run_hook(self, instance, status):
|
||||||
super(RunProjectUpdate, self).post_run_hook(instance, status)
|
super(RunProjectUpdate, self).post_run_hook(instance, status)
|
||||||
@@ -1356,23 +1372,13 @@ class RunProjectUpdate(BaseTask):
|
|||||||
if self.job_private_data_dir:
|
if self.job_private_data_dir:
|
||||||
if status == 'successful':
|
if status == 'successful':
|
||||||
# copy project folder before resetting to default branch
|
# copy project folder before resetting to default branch
|
||||||
# because some git-tree-specific resources (like submodules) might matter
|
|
||||||
self.make_local_copy(instance, self.job_private_data_dir)
|
self.make_local_copy(instance, self.job_private_data_dir)
|
||||||
if self.original_branch:
|
|
||||||
# for git project syncs, non-default branches can be problems
|
|
||||||
# restore to branch the repo was on before this run
|
|
||||||
try:
|
|
||||||
self.original_branch.checkout()
|
|
||||||
except Exception:
|
|
||||||
# this could have failed due to dirty tree, but difficult to predict all cases
|
|
||||||
logger.exception('Failed to restore project repo to prior state after {}'.format(instance.log_format))
|
|
||||||
finally:
|
finally:
|
||||||
self.release_lock(instance)
|
if instance.launch_type != 'sync':
|
||||||
|
self.release_lock(instance.project)
|
||||||
|
|
||||||
p = instance.project
|
p = instance.project
|
||||||
if instance.job_type == 'check' and status not in (
|
if instance.job_type == 'check' and status not in ('failed', 'canceled'):
|
||||||
'failed',
|
|
||||||
'canceled',
|
|
||||||
):
|
|
||||||
if self.runner_callback.playbook_new_revision:
|
if self.runner_callback.playbook_new_revision:
|
||||||
p.scm_revision = self.runner_callback.playbook_new_revision
|
p.scm_revision = self.runner_callback.playbook_new_revision
|
||||||
else:
|
else:
|
||||||
@@ -1400,7 +1406,7 @@ class RunProjectUpdate(BaseTask):
|
|||||||
|
|
||||||
|
|
||||||
@task(queue=get_local_queuename)
|
@task(queue=get_local_queuename)
|
||||||
class RunInventoryUpdate(BaseTask):
|
class RunInventoryUpdate(SourceControlMixin, BaseTask):
|
||||||
|
|
||||||
model = InventoryUpdate
|
model = InventoryUpdate
|
||||||
event_model = InventoryUpdateEvent
|
event_model = InventoryUpdateEvent
|
||||||
@@ -1556,54 +1562,18 @@ class RunInventoryUpdate(BaseTask):
|
|||||||
# All credentials not used by inventory source injector
|
# All credentials not used by inventory source injector
|
||||||
return inventory_update.get_extra_credentials()
|
return inventory_update.get_extra_credentials()
|
||||||
|
|
||||||
def pre_run_hook(self, inventory_update, private_data_dir):
|
def build_project_dir(self, inventory_update, private_data_dir):
|
||||||
super(RunInventoryUpdate, self).pre_run_hook(inventory_update, private_data_dir)
|
|
||||||
source_project = None
|
source_project = None
|
||||||
if inventory_update.inventory_source:
|
if inventory_update.inventory_source:
|
||||||
source_project = inventory_update.inventory_source.source_project
|
source_project = inventory_update.inventory_source.source_project
|
||||||
if inventory_update.source == 'scm' and source_project and source_project.scm_type: # never ever update manual projects
|
|
||||||
|
|
||||||
# Check if the content cache exists, so that we do not unnecessarily re-download roles
|
if inventory_update.source == 'scm':
|
||||||
sync_needs = ['update_{}'.format(source_project.scm_type)]
|
if not source_project:
|
||||||
has_cache = os.path.exists(os.path.join(source_project.get_cache_path(), source_project.cache_id))
|
raise RuntimeError('Could not find project to run SCM inventory update from.')
|
||||||
# Galaxy requirements are not supported for manual projects
|
self.sync_and_copy(source_project, private_data_dir)
|
||||||
if not has_cache:
|
else:
|
||||||
sync_needs.extend(['install_roles', 'install_collections'])
|
# If source is not SCM make an empty project directory, content is built inside inventory folder
|
||||||
|
super(RunInventoryUpdate, self).build_project_dir(inventory_update, private_data_dir)
|
||||||
local_project_sync = source_project.create_project_update(
|
|
||||||
_eager_fields=dict(
|
|
||||||
launch_type="sync",
|
|
||||||
job_type='run',
|
|
||||||
job_tags=','.join(sync_needs),
|
|
||||||
status='running',
|
|
||||||
execution_node=Instance.objects.me().hostname,
|
|
||||||
controller_node=Instance.objects.me().hostname,
|
|
||||||
instance_group=inventory_update.instance_group,
|
|
||||||
celery_task_id=inventory_update.celery_task_id,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
local_project_sync.log_lifecycle("controller_node_chosen")
|
|
||||||
local_project_sync.log_lifecycle("execution_node_chosen")
|
|
||||||
create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
|
|
||||||
# associate the inventory update before calling run() so that a
|
|
||||||
# cancel() call on the inventory update can cancel the project update
|
|
||||||
local_project_sync.scm_inventory_updates.add(inventory_update)
|
|
||||||
|
|
||||||
project_update_task = local_project_sync._get_task_class()
|
|
||||||
try:
|
|
||||||
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
|
||||||
sync_task.run(local_project_sync.id)
|
|
||||||
local_project_sync.refresh_from_db()
|
|
||||||
except Exception:
|
|
||||||
inventory_update = self.update_model(
|
|
||||||
inventory_update.pk,
|
|
||||||
status='failed',
|
|
||||||
job_explanation=(
|
|
||||||
'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}'
|
|
||||||
% ('project_update', local_project_sync.name, local_project_sync.id)
|
|
||||||
),
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
def post_run_hook(self, inventory_update, status):
|
def post_run_hook(self, inventory_update, status):
|
||||||
super(RunInventoryUpdate, self).post_run_hook(inventory_update, status)
|
super(RunInventoryUpdate, self).post_run_hook(inventory_update, status)
|
||||||
|
|||||||
@@ -472,7 +472,7 @@ class TestGenericRun:
|
|||||||
task.model.objects.get = mock.Mock(return_value=job)
|
task.model.objects.get = mock.Mock(return_value=job)
|
||||||
task.build_private_data_files = mock.Mock(side_effect=OSError())
|
task.build_private_data_files = mock.Mock(side_effect=OSError())
|
||||||
|
|
||||||
with mock.patch('awx.main.tasks.jobs.copy_tree'):
|
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
|
||||||
with pytest.raises(Exception):
|
with pytest.raises(Exception):
|
||||||
task.run(1)
|
task.run(1)
|
||||||
|
|
||||||
@@ -494,7 +494,7 @@ class TestGenericRun:
|
|||||||
task.model.objects.get = mock.Mock(return_value=job)
|
task.model.objects.get = mock.Mock(return_value=job)
|
||||||
task.build_private_data_files = mock.Mock()
|
task.build_private_data_files = mock.Mock()
|
||||||
|
|
||||||
with mock.patch('awx.main.tasks.jobs.copy_tree'):
|
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
|
||||||
with pytest.raises(Exception):
|
with pytest.raises(Exception):
|
||||||
task.run(1)
|
task.run(1)
|
||||||
|
|
||||||
@@ -1944,7 +1944,7 @@ def test_job_run_no_ee(mock_me):
|
|||||||
task.update_model = mock.Mock(return_value=job)
|
task.update_model = mock.Mock(return_value=job)
|
||||||
task.model.objects.get = mock.Mock(return_value=job)
|
task.model.objects.get = mock.Mock(return_value=job)
|
||||||
|
|
||||||
with mock.patch('awx.main.tasks.jobs.copy_tree'):
|
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
|
||||||
with pytest.raises(RuntimeError) as e:
|
with pytest.raises(RuntimeError) as e:
|
||||||
task.pre_run_hook(job, private_data_dir)
|
task.pre_run_hook(job, private_data_dir)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user