mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
Merge pull request #2216 from ansible/rollback-task-reaper-pr
Revert "fix celery task reaper"
This commit is contained in:
@@ -1,20 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
# Generated by Django 1.11.11 on 2018-06-14 17:23
|
|
||||||
from __future__ import unicode_literals
|
|
||||||
|
|
||||||
from django.db import migrations, models
|
|
||||||
|
|
||||||
|
|
||||||
class Migration(migrations.Migration):
|
|
||||||
|
|
||||||
dependencies = [
|
|
||||||
('main', '0041_v330_update_oauth_refreshtoken'),
|
|
||||||
]
|
|
||||||
|
|
||||||
operations = [
|
|
||||||
migrations.AddField(
|
|
||||||
model_name='instance',
|
|
||||||
name='system_hostname',
|
|
||||||
field=models.CharField(default=b'', max_length=255, unique=True),
|
|
||||||
),
|
|
||||||
]
|
|
||||||
@@ -45,11 +45,6 @@ class Instance(BaseModel):
|
|||||||
|
|
||||||
uuid = models.CharField(max_length=40)
|
uuid = models.CharField(max_length=40)
|
||||||
hostname = models.CharField(max_length=250, unique=True)
|
hostname = models.CharField(max_length=250, unique=True)
|
||||||
system_hostname = models.CharField(
|
|
||||||
max_length=255,
|
|
||||||
db_index=True,
|
|
||||||
help_text="Machine hostname",
|
|
||||||
)
|
|
||||||
created = models.DateTimeField(auto_now_add=True)
|
created = models.DateTimeField(auto_now_add=True)
|
||||||
modified = models.DateTimeField(auto_now=True)
|
modified = models.DateTimeField(auto_now=True)
|
||||||
last_isolated_check = models.DateTimeField(
|
last_isolated_check = models.DateTimeField(
|
||||||
@@ -114,10 +109,6 @@ class Instance(BaseModel):
|
|||||||
def jobs_total(self):
|
def jobs_total(self):
|
||||||
return UnifiedJob.objects.filter(execution_node=self.hostname).count()
|
return UnifiedJob.objects.filter(execution_node=self.hostname).count()
|
||||||
|
|
||||||
@property
|
|
||||||
def celery_system_hostname(self):
|
|
||||||
return 'celery@{}'.format(self.system_hostname)
|
|
||||||
|
|
||||||
def is_lost(self, ref_time=None, isolated=False):
|
def is_lost(self, ref_time=None, isolated=False):
|
||||||
if ref_time is None:
|
if ref_time is None:
|
||||||
ref_time = now()
|
ref_time = now()
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import uuid
|
|||||||
import json
|
import json
|
||||||
import six
|
import six
|
||||||
import random
|
import random
|
||||||
import itertools
|
|
||||||
from sets import Set
|
from sets import Set
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
@@ -159,33 +158,6 @@ class TaskManager():
|
|||||||
|
|
||||||
return (active_task_queues, queues)
|
return (active_task_queues, queues)
|
||||||
|
|
||||||
def map_system_hostname_to_instance_hostname(self, in_map):
|
|
||||||
'''
|
|
||||||
Convert celery's system hostnames to Instance.hostname values e.g.,
|
|
||||||
|
|
||||||
map_system_hostname_to_instance_hostname({
|
|
||||||
'node1.example.org': ABC,
|
|
||||||
'node2.example.org': ABC,
|
|
||||||
})
|
|
||||||
|
|
||||||
{
|
|
||||||
Instance.objects.get(system_hostname='node1.example.org').hostname: ABC,
|
|
||||||
Instance.objects.get(system_hostname='node2.example.org').hostname: ABC
|
|
||||||
}
|
|
||||||
'''
|
|
||||||
out_map = dict()
|
|
||||||
|
|
||||||
system_hostname_map = {i.system_hostname: i for i in
|
|
||||||
Instance.objects.only('system_hostname', 'hostname')}
|
|
||||||
|
|
||||||
for k, v in in_map.iteritems():
|
|
||||||
instance = system_hostname_map.get(k)
|
|
||||||
if not instance:
|
|
||||||
logger.warn("Could not map celery system hostname {} to Instance hostname".format(k))
|
|
||||||
else:
|
|
||||||
out_map[instance.hostname] = v
|
|
||||||
return out_map
|
|
||||||
|
|
||||||
def get_latest_project_update_tasks(self, all_sorted_tasks):
|
def get_latest_project_update_tasks(self, all_sorted_tasks):
|
||||||
project_ids = Set()
|
project_ids = Set()
|
||||||
for task in all_sorted_tasks:
|
for task in all_sorted_tasks:
|
||||||
@@ -582,12 +554,10 @@ class TaskManager():
|
|||||||
Rectify tower db <-> celery inconsistent view of jobs state
|
Rectify tower db <-> celery inconsistent view of jobs state
|
||||||
'''
|
'''
|
||||||
last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
|
last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
|
||||||
time_since_last_cleanup_sec = (tz_now() - last_cleanup).seconds
|
if (tz_now() - last_cleanup).seconds < settings.AWX_INCONSISTENT_TASK_INTERVAL:
|
||||||
cleanup_diff = settings.AWX_INCONSISTENT_TASK_INTERVAL - time_since_last_cleanup_sec
|
|
||||||
if cleanup_diff > 0:
|
|
||||||
logger.debug("Skipping job reaper. Can run again in {} seconds".format(cleanup_diff))
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
logger.debug("Failing inconsistent running jobs.")
|
||||||
celery_task_start_time = tz_now()
|
celery_task_start_time = tz_now()
|
||||||
active_task_queues, active_queues = self.get_active_tasks()
|
active_task_queues, active_queues = self.get_active_tasks()
|
||||||
cache.set('last_celery_task_cleanup', tz_now())
|
cache.set('last_celery_task_cleanup', tz_now())
|
||||||
@@ -596,21 +566,21 @@ class TaskManager():
|
|||||||
logger.error('Failed to retrieve active tasks from celery')
|
logger.error('Failed to retrieve active tasks from celery')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
remapped_active_queues = self.map_system_hostname_to_instance_hostname(active_queues)
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Only consider failing tasks on instances for which we obtained a task
|
Only consider failing tasks on instances for which we obtained a task
|
||||||
list from celery for.
|
list from celery for.
|
||||||
'''
|
'''
|
||||||
running_tasks, waiting_tasks = self.get_running_tasks()
|
running_tasks, waiting_tasks = self.get_running_tasks()
|
||||||
all_celery_task_ids = list(itertools.chain.from_iterable(remapped_active_queues.values()))
|
all_celery_task_ids = []
|
||||||
|
for node, node_jobs in active_queues.iteritems():
|
||||||
|
all_celery_task_ids.extend(node_jobs)
|
||||||
|
|
||||||
self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
|
self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
|
||||||
|
|
||||||
for node, node_jobs in running_tasks.iteritems():
|
for node, node_jobs in running_tasks.iteritems():
|
||||||
isolated = False
|
isolated = False
|
||||||
if node in remapped_active_queues:
|
if node in active_queues:
|
||||||
active_tasks = remapped_active_queues[node]
|
active_tasks = active_queues[node]
|
||||||
else:
|
else:
|
||||||
'''
|
'''
|
||||||
Node task list not found in celery. We may branch into cases:
|
Node task list not found in celery. We may branch into cases:
|
||||||
@@ -629,17 +599,11 @@ class TaskManager():
|
|||||||
node, [j.log_format for j in node_jobs]))
|
node, [j.log_format for j in node_jobs]))
|
||||||
active_tasks = []
|
active_tasks = []
|
||||||
elif instance.capacity == 0:
|
elif instance.capacity == 0:
|
||||||
logger.info("Instance {} is known to be offline and did not reply "
|
|
||||||
"with a list of running celery tasks. Going to fail all running"
|
|
||||||
"jobs associated with this instance.".format(instance.hostname))
|
|
||||||
active_tasks = []
|
active_tasks = []
|
||||||
elif instance.rampart_groups.filter(controller__isnull=False).exists():
|
elif instance.rampart_groups.filter(controller__isnull=False).exists():
|
||||||
logger.info("Failing all jobs for Isolated Instance {} ".format(instance.hostname))
|
|
||||||
active_tasks = all_celery_task_ids
|
active_tasks = all_celery_task_ids
|
||||||
isolated = True
|
isolated = True
|
||||||
else:
|
else:
|
||||||
logger.info("Instance {} did not reply with a list of running "
|
|
||||||
"celery tasks and the Instance does not look offline.".format(instance.hostname))
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self.fail_jobs_if_not_in_celery(
|
self.fail_jobs_if_not_in_celery(
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ import time
|
|||||||
import traceback
|
import traceback
|
||||||
import six
|
import six
|
||||||
import urlparse
|
import urlparse
|
||||||
import socket
|
|
||||||
from distutils.version import LooseVersion as Version
|
from distutils.version import LooseVersion as Version
|
||||||
import yaml
|
import yaml
|
||||||
import fcntl
|
import fcntl
|
||||||
@@ -232,21 +231,11 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
|||||||
|
|
||||||
@celeryd_after_setup.connect
|
@celeryd_after_setup.connect
|
||||||
def handle_update_celery_hostname(sender, instance, **kwargs):
|
def handle_update_celery_hostname(sender, instance, **kwargs):
|
||||||
'''
|
(changed, tower_instance) = Instance.objects.get_or_register()
|
||||||
Celery will appear to infinitely reboot if an error occurs here.
|
if changed:
|
||||||
'''
|
logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
|
||||||
try:
|
instance.hostname = 'celery@{}'.format(tower_instance.hostname)
|
||||||
(changed, tower_instance) = Instance.objects.get_or_register()
|
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
|
||||||
if changed:
|
|
||||||
logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
|
|
||||||
system_hostname = socket.gethostname()
|
|
||||||
if system_hostname != tower_instance.system_hostname:
|
|
||||||
tower_instance.system_hostname = system_hostname
|
|
||||||
tower_instance.save(update_fields=['system_hostname'])
|
|
||||||
logger.warn(six.text_type("Set system hostname to {}").format(tower_instance.system_hostname))
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Error encountered while starting celery and getting system hostname {}".format(e))
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
|
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
|
||||||
|
|||||||
@@ -265,10 +265,10 @@ class TestReaper():
|
|||||||
def all_jobs(self, mocker):
|
def all_jobs(self, mocker):
|
||||||
now = tz_now()
|
now = tz_now()
|
||||||
|
|
||||||
Instance.objects.create(hostname='host1', system_hostname='host1_not_really', capacity=100)
|
Instance.objects.create(hostname='host1', capacity=100)
|
||||||
Instance.objects.create(hostname='host2', system_hostname='host2', capacity=100)
|
Instance.objects.create(hostname='host2', capacity=100)
|
||||||
Instance.objects.create(hostname='host3_split', system_hostname='host3_not_really', capacity=100)
|
Instance.objects.create(hostname='host3_split', capacity=100)
|
||||||
Instance.objects.create(hostname='host4_offline', system_hostname='host4_offline', capacity=0)
|
Instance.objects.create(hostname='host4_offline', capacity=0)
|
||||||
|
|
||||||
j1 = Job.objects.create(status='pending', execution_node='host1')
|
j1 = Job.objects.create(status='pending', execution_node='host1')
|
||||||
j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2')
|
j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2')
|
||||||
@@ -327,7 +327,7 @@ class TestReaper():
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def active_tasks(self):
|
def active_tasks(self):
|
||||||
return ([], {
|
return ([], {
|
||||||
'host1_not_really': ['considered_j2', 'considered_j3', 'considered_j4',],
|
'host1': ['considered_j2', 'considered_j3', 'considered_j4',],
|
||||||
'host2': ['considered_j6', 'considered_j7'],
|
'host2': ['considered_j6', 'considered_j7'],
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -340,9 +340,9 @@ class TestReaper():
|
|||||||
|
|
||||||
tm.get_running_tasks = mocker.Mock(return_value=(running_tasks, waiting_tasks))
|
tm.get_running_tasks = mocker.Mock(return_value=(running_tasks, waiting_tasks))
|
||||||
tm.get_active_tasks = mocker.Mock(return_value=active_tasks)
|
tm.get_active_tasks = mocker.Mock(return_value=active_tasks)
|
||||||
|
|
||||||
tm.cleanup_inconsistent_celery_tasks()
|
tm.cleanup_inconsistent_celery_tasks()
|
||||||
|
|
||||||
for j in considered_jobs:
|
for j in considered_jobs:
|
||||||
if j not in reapable_jobs:
|
if j not in reapable_jobs:
|
||||||
j.save.assert_not_called()
|
j.save.assert_not_called()
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ class TestCleanupInconsistentCeleryTasks():
|
|||||||
def test_instance_does_not_exist(self, logger_mock, *args):
|
def test_instance_does_not_exist(self, logger_mock, *args):
|
||||||
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
|
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
|
||||||
tm = TaskManager()
|
tm = TaskManager()
|
||||||
tm.map_system_hostname_to_instance_hostname = lambda *args: dict()
|
|
||||||
with pytest.raises(RuntimeError) as excinfo:
|
with pytest.raises(RuntimeError) as excinfo:
|
||||||
tm.cleanup_inconsistent_celery_tasks()
|
tm.cleanup_inconsistent_celery_tasks()
|
||||||
|
|
||||||
@@ -46,7 +45,6 @@ class TestCleanupInconsistentCeleryTasks():
|
|||||||
job.websocket_emit_status = mock.MagicMock()
|
job.websocket_emit_status = mock.MagicMock()
|
||||||
get_running_tasks.return_value = ({'host1': [job]}, [])
|
get_running_tasks.return_value = ({'host1': [job]}, [])
|
||||||
tm = TaskManager()
|
tm = TaskManager()
|
||||||
tm.map_system_hostname_to_instance_hostname = lambda *args: dict(host1=[])
|
|
||||||
|
|
||||||
with mock.patch.object(job, 'save', side_effect=DatabaseError):
|
with mock.patch.object(job, 'save', side_effect=DatabaseError):
|
||||||
tm.cleanup_inconsistent_celery_tasks()
|
tm.cleanup_inconsistent_celery_tasks()
|
||||||
|
|||||||
Reference in New Issue
Block a user