reencrypt WFJT.survey_spec too

https://github.com/ansible/ansible-tower/issues/7046
This commit is contained in:
Ryan Petrello 2017-11-16 16:31:17 -05:00
parent e7077185bf
commit a80d5b1b39
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
2 changed files with 48 additions and 35 deletions

View File

@ -65,7 +65,6 @@ def _credentials(apps):
credential.save()
def _unified_jobs(apps):
UnifiedJob = apps.get_model('main', 'UnifiedJob')
for uj in UnifiedJob.objects.all():
@ -80,32 +79,36 @@ def encrypt_survey_passwords(apps, schema_editor):
_encrypt_survey_passwords(
apps.get_model('main', 'Job'),
apps.get_model('main', 'JobTemplate'),
apps.get_model('main', 'WorkflowJob'),
apps.get_model('main', 'WorkflowJobTemplate'),
)
def _encrypt_survey_passwords(Job, JobTemplate):
def _encrypt_survey_passwords(Job, JobTemplate, WorkflowJob, WorkflowJobTemplate):
from awx.main.utils.encryption import encrypt_value
for jt in JobTemplate.objects.exclude(survey_spec={}):
changed = False
if jt.survey_spec.get('spec', []):
for field in jt.survey_spec['spec']:
if field.get('type') == 'password' and field.get('default', ''):
if field['default'].startswith('$encrypted$'):
continue
field['default'] = encrypt_value(field['default'], pk=None)
changed = True
if changed:
jt.save()
for _type in (JobTemplate, WorkflowJobTemplate):
for jt in _type.objects.exclude(survey_spec={}):
changed = False
if jt.survey_spec.get('spec', []):
for field in jt.survey_spec['spec']:
if field.get('type') == 'password' and field.get('default', ''):
if field['default'].startswith('$encrypted$'):
continue
field['default'] = encrypt_value(field['default'], pk=None)
changed = True
if changed:
jt.save()
for job in Job.objects.defer('result_stdout_text').exclude(survey_passwords={}).iterator():
changed = False
for key in job.survey_passwords:
if key in job.extra_vars:
extra_vars = json.loads(job.extra_vars)
if not extra_vars.get(key, '') or extra_vars[key].startswith('$encrypted$'):
continue
extra_vars[key] = encrypt_value(extra_vars[key], pk=None)
job.extra_vars = json.dumps(extra_vars)
changed = True
if changed:
job.save()
for _type in (Job, WorkflowJob):
for job in _type.objects.defer('result_stdout_text').exclude(survey_passwords={}).iterator():
changed = False
for key in job.survey_passwords:
if key in job.extra_vars:
extra_vars = json.loads(job.extra_vars)
if not extra_vars.get(key, '') or extra_vars[key].startswith('$encrypted$'):
continue
extra_vars[key] = encrypt_value(extra_vars[key], pk=None)
job.extra_vars = json.dumps(extra_vars)
changed = True
if changed:
job.save()

View File

@ -12,6 +12,8 @@ from awx.main.models import (
UnifiedJob,
Job,
JobTemplate,
WorkflowJob,
WorkflowJobTemplate,
NotificationTemplate,
Credential,
)
@ -99,9 +101,13 @@ def test_unified_job_migration(old_enc, new_enc, value):
@pytest.mark.django_db
def test_survey_default_password_encryption(job_template_factory):
jt = job_template_factory('jt', organization='org1', project='prj',
inventory='inv', credential='cred').job_template
@pytest.mark.parametrize("attr, cls", [
['job_template', JobTemplate],
['workflow_job_template', WorkflowJobTemplate]
])
def test_survey_default_password_encryption(attr, cls, request):
factory = request.getfuncargvalue('{}_factory'.format(attr))
jt = getattr(factory('jt'), attr)
jt.survey_enabled = True
jt.survey_spec = {
'description': 'A survey',
@ -117,15 +123,19 @@ def test_survey_default_password_encryption(job_template_factory):
}
jt.save()
_encrypt_survey_passwords(Job, JobTemplate)
spec = JobTemplate.objects.get(pk=jt.pk).survey_spec['spec']
_encrypt_survey_passwords(Job, JobTemplate, WorkflowJob, WorkflowJobTemplate)
spec = cls.objects.get(pk=jt.pk).survey_spec['spec']
assert decrypt_value(get_encryption_key('value', pk=None), spec[0]['default']) == 'SUPERSECRET'
@pytest.mark.django_db
def test_job_survey_vars_encryption(job_template_factory):
jt = job_template_factory('jt', organization='org1', project='prj',
inventory='inv', credential='cred').job_template
@pytest.mark.parametrize("attr, cls", [
['job_template', Job],
['workflow_job_template', WorkflowJob]
])
def test_job_survey_vars_encryption(attr, cls, request):
factory = request.getfuncargvalue('{}_factory'.format(attr))
jt = getattr(factory('jt'), attr)
jt.survey_enabled = True
jt.survey_spec = {
'description': 'A survey',
@ -144,6 +154,6 @@ def test_job_survey_vars_encryption(job_template_factory):
job.extra_vars = json.dumps({'secret_value': 'SUPERSECRET'})
job.save()
_encrypt_survey_passwords(Job, JobTemplate)
job = Job.objects.get(pk=job.pk)
_encrypt_survey_passwords(Job, JobTemplate, WorkflowJob, WorkflowJobTemplate)
job = cls.objects.get(pk=job.pk)
assert json.loads(job.decrypted_extra_vars()) == {'secret_value': 'SUPERSECRET'}