Merge pull request #2239 from AlanCoding/multi_pass_cancel

Do 2-pass cancel for workflow jobs

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
softwarefactory-project-zuul[bot] 2018-10-26 14:43:14 +00:00 committed by GitHub
commit 264f35d259
2 changed files with 27 additions and 5 deletions
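
In short: before this change, a cancel-flagged workflow job was marked 'canceled' in the same scheduler pass that signaled its spawned jobs to cancel. With this change the cancel takes at least two passes: the first pass only signals every still-cancelable spawned job, and the workflow itself is marked 'canceled' on a later pass, once cancel_node_jobs() reports that all spawned jobs have concluded. A minimal, runnable sketch of that pattern follows; StubJob and StubWorkflow are illustrative stand-ins (not the AWX models), and the can_cancel rule here is a simplification:

    # Sketch of the two-pass cancel pattern (stand-in classes, not AWX code).

    class StubJob:
        def __init__(self):
            self.status = 'running'

        @property
        def can_cancel(self):
            # Simplified rule: cancelable until the job reaches a final status.
            return self.status not in ('successful', 'failed', 'canceled')

        def cancel(self):
            # Signal only; the real job winds down asynchronously, later.
            self.status = 'canceling'

    class StubWorkflow:
        def __init__(self, jobs):
            self.jobs = jobs
            self.status = 'running'

        def cancel_node_jobs(self):
            # Visit every spawned job; report True only when none remained
            # cancelable, i.e. all of them have already concluded.
            cancel_finished = True
            for job in self.jobs:
                if job.can_cancel:
                    cancel_finished = False
                    job.cancel()
            return cancel_finished

    wf = StubWorkflow([StubJob(), StubJob()])

    # Scheduler pass 1: cancel signals go out; the workflow stays 'running'.
    if wf.cancel_node_jobs():
        wf.status = 'canceled'
    assert wf.status == 'running'

    # ...the spawned jobs conclude between scheduler passes...
    for job in wf.jobs:
        job.status = 'canceled'

    # Scheduler pass 2: nothing is cancelable anymore; now flip the workflow.
    if wf.cancel_node_jobs():
        wf.status = 'canceled'
    assert wf.status == 'canceled'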

@@ -50,6 +50,7 @@ class WorkflowDAG(SimpleDAG):
         return [n['node_object'] for n in nodes_found]
 
     def cancel_node_jobs(self):
+        cancel_finished = True
         for n in self.nodes:
             obj = n['node_object']
             job = obj.job
@@ -57,7 +58,9 @@ class WorkflowDAG(SimpleDAG):
             if not job:
                 continue
             elif job.can_cancel:
+                cancel_finished = False
                 job.cancel()
+        return cancel_finished
 
     def is_workflow_done(self):
         root_nodes = self.get_root_nodes()
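The return value added above is the hinge of the change: cancel_node_jobs() still signals cancel() to every spawned job that can_cancel, but it now reports True only when nothing was left to cancel, i.e. every spawned job has already concluded. The TaskManager changes below treat False as "cancellation still in flight, re-check on a later pass" and True as "safe to mark the workflow itself canceled".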

@@ -103,8 +103,15 @@ class TaskManager():
 
     def spawn_workflow_graph_jobs(self, workflow_jobs):
         for workflow_job in workflow_jobs:
+            if workflow_job.cancel_flag:
+                logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
+                continue
             dag = WorkflowDAG(workflow_job)
             spawn_nodes = dag.bfs_nodes_to_run()
+            if spawn_nodes:
+                logger.info('Spawning jobs for %s', workflow_job.log_format)
+            else:
+                logger.debug('No nodes to spawn for %s', workflow_job.log_format)
             for spawn_node in spawn_nodes:
                 if spawn_node.unified_job_template is None:
                     continue
@@ -112,6 +119,7 @@ class TaskManager():
                 job = spawn_node.unified_job_template.create_unified_job(**kv)
                 spawn_node.job = job
                 spawn_node.save()
+                logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
                 if job._resources_sufficient_for_launch():
                     can_start = job.signal_start()
                     if not can_start:
@@ -129,20 +137,23 @@ class TaskManager():
                 # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
                 #emit_websocket_notification('/socket.io/jobs', '', dict(id=))
     # See comment in tasks.py::RunWorkflowJob::run()
     def process_finished_workflow_jobs(self, workflow_jobs):
         result = []
         for workflow_job in workflow_jobs:
             dag = WorkflowDAG(workflow_job)
             if workflow_job.cancel_flag:
-                workflow_job.status = 'canceled'
-                workflow_job.save()
-                dag.cancel_node_jobs()
-                connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
-
+                logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
+                cancel_finished = dag.cancel_node_jobs()
+                if cancel_finished:
+                    logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format)
+                    workflow_job.status = 'canceled'
+                    workflow_job.save()
+                    connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
             else:
                 is_done, has_failed = dag.is_workflow_done()
                 if not is_done:
                     continue
+                logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
                 result.append(workflow_job.id)
                 workflow_job.status = 'failed' if has_failed else 'successful'
                 workflow_job.save()
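The rewritten cancel branch is where the two passes happen: on the first scheduler pass after the cancel flag is set, cancel_node_jobs() signals the spawned jobs and returns False, so the workflow keeps status 'running' and nothing is saved or emitted; on a later pass, once every spawned job has concluded, it returns True, and only then is the workflow saved as 'canceled' and its status emitted over the websocket on commit.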
@@ -499,6 +510,14 @@ class TaskManager():
         running_workflow_tasks = self.get_running_workflow_jobs()
         finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
 
+        previously_running_workflow_tasks = running_workflow_tasks
+        running_workflow_tasks = []
+        for workflow_job in previously_running_workflow_tasks:
+            if workflow_job.status == 'running':
+                running_workflow_tasks.append(workflow_job)
+            else:
+                logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
+
         self.spawn_workflow_graph_jobs(running_workflow_tasks)
 
         self.process_tasks(all_sorted_tasks)
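
The last hunk re-derives running_workflow_tasks after process_finished_workflow_jobs() has had its chance to flip statuses, so a workflow canceled or completed this pass is not handed back to spawn_workflow_graph_jobs(). The filter could be a one-line comprehension; a sketch of the equivalent, which the committed loop avoids so each dropped workflow can be logged individually:

    # Equivalent filter, minus the per-workflow debug log (sketch):
    running_workflow_tasks = [wj for wj in previously_running_workflow_tasks
                              if wj.status == 'running']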