From 454b3edb7c96995942716935e43005c25a20abca Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 27 Oct 2016 16:31:47 -0400 Subject: [PATCH] rectify celery<->db inconsistent running job --- awx/main/scheduler/__init__.py | 51 +++++++++++++++++----------------- awx/main/scheduler/tasks.py | 10 +++---- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index cf5fbecddc..21fe954546 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -54,6 +54,22 @@ class Scheduler(): key=lambda task: task['created']) return all_actions + ''' + Tasks that are running and SHOULD have a celery task. + ''' + def get_running_tasks(self): + status_list = ('running',) + + jobs = JobDict.filter_partial(status=status_list) + inventory_updates = InventoryUpdateDict.filter_partial(status=status_list) + project_updates = ProjectUpdateDict.filter_partial(status=status_list) + system_jobs = SystemJobDict.filter_partial(status=status_list) + ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list) + + all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands, + key=lambda task: task['created']) + return all_actions + # TODO: Consider a database query for this logic def get_latest_project_update_tasks(self, all_sorted_tasks): project_ids = Set() @@ -121,7 +137,7 @@ class Scheduler(): workflow_job.save() connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) - def get_activate_tasks(self): + def get_active_tasks(self): inspector = inspect() if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): active_task_queues = inspector.active() @@ -129,10 +145,10 @@ class Scheduler(): logger.warn("Ignoring celery task inspector") active_task_queues = None - active_tasks = [] + active_tasks = set() if active_task_queues is not None: for queue in active_task_queues: - active_tasks += [at['id'] for at in active_task_queues[queue]] + map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) else: logger.error("Could not communicate with celery!") # TODO: Something needs to be done here to signal to the system @@ -274,10 +290,11 @@ class Scheduler(): if self.get_remaining_capacity() <= 0: return - def fail_inconsistent_running_jobs(self, active_tasks, all_sorted_tasks): - for i, task in enumerate(all_sorted_tasks): - if task['status'] != 'running': - continue + def process_celery_tasks(self, active_tasks, all_running_sorted_tasks): + ''' + Rectify tower db <-> celery inconsistent view of jobs state + ''' + for task in all_running_sorted_tasks: if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): # NOTE: Pull status again and make sure it didn't finish in @@ -290,20 +307,11 @@ class Scheduler(): 'Celery, so it has been marked as failed.', )) task_obj.save() + print("Going to fail %s" % task_obj.id) connection.on_commit(lambda: task_obj.websocket_emit_status('failed')) - all_sorted_tasks.pop(i) logger.error("Task %s appears orphaned... marking as failed" % task) - def process_celery_tasks(self, active_tasks, all_sorted_tasks): - - ''' - Rectify tower db <-> celery inconsistent view of jobs state - ''' - # Check running tasks and make sure they are active in celery - logger.debug("Active celery tasks: " + str(active_tasks)) - all_sorted_tasks = self.fail_inconsistent_running_jobs(active_tasks, - all_sorted_tasks) def calculate_capacity_used(self, tasks): self.capacity_used = 0 @@ -361,15 +369,6 @@ class Scheduler(): except DatabaseError: return - ''' - Get tasks known by celery - ''' - ''' - active_tasks = self.get_activate_tasks() - # Communication with celery failed :(, return - if active_tasks is None: - return None - ''' self._schedule() diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index ba1ddaeecc..2b35b5ab64 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -33,19 +33,19 @@ def run_scheduler(): @task def run_fail_inconsistent_running_jobs(): - return - print("run_fail_inconsistent_running_jobs() running") with transaction.atomic(): # Lock try: Instance.objects.select_for_update(nowait=True).all()[0] scheduler = Scheduler() - active_tasks = scheduler.get_activate_tasks() + active_tasks = scheduler.get_active_tasks() + if active_tasks is None: + # TODO: Failed to contact celery. We should surface this. return None - all_sorted_tasks = scheduler.get_tasks() - scheduler.process_celery_tasks(active_tasks, all_sorted_tasks) + all_running_sorted_tasks = scheduler.get_running_tasks() + scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks) except DatabaseError: return