diff --git a/Makefile b/Makefile index 67136029d2..7d4c242693 100644 --- a/Makefile +++ b/Makefile @@ -323,7 +323,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ fi; \ - celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid + celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) --pidfile /tmp/celery_pid # Run to start the zeromq callback receiver receiver: diff --git a/awx/api/permissions.py b/awx/api/permissions.py index 6c7fee85bf..1567158f0e 100644 --- a/awx/api/permissions.py +++ b/awx/api/permissions.py @@ -231,7 +231,7 @@ class IsSuperUser(permissions.BasePermission): class InstanceGroupTowerPermission(ModelAccessPermission): def has_object_permission(self, request, view, obj): - if request.method not in permissions.SAFE_METHODS and obj.name == "tower": + if request.method == 'DELETE' and obj.name == "tower": return False return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj) diff --git a/awx/main/access.py b/awx/main/access.py index 465ee1c540..ef2577d695 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -455,15 +455,6 @@ class InstanceGroupAccess(BaseAccess): def can_change(self, obj, data): return self.user.is_superuser - def can_delete(self, obj): - return self.user.is_superuser - - def can_attach(self, obj, sub_obj, relationship, *args, **kwargs): - return self.user.is_superuser - - def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): - return self.user.is_superuser - class UserAccess(BaseAccess): ''' diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 10ae94062f..3a03e66c53 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -403,9 +403,7 @@ class Command(BaseCommand): _eager_fields=dict( job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), - job_cwd=os.getcwd(), - execution_node=settings.CLUSTER_HOST_ID, - instance_group=InstanceGroup.objects.get(name='tower')) + job_cwd=os.getcwd()) ) # FIXME: Wait or raise error if inventory is being updated by another diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 4df4ba2524..cb42464462 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -137,7 +137,7 @@ def inform_cluster_of_shutdown(*args, **kwargs): logger.exception('Encountered problem with normal shutdown signal.') -@shared_task(bind=True, queue='tower_instance_router') +@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) def apply_cluster_membership_policies(self): with advisory_lock('cluster_policy_lock', wait=True): considered_instances = Instance.objects.all().order_by('id') @@ -148,20 +148,9 @@ def apply_cluster_membership_policies(self): Group = namedtuple('Group', ['obj', 'instances']) Node = namedtuple('Instance', ['obj', 'groups']) - # Add every instance to the special 'tower' group - tower_q = InstanceGroup.objects.filter(name='tower') - if tower_q.exists(): - tower_inst = tower_q[0] - tower_inst.instances.set(Instance.objects.all_non_isolated()) - instances_hostnames = [i.hostname for i in tower_inst.instances.all()] - logger.info(six.text_type("Setting 'tower' group instances to {}").format(instances_hostnames)) - tower_inst.save() - else: - logger.warn(six.text_type("Special 'tower' Instance Group not found.")) - # Process policy instance list first, these will represent manually managed instances # that will not go through automatic policy determination - for ig in InstanceGroup.objects.exclude(name='tower'): + for ig in InstanceGroup.objects.all(): logger.info(six.text_type("Considering group {}").format(ig.name)) ig.instances.clear() group_actual = Group(obj=ig, instances=[]) @@ -269,7 +258,7 @@ def handle_update_celery_hostname(sender, instance, **kwargs): logger.warn(six.text_type("Set hostname to {}").format(instance.hostname)) -@shared_task(queue='tower') +@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -293,7 +282,7 @@ def send_notifications(notification_list, job_id=None): notification.save() -@shared_task(bind=True, queue='tower') +@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) def run_administrative_checks(self): logger.warn("Running administrative checks.") if not settings.TOWER_ADMIN_ALERTS: @@ -421,7 +410,7 @@ def awx_isolated_heartbeat(self): isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version) -@shared_task(bind=True, queue='tower') +@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) def awx_periodic_scheduler(self): run_now = now() state = TowerScheduleState.get_solo() @@ -456,7 +445,7 @@ def awx_periodic_scheduler(self): state.save() -@shared_task(bind=True, queue='tower') +@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) def handle_work_success(self, result, task_actual): try: instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -470,7 +459,7 @@ def handle_work_success(self, result, task_actual): run_job_complete.delay(instance.id) -@shared_task(queue='tower') +@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) def handle_work_error(task_id, *args, **kwargs): subtasks = kwargs.get('subtasks', None) logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks))) @@ -511,7 +500,7 @@ def handle_work_error(task_id, *args, **kwargs): pass -@shared_task(queue='tower') +@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to @@ -531,7 +520,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True): raise -@shared_task(queue='tower') +@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) def update_host_smart_inventory_memberships(): try: with transaction.atomic(): @@ -556,7 +545,7 @@ def update_host_smart_inventory_memberships(): smart_inventory.update_computed_fields(update_groups=False, update_hosts=False) -@shared_task(bind=True, queue='tower', max_retries=5) +@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE, max_retries=5) def delete_inventory(self, inventory_id, user_id): # Delete inventory as user if user_id is None: @@ -2333,7 +2322,7 @@ def _reconstruct_relationships(copy_mapping): new_obj.save() -@shared_task(bind=True, queue='tower') +@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) def deep_copy_model_obj( self, model_module, model_name, obj_pk, new_obj_pk, user_pk, sub_obj_list, permission_check_func=None diff --git a/awx/main/tests/functional/api/test_instance_group.py b/awx/main/tests/functional/api/test_instance_group.py index f7978ca8da..0488453fcc 100644 --- a/awx/main/tests/functional/api/test_instance_group.py +++ b/awx/main/tests/functional/api/test_instance_group.py @@ -8,15 +8,15 @@ from awx.main.models import ( @pytest.fixture -def instance_group(job_factory): - ig = InstanceGroup(name="east") +def tower_instance_group(): + ig = InstanceGroup(name='tower') ig.save() return ig @pytest.fixture -def tower_instance_group(): - ig = InstanceGroup(name='tower') +def instance_group(job_factory): + ig = InstanceGroup(name="east") ig.save() return ig @@ -84,16 +84,10 @@ def test_modify_delete_tower_instance_group_prevented(delete, options, tower_ins url = reverse("api:instance_group_detail", kwargs={'pk': tower_instance_group.pk}) super_user = user('bob', True) - # DELETE tower group not allowed delete(url, None, super_user, expect=403) - # OPTIONS should just be "GET" resp = options(url, None, super_user, expect=200) - assert len(resp.data['actions'].keys()) == 1 + assert len(resp.data['actions'].keys()) == 2 + assert 'DELETE' not in resp.data['actions'] assert 'GET' in resp.data['actions'] - - # Updating tower group fields not allowed - patch(url, {'name': 'foobar'}, super_user, expect=403) - patch(url, {'policy_instance_percentage': 40}, super_user, expect=403) - put(url, {'name': 'foobar'}, super_user, expect=403) - + assert 'PUT' in resp.data['actions'] diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index e71afa09db..9b4b3eac44 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -162,7 +162,6 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory) i2 = instance_factory("i2") i3 = instance_factory("i3") i4 = instance_factory("i4") - instance_group_factory("tower") ig0 = instance_group_factory("ig0") ig1 = instance_group_factory("ig1", minimum=2) ig2 = instance_group_factory("ig2", percentage=50) @@ -175,7 +174,7 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory) ig2 = InstanceGroup.objects.get(id=ig2.id) ig3 = InstanceGroup.objects.get(id=ig3.id) assert len(ig0.instances.all()) == 1 - assert i0 in ig0.instances.all() + assert i0 in ig0.instances.all() assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2 assert i1 in ig1.instances.all() assert i2 in ig1.instances.all() diff --git a/awx/main/tests/functional/test_instances.py b/awx/main/tests/functional/test_instances.py index e2d02a5df4..11484dfc6e 100644 --- a/awx/main/tests/functional/test_instances.py +++ b/awx/main/tests/functional/test_instances.py @@ -31,32 +31,15 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan assert api_num_instances_oa == (actual_num_instances - 1) -@pytest.mark.django_db -@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) -def test_policy_instance_tower_group(mock, instance_factory, instance_group_factory): - instance_factory("i1") - instance_factory("i2") - instance_factory("i3") - ig_t = instance_group_factory("tower") - instance_group_factory("ig1", percentage=25) - instance_group_factory("ig2", percentage=25) - instance_group_factory("ig3", percentage=25) - instance_group_factory("ig4", percentage=25) - apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 3 - - @pytest.mark.django_db @mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None) def test_policy_instance_few_instances(mock, instance_factory, instance_group_factory): i1 = instance_factory("i1") - ig_t = instance_group_factory("tower") ig_1 = instance_group_factory("ig1", percentage=25) ig_2 = instance_group_factory("ig2", percentage=25) ig_3 = instance_group_factory("ig3", percentage=25) ig_4 = instance_group_factory("ig4", percentage=25) apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 1 assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1 @@ -67,7 +50,6 @@ def test_policy_instance_few_instances(mock, instance_factory, instance_group_fa assert i1 in ig_4.instances.all() i2 = instance_factory("i2") apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 2 assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1 @@ -84,13 +66,11 @@ def test_policy_instance_distribution_uneven(mock, instance_factory, instance_gr i1 = instance_factory("i1") i2 = instance_factory("i2") i3 = instance_factory("i3") - ig_t = instance_group_factory("tower") ig_1 = instance_group_factory("ig1", percentage=25) ig_2 = instance_group_factory("ig2", percentage=25) ig_3 = instance_group_factory("ig3", percentage=25) ig_4 = instance_group_factory("ig4", percentage=25) apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 3 assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1 @@ -108,13 +88,11 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_grou i2 = instance_factory("i2") i3 = instance_factory("i3") i4 = instance_factory("i4") - ig_t = instance_group_factory("tower") ig_1 = instance_group_factory("ig1", percentage=25) ig_2 = instance_group_factory("ig2", percentage=25) ig_3 = instance_group_factory("ig3", percentage=25) ig_4 = instance_group_factory("ig4", percentage=25) apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 4 assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1 @@ -126,7 +104,6 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_grou ig_1.policy_instance_minimum = 2 ig_1.save() apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 4 assert len(ig_1.instances.all()) == 2 assert i1 in ig_1.instances.all() assert i2 in ig_1.instances.all() @@ -145,13 +122,11 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, insta i2 = instance_factory("i2") i3 = instance_factory("i3") i4 = instance_factory("i4") - ig_t = instance_group_factory("tower") ig_1 = instance_group_factory("ig1", percentage=25, minimum=2) ig_2 = instance_group_factory("ig2", percentage=25) ig_3 = instance_group_factory("ig3", percentage=25) ig_4 = instance_group_factory("ig4", percentage=25) apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 4 assert len(ig_1.instances.all()) == 2 assert i1 in ig_1.instances.all() assert i2 in ig_1.instances.all() @@ -168,13 +143,11 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, insta def test_policy_instance_list_manually_managed(mock, instance_factory, instance_group_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") - ig_t = instance_group_factory("tower") ig_1 = instance_group_factory("ig1", percentage=100, minimum=2) ig_2 = instance_group_factory("ig2") ig_2.policy_instance_list = [i2.hostname] ig_2.save() apply_cluster_membership_policies() - assert len(ig_t.instances.all()) == 2 assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert i2 not in ig_1.instances.all() diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index 95e7aa260b..edd44b7958 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -60,7 +60,6 @@ class TestAddRemoveCeleryWorkerQueues(): static_queues, _worker_queues, groups, hostname, added_expected, removed_expected): - added_expected.append('tower_instance_router') instance = instance_generator(groups=groups, hostname=hostname) worker_queues = worker_queues_generator(_worker_queues) with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues): diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index 9aef118fe7..93a7f8dd24 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -3,9 +3,6 @@ # Copyright (c) 2017 Ansible Tower by Red Hat # All Rights Reserved. -# Python -import six - # Django from django.conf import settings @@ -16,24 +13,26 @@ from awx.main.models import Instance def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name): removed_queues = [] added_queues = [] - ig_names = set([six.text_type('tower_instance_router')]) + ig_names = set() hostnames = set([instance.hostname for instance in controlled_instances]) for instance in controlled_instances: ig_names.update(instance.rampart_groups.values_list('name', flat=True)) worker_queue_names = set([q['name'] for q in worker_queues]) + all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC) + # Remove queues that aren't in the instance group for queue in worker_queues: if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \ - queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC: + queue['alias'] in settings.AWX_CELERY_BCAST_QUEUES_STATIC: continue - if queue['name'] not in ig_names | hostnames or not instance.enabled: + if queue['name'] not in all_queue_names or not instance.enabled: app.control.cancel_consumer(queue['name'].encode("utf8"), reply=True, destination=[worker_name]) removed_queues.append(queue['name'].encode("utf8")) # Add queues for instance and instance groups - for queue_name in ig_names | hostnames: + for queue_name in all_queue_names: if queue_name not in worker_queue_names: app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name]) added_queues.append(queue_name.encode("utf8")) @@ -76,6 +75,5 @@ def register_celery_worker_queues(app, celery_worker_name): celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else [] (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances, celery_worker_queues, celery_worker_name) - return (controlled_instances, removed_queues, added_queues) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 8d84b2951e..bd56e8972e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -6,9 +6,9 @@ import re # noqa import sys import ldap import djcelery +import six from datetime import timedelta -from kombu import Queue, Exchange from kombu.common import Broadcast # global settings @@ -451,7 +451,7 @@ djcelery.setup_loader() BROKER_POOL_LIMIT = None BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_EVENT_QUEUE_TTL = 5 -CELERY_DEFAULT_QUEUE = 'tower' +CELERY_DEFAULT_QUEUE = 'awx_private_queue' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] @@ -463,8 +463,7 @@ CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler' CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_QUEUES = ( - Queue('tower', Exchange('tower'), routing_key='tower'), - Broadcast('tower_broadcast_all') + Broadcast('tower_broadcast_all'), ) CELERY_ROUTES = {} @@ -515,7 +514,13 @@ AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3 # Celery queues that will always be listened to by celery workers # Note: Broadcast queues have unique, auto-generated names, with the alias # property value of the original queue name. -AWX_CELERY_QUEUES_STATIC = ['tower_broadcast_all',] +AWX_CELERY_QUEUES_STATIC = [ + six.text_type(CELERY_DEFAULT_QUEUE), +] + +AWX_CELERY_BCAST_QUEUES_STATIC = [ + six.text_type('tower_broadcast_all'), +] ASGI_AMQP = { 'INIT_FUNC': 'awx.prepare_env',