diff --git a/Makefile b/Makefile
index 99e4d0c325..61492e4bf4 100644
--- a/Makefile
+++ b/Makefile
@@ -216,13 +216,11 @@ init:
 		. $(VENV_BASE)/awx/bin/activate; \
 	fi; \
 	$(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \
-	$(MANAGEMENT_COMMAND) register_queue --queuename=tower --hostnames=$(COMPOSE_HOST);\
+	$(MANAGEMENT_COMMAND) register_queue --queuename=tower --instance_percent=100;\
 	if [ "$(AWX_GROUP_QUEUES)" == "tower,thepentagon" ]; then \
 		$(MANAGEMENT_COMMAND) provision_instance --hostname=isolated; \
 		$(MANAGEMENT_COMMAND) register_queue --queuename='thepentagon' --hostnames=isolated --controller=tower; \
 		$(MANAGEMENT_COMMAND) generate_isolated_key | ssh -o "StrictHostKeyChecking no" root@isolated 'cat > /root/.ssh/authorized_keys'; \
-	elif [ "$(AWX_GROUP_QUEUES)" != "tower" ]; then \
-		$(MANAGEMENT_COMMAND) register_queue --queuename=$(firstword $(subst $(comma), ,$(AWX_GROUP_QUEUES))) --hostnames=$(COMPOSE_HOST); \
 	fi;
 
 # Refresh development environment after pulling new code.
diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index adb17dd65b..f134d393e6 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -4012,7 +4012,7 @@ class InstanceGroupSerializer(BaseSerializer):
         fields = ("id", "type", "url", "related", "name", "created", "modified",
                   "capacity", "committed_capacity", "consumed_capacity",
                   "percent_capacity_remaining", "jobs_running", "instances", "controller",
-                  "policy_instance_percentage", "policy_instance_minimum")
+                  "policy_instance_percentage", "policy_instance_minimum", "policy_instance_list")
 
     def get_related(self, obj):
         res = super(InstanceGroupSerializer, self).get_related(obj)
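The Makefile change above is the visible half of the new policy model: instead of pinning the `tower` queue to `$(COMPOSE_HOST)` by hostname, the dev environment now registers it with a 100% instance policy and lets the cluster membership task assign hosts. The serializer change exposes all three policy fields over the API. A rough sketch of what an instance group record now carries, with field names taken from `InstanceGroupSerializer` and all values invented for illustration:

```python
# Illustrative only: field names from InstanceGroupSerializer; the id,
# capacity numbers, and counts are made up for this example.
example_instance_group = {
    "id": 1,
    "type": "instance_group",
    "name": "tower",
    "capacity": 400,
    "consumed_capacity": 0,
    "percent_capacity_remaining": 100.0,
    "jobs_running": 0,
    "instances": 4,
    "controller": None,
    "policy_instance_percentage": 100,  # set by register_queue --instance_percent=100
    "policy_instance_minimum": 0,
    "policy_instance_list": [],         # newly exposed by this diff
}
```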
diff --git a/awx/api/views.py b/awx/api/views.py
index df7d2b61e8..56bee58c21 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -57,7 +57,7 @@ import pytz
 from wsgiref.util import FileWrapper
 
 # AWX
-from awx.main.tasks import send_notifications, handle_ha_toplogy_changes
+from awx.main.tasks import send_notifications
 from awx.main.access import get_user_queryset
 from awx.main.ha import is_ha_environment
 from awx.api.authentication import TokenGetAuthentication
@@ -154,20 +154,32 @@ class InstanceGroupMembershipMixin(object):
     '''
     def attach(self, request, *args, **kwargs):
         response = super(InstanceGroupMembershipMixin, self).attach(request, *args, **kwargs)
+        sub_id, res = self.attach_validate(request)
         if status.is_success(response.status_code):
-            handle_ha_toplogy_changes.apply_async()
+            if self.parent_model is Instance:
+                ig_obj = get_object_or_400(self.model, pk=sub_id)
+                inst_name = self.get_parent_object().hostname
+            else:
+                ig_obj = self.get_parent_object()
+                inst_name = get_object_or_400(self.model, pk=sub_id).hostname
+            if inst_name not in ig_obj.policy_instance_list:
+                ig_obj.policy_instance_list.append(inst_name)
+                ig_obj.save()
         return response
 
     def unattach(self, request, *args, **kwargs):
         response = super(InstanceGroupMembershipMixin, self).unattach(request, *args, **kwargs)
+        sub_id, res = self.unattach_validate(request)
         if status.is_success(response.status_code):
-            handle_ha_toplogy_changes.apply_async()
-        return response
-
-    def destroy(self, request, *args, **kwargs):
-        response = super(InstanceGroupMembershipMixin, self).destroy(request, *args, **kwargs)
-        if status.is_success(response.status_code):
-            handle_ha_toplogy_changes.apply_async()
+            if self.parent_model is Instance:
+                ig_obj = get_object_or_400(self.model, pk=sub_id)
+                inst_name = self.get_parent_object().hostname
+            else:
+                ig_obj = self.get_parent_object()
+                inst_name = get_object_or_400(self.model, pk=sub_id).hostname
+            if inst_name in ig_obj.policy_instance_list:
+                ig_obj.policy_instance_list.pop(ig_obj.policy_instance_list.index(inst_name))
+                ig_obj.save()
         return response
 
@@ -589,7 +601,7 @@ class InstanceGroupList(ListCreateAPIView):
     new_in_320 = True
 
 
-class InstanceGroupDetail(InstanceGroupMembershipMixin, RetrieveDestroyAPIView):
+class InstanceGroupDetail(RetrieveUpdateDestroyAPIView):
 
     view_name = _("Instance Group Detail")
     model = InstanceGroup
diff --git a/awx/main/migrations/0013_v330_instancegroup_policies.py b/awx/main/migrations/0018_v330_instancegroup_policies.py
similarity index 87%
rename from awx/main/migrations/0013_v330_instancegroup_policies.py
rename to awx/main/migrations/0018_v330_instancegroup_policies.py
index b8fa658fb8..63403f6766 100644
--- a/awx/main/migrations/0013_v330_instancegroup_policies.py
+++ b/awx/main/migrations/0018_v330_instancegroup_policies.py
@@ -8,14 +8,15 @@ import awx.main.fields
 class Migration(migrations.Migration):
 
     dependencies = [
-        ('main', '0008_v320_drop_v1_credential_fields'),
+        ('main', '0017_v330_move_deprecated_stdout'),
     ]
 
     operations = [
         migrations.AddField(
             model_name='instancegroup',
             name='policy_instance_list',
-            field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group', blank=True),
+            field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group',
+                                            blank=True),
         ),
         migrations.AddField(
             model_name='instancegroup',
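Two behavioral notes on the views change. First, attach/unattach no longer fire a topology task directly; they record the instance's hostname in (or remove it from) `policy_instance_list`, so a manual association survives the next policy recalculation. Second, swapping `InstanceGroupDetail` to `RetrieveUpdateDestroyAPIView` makes the policy fields writable over HTTP. A minimal sketch of editing them directly; the URL, group id, credentials, and hostname are all hypothetical:

```python
import requests

# Hypothetical server, group id 5, and host "hostA"; basic auth for brevity.
resp = requests.patch(
    "https://awx.example.com/api/v2/instance_groups/5/",
    auth=("admin", "password"),
    json={
        "policy_instance_minimum": 2,       # never shrink below two instances
        "policy_instance_percentage": 50,   # otherwise grow to half the cluster
        "policy_instance_list": ["hostA"],  # hostA stays pinned regardless of policy
    },
)
resp.raise_for_status()
```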
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index eac0bec22f..12e8bf23e9 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -2,7 +2,7 @@
 # All Rights Reserved.
 
 from django.db import models
-from django.db.models.signals import post_save
+from django.db.models.signals import post_save, post_delete
 from django.dispatch import receiver
 from django.utils.translation import ugettext_lazy as _
 from django.conf import settings
@@ -136,6 +136,32 @@ class JobOrigin(models.Model):
         app_label = 'main'
 
 
+@receiver(post_save, sender=InstanceGroup)
+def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs):
+    if created:
+        from awx.main.tasks import apply_cluster_membership_policies
+        apply_cluster_membership_policies.apply_async(countdown=5)
+
+
+@receiver(post_save, sender=Instance)
+def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
+    if created:
+        from awx.main.tasks import apply_cluster_membership_policies
+        apply_cluster_membership_policies.apply_async(countdown=5)
+
+
+@receiver(post_delete, sender=InstanceGroup)
+def on_instance_group_deleted(sender, instance, using, **kwargs):
+    from awx.main.tasks import apply_cluster_membership_policies
+    apply_cluster_membership_policies.apply_async(countdown=5)
+
+
+@receiver(post_delete, sender=Instance)
+def on_instance_deleted(sender, instance, using, **kwargs):
+    from awx.main.tasks import apply_cluster_membership_policies
+    apply_cluster_membership_policies.apply_async(countdown=5)
+
+
 # Unfortunately, the signal can't just be connected against UnifiedJob; it
 # turns out that creating a model's subclass doesn't fire the signal for the
 # superclass model.
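The four receivers above are intentionally thin. The task import is deferred into each function body because `awx.main.tasks` imports the models, so a top-level import here would be circular; and `apply_async(countdown=5)` only delays each run by about five seconds. It does not deduplicate, so a burst of registrations enqueues several runs that then serialize on the task's advisory lock (next file). The shared body could be factored into one helper; a sketch, not part of the diff:

```python
def schedule_policy_recalculation():
    # Deferred import: models -> tasks -> models would otherwise be circular.
    from awx.main.tasks import apply_cluster_membership_policies
    # The 5s countdown lets adjacent saves/deletes land first; overlapping
    # runs are serialized by advisory_lock inside the task itself.
    apply_cluster_membership_policies.apply_async(countdown=5)
```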
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 109b92a771..698fda006e 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -26,7 +26,7 @@ except Exception:
     psutil = None
 
 # Celery
-from celery import Task, shared_task
+from celery import Task, shared_task, Celery
 from celery.signals import celeryd_init, worker_process_init, worker_shutdown, worker_ready, celeryd_after_setup
 
 # Django
@@ -58,13 +58,14 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
                             ignore_inventory_computed_fields,
                             ignore_inventory_group_removal, get_type_for_model,
                             extract_ansible_vars)
 from awx.main.utils.reload import restart_local_services, stop_local_services
+from awx.main.utils.pglock import advisory_lock
 from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
 from awx.main.utils.handlers import configure_external_logger
 from awx.main.consumers import emit_channel_notification
 from awx.conf import settings_registry
 
 __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
-           'RunAdHocCommand', 'handle_work_error', 'handle_work_success',
+           'RunAdHocCommand', 'handle_work_error', 'handle_work_success', 'apply_cluster_membership_policies',
            'update_inventory_computed_fields', 'update_host_smart_inventory_memberships',
            'send_notifications', 'run_administrative_checks', 'purge_old_stdout_files']
@@ -132,41 +133,54 @@ def inform_cluster_of_shutdown(*args, **kwargs):
         logger.exception('Encountered problem with normal shutdown signal.')
 
 
-@shared_task(bind=True, queue='tower', base=LogErrorsTask)
+@shared_task(bind=True, queue='tower_instance_router', base=LogErrorsTask)
 def apply_cluster_membership_policies(self):
-    considered_instances = Instance.objects.all().order_by('id').only('id')
-    total_instances = considered_instances.count()
-    actual_groups = []
-    actual_instances = []
-    Group = namedtuple('Group', ['obj', 'instances'])
-    Instance = namedtuple('Instance', ['obj', 'groups'])
-    # Process policy instance list first, these will represent manually managed instances
-    # that will not go through automatic policy determination
-    for ig in InstanceGroup.objects.all():
-        group_actual = Group(obj=ig, instances=[])
-        for i in ig.policy_instance_list:
-            group_actual.instances.append(i)
-            if i in considered_instances:
-                considered_instances.remove(i)
-        actual_groups.append(group_actual)
-    # Process Instance minimum policies next, since it represents a concrete lower bound to the
-    # number of instances to make available to instance groups
-    for i in considered_instances:
-        instance_actual = Instance(obj=i, groups=[])
+    with advisory_lock('cluster_policy_lock', wait=True):
+        considered_instances = Instance.objects.all().order_by('id')
+        total_instances = considered_instances.count()
+        filtered_instances = []
+        actual_groups = []
+        actual_instances = []
+        Group = namedtuple('Group', ['obj', 'instances'])
+        Node = namedtuple('Instance', ['obj', 'groups'])
+        # Process policy instance list first, these will represent manually managed instances
+        # that will not go through automatic policy determination
+        for ig in InstanceGroup.objects.all():
+            logger.info("Considering group {}".format(ig.name))
+            ig.instances.clear()
+            group_actual = Group(obj=ig, instances=[])
+            for i in ig.policy_instance_list:
+                inst = Instance.objects.filter(hostname=i)
+                if not inst.exists():
+                    continue
+                inst = inst[0]
+                logger.info("Policy List, adding {} to {}".format(inst.hostname, ig.name))
+                group_actual.instances.append(inst.id)
+                ig.instances.add(inst)
+                filtered_instances.append(inst)
+            actual_groups.append(group_actual)
+        # Process Instance minimum policies next, since it represents a concrete lower bound to the
+        # number of instances to make available to instance groups
+        actual_instances = [Node(obj=i, groups=[]) for i in filter(lambda x: x not in filtered_instances, considered_instances)]
+        logger.info("Total instances not directly associated: {}".format(len(actual_instances)))
         for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
-            if len(g.instances) < g.obj.policy_instance_minimum:
-                g.instances.append(instance_actual.obj.id)
-                instance_actual.groups.append(g.obj.id)
-                break
-        actual_instances.append(instance_actual)
-    # Finally process instance policy percentages
-    for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
-        for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
-            if 100 * float(len(g.instances)) / total_instances < g.obj.policy_instance_percentage:
+            for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
+                if len(g.instances) >= g.obj.policy_instance_minimum:
+                    break
+                logger.info("Policy minimum, adding {} to {}".format(i.obj.hostname, g.obj.name))
+                g.obj.instances.add(i.obj)
                 g.instances.append(i.obj.id)
                 i.groups.append(g.obj.id)
-                break
-    # Next step
+        # Finally process instance policy percentages
+        for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
+            for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
+                if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
+                    break
+                logger.info("Policy percentage, adding {} to {}".format(i.obj.hostname, g.obj.name))
+                g.instances.append(i.obj.id)
+                g.obj.instances.add(i.obj)
+                i.groups.append(g.obj.id)
+        handle_ha_toplogy_changes.apply_async()
 
 
 @shared_task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask)
@@ -190,12 +204,14 @@ def handle_setting_changes(self, setting_keys):
 def handle_ha_toplogy_changes(self):
     instance = Instance.objects.me()
     logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
-    (instance, removed_queues, added_queues) = register_celery_worker_queues(self.app, self.request.hostname)
+    awx_app = Celery('awx')
+    awx_app.config_from_object('django.conf:settings', namespace='CELERY')
+    (instance, removed_queues, added_queues) = register_celery_worker_queues(awx_app, self.request.hostname)
     logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
                 .format(instance.hostname, removed_queues, added_queues))
     updated_routes = update_celery_worker_routes(instance, settings)
     logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
-                .format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
+                .format(instance.hostname, updated_routes, self.app.conf.CELERY_TASK_ROUTES))
 
 
 @worker_ready.connect
@@ -213,7 +229,7 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
     instance = Instance.objects.me()
     added_routes = update_celery_worker_routes(instance, conf)
     logger.info("Workers on tower node '{}' added routes {} all routes are now {}"
-                .format(instance.hostname, added_routes, conf.CELERY_ROUTES))
+                .format(instance.hostname, added_routes, conf.CELERY_TASK_ROUTES))
 
 
 @celeryd_after_setup.connect
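Stripped of the ORM, logging, and lock, the task reduces to three greedy phases: honor exact-match pins, top groups up to their minimum, then grow each group toward its percentage of the remaining pool. A standalone model of that logic, using plain hostname strings (a simplified sketch, not the shipped code):

```python
def apply_policies(hostnames, groups):
    """groups: dicts with keys name, policy_instance_list,
    policy_instance_minimum, policy_instance_percentage."""
    membership = {g['name']: [] for g in groups}
    # Phase 1: pinned hosts join their groups and leave the shared pool.
    pinned = set()
    for g in groups:
        for h in g['policy_instance_list']:
            if h in hostnames:
                membership[g['name']].append(h)
                pinned.add(h)
    free = [h for h in hostnames if h not in pinned]
    if not free:
        return membership
    load = {h: 0 for h in free}  # groups already served by each free host
    # Phase 2: smallest groups first, least-loaded hosts first, until minimums hold.
    for g in sorted(groups, key=lambda g: len(membership[g['name']])):
        for h in sorted(free, key=load.get):
            if len(membership[g['name']]) >= g['policy_instance_minimum']:
                break
            membership[g['name']].append(h)
            load[h] += 1
    # Phase 3: grow until each group holds its percentage of the free pool.
    for g in sorted(groups, key=lambda g: len(membership[g['name']])):
        for h in sorted(free, key=load.get):
            if 100.0 * len(membership[g['name']]) / len(free) >= g['policy_instance_percentage']:
                break
            if h not in membership[g['name']]:
                membership[g['name']].append(h)
                load[h] += 1
    return membership
```

With five hosts, one pin, one `minimum=2` group, and two `percentage=50` groups, this model reproduces the assignments asserted by the new functional test below.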
diff --git a/awx/main/tests/factories/fixtures.py b/awx/main/tests/factories/fixtures.py
index eee545336d..2f3ec0656f 100644
--- a/awx/main/tests/factories/fixtures.py
+++ b/awx/main/tests/factories/fixtures.py
@@ -35,8 +35,9 @@ def mk_instance(persisted=True, hostname='instance.example.org'):
     return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname)[0]
 
 
-def mk_instance_group(name='tower', instance=None):
-    ig, status = InstanceGroup.objects.get_or_create(name=name)
+def mk_instance_group(name='tower', instance=None, minimum=0, percentage=0):
+    ig, status = InstanceGroup.objects.get_or_create(name=name, policy_instance_minimum=minimum,
+                                                     policy_instance_percentage=percentage)
     if instance is not None:
         if type(instance) == list:
             for i in instance:
diff --git a/awx/main/tests/factories/tower.py b/awx/main/tests/factories/tower.py
index a8f20f941f..ecb395dd99 100644
--- a/awx/main/tests/factories/tower.py
+++ b/awx/main/tests/factories/tower.py
@@ -135,8 +135,8 @@ def create_instance(name, instance_groups=None):
     return mk_instance(hostname=name)
 
 
-def create_instance_group(name, instances=None):
-    return mk_instance_group(name=name, instance=instances)
+def create_instance_group(name, instances=None, minimum=0, percentage=0):
+    return mk_instance_group(name=name, instance=instances, minimum=minimum, percentage=percentage)
 
 
 def create_survey_spec(variables=None, default_type='integer', required=True, min=None, max=None):
diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py
index f4c6ba95bf..9b4b3eac44 100644
--- a/awx/main/tests/functional/task_management/test_rampart_groups.py
+++ b/awx/main/tests/functional/task_management/test_rampart_groups.py
@@ -2,6 +2,8 @@ import pytest
 import mock
 from datetime import timedelta
 from awx.main.scheduler import TaskManager
+from awx.main.models import InstanceGroup
+from awx.main.tasks import apply_cluster_membership_policies
 
 
 @pytest.mark.django_db
@@ -151,3 +153,34 @@ def test_failover_group_run(instance_factory, default_instance_group, mocker,
         tm.schedule()
         mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig2, [])])
         assert mock_job.call_count == 2
+
+
+@pytest.mark.django_db
+def test_instance_group_basic_policies(instance_factory, instance_group_factory):
+    i0 = instance_factory("i0")
+    i1 = instance_factory("i1")
+    i2 = instance_factory("i2")
+    i3 = instance_factory("i3")
+    i4 = instance_factory("i4")
+    ig0 = instance_group_factory("ig0")
+    ig1 = instance_group_factory("ig1", minimum=2)
+    ig2 = instance_group_factory("ig2", percentage=50)
+    ig3 = instance_group_factory("ig3", percentage=50)
+    ig0.policy_instance_list.append(i0.hostname)
+    ig0.save()
+    apply_cluster_membership_policies()
+    ig0 = InstanceGroup.objects.get(id=ig0.id)
+    ig1 = InstanceGroup.objects.get(id=ig1.id)
+    ig2 = InstanceGroup.objects.get(id=ig2.id)
+    ig3 = InstanceGroup.objects.get(id=ig3.id)
+    assert len(ig0.instances.all()) == 1
+    assert i0 in ig0.instances.all()
+    assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2
+    assert i1 in ig1.instances.all()
+    assert i2 in ig1.instances.all()
+    assert len(InstanceGroup.objects.get(id=ig2.id).instances.all()) == 2
+    assert i3 in ig2.instances.all()
+    assert i4 in ig2.instances.all()
+    assert len(InstanceGroup.objects.get(id=ig3.id).instances.all()) == 2
+    assert i1 in ig3.instances.all()
+    assert i2 in ig3.instances.all()
diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py
index 6bd1b856b9..b71ca454e1 100644
--- a/awx/main/tests/unit/utils/test_ha.py
+++ b/awx/main/tests/unit/utils/test_ha.py
@@ -17,7 +17,7 @@ from awx.main.utils.ha import (
 @pytest.fixture
 def conf():
     class Conf():
-        CELERY_ROUTES = dict()
+        CELERY_TASK_ROUTES = dict()
         CELERYBEAT_SCHEDULE = dict()
     return Conf()
 
@@ -87,14 +87,14 @@ class TestUpdateCeleryWorkerRoutes():
         instance.is_controller = mocker.MagicMock(return_value=is_controller)
 
         assert update_celery_worker_routes(instance, conf) == expected_routes
-        assert conf.CELERY_ROUTES == expected_routes
+        assert conf.CELERY_TASK_ROUTES == expected_routes
 
     def test_update_celery_worker_routes_deleted(self, mocker, conf):
         instance = mocker.MagicMock()
         instance.hostname = 'east-1'
         instance.is_controller = mocker.MagicMock(return_value=False)
-        conf.CELERY_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
+        conf.CELERY_TASK_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
 
         update_celery_worker_routes(instance, conf)
-        assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_ROUTES
+        assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_TASK_ROUTES
diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py
index 9efb3e9cf3..376faf9eb9 100644
--- a/awx/main/utils/ha.py
+++ b/awx/main/utils/ha.py
@@ -14,6 +14,7 @@ def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
     removed_queues = []
     added_queues = []
     ig_names = set(instance.rampart_groups.values_list('name', flat=True))
+    ig_names.add("tower_instance_router")
     worker_queue_names = set([q['name'] for q in worker_queues])
 
@@ -47,12 +48,12 @@ def update_celery_worker_routes(instance, conf):
     if instance.is_controller():
         tasks.append('awx.main.tasks.awx_isolated_heartbeat')
     else:
-        if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_ROUTES:
-            del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
+        if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_TASK_ROUTES:
+            del conf.CELERY_TASK_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
 
     for t in tasks:
-        conf.CELERY_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
-        routes_updated[t] = conf.CELERY_ROUTES[t]
+        conf.CELERY_TASK_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
+        routes_updated[t] = conf.CELERY_TASK_ROUTES[t]
 
     return routes_updated
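The `CELERY_ROUTES` to `CELERY_TASK_ROUTES` rename in the tests and in `update_celery_worker_routes` tracks Celery 4's new setting names; the shape of each route entry is unchanged. Roughly what the helper leaves behind for a node, using a stand-in conf like the test fixture above (the hostname and the example task are assumptions for illustration; the authoritative per-node task list lives in `update_celery_worker_routes`):

```python
class Conf:
    CELERY_TASK_ROUTES = {}

conf = Conf()
hostname = 'awx-node-1'  # hypothetical
# Each per-node task gets pinned to a queue and routing key named
# after the host, so only that node's workers consume it.
for task in ['awx.main.tasks.cluster_node_heartbeat']:  # assumed example task
    conf.CELERY_TASK_ROUTES[task] = {'queue': hostname, 'routing_key': hostname}
# Controller nodes additionally route awx.main.tasks.awx_isolated_heartbeat;
# non-controllers delete that entry, as the diff above shows.
```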
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index db3dec23fb..5fb4edfe69 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -432,6 +432,7 @@ DEVSERVER_DEFAULT_PORT = '8013'
 # Set default ports for live server tests.
 os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
 
+BROKER_POOL_LIMIT = None
 CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
 CELERY_EVENT_QUEUE_TTL = 5
 CELERY_TASK_DEFAULT_QUEUE = 'tower'
@@ -452,7 +453,7 @@ CELERY_TASK_QUEUES = (
 )
 CELERY_TASK_ROUTES = {}
 
-CELERYBEAT_SCHEDULE = {
+CELERY_BEAT_SCHEDULE = {
     'tower_scheduler': {
         'task': 'awx.main.tasks.awx_periodic_scheduler',
         'schedule': timedelta(seconds=30),
@@ -1123,9 +1124,11 @@ LOGGING = {
     },
     'awx.main.tasks': {
         'handlers': ['task_system'],
+        'propagate': False
     },
     'awx.main.scheduler': {
         'handlers': ['task_system'],
+        'propagate': False
     },
     'awx.main.consumers': {
         'handlers': ['null']
diff --git a/installer/image_build/files/launch_awx_task.sh b/installer/image_build/files/launch_awx_task.sh
index 88b59d63fa..ebcc8b6798 100755
--- a/installer/image_build/files/launch_awx_task.sh
+++ b/installer/image_build/files/launch_awx_task.sh
@@ -19,5 +19,5 @@ else
     awx-manage create_preload_data
 fi
 awx-manage provision_instance --hostname=$(hostname)
-awx-manage register_queue --queuename=tower --hostnames=$(hostname)
+awx-manage register_queue --queuename=tower --instance_percent=100
 supervisord -c /supervisor_task.conf
diff --git a/installer/image_build/files/supervisor_task.conf b/installer/image_build/files/supervisor_task.conf
index 1a4e613925..83107bf6e7 100644
--- a/installer/image_build/files/supervisor_task.conf
+++ b/installer/image_build/files/supervisor_task.conf
@@ -3,7 +3,7 @@ nodaemon = True
 umask = 022
 
 [program:celery]
-command = /var/lib/awx/venv/awx/bin/celery worker -A awx -B -l debug --autoscale=4 -Ofair -s /var/lib/awx/beat.db -Q tower_broadcast_all -n celery@$(ENV_HOSTNAME)s
+command = /var/lib/awx/venv/awx/bin/celery worker -A awx -B -l debug --autoscale=4 -Ofair -s /var/lib/awx/beat.db -Q tower_broadcast_all -n celery@%(ENV_HOSTNAME)s
 directory = /var/lib/awx
 environment = LANGUAGE="en_US.UTF-8",LANG="en_US.UTF-8",LC_ALL="en_US.UTF-8",LC_CTYPE="en_US.UTF-8"
 #user = {{ aw_user }}
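The settings renames are tied to the `Celery('awx')` plus `config_from_object(..., namespace='CELERY')` call introduced in tasks.py: under a namespaced load, Celery 4 reads only `CELERY_`-prefixed settings and maps them to its lower-case names (`CELERY_BEAT_SCHEDULE` to `beat_schedule`, `CELERY_TASK_ROUTES` to `task_routes`), so the old `CELERYBEAT_SCHEDULE` spelling would be silently ignored. `BROKER_POOL_LIMIT = None` disables the broker connection pool; note that it carries no `CELERY_` prefix, so it is only honored where the app is configured without a namespace, which is worth double-checking. The supervisor fix is unrelated but real: `%(ENV_HOSTNAME)s` is supervisord's environment-expansion syntax, and the `$(...)` spelling was never expanded. A minimal reproduction of the namespaced wiring:

```python
from celery import Celery

app = Celery('awx')
# With namespace='CELERY', only CELERY_-prefixed Django settings are read;
# e.g. CELERY_BEAT_SCHEDULE becomes app.conf.beat_schedule.
app.config_from_object('django.conf:settings', namespace='CELERY')
```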