Merge pull request #3708 from AlanCoding/ancestors_PR

Store vars from set_artifact tasks, pipe data through workflows
This commit is contained in:
Alan Rominger 2016-10-13 15:59:58 -04:00 committed by GitHub
commit 1979d63c25
10 changed files with 180 additions and 21 deletions

View File

@ -1920,13 +1920,14 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
ask_job_type_on_launch = serializers.ReadOnlyField()
ask_inventory_on_launch = serializers.ReadOnlyField()
ask_credential_on_launch = serializers.ReadOnlyField()
artifacts = serializers.SerializerMethodField()
class Meta:
model = Job
fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch',
'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch',
'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch',
'allow_simultaneous',)
'allow_simultaneous', 'artifacts',)
def get_related(self, obj):
res = super(JobSerializer, self).get_related(obj)
@ -1949,6 +1950,11 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
res['relaunch'] = reverse('api:job_relaunch', args=(obj.pk,))
return res
def get_artifacts(self, obj):
if obj:
return obj.display_artifacts()
return {}
def to_internal_value(self, data):
# When creating a new job and a job template is specified, populate any
# fields not provided in data from the job template.

View File

@ -4,6 +4,7 @@
# Python
import datetime
import logging
import json
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
@ -80,6 +81,7 @@ class CallbackBrokerWorker(ConsumerMixin):
event_uuid = payload.get("uuid", '')
parent_event_uuid = payload.get("parent_uuid", '')
artifact_data = payload.get("artifact_data", None)
# Sanity check: Don't honor keys that we don't recognize.
for key in payload.keys():
@ -123,6 +125,23 @@ class CallbackBrokerWorker(ConsumerMixin):
except DatabaseError as e:
logger.error("Database Error Saving Job Event: {}".format(e))
if artifact_data:
try:
self.process_artifacts(artifact_data, res, payload)
except DatabaseError as e:
logger.error("Database Error Saving Job Artifacts: {}".format(e))
def process_artifacts(self, artifact_data, res, payload):
artifact_dict = json.loads(artifact_data)
if res and isinstance(res, dict):
if res.get('_ansible_no_log', False):
artifact_dict['_ansible_no_log'] = True
if artifact_data is not None:
parent_job = Job.objects.filter(pk=payload['job_id']).first()
if parent_job is not None and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
class Command(NoArgsCommand):
'''

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import jsonfield.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0039_v310_channelgroup'),
]
operations = [
migrations.AddField(
model_name='job',
name='artifacts',
field=jsonfield.fields.JSONField(default={}, editable=False, blank=True),
),
migrations.AddField(
model_name='workflowjobnode',
name='ancestor_artifacts',
field=jsonfield.fields.JSONField(default={}, editable=False, blank=True),
),
]

View File

@ -550,6 +550,11 @@ class Job(UnifiedJob, JobOptions, JobNotificationMixin):
default={},
editable=False,
)
artifacts = JSONField(
blank=True,
default={},
editable=False,
)
@classmethod
def _get_parent_field_name(cls):
@ -775,6 +780,15 @@ class Job(UnifiedJob, JobOptions, JobNotificationMixin):
else:
return self.extra_vars
def display_artifacts(self):
'''
Hides artifacts if they are marked as no_log type artifacts.
'''
artifacts = self.artifacts
if artifacts.get('_ansible_no_log', False):
return "$hidden due to Ansible no_log flag$"
return artifacts
def _survey_search_and_replace(self, content):
# Use job template survey spec to identify password fields.
# Then lookup password fields in extra_vars and save the values

View File

@ -33,7 +33,7 @@ from djcelery.models import TaskMeta
from awx.main.models.base import * # noqa
from awx.main.models.schedules import Schedule
from awx.main.utils import decrypt_field, _inventory_updates
from awx.main.redact import UriCleaner
from awx.main.redact import UriCleaner, REPLACE_STR
from awx.main.consumers import emit_channel_notification
__all__ = ['UnifiedJobTemplate', 'UnifiedJob']
@ -348,11 +348,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
create_kwargs[field_name] = getattr(self, field_name)
new_kwargs = self._update_unified_job_kwargs(**create_kwargs)
unified_job = unified_job_class(**new_kwargs)
# For JobTemplate-based jobs with surveys, save list for perma-redaction
if (hasattr(self, 'survey_spec') and getattr(self, 'survey_enabled', False) and
not getattr(unified_job, 'survey_passwords', False)):
# For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction
if hasattr(self, 'survey_spec') and getattr(self, 'survey_enabled', False):
password_list = self.survey_password_variables()
hide_password_dict = {}
hide_password_dict = getattr(unified_job, 'survey_passwords', {})
for password in password_list:
hide_password_dict[password] = REPLACE_STR
unified_job.survey_passwords = hide_password_dict

View File

@ -21,9 +21,9 @@ from awx.main.models.rbac import (
)
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin
from awx.main.redact import REPLACE_STR
import yaml
import json
from copy import copy
__all__ = ['WorkflowJobTemplate', 'WorkflowJob', 'WorkflowJobOptions', 'WorkflowJobNode', 'WorkflowJobTemplateNode',]
@ -124,6 +124,13 @@ class WorkflowNodeBase(CreatedModifiedModel):
data['missing'] = missing_dict
return data
def get_parent_nodes(self):
'''Returns queryset containing all parents of this node'''
success_parents = getattr(self, '%ss_success' % self.__class__.__name__.lower()).all()
failure_parents = getattr(self, '%ss_failure' % self.__class__.__name__.lower()).all()
always_parents = getattr(self, '%ss_always' % self.__class__.__name__.lower()).all()
return success_parents | failure_parents | always_parents
@classmethod
def _get_workflow_job_field_names(cls):
'''
@ -175,11 +182,22 @@ class WorkflowJobNode(WorkflowNodeBase):
default=None,
on_delete=models.CASCADE,
)
ancestor_artifacts = JSONField(
blank=True,
default={},
editable=False,
)
def get_absolute_url(self):
return reverse('api:workflow_job_node_detail', args=(self.pk,))
def get_job_kwargs(self):
'''
In advance of creating a new unified job as part of a workflow,
this method builds the attributes to use
It alters the node by saving its updated version of
ancestor_artifacts, making it available to subsequent nodes.
'''
# reject/accept prompted fields
data = {}
ujt_obj = self.unified_job_template
@ -189,19 +207,31 @@ class WorkflowJobNode(WorkflowNodeBase):
accepted_fields.pop(fd)
data.update(accepted_fields)
# TODO: decide what to do in the event of missing fields
# build ancestor artifacts, save them to node model for later
aa_dict = {}
for parent_node in self.get_parent_nodes():
aa_dict.update(parent_node.ancestor_artifacts)
if parent_node.job and hasattr(parent_node.job, 'artifacts'):
aa_dict.update(parent_node.job.artifacts)
if aa_dict:
self.ancestor_artifacts = aa_dict
self.save(update_fields=['ancestor_artifacts'])
if '_ansible_no_log' in aa_dict:
# TODO: merge Workflow Job survey passwords into this
password_dict = {}
for key in aa_dict:
if key != '_ansible_no_log':
password_dict[key] = REPLACE_STR
data['survey_passwords'] = password_dict
# process extra_vars
# TODO: still lack consensus about variable precedence
extra_vars = {}
if self.workflow_job and self.workflow_job.extra_vars:
try:
WJ_json_extra_vars = json.loads(
(self.workflow_job.extra_vars or '').strip() or '{}')
except ValueError:
try:
WJ_json_extra_vars = yaml.safe_load(self.workflow_job.extra_vars)
except yaml.YAMLError:
WJ_json_extra_vars = {}
extra_vars.update(WJ_json_extra_vars)
# TODO: merge artifacts, add ancestor_artifacts to kwargs
extra_vars.update(self.workflow_job.extra_vars_dict)
if aa_dict:
functional_aa_dict = copy(aa_dict)
functional_aa_dict.pop('_ansible_no_log', None)
extra_vars.update(functional_aa_dict)
if extra_vars:
data['extra_vars'] = extra_vars
return data

View File

@ -893,7 +893,7 @@ class RunJob(BaseTask):
'tower_user_name': job.created_by.username,
})
if job.extra_vars_dict:
if kwargs.get('display', False) and job.job_template and job.job_template.survey_enabled:
if kwargs.get('display', False) and job.job_template:
extra_vars.update(json.loads(job.display_extra_vars()))
else:
extra_vars.update(job.extra_vars_dict)

View File

@ -3,8 +3,11 @@
import pytest
# AWX
from awx.main.models.workflow import WorkflowJob, WorkflowJobTemplateNode
from awx.main.models.workflow import WorkflowJob, WorkflowJobNode, WorkflowJobTemplateNode
from awx.main.models.jobs import Job
from awx.main.models.projects import ProjectUpdate
@pytest.mark.django_db
class TestWorkflowJob:
@pytest.fixture
def workflow_job(self, workflow_job_template_factory):
@ -21,7 +24,6 @@ class TestWorkflowJob:
return wfj
@pytest.mark.django_db
def test_inherit_job_template_workflow_nodes(self, mocker, workflow_job):
workflow_job.inherit_job_template_workflow_nodes()
@ -31,4 +33,60 @@ class TestWorkflowJob:
assert nodes[0].failure_nodes.filter(id=nodes[3].id).exists()
assert nodes[3].failure_nodes.filter(id=nodes[4].id).exists()
def test_inherit_ancestor_artifacts_from_job(self, project, mocker):
"""
Assure that nodes along the line of execution inherit artifacts
from both jobs ran, and from the accumulation of old jobs
"""
# Related resources
wfj = WorkflowJob.objects.create(name='test-wf-job')
job = Job.objects.create(name='test-job', artifacts={'b': 43})
# Workflow job nodes
job_node = WorkflowJobNode.objects.create(workflow_job=wfj, job=job,
ancestor_artifacts={'a': 42})
queued_node = WorkflowJobNode.objects.create(workflow_job=wfj)
# Connect old job -> new job
mocker.patch.object(queued_node, 'get_parent_nodes', lambda: [job_node])
assert queued_node.get_job_kwargs()['extra_vars'] == {'a': 42, 'b': 43}
assert queued_node.ancestor_artifacts == {'a': 42, 'b': 43}
def test_inherit_ancestor_artifacts_from_project_update(self, project, mocker):
"""
Test that the existence of a project update (no artifacts) does
not break the flow of ancestor_artifacts
"""
# Related resources
wfj = WorkflowJob.objects.create(name='test-wf-job')
update = ProjectUpdate.objects.create(name='test-update', project=project)
# Workflow job nodes
project_node = WorkflowJobNode.objects.create(workflow_job=wfj, job=update,
ancestor_artifacts={'a': 42, 'b': 43})
queued_node = WorkflowJobNode.objects.create(workflow_job=wfj)
# Connect project update -> new job
mocker.patch.object(queued_node, 'get_parent_nodes', lambda: [project_node])
assert queued_node.get_job_kwargs()['extra_vars'] == {'a': 42, 'b': 43}
assert queued_node.ancestor_artifacts == {'a': 42, 'b': 43}
@pytest.mark.django_db
class TestWorkflowJobTemplate:
@pytest.fixture
def wfjt(self, workflow_job_template_factory):
wfjt = workflow_job_template_factory('test').workflow_job_template
nodes = [WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt) for i in range(0, 3)]
nodes[0].success_nodes.add(nodes[1])
nodes[1].failure_nodes.add(nodes[2])
return wfjt
def test_node_parentage(self, wfjt):
# test success parent
wfjt_node = wfjt.workflow_job_template_nodes.all()[1]
parent_qs = wfjt_node.get_parent_nodes()
assert len(parent_qs) == 1
assert parent_qs[0] == wfjt.workflow_job_template_nodes.all()[0]
# test failure parent
wfjt_node = wfjt.workflow_job_template_nodes.all()[2]
parent_qs = wfjt_node.get_parent_nodes()
assert len(parent_qs) == 1
assert parent_qs[0] == wfjt.workflow_job_template_nodes.all()[1]

View File

@ -6,6 +6,7 @@ from awx.main.models.workflow import (
WorkflowJobTemplate, WorkflowJobTemplateNode, WorkflowJobInheritNodesMixin,
WorkflowJob, WorkflowJobNode
)
import mock
class TestWorkflowJobInheritNodesMixin():
class TestCreateWorkflowJobNodes():
@ -151,6 +152,7 @@ class TestWorkflowJobCreate:
unified_job_template=wfjt_node_with_prompts.unified_job_template,
workflow_job=workflow_job_unit)
@mock.patch('awx.main.models.workflow.WorkflowNodeBase.get_parent_nodes', lambda self: [])
class TestWorkflowJobNodeJobKWARGS:
"""
Tests for building the keyword arguments that go into creating and

View File

@ -184,6 +184,9 @@ class BaseCallbackModule(object):
if getattr(self, 'ad_hoc_command_id', None):
msg['ad_hoc_command_id'] = self.ad_hoc_command_id
if getattr(self, 'artifact_data', None):
msg['artifact_data'] = self.artifact_data
active_pid = os.getpid()
if self.job_callback_debug:
msg.update({
@ -416,6 +419,9 @@ class JobCallbackModule(BaseCallbackModule):
event_data['task'] = task_name
if role_name and event not in self.EVENTS_WITHOUT_TASK:
event_data['role'] = role_name
self.artifact_data = None
if 'res' in event_data and 'artifact_data' in event_data['res']:
self.artifact_data = event_data['res']['artifact_data']
super(JobCallbackModule, self)._log_event(event, **event_data)
def playbook_on_start(self):