Revert "fix celery task reaper"

This reverts commit 1359208a99.
This commit is contained in:
Ryan Petrello
2018-06-18 16:18:20 -04:00
parent b4c30576af
commit ef6433c6f9
6 changed files with 19 additions and 97 deletions

View File

@@ -8,7 +8,6 @@ import uuid
import json
import six
import random
import itertools
from sets import Set
# Django
@@ -159,33 +158,6 @@ class TaskManager():
return (active_task_queues, queues)
def map_system_hostname_to_instance_hostname(self, in_map):
    '''
    Re-key a mapping from celery system hostnames to Instance.hostname values.

    Every key of in_map is looked up against Instance.system_hostname. On a
    match, the value is stored in the result under that Instance's hostname
    e.g.,

    map_system_hostname_to_instance_hostname({
        'node1.example.org': ABC,
        'node2.example.org': ABC,
    })
    {
        Instance.objects.get(system_hostname='node1.example.org').hostname: ABC,
        Instance.objects.get(system_hostname='node2.example.org').hostname: ABC
    }

    Keys that match no Instance are logged as a warning and left out of the
    returned dict. If several keys resolve to the same Instance.hostname, the
    one iterated last wins. in_map itself is not modified.
    '''
    # One query for all Instances, restricted to the two columns needed here,
    # instead of one lookup per celery hostname.
    instances_by_system_hostname = {
        i.system_hostname: i
        for i in Instance.objects.only('system_hostname', 'hostname')
    }

    out_map = dict()
    # .items() rather than .iteritems() so the loop runs on Python 2 and 3.
    for system_hostname, value in in_map.items():
        instance = instances_by_system_hostname.get(system_hostname)
        if not instance:
            # logger.warn is a deprecated alias; logger.warning is the
            # supported spelling and emits the same record.
            logger.warning("Could not map celery system hostname {} to Instance hostname".format(system_hostname))
            continue
        out_map[instance.hostname] = value
    return out_map
def get_latest_project_update_tasks(self, all_sorted_tasks):
project_ids = Set()
for task in all_sorted_tasks:
@@ -582,12 +554,10 @@ class TaskManager():
Rectify tower db <-> celery inconsistent view of jobs state
'''
last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
time_since_last_cleanup_sec = (tz_now() - last_cleanup).seconds
cleanup_diff = settings.AWX_INCONSISTENT_TASK_INTERVAL - time_since_last_cleanup_sec
if cleanup_diff > 0:
logger.debug("Skipping job reaper. Can run again in {} seconds".format(cleanup_diff))
if (tz_now() - last_cleanup).seconds < settings.AWX_INCONSISTENT_TASK_INTERVAL:
return
logger.debug("Failing inconsistent running jobs.")
celery_task_start_time = tz_now()
active_task_queues, active_queues = self.get_active_tasks()
cache.set('last_celery_task_cleanup', tz_now())
@@ -596,21 +566,21 @@ class TaskManager():
logger.error('Failed to retrieve active tasks from celery')
return None
remapped_active_queues = self.map_system_hostname_to_instance_hostname(active_queues)
'''
Only consider failing tasks on instances for which we obtained a task
list from celery for.
'''
running_tasks, waiting_tasks = self.get_running_tasks()
all_celery_task_ids = list(itertools.chain.from_iterable(remapped_active_queues.values()))
all_celery_task_ids = []
for node, node_jobs in active_queues.iteritems():
all_celery_task_ids.extend(node_jobs)
self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
for node, node_jobs in running_tasks.iteritems():
isolated = False
if node in remapped_active_queues:
active_tasks = remapped_active_queues[node]
if node in active_queues:
active_tasks = active_queues[node]
else:
'''
Node task list not found in celery. We may branch into cases:
@@ -629,17 +599,11 @@ class TaskManager():
node, [j.log_format for j in node_jobs]))
active_tasks = []
elif instance.capacity == 0:
logger.info("Instance {} is known to be offline and did not reply "
"with a list of running celery tasks. Going to fail all running"
"jobs associated with this instance.".format(instance.hostname))
active_tasks = []
elif instance.rampart_groups.filter(controller__isnull=False).exists():
logger.info("Failing all jobs for Isolated Instance {} ".format(instance.hostname))
active_tasks = all_celery_task_ids
isolated = True
else:
logger.info("Instance {} did not reply with a list of running "
"celery tasks and the Instance does not look offline.".format(instance.hostname))
continue
self.fail_jobs_if_not_in_celery(