From 1359208a995582f779e194be2584d9a4c4beeb09 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Thu, 14 Jun 2018 15:52:52 -0400 Subject: [PATCH] fix celery task reaper * celery workers have internal queue names that are named after the system hostname. This may differ from the name tower knows the host by (Instance.hostname). This adds a mapping so we can convert internal celery names to Instance names for purposes of reaping jobs. --- .../0041_v330_instance_system_hostname.py | 20 ++++++++ awx/main/models/ha.py | 9 ++++ awx/main/scheduler/task_manager.py | 50 ++++++++++++++++--- awx/main/tasks.py | 21 ++++++-- .../task_management/test_scheduler.py | 14 +++--- awx/main/tests/unit/test_task_manager.py | 2 + 6 files changed, 97 insertions(+), 19 deletions(-) create mode 100644 awx/main/migrations/0041_v330_instance_system_hostname.py diff --git a/awx/main/migrations/0041_v330_instance_system_hostname.py b/awx/main/migrations/0041_v330_instance_system_hostname.py new file mode 100644 index 0000000000..321566e563 --- /dev/null +++ b/awx/main/migrations/0041_v330_instance_system_hostname.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.11 on 2018-06-14 17:23 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0040_v330_unifiedjob_controller_node'), + ] + + operations = [ + migrations.AddField( + model_name='instance', + name='system_hostname', + field=models.CharField(default=b'', max_length=255, unique=True), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 6386857f08..a1b5b4cf41 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -45,6 +45,11 @@ class Instance(BaseModel): uuid = models.CharField(max_length=40) hostname = models.CharField(max_length=250, unique=True) + system_hostname = models.CharField( + max_length=255, + db_index=True, + help_text="Machine hostname", + ) created = 
models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) last_isolated_check = models.DateTimeField( @@ -109,6 +114,10 @@ class Instance(BaseModel): def jobs_total(self): return UnifiedJob.objects.filter(execution_node=self.hostname).count() + @property + def celery_system_hostname(self): + return 'celery@{}'.format(self.system_hostname) + def is_lost(self, ref_time=None, isolated=False): if ref_time is None: ref_time = now() diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 198f0e3652..d6fffca74c 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -8,6 +8,7 @@ import uuid import json import six import random +import itertools from sets import Set # Django @@ -158,6 +159,33 @@ class TaskManager(): return (active_task_queues, queues) + def map_system_hostname_to_instance_hostname(self, in_map): + ''' + Convert celery's system hostnames to Instance.hostname values e.g., + + map_system_hostname_to_instance_hostname({ + 'node1.example.org': ABC, + 'node2.example.org': ABC, + }) + + { + Instance.objects.get(system_hostname='node1.example.org').hostname: ABC, + Instance.objects.get(system_hostname='node2.example.org').hostname: ABC + } + ''' + out_map = dict() + + system_hostname_map = {i.system_hostname: i for i in + Instance.objects.only('system_hostname', 'hostname')} + + for k, v in in_map.iteritems(): + instance = system_hostname_map.get(k) + if not instance: + logger.warn("Could not map celery system hostname {} to Instance hostname".format(k)) + else: + out_map[instance.hostname] = v + return out_map + def get_latest_project_update_tasks(self, all_sorted_tasks): project_ids = Set() for task in all_sorted_tasks: @@ -554,10 +582,12 @@ class TaskManager(): Rectify tower db <-> celery inconsistent view of jobs state ''' last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc) - if (tz_now() - last_cleanup).seconds < 
settings.AWX_INCONSISTENT_TASK_INTERVAL: + time_since_last_cleanup_sec = (tz_now() - last_cleanup).seconds + cleanup_diff = settings.AWX_INCONSISTENT_TASK_INTERVAL - time_since_last_cleanup_sec + if cleanup_diff > 0: + logger.debug("Skipping job reaper. Can run again in {} seconds".format(cleanup_diff)) return - logger.debug("Failing inconsistent running jobs.") celery_task_start_time = tz_now() active_task_queues, active_queues = self.get_active_tasks() cache.set('last_celery_task_cleanup', tz_now()) @@ -566,21 +596,21 @@ class TaskManager(): logger.error('Failed to retrieve active tasks from celery') return None + remapped_active_queues = self.map_system_hostname_to_instance_hostname(active_queues) + ''' Only consider failing tasks on instances for which we obtained a task list from celery for. ''' running_tasks, waiting_tasks = self.get_running_tasks() - all_celery_task_ids = [] - for node, node_jobs in active_queues.iteritems(): - all_celery_task_ids.extend(node_jobs) + all_celery_task_ids = list(itertools.chain.from_iterable(remapped_active_queues.values())) self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time) for node, node_jobs in running_tasks.iteritems(): isolated = False - if node in active_queues: - active_tasks = active_queues[node] + if node in remapped_active_queues: + active_tasks = remapped_active_queues[node] else: ''' Node task list not found in celery. We may branch into cases: @@ -599,11 +629,17 @@ class TaskManager(): node, [j.log_format for j in node_jobs])) active_tasks = [] elif instance.capacity == 0: + logger.info("Instance {} is known to be offline and did not reply " + "with a list of running celery tasks. 
Going to fail all running" + "jobs associated with this instance.".format(instance.hostname)) active_tasks = [] elif instance.rampart_groups.filter(controller__isnull=False).exists(): + logger.info("Failing all jobs for Isolated Instance {} ".format(instance.hostname)) active_tasks = all_celery_task_ids isolated = True else: + logger.info("Instance {} did not reply with a list of running " + "celery tasks and the Instance does not look offline.".format(instance.hostname)) continue self.fail_jobs_if_not_in_celery( diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 7edfcc7246..11aeff7a8a 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -20,6 +20,7 @@ import time import traceback import six import urlparse +import socket from distutils.version import LooseVersion as Version import yaml import fcntl @@ -231,11 +232,21 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs): @celeryd_after_setup.connect def handle_update_celery_hostname(sender, instance, **kwargs): - (changed, tower_instance) = Instance.objects.get_or_register() - if changed: - logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname)) - instance.hostname = 'celery@{}'.format(tower_instance.hostname) - logger.warn(six.text_type("Set hostname to {}").format(instance.hostname)) + ''' + Celery will appear to infinitely reboot if an error occurs here. 
+ ''' + try: + (changed, tower_instance) = Instance.objects.get_or_register() + if changed: + logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname)) + system_hostname = socket.gethostname() + if system_hostname != tower_instance.system_hostname: + tower_instance.system_hostname = system_hostname + tower_instance.save(update_fields=['system_hostname']) + logger.warn(six.text_type("Set system hostname to {}").format(tower_instance.system_hostname)) + except Exception as e: + logger.error("Error encountered while starting celery and getting system hostname {}".format(e)) + raise e @shared_task(queue=settings.CELERY_DEFAULT_QUEUE) diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 813adff7cf..709be26e40 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -265,10 +265,10 @@ class TestReaper(): def all_jobs(self, mocker): now = tz_now() - Instance.objects.create(hostname='host1', capacity=100) - Instance.objects.create(hostname='host2', capacity=100) - Instance.objects.create(hostname='host3_split', capacity=100) - Instance.objects.create(hostname='host4_offline', capacity=0) + Instance.objects.create(hostname='host1', system_hostname='host1_not_really', capacity=100) + Instance.objects.create(hostname='host2', system_hostname='host2', capacity=100) + Instance.objects.create(hostname='host3_split', system_hostname='host3_not_really', capacity=100) + Instance.objects.create(hostname='host4_offline', system_hostname='host4_offline', capacity=0) j1 = Job.objects.create(status='pending', execution_node='host1') j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2') @@ -327,7 +327,7 @@ class TestReaper(): @pytest.fixture def active_tasks(self): return ([], { - 'host1': ['considered_j2', 'considered_j3', 'considered_j4',], + 'host1_not_really': 
['considered_j2', 'considered_j3', 'considered_j4',], 'host2': ['considered_j6', 'considered_j7'], }) @@ -340,9 +340,9 @@ class TestReaper(): tm.get_running_tasks = mocker.Mock(return_value=(running_tasks, waiting_tasks)) tm.get_active_tasks = mocker.Mock(return_value=active_tasks) - + tm.cleanup_inconsistent_celery_tasks() - + for j in considered_jobs: if j not in reapable_jobs: j.save.assert_not_called() diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py index da3bddc5e4..22e268607f 100644 --- a/awx/main/tests/unit/test_task_manager.py +++ b/awx/main/tests/unit/test_task_manager.py @@ -26,6 +26,7 @@ class TestCleanupInconsistentCeleryTasks(): def test_instance_does_not_exist(self, logger_mock, *args): logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked")) tm = TaskManager() + tm.map_system_hostname_to_instance_hostname = lambda *args: dict() with pytest.raises(RuntimeError) as excinfo: tm.cleanup_inconsistent_celery_tasks() @@ -45,6 +46,7 @@ class TestCleanupInconsistentCeleryTasks(): job.websocket_emit_status = mock.MagicMock() get_running_tasks.return_value = ({'host1': [job]}, []) tm = TaskManager() + tm.map_system_hostname_to_instance_hostname = lambda *args: dict(host1=[]) with mock.patch.object(job, 'save', side_effect=DatabaseError): tm.cleanup_inconsistent_celery_tasks()