fix slicing task_impact and script gen bugs

This commit is contained in:
AlanCoding 2018-10-16 16:47:11 -04:00
parent f72fca5fcf
commit 37f9024940
No known key found for this signature in database
GPG Key ID: FD2C3C012A72926B
4 changed files with 73 additions and 0 deletions

View File

@ -247,6 +247,7 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
data = dict()
all_group = data.setdefault('all', dict())
all_hostnames = set(host.name for host in hosts)
if self.variables_dict:
all_group['vars'] = self.variables_dict
@ -264,6 +265,8 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
).values_list('group_id', 'host_id', 'host__name')
group_hosts_map = {}
for group_id, host_id, host_name in group_hosts_qs:
if host_name not in all_hostnames:
continue # host might not be in current shard
group_hostnames = group_hosts_map.setdefault(group_id, [])
group_hostnames.append(host_name)
grouped_hosts.add(host_name)

View File

@ -595,6 +595,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
new_prompts['_prevent_slicing'] = True
new_prompts.setdefault('_eager_fields', {})
new_prompts['_eager_fields']['job_slice_number'] = self.job_slice_number
new_prompts['_eager_fields']['job_slice_count'] = self.job_slice_count
return super(Job, self).copy_unified_job(**new_prompts)
@property
@ -690,6 +691,9 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
count_hosts = 2
else:
count_hosts = Host.objects.filter(inventory__jobs__pk=self.pk).count()
if self.job_slice_count > 1:
# Integer division intentional
count_hosts = (count_hosts + self.job_slice_count - self.job_slice_number) / self.job_slice_count
return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1
@property

View File

@ -46,6 +46,25 @@ class TestInventoryScript:
'all': {'hosts': ['host{}'.format(i)]}
}
def test_slice_subset_with_groups(self, inventory):
    """Check that sliced script data only lists each slice's own host in groups.

    Three hosts are created; one group holds all of them and a second
    group holds only the first two.  With ``slice_count=3`` the test
    expects slice ``n`` (1-based) to report exactly ``host<n-1>`` inside
    every group that host is a member of.
    """
    all_hosts = [
        inventory.hosts.create(name='host{}'.format(idx)) for idx in range(3)
    ]

    full_group = inventory.groups.create(name='contains_all_hosts')
    for member in all_hosts:
        full_group.hosts.add(member)

    partial_group = inventory.groups.create(name='contains_two_hosts')
    for member in all_hosts[:2]:
        partial_group.hosts.add(member)

    for idx in range(3):
        hostname = 'host{}'.format(idx)
        expected_data = {
            'contains_all_hosts': {'hosts': [hostname], 'children': [], 'vars': {}},
        }
        # only the first two hosts were added to the smaller group
        if idx < 2:
            expected_data['contains_two_hosts'] = {'hosts': [hostname], 'children': [], 'vars': {}}
        script_data = inventory.get_script_data(slice_number=idx + 1, slice_count=3)
        assert script_data == expected_data
@pytest.mark.django_db
class TestActiveCount:

View File

@ -152,3 +152,50 @@ def test_event_processing_not_finished():
def test_event_model_undefined():
    """A finished WorkflowJob must report its event processing as finished."""
    # NOTE(review): the test name suggests WorkflowJob has no event model of
    # its own -- confirm against the model definition.
    workflow_job = WorkflowJob.objects.create(name='foobar', status='finished')
    assert workflow_job.event_processing_finished
@pytest.mark.django_db
class TestTaskImpact:
    """Tests for ``Job.task_impact`` with and without job slicing."""

    @pytest.fixture
    def job_host_limit(self, job_template, inventory):
        """Return a factory that builds a job over an inventory of N hosts.

        The returned callable takes ``(hosts, forks)``: it adds ``hosts``
        hosts named ``foo0``, ``foo1``, ... to the inventory and creates a
        job on that inventory with the given ``forks`` value.
        """
        def make_job(hosts, forks):
            for index in range(hosts):
                inventory.hosts.create(name='foo' + str(index))
            return Job.objects.create(
                name='fake-job',
                launch_type='workflow',
                job_template=job_template,
                inventory=inventory,
                forks=forks
            )
        return make_job

    def test_limit_task_impact(self, job_host_limit):
        """With more hosts than forks, the forks value is the constraint."""
        job = job_host_limit(5, 2)
        assert job.task_impact == 2 + 1  # forks becomes constraint

    def test_host_task_impact(self, job_host_limit):
        """With fewer hosts than forks, the host count is the constraint."""
        job = job_host_limit(3, 5)
        assert job.task_impact == 3 + 1  # hosts becomes constraint

    def test_shard_task_impact(self, slice_job_factory):
        """Task impact of each slice job follows the hosts in its slice."""
        # the factory creates one host per slice
        workflow_job = slice_job_factory(3, jt_kwargs={'forks': 50}, spawn=True)

        # order the spawned jobs by slice number (1-based -> list index)
        jobs = [None] * 3
        for node in workflow_job.workflow_nodes.all():
            sliced_job = node.job
            jobs[sliced_job.job_slice_number - 1] = sliced_job

        def hosts_per_slice():
            # number of hosts in the 'all' group of every slice's script data
            return [
                len(jobs[0].inventory.get_script_data(slice_number=number, slice_count=3)['all']['hosts'])
                for number in range(1, 4)
            ]

        # Even distribution - all jobs run on 1 host
        assert hosts_per_slice() == [1, 1, 1]
        assert [job.task_impact for job in jobs] == [2, 2, 2]  # plus one base task impact

        # Uneven distribution - first job takes the extra host
        jobs[0].inventory.hosts.create(name='remainder_foo')
        assert hosts_per_slice() == [2, 1, 1]
        assert [job.task_impact for job in jobs] == [3, 2, 2]