Merge pull request #3755 from ansible/cluster_project_sync

Rearchitect project sync for standalone and clusters
This commit is contained in:
Matthew Jones
2016-10-20 16:04:03 -04:00
committed by GitHub
13 changed files with 233 additions and 72 deletions

View File

@@ -428,7 +428,7 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \ @if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \ . $(VENV_BASE)/tower/bin/activate; \
fi; \ fi; \
$(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,$(COMPOSE_HOST) $(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,$(COMPOSE_HOST)
#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
# Run to start the zeromq callback receiver # Run to start the zeromq callback receiver

View File

@@ -562,7 +562,7 @@ class UnifiedJobSerializer(BaseSerializer):
fields = ('*', 'unified_job_template', 'launch_type', 'status', fields = ('*', 'unified_job_template', 'launch_type', 'status',
'failed', 'started', 'finished', 'elapsed', 'job_args', 'failed', 'started', 'finished', 'elapsed', 'job_args',
'job_cwd', 'job_env', 'job_explanation', 'result_stdout', 'job_cwd', 'job_env', 'job_explanation', 'result_stdout',
'result_traceback') 'execution_node', 'result_traceback')
extra_kwargs = { extra_kwargs = {
'unified_job_template': { 'unified_job_template': {
'source': 'unified_job_template_id', 'source': 'unified_job_template_id',
@@ -914,7 +914,7 @@ class ProjectSerializer(UnifiedJobTemplateSerializer, ProjectOptionsSerializer):
class Meta: class Meta:
model = Project model = Project
fields = ('*', 'organization', 'scm_delete_on_next_update', 'scm_update_on_launch', fields = ('*', 'organization', 'scm_delete_on_next_update', 'scm_update_on_launch',
'scm_update_cache_timeout', 'timeout') + \ 'scm_update_cache_timeout', 'scm_revision', 'timeout',) + \
('last_update_failed', 'last_updated') # Backwards compatibility ('last_update_failed', 'last_updated') # Backwards compatibility
read_only_fields = ('scm_delete_on_next_update',) read_only_fields = ('scm_delete_on_next_update',)
@@ -986,7 +986,7 @@ class ProjectUpdateSerializer(UnifiedJobSerializer, ProjectOptionsSerializer):
class Meta: class Meta:
model = ProjectUpdate model = ProjectUpdate
fields = ('*', 'project') fields = ('*', 'project', 'job_type')
def get_related(self, obj): def get_related(self, obj):
res = super(ProjectUpdateSerializer, self).get_related(obj) res = super(ProjectUpdateSerializer, self).get_related(obj)
@@ -1930,7 +1930,7 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch', fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch',
'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch',
'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch', 'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch',
'allow_simultaneous', 'artifacts',) 'allow_simultaneous', 'artifacts', 'scm_revision',)
def get_related(self, obj): def get_related(self, obj):
res = super(JobSerializer, self).get_related(obj) res = super(JobSerializer, self).get_related(obj)

View File

@@ -31,13 +31,10 @@ class InstanceManager(models.Manager):
hostname='localhost', hostname='localhost',
uuid='00000000-0000-0000-0000-000000000000') uuid='00000000-0000-0000-0000-000000000000')
# If we can determine the instance we are on then return node = self.filter(hostname=settings.CLUSTER_HOST_ID)
# that, otherwise None which would be the standalone if node.exists():
# case return node[0]
# TODO: Replace, this doesn't work if the hostname raise RuntimeError("No instance found with the current cluster host id")
# is different from the Instance.name
# node = self.filter(hostname=socket.gethostname())
return self.all()[0]
def my_role(self): def my_role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0041_v310_job_timeout'),
]
operations = [
migrations.AddField(
model_name='unifiedjob',
name='execution_node',
field=models.TextField(default=b'', editable=False, blank=True),
),
]

View File

@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0042_v310_executionnode'),
]
operations = [
migrations.AddField(
model_name='project',
name='scm_revision',
field=models.CharField(default=b'', editable=False, max_length=1024, blank=True, help_text='The last revision fetched by a project update', verbose_name='SCM Revision'),
),
migrations.AddField(
model_name='projectupdate',
name='job_type',
field=models.CharField(default=b'check', max_length=64, choices=[(b'run', 'Run'), (b'check', 'Check')]),
),
migrations.AddField(
model_name='job',
name='scm_revision',
field=models.CharField(default=b'', editable=False, max_length=1024, blank=True, help_text='The SCM Revision from the Project used for this job, if available', verbose_name='SCM Revision'),
),
]

View File

@@ -29,7 +29,8 @@ __all__ = ['VarsDictProperty', 'BaseModel', 'CreatedModifiedModel',
'PERM_INVENTORY_ADMIN', 'PERM_INVENTORY_READ', 'PERM_INVENTORY_ADMIN', 'PERM_INVENTORY_READ',
'PERM_INVENTORY_WRITE', 'PERM_INVENTORY_DEPLOY', 'PERM_INVENTORY_SCAN', 'PERM_INVENTORY_WRITE', 'PERM_INVENTORY_DEPLOY', 'PERM_INVENTORY_SCAN',
'PERM_INVENTORY_CHECK', 'PERM_JOBTEMPLATE_CREATE', 'JOB_TYPE_CHOICES', 'PERM_INVENTORY_CHECK', 'PERM_JOBTEMPLATE_CREATE', 'JOB_TYPE_CHOICES',
'AD_HOC_JOB_TYPE_CHOICES', 'PERMISSION_TYPE_CHOICES', 'CLOUD_INVENTORY_SOURCES', 'AD_HOC_JOB_TYPE_CHOICES', 'PROJECT_UPDATE_JOB_TYPE_CHOICES',
'PERMISSION_TYPE_CHOICES', 'CLOUD_INVENTORY_SOURCES',
'VERBOSITY_CHOICES'] 'VERBOSITY_CHOICES']
PERM_INVENTORY_ADMIN = 'admin' PERM_INVENTORY_ADMIN = 'admin'
@@ -51,6 +52,11 @@ AD_HOC_JOB_TYPE_CHOICES = [
(PERM_INVENTORY_CHECK, _('Check')), (PERM_INVENTORY_CHECK, _('Check')),
] ]
PROJECT_UPDATE_JOB_TYPE_CHOICES = [
(PERM_INVENTORY_DEPLOY, _('Run')),
(PERM_INVENTORY_CHECK, _('Check')),
]
PERMISSION_TYPE_CHOICES = [ PERMISSION_TYPE_CHOICES = [
(PERM_INVENTORY_READ, _('Read Inventory')), (PERM_INVENTORY_READ, _('Read Inventory')),
(PERM_INVENTORY_WRITE, _('Edit Inventory')), (PERM_INVENTORY_WRITE, _('Edit Inventory')),

View File

@@ -559,6 +559,15 @@ class Job(UnifiedJob, JobOptions, JobNotificationMixin):
default={}, default={},
editable=False, editable=False,
) )
scm_revision = models.CharField(
max_length=1024,
blank=True,
default='',
editable=False,
verbose_name=_('SCM Revision'),
help_text=_('The SCM Revision from the Project used for this job, if available'),
)
@classmethod @classmethod
def _get_parent_field_name(cls): def _get_parent_field_name(cls):

View File

@@ -227,6 +227,15 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
blank=True, blank=True,
) )
scm_revision = models.CharField(
max_length=1024,
blank=True,
default='',
editable=False,
verbose_name=_('SCM Revision'),
help_text=_('The last revision fetched by a project update'),
)
admin_role = ImplicitRoleField(parent_role=[ admin_role = ImplicitRoleField(parent_role=[
'organization.admin_role', 'organization.admin_role',
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, 'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
@@ -393,6 +402,12 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):
editable=False, editable=False,
) )
job_type = models.CharField(
max_length=64,
choices=PROJECT_UPDATE_JOB_TYPE_CHOICES,
default='check',
)
@classmethod @classmethod
def _get_parent_field_name(cls): def _get_parent_field_name(cls):
return 'project' return 'project'

View File

@@ -438,6 +438,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
editable=False, editable=False,
related_name='%(class)s_blocked_jobs+', related_name='%(class)s_blocked_jobs+',
) )
execution_node = models.TextField(
blank=True,
default='',
editable=False,
)
notifications = models.ManyToManyField( notifications = models.ManyToManyField(
'Notification', 'Notification',
editable=False, editable=False,
@@ -801,7 +806,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def pre_start(self, **kwargs): def pre_start(self, **kwargs):
if not self.can_start: if not self.can_start:
self.job_explanation = u'%s is not in a startable status: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting'))) self.job_explanation = u'%s is not in a startable state: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting')))
self.save(update_fields=['job_explanation']) self.save(update_fields=['job_explanation'])
return (False, None) return (False, None)

View File

@@ -53,10 +53,9 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
from awx.main.consumers import emit_channel_notification from awx.main.consumers import emit_channel_notification
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', 'RunAdHocCommand', 'handle_work_error',
'handle_work_success', 'update_inventory_computed_fields', 'handle_work_success', 'update_inventory_computed_fields',
'send_notifications', 'run_administrative_checks', 'send_notifications', 'run_administrative_checks']
'RunJobLaunch']
HIDDEN_PASSWORD = '**********' HIDDEN_PASSWORD = '**********'
@@ -234,8 +233,9 @@ def handle_work_error(self, task_id, subtasks=None):
if instance.celery_task_id != task_id: if instance.celery_task_id != task_id:
instance.status = 'failed' instance.status = 'failed'
instance.failed = True instance.failed = True
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \ if not instance.job_explanation:
(first_instance_type, first_instance.name, first_instance.id) instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
(first_instance_type, first_instance.name, first_instance.id)
instance.save() instance.save()
instance.websocket_emit_status("failed") instance.websocket_emit_status("failed")
@@ -538,6 +538,7 @@ class BaseTask(Task):
expect_passwords[n] = passwords.get(item[1], '') or '' expect_passwords[n] = passwords.get(item[1], '') or ''
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF]) expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
instance = self.update_model(instance.pk, status='running', instance = self.update_model(instance.pk, status='running',
execution_node=settings.CLUSTER_HOST_ID,
output_replacements=output_replacements) output_replacements=output_replacements)
job_start = time.time() job_start = time.time()
while child.isalive(): while child.isalive():
@@ -608,7 +609,7 @@ class BaseTask(Task):
Hook for any steps to run before the job/task starts Hook for any steps to run before the job/task starts
''' '''
def post_run_hook(self, instance, **kwargs): def post_run_hook(self, instance, status, **kwargs):
''' '''
Hook for any steps to run after job/task is complete. Hook for any steps to run after job/task is complete.
''' '''
@@ -617,7 +618,7 @@ class BaseTask(Task):
''' '''
Run the job/task and capture its output. Run the job/task and capture its output.
''' '''
instance = self.update_model(pk, status='running', celery_task_id=self.request.id) instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id)
instance.websocket_emit_status("running") instance.websocket_emit_status("running")
status, rc, tb = 'error', None, '' status, rc, tb = 'error', None, ''
@@ -690,7 +691,7 @@ class BaseTask(Task):
instance = self.update_model(pk, status=status, result_traceback=tb, instance = self.update_model(pk, status=status, result_traceback=tb,
output_replacements=output_replacements, output_replacements=output_replacements,
**extra_update_fields) **extra_update_fields)
self.post_run_hook(instance, **kwargs) self.post_run_hook(instance, status, **kwargs)
instance.websocket_emit_status(status) instance.websocket_emit_status(status)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
# Raising an exception will mark the job as 'failed' in celery # Raising an exception will mark the job as 'failed' in celery
@@ -781,6 +782,7 @@ class RunJob(BaseTask):
# callbacks to work. # callbacks to work.
env['JOB_ID'] = str(job.pk) env['JOB_ID'] = str(job.pk)
env['INVENTORY_ID'] = str(job.inventory.pk) env['INVENTORY_ID'] = str(job.inventory.pk)
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or '' env['REST_API_TOKEN'] = job.task_auth_token or ''
@@ -914,6 +916,10 @@ class RunJob(BaseTask):
'tower_job_id': job.pk, 'tower_job_id': job.pk,
'tower_job_launch_type': job.launch_type, 'tower_job_launch_type': job.launch_type,
} }
if job.project:
extra_vars.update({
'tower_project_revision': job.project.scm_revision,
})
if job.job_template: if job.job_template:
extra_vars.update({ extra_vars.update({
'tower_job_template_id': job.job_template.pk, 'tower_job_template_id': job.job_template.pk,
@@ -990,11 +996,28 @@ class RunJob(BaseTask):
''' '''
return getattr(settings, 'AWX_PROOT_ENABLED', False) return getattr(settings, 'AWX_PROOT_ENABLED', False)
def post_run_hook(self, job, **kwargs): def pre_run_hook(self, job, **kwargs):
if job.project.scm_type:
local_project_sync = job.project.create_project_update()
local_project_sync.job_type = 'run'
local_project_sync.save()
project_update_task = local_project_sync._get_task_class()
try:
project_update_task().run(local_project_sync.id)
job.scm_revision = job.project.scm_revision
job.save()
except Exception:
job.status = 'failed'
job.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
('project_update', local_project_sync.name, local_project_sync.id)
job.save()
raise
def post_run_hook(self, job, status, **kwargs):
''' '''
Hook for actions to run after job/task has completed. Hook for actions to run after job/task has completed.
''' '''
super(RunJob, self).post_run_hook(job, **kwargs) super(RunJob, self).post_run_hook(job, status, **kwargs)
try: try:
inventory = job.inventory inventory = job.inventory
except Inventory.DoesNotExist: except Inventory.DoesNotExist:
@@ -1095,7 +1118,10 @@ class RunProjectUpdate(BaseTask):
args.append('-v') args.append('-v')
scm_url, extra_vars = self._build_scm_url_extra_vars(project_update, scm_url, extra_vars = self._build_scm_url_extra_vars(project_update,
**kwargs) **kwargs)
scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD') if project_update.project.scm_revision and project_update.job_type == 'run':
scm_branch = project_update.project.scm_revision
else:
scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
extra_vars.update({ extra_vars.update({
'project_path': project_update.get_project_path(check_if_exists=False), 'project_path': project_update.get_project_path(check_if_exists=False),
'scm_type': project_update.scm_type, 'scm_type': project_update.scm_type,
@@ -1103,6 +1129,8 @@ class RunProjectUpdate(BaseTask):
'scm_branch': scm_branch, 'scm_branch': scm_branch,
'scm_clean': project_update.scm_clean, 'scm_clean': project_update.scm_clean,
'scm_delete_on_update': project_update.scm_delete_on_update, 'scm_delete_on_update': project_update.scm_delete_on_update,
'scm_full_checkout': True if project_update.job_type == 'run' else False,
'scm_revision_output': '/tmp/_{}_syncrev'.format(project_update.id) # TODO: TempFile
}) })
args.extend(['-e', json.dumps(extra_vars)]) args.extend(['-e', json.dumps(extra_vars)])
args.append('project_update.yml') args.append('project_update.yml')
@@ -1176,6 +1204,17 @@ class RunProjectUpdate(BaseTask):
''' '''
return kwargs.get('private_data_files', {}).get('scm_credential', '') return kwargs.get('private_data_files', {}).get('scm_credential', '')
def post_run_hook(self, instance, status, **kwargs):
if instance.job_type == 'check':
p = instance.project
fd = open('/tmp/_{}_syncrev'.format(instance.id), 'r')
lines = fd.readlines()
if lines:
p.scm_revision = lines[0].strip()
p.save()
else:
logger.error("Could not find scm revision in check")
class RunInventoryUpdate(BaseTask): class RunInventoryUpdate(BaseTask):
name = 'awx.main.tasks.run_inventory_update' name = 'awx.main.tasks.run_inventory_update'
@@ -1670,7 +1709,7 @@ class RunAdHocCommand(BaseTask):
''' '''
return getattr(settings, 'AWX_PROOT_ENABLED', False) return getattr(settings, 'AWX_PROOT_ENABLED', False)
def post_run_hook(self, ad_hoc_command, **kwargs): def post_run_hook(self, ad_hoc_command, status, **kwargs):
''' '''
Hook for actions to run after ad hoc command has completed. Hook for actions to run after ad hoc command has completed.
''' '''
@@ -1707,38 +1746,3 @@ class RunSystemJob(BaseTask):
def build_cwd(self, instance, **kwargs): def build_cwd(self, instance, **kwargs):
return settings.BASE_DIR return settings.BASE_DIR
'''
class RunWorkflowJob(BaseTask):
name = 'awx.main.tasks.run_workflow_job'
model = WorkflowJob
def run(self, pk, **kwargs):
#Run the job/task and capture its output.
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
instance.websocket_emit_status("running")
# FIXME: Currently, the workflow job busy waits until the graph run is
# complete. Instead, the workflow job should return or never even run,
# because all of the "launch logic" can be done schedule().
# However, other aspects of our system depend on a 1-1 relationship
# between a Job and a Celery Task.
#
# * If we let the workflow job task (RunWorkflowJob.run()) complete
# then how do we trigger the handle_work_error and
# handle_work_success subtasks?
#
# * How do we handle the recovery process? (i.e. there is an entry in
# the database but not in celery).
while True:
dag = WorkflowDAG(instance)
if dag.is_workflow_done():
# TODO: update with accurate finish status (i.e. canceled, error, etc.)
instance = self.update_model(instance.pk, status='successful')
break
time.sleep(1)
instance.websocket_emit_status(instance.status)
# TODO: Handle cancel
'''

View File

@@ -17,28 +17,93 @@
tasks: tasks:
- name: delete project directory before update - name: delete project directory before update
file: path={{project_path|quote}} state=absent file:
path: "{{project_path|quote}}"
state: absent
when: scm_delete_on_update|default('') when: scm_delete_on_update|default('')
- name: update project using git and accept hostkey - name: update project using git and accept hostkey
git: dest={{project_path|quote}} repo={{scm_url|quote}} version={{scm_branch|quote}} force={{scm_clean}} accept_hostkey={{scm_accept_hostkey}} git:
dest: "{{project_path|quote}}"
repo: "{{scm_url|quote}}"
version: "{{scm_branch|quote}}"
force: "{{scm_clean}}"
accept_hostkey: "{{scm_accept_hostkey}}"
clone: "{{ scm_full_checkout }}"
update: "{{ scm_full_checkout }}"
when: scm_type == 'git' and scm_accept_hostkey is defined when: scm_type == 'git' and scm_accept_hostkey is defined
register: scm_result
- name: Set the git repository version
set_fact:
scm_version: "{{ scm_result['after'] }}"
when: "'after' in scm_result"
- name: update project using git - name: update project using git
git: dest={{project_path|quote}} repo={{scm_url|quote}} version={{scm_branch|quote}} force={{scm_clean}} git:
dest: "{{project_path|quote}}"
repo: "{{scm_url|quote}}"
version: "{{scm_branch|quote}}"
force: "{{scm_clean}}"
clone: "{{ scm_full_checkout }}"
update: "{{ scm_full_checkout }}"
when: scm_type == 'git' and scm_accept_hostkey is not defined when: scm_type == 'git' and scm_accept_hostkey is not defined
register: scm_result
- name: Set the git repository version
set_fact:
scm_version: "{{ scm_result['after'] }}"
when: "'after' in scm_result"
- name: update project using hg - name: update project using hg
hg: dest={{project_path|quote}} repo={{scm_url|quote}} revision={{scm_branch|quote}} force={{scm_clean}} hg:
dest: "{{project_path|quote}}"
repo: "{{scm_url|quote}}"
revision: "{{scm_branch|quote}}"
force: "{{scm_clean}}"
#clone: "{{ scm_full_checkout }}"
#update: "{{ scm_full_checkout }}"
when: scm_type == 'hg' when: scm_type == 'hg'
register: scm_result
- name: Set the hg repository version
set_fact:
scm_version: "{{ scm_result['after'] }}"
when: "'after' in scm_result"
- name: update project using svn - name: update project using svn
subversion: dest={{project_path|quote}} repo={{scm_url|quote}} revision={{scm_branch|quote}} force={{scm_clean}} subversion:
dest: "{{project_path|quote}}"
repo: "{{scm_url|quote}}"
revision: "{{scm_branch|quote}}"
force: "{{scm_clean}}"
#checkout: "{{ scm_full_checkout }}"
#update: "{{ scm_full_checkout }}"
when: scm_type == 'svn' and not scm_username|default('') when: scm_type == 'svn' and not scm_username|default('')
register: scm_result
- name: Set the svn repository version
set_fact:
scm_version: "{{ scm_result['after'] }}"
when: "'after' in scm_result"
- name: update project using svn with auth - name: update project using svn with auth
subversion: dest={{project_path|quote}} repo={{scm_url|quote}} revision={{scm_branch|quote}} force={{scm_clean}} username={{scm_username|quote}} password={{scm_password|quote}} subversion:
dest: "{{project_path|quote}}"
repo: "{{scm_url|quote}}"
revision: "{{scm_branch|quote}}"
force: "{{scm_clean}}"
username: "{{scm_username|quote}}"
password: "{{scm_password|quote}}"
#checkout: "{{ scm_full_checkout }}"
#update: "{{ scm_full_checkout }}"
when: scm_type == 'svn' and scm_username|default('') when: scm_type == 'svn' and scm_username|default('')
register: scm_result
- name: Set the svn repository version
set_fact:
scm_version: "{{ scm_result['after'] }}"
when: "'after' in scm_result"
- name: detect requirements.yml - name: detect requirements.yml
stat: path={{project_path|quote}}/roles/requirements.yml stat: path={{project_path|quote}}/roles/requirements.yml
@@ -48,4 +113,14 @@
command: ansible-galaxy install -r requirements.yml -p {{project_path|quote}}/roles/ --force command: ansible-galaxy install -r requirements.yml -p {{project_path|quote}}/roles/ --force
args: args:
chdir: "{{project_path|quote}}/roles" chdir: "{{project_path|quote}}/roles"
when: doesRequirementsExist.stat.exists when: doesRequirementsExist.stat.exists and scm_full_checkout|bool
- name: Repository Version
debug: msg="Repository Version {{ scm_version }}"
when: scm_version is defined
- name: Write Repository Version
copy:
dest: "{{ scm_revision_output }}"
content: "{{ scm_version }}"
when: scm_version is defined and scm_revision_output is defined

View File

@@ -9,7 +9,6 @@ import djcelery
from datetime import timedelta from datetime import timedelta
from kombu import Queue, Exchange from kombu import Queue, Exchange
from kombu.common import Broadcast
# Update this module's local settings from the global settings module. # Update this module's local settings from the global settings module.
from django.conf import global_settings from django.conf import global_settings
@@ -358,11 +357,11 @@ CELERY_QUEUES = (
Queue('jobs', Exchange('jobs'), routing_key='jobs'), Queue('jobs', Exchange('jobs'), routing_key='jobs'),
Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False), Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False),
# Projects use a fanout queue, this isn't super well supported # Projects use a fanout queue, this isn't super well supported
Broadcast('projects'),
) )
CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs', CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs',
'routing_key': 'jobs'}, 'routing_key': 'jobs'},
'awx.main.tasks.run_project_update': {'queue': 'projects'}, 'awx.main.tasks.run_project_update': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_inventory_update': {'queue': 'jobs', 'awx.main.tasks.run_inventory_update': {'queue': 'jobs',
'routing_key': 'jobs'}, 'routing_key': 'jobs'},
'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs',
@@ -374,7 +373,7 @@ CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs',
'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler', 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
'routing_key': 'scheduler.job.complete'}, 'routing_key': 'scheduler.job.complete'},
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default', 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default',
'routing_key': 'cluster.heartbeat'},} 'routing_key': 'cluster.heartbeat'}}
CELERYBEAT_SCHEDULE = { CELERYBEAT_SCHEDULE = {
'tower_scheduler': { 'tower_scheduler': {

View File

@@ -21,6 +21,7 @@ It's important to point out a few existing things:
by its needs. Thus we are pretty inflexible to customization beyond what our setup playbook allows. Each Tower node has a by its needs. Thus we are pretty inflexible to customization beyond what our setup playbook allows. Each Tower node has a
deployment of RabbitMQ that will cluster with the other nodes' RabbitMQ instances. deployment of RabbitMQ that will cluster with the other nodes' RabbitMQ instances.
* Existing old-style HA deployments will be transitioned automatically to the new HA system during the upgrade process. * Existing old-style HA deployments will be transitioned automatically to the new HA system during the upgrade process.
* Manual projects will need to be synced to all nodes by the customer
## Important Changes ## Important Changes
@@ -168,6 +169,7 @@ When verifying acceptance we should ensure the following statements are true
can communicate with the database. can communicate with the database.
* Crucially when network partitioning is resolved all nodes should recover into a consistent state * Crucially when network partitioning is resolved all nodes should recover into a consistent state
* Upgrade Testing, verify behavior before and after are the same for the end user. * Upgrade Testing, verify behavior before and after are the same for the end user.
* Project Updates should be thoroughly tested for all scm types (git, svn, hg) and for manual projects.
## Performance Testing ## Performance Testing