awx/awx/main/tests/functional/models/test_workflow.py
AlanCoding e8581f6892
Implement WFJT prompting for limit & scm_branch
add feature to UI and awxkit

restructure some details of create_unified_job
  for workflows to allow use of char_prompts
  hidden field
avoid conflict with sliced jobs in char_prompts copy logic

update developer docs

update migration reference

bump migration
2019-09-16 14:51:53 -04:00

421 lines
16 KiB
Python

# Python
import pytest
from unittest import mock
import json
# AWX
from awx.main.models.workflow import (
WorkflowJob,
WorkflowJobNode,
WorkflowJobTemplateNode,
WorkflowJobTemplate,
)
from awx.main.models.jobs import JobTemplate, Job
from awx.main.models.projects import ProjectUpdate
from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.api.versioning import reverse
from awx.api.views import WorkflowJobTemplateNodeSuccessNodesList
# Django
from django.test import TransactionTestCase
from django.core.exceptions import ValidationError
class TestWorkflowDAGFunctional(TransactionTestCase):
    """Exercise ``WorkflowDAG`` against workflow jobs and nodes created through the ORM."""

    def workflow_job(self, states=('new', 'new', 'new', 'new', 'new')):
        r"""
        Build a five-node workflow job and return it.

        ``states`` is paired positionally with the nodes.  A truthy entry
        spawns a job from the shared job template and stores that value as the
        job status; a falsy entry (e.g. ``None``) leaves the node without a job.

        Workflow topology:
               node[0]
                /\
              s/  \f
              /    \
           node[1] node[3]
             /       \
           s/         \f
           /           \
        node[2]       node[4]
        """
        wfj = WorkflowJob.objects.create()
        jt = JobTemplate.objects.create(name='test-jt')
        nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for _ in range(5)]
        for node, state in zip(nodes, states):
            if state:
                node.job = jt.create_job()
                node.job.status = state
                node.job.save()
                node.save()
        # "s" edges in the diagram are success links, "f" edges are failure links
        nodes[0].success_nodes.add(nodes[1])
        nodes[1].success_nodes.add(nodes[2])
        nodes[0].failure_nodes.add(nodes[3])
        nodes[3].failure_nodes.add(nodes[4])
        return wfj

    def test_build_WFJT_dag(self):
        """
        Test that building the graph uses 4 queries
         1 to get the nodes
         3 to get the related success, failure, and always connections
        """
        dag = WorkflowDAG()
        # Create the fixture data first so only graph construction is counted
        wfj = self.workflow_job()
        with self.assertNumQueries(4):
            dag._init_graph(wfj)

    def test_workflow_done(self):
        """A failed root whose failure branch succeeds is done and not failed; the relaunch after deleting the JT is done and failed."""
        wfj = self.workflow_job(states=['failed', None, None, 'successful', None])
        dag = WorkflowDAG(workflow_job=wfj)
        assert 3 == len(dag.mark_dnr_nodes())
        is_done = dag.is_workflow_done()
        has_failed, reason = dag.has_workflow_failed()
        self.assertTrue(is_done)
        self.assertFalse(has_failed)
        assert reason is None

        # verify that relaunched WFJ fails if a JT leaf is deleted
        for jt in JobTemplate.objects.all():
            jt.delete()
        relaunched = wfj.create_relaunch_workflow_job()
        dag = WorkflowDAG(workflow_job=relaunched)
        dag.mark_dnr_nodes()
        is_done = dag.is_workflow_done()
        has_failed, reason = dag.has_workflow_failed()
        self.assertTrue(is_done)
        self.assertTrue(has_failed)
        # The exact wording of the failure reason is defined by WorkflowDAG;
        # only require that a reason accompanies the failure.
        self.assertTrue(reason)

    def test_workflow_fails_for_no_error_handler(self):
        """A failed node[1], which has no failure link in the topology, fails the workflow."""
        wfj = self.workflow_job(states=['successful', 'failed', None, None, None])
        dag = WorkflowDAG(workflow_job=wfj)
        dag.mark_dnr_nodes()
        is_done = dag.is_workflow_done()
        # has_workflow_failed() returns a (has_failed, reason) pair; unpack it so
        # the flag itself is asserted rather than an always-truthy tuple.
        has_failed, _reason = dag.has_workflow_failed()
        self.assertTrue(is_done)
        self.assertTrue(has_failed)

    def test_workflow_fails_leaf(self):
        """A failed leaf, node[2], fails the workflow."""
        wfj = self.workflow_job(states=['successful', 'successful', 'failed', None, None])
        dag = WorkflowDAG(workflow_job=wfj)
        dag.mark_dnr_nodes()
        is_done = dag.is_workflow_done()
        # Unpack the (has_failed, reason) pair; see the note in the previous test.
        has_failed, _reason = dag.has_workflow_failed()
        self.assertTrue(is_done)
        self.assertTrue(has_failed)

    def test_workflow_not_finished(self):
        """A workflow whose root job is still ``new`` is neither done nor failed."""
        wfj = self.workflow_job(states=['new', None, None, None, None])
        dag = WorkflowDAG(workflow_job=wfj)
        dag.mark_dnr_nodes()
        is_done = dag.is_workflow_done()
        has_failed, reason = dag.has_workflow_failed()
        self.assertFalse(is_done)
        self.assertFalse(has_failed)
        assert reason is None
@pytest.mark.django_db
class TestWorkflowDNR():
    """Checks of which nodes ``WorkflowDAG.mark_dnr_nodes`` returns for a given set of job states."""

    @pytest.fixture
    def workflow_job_fn(self):
        """Return a factory that builds a six-node workflow job and returns ``(workflow_job, nodes)``."""

        def fn(states=('new', 'new', 'new', 'new', 'new', 'new')):
            r"""
            Create the workflow job; ``states`` is paired positionally with the
            nodes, and a falsy entry leaves that node without a job.

            Workflow topology:
                   node[0]
                    /   |
                   s    f
                  /     |
              node[1]  node[3]
                /       |
               s        f
              /         |
            node[2]    node[4]
               \        |
                s       f
                 \      |
                  node[5]
            """
            wfj = WorkflowJob.objects.create()
            jt = JobTemplate.objects.create(name='test-jt')
            nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for _ in range(6)]
            for node, state in zip(nodes, states):
                if state:
                    node.job = jt.create_job()
                    node.job.status = state
                    node.job.save()
                    node.save()
            # "s" edges in the diagram are success links, "f" edges are failure links
            nodes[0].success_nodes.add(nodes[1])
            nodes[1].success_nodes.add(nodes[2])
            nodes[0].failure_nodes.add(nodes[3])
            nodes[3].failure_nodes.add(nodes[4])
            nodes[2].success_nodes.add(nodes[5])
            nodes[4].failure_nodes.add(nodes[5])
            return wfj, nodes

        return fn

    def test_workflow_dnr_because_parent(self, workflow_job_fn):
        """With a successful root, the two nodes on the failure branch are returned by ``mark_dnr_nodes``."""
        wfj, nodes = workflow_job_fn(states=['successful', None, None, None, None, None])
        dag = WorkflowDAG(workflow_job=wfj)
        workflow_nodes = dag.mark_dnr_nodes()
        assert 2 == len(workflow_nodes)
        assert nodes[3] in workflow_nodes
        assert nodes[4] in workflow_nodes
@pytest.mark.django_db
class TestWorkflowJob:
    """Tests for node copying and artifact inheritance on workflow jobs."""

    @pytest.fixture
    def workflow_job(self, workflow_job_template_factory):
        """Return a workflow job whose template owns five linked template nodes."""
        template = workflow_job_template_factory('blah').workflow_job_template
        job = WorkflowJob.objects.create(workflow_job_template=template)
        template_nodes = []
        for _ in range(5):
            template_nodes.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=template))
        root, on_success, success_leaf, on_failure, failure_leaf = template_nodes
        root.success_nodes.add(on_success)
        on_success.success_nodes.add(success_leaf)
        root.failure_nodes.add(on_failure)
        on_failure.failure_nodes.add(failure_leaf)
        return job

    def test_inherit_job_template_workflow_nodes(self, mocker, workflow_job):
        """Nodes copied from the template keep the template's success/failure links."""
        source_template = workflow_job.workflow_job_template
        workflow_job.copy_nodes_from_original(original=source_template)
        reloaded_job = WorkflowJob.objects.get(id=workflow_job.id)
        nodes = reloaded_job.workflow_job_nodes.all().order_by('created')
        # (relation name, parent index, child index) for every expected link
        expected_links = (
            ('success_nodes', 0, 1),
            ('success_nodes', 1, 2),
            ('failure_nodes', 0, 3),
            ('failure_nodes', 3, 4),
        )
        for relation, parent_index, child_index in expected_links:
            children = getattr(nodes[parent_index], relation)
            assert children.filter(id=nodes[child_index].id).exists()

    def test_inherit_ancestor_artifacts_from_job(self, job_template, mocker):
        """
        A queued node picks up both the artifacts of its parent's job and
        the artifacts that parent had already accumulated.
        """
        # Related resources
        parent_workflow = WorkflowJob.objects.create(name='test-wf-job')
        finished_job = Job.objects.create(name='test-job', artifacts={'b': 43})
        # Workflow job nodes
        finished_node = WorkflowJobNode.objects.create(
            workflow_job=parent_workflow,
            job=finished_job,
            ancestor_artifacts={'a': 42},
        )
        queued_node = WorkflowJobNode.objects.create(
            workflow_job=parent_workflow,
            unified_job_template=job_template,
        )
        # Connect old job -> new job
        mocker.patch.object(queued_node, 'get_parent_nodes', lambda: [finished_node])
        merged_artifacts = {'a': 42, 'b': 43}
        assert queued_node.get_job_kwargs()['extra_vars'] == merged_artifacts
        assert queued_node.ancestor_artifacts == merged_artifacts

    def test_inherit_ancestor_artifacts_from_project_update(self, project, job_template, mocker):
        """
        A parent node that ran a project update (which carries no artifacts
        here) still passes its ancestor_artifacts on to the queued node.
        """
        # Related resources
        parent_workflow = WorkflowJob.objects.create(name='test-wf-job')
        project_update = ProjectUpdate.objects.create(name='test-update', project=project)
        # Workflow job nodes
        update_node = WorkflowJobNode.objects.create(
            workflow_job=parent_workflow,
            job=project_update,
            ancestor_artifacts={'a': 42, 'b': 43},
        )
        queued_node = WorkflowJobNode.objects.create(
            workflow_job=parent_workflow,
            unified_job_template=job_template,
        )
        # Connect project update -> new job
        mocker.patch.object(queued_node, 'get_parent_nodes', lambda: [update_node])
        inherited_artifacts = {'a': 42, 'b': 43}
        assert queued_node.get_job_kwargs()['extra_vars'] == inherited_artifacts
        assert queued_node.ancestor_artifacts == inherited_artifacts
@pytest.mark.django_db
class TestWorkflowJobTemplate:
    """Tests for node relationships, topology validation and uniqueness of workflow job templates."""

    @pytest.fixture
    def wfjt(self, workflow_job_template_factory, organization):
        """Return a workflow job template with three nodes chained 0 -success-> 1 -failure-> 2."""
        factory_objects = workflow_job_template_factory('test', organization=organization)
        template = factory_objects.workflow_job_template
        template.organization = organization
        chain = []
        for _ in range(3):
            chain.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=template))
        head, middle, tail = chain
        head.success_nodes.add(middle)
        middle.failure_nodes.add(tail)
        return template

    def test_node_parentage(self, wfjt):
        """Each non-root node reports exactly one parent: the preceding node in the chain."""
        # (child index, expected parent index): first the success parent,
        # then the failure parent
        for child_index, parent_index in ((1, 0), (2, 1)):
            child = wfjt.workflow_job_template_nodes.all()[child_index]
            parents = child.get_parent_nodes()
            assert len(parents) == 1
            assert parents[0] == wfjt.workflow_job_template_nodes.all()[parent_index]

    def test_topology_validator(self, wfjt):
        """Linking the last node back to the first is rejected as a cycle."""
        view = WorkflowJobTemplateNodeSuccessNodesList()
        template_nodes = wfjt.workflow_job_template_nodes.all()
        # test cycle validation
        verdict = view.is_valid_relation(template_nodes[2], template_nodes[0])
        assert verdict == {'Error': 'Cycle detected.'}

    def test_always_success_failure_creation(self, wfjt, admin, get):
        """A node attached through always_nodes gains one parent and is listed in the parent's API detail."""
        parent_node = wfjt.workflow_job_template_nodes.all()[1]
        always_child = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt)
        parent_node.always_nodes.add(always_child)
        assert len(always_child.get_parent_nodes()) == 1
        list_url = reverse('api:workflow_job_template_node_list')
        detail_url = list_url + str(parent_node.id) + '/'
        response = get(detail_url, admin)
        assert always_child.id in response.data['always_nodes']

    def test_wfjt_unique_together_with_org(self, organization):
        """A duplicate name fails validate_unique within the same organization but not without one."""
        WorkflowJobTemplate(name='foo', organization=organization).save()
        same_org_duplicate = WorkflowJobTemplate(name='foo', organization=organization)
        with pytest.raises(ValidationError):
            same_org_duplicate.validate_unique()
        orgless_duplicate = WorkflowJobTemplate(name='foo', organization=None)
        orgless_duplicate.validate_unique()
@pytest.mark.django_db
class TestWorkflowJobTemplatePrompts:
    """Tests for prompts defined on the workflow job template itself.

    These prompts live on the template model rather than on a node, and
    apply to the entire workflow.
    """

    @pytest.fixture
    def wfjt_prompts(self):
        """Return a workflow job template with every ask-on-launch flag used in these tests enabled."""
        ask_flags = {
            'ask_inventory_on_launch': True,
            'ask_variables_on_launch': True,
            'ask_limit_on_launch': True,
            'ask_scm_branch_on_launch': True,
        }
        return WorkflowJobTemplate.objects.create(**ask_flags)

    @pytest.fixture
    def prompts_data(self, inventory):
        """Return one value for each promptable field exercised in these tests."""
        return {
            'inventory': inventory,
            'extra_vars': {'foo': 'bar'},
            'limit': 'webservers',
            'scm_branch': 'release-3.3',
        }

    def test_apply_workflow_job_prompts(self, workflow_job_template, wfjt_prompts, prompts_data, inventory):
        """Spawned workflow jobs take prompted values, else the template's own values, else None."""
        # null or empty fields used
        spawned = workflow_job_template.create_unified_job()
        for field_name in ('limit', 'inventory', 'scm_branch'):
            assert getattr(spawned, field_name) is None

        # fields from prompts used
        spawned = workflow_job_template.create_unified_job(**prompts_data)
        assert json.loads(spawned.extra_vars) == {'foo': 'bar'}
        assert spawned.limit == 'webservers'
        assert spawned.inventory == inventory
        assert spawned.scm_branch == 'release-3.3'

        # non-null fields from WFJT used
        workflow_job_template.inventory = inventory
        workflow_job_template.limit = 'fooo'
        workflow_job_template.scm_branch = 'bar'
        spawned = workflow_job_template.create_unified_job()
        assert spawned.limit == 'fooo'
        assert spawned.inventory == inventory
        assert spawned.scm_branch == 'bar'

    @pytest.mark.django_db
    def test_process_workflow_job_prompts(self, inventory, workflow_job_template, wfjt_prompts, prompts_data):
        """Prompts are all rejected by the ``workflow_job_template`` fixture and all accepted once the ask flags are on."""
        taken, refused, problems = workflow_job_template._accept_or_ignore_job_kwargs(**prompts_data)
        assert taken == {}
        assert refused == prompts_data
        assert problems

        taken, refused, problems = wfjt_prompts._accept_or_ignore_job_kwargs(**prompts_data)
        assert taken == prompts_data
        assert refused == {}
        assert not problems

    @pytest.mark.django_db
    def test_set_all_the_prompts(self, post, organization, inventory, org_admin):
        """Create a WFJT through the API with limit and scm_branch, then launch it overriding both."""
        create_payload = {
            'name': 'My new workflow',
            'organization': organization.id,
            'inventory': inventory.id,
            'limit': 'foooo',
            'ask_limit_on_launch': True,
            'scm_branch': 'bar',
            'ask_scm_branch_on_launch': True,
        }
        created = post(
            url=reverse('api:workflow_job_template_list'),
            data=create_payload,
            user=org_admin,
            expect=201,
        )
        wfjt = WorkflowJobTemplate.objects.get(id=created.data['id'])
        assert wfjt.char_prompts == {'limit': 'foooo', 'scm_branch': 'bar'}
        assert wfjt.ask_scm_branch_on_launch is True
        assert wfjt.ask_limit_on_launch is True

        launch_url = created.data['related']['launch']
        launch_payload = {'scm_branch': 'prompt_branch', 'limit': 'prompt_limit'}
        # Replace the callback queue dispatch with a no-op for the duration of the launch request
        with mock.patch('awx.main.queue.CallbackQueueDispatcher.dispatch', lambda self, obj: None):
            launched = post(
                url=launch_url,
                data=launch_payload,
                user=org_admin,
                expect=201,
            )
        assert launched.data['limit'] == 'prompt_limit'
        assert launched.data['scm_branch'] == 'prompt_branch'
@pytest.mark.django_db
def test_workflow_ancestors(organization):
    """A nested workflow job lists its ancestor workflow templates, nearest first."""

    def spawn(name, launch_type):
        """Create a WFJT called ``name`` plus one workflow job of it; return both."""
        template = WorkflowJobTemplate.objects.create(organization=organization, name=name)
        job = WorkflowJob.objects.create(workflow_job_template=template, launch_type=launch_type)
        return template, job

    def link(outer_job, inner_template, inner_job):
        """Record that ``outer_job`` has a node which ran ``inner_job`` from ``inner_template``."""
        WorkflowJobNode.objects.create(
            workflow_job=outer_job,
            unified_job_template=inner_template,
            job=inner_job,
        )

    # Spawn order of templates grandparent -> parent -> child
    child_template, child_job = spawn('child', 'workflow')

    # parent WFJT and workflow job, linked to the child
    parent_template, parent_job = spawn('parent', 'workflow')
    link(parent_job, child_template, child_job)

    # grandparent WFJT and workflow job, linked to the parent
    grandparent_template, grandparent_job = spawn('grandparent', 'schedule')
    link(grandparent_job, parent_template, parent_job)

    # the ancestors are compared against the template objects themselves
    expected_ancestors = [parent_template, grandparent_template]
    assert child_job.get_ancestor_workflows() == expected_ancestors
@pytest.mark.django_db
def test_workflow_ancestors_recursion_prevention(organization):
    """Ancestor lookup terminates when a workflow job is recorded as its own parent."""
    # This is toxic database data, this tests that it doesn't create an infinite loop
    template = WorkflowJobTemplate.objects.create(organization=organization, name='child')
    self_referencing_job = WorkflowJob.objects.create(workflow_job_template=template, launch_type='workflow')
    node_fields = {
        'workflow_job': self_referencing_job,
        'unified_job_template': template,
        'job': self_referencing_job,  # well, this is a problem
    }
    WorkflowJobNode.objects.create(**node_fields)
    # mostly, we just care that this assertion finishes in finite time
    ancestors = self_referencing_job.get_ancestor_workflows()
    assert ancestors == []