inject WF node prompts into new jobs, new workflow RBAC tests

This commit is contained in:
AlanCoding
2016-09-21 16:04:43 -04:00
parent 2ffa7a91ec
commit 9acd50b8f3
10 changed files with 279 additions and 113 deletions

View File

@@ -167,11 +167,11 @@ def mk_workflow_job(status='new', workflow_job_template=None, extra_vars={},
job.save()
return job
def mk_workflow_job_template(name, extra_vars='', spec=None, persisted=True):
def mk_workflow_job_template(name, extra_vars='', spec=None, organization=None, persisted=True):
if extra_vars:
extra_vars = json.dumps(extra_vars)
wfjt = WorkflowJobTemplate(name=name, extra_vars=extra_vars)
wfjt = WorkflowJobTemplate(name=name, extra_vars=extra_vars, organization=organization)
wfjt.survey_spec = spec
if wfjt.survey_spec is not None:

View File

@@ -360,16 +360,20 @@ def generate_workflow_job_template_nodes(workflow_job_template,
new_node = WorkflowJobTemplateNode(workflow_job_template=workflow_job_template,
unified_job_template=node['unified_job_template'],
id=i)
if persisted:
new_node.save()
new_nodes.append(new_node)
node_types = ['success_nodes', 'failure_nodes', 'always_nodes']
for node_type in node_types:
for i, new_node in enumerate(new_nodes):
if node_type not in workflow_job_template_nodes[i]:
continue
for related_index in workflow_job_template_nodes[i][node_type]:
getattr(new_node, node_type).add(new_nodes[related_index])
# TODO: Implement survey and jobs
def create_workflow_job_template(name, persisted=True, **kwargs):
def create_workflow_job_template(name, organization=None, persisted=True, **kwargs):
Objects = generate_objects(["workflow_job_template",
"workflow_job_template_nodes",
"survey",], kwargs)
@@ -382,7 +386,8 @@ def create_workflow_job_template(name, persisted=True, **kwargs):
if 'survey' in kwargs:
spec = create_survey_spec(kwargs['survey'])
wfjt = mk_workflow_job_template(name,
wfjt = mk_workflow_job_template(name,
organization=organization,
spec=spec,
extra_vars=extra_vars,
persisted=persisted)

View File

@@ -0,0 +1,73 @@
import pytest
from awx.main.access import (
WorkflowJobTemplateAccess,
WorkflowJobTemplateNodeAccess,
WorkflowJobAccess,
# WorkflowJobNodeAccess
)
@pytest.fixture
def wfjt(workflow_job_template_factory, organization):
objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_with_nodes(workflow_job_template_factory, organization, job_template):
objects = workflow_job_template_factory(
'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_node(wfjt_with_nodes):
return wfjt_with_nodes.workflow_job_template_nodes.all()[0]
@pytest.fixture
def workflow_job(wfjt):
return wfjt.jobs.create(name='test_workflow')
@pytest.mark.django_db
class TestWorkflowJobTemplateAccess:
def test_random_user_no_edit(self, wfjt, rando):
access = WorkflowJobTemplateAccess(rando)
assert not access.can_change(wfjt, {'name': 'new name'})
def test_org_admin_edit(self, wfjt, org_admin):
access = WorkflowJobTemplateAccess(org_admin)
assert access.can_change(wfjt, {'name': 'new name'})
def test_org_admin_role_inheritance(self, wfjt, org_admin):
assert org_admin in wfjt.admin_role
assert org_admin in wfjt.execute_role
assert org_admin in wfjt.read_role
def test_jt_blocks_copy(self, wfjt_with_nodes, org_admin):
"""I want to copy a workflow JT in my organization, but someone
included a job template that I don't have access to, so I can
not copy the WFJT as-is"""
access = WorkflowJobTemplateAccess(org_admin)
assert not access.can_add({'reference_obj': wfjt_with_nodes})
@pytest.mark.django_db
class TestWorkflowJobTemplateNodeAccess:
def test_jt_access_to_edit(self, wfjt_node, org_admin):
access = WorkflowJobTemplateNodeAccess(org_admin)
assert not access.can_change(wfjt_node, {'job_type': 'scan'})
@pytest.mark.django_db
class TestWorkflowJobAccess:
def test_wfjt_admin_delete(self, wfjt, workflow_job, rando):
wfjt.admin_role.members.add(rando)
access = WorkflowJobAccess(rando)
assert access.can_delete(workflow_job)
def test_cancel_your_own_job(self, wfjt, workflow_job, rando):
wfjt.execute_role.members.add(rando)
workflow_job.created_by = rando
workflow_job.save()
access = WorkflowJobAccess(rando)
assert access.can_cancel(workflow_job)

View File

@@ -118,11 +118,19 @@ class TestWorkflowAccessMethods:
objects = workflow_job_template_factory('test_workflow', persisted=False)
return objects.workflow_job_template
class MockQuerySet(object):
pass
def test_workflow_can_add(self, workflow, user_unit):
# user_unit.admin_of_organizations = self.MockQuerySet()
access = WorkflowJobTemplateAccess(user_unit)
assert access.can_add({'organization': 1})
organization = Organization(name='test-org')
workflow.organization = organization
organization.admin_role = Role()
def mock_get_object(Class, **kwargs):
if Class == Organization:
return organization
else:
raise Exception('Item requested has not been mocked')
access = WorkflowJobTemplateAccess(user_unit)
with mock.patch('awx.main.models.rbac.Role.__contains__', return_value=True):
with mock.patch('awx.main.access.get_object_or_400', mock_get_object):
assert access.can_add({'organization': 1})