Merge pull request #1555 from ansible/no_instancegroup_for_workflowjob

Remove Instance Group concept/usage from WorkflowJobs
This commit is contained in:
Matthew Jones
2018-04-25 08:30:40 -04:00
committed by GitHub
4 changed files with 21 additions and 7 deletions

View File

@@ -178,8 +178,6 @@ class InstanceGroupManager(models.Manager):
if t.status == 'waiting' or not t.execution_node: if t.status == 'waiting' or not t.execution_node:
# Subtract capacity from any peer groups that share instances # Subtract capacity from any peer groups that share instances
if not t.instance_group: if not t.instance_group:
logger.warning('Excluded %s from capacity algorithm '
'(missing instance_group).', t.log_format)
impacted_groups = [] impacted_groups = []
elif t.instance_group.name not in ig_ig_mapping: elif t.instance_group.name not in ig_ig_mapping:
# Waiting job in group with 0 capacity has no collateral impact # Waiting job in group with 0 capacity has no collateral impact

View File

@@ -474,7 +474,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
@property @property
def preferred_instance_groups(self): def preferred_instance_groups(self):
return self.global_instance_groups return []
''' '''
A WorkflowJob is a virtual job. It doesn't result in a celery task. A WorkflowJob is a virtual job. It doesn't result in a celery task.

View File

@@ -259,7 +259,7 @@ class TaskManager():
else: else:
if type(task) is WorkflowJob: if type(task) is WorkflowJob:
task.status = 'running' task.status = 'running'
if not task.supports_isolation() and rampart_group.controller_id: elif not task.supports_isolation() and rampart_group.controller_id:
# non-Ansible jobs on isolated instances run on controller # non-Ansible jobs on isolated instances run on controller
task.instance_group = rampart_group.controller task.instance_group = rampart_group.controller
logger.info('Submitting isolated %s to queue %s via %s.', logger.info('Submitting isolated %s to queue %s via %s.',
@@ -271,7 +271,8 @@ class TaskManager():
task.celery_task_id = str(uuid.uuid4()) task.celery_task_id = str(uuid.uuid4())
task.save() task.save()
self.consume_capacity(task, rampart_group.name) if rampart_group is not None:
self.consume_capacity(task, rampart_group.name)
def post_commit(): def post_commit():
task.websocket_emit_status(task.status) task.websocket_emit_status(task.status)
@@ -281,7 +282,7 @@ class TaskManager():
connection.on_commit(post_commit) connection.on_commit(post_commit)
def process_running_tasks(self, running_tasks): def process_running_tasks(self, running_tasks):
map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task), running_tasks) map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task) if task.instance_group else None, running_tasks)
def create_project_update(self, task): def create_project_update(self, task):
project_task = Project.objects.get(id=task.project_id).create_project_update( project_task = Project.objects.get(id=task.project_id).create_project_update(
@@ -447,6 +448,9 @@ class TaskManager():
continue continue
preferred_instance_groups = task.preferred_instance_groups preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False found_acceptable_queue = False
if isinstance(task, WorkflowJob):
self.start_task(task, None, task.get_jobs_fail_chain())
continue
for rampart_group in preferred_instance_groups: for rampart_group in preferred_instance_groups:
remaining_capacity = self.get_remaining_capacity(rampart_group.name) remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if remaining_capacity <= 0: if remaining_capacity <= 0:

View File

@@ -2,7 +2,7 @@ import pytest
import mock import mock
from datetime import timedelta from datetime import timedelta
from awx.main.scheduler import TaskManager from awx.main.scheduler import TaskManager
from awx.main.models import InstanceGroup from awx.main.models import InstanceGroup, WorkflowJob
from awx.main.tasks import apply_cluster_membership_policies from awx.main.tasks import apply_cluster_membership_policies
@@ -77,6 +77,18 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
assert TaskManager.start_task.call_count == 2 assert TaskManager.start_task.call_count == 2
@pytest.mark.django_db
def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker):
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
wfj.status = "pending"
wfj.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(wfj, None, [])
assert wfj.instance_group is None
@pytest.mark.django_db @pytest.mark.django_db
def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker, def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker,
instance_group_factory, job_template_factory): instance_group_factory, job_template_factory):