diff --git a/awx/api/serializers.py b/awx/api/serializers.py index c8a7b98cfc..986f5c8c1e 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2201,10 +2201,9 @@ class WorkflowJobTemplateSerializer(LabelsListMixin, UnifiedJobTemplateSerialize launch = reverse('api:workflow_job_template_launch', args=(obj.pk,)), workflow_nodes = reverse('api:workflow_job_template_workflow_nodes_list', args=(obj.pk,)), labels = reverse('api:workflow_job_template_label_list', args=(obj.pk,)), - # TODO: Implement notifications - #notification_templates_any = reverse('api:system_job_template_notification_templates_any_list', args=(obj.pk,)), - #notification_templates_success = reverse('api:system_job_template_notification_templates_success_list', args=(obj.pk,)), - #notification_templates_error = reverse('api:system_job_template_notification_templates_error_list', args=(obj.pk,)), + notification_templates_any = reverse('api:workflow_job_template_notification_templates_any_list', args=(obj.pk,)), + notification_templates_success = reverse('api:workflow_job_template_notification_templates_success_list', args=(obj.pk,)), + notification_templates_error = reverse('api:workflow_job_template_notification_templates_error_list', args=(obj.pk,)), survey_spec = reverse('api:workflow_job_template_survey_spec', args=(obj.pk,)), )) return res @@ -2228,8 +2227,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): if obj.workflow_job_template: res['workflow_job_template'] = reverse('api:workflow_job_template_detail', args=(obj.workflow_job_template.pk,)) - # TODO: - #res['notifications'] = reverse('api:system_job_notifications_list', args=(obj.pk,)) + res['notifications'] = reverse('api:workflow_job_notifications_list', args=(obj.pk,)) res['workflow_nodes'] = reverse('api:workflow_job_workflow_nodes_list', args=(obj.pk,)) res['labels'] = reverse('api:workflow_job_label_list', args=(obj.pk,)) if obj.can_cancel or True: diff --git a/awx/api/urls.py b/awx/api/urls.py 
index 4699d9f2bf..8e349008e6 100644 --- a/awx/api/urls.py +++ b/awx/api/urls.py @@ -265,6 +265,9 @@ workflow_job_template_urls = patterns('awx.api.views', url(r'^(?P<pk>[0-9]+)/schedules/$', 'workflow_job_template_schedules_list'), url(r'^(?P<pk>[0-9]+)/survey_spec/$', 'workflow_job_template_survey_spec'), url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_template_workflow_nodes_list'), + url(r'^(?P<pk>[0-9]+)/notification_templates_any/$', 'workflow_job_template_notification_templates_any_list'), + url(r'^(?P<pk>[0-9]+)/notification_templates_error/$', 'workflow_job_template_notification_templates_error_list'), + url(r'^(?P<pk>[0-9]+)/notification_templates_success/$', 'workflow_job_template_notification_templates_success_list'), url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_template_label_list'), # url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_template_cancel'), ) @@ -275,7 +278,7 @@ workflow_job_urls = patterns('awx.api.views', url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_workflow_nodes_list'), url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_label_list'), url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_cancel'), - #url(r'^(?P<pk>[0-9]+)/notifications/$', 'workflow_job_notifications_list'), + url(r'^(?P<pk>[0-9]+)/notifications/$', 'workflow_job_notifications_list'), ) diff --git a/awx/api/views.py b/awx/api/views.py index ba76e777b3..390fc0194a 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2857,6 +2857,29 @@ class WorkflowJobTemplateSchedulesList(SubListCreateAttachDetachAPIView): parent_model = WorkflowJobTemplate relationship = 'schedules' parent_key = 'unified_job_template' + +class WorkflowJobTemplateNotificationTemplatesAnyList(SubListCreateAttachDetachAPIView): + + model = NotificationTemplate + serializer_class = NotificationTemplateSerializer + parent_model = WorkflowJobTemplate + relationship = 'notification_templates_any' + new_in_310 = True + +class WorkflowJobTemplateNotificationTemplatesErrorList(SubListCreateAttachDetachAPIView): + + model = NotificationTemplate + 
serializer_class = NotificationTemplateSerializer + parent_model = WorkflowJobTemplate + relationship = 'notification_templates_error' + new_in_310 = True + +class WorkflowJobTemplateNotificationTemplatesSuccessList(SubListCreateAttachDetachAPIView): + + model = NotificationTemplate + serializer_class = NotificationTemplateSerializer + parent_model = WorkflowJobTemplate + relationship = 'notification_templates_success' new_in_310 = True # TODO: @@ -2900,6 +2923,14 @@ class WorkflowJobCancel(RetrieveAPIView): else: return self.http_method_not_allowed(request, *args, **kwargs) +class WorkflowJobNotificationsList(SubListAPIView): + + model = Notification + serializer_class = NotificationSerializer + parent_model = WorkflowJob + relationship = 'notifications' + new_in_310 = True + class SystemJobTemplateList(ListAPIView): model = SystemJobTemplate diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 0a9c081680..6d99be326f 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -13,7 +13,10 @@ from jsonfield import JSONField # AWX from awx.main.models import UnifiedJobTemplate, UnifiedJob -from awx.main.models.notifications import JobNotificationMixin +from awx.main.models.notifications import ( + NotificationTemplate, + JobNotificationMixin +) from awx.main.models.base import BaseModel, CreatedModifiedModel, VarsDictProperty from awx.main.models.rbac import ( ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, @@ -306,7 +309,18 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl # TODO: don't allow running of job template if same workflow template running return False - # TODO: Notifications + @property + def notification_templates(self): + base_notification_templates = NotificationTemplate.objects.all() + error_notification_templates = list(base_notification_templates + .filter(unifiedjobtemplate_notification_templates_for_errors__in=[self])) + success_notification_templates = 
list(base_notification_templates + .filter(unifiedjobtemplate_notification_templates_for_success__in=[self])) + any_notification_templates = list(base_notification_templates + .filter(unifiedjobtemplate_notification_templates_for_any__in=[self])) + return dict(error=list(error_notification_templates), + success=list(success_notification_templates), + any=list(any_notification_templates)) # TODO: Surveys #def create_job(self, **kwargs): @@ -429,6 +443,19 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio def get_absolute_url(self): return reverse('api:workflow_job_detail', args=(self.pk,)) + def notification_data(self): + result = super(WorkflowJob, self).notification_data() + str_arr = ['Workflow job summary:', ''] + for node in self.workflow_job_nodes.all().select_related('job'): + if node.job is None: + node_job_description = 'no job.' + else: + node_job_description = ('job #{0}, "{1}", which finished with status {2}.' + .format(node.job.id, node.job.name, node.job.status)) + str_arr.append("- node #{0} spawns {1}".format(node.id, node_job_description)) + result['body'] = '\n'.join(str_arr) + return result + # TODO: Ask UI if this is needed ? 
#def get_ui_url(self): # return urlparse.urljoin(tower_settings.TOWER_URL_BASE, "/#/workflow_jobs/{}".format(self.pk)) @@ -437,11 +464,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio def task_impact(self): return 0 - # TODO: workflow job notifications def get_notification_templates(self): - return [] + return self.workflow_job_template.notification_templates - # TODO: workflow job notifications def get_notification_friendly_name(self): return "Workflow Job" diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 8f6e1b9617..16ca577d6c 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -28,6 +28,7 @@ from awx.main.scheduler.partial import ( AdHocCommandDict, WorkflowJobDict, ) +from awx.main.tasks import _send_notification_templates # Celery from celery.task.control import inspect @@ -128,6 +129,7 @@ class TaskManager(): # See comment in tasks.py::RunWorkflowJob::run() def process_finished_workflow_jobs(self, workflow_jobs): + result = [] for workflow_job in workflow_jobs: dag = WorkflowDAG(workflow_job) if workflow_job.cancel_flag: @@ -136,12 +138,14 @@ class TaskManager(): dag.cancel_node_jobs() connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) elif dag.is_workflow_done(): + result.append(workflow_job.id) if workflow_job._has_failed(): workflow_job.status = 'failed' else: workflow_job.status = 'successful' workflow_job.save() connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) + return result def get_active_tasks(self): inspector = inspect() @@ -239,7 +243,6 @@ class TaskManager(): for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']): if inventory_source_task['id'] in inventory_sources_already_updated: - print("Inventory already updated") continue if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']): inventory_task = 
self.create_inventory_update(task, inventory_source_task) @@ -312,6 +315,8 @@ class TaskManager(): self.capacity_used += t.task_impact() def would_exceed_capacity(self, task): + if self.capacity_used == 0: + return False return (task.task_impact() + self.capacity_used > self.capacity_total) def consume_capacity(self, task): @@ -332,6 +337,7 @@ class TaskManager(): self.process_pending_tasks(pending_tasks) def _schedule(self): + finished_wfjs = [] all_sorted_tasks = self.get_tasks() if len(all_sorted_tasks) > 0: latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) @@ -344,11 +350,12 @@ class TaskManager(): self.process_inventory_sources(inventory_id_sources) running_workflow_tasks = self.get_running_workflow_jobs() - self.process_finished_workflow_jobs(running_workflow_tasks) + finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks) self.spawn_workflow_graph_jobs(running_workflow_tasks) self.process_tasks(all_sorted_tasks) + return finished_wfjs def schedule(self): with transaction.atomic(): @@ -358,6 +365,9 @@ class TaskManager(): except DatabaseError: return - self._schedule() + finished_wfjs = self._schedule() + # Operations whose queries rely on modifications made during the atomic scheduling session + for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs): + _send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed') diff --git a/awx/main/tests/unit/scheduler/conftest.py b/awx/main/tests/unit/scheduler/conftest.py index 97eec0dbb4..42f2b59cd3 100644 --- a/awx/main/tests/unit/scheduler/conftest.py +++ b/awx/main/tests/unit/scheduler/conftest.py @@ -22,7 +22,7 @@ def epoch(): @pytest.fixture def scheduler_factory(mocker, epoch): - mocker.patch('awx.main.models.Instance.objects.total_capacity', return_value=999999999) + mocker.patch('awx.main.models.Instance.objects.total_capacity', return_value=10000) def fn(tasks=[], inventory_sources=[], latest_project_updates=[], 
latest_inventory_updates=[], create_project_update=None, create_inventory_update=None): sched = TaskManager() @@ -190,15 +190,15 @@ Job ''' @pytest.fixture def job_factory(epoch): - def fn(project__scm_update_on_launch=True, inventory__inventory_sources=[]): + def fn(id=1, project__scm_update_on_launch=True, inventory__inventory_sources=[], allow_simultaneous=False): return JobDict({ - 'id': 1, + 'id': id, 'status': 'pending', 'job_template_id': 1, 'project_id': 1, 'inventory_id': 1, 'launch_type': 'manual', - 'allow_simultaneous': False, + 'allow_simultaneous': allow_simultaneous, 'created': epoch - timedelta(seconds=99), 'celery_task_id': '', 'project__scm_update_on_launch': project__scm_update_on_launch, diff --git a/awx/main/tests/unit/scheduler/test_scheduler_job.py b/awx/main/tests/unit/scheduler/test_scheduler_job.py index 735ce04d95..5d045efaec 100644 --- a/awx/main/tests/unit/scheduler/test_scheduler_job.py +++ b/awx/main/tests/unit/scheduler/test_scheduler_job.py @@ -54,3 +54,31 @@ class TestJob(): scheduler.start_task.assert_called_with(pending_job) +class TestCapacity(): + @pytest.fixture + def pending_job_high_impact(self, mocker, job_factory): + pending_job = job_factory(project__scm_update_on_launch=False) + mocker.patch.object(pending_job, 'task_impact', return_value=10001) + return pending_job + + def test_no_capacity(self, scheduler_factory, pending_job_high_impact): + scheduler = scheduler_factory(tasks=[pending_job_high_impact]) + + scheduler._schedule() + + scheduler.start_task.assert_called_with(pending_job_high_impact) + + @pytest.fixture + def pending_jobs_impactful(self, mocker, job_factory): + pending_jobs = [job_factory(id=i + 1, project__scm_update_on_launch=False, allow_simultaneous=True) for i in xrange(0, 3)] + map(lambda pending_job: mocker.patch.object(pending_job, 'task_impact', return_value=10), pending_jobs) + return pending_jobs + + def test_capacity_exhausted(self, mocker, scheduler_factory, pending_jobs_impactful): + scheduler 
= scheduler_factory(tasks=pending_jobs_impactful) + + scheduler._schedule() + + calls = [mocker.call(job) for job in pending_jobs_impactful] + scheduler.start_task.assert_has_calls(calls) +