move code linting to a stricter pep8-esque auto-formatting tool, black

This commit is contained in:
Ryan Petrello
2021-03-19 12:44:51 -04:00
parent 9b702e46fe
commit c2ef0a6500
671 changed files with 20538 additions and 21924 deletions

View File

@@ -9,7 +9,7 @@ import requests
# Django
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User # noqa
from django.contrib.auth.models import User # noqa
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
@@ -19,9 +19,7 @@ from django.utils.translation import ugettext_lazy as _
# AWX
from awx.main.models.base import prevent_search
from awx.main.models.rbac import (
Role, RoleAncestorEntry, get_roles_on_resource
)
from awx.main.models.rbac import Role, RoleAncestorEntry, get_roles_on_resource
from awx.main.utils import parse_yaml_or_json, get_custom_venv_choices, get_licenser, polymorphic
from awx.main.utils.encryption import decrypt_value, get_encryption_key, is_encrypted
from awx.main.utils.polymorphic import build_polymorphic_ctypes_map
@@ -32,19 +30,26 @@ from awx.main.constants import ACTIVE_STATES
logger = logging.getLogger('awx.main.models.mixins')
__all__ = ['ResourceMixin', 'SurveyJobTemplateMixin', 'SurveyJobMixin',
'TaskManagerUnifiedJobMixin', 'TaskManagerJobMixin', 'TaskManagerProjectUpdateMixin',
'TaskManagerInventoryUpdateMixin', 'ExecutionEnvironmentMixin', 'CustomVirtualEnvMixin']
__all__ = [
'ResourceMixin',
'SurveyJobTemplateMixin',
'SurveyJobMixin',
'TaskManagerUnifiedJobMixin',
'TaskManagerJobMixin',
'TaskManagerProjectUpdateMixin',
'TaskManagerInventoryUpdateMixin',
'ExecutionEnvironmentMixin',
'CustomVirtualEnvMixin',
]
class ResourceMixin(models.Model):
class Meta:
abstract = True
@classmethod
def accessible_objects(cls, accessor, role_field):
'''
"""
Use instead of `MyModel.objects` when you want to only consider
resources that a user has specific permissions for. For example:
MyModel.accessible_objects(user, 'read_role').filter(name__istartswith='bar');
@@ -52,7 +57,7 @@ class ResourceMixin(models.Model):
specific resource you want to check permissions on, it is more
performant to resolve the resource in question then call
`myresource.get_permissions(user)`.
'''
"""
return ResourceMixin._accessible_objects(cls, accessor, role_field)
@classmethod
@@ -67,32 +72,25 @@ class ResourceMixin(models.Model):
ancestor_roles = [accessor]
else:
accessor_type = ContentType.objects.get_for_model(accessor)
ancestor_roles = Role.objects.filter(content_type__pk=accessor_type.id,
object_id=accessor.id)
ancestor_roles = Role.objects.filter(content_type__pk=accessor_type.id, object_id=accessor.id)
if content_types is None:
ct_kwarg = dict(content_type_id = ContentType.objects.get_for_model(cls).id)
ct_kwarg = dict(content_type_id=ContentType.objects.get_for_model(cls).id)
else:
ct_kwarg = dict(content_type_id__in = content_types)
return RoleAncestorEntry.objects.filter(
ancestor__in = ancestor_roles,
role_field = role_field,
**ct_kwarg
).values_list('object_id').distinct()
ct_kwarg = dict(content_type_id__in=content_types)
return RoleAncestorEntry.objects.filter(ancestor__in=ancestor_roles, role_field=role_field, **ct_kwarg).values_list('object_id').distinct()
@staticmethod
def _accessible_objects(cls, accessor, role_field):
return cls.objects.filter(pk__in = ResourceMixin._accessible_pk_qs(cls, accessor, role_field))
return cls.objects.filter(pk__in=ResourceMixin._accessible_pk_qs(cls, accessor, role_field))
def get_permissions(self, accessor):
'''
"""
Returns a string list of the roles a accessor has for a given resource.
An accessor can be either a User, Role, or an arbitrary resource that
contains one or more Roles associated with it.
'''
"""
return get_roles_on_resource(self, accessor)
@@ -104,15 +102,13 @@ class SurveyJobTemplateMixin(models.Model):
survey_enabled = models.BooleanField(
default=False,
)
survey_spec = prevent_search(JSONField(
blank=True,
default=dict,
))
ask_variables_on_launch = AskForField(
blank=True,
default=False,
allows_field='extra_vars'
survey_spec = prevent_search(
JSONField(
blank=True,
default=dict,
)
)
ask_variables_on_launch = AskForField(blank=True, default=False, allows_field='extra_vars')
def survey_password_variables(self):
vars = []
@@ -133,7 +129,7 @@ class SurveyJobTemplateMixin(models.Model):
return vars
def _update_unified_job_kwargs(self, create_kwargs, kwargs):
'''
"""
Combine extra_vars with variable precedence order:
JT extra_vars -> JT survey defaults -> runtime extra_vars
@@ -143,7 +139,7 @@ class SurveyJobTemplateMixin(models.Model):
:type kwargs: dict
:return: modified create_kwargs.
:rtype: dict
'''
"""
# Job Template extra_vars
extra_vars = self.extra_vars_dict
@@ -170,11 +166,7 @@ class SurveyJobTemplateMixin(models.Model):
if default is not None:
decrypted_default = default
if (
survey_element['type'] == "password" and
isinstance(decrypted_default, str) and
decrypted_default.startswith('$encrypted$')
):
if survey_element['type'] == "password" and isinstance(decrypted_default, str) and decrypted_default.startswith('$encrypted$'):
decrypted_default = decrypt_value(get_encryption_key('value', pk=None), decrypted_default)
errors = self._survey_element_validation(survey_element, {variable_key: decrypted_default})
if not errors:
@@ -192,12 +184,9 @@ class SurveyJobTemplateMixin(models.Model):
# default (if any) will be validated against instead
errors = []
if (survey_element['type'] == "password"):
if survey_element['type'] == "password":
password_value = data.get(survey_element['variable'])
if (
isinstance(password_value, str) and
password_value == '$encrypted$'
):
if isinstance(password_value, str) and password_value == '$encrypted$':
if survey_element.get('default') is None and survey_element['required']:
if validate_required:
errors.append("'%s' value missing" % survey_element['variable'])
@@ -209,43 +198,60 @@ class SurveyJobTemplateMixin(models.Model):
elif survey_element['type'] in ["textarea", "text", "password"]:
if survey_element['variable'] in data:
if not isinstance(data[survey_element['variable']], str):
errors.append("Value %s for '%s' expected to be a string." % (data[survey_element['variable']],
survey_element['variable']))
errors.append("Value %s for '%s' expected to be a string." % (data[survey_element['variable']], survey_element['variable']))
return errors
if 'min' in survey_element and survey_element['min'] not in ["", None] and len(data[survey_element['variable']]) < int(survey_element['min']):
errors.append("'%s' value %s is too small (length is %s must be at least %s)." %
(survey_element['variable'], data[survey_element['variable']], len(data[survey_element['variable']]), survey_element['min']))
errors.append(
"'%s' value %s is too small (length is %s must be at least %s)."
% (survey_element['variable'], data[survey_element['variable']], len(data[survey_element['variable']]), survey_element['min'])
)
if 'max' in survey_element and survey_element['max'] not in ["", None] and len(data[survey_element['variable']]) > int(survey_element['max']):
errors.append("'%s' value %s is too large (must be no more than %s)." %
(survey_element['variable'], data[survey_element['variable']], survey_element['max']))
errors.append(
"'%s' value %s is too large (must be no more than %s)."
% (survey_element['variable'], data[survey_element['variable']], survey_element['max'])
)
elif survey_element['type'] == 'integer':
if survey_element['variable'] in data:
if type(data[survey_element['variable']]) != int:
errors.append("Value %s for '%s' expected to be an integer." % (data[survey_element['variable']],
survey_element['variable']))
errors.append("Value %s for '%s' expected to be an integer." % (data[survey_element['variable']], survey_element['variable']))
return errors
if 'min' in survey_element and survey_element['min'] not in ["", None] and survey_element['variable'] in data and \
data[survey_element['variable']] < int(survey_element['min']):
errors.append("'%s' value %s is too small (must be at least %s)." %
(survey_element['variable'], data[survey_element['variable']], survey_element['min']))
if 'max' in survey_element and survey_element['max'] not in ["", None] and survey_element['variable'] in data and \
data[survey_element['variable']] > int(survey_element['max']):
errors.append("'%s' value %s is too large (must be no more than %s)." %
(survey_element['variable'], data[survey_element['variable']], survey_element['max']))
if (
'min' in survey_element
and survey_element['min'] not in ["", None]
and survey_element['variable'] in data
and data[survey_element['variable']] < int(survey_element['min'])
):
errors.append(
"'%s' value %s is too small (must be at least %s)."
% (survey_element['variable'], data[survey_element['variable']], survey_element['min'])
)
if (
'max' in survey_element
and survey_element['max'] not in ["", None]
and survey_element['variable'] in data
and data[survey_element['variable']] > int(survey_element['max'])
):
errors.append(
"'%s' value %s is too large (must be no more than %s)."
% (survey_element['variable'], data[survey_element['variable']], survey_element['max'])
)
elif survey_element['type'] == 'float':
if survey_element['variable'] in data:
if type(data[survey_element['variable']]) not in (float, int):
errors.append("Value %s for '%s' expected to be a numeric type." % (data[survey_element['variable']],
survey_element['variable']))
errors.append("Value %s for '%s' expected to be a numeric type." % (data[survey_element['variable']], survey_element['variable']))
return errors
if 'min' in survey_element and survey_element['min'] not in ["", None] and data[survey_element['variable']] < float(survey_element['min']):
errors.append("'%s' value %s is too small (must be at least %s)." %
(survey_element['variable'], data[survey_element['variable']], survey_element['min']))
errors.append(
"'%s' value %s is too small (must be at least %s)."
% (survey_element['variable'], data[survey_element['variable']], survey_element['min'])
)
if 'max' in survey_element and survey_element['max'] not in ["", None] and data[survey_element['variable']] > float(survey_element['max']):
errors.append("'%s' value %s is too large (must be no more than %s)." %
(survey_element['variable'], data[survey_element['variable']], survey_element['max']))
errors.append(
"'%s' value %s is too large (must be no more than %s)."
% (survey_element['variable'], data[survey_element['variable']], survey_element['max'])
)
elif survey_element['type'] == 'multiselect':
if survey_element['variable'] in data:
if type(data[survey_element['variable']]) != list:
@@ -256,21 +262,18 @@ class SurveyJobTemplateMixin(models.Model):
choice_list = [choice for choice in choice_list.splitlines() if choice.strip() != '']
for val in data[survey_element['variable']]:
if val not in choice_list:
errors.append("Value %s for '%s' expected to be one of %s." % (val, survey_element['variable'],
choice_list))
errors.append("Value %s for '%s' expected to be one of %s." % (val, survey_element['variable'], choice_list))
elif survey_element['type'] == 'multiplechoice':
choice_list = copy(survey_element['choices'])
if isinstance(choice_list, str):
choice_list = [choice for choice in choice_list.splitlines() if choice.strip() != '']
if survey_element['variable'] in data:
if data[survey_element['variable']] not in choice_list:
errors.append("Value %s for '%s' expected to be one of %s." % (data[survey_element['variable']],
survey_element['variable'],
choice_list))
errors.append("Value %s for '%s' expected to be one of %s." % (data[survey_element['variable']], survey_element['variable'], choice_list))
return errors
def _accept_or_ignore_variables(self, data, errors=None, _exclude_errors=(), extra_passwords=None):
survey_is_enabled = (self.survey_enabled and self.survey_spec)
survey_is_enabled = self.survey_enabled and self.survey_spec
extra_vars = data.copy()
if errors is None:
errors = {}
@@ -285,12 +288,11 @@ class SurveyJobTemplateMixin(models.Model):
value = data.get(key, None)
validate_required = 'required' not in _exclude_errors
if extra_passwords and key in extra_passwords and is_encrypted(value):
element_errors = self._survey_element_validation(survey_element, {
key: decrypt_value(get_encryption_key('value', pk=None), value)
}, validate_required=validate_required)
else:
element_errors = self._survey_element_validation(
survey_element, data, validate_required=validate_required)
survey_element, {key: decrypt_value(get_encryption_key('value', pk=None), value)}, validate_required=validate_required
)
else:
element_errors = self._survey_element_validation(survey_element, data, validate_required=validate_required)
if element_errors:
survey_errors += element_errors
@@ -309,7 +311,7 @@ class SurveyJobTemplateMixin(models.Model):
if extra_vars:
# Prune the prompted variables for those identical to template
tmp_extra_vars = self.extra_vars_dict
for key in (set(tmp_extra_vars.keys()) & set(extra_vars.keys())):
for key in set(tmp_extra_vars.keys()) & set(extra_vars.keys()):
if tmp_extra_vars[key] == extra_vars[key]:
extra_vars.pop(key)
@@ -318,18 +320,20 @@ class SurveyJobTemplateMixin(models.Model):
rejected.update(extra_vars)
# ignored variables does not block manual launch
if 'prompts' not in _exclude_errors:
errors['extra_vars'] = [_('Variables {list_of_keys} are not allowed on launch. Check the Prompt on Launch setting '+
'on the {model_name} to include Extra Variables.').format(
list_of_keys=', '.join([str(key) for key in extra_vars.keys()]),
model_name=self._meta.verbose_name.title())]
errors['extra_vars'] = [
_(
'Variables {list_of_keys} are not allowed on launch. Check the Prompt on Launch setting '
+ 'on the {model_name} to include Extra Variables.'
).format(list_of_keys=', '.join([str(key) for key in extra_vars.keys()]), model_name=self._meta.verbose_name.title())
]
return (accepted, rejected, errors)
@staticmethod
def pivot_spec(spec):
'''
"""
Utility method that will return a dictionary keyed off variable names
'''
"""
pivoted = {}
for element_data in spec.get('spec', []):
if 'variable' in element_data:
@@ -349,9 +353,9 @@ class SurveyJobTemplateMixin(models.Model):
return errors
def display_survey_spec(self):
'''
"""
Hide encrypted default passwords in survey specs
'''
"""
survey_spec = deepcopy(self.survey_spec) if self.survey_spec else {}
for field in survey_spec.get('spec', []):
if field.get('type') == 'password':
@@ -364,16 +368,18 @@ class SurveyJobMixin(models.Model):
class Meta:
abstract = True
survey_passwords = prevent_search(JSONField(
blank=True,
default=dict,
editable=False,
))
survey_passwords = prevent_search(
JSONField(
blank=True,
default=dict,
editable=False,
)
)
def display_extra_vars(self):
'''
"""
Hides fields marked as passwords in survey.
'''
"""
if self.survey_passwords:
extra_vars = json.loads(self.extra_vars)
for key, value in self.survey_passwords.items():
@@ -384,9 +390,9 @@ class SurveyJobMixin(models.Model):
return self.extra_vars
def decrypted_extra_vars(self):
'''
"""
Decrypts fields marked as passwords in survey.
'''
"""
if self.survey_passwords:
extra_vars = json.loads(self.extra_vars)
for key in self.survey_passwords:
@@ -484,19 +490,13 @@ class CustomVirtualEnvMixin(models.Model):
abstract = True
custom_virtualenv = models.CharField(
blank=True,
null=True,
default=None,
max_length=100,
help_text=_('Local absolute file path containing a custom Python virtualenv to use')
blank=True, null=True, default=None, max_length=100, help_text=_('Local absolute file path containing a custom Python virtualenv to use')
)
def clean_custom_virtualenv(self):
value = self.custom_virtualenv
if value and os.path.join(value, '') not in get_custom_venv_choices():
raise ValidationError(
_('{} is not a valid virtualenv in {}').format(value, settings.BASE_VENV_PATH)
)
raise ValidationError(_('{} is not a valid virtualenv in {}').format(value, settings.BASE_VENV_PATH))
if value:
return os.path.join(value, '')
return None
@@ -504,12 +504,13 @@ class CustomVirtualEnvMixin(models.Model):
class RelatedJobsMixin(object):
'''
"""
This method is intended to be overwritten.
Called by get_active_jobs()
Returns a list of active jobs (i.e. running) associated with the calling
resource (self). Expected to return a QuerySet
'''
"""
def _get_related_jobs(self):
return self.objects.none()
@@ -519,6 +520,7 @@ class RelatedJobsMixin(object):
'''
Returns [{'id': 1, 'type': 'job'}, {'id': 2, 'type': 'project_update'}, ...]
'''
def get_active_jobs(self):
UnifiedJob = apps.get_model('main', 'UnifiedJob')
mapping = build_polymorphic_ctypes_map(UnifiedJob)
@@ -538,24 +540,15 @@ class WebhookTemplateMixin(models.Model):
('gitlab', "GitLab"),
]
webhook_service = models.CharField(
max_length=16,
choices=SERVICES,
blank=True,
help_text=_('Service that webhook requests will be accepted from')
)
webhook_key = prevent_search(models.CharField(
max_length=64,
blank=True,
help_text=_('Shared secret that the webhook service will use to sign requests')
))
webhook_service = models.CharField(max_length=16, choices=SERVICES, blank=True, help_text=_('Service that webhook requests will be accepted from'))
webhook_key = prevent_search(models.CharField(max_length=64, blank=True, help_text=_('Shared secret that the webhook service will use to sign requests')))
webhook_credential = models.ForeignKey(
'Credential',
blank=True,
null=True,
on_delete=models.SET_NULL,
related_name='%(class)ss',
help_text=_('Personal Access Token for posting back the status to the service API')
help_text=_('Personal Access Token for posting back the status to the service API'),
)
def rotate_webhook_key(self):
@@ -582,25 +575,16 @@ class WebhookMixin(models.Model):
SERVICES = WebhookTemplateMixin.SERVICES
webhook_service = models.CharField(
max_length=16,
choices=SERVICES,
blank=True,
help_text=_('Service that webhook requests will be accepted from')
)
webhook_service = models.CharField(max_length=16, choices=SERVICES, blank=True, help_text=_('Service that webhook requests will be accepted from'))
webhook_credential = models.ForeignKey(
'Credential',
blank=True,
null=True,
on_delete=models.SET_NULL,
related_name='%(class)ss',
help_text=_('Personal Access Token for posting back the status to the service API')
)
webhook_guid = models.CharField(
blank=True,
max_length=128,
help_text=_('Unique identifier of the event that triggered this webhook')
help_text=_('Personal Access Token for posting back the status to the service API'),
)
webhook_guid = models.CharField(blank=True, max_length=128, help_text=_('Unique identifier of the event that triggered this webhook'))
def update_webhook_status(self, status):
if not self.webhook_credential:
@@ -645,10 +629,7 @@ class WebhookMixin(models.Model):
'target_url': self.get_ui_url(),
}
k, v = service_header[self.webhook_service]
headers = {
k: v.format(self.webhook_credential.get_input('token')),
'Content-Type': 'application/json'
}
headers = {k: v.format(self.webhook_credential.get_input('token')), 'Content-Type': 'application/json'}
response = requests.post(status_api, data=json.dumps(data), headers=headers, timeout=30)
except Exception:
logger.exception("Posting webhook status caused an error.")
@@ -657,8 +638,4 @@ class WebhookMixin(models.Model):
if response.status_code < 400:
logger.debug("Webhook status update sent.")
else:
logger.error(
"Posting webhook status failed, code: {}\n"
"{}\n"
"Payload sent: {}".format(response.status_code, response.text, json.dumps(data))
)
logger.error("Posting webhook status failed, code: {}\n" "{}\n" "Payload sent: {}".format(response.status_code, response.text, json.dumps(data)))