mirror of
https://github.com/ansible/awx.git
synced 2026-02-18 03:30:02 -03:30
don't access the database in our custom route_for_task
If database connectivity is lost/interrupted in this block of celery internals, beat is *not* smart enough to recover, and it gets stuck in an endless fail loop. We don't _need_ to talk to the database here anyways; just use settings.CLUSTER_HOST_ID to get what we need. see: https://github.com/ansible/tower/issues/2957
This commit is contained in:
@@ -1,68 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
# Copyright (c) 2017 Ansible Tower by Red Hat
|
|
||||||
# All Rights Reserved.
|
|
||||||
|
|
||||||
# python
|
|
||||||
import pytest
|
|
||||||
import mock
|
|
||||||
|
|
||||||
# AWX
|
|
||||||
from awx.main.utils.ha import (
|
|
||||||
AWXCeleryRouter,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TestAddRemoveCeleryWorkerQueues():
|
|
||||||
@pytest.fixture
|
|
||||||
def instance_generator(self, mocker):
|
|
||||||
def fn(hostname='east-1'):
|
|
||||||
groups=['east', 'west', 'north', 'south']
|
|
||||||
instance = mocker.MagicMock()
|
|
||||||
instance.hostname = hostname
|
|
||||||
instance.rampart_groups = mocker.MagicMock()
|
|
||||||
instance.rampart_groups.values_list = mocker.MagicMock(return_value=groups)
|
|
||||||
|
|
||||||
return instance
|
|
||||||
return fn
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def worker_queues_generator(self, mocker):
|
|
||||||
def fn(queues=['east', 'west']):
|
|
||||||
return [dict(name=n, alias='') for n in queues]
|
|
||||||
return fn
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_app(self, mocker):
|
|
||||||
app = mocker.MagicMock()
|
|
||||||
app.control = mocker.MagicMock()
|
|
||||||
app.control.cancel_consumer = mocker.MagicMock()
|
|
||||||
return app
|
|
||||||
|
|
||||||
|
|
||||||
class TestUpdateCeleryWorkerRouter():
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("is_controller,expected_routes", [
|
|
||||||
(False, {
|
|
||||||
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
|
||||||
'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'}
|
|
||||||
}),
|
|
||||||
(True, {
|
|
||||||
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
|
||||||
'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'},
|
|
||||||
'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
|
||||||
}),
|
|
||||||
])
|
|
||||||
def test_update_celery_worker_routes(self, mocker, is_controller, expected_routes):
|
|
||||||
def get_or_register():
|
|
||||||
instance = mock.MagicMock()
|
|
||||||
instance.hostname = 'east-1'
|
|
||||||
instance.is_controller = mock.MagicMock(return_value=is_controller)
|
|
||||||
return (False, instance)
|
|
||||||
|
|
||||||
with mock.patch('awx.main.models.Instance.objects.get_or_register', get_or_register):
|
|
||||||
router = AWXCeleryRouter()
|
|
||||||
|
|
||||||
for k,v in expected_routes.iteritems():
|
|
||||||
assert router.route_for_task(k) == v
|
|
||||||
|
|
||||||
@@ -3,21 +3,15 @@
|
|||||||
# Copyright (c) 2017 Ansible Tower by Red Hat
|
# Copyright (c) 2017 Ansible Tower by Red Hat
|
||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
|
|
||||||
from awx.main.models import Instance
|
from django.conf import settings
|
||||||
|
|
||||||
|
|
||||||
class AWXCeleryRouter(object):
|
class AWXCeleryRouter(object):
|
||||||
def route_for_task(self, task, args=None, kwargs=None):
|
def route_for_task(self, task, args=None, kwargs=None):
|
||||||
(changed, instance) = Instance.objects.get_or_register()
|
|
||||||
tasks = [
|
tasks = [
|
||||||
'awx.main.tasks.cluster_node_heartbeat',
|
'awx.main.tasks.cluster_node_heartbeat',
|
||||||
'awx.main.tasks.purge_old_stdout_files',
|
'awx.main.tasks.purge_old_stdout_files',
|
||||||
]
|
|
||||||
isolated_tasks = [
|
|
||||||
'awx.main.tasks.awx_isolated_heartbeat',
|
'awx.main.tasks.awx_isolated_heartbeat',
|
||||||
]
|
]
|
||||||
if task in tasks:
|
if task in tasks:
|
||||||
return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")}
|
return {'queue': settings.CLUSTER_HOST_ID, 'routing_key': settings.CLUSTER_HOST_ID}
|
||||||
|
|
||||||
if instance.is_controller() and task in isolated_tasks:
|
|
||||||
return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user