Merge pull request #2703 from ryanpetrello/simplify-dynamic-queues

simplify dynamic queue binding
Ryan Petrello 2018-07-30 09:25:44 -04:00 committed by GitHub
commit 681d64c96f
7 changed files with 28 additions and 138 deletions
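
The heart of the change: the removed broadcast task handle_ha_toplogy_changes reconfigured queue consumers on already-running workers through the Celery control API; after this commit each node computes its queue set exactly once, in a celeryd_init handler, before its worker starts consuming. A minimal sketch of the new pattern, assuming Celery 3.x-style configuration (the static queue names and hostname below are invented for illustration; Queue, Exchange, Broadcast, and conf.CELERY_QUEUES are used exactly as in the diff that follows):

# Sketch: bind every queue at worker init instead of mutating consumers later.
from celery.signals import celeryd_init
from kombu import Exchange, Queue
from kombu.common import Broadcast

STATIC_QUEUES = ['tower', 'tower_scheduler']   # stand-ins for AWX_CELERY_QUEUES_STATIC
BCAST_QUEUES = ['tower_broadcast_all']         # per AWX_CELERY_BCAST_QUEUES_STATIC
HOSTNAME = 'awx-node-1'                        # stand-in for Instance.objects.me().hostname

@celeryd_init.connect
def bind_queues(conf=None, **kwargs):
    queues = []
    for name in [HOSTNAME] + STATIC_QUEUES:
        # one direct-exchange queue per name; routing key equals the queue name
        queues.append(Queue(name, Exchange(name), routing_key=name))
    for name in BCAST_QUEUES:
        # fanout-backed queue with a unique per-node name
        queues.append(Broadcast(name))
    conf.CELERY_QUEUES = list(set(queues))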

View File

@@ -61,7 +61,7 @@
 import pytz
 from wsgiref.util import FileWrapper
 # AWX
-from awx.main.tasks import send_notifications, handle_ha_toplogy_changes
+from awx.main.tasks import send_notifications
 from awx.main.access import get_user_queryset
 from awx.main.ha import is_ha_environment
 from awx.api.filters import V1CredentialFilterBackend
@@ -669,7 +669,6 @@ class InstanceDetail(RetrieveUpdateAPIView):
             else:
                 obj.capacity = 0
             obj.save()
-            handle_ha_toplogy_changes.apply_async()
             r.data = InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
         return r

View File

@@ -212,6 +212,8 @@ class InstanceGroup(BaseModel, RelatedJobsMixin):
     def fit_task_to_most_remaining_capacity_instance(self, task):
         instance_most_capacity = None
         for i in self.instances.filter(capacity__gt=0).order_by('hostname'):
+            if not i.enabled:
+                continue
             if i.remaining_capacity >= task.task_impact and \
                (instance_most_capacity is None or
                 i.remaining_capacity > instance_most_capacity.remaining_capacity):
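
With queue churn gone, draining a node is handled purely at placement time: disabled instances are skipped even if they still report capacity. The same rule as a self-contained sketch (field names as in the hunk above; the instances argument is assumed to be pre-filtered to capacity > 0 and sorted by hostname):

# Sketch of the amended placement rule: choose the enabled instance with the
# most remaining capacity that can still fit the task.
def fit_task(instances, task_impact):
    best = None
    for inst in instances:
        if not inst.enabled:
            continue                    # disabled nodes accept no new work
        if inst.remaining_capacity >= task_impact and (
                best is None or inst.remaining_capacity > best.remaining_capacity):
            best = inst
    return best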

View File

@@ -29,8 +29,10 @@
 except Exception:
     psutil = None
 # Celery
-from celery import Task, shared_task, Celery
-from celery.signals import celeryd_init, worker_shutdown, worker_ready, celeryd_after_setup
+from kombu import Queue, Exchange
+from kombu.common import Broadcast
+from celery import Task, shared_task
+from celery.signals import celeryd_init, worker_shutdown, celeryd_after_setup

 # Django
 from django.conf import settings
@@ -63,7 +65,6 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
 from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
 from awx.main.utils.reload import stop_local_services
 from awx.main.utils.pglock import advisory_lock
-from awx.main.utils.ha import register_celery_worker_queues
 from awx.main.consumers import emit_channel_notification
 from awx.conf import settings_registry
@@ -107,8 +108,6 @@ def log_celery_failure(self, exc, task_id, args, kwargs, einfo):

 @celeryd_init.connect
 def celery_startup(conf=None, **kwargs):
-    # Re-init all schedules
-    # NOTE: Rework this during the Rampart work
     startup_logger = logging.getLogger('awx.main.tasks')
     startup_logger.info("Syncing Schedules")
     for sch in Schedule.objects.all():
@@ -120,6 +119,19 @@
         except Exception:
             logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))

+    # set the queues we want to bind to dynamically at startup
+    queues = []
+    me = Instance.objects.me()
+    for q in [me.hostname] + settings.AWX_CELERY_QUEUES_STATIC:
+        q = q.encode('utf-8')
+        queues.append(Queue(q, Exchange(q), routing_key=q))
+    for q in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
+        queues.append(Broadcast(q.encode('utf-8')))
+    conf.CELERY_QUEUES = list(set(queues))
+
+    # Expedite the first heartbeat run so a node comes online quickly.
+    cluster_node_heartbeat.apply([])
+

 @worker_shutdown.connect
 def inform_cluster_of_shutdown(*args, **kwargs):
@@ -184,7 +196,6 @@ def apply_cluster_membership_policies(self):
                 g.instances.append(i.obj.id)
                 g.obj.instances.add(i.obj)
                 i.groups.append(g.obj.id)
-    handle_ha_toplogy_changes.apply([])


 @shared_task(exchange='tower_broadcast_all', bind=True)
@@ -200,33 +211,6 @@ def handle_setting_changes(self, setting_keys):
     cache.delete_many(cache_keys)


-@shared_task(bind=True, exchange='tower_broadcast_all')
-def handle_ha_toplogy_changes(self):
-    (changed, instance) = Instance.objects.get_or_register()
-    if changed:
-        logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
-    logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
-    awx_app = Celery('awx')
-    awx_app.config_from_object('django.conf:settings')
-    removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
-    if len(removed_queues) + len(added_queues) > 0:
-        logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
-                    .format(self.request.hostname, removed_queues, added_queues))
-
-
-@worker_ready.connect
-def handle_ha_toplogy_worker_ready(sender, **kwargs):
-    logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
-    removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
-    if len(removed_queues) + len(added_queues) > 0:
-        logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
-                    .format(sender.hostname, removed_queues, added_queues))
-
-    # Expedite the first heartbeat run so a node comes online quickly.
-    cluster_node_heartbeat.apply([])
-    apply_cluster_membership_policies.apply([])
-
-
 @celeryd_after_setup.connect
 def auto_register_ha_instance(sender, instance, **kwargs):
     #
@@ -340,11 +324,9 @@ def cluster_node_heartbeat(self):
             logger.warning(six.text_type('Rejoining the cluster as instance {}.').format(this_inst.hostname))
         if this_inst.enabled:
             this_inst.refresh_capacity()
-            handle_ha_toplogy_changes.apply_async()
         elif this_inst.capacity != 0 and not this_inst.enabled:
             this_inst.capacity = 0
             this_inst.save(update_fields=['capacity'])
-            handle_ha_toplogy_changes.apply_async()
         if startup_event:
             return
     else:
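
The broadcast tasks that survive (handle_setting_changes above) still rely on the fanout behavior that Broadcast provides in celery_startup: a Broadcast queue gets a unique per-node name bound to a fanout exchange, so a single publish reaches every worker. A minimal self-contained sketch of that mechanism; the app name, broker URL, and task body are illustrative, and only the exchange name tower_broadcast_all comes from this diff:

# Fanout sketch: every worker declares its own copy of the broadcast queue,
# so one publish to the exchange runs the task once on every node.
from celery import Celery, shared_task
from kombu.common import Broadcast

app = Celery('sketch', broker='amqp://localhost//')          # illustrative broker
app.conf.CELERY_QUEUES = (Broadcast('tower_broadcast_all'),)

@shared_task(exchange='tower_broadcast_all')
def clear_node_caches(keys):
    print('clearing {} on this node'.format(keys))           # illustrative body

# Publisher side: route the task to the fanout exchange explicitly, e.g.
# clear_node_caches.apply_async(args=[['SOME_SETTING']], exchange='tower_broadcast_all')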

View File

@@ -1,5 +1,4 @@
 import pytest
-import mock

 from awx.main.models import AdHocCommand, InventoryUpdate, Job, JobTemplate, ProjectUpdate, Instance
 from awx.main.tasks import apply_cluster_membership_policies
@@ -32,8 +31,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instance_group_factory):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_policy_instance_few_instances(mock, instance_factory, instance_group_factory):
+def test_policy_instance_few_instances(instance_factory, instance_group_factory):
     i1 = instance_factory("i1")
     ig_1 = instance_group_factory("ig1", percentage=25)
     ig_2 = instance_group_factory("ig2", percentage=25)
@@ -61,8 +59,7 @@ def test_policy_instance_few_instances(mock, instance_factory, instance_group_factory):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_policy_instance_distribution_round_up(mock, instance_factory, instance_group_factory):
+def test_policy_instance_distribution_round_up(instance_factory, instance_group_factory):
     i1 = instance_factory("i1")
     i2 = instance_factory("i2")
     i3 = instance_factory("i3")
@@ -76,8 +73,7 @@ def test_policy_instance_distribution_round_up(mock, instance_factory, instance_group_factory):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_policy_instance_distribution_uneven(mock, instance_factory, instance_group_factory):
+def test_policy_instance_distribution_uneven(instance_factory, instance_group_factory):
     i1 = instance_factory("i1")
     i2 = instance_factory("i2")
     i3 = instance_factory("i3")
@@ -97,8 +93,7 @@ def test_policy_instance_distribution_uneven(mock, instance_factory, instance_group_factory):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_policy_instance_distribution_even(mock, instance_factory, instance_group_factory):
+def test_policy_instance_distribution_even(instance_factory, instance_group_factory):
     i1 = instance_factory("i1")
     i2 = instance_factory("i2")
     i3 = instance_factory("i3")
@@ -131,8 +126,7 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_group_factory):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_policy_instance_distribution_simultaneous(mock, instance_factory, instance_group_factory):
+def test_policy_instance_distribution_simultaneous(instance_factory, instance_group_factory):
     i1 = instance_factory("i1")
     i2 = instance_factory("i2")
     i3 = instance_factory("i3")
@@ -154,8 +148,7 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, instance_group_factory):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_policy_instance_list_manually_assigned(mock, instance_factory, instance_group_factory):
+def test_policy_instance_list_manually_assigned(instance_factory, instance_group_factory):
     i1 = instance_factory("i1")
     i2 = instance_factory("i2")
     ig_1 = instance_group_factory("ig1", percentage=100, minimum=2)
@@ -171,8 +164,7 @@ def test_policy_instance_list_manually_assigned(mock, instance_factory, instance_group_factory):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_policy_instance_list_explicitly_pinned(mock, instance_factory, instance_group_factory):
+def test_policy_instance_list_explicitly_pinned(instance_factory, instance_group_factory):
     i1 = instance_factory("i1")
     i2 = instance_factory("i2")
     i2.managed_by_policy = False
@@ -213,8 +205,7 @@ def test_inherited_instance_group_membership(instance_group_factory, default_instance_group):

 @pytest.mark.django_db
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
-def test_mixed_group_membership(mock, instance_factory, instance_group_factory):
+def test_mixed_group_membership(instance_factory, instance_group_factory):
     for i in range(5):
         instance_factory("i{}".format(i))
     ig_1 = instance_group_factory("ig1", percentage=60)

View File

@@ -21,7 +21,6 @@ def test_orphan_unified_job_creation(instance, inventory):

 @pytest.mark.django_db
 @mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2,8))
 @mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000,62))
-@mock.patch('awx.main.tasks.handle_ha_toplogy_changes.apply_async', lambda: True)
 def test_job_capacity_and_with_inactive_node():
     i = Instance.objects.create(hostname='test-1')
     i.refresh_capacity()

View File

@@ -6,11 +6,9 @@

 # python
 import pytest
 import mock
-from contextlib import nested

 # AWX
 from awx.main.utils.ha import (
-    _add_remove_celery_worker_queues,
     AWXCeleryRouter,
 )
@@ -41,30 +39,6 @@ class TestAddRemoveCeleryWorkerQueues():
         app.control.cancel_consumer = mocker.MagicMock()
         return app

-    @pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,hostname,added_expected,removed_expected", [
-        (['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], 'east-1', ['tower_broadcast_all_east-1'], []),
-        ([], [], ['east', 'west', 'east-1'], 'east-1', [], ['east', 'west']),
-        ([], [], ['east', 'west'], 'east-1', ['east-1'], ['east', 'west']),
-        ([], [], [], 'east-1', ['east-1'], []),
-        ([], [], ['china', 'russia'], 'east-1', ['east-1'], ['china', 'russia']),
-    ])
-    def test__add_remove_celery_worker_queues_noop(self, mock_app,
-                                                   instance_generator,
-                                                   worker_queues_generator,
-                                                   broadcast_queues,
-                                                   static_queues, _worker_queues,
-                                                   hostname,
-                                                   added_expected, removed_expected):
-        instance = instance_generator(hostname=hostname)
-        worker_queues = worker_queues_generator(_worker_queues)
-        with nested(
-                mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues),
-                mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues),
-                mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)):
-            (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname)
-            assert set(added_expected) == set(added_queues)
-            assert set(removed_expected) == set(removed_queues)
-

 class TestUpdateCeleryWorkerRouter():
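
Incidentally, the deleted test above leaned on contextlib.nested, which only ever existed on Python 2. When several settings need patching in surviving tests, stacked mock.patch decorators achieve the same effect. A brief illustrative sketch, with standard-library targets rather than AWX settings so it runs anywhere:

import mock

@mock.patch('os.path.exists', return_value=True)
@mock.patch('os.getcwd', return_value='/tmp')
def test_example(mock_getcwd, mock_exists):
    # both patches are active here; no contextlib.nested required
    # (decorators apply bottom-up, so mock_getcwd is the first argument)
    import os
    assert os.getcwd() == '/tmp'
    assert os.path.exists('/nowhere')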

View File

@@ -3,52 +3,9 @@
 # Copyright (c) 2017 Ansible Tower by Red Hat
 # All Rights Reserved.

-# Django
-from django.conf import settings
-
 # AWX
 from awx.main.models import Instance


-def construct_bcast_queue_name(common_name):
-    return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID
-
-
-def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
-    removed_queues = []
-    added_queues = []
-    worker_queue_names = set([q['name'] for q in worker_queues])
-    bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
-    all_queue_names = set([instance.hostname]) | set(settings.AWX_CELERY_QUEUES_STATIC)
-    desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set())
-
-    # Remove queues
-    for queue_name in worker_queue_names:
-        if queue_name not in desired_queues:
-            app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
-            removed_queues.append(queue_name.encode("utf8"))
-
-    # Add queues for instances
-    for queue_name in all_queue_names:
-        if queue_name not in worker_queue_names:
-            app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
-            added_queues.append(queue_name.encode("utf8"))
-
-    # Add stable-named broadcast queues
-    for queue_name in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
-        bcast_queue_name = construct_bcast_queue_name(queue_name)
-        if bcast_queue_name not in worker_queue_names:
-            app.control.add_consumer(bcast_queue_name,
-                                     exchange=queue_name.encode("utf8"),
-                                     exchange_type='fanout',
-                                     routing_key=queue_name.encode("utf8"),
-                                     reply=True)
-            added_queues.append(bcast_queue_name)
-
-    return (added_queues, removed_queues)
-
-
 class AWXCeleryRouter(object):
     def route_for_task(self, task, args=None, kwargs=None):
         (changed, instance) = Instance.objects.get_or_register()
@@ -64,17 +21,3 @@ class AWXCeleryRouter(object):
         if instance.is_controller() and task in isolated_tasks:
             return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")}

-
-def register_celery_worker_queues(app, celery_worker_name):
-    instance = Instance.objects.me()
-    added_queues = []
-    removed_queues = []
-    celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
-    celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
-
-    (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance,
-                                                                      celery_worker_queues, celery_worker_name)
-    return (removed_queues, added_queues)
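
All that remains in this module is the router: route_for_task pins isolated-job tasks to the controller node's own hostname queue. For context, a class-based router like this is typically attached through Celery's routing settings; a minimal sketch of that wiring under Celery 3.x-style settings (the settings entry and the task/queue names below are assumptions, since this diff does not show AWX's settings file):

# Sketch: registering a class-based router (Celery 3.x-style settings).
# The dotted path matches this module; everything else is illustrative.
CELERY_ROUTES = ('awx.main.utils.ha.AWXCeleryRouter',)

# Celery consults each router in turn: returning a dict pins the task to a
# queue; returning None falls through to the next router or the default queue.
class ExampleRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'awx.main.tasks.run_isolated_job':   # illustrative task name
            return {'queue': 'awx-node-1', 'routing_key': 'awx-node-1'}
        return None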