From b94b3a1e91604a83803635849f659d465917399b Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 12 Jul 2022 11:36:51 -0400 Subject: [PATCH] [task_manager_refactor] Move approval node expiration logic into queryset (#12502) Instead of loading all pending Workflow Approvals in the task manager, run a query that will only return the expired approvals, and directly expire all which are returned by that query. Cache expires time as a new field in order to simplify WorkflowApproval filter --- .../0165_workflowapproval_expires.py | 23 +++++++++ awx/main/models/workflow.py | 26 ++++++++++ awx/main/scheduler/task_manager.py | 48 +++++++++---------- .../functional/api/test_workflow_node.py | 42 ++++++++++++++-- 4 files changed, 111 insertions(+), 28 deletions(-) create mode 100644 awx/main/migrations/0165_workflowapproval_expires.py diff --git a/awx/main/migrations/0165_workflowapproval_expires.py b/awx/main/migrations/0165_workflowapproval_expires.py new file mode 100644 index 0000000000..b4fa0c6f39 --- /dev/null +++ b/awx/main/migrations/0165_workflowapproval_expires.py @@ -0,0 +1,23 @@ +# Generated by Django 3.2.13 on 2022-07-12 14:33 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0164_remove_inventorysource_update_on_project_update'), + ] + + operations = [ + migrations.AddField( + model_name='workflowapproval', + name='expires', + field=models.DateTimeField( + default=None, + editable=False, + help_text='The time this approval will expire. 
This is the created time plus timeout, used for filtering.', + null=True, + ), + ), + ] diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index ea0ef2f9ea..3664331f44 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -13,6 +13,7 @@ from django.db import connection, models from django.conf import settings from django.utils.translation import gettext_lazy as _ from django.core.exceptions import ObjectDoesNotExist +from django.utils.timezone import now, timedelta # from django import settings as tower_settings @@ -783,6 +784,12 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): default=0, help_text=_("The amount of time (in seconds) before the approval node expires and fails."), ) + expires = models.DateTimeField( + default=None, + null=True, + editable=False, + help_text=_("The time this approval will expire. This is the created time plus timeout, used for filtering."), + ) timed_out = models.BooleanField(default=False, help_text=_("Shows when an approval node (with a timeout assigned to it) has timed out.")) approved_or_denied_by = models.ForeignKey( 'auth.User', @@ -810,6 +817,25 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): def _get_parent_field_name(self): return 'workflow_approval_template' + def save(self, *args, **kwargs): + update_fields = list(kwargs.get('update_fields', [])) + if self.timeout != 0 and ((not self.pk) or (not update_fields) or ('timeout' in update_fields)): + if not self.created: # on creation, created will be set by parent class, so we fudge it here + created = now() + else: + created = self.created + new_expires = created + timedelta(seconds=self.timeout) + if new_expires != self.expires: + self.expires = new_expires + if update_fields and 'expires' not in update_fields: + update_fields.append('expires') + elif self.timeout == 0 and ((not update_fields) or ('timeout' in update_fields)): + if self.expires: + self.expires = None + if update_fields and 'expires' not in 
update_fields: + update_fields.append('expires') + super(WorkflowApproval, self).save(*args, **kwargs) + def approve(self, request=None): self.status = 'successful' self.approved_or_denied_by = get_current_user() diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index db1f96cff6..a1fa8289ec 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -250,30 +250,6 @@ class WorkflowManager(TaskBase): return result - def timeout_approval_node(self): - workflow_approvals = WorkflowApproval.objects.filter(status='pending') - now = tz_now() - for task in workflow_approvals: - if self.timed_out(): - logger.warning("Workflow manager has reached time out while processing approval nodes, exiting loop early") - # Do not process any more workflow approval nodes. Stop here. - # Maybe we should schedule another WorkflowManager run - break - approval_timeout_seconds = timedelta(seconds=task.timeout) - if task.timeout == 0: - continue - if (now - task.created) >= approval_timeout_seconds: - timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( - name=task.name, pk=task.pk, timeout=task.timeout - ) - logger.warning(timeout_message) - task.timed_out = True - task.status = 'failed' - task.send_approval_notification('timed_out') - task.websocket_emit_status(task.status) - task.job_explanation = timeout_message - task.save(update_fields=['status', 'job_explanation', 'timed_out']) - @timeit def get_tasks(self, filter_args): self.all_tasks = [wf for wf in WorkflowJob.objects.filter(**filter_args)] @@ -283,7 +259,6 @@ class WorkflowManager(TaskBase): self.get_tasks(dict(status__in=["running"], dependencies_processed=True)) if len(self.all_tasks) > 0: self.spawn_workflow_graph_jobs() - self.timeout_approval_node() class DependencyManager(TaskBase): @@ -731,6 +706,26 @@ class TaskManager(TaskBase): self.process_pending_tasks(pending_tasks) 
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks)) + def timeout_approval_node(self, task): + if self.timed_out(): + logger.warning("Task manager has reached time out while processing approval nodes, exiting loop early") + # Do not process any more workflow approval nodes. Stop here. + # Maybe we should schedule another TaskManager run + return + timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format(name=task.name, pk=task.pk, timeout=task.timeout) + logger.warning(timeout_message) + task.timed_out = True + task.status = 'failed' + task.send_approval_notification('timed_out') + task.websocket_emit_status(task.status) + task.job_explanation = timeout_message + task.save(update_fields=['status', 'job_explanation', 'timed_out']) + + def get_expired_workflow_approvals(self): + # timeout of 0 indicates that it never expires + qs = WorkflowApproval.objects.filter(status='pending').exclude(timeout=0).filter(expires__lt=tz_now()) + return qs + @timeit def _schedule(self): self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True)) @@ -740,3 +735,6 @@ class TaskManager(TaskBase): if len(self.all_tasks) > 0: self.process_tasks() + + for workflow_approval in self.get_expired_workflow_approvals(): + self.timeout_approval_node(workflow_approval) diff --git a/awx/main/tests/functional/api/test_workflow_node.py b/awx/main/tests/functional/api/test_workflow_node.py index 74ab92fd7b..0b89dfb546 100644 --- a/awx/main/tests/functional/api/test_workflow_node.py +++ b/awx/main/tests/functional/api/test_workflow_node.py @@ -13,7 +13,10 @@ from awx.main.models.workflow import ( WorkflowJobTemplateNode, ) from awx.main.models.credential import Credential -from awx.main.scheduler import TaskManager +from awx.main.scheduler import TaskManager, WorkflowManager, DependencyManager + +# Django +from django.utils.timezone import now, timedelta @pytest.fixture @@ -137,8 +140,9 @@ class 
TestApprovalNodes: post(url, {'name': 'Approve Test', 'description': '', 'timeout': 0}, user=admin_user, expect=201) post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}), user=admin_user, expect=201) wf_job = WorkflowJob.objects.first() + DependencyManager().schedule() # TODO: exclude workflows from this and delete line TaskManager().schedule() - TaskManager().schedule() + WorkflowManager().schedule() wfj_node = wf_job.workflow_nodes.first() approval = wfj_node.job assert approval.name == 'Approve Test' @@ -162,8 +166,9 @@ class TestApprovalNodes: post(url, {'name': 'Deny Test', 'description': '', 'timeout': 0}, user=admin_user, expect=201) post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}), user=admin_user, expect=201) wf_job = WorkflowJob.objects.first() + DependencyManager().schedule() # TODO: exclude workflows from this and delete line TaskManager().schedule() - TaskManager().schedule() + WorkflowManager().schedule() wfj_node = wf_job.workflow_nodes.first() approval = wfj_node.job assert approval.name == 'Deny Test' @@ -216,6 +221,37 @@ class TestApprovalNodes: approval.refresh_from_db() assert approval.status == 'failed' + def test_expires_time_on_creation(self): + now_time = now() + wa = WorkflowApproval.objects.create(timeout=34) + # this is fudged, so we assert that the expires time is in reasonable range + assert timedelta(seconds=33) < (wa.expires - now_time) < timedelta(seconds=35) + + @pytest.mark.parametrize('with_update_fields', [True, False]) + def test_expires_time_update(self, with_update_fields): + wa = WorkflowApproval.objects.create() + assert wa.timeout == 0 + assert wa.expires is None + wa.timeout = 1234 + if with_update_fields: + wa.save(update_fields=['timeout']) + else: + wa.save() + assert wa.created + timedelta(seconds=1234) == wa.expires + + @pytest.mark.parametrize('with_update_fields', [True, False]) + def test_reset_timeout_and_expires(self, with_update_fields): + wa = 
WorkflowApproval.objects.create() + wa.timeout = 1234 + wa.save() + assert wa.expires + wa.timeout = 0 + if with_update_fields: + wa.save(update_fields=['timeout']) + else: + wa.save() + assert wa.expires is None + @pytest.mark.django_db class TestExclusiveRelationshipEnforcement: