Serializer RBAC and structure review changes (#17)

* Bulk launch serializer RBAC and code structure review

Use WJ node as base in bulk job launch child
  remove fields we get for free this way

Minor translation marking

Consolidate bulk API permission methods
  split out permission check for each UJT type

Code consolidation for org check method

add a save before starting the workflow job
This commit is contained in:
Alan Rominger
2023-03-07 01:55:53 -05:00
committed by Elijah DeLee
parent 47b7bbeda7
commit ac99708952
4 changed files with 105 additions and 175 deletions

View File

@@ -31,7 +31,6 @@ from django.utils.encoding import force_str
from django.utils.text import capfirst from django.utils.text import capfirst
from django.utils.timezone import now from django.utils.timezone import now
from django.core.validators import RegexValidator, MaxLengthValidator from django.core.validators import RegexValidator, MaxLengthValidator
from django.db.models import Q
# Django REST Framework # Django REST Framework
from rest_framework.exceptions import ValidationError, PermissionDenied from rest_framework.exceptions import ValidationError, PermissionDenied
@@ -1966,8 +1965,13 @@ class BulkHostCreateSerializer(serializers.Serializer):
inventory = serializers.PrimaryKeyRelatedField( inventory = serializers.PrimaryKeyRelatedField(
queryset=Inventory.objects.all(), required=True, write_only=True, help_text=_('Primary Key ID of inventory to add hosts to.') queryset=Inventory.objects.all(), required=True, write_only=True, help_text=_('Primary Key ID of inventory to add hosts to.')
) )
hosts_help_text = _('List of hosts to be created, JSON. e.g. [{"name": "example.com"}, {"name": "127.0.0.1"}]') hosts = serializers.ListField(
hosts = serializers.ListField(child=BulkHostSerializer(), allow_empty=False, max_length=100000, write_only=True, help_text=hosts_help_text) child=BulkHostSerializer(),
allow_empty=False,
max_length=100000,
write_only=True,
help_text=_('List of hosts to be created, JSON. e.g. [{"name": "example.com"}, {"name": "127.0.0.1"}]'),
)
class Meta: class Meta:
model = Inventory model = Inventory
@@ -1994,7 +1998,7 @@ class BulkHostCreateSerializer(serializers.Serializer):
# Don't check license if it is open license # Don't check license if it is open license
if validation_info.get('license_type', 'UNLICENSED') == 'open': if validation_info.get('license_type', 'UNLICENSED') == 'open':
return True return
sys_free_instances = validation_info.get('free_instances', 0) sys_free_instances = validation_info.get('free_instances', 0)
system_net_new_host_count = Host.objects.exclude(name__in=new_hosts).count() system_net_new_host_count = Host.objects.exclude(name__in=new_hosts).count()
@@ -2006,24 +2010,17 @@ class BulkHostCreateSerializer(serializers.Serializer):
raise PermissionDenied(_("Host count exceeds available instances.")) raise PermissionDenied(_("Host count exceeds available instances."))
logger.warning(_("Number of hosts allowed by license has been exceeded.")) logger.warning(_("Number of hosts allowed by license has been exceeded."))
return True
def validate(self, attrs): def validate(self, attrs):
request = self.context.get('request', None) request = self.context.get('request', None)
inv = attrs['inventory'] inv = attrs['inventory']
if inv.kind != '':
raise serializers.ValidationError(_('Hosts can only be created in manual inventories (not smart or constructed types).'))
if len(attrs['hosts']) > settings.BULK_HOST_MAX_CREATE: if len(attrs['hosts']) > settings.BULK_HOST_MAX_CREATE:
raise serializers.ValidationError(_('Number of hosts exceeds system setting BULK_HOST_MAX_CREATE')) raise serializers.ValidationError(_('Number of hosts exceeds system setting BULK_HOST_MAX_CREATE'))
if request and not request.user.is_superuser: if request and not request.user.is_superuser:
if inv.organization: if request.user not in inv.admin_role:
is_org_admin = request.user in inv.organization.admin_role
is_org_inv_admin = request.user in inv.organization.inventory_admin_role
else:
is_org_admin = False
is_org_inv_admin = False
is_inventory_admin = request.user in inv.admin_role
if not any([is_inventory_admin, is_org_admin, is_org_inv_admin]):
raise serializers.ValidationError(_(f'Inventory with id {inv.id} not found or lack permissions to add hosts.')) raise serializers.ValidationError(_(f'Inventory with id {inv.id} not found or lack permissions to add hosts.'))
current_hostnames = {h[0] for h in Host.objects.filter(inventory=inv).values_list('name').all()} current_hostnames = set(inv.hosts.values_list('name', flat=True))
new_names = [host['name'] for host in attrs['hosts']] new_names = [host['name'] for host in attrs['hosts']]
duplicate_new_names = [n for n in new_names if n in current_hostnames or new_names.count(n) > 1] duplicate_new_names = [n for n in new_names if n in current_hostnames or new_names.count(n) > 1]
if duplicate_new_names: if duplicate_new_names:
@@ -4536,59 +4533,41 @@ class WorkflowJobLaunchSerializer(BaseSerializer):
return accepted return accepted
class BulkJobNodeSerializer(serializers.Serializer): class BulkJobNodeSerializer(WorkflowJobNodeSerializer):
# We don't do a PrimaryKeyRelatedField for unified_job_template and inventory, because that increases the number # We don't do a PrimaryKeyRelatedField for unified_job_template and others, because that increases the number
# of database queries, rather we take them as integer and later convert them to objects in get_objectified_jobs # of database queries, rather we take them as integer and later convert them to objects in get_objectified_jobs
unified_job_template = serializers.IntegerField( unified_job_template = serializers.IntegerField(
required=True, required=True, min_value=1, help_text=_('Primary key of the template for this job, can be a job template or inventory source.')
min_value=1,
) )
inventory = serializers.IntegerField(required=False, min_value=1) inventory = serializers.IntegerField(required=False, min_value=1)
execution_environment = serializers.IntegerField(required=False, min_value=1)
# many-to-many fields
credentials = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False) credentials = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False)
identifier = serializers.CharField(required=False, write_only=True, allow_blank=False)
labels = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False) labels = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False)
instance_groups = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False) instance_groups = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False)
execution_environment = serializers.IntegerField(required=False, min_value=1)
limit = serializers.CharField(required=False, write_only=True, allow_blank=False)
scm_branch = serializers.CharField(required=False, write_only=True, allow_blank=False)
verbosity = serializers.IntegerField(required=False, min_value=1)
forks = serializers.IntegerField(required=False, min_value=1)
diff_mode = serializers.BooleanField(required=False, allow_null=True, default=None)
job_tags = serializers.CharField(required=False, write_only=True, allow_blank=False)
job_type = serializers.CharField(required=False, write_only=True, allow_blank=False)
skip_tags = serializers.CharField(required=False, write_only=True, allow_blank=False)
job_slice_count = serializers.IntegerField(required=False, min_value=1)
timeout = serializers.IntegerField(required=False, min_value=1)
extra_data = serializers.JSONField(write_only=True, required=False)
class Meta: class Meta:
model = WorkflowJobNode model = WorkflowJobNode
fields = ( fields = ('*', 'credentials', 'labels', 'instance_groups') # m2m fields are not canonical for WJ nodes
'unified_job_template',
'identifier', def validate(self, attrs):
'inventory', return super(LaunchConfigurationBaseSerializer, self).validate(attrs)
'credentials',
'limit', def get_validation_exclusions(self, obj=None):
'labels', ret = super().get_validation_exclusions(obj)
'instance_groups', ret.extend(['unified_job_template', 'inventory', 'execution_environment'])
'execution_environment', return ret
'scm_branch',
'verbosity',
'forks',
'diff_mode',
'extra_data',
'job_slice_count',
'job_tags',
'job_type',
'skip_tags',
'timeout',
)
class BulkJobLaunchSerializer(BaseSerializer): class BulkJobLaunchSerializer(serializers.Serializer):
name = serializers.CharField(default='Bulk Job Launch', max_length=512, write_only=True, required=False, allow_blank=True) # limited by max name of jobs name = serializers.CharField(default='Bulk Job Launch', max_length=512, write_only=True, required=False, allow_blank=True) # limited by max name of jobs
job_node_help_text = _('List of jobs to be launched, JSON. e.g. [{"unified_job_template": 7}, {"unified_job_template": 10}]') jobs = BulkJobNodeSerializer(
jobs = BulkJobNodeSerializer(many=True, allow_empty=False, write_only=True, max_length=100000, help_text=job_node_help_text) many=True,
allow_empty=False,
write_only=True,
max_length=100000,
help_text=_('List of jobs to be launched, JSON. e.g. [{"unified_job_template": 7}, {"unified_job_template": 10}]'),
)
description = serializers.CharField(write_only=True, required=False, allow_blank=False) description = serializers.CharField(write_only=True, required=False, allow_blank=False)
extra_vars = serializers.JSONField(write_only=True, required=False) extra_vars = serializers.JSONField(write_only=True, required=False)
organization = serializers.PrimaryKeyRelatedField( organization = serializers.PrimaryKeyRelatedField(
@@ -4596,7 +4575,8 @@ class BulkJobLaunchSerializer(BaseSerializer):
required=False, required=False,
default=None, default=None,
allow_null=True, allow_null=True,
help_text=_('Inherit permissions from organization roles.'), write_only=True,
help_text=_('Inherit permissions from this organization. If not provided, a organization the user is a member of will be selected automatically.'),
) )
inventory = serializers.PrimaryKeyRelatedField(queryset=Inventory.objects.all(), required=False, write_only=True) inventory = serializers.PrimaryKeyRelatedField(queryset=Inventory.objects.all(), required=False, write_only=True)
limit = serializers.CharField(write_only=True, required=False, allow_blank=False) limit = serializers.CharField(write_only=True, required=False, allow_blank=False)
@@ -4630,41 +4610,49 @@ class BulkJobLaunchSerializer(BaseSerializer):
requested_use_labels = set() requested_use_labels = set()
requested_use_instance_groups = set() requested_use_instance_groups = set()
for job in attrs['jobs']: for job in attrs['jobs']:
if 'credentials' in job: for cred in job.get('credentials', []):
[requested_use_credentials.add(cred) for cred in job['credentials']] requested_use_credentials.add(cred)
if 'labels' in job: for label in job.get('labels', []):
[requested_use_labels.add(label) for label in job['labels']] requested_use_labels.add(label)
if 'instance_groups' in job: for instance_group in job.get('instance_groups', []):
[requested_use_instance_groups.add(instance_group) for instance_group in job['instance_groups']] requested_use_instance_groups.add(instance_group)
key_to_obj_map = {
"unified_job_template": {obj.id: obj for obj in UnifiedJobTemplate.objects.filter(id__in=requested_ujts)},
"inventory": {obj.id: obj for obj in Inventory.objects.filter(id__in=requested_use_inventories)},
"credentials": {obj.id: obj for obj in Credential.objects.filter(id__in=requested_use_credentials)},
"labels": {obj.id: obj for obj in Label.objects.filter(id__in=requested_use_labels)},
"instance_groups": {obj.id: obj for obj in InstanceGroup.objects.filter(id__in=requested_use_instance_groups)},
"execution_environment": {obj.id: obj for obj in ExecutionEnvironment.objects.filter(id__in=requested_use_execution_environments)},
}
ujts = {}
for ujt in key_to_obj_map['unified_job_template'].values():
ujts.setdefault(type(ujt), [])
ujts[type(ujt)].append(ujt)
unallowed_types = set(ujts.keys()) - set([JobTemplate, Project, InventorySource, WorkflowJobTemplate])
if unallowed_types:
type_names = ' '.join([cls._meta.verbose_name.title() for cls in unallowed_types])
raise serializers.ValidationError(_("Template types {type_names} not allowed in bulk jobs").format(type_names=type_names))
for model, obj_list in ujts.items():
role_field = 'execute_role' if issubclass(model, (JobTemplate, WorkflowJobTemplate)) else 'update_role'
self.check_list_permission(model, set([obj.id for obj in obj_list]), role_field)
self.check_organization_permission(attrs, request) self.check_organization_permission(attrs, request)
self.check_unified_job_permission(request, requested_ujts)
if requested_use_inventories or 'inventory' in attrs:
self.check_inventory_permission(attrs, request, requested_use_inventories)
if requested_use_credentials: if 'inventory' in attrs:
self.check_credential_permission(request, requested_use_credentials) requested_use_inventories.add(attrs['inventory'].id)
if requested_use_labels: self.check_list_permission(Inventory, requested_use_inventories, 'use_role')
self.check_label_permission(request, requested_use_labels)
if requested_use_instance_groups: self.check_list_permission(Credential, requested_use_credentials, 'use_role')
self.check_instance_group_permission(request, requested_use_instance_groups) self.check_list_permission(Label, requested_use_labels)
self.check_list_permission(InstanceGroup, requested_use_instance_groups) # TODO: change to use_role for conflict
self.check_list_permission(ExecutionEnvironment, requested_use_execution_environments) # TODO: change if roles introduced
if requested_use_execution_environments: jobs_object = self.get_objectified_jobs(attrs, key_to_obj_map)
self.check_execution_environment_permission(request, requested_use_instance_groups)
# all of the unified job templates and related items have now been checked, we can now grab the objects from the DB
jobs_object = self.get_objectified_jobs(
attrs,
requested_ujts,
requested_use_inventories,
requested_use_credentials,
requested_use_labels,
requested_use_instance_groups,
requested_use_execution_environments,
)
attrs['jobs'] = jobs_object attrs['jobs'] = jobs_object
if 'extra_vars' in attrs: if 'extra_vars' in attrs:
@@ -4673,6 +4661,23 @@ class BulkJobLaunchSerializer(BaseSerializer):
attrs = super().validate(attrs) attrs = super().validate(attrs)
return attrs return attrs
def check_list_permission(self, model, id_list, role_field=None):
if not id_list:
return
user = self.context['request'].user
if role_field is None: # implies "read" level permission is required
access_qs = user.get_queryset(model)
else:
access_qs = model.accessible_objects(user, role_field)
not_allowed = set(id_list) - set(access_qs.filter(id__in=id_list).values_list('id', flat=True))
if not_allowed:
raise serializers.ValidationError(
_("{model_name} {not_allowed} not found or you don't have permissions to access it").format(
model_name=model._meta.verbose_name_plural.title(), not_allowed=not_allowed
)
)
def create(self, validated_data): def create(self, validated_data):
request = self.context.get('request', None) request = self.context.get('request', None)
launch_user = request.user if request else None launch_user = request.user if request else None
@@ -4750,8 +4755,8 @@ class BulkJobLaunchSerializer(BaseSerializer):
if through_models: if through_models:
obj_through_model.objects.bulk_create(through_models) obj_through_model.objects.bulk_create(through_models)
wfj.status = 'pending'
wfj.save() wfj.save()
wfj.signal_start()
return WorkflowJobSerializer().to_representation(wfj) return WorkflowJobSerializer().to_representation(wfj)
@@ -4760,97 +4765,23 @@ class BulkJobLaunchSerializer(BaseSerializer):
# - If the orgs is not set, set it to the org of the launching user # - If the orgs is not set, set it to the org of the launching user
# - If the user is part of multiple orgs, throw a validation error saying user is part of multiple orgs, please provide one # - If the user is part of multiple orgs, throw a validation error saying user is part of multiple orgs, please provide one
if not request.user.is_superuser: if not request.user.is_superuser:
read_org_qs = Organization.accessible_objects(request.user, 'read_role')
if 'organization' not in attrs or attrs['organization'] == None or attrs['organization'] == '': if 'organization' not in attrs or attrs['organization'] == None or attrs['organization'] == '':
if Organization.accessible_pk_qs(request.user, 'read_role').count() == 1: read_org_ct = read_org_qs.count()
for tup in Organization.accessible_pk_qs(request.user, 'read_role').all(): if read_org_ct == 1:
attrs['organization'] = Organization.objects.filter(id__in=str(tup[0])).first() attrs['organization'] = read_org_qs.first()
elif Organization.accessible_pk_qs(request.user, 'read_role').count() > 1: elif read_org_ct > 1:
raise serializers.ValidationError("User has permission to multiple Organizations, please set one of them in the request") raise serializers.ValidationError("User has permission to multiple Organizations, please set one of them in the request")
else: else:
raise serializers.ValidationError("User not part of any organization, please assign an organization to assign to the bulk job") raise serializers.ValidationError("User not part of any organization, please assign an organization to assign to the bulk job")
else: else:
allowed_orgs = set() allowed_orgs = set(read_org_qs.values_list('id', flat=True))
requested_org = attrs['organization'] requested_org = attrs['organization']
if request and not request.user.is_superuser: if requested_org.id not in allowed_orgs:
[allowed_orgs.add(tup[0]) for tup in Organization.accessible_pk_qs(request.user, 'read_role').all()] raise ValidationError(_(f"Organization {requested_org.id} not found or you don't have permissions to access it"))
if requested_org.id not in allowed_orgs:
raise ValidationError(_(f"Organization {requested_org.id} not found or you don't have permissions to access it"))
def check_unified_job_permission(self, request, requested_ujts): def get_objectified_jobs(self, attrs, key_to_obj_map):
allowed_ujts = set()
[allowed_ujts.add(tup[0]) for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'execute_role').all()]
[allowed_ujts.add(tup[0]) for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'admin_role').all()]
[allowed_ujts.add(tup[0]) for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'update_role').all()]
accessible_inventories_qs = Inventory.accessible_pk_qs(request.user, 'update_role')
[allowed_ujts.add(tup[0]) for tup in InventorySource.objects.filter(inventory__in=accessible_inventories_qs).values_list('id')]
if requested_ujts - allowed_ujts:
not_allowed = requested_ujts - allowed_ujts
raise serializers.ValidationError(_(f"Unified Job Templates {not_allowed} not found or you don't have permissions to access it"))
def check_inventory_permission(self, attrs, request, requested_use_inventories):
accessible_use_inventories = {tup[0] for tup in Inventory.accessible_pk_qs(request.user, 'use_role')}
if requested_use_inventories - accessible_use_inventories:
not_allowed = requested_use_inventories - accessible_use_inventories
raise serializers.ValidationError(_(f"Inventories {not_allowed} not found or you don't have permissions to access it"))
if 'inventory' in attrs:
requested_workflow_inventory = attrs['inventory']
if requested_workflow_inventory.id not in accessible_use_inventories:
raise serializers.ValidationError(_(f"Inventories {requested_workflow_inventory.id} not found or you don't have permissions to access it"))
def check_credential_permission(self, request, requested_use_credentials):
accessible_use_credentials = {tup[0] for tup in Credential.accessible_pk_qs(request.user, 'use_role').all()}
if requested_use_credentials - accessible_use_credentials:
not_allowed = requested_use_credentials - accessible_use_credentials
raise serializers.ValidationError(_(f"Credentials {not_allowed} not found or you don't have permissions to access it"))
def check_label_permission(self, request, requested_use_labels):
accessible_use_labels = {tup.id for tup in Label.objects.filter(organization__in=Organization.accessible_pk_qs(request.user, 'read_role'))}
if requested_use_labels - accessible_use_labels:
not_allowed = requested_use_labels - accessible_use_labels
raise serializers.ValidationError(_(f"Labels {not_allowed} not found or you don't have permissions to access it"))
def check_instance_group_permission(self, request, requested_use_instance_groups):
# only org admins are allowed to see instance groups
organization_admin_qs = Organization.accessible_pk_qs(request.user, 'admin_role').all()
if organization_admin_qs:
accessible_use_instance_groups = {tup.id for tup in InstanceGroup.objects.all()}
if requested_use_instance_groups - accessible_use_instance_groups:
not_allowed = requested_use_instance_groups - accessible_use_instance_groups
raise serializers.ValidationError(_(f"Instance Groups {not_allowed} not found or you don't have permissions to access it"))
else:
raise serializers.ValidationError(_(f"Instance Groups {requested_use_instance_groups} not found or you don't have permissions to access it"))
def check_execution_environment_permission(self, request, requested_use_execution_environments):
accessible_execution_env = {
tup.id
for tup in ExecutionEnvironment.objects.filter(
Q(organization__in=Organization.accessible_pk_qs(request.user, 'read_role')) | Q(organization__isnull=True)
).distinct()
}
if requested_use_execution_environments - accessible_execution_env:
not_allowed = requested_use_execution_environments - accessible_execution_env
raise serializers.ValidationError(_(f"Execution Environments {not_allowed} not found or you don't have permissions to access it"))
def get_objectified_jobs(
self,
attrs,
requested_ujts,
requested_use_inventories,
requested_use_credentials,
requested_use_labels,
requested_use_instance_groups,
requested_use_execution_environments,
):
objectified_jobs = [] objectified_jobs = []
key_to_obj_map = {
"unified_job_template": {obj.id: obj for obj in UnifiedJobTemplate.objects.filter(id__in=requested_ujts)},
"inventory": {obj.id: obj for obj in Inventory.objects.filter(id__in=requested_use_inventories)},
"credentials": {obj.id: obj for obj in Credential.objects.filter(id__in=requested_use_credentials)},
"labels": {obj.id: obj for obj in Label.objects.filter(id__in=requested_use_labels)},
"instance_groups": {obj.id: obj for obj in InstanceGroup.objects.filter(id__in=requested_use_instance_groups)},
"execution_environment": {obj.id: obj for obj in ExecutionEnvironment.objects.filter(id__in=requested_use_execution_environments)},
}
# This loop is generalized so we should only have to add related items to the key_to_obj_map # This loop is generalized so we should only have to add related items to the key_to_obj_map
for job in attrs['jobs']: for job in attrs['jobs']:
objectified_job = {} objectified_job = {}
@@ -4859,9 +4790,7 @@ class BulkJobLaunchSerializer(BaseSerializer):
if isinstance(value, int): if isinstance(value, int):
objectified_job[key] = key_to_obj_map[key][value] objectified_job[key] = key_to_obj_map[key][value]
elif isinstance(value, list): elif isinstance(value, list):
objectified_job[key] = [] objectified_job[key] = [key_to_obj_map[key][item] for item in value]
for item in value:
objectified_job[key].append(key_to_obj_map[key][item])
else: else:
objectified_job[key] = value objectified_job[key] = value
objectified_jobs.append(objectified_job) objectified_jobs.append(objectified_job)

View File

@@ -1865,6 +1865,7 @@ class JobLaunchConfigAccess(UnifiedCredentialsMixin, BaseAccess):
@check_superuser @check_superuser
def can_add(self, data, template=None): def can_add(self, data, template=None):
# WARNING: duplicated with BulkJobLaunchSerializer, check when changing permission levels
# This is a special case, we don't check related many-to-many elsewhere # This is a special case, we don't check related many-to-many elsewhere
# launch RBAC checks use this # launch RBAC checks use this
if 'reference_obj' in data: if 'reference_obj' in data:
@@ -1999,7 +2000,7 @@ class WorkflowJobNodeAccess(BaseAccess):
def filtered_queryset(self): def filtered_queryset(self):
return self.model.objects.filter( return self.model.objects.filter(
Q(workflow_job__unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs(self.user, 'read_role')) Q(workflow_job__unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs(self.user, 'read_role'))
| Q(workflow_job__organization__in=Organization.objects.filter(Q(admin_role__members=self.user)), workflow_job__is_bulk_job=True) | Q(workflow_job__organization__in=Organization.objects.filter(Q(admin_role__members=self.user)))
) )
def can_read(self, obj): def can_read(self, obj):

View File

@@ -14,7 +14,7 @@ from awx.main.constants import JOB_VARIABLE_PREFIXES
@pytest.mark.django_db @pytest.mark.django_db
def test_subclass_types(rando): def test_subclass_types():
assert set(UnifiedJobTemplate._submodels_with_roles()) == set( assert set(UnifiedJobTemplate._submodels_with_roles()) == set(
[ [
ContentType.objects.get_for_model(JobTemplate).id, ContentType.objects.get_for_model(JobTemplate).id,

View File

@@ -129,7 +129,7 @@ def test_bulk_job_launch_no_access_to_job_template(job_template, organization, i
bulk_job_launch_response = post( bulk_job_launch_response = post(
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400 reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400
).data ).data
assert bulk_job_launch_response['__all__'][0] == f'Unified Job Templates {{{jt.id}}} not found or you don\'t have permissions to access it' assert bulk_job_launch_response['__all__'][0] == f'Job Templates {{{jt.id}}} not found or you don\'t have permissions to access it'
@pytest.mark.django_db @pytest.mark.django_db