mirror of
https://github.com/ansible/awx.git
synced 2026-02-16 18:50:04 -03:30
refactor of create_unified_job in connection with new copy endpoint development
This commit is contained in:
@@ -3370,8 +3370,7 @@ class JobRelaunch(RetrieveAPIView, GenericAPIView):
|
|||||||
if not serializer.is_valid():
|
if not serializer.is_valid():
|
||||||
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
||||||
|
|
||||||
obj.launch_type = 'relaunch'
|
new_job = obj.copy_unified_job()
|
||||||
new_job = obj.copy()
|
|
||||||
result = new_job.signal_start(**request.data)
|
result = new_job.signal_start(**request.data)
|
||||||
if not result:
|
if not result:
|
||||||
data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start)
|
data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start)
|
||||||
|
|||||||
@@ -1549,7 +1549,7 @@ class WorkflowJobTemplateAccess(BaseAccess):
|
|||||||
if node.credential and self.user not in node.credential.use_role:
|
if node.credential and self.user not in node.credential.use_role:
|
||||||
node_errors['credential'] = 'Prompted credential %s can not be coppied.' % node.credential.name
|
node_errors['credential'] = 'Prompted credential %s can not be coppied.' % node.credential.name
|
||||||
ujt = node.unified_job_template
|
ujt = node.unified_job_template
|
||||||
if ujt and not self.user.can_access(UnifiedJobTemplate, 'start', ujt):
|
if ujt and not self.user.can_access(UnifiedJobTemplate, 'start', ujt, validate_license=False):
|
||||||
node_errors['unified_job_template'] = (
|
node_errors['unified_job_template'] = (
|
||||||
'Prompted %s %s can not be coppied.' % (ujt._meta.verbose_name_raw, ujt.name))
|
'Prompted %s %s can not be coppied.' % (ujt._meta.verbose_name_raw, ujt.name))
|
||||||
if node_errors:
|
if node_errors:
|
||||||
@@ -1833,6 +1833,11 @@ class UnifiedJobTemplateAccess(BaseAccess):
|
|||||||
|
|
||||||
return qs.all()
|
return qs.all()
|
||||||
|
|
||||||
|
def can_start(self, obj, validate_license=True):
|
||||||
|
access_class = access_registry.get(obj.__class__, [])[0]
|
||||||
|
access_instance = access_class(self.user)
|
||||||
|
return access_instance.can_start(obj, validate_license=validate_license)
|
||||||
|
|
||||||
|
|
||||||
class UnifiedJobAccess(BaseAccess):
|
class UnifiedJobAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
|
|||||||
@@ -633,14 +633,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
|
|||||||
content = super(Job, self)._result_stdout_raw(*args, **kwargs)
|
content = super(Job, self)._result_stdout_raw(*args, **kwargs)
|
||||||
return self._survey_search_and_replace(content)
|
return self._survey_search_and_replace(content)
|
||||||
|
|
||||||
def copy(self):
|
|
||||||
presets = {}
|
|
||||||
for kw in JobTemplate._get_unified_job_field_names():
|
|
||||||
presets[kw] = getattr(self, kw)
|
|
||||||
if not self.job_template:
|
|
||||||
self.job_template = JobTemplate(name='temporary')
|
|
||||||
return self.job_template.create_unified_job(**presets)
|
|
||||||
|
|
||||||
# Job Credential required
|
# Job Credential required
|
||||||
@property
|
@property
|
||||||
def can_start(self):
|
def can_start(self):
|
||||||
|
|||||||
@@ -102,9 +102,6 @@ class SurveyJobTemplateMixin(models.Model):
|
|||||||
Combine extra_vars with variable precedence order:
|
Combine extra_vars with variable precedence order:
|
||||||
JT extra_vars -> JT survey defaults -> runtime extra_vars
|
JT extra_vars -> JT survey defaults -> runtime extra_vars
|
||||||
'''
|
'''
|
||||||
if 'launch_type' in kwargs and kwargs['launch_type'] == 'relaunch':
|
|
||||||
return kwargs
|
|
||||||
|
|
||||||
# Job Template extra_vars
|
# Job Template extra_vars
|
||||||
extra_vars = self.extra_vars_dict
|
extra_vars = self.extra_vars_dict
|
||||||
|
|
||||||
|
|||||||
@@ -30,7 +30,10 @@ from djcelery.models import TaskMeta
|
|||||||
# AWX
|
# AWX
|
||||||
from awx.main.models.base import * # noqa
|
from awx.main.models.base import * # noqa
|
||||||
from awx.main.models.schedules import Schedule
|
from awx.main.models.schedules import Schedule
|
||||||
from awx.main.utils import decrypt_field, _inventory_updates
|
from awx.main.utils import (
|
||||||
|
decrypt_field, _inventory_updates,
|
||||||
|
copy_model_by_class, copy_m2m_relationships
|
||||||
|
)
|
||||||
from awx.main.redact import UriCleaner, REPLACE_STR
|
from awx.main.redact import UriCleaner, REPLACE_STR
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.main.fields import JSONField
|
from awx.main.fields import JSONField
|
||||||
@@ -304,46 +307,13 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
|
|||||||
Create a new unified job based on this unified job template.
|
Create a new unified job based on this unified job template.
|
||||||
'''
|
'''
|
||||||
unified_job_class = self._get_unified_job_class()
|
unified_job_class = self._get_unified_job_class()
|
||||||
|
fields = self._get_unified_job_field_names()
|
||||||
|
unified_job = copy_model_by_class(self, unified_job_class, fields, kwargs)
|
||||||
|
|
||||||
|
# Set the unified job template back-link on the job
|
||||||
parent_field_name = unified_job_class._get_parent_field_name()
|
parent_field_name = unified_job_class._get_parent_field_name()
|
||||||
kwargs.pop('%s_id' % parent_field_name, None)
|
setattr(unified_job, parent_field_name, self)
|
||||||
create_kwargs = {}
|
|
||||||
m2m_fields = {}
|
|
||||||
if self.pk:
|
|
||||||
create_kwargs[parent_field_name] = self
|
|
||||||
for field_name in self._get_unified_job_field_names():
|
|
||||||
# Foreign keys can be specified as field_name or field_name_id.
|
|
||||||
id_field_name = '%s_id' % field_name
|
|
||||||
if hasattr(self, id_field_name):
|
|
||||||
if field_name in kwargs:
|
|
||||||
value = kwargs[field_name]
|
|
||||||
elif id_field_name in kwargs:
|
|
||||||
value = kwargs[id_field_name]
|
|
||||||
else:
|
|
||||||
value = getattr(self, id_field_name)
|
|
||||||
if hasattr(value, 'id'):
|
|
||||||
value = value.id
|
|
||||||
create_kwargs[id_field_name] = value
|
|
||||||
elif field_name in kwargs:
|
|
||||||
if field_name == 'extra_vars' and isinstance(kwargs[field_name], dict):
|
|
||||||
create_kwargs[field_name] = json.dumps(kwargs['extra_vars'])
|
|
||||||
# We can't get a hold of django.db.models.fields.related.ManyRelatedManager to compare
|
|
||||||
# so this is the next best thing.
|
|
||||||
elif kwargs[field_name].__class__.__name__ is 'ManyRelatedManager':
|
|
||||||
m2m_fields[field_name] = kwargs[field_name]
|
|
||||||
else:
|
|
||||||
create_kwargs[field_name] = kwargs[field_name]
|
|
||||||
elif hasattr(self, field_name):
|
|
||||||
field_obj = self._meta.get_field_by_name(field_name)[0]
|
|
||||||
# Many to Many can be specified as field_name
|
|
||||||
if isinstance(field_obj, models.ManyToManyField):
|
|
||||||
m2m_fields[field_name] = getattr(self, field_name)
|
|
||||||
else:
|
|
||||||
create_kwargs[field_name] = getattr(self, field_name)
|
|
||||||
if hasattr(self, '_update_unified_job_kwargs'):
|
|
||||||
new_kwargs = self._update_unified_job_kwargs(**create_kwargs)
|
|
||||||
else:
|
|
||||||
new_kwargs = create_kwargs
|
|
||||||
unified_job = unified_job_class(**new_kwargs)
|
|
||||||
# For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction
|
# For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction
|
||||||
if hasattr(self, 'survey_spec') and getattr(self, 'survey_enabled', False):
|
if hasattr(self, 'survey_spec') and getattr(self, 'survey_enabled', False):
|
||||||
password_list = self.survey_password_variables()
|
password_list = self.survey_password_variables()
|
||||||
@@ -351,10 +321,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
|
|||||||
for password in password_list:
|
for password in password_list:
|
||||||
hide_password_dict[password] = REPLACE_STR
|
hide_password_dict[password] = REPLACE_STR
|
||||||
unified_job.survey_passwords = hide_password_dict
|
unified_job.survey_passwords = hide_password_dict
|
||||||
|
|
||||||
unified_job.save()
|
unified_job.save()
|
||||||
for field_name, src_field_value in m2m_fields.iteritems():
|
# Labels coppied here
|
||||||
dest_field = getattr(unified_job, field_name)
|
copy_m2m_relationships(self, unified_job, fields, kwargs=kwargs)
|
||||||
dest_field.add(*list(src_field_value.all().values_list('id', flat=True)))
|
|
||||||
return unified_job
|
return unified_job
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -363,33 +333,18 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
|
|||||||
|
|
||||||
def copy_unified_jt(self):
|
def copy_unified_jt(self):
|
||||||
'''
|
'''
|
||||||
|
Returns saved object, including related fields.
|
||||||
Create a copy of this unified job template.
|
Create a copy of this unified job template.
|
||||||
'''
|
'''
|
||||||
unified_jt_class = self.__class__
|
unified_jt_class = self.__class__
|
||||||
create_kwargs = {}
|
fields = self._get_unified_jt_copy_names()
|
||||||
m2m_fields = {}
|
unified_jt = copy_model_by_class(self, unified_jt_class, fields, {})
|
||||||
for field_name in self._get_unified_jt_copy_names():
|
|
||||||
# Foreign keys can be specified as field_name or field_name_id.
|
|
||||||
id_field_name = '%s_id' % field_name
|
|
||||||
if hasattr(self, id_field_name):
|
|
||||||
value = getattr(self, id_field_name)
|
|
||||||
if hasattr(value, 'id'):
|
|
||||||
value = value.id
|
|
||||||
create_kwargs[id_field_name] = value
|
|
||||||
elif hasattr(self, field_name):
|
|
||||||
field_obj = self._meta.get_field_by_name(field_name)[0]
|
|
||||||
# Many to Many can be specified as field_name
|
|
||||||
if isinstance(field_obj, models.ManyToManyField):
|
|
||||||
m2m_fields[field_name] = getattr(self, field_name)
|
|
||||||
else:
|
|
||||||
create_kwargs[field_name] = getattr(self, field_name)
|
|
||||||
time_now = datetime.now()
|
time_now = datetime.now()
|
||||||
create_kwargs['name'] = create_kwargs['name'] + ' @ ' + time_now.strftime('%H:%M:%S %p')
|
unified_jt.name = unified_jt.name + ' @ ' + time_now.strftime('%H:%M:%S %p')
|
||||||
unified_jt = unified_jt_class(**create_kwargs)
|
|
||||||
unified_jt.save()
|
unified_jt.save()
|
||||||
for field_name, src_field_value in m2m_fields.iteritems():
|
copy_m2m_relationships(self, unified_jt, fields)
|
||||||
dest_field = getattr(unified_jt, field_name)
|
|
||||||
dest_field.add(*list(src_field_value.all().values_list('id', flat=True)))
|
|
||||||
return unified_jt
|
return unified_jt
|
||||||
|
|
||||||
|
|
||||||
@@ -711,32 +666,20 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
|
|
||||||
def copy_unified_job(self):
|
def copy_unified_job(self):
|
||||||
'''
|
'''
|
||||||
Create a copy of this unified job.
|
Returns saved object, including related fields.
|
||||||
|
Create a copy of this unified job for the purpose of relaunch
|
||||||
'''
|
'''
|
||||||
unified_job_class = self.__class__
|
unified_job_class = self.__class__
|
||||||
unified_jt_class = self._get_unified_job_template_class()
|
unified_jt_class = self._get_unified_job_template_class()
|
||||||
create_kwargs = {}
|
parent_field_name = unified_job_class._get_parent_field_name()
|
||||||
m2m_fields = {}
|
|
||||||
for field_name in unified_jt_class._get_unified_job_field_names():
|
fields = unified_jt_class._get_unified_job_field_names() + [parent_field_name]
|
||||||
# Foreign keys can be specified as field_name or field_name_id.
|
unified_job = copy_model_by_class(self, unified_job_class, fields, {})
|
||||||
id_field_name = '%s_id' % field_name
|
unified_job.job_type = 'relaunch'
|
||||||
if hasattr(self, id_field_name):
|
|
||||||
value = getattr(self, id_field_name)
|
|
||||||
if hasattr(value, 'id'):
|
|
||||||
value = value.id
|
|
||||||
create_kwargs[id_field_name] = value
|
|
||||||
elif hasattr(self, field_name):
|
|
||||||
field_obj = self._meta.get_field_by_name(field_name)[0]
|
|
||||||
# Many to Many can be specified as field_name
|
|
||||||
if isinstance(field_obj, models.ManyToManyField):
|
|
||||||
m2m_fields[field_name] = getattr(self, field_name)
|
|
||||||
else:
|
|
||||||
create_kwargs[field_name] = getattr(self, field_name)
|
|
||||||
unified_job = unified_job_class(**create_kwargs)
|
|
||||||
unified_job.save()
|
unified_job.save()
|
||||||
for field_name, src_field_value in m2m_fields.iteritems():
|
|
||||||
dest_field = getattr(unified_job, field_name)
|
# Labels coppied here
|
||||||
dest_field.add(*list(src_field_value.all().values_list('id', flat=True)))
|
copy_m2m_relationships(self, unified_job, fields)
|
||||||
return unified_job
|
return unified_job
|
||||||
|
|
||||||
def result_stdout_raw_handle(self, attempt=0):
|
def result_stdout_raw_handle(self, attempt=0):
|
||||||
|
|||||||
@@ -198,7 +198,7 @@ class WorkflowJobTemplateNode(WorkflowNodeBase):
|
|||||||
if not user.can_access(item.__class__, 'use', item):
|
if not user.can_access(item.__class__, 'use', item):
|
||||||
continue
|
continue
|
||||||
if field_name in ['unified_job_template']:
|
if field_name in ['unified_job_template']:
|
||||||
if not user.can_access(item.__class__, 'start', item):
|
if not user.can_access(item.__class__, 'start', item, validate_license=False):
|
||||||
continue
|
continue
|
||||||
create_kwargs[field_name] = item
|
create_kwargs[field_name] = item
|
||||||
create_kwargs['workflow_job_template'] = workflow_job_template
|
create_kwargs['workflow_job_template'] = workflow_job_template
|
||||||
@@ -323,7 +323,6 @@ class WorkflowJobOptions(BaseModel):
|
|||||||
self._inherit_node_relationships(old_node_list, node_links)
|
self._inherit_node_relationships(old_node_list, node_links)
|
||||||
|
|
||||||
def create_relaunch_workflow_job(self):
|
def create_relaunch_workflow_job(self):
|
||||||
self.launch_type = 'relaunch'
|
|
||||||
new_workflow_job = self.copy_unified_job()
|
new_workflow_job = self.copy_unified_job()
|
||||||
new_workflow_job.copy_nodes_from_original(original=self)
|
new_workflow_job.copy_nodes_from_original(original=self)
|
||||||
return new_workflow_job
|
return new_workflow_job
|
||||||
@@ -436,7 +435,7 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl
|
|||||||
return new_wfjt
|
return new_wfjt
|
||||||
|
|
||||||
|
|
||||||
# Stub in place because of old migraitons, can remove if migraitons are squashed
|
# Stub in place because of old migrations, can remove if migrations are squashed
|
||||||
class WorkflowJobInheritNodesMixin(object):
|
class WorkflowJobInheritNodesMixin(object):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
@@ -178,7 +178,7 @@ def mk_workflow_job_template(name, extra_vars='', spec=None, organization=None,
|
|||||||
wfjt = WorkflowJobTemplate(name=name, extra_vars=extra_vars, organization=organization)
|
wfjt = WorkflowJobTemplate(name=name, extra_vars=extra_vars, organization=organization)
|
||||||
|
|
||||||
wfjt.survey_spec = spec
|
wfjt.survey_spec = spec
|
||||||
if wfjt.survey_spec is not None:
|
if wfjt.survey_spec:
|
||||||
wfjt.survey_enabled = True
|
wfjt.survey_enabled = True
|
||||||
|
|
||||||
if persisted:
|
if persisted:
|
||||||
|
|||||||
@@ -393,8 +393,6 @@ def create_workflow_job_template(name, organization=None, persisted=True, **kwar
|
|||||||
|
|
||||||
if 'survey' in kwargs:
|
if 'survey' in kwargs:
|
||||||
spec = create_survey_spec(kwargs['survey'])
|
spec = create_survey_spec(kwargs['survey'])
|
||||||
else:
|
|
||||||
spec = {}
|
|
||||||
|
|
||||||
wfjt = mk_workflow_job_template(name,
|
wfjt = mk_workflow_job_template(name,
|
||||||
organization=organization,
|
organization=organization,
|
||||||
|
|||||||
@@ -261,7 +261,7 @@ def test_job_relaunch_copy_vars(job_with_links, machine_credential, inventory,
|
|||||||
job_with_links.limit = "my_server"
|
job_with_links.limit = "my_server"
|
||||||
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate._get_unified_job_field_names',
|
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate._get_unified_job_field_names',
|
||||||
return_value=['inventory', 'credential', 'limit']):
|
return_value=['inventory', 'credential', 'limit']):
|
||||||
second_job = job_with_links.copy()
|
second_job = job_with_links.copy_unified_job()
|
||||||
|
|
||||||
# Check that job data matches the original variables
|
# Check that job data matches the original variables
|
||||||
assert second_job.credential == job_with_links.credential
|
assert second_job.credential == job_with_links.credential
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import pytest
|
|||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_orphan_unified_job_creation(instance, inventory):
|
def test_orphan_unified_job_creation(instance, inventory):
|
||||||
job = Job.objects.create(job_template=None, inventory=inventory, name='hi world')
|
job = Job.objects.create(job_template=None, inventory=inventory, name='hi world')
|
||||||
job2 = job.copy()
|
job2 = job.copy_unified_job()
|
||||||
assert job2.job_template is None
|
assert job2.job_template is None
|
||||||
assert job2.inventory == inventory
|
assert job2.inventory == inventory
|
||||||
assert job2.name == 'hi world'
|
assert job2.name == 'hi world'
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ class TestWorkflowJobInheritNodesMixin():
|
|||||||
|
|
||||||
mixin = WorkflowJobOptions()
|
mixin = WorkflowJobOptions()
|
||||||
mixin._create_workflow_nodes(job_template_nodes)
|
mixin._create_workflow_nodes(job_template_nodes)
|
||||||
|
|
||||||
for job_template_node in job_template_nodes:
|
for job_template_node in job_template_nodes:
|
||||||
workflow_job_node_create.assert_any_call(workflow_job=mixin)
|
workflow_job_node_create.assert_any_call(workflow_job=mixin)
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ from decorator import decorator
|
|||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
|
from django.db.models import ManyToManyField
|
||||||
|
|
||||||
# Django REST Framework
|
# Django REST Framework
|
||||||
from rest_framework.exceptions import ParseError, PermissionDenied
|
from rest_framework.exceptions import ParseError, PermissionDenied
|
||||||
@@ -38,7 +39,8 @@ logger = logging.getLogger('awx.main.utils')
|
|||||||
|
|
||||||
__all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'memoize',
|
__all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'memoize',
|
||||||
'get_ansible_version', 'get_ssh_version', 'get_awx_version', 'update_scm_url',
|
'get_ansible_version', 'get_ssh_version', 'get_awx_version', 'update_scm_url',
|
||||||
'get_type_for_model', 'get_model_for_type', 'cache_list_capabilities', 'to_python_boolean',
|
'get_type_for_model', 'get_model_for_type', 'copy_model_by_class',
|
||||||
|
'copy_m2m_relationships' ,'cache_list_capabilities', 'to_python_boolean',
|
||||||
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
|
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
|
||||||
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
|
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
|
||||||
'get_current_apps', 'set_current_apps', 'OutputEventFilter']
|
'get_current_apps', 'set_current_apps', 'OutputEventFilter']
|
||||||
@@ -410,6 +412,66 @@ def model_to_dict(obj, serializer_mapping=None):
|
|||||||
return attr_d
|
return attr_d
|
||||||
|
|
||||||
|
|
||||||
|
def copy_model_by_class(obj1, Class2, fields, kwargs):
|
||||||
|
'''
|
||||||
|
Creates a new unsaved object of type Class2 using the fields from obj1
|
||||||
|
values in kwargs can override obj1
|
||||||
|
'''
|
||||||
|
create_kwargs = {}
|
||||||
|
for field_name in fields:
|
||||||
|
# Foreign keys can be specified as field_name or field_name_id.
|
||||||
|
id_field_name = '%s_id' % field_name
|
||||||
|
if hasattr(obj1, id_field_name):
|
||||||
|
if field_name in kwargs:
|
||||||
|
value = kwargs[field_name]
|
||||||
|
elif id_field_name in kwargs:
|
||||||
|
value = kwargs[id_field_name]
|
||||||
|
else:
|
||||||
|
value = getattr(obj1, id_field_name)
|
||||||
|
if hasattr(value, 'id'):
|
||||||
|
value = value.id
|
||||||
|
create_kwargs[id_field_name] = value
|
||||||
|
elif field_name in kwargs:
|
||||||
|
if field_name == 'extra_vars' and isinstance(kwargs[field_name], dict):
|
||||||
|
create_kwargs[field_name] = json.dumps(kwargs['extra_vars'])
|
||||||
|
# We can't get a hold of django.db.models.fields.related.ManyRelatedManager to compare
|
||||||
|
# so this is the next best thing.
|
||||||
|
elif kwargs[field_name].__class__.__name__ is not 'ManyRelatedManager':
|
||||||
|
create_kwargs[field_name] = kwargs[field_name]
|
||||||
|
elif hasattr(obj1, field_name):
|
||||||
|
field_obj = obj1._meta.get_field_by_name(field_name)[0]
|
||||||
|
if not isinstance(field_obj, ManyToManyField):
|
||||||
|
create_kwargs[field_name] = getattr(obj1, field_name)
|
||||||
|
|
||||||
|
# Apply class-specific extra processing for origination of unified jobs
|
||||||
|
if hasattr(obj1, '_update_unified_job_kwargs') and obj1.__class__ != Class2:
|
||||||
|
new_kwargs = obj1._update_unified_job_kwargs(**create_kwargs)
|
||||||
|
else:
|
||||||
|
new_kwargs = create_kwargs
|
||||||
|
|
||||||
|
return Class2(**new_kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def copy_m2m_relationships(obj1, obj2, fields, kwargs=None):
|
||||||
|
'''
|
||||||
|
In-place operation.
|
||||||
|
Given two saved objects, copies related objects from obj1
|
||||||
|
to obj2 to field of same name, if field occurs in `fields`
|
||||||
|
'''
|
||||||
|
for field_name in fields:
|
||||||
|
if hasattr(obj1, field_name):
|
||||||
|
field_obj = obj1._meta.get_field_by_name(field_name)[0]
|
||||||
|
if isinstance(field_obj, ManyToManyField):
|
||||||
|
# Many to Many can be specified as field_name
|
||||||
|
src_field_value = getattr(obj1, field_name)
|
||||||
|
if kwargs and field_name in kwargs:
|
||||||
|
override_field_val = kwargs[field_name]
|
||||||
|
if override_field_val.__class__.__name__ is 'ManyRelatedManager':
|
||||||
|
src_field_value = override_field_val
|
||||||
|
dest_field = getattr(obj2, field_name)
|
||||||
|
dest_field.add(*list(src_field_value.all().values_list('id', flat=True)))
|
||||||
|
|
||||||
|
|
||||||
def get_type_for_model(model):
|
def get_type_for_model(model):
|
||||||
'''
|
'''
|
||||||
Return type name for a given model class.
|
Return type name for a given model class.
|
||||||
|
|||||||
Reference in New Issue
Block a user