rectify celery<->db inconsistent running job

This commit is contained in:
Chris Meyers 2016-10-27 16:31:47 -04:00
parent 4ef4b4709b
commit 454b3edb7c
2 changed files with 30 additions and 31 deletions

View File

@ -54,6 +54,22 @@ class Scheduler():
key=lambda task: task['created'])
return all_actions
'''
Tasks that are running and SHOULD have a celery task.
'''
def get_running_tasks(self):
status_list = ('running',)
jobs = JobDict.filter_partial(status=status_list)
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
project_updates = ProjectUpdateDict.filter_partial(status=status_list)
system_jobs = SystemJobDict.filter_partial(status=status_list)
ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list)
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands,
key=lambda task: task['created'])
return all_actions
# TODO: Consider a database query for this logic
def get_latest_project_update_tasks(self, all_sorted_tasks):
project_ids = Set()
@ -121,7 +137,7 @@ class Scheduler():
workflow_job.save()
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
def get_activate_tasks(self):
def get_active_tasks(self):
inspector = inspect()
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
active_task_queues = inspector.active()
@ -129,10 +145,10 @@ class Scheduler():
logger.warn("Ignoring celery task inspector")
active_task_queues = None
active_tasks = []
active_tasks = set()
if active_task_queues is not None:
for queue in active_task_queues:
active_tasks += [at['id'] for at in active_task_queues[queue]]
map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
else:
logger.error("Could not communicate with celery!")
# TODO: Something needs to be done here to signal to the system
@ -274,10 +290,11 @@ class Scheduler():
if self.get_remaining_capacity() <= 0:
return
def fail_inconsistent_running_jobs(self, active_tasks, all_sorted_tasks):
for i, task in enumerate(all_sorted_tasks):
if task['status'] != 'running':
continue
def process_celery_tasks(self, active_tasks, all_running_sorted_tasks):
'''
Rectify tower db <-> celery inconsistent view of jobs state
'''
for task in all_running_sorted_tasks:
if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
# NOTE: Pull status again and make sure it didn't finish in
@ -290,20 +307,11 @@ class Scheduler():
'Celery, so it has been marked as failed.',
))
task_obj.save()
print("Going to fail %s" % task_obj.id)
connection.on_commit(lambda: task_obj.websocket_emit_status('failed'))
all_sorted_tasks.pop(i)
logger.error("Task %s appears orphaned... marking as failed" % task)
def process_celery_tasks(self, active_tasks, all_sorted_tasks):
'''
Rectify tower db <-> celery inconsistent view of jobs state
'''
# Check running tasks and make sure they are active in celery
logger.debug("Active celery tasks: " + str(active_tasks))
all_sorted_tasks = self.fail_inconsistent_running_jobs(active_tasks,
all_sorted_tasks)
def calculate_capacity_used(self, tasks):
self.capacity_used = 0
@ -361,15 +369,6 @@ class Scheduler():
except DatabaseError:
return
'''
Get tasks known by celery
'''
'''
active_tasks = self.get_activate_tasks()
# Communication with celery failed :(, return
if active_tasks is None:
return None
'''
self._schedule()

View File

@ -33,19 +33,19 @@ def run_scheduler():
@task
def run_fail_inconsistent_running_jobs():
return
print("run_fail_inconsistent_running_jobs() running")
with transaction.atomic():
# Lock
try:
Instance.objects.select_for_update(nowait=True).all()[0]
scheduler = Scheduler()
active_tasks = scheduler.get_activate_tasks()
active_tasks = scheduler.get_active_tasks()
if active_tasks is None:
# TODO: Failed to contact celery. We should surface this.
return None
all_sorted_tasks = scheduler.get_tasks()
scheduler.process_celery_tasks(active_tasks, all_sorted_tasks)
all_running_sorted_tasks = scheduler.get_running_tasks()
scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
except DatabaseError:
return