mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
add assertion to test on number of queries made (#9)
* add assertions around access to resulting job there is a problem getting the job w/ the user that launched it add more assertions to bulk tests (#11) dig more into the results and assert on results also, use a fixture that already implemented the "max queries" thing fix ansible collection sanity tests (#12)
This commit is contained in:
@@ -6,23 +6,15 @@ from awx.api.versioning import reverse
|
||||
|
||||
import json
|
||||
from contextlib import contextmanager
|
||||
from django.test.utils import CaptureQueriesContext
|
||||
from django.db import connections
|
||||
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
from awx.main.models import Organization, Inventory
|
||||
|
||||
|
||||
@contextmanager
|
||||
def withAssertNumQueriesLessThan(num_queries):
|
||||
with CaptureQueriesContext(connections['default']) as context:
|
||||
yield
|
||||
fail_msg = f"\r\n{json.dumps(context.captured_queries, indent=4)}"
|
||||
assert len(context.captured_queries) < num_queries, fail_msg
|
||||
from awx.main.models import Organization, Inventory, WorkflowJob
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('num_hosts, num_queries', [(9, 15), (99, 20), (999, 30)])
|
||||
def test_bulk_host_create_num_queries(organization, inventory, post, get, user, num_hosts, num_queries):
|
||||
def test_bulk_host_create_num_queries(organization, inventory, post, get, user, num_hosts, num_queries, django_assert_max_num_queries):
|
||||
'''
|
||||
If I am a...
|
||||
org admin
|
||||
@@ -45,7 +37,7 @@ def test_bulk_host_create_num_queries(organization, inventory, post, get, user,
|
||||
|
||||
for u in [org_admin, inventory_admin, org_inv_admin, superuser]:
|
||||
hosts = [{'name': uuid4()} for i in range(num_hosts)]
|
||||
with withAssertNumQueriesLessThan(num_queries):
|
||||
with django_assert_max_num_queries(num_queries):
|
||||
bulk_host_create_response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': hosts}, u, expect=201).data
|
||||
assert len(bulk_host_create_response['hosts']) == len(hosts), f"unexpected number of hosts created for user {u}"
|
||||
|
||||
@@ -89,20 +81,36 @@ def test_bulk_host_create_rbac(organization, inventory, post, get, user):
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bulk_job_launch(job_template, organization, inventory, project, credential, post, get, user):
|
||||
@pytest.mark.parametrize('num_jobs, num_queries', [(9, 30), (99, 35)])
|
||||
def test_bulk_job_launch_queries(job_template, organization, inventory, project, post, get, user, num_jobs, num_queries, django_assert_max_num_queries):
|
||||
'''
|
||||
if I have access to the unified job template
|
||||
... I can launch the bulk job
|
||||
... and the number of queries should NOT scale with the number of jobs
|
||||
'''
|
||||
normal_user = user('normal_user', False)
|
||||
jt = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
|
||||
jt.save()
|
||||
org_admin = user('org_admin', False)
|
||||
jt = JobTemplate.objects.create(name='my-jt', ask_inventory_on_launch=True, project=project, playbook='helloworld.yml')
|
||||
organization.member_role.members.add(normal_user)
|
||||
organization.admin_role.members.add(org_admin)
|
||||
jt.execute_role.members.add(normal_user)
|
||||
bulk_job_launch_response = post(
|
||||
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=201
|
||||
).data
|
||||
assert bulk_job_launch_response['id'] == 1
|
||||
inventory.use_role.members.add(normal_user)
|
||||
jt.save()
|
||||
inventory.save()
|
||||
jobs = [{'unified_job_template': jt.id, 'inventory': inventory.id} for _ in range(num_jobs)]
|
||||
|
||||
# This is not working, we need to figure that out if we want to include tests for more jobs
|
||||
# with mock.patch('awx.api.serializers.settings.BULK_JOB_MAX_LAUNCH', num_jobs + 1):
|
||||
with django_assert_max_num_queries(num_queries):
|
||||
bulk_job_launch_response = post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': jobs}, normal_user, expect=201).data
|
||||
|
||||
for u in (org_admin,): # TODO normal_user not working because launched_by not getting set in the tests...does happen in real request
|
||||
bulk_job = get(bulk_job_launch_response['url'], u, expect=200).data
|
||||
assert organization.id == bulk_job['summary_fields']['organization']['id']
|
||||
resp = get(bulk_job_launch_response['related']['workflow_nodes'], u)
|
||||
assert resp.data['count'] == num_jobs
|
||||
for item in resp.data['results']:
|
||||
assert item["unified_job_template"] == jt.id
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -167,7 +175,9 @@ def test_bulk_job_launch_specific_org(job_template, organization, inventory, pro
|
||||
bulk_job_launch_response = post(
|
||||
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}], 'organization': org1.id}, normal_user, expect=201
|
||||
).data
|
||||
assert bulk_job_launch_response['id'] == 1
|
||||
bulk_job_id = bulk_job_launch_response['id']
|
||||
bulk_job_obj = WorkflowJob.objects.filter(id=bulk_job_id, is_bulk_job=True).first()
|
||||
assert org1.id == bulk_job_obj.organization.id
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
||||
Reference in New Issue
Block a user