From e6bcc039f2ce5e8147ec4803724c55a8531db329 Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Thu, 20 Oct 2016 13:24:42 -0400
Subject: [PATCH] Rearchitect project update strategy

* Instead of using a fanout queue for project updates, the initial
  project update will sync the latest commit hash
* Prior to running a job, a node will ensure that the latest project
  is synced
---
 awx/api/serializers.py           |  4 +-
 awx/main/models/base.py          |  8 ++-
 awx/main/models/projects.py      | 18 +++++++
 awx/main/tasks.py                | 56 +++++++++++++++-----
 awx/playbooks/project_update.yml | 89 +++++++++++++++++++++++++++++---
 awx/settings/defaults.py         | 31 +++++------
 6 files changed, 169 insertions(+), 37 deletions(-)

diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index 82e44d4f24..861e245a75 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -914,7 +914,7 @@ class ProjectSerializer(UnifiedJobTemplateSerializer, ProjectOptionsSerializer):
     class Meta:
         model = Project
         fields = ('*', 'organization', 'scm_delete_on_next_update', 'scm_update_on_launch',
-                  'scm_update_cache_timeout', 'timeout') + \
+                  'scm_update_cache_timeout', 'scm_revision', 'timeout',) + \
                  ('last_update_failed', 'last_updated')  # Backwards compatibility
         read_only_fields = ('scm_delete_on_next_update',)
 
@@ -986,7 +986,7 @@ class ProjectUpdateSerializer(UnifiedJobSerializer, ProjectOptionsSerializer):
 
     class Meta:
         model = ProjectUpdate
-        fields = ('*', 'project')
+        fields = ('*', 'project', 'job_type')
 
     def get_related(self, obj):
         res = super(ProjectUpdateSerializer, self).get_related(obj)
diff --git a/awx/main/models/base.py b/awx/main/models/base.py
index c4914cdd20..691b4532fe 100644
--- a/awx/main/models/base.py
+++ b/awx/main/models/base.py
@@ -29,7 +29,8 @@ __all__ = ['VarsDictProperty', 'BaseModel', 'CreatedModifiedModel',
            'PERM_INVENTORY_ADMIN', 'PERM_INVENTORY_READ', 'PERM_INVENTORY_WRITE',
            'PERM_INVENTORY_DEPLOY', 'PERM_INVENTORY_SCAN', 'PERM_INVENTORY_CHECK',
            'PERM_JOBTEMPLATE_CREATE', 'JOB_TYPE_CHOICES',
-           'AD_HOC_JOB_TYPE_CHOICES', 'PERMISSION_TYPE_CHOICES', 'CLOUD_INVENTORY_SOURCES',
+           'AD_HOC_JOB_TYPE_CHOICES', 'PROJECT_UPDATE_JOB_TYPE_CHOICES',
+           'PERMISSION_TYPE_CHOICES', 'CLOUD_INVENTORY_SOURCES',
            'VERBOSITY_CHOICES']
 
 PERM_INVENTORY_ADMIN = 'admin'
@@ -51,6 +52,11 @@ AD_HOC_JOB_TYPE_CHOICES = [
     (PERM_INVENTORY_CHECK, _('Check')),
 ]
 
+PROJECT_UPDATE_JOB_TYPE_CHOICES = [
+    (PERM_INVENTORY_DEPLOY, _('Run')),
+    (PERM_INVENTORY_CHECK, _('Check')),
+]
+
 PERMISSION_TYPE_CHOICES = [
     (PERM_INVENTORY_READ, _('Read Inventory')),
     (PERM_INVENTORY_WRITE, _('Edit Inventory')),
diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py
index 256809d4ac..d96bcec98b 100644
--- a/awx/main/models/projects.py
+++ b/awx/main/models/projects.py
@@ -7,6 +7,9 @@ import os
 import re
 import urlparse
 
+# Celery
+from celery import group, chord
+
 # Django
 from django.conf import settings
 from django.db import models
@@ -227,6 +230,15 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
         blank=True,
     )
 
+    scm_revision = models.CharField(
+        max_length=1024,
+        blank=True,
+        default='',
+        editable=False,
+        verbose_name=_('SCM Revision'),
+        help_text=_('The last revision fetched by a project update'),
+    )
+
     admin_role = ImplicitRoleField(parent_role=[
         'organization.admin_role',
         'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
@@ -393,6 +405,12 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):
         editable=False,
     )
 
+    job_type = models.CharField(
+        max_length=64,
+        choices=PROJECT_UPDATE_JOB_TYPE_CHOICES,
+        default='check',
+    )
+
     @classmethod
     def _get_parent_field_name(cls):
         return 'project'
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 65c687e1d4..814700afd3 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -53,10 +53,9 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
 from awx.main.consumers import emit_channel_notification
 
 __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
-           'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error',
+           'RunAdHocCommand', 'handle_work_error',
            'handle_work_success', 'update_inventory_computed_fields',
-           'send_notifications', 'run_administrative_checks',
-           'RunJobLaunch']
+           'send_notifications', 'run_administrative_checks']
 
 HIDDEN_PASSWORD = '**********'
 
@@ -234,8 +233,9 @@ def handle_work_error(self, task_id, subtasks=None):
             if instance.celery_task_id != task_id:
                 instance.status = 'failed'
                 instance.failed = True
-                instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
-                                           (first_instance_type, first_instance.name, first_instance.id)
+                if not instance.job_explanation:
+                    instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
+                                               (first_instance_type, first_instance.name, first_instance.id)
                 instance.save()
                 instance.websocket_emit_status("failed")
 
@@ -538,6 +538,7 @@ class BaseTask(Task):
                     expect_passwords[n] = passwords.get(item[1], '') or ''
             expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
             instance = self.update_model(instance.pk, status='running',
+                                         execution_node=settings.CLUSTER_HOST_ID,
                                          output_replacements=output_replacements)
             job_start = time.time()
             while child.isalive():
@@ -608,7 +609,7 @@ class BaseTask(Task):
         Hook for any steps to run before the job/task starts
         '''
 
-    def post_run_hook(self, instance, **kwargs):
+    def post_run_hook(self, instance, status, **kwargs):
         '''
         Hook for any steps to run after job/task is complete.
         '''
@@ -617,7 +618,7 @@ class BaseTask(Task):
         '''
         Run the job/task and capture its output.
         '''
-        instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
+        instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id)
 
         instance.websocket_emit_status("running")
         status, rc, tb = 'error', None, ''
@@ -690,7 +691,7 @@ class BaseTask(Task):
         instance = self.update_model(pk, status=status, result_traceback=tb,
                                      output_replacements=output_replacements,
                                      **extra_update_fields)
-        self.post_run_hook(instance, **kwargs)
+        self.post_run_hook(instance, status, **kwargs)
         instance.websocket_emit_status(status)
         if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
             # Raising an exception will mark the job as 'failed' in celery
@@ -990,11 +991,26 @@ class RunJob(BaseTask):
         '''
         return getattr(settings, 'AWX_PROOT_ENABLED', False)
 
-    def post_run_hook(self, job, **kwargs):
+    def pre_run_hook(self, job, **kwargs):
+        if job.project.scm_type:
+            local_project_sync = job.project.create_project_update()
+            local_project_sync.job_type = 'run'
+            local_project_sync.save()
+            project_update_task = local_project_sync._get_task_class()
+            try:
+                project_update_task().run(local_project_sync.id)
+            except Exception:
+                job.status = 'failed'
+                job.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
+                                      ('project_update', local_project_sync.name, local_project_sync.id)
+                job.save()
+                raise
+
+    def post_run_hook(self, job, status, **kwargs):
         '''
         Hook for actions to run after job/task has completed.
         '''
-        super(RunJob, self).post_run_hook(job, **kwargs)
+        super(RunJob, self).post_run_hook(job, status, **kwargs)
         try:
             inventory = job.inventory
         except Inventory.DoesNotExist:
@@ -1095,7 +1111,10 @@ class RunProjectUpdate(BaseTask):
             args.append('-v')
         scm_url, extra_vars = self._build_scm_url_extra_vars(project_update,
                                                              **kwargs)
-        scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
+        if project_update.project.scm_revision and project_update.job_type == 'check':
+            scm_branch = project_update.project.scm_revision
+        else:
+            scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
         extra_vars.update({
             'project_path': project_update.get_project_path(check_if_exists=False),
             'scm_type': project_update.scm_type,
@@ -1103,6 +1122,8 @@ class RunProjectUpdate(BaseTask):
             'scm_branch': scm_branch,
             'scm_clean': project_update.scm_clean,
             'scm_delete_on_update': project_update.scm_delete_on_update,
+            'scm_full_checkout': True if project_update.job_type == 'run' else False,
+            'scm_revision_output': '/tmp/_{}_syncrev'.format(project_update.id)  # TODO: TempFile
         })
         args.extend(['-e', json.dumps(extra_vars)])
         args.append('project_update.yml')
@@ -1176,6 +1197,17 @@ class RunProjectUpdate(BaseTask):
         '''
         return kwargs.get('private_data_files', {}).get('scm_credential', '')
 
+    def post_run_hook(self, instance, status, **kwargs):
+        if instance.job_type == 'check':
+            p = instance.project
+            fd = open('/tmp/_{}_syncrev'.format(instance.id), 'r')
+            lines = fd.readlines()
+            if lines:
+                p.scm_revision = lines[0].strip()
+                p.save()
+            else:
+                logger.error("Could not find scm revision in check")
+
 class RunInventoryUpdate(BaseTask):
 
     name = 'awx.main.tasks.run_inventory_update'
@@ -1670,7 +1702,7 @@ class RunAdHocCommand(BaseTask):
         '''
         return getattr(settings, 'AWX_PROOT_ENABLED', False)
 
-    def post_run_hook(self, ad_hoc_command, **kwargs):
+    def post_run_hook(self, ad_hoc_command, status, **kwargs):
         '''
         Hook for actions to run after ad hoc command has completed.
         '''
diff --git a/awx/playbooks/project_update.yml b/awx/playbooks/project_update.yml
index 1b2f4520f3..9ab7f1a277 100644
--- a/awx/playbooks/project_update.yml
+++ b/awx/playbooks/project_update.yml
@@ -17,28 +17,93 @@
   tasks:
 
     - name: delete project directory before update
-      file: path={{project_path|quote}} state=absent
+      file:
+        path: "{{project_path|quote}}"
+        state: absent
       when: scm_delete_on_update|default('')
 
     - name: update project using git and accept hostkey
-      git: dest={{project_path|quote}} repo={{scm_url|quote}} version={{scm_branch|quote}} force={{scm_clean}} accept_hostkey={{scm_accept_hostkey}}
+      git:
+        dest: "{{project_path|quote}}"
+        repo: "{{scm_url|quote}}"
+        version: "{{scm_branch|quote}}"
+        force: "{{scm_clean}}"
+        accept_hostkey: "{{scm_accept_hostkey}}"
+        clone: "{{ scm_full_checkout }}"
+        update: "{{ scm_full_checkout }}"
       when: scm_type == 'git' and scm_accept_hostkey is defined
+      register: scm_result
+
+    - name: Set the git repository version
+      set_fact:
+        scm_version: "{{ scm_result['after'] }}"
+      when: "'after' in scm_result"
 
     - name: update project using git
-      git: dest={{project_path|quote}} repo={{scm_url|quote}} version={{scm_branch|quote}} force={{scm_clean}}
+      git:
+        dest: "{{project_path|quote}}"
+        repo: "{{scm_url|quote}}"
+        version: "{{scm_branch|quote}}"
+        force: "{{scm_clean}}"
+        clone: "{{ scm_full_checkout }}"
+        update: "{{ scm_full_checkout }}"
      when: scm_type == 'git' and scm_accept_hostkey is not defined
+      register: scm_result
+
+    - name: Set the git repository version
+      set_fact:
+        scm_version: "{{ scm_result['after'] }}"
+      when: "'after' in scm_result"
 
     - name: update project using hg
-      hg: dest={{project_path|quote}} repo={{scm_url|quote}} revision={{scm_branch|quote}} force={{scm_clean}}
+      hg:
+        dest: "{{project_path|quote}}"
+        repo: "{{scm_url|quote}}"
+        revision: "{{scm_branch|quote}}"
+        force: "{{scm_clean}}"
+        #clone: "{{ scm_full_checkout }}"
+        #update: "{{ scm_full_checkout }}"
       when: scm_type == 'hg'
+      register: scm_result
+
+    - name: Set the hg repository version
+      set_fact:
+        scm_version: "{{ scm_result['after'] }}"
+      when: "'after' in scm_result"
 
     - name: update project using svn
-      subversion: dest={{project_path|quote}} repo={{scm_url|quote}} revision={{scm_branch|quote}} force={{scm_clean}}
+      subversion:
+        dest: "{{project_path|quote}}"
+        repo: "{{scm_url|quote}}"
+        revision: "{{scm_branch|quote}}"
+        force: "{{scm_clean}}"
+        #checkout: "{{ scm_full_checkout }}"
+        #update: "{{ scm_full_checkout }}"
       when: scm_type == 'svn' and not scm_username|default('')
+      register: scm_result
+
+    - name: Set the svn repository version
+      set_fact:
+        scm_version: "{{ scm_result['after'] }}"
+      when: "'after' in scm_result"
 
     - name: update project using svn with auth
-      subversion: dest={{project_path|quote}} repo={{scm_url|quote}} revision={{scm_branch|quote}} force={{scm_clean}} username={{scm_username|quote}} password={{scm_password|quote}}
+      subversion:
+        dest: "{{project_path|quote}}"
+        repo: "{{scm_url|quote}}"
+        revision: "{{scm_branch|quote}}"
+        force: "{{scm_clean}}"
+        username: "{{scm_username|quote}}"
+        password: "{{scm_password|quote}}"
+        #checkout: "{{ scm_full_checkout }}"
+        #update: "{{ scm_full_checkout }}"
       when: scm_type == 'svn' and scm_username|default('')
+      register: scm_result
+
+    - name: Set the svn repository version
+      set_fact:
+        scm_version: "{{ scm_result['after'] }}"
+      when: "'after' in scm_result"
 
     - name: detect requirements.yml
       stat: path={{project_path|quote}}/roles/requirements.yml
       register: doesRequirementsExist
 
     - name: fetch galaxy roles from requirements.yml
       command: ansible-galaxy install -r requirements.yml -p {{project_path|quote}}/roles/ --force
       args:
         chdir: "{{project_path|quote}}/roles"
-      when: doesRequirementsExist.stat.exists
+      when: doesRequirementsExist.stat.exists and scm_full_checkout|bool
+
+    - name: Repository Version
+      debug: msg="Repository Version {{ scm_version }}"
+      when: scm_version is defined
+
+    - name: Write Repository Version
+      copy:
+        dest: "{{ scm_revision_output }}"
+        content: "{{ scm_version }}"
+      when: scm_version is defined and scm_revision_output is defined
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 630b5ee6ee..11095a61f5 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -358,23 +358,24 @@ CELERY_QUEUES = (
     Queue('jobs', Exchange('jobs'), routing_key='jobs'),
     Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False),
     # Projects use a fanout queue, this isn't super well supported
-    Broadcast('projects'),
 )
 CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs',
-                                            'routing_key': 'jobs'},
-                 'awx.main.tasks.run_project_update': {'queue': 'projects'},
-                 'awx.main.tasks.run_inventory_update': {'queue': 'jobs',
-                                                         'routing_key': 'jobs'},
-                 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs',
-                                                       'routing_key': 'jobs'},
-                 'awx.main.tasks.run_system_job': {'queue': 'jobs',
-                                                   'routing_key': 'jobs'},
-                 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler',
-                                                             'routing_key': 'scheduler.job.launch'},
-                 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
-                                                               'routing_key': 'scheduler.job.complete'},
-                 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default',
-                                                           'routing_key': 'cluster.heartbeat'},}
+                                            'routing_key': 'jobs'},
+                 'awx.main.tasks.run_project_update': {'queue': 'jobs',
+                                                       'routing_key': 'jobs'},
+                 'awx.main.tasks.run_inventory_update': {'queue': 'jobs',
+                                                         'routing_key': 'jobs'},
+                 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs',
+                                                       'routing_key': 'jobs'},
+                 'awx.main.tasks.run_system_job': {'queue': 'jobs',
+                                                   'routing_key': 'jobs'},
+                 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler',
+                                                             'routing_key': 'scheduler.job.launch'},
+                 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
+                                                               'routing_key': 'scheduler.job.complete'},
+                 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default',
+                                                           'routing_key': 'cluster.heartbeat'},
+}
 
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {
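
Reviewer note (not part of the patch): RunProjectUpdate.post_run_hook above reads the revision file without closing the handle and hard-codes a /tmp path, which the inline "# TODO: TempFile" comment already flags. Below is a minimal sketch of one way that capture step could be tightened up, assuming the same playbook contract (the playbook writes the revision to whatever path is passed in as scm_revision_output). The helper names allocate_revision_output_path and capture_scm_revision are illustrative only and do not exist in this diff.

    import logging
    import os
    import tempfile

    logger = logging.getLogger('awx.main.tasks')


    def allocate_revision_output_path(project_update_id):
        # Illustrative only: create a dedicated scratch file for this update
        # instead of a hard-coded /tmp name, and hand its path to the playbook
        # as the scm_revision_output extra var.
        handle, path = tempfile.mkstemp(prefix='awx_syncrev_{}_'.format(project_update_id))
        os.close(handle)
        return path


    def capture_scm_revision(project_update, revision_path):
        # Illustrative only: read the revision the playbook wrote, close the
        # file handle, remove the scratch file, and store the value on the
        # parent project.
        try:
            with open(revision_path, 'r') as f:
                revision = f.readline().strip()
        except IOError:
            logger.error("Could not read scm revision file %s", revision_path)
            return
        finally:
            try:
                os.remove(revision_path)
            except OSError:
                pass
        if revision:
            project = project_update.project
            project.scm_revision = revision
            project.save(update_fields=['scm_revision'])
        else:
            logger.error("Could not find scm revision in check")

Mirroring the patch, such a helper would only be called for 'check'-type updates, i.e. behind the same "if instance.job_type == 'check':" guard used in post_run_hook above.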