diff --git a/awx/api/serializers.py b/awx/api/serializers.py index b24c70fac3..98e8d03ff8 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -8,6 +8,7 @@ import logging import re from collections import OrderedDict from datetime import timedelta +from uuid import uuid4 # OAuth2 from oauthlib import oauth2 @@ -4525,6 +4526,249 @@ class WorkflowJobLaunchSerializer(BaseSerializer): return accepted +class BulkJobNodeSerializer(serializers.Serializer): + # if we can find out the user, we can filter down the UnifiedJobTemplate objects + unified_job_template = serializers.IntegerField( + required=True, + min_value=1, + ) + inventory = serializers.IntegerField(required=False, min_value=1) + credentials = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False) + identifier = serializers.CharField(required=False, write_only=True, allow_blank=False) + labels = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False) + instance_groups = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False) + execution_environment = serializers.IntegerField(required=False, min_value=1) + # + limit = serializers.CharField(required=False, write_only=True, allow_blank=False) + scm_branch = serializers.CharField(required=False, write_only=True, allow_blank=False) + verbosity = serializers.IntegerField(required=False, min_value=1) + forks = serializers.IntegerField(required=False, min_value=1) + char_prompts = serializers.CharField(required=False, write_only=True, allow_blank=False) + diff_mode = serializers.CharField(required=False, write_only=True, allow_blank=False) + job_tags = serializers.CharField(required=False, write_only=True, allow_blank=False) + job_type = serializers.CharField(required=False, write_only=True, allow_blank=False) + skip_tags = serializers.CharField(required=False, write_only=True, allow_blank=False) + survey_passwords = serializers.CharField(required=False, write_only=True, allow_blank=False) + job_slice_count = serializers.IntegerField(required=False, min_value=1) + timeout = serializers.IntegerField(required=False, min_value=1) + + class Meta: + fields = ( + 'unified_job_template', + 'identifier', + 'inventory', + 'credentials', + 'limit', + 'labels', + 'instance_groups', + 'execution_environment' 'scm_branch', + 'verbosity' 'forks' 'char_prompts', + 'diff_mode', + 'extra_data', + 'job_slice_count', + 'job_tags', + 'job_type' 'skip_tags', + 'survey_passwords', + 'timeout', + # these are related objects and we need to add extra validation for them in the parent BulkJobLaunchSerializer + # + ) + + +class BulkJobLaunchSerializer(serializers.Serializer): + name = serializers.CharField(max_length=512, required=False) # limited by max name of jobs + jobs = BulkJobNodeSerializer(many=True, allow_empty=False, max_length=1000) + + class Meta: + fields = ('name', 'jobs') + read_only_fields = () + + def validate(self, attrs): + + request = self.context.get('request', None) + identifiers = set() + for node in attrs['jobs']: + if 'identifier' in node: + if node['identifier'] in identifiers: + raise serializers.ValidationError(_(f"Identifier {node['identifier']} not unique")) + identifiers.add(node['identifier']) + else: + node['identifier'] = str(uuid4()) + + # Build sets of all the requested resources + # TODO: As we add other related items, we need to add them here + requested_ujts = {j['unified_job_template'] for j in attrs['jobs']} + requested_use_inventories = {job['inventory'] for job in attrs['jobs'] if 'inventory' in job} + requested_use_execution_environments = {job['execution_environment'] for job in attrs['jobs'] if 'execution_environment' in job} + requested_use_credentials = set() + requested_use_labels = set() + requested_use_instance_groups = set() + for job in attrs['jobs']: + if 'credentials' in job: + [requested_use_credentials.add(cred) for cred in job['credentials']] + if 'labels' in job: + [requested_use_labels.add(label) for label in job['labels']] + if 'instance_groups' in job: + [requested_use_instance_groups.add(instance_group) for instance_group in job['instance_groups']] + if 'execution_environment' in job: + [requested_use_execution_environments.add(execution_env) for execution_env in job['execution_environment']] + + # If we are not a superuser, check we have permissions + # TODO: As we add other related items, we need to add them here + if request and not request.user.is_superuser: + allowed_ujts = set() + [allowed_ujts.add(tup[0]) for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'execute_role').all()] + [allowed_ujts.add(tup[0]) for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'admin_role').all()] + [allowed_ujts.add(tup[0]) for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'update_role').all()] + accessible_inventories_qs = Inventory.accessible_pk_qs(request.user, 'update_role') + [allowed_ujts.add(tup[0]) for tup in InventorySource.objects.filter(inventory__in=accessible_inventories_qs).values_list('id')] + + if requested_ujts - allowed_ujts: + not_allowed = requested_ujts - allowed_ujts + raise serializers.ValidationError(_(f"Unified Job Templates {not_allowed} not found.")) + + if requested_use_inventories: + accessible_use_inventories = {tup[0] for tup in Inventory.accessible_pk_qs(request.user, 'use_role')} + if requested_use_inventories - accessible_use_inventories: + not_allowed = requested_use_inventories - accessible_use_inventories + raise serializers.ValidationError(_(f"Inventories {not_allowed} not found.")) + + if requested_use_credentials: + accessible_use_credentials = {tup[0] for tup in Credential.accessible_pk_qs(request.user, 'use_role').all()} + if requested_use_credentials - accessible_use_credentials: + not_allowed = requested_use_credentials - accessible_use_credentials + raise serializers.ValidationError(_(f"Credentials {not_allowed} not found.")) + + if requested_use_labels: + accessible_use_labels = {tup[0] for tup in Label.objects.all()} + if requested_use_labels - accessible_use_labels: + not_allowed = requested_use_labels - accessible_use_labels + raise serializers.ValidationError(_(f"Labels {not_allowed} not found")) + + if requested_use_instance_groups: + # only org admins are allowed to see instance groups + organization_admin_qs = Organization.accessible_pk_qs(request.user, 'admin_role').all() + if organization_admin_qs: + accessible_use_instance_groups = {tup[0] for tup in InstanceGroup.objects.all()} + if requested_use_instance_groups - accessible_use_instance_groups: + not_allowed = requested_use_instance_groups - accessible_use_instance_groups + raise serializers.ValidationError(_(f"Instance Groups {not_allowed} not found")) + + # TODO: Figure out the Execution environment RBAC + # For execution environment, need to figure out the RBAC part. Seems like any user part of an organization can see/use all the execution + # of that orgnization. So we need to filter out the ee's based on request.user organization. + + # all of the unified job templates and related items have now been checked, we can now grab the objects from the DB + # TODO: As we add more related objects like Label, InstanceGroup, etc we need to add them here + objectified_jobs = [] + key_to_obj_map = { + "unified_job_template": {obj.id: obj for obj in UnifiedJobTemplate.objects.filter(id__in=requested_ujts)}, + "inventory": {obj.id: obj for obj in Inventory.objects.filter(id__in=requested_use_inventories)}, + "credentials": {obj.id: obj for obj in Credential.objects.filter(id__in=requested_use_credentials)}, + "labels": {obj.id: obj for obj in Label.objects.filter(id__in=requested_use_labels)}, + "instance_groups": {obj.id: obj for obj in InstanceGroup.objects.filter(id__in=requested_use_instance_groups)}, + } + + # This loop is generalized so we should only have to add related items to the key_to_obj_map + for job in attrs['jobs']: + objectified_job = {} + for key, value in job.items(): + if key in key_to_obj_map: + if isinstance(value, int): + objectified_job[key] = key_to_obj_map[key][value] + elif isinstance(value, list): + objectified_job[key] = [] + for item in value: + objectified_job[key].append(key_to_obj_map[key][item]) + else: + objectified_job[key] = value + objectified_jobs.append(objectified_job) + + attrs['jobs'] = objectified_jobs + return attrs + + def create(self, validated_data): + job_node_data = validated_data.pop('jobs') + + # FIXME: Need to set organization on the WorkflowJob in order for users to be able to see it -- + # normally their permission is sourced from the underlying WorkflowJobTemplate + # maybe we need to add Organization to WorkflowJob + if 'name' not in validated_data: + validated_data['name'] = 'Bulk Job Launch' + + wfj = WorkflowJob.objects.create(**validated_data, is_bulk_job=True) + nodes = [] + node_m2m_objects = {} + node_m2m_object_types_to_through_model = { + 'credentials': WorkflowJobNode.credentials.through, + 'labels': WorkflowJobNode.labels.through, + 'instance_groups': WorkflowJobNode.instance_groups.through, + } + node_deferred_attr_names = ( + 'limit', + 'scm_branch', + 'verbosity', + 'forks', + 'char_prompts', + 'diff_mode', + 'job_tags', + 'job_type', + 'skip_tags', + 'survey_passwords', + 'job_slice_count', + 'timeout', + ) + node_deferred_attrs = {} + for node_attrs in job_node_data: + + # we need to add any m2m objects after creation via the through model + node_m2m_objects[node_attrs['identifier']] = {} + node_deferred_attrs[node_attrs['identifier']] = {} + for item in node_m2m_object_types_to_through_model.keys(): + if item in node_attrs: + node_m2m_objects[node_attrs['identifier']][item] = node_attrs.pop(item) + + # Some attributes are not accepted by WorkflowJobNode __init__, we have to set them after + for item in node_deferred_attr_names: + if item in node_attrs: + node_deferred_attrs[node_attrs['identifier']][item] = node_attrs.pop(item) + + # Create the node objects + node_obj = WorkflowJobNode(workflow_job=wfj, created=wfj.created, modified=wfj.modified, **node_attrs) + + # we can set the deferred attrs now + for item, value in node_deferred_attrs[node_attrs['identifier']].items(): + setattr(node_obj, item, value) + + # the node is now ready to be bulk created + nodes.append(node_obj) + + # we'll need this later when we do the m2m through model bulk create + node_m2m_objects[node_attrs['identifier']]['node'] = node_obj + + WorkflowJobNode.objects.bulk_create(nodes) + + # Deal with the m2m objects we have to create once the node exists + for obj_type, obj_through_model in node_m2m_object_types_to_through_model.items(): + through_models = [] + for node_identifier in node_m2m_objects.keys(): + if obj_type in node_m2m_objects[node_identifier] and obj_type == 'credentials': + for cred in node_m2m_objects[node_identifier][obj_type]: + through_models.append(obj_through_model(credential=cred, workflowjobnode=node_m2m_objects[node_identifier]['node'])) + if obj_type in node_m2m_objects[node_identifier] and obj_type == 'labels': + for label in node_m2m_objects[node_identifier][obj_type]: + through_models.append(obj_through_model(label=label, workflowjobnode=node_m2m_objects[node_identifier]['node'])) + if obj_type in node_m2m_objects[node_identifier] and obj_type == 'instance_groups': + for instance_group in node_m2m_objects[node_identifier][obj_type]: + through_models.append(obj_through_model(instancegroup=instance_group, workflowjobnode=node_m2m_objects[node_identifier]['node'])) + if through_models: + obj_through_model.objects.bulk_create(through_models) + + wfj.status = 'pending' + wfj.save() + return WorkflowJobSerializer().to_representation(wfj) + + class NotificationTemplateSerializer(BaseSerializer): show_capabilities = ['edit', 'delete', 'copy'] capabilities_prefetch = [{'copy': 'organization.admin'}] diff --git a/awx/api/urls/urls.py b/awx/api/urls/urls.py index 5d6121ad6c..49283ceffe 100644 --- a/awx/api/urls/urls.py +++ b/awx/api/urls/urls.py @@ -30,6 +30,8 @@ from awx.api.views import ( OAuth2TokenList, ApplicationOAuth2TokenList, OAuth2ApplicationDetail, + BulkJobLaunchView, + BulkView, ) from awx.api.views.bulk import ( BulkView, @@ -140,8 +142,9 @@ v2_urls = [ re_path(r'^activity_stream/', include(activity_stream_urls)), re_path(r'^workflow_approval_templates/', include(workflow_approval_template_urls)), re_path(r'^workflow_approvals/', include(workflow_approval_urls)), - re_path(r'^bulk/host_create/$', BulkHostCreateView.as_view(), name='bulk_host_create'), re_path(r'^bulk/$', BulkView.as_view(), name='bulk'), + re_path(r'^bulk/host_create/$', BulkHostCreateView.as_view(), name='bulk_host_create'), + re_path(r'^bulk/job_launch/$', BulkJobLaunchView.as_view(), name='bulk_job_launch'), ] diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 4d5f98d2c2..14dc76367a 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -4301,6 +4301,41 @@ class WorkflowApprovalDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView): serializer_class = serializers.WorkflowApprovalSerializer +from rest_framework.decorators import api_view + + +class BulkJobLaunchView(APIView): + _ignore_model_permissions = True + permission_classes = [IsAuthenticated] + serializer_class = serializers.BulkJobLaunchSerializer + allowed_methods = ['GET', 'POST', 'OPTIONS'] + + def get(self, request): + # TODO Return something sensible here, like the defaults + bulkjob_serializer = serializers.BulkJobLaunchSerializer(data={}, context={'request': request}) + bulkjob_serializer.is_valid() + return Response(bulkjob_serializer.errors, status=status.HTTP_200_OK) + + def post(self, request): + bulkjob_serializer = serializers.BulkJobLaunchSerializer(data=request.data, context={'request': request}) + if bulkjob_serializer.is_valid(): + result = bulkjob_serializer.create(bulkjob_serializer.validated_data) + return Response(result, status=status.HTTP_201_CREATED) + return Response(bulkjob_serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + +class BulkView(APIView): + _ignore_model_permissions = True + permission_classes = [IsAuthenticated] + allowed_methods = ['GET'] + + def get(self, request, format=None): + '''List top level resources''' + data = OrderedDict() + data['bulk_job_launch'] = reverse('api:bulk_job_launch', request=request) + return Response(data) + + class WorkflowApprovalApprove(RetrieveAPIView): model = models.WorkflowApproval serializer_class = serializers.WorkflowApprovalViewSerializer diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 56dcaf7d2d..3d8f33c0c2 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -650,6 +650,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio help_text=_("If automatically created for a sliced job run, the job template " "the workflow job was created from."), ) is_sliced_job = models.BooleanField(default=False) + is_bulk_job = models.BooleanField(default=False) def _set_default_dependencies_processed(self): self.dependencies_processed = True