diff --git a/awx/main/managers.py b/awx/main/managers.py index 1adb75e913..274a0ef774 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -178,8 +178,6 @@ class InstanceGroupManager(models.Manager): if t.status == 'waiting' or not t.execution_node: # Subtract capacity from any peer groups that share instances if not t.instance_group: - logger.warning('Excluded %s from capacity algorithm ' - '(missing instance_group).', t.log_format) impacted_groups = [] elif t.instance_group.name not in ig_ig_mapping: # Waiting job in group with 0 capacity has no collateral impact diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index f43a43cd24..c63bbc6f1f 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -474,7 +474,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio @property def preferred_instance_groups(self): - return self.global_instance_groups + return [] ''' A WorkflowJob is a virtual job. It doesn't result in a celery task. diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index f988e76fd3..b3d5ed14f1 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -259,7 +259,7 @@ class TaskManager(): else: if type(task) is WorkflowJob: task.status = 'running' - if not task.supports_isolation() and rampart_group.controller_id: + elif not task.supports_isolation() and rampart_group.controller_id: # non-Ansible jobs on isolated instances run on controller task.instance_group = rampart_group.controller logger.info('Submitting isolated %s to queue %s via %s.', @@ -271,7 +271,8 @@ class TaskManager(): task.celery_task_id = str(uuid.uuid4()) task.save() - self.consume_capacity(task, rampart_group.name) + if rampart_group is not None: + self.consume_capacity(task, rampart_group.name) def post_commit(): task.websocket_emit_status(task.status) @@ -281,7 +282,7 @@ class TaskManager(): connection.on_commit(post_commit) def process_running_tasks(self, running_tasks): - map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task), running_tasks) + map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task) if task.instance_group else None, running_tasks) def create_project_update(self, task): project_task = Project.objects.get(id=task.project_id).create_project_update( @@ -447,6 +448,9 @@ class TaskManager(): continue preferred_instance_groups = task.preferred_instance_groups found_acceptable_queue = False + if isinstance(task, WorkflowJob): + self.start_task(task, None, task.get_jobs_fail_chain()) + continue for rampart_group in preferred_instance_groups: remaining_capacity = self.get_remaining_capacity(rampart_group.name) if remaining_capacity <= 0: diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index 9b4b3eac44..ce79b78003 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -2,7 +2,7 @@ import pytest import mock from datetime import timedelta from awx.main.scheduler import TaskManager -from awx.main.models import InstanceGroup +from awx.main.models import InstanceGroup, WorkflowJob from awx.main.tasks import apply_cluster_membership_policies @@ -77,6 +77,18 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g assert TaskManager.start_task.call_count == 2 +@pytest.mark.django_db +def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker): + wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template + wfj = WorkflowJob.objects.create(workflow_job_template=wfjt) + wfj.status = "pending" + wfj.save() + with mocker.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(wfj, None, []) + assert wfj.instance_group is None + + @pytest.mark.django_db def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory):