mirror of
https://github.com/ansible/awx.git
synced 2026-05-20 07:17:40 -02:30
remove deprecated modules
This commit is contained in:
@@ -1,57 +0,0 @@
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
import json
|
||||
|
||||
from awx.main.models import Organization, Project, Inventory, Host, CredentialType, Credential, JobTemplate
|
||||
|
||||
|
||||
# warns based on password_management param, but not security issue
|
||||
@pytest.mark.django_db
|
||||
def test_receive_send_jt(run_module, admin_user, mocker, silence_deprecation):
|
||||
org = Organization.objects.create(name='SRtest')
|
||||
proj = Project.objects.create(
|
||||
name='SRtest',
|
||||
playbook_files=['debug.yml'],
|
||||
scm_type='git',
|
||||
scm_url='https://github.com/ansible/test-playbooks.git',
|
||||
organization=org,
|
||||
allow_override=True, # so we do not require playbooks populated
|
||||
)
|
||||
inv = Inventory.objects.create(name='SRtest', organization=org)
|
||||
Host.objects.create(name='SRtest', inventory=inv)
|
||||
ct = CredentialType.defaults['ssh']()
|
||||
ct.save()
|
||||
cred = Credential.objects.create(name='SRtest', credential_type=ct, organization=org)
|
||||
jt = JobTemplate.objects.create(name='SRtest', project=proj, inventory=inv, playbook='helloworld.yml')
|
||||
jt.credentials.add(cred)
|
||||
jt.admin_role.members.add(admin_user) # work around send/receive bug
|
||||
|
||||
# receive everything
|
||||
result = run_module('receive', dict(all=True), admin_user)
|
||||
|
||||
assert 'assets' in result, result
|
||||
assets = result['assets']
|
||||
assert not result.get('changed', True)
|
||||
assert set(a['asset_type'] for a in assets) == set(('organization', 'inventory', 'job_template', 'credential', 'project', 'user'))
|
||||
|
||||
# delete everything
|
||||
for obj in (jt, inv, proj, cred, org):
|
||||
obj.delete()
|
||||
|
||||
def fake_wait(self, pk, parent_pk=None, **kwargs):
|
||||
return {"changed": True}
|
||||
|
||||
# recreate everything
|
||||
with mocker.patch('sys.stdin.isatty', return_value=True):
|
||||
with mocker.patch('tower_cli.models.base.MonitorableResource.wait'):
|
||||
result = run_module('send', dict(assets=json.dumps(assets)), admin_user)
|
||||
|
||||
assert not result.get('failed'), result
|
||||
|
||||
new = JobTemplate.objects.get(name='SRtest')
|
||||
assert new.project.name == 'SRtest'
|
||||
assert new.inventory.name == 'SRtest'
|
||||
assert [cred.name for cred in new.credentials.all()] == ['SRtest']
|
||||
@@ -1,102 +0,0 @@
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from awx.main.models import WorkflowJobTemplate, JobTemplate, Project, InventorySource, Inventory, WorkflowJobTemplateNode
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_workflow_job_template(run_module, admin_user, organization, survey_spec, silence_deprecation):
|
||||
result = run_module(
|
||||
'workflow_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'extra_vars': {'foo': 'bar', 'another-foo': {'barz': 'bar2'}},
|
||||
'survey': survey_spec,
|
||||
'survey_enabled': True,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
|
||||
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
|
||||
assert wfjt.extra_vars == '{"foo": "bar", "another-foo": {"barz": "bar2"}}'
|
||||
|
||||
result.pop('invocation', None)
|
||||
assert result == {"workflow_template": "foo-workflow", "state": "present", "id": wfjt.id, "changed": True} # TODO: remove after refactor
|
||||
|
||||
assert wfjt.organization_id == organization.id
|
||||
assert wfjt.survey_spec == survey_spec
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_with_nested_workflow(run_module, admin_user, organization, silence_deprecation):
|
||||
wfjt1 = WorkflowJobTemplate.objects.create(name='first', organization=organization)
|
||||
|
||||
result = run_module(
|
||||
'workflow_template',
|
||||
{'name': 'foo-workflow', 'organization': organization.name, 'schema': [{'workflow': wfjt1.name}], 'state': 'present'},
|
||||
admin_user,
|
||||
)
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
|
||||
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
|
||||
node = wfjt.workflow_nodes.first()
|
||||
assert node is not None
|
||||
assert node.unified_job_template == wfjt1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_schema_with_branches(run_module, admin_user, organization, silence_deprecation):
|
||||
|
||||
proj = Project.objects.create(organization=organization, name='Ansible Examples')
|
||||
inv = Inventory.objects.create(organization=organization, name='test-inv')
|
||||
jt = JobTemplate.objects.create(project=proj, playbook='helloworld.yml', inventory=inv, name='Hello world')
|
||||
inv_src = InventorySource.objects.create(inventory=inv, name='AWS servers', source='ec2')
|
||||
|
||||
result = run_module(
|
||||
'workflow_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'schema': [
|
||||
{
|
||||
'job_template': 'Hello world',
|
||||
'failure': [{'inventory_source': 'AWS servers', 'success': [{'project': 'Ansible Examples', 'always': [{'job_template': "Hello world"}]}]}],
|
||||
}
|
||||
],
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
|
||||
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
|
||||
root_nodes = wfjt.workflow_nodes.filter(
|
||||
**{
|
||||
'%ss_success__isnull' % WorkflowJobTemplateNode.__name__.lower(): True,
|
||||
'%ss_failure__isnull' % WorkflowJobTemplateNode.__name__.lower(): True,
|
||||
'%ss_always__isnull' % WorkflowJobTemplateNode.__name__.lower(): True,
|
||||
}
|
||||
)
|
||||
assert len(root_nodes) == 1
|
||||
node = root_nodes[0]
|
||||
assert node.unified_job_template == jt
|
||||
second = node.failure_nodes.first()
|
||||
assert second.unified_job_template == inv_src
|
||||
third = second.success_nodes.first()
|
||||
assert third.unified_job_template == proj
|
||||
fourth = third.always_nodes.first()
|
||||
assert fourth.unified_job_template == jt
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_with_missing_ujt(run_module, admin_user, organization, silence_deprecation):
|
||||
result = run_module(
|
||||
'workflow_template', {'name': 'foo-workflow', 'organization': organization.name, 'schema': [{'foo': 'bar'}], 'state': 'present'}, admin_user
|
||||
)
|
||||
assert result.get('failed', False), result
|
||||
assert 'You should provide exactly one of the attributes job_template,' in result['msg']
|
||||
Reference in New Issue
Block a user