mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 10:00:01 -03:30
AC-1071 Removed ssh_key_path option, only need to be able to accept PEM key format.
This commit is contained in:
parent
107fc85110
commit
ce2ae2b945
@ -995,7 +995,7 @@ class CredentialSerializer(BaseSerializer):
|
||||
class Meta:
|
||||
model = Credential
|
||||
fields = ('*', 'user', 'team', 'kind', 'cloud', 'username',
|
||||
'password', 'ssh_key_data', 'ssh_key_path', 'ssh_key_unlock',
|
||||
'password', 'ssh_key_data', 'ssh_key_unlock',
|
||||
'sudo_username', 'sudo_password', 'vault_password')
|
||||
|
||||
def to_native(self, obj):
|
||||
|
||||
@ -86,7 +86,8 @@ class Credential(CommonModelNameNotUnique):
|
||||
verbose_name=_('SSH private key'),
|
||||
help_text=_('RSA or DSA private key to be used instead of password.'),
|
||||
)
|
||||
ssh_key_path = models.CharField(
|
||||
ssh_key_path = models.CharField( # FIXME: No longer needed.
|
||||
editable=False,
|
||||
max_length=1024,
|
||||
blank=True,
|
||||
default='',
|
||||
@ -128,16 +129,10 @@ class Credential(CommonModelNameNotUnique):
|
||||
def needs_ssh_key_unlock(self):
|
||||
ssh_key_data = ''
|
||||
if self.kind == 'ssh' and self.ssh_key_unlock == 'ASK':
|
||||
if self.ssh_key_data:
|
||||
if self.pk:
|
||||
ssh_key_data = decrypt_field(self, 'ssh_key_data')
|
||||
else:
|
||||
ssh_key_data = self.ssh_key_data
|
||||
elif self.ssh_key_path:
|
||||
try:
|
||||
ssh_key_data = file(self.ssh_key_path).read(2**15)
|
||||
except IOError:
|
||||
pass
|
||||
if self.pk:
|
||||
ssh_key_data = decrypt_field(self, 'ssh_key_data')
|
||||
else:
|
||||
ssh_key_data = self.ssh_key_data
|
||||
return 'ENCRYPTED' in ssh_key_data
|
||||
|
||||
@property
|
||||
@ -223,28 +218,11 @@ class Credential(CommonModelNameNotUnique):
|
||||
self._validate_ssh_private_key(ssh_key_data)
|
||||
return self.ssh_key_data # No need to return decrypted version here.
|
||||
|
||||
def clean_ssh_key_path(self):
|
||||
ssh_key_path = self.ssh_key_path or ''
|
||||
if ssh_key_path:
|
||||
try:
|
||||
ssh_key_data = file(ssh_key_path).read(2**15)
|
||||
except IOError, e:
|
||||
raise ValidationError(e.strerror or 'Unable to read SSH key path')
|
||||
self._validate_ssh_private_key(ssh_key_data)
|
||||
return ssh_key_path
|
||||
|
||||
def clean_ssh_key_unlock(self):
|
||||
ssh_key_data = ''
|
||||
if self.ssh_key_data:
|
||||
if self.pk:
|
||||
ssh_key_data = decrypt_field(self, 'ssh_key_data')
|
||||
else:
|
||||
ssh_key_data = self.ssh_key_data
|
||||
elif self.ssh_key_path:
|
||||
try:
|
||||
ssh_key_data = file(self.ssh_key_path).read(2**15)
|
||||
except IOError:
|
||||
pass
|
||||
if self.pk:
|
||||
ssh_key_data = decrypt_field(self, 'ssh_key_data')
|
||||
else:
|
||||
ssh_key_data = self.ssh_key_data
|
||||
if 'ENCRYPTED' in ssh_key_data and not self.ssh_key_unlock:
|
||||
raise ValidationError('SSH key unlock must be set when SSH key '
|
||||
'is encrypted')
|
||||
@ -253,8 +231,6 @@ class Credential(CommonModelNameNotUnique):
|
||||
def clean(self):
|
||||
if self.user and self.team:
|
||||
raise ValidationError('Credential cannot be assigned to both a user and team')
|
||||
if self.ssh_key_data and self.ssh_key_path:
|
||||
raise ValidationError('Only one of SSH key data or path should be provided')
|
||||
|
||||
def _validate_unique_together_with_null(self, unique_check, exclude=None):
|
||||
# Based on existing Django model validation code, except it doesn't
|
||||
|
||||
@ -385,7 +385,7 @@ class RunJob(BaseTask):
|
||||
as ssh_key_data).
|
||||
'''
|
||||
credential = getattr(job, 'credential', None)
|
||||
if credential and credential.ssh_key_data:
|
||||
if credential:
|
||||
return decrypt_field(credential, 'ssh_key_data') or None
|
||||
|
||||
def build_passwords(self, job, **kwargs):
|
||||
@ -485,8 +485,7 @@ class RunJob(BaseTask):
|
||||
|
||||
# If private key isn't encrypted, pass the path on the command line.
|
||||
ssh_key_path = kwargs.get('private_data_file', '')
|
||||
ssh_key_path = ssh_key_path or (creds and creds.ssh_key_path) or ''
|
||||
use_ssh_agent = 'ssh_key_unlock' in kwargs.get('passwords', {})
|
||||
use_ssh_agent = bool(kwargs.get('passwords', {}).get('ssh_key_unlock', ''))
|
||||
if ssh_key_path and not use_ssh_agent:
|
||||
args.append('--private-key=%s' % ssh_key_path)
|
||||
|
||||
@ -654,6 +653,7 @@ class RunProjectUpdate(BaseTask):
|
||||
args.extend(['-e', json.dumps(extra_vars)])
|
||||
args.append('project_update.yml')
|
||||
|
||||
# If using an SSH key, run using ssh-agent.
|
||||
ssh_key_path = kwargs.get('private_data_file', '')
|
||||
if ssh_key_path:
|
||||
subcmds = [('ssh-add', ssh_key_path), args]
|
||||
|
||||
@ -547,39 +547,6 @@ class ProjectsTest(BaseTest):
|
||||
data['ssh_key_data'] = TEST_SSH_KEY_DATA
|
||||
self.post(url, data, expect=201)
|
||||
|
||||
# Test with ssh_key_path (invalid path, bad data, then valid key).
|
||||
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
|
||||
self._temp_paths.append(ssh_key_path)
|
||||
ssh_key_file = os.fdopen(handle, 'w')
|
||||
ssh_key_file.write(TEST_SSH_KEY_DATA)
|
||||
ssh_key_file.close()
|
||||
handle, invalid_ssh_key_path = tempfile.mkstemp(suffix='.key')
|
||||
self._temp_paths.append(invalid_ssh_key_path)
|
||||
invalid_ssh_key_file = os.fdopen(handle, 'w')
|
||||
invalid_ssh_key_file.write('not a valid key')
|
||||
invalid_ssh_key_file.close()
|
||||
with self.current_user(self.super_django_user):
|
||||
data = dict(name='yzv', user=self.super_django_user.pk, kind='ssh',
|
||||
ssh_key_path=ssh_key_path + '.moo')
|
||||
self.post(url, data, expect=400)
|
||||
data['ssh_key_path'] = invalid_ssh_key_path
|
||||
self.post(url, data, expect=400)
|
||||
data['ssh_key_path'] = ssh_key_path
|
||||
self.post(url, data, expect=201)
|
||||
|
||||
# Test with encrypted key on ssh_key_path.
|
||||
handle, enc_ssh_key_path = tempfile.mkstemp(suffix='.key')
|
||||
self._temp_paths.append(enc_ssh_key_path)
|
||||
enc_ssh_key_file = os.fdopen(handle, 'w')
|
||||
enc_ssh_key_file.write(TEST_SSH_KEY_DATA_LOCKED)
|
||||
enc_ssh_key_file.close()
|
||||
with self.current_user(self.super_django_user):
|
||||
data = dict(name='wvz', user=self.super_django_user.pk, kind='ssh',
|
||||
ssh_key_path=enc_ssh_key_path)
|
||||
self.post(url, data, expect=400)
|
||||
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
|
||||
self.post(url, data, expect=201)
|
||||
|
||||
# Test post as organization admin where team is part of org, but user
|
||||
# creating credential is not a member of the team. UI may pass user
|
||||
# as an empty string instead of None.
|
||||
|
||||
@ -862,68 +862,6 @@ class RunJobTest(BaseCeleryTest):
|
||||
self.assertTrue('ssh-agent' in job.job_args)
|
||||
self.assertTrue('Bad passphrase' not in job.result_stdout)
|
||||
|
||||
def test_unlocked_ssh_key_path(self):
|
||||
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
|
||||
self._temp_paths.append(ssh_key_path)
|
||||
ssh_key_file = os.fdopen(handle, 'w')
|
||||
ssh_key_file.write(TEST_SSH_KEY_DATA)
|
||||
ssh_key_file.close()
|
||||
self.create_test_credential(ssh_key_path=ssh_key_path)
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertFalse(job.passwords_needed_to_start)
|
||||
self.assertTrue(job.signal_start())
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.check_job_result(job, 'successful')
|
||||
self.assertTrue('--private-key=' in job.job_args)
|
||||
self.assertFalse('ssh-agent' in job.job_args)
|
||||
|
||||
def test_locked_ssh_key_path_with_password(self):
|
||||
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
|
||||
self._temp_paths.append(ssh_key_path)
|
||||
ssh_key_file = os.fdopen(handle, 'w')
|
||||
ssh_key_file.write(TEST_SSH_KEY_DATA_LOCKED)
|
||||
ssh_key_file.close()
|
||||
self.create_test_credential(ssh_key_path=ssh_key_path,
|
||||
ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK)
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertFalse(job.passwords_needed_to_start)
|
||||
self.assertTrue(job.signal_start())
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.check_job_result(job, 'successful')
|
||||
self.assertTrue('ssh-agent' in job.job_args)
|
||||
self.assertTrue('Bad passphrase' not in job.result_stdout)
|
||||
|
||||
def test_locked_ssh_key_path_ask_password(self):
|
||||
handle, ssh_key_path = tempfile.mkstemp(suffix='.key')
|
||||
self._temp_paths.append(ssh_key_path)
|
||||
ssh_key_file = os.fdopen(handle, 'w')
|
||||
ssh_key_file.write(TEST_SSH_KEY_DATA_LOCKED)
|
||||
ssh_key_file.close()
|
||||
self.create_test_credential(ssh_key_path=ssh_key_path,
|
||||
ssh_key_unlock='ASK')
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertTrue(job.passwords_needed_to_start)
|
||||
self.assertTrue('ssh_key_unlock' in job.passwords_needed_to_start)
|
||||
self.assertFalse(job.signal_start())
|
||||
job.status = 'failed'
|
||||
job.save()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertTrue(job.signal_start(ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK))
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.check_job_result(job, 'successful')
|
||||
self.assertTrue('ssh-agent' in job.job_args)
|
||||
self.assertTrue('Bad passphrase' not in job.result_stdout)
|
||||
|
||||
def test_vault_password(self):
|
||||
self.create_test_credential(vault_password=TEST_VAULT_PASSWORD)
|
||||
self.create_test_project(TEST_VAULT_PLAYBOOK)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user