diff --git a/awx/api/views.py b/awx/api/views.py index 3811b60f43..d7928218b4 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -61,7 +61,7 @@ import pytz from wsgiref.util import FileWrapper # AWX -from awx.main.tasks import send_notifications, handle_ha_toplogy_changes +from awx.main.tasks import send_notifications from awx.main.access import get_user_queryset from awx.main.ha import is_ha_environment from awx.api.filters import V1CredentialFilterBackend @@ -669,7 +669,6 @@ class InstanceDetail(RetrieveUpdateAPIView): else: obj.capacity = 0 obj.save() - handle_ha_toplogy_changes.apply_async() r.data = InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj) return r diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index f94911df31..65f06d4408 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -212,6 +212,8 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): def fit_task_to_most_remaining_capacity_instance(self, task): instance_most_capacity = None for i in self.instances.filter(capacity__gt=0).order_by('hostname'): + if not i.enabled: + continue if i.remaining_capacity >= task.task_impact and \ (instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 3d925ce63e..d97daf4e2b 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -29,8 +29,10 @@ except Exception: psutil = None # Celery -from celery import Task, shared_task, Celery -from celery.signals import celeryd_init, worker_shutdown, worker_ready, celeryd_after_setup +from kombu import Queue, Exchange +from kombu.common import Broadcast +from celery import Task, shared_task +from celery.signals import celeryd_init, worker_shutdown, celeryd_after_setup # Django from django.conf import settings @@ -63,7 +65,6 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, from awx.main.utils.safe_yaml import safe_dump, 
sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock -from awx.main.utils.ha import register_celery_worker_queues from awx.main.consumers import emit_channel_notification from awx.conf import settings_registry @@ -107,8 +108,6 @@ def log_celery_failure(self, exc, task_id, args, kwargs, einfo): @celeryd_init.connect def celery_startup(conf=None, **kwargs): - # Re-init all schedules - # NOTE: Rework this during the Rampart work startup_logger = logging.getLogger('awx.main.tasks') startup_logger.info("Syncing Schedules") for sch in Schedule.objects.all(): @@ -120,6 +119,19 @@ def celery_startup(conf=None, **kwargs): except Exception: logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch)) + # set the queues we want to bind to dynamically at startup + queues = [] + me = Instance.objects.me() + for q in [me.hostname] + settings.AWX_CELERY_QUEUES_STATIC: + q = q.encode('utf-8') + queues.append(Queue(q, Exchange(q), routing_key=q)) + for q in settings.AWX_CELERY_BCAST_QUEUES_STATIC: + queues.append(Broadcast(q.encode('utf-8'))) + conf.CELERY_QUEUES = list(set(queues)) + + # Expedite the first heartbeat run so a node comes online quickly. 
+ cluster_node_heartbeat.apply([]) + @worker_shutdown.connect def inform_cluster_of_shutdown(*args, **kwargs): @@ -184,7 +196,6 @@ def apply_cluster_membership_policies(self): g.instances.append(i.obj.id) g.obj.instances.add(i.obj) i.groups.append(g.obj.id) - handle_ha_toplogy_changes.apply([]) @shared_task(exchange='tower_broadcast_all', bind=True) @@ -200,33 +211,6 @@ def handle_setting_changes(self, setting_keys): cache.delete_many(cache_keys) -@shared_task(bind=True, exchange='tower_broadcast_all') -def handle_ha_toplogy_changes(self): - (changed, instance) = Instance.objects.get_or_register() - if changed: - logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname)) - logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname)) - awx_app = Celery('awx') - awx_app.config_from_object('django.conf:settings') - removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname) - if len(removed_queues) + len(added_queues) > 0: - logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}") - .format(self.request.hostname, removed_queues, added_queues)) - - -@worker_ready.connect -def handle_ha_toplogy_worker_ready(sender, **kwargs): - logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname)) - removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname) - if len(removed_queues) + len(added_queues) > 0: - logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}") - .format(sender.hostname, removed_queues, added_queues)) - - # Expedite the first hearbeat run so a node comes online quickly. 
- cluster_node_heartbeat.apply([]) - apply_cluster_membership_policies.apply([]) - - @celeryd_after_setup.connect def auto_register_ha_instance(sender, instance, **kwargs): # @@ -340,11 +324,9 @@ def cluster_node_heartbeat(self): logger.warning(six.text_type('Rejoining the cluster as instance {}.').format(this_inst.hostname)) if this_inst.enabled: this_inst.refresh_capacity() - handle_ha_toplogy_changes.apply_async() elif this_inst.capacity != 0 and not this_inst.enabled: this_inst.capacity = 0 this_inst.save(update_fields=['capacity']) - handle_ha_toplogy_changes.apply_async() if startup_event: return else: diff --git a/awx/main/tests/functional/test_instances.py b/awx/main/tests/functional/test_instances.py index 4ae56ca0b6..39a210be7b 100644 --- a/awx/main/tests/functional/test_instances.py +++ b/awx/main/tests/functional/test_instances.py @@ -1,5 +1,4 @@ import pytest -import mock from awx.main.models import AdHocCommand, InventoryUpdate, Job, JobTemplate, ProjectUpdate, Instance from awx.main.tasks import apply_cluster_membership_policies @@ -32,8 +31,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_few_instances(mock, instance_factory, instance_group_factory): +def test_policy_instance_few_instances(instance_factory, instance_group_factory): i1 = instance_factory("i1") ig_1 = instance_group_factory("ig1", percentage=25) ig_2 = instance_group_factory("ig2", percentage=25) @@ -61,8 +59,7 @@ def test_policy_instance_few_instances(mock, instance_factory, instance_group_fa @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_distribution_round_up(mock, instance_factory, instance_group_factory): +def test_policy_instance_distribution_round_up(instance_factory, instance_group_factory): i1 = instance_factory("i1") i2 = 
instance_factory("i2") i3 = instance_factory("i3") @@ -76,8 +73,7 @@ def test_policy_instance_distribution_round_up(mock, instance_factory, instance_ @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_distribution_uneven(mock, instance_factory, instance_group_factory): +def test_policy_instance_distribution_uneven(instance_factory, instance_group_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") i3 = instance_factory("i3") @@ -97,8 +93,7 @@ def test_policy_instance_distribution_uneven(mock, instance_factory, instance_gr @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_distribution_even(mock, instance_factory, instance_group_factory): +def test_policy_instance_distribution_even(instance_factory, instance_group_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") i3 = instance_factory("i3") @@ -131,8 +126,7 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_grou @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_distribution_simultaneous(mock, instance_factory, instance_group_factory): +def test_policy_instance_distribution_simultaneous(instance_factory, instance_group_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") i3 = instance_factory("i3") @@ -154,8 +148,7 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, insta @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_list_manually_assigned(mock, instance_factory, instance_group_factory): +def test_policy_instance_list_manually_assigned(instance_factory, instance_group_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") ig_1 = instance_group_factory("ig1", percentage=100, minimum=2) @@ -171,8 +164,7 @@ def 
test_policy_instance_list_manually_assigned(mock, instance_factory, instance @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_list_explicitly_pinned(mock, instance_factory, instance_group_factory): +def test_policy_instance_list_explicitly_pinned(instance_factory, instance_group_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") i2.managed_by_policy = False @@ -213,8 +205,7 @@ def test_inherited_instance_group_membership(instance_group_factory, default_ins @pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_mixed_group_membership(mock, instance_factory, instance_group_factory): +def test_mixed_group_membership(instance_factory, instance_group_factory): for i in range(5): instance_factory("i{}".format(i)) ig_1 = instance_group_factory("ig1", percentage=60) diff --git a/awx/main/tests/functional/test_jobs.py b/awx/main/tests/functional/test_jobs.py index aa95574b36..fd8070aab7 100644 --- a/awx/main/tests/functional/test_jobs.py +++ b/awx/main/tests/functional/test_jobs.py @@ -21,7 +21,6 @@ def test_orphan_unified_job_creation(instance, inventory): @pytest.mark.django_db @mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2,8)) @mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000,62)) -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes.apply_async', lambda: True) def test_job_capacity_and_with_inactive_node(): i = Instance.objects.create(hostname='test-1') i.refresh_capacity() diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index 35c8005781..eb5cfcee03 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -6,11 +6,9 @@ # python import pytest import mock -from contextlib import nested # AWX from awx.main.utils.ha import ( - _add_remove_celery_worker_queues, AWXCeleryRouter, ) @@ -41,30 +39,6 @@ class 
TestAddRemoveCeleryWorkerQueues(): app.control.cancel_consumer = mocker.MagicMock() return app - @pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,hostname,added_expected,removed_expected", [ - (['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], 'east-1', ['tower_broadcast_all_east-1'], []), - ([], [], ['east', 'west', 'east-1'], 'east-1', [], ['east', 'west']), - ([], [], ['east', 'west'], 'east-1', ['east-1'], ['east', 'west']), - ([], [], [], 'east-1', ['east-1'], []), - ([], [], ['china', 'russia'], 'east-1', [ 'east-1'], ['china', 'russia']), - ]) - def test__add_remove_celery_worker_queues_noop(self, mock_app, - instance_generator, - worker_queues_generator, - broadcast_queues, - static_queues, _worker_queues, - hostname, - added_expected, removed_expected): - instance = instance_generator(hostname=hostname) - worker_queues = worker_queues_generator(_worker_queues) - with nested( - mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues), - mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues), - mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)): - (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname) - assert set(added_expected) == set(added_queues) - assert set(removed_expected) == set(removed_queues) - class TestUpdateCeleryWorkerRouter(): diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index 1d9b6e08d3..538de73f69 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -3,52 +3,9 @@ # Copyright (c) 2017 Ansible Tower by Red Hat # All Rights Reserved. 
-# Django -from django.conf import settings - -# AWX from awx.main.models import Instance -def construct_bcast_queue_name(common_name): - return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID - - -def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name): - removed_queues = [] - added_queues = [] - worker_queue_names = set([q['name'] for q in worker_queues]) - - bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC]) - all_queue_names = set([instance.hostname]) | set(settings.AWX_CELERY_QUEUES_STATIC) - desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set()) - - # Remove queues - for queue_name in worker_queue_names: - if queue_name not in desired_queues: - app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name]) - removed_queues.append(queue_name.encode("utf8")) - - # Add queues for instances - for queue_name in all_queue_names: - if queue_name not in worker_queue_names: - app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name]) - added_queues.append(queue_name.encode("utf8")) - - # Add stable-named broadcast queues - for queue_name in settings.AWX_CELERY_BCAST_QUEUES_STATIC: - bcast_queue_name = construct_bcast_queue_name(queue_name) - if bcast_queue_name not in worker_queue_names: - app.control.add_consumer(bcast_queue_name, - exchange=queue_name.encode("utf8"), - exchange_type='fanout', - routing_key=queue_name.encode("utf8"), - reply=True) - added_queues.append(bcast_queue_name) - - return (added_queues, removed_queues) - - class AWXCeleryRouter(object): def route_for_task(self, task, args=None, kwargs=None): (changed, instance) = Instance.objects.get_or_register() @@ -64,17 +21,3 @@ class AWXCeleryRouter(object): if instance.is_controller() and task in isolated_tasks: return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")} - - -def 
register_celery_worker_queues(app, celery_worker_name): - instance = Instance.objects.me() - added_queues = [] - removed_queues = [] - - celery_host_queues = app.control.inspect([celery_worker_name]).active_queues() - - celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else [] - (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, - celery_worker_queues, celery_worker_name) - return (removed_queues, added_queues) -