don't allow usage of jinja templates in certain ansible CLI flags

see: https://github.com/ansible/tower/issues/1338
This commit is contained in:
Ryan Petrello 2018-04-13 16:32:39 -04:00
parent 88c243c92a
commit 7074dcd677
3 changed files with 59 additions and 8 deletions

View File

@ -58,7 +58,7 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
wrap_args_with_proot, get_system_task_capacity, OutputEventFilter,
parse_yaml_or_json, ignore_inventory_computed_fields, ignore_inventory_group_removal,
get_type_for_model, extract_ansible_vars)
from awx.main.utils.safe_yaml import safe_dump
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.handlers import configure_external_logger
from awx.main.consumers import emit_channel_notification
@ -1151,7 +1151,7 @@ class RunJob(BaseTask):
args = ['ansible-playbook', '-i', self.build_inventory(job, **kwargs)]
if job.job_type == 'check':
args.append('--check')
args.extend(['-u', ssh_username])
args.extend(['-u', sanitize_jinja(ssh_username)])
if 'ssh_password' in kwargs.get('passwords', {}):
args.append('--ask-pass')
if job.become_enabled:
@ -1159,9 +1159,9 @@ class RunJob(BaseTask):
if job.diff_mode:
args.append('--diff')
if become_method:
args.extend(['--become-method', become_method])
args.extend(['--become-method', sanitize_jinja(become_method)])
if become_username:
args.extend(['--become-user', become_username])
args.extend(['--become-user', sanitize_jinja(become_username)])
if 'become_password' in kwargs.get('passwords', {}):
args.append('--ask-become-pass')
# Support prompting for a vault password.
@ -2203,7 +2203,7 @@ class RunAdHocCommand(BaseTask):
args = ['ansible', '-i', self.build_inventory(ad_hoc_command, **kwargs)]
if ad_hoc_command.job_type == 'check':
args.append('--check')
args.extend(['-u', ssh_username])
args.extend(['-u', sanitize_jinja(ssh_username)])
if 'ssh_password' in kwargs.get('passwords', {}):
args.append('--ask-pass')
# We only specify sudo/su user and password if explicitly given by the
@ -2211,9 +2211,9 @@ class RunAdHocCommand(BaseTask):
if ad_hoc_command.become_enabled:
args.append('--become')
if become_method:
args.extend(['--become-method', become_method])
args.extend(['--become-method', sanitize_jinja(become_method)])
if become_username:
args.extend(['--become-user', become_username])
args.extend(['--become-user', sanitize_jinja(become_username)])
if 'become_password' in kwargs.get('passwords', {}):
args.append('--ask-become-pass')
@ -2248,7 +2248,7 @@ class RunAdHocCommand(BaseTask):
args.extend(['-e', '@%s' % (extra_vars_path)])
args.extend(['-m', ad_hoc_command.module_name])
args.extend(['-a', ad_hoc_command.module_args])
args.extend(['-a', sanitize_jinja(ad_hoc_command.module_args)])
if ad_hoc_command.limit:
args.append(ad_hoc_command.limit)

View File

@ -525,6 +525,13 @@ class TestAdhocRun(TestJobExecution):
extra_vars={'awx_foo': 'awx-bar'}
)
def test_options_jinja_usage(self):
self.instance.module_args = '{{ ansible_ssh_pass }}'
with pytest.raises(Exception):
self.task.run(self.pk)
update_model_call = self.task.update_model.call_args[1]
assert 'Jinja variables are not allowed' in update_model_call['result_traceback']
def test_created_by_extra_vars(self):
self.instance.created_by = User(pk=123, username='angry-spud')
@ -647,6 +654,33 @@ class TestJobCredentials(TestJobExecution):
]
}
def test_username_jinja_usage(self):
ssh = CredentialType.defaults['ssh']()
credential = Credential(
pk=1,
credential_type=ssh,
inputs = {'username': '{{ ansible_ssh_pass }}'}
)
self.instance.credential = credential
with pytest.raises(Exception):
self.task.run(self.pk)
update_model_call = self.task.update_model.call_args[1]
assert 'Jinja variables are not allowed' in update_model_call['result_traceback']
@pytest.mark.parametrize("flag", ['become_username', 'become_method'])
def test_become_jinja_usage(self, flag):
ssh = CredentialType.defaults['ssh']()
credential = Credential(
pk=1,
credential_type=ssh,
inputs = {'username': 'joe', flag: '{{ ansible_ssh_pass }}'}
)
self.instance.credential = credential
with pytest.raises(Exception):
self.task.run(self.pk)
update_model_call = self.task.update_model.call_args[1]
assert 'Jinja variables are not allowed' in update_model_call['result_traceback']
def test_ssh_passwords(self, field, password_name, expected_flag):
ssh = CredentialType.defaults['ssh']()
credential = Credential(

View File

@ -1,3 +1,4 @@
import re
import six
import yaml
@ -64,3 +65,19 @@ def safe_dump(x, safe_dict=None):
default_flow_style=False,
))
return ''.join(yamls)
def sanitize_jinja(arg):
"""
For some string, prevent usage of Jinja-like flags
"""
if isinstance(arg, six.string_types):
# If the argument looks like it contains Jinja expressions
# {{ x }} ...
if re.search('\{\{[^}]+}}', arg) is not None:
raise ValueError('Inline Jinja variables are not allowed.')
# If the argument looks like it contains Jinja statements/control flow...
# {% if x.foo() %} ...
if re.search('\{%[^%]+%}', arg) is not None:
raise ValueError('Inline Jinja variables are not allowed.')
return arg