mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 10:00:01 -03:30
Updates and fixes for unit tests on supporting ansible "become"
This commit is contained in:
parent
d5c9e37499
commit
5ff7558cd3
@ -532,7 +532,7 @@ class RunJob(BaseTask):
|
||||
passwords = super(RunJob, self).build_passwords(job, **kwargs)
|
||||
creds = job.credential
|
||||
if creds:
|
||||
for field in ('ssh_key_unlock', 'ssh_password', 'sudo_password', 'su_password', 'vault_password'):
|
||||
for field in ('ssh_key_unlock', 'ssh_password', 'become_password', 'vault_password'):
|
||||
if field == 'ssh_password':
|
||||
value = kwargs.get(field, decrypt_field(creds, 'password'))
|
||||
else:
|
||||
@ -640,24 +640,13 @@ class RunJob(BaseTask):
|
||||
if job.job_template.become_enabled:
|
||||
args.append('--become')
|
||||
if become_method:
|
||||
args.append('--become-method', become_method)
|
||||
args.extend(['--become-method', become_method])
|
||||
if become_username:
|
||||
args.append('--become-username', become_username)
|
||||
args.extend(['--become-user', become_username])
|
||||
if 'become_password' in kwargs.get('passwords', {}):
|
||||
args.append('--ask-become-pass')
|
||||
except ValueError:
|
||||
pass
|
||||
# We only specify sudo/su user and password if explicitly given by the
|
||||
# credential. Credential should never specify both sudo and su.
|
||||
# if su_username:
|
||||
# args.extend(['-R', su_username])
|
||||
# if 'su_password' in kwargs.get('passwords', {}):
|
||||
# args.append('--ask-su-pass')
|
||||
# if sudo_username:
|
||||
# args.extend(['-U', sudo_username])
|
||||
# if 'sudo_password' in kwargs.get('passwords', {}):
|
||||
# args.append('--ask-sudo-pass')
|
||||
|
||||
# Support prompting for a vault password.
|
||||
if 'vault_password' in kwargs.get('passwords', {}):
|
||||
args.append('--ask-vault-pass')
|
||||
@ -701,7 +690,6 @@ class RunJob(BaseTask):
|
||||
args.append("scan_facts.yml")
|
||||
else:
|
||||
args.append(job.playbook)
|
||||
|
||||
return args
|
||||
|
||||
def build_cwd(self, job, **kwargs):
|
||||
@ -721,10 +709,10 @@ class RunJob(BaseTask):
|
||||
d = super(RunJob, self).get_password_prompts()
|
||||
d[re.compile(r'^Enter passphrase for .*:\s*?$', re.M)] = 'ssh_key_unlock'
|
||||
d[re.compile(r'^Bad passphrase, try again for .*:\s*?$', re.M)] = ''
|
||||
d[re.compile(r'^sudo password.*:\s*?$', re.M)] = 'sudo_password'
|
||||
d[re.compile(r'^SUDO password.*:\s*?$', re.M)] = 'sudo_password'
|
||||
d[re.compile(r'^su password.*:\s*?$', re.M)] = 'su_password'
|
||||
d[re.compile(r'^SU password.*:\s*?$', re.M)] = 'su_password'
|
||||
d[re.compile(r'^sudo password.*:\s*?$', re.M)] = 'become_password'
|
||||
d[re.compile(r'^SUDO password.*:\s*?$', re.M)] = 'become_password'
|
||||
d[re.compile(r'^su password.*:\s*?$', re.M)] = 'become_password'
|
||||
d[re.compile(r'^SU password.*:\s*?$', re.M)] = 'become_password'
|
||||
d[re.compile(r'^SSH password:\s*?$', re.M)] = 'ssh_password'
|
||||
d[re.compile(r'^Password:\s*?$', re.M)] = 'ssh_password'
|
||||
d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password'
|
||||
|
||||
@ -338,10 +338,9 @@ class BaseTestMixin(QueueTestMixin):
|
||||
'ssh_key_data': '',
|
||||
'ssh_key_unlock': '',
|
||||
'password': '',
|
||||
'sudo_username': '',
|
||||
'sudo_password': '',
|
||||
'su_username': '',
|
||||
'su_password': '',
|
||||
'become_method': '',
|
||||
'become_username': '',
|
||||
'become_password': '',
|
||||
'vault_password': '',
|
||||
}
|
||||
opts.update(kwargs)
|
||||
|
||||
@ -283,8 +283,9 @@ class BaseJobTestMixin(BaseTestMixin):
|
||||
self.cred_eve = self.user_eve.credentials.create(
|
||||
username='eve',
|
||||
password='ASK',
|
||||
sudo_username='root',
|
||||
sudo_password='ASK',
|
||||
become_method='sudo',
|
||||
become_username='root',
|
||||
become_password='ASK',
|
||||
created_by=self.user_sue,
|
||||
)
|
||||
self.cred_frank = self.user_frank.credentials.create(
|
||||
|
||||
@ -190,7 +190,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
|
||||
JOB_TEMPLATE_FIELDS = ('id', 'type', 'url', 'related', 'summary_fields',
|
||||
'created', 'modified', 'name', 'description',
|
||||
'job_type', 'inventory', 'project', 'playbook',
|
||||
'credential', 'use_su_credential', 'sudo_su_flag',
|
||||
'become_enabled', 'credential',
|
||||
'cloud_credential', 'force_handlers', 'forks',
|
||||
'limit', 'verbosity', 'extra_vars',
|
||||
'ask_variables_on_launch', 'job_tags', 'skip_tags',
|
||||
@ -714,7 +714,7 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
|
||||
self.assertFalse(response['can_start'])
|
||||
response = self.post(url, {}, expect=405)
|
||||
|
||||
# Test with a job that prompts for SSH and sudo passwords.
|
||||
# Test with a job that prompts for SSH and sudo become passwords.
|
||||
#job = self.job_sup_run
|
||||
job = self.make_job(self.jt_sup_run, self.user_sue, 'new')
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
@ -722,12 +722,12 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
|
||||
response = self.get(url)
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_password', 'sudo_password']))
|
||||
set(['ssh_password', 'become_password']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_password'] = 'sshpass'
|
||||
response = self.post(url, data, expect=400)
|
||||
data2 = dict(sudo_password='sudopass')
|
||||
data2 = dict(become_password='sudopass')
|
||||
response = self.post(url, data2, expect=400)
|
||||
data.update(data2)
|
||||
response = self.post(url, data, expect=202)
|
||||
@ -796,12 +796,12 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_password', 'sudo_password']))
|
||||
set(['ssh_password', 'become_password']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_password'] = 'sshpass'
|
||||
response = self.post(url, data, expect=400)
|
||||
data2 = dict(sudo_password='sudopass')
|
||||
data2 = dict(become_password='sudopass')
|
||||
response = self.post(url, data2, expect=400)
|
||||
data.update(data2)
|
||||
response = self.post(url, data, expect=202)
|
||||
|
||||
@ -935,9 +935,10 @@ class RunJobTest(BaseJobExecutionTest):
|
||||
self.check_job_result(job, 'successful')
|
||||
self.assertTrue('"--ask-pass"' in job.job_args)
|
||||
|
||||
def test_sudo_username_and_password(self):
|
||||
self.create_test_credential(sudo_username='sudouser',
|
||||
sudo_password='sudopass')
|
||||
def test_become_username_and_password(self):
|
||||
self.create_test_credential(become_method='sudo',
|
||||
become_username='sudouser',
|
||||
become_password='sudopass')
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
@ -945,77 +946,65 @@ class RunJobTest(BaseJobExecutionTest):
|
||||
self.assertFalse(job.passwords_needed_to_start)
|
||||
self.assertTrue(job.signal_start())
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
# Job may fail if current user doesn't have password-less sudo
|
||||
# Job may fail if current user doesn't have password-less become
|
||||
# privileges, but we're mainly checking the command line arguments.
|
||||
self.check_job_result(job, ('successful', 'failed'))
|
||||
self.assertTrue('"-U"' in job.job_args)
|
||||
self.assertTrue('"--ask-sudo-pass"' in job.job_args)
|
||||
self.assertFalse('"-s"' in job.job_args)
|
||||
self.assertFalse('"-R"' in job.job_args)
|
||||
self.assertFalse('"--ask-su-pass"' in job.job_args)
|
||||
self.assertFalse('"-S"' in job.job_args)
|
||||
self.assertTrue('"--become-user"' in job.job_args)
|
||||
self.assertTrue('"--become-method"' in job.job_args)
|
||||
self.assertTrue('"--ask-become-pass"' in job.job_args)
|
||||
|
||||
def test_sudo_ask_password(self):
|
||||
self.create_test_credential(sudo_password='ASK')
|
||||
def test_become_ask_password(self):
|
||||
self.create_test_credential(become_password='ASK')
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertTrue(job.passwords_needed_to_start)
|
||||
self.assertTrue('sudo_password' in job.passwords_needed_to_start)
|
||||
self.assertFalse('su_password' in job.passwords_needed_to_start)
|
||||
self.assertTrue('become_password' in job.passwords_needed_to_start)
|
||||
self.assertFalse(job.signal_start())
|
||||
self.assertTrue(job.signal_start(sudo_password='sudopass'))
|
||||
self.assertTrue(job.signal_start(become_password='sudopass'))
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
# Job may fail if current user doesn't have password-less sudo
|
||||
# Job may fail if current user doesn't have password-less become
|
||||
# privileges, but we're mainly checking the command line arguments.
|
||||
self.assertTrue(job.status in ('successful', 'failed'))
|
||||
self.assertTrue('"--ask-sudo-pass"' in job.job_args)
|
||||
self.assertFalse('"-s"' in job.job_args)
|
||||
self.assertFalse('"-R"' in job.job_args)
|
||||
self.assertFalse('"--ask-su-pass"' in job.job_args)
|
||||
self.assertFalse('"-S"' in job.job_args)
|
||||
self.assertTrue('"--ask-become-pass"' in job.job_args)
|
||||
self.assertFalse('"--become-user"' in job.job_args)
|
||||
self.assertFalse('"--become-method"' in job.job_args)
|
||||
|
||||
def test_su_username_and_password(self):
|
||||
self.create_test_credential(su_username='suuser',
|
||||
su_password='supass')
|
||||
def test_job_template_become_enabled(self):
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job_template = self.create_test_job_template(become_enabled=True)
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertFalse(job.passwords_needed_to_start)
|
||||
self.assertTrue(job.signal_start())
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
# Job may fail, but we're mainly checking the command line arguments.
|
||||
self.check_job_result(job, ('successful', 'failed'))
|
||||
self.assertTrue('"-R"' in job.job_args)
|
||||
self.assertTrue('"--ask-su-pass"' in job.job_args)
|
||||
self.assertFalse('"-S"' in job.job_args)
|
||||
self.assertFalse('"-U"' in job.job_args)
|
||||
self.assertFalse('"--ask-sudo-pass"' in job.job_args)
|
||||
self.assertFalse('"-s"' in job.job_args)
|
||||
# Job may fail if current user doesn't have password-less become
|
||||
# privileges, but we're mainly checking the command line arguments.
|
||||
self.assertTrue(job.status in ('successful', 'failed'))
|
||||
self.assertTrue('"--become"' in job.job_args)
|
||||
self.assertFalse('"--become-user"' in job.job_args)
|
||||
self.assertFalse('"--become-method"' in job.job_args)
|
||||
|
||||
def test_su_ask_password(self):
|
||||
self.create_test_credential(su_password='ASK')
|
||||
def test_become_enabled_with_username_and_password(self):
|
||||
self.create_test_credential(become_method='sudo',
|
||||
become_username='sudouser',
|
||||
become_password='sudopass')
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job_template = self.create_test_job_template(become_enabled=True)
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertTrue(job.passwords_needed_to_start)
|
||||
self.assertTrue('su_password' in job.passwords_needed_to_start)
|
||||
self.assertFalse('sudo_password' in job.passwords_needed_to_start)
|
||||
self.assertFalse(job.signal_start())
|
||||
self.assertTrue(job.signal_start(su_password='supass'))
|
||||
self.assertFalse(job.passwords_needed_to_start)
|
||||
self.assertTrue(job.signal_start())
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
# Job may fail, but we're mainly checking the command line arguments.
|
||||
self.assertTrue(job.status in ('successful', 'failed'))
|
||||
self.assertTrue('"--ask-su-pass"' in job.job_args)
|
||||
self.assertFalse('"-S"' in job.job_args)
|
||||
self.assertFalse('"-R"' in job.job_args)
|
||||
self.assertFalse('"-U"' in job.job_args)
|
||||
self.assertFalse('"--ask-sudo-pass"' in job.job_args)
|
||||
self.assertFalse('"-s"' in job.job_args)
|
||||
|
||||
# Job may fail if current user doesn't have password-less become
|
||||
# privileges, but we're mainly checking the command line arguments.
|
||||
self.check_job_result(job, ('successful', 'failed'))
|
||||
self.assertTrue('"--become-user"' in job.job_args)
|
||||
self.assertTrue('"--become-method"' in job.job_args)
|
||||
self.assertTrue('"--ask-become-pass"' in job.job_args)
|
||||
self.assertTrue('"--become"' in job.job_args)
|
||||
|
||||
def test_unlocked_ssh_key(self):
|
||||
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA)
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user