Add job artifacts and workflow artifact passing

Artifacts are redacted from the job when no_log is set.
Parent no_log artifacts are treated as survey passwords.
AlanCoding 2016-09-15 15:48:18 -04:00
parent 7bd19b8e98
commit 5d4cf9d4fc
11 changed files with 243 additions and 18 deletions

View File

@@ -1920,13 +1920,14 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
ask_job_type_on_launch = serializers.ReadOnlyField()
ask_inventory_on_launch = serializers.ReadOnlyField()
ask_credential_on_launch = serializers.ReadOnlyField()
artifacts = serializers.SerializerMethodField()
class Meta:
model = Job
fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch',
'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch',
'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch',
- 'allow_simultaneous',)
+ 'allow_simultaneous', 'artifacts',)
def get_related(self, obj):
res = super(JobSerializer, self).get_related(obj)
@@ -1949,6 +1950,11 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
res['relaunch'] = reverse('api:job_relaunch', args=(obj.pk,))
return res
def get_artifacts(self, obj):
if obj:
return obj.display_artifacts()
return {}
def to_internal_value(self, data):
# When creating a new job and a job template is specified, populate any
# fields not provided in data from the job template.
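For illustration, a hedged sketch of how the new read-only field surfaces in the job detail response; the endpoint path and values here are hypothetical, not taken from the commit:

# Hypothetical fragment of GET /api/v1/jobs/42/:
# {
#     ...,
#     "artifacts": {"one_artifact": 84},
# }
# or, when a contributing task ran with no_log:
# {
#     ...,
#     "artifacts": "$hidden due to Ansible no_log flag$",
# }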

View File

@@ -4,6 +4,7 @@
# Python
import datetime
import logging
import json
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
@@ -80,6 +81,7 @@ class CallbackBrokerWorker(ConsumerMixin):
event_uuid = payload.get("uuid", '')
parent_event_uuid = payload.get("parent_uuid", '')
artifact_data = payload.get("artifact_data", None)
# Sanity check: Don't honor keys that we don't recognize.
for key in payload.keys():
@@ -123,6 +125,23 @@
except DatabaseError as e:
logger.error("Database Error Saving Job Event: {}".format(e))
if artifact_data:
try:
self.process_artifacts(artifact_data, res, payload)
except DatabaseError as e:
logger.error("Database Error Saving Job Artifacts: {}".format(e))
def process_artifacts(self, artifact_data, res, payload):
artifact_dict = json.loads(artifact_data)
if res and isinstance(res, dict):
if res.get('_ansible_no_log', False):
artifact_dict['_ansible_no_log'] = True
if artifact_data is not None:
parent_job = Job.objects.filter(pk=payload['job_id']).first()
if parent_job is not None and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
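For context, a hedged sketch of the message this worker handles once the callback module (further down in this commit) forwards artifact_data; the payload shape is inferred from the code above, and all values are illustrative:

# artifact_data arrives as a JSON string produced by the set_artifact module
payload = {
    'job_id': 42,
    'uuid': 'aaaa-bbbb',
    'parent_uuid': '',
    'artifact_data': '{"one_artifact": 84}',
}
# 'res' is the task result dict that is inspected for the no_log marker
res = {'changed': True, '_ansible_no_log': False}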
class Command(NoArgsCommand):
'''

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import jsonfield.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0039_v310_channelgroup'),
]
operations = [
migrations.AddField(
model_name='job',
name='artifacts',
field=jsonfield.fields.JSONField(default={}, editable=False, blank=True),
),
migrations.AddField(
model_name='workflowjobnode',
name='ancestor_artifacts',
field=jsonfield.fields.JSONField(default={}, editable=False, blank=True),
),
]

View File

@@ -550,6 +550,11 @@ class Job(UnifiedJob, JobOptions, JobNotificationMixin):
default={},
editable=False,
)
artifacts = JSONField(
blank=True,
default={},
editable=False,
)
@classmethod
def _get_parent_field_name(cls):
@@ -775,6 +780,15 @@ class Job(UnifiedJob, JobOptions, JobNotificationMixin):
else:
return self.extra_vars
def display_artifacts(self):
'''
Hide the artifacts if they are marked with the Ansible no_log flag.
'''
artifacts = self.artifacts
if artifacts.get('_ansible_no_log', False):
return "$hidden due to Ansible no_log flag$"
return artifacts
def _survey_search_and_replace(self, content):
# Use job template survey spec to identify password fields.
# Then lookup password fields in extra_vars and save the values
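A minimal standalone sketch of the redaction rule above (not the model method itself): a dict carrying the _ansible_no_log marker is replaced wholesale by a placeholder string rather than filtered key by key.

def display_artifacts_sketch(artifacts):
    # mirror of Job.display_artifacts()
    if artifacts.get('_ansible_no_log', False):
        return "$hidden due to Ansible no_log flag$"
    return artifacts

assert display_artifacts_sketch({'a': 1}) == {'a': 1}
assert display_artifacts_sketch({'a': 1, '_ansible_no_log': True}) == \
    "$hidden due to Ansible no_log flag$"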

View File

@@ -348,11 +348,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
create_kwargs[field_name] = getattr(self, field_name)
new_kwargs = self._update_unified_job_kwargs(**create_kwargs)
unified_job = unified_job_class(**new_kwargs)
- # For JobTemplate-based jobs with surveys, save list for perma-redaction
- if (hasattr(self, 'survey_spec') and getattr(self, 'survey_enabled', False) and
-         not getattr(unified_job, 'survey_passwords', False)):
+ # For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction
+ if hasattr(self, 'survey_spec') and getattr(self, 'survey_enabled', False):
password_list = self.survey_password_variables()
- hide_password_dict = {}
+ hide_password_dict = getattr(unified_job, 'survey_passwords', {})
for password in password_list:
hide_password_dict[password] = REPLACE_STR
unified_job.survey_passwords = hide_password_dict
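The behavioral change: the old guard skipped the block whenever survey_passwords was already set, so passwords arriving from elsewhere (e.g. ancestor artifacts) would have suppressed survey redaction; the new code merges into the existing dict. A small sketch, assuming REPLACE_STR is the '$encrypted$' placeholder from awx.main.redact:

REPLACE_STR = '$encrypted$'  # assumed value of awx.main.redact.REPLACE_STR

# hypothetical: a password already recorded from ancestor artifacts
hide_password_dict = {'artifact_secret': REPLACE_STR}
for password in ['survey_secret']:   # from survey_password_variables()
    hide_password_dict[password] = REPLACE_STR
# both keys are now redacted; the old code would have kept only the first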

View File

@@ -21,7 +21,9 @@ from awx.main.models.rbac import (
)
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin
from awx.main.redact import REPLACE_STR
from copy import copy
import yaml
import json
@@ -124,6 +126,13 @@ class WorkflowNodeBase(CreatedModifiedModel):
data['missing'] = missing_dict
return data
def get_parent_nodes(self):
'''Returns queryset containing all parents of this node'''
success_parents = getattr(self, '%ss_success' % self.__class__.__name__.lower()).all()
failure_parents = getattr(self, '%ss_failure' % self.__class__.__name__.lower()).all()
always_parents = getattr(self, '%ss_always' % self.__class__.__name__.lower()).all()
return success_parents | failure_parents | always_parents
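For concreteness, the reverse accessors those getattr calls resolve to, taking WorkflowJobNode as the example class:

# '%ss_success' % 'workflowjobnode' -> 'workflowjobnodes_success'
# so the three querysets for a WorkflowJobNode instance are:
#   node.workflowjobnodes_success.all()
#   node.workflowjobnodes_failure.all()
#   node.workflowjobnodes_always.all()
# and the | operator unions them into a single queryset of all parents.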
@classmethod
def _get_workflow_job_field_names(cls):
'''
@@ -175,11 +184,22 @@ class WorkflowJobNode(WorkflowNodeBase):
default=None,
on_delete=models.CASCADE,
)
ancestor_artifacts = JSONField(
blank=True,
default={},
editable=False,
)
def get_absolute_url(self):
return reverse('api:workflow_job_node_detail', args=(self.pk,))
def get_job_kwargs(self):
'''
In advance of creating a new unified job as part of a workflow,
this method builds the attributes to use.
It also alters the node by saving its updated version of
ancestor_artifacts, making it available to subsequent nodes.
'''
# reject/accept prompted fields
data = {}
ujt_obj = self.unified_job_template
@@ -189,19 +209,31 @@
accepted_fields.pop(fd)
data.update(accepted_fields)
# TODO: decide what to do in the event of missing fields
# build ancestor artifacts, save them to node model for later
aa_dict = {}
for parent_node in self.get_parent_nodes():
aa_dict.update(parent_node.ancestor_artifacts)
if parent_node.job and hasattr(parent_node.job, 'artifacts'):
aa_dict.update(parent_node.job.artifacts)
if aa_dict:
self.ancestor_artifacts = aa_dict
self.save(update_fields=['ancestor_artifacts'])
if '_ansible_no_log' in aa_dict:
# TODO: merge Workflow Job survey passwords into this
password_dict = {}
for key in aa_dict:
if key != '_ansible_no_log':
password_dict[key] = REPLACE_STR
data['survey_passwords'] = password_dict
# process extra_vars
# TODO: still lack consensus about variable precedence
extra_vars = {}
if self.workflow_job and self.workflow_job.extra_vars:
- try:
- WJ_json_extra_vars = json.loads(
- (self.workflow_job.extra_vars or '').strip() or '{}')
- except ValueError:
- try:
- WJ_json_extra_vars = yaml.safe_load(self.workflow_job.extra_vars)
- except yaml.YAMLError:
- WJ_json_extra_vars = {}
- extra_vars.update(WJ_json_extra_vars)
- # TODO: merge artifacts, add ancestor_artifacts to kwargs
+ extra_vars.update(self.workflow_job.extra_vars_dict)
+ if aa_dict:
+ functional_aa_dict = copy(aa_dict)
+ functional_aa_dict.pop('_ansible_no_log', None)
+ extra_vars.update(functional_aa_dict)
if extra_vars:
data['extra_vars'] = extra_vars
return data
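A runnable standalone sketch of the artifact merge above, with plain dicts standing in for nodes and jobs (the REPLACE_STR value is an assumption):

from copy import copy

REPLACE_STR = '$encrypted$'  # assumed value of awx.main.redact.REPLACE_STR

def build_kwargs_sketch(parent_nodes):
    # accumulate inherited artifacts, then each parent's own job artifacts
    aa_dict = {}
    for parent in parent_nodes:
        aa_dict.update(parent['ancestor_artifacts'])
        aa_dict.update(parent.get('job_artifacts', {}))
    data = {}
    if '_ansible_no_log' in aa_dict:
        # every artifact key is treated as a redactable "survey password"
        data['survey_passwords'] = dict(
            (k, REPLACE_STR) for k in aa_dict if k != '_ansible_no_log')
    functional_aa_dict = copy(aa_dict)
    functional_aa_dict.pop('_ansible_no_log', None)
    if functional_aa_dict:
        data['extra_vars'] = functional_aa_dict
    return data

parent = {'ancestor_artifacts': {'a': 42}, 'job_artifacts': {'b': 43}}
assert build_kwargs_sketch([parent]) == {'extra_vars': {'a': 42, 'b': 43}}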

View File

@@ -821,6 +821,11 @@ class RunJob(BaseTask):
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT)
# Set artifact module path
# TODO: restrict this to workflow jobs, or JTs expecting artifacts
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
return env
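Putting plugins/library on ANSIBLE_LIBRARY is what lets any Tower-launched playbook resolve the bundled set_artifact module (bottom of this commit) without the user installing it:

# a playbook task can now call the module directly, e.g.
# - set_artifact:
#     data:
#       build_id: "{{ result.stdout }}"   # hypothetical variable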
def build_args(self, job, **kwargs):
@@ -893,7 +898,7 @@ class RunJob(BaseTask):
'tower_user_name': job.created_by.username,
})
if job.extra_vars_dict:
- if kwargs.get('display', False) and job.job_template and job.job_template.survey_enabled:
+ if kwargs.get('display', False) and job.job_template:
extra_vars.update(json.loads(job.display_extra_vars()))
else:
extra_vars.update(job.extra_vars_dict)
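With the survey_enabled check dropped, the redacted ("display") form of extra_vars is used for any job with a job template, so passwords injected via artifact inheritance are honored too. An illustration with hypothetical values, assuming '$encrypted$' as the REPLACE_STR placeholder:

# job.extra_vars       == '{"token": "s3cr3t", "region": "us-east-1"}'
# job.survey_passwords == {"token": "$encrypted$"}
# json.loads(job.display_extra_vars())
#   -> {"token": "$encrypted$", "region": "us-east-1"}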

View File

@@ -3,8 +3,11 @@
import pytest
# AWX
- from awx.main.models.workflow import WorkflowJob, WorkflowJobTemplateNode
+ from awx.main.models.workflow import WorkflowJob, WorkflowJobNode, WorkflowJobTemplateNode
+ from awx.main.models.jobs import Job
+ from awx.main.models.projects import ProjectUpdate
@pytest.mark.django_db
class TestWorkflowJob:
@pytest.fixture
def workflow_job(self, workflow_job_template_factory):
@@ -21,7 +24,6 @@ class TestWorkflowJob:
return wfj
@pytest.mark.django_db
def test_inherit_job_template_workflow_nodes(self, mocker, workflow_job):
workflow_job.inherit_job_template_workflow_nodes()
@@ -31,4 +33,60 @@
assert nodes[0].failure_nodes.filter(id=nodes[3].id).exists()
assert nodes[3].failure_nodes.filter(id=nodes[4].id).exists()
def test_inherit_ancestor_artifacts_from_job(self, project, mocker):
"""
Assure that nodes along the line of execution inherit artifacts
both from the jobs that ran and from the accumulation of earlier jobs
"""
# Related resources
wfj = WorkflowJob.objects.create(name='test-wf-job')
job = Job.objects.create(name='test-job', artifacts={'b': 43})
# Workflow job nodes
job_node = WorkflowJobNode.objects.create(workflow_job=wfj, job=job,
ancestor_artifacts={'a': 42})
queued_node = WorkflowJobNode.objects.create(workflow_job=wfj)
# Connect old job -> new job
mocker.patch.object(queued_node, 'get_parent_nodes', lambda: [job_node])
assert queued_node.get_job_kwargs()['extra_vars'] == {'a': 42, 'b': 43}
assert queued_node.ancestor_artifacts == {'a': 42, 'b': 43}
def test_inherit_ancestor_artifacts_from_project_update(self, project, mocker):
"""
Test that the existence of a project update (no artifacts) does
not break the flow of ancestor_artifacts
"""
# Related resources
wfj = WorkflowJob.objects.create(name='test-wf-job')
update = ProjectUpdate.objects.create(name='test-update', project=project)
# Workflow job nodes
project_node = WorkflowJobNode.objects.create(workflow_job=wfj, job=update,
ancestor_artifacts={'a': 42, 'b': 43})
queued_node = WorkflowJobNode.objects.create(workflow_job=wfj)
# Connect project update -> new job
mocker.patch.object(queued_node, 'get_parent_nodes', lambda: [project_node])
assert queued_node.get_job_kwargs()['extra_vars'] == {'a': 42, 'b': 43}
assert queued_node.ancestor_artifacts == {'a': 42, 'b': 43}
@pytest.mark.django_db
class TestWorkflowJobTemplate:
@pytest.fixture
def wfjt(self, workflow_job_template_factory):
wfjt = workflow_job_template_factory('test').workflow_job_template
nodes = [WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt) for i in range(0, 3)]
nodes[0].success_nodes.add(nodes[1])
nodes[1].failure_nodes.add(nodes[2])
return wfjt
def test_node_parentage(self, wfjt):
# test success parent
wfjt_node = wfjt.workflow_job_template_nodes.all()[1]
parent_qs = wfjt_node.get_parent_nodes()
assert len(parent_qs) == 1
assert parent_qs[0] == wfjt.workflow_job_template_nodes.all()[0]
# test failure parent
wfjt_node = wfjt.workflow_job_template_nodes.all()[2]
parent_qs = wfjt_node.get_parent_nodes()
assert len(parent_qs) == 1
assert parent_qs[0] == wfjt.workflow_job_template_nodes.all()[1]

View File

@@ -6,6 +6,7 @@ from awx.main.models.workflow import (
WorkflowJobTemplate, WorkflowJobTemplateNode, WorkflowJobInheritNodesMixin,
WorkflowJob, WorkflowJobNode
)
import mock
class TestWorkflowJobInheritNodesMixin():
class TestCreateWorkflowJobNodes():
@@ -151,6 +152,7 @@ class TestWorkflowJobCreate:
unified_job_template=wfjt_node_with_prompts.unified_job_template,
workflow_job=workflow_job_unit)
@mock.patch('awx.main.models.workflow.WorkflowNodeBase.get_parent_nodes', lambda self: [])
class TestWorkflowJobNodeJobKWARGS:
"""
Tests for building the keyword arguments that go into creating and

View File

@@ -184,6 +184,9 @@ class BaseCallbackModule(object):
if getattr(self, 'ad_hoc_command_id', None):
msg['ad_hoc_command_id'] = self.ad_hoc_command_id
if getattr(self, 'artifact_data', None):
msg['artifact_data'] = self.artifact_data
active_pid = os.getpid()
if self.job_callback_debug:
msg.update({
@@ -416,6 +419,9 @@ class JobCallbackModule(BaseCallbackModule):
event_data['task'] = task_name
if role_name and event not in self.EVENTS_WITHOUT_TASK:
event_data['role'] = role_name
self.artifact_data = None
if 'res' in event_data and 'artifact_data' in event_data['res']:
self.artifact_data = event_data['res']['artifact_data']
super(JobCallbackModule, self)._log_event(event, **event_data)
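A hedged sketch of the event this hook handles: the set_artifact module's result lands in event_data['res'], and the callback lifts artifact_data out so it rides along in the message (shape inferred from this diff, values illustrative):

event_data = {
    'task': 'set_artifact',
    'res': {'changed': True, 'artifact_data': '{"one_artifact": 84}'},
}
artifact_data = event_data['res'].get('artifact_data')  # forwarded in msg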
def playbook_on_start(self):

View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python
from ansible.module_utils.basic import *  # noqa
import json
DOCUMENTATION = '''
---
module: set_artifact
short_description: Stash some Ansible variables for later
description:
- Saves a user-specified JSON dictionary of variables from a playbook
for later use
version_added: "2.2"
options:
  data:
    description:
      - Dictionary of variables to stash as the job's artifacts
    required: false
    default: {}
requirements: [ ]
author: Alan Rominger
'''
EXAMPLES = '''
# Simple specification of an artifact dictionary, passed along on callback
- set_artifact:
    data:
      one_artifact: "{{ local_var * 2 }}"
      another_artifact: "{{ some_registered_var.results | map(attribute='ansible_facts.some_fact') | list }}"

# Specifying a local path to save the artifacts to
- set_artifact:
    data:
      one_artifact: "{{ local_var * 2 }}"
      another_artifact: "{{ some_registered_var.results | map(attribute='ansible_facts.some_fact') | list }}"
    dest: /tmp/prefix-{{ inventory_hostname }}

# Example module output:
# host | success >> {
#     "artifact_data": {}
# }
'''
def main():
    module = AnsibleModule(
        argument_spec=dict(
            data=dict(
                type='dict',
                default={}
            )
        )
    )
    results = dict(
        changed=True,
        artifact_data=json.dumps(module.params.get('data'))
    )
    module.exit_json(**results)
if __name__ == '__main__':
main()
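End to end, the path an artifact takes through this commit (a summary sketch; step names reference the diffs above):

# 1. playbook task runs set_artifact with data={'one_artifact': 84}
# 2. module returns {'changed': True, 'artifact_data': '{"one_artifact": 84}'}
# 3. callback module lifts artifact_data out of event_data['res'] into msg
# 4. callback receiver's process_artifacts() saves it onto job.artifacts
# 5. a downstream WorkflowJobNode merges it into ancestor_artifacts / extra_vars
# 6. the API serializer exposes it via display_artifacts(), redacting no_log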