mirror of
https://github.com/ansible/awx.git
synced 2026-02-25 06:56:00 -03:30
consolidate workflow migration
This commit is contained in:
@@ -180,25 +180,20 @@ class WorkflowDAG(SimpleDAG):
|
|||||||
for index, n in enumerate(nodes):
|
for index, n in enumerate(nodes):
|
||||||
obj = n['node_object']
|
obj = n['node_object']
|
||||||
job = obj.job
|
job = obj.job
|
||||||
print("\t\tExamining node %s job %s" % (obj, job))
|
|
||||||
|
|
||||||
if not job:
|
if not job:
|
||||||
print("\t\tNo job for node %s" % obj)
|
|
||||||
nodes_found.append(n)
|
nodes_found.append(n)
|
||||||
# Job is about to run or is running. Hold our horses and wait for
|
# Job is about to run or is running. Hold our horses and wait for
|
||||||
# the job to finish. We can't proceed down the graph path until we
|
# the job to finish. We can't proceed down the graph path until we
|
||||||
# have the job result.
|
# have the job result.
|
||||||
elif job.status not in ['failed', 'error', 'successful']:
|
elif job.status not in ['failed', 'error', 'successful']:
|
||||||
print("\t\tJob status not 'failed' 'error' nor 'successful' %s" % job.status)
|
|
||||||
continue
|
continue
|
||||||
elif job.status in ['failed', 'error']:
|
elif job.status in ['failed', 'error']:
|
||||||
print("\t\tJob status is failed or error %s" % job.status)
|
|
||||||
children_failed = self.get_dependencies(obj, 'failure_nodes')
|
children_failed = self.get_dependencies(obj, 'failure_nodes')
|
||||||
children_always = self.get_dependencies(obj, 'always_nodes')
|
children_always = self.get_dependencies(obj, 'always_nodes')
|
||||||
children_all = children_failed + children_always
|
children_all = children_failed + children_always
|
||||||
nodes.extend(children_all)
|
nodes.extend(children_all)
|
||||||
elif job.status in ['successful']:
|
elif job.status in ['successful']:
|
||||||
print("\t\tJob status is successful %s" % job.status)
|
|
||||||
children_success = self.get_dependencies(obj, 'success_nodes')
|
children_success = self.get_dependencies(obj, 'success_nodes')
|
||||||
nodes.extend(children_success)
|
nodes.extend(children_success)
|
||||||
else:
|
else:
|
||||||
@@ -225,7 +220,7 @@ class WorkflowDAG(SimpleDAG):
|
|||||||
children_always = self.get_dependencies(obj, 'always_nodes')
|
children_always = self.get_dependencies(obj, 'always_nodes')
|
||||||
children_all = children_failed + children_always
|
children_all = children_failed + children_always
|
||||||
nodes.extend(children_all)
|
nodes.extend(children_all)
|
||||||
elif job.status in ['successful']:
|
elif job.status in ['successfult']:
|
||||||
children_success = self.get_dependencies(obj, 'success_nodes')
|
children_success = self.get_dependencies(obj, 'success_nodes')
|
||||||
nodes.extend(children_success)
|
nodes.extend(children_success)
|
||||||
else:
|
else:
|
||||||
@@ -261,22 +256,13 @@ def get_running_workflow_jobs():
|
|||||||
|
|
||||||
def do_spawn_workflow_jobs():
|
def do_spawn_workflow_jobs():
|
||||||
workflow_jobs = get_running_workflow_jobs()
|
workflow_jobs = get_running_workflow_jobs()
|
||||||
print("Set of workflow jobs to process %s" % workflow_jobs)
|
|
||||||
for workflow_job in workflow_jobs:
|
for workflow_job in workflow_jobs:
|
||||||
print("Building the dag")
|
|
||||||
dag = WorkflowDAG(workflow_job)
|
dag = WorkflowDAG(workflow_job)
|
||||||
print("Imported the workflow job dag")
|
|
||||||
for n in dag.nodes:
|
|
||||||
print("\tWorkflow dag node %s" % n)
|
|
||||||
for f, to, label in dag.edges:
|
|
||||||
print("\tWorkflow dag edge <%s,%s,%s>" % (f, to, label))
|
|
||||||
spawn_nodes = dag.bfs_nodes_to_run()
|
spawn_nodes = dag.bfs_nodes_to_run()
|
||||||
for spawn_node in spawn_nodes:
|
for spawn_node in spawn_nodes:
|
||||||
print("Spawning job %s" % spawn_node)
|
|
||||||
# TODO: Inject job template template params as kwargs
|
# TODO: Inject job template template params as kwargs
|
||||||
kv = {}
|
kv = {}
|
||||||
job = spawn_node.unified_job_template.create_unified_job(**kv)
|
job = spawn_node.unified_job_template.create_unified_job(**kv)
|
||||||
print("Started new job %s" % job.id)
|
|
||||||
spawn_node.job = job
|
spawn_node.job = job
|
||||||
spawn_node.save()
|
spawn_node.save()
|
||||||
result = job.signal_start(**kv)
|
result = job.signal_start(**kv)
|
||||||
|
|||||||
@@ -2,7 +2,9 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
from django.db import migrations, models
|
from django.db import migrations, models
|
||||||
|
import awx.main.models.notifications
|
||||||
import django.db.models.deletion
|
import django.db.models.deletion
|
||||||
|
import awx.main.models.workflow
|
||||||
import awx.main.fields
|
import awx.main.fields
|
||||||
|
|
||||||
|
|
||||||
@@ -22,7 +24,7 @@ class Migration(migrations.Migration):
|
|||||||
options={
|
options={
|
||||||
'ordering': ('id',),
|
'ordering': ('id',),
|
||||||
},
|
},
|
||||||
bases=('main.unifiedjob', models.Model),
|
bases=('main.unifiedjob', models.Model, awx.main.models.notifications.JobNotificationMixin, awx.main.models.workflow.WorkflowJobInheritNodesMixin),
|
||||||
),
|
),
|
||||||
migrations.CreateModel(
|
migrations.CreateModel(
|
||||||
name='WorkflowJobTemplate',
|
name='WorkflowJobTemplate',
|
||||||
@@ -41,10 +43,11 @@ class Migration(migrations.Migration):
|
|||||||
('modified', models.DateTimeField(default=None, editable=False)),
|
('modified', models.DateTimeField(default=None, editable=False)),
|
||||||
('always_nodes', models.ManyToManyField(related_name='parent_always_nodes', to='main.WorkflowNode', blank=True)),
|
('always_nodes', models.ManyToManyField(related_name='parent_always_nodes', to='main.WorkflowNode', blank=True)),
|
||||||
('failure_nodes', models.ManyToManyField(related_name='parent_failure_nodes', to='main.WorkflowNode', blank=True)),
|
('failure_nodes', models.ManyToManyField(related_name='parent_failure_nodes', to='main.WorkflowNode', blank=True)),
|
||||||
('job', models.ForeignKey(related_name='workflow_node', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True)),
|
('job', models.ForeignKey(related_name='unified_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True)),
|
||||||
('success_nodes', models.ManyToManyField(related_name='parent_success_nodes', to='main.WorkflowNode', blank=True)),
|
('success_nodes', models.ManyToManyField(related_name='parent_success_nodes', to='main.WorkflowNode', blank=True)),
|
||||||
('unified_job_template', models.ForeignKey(related_name='unified_jt_workflow_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJobTemplate', null=True)),
|
('unified_job_template', models.ForeignKey(related_name='unified_jt_workflow_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJobTemplate', null=True)),
|
||||||
('workflow_job_template', models.ForeignKey(related_name='workflow_nodes', to='main.WorkflowJobTemplate')),
|
('workflow_job', models.ForeignKey(related_name='workflow_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.WorkflowJob', null=True)),
|
||||||
|
('workflow_job_template', models.ForeignKey(related_name='workflow_nodes', default=None, blank=True, to='main.WorkflowJobTemplate', null=True)),
|
||||||
],
|
],
|
||||||
),
|
),
|
||||||
migrations.AddField(
|
migrations.AddField(
|
||||||
@@ -1,20 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
from __future__ import unicode_literals
|
|
||||||
|
|
||||||
from django.db import migrations, models
|
|
||||||
import django.db.models.deletion
|
|
||||||
|
|
||||||
|
|
||||||
class Migration(migrations.Migration):
|
|
||||||
|
|
||||||
dependencies = [
|
|
||||||
('main', '0033_v301_workflow_create'),
|
|
||||||
]
|
|
||||||
|
|
||||||
operations = [
|
|
||||||
migrations.AlterField(
|
|
||||||
model_name='workflownode',
|
|
||||||
name='job',
|
|
||||||
field=models.ForeignKey(related_name='workflow_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True),
|
|
||||||
),
|
|
||||||
]
|
|
||||||
@@ -1,19 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
from __future__ import unicode_literals
|
|
||||||
|
|
||||||
from django.db import migrations, models
|
|
||||||
|
|
||||||
|
|
||||||
class Migration(migrations.Migration):
|
|
||||||
|
|
||||||
dependencies = [
|
|
||||||
('main', '0034_auto_20160830_1716'),
|
|
||||||
]
|
|
||||||
|
|
||||||
operations = [
|
|
||||||
migrations.AlterField(
|
|
||||||
model_name='workflownode',
|
|
||||||
name='workflow_job_template',
|
|
||||||
field=models.ForeignKey(related_name='workflow_nodes', default=None, blank=True, to='main.WorkflowJobTemplate', null=True),
|
|
||||||
),
|
|
||||||
]
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
from __future__ import unicode_literals
|
|
||||||
|
|
||||||
from django.db import migrations, models
|
|
||||||
import django.db.models.deletion
|
|
||||||
|
|
||||||
|
|
||||||
class Migration(migrations.Migration):
|
|
||||||
|
|
||||||
dependencies = [
|
|
||||||
('main', '0035_auto_20160831_2008'),
|
|
||||||
]
|
|
||||||
|
|
||||||
operations = [
|
|
||||||
migrations.AddField(
|
|
||||||
model_name='workflownode',
|
|
||||||
name='workflow_job',
|
|
||||||
field=models.ForeignKey(related_name='workflow_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.WorkflowJob', null=True),
|
|
||||||
),
|
|
||||||
migrations.AlterField(
|
|
||||||
model_name='workflownode',
|
|
||||||
name='job',
|
|
||||||
field=models.ForeignKey(related_name='unified_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True),
|
|
||||||
),
|
|
||||||
]
|
|
||||||
@@ -1674,12 +1674,11 @@ class RunWorkflowJob(BaseTask):
|
|||||||
# FIXME: Detect workflow run completion
|
# FIXME: Detect workflow run completion
|
||||||
while True:
|
while True:
|
||||||
dag = WorkflowDAG(instance)
|
dag = WorkflowDAG(instance)
|
||||||
print("Deciding if workflow is done")
|
|
||||||
if dag.is_workflow_done():
|
if dag.is_workflow_done():
|
||||||
# TODO: update with accurate finish status (i.e. canceled, error, etc.)
|
# TODO: update with accurate finish status (i.e. canceled, error, etc.)
|
||||||
instance = self.update_model(instance.pk, status='success')
|
instance = self.update_model(instance.pk, status='successful')
|
||||||
print("Workflow IS done")
|
break
|
||||||
return
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
instance.socketio_emit_status(instance.status)
|
||||||
# TODO: Handle cancel
|
# TODO: Handle cancel
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user