diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 805e4eb1f4..c7b9254ada 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -247,6 +247,7 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): data = dict() all_group = data.setdefault('all', dict()) + all_hostnames = set(host.name for host in hosts) if self.variables_dict: all_group['vars'] = self.variables_dict @@ -264,6 +265,8 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): ).values_list('group_id', 'host_id', 'host__name') group_hosts_map = {} for group_id, host_id, host_name in group_hosts_qs: + if host_name not in all_hostnames: + continue # host might not be in current shard group_hostnames = group_hosts_map.setdefault(group_id, []) group_hostnames.append(host_name) grouped_hosts.add(host_name) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index bb8f6f0dbc..a0ea174bbe 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -595,6 +595,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana new_prompts['_prevent_slicing'] = True new_prompts.setdefault('_eager_fields', {}) new_prompts['_eager_fields']['job_slice_number'] = self.job_slice_number + new_prompts['_eager_fields']['job_slice_count'] = self.job_slice_count return super(Job, self).copy_unified_job(**new_prompts) @property @@ -690,6 +691,9 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana count_hosts = 2 else: count_hosts = Host.objects.filter(inventory__jobs__pk=self.pk).count() + if self.job_slice_count > 1: + # Integer division intentional + count_hosts = (count_hosts + self.job_slice_count - self.job_slice_number) / self.job_slice_count return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1 @property diff --git a/awx/main/tests/functional/models/test_inventory.py b/awx/main/tests/functional/models/test_inventory.py index e11a4f926c..97cf1cb0a0 100644 --- a/awx/main/tests/functional/models/test_inventory.py +++ b/awx/main/tests/functional/models/test_inventory.py @@ -46,6 +46,25 @@ class TestInventoryScript: 'all': {'hosts': ['host{}'.format(i)]} } + def test_slice_subset_with_groups(self, inventory): + hosts = [] + for i in range(3): + host = inventory.hosts.create(name='host{}'.format(i)) + hosts.append(host) + g1 = inventory.groups.create(name='contains_all_hosts') + for host in hosts: + g1.hosts.add(host) + g2 = inventory.groups.create(name='contains_two_hosts') + for host in hosts[:2]: + g2.hosts.add(host) + for i in range(3): + expected_data = { + 'contains_all_hosts': {'hosts': ['host{}'.format(i)], 'children': [], 'vars': {}}, + } + if i < 2: + expected_data['contains_two_hosts'] = {'hosts': ['host{}'.format(i)], 'children': [], 'vars': {}} + assert inventory.get_script_data(slice_number=i + 1, slice_count=3) == expected_data + @pytest.mark.django_db class TestActiveCount: diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index f587e4c448..74e163c66e 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -152,3 +152,50 @@ def test_event_processing_not_finished(): def test_event_model_undefined(): wj = WorkflowJob.objects.create(name='foobar', status='finished') assert wj.event_processing_finished + + +@pytest.mark.django_db +class TestTaskImpact: + @pytest.fixture + def job_host_limit(self, job_template, inventory): + def r(hosts, forks): + for i in range(hosts): + inventory.hosts.create(name='foo' + str(i)) + job = Job.objects.create( + name='fake-job', + launch_type='workflow', + job_template=job_template, + inventory=inventory, + forks=forks + ) + return job + return r + + def test_limit_task_impact(self, job_host_limit): + job = job_host_limit(5, 2) + assert job.task_impact == 2 + 1 # forks becomes constraint + + def test_host_task_impact(self, job_host_limit): + job = job_host_limit(3, 5) + assert job.task_impact == 3 + 1 # hosts becomes constraint + + def test_shard_task_impact(self, slice_job_factory): + # factory creates on host per slice + workflow_job = slice_job_factory(3, jt_kwargs={'forks': 50}, spawn=True) + # arrange the jobs by their number + jobs = [None for i in range(3)] + for node in workflow_job.workflow_nodes.all(): + jobs[node.job.job_slice_number - 1] = node.job + # Even distribution - all jobs run on 1 host + assert [ + len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) + for i in range(3) + ] == [1, 1, 1] + assert [job.task_impact for job in jobs] == [2, 2, 2] # plus one base task impact + # Uneven distribution - first job takes the extra host + jobs[0].inventory.hosts.create(name='remainder_foo') + assert [ + len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) + for i in range(3) + ] == [2, 1, 1] + assert [job.task_impact for job in jobs] == [3, 2, 2]