enforce required credential fields at job start time rather than on save

this is necessary for credential plugins support so that you can (in two
requests):

1.  Save a Credential with _no_ input values defined
2.  Create/associate one (or more) CredentialInputSource records to the
    new Credential
This commit is contained in:
Ryan Petrello 2019-02-28 11:24:38 -05:00 committed by Jake McDermott
parent e2d474ddd2
commit 42f4956a7f
No known key found for this signature in database
GPG Key ID: 9A6F084352C3A0B7
4 changed files with 52 additions and 34 deletions

View File

@ -655,13 +655,7 @@ class CredentialInputField(JSONSchemaField):
)
errors[error.schema['id']] = [error.message]
inputs = model_instance.credential_type.inputs
defined_fields = model_instance.credential_type.defined_fields
for field in inputs.get('required', []):
if field in defined_fields and not value.get(field, None):
errors[field] = [_('required for %s') % (
model_instance.credential_type.name
)]
# `ssh_key_unlock` requirements are very specific and can't be
# represented without complicated JSON schema

View File

@ -1230,6 +1230,23 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
self.save(update_fields=['job_explanation'])
return (False, None)
# verify that any associated credentials aren't missing required field data
missing_credential_inputs = []
for credential in self.credentials.all():
defined_fields = credential.credential_type.defined_fields
for required in credential.credential_type.inputs.get('required', []):
if required in defined_fields and not credential.has_input(required):
missing_credential_inputs.append(required)
if missing_credential_inputs:
self.job_explanation = '{} cannot start because Credential {} does not provide one or more required fields ({}).'.format(
self._meta.verbose_name.title(),
credential.name,
', '.join(sorted(missing_credential_inputs))
)
self.save(update_fields=['job_explanation'])
return (False, None)
needed = self.get_passwords_needed_to_start()
try:
start_args = json.loads(decrypt_field(self, 'start_args'))

View File

@ -763,7 +763,7 @@ def test_falsey_field_data(get, post, organization, admin, field_value):
'credential_type': net.pk,
'organization': organization.id,
'inputs': {
'username': 'joe-user', # username is required
'username': 'joe-user',
'authorize': field_value
}
}
@ -952,9 +952,15 @@ def test_vault_password_required(post, organization, admin):
},
admin
)
assert response.status_code == 400
assert response.data['inputs'] == {'vault_password': ['required for Vault']}
assert Credential.objects.count() == 0
assert response.status_code == 201
assert Credential.objects.count() == 1
# vault_password must be specified by launch time
j = Job()
j.save()
j.credentials.add(Credential.objects.first())
assert j.pre_start() == (False, None)
assert 'required fields (vault_password)' in j.job_explanation
#
@ -1236,14 +1242,15 @@ def test_aws_create_fail_required_fields(post, organization, admin, version, par
params,
admin
)
assert response.status_code == 400
assert response.status_code == 201
assert Credential.objects.count() == 1
assert Credential.objects.count() == 0
errors = response.data
if version == 'v2':
errors = response.data['inputs']
assert errors['username'] == ['required for %s' % aws.name]
assert errors['password'] == ['required for %s' % aws.name]
# username and password must be specified by launch time
j = Job()
j.save()
j.credentials.add(Credential.objects.first())
assert j.pre_start() == (False, None)
assert 'required fields (password, username)' in j.job_explanation
#
@ -1307,15 +1314,15 @@ def test_vmware_create_fail_required_fields(post, organization, admin, version,
params,
admin
)
assert response.status_code == 400
assert response.status_code == 201
assert Credential.objects.count() == 1
assert Credential.objects.count() == 0
errors = response.data
if version == 'v2':
errors = response.data['inputs']
assert errors['username'] == ['required for %s' % vmware.name]
assert errors['password'] == ['required for %s' % vmware.name]
assert errors['host'] == ['required for %s' % vmware.name]
# username, password, and host must be specified by launch time
j = Job()
j.save()
j.credentials.add(Credential.objects.first())
assert j.pre_start() == (False, None)
assert 'required fields (host, password, username)' in j.job_explanation
#
@ -1406,14 +1413,14 @@ def test_openstack_create_fail_required_fields(post, organization, admin, versio
params,
admin
)
assert response.status_code == 400
errors = response.data
if version == 'v2':
errors = response.data['inputs']
assert errors['username'] == ['required for %s' % openstack.name]
assert errors['password'] == ['required for %s' % openstack.name]
assert errors['host'] == ['required for %s' % openstack.name]
assert errors['project'] == ['required for %s' % openstack.name]
assert response.status_code == 201
# username, password, host, and project must be specified by launch time
j = Job()
j.save()
j.credentials.add(Credential.objects.first())
assert j.pre_start() == (False, None)
assert 'required fields (host, password, project, username)' in j.job_explanation
@pytest.mark.django_db

View File

@ -184,7 +184,7 @@ def test_ssh_key_data_validation(organization, kind, ssh_key_data, ssh_key_unloc
@pytest.mark.django_db
@pytest.mark.parametrize('inputs, valid', [
({'vault_password': 'some-pass'}, True),
({}, False),
({}, True),
({'vault_password': 'dev-pass', 'vault_id': 'dev'}, True),
({'vault_password': 'dev-pass', 'vault_id': 'dev@prompt'}, False), # @ not allowed
])