Merge pull request #3912 from chrismeyersfsu/bug-allow_simulataneous

fix job blocking job with allow simultaneous
This commit is contained in:
Chris Meyers 2016-11-08 11:39:23 -05:00 committed by GitHub
commit 258fd52ef2
2 changed files with 21 additions and 7 deletions

View File

@ -291,7 +291,6 @@ class TaskManager():
'Celery, so it has been marked as failed.',
))
task_obj.save()
print("Going to fail %s" % task_obj.id)
connection.on_commit(lambda: task_obj.websocket_emit_status('failed'))
logger.error("Task %s appears orphaned... marking as failed" % task)
@ -312,7 +311,6 @@ class TaskManager():
return (self.capacity_total - self.capacity_used)
def process_tasks(self, all_sorted_tasks):
running_tasks = filter(lambda t: t['status'] == 'running', all_sorted_tasks)
runnable_tasks = filter(lambda t: t['status'] in ['waiting', 'running'], all_sorted_tasks)

View File

@ -12,7 +12,11 @@ from awx.main.scheduler.partial import (
class DependencyGraph(object):
PROJECT_UPDATES = 'project_updates'
INVENTORY_UPDATES = 'inventory_updates'
JOB_TEMPLATE_JOBS = 'job_template_jobs'
JOB_PROJECT_IDS = 'job_project_ids'
JOB_INVENTORY_IDS = 'job_inventory_ids'
SYSTEM_JOB = 'system_job'
INVENTORY_SOURCE_UPDATES = 'inventory_source_updates'
WORKFLOW_JOB_TEMPLATES_JOBS = 'workflow_job_template_jobs'
@ -30,6 +34,16 @@ class DependencyGraph(object):
self.data[self.INVENTORY_UPDATES] = {}
# job_template_id -> True / False
self.data[self.JOB_TEMPLATE_JOBS] = {}
'''
Track runnable job related project and inventory to ensure updates
don't run while a job needing those resources is running.
'''
# project_id -> True / False
self.data[self.JOB_PROJECT_IDS] = {}
# inventory_id -> True / False
self.data[self.JOB_INVENTORY_IDS] = {}
# inventory_source_id -> True / False
self.data[self.INVENTORY_SOURCE_UPDATES] = {}
# True / False
@ -138,21 +152,23 @@ class DependencyGraph(object):
self.data[self.INVENTORY_SOURCE_UPDATES][inventory_source_id] = False
def mark_job_template_job(self, job):
self.data[self.INVENTORY_UPDATES][job['inventory_id']] = False
self.data[self.PROJECT_UPDATES][job['project_id']] = False
self.data[self.JOB_INVENTORY_IDS][job['inventory_id']] = False
self.data[self.JOB_PROJECT_IDS][job['project_id']] = False
self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False
def mark_workflow_job(self, job):
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job['workflow_job_template_id']] = False
def can_project_update_run(self, job):
return self.data[self.PROJECT_UPDATES].get(job['project_id'], True)
return self.data[self.JOB_PROJECT_IDS].get(job['project_id'], True) and \
self.data[self.PROJECT_UPDATES].get(job['project_id'], True)
def can_inventory_update_run(self, job):
return self.data[self.INVENTORY_SOURCE_UPDATES].get(job['inventory_source_id'], True)
return self.data[self.JOB_INVENTORY_IDS].get(job['inventory_source__inventory_id'], True) and \
self.data[self.INVENTORY_SOURCE_UPDATES].get(job['inventory_source_id'], True)
def can_job_run(self, job):
if self.can_project_update_run(job) is True and \
if self.data[self.PROJECT_UPDATES].get(job['project_id'], True) is True and \
self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True) is True:
if job['allow_simultaneous'] is False:
return self.data[self.JOB_TEMPLATE_JOBS].get(job['job_template_id'], True)