Merge pull request #1410 from chrismeyersfsu/fix-revert_tower_special_group

send all tower work to a user-hidden queue
This commit is contained in:
Chris Meyers 2018-04-20 14:21:50 -04:00 committed by GitHub
commit a2901a47ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 38 additions and 92 deletions

View File

@ -323,7 +323,7 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid
celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) --pidfile /tmp/celery_pid
# Run to start the zeromq callback receiver
receiver:

View File

@ -231,7 +231,7 @@ class IsSuperUser(permissions.BasePermission):
class InstanceGroupTowerPermission(ModelAccessPermission):
def has_object_permission(self, request, view, obj):
if request.method not in permissions.SAFE_METHODS and obj.name == "tower":
if request.method == 'DELETE' and obj.name == "tower":
return False
return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj)

View File

@ -455,15 +455,6 @@ class InstanceGroupAccess(BaseAccess):
def can_change(self, obj, data):
return self.user.is_superuser
def can_delete(self, obj):
return self.user.is_superuser
def can_attach(self, obj, sub_obj, relationship, *args, **kwargs):
return self.user.is_superuser
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
return self.user.is_superuser
class UserAccess(BaseAccess):
'''

View File

@ -403,9 +403,7 @@ class Command(BaseCommand):
_eager_fields=dict(
job_args=json.dumps(sys.argv),
job_env=dict(os.environ.items()),
job_cwd=os.getcwd(),
execution_node=settings.CLUSTER_HOST_ID,
instance_group=InstanceGroup.objects.get(name='tower'))
job_cwd=os.getcwd())
)
# FIXME: Wait or raise error if inventory is being updated by another

View File

@ -137,7 +137,7 @@ def inform_cluster_of_shutdown(*args, **kwargs):
logger.exception('Encountered problem with normal shutdown signal.')
@shared_task(bind=True, queue='tower_instance_router')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def apply_cluster_membership_policies(self):
with advisory_lock('cluster_policy_lock', wait=True):
considered_instances = Instance.objects.all().order_by('id')
@ -148,20 +148,9 @@ def apply_cluster_membership_policies(self):
Group = namedtuple('Group', ['obj', 'instances'])
Node = namedtuple('Instance', ['obj', 'groups'])
# Add every instance to the special 'tower' group
tower_q = InstanceGroup.objects.filter(name='tower')
if tower_q.exists():
tower_inst = tower_q[0]
tower_inst.instances.set(Instance.objects.all_non_isolated())
instances_hostnames = [i.hostname for i in tower_inst.instances.all()]
logger.info(six.text_type("Setting 'tower' group instances to {}").format(instances_hostnames))
tower_inst.save()
else:
logger.warn(six.text_type("Special 'tower' Instance Group not found."))
# Process policy instance list first, these will represent manually managed instances
# that will not go through automatic policy determination
for ig in InstanceGroup.objects.exclude(name='tower'):
for ig in InstanceGroup.objects.all():
logger.info(six.text_type("Considering group {}").format(ig.name))
ig.instances.clear()
group_actual = Group(obj=ig, instances=[])
@ -269,7 +258,7 @@ def handle_update_celery_hostname(sender, instance, **kwargs):
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
@ -293,7 +282,7 @@ def send_notifications(notification_list, job_id=None):
notification.save()
@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def run_administrative_checks(self):
logger.warn("Running administrative checks.")
if not settings.TOWER_ADMIN_ALERTS:
@ -421,7 +410,7 @@ def awx_isolated_heartbeat(self):
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def awx_periodic_scheduler(self):
run_now = now()
state = TowerScheduleState.get_solo()
@ -456,7 +445,7 @@ def awx_periodic_scheduler(self):
state.save()
@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def handle_work_success(self, result, task_actual):
try:
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@ -470,7 +459,7 @@ def handle_work_success(self, result, task_actual):
run_job_complete.delay(instance.id)
@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def handle_work_error(task_id, *args, **kwargs):
subtasks = kwargs.get('subtasks', None)
logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
@ -511,7 +500,7 @@ def handle_work_error(task_id, *args, **kwargs):
pass
@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
'''
Signal handler and wrapper around inventory.update_computed_fields to
@ -531,7 +520,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
raise
@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def update_host_smart_inventory_memberships():
try:
with transaction.atomic():
@ -556,7 +545,7 @@ def update_host_smart_inventory_memberships():
smart_inventory.update_computed_fields(update_groups=False, update_hosts=False)
@shared_task(bind=True, queue='tower', max_retries=5)
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE, max_retries=5)
def delete_inventory(self, inventory_id, user_id):
# Delete inventory as user
if user_id is None:
@ -2333,7 +2322,7 @@ def _reconstruct_relationships(copy_mapping):
new_obj.save()
@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def deep_copy_model_obj(
self, model_module, model_name, obj_pk, new_obj_pk,
user_pk, sub_obj_list, permission_check_func=None

View File

@ -8,15 +8,15 @@ from awx.main.models import (
@pytest.fixture
def instance_group(job_factory):
ig = InstanceGroup(name="east")
def tower_instance_group():
ig = InstanceGroup(name='tower')
ig.save()
return ig
@pytest.fixture
def tower_instance_group():
ig = InstanceGroup(name='tower')
def instance_group(job_factory):
ig = InstanceGroup(name="east")
ig.save()
return ig
@ -84,16 +84,10 @@ def test_modify_delete_tower_instance_group_prevented(delete, options, tower_ins
url = reverse("api:instance_group_detail", kwargs={'pk': tower_instance_group.pk})
super_user = user('bob', True)
# DELETE tower group not allowed
delete(url, None, super_user, expect=403)
# OPTIONS should just be "GET"
resp = options(url, None, super_user, expect=200)
assert len(resp.data['actions'].keys()) == 1
assert len(resp.data['actions'].keys()) == 2
assert 'DELETE' not in resp.data['actions']
assert 'GET' in resp.data['actions']
# Updating tower group fields not allowed
patch(url, {'name': 'foobar'}, super_user, expect=403)
patch(url, {'policy_instance_percentage': 40}, super_user, expect=403)
put(url, {'name': 'foobar'}, super_user, expect=403)
assert 'PUT' in resp.data['actions']

View File

@ -162,7 +162,6 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory)
i2 = instance_factory("i2")
i3 = instance_factory("i3")
i4 = instance_factory("i4")
instance_group_factory("tower")
ig0 = instance_group_factory("ig0")
ig1 = instance_group_factory("ig1", minimum=2)
ig2 = instance_group_factory("ig2", percentage=50)
@ -175,7 +174,7 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory)
ig2 = InstanceGroup.objects.get(id=ig2.id)
ig3 = InstanceGroup.objects.get(id=ig3.id)
assert len(ig0.instances.all()) == 1
assert i0 in ig0.instances.all()
assert i0 in ig0.instances.all()
assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2
assert i1 in ig1.instances.all()
assert i2 in ig1.instances.all()

View File

@ -31,32 +31,15 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan
assert api_num_instances_oa == (actual_num_instances - 1)
@pytest.mark.django_db
@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
def test_policy_instance_tower_group(mock, instance_factory, instance_group_factory):
instance_factory("i1")
instance_factory("i2")
instance_factory("i3")
ig_t = instance_group_factory("tower")
instance_group_factory("ig1", percentage=25)
instance_group_factory("ig2", percentage=25)
instance_group_factory("ig3", percentage=25)
instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 3
@pytest.mark.django_db
@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
def test_policy_instance_few_instances(mock, instance_factory, instance_group_factory):
i1 = instance_factory("i1")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 1
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@ -67,7 +50,6 @@ def test_policy_instance_few_instances(mock, instance_factory, instance_group_fa
assert i1 in ig_4.instances.all()
i2 = instance_factory("i2")
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 2
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@ -84,13 +66,11 @@ def test_policy_instance_distribution_uneven(mock, instance_factory, instance_gr
i1 = instance_factory("i1")
i2 = instance_factory("i2")
i3 = instance_factory("i3")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 3
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@ -108,13 +88,11 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_grou
i2 = instance_factory("i2")
i3 = instance_factory("i3")
i4 = instance_factory("i4")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 4
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@ -126,7 +104,6 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_grou
ig_1.policy_instance_minimum = 2
ig_1.save()
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 4
assert len(ig_1.instances.all()) == 2
assert i1 in ig_1.instances.all()
assert i2 in ig_1.instances.all()
@ -145,13 +122,11 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, insta
i2 = instance_factory("i2")
i3 = instance_factory("i3")
i4 = instance_factory("i4")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25, minimum=2)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 4
assert len(ig_1.instances.all()) == 2
assert i1 in ig_1.instances.all()
assert i2 in ig_1.instances.all()
@ -168,13 +143,11 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, insta
def test_policy_instance_list_manually_managed(mock, instance_factory, instance_group_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=100, minimum=2)
ig_2 = instance_group_factory("ig2")
ig_2.policy_instance_list = [i2.hostname]
ig_2.save()
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 2
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert i2 not in ig_1.instances.all()

View File

@ -60,7 +60,6 @@ class TestAddRemoveCeleryWorkerQueues():
static_queues, _worker_queues,
groups, hostname,
added_expected, removed_expected):
added_expected.append('tower_instance_router')
instance = instance_generator(groups=groups, hostname=hostname)
worker_queues = worker_queues_generator(_worker_queues)
with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues):

View File

@ -3,9 +3,6 @@
# Copyright (c) 2017 Ansible Tower by Red Hat
# All Rights Reserved.
# Python
import six
# Django
from django.conf import settings
@ -16,24 +13,26 @@ from awx.main.models import Instance
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
removed_queues = []
added_queues = []
ig_names = set([six.text_type('tower_instance_router')])
ig_names = set()
hostnames = set([instance.hostname for instance in controlled_instances])
for instance in controlled_instances:
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
worker_queue_names = set([q['name'] for q in worker_queues])
all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)
# Remove queues that aren't in the instance group
for queue in worker_queues:
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
queue['alias'] in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
continue
if queue['name'] not in ig_names | hostnames or not instance.enabled:
if queue['name'] not in all_queue_names or not instance.enabled:
app.control.cancel_consumer(queue['name'].encode("utf8"), reply=True, destination=[worker_name])
removed_queues.append(queue['name'].encode("utf8"))
# Add queues for instance and instance groups
for queue_name in ig_names | hostnames:
for queue_name in all_queue_names:
if queue_name not in worker_queue_names:
app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
added_queues.append(queue_name.encode("utf8"))
@ -76,6 +75,5 @@ def register_celery_worker_queues(app, celery_worker_name):
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances,
celery_worker_queues, celery_worker_name)
return (controlled_instances, removed_queues, added_queues)

View File

@ -6,9 +6,9 @@ import re # noqa
import sys
import ldap
import djcelery
import six
from datetime import timedelta
from kombu import Queue, Exchange
from kombu.common import Broadcast
# global settings
@ -451,7 +451,7 @@ djcelery.setup_loader()
BROKER_POOL_LIMIT = None
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_EVENT_QUEUE_TTL = 5
CELERY_DEFAULT_QUEUE = 'tower'
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
@ -463,8 +463,7 @@ CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_IMPORTS = ('awx.main.scheduler.tasks',)
CELERY_QUEUES = (
Queue('tower', Exchange('tower'), routing_key='tower'),
Broadcast('tower_broadcast_all')
Broadcast('tower_broadcast_all'),
)
CELERY_ROUTES = {}
@ -515,7 +514,13 @@ AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3
# Celery queues that will always be listened to by celery workers
# Note: Broadcast queues have unique, auto-generated names, with the alias
# property value of the original queue name.
AWX_CELERY_QUEUES_STATIC = ['tower_broadcast_all',]
AWX_CELERY_QUEUES_STATIC = [
six.text_type(CELERY_DEFAULT_QUEUE),
]
AWX_CELERY_BCAST_QUEUES_STATIC = [
six.text_type('tower_broadcast_all'),
]
ASGI_AMQP = {
'INIT_FUNC': 'awx.prepare_env',