From 93c329d9d517ebb3336dea772d3f1448a7bc901e Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 30 Oct 2023 15:30:18 -0400 Subject: [PATCH] Fix cancel bug - WorkflowManager cancel in transaction (#14608) This fixes a bug where jobs within a workflow job were not canceled when the workflow job was canceled by the user The fix is to submit the cancel request as a part of the transaction that WorkflowManager commits its work in this requires that we send the message without expecting a reply so this changes the control-with-reply cancel to just a control function --- awx/main/dispatch/control.py | 7 +++++-- awx/main/dispatch/worker/base.py | 5 +++-- awx/main/models/unified_jobs.py | 5 +++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index f08d9b8c38..977c19b946 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -37,8 +37,11 @@ class Control(object): def running(self, *args, **kwargs): return self.control_with_reply('running', *args, **kwargs) - def cancel(self, task_ids, *args, **kwargs): - return self.control_with_reply('cancel', *args, extra_data={'task_ids': task_ids}, **kwargs) + def cancel(self, task_ids, with_reply=True): + if with_reply: + return self.control_with_reply('cancel', extra_data={'task_ids': task_ids}) + else: + self.control({'control': 'cancel', 'task_ids': task_ids, 'reply_to': None}, extra_data={'task_ids': task_ids}) def schedule(self, *args, **kwargs): return self.control_with_reply('schedule', *args, **kwargs) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 8a86912f83..2ff8752f06 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -89,8 +89,9 @@ class AWXConsumerBase(object): if task_ids and not msg: logger.info(f'Could not locate running tasks to cancel with ids={task_ids}') - with pg_bus_conn() as conn: - conn.notify(reply_queue, json.dumps(msg)) + if reply_queue is not None: + with pg_bus_conn() as conn: + conn.notify(reply_queue, json.dumps(msg)) elif control == 'reload': for worker in self.pool.workers: worker.quit() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 8dcb5df849..6ba605c0d4 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1439,6 +1439,11 @@ class UnifiedJob( if not self.celery_task_id: return canceled = [] + if not connection.get_autocommit(): + # this condition is purpose-written for the task manager, when it cancels jobs in workflows + ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False) + return True # task manager itself needs to act under assumption that cancel was received + try: # Use control and reply mechanism to cancel and obtain confirmation timeout = 5