Files
awx/awx/main/tests/functional/test_bulk.py
jessicamack cbbd683720 AAP-70294: Migrate Unit Test Candidates from ATF to Upstream (#16385)
* add converted atf tests

* fix bulk settings test
2026-05-04 15:07:46 +00:00

632 lines
28 KiB
Python

import pytest
from uuid import uuid4
from awx.api.versioning import reverse
from awx.main.models.jobs import JobTemplate
from awx.main.models import Organization, Inventory, WorkflowJob, ExecutionEnvironment, Host
from awx.main.scheduler import TaskManager
from django.test import override_settings
@pytest.mark.django_db
@pytest.mark.parametrize('num_hosts, num_queries', [(1, 15), (10, 15)])
def test_bulk_host_create_num_queries(organization, inventory, post, get, user, num_hosts, num_queries, django_assert_max_num_queries):
    '''
    If I am a...
      org admin
      inventory admin at org level
      admin of a particular inventory
      superuser

    Bulk Host create should take under a certain number of queries

    The budget is identical for 1 and 10 hosts, so the number of queries
    must not grow with the size of the batch.
    '''
    inventory.organization = organization

    inventory_admin = user('inventory_admin', False)
    org_admin = user('org_admin', False)
    # Own username for the org-level inventory admin.
    # NOTE(review): presumably the `user` fixture hands back the existing account when a
    # username is reused; sharing 'org_admin' would then make this the org admin itself and
    # the inventory_admin_role would never be exercised on its own - confirm against conftest.
    org_inv_admin = user('org_inv_admin', False)
    superuser = user('admin', True)

    for u in [org_admin, org_inv_admin, inventory_admin]:
        organization.member_role.members.add(u)
    organization.admin_role.members.add(org_admin)
    organization.inventory_admin_role.members.add(org_inv_admin)
    inventory.admin_role.members.add(inventory_admin)

    for u in [org_admin, inventory_admin, org_inv_admin, superuser]:
        # Fresh random names per user so the batches never collide inside the shared inventory;
        # sent as strings, like the bulk delete query-count test does.
        hosts = [{'name': str(uuid4())} for _ in range(num_hosts)]
        # Only the create request itself is counted against the budget
        with django_assert_max_num_queries(num_queries):
            bulk_host_create_response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': hosts}, u, expect=201).data
        assert len(bulk_host_create_response['hosts']) == len(hosts), f"unexpected number of hosts created for user {u}"
@pytest.mark.django_db
def test_bulk_host_create_rbac(organization, inventory, post, get, user):
    '''
    If I am a...
      org admin
      inventory admin at org level
      admin of a particular inventory
    ... I can bulk add hosts

    Everyone else cannot
    '''
    inventory.organization = organization

    # One account per role under test.
    # NOTE(review): presumably the `user` fixture returns the existing account when a username
    # is reused; the previously shared names ('org_admin' / 'member') would then have merged
    # two of the roles below into a single user - confirm against conftest.
    inventory_admin = user('inventory_admin', False)
    org_admin = user('org_admin', False)
    org_inv_admin = user('org_inv_admin', False)
    auditor = user('auditor', False)
    member = user('member', False)
    use_inv_member = user('use_inv_member', False)

    for u in [org_admin, org_inv_admin, auditor, member, inventory_admin, use_inv_member]:
        organization.member_role.members.add(u)
    organization.admin_role.members.add(org_admin)
    organization.inventory_admin_role.members.add(org_inv_admin)
    inventory.admin_role.members.add(inventory_admin)
    inventory.use_role.members.add(use_inv_member)
    organization.auditor_role.members.add(auditor)

    # Every admin flavour may add a host
    for indx, u in enumerate([org_admin, inventory_admin, org_inv_admin]):
        bulk_host_create_response = post(
            reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': [{'name': f'foobar-{indx}'}]}, u, expect=201
        ).data
        assert len(bulk_host_create_response['hosts']) == 1, f"unexpected number of hosts created for user {u}"
        # NOTE(review): expects 'foobar-0' to stay the first host returned for this inventory,
        # i.e. presumably relies on the model's default ordering - confirm.
        assert Host.objects.filter(inventory__id=inventory.id)[0].name == 'foobar-0'

    # Plain member, auditor and inventory "use" member are all rejected with the same message
    for indx, u in enumerate([member, auditor, use_inv_member]):
        bulk_host_create_response = post(
            reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': [{'name': f'foobar2-{indx}'}]}, u, expect=400
        ).data
        assert bulk_host_create_response['__all__'][0] == f'Inventory with id {inventory.id} not found or lack permissions to add hosts.'
@pytest.mark.django_db
@pytest.mark.parametrize('num_jobs, num_queries', [(1, 25), (10, 25)])
def test_bulk_job_launch_queries(job_template, organization, inventory, project, post, get, user, num_jobs, num_queries, django_assert_max_num_queries):
    '''
    With execute access to the job template and use access to the inventory
    ... the bulk job can be launched
    ... within one fixed query budget, whatever the number of jobs requested

    Afterwards both the launching user and the org admin can read the bulk job
    and see one workflow node per requested job.
    '''
    launcher = user('normal_user', False)
    admin = user('org_admin', False)
    template = JobTemplate.objects.create(name='my-jt', ask_inventory_on_launch=True, project=project, playbook='helloworld.yml')

    organization.member_role.members.add(launcher)
    organization.admin_role.members.add(admin)
    template.execute_role.members.add(launcher)
    inventory.use_role.members.add(launcher)
    template.save()
    inventory.save()

    requested_jobs = []
    for _ in range(num_jobs):
        requested_jobs.append({'unified_job_template': template.id, 'inventory': inventory.id})

    # NOTE: covering larger batches needs BULK_JOB_MAX_LAUNCH raised; an earlier attempt to
    # mock.patch 'awx.api.serializers.settings.BULK_JOB_MAX_LAUNCH' here did not work.
    with django_assert_max_num_queries(num_queries):
        launch_response = post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': requested_jobs}, launcher, expect=201)
    launched = launch_response.data

    # Run task manager so the workflow job nodes actually spawn
    TaskManager().schedule()

    for viewer in (admin, launcher):
        detail = get(launched['url'], viewer, expect=200).data
        assert organization.id == detail['summary_fields']['organization']['id']

        node_listing = get(launched['related']['workflow_nodes'], viewer).data
        assert node_listing['count'] == num_jobs
        for node in node_listing['results']:
            assert node["unified_job_template"] == template.id
            assert node["inventory"] == inventory.id
@pytest.mark.django_db
def test_bulk_job_launch_no_access_to_job_template(job_template, organization, inventory, project, credential, post, get, user):
    '''
    As an organization member without any role on the job template
    ... launching it through a bulk job is rejected
    '''
    member = user('normal_user', False)
    template = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
    template.save()
    organization.member_role.members.add(member)

    payload = {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': template.id}]}
    response = post(reverse('api:bulk_job_launch'), payload, member, expect=400)

    # The message wraps the offending template id in literal braces
    expected = f'Job Templates {{{template.id}}} not found or you don\'t have permissions to access it'
    assert response.data['__all__'][0] == expected
@pytest.mark.django_db
def test_bulk_job_launch_no_org_assigned(job_template, organization, inventory, project, credential, post, get, user):
    '''
    As a user who can execute the job template but belongs to no organization
    ... launching a bulk job is rejected
    '''
    orphan = user('normal_user', False)
    template = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
    template.save()
    # Execute access only - deliberately no organization membership
    template.execute_role.members.add(orphan)

    payload = {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': template.id}]}
    response = post(reverse('api:bulk_job_launch'), payload, orphan, expect=400)

    errors = response.data['__all__']
    assert errors[0] == 'User not part of any organization, please assign an organization to assign to the bulk job'
@pytest.mark.django_db
def test_bulk_job_launch_multiple_org_assigned(job_template, organization, inventory, project, credential, post, get, user):
    '''
    As a member of more than one organization
    who does not name an organization in the request
    ... launching a bulk job is rejected
    '''
    multi_org_user = user('normal_user', False)
    # Membership in two organizations makes the target organization ambiguous
    for org_name in ('foo1', 'foo2'):
        Organization.objects.create(name=org_name).member_role.members.add(multi_org_user)

    template = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
    template.save()
    template.execute_role.members.add(multi_org_user)

    payload = {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': template.id}]}
    response = post(reverse('api:bulk_job_launch'), payload, multi_org_user, expect=400)

    errors = response.data['__all__']
    assert errors[0] == 'User has permission to multiple Organizations, please set one of them in the request'
@pytest.mark.django_db
def test_bulk_job_launch_specific_org(job_template, organization, inventory, project, credential, post, get, user):
    '''
    As a member of more than one organization
    who names one of them in the request
    ... the bulk job launches and is assigned to that organization
    '''
    multi_org_user = user('normal_user', False)
    chosen_org = Organization.objects.create(name='foo1')
    other_org = Organization.objects.create(name='foo2')
    for org in (chosen_org, other_org):
        org.member_role.members.add(multi_org_user)

    template = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
    template.save()
    template.execute_role.members.add(multi_org_user)

    payload = {
        'name': 'Bulk Job Launch',
        'jobs': [{'unified_job_template': template.id}],
        'organization': chosen_org.id,
    }
    launched = post(reverse('api:bulk_job_launch'), payload, multi_org_user, expect=201).data

    # Look the job up as a bulk workflow job and check which organization it landed in
    bulk_job = WorkflowJob.objects.filter(id=launched['id'], is_bulk_job=True).first()
    assert chosen_org.id == bulk_job.organization.id
@pytest.mark.django_db
def test_bulk_job_launch_inventory_no_access(job_template, organization, inventory, project, credential, post, get, user):
    '''
    As a user who can execute the job template
    but passes an inventory from an organization I do not belong to
    ... launching a bulk job is rejected
    '''
    launcher = user('normal_user', False)
    home_org = Organization.objects.create(name='foo1')
    foreign_org = Organization.objects.create(name='foo2')

    template = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
    template.save()

    home_org.member_role.members.add(launcher)
    # The inventory lives in the organization the user is NOT a member of
    foreign_inventory = Inventory.objects.create(name='inv1', organization=foreign_org)
    template.execute_role.members.add(launcher)

    job_spec = {'unified_job_template': template.id, 'inventory': foreign_inventory.id}
    response = post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [job_spec]}, launcher, expect=400)

    expected = f'Inventories {{{foreign_inventory.id}}} not found or you don\'t have permissions to access it'
    assert response.data['__all__'][0] == expected
@pytest.mark.django_db
def test_bulk_job_inventory_prompt(job_template, organization, inventory, project, credential, post, get, user):
    '''
    The job template asks for its inventory on launch
    and the bulk job request supplies one
    ... the created workflow node carries that inventory
    '''
    launcher = user('normal_user', False)
    org = Organization.objects.create(name='foo1')

    template = JobTemplate.objects.create(name='my-jt', ask_inventory_on_launch=True, project=project, playbook='helloworld.yml')
    template.save()

    org.member_role.members.add(launcher)
    prompted_inventory = Inventory.objects.create(name='inv1', organization=org)
    template.execute_role.members.add(launcher)
    prompted_inventory.use_role.members.add(launcher)

    job_spec = {'unified_job_template': template.id, 'inventory': prompted_inventory.id}
    launched = post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [job_spec]}, launcher, expect=201).data

    workflow_job = WorkflowJob.objects.get(id=launched['id'])
    nodes = workflow_job.workflow_job_nodes.all().order_by('created')
    assert prompted_inventory.id == nodes[0].inventory.id
@pytest.mark.django_db
def test_bulk_job_set_all_prompt(job_template, organization, inventory, project, credentialtype_ssh, post, get, user):
    '''
    Job template has many fields set as prompt_on_launch
    and if I provide all those fields as a parameter in bulk job
    ... job uses them
    '''
    normal_user = user('normal_user', False)
    jt = JobTemplate.objects.create(
        name='my-jt',
        ask_inventory_on_launch=True,
        ask_diff_mode_on_launch=True,
        ask_job_type_on_launch=True,
        ask_verbosity_on_launch=True,
        ask_execution_environment_on_launch=True,
        ask_forks_on_launch=True,
        ask_job_slice_count_on_launch=True,
        ask_timeout_on_launch=True,
        ask_variables_on_launch=True,
        ask_scm_branch_on_launch=True,
        ask_limit_on_launch=True,
        ask_skip_tags_on_launch=True,
        ask_tags_on_launch=True,
        project=project,
        playbook='helloworld.yml',
    )
    jt.save()
    organization.member_role.members.add(normal_user)
    inv = Inventory.objects.create(name='inv1', organization=organization)
    ee = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar')
    jt.execute_role.members.add(normal_user)
    inv.use_role.members.add(normal_user)

    bulk_job_launch_response = post(
        reverse('api:bulk_job_launch'),
        {
            'name': 'Bulk Job Launch',
            'jobs': [
                {
                    'unified_job_template': jt.id,
                    'inventory': inv.id,
                    'diff_mode': True,
                    'job_type': 'check',
                    'verbosity': 3,
                    'execution_environment': ee.id,
                    'forks': 1,
                    'job_slice_count': 1,
                    'timeout': 200,
                    'extra_data': {'prompted_key': 'prompted_val'},
                    'scm_branch': 'non_dev',
                    'limit': 'kansas',
                    'skip_tags': 'foobar',
                    'job_tags': 'untagged',
                }
            ],
        },
        normal_user,
        expect=201,
    ).data

    bulk_job_id = bulk_job_launch_response['id']
    # Load the earliest-created node once: indexing the unevaluated queryset in every
    # assertion would run a separate query each time.
    node = WorkflowJob.objects.get(id=bulk_job_id).workflow_job_nodes.all().order_by('created').first()
    assert node is not None, "bulk job launch did not create a workflow node"

    # Every prompted value from the request must be stored on the node
    assert node.inventory.id == inv.id
    assert node.diff_mode is True
    assert node.job_type == 'check'
    assert node.verbosity == 3
    assert node.execution_environment.id == ee.id
    assert node.forks == 1
    assert node.job_slice_count == 1
    assert node.timeout == 200
    assert node.extra_data == {'prompted_key': 'prompted_val'}
    assert node.scm_branch == 'non_dev'
    assert node.limit == 'kansas'
    assert node.skip_tags == 'foobar'
    assert node.job_tags == 'untagged'
@pytest.mark.django_db
@pytest.mark.parametrize('num_hosts, num_queries', [(1, 70), (10, 150), (25, 250)])
def test_bulk_host_delete_num_queries(organization, inventory, post, get, user, num_hosts, num_queries, django_assert_max_num_queries):
    '''
    For every user returned by setup_admin_users_list
    (inventory admin, org-level inventory admin, superuser, org admin)

    creating a batch of hosts, listing them and bulk deleting them again
    should stay under the query budget given for that batch size
    '''
    create_url = reverse('api:bulk_host_create')
    delete_url = reverse('api:bulk_host_delete')

    for admin in setup_admin_users_list(organization, inventory, user):
        batch = [{'name': str(uuid4())} for _ in range(num_hosts)]
        expected_count = len(batch)

        # The create, the host listing and the delete all count against the budget
        with django_assert_max_num_queries(num_queries):
            created = post(create_url, {'inventory': inventory.id, 'hosts': batch}, admin, expect=201).data
            assert len(created['hosts']) == expected_count, f"unexpected number of hosts created for user {admin}"

            host_ids = get_inventory_hosts(get, inventory.id, admin)
            deleted = post(delete_url, {'hosts': host_ids}, admin, expect=201).data
            assert len(deleted['hosts'].keys()) == expected_count, f"unexpected number of hosts deleted for user {admin}"
@pytest.mark.django_db
def test_bulk_host_delete_rbac(organization, inventory, post, get, user):
    '''
    Users built by setup_admin_users_list
    ... can bulk delete the hosts of the inventory

    Users built by setup_none_admin_uses_list cannot
    '''
    admins = setup_admin_users_list(organization, inventory, user)
    non_admins = setup_none_admin_uses_list(organization, inventory, user)
    create_url = reverse('api:bulk_host_create')
    delete_url = reverse('api:bulk_host_delete')

    # Part 1: each admin creates one host and is allowed to delete it again
    for position, admin in enumerate(admins):
        host_name = f'foobar-{position}'
        created = post(create_url, {'inventory': inventory.id, 'hosts': [{'name': host_name}]}, admin, expect=201).data
        assert len(created['hosts']) == 1, f"unexpected number of hosts created for user {admin}"
        assert Host.objects.filter(inventory__id=inventory.id)[0].name == host_name

        host_ids = get_inventory_hosts(get, inventory.id, admin)
        deleted = post(delete_url, {'hosts': host_ids}, admin, expect=201).data
        assert len(deleted['hosts'].keys()) == 1, f"unexpected number of hosts deleted by user {admin}"

    # Part 2: hosts created by an admin cannot be deleted by any non-admin user
    for position, creator in enumerate(admins):
        host_name = f'foobar2-{position}'
        created = post(create_url, {'inventory': inventory.id, 'hosts': [{'name': host_name}]}, creator, expect=201).data
        print(created)
        assert created['hosts'][0]['name'] == host_name

        host_ids = get_inventory_hosts(get, inventory.id, creator)
        print(f"Try to delete {host_ids}")
        for rejected_user in non_admins:
            denied = post(delete_url, {'hosts': host_ids}, rejected_user, expect=403).data
            assert "Lack permissions to delete hosts from this inventory." in denied['inventories'].values()
@pytest.mark.django_db
def test_bulk_host_delete_from_multiple_inv(organization, inventory, post, get, user):
    '''
    If I am admin of one particular inventory
    Bulk Host delete should be enabled only on my inventory

    A delete request that mixes my hosts with hosts of an inventory I do not
    administer is rejected, and the error is reported under that other inventory.
    '''
    num_hosts = 10
    inventory.organization = organization

    # Create second inventory
    inv2 = organization.inventories.create(name="second-test-inv")
    inv2.organization = organization

    # Each user administers exactly one of the two inventories
    admin2_user = user('inventory2_admin', False)
    inv2.admin_role.members.add(admin2_user)

    admin_user = user('inventory_admin', False)
    inventory.admin_role.members.add(admin_user)

    organization.member_role.members.add(admin_user)
    organization.member_role.members.add(admin2_user)

    hosts = [{'name': str(uuid4())} for _ in range(num_hosts)]
    hosts2 = [{'name': str(uuid4())} for _ in range(num_hosts)]

    # create hosts in each of the inventories
    bulk_host_create_response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': hosts}, admin_user, expect=201).data
    assert len(bulk_host_create_response['hosts']) == len(hosts), f"unexpected number of hosts created for user {admin_user}"

    bulk_host_create_response2 = post(reverse('api:bulk_host_create'), {'inventory': inv2.id, 'hosts': hosts2}, admin2_user, expect=201).data
    # Compare against the batch that was actually sent to the second inventory
    assert len(bulk_host_create_response2['hosts']) == len(hosts2), f"unexpected number of hosts created for user {admin2_user}"

    # get all hosts ids - from both inventories
    hosts_ids_created = get_inventory_hosts(get, inventory.id, admin_user)
    hosts_ids_created += get_inventory_hosts(get, inv2.id, admin2_user)

    expected_error = "Lack permissions to delete hosts from this inventory."
    # Each admin in turn tries to delete ALL hosts; the error must be keyed by the name of
    # the inventory that admin has no rights on.
    for invadmin, foreign_inv in [(admin_user, inv2), (admin2_user, inventory)]:
        bulk_host_delete_response = post(reverse('api:bulk_host_delete'), {'hosts': hosts_ids_created}, invadmin, expect=403).data
        result_message = bulk_host_delete_response['inventories'][foreign_inv.name]
        assert result_message == expected_error, f"deleted hosts without permission by user {invadmin}"
def setup_admin_users_list(organization, inventory, user):
    """
    Create the users that are expected to have admin rights on ``inventory``.

    ``user`` is called as ``user(username, is_superuser)`` once per account. Three regular
    users are added to the organization's member role and each gets exactly one extra role:
    organization admin, organization-level inventory admin, or admin of ``inventory`` itself.
    A superuser is created as well. ``inventory.organization`` is set to ``organization``
    (the attribute is assigned only, nothing is saved here).

    Returns the list ``[inventory_admin, org_inv_admin, superuser, org_admin]``.
    """
    inventory.organization = organization

    inventory_admin = user('inventory_admin', False)
    org_admin = user('org_admin', False)
    # Own username, so this is a separate account from org_admin.
    # NOTE(review): presumably the `user` fixture returns the existing account when a username
    # is reused; with the shared name 'org_admin' both roles ended up on one user - confirm.
    org_inv_admin = user('org_inv_admin', False)
    superuser = user('admin', True)

    for u in [org_admin, org_inv_admin, inventory_admin]:
        organization.member_role.members.add(u)
    organization.admin_role.members.add(org_admin)
    organization.inventory_admin_role.members.add(org_inv_admin)
    inventory.admin_role.members.add(inventory_admin)

    return [inventory_admin, org_inv_admin, superuser, org_admin]
def setup_none_admin_uses_list(organization, inventory, user):
    """
    Create the users that are expected NOT to have admin rights on ``inventory``.

    ``user`` is called as ``user(username, is_superuser)`` once per account. All three users
    are added to the organization's member role; in addition one gets the organization's
    auditor role and one gets the use role on ``inventory``. ``inventory.organization`` is
    set to ``organization`` (the attribute is assigned only, nothing is saved here).

    Returns the list ``[auditor, member, use_inv_member]``.
    """
    inventory.organization = organization

    auditor = user('auditor', False)
    member = user('member', False)
    # Own username, so the plain member and the inventory "use" member are separate accounts.
    # NOTE(review): presumably the `user` fixture returns the existing account when a username
    # is reused; with the shared name 'member' the plain member also held use_role - confirm.
    use_inv_member = user('use_inv_member', False)

    for u in [auditor, member, use_inv_member]:
        organization.member_role.members.add(u)
    inventory.use_role.members.add(use_inv_member)
    organization.auditor_role.members.add(auditor)

    return [auditor, member, use_inv_member]
def get_inventory_hosts(get, inv_id, use_user):
    """
    Return the ids of the hosts listed for inventory ``inv_id``.

    The inventory's host list endpoint is requested once as ``use_user`` (a 200 response is
    expected) and the ``id`` of every entry in its ``results`` is collected. Only that single
    response is read; no further pages are requested.
    """
    hosts_url = reverse('api:inventory_hosts_list', kwargs={'pk': inv_id})
    listing = get(hosts_url, use_user, expect=200).data
    host_ids = []
    for entry in listing['results']:
        host_ids.append(entry['id'])
    return host_ids
@pytest.mark.django_db
def test_bulk_job_launch_respects_settings_limit(job_template, organization, inventory, project, post, patch, get, user):
    """The same 5-job request is rejected with BULK_JOB_MAX_LAUNCH=3 and accepted with BULK_JOB_MAX_LAUNCH=10."""
    launcher = user('normal_user', False)
    organization.member_role.members.add(launcher)

    template = JobTemplate.objects.create(
        name='bulk-test-jt',
        ask_inventory_on_launch=True,
        project=project,
        playbook='helloworld.yml',
        allow_simultaneous=True,
    )
    template.execute_role.members.add(launcher)
    inventory.use_role.members.add(launcher)

    launch_url = reverse('api:bulk_job_launch')

    def five_job_request():
        # Fresh payload for each attempt: five identical jobs under one bulk job name
        job_spec = {'unified_job_template': template.id, 'inventory': inventory.id}
        return {'name': 'Bulk Job Test', 'jobs': [dict(job_spec) for _ in range(5)]}

    # Limit below the requested number of jobs -> validation error
    with override_settings(BULK_JOB_MAX_LAUNCH=3):
        rejected = post(launch_url, five_job_request(), launcher, expect=400)
        assert 'Number of requested jobs exceeds system setting' in str(rejected.data)

    # Limit above the requested number of jobs -> the launch goes through
    with override_settings(BULK_JOB_MAX_LAUNCH=10):
        accepted = post(launch_url, five_job_request(), launcher, expect=201)
        bulk_job = get(accepted.data['url'], launcher, expect=200).data
        # The workflow job exists and carries the requested name
        assert bulk_job['name'] == 'Bulk Job Test'
# Tests for BulkHostCreateSerializer duplicate detection optimization
@pytest.mark.django_db
def test_bulk_host_create_duplicate_within_batch(organization, inventory, post, user):
    """
    A batch that lists the same hostname twice is rejected with a uniqueness
    error naming that hostname, and no host of the batch is created.
    """
    inventory.organization = organization
    admin = user('inventory_admin', False)
    organization.member_role.members.add(admin)
    inventory.admin_role.members.add(admin)

    # 'duplicate-host' is requested twice in one and the same batch
    requested_names = ['unique-host-1', 'duplicate-host', 'unique-host-2', 'duplicate-host']
    payload = {'inventory': inventory.id, 'hosts': [{'name': name} for name in requested_names]}

    response = post(reverse('api:bulk_host_create'), payload, admin, expect=400)

    message = response.data['__all__'][0]
    assert 'Hostnames must be unique in an inventory' in message
    assert 'duplicate-host' in message
    # Nothing from the rejected batch may have been stored
    assert Host.objects.filter(inventory=inventory).count() == 0
@pytest.mark.django_db
def test_bulk_host_create_duplicate_against_existing(organization, inventory, post, user):
    """
    A batch containing a hostname that already exists in the inventory is rejected
    with a uniqueness error naming that hostname, and the inventory keeps only
    its two pre-existing hosts.
    """
    inventory.organization = organization
    admin = user('inventory_admin', False)
    organization.member_role.members.add(admin)
    inventory.admin_role.members.add(admin)

    for existing_name in ('existing-host-1', 'existing-host-2'):
        Host.objects.create(name=existing_name, inventory=inventory)

    # The middle entry collides with a host that is already in the inventory
    requested_names = ['new-host-1', 'existing-host-1', 'new-host-2']
    payload = {'inventory': inventory.id, 'hosts': [{'name': name} for name in requested_names]}

    response = post(reverse('api:bulk_host_create'), payload, admin, expect=400)

    message = response.data['__all__'][0]
    assert 'Hostnames must be unique in an inventory' in message
    assert 'existing-host-1' in message
    assert Host.objects.filter(inventory=inventory).count() == 2
@pytest.mark.django_db
def test_bulk_host_create_combined_duplicates(organization, inventory, post, user):
    """
    A batch that both repeats a hostname and reuses an existing one is rejected;
    the uniqueness error must name at least one of the two offending hostnames.
    """
    inventory.organization = organization
    admin = user('inventory_admin', False)
    organization.member_role.members.add(admin)
    inventory.admin_role.members.add(admin)

    Host.objects.create(name='existing-host', inventory=inventory)

    # 'batch-duplicate' repeats inside the batch, 'existing-host' is already stored
    requested_names = ['new-host', 'batch-duplicate', 'existing-host', 'batch-duplicate']
    payload = {'inventory': inventory.id, 'hosts': [{'name': name} for name in requested_names]}

    response = post(reverse('api:bulk_host_create'), payload, admin, expect=400)

    message = response.data['__all__'][0]
    assert 'Hostnames must be unique in an inventory' in message
    assert any(offender in message for offender in ('batch-duplicate', 'existing-host'))
@pytest.mark.django_db
def test_bulk_host_create_no_duplicates_success(organization, inventory, post, user):
    """
    A batch of hostnames that are all new to the inventory is created in full,
    next to the hosts that were already there.
    """
    inventory.organization = organization
    admin = user('inventory_admin', False)
    organization.member_role.members.add(admin)
    inventory.admin_role.members.add(admin)

    for existing_name in ('existing-host-1', 'existing-host-2'):
        Host.objects.create(name=existing_name, inventory=inventory)

    new_names = ['new-host-1', 'new-host-2', 'new-host-3']
    payload = {'inventory': inventory.id, 'hosts': [{'name': name} for name in new_names]}

    response = post(reverse('api:bulk_host_create'), payload, admin, expect=201)

    assert len(response.data['hosts']) == 3
    # 2 pre-existing hosts + 3 new ones
    assert Host.objects.filter(inventory=inventory).count() == 5
    for name in new_names:
        assert Host.objects.filter(inventory=inventory, name=name).exists()
@pytest.mark.django_db
def test_bulk_host_create_performance_large_inventory(organization, inventory, post, user, django_assert_max_num_queries):
    """
    Adding 10 hosts to an inventory that already holds 10,000 hosts
    must finish within 20 queries.
    """
    from django.utils.timezone import now

    inventory.organization = organization
    admin = user('inventory_admin', False)
    organization.member_role.members.add(admin)
    inventory.admin_role.members.add(admin)

    # Seed 10k hosts in one bulk insert to simulate a reasonably large inventory;
    # created/modified are passed explicitly with one shared timestamp.
    timestamp = now()
    seeded = []
    for index in range(10000):
        seeded.append(Host(name=f'existing-host-{index}', inventory=inventory, created=timestamp, modified=timestamp))
    Host.objects.bulk_create(seeded)

    payload = {'inventory': inventory.id, 'hosts': [{'name': f'new-host-{index}'} for index in range(10)]}

    # The budget is fixed: it should hold no matter how many hosts the inventory already has
    with django_assert_max_num_queries(20):
        response = post(reverse('api:bulk_host_create'), payload, admin, expect=201)

    assert len(response.data['hosts']) == 10
    assert Host.objects.filter(inventory=inventory).count() == 10010