Files
awx/awx/main/tests/functional/models/test_unified_job.py
Alan Rominger ac99708952 Serializer RBAC and structure review changes (#17)
* Bulk launch serializer RBAC and code structure review

Use WJ node as base in bulk job launch child
  remove fields we get for free this way

Minor translation marking

Consolidate bulk API permission methods
  split out permission check for each UJT type

Code consolidation for org check method

add a save before starting the workflow job
2023-03-08 12:58:12 -05:00

285 lines
12 KiB
Python

import itertools
import pytest
# CRUM
from crum import impersonate
# Django
from django.contrib.contenttypes.models import ContentType
# AWX
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, WorkflowApprovalTemplate, Project, WorkflowJob, Schedule, Credential
from awx.api.versioning import reverse
from awx.main.constants import JOB_VARIABLE_PREFIXES
@pytest.mark.django_db
def test_subclass_types():
assert set(UnifiedJobTemplate._submodels_with_roles()) == set(
[
ContentType.objects.get_for_model(JobTemplate).id,
ContentType.objects.get_for_model(Project).id,
ContentType.objects.get_for_model(WorkflowJobTemplate).id,
ContentType.objects.get_for_model(WorkflowApprovalTemplate).id,
]
)
@pytest.mark.django_db
def test_soft_unique_together(post, project, admin_user):
"""This tests that SOFT_UNIQUE_TOGETHER restrictions are applied correctly."""
jt1 = JobTemplate.objects.create(name='foo_jt', project=project)
assert jt1.organization == project.organization
r = post(
url=reverse('api:job_template_list'),
data=dict(name='foo_jt', project=project.id, ask_inventory_on_launch=True, playbook='helloworld.yml'), # same as first
user=admin_user,
expect=400,
)
assert 'combination already exists' in str(r.data)
@pytest.mark.django_db
class TestCreateUnifiedJob:
"""
Ensure that copying a job template to a job handles many to many field copy
"""
def test_many_to_many(self, mocker, job_template_labels):
jt = job_template_labels
_get_unified_job_field_names = mocker.patch('awx.main.models.jobs.JobTemplate._get_unified_job_field_names', return_value=['labels'])
j = jt.create_unified_job()
_get_unified_job_field_names.assert_called_with()
assert j.labels.all().count() == 2
assert j.labels.all()[0] == jt.labels.all()[0]
assert j.labels.all()[1] == jt.labels.all()[1]
'''
Ensure that data is looked for in parameter list before looking at the object
'''
def test_many_to_many_kwargs(self, mocker, job_template_labels):
jt = job_template_labels
_get_unified_job_field_names = mocker.patch('awx.main.models.jobs.JobTemplate._get_unified_job_field_names', return_value=['labels'])
jt.create_unified_job()
_get_unified_job_field_names.assert_called_with()
'''
Ensure that credentials m2m field is copied to new relaunched job
'''
def test_job_relaunch_copy_vars(self, machine_credential, inventory, deploy_jobtemplate, post, mocker, net_credential):
job_with_links = Job(name='existing-job', inventory=inventory)
job_with_links.job_template = deploy_jobtemplate
job_with_links.limit = "my_server"
job_with_links.save()
job_with_links.credentials.add(machine_credential)
job_with_links.credentials.add(net_credential)
second_job = job_with_links.copy_unified_job()
# Check that job data matches the original variables
assert [c.pk for c in second_job.credentials.all()] == [machine_credential.pk, net_credential.pk]
assert second_job.inventory == job_with_links.inventory
assert second_job.limit == 'my_server'
assert net_credential in second_job.credentials.all()
def test_job_relaunch_modifed_jt(self, jt_linked):
# Replace all credentials with a new one of same type
new_creds = []
for cred in jt_linked.credentials.all():
new_creds.append(Credential.objects.create(name=str(cred.name) + '_new', credential_type=cred.credential_type, inputs=cred.inputs))
job = jt_linked.create_unified_job()
jt_linked.credentials.clear()
jt_linked.credentials.add(*new_creds)
relaunched_job = job.copy_unified_job()
assert set(relaunched_job.credentials.all()) == set(new_creds)
@pytest.mark.django_db
class TestMetaVars:
"""
Extension of unit tests with same class name
"""
def test_deleted_user(self, admin_user):
job = Job.objects.create(name='job', created_by=admin_user)
job.save()
user_vars = ['_'.join(x) for x in itertools.product(['tower', 'awx'], ['user_name', 'user_id', 'user_email', 'user_first_name', 'user_last_name'])]
for key in user_vars:
assert key in job.awx_meta_vars()
# deleted user is hard to simulate as this test occurs within one transaction
job = Job.objects.get(pk=job.id)
job.created_by_id = 999999999
for key in user_vars:
assert key not in job.awx_meta_vars()
def test_workflow_job_metavars(self, admin_user, job_template):
workflow_job = WorkflowJob.objects.create(name='workflow-job', created_by=admin_user)
node = workflow_job.workflow_nodes.create(unified_job_template=job_template)
job_kv = node.get_job_kwargs()
job = node.unified_job_template.create_unified_job(**job_kv)
workflow_job.workflow_nodes.create(job=job)
data = job.awx_meta_vars()
for name in JOB_VARIABLE_PREFIXES:
assert data['{}_user_id'.format(name)] == admin_user.id
assert data['{}_user_name'.format(name)] == admin_user.username
assert data['{}_workflow_job_id'.format(name)] == workflow_job.pk
assert data['{}_workflow_job_launch_type'.format(name)] == workflow_job.launch_type
def test_scheduled_job_metavars(self, job_template, admin_user):
schedule = Schedule.objects.create(name='job-schedule', rrule='DTSTART:20171129T155939z\nFREQ=MONTHLY', unified_job_template=job_template)
job = Job.objects.create(name='fake-job', launch_type='workflow', schedule=schedule, job_template=job_template)
data = job.awx_meta_vars()
for name in JOB_VARIABLE_PREFIXES:
assert data['{}_schedule_id'.format(name)] == schedule.pk
assert '{}_user_name'.format(name) not in data
def test_scheduled_workflow_job_node_metavars(self, workflow_job_template):
schedule = Schedule.objects.create(name='job-schedule', rrule='DTSTART:20171129T155939z\nFREQ=MONTHLY', unified_job_template=workflow_job_template)
workflow_job = WorkflowJob.objects.create(name='workflow-job', workflow_job_template=workflow_job_template, schedule=schedule)
job = Job.objects.create(launch_type='workflow')
workflow_job.workflow_nodes.create(job=job)
result_hash = {}
for name in JOB_VARIABLE_PREFIXES:
result_hash['{}_job_id'.format(name)] = job.id
result_hash['{}_job_launch_type'.format(name)] = 'workflow'
result_hash['{}_workflow_job_name'.format(name)] = 'workflow-job'
result_hash['{}_workflow_job_id'.format(name)] = workflow_job.id
result_hash['{}_workflow_job_launch_type'.format(name)] = workflow_job.launch_type
result_hash['{}_parent_job_schedule_id'.format(name)] = schedule.id
result_hash['{}_parent_job_schedule_name'.format(name)] = 'job-schedule'
assert job.awx_meta_vars() == result_hash
@pytest.mark.django_db
def test_event_processing_not_finished():
job = Job.objects.create(emitted_events=2, status='finished')
job.event_class.objects.create(job=job)
assert not job.event_processing_finished
@pytest.mark.django_db
def test_event_model_undefined():
wj = WorkflowJob.objects.create(name='foobar', status='finished')
assert wj.event_processing_finished
@pytest.mark.django_db
class TestUpdateParentInstance:
def test_template_modified_by_not_changed_on_launch(self, job_template, alice):
# jobs are launched as a particular user, user not saved as JT modified_by
with impersonate(alice):
assert job_template.current_job is None
assert job_template.status == 'never updated'
assert job_template.modified_by is None
job = job_template.jobs.create(status='new')
job.status = 'pending'
job.save()
assert job_template.current_job == job
assert job_template.status == 'pending'
assert job_template.modified_by is None
def check_update(self, project, status):
pu_check = project.project_updates.create(job_type='check', status='new', launch_type='manual')
pu_check.status = 'running'
pu_check.save()
# these should always be updated for a running check job
assert project.current_job == pu_check
assert project.status == 'running'
pu_check.status = status
pu_check.save()
return pu_check
def run_update(self, project, status):
pu_run = project.project_updates.create(job_type='run', status='new', launch_type='sync')
pu_run.status = 'running'
pu_run.save()
pu_run.status = status
pu_run.save()
return pu_run
def test_project_update_fails_project(self, project):
# This is the normal server auto-update on create behavior
assert project.status == 'never updated'
pu_check = self.check_update(project, status='failed')
assert project.last_job == pu_check
assert project.status == 'failed'
def test_project_sync_with_skip_update(self, project):
# syncs may be permitted to change project status
# only if prior status is "never updated"
assert project.status == 'never updated'
pu_run = self.run_update(project, status='successful')
assert project.last_job == pu_run
assert project.status == 'successful'
def test_project_sync_does_not_fail_project(self, project):
# Accurate normal server behavior, creating a project auto-updates
# have to create update, otherwise will fight with last_job logic
assert project.status == 'never updated'
pu_check = self.check_update(project, status='successful')
assert project.status == 'successful'
self.run_update(project, status='failed')
assert project.last_job == pu_check
assert project.status == 'successful'
@pytest.mark.django_db
class TestTaskImpact:
@pytest.fixture
def job_host_limit(self, job_template, inventory):
def r(hosts, forks):
for i in range(hosts):
inventory.hosts.create(name='foo' + str(i))
job = Job.objects.create(name='fake-job', launch_type='workflow', job_template=job_template, inventory=inventory, forks=forks)
return job
return r
def test_limit_task_impact(self, job_host_limit, run_computed_fields_right_away):
job = job_host_limit(5, 2)
job.inventory.update_computed_fields()
job.task_impact = job._get_task_impact()
assert job.inventory.total_hosts == 5
assert job.task_impact == 2 + 1 # forks becomes constraint
def test_host_task_impact(self, job_host_limit, run_computed_fields_right_away):
job = job_host_limit(3, 5)
job.inventory.update_computed_fields()
job.task_impact = job._get_task_impact()
assert job.task_impact == 3 + 1 # hosts becomes constraint
def test_shard_task_impact(self, slice_job_factory, run_computed_fields_right_away):
# factory creates on host per slice
workflow_job = slice_job_factory(3, jt_kwargs={'forks': 50}, spawn=True)
# arrange the jobs by their number
jobs = [None for i in range(3)]
for node in workflow_job.workflow_nodes.all():
jobs[node.job.job_slice_number - 1] = node.job
# Even distribution - all jobs run on 1 host
assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [1, 1, 1]
jobs[0].inventory.update_computed_fields()
for j in jobs:
j.task_impact = j._get_task_impact()
assert [job.task_impact for job in jobs] == [2, 2, 2] # plus one base task impact
# Uneven distribution - first job takes the extra host
jobs[0].inventory.hosts.create(name='remainder_foo')
assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [2, 1, 1]
jobs[0].inventory.update_computed_fields()
# recalculate task_impact
jobs[0].task_impact = jobs[0]._get_task_impact()
assert [job.task_impact for job in jobs] == [3, 2, 2]