Allow concurrent workflow job runs.

This commit is contained in:
Aaron Tan
2017-04-28 17:41:47 -04:00
parent 84b8dcece0
commit 057b24ccd0
6 changed files with 40 additions and 23 deletions

View File

@@ -2510,7 +2510,7 @@ class WorkflowJobTemplateSerializer(JobTemplateMixin, LabelsListMixin, UnifiedJo
class Meta:
model = WorkflowJobTemplate
fields = ('*', 'extra_vars', 'organization', 'survey_enabled',)
fields = ('*', 'extra_vars', 'organization', 'survey_enabled', 'allow_simultaneous',)
def get_related(self, obj):
res = super(WorkflowJobTemplateSerializer, self).get_related(obj)
@@ -2547,7 +2547,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
class Meta:
model = WorkflowJob
fields = ('*', 'workflow_job_template', 'extra_vars')
fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous',)
def get_related(self, obj):
res = super(WorkflowJobSerializer, self).get_related(obj)

View File

@@ -147,4 +147,16 @@ class Migration(migrations.Migration):
name='verbosity',
field=models.PositiveIntegerField(default=1, blank=True, choices=[(0, b'0 (WARNING)'), (1, b'1 (INFO)'), (2, b'2 (DEBUG)')]),
),
# Workflows
migrations.AddField(
model_name='workflowjob',
name='allow_simultaneous',
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name='workflowjobtemplate',
name='allow_simultaneous',
field=models.BooleanField(default=False),
),
]

View File

@@ -284,6 +284,9 @@ class WorkflowJobOptions(BaseModel):
blank=True,
default='',
))
allow_simultaneous = models.BooleanField(
default=False
)
extra_vars_dict = VarsDictProperty('extra_vars', True)
@@ -356,7 +359,7 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl
@classmethod
def _get_unified_job_field_names(cls):
return ['name', 'description', 'extra_vars', 'labels', 'survey_passwords',
'schedule', 'launch_type']
'schedule', 'launch_type', 'allow_simultaneous']
@classmethod
def _get_unified_jt_copy_names(cls):

View File

@@ -78,7 +78,7 @@ class DependencyGraph(object):
'''
JobDict
Presume that job is related to a project that is update on launch
'''
def should_update_related_project(self, job):
@@ -98,7 +98,7 @@ class DependencyGraph(object):
'''
This is a bit of fuzzy logic.
If the latest project update has a created time == job_created_time-1
If the latest project update has a created time == job_created_time-1
then consider the project update found. This is so we don't enter an infinite loop
of updating the project when cache timeout is 0.
'''
@@ -178,6 +178,8 @@ class DependencyGraph(object):
return False
def can_workflow_job_run(self, job):
if job['allow_simultaneous'] is True:
return True
return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job['workflow_job_template_id'], True)
def can_system_job_run(self):
@@ -217,4 +219,3 @@ class DependencyGraph(object):
def add_jobs(self, jobs):
map(lambda j: self.add_job(j), jobs)

View File

@@ -58,7 +58,7 @@ class PartialModelDict(object):
def get_job_type_str(self):
raise RuntimeError("Inherit and implement me")
def task_impact(self):
raise RuntimeError("Inherit and implement me")
@@ -87,8 +87,8 @@ class PartialModelDict(object):
class JobDict(PartialModelDict):
FIELDS = (
'id', 'status', 'job_template_id', 'inventory_id', 'project_id',
'launch_type', 'limit', 'allow_simultaneous', 'created',
'id', 'status', 'job_template_id', 'inventory_id', 'project_id',
'launch_type', 'limit', 'allow_simultaneous', 'created',
'job_type', 'celery_task_id', 'project__scm_update_on_launch',
'forks', 'start_args', 'dependent_jobs__id',
)
@@ -119,15 +119,15 @@ class JobDict(PartialModelDict):
class ProjectUpdateDict(PartialModelDict):
FIELDS = (
'id', 'status', 'project_id', 'created', 'celery_task_id',
'launch_type', 'project__scm_update_cache_timeout',
'id', 'status', 'project_id', 'created', 'celery_task_id',
'launch_type', 'project__scm_update_cache_timeout',
'project__scm_update_on_launch',
)
model = ProjectUpdate
def get_job_type_str(self):
return 'project_update'
def task_impact(self):
return 10
@@ -142,8 +142,8 @@ class ProjectUpdateDict(PartialModelDict):
class ProjectUpdateLatestDict(ProjectUpdateDict):
FIELDS = (
'id', 'status', 'project_id', 'created', 'finished',
'project__scm_update_cache_timeout',
'id', 'status', 'project_id', 'created', 'finished',
'project__scm_update_cache_timeout',
'launch_type', 'project__scm_update_on_launch',
)
model = ProjectUpdate
@@ -162,7 +162,7 @@ class ProjectUpdateLatestDict(ProjectUpdateDict):
class InventoryUpdateDict(PartialModelDict):
#'inventory_source__update_on_launch',
#'inventory_source__update_on_launch',
#'inventory_source__update_cache_timeout',
FIELDS = (
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
@@ -178,10 +178,10 @@ class InventoryUpdateDict(PartialModelDict):
class InventoryUpdateLatestDict(InventoryUpdateDict):
#'inventory_source__update_on_launch',
#'inventory_source__update_on_launch',
#'inventory_source__update_cache_timeout',
FIELDS = (
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
'finished', 'inventory_source__update_cache_timeout', 'launch_type',
'inventory_source__update_on_launch',
)
@@ -198,7 +198,7 @@ class InventoryUpdateLatestDict(InventoryUpdateDict):
update_on_launch=True).values_list('id', flat=True)
# Find the most recent inventory update for each inventory source
for inventory_source_id in inventory_source_ids:
qs = cls.model.objects.filter(inventory_source_id=inventory_source_id,
qs = cls.model.objects.filter(inventory_source_id=inventory_source_id,
status__in=['waiting', 'successful', 'failed'],
inventory_source__update_on_launch=True).order_by('-finished', '-started', '-created')
if qs.count() > 0:
@@ -263,7 +263,7 @@ class AdHocCommandDict(PartialModelDict):
class WorkflowJobDict(PartialModelDict):
FIELDS = (
'id', 'created', 'status', 'workflow_job_template_id',
'id', 'created', 'status', 'workflow_job_template_id', 'allow_simultaneous',
)
model = WorkflowJob
@@ -272,4 +272,3 @@ class WorkflowJobDict(PartialModelDict):
def task_impact(self):
return 0