Job splitting access logic and more feature development

*allow sharding with prompts and schedules
*modify create_unified_job contract to pass class & parent_field name
*make parent field name an instance method & set sharded UJT field
*access methods made compatible with job sharding
*move shard job special logic from task manager to workflows
*save sharded job prompts to workflow job exclusively
*allow using sharded jobs in workflows
This commit is contained in:
AlanCoding
2018-08-28 15:31:59 -04:00
parent dab678c5cc
commit f9bdb1da15
13 changed files with 174 additions and 75 deletions

View File

@@ -3590,7 +3590,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
class Meta: class Meta:
model = WorkflowJob model = WorkflowJob
fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', 'job_template',
'-execution_node', '-event_processing_finished', '-controller_node',) '-execution_node', '-event_processing_finished', '-controller_node',)
def get_related(self, obj): def get_related(self, obj):

View File

@@ -2905,7 +2905,7 @@ class JobTemplateLaunch(RetrieveAPIView):
raise PermissionDenied() raise PermissionDenied()
passwords = serializer.validated_data.pop('credential_passwords', {}) passwords = serializer.validated_data.pop('credential_passwords', {})
new_job = obj.create_job(**serializer.validated_data) new_job = obj.create_unified_job(**serializer.validated_data)
result = new_job.signal_start(**passwords) result = new_job.signal_start(**passwords)
if not result: if not result:

View File

@@ -1789,7 +1789,7 @@ class WorkflowJobNodeAccess(BaseAccess):
def filtered_queryset(self): def filtered_queryset(self):
return self.model.objects.filter( return self.model.objects.filter(
workflow_job__workflow_job_template__in=WorkflowJobTemplate.accessible_objects( workflow_job__unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs(
self.user, 'read_role')) self.user, 'read_role'))
@check_superuser @check_superuser
@@ -1915,7 +1915,7 @@ class WorkflowJobAccess(BaseAccess):
def filtered_queryset(self): def filtered_queryset(self):
return WorkflowJob.objects.filter( return WorkflowJob.objects.filter(
workflow_job_template__in=WorkflowJobTemplate.accessible_objects( unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs(
self.user, 'read_role')) self.user, 'read_role'))
def can_add(self, data): def can_add(self, data):
@@ -1947,9 +1947,11 @@ class WorkflowJobAccess(BaseAccess):
if self.user.is_superuser: if self.user.is_superuser:
return True return True
wfjt = obj.workflow_job_template template = obj.workflow_job_template
if not template and obj.job_template_id:
template = obj.job_template
# only superusers can relaunch orphans # only superusers can relaunch orphans
if not wfjt: if not template:
return False return False
# If job was launched by another user, it could have survey passwords # If job was launched by another user, it could have survey passwords
@@ -1967,7 +1969,7 @@ class WorkflowJobAccess(BaseAccess):
return False return False
# execute permission to WFJT is mandatory for any relaunch # execute permission to WFJT is mandatory for any relaunch
return (self.user in wfjt.execute_role) return (self.user in template.execute_role)
def can_recreate(self, obj): def can_recreate(self, obj):
node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template') node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template')

View File

@@ -136,8 +136,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
else: else:
return [] return []
@classmethod def _get_parent_field_name(self):
def _get_parent_field_name(cls):
return '' return ''
@classmethod @classmethod

View File

@@ -1647,8 +1647,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
null=True null=True
) )
@classmethod def _get_parent_field_name(self):
def _get_parent_field_name(cls):
return 'inventory_source' return 'inventory_source'
@classmethod @classmethod

View File

@@ -320,32 +320,32 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
def resources_needed_to_start(self): def resources_needed_to_start(self):
return [fd for fd in ['project', 'inventory'] if not getattr(self, '{}_id'.format(fd))] return [fd for fd in ['project', 'inventory'] if not getattr(self, '{}_id'.format(fd))]
def create_job(self, **kwargs): def create_unified_job(self, **kwargs):
''' '''
Create a new job based on this template. Create a new job based on this template.
''' '''
if self.job_shard_count > 1: split_event = bool(
self.job_shard_count > 1 and
not kwargs.pop('_prevent_sharding', False)
)
if split_event:
# A sharded Job Template will generate a WorkflowJob rather than a Job # A sharded Job Template will generate a WorkflowJob rather than a Job
from awx.main.models.workflow import WorkflowJobTemplate, WorkflowJobNode from awx.main.models.workflow import WorkflowJobTemplate, WorkflowJobNode
kwargs['_unified_job_class'] = WorkflowJobTemplate._get_unified_job_class() kwargs['_unified_job_class'] = WorkflowJobTemplate._get_unified_job_class()
kwargs['_unified_job_field_names'] = WorkflowJobTemplate._get_unified_job_field_names() kwargs['_parent_field_name'] = "job_template"
job = self.create_unified_job(**kwargs) job = super(JobTemplate, self).create_unified_job(**kwargs)
if self.job_shard_count > 1: if split_event:
if 'inventory' in kwargs: try:
actual_inventory = kwargs['inventory'] wj_config = job.launch_config
else: except JobLaunchConfig.DoesNotExist:
actual_inventory = self.inventory wj_config = JobLaunchConfig()
actual_inventory = wj_config.inventory if wj_config.inventory else self.inventory
for idx in xrange(min(self.job_shard_count, for idx in xrange(min(self.job_shard_count,
actual_inventory.hosts.count())): actual_inventory.hosts.count())):
create_kwargs = dict(workflow_job=job, create_kwargs = dict(workflow_job=job,
unified_job_template=self, unified_job_template=self,
#survey_passwords=self.survey_passwords,
inventory=actual_inventory,
ancestor_artifacts=dict(job_shard=idx)) ancestor_artifacts=dict(job_shard=idx))
#char_prompts=self.char_prompts)
wfjn = WorkflowJobNode.objects.create(**create_kwargs) wfjn = WorkflowJobNode.objects.create(**create_kwargs)
for cred in self.credentials.all():
wfjn.credentials.add(cred)
return job return job
def get_absolute_url(self, request=None): def get_absolute_url(self, request=None):
@@ -531,8 +531,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
) )
@classmethod def _get_parent_field_name(self):
def _get_parent_field_name(cls):
return 'job_template' return 'job_template'
@classmethod @classmethod

View File

@@ -496,8 +496,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
default='check', default='check',
) )
@classmethod def _get_parent_field_name(self):
def _get_parent_field_name(cls):
return 'project' return 'project'
@classmethod @classmethod

View File

@@ -309,13 +309,6 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
''' '''
raise NotImplementedError # Implement in subclass. raise NotImplementedError # Implement in subclass.
@classmethod
def _get_unified_job_field_names(cls):
'''
Return field names that should be copied from template to new job.
'''
raise NotImplementedError # Implement in subclass.
@property @property
def notification_templates(self): def notification_templates(self):
''' '''
@@ -338,27 +331,32 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
password_list = self.survey_password_variables() password_list = self.survey_password_variables()
encrypt_dict(kwargs.get('extra_vars', {}), password_list) encrypt_dict(kwargs.get('extra_vars', {}), password_list)
unified_job_class = kwargs.pop("_unified_job_class", self._get_unified_job_class()) unified_job_class = self._get_unified_job_class()
fields = kwargs.pop("_unified_job_field_names", self._get_unified_job_field_names()) fields = self._get_unified_job_field_names()
print("UJC: {}".format(unified_job_class)) parent_field_name = None
print("fields: {}".format(fields)) if "_unified_job_class" in kwargs:
# Special case where spawned job is different type than usual
# Only used for sharded jobs
unified_job_class = kwargs.pop("_unified_job_class")
fields = unified_job_class._get_unified_job_field_names() & fields
parent_field_name = kwargs.pop('_parent_field_name')
unallowed_fields = set(kwargs.keys()) - set(fields) unallowed_fields = set(kwargs.keys()) - set(fields)
validated_kwargs = kwargs.copy()
if unallowed_fields: if unallowed_fields:
logger.warn('Fields {} are not allowed as overrides.'.format(unallowed_fields)) logger.warn('Fields {} are not allowed as overrides.'.format(unallowed_fields))
map(kwargs.pop, unallowed_fields) map(validated_kwargs.pop, unallowed_fields)
unified_job = copy_model_by_class(self, unified_job_class, fields, kwargs) unified_job = copy_model_by_class(self, unified_job_class, fields, validated_kwargs)
if eager_fields: if eager_fields:
for fd, val in eager_fields.items(): for fd, val in eager_fields.items():
setattr(unified_job, fd, val) setattr(unified_job, fd, val)
# Set the unified job template back-link on the job # NOTE: sharded workflow jobs _get_parent_field_name method
# TODO: fix this hack properly before merge matburt # is not correct until this is set
if isinstance(self, JobTemplate) and isinstance(unified_job, WorkflowJob): if not parent_field_name:
parent_field_name = "job_template" parent_field_name = unified_job._get_parent_field_name()
else:
parent_field_name = unified_job_class._get_parent_field_name()
setattr(unified_job, parent_field_name, self) setattr(unified_job, parent_field_name, self)
# For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction # For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction
@@ -372,24 +370,25 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
unified_job.save() unified_job.save()
# Labels and credentials copied here # Labels and credentials copied here
if kwargs.get('credentials'): if validated_kwargs.get('credentials'):
Credential = UnifiedJob._meta.get_field('credentials').related_model Credential = UnifiedJob._meta.get_field('credentials').related_model
cred_dict = Credential.unique_dict(self.credentials.all()) cred_dict = Credential.unique_dict(self.credentials.all())
prompted_dict = Credential.unique_dict(kwargs['credentials']) prompted_dict = Credential.unique_dict(validated_kwargs['credentials'])
# combine prompted credentials with JT # combine prompted credentials with JT
cred_dict.update(prompted_dict) cred_dict.update(prompted_dict)
kwargs['credentials'] = [cred for cred in cred_dict.values()] validated_kwargs['credentials'] = [cred for cred in cred_dict.values()]
kwargs['credentials'] = validated_kwargs['credentials']
from awx.main.signals import disable_activity_stream from awx.main.signals import disable_activity_stream
with disable_activity_stream(): with disable_activity_stream():
copy_m2m_relationships(self, unified_job, fields, kwargs=kwargs) copy_m2m_relationships(self, unified_job, fields, kwargs=validated_kwargs)
if 'extra_vars' in kwargs: if 'extra_vars' in validated_kwargs:
unified_job.handle_extra_data(kwargs['extra_vars']) unified_job.handle_extra_data(validated_kwargs['extra_vars'])
if not getattr(self, '_deprecated_credential_launch', False): if not getattr(self, '_deprecated_credential_launch', False):
# Create record of provided prompts for relaunch and rescheduling # Create record of provided prompts for relaunch and rescheduling
unified_job.create_config_from_prompts(kwargs) unified_job.create_config_from_prompts(kwargs, parent=self)
return unified_job return unified_job
@@ -702,8 +701,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def supports_isolation(cls): def supports_isolation(cls):
return False return False
@classmethod def _get_parent_field_name(self):
def _get_parent_field_name(cls):
return 'unified_job_template' # Override in subclasses. return 'unified_job_template' # Override in subclasses.
@classmethod @classmethod
@@ -874,16 +872,16 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
except JobLaunchConfig.DoesNotExist: except JobLaunchConfig.DoesNotExist:
return None return None
def create_config_from_prompts(self, kwargs): def create_config_from_prompts(self, kwargs, parent=None):
''' '''
Create a launch configuration entry for this job, given prompts Create a launch configuration entry for this job, given prompts
returns None if it can not be created returns None if it can not be created
''' '''
if self.unified_job_template is None:
return None
JobLaunchConfig = self._meta.get_field('launch_config').related_model JobLaunchConfig = self._meta.get_field('launch_config').related_model
config = JobLaunchConfig(job=self) config = JobLaunchConfig(job=self)
valid_fields = self.unified_job_template.get_ask_mapping().keys() if parent is None:
parent = getattr(self, self._get_parent_field_name())
valid_fields = parent.get_ask_mapping().keys()
# Special cases allowed for workflows # Special cases allowed for workflows
if hasattr(self, 'extra_vars'): if hasattr(self, 'extra_vars'):
valid_fields.extend(['survey_passwords', 'extra_vars']) valid_fields.extend(['survey_passwords', 'extra_vars'])
@@ -900,8 +898,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
setattr(config, key, value) setattr(config, key, value)
config.save() config.save()
job_creds = (set(kwargs.get('credentials', [])) - job_creds = set(kwargs.get('credentials', []))
set(self.unified_job_template.credentials.all())) if 'credentials' in [field.name for field in parent._meta.get_fields()]:
job_creds = job_creds - set(parent.credentials.all())
if job_creds: if job_creds:
config.credentials.add(*job_creds) config.credentials.add(*job_creds)
return config return config

View File

@@ -4,11 +4,13 @@
# Python # Python
#import urlparse #import urlparse
import logging import logging
import six
# Django # Django
from django.db import models from django.db import models
from django.conf import settings from django.conf import settings
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ObjectDoesNotExist
#from django import settings as tower_settings #from django import settings as tower_settings
# AWX # AWX
@@ -206,6 +208,15 @@ class WorkflowJobNode(WorkflowNodeBase):
workflow_pk=self.pk, workflow_pk=self.pk,
error_text=errors)) error_text=errors))
data.update(accepted_fields) # missing fields are handled in the scheduler data.update(accepted_fields) # missing fields are handled in the scheduler
try:
# config saved on the workflow job itself
wj_config = self.workflow_job.launch_config
except ObjectDoesNotExist:
wj_config = None
if wj_config:
accepted_fields, ignored_fields, errors = ujt_obj._accept_or_ignore_job_kwargs(**wj_config.prompts_dict())
accepted_fields.pop('extra_vars', None) # merge handled with other extra_vars later
data.update(accepted_fields)
# build ancestor artifacts, save them to node model for later # build ancestor artifacts, save them to node model for later
aa_dict = {} aa_dict = {}
for parent_node in self.get_parent_nodes(): for parent_node in self.get_parent_nodes():
@@ -240,6 +251,15 @@ class WorkflowJobNode(WorkflowNodeBase):
data['extra_vars'] = extra_vars data['extra_vars'] = extra_vars
# ensure that unified jobs created by WorkflowJobs are marked # ensure that unified jobs created by WorkflowJobs are marked
data['_eager_fields'] = {'launch_type': 'workflow'} data['_eager_fields'] = {'launch_type': 'workflow'}
# Extra processing in the case that this is a sharded job
if 'job_shard' in self.ancestor_artifacts:
shard_str = six.text_type(self.ancestor_artifacts['job_shard'] + 1)
data['_eager_fields']['name'] = six.text_type("{} - {}").format(
self.unified_job_template.name[:512 - len(shard_str) - len(' - ')],
shard_str
)
data['_eager_fields']['allow_simultaneous'] = True
data['_prevent_sharding'] = True
return data return data
@@ -261,6 +281,12 @@ class WorkflowJobOptions(BaseModel):
def workflow_nodes(self): def workflow_nodes(self):
raise NotImplementedError() raise NotImplementedError()
@classmethod
def _get_unified_job_field_names(cls):
    """Return the set of field names copied from the template onto a spawned workflow job."""
    # Every concrete field on WorkflowJobOptions, plus the shared
    # UnifiedJobTemplate-level fields that also carry over.
    copied = {f.name for f in WorkflowJobOptions._meta.fields}
    copied |= {'name', 'description', 'schedule', 'survey_passwords', 'labels'}
    return copied
def _create_workflow_nodes(self, old_node_list, user=None): def _create_workflow_nodes(self, old_node_list, user=None):
node_links = {} node_links = {}
for old_node in old_node_list: for old_node in old_node_list:
@@ -331,12 +357,6 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl
def _get_unified_job_class(cls): def _get_unified_job_class(cls):
return WorkflowJob return WorkflowJob
@classmethod
def _get_unified_job_field_names(cls):
return set(f.name for f in WorkflowJobOptions._meta.fields) | set(
['name', 'description', 'schedule', 'survey_passwords', 'labels']
)
@classmethod @classmethod
def _get_unified_jt_copy_names(cls): def _get_unified_jt_copy_names(cls):
base_list = super(WorkflowJobTemplate, cls)._get_unified_jt_copy_names() base_list = super(WorkflowJobTemplate, cls)._get_unified_jt_copy_names()
@@ -446,8 +466,10 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
def workflow_nodes(self): def workflow_nodes(self):
return self.workflow_job_nodes return self.workflow_job_nodes
@classmethod def _get_parent_field_name(self):
def _get_parent_field_name(cls): if self.job_template_id:
# This is a workflow job which is a container for sharded jobs
return 'job_template'
return 'workflow_job_template' return 'workflow_job_template'
@classmethod @classmethod

View File

@@ -117,9 +117,6 @@ class TaskManager():
continue continue
kv = spawn_node.get_job_kwargs() kv = spawn_node.get_job_kwargs()
job = spawn_node.unified_job_template.create_unified_job(**kv) job = spawn_node.unified_job_template.create_unified_job(**kv)
if 'job_shard' in spawn_node.ancestor_artifacts:
job.name = six.text_type("{} - {}").format(job.name, spawn_node.ancestor_artifacts['job_shard'] + 1)
job.save()
spawn_node.job = job spawn_node.job = job
spawn_node.save() spawn_node.save()
logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)

View File

@@ -776,3 +776,37 @@ def sqlite_copy_expert(request):
def disable_database_settings(mocker): def disable_database_settings(mocker):
m = mocker.patch('awx.conf.settings.SettingsWrapper.all_supported_settings', new_callable=PropertyMock) m = mocker.patch('awx.conf.settings.SettingsWrapper.all_supported_settings', new_callable=PropertyMock)
m.return_value = [] m.return_value = []
@pytest.fixture
def shard_jt_factory(inventory):
    """Factory fixture: build a sharded JobTemplate whose inventory holds one host per shard."""
    def _make(N, jt_kwargs=None):
        # One inventory host per shard, so the shard count is fully usable.
        for idx in range(N):
            inventory.hosts.create(name='foo{}'.format(idx))
        extra_kwargs = jt_kwargs if jt_kwargs else {}
        return JobTemplate.objects.create(
            name='shard-jt-from-factory',
            job_shard_count=N,
            inventory=inventory,
            **extra_kwargs
        )
    return _make
@pytest.fixture
def shard_job_factory(shard_jt_factory):
    """Factory fixture: launch a sharded job container, optionally spawning its shard jobs."""
    def _make(N, jt_kwargs=None, prompts=None, spawn=False):
        shard_jt = shard_jt_factory(N, jt_kwargs=jt_kwargs)
        launch_prompts = prompts if prompts else {}
        shard_job = shard_jt.create_unified_job(**launch_prompts)
        if spawn:
            # Mirror what the task manager does when spawning jobs
            # from the nodes of a workflow job.
            for node in shard_job.workflow_nodes.all():
                node.job = node.unified_job_template.create_unified_job(**node.get_job_kwargs())
                node.save()
        return shard_job
    return _make

View File

@@ -1,7 +1,7 @@
import pytest import pytest
import six import six
from awx.main.models import JobTemplate, Job, JobHostSummary from awx.main.models import JobTemplate, Job, JobHostSummary, WorkflowJob
from crum import impersonate from crum import impersonate
@@ -81,3 +81,22 @@ def test_job_host_summary_representation(host):
jhs = JobHostSummary.objects.get(pk=jhs.id) jhs = JobHostSummary.objects.get(pk=jhs.id)
host.delete() host.delete()
assert 'N/A changed=1 dark=2 failures=3 ok=4 processed=5 skipped=6' == six.text_type(jhs) assert 'N/A changed=1 dark=2 failures=3 ok=4 processed=5 skipped=6' == six.text_type(jhs)
@pytest.mark.django_db
class TestShardingModels:
    """Model-level behavior of sharded (split) job templates."""

    def test_shard_workflow_spawn(self, shard_jt_factory):
        # A sharded JT launches a WorkflowJob container, not a plain Job.
        shard_jt = shard_jt_factory(3)
        job = shard_jt.create_unified_job()
        assert isinstance(job, WorkflowJob)
        # The container links back to the template via both fields.
        assert job.job_template == shard_jt
        assert job.unified_job_template == shard_jt
        # One workflow node per shard (factory created 3 hosts).
        assert job.workflow_nodes.count() == 3

    def test_shards_with_JT_and_prompts(self, shard_job_factory):
        job = shard_job_factory(3, jt_kwargs={'ask_limit_on_launch': True}, prompts={'limit': 'foobar'}, spawn=True)
        # Prompts are saved on the workflow job's launch config exclusively.
        assert job.launch_config.prompts_dict() == {'limit': 'foobar'}
        for node in job.workflow_nodes.all():
            # FIX: compare to None with `is`, not `==` (PEP 8 E711).
            assert node.limit is None  # data not saved in node prompts
            # FIX: use a distinct name instead of shadowing the outer `job`.
            spawned_job = node.job
            assert spawned_job.limit == 'foobar'

View File

@@ -3,6 +3,11 @@ import pytest
from awx.main.models.inventory import Inventory from awx.main.models.inventory import Inventory
from awx.main.models.credential import Credential from awx.main.models.credential import Credential
from awx.main.models.jobs import JobTemplate, Job from awx.main.models.jobs import JobTemplate, Job
from awx.main.access import (
UnifiedJobAccess,
WorkflowJobAccess, WorkflowJobNodeAccess,
JobAccess
)
@pytest.mark.django_db @pytest.mark.django_db
@@ -43,6 +48,31 @@ def test_inventory_use_access(inventory, user):
assert common_user.can_access(Inventory, 'use', inventory) assert common_user.can_access(Inventory, 'use', inventory)
@pytest.mark.django_db
def test_sharded_job(shard_job_factory, rando):
    """A user with execute_role on the shard JT can read and relaunch the container and its shards."""
    workflow_job = shard_job_factory(2, jt_kwargs={'created_by': rando}, spawn=True)
    workflow_job.job_template.execute_role.members.add(rando)
    # Container workflow job: relaunch, read, and queryset visibility.
    assert WorkflowJobAccess(rando).can_start(workflow_job)  # relaunch allowed
    for access_cls in (UnifiedJobAccess, WorkflowJobAccess):
        container_access = access_cls(rando)
        assert container_access.can_read(workflow_job)
        assert workflow_job in container_access.get_queryset()
    # Each shard node and the job it spawned.
    for node in workflow_job.workflow_nodes.all():
        node_access = WorkflowJobNodeAccess(rando)
        assert node_access.can_read(node)
        assert node in node_access.get_queryset()
        shard_job = node.job
        assert JobAccess(rando).can_start(shard_job)  # relaunch allowed
        for access_cls in (UnifiedJobAccess, JobAccess):
            shard_access = access_cls(rando)
            assert shard_access.can_read(shard_job)
            assert shard_job in shard_access.get_queryset()
@pytest.mark.django_db @pytest.mark.django_db
class TestJobRelaunchAccess: class TestJobRelaunchAccess:
@pytest.fixture @pytest.fixture