redact survey password type values w/ tests

This commit is contained in:
Chris Meyers 2015-02-18 07:32:57 -05:00
parent 50e174ea43
commit 1e871f8d7c
13 changed files with 553 additions and 239 deletions

View File

@ -44,7 +44,6 @@ import qsstats
from awx.main.task_engine import TaskSerializer, TASK_FILE
from awx.main.access import get_user_queryset
from awx.main.ha import is_ha_environment
from awx.main.redact import UriCleaner
from awx.api.authentication import JobTaskAuthentication
from awx.api.utils.decorators import paginated
from awx.api.generics import get_view_name
@ -2214,7 +2213,6 @@ class UnifiedJobStdout(RetrieveAPIView):
conv = Ansi2HTMLConverter(scheme=scheme, dark_bg=dark_bg,
title=get_view_name(self.__class__))
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
content = UriCleaner.remove_sensitive(content)
if content_only:
headers = conv.produce_headers()
body = conv.convert(content, full=False) # Escapes any HTML that may be in content.
@ -2231,7 +2229,7 @@ class UnifiedJobStdout(RetrieveAPIView):
return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': body})
return Response(data)
elif request.accepted_renderer.format == 'ansi':
return Response(UriCleaner.remove_sensitive(unified_job.result_stdout_raw))
return Response(unified_job.result_stdout_raw)
else:
return super(UnifiedJobStdout, self).retrieve(request, *args, **kwargs)

View File

@ -5,6 +5,7 @@
import hmac
import json
import logging
import re
# Django
from django.conf import settings
@ -23,6 +24,7 @@ from awx.main.models.base import * # noqa
from awx.main.models.unified_jobs import * # noqa
from awx.main.utils import decrypt_field, ignore_inventory_computed_fields
from awx.main.utils import emit_websocket_notification
from awx.main.redact import PlainTextCleaner
logger = logging.getLogger('awx.main.models.jobs')
@ -220,7 +222,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions):
if survey_element['variable'] not in data and \
survey_element['required']:
errors.append("'%s' value missing" % survey_element['variable'])
elif survey_element['type'] in ["textarea", "text"]:
elif survey_element['type'] in ["textarea", "text", "password"]:
if survey_element['variable'] in data:
if 'min' in survey_element and survey_element['min'] not in ["", None] and len(data[survey_element['variable']]) < survey_element['min']:
errors.append("'%s' value %s is too small (must be at least %s)" %
@ -452,6 +454,31 @@ class Job(UnifiedJob, JobOptions):
evars.update(extra_vars)
self.update_fields(extra_vars=json.dumps(evars))
def _survey_search_and_replace(self, content):
# Use job template survey spec to identify password fields.
# Then lookup password fields in extra_vars and save the values
jt = self.job_template
if jt and jt.survey_enabled and 'spec' in jt.survey_spec:
vars = []
# Get variables that are type password
for survey_element in jt.survey_spec['spec']:
if survey_element['type'] == 'password':
vars.append(survey_element['variable'])
# Use password vars to find in extra_vars
for key in vars:
if key in self.extra_vars_dict:
content = PlainTextCleaner.remove_sensitive(content, self.extra_vars_dict[key])
return content
def _result_stdout_raw_limited(self, *args, **kwargs):
buff, start, end, abs_end = super(Job, self)._result_stdout_raw_limited(*args, **kwargs)
return self._survey_search_and_replace(buff), start, end, abs_end
def _result_stdout_raw(self, *args, **kwargs):
content = super(Job, self)._result_stdout_raw(*args, **kwargs)
return self._survey_search_and_replace(content)
def copy(self):
presets = {}
for kw in self.job_template._get_unified_job_field_names():

View File

@ -625,16 +625,27 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
else:
return StringIO("stdout capture is missing")
def _escape_ascii(self, content):
ansi_escape = re.compile(r'\x1b[^m]*m')
return ansi_escape.sub('', content)
def _result_stdout_raw(self, redact_sensitive=True, escape_ascii=False):
content = self.result_stdout_raw_handle().read()
if redact_sensitive:
content = UriCleaner.remove_sensitive(content)
if escape_ascii:
content = self._escape_ascii(content)
return content
@property
def result_stdout_raw(self):
return self.result_stdout_raw_handle().read()
return self._result_stdout_raw()
@property
def result_stdout(self):
ansi_escape = re.compile(r'\x1b[^m]*m')
return ansi_escape.sub('', UriCleaner.remove_sensitive(self.result_stdout_raw))
return self._result_stdout_raw(escape_ascii=True)
def result_stdout_raw_limited(self, start_line=0, end_line=None):
def _result_stdout_raw_limited(self, start_line=0, end_line=None, redact_sensitive=True, escape_ascii=False):
return_buffer = u""
if end_line is not None:
end_line = int(end_line)
@ -651,12 +662,19 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
end_actual = min(int(end_line), len(stdout_lines))
else:
end_actual = len(stdout_lines)
if redact_sensitive:
return_buffer = UriCleaner.remove_sensitive(return_buffer)
if escape_ascii:
return_buffer = self._escape_ascii(return_buffer)
return return_buffer, start_actual, end_actual, absolute_end
def result_stdout_raw_limited(self, start_line=0, end_line=None):
return self._result_stdout_raw_limited(start_line, end_line)
def result_stdout_limited(self, start_line=0, end_line=None):
ansi_escape = re.compile(r'\x1b[^m]*m')
content, start, end, absolute_end = UriCleaner.remove_sensitive(self.result_stdout_raw_limited(start_line, end_line))
return ansi_escape.sub('', content), start, end, absolute_end
return self._result_stdout_raw_limited(start_line, end_line, escape_ascii=True)
@property
def celery_task(self):
@ -729,9 +747,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def signal_start(self, **kwargs):
"""Notify the task runner system to begin work on this task."""
# Sanity check: If we are running unit tests, then run synchronously.
if getattr(settings, 'CELERY_UNIT_TEST', False):
return self.start(None, **kwargs)
# Sanity check: Are we able to start the job? If not, do not attempt
# to do so.
@ -747,6 +762,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if 'extra_vars' in kwargs:
self.handle_extra_data(kwargs['extra_vars'])
# Sanity check: If we are running unit tests, then run synchronously.
if getattr(settings, 'CELERY_UNIT_TEST', False):
return self.start(None, **kwargs)
# Save the pending status, and inform the SocketIO listener.
self.update_fields(start_args=json.dumps(kwargs), status='pending')
self.socketio_emit_status("pending")

View File

@ -1,8 +1,10 @@
import re
import urlparse
REPLACE_STR = '$encrypted$'
class UriCleaner(object):
REPLACE_STR = '$encrypted$'
REPLACE_STR = REPLACE_STR
# https://regex101.com/r/sV2dO2/2
SENSITIVE_URI_PATTERN = re.compile(ur'(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?\xab\xbb\u201c\u201d\u2018\u2019]))', re.MULTILINE)
@ -51,4 +53,9 @@ class UriCleaner(object):
return redactedtext
class PlainTextCleaner(object):
REPLACE_STR = REPLACE_STR
@staticmethod
def remove_sensitive(cleartext, sensitive):
return re.sub(r'%s' % re.escape(sensitive), '$encrypted$', cleartext)

View File

@ -14,3 +14,4 @@ from awx.main.tests.activity_stream import * # noqa
from awx.main.tests.schedules import * # noqa
from awx.main.tests.redact import * # noqa
from awx.main.tests.views import * # noqa
from awx.main.tests.jobs import *

View File

@ -13,15 +13,17 @@ import tempfile
import time
from multiprocessing import Process
from subprocess import Popen
import re
# PyYAML
import yaml
# Django
import django.test
from django.conf import settings, UserSettingsHolder
from django.contrib.auth.models import User
import django.test
from django.test.client import Client
from django.test.utils import override_settings
# AWX
from awx.main.models import * # noqa
@ -211,17 +213,20 @@ class BaseTestMixin(QueueTestMixin):
def make_organizations(self, created_by, count=1):
results = []
for x in range(0, count):
self.object_ctr = self.object_ctr + 1
results.append(Organization.objects.create(
name="org%s-%s" % (x, self.object_ctr), description="org%s" % x, created_by=created_by
))
results.append(self.make_organization(created_by=created_by, count=x))
return results
def make_organization(self, created_by):
return self.make_organizations(created_by, 1)[0]
def make_organization(self, created_by, count=1):
self.object_ctr = self.object_ctr + 1
return Organization.objects.create(
name="org%s-%s" % (count, self.object_ctr), description="org%s" % count, created_by=created_by
)
def make_project(self, name, description='', created_by=None,
def make_project(self, name=None, description='', created_by=None,
playbook_content='', role_playbooks=None, unicode_prefix=True):
if not name:
name = self.unique_name('Project')
if not os.path.exists(settings.PROJECTS_ROOT):
os.makedirs(settings.PROJECTS_ROOT)
# Create temp project directory.
@ -283,7 +288,7 @@ class BaseTestMixin(QueueTestMixin):
return Inventory.objects.create(name=name or self.unique_name('Inventory'), organization=organization, created_by=created_by)
def make_job_template(self, name=None, created_by=None, organization=None, inventory=None, project=None, playbook=None):
def make_job_template(self, name=None, created_by=None, organization=None, inventory=None, project=None, playbook=None, **kwargs):
created_by = self.decide_created_by(created_by)
if not inventory:
inventory = self.make_inventory(organization=organization, created_by=created_by)
@ -300,24 +305,47 @@ class BaseTestMixin(QueueTestMixin):
if project not in organization.projects.all():
organization.projects.add(project)
return JobTemplate.objects.create(
name=name or self.unique_name('JobTemplate'),
job_type='check',
inventory=inventory,
project=project,
playbook=project.playbooks[0],
host_config_key=settings.SYSTEM_UUID,
created_by=created_by,
)
opts = {
'name' : name or self.unique_name('JobTemplate'),
'job_type': 'check',
'inventory': inventory,
'project': project,
'host_config_key': settings.SYSTEM_UUID,
'created_by': created_by,
'playbook': playbook,
}
opts.update(kwargs)
return JobTemplate.objects.create(**opts)
def make_job(self, job_template=None, created_by=None, inital_state='new'):
def make_job(self, job_template=None, created_by=None, inital_state='new', **kwargs):
created_by = self.decide_created_by(created_by)
if not job_template:
job_template = self.make_job_template(created_by=created_by)
job = job_template.create_job(created_by=created_by)
job.status = inital_state
return job
opts = {
'created_by': created_by,
'status': inital_state,
}
opts.update(kwargs)
return job_template.create_job(**opts)
def make_credential(self, **kwargs):
opts = {
'name': self.unique_name('Credential'),
'kind': 'ssh',
'user': self.super_django_user,
'username': '',
'ssh_key_data': '',
'ssh_key_unlock': '',
'password': '',
'sudo_username': '',
'sudo_password': '',
'su_username': '',
'su_password': '',
'vault_password': '',
}
opts.update(kwargs)
return Credential.objects.create(**opts)
def setup_instances(self):
instance = Instance(uuid=settings.SYSTEM_UUID, primary=True, hostname='127.0.0.1')
@ -419,6 +447,10 @@ class BaseTestMixin(QueueTestMixin):
obj = json.loads(response.content)
elif response['Content-Type'].startswith('application/yaml'):
obj = yaml.safe_load(response.content)
elif response['Content-Type'].startswith('text/plain'):
obj = { 'content': response.content }
elif response['Content-Type'].startswith('text/html'):
obj = { 'content': response.content }
else:
self.fail('Unsupport response content type %s' % response['Content-Type'])
else:
@ -556,12 +588,58 @@ class BaseTestMixin(QueueTestMixin):
msg += 'fields %s not returned ' % ', '.join(not_returned)
self.assertTrue(set(obj.keys()) <= set(fields), msg)
def check_not_found(self, string, substr):
self.assertEqual(string.find(substr), -1, "'%s' found in:\n%s" % (substr, string))
def check_not_found(self, string, substr, description=None, word_boundary=False):
if word_boundary:
count = len(re.findall(r'\b%s\b' % re.escape(substr), string))
else:
count = string.find(substr)
if count == -1:
count = 0
msg = ''
if description:
msg = 'Test "%s".\n' % description
msg += '"%s" found in: "%s"' % (substr, string)
self.assertEqual(count, 0, msg)
def check_found(self, string, substr, count, description=None, word_boundary=False):
if word_boundary:
count_actual = len(re.findall(r'\b%s\b' % re.escape(substr), string))
else:
count_actual = string.count(substr)
msg = ''
if description:
msg = 'Test "%s".\n' % description
msg += 'Found %d occurances of "%s" instead of %d in: "%s"' % (count_actual, substr, count, string)
self.assertEqual(count_actual, count, msg)
def check_job_result(self, job, expected='successful', expect_stdout=True,
expect_traceback=False):
msg = u'job status is %s, expected %s' % (job.status, expected)
msg = u'%s\nargs:\n%s' % (msg, job.job_args)
msg = u'%s\nenv:\n%s' % (msg, job.job_env)
if job.result_traceback:
msg = u'%s\ngot traceback:\n%s' % (msg, job.result_traceback)
if job.result_stdout:
msg = u'%s\ngot stdout:\n%s' % (msg, job.result_stdout)
if isinstance(expected, (list, tuple)):
self.assertTrue(job.status in expected)
else:
self.assertEqual(job.status, expected, msg)
if expect_stdout:
self.assertTrue(job.result_stdout)
else:
self.assertTrue(job.result_stdout in ('', 'stdout capture is missing'),
u'expected no stdout, got:\n%s' %
job.result_stdout)
if expect_traceback:
self.assertTrue(job.result_traceback)
else:
self.assertFalse(job.result_traceback,
u'expected no traceback, got:\n%s' %
job.result_traceback)
def check_found(self, string, substr, count=1):
count_actual = string.count(substr)
self.assertEqual(count_actual, count, "Found %d occurances of '%s' instead of %d in:\n%s" % (count_actual, substr, count, string))
def start_taskmanager(self, command_port):
self.start_redis()
@ -589,6 +667,17 @@ class BaseLiveServerTest(BaseTestMixin, django.test.LiveServerTestCase):
'''
Base class for tests requiring a live test server.
'''
def setUp(self):
super(BaseLiveServerTest, self).setUp()
settings.INTERNAL_API_URL = self.live_server_url
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local')
class BaseJobExecutionTest(QueueStartStopTestMixin, BaseLiveServerTest):
'''
Base class for celery task tests.
'''
# Helps with test cases.
# Save all components of a uri (i.e. scheme, username, password, etc.) so that

View File

@ -310,7 +310,6 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
self.group.hosts.add(self.host)
self.project = None
self.credential = None
settings.INTERNAL_API_URL = self.live_server_url
self.start_queue()
def tearDown(self):
@ -320,18 +319,7 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
shutil.rmtree(self.test_project_path, True)
def create_test_credential(self, **kwargs):
opts = {
'name': 'test-creds',
'user': self.super_django_user,
'ssh_username': '',
'ssh_key_data': '',
'ssh_key_unlock': '',
'ssh_password': '',
'sudo_username': '',
'sudo_password': '',
}
opts.update(kwargs)
self.credential = Credential.objects.create(**opts)
self.credential = self.make_credential(kwargs)
return self.credential
def create_test_project(self, playbook_content):

View File

@ -0,0 +1,3 @@
from awx.main.tests.jobs.jobs import *
from awx.main.tests.jobs.survey_password import *

View File

@ -27,7 +27,7 @@ from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
__all__ = ['JobTemplateTest', 'JobTest', 'JobStartCancelTest',
'JobTemplateCallbackTest', 'JobTransactionTest']
'JobTemplateCallbackTest', 'JobTransactionTest', 'JobTemplateSurveyTest']
TEST_PLAYBOOK = '''- hosts: all
gather_facts: false
@ -933,112 +933,6 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
# FIXME: Check other credentials and optional fields.
def test_post_job_template_survey(self):
url = reverse('api:job_template_list')
data = dict(
name = 'launched job template',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
playbook = self.proj_dev.playbooks[0],
credential = self.cred_sue.pk,
survey_enabled = True,
)
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
new_jt_id = response['id']
detail_url = reverse('api:job_template_detail',
args=(new_jt_id,))
self.assertEquals(response['url'], detail_url)
url = reverse('api:job_template_survey_spec', args=(new_jt_id,))
with self.current_user(self.user_sue):
response = self.post(url, json.loads(TEST_SIMPLE_REQUIRED_SURVEY), expect=200)
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
response = self.get(launch_url)
self.assertTrue('favorite_color' in response['variables_needed_to_start'])
response = self.post(launch_url, dict(extra_vars=dict(favorite_color="green")), expect=202)
job = Job.objects.get(pk=response["job"])
job_extra = json.loads(job.extra_vars)
self.assertTrue("favorite_color" in job_extra)
with self.current_user(self.user_sue):
response = self.post(url, json.loads(TEST_SIMPLE_NONREQUIRED_SURVEY), expect=200)
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
response = self.get(launch_url)
self.assertTrue(len(response['variables_needed_to_start']) == 0)
with self.current_user(self.user_sue):
response = self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=200)
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
# Just the required answer should work
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo")), expect=202)
# Short answer but requires a long answer
self.post(launch_url, dict(extra_vars=dict(long_answer='a', reqd_answer="foo")), expect=400)
# Long answer but requires a short answer
self.post(launch_url, dict(extra_vars=dict(short_answer='thisissomelongtext', reqd_answer="foo")), expect=400)
# Long answer but missing required answer
self.post(launch_url, dict(extra_vars=dict(long_answer='thisissomelongtext')), expect=400)
# Integer that's not big enough
self.post(launch_url, dict(extra_vars=dict(int_answer=0, reqd_answer="foo")), expect=400)
# Integer that's too big
self.post(launch_url, dict(extra_vars=dict(int_answer=10, reqd_answer="foo")), expect=400)
# Integer that's just riiiiight
self.post(launch_url, dict(extra_vars=dict(int_answer=3, reqd_answer="foo")), expect=202)
# Integer bigger than min with no max defined
self.post(launch_url, dict(extra_vars=dict(int_answer_no_max=3, reqd_answer="foo")), expect=202)
# Integer answer that's the wrong type
self.post(launch_url, dict(extra_vars=dict(int_answer="test", reqd_answer="foo")), expect=400)
# Float that's too big
self.post(launch_url, dict(extra_vars=dict(float_answer=10.5, reqd_answer="foo")), expect=400)
# Float that's too small
self.post(launch_url, dict(extra_vars=dict(float_answer=1.995, reqd_answer="foo")), expect=400)
# float that's just riiiiight
self.post(launch_url, dict(extra_vars=dict(float_answer=2.01, reqd_answer="foo")), expect=202)
# float answer that's the wrong type
self.post(launch_url, dict(extra_vars=dict(float_answer="test", reqd_answer="foo")), expect=400)
# Wrong choice in single choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", single_choice="three")), expect=400)
# Wrong choice in multi choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", multi_choice=["four"])), expect=400)
# Wrong type for multi choicen
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", multi_choice="two")), expect=400)
# Right choice in single choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", single_choice="two")), expect=202)
# Right choices in multi choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", multi_choice=["one", "two"])), expect=202)
# Nested json
self.post(launch_url, dict(extra_vars=dict(json_answer=dict(test="val", num=1), reqd_answer="foo")), expect=202)
# Bob can access and update the survey because he's an org-admin
with self.current_user(self.user_bob):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=200)
# Chuck is the lead engineer and has the right permissions to edit it also
with self.current_user(self.user_chuck):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=200)
# Doug shouldn't be able to access this playbook
with self.current_user(self.user_doug):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=403)
# Neither can juan because he doesn't have the job template create permission
with self.current_user(self.user_juan):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=403)
# Bob and chuck can read the template
with self.current_user(self.user_bob):
self.get(url, expect=200)
with self.current_user(self.user_chuck):
self.get(url, expect=200)
# Doug and Juan can't
with self.current_user(self.user_doug):
self.get(url, expect=403)
with self.current_user(self.user_juan):
self.get(url, expect=403)
def test_launch_job_template(self):
url = reverse('api:job_template_list')
data = dict(
@ -1945,3 +1839,115 @@ class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase):
self.assertEqual(job.status, 'successful', job.result_stdout)
self.assertFalse(errors)
class JobTemplateSurveyTest(BaseJobTestMixin, django.test.TestCase):
def setUp(self):
super(JobTemplateSurveyTest, self).setUp()
def tearDown(self):
super(JobTemplateSurveyTest, self).tearDown()
def test_post_job_template_survey(self):
url = reverse('api:job_template_list')
data = dict(
name = 'launched job template',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
playbook = self.proj_dev.playbooks[0],
credential = self.cred_sue.pk,
survey_enabled = True,
)
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
new_jt_id = response['id']
detail_url = reverse('api:job_template_detail',
args=(new_jt_id,))
self.assertEquals(response['url'], detail_url)
url = reverse('api:job_template_survey_spec', args=(new_jt_id,))
with self.current_user(self.user_sue):
response = self.post(url, json.loads(TEST_SIMPLE_REQUIRED_SURVEY), expect=200)
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
response = self.get(launch_url)
self.assertTrue('favorite_color' in response['variables_needed_to_start'])
response = self.post(launch_url, dict(extra_vars=dict(favorite_color="green")), expect=202)
job = Job.objects.get(pk=response["job"])
job_extra = json.loads(job.extra_vars)
self.assertTrue("favorite_color" in job_extra)
with self.current_user(self.user_sue):
response = self.post(url, json.loads(TEST_SIMPLE_NONREQUIRED_SURVEY), expect=200)
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
response = self.get(launch_url)
self.assertTrue(len(response['variables_needed_to_start']) == 0)
with self.current_user(self.user_sue):
response = self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=200)
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
# Just the required answer should work
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo")), expect=202)
# Short answer but requires a long answer
self.post(launch_url, dict(extra_vars=dict(long_answer='a', reqd_answer="foo")), expect=400)
# Long answer but requires a short answer
self.post(launch_url, dict(extra_vars=dict(short_answer='thisissomelongtext', reqd_answer="foo")), expect=400)
# Long answer but missing required answer
self.post(launch_url, dict(extra_vars=dict(long_answer='thisissomelongtext')), expect=400)
# Integer that's not big enough
self.post(launch_url, dict(extra_vars=dict(int_answer=0, reqd_answer="foo")), expect=400)
# Integer that's too big
self.post(launch_url, dict(extra_vars=dict(int_answer=10, reqd_answer="foo")), expect=400)
# Integer that's just riiiiight
self.post(launch_url, dict(extra_vars=dict(int_answer=3, reqd_answer="foo")), expect=202)
# Integer bigger than min with no max defined
self.post(launch_url, dict(extra_vars=dict(int_answer_no_max=3, reqd_answer="foo")), expect=202)
# Integer answer that's the wrong type
self.post(launch_url, dict(extra_vars=dict(int_answer="test", reqd_answer="foo")), expect=400)
# Float that's too big
self.post(launch_url, dict(extra_vars=dict(float_answer=10.5, reqd_answer="foo")), expect=400)
# Float that's too small
self.post(launch_url, dict(extra_vars=dict(float_answer=1.995, reqd_answer="foo")), expect=400)
# float that's just riiiiight
self.post(launch_url, dict(extra_vars=dict(float_answer=2.01, reqd_answer="foo")), expect=202)
# float answer that's the wrong type
self.post(launch_url, dict(extra_vars=dict(float_answer="test", reqd_answer="foo")), expect=400)
# Wrong choice in single choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", single_choice="three")), expect=400)
# Wrong choice in multi choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", multi_choice=["four"])), expect=400)
# Wrong type for multi choicen
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", multi_choice="two")), expect=400)
# Right choice in single choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", single_choice="two")), expect=202)
# Right choices in multi choice
self.post(launch_url, dict(extra_vars=dict(reqd_answer="foo", multi_choice=["one", "two"])), expect=202)
# Nested json
self.post(launch_url, dict(extra_vars=dict(json_answer=dict(test="val", num=1), reqd_answer="foo")), expect=202)
# Bob can access and update the survey because he's an org-admin
with self.current_user(self.user_bob):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=200)
# Chuck is the lead engineer and has the right permissions to edit it also
with self.current_user(self.user_chuck):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=200)
# Doug shouldn't be able to access this playbook
with self.current_user(self.user_doug):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=403)
# Neither can juan because he doesn't have the job template create permission
with self.current_user(self.user_juan):
self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=403)
# Bob and chuck can read the template
with self.current_user(self.user_bob):
self.get(url, expect=200)
with self.current_user(self.user_chuck):
self.get(url, expect=200)
# Doug and Juan can't
with self.current_user(self.user_doug):
self.get(url, expect=403)
with self.current_user(self.user_juan):
self.get(url, expect=403)

View File

@ -0,0 +1,203 @@
# Python
import json
# Django
import django.test
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
__all__ = ['SurveyPasswordTest']
PASSWORD="5m/h"
ENCRYPTED_STR='$encrypted$'
TEST_PLAYBOOK = u'''---
- name: test success
hosts: test-group
gather_facts: True
tasks:
- name: should pass
command: echo {{ %s }}
''' % ('spot_speed')
TEST_SIMPLE_SURVEY = '''
{
"name": "Simple",
"description": "Description",
"spec": [
{
"type": "password",
"question_name": "spots speed",
"question_description": "How fast can spot run?",
"variable": "%s",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "%s"
}
]
}
''' % ('spot_speed', PASSWORD)
TEST_COMPLEX_SURVEY = '''
{
"name": "Simple",
"description": "Description",
"spec": [
{
"type": "password",
"question_name": "spots speed",
"question_description": "How fast can spot run?",
"variable": "spot_speed",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "0m/h"
},
{
"type": "password",
"question_name": "ssn",
"question_description": "What's your social security number?",
"variable": "ssn",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "999-99-9999"
},
{
"type": "password",
"question_name": "bday",
"question_description": "What's your birth day?",
"variable": "bday",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "1/1/1970"
}
]
}
'''
TEST_SINGLE_PASSWORDS = [
{
'description': 'Single instance with a . after',
'text' : 'See spot. See spot run. See spot run %s. That is a fast run.' % PASSWORD,
'passwords': [ PASSWORD ],
'occurances': 1,
},
{
'description': 'Single instance with , after',
'text': 'Spot goes %s, at a fast pace' % PASSWORD,
'passwords': [ PASSWORD ],
'occurances': 1,
},
{
'description': 'Single instance with a space after',
'text': 'Is %s very fast?' % PASSWORD,
'passwords': [ PASSWORD ],
'occurances': 1,
},
{
'description': 'Many instances, also with newline',
'text': 'I think %s is very very fast. If I ran %s for 4 hours how many hours would I run?.\nTrick question. %s for 4 hours would result in running for 4 hours' % (PASSWORD, PASSWORD, PASSWORD),
'passwords': [ PASSWORD ],
'occurances': 3,
},
]
passwd = 'my!@#$%^pass&*()_+'
TEST_SINGLE_PASSWORDS.append({
'description': 'password includes characters not in a-z 0-9 range',
'passwords': [ passwd ],
'text': 'Text is fun yeah with passwords %s.' % passwd,
'occurances': 1
})
# 3 because 3 password fields in spec TEST_COMPLEX_SURVEY
TEST_MULTIPLE_PASSWORDS = []
passwds = [ '65km/s', '545-83-4534', '7/4/2002']
TEST_MULTIPLE_PASSWORDS.append({
'description': '3 different passwords each used once',
'text': 'Spot runs %s. John has an ss of %s and is born on %s.' % (passwds[0], passwds[1], passwds[2]),
'passwords': passwds,
'occurances': 3,
})
TESTS = {
'simple': {
'survey' : json.loads(TEST_SIMPLE_SURVEY),
'tests' : TEST_SINGLE_PASSWORDS,
},
'complex': {
'survey' : json.loads(TEST_COMPLEX_SURVEY),
'tests' : TEST_MULTIPLE_PASSWORDS,
}
}
class SurveyPasswordBaseTest(BaseTest):
def setUp(self):
super(SurveyPasswordBaseTest, self).setUp()
self.setup_instances()
self.setup_users()
def check_passwords_redacted(self, test, response):
self.assertIsNotNone(response['content'])
for password in test['passwords']:
self.check_not_found(response['content'], password, test['description'], word_boundary=True)
self.check_found(response['content'], ENCRYPTED_STR, test['occurances'], test['description'])
def _get_url_job_stdout(self, job):
job_stdout_url = reverse('api:job_stdout', args=(job.pk,))
return self.get(job_stdout_url, expect=200, auth=self.get_super_credentials(), accept='application/json')
class SurveyPasswordTest(SurveyPasswordBaseTest):
def setup_test(self, test_name):
blueprint = TESTS[test_name]
self.tests[test_name] = []
job_template = self.make_job_template(survey_enabled=True, survey_spec=blueprint['survey'])
for test in blueprint['tests']:
test = dict(test)
extra_vars = {}
# build extra_vars from spec variables and passwords
for x in range(0, len(blueprint['survey']['spec'])):
question = blueprint['survey']['spec'][x]
extra_vars[question['variable']] = test['passwords'][x]
job = self.make_job(job_template=job_template)
job.extra_vars = json.dumps(extra_vars)
job.result_stdout_text = test['text']
job.save()
test['job'] = job
self.tests[test_name].append(test)
def setUp(self):
super(SurveyPasswordTest, self).setUp()
self.tests = {}
self.setup_test('simple')
self.setup_test('complex')
# should redact single variable survey
def test_survey_password_redact_simple_survey(self):
for test in self.tests['simple']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)
# should redact multiple variables survey
def test_survey_password_redact_complex_survey(self):
for test in self.tests['complex']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)

View File

@ -87,9 +87,9 @@ class UriCleanTests(BaseTest):
for uri in TEST_URIS:
redacted_str = UriCleaner.remove_sensitive(str(uri))
if uri.username:
self.check_not_found(redacted_str, uri.username)
self.check_not_found(redacted_str, uri.username, uri.description)
if uri.password:
self.check_not_found(redacted_str, uri.password)
self.check_not_found(redacted_str, uri.password, uri.description)
# should replace secret data with safe string, UriCleaner.REPLACE_STR
def test_uri_scm_simple_replaced(self):
@ -107,9 +107,9 @@ class UriCleanTests(BaseTest):
redacted_str = UriCleaner.remove_sensitive(str(uri))
if uri.username:
self.check_not_found(redacted_str, uri.username)
self.check_not_found(redacted_str, uri.username, uri.description)
if uri.password:
self.check_not_found(redacted_str, uri.password)
self.check_not_found(redacted_str, uri.password, uri.description)
# should replace multiple secret data with safe string
def test_uri_scm_multiple_replaced(self):
@ -131,8 +131,8 @@ class UriCleanTests(BaseTest):
for test_data in TEST_CLEARTEXT:
uri = test_data['uri']
redacted_str = UriCleaner.remove_sensitive(test_data['text'])
self.check_not_found(redacted_str, uri.username)
self.check_not_found(redacted_str, uri.password)
self.check_not_found(redacted_str, uri.username, uri.description)
self.check_not_found(redacted_str, uri.password, uri.description)
# Ensure the host didn't get redacted
self.check_found(redacted_str, uri.host, count=test_data['host_occurrences'])
self.check_found(redacted_str, uri.host, test_data['host_occurrences'], uri.description)

View File

@ -21,7 +21,7 @@ from crum import impersonate
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from awx.main.tests.base import BaseJobExecutionTest
TEST_PLAYBOOK = u'''
- name: test success
@ -341,15 +341,7 @@ L5Hj+B02+FAiz8zVGumbVykvPtzgTb0E+0rJKNO0/EgGqWsk/oC0
TEST_SSH_KEY_DATA_UNLOCK = 'unlockme'
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
class BaseCeleryTest(BaseLiveServerTest):
'''
Base class for celery task tests.
'''
@override_settings(ANSIBLE_TRANSPORT='local')
class RunJobTest(BaseCeleryTest):
class RunJobTest(BaseJobExecutionTest):
'''
Test cases for RunJob celery task.
'''
@ -371,31 +363,14 @@ class RunJobTest(BaseCeleryTest):
self.credential = None
self.cloud_credential = None
settings.INTERNAL_API_URL = self.live_server_url
self.start_queue()
def tearDown(self):
super(RunJobTest, self).tearDown()
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)
self.terminate_queue()
def create_test_credential(self, **kwargs):
opts = {
'name': 'test-creds',
'kind': 'ssh',
'user': self.super_django_user,
'username': '',
'ssh_key_data': '',
'ssh_key_unlock': '',
'password': '',
'sudo_username': '',
'sudo_password': '',
'su_username': '',
'su_password': '',
'vault_password': '',
}
opts.update(kwargs)
self.credential = Credential.objects.create(**opts)
self.credential = self.make_credential(**kwargs)
return self.credential
def create_test_cloud_credential(self, **kwargs):
@ -429,7 +404,7 @@ class RunJobTest(BaseCeleryTest):
except (AttributeError, IndexError):
pass
opts.update(kwargs)
self.job_template = JobTemplate.objects.create(**opts)
self.job_template = self.make_job_template(**opts)
return self.job_template
def create_test_job(self, **kwargs):
@ -453,32 +428,6 @@ class RunJobTest(BaseCeleryTest):
self.job = Job.objects.create(**opts)
return self.job
def check_job_result(self, job, expected='successful', expect_stdout=True,
expect_traceback=False):
msg = u'job status is %s, expected %s' % (job.status, expected)
msg = u'%s\nargs:\n%s' % (msg, job.job_args)
msg = u'%s\nenv:\n%s' % (msg, job.job_env)
if job.result_traceback:
msg = u'%s\ngot traceback:\n%s' % (msg, job.result_traceback)
if job.result_stdout:
msg = u'%s\ngot stdout:\n%s' % (msg, job.result_stdout)
if isinstance(expected, (list, tuple)):
self.assertTrue(job.status in expected)
else:
self.assertEqual(job.status, expected, msg)
if expect_stdout:
self.assertTrue(job.result_stdout)
else:
self.assertTrue(job.result_stdout in ('', 'stdout capture is missing'),
u'expected no stdout, got:\n%s' %
job.result_stdout)
if expect_traceback:
self.assertTrue(job.result_traceback)
else:
self.assertFalse(job.result_traceback,
u'expected no traceback, got:\n%s' %
job.result_traceback)
def check_job_events(self, job, runner_status='ok', plays=1, tasks=1,
async=False, async_timeout=False, async_nowait=False,
check_ignore_errors=False, async_tasks=0,

View File

@ -5,29 +5,30 @@ from django.core.urlresolvers import reverse
from awx.main.tests.base import BaseLiveServerTest, QueueStartStopTestMixin
from awx.main.tests.base import URI
__all__ = ['UnifiedJobStdoutTests']
__all__ = ['UnifiedJobStdoutRedactedTests']
TEST_STDOUTS = []
uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs")
TEST_STDOUTS.append({
'description': 'uri in a plain text document',
'uri' : uri,
'text' : 'hello world %s goodbye world' % uri,
'host_occurrences' : 1
'occurrences' : 1
})
uri = URI(scheme="https", username="applepie@@@", password="thatyouknow@@@@", host="github.ginger.com/theirrepo.git/info/refs")
TEST_STDOUTS.append({
'description': 'uri appears twice in a multiline plain text document',
'uri' : uri,
'text' : 'hello world %s \n\nyoyo\n\nhello\n%s' % (uri, uri),
'host_occurrences' : 2
'occurrences' : 2
})
class UnifiedJobStdoutTests(BaseLiveServerTest, QueueStartStopTestMixin):
class UnifiedJobStdoutRedactedTests(BaseLiveServerTest, QueueStartStopTestMixin):
def setUp(self):
super(UnifiedJobStdoutTests, self).setUp()
super(UnifiedJobStdoutRedactedTests, self).setUp()
self.setup_instances()
self.setup_users()
self.test_cases = []
@ -40,15 +41,38 @@ class UnifiedJobStdoutTests(BaseLiveServerTest, QueueStartStopTestMixin):
# This is more of a functional test than a unit test.
# should filter out username and password
def test_redaction_enabled(self):
def check_sensitive_redacted(self, test_data, response):
uri = test_data['uri']
self.assertIsNotNone(response['content'])
self.check_not_found(response['content'], uri.username, test_data['description'])
self.check_not_found(response['content'], uri.password, test_data['description'])
# Ensure the host didn't get redacted
self.check_found(response['content'], uri.host, test_data['occurrences'], test_data['description'])
def _get_url_job_stdout(self, job, format='json'):
formats = {
'json': 'application/json',
'ansi': 'text/plain',
'txt': 'text/plain',
'html': 'text/html',
}
content_type = formats[format]
job_stdout_url = reverse('api:job_stdout', args=(job.pk,)) + "?format=" + format
return self.get(job_stdout_url, expect=200, auth=self.get_super_credentials(), accept=content_type)
def _test_redaction_enabled(self, format):
for test_data in self.test_cases:
uri = test_data['uri']
job_stdout_url = reverse('api:job_stdout', args=(test_data['job'].pk,))
response = self._get_url_job_stdout(test_data['job'], format=format)
self.check_sensitive_redacted(test_data, response)
response = self.get(job_stdout_url, expect=200, auth=self.get_super_credentials(), accept='application/json')
def test_redaction_enabled_json(self):
self._test_redaction_enabled('json')
self.assertIsNotNone(response['content'])
self.check_not_found(response['content'], uri.username)
self.check_not_found(response['content'], uri.password)
# Ensure the host didn't get redacted
self.check_found(response['content'], uri.host, count=test_data['host_occurrences'])
def test_redaction_enabled_ansi(self):
self._test_redaction_enabled('ansi')
def test_redaction_enabled_html(self):
self._test_redaction_enabled('html')
def test_redaction_enabled_txt(self):
self._test_redaction_enabled('txt')