mirror of
https://github.com/ansible/awx.git
synced 2026-02-23 14:05:59 -03:30
Merge branch 'devel' of https://github.com/ansible/ansible-tower into devel
This commit is contained in:
@@ -2201,10 +2201,9 @@ class WorkflowJobTemplateSerializer(LabelsListMixin, UnifiedJobTemplateSerialize
|
|||||||
launch = reverse('api:workflow_job_template_launch', args=(obj.pk,)),
|
launch = reverse('api:workflow_job_template_launch', args=(obj.pk,)),
|
||||||
workflow_nodes = reverse('api:workflow_job_template_workflow_nodes_list', args=(obj.pk,)),
|
workflow_nodes = reverse('api:workflow_job_template_workflow_nodes_list', args=(obj.pk,)),
|
||||||
labels = reverse('api:workflow_job_template_label_list', args=(obj.pk,)),
|
labels = reverse('api:workflow_job_template_label_list', args=(obj.pk,)),
|
||||||
# TODO: Implement notifications
|
notification_templates_any = reverse('api:workflow_job_template_notification_templates_any_list', args=(obj.pk,)),
|
||||||
#notification_templates_any = reverse('api:system_job_template_notification_templates_any_list', args=(obj.pk,)),
|
notification_templates_success = reverse('api:workflow_job_template_notification_templates_success_list', args=(obj.pk,)),
|
||||||
#notification_templates_success = reverse('api:system_job_template_notification_templates_success_list', args=(obj.pk,)),
|
notification_templates_error = reverse('api:workflow_job_template_notification_templates_error_list', args=(obj.pk,)),
|
||||||
#notification_templates_error = reverse('api:system_job_template_notification_templates_error_list', args=(obj.pk,)),
|
|
||||||
survey_spec = reverse('api:workflow_job_template_survey_spec', args=(obj.pk,)),
|
survey_spec = reverse('api:workflow_job_template_survey_spec', args=(obj.pk,)),
|
||||||
))
|
))
|
||||||
return res
|
return res
|
||||||
@@ -2228,8 +2227,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
|
|||||||
if obj.workflow_job_template:
|
if obj.workflow_job_template:
|
||||||
res['workflow_job_template'] = reverse('api:workflow_job_template_detail',
|
res['workflow_job_template'] = reverse('api:workflow_job_template_detail',
|
||||||
args=(obj.workflow_job_template.pk,))
|
args=(obj.workflow_job_template.pk,))
|
||||||
# TODO:
|
res['notifications'] = reverse('api:workflow_job_notifications_list', args=(obj.pk,))
|
||||||
#res['notifications'] = reverse('api:system_job_notifications_list', args=(obj.pk,))
|
|
||||||
res['workflow_nodes'] = reverse('api:workflow_job_workflow_nodes_list', args=(obj.pk,))
|
res['workflow_nodes'] = reverse('api:workflow_job_workflow_nodes_list', args=(obj.pk,))
|
||||||
res['labels'] = reverse('api:workflow_job_label_list', args=(obj.pk,))
|
res['labels'] = reverse('api:workflow_job_label_list', args=(obj.pk,))
|
||||||
if obj.can_cancel or True:
|
if obj.can_cancel or True:
|
||||||
|
|||||||
@@ -265,6 +265,9 @@ workflow_job_template_urls = patterns('awx.api.views',
|
|||||||
url(r'^(?P<pk>[0-9]+)/schedules/$', 'workflow_job_template_schedules_list'),
|
url(r'^(?P<pk>[0-9]+)/schedules/$', 'workflow_job_template_schedules_list'),
|
||||||
url(r'^(?P<pk>[0-9]+)/survey_spec/$', 'workflow_job_template_survey_spec'),
|
url(r'^(?P<pk>[0-9]+)/survey_spec/$', 'workflow_job_template_survey_spec'),
|
||||||
url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_template_workflow_nodes_list'),
|
url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_template_workflow_nodes_list'),
|
||||||
|
url(r'^(?P<pk>[0-9]+)/notification_templates_any/$', 'workflow_job_template_notification_templates_any_list'),
|
||||||
|
url(r'^(?P<pk>[0-9]+)/notification_templates_error/$', 'workflow_job_template_notification_templates_error_list'),
|
||||||
|
url(r'^(?P<pk>[0-9]+)/notification_templates_success/$', 'workflow_job_template_notification_templates_success_list'),
|
||||||
url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_template_label_list'),
|
url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_template_label_list'),
|
||||||
# url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_template_cancel'),
|
# url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_template_cancel'),
|
||||||
)
|
)
|
||||||
@@ -275,7 +278,7 @@ workflow_job_urls = patterns('awx.api.views',
|
|||||||
url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_workflow_nodes_list'),
|
url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_workflow_nodes_list'),
|
||||||
url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_label_list'),
|
url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_label_list'),
|
||||||
url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_cancel'),
|
url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_cancel'),
|
||||||
#url(r'^(?P<pk>[0-9]+)/notifications/$', 'workflow_job_notifications_list'),
|
url(r'^(?P<pk>[0-9]+)/notifications/$', 'workflow_job_notifications_list'),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -2857,6 +2857,29 @@ class WorkflowJobTemplateSchedulesList(SubListCreateAttachDetachAPIView):
|
|||||||
parent_model = WorkflowJobTemplate
|
parent_model = WorkflowJobTemplate
|
||||||
relationship = 'schedules'
|
relationship = 'schedules'
|
||||||
parent_key = 'unified_job_template'
|
parent_key = 'unified_job_template'
|
||||||
|
|
||||||
|
class WorkflowJobTemplateNotificationTemplatesAnyList(SubListCreateAttachDetachAPIView):
|
||||||
|
|
||||||
|
model = NotificationTemplate
|
||||||
|
serializer_class = NotificationTemplateSerializer
|
||||||
|
parent_model = WorkflowJobTemplate
|
||||||
|
relationship = 'notification_templates_any'
|
||||||
|
new_in_310 = True
|
||||||
|
|
||||||
|
class WorkflowJobTemplateNotificationTemplatesErrorList(SubListCreateAttachDetachAPIView):
|
||||||
|
|
||||||
|
model = NotificationTemplate
|
||||||
|
serializer_class = NotificationTemplateSerializer
|
||||||
|
parent_model = WorkflowJobTemplate
|
||||||
|
relationship = 'notification_templates_error'
|
||||||
|
new_in_310 = True
|
||||||
|
|
||||||
|
class WorkflowJobTemplateNotificationTemplatesSuccessList(SubListCreateAttachDetachAPIView):
|
||||||
|
|
||||||
|
model = NotificationTemplate
|
||||||
|
serializer_class = NotificationTemplateSerializer
|
||||||
|
parent_model = WorkflowJobTemplate
|
||||||
|
relationship = 'notification_templates_success'
|
||||||
new_in_310 = True
|
new_in_310 = True
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
@@ -2900,6 +2923,14 @@ class WorkflowJobCancel(RetrieveAPIView):
|
|||||||
else:
|
else:
|
||||||
return self.http_method_not_allowed(request, *args, **kwargs)
|
return self.http_method_not_allowed(request, *args, **kwargs)
|
||||||
|
|
||||||
|
class WorkflowJobNotificationsList(SubListAPIView):
|
||||||
|
|
||||||
|
model = Notification
|
||||||
|
serializer_class = NotificationSerializer
|
||||||
|
parent_model = WorkflowJob
|
||||||
|
relationship = 'notifications'
|
||||||
|
new_in_310 = True
|
||||||
|
|
||||||
class SystemJobTemplateList(ListAPIView):
|
class SystemJobTemplateList(ListAPIView):
|
||||||
|
|
||||||
model = SystemJobTemplate
|
model = SystemJobTemplate
|
||||||
|
|||||||
@@ -13,7 +13,10 @@ from jsonfield import JSONField
|
|||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import UnifiedJobTemplate, UnifiedJob
|
from awx.main.models import UnifiedJobTemplate, UnifiedJob
|
||||||
from awx.main.models.notifications import JobNotificationMixin
|
from awx.main.models.notifications import (
|
||||||
|
NotificationTemplate,
|
||||||
|
JobNotificationMixin
|
||||||
|
)
|
||||||
from awx.main.models.base import BaseModel, CreatedModifiedModel, VarsDictProperty
|
from awx.main.models.base import BaseModel, CreatedModifiedModel, VarsDictProperty
|
||||||
from awx.main.models.rbac import (
|
from awx.main.models.rbac import (
|
||||||
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
|
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
|
||||||
@@ -306,7 +309,18 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl
|
|||||||
# TODO: don't allow running of job template if same workflow template running
|
# TODO: don't allow running of job template if same workflow template running
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# TODO: Notifications
|
@property
|
||||||
|
def notification_templates(self):
|
||||||
|
base_notification_templates = NotificationTemplate.objects.all()
|
||||||
|
error_notification_templates = list(base_notification_templates
|
||||||
|
.filter(unifiedjobtemplate_notification_templates_for_errors__in=[self]))
|
||||||
|
success_notification_templates = list(base_notification_templates
|
||||||
|
.filter(unifiedjobtemplate_notification_templates_for_success__in=[self]))
|
||||||
|
any_notification_templates = list(base_notification_templates
|
||||||
|
.filter(unifiedjobtemplate_notification_templates_for_any__in=[self]))
|
||||||
|
return dict(error=list(error_notification_templates),
|
||||||
|
success=list(success_notification_templates),
|
||||||
|
any=list(any_notification_templates))
|
||||||
# TODO: Surveys
|
# TODO: Surveys
|
||||||
|
|
||||||
#def create_job(self, **kwargs):
|
#def create_job(self, **kwargs):
|
||||||
@@ -429,6 +443,19 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
|
|||||||
def get_absolute_url(self):
|
def get_absolute_url(self):
|
||||||
return reverse('api:workflow_job_detail', args=(self.pk,))
|
return reverse('api:workflow_job_detail', args=(self.pk,))
|
||||||
|
|
||||||
|
def notification_data(self):
|
||||||
|
result = super(WorkflowJob, self).notification_data()
|
||||||
|
str_arr = ['Workflow job summary:', '']
|
||||||
|
for node in self.workflow_job_nodes.all().select_related('job'):
|
||||||
|
if node.job is None:
|
||||||
|
node_job_description = 'no job.'
|
||||||
|
else:
|
||||||
|
node_job_description = ('job #{0}, "{1}", which finished with status {2}.'
|
||||||
|
.format(node.job.id, node.job.name, node.job.status))
|
||||||
|
str_arr.append("- node #{0} spawns {1}".format(node.id, node_job_description))
|
||||||
|
result['body'] = '\n'.join(str_arr)
|
||||||
|
return result
|
||||||
|
|
||||||
# TODO: Ask UI if this is needed ?
|
# TODO: Ask UI if this is needed ?
|
||||||
#def get_ui_url(self):
|
#def get_ui_url(self):
|
||||||
# return urlparse.urljoin(tower_settings.TOWER_URL_BASE, "/#/workflow_jobs/{}".format(self.pk))
|
# return urlparse.urljoin(tower_settings.TOWER_URL_BASE, "/#/workflow_jobs/{}".format(self.pk))
|
||||||
@@ -437,11 +464,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
|
|||||||
def task_impact(self):
|
def task_impact(self):
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
# TODO: workflow job notifications
|
|
||||||
def get_notification_templates(self):
|
def get_notification_templates(self):
|
||||||
return []
|
return self.workflow_job_template.notification_templates
|
||||||
|
|
||||||
# TODO: workflow job notifications
|
|
||||||
def get_notification_friendly_name(self):
|
def get_notification_friendly_name(self):
|
||||||
return "Workflow Job"
|
return "Workflow Job"
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ from awx.main.scheduler.partial import (
|
|||||||
AdHocCommandDict,
|
AdHocCommandDict,
|
||||||
WorkflowJobDict,
|
WorkflowJobDict,
|
||||||
)
|
)
|
||||||
|
from awx.main.tasks import _send_notification_templates
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
from celery.task.control import inspect
|
from celery.task.control import inspect
|
||||||
@@ -128,6 +129,7 @@ class TaskManager():
|
|||||||
|
|
||||||
# See comment in tasks.py::RunWorkflowJob::run()
|
# See comment in tasks.py::RunWorkflowJob::run()
|
||||||
def process_finished_workflow_jobs(self, workflow_jobs):
|
def process_finished_workflow_jobs(self, workflow_jobs):
|
||||||
|
result = []
|
||||||
for workflow_job in workflow_jobs:
|
for workflow_job in workflow_jobs:
|
||||||
dag = WorkflowDAG(workflow_job)
|
dag = WorkflowDAG(workflow_job)
|
||||||
if workflow_job.cancel_flag:
|
if workflow_job.cancel_flag:
|
||||||
@@ -136,12 +138,14 @@ class TaskManager():
|
|||||||
dag.cancel_node_jobs()
|
dag.cancel_node_jobs()
|
||||||
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
||||||
elif dag.is_workflow_done():
|
elif dag.is_workflow_done():
|
||||||
|
result.append(workflow_job.id)
|
||||||
if workflow_job._has_failed():
|
if workflow_job._has_failed():
|
||||||
workflow_job.status = 'failed'
|
workflow_job.status = 'failed'
|
||||||
else:
|
else:
|
||||||
workflow_job.status = 'successful'
|
workflow_job.status = 'successful'
|
||||||
workflow_job.save()
|
workflow_job.save()
|
||||||
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
||||||
|
return result
|
||||||
|
|
||||||
def get_active_tasks(self):
|
def get_active_tasks(self):
|
||||||
inspector = inspect()
|
inspector = inspect()
|
||||||
@@ -239,7 +243,6 @@ class TaskManager():
|
|||||||
|
|
||||||
for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']):
|
for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']):
|
||||||
if inventory_source_task['id'] in inventory_sources_already_updated:
|
if inventory_source_task['id'] in inventory_sources_already_updated:
|
||||||
print("Inventory already updated")
|
|
||||||
continue
|
continue
|
||||||
if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']):
|
if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']):
|
||||||
inventory_task = self.create_inventory_update(task, inventory_source_task)
|
inventory_task = self.create_inventory_update(task, inventory_source_task)
|
||||||
@@ -312,6 +315,8 @@ class TaskManager():
|
|||||||
self.capacity_used += t.task_impact()
|
self.capacity_used += t.task_impact()
|
||||||
|
|
||||||
def would_exceed_capacity(self, task):
|
def would_exceed_capacity(self, task):
|
||||||
|
if self.capacity_used == 0:
|
||||||
|
return False
|
||||||
return (task.task_impact() + self.capacity_used > self.capacity_total)
|
return (task.task_impact() + self.capacity_used > self.capacity_total)
|
||||||
|
|
||||||
def consume_capacity(self, task):
|
def consume_capacity(self, task):
|
||||||
@@ -332,6 +337,7 @@ class TaskManager():
|
|||||||
self.process_pending_tasks(pending_tasks)
|
self.process_pending_tasks(pending_tasks)
|
||||||
|
|
||||||
def _schedule(self):
|
def _schedule(self):
|
||||||
|
finished_wfjs = []
|
||||||
all_sorted_tasks = self.get_tasks()
|
all_sorted_tasks = self.get_tasks()
|
||||||
if len(all_sorted_tasks) > 0:
|
if len(all_sorted_tasks) > 0:
|
||||||
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
||||||
@@ -344,11 +350,12 @@ class TaskManager():
|
|||||||
self.process_inventory_sources(inventory_id_sources)
|
self.process_inventory_sources(inventory_id_sources)
|
||||||
|
|
||||||
running_workflow_tasks = self.get_running_workflow_jobs()
|
running_workflow_tasks = self.get_running_workflow_jobs()
|
||||||
self.process_finished_workflow_jobs(running_workflow_tasks)
|
finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
|
||||||
|
|
||||||
self.spawn_workflow_graph_jobs(running_workflow_tasks)
|
self.spawn_workflow_graph_jobs(running_workflow_tasks)
|
||||||
|
|
||||||
self.process_tasks(all_sorted_tasks)
|
self.process_tasks(all_sorted_tasks)
|
||||||
|
return finished_wfjs
|
||||||
|
|
||||||
def schedule(self):
|
def schedule(self):
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
@@ -358,6 +365,9 @@ class TaskManager():
|
|||||||
except DatabaseError:
|
except DatabaseError:
|
||||||
return
|
return
|
||||||
|
|
||||||
self._schedule()
|
finished_wfjs = self._schedule()
|
||||||
|
|
||||||
|
# Operations whose queries rely on modifications made during the atomic scheduling session
|
||||||
|
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
|
||||||
|
_send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ def epoch():
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def scheduler_factory(mocker, epoch):
|
def scheduler_factory(mocker, epoch):
|
||||||
mocker.patch('awx.main.models.Instance.objects.total_capacity', return_value=999999999)
|
mocker.patch('awx.main.models.Instance.objects.total_capacity', return_value=10000)
|
||||||
|
|
||||||
def fn(tasks=[], inventory_sources=[], latest_project_updates=[], latest_inventory_updates=[], create_project_update=None, create_inventory_update=None):
|
def fn(tasks=[], inventory_sources=[], latest_project_updates=[], latest_inventory_updates=[], create_project_update=None, create_inventory_update=None):
|
||||||
sched = TaskManager()
|
sched = TaskManager()
|
||||||
@@ -190,15 +190,15 @@ Job
|
|||||||
'''
|
'''
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def job_factory(epoch):
|
def job_factory(epoch):
|
||||||
def fn(project__scm_update_on_launch=True, inventory__inventory_sources=[]):
|
def fn(id=1, project__scm_update_on_launch=True, inventory__inventory_sources=[], allow_simultaneous=False):
|
||||||
return JobDict({
|
return JobDict({
|
||||||
'id': 1,
|
'id': id,
|
||||||
'status': 'pending',
|
'status': 'pending',
|
||||||
'job_template_id': 1,
|
'job_template_id': 1,
|
||||||
'project_id': 1,
|
'project_id': 1,
|
||||||
'inventory_id': 1,
|
'inventory_id': 1,
|
||||||
'launch_type': 'manual',
|
'launch_type': 'manual',
|
||||||
'allow_simultaneous': False,
|
'allow_simultaneous': allow_simultaneous,
|
||||||
'created': epoch - timedelta(seconds=99),
|
'created': epoch - timedelta(seconds=99),
|
||||||
'celery_task_id': '',
|
'celery_task_id': '',
|
||||||
'project__scm_update_on_launch': project__scm_update_on_launch,
|
'project__scm_update_on_launch': project__scm_update_on_launch,
|
||||||
|
|||||||
@@ -54,3 +54,31 @@ class TestJob():
|
|||||||
|
|
||||||
scheduler.start_task.assert_called_with(pending_job)
|
scheduler.start_task.assert_called_with(pending_job)
|
||||||
|
|
||||||
|
class TestCapacity():
|
||||||
|
@pytest.fixture
|
||||||
|
def pending_job_high_impact(self, mocker, job_factory):
|
||||||
|
pending_job = job_factory(project__scm_update_on_launch=False)
|
||||||
|
mocker.patch.object(pending_job, 'task_impact', return_value=10001)
|
||||||
|
return pending_job
|
||||||
|
|
||||||
|
def test_no_capacity(self, scheduler_factory, pending_job_high_impact):
|
||||||
|
scheduler = scheduler_factory(tasks=[pending_job_high_impact])
|
||||||
|
|
||||||
|
scheduler._schedule()
|
||||||
|
|
||||||
|
scheduler.start_task.assert_called_with(pending_job_high_impact)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def pending_jobs_impactful(self, mocker, job_factory):
|
||||||
|
pending_jobs = [job_factory(id=i + 1, project__scm_update_on_launch=False, allow_simultaneous=True) for i in xrange(0, 3)]
|
||||||
|
map(lambda pending_job: mocker.patch.object(pending_job, 'task_impact', return_value=10), pending_jobs)
|
||||||
|
return pending_jobs
|
||||||
|
|
||||||
|
def test_capacity_exhausted(self, mocker, scheduler_factory, pending_jobs_impactful):
|
||||||
|
scheduler = scheduler_factory(tasks=pending_jobs_impactful)
|
||||||
|
|
||||||
|
scheduler._schedule()
|
||||||
|
|
||||||
|
calls = [mocker.call(job) for job in pending_jobs_impactful]
|
||||||
|
scheduler.start_task.assert_has_calls(calls)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user