Files
awx/awx/main/tests/functional/test_migrations.py
AlanCoding 55a7591f89 Resolve actions conflicts and delete unwanted files
Bump migrations and delete some files

Resolve remaining conflicts

Fix requirements

Flake8 fixes

Prefer devel changes for schema

Use correct versions

Remove sso connected stuff

Update to modern actions and collection fixes

Remove unwanted alias

Version problems in actions

Fix more versioning problems

Update warning string

Messed it up again

Shorten exception

More removals

Remove pbr license

Remove tests deleted in devel

Remove unexpected files

Remove some content missed in the rebase

Use sleep_task from devel

Restore devel live conftest file

Add in settings that got missed

Prefer devel version of collection test

Finish repairing .github path

Remove unintended test file duplication

Undo more unintended file additions
2025-09-17 10:23:19 -04:00

170 lines
9.0 KiB
Python

import pytest
from django_test_migrations.plan import all_migrations, nodes_to_tuples
from django.utils.timezone import now
"""
Most tests that live in here can probably be deleted at some point. They are mainly
for a developer. Once the AWX versions that users upgrade from fall out of support,
the corresponding migration tests can be deleted. That is also a good time to squash
migrations. Squashing will likely mess with the tests that live here.
The smoke test should be kept in here. The smoke test ensures that our migrations
continue to work when sqlite is the backing database (vs. the default DB of postgres).
"""
@pytest.mark.django_db
class TestMigrationSmoke:
    """Tests that apply real migrations through the ``migrator`` fixture and inspect the resulting data."""

    @staticmethod
    def _assert_duplicates_renamed(model, object_ids, base_name):
        """
        Check how a set of same-named objects looks after the rename migration.

        The first id in ``object_ids`` must still carry ``base_name`` exactly; every
        following id must have a different name that still starts with ``base_name``.
        """
        untouched_id, *renamed_ids = object_ids
        assert model.objects.get(id=untouched_id).name == base_name
        for renamed_id in renamed_ids:
            current_name = model.objects.get(id=renamed_id).name
            assert current_name != base_name
            assert current_name.startswith(base_name)

    def test_happy_path(self, migrator):
        """
        Smoke test: run every migration from an empty 'main' app to the final one.

        It also serves as an example of using django-test-migration to stop at a
        chosen migration, create objects there, and then assert on them after
        migrating further. The regular unit tests bypass running the migrations
        because doing so is slow, so this is where the full chain gets exercised.
        """
        final_migration = nodes_to_tuples(all_migrations('default'))[-1]

        migrator.apply_initial_migration(('main', None))

        # 0180 was simply a recent migration when this test was written.
        # If you are here because they are being squashed, it is fine to swap
        # the 0180_... below for some other recent migration.
        midway_state = migrator.apply_tested_migration(('main', '0180_add_hostmetric_fields'))

        # Create any old object in the database at the intermediate point
        MidwayInstance = midway_state.apps.get_model('main', 'Instance')
        MidwayInstance.objects.create(hostname='foobar', node_type='control')

        # The object must still be there once the remaining migrations have run
        end_state = migrator.apply_tested_migration(final_migration)
        FinalInstance = end_state.apps.get_model('main', 'Instance')
        assert FinalInstance.objects.filter(hostname='foobar').count() == 1

    def test_receptor_address(self, migrator):
        """
        Migrate instance data from 0188 to 0189_inbound_hop_nodes.

        Checks that a ReceptorAddress exists for the instance created with a
        listener_port, that none exists for the instance created without one,
        and that the peer of 'bar' is now the address of 'foo'.
        """
        state_before = migrator.apply_initial_migration(('main', '0188_add_bitbucket_dc_webhook'))
        OldInstance = state_before.apps.get_model('main', 'Instance')

        for i in range(3):
            OldInstance.objects.create(hostname=f'foobar{i}', node_type='hop')
        with_port = OldInstance.objects.create(hostname='foo', node_type='execution', listener_port=1234)
        without_port = OldInstance.objects.create(hostname='bar', node_type='execution', listener_port=None)
        without_port.peers.add(with_port)

        state_after = migrator.apply_tested_migration(('main', '0189_inbound_hop_nodes'))
        NewInstance = state_after.apps.get_model('main', 'Instance')
        ReceptorAddress = state_after.apps.get_model('main', 'ReceptorAddress')

        # Only the instance that had a listener port gets an address row
        assert ReceptorAddress.objects.filter(address='foo', port=1234).count() == 1
        assert not ReceptorAddress.objects.filter(address='bar').exists()

        # The peer relation of 'bar' now holds the address of 'foo'
        migrated_bar = NewInstance.objects.get(hostname='bar')
        foo_address = ReceptorAddress.objects.get(address='foo')
        peers_of_bar = list(migrated_bar.peers.all())
        assert len(peers_of_bar) == 1
        assert foo_address in peers_of_bar

    def test_migrate_DAB_RBAC(self, migrator):
        """
        Walk the migrations from 0190 up to 0201, checking data at several stops.

        Stops, in order: 0192_custom_roles (role assignments and role definitions),
        0195_EE_permissions (removed permission), 0200_template_name_constraint
        (renaming of duplicate names, org_unique values) and
        0201_create_managed_creds (managed credential types are present).
        """
        state_0190 = migrator.apply_initial_migration(('main', '0190_alter_inventorysource_source_and_more'))
        OldOrganization = state_0190.apps.get_model('main', 'Organization')
        OldTeam = state_0190.apps.get_model('main', 'Team')
        User = state_0190.apps.get_model('auth', 'User')

        # Old-style role memberships that the RBAC migration has to carry over
        org = OldOrganization.objects.create(name='arbitrary-org', created=now(), modified=now())
        user = User.objects.create(username='random-user')
        org.read_role.members.add(user)
        org.member_role.members.add(user)
        team = OldTeam.objects.create(name='arbitrary-team', organization=org, created=now(), modified=now())
        team.member_role.members.add(user)

        state_0192 = migrator.apply_tested_migration(('main', '0192_custom_roles'))

        RoleUserAssignment = state_0192.apps.get_model('dab_rbac', 'RoleUserAssignment')
        user_assignments = RoleUserAssignment.objects.filter(user=user.id)
        assert user_assignments.filter(object_id=org.id).exists()
        assert user_assignments.filter(role_definition__name='Organization Member', object_id=org.id).exists()
        assert user_assignments.filter(role_definition__name='Team Member', object_id=team.id).exists()

        # The first name is a regression check for a bug that comes from a
        # current vs past models mismatch; the other two cover special cases
        # in managed role creation.
        RoleDefinition = state_0192.apps.get_model('dab_rbac', 'RoleDefinition')
        for absent_role_name in (
            'Organization Organization Admin',
            'Organization Team Admin',
            'Organization InstanceGroup Admin',
        ):
            assert not RoleDefinition.objects.filter(name=absent_role_name).exists()

        # A removed EE model permission must be gone after 0195
        state_0195 = migrator.apply_tested_migration(('main', '0195_EE_permissions'))
        DABPermission = state_0195.apps.get_model('dab_rbac', 'DABPermission')
        assert not DABPermission.objects.filter(codename='view_executionenvironment').exists()

        # Seed objects with duplicate names inside one organization
        Organization = state_0195.apps.get_model('main', 'Organization')
        Project = state_0195.apps.get_model('main', 'Project')
        WorkflowJobTemplate = state_0195.apps.get_model('main', 'WorkflowJobTemplate')
        dup_org = Organization.objects.create(name='duplicate-obj-organization', created=now(), modified=now())

        proj_ids = [
            Project.objects.create(name='duplicate-project-name', organization=dup_org, created=now(), modified=now()).id for _ in range(3)
        ]
        wfjt_ids = [
            WorkflowJobTemplate.objects.create(name='duplicate-workflow-name', organization=dup_org, created=now(), modified=now()).id
            for _ in range(3)
        ]

        # The uniqueness rules will not apply to InventorySource
        Inventory = state_0195.apps.get_model('main', 'Inventory')
        InventorySource = state_0195.apps.get_model('main', 'InventorySource')
        inventory = Inventory.objects.create(name='migration-test-inv', organization=dup_org, created=now(), modified=now())
        InventorySource.objects.create(
            name='migration-test-src', source='file', inventory=inventory, organization=dup_org, created=now(), modified=now()
        )

        # Migration 0200 should rename the duplicates
        state_0200 = migrator.apply_tested_migration(('main', '0200_template_name_constraint'))
        Project = state_0200.apps.get_model('main', 'Project')
        WorkflowJobTemplate = state_0200.apps.get_model('main', 'WorkflowJobTemplate')
        InventorySource = state_0200.apps.get_model('main', 'InventorySource')

        self._assert_duplicates_renamed(Project, proj_ids, 'duplicate-project-name')
        self._assert_duplicates_renamed(WorkflowJobTemplate, wfjt_ids, 'duplicate-workflow-name')

        # The inventory source has org_unique unset so it avoids the constraints,
        # while every project has it set
        migrated_source = InventorySource.objects.get(name='migration-test-src')
        assert migrated_source.org_unique is False
        for project in Project.objects.all():
            assert project.org_unique is True

        # Piggyback test for the new credential types
        managed_type_names = ['GitHub App Installation Access Token Lookup', 'Terraform backend configuration']
        CredentialType = state_0200.apps.get_model('main', 'CredentialType')

        # Simulate an upgrade by deleting existing types with these names
        for expected_name in managed_type_names:
            stale_type = CredentialType.objects.filter(name=expected_name).first()
            if stale_type is not None:
                stale_type.delete()

        state_0201 = migrator.apply_tested_migration(('main', '0201_create_managed_creds'))
        CredentialType = state_0201.apps.get_model('main', 'CredentialType')
        for expected_name in managed_type_names:
            type_found = CredentialType.objects.filter(name=expected_name).exists()
            assert type_found, f'Could not find {expected_name} credential type name, all names: {list(CredentialType.objects.values_list("name", flat=True))}'