Introduce bulk jobs

Provide a view that allows users to launch many jobs with one POST
request. Under the hood, this creates a workflow with a number of jobs
all in a "flat" structure -- much like a sliced job, but with arbitrary
"joblets".

For ~100 nodes we are looking at ~200-some queries, which is more than
the proof of concept required, but still an order of magnitude better
than launching each job individually.

There is still more work to do to implement the other m2m objects, and
we need to decide what Organization should be assigned to a WorkflowJob
launched by a BulkJob.

Users need that Organization so they can step into the
workflow_job_nodes and get the status of all the contained jobs.

Also want to test with MANY job templates etc. in the system, because
queries like
UnifiedJobTemplate.accessible_pk_qs(request.user, 'execute_role').all()
could match a very large number of rows.

use "many=True" instead of ListField

Still seeing an identical number of queries when creating 100 jobs;
going to investigate more.

only validate type in nested serializer

Then we only fetch the database objects after we do the RBAC checks.
This drops us from hundreds of queries to launch 100 jobs down to fewer
than 100 (around 24 queries to launch 100 jobs with credentials).
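The pattern, sketched with a hypothetical helper (it assumes the AWX
model imports such as Inventory are in scope; the committed code below
generalizes this into a key_to_obj_map covering every related model):

def resolve_inventories(user, requested_ids):
    # check pk membership against the RBAC-filtered queryset first
    accessible = {tup[0] for tup in Inventory.accessible_pk_qs(user, 'use_role')}
    missing = requested_ids - accessible
    if missing:
        raise serializers.ValidationError(f"Inventories {missing} not found.")
    # only now hit the table, once, for all requested rows
    return {obj.id: obj for obj in Inventory.objects.filter(id__in=requested_ids)}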

pave the way for more promptable things

add "limit" as possible prompt on launch to bulk jobs
re-organize how we add credentials to pave the way for the other m2m
items without having to repeat too much code
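The idea is roughly the following sketch (the through-model fk attribute
names here are assumptions read off the diff below, not a settled API,
and WorkflowJobNode is assumed imported from awx.main.models):

M2M_THROUGH = {
    # m2m field name -> (through model, fk attribute on the through row)
    'credentials': (WorkflowJobNode.credentials.through, 'credential'),
    'labels': (WorkflowJobNode.labels.through, 'label'),
    'instance_groups': (WorkflowJobNode.instance_groups.through, 'instancegroup'),
}

def attach_m2m(node, field_name, objs):
    through_model, fk_name = M2M_THROUGH[field_name]
    rows = [through_model(workflowjobnode=node, **{fk_name: obj}) for obj in objs]
    through_model.objects.bulk_create(rows)  # one INSERT per m2m type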

add labels to the bulk job

add the other fields to the WorkflowJobNode

move urls around
Elijah DeLee 2022-12-12 19:53:29 -05:00
parent 752289e175
commit 5c47c24e28
4 changed files with 284 additions and 1 deletion

View File

@@ -8,6 +8,7 @@ import logging
import re
from collections import OrderedDict
from datetime import timedelta
from uuid import uuid4

# OAuth2
from oauthlib import oauth2
@@ -4525,6 +4526,249 @@ class WorkflowJobLaunchSerializer(BaseSerializer):
        return accepted


class BulkJobNodeSerializer(serializers.Serializer):
    # if we can find out the user, we can filter down the UnifiedJobTemplate objects
    unified_job_template = serializers.IntegerField(
        required=True,
        min_value=1,
    )
    inventory = serializers.IntegerField(required=False, min_value=1)
    credentials = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False)
    identifier = serializers.CharField(required=False, write_only=True, allow_blank=False)
    labels = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False)
    instance_groups = serializers.ListField(child=serializers.IntegerField(min_value=1), required=False)
    execution_environment = serializers.IntegerField(required=False, min_value=1)
    limit = serializers.CharField(required=False, write_only=True, allow_blank=False)
    scm_branch = serializers.CharField(required=False, write_only=True, allow_blank=False)
    verbosity = serializers.IntegerField(required=False, min_value=1)
    forks = serializers.IntegerField(required=False, min_value=1)
    char_prompts = serializers.CharField(required=False, write_only=True, allow_blank=False)
    diff_mode = serializers.CharField(required=False, write_only=True, allow_blank=False)
    job_tags = serializers.CharField(required=False, write_only=True, allow_blank=False)
    job_type = serializers.CharField(required=False, write_only=True, allow_blank=False)
    skip_tags = serializers.CharField(required=False, write_only=True, allow_blank=False)
    survey_passwords = serializers.CharField(required=False, write_only=True, allow_blank=False)
    job_slice_count = serializers.IntegerField(required=False, min_value=1)
    timeout = serializers.IntegerField(required=False, min_value=1)

    class Meta:
        fields = (
            'unified_job_template',
            'identifier',
            'inventory',
            'credentials',
            'limit',
            'labels',
            'instance_groups',
            'execution_environment',
            'scm_branch',
            'verbosity',
            'forks',
            'char_prompts',
            'diff_mode',
            'extra_data',
            'job_slice_count',
            'job_tags',
            'job_type',
            'skip_tags',
            'survey_passwords',
            'timeout',
            # these are related objects and we need to add extra validation for them in the parent BulkJobLaunchSerializer
        )

class BulkJobLaunchSerializer(serializers.Serializer):
    name = serializers.CharField(max_length=512, required=False)  # limited by max name of jobs
    jobs = BulkJobNodeSerializer(many=True, allow_empty=False, max_length=1000)

    class Meta:
        fields = ('name', 'jobs')
        read_only_fields = ()

    def validate(self, attrs):
        request = self.context.get('request', None)
        identifiers = set()
        for node in attrs['jobs']:
            if 'identifier' in node:
                if node['identifier'] in identifiers:
                    raise serializers.ValidationError(_(f"Identifier {node['identifier']} not unique"))
                identifiers.add(node['identifier'])
            else:
                node['identifier'] = str(uuid4())

        # Build sets of all the requested resources
        # TODO: As we add other related items, we need to add them here
        requested_ujts = {j['unified_job_template'] for j in attrs['jobs']}
        requested_use_inventories = {job['inventory'] for job in attrs['jobs'] if 'inventory' in job}
        requested_use_execution_environments = {job['execution_environment'] for job in attrs['jobs'] if 'execution_environment' in job}
        requested_use_credentials = set()
        requested_use_labels = set()
        requested_use_instance_groups = set()
        for job in attrs['jobs']:
            requested_use_credentials.update(job.get('credentials', []))
            requested_use_labels.update(job.get('labels', []))
            requested_use_instance_groups.update(job.get('instance_groups', []))

        # If we are not a superuser, check we have permissions
        # TODO: As we add other related items, we need to add them here
        if request and not request.user.is_superuser:
            allowed_ujts = {tup[0] for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'execute_role').all()}
            allowed_ujts |= {tup[0] for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'admin_role').all()}
            allowed_ujts |= {tup[0] for tup in UnifiedJobTemplate.accessible_pk_qs(request.user, 'update_role').all()}
            accessible_inventories_qs = Inventory.accessible_pk_qs(request.user, 'update_role')
            allowed_ujts |= {tup[0] for tup in InventorySource.objects.filter(inventory__in=accessible_inventories_qs).values_list('id')}
            if requested_ujts - allowed_ujts:
                not_allowed = requested_ujts - allowed_ujts
                raise serializers.ValidationError(_(f"Unified Job Templates {not_allowed} not found."))
            if requested_use_inventories:
                accessible_use_inventories = {tup[0] for tup in Inventory.accessible_pk_qs(request.user, 'use_role')}
                if requested_use_inventories - accessible_use_inventories:
                    not_allowed = requested_use_inventories - accessible_use_inventories
                    raise serializers.ValidationError(_(f"Inventories {not_allowed} not found."))
            if requested_use_credentials:
                accessible_use_credentials = {tup[0] for tup in Credential.accessible_pk_qs(request.user, 'use_role').all()}
                if requested_use_credentials - accessible_use_credentials:
                    not_allowed = requested_use_credentials - accessible_use_credentials
                    raise serializers.ValidationError(_(f"Credentials {not_allowed} not found."))
            if requested_use_labels:
                accessible_use_labels = set(Label.objects.values_list('id', flat=True))
                if requested_use_labels - accessible_use_labels:
                    not_allowed = requested_use_labels - accessible_use_labels
                    raise serializers.ValidationError(_(f"Labels {not_allowed} not found"))
            if requested_use_instance_groups:
                # only org admins are allowed to see instance groups
                organization_admin_qs = Organization.accessible_pk_qs(request.user, 'admin_role').all()
                if organization_admin_qs:
                    accessible_use_instance_groups = set(InstanceGroup.objects.values_list('id', flat=True))
                    if requested_use_instance_groups - accessible_use_instance_groups:
                        not_allowed = requested_use_instance_groups - accessible_use_instance_groups
                        raise serializers.ValidationError(_(f"Instance Groups {not_allowed} not found"))
            # TODO: Figure out the execution environment RBAC.
            # It seems like any user who is part of an organization can see/use all the execution environments
            # of that organization, so we need to filter the EEs based on request.user's organization.

        # all of the unified job templates and related items have now been checked, we can now grab the objects from the DB
        # TODO: As we add more related objects like Label, InstanceGroup, etc. we need to add them here
        objectified_jobs = []
        key_to_obj_map = {
            "unified_job_template": {obj.id: obj for obj in UnifiedJobTemplate.objects.filter(id__in=requested_ujts)},
            "inventory": {obj.id: obj for obj in Inventory.objects.filter(id__in=requested_use_inventories)},
            "credentials": {obj.id: obj for obj in Credential.objects.filter(id__in=requested_use_credentials)},
            "labels": {obj.id: obj for obj in Label.objects.filter(id__in=requested_use_labels)},
            "instance_groups": {obj.id: obj for obj in InstanceGroup.objects.filter(id__in=requested_use_instance_groups)},
        }
        # This loop is generalized so we should only have to add related items to the key_to_obj_map
        for job in attrs['jobs']:
            objectified_job = {}
            for key, value in job.items():
                if key in key_to_obj_map:
                    if isinstance(value, int):
                        # single fk reference, swap the id for its object
                        objectified_job[key] = key_to_obj_map[key][value]
                    elif isinstance(value, list):
                        # m2m reference, swap the list of ids for a list of objects
                        objectified_job[key] = [key_to_obj_map[key][item] for item in value]
                else:
                    objectified_job[key] = value
            objectified_jobs.append(objectified_job)
        attrs['jobs'] = objectified_jobs
        return attrs
    def create(self, validated_data):
        job_node_data = validated_data.pop('jobs')
        # FIXME: Need to set organization on the WorkflowJob in order for users to be able to see it --
        # normally their permission is sourced from the underlying WorkflowJobTemplate.
        # Maybe we need to add Organization to WorkflowJob.
        if 'name' not in validated_data:
            validated_data['name'] = 'Bulk Job Launch'
        wfj = WorkflowJob.objects.create(**validated_data, is_bulk_job=True)
        nodes = []
        node_m2m_objects = {}
        node_m2m_object_types_to_through_model = {
            'credentials': WorkflowJobNode.credentials.through,
            'labels': WorkflowJobNode.labels.through,
            'instance_groups': WorkflowJobNode.instance_groups.through,
        }
        node_deferred_attr_names = (
            'limit',
            'scm_branch',
            'verbosity',
            'forks',
            'char_prompts',
            'diff_mode',
            'job_tags',
            'job_type',
            'skip_tags',
            'survey_passwords',
            'job_slice_count',
            'timeout',
        )
        node_deferred_attrs = {}
        for node_attrs in job_node_data:
            # we need to add any m2m objects after creation via the through model
            node_m2m_objects[node_attrs['identifier']] = {}
            node_deferred_attrs[node_attrs['identifier']] = {}
            for item in node_m2m_object_types_to_through_model.keys():
                if item in node_attrs:
                    node_m2m_objects[node_attrs['identifier']][item] = node_attrs.pop(item)
            # Some attributes are not accepted by WorkflowJobNode.__init__, we have to set them after
            for item in node_deferred_attr_names:
                if item in node_attrs:
                    node_deferred_attrs[node_attrs['identifier']][item] = node_attrs.pop(item)
            # Create the node objects
            node_obj = WorkflowJobNode(workflow_job=wfj, created=wfj.created, modified=wfj.modified, **node_attrs)
            # we can set the deferred attrs now
            for item, value in node_deferred_attrs[node_attrs['identifier']].items():
                setattr(node_obj, item, value)
            # the node is now ready to be bulk created
            nodes.append(node_obj)
            # we'll need this later when we do the m2m through model bulk create
            node_m2m_objects[node_attrs['identifier']]['node'] = node_obj
        WorkflowJobNode.objects.bulk_create(nodes)
        # Deal with the m2m objects we have to create once the node exists
        for obj_type, obj_through_model in node_m2m_object_types_to_through_model.items():
            through_models = []
            for node_identifier in node_m2m_objects.keys():
                if obj_type in node_m2m_objects[node_identifier] and obj_type == 'credentials':
                    for cred in node_m2m_objects[node_identifier][obj_type]:
                        through_models.append(obj_through_model(credential=cred, workflowjobnode=node_m2m_objects[node_identifier]['node']))
                if obj_type in node_m2m_objects[node_identifier] and obj_type == 'labels':
                    for label in node_m2m_objects[node_identifier][obj_type]:
                        through_models.append(obj_through_model(label=label, workflowjobnode=node_m2m_objects[node_identifier]['node']))
                if obj_type in node_m2m_objects[node_identifier] and obj_type == 'instance_groups':
                    for instance_group in node_m2m_objects[node_identifier][obj_type]:
                        through_models.append(obj_through_model(instancegroup=instance_group, workflowjobnode=node_m2m_objects[node_identifier]['node']))
            if through_models:
                obj_through_model.objects.bulk_create(through_models)
        wfj.status = 'pending'
        wfj.save()
        return WorkflowJobSerializer().to_representation(wfj)


class NotificationTemplateSerializer(BaseSerializer):
    show_capabilities = ['edit', 'delete', 'copy']
    capabilities_prefetch = [{'copy': 'organization.admin'}]

View File

@@ -30,6 +30,8 @@ from awx.api.views import (
     OAuth2TokenList,
     ApplicationOAuth2TokenList,
     OAuth2ApplicationDetail,
+    BulkJobLaunchView,
+    BulkView,
 )
 from awx.api.views.bulk import (
     BulkView,
@@ -140,8 +142,9 @@ v2_urls = [
     re_path(r'^activity_stream/', include(activity_stream_urls)),
     re_path(r'^workflow_approval_templates/', include(workflow_approval_template_urls)),
     re_path(r'^workflow_approvals/', include(workflow_approval_urls)),
-    re_path(r'^bulk/host_create/$', BulkHostCreateView.as_view(), name='bulk_host_create'),
+    re_path(r'^bulk/$', BulkView.as_view(), name='bulk'),
+    re_path(r'^bulk/host_create/$', BulkHostCreateView.as_view(), name='bulk_host_create'),
+    re_path(r'^bulk/job_launch/$', BulkJobLaunchView.as_view(), name='bulk_job_launch'),
 ]

View File

@@ -4301,6 +4301,41 @@ class WorkflowApprovalDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView):
    serializer_class = serializers.WorkflowApprovalSerializer


from rest_framework.decorators import api_view


class BulkJobLaunchView(APIView):
    _ignore_model_permissions = True
    permission_classes = [IsAuthenticated]
    serializer_class = serializers.BulkJobLaunchSerializer
    allowed_methods = ['GET', 'POST', 'OPTIONS']

    def get(self, request):
        # TODO: Return something sensible here, like the defaults
        bulkjob_serializer = serializers.BulkJobLaunchSerializer(data={}, context={'request': request})
        bulkjob_serializer.is_valid()
        return Response(bulkjob_serializer.errors, status=status.HTTP_200_OK)

    def post(self, request):
        bulkjob_serializer = serializers.BulkJobLaunchSerializer(data=request.data, context={'request': request})
        if bulkjob_serializer.is_valid():
            result = bulkjob_serializer.create(bulkjob_serializer.validated_data)
            return Response(result, status=status.HTTP_201_CREATED)
        return Response(bulkjob_serializer.errors, status=status.HTTP_400_BAD_REQUEST)


class BulkView(APIView):
    _ignore_model_permissions = True
    permission_classes = [IsAuthenticated]
    allowed_methods = ['GET']

    def get(self, request, format=None):
        '''List top level resources'''
        data = OrderedDict()
        data['bulk_job_launch'] = reverse('api:bulk_job_launch', request=request)
        return Response(data)


class WorkflowApprovalApprove(RetrieveAPIView):
    model = models.WorkflowApproval
    serializer_class = serializers.WorkflowApprovalViewSerializer

View File

@@ -650,6 +650,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
        help_text=_("If automatically created for a sliced job run, the job template " "the workflow job was created from."),
    )
    is_sliced_job = models.BooleanField(default=False)
    is_bulk_job = models.BooleanField(default=False)

    def _set_default_dependencies_processed(self):
        self.dependencies_processed = True