Add task manager rescheduling hooks, de-duplication, lifecycle tests

This commit is contained in:
AlanCoding
2018-11-14 11:31:34 -05:00
parent 5e18eccd19
commit 758a488aee
7 changed files with 140 additions and 25 deletions

View File

@@ -30,7 +30,7 @@ from awx.main.models import (
)
from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.main.utils.pglock import advisory_lock
from awx.main.utils import get_type_for_model
from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager
from awx.main.signals import disable_activity_stream
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.utils import decrypt_field
@@ -161,6 +161,7 @@ class TaskManager():
result = []
for workflow_job in workflow_jobs:
dag = WorkflowDAG(workflow_job)
status_changed = False
if workflow_job.cancel_flag:
logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
cancel_finished = dag.cancel_node_jobs()
@@ -169,7 +170,7 @@ class TaskManager():
workflow_job.status = 'canceled'
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=['status', 'start_args'])
workflow_job.websocket_emit_status(workflow_job.status)
status_changed = True
else:
is_done, has_failed = dag.is_workflow_done()
if not is_done:
@@ -181,7 +182,11 @@ class TaskManager():
workflow_job.status = new_status
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=['status', 'start_args'])
status_changed = True
if status_changed:
workflow_job.websocket_emit_status(workflow_job.status)
if workflow_job.spawned_by_workflow:
schedule_task_manager()
return result
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
@@ -221,6 +226,7 @@ class TaskManager():
if type(task) is WorkflowJob:
task.status = 'running'
logger.info('Transitioning %s to running status.', task.log_format)
schedule_task_manager()
elif not task.supports_isolation() and rampart_group.controller_id:
# non-Ansible jobs on isolated instances run on controller
task.instance_group = rampart_group.controller
@@ -556,7 +562,8 @@ class TaskManager():
return
logger.debug("Starting Scheduler")
finished_wfjs = self._schedule()
with task_manager_bulk_reschedule():
finished_wfjs = self._schedule()
# Operations whose queries rely on modifications made during the atomic scheduling session
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):