mirror of
https://github.com/ansible/awx.git
synced 2026-03-25 12:55:04 -02:30
Rearchitect project update strategy
* Instead of using a fanout for project updates initial project updates will sync the latest commit hash * Prior to a node running a job it will ensure that the latest project is synced
This commit is contained in:
@@ -53,10 +53,9 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
|
||||
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
|
||||
'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error',
|
||||
'RunAdHocCommand', 'handle_work_error',
|
||||
'handle_work_success', 'update_inventory_computed_fields',
|
||||
'send_notifications', 'run_administrative_checks',
|
||||
'RunJobLaunch']
|
||||
'send_notifications', 'run_administrative_checks']
|
||||
|
||||
HIDDEN_PASSWORD = '**********'
|
||||
|
||||
@@ -234,8 +233,9 @@ def handle_work_error(self, task_id, subtasks=None):
|
||||
if instance.celery_task_id != task_id:
|
||||
instance.status = 'failed'
|
||||
instance.failed = True
|
||||
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
|
||||
(first_instance_type, first_instance.name, first_instance.id)
|
||||
if not instance.job_explanation:
|
||||
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
|
||||
(first_instance_type, first_instance.name, first_instance.id)
|
||||
instance.save()
|
||||
instance.websocket_emit_status("failed")
|
||||
|
||||
@@ -538,6 +538,7 @@ class BaseTask(Task):
|
||||
expect_passwords[n] = passwords.get(item[1], '') or ''
|
||||
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
|
||||
instance = self.update_model(instance.pk, status='running',
|
||||
execution_node=settings.CLUSTER_HOST_ID,
|
||||
output_replacements=output_replacements)
|
||||
job_start = time.time()
|
||||
while child.isalive():
|
||||
@@ -608,7 +609,7 @@ class BaseTask(Task):
|
||||
Hook for any steps to run before the job/task starts
|
||||
'''
|
||||
|
||||
def post_run_hook(self, instance, **kwargs):
|
||||
def post_run_hook(self, instance, status, **kwargs):
|
||||
'''
|
||||
Hook for any steps to run after job/task is complete.
|
||||
'''
|
||||
@@ -617,7 +618,7 @@ class BaseTask(Task):
|
||||
'''
|
||||
Run the job/task and capture its output.
|
||||
'''
|
||||
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
|
||||
instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id)
|
||||
|
||||
instance.websocket_emit_status("running")
|
||||
status, rc, tb = 'error', None, ''
|
||||
@@ -690,7 +691,7 @@ class BaseTask(Task):
|
||||
instance = self.update_model(pk, status=status, result_traceback=tb,
|
||||
output_replacements=output_replacements,
|
||||
**extra_update_fields)
|
||||
self.post_run_hook(instance, **kwargs)
|
||||
self.post_run_hook(instance, status, **kwargs)
|
||||
instance.websocket_emit_status(status)
|
||||
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||
# Raising an exception will mark the job as 'failed' in celery
|
||||
@@ -990,11 +991,26 @@ class RunJob(BaseTask):
|
||||
'''
|
||||
return getattr(settings, 'AWX_PROOT_ENABLED', False)
|
||||
|
||||
def post_run_hook(self, job, **kwargs):
|
||||
def pre_run_hook(self, job, **kwargs):
|
||||
if job.project.scm_type:
|
||||
local_project_sync = job.project.create_project_update()
|
||||
local_project_sync.job_type = 'run'
|
||||
local_project_sync.save()
|
||||
project_update_task = local_project_sync._get_task_class()
|
||||
try:
|
||||
project_update_task().run(local_project_sync.id)
|
||||
except Exception:
|
||||
job.status = 'failed'
|
||||
job.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
|
||||
('project_update', local_project_sync.name, local_project_sync.id)
|
||||
job.save()
|
||||
raise
|
||||
|
||||
def post_run_hook(self, job, status, **kwargs):
|
||||
'''
|
||||
Hook for actions to run after job/task has completed.
|
||||
'''
|
||||
super(RunJob, self).post_run_hook(job, **kwargs)
|
||||
super(RunJob, self).post_run_hook(job, status, **kwargs)
|
||||
try:
|
||||
inventory = job.inventory
|
||||
except Inventory.DoesNotExist:
|
||||
@@ -1095,7 +1111,10 @@ class RunProjectUpdate(BaseTask):
|
||||
args.append('-v')
|
||||
scm_url, extra_vars = self._build_scm_url_extra_vars(project_update,
|
||||
**kwargs)
|
||||
scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
|
||||
if project_update.project.scm_revision and project_update.job_type == 'check':
|
||||
scm_branch = project_update.project.scm_revision
|
||||
else:
|
||||
scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
|
||||
extra_vars.update({
|
||||
'project_path': project_update.get_project_path(check_if_exists=False),
|
||||
'scm_type': project_update.scm_type,
|
||||
@@ -1103,6 +1122,8 @@ class RunProjectUpdate(BaseTask):
|
||||
'scm_branch': scm_branch,
|
||||
'scm_clean': project_update.scm_clean,
|
||||
'scm_delete_on_update': project_update.scm_delete_on_update,
|
||||
'scm_full_checkout': True if project_update.job_type == 'run' else False,
|
||||
'scm_revision_output': '/tmp/_{}_syncrev'.format(project_update.id) # TODO: TempFile
|
||||
})
|
||||
args.extend(['-e', json.dumps(extra_vars)])
|
||||
args.append('project_update.yml')
|
||||
@@ -1176,6 +1197,17 @@ class RunProjectUpdate(BaseTask):
|
||||
'''
|
||||
return kwargs.get('private_data_files', {}).get('scm_credential', '')
|
||||
|
||||
def post_run_hook(self, instance, status, **kwargs):
|
||||
if instance.job_type == 'check':
|
||||
p = instance.project
|
||||
fd = open('/tmp/_{}_syncrev'.format(instance.id), 'r')
|
||||
lines = fd.readlines()
|
||||
if lines:
|
||||
p.scm_revision = lines[0].strip()
|
||||
p.save()
|
||||
else:
|
||||
logger.error("Could not find scm revision in check")
|
||||
|
||||
class RunInventoryUpdate(BaseTask):
|
||||
|
||||
name = 'awx.main.tasks.run_inventory_update'
|
||||
@@ -1670,7 +1702,7 @@ class RunAdHocCommand(BaseTask):
|
||||
'''
|
||||
return getattr(settings, 'AWX_PROOT_ENABLED', False)
|
||||
|
||||
def post_run_hook(self, ad_hoc_command, **kwargs):
|
||||
def post_run_hook(self, ad_hoc_command, status, **kwargs):
|
||||
'''
|
||||
Hook for actions to run after ad hoc command has completed.
|
||||
'''
|
||||
|
||||
Reference in New Issue
Block a user