mirror of
https://github.com/ansible/awx.git
synced 2026-04-09 03:59:21 -02:30
Merge pull request #51 from chrismeyersfsu/improvement-stdout_sensitive_data_removal
added uri redaction to stdout
This commit is contained in:
@@ -50,6 +50,7 @@ from awx.main.models import *
|
|||||||
from awx.main.utils import *
|
from awx.main.utils import *
|
||||||
from awx.main.access import get_user_queryset
|
from awx.main.access import get_user_queryset
|
||||||
from awx.main.ha import is_ha_environment
|
from awx.main.ha import is_ha_environment
|
||||||
|
from awx.main.redact import UriCleaner
|
||||||
from awx.api.authentication import JobTaskAuthentication
|
from awx.api.authentication import JobTaskAuthentication
|
||||||
from awx.api.permissions import *
|
from awx.api.permissions import *
|
||||||
from awx.api.renderers import *
|
from awx.api.renderers import *
|
||||||
@@ -2232,12 +2233,12 @@ class UnifiedJobStdout(RetrieveAPIView):
|
|||||||
scheme = 'ansi2html'
|
scheme = 'ansi2html'
|
||||||
dark_val = request.QUERY_PARAMS.get('dark', '')
|
dark_val = request.QUERY_PARAMS.get('dark', '')
|
||||||
dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y'))
|
dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y'))
|
||||||
content_only = bool(request.accepted_renderer.format == 'api' or \
|
content_only = bool(request.accepted_renderer.format in ('api', 'json'))
|
||||||
request.accepted_renderer.format == 'json')
|
|
||||||
dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
|
dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
|
||||||
conv = Ansi2HTMLConverter(scheme=scheme, dark_bg=dark_bg,
|
conv = Ansi2HTMLConverter(scheme=scheme, dark_bg=dark_bg,
|
||||||
title=get_view_name(self.__class__))
|
title=get_view_name(self.__class__))
|
||||||
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
|
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
|
||||||
|
content = UriCleaner.remove_sensitive(content)
|
||||||
if content_only:
|
if content_only:
|
||||||
headers = conv.produce_headers()
|
headers = conv.produce_headers()
|
||||||
body = conv.convert(content, full=False) # Escapes any HTML that may be in content.
|
body = conv.convert(content, full=False) # Escapes any HTML that may be in content.
|
||||||
|
|||||||
61
awx/main/redact.py
Normal file
61
awx/main/redact.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import re
|
||||||
|
import urlparse
|
||||||
|
|
||||||
|
class UriCleaner(object):
|
||||||
|
REPLACE_STR = '$encrypted$'
|
||||||
|
# https://regex101.com/r/sV2dO2/2
|
||||||
|
SENSITIVE_URI_PATTERN = re.compile(ur'(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?\xab\xbb\u201c\u201d\u2018\u2019]))', re.MULTILINE)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def remove_sensitive(cleartext):
|
||||||
|
redactedtext = cleartext
|
||||||
|
text_index = 0
|
||||||
|
while True:
|
||||||
|
match = UriCleaner.SENSITIVE_URI_PATTERN.search(redactedtext, text_index)
|
||||||
|
if not match:
|
||||||
|
break
|
||||||
|
o = urlparse.urlsplit(match.group(1))
|
||||||
|
if not o.username and not o.password:
|
||||||
|
flag_continue = False
|
||||||
|
if o.netloc:
|
||||||
|
# Handle the special case url http://username:password that can appear in SCM url
|
||||||
|
# on account of a bug? in ansible redaction
|
||||||
|
try:
|
||||||
|
(username, password) = o.netloc.split(':')
|
||||||
|
except ValueError as e:
|
||||||
|
flag_continue = True
|
||||||
|
pass
|
||||||
|
|
||||||
|
if flag_continue:
|
||||||
|
text_index += len(match.group(1))
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
username = o.username
|
||||||
|
password = o.password
|
||||||
|
|
||||||
|
|
||||||
|
# Given a python MatchObject, with respect to redactedtext, find and
|
||||||
|
# replace the first occurance of username and the first and second
|
||||||
|
# occurance of password
|
||||||
|
|
||||||
|
# TODO: Ideally, we would replace username and password using the index
|
||||||
|
# that they were found at.
|
||||||
|
uri_str = redactedtext[match.start():match.end()]
|
||||||
|
if username:
|
||||||
|
uri_str = uri_str.replace(username, UriCleaner.REPLACE_STR, 1)
|
||||||
|
# 2, just in case the password is $encrypted$
|
||||||
|
if password:
|
||||||
|
uri_str = uri_str.replace(password, UriCleaner.REPLACE_STR, 2)
|
||||||
|
|
||||||
|
t = redactedtext[:match.start()] + uri_str
|
||||||
|
text_index = len(t)
|
||||||
|
if (match.end() < len(redactedtext)):
|
||||||
|
t += redactedtext[match.end():]
|
||||||
|
|
||||||
|
redactedtext = t
|
||||||
|
if text_index >= len(redactedtext):
|
||||||
|
text_index = len(redactedtext) - 1
|
||||||
|
|
||||||
|
return redactedtext
|
||||||
|
|
||||||
|
|
||||||
@@ -12,3 +12,5 @@ from awx.main.tests.licenses import LicenseTests
|
|||||||
from awx.main.tests.jobs import *
|
from awx.main.tests.jobs import *
|
||||||
from awx.main.tests.activity_stream import *
|
from awx.main.tests.activity_stream import *
|
||||||
from awx.main.tests.schedules import *
|
from awx.main.tests.schedules import *
|
||||||
|
from awx.main.tests.redact import *
|
||||||
|
from awx.main.tests.views import *
|
||||||
@@ -33,8 +33,51 @@ from awx.main.management.commands.run_task_system import run_taskmanager
|
|||||||
from awx.main.utils import get_ansible_version
|
from awx.main.utils import get_ansible_version
|
||||||
from awx.main.task_engine import TaskEngager as LicenseWriter
|
from awx.main.task_engine import TaskEngager as LicenseWriter
|
||||||
|
|
||||||
|
TEST_PLAYBOOK = '''- hosts: mygroup
|
||||||
|
gather_facts: false
|
||||||
|
tasks:
|
||||||
|
- name: woohoo
|
||||||
|
command: test 1 = 1
|
||||||
|
'''
|
||||||
|
|
||||||
class BaseTestMixin(object):
|
class QueueTestMixin(object):
|
||||||
|
def start_queue(self):
|
||||||
|
self.start_redis()
|
||||||
|
receiver = CallbackReceiver()
|
||||||
|
self.queue_process = Process(target=receiver.run_subscriber,
|
||||||
|
args=(False,))
|
||||||
|
self.queue_process.start()
|
||||||
|
|
||||||
|
def terminate_queue(self):
|
||||||
|
if hasattr(self, 'queue_process'):
|
||||||
|
self.queue_process.terminate()
|
||||||
|
self.stop_redis()
|
||||||
|
|
||||||
|
def start_redis(self):
|
||||||
|
if not getattr(self, 'redis_process', None):
|
||||||
|
self.redis_process = Popen('redis-server --port 16379 > /dev/null',
|
||||||
|
shell=True, executable='/bin/bash')
|
||||||
|
|
||||||
|
def stop_redis(self):
|
||||||
|
if getattr(self, 'redis_process', None):
|
||||||
|
self.redis_process.kill()
|
||||||
|
self.redis_process = None
|
||||||
|
|
||||||
|
|
||||||
|
# The observed effect of not calling terminate_queue() if you call start_queue() are
|
||||||
|
# an hang on test cleanup database delete. Thus, to ensure terminate_queue() is called
|
||||||
|
# whenever start_queue() is called just inherit from this class when you want to use the queue.
|
||||||
|
class QueueStartStopTestMixin(QueueTestMixin):
|
||||||
|
def setUp(self):
|
||||||
|
super(QueueStartStopTestMixin, self).setUp()
|
||||||
|
self.start_queue()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(QueueStartStopTestMixin, self).tearDown()
|
||||||
|
self.terminate_queue()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseTestMixin(QueueTestMixin):
|
||||||
'''
|
'''
|
||||||
Mixin with shared code for use by all test cases.
|
Mixin with shared code for use by all test cases.
|
||||||
'''
|
'''
|
||||||
@@ -117,6 +160,10 @@ class BaseTestMixin(object):
|
|||||||
# Restore previous settings after each test.
|
# Restore previous settings after each test.
|
||||||
settings._wrapped = self._wrapped
|
settings._wrapped = self._wrapped
|
||||||
|
|
||||||
|
def unique_name(self, string):
|
||||||
|
rnd_str = '____' + str(random.randint(1, 9999999))
|
||||||
|
return __name__ + '-generated-' + string + rnd_str
|
||||||
|
|
||||||
def create_test_license_file(self, instance_count=10000):
|
def create_test_license_file(self, instance_count=10000):
|
||||||
writer = LicenseWriter(
|
writer = LicenseWriter(
|
||||||
company_name='AWX',
|
company_name='AWX',
|
||||||
@@ -173,6 +220,9 @@ class BaseTestMixin(object):
|
|||||||
))
|
))
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def make_organization(self, created_by):
|
||||||
|
return self.make_organizations(created_by, 1)[0]
|
||||||
|
|
||||||
def make_project(self, name, description='', created_by=None,
|
def make_project(self, name, description='', created_by=None,
|
||||||
playbook_content='', role_playbooks=None, unicode_prefix=True):
|
playbook_content='', role_playbooks=None, unicode_prefix=True):
|
||||||
if not os.path.exists(settings.PROJECTS_ROOT):
|
if not os.path.exists(settings.PROJECTS_ROOT):
|
||||||
@@ -222,6 +272,56 @@ class BaseTestMixin(object):
|
|||||||
))
|
))
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def decide_created_by(self, created_by=None):
|
||||||
|
if created_by:
|
||||||
|
return created_by
|
||||||
|
if self.super_django_user:
|
||||||
|
return self.super_django_user
|
||||||
|
raise RuntimeError('please call setup_users() or specify a user')
|
||||||
|
|
||||||
|
def make_inventory(self, organization=None, name=None, created_by=None):
|
||||||
|
created_by = self.decide_created_by(created_by)
|
||||||
|
if not organization:
|
||||||
|
organization = self.make_organization(created_by=created_by)
|
||||||
|
|
||||||
|
return Inventory.objects.create(name=name or self.unique_name('Inventory'), organization=organization, created_by=created_by)
|
||||||
|
|
||||||
|
def make_job_template(self, name=None, created_by=None, organization=None, inventory=None, project=None, playbook=None):
|
||||||
|
created_by = self.decide_created_by(created_by)
|
||||||
|
if not inventory:
|
||||||
|
inventory = self.make_inventory(organization=organization, created_by=created_by)
|
||||||
|
if not organization:
|
||||||
|
organization = inventory.organization
|
||||||
|
if not project:
|
||||||
|
project = self.make_project(self.unique_name('Project'), created_by=created_by, playbook_content=playbook if playbook else TEST_PLAYBOOK)
|
||||||
|
|
||||||
|
if project and project.playbooks and len(project.playbooks) > 0:
|
||||||
|
playbook = project.playbooks[0]
|
||||||
|
else:
|
||||||
|
raise RuntimeError('Expected project to have at least one playbook')
|
||||||
|
|
||||||
|
if project not in organization.projects.all():
|
||||||
|
organization.projects.add(project)
|
||||||
|
|
||||||
|
return JobTemplate.objects.create(
|
||||||
|
name=name or self.unique_name('JobTemplate'),
|
||||||
|
job_type='check',
|
||||||
|
inventory=inventory,
|
||||||
|
project=project,
|
||||||
|
playbook=project.playbooks[0],
|
||||||
|
host_config_key=settings.SYSTEM_UUID,
|
||||||
|
created_by=created_by,
|
||||||
|
)
|
||||||
|
|
||||||
|
def make_job(self, job_template=None, created_by=None, inital_state='new'):
|
||||||
|
created_by = self.decide_created_by(created_by)
|
||||||
|
if not job_template:
|
||||||
|
job_template = self.make_job_template(created_by=created_by)
|
||||||
|
|
||||||
|
job = job_template.create_job(created_by=created_by)
|
||||||
|
job.status = inital_state
|
||||||
|
return job
|
||||||
|
|
||||||
def setup_instances(self):
|
def setup_instances(self):
|
||||||
instance = Instance(uuid=settings.SYSTEM_UUID, primary=True, hostname='127.0.0.1')
|
instance = Instance(uuid=settings.SYSTEM_UUID, primary=True, hostname='127.0.0.1')
|
||||||
instance.save()
|
instance.save()
|
||||||
@@ -457,6 +557,13 @@ class BaseTestMixin(object):
|
|||||||
msg += 'fields %s not returned ' % ', '.join(not_returned)
|
msg += 'fields %s not returned ' % ', '.join(not_returned)
|
||||||
self.assertTrue(set(obj.keys()) <= set(fields), msg)
|
self.assertTrue(set(obj.keys()) <= set(fields), msg)
|
||||||
|
|
||||||
|
def check_not_found(self, string, substr):
|
||||||
|
self.assertEqual(string.find(substr), -1, "'%s' found in:\n%s" % (substr, string))
|
||||||
|
|
||||||
|
def check_found(self, string, substr, count=1):
|
||||||
|
count_actual = string.count(substr)
|
||||||
|
self.assertEqual(count_actual, count, "Found %d occurances of '%s' instead of %d in:\n%s" % (count_actual, substr, count, string))
|
||||||
|
|
||||||
def start_taskmanager(self, command_port):
|
def start_taskmanager(self, command_port):
|
||||||
self.start_redis()
|
self.start_redis()
|
||||||
self.taskmanager_process = Process(target=run_taskmanager,
|
self.taskmanager_process = Process(target=run_taskmanager,
|
||||||
@@ -468,29 +575,6 @@ class BaseTestMixin(object):
|
|||||||
self.taskmanager_process.terminate()
|
self.taskmanager_process.terminate()
|
||||||
self.stop_redis()
|
self.stop_redis()
|
||||||
|
|
||||||
def start_queue(self):
|
|
||||||
self.start_redis()
|
|
||||||
receiver = CallbackReceiver()
|
|
||||||
self.queue_process = Process(target=receiver.run_subscriber,
|
|
||||||
args=(False,))
|
|
||||||
self.queue_process.start()
|
|
||||||
|
|
||||||
def terminate_queue(self):
|
|
||||||
if hasattr(self, 'queue_process'):
|
|
||||||
self.queue_process.terminate()
|
|
||||||
self.stop_redis()
|
|
||||||
|
|
||||||
def start_redis(self):
|
|
||||||
if not getattr(self, 'redis_process', None):
|
|
||||||
self.redis_process = Popen('redis-server --port 16379 > /dev/null',
|
|
||||||
shell=True, executable='/bin/bash')
|
|
||||||
|
|
||||||
def stop_redis(self):
|
|
||||||
if getattr(self, 'redis_process', None):
|
|
||||||
self.redis_process.kill()
|
|
||||||
self.redis_process = None
|
|
||||||
|
|
||||||
|
|
||||||
class BaseTest(BaseTestMixin, django.test.TestCase):
|
class BaseTest(BaseTestMixin, django.test.TestCase):
|
||||||
'''
|
'''
|
||||||
Base class for unit tests.
|
Base class for unit tests.
|
||||||
@@ -506,3 +590,47 @@ class BaseLiveServerTest(BaseTestMixin, django.test.LiveServerTestCase):
|
|||||||
'''
|
'''
|
||||||
Base class for tests requiring a live test server.
|
Base class for tests requiring a live test server.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
# Helps with test cases.
|
||||||
|
# Save all components of a uri (i.e. scheme, username, password, etc.) so that
|
||||||
|
# when we construct a uri string and decompose it, we can verify the decomposition
|
||||||
|
class URI(object):
|
||||||
|
DEFAULTS = {
|
||||||
|
'scheme' : 'http',
|
||||||
|
'username' : 'MYUSERNAME',
|
||||||
|
'password' : 'MYPASSWORD',
|
||||||
|
'host' : 'host.com',
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, description='N/A', scheme=DEFAULTS['scheme'], username=DEFAULTS['username'], password=DEFAULTS['password'], host=DEFAULTS['host']):
|
||||||
|
self.description = description
|
||||||
|
self.scheme = scheme
|
||||||
|
self.username = username
|
||||||
|
self.password = password
|
||||||
|
self.host = host
|
||||||
|
|
||||||
|
def get_uri(self):
|
||||||
|
uri = "%s://" % self.scheme
|
||||||
|
if self.username:
|
||||||
|
uri += "%s" % self.username
|
||||||
|
if self.password:
|
||||||
|
uri += ":%s" % self.password
|
||||||
|
if (self.username or self.password) and self.host is not None:
|
||||||
|
uri += "@%s" % self.host
|
||||||
|
elif self.host is not None:
|
||||||
|
uri += "%s" % self.host
|
||||||
|
return uri
|
||||||
|
|
||||||
|
def get_secret_count(self):
|
||||||
|
secret_count = 0
|
||||||
|
if self.username:
|
||||||
|
secret_count += 1
|
||||||
|
if self.password:
|
||||||
|
secret_count += 1
|
||||||
|
return secret_count
|
||||||
|
|
||||||
|
def __string__(self):
|
||||||
|
return self.get_uri()
|
||||||
|
def __repr__(self):
|
||||||
|
return self.get_uri()
|
||||||
|
|
||||||
|
|||||||
@@ -219,11 +219,6 @@ class BaseJobTestMixin(BaseTestMixin):
|
|||||||
group.hosts.add(host)
|
group.hosts.add(host)
|
||||||
return inventory
|
return inventory
|
||||||
|
|
||||||
def make_job(self, job_template, created_by, inital_state='new'):
|
|
||||||
j_actual = job_template.create_job(created_by=created_by)
|
|
||||||
j_actual.status = inital_state
|
|
||||||
return j_actual
|
|
||||||
|
|
||||||
def populate(self):
|
def populate(self):
|
||||||
# Here's a little story about the Ansible Bread Company, or ABC. They
|
# Here's a little story about the Ansible Bread Company, or ABC. They
|
||||||
# make machines that make bread - bakers, slicers, and packagers - and
|
# make machines that make bread - bakers, slicers, and packagers - and
|
||||||
|
|||||||
138
awx/main/tests/redact.py
Normal file
138
awx/main/tests/redact.py
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
|
||||||
|
import textwrap
|
||||||
|
|
||||||
|
# AWX
|
||||||
|
from awx.main.redact import UriCleaner
|
||||||
|
from awx.main.tests.base import BaseTest, URI
|
||||||
|
|
||||||
|
__all__ = ['UriCleanTests']
|
||||||
|
|
||||||
|
TEST_URIS = [
|
||||||
|
URI('no host', scheme='https', username='myusername', password='mypass', host=None),
|
||||||
|
URI('no host', scheme='https', username='myusername', password='mypass*********', host=None),
|
||||||
|
URI('http', scheme='http'),
|
||||||
|
URI('https', scheme='https'),
|
||||||
|
URI('no password', password=''),
|
||||||
|
URI('no host', host=''),
|
||||||
|
URI('host with port', host='host.com:22'),
|
||||||
|
URI('host with @', host='host.com:22'),
|
||||||
|
URI('host with @, password with @ at the end', password='mypasswordwith@', host='@host.com'),
|
||||||
|
URI('no host, with space', host=' '),
|
||||||
|
URI('password is a space', password='%20%20'),
|
||||||
|
URI('no password field', password=None),
|
||||||
|
URI('no username no password', username=None, password=None),
|
||||||
|
URI('password with @ at the end', password='mypasswordwitha@'),
|
||||||
|
URI('password with @@ at the end', password='mypasswordwitha@@'),
|
||||||
|
URI('password with @@@ at the end', password='mypasswordwitha@@@'),
|
||||||
|
URI('password with @@@@ at the end', password='mypasswordwitha@@@@'),
|
||||||
|
URI('password with @a@@ at the end', password='mypasswordwitha@a@@'),
|
||||||
|
URI('password with @@@a at the end', password='mypasswordwitha@@@a'),
|
||||||
|
URI('password with a @ in the middle', password='pa@ssword'),
|
||||||
|
URI('url with @', password='pa@ssword', host='googly.com/whatever@#$:stuff@.com/'),
|
||||||
|
]
|
||||||
|
|
||||||
|
TEST_CLEARTEXT = []
|
||||||
|
# Arguably, this is a regression test given the below data.
|
||||||
|
# regression data https://trello.com/c/cdUELgVY/
|
||||||
|
uri = URI(scheme="https", username="myusername", password="mypasswordwith%40", host="nonexistant.ansible.com/ansible.git/")
|
||||||
|
TEST_CLEARTEXT.append({
|
||||||
|
'uri' : uri,
|
||||||
|
'text' : textwrap.dedent("""\
|
||||||
|
PLAY [all] ********************************************************************
|
||||||
|
|
||||||
|
TASK: [delete project directory before update] ********************************
|
||||||
|
skipping: [localhost]
|
||||||
|
|
||||||
|
TASK: [update project using git and accept hostkey] ***************************
|
||||||
|
skipping: [localhost]
|
||||||
|
|
||||||
|
TASK: [update project using git] **********************************************
|
||||||
|
failed: [localhost] => {"cmd": "/usr/bin/git ls-remote https://%s:%s -h refs/heads/HEAD", "failed": true, "rc": 128}
|
||||||
|
stderr: fatal: unable to access '%s': Could not resolve host: nonexistant.ansible.com
|
||||||
|
|
||||||
|
msg: fatal: unable to access '%s': Could not resolve host: nonexistant.ansible.com
|
||||||
|
|
||||||
|
FATAL: all hosts have already failed -- aborting
|
||||||
|
|
||||||
|
PLAY RECAP ********************************************************************
|
||||||
|
to retry, use: --limit @/root/project_update.retry
|
||||||
|
|
||||||
|
localhost : ok=0 changed=0 unreachable=0 failed=1
|
||||||
|
|
||||||
|
""" % (uri.username, uri.password, str(uri), str(uri))),
|
||||||
|
'host_occurrences' : 2
|
||||||
|
})
|
||||||
|
|
||||||
|
uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs")
|
||||||
|
TEST_CLEARTEXT.append({
|
||||||
|
'uri' : uri,
|
||||||
|
'text' : textwrap.dedent("""\
|
||||||
|
TASK: [update project using git] **
|
||||||
|
failed: [localhost] => {"cmd": "/usr/bin/git ls-remote https://REDACTED:********", "failed": true, "rc": 128}
|
||||||
|
stderr: error: Couldn't resolve host '@%s' while accessing %s
|
||||||
|
|
||||||
|
fatal: HTTP request failed
|
||||||
|
|
||||||
|
msg: error: Couldn't resolve host '@%s' while accessing %s
|
||||||
|
|
||||||
|
fatal: HTTP request failed
|
||||||
|
""" % (uri.host, str(uri), uri.host, str(uri))),
|
||||||
|
'host_occurrences' : 4
|
||||||
|
})
|
||||||
|
|
||||||
|
class UriCleanTests(BaseTest):
|
||||||
|
|
||||||
|
# should redact sensitive usernames and passwords
|
||||||
|
def test_uri_scm_simple_redacted(self):
|
||||||
|
for uri in TEST_URIS:
|
||||||
|
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||||
|
if uri.username:
|
||||||
|
self.check_not_found(redacted_str, uri.username)
|
||||||
|
if uri.password:
|
||||||
|
self.check_not_found(redacted_str, uri.password)
|
||||||
|
|
||||||
|
# should replace secret data with safe string, UriCleaner.REPLACE_STR
|
||||||
|
def test_uri_scm_simple_replaced(self):
|
||||||
|
for uri in TEST_URIS:
|
||||||
|
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||||
|
self.check_found(redacted_str, UriCleaner.REPLACE_STR, uri.get_secret_count())
|
||||||
|
|
||||||
|
# should redact multiple uris in text
|
||||||
|
def test_uri_scm_multiple(self):
|
||||||
|
cleartext = ''
|
||||||
|
for uri in TEST_URIS:
|
||||||
|
cleartext += str(uri) + ' '
|
||||||
|
for uri in TEST_URIS:
|
||||||
|
cleartext += str(uri) + '\n'
|
||||||
|
|
||||||
|
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||||
|
if uri.username:
|
||||||
|
self.check_not_found(redacted_str, uri.username)
|
||||||
|
if uri.password:
|
||||||
|
self.check_not_found(redacted_str, uri.password)
|
||||||
|
|
||||||
|
# should replace multiple secret data with safe string
|
||||||
|
def test_uri_scm_multiple_replaced(self):
|
||||||
|
cleartext = ''
|
||||||
|
find_count = 0
|
||||||
|
for uri in TEST_URIS:
|
||||||
|
cleartext += str(uri) + ' '
|
||||||
|
find_count += uri.get_secret_count()
|
||||||
|
|
||||||
|
for uri in TEST_URIS:
|
||||||
|
cleartext += str(uri) + '\n'
|
||||||
|
find_count += uri.get_secret_count()
|
||||||
|
|
||||||
|
redacted_str = UriCleaner.remove_sensitive(cleartext)
|
||||||
|
self.check_found(redacted_str, UriCleaner.REPLACE_STR, find_count)
|
||||||
|
|
||||||
|
# should redact and replace multiple secret data within a complex cleartext blob
|
||||||
|
def test_uri_scm_cleartext_redact_and_replace(self):
|
||||||
|
for test_data in TEST_CLEARTEXT:
|
||||||
|
uri = test_data['uri']
|
||||||
|
redacted_str = UriCleaner.remove_sensitive(test_data['text'])
|
||||||
|
self.check_not_found(redacted_str, uri.username)
|
||||||
|
self.check_not_found(redacted_str, uri.password)
|
||||||
|
# Ensure the host didn't get redacted
|
||||||
|
self.check_found(redacted_str, uri.host, count=test_data['host_occurrences'])
|
||||||
|
|
||||||
54
awx/main/tests/views.py
Normal file
54
awx/main/tests/views.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
# Django
|
||||||
|
from django.core.urlresolvers import reverse
|
||||||
|
|
||||||
|
# Reuse Test code
|
||||||
|
from awx.main.tests.base import BaseLiveServerTest, QueueStartStopTestMixin
|
||||||
|
from awx.main.tests.base import URI
|
||||||
|
|
||||||
|
__all__ = ['UnifiedJobStdoutTests']
|
||||||
|
|
||||||
|
|
||||||
|
TEST_STDOUTS = []
|
||||||
|
uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs")
|
||||||
|
TEST_STDOUTS.append({
|
||||||
|
'uri' : uri,
|
||||||
|
'text' : 'hello world %s goodbye world' % uri,
|
||||||
|
'host_occurrences' : 1
|
||||||
|
})
|
||||||
|
|
||||||
|
uri = URI(scheme="https", username="applepie@@@", password="thatyouknow@@@@", host="github.ginger.com/theirrepo.git/info/refs")
|
||||||
|
TEST_STDOUTS.append({
|
||||||
|
'uri' : uri,
|
||||||
|
'text' : 'hello world %s \n\nyoyo\n\nhello\n%s' % (uri, uri),
|
||||||
|
'host_occurrences' : 2
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class UnifiedJobStdoutTests(BaseLiveServerTest, QueueStartStopTestMixin):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(UnifiedJobStdoutTests, self).setUp()
|
||||||
|
self.setup_instances()
|
||||||
|
self.setup_users()
|
||||||
|
self.test_cases = []
|
||||||
|
|
||||||
|
for e in TEST_STDOUTS:
|
||||||
|
e['job'] = self.make_job()
|
||||||
|
e['job'].result_stdout_text = e['text']
|
||||||
|
e['job'].save()
|
||||||
|
self.test_cases.append(e)
|
||||||
|
|
||||||
|
# This is more of a functional test than a unit test.
|
||||||
|
# should filter out username and password
|
||||||
|
def test_redaction_enabled(self):
|
||||||
|
for test_data in self.test_cases:
|
||||||
|
uri = test_data['uri']
|
||||||
|
job_stdout_url = reverse('api:job_stdout', args=(test_data['job'].pk,))
|
||||||
|
|
||||||
|
response = self.get(job_stdout_url, expect=200, auth=self.get_super_credentials(), accept='application/json')
|
||||||
|
|
||||||
|
self.assertIsNotNone(response['content'])
|
||||||
|
self.check_not_found(response['content'], uri.username)
|
||||||
|
self.check_not_found(response['content'], uri.password)
|
||||||
|
# Ensure the host didn't get redacted
|
||||||
|
self.check_found(response['content'], uri.host, count=test_data['host_occurrences'])
|
||||||
Reference in New Issue
Block a user