AC-505 Implement update_on_launch for inventory sources and dependency checking to prevent related inventory updates and jobs from happening simultaneously.

This commit is contained in:
Chris Church 2013-10-09 18:11:58 -04:00
parent 8531d6091f
commit 2f135f99d4
2 changed files with 88 additions and 5 deletions

View File

@ -1599,7 +1599,16 @@ class JobTemplate(CommonModel):
Return whether job template can be used to start a new job without
requiring any user input.
'''
return bool(self.credential and not self.credential.passwords_needed)
needed = []
if self.credential:
needed.extend(self.credential.passwords_needed)
if self.project.scm_update_on_launch:
needed.extend(self.project.scm_passwords_needed)
for inventory_source in self.inventory.inventory_sources.filter(active=True, update_on_launch=True):
for pw in inventory_source.source_passwords_needed:
if pw not in needed:
needed.append(pw)
return bool(len(needed) == 0)
class Job(CommonModelNameNotUnique):
'''
@ -1777,6 +1786,10 @@ class Job(CommonModelNameNotUnique):
needed.extend(self.credential.passwords_needed)
if self.project.scm_update_on_launch:
needed.extend(self.project.scm_passwords_needed)
for inventory_source in self.inventory.inventory_sources.filter(active=True, update_on_launch=True):
for pw in inventory_source.source_passwords_needed:
if pw not in needed:
needed.append(pw)
return needed
@property

View File

@ -278,7 +278,7 @@ class RunJob(BaseTask):
Celery task to run a job using ansible-playbook.
'''
name = 'run_job'
name = 'awx.main.tasks.run_job'
model = Job
def build_private_data(self, job, **kwargs):
@ -397,11 +397,16 @@ class RunJob(BaseTask):
Hook for checking job before running.
'''
project_update = None
inventory_updates = None
while True:
pk = job.pk
if job.status in ('pending', 'waiting'):
project = job.project
pu_qs = project.project_updates.filter(status__in=('pending', 'running'))
inventory = job.inventory
base_iu_qs = InventoryUpdate.objects.filter(inventory_source__inventory=inventory)
iu_qs = base_iu_qs.filter(status__in=('pending', 'running'))
is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True)
# Refresh the current project_update instance (if set).
if project_update:
try:
@ -411,6 +416,10 @@ class RunJob(BaseTask):
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# Refresh the current inventory_update instance(s) (if set).
if inventory_updates:
inventory_update_pks = [x.pk for x in inventory_updates]
inventory_updates = list(base_iu_qs.filter(pk__in=inventory_update_pks))
# If the job needs to update the project first (and there is no
# specific project update defined).
@ -429,14 +438,49 @@ class RunJob(BaseTask):
return False
#print 'job %d waiting on project update %d' % (pk, project_update.pk)
time.sleep(2.0)
# If the job needs to update any inventory first (and there are
# no current inventory updates pending).
elif inventory_updates is None and is_qs.count():
job = self.update_model(pk, status='waiting')
inventory_updates = []
msgs = []
for inventory_source in is_qs:
try:
inventory_update = iu_qs.filter(inventory_source=inventory_source)[0]
except IndexError:
# FIXME: Doesn't support multiple sources!!!
kw = dict([(k,v) for k,v in kwargs.items()
if k.startswith('source_')])
inventory_update = inventory_source.update(**kw)
if not inventory_update:
msgs.append('Unable to update inventory source %d before launch' % inventory_source.pk)
continue
inventory_updates.append(inventory_update)
if msgs:
msg = '\n'.join(msgs)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
time.sleep(2.0)
# If project update has failed, abort the job.
elif project_update and project_update.failed:
msg = 'Project update failed with status = %s.' % project_update.status
msg = 'Project update %d failed with status = %s.' % (project_update.pk, project_update.status)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# Check if blocked by any other active project updates.
elif pu_qs.count():
# If any inventory update has failed, abort the job.
elif inventory_updates and any([x.failed for x in inventory_updates]):
msgs = []
for inventory_update in inventory_updates:
if inventory_update.failed:
msgs.append('Inventory update %d failed with status = %s.' % (inventory_update.pk, inventory_update.status))
if msgs:
msg = '\n'.join(msgs)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# Check if blocked by any other active project or inventory updates.
elif pu_qs.count() or iu_qs.count():
#print 'job %d waiting on' % pk, pu_qs
job = self.update_model(pk, status='waiting')
time.sleep(4.0)
@ -758,3 +802,29 @@ class RunInventoryUpdate(BaseTask):
def build_cwd(self, inventory_update, **kwargs):
return self.get_path_to('..', 'plugins', 'inventory')
def pre_run_check(self, inventory_update, **kwargs):
'''
Hook for checking inventory update before running.
'''
while True:
pk = inventory_update.pk
if inventory_update.status in ('pending', 'waiting'):
# Check if inventory update is blocked by any jobs using the
# inventory or other active inventory updates.
inventory = inventory_update.inventory_source.inventory
jobs_qs = inventory.jobs.filter(status__in=('pending', 'running'))
iu_qs = InventoryUpdate.objects.filter(inventory_source__inventory=inventory, status__in=('pending', 'running'))
iu_qs = iu_qs.exclude(pk=inventory_update.pk)
if jobs_qs.count() or iu_qs.count():
#print 'inventory update %d waiting on' % pk, jobs_qs, iu_qs
inventory_update = self.update_model(pk, status='waiting')
time.sleep(4.0)
else:
inventory_update = self.update_model(pk, status='pending')
return True
elif inventory_update.cancel_flag:
inventory_update = self.update_model(pk, status='canceled')
return False
else:
return False