mirror of https://github.com/ansible/awx.git
send all tower work to a user-hidden queue
* Before, we had a special instance group, tower, that ran any async work the system needed done. This gave users fine-grained control over which nodes did background work, but that granularity proved too complicated for users. Now all tower system work goes to a special, non-user-exposed celery queue. Tower remains the fallback instance group for executing jobs. The tower group is created at install time and protected from deletion.
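In practical terms the change is the same one-line decorator swap on every system task, plus a hidden default queue name in settings that every worker always listens on. A minimal sketch of the pattern, condensed from the hunks below (the settings excerpt is simplified; the real file wraps the queue names in six.text_type):

# settings.py (excerpt, simplified)
CELERY_DEFAULT_QUEUE = 'awx_private_queue'           # hidden queue, never exposed as an instance group
AWX_CELERY_QUEUES_STATIC = [CELERY_DEFAULT_QUEUE]    # workers always consume the hidden queue

# tasks.py (excerpt)
from celery import shared_task
from django.conf import settings

# Before: system tasks were pinned to the user-visible 'tower' queue
#     @shared_task(bind=True, queue='tower')
# After: they fall through to the hidden default queue
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def run_administrative_checks(self):
    ...

User-created instance groups still decide where jobs run; only AWX's own background tasks move to the private queue, and the protected 'tower' group stays as the fallback for job execution.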
parent 6595515987
commit a56771c8f0

Makefile (2 changed lines)
@@ -323,7 +323,7 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid
celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) --pidfile /tmp/celery_pid

# Run to start the zeromq callback receiver
receiver:

@@ -231,7 +231,7 @@ class IsSuperUser(permissions.BasePermission):

class InstanceGroupTowerPermission(ModelAccessPermission):
def has_object_permission(self, request, view, obj):
if request.method not in permissions.SAFE_METHODS and obj.name == "tower":
if request.method == 'DELETE' and obj.name == "tower":
return False
return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj)

@@ -455,15 +455,6 @@ class InstanceGroupAccess(BaseAccess):
def can_change(self, obj, data):
return self.user.is_superuser

def can_delete(self, obj):
return self.user.is_superuser

def can_attach(self, obj, sub_obj, relationship, *args, **kwargs):
return self.user.is_superuser

def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
return self.user.is_superuser


class UserAccess(BaseAccess):
'''

@@ -403,9 +403,7 @@ class Command(BaseCommand):
_eager_fields=dict(
job_args=json.dumps(sys.argv),
job_env=dict(os.environ.items()),
job_cwd=os.getcwd(),
execution_node=settings.CLUSTER_HOST_ID,
instance_group=InstanceGroup.objects.get(name='tower'))
job_cwd=os.getcwd())
)

# FIXME: Wait or raise error if inventory is being updated by another

@@ -137,7 +137,7 @@ def inform_cluster_of_shutdown(*args, **kwargs):
logger.exception('Encountered problem with normal shutdown signal.')


@shared_task(bind=True, queue='tower_instance_router')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def apply_cluster_membership_policies(self):
with advisory_lock('cluster_policy_lock', wait=True):
considered_instances = Instance.objects.all().order_by('id')
@@ -148,20 +148,9 @@ def apply_cluster_membership_policies(self):
Group = namedtuple('Group', ['obj', 'instances'])
Node = namedtuple('Instance', ['obj', 'groups'])

# Add every instance to the special 'tower' group
tower_q = InstanceGroup.objects.filter(name='tower')
if tower_q.exists():
tower_inst = tower_q[0]
tower_inst.instances.set(Instance.objects.all_non_isolated())
instances_hostnames = [i.hostname for i in tower_inst.instances.all()]
logger.info(six.text_type("Setting 'tower' group instances to {}").format(instances_hostnames))
tower_inst.save()
else:
logger.warn(six.text_type("Special 'tower' Instance Group not found."))

# Process policy instance list first, these will represent manually managed instances
# that will not go through automatic policy determination
for ig in InstanceGroup.objects.exclude(name='tower'):
for ig in InstanceGroup.objects.all():
logger.info(six.text_type("Considering group {}").format(ig.name))
ig.instances.clear()
group_actual = Group(obj=ig, instances=[])
@@ -269,7 +258,7 @@ def handle_update_celery_hostname(sender, instance, **kwargs):
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))


@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
@@ -293,7 +282,7 @@ def send_notifications(notification_list, job_id=None):
notification.save()


@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def run_administrative_checks(self):
logger.warn("Running administrative checks.")
if not settings.TOWER_ADMIN_ALERTS:
@@ -421,7 +410,7 @@ def awx_isolated_heartbeat(self):
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)


@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def awx_periodic_scheduler(self):
run_now = now()
state = TowerScheduleState.get_solo()
@@ -456,7 +445,7 @@ def awx_periodic_scheduler(self):
state.save()


@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def handle_work_success(self, result, task_actual):
try:
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@@ -470,7 +459,7 @@ def handle_work_success(self, result, task_actual):
run_job_complete.delay(instance.id)


@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def handle_work_error(task_id, *args, **kwargs):
subtasks = kwargs.get('subtasks', None)
logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
@@ -511,7 +500,7 @@ def handle_work_error(task_id, *args, **kwargs):
pass


@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
'''
Signal handler and wrapper around inventory.update_computed_fields to
@@ -531,7 +520,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
raise


@shared_task(queue='tower')
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
def update_host_smart_inventory_memberships():
try:
with transaction.atomic():
@@ -556,7 +545,7 @@ def update_host_smart_inventory_memberships():
smart_inventory.update_computed_fields(update_groups=False, update_hosts=False)


@shared_task(bind=True, queue='tower', max_retries=5)
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE, max_retries=5)
def delete_inventory(self, inventory_id, user_id):
# Delete inventory as user
if user_id is None:
@@ -2333,7 +2322,7 @@ def _reconstruct_relationships(copy_mapping):
new_obj.save()


@shared_task(bind=True, queue='tower')
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
def deep_copy_model_obj(
self, model_module, model_name, obj_pk, new_obj_pk,
user_pk, sub_obj_list, permission_check_func=None

@@ -8,15 +8,15 @@ from awx.main.models import (


@pytest.fixture
def instance_group(job_factory):
ig = InstanceGroup(name="east")
def tower_instance_group():
ig = InstanceGroup(name='tower')
ig.save()
return ig


@pytest.fixture
def tower_instance_group():
ig = InstanceGroup(name='tower')
def instance_group(job_factory):
ig = InstanceGroup(name="east")
ig.save()
return ig

@@ -84,16 +84,10 @@ def test_modify_delete_tower_instance_group_prevented(delete, options, tower_ins
url = reverse("api:instance_group_detail", kwargs={'pk': tower_instance_group.pk})
super_user = user('bob', True)

# DELETE tower group not allowed
delete(url, None, super_user, expect=403)

# OPTIONS should just be "GET"
resp = options(url, None, super_user, expect=200)
assert len(resp.data['actions'].keys()) == 1
assert len(resp.data['actions'].keys()) == 2
assert 'DELETE' not in resp.data['actions']
assert 'GET' in resp.data['actions']

# Updating tower group fields not allowed
patch(url, {'name': 'foobar'}, super_user, expect=403)
patch(url, {'policy_instance_percentage': 40}, super_user, expect=403)
put(url, {'name': 'foobar'}, super_user, expect=403)

assert 'PUT' in resp.data['actions']

@@ -162,7 +162,6 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory)
i2 = instance_factory("i2")
i3 = instance_factory("i3")
i4 = instance_factory("i4")
instance_group_factory("tower")
ig0 = instance_group_factory("ig0")
ig1 = instance_group_factory("ig1", minimum=2)
ig2 = instance_group_factory("ig2", percentage=50)
@@ -175,7 +174,7 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory)
ig2 = InstanceGroup.objects.get(id=ig2.id)
ig3 = InstanceGroup.objects.get(id=ig3.id)
assert len(ig0.instances.all()) == 1
assert i0 in ig0.instances.all()
assert i0 in ig0.instances.all()
assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2
assert i1 in ig1.instances.all()
assert i2 in ig1.instances.all()

@@ -31,32 +31,15 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan
assert api_num_instances_oa == (actual_num_instances - 1)


@pytest.mark.django_db
@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
def test_policy_instance_tower_group(mock, instance_factory, instance_group_factory):
instance_factory("i1")
instance_factory("i2")
instance_factory("i3")
ig_t = instance_group_factory("tower")
instance_group_factory("ig1", percentage=25)
instance_group_factory("ig2", percentage=25)
instance_group_factory("ig3", percentage=25)
instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 3


@pytest.mark.django_db
@mock.patch('awx.main.tasks.handle_ha_toplogy_changes', return_value=None)
def test_policy_instance_few_instances(mock, instance_factory, instance_group_factory):
i1 = instance_factory("i1")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 1
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@@ -67,7 +50,6 @@ def test_policy_instance_few_instances(mock, instance_factory, instance_group_fa
assert i1 in ig_4.instances.all()
i2 = instance_factory("i2")
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 2
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@@ -84,13 +66,11 @@ def test_policy_instance_distribution_uneven(mock, instance_factory, instance_gr
i1 = instance_factory("i1")
i2 = instance_factory("i2")
i3 = instance_factory("i3")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 3
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@@ -108,13 +88,11 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_grou
i2 = instance_factory("i2")
i3 = instance_factory("i3")
i4 = instance_factory("i4")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 4
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@@ -126,7 +104,6 @@ def test_policy_instance_distribution_even(mock, instance_factory, instance_grou
ig_1.policy_instance_minimum = 2
ig_1.save()
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 4
assert len(ig_1.instances.all()) == 2
assert i1 in ig_1.instances.all()
assert i2 in ig_1.instances.all()
@@ -145,13 +122,11 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, insta
i2 = instance_factory("i2")
i3 = instance_factory("i3")
i4 = instance_factory("i4")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=25, minimum=2)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 4
assert len(ig_1.instances.all()) == 2
assert i1 in ig_1.instances.all()
assert i2 in ig_1.instances.all()
@@ -168,13 +143,11 @@ def test_policy_instance_distribution_simultaneous(mock, instance_factory, insta
def test_policy_instance_list_manually_managed(mock, instance_factory, instance_group_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
ig_t = instance_group_factory("tower")
ig_1 = instance_group_factory("ig1", percentage=100, minimum=2)
ig_2 = instance_group_factory("ig2")
ig_2.policy_instance_list = [i2.hostname]
ig_2.save()
apply_cluster_membership_policies()
assert len(ig_t.instances.all()) == 2
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert i2 not in ig_1.instances.all()

@@ -60,7 +60,6 @@ class TestAddRemoveCeleryWorkerQueues():
static_queues, _worker_queues,
groups, hostname,
added_expected, removed_expected):
added_expected.append('tower_instance_router')
instance = instance_generator(groups=groups, hostname=hostname)
worker_queues = worker_queues_generator(_worker_queues)
with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues):

@@ -3,9 +3,6 @@
# Copyright (c) 2017 Ansible Tower by Red Hat
# All Rights Reserved.

# Python
import six

# Django
from django.conf import settings

@@ -16,24 +13,26 @@ from awx.main.models import Instance
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
removed_queues = []
added_queues = []
ig_names = set([six.text_type('tower_instance_router')])
ig_names = set()
hostnames = set([instance.hostname for instance in controlled_instances])
for instance in controlled_instances:
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
worker_queue_names = set([q['name'] for q in worker_queues])

all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)

# Remove queues that aren't in the instance group
for queue in worker_queues:
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
queue['alias'] in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
continue

if queue['name'] not in ig_names | hostnames or not instance.enabled:
if queue['name'] not in all_queue_names or not instance.enabled:
app.control.cancel_consumer(queue['name'].encode("utf8"), reply=True, destination=[worker_name])
removed_queues.append(queue['name'].encode("utf8"))

# Add queues for instance and instance groups
for queue_name in ig_names | hostnames:
for queue_name in all_queue_names:
if queue_name not in worker_queue_names:
app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
added_queues.append(queue_name.encode("utf8"))
@@ -76,6 +75,5 @@ def register_celery_worker_queues(app, celery_worker_name):
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances,
celery_worker_queues, celery_worker_name)

return (controlled_instances, removed_queues, added_queues)

@@ -6,9 +6,9 @@ import re  # noqa
import sys
import ldap
import djcelery
import six
from datetime import timedelta

from kombu import Queue, Exchange
from kombu.common import Broadcast

# global settings
@@ -451,7 +451,7 @@ djcelery.setup_loader()
BROKER_POOL_LIMIT = None
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_EVENT_QUEUE_TTL = 5
CELERY_DEFAULT_QUEUE = 'tower'
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
@@ -463,8 +463,7 @@ CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_IMPORTS = ('awx.main.scheduler.tasks',)
CELERY_QUEUES = (
Queue('tower', Exchange('tower'), routing_key='tower'),
Broadcast('tower_broadcast_all')
Broadcast('tower_broadcast_all'),
)
CELERY_ROUTES = {}

@@ -515,7 +514,13 @@ AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3
# Celery queues that will always be listened to by celery workers
# Note: Broadcast queues have unique, auto-generated names, with the alias
# property value of the original queue name.
AWX_CELERY_QUEUES_STATIC = ['tower_broadcast_all',]
AWX_CELERY_QUEUES_STATIC = [
six.text_type(CELERY_DEFAULT_QUEUE),
]

AWX_CELERY_BCAST_QUEUES_STATIC = [
six.text_type('tower_broadcast_all'),
]

ASGI_AMQP = {
'INIT_FUNC': 'awx.prepare_env',