Updates for automatic triggering of policies

* Switch policy router queue to not be "tower" so that we don't
  fall into a chicken/egg scenario
* Show fixed policy list in serializer so a user can determine if
  an instance is manually managed
* Change the IG membership mixin so it no longer applies topology
  changes directly; it now just keeps the policy instance list
  accurate
* Add create/delete hooks for instances and groups to trigger policy
  re-evaluation
* Update policy algorithm for fairer distribution (see the sketch
  after this list)
* Fix an issue where CELERY_ROUTES wasn't renamed after celery/django
  upgrade
* Update unit tests to be more explicit
* Update count calculations used by algorithm to only consider
  non-manual instances
* Add unit tests and fixtures
* Don't propagate logging messages from awx.main.tasks and
  awx.main.scheduler
* Use advisory lock to prevent policy eval conflicts
* Allow updating instance groups from the API view
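
For reference, a minimal sketch of the two automatic placement passes behind the
"fairer distribution" change, rewritten over plain data structures rather than the
Django models and advisory lock the real apply_cluster_membership_policies task in
awx/main/tasks.py works with. The distribute/Node names and the demo values below
are illustrative only, not part of the commit.

from collections import namedtuple

Group = namedtuple('Group', ['name', 'minimum', 'percentage', 'instances'])
Node = namedtuple('Node', ['hostname', 'groups'])


def distribute(groups, hostnames, manual=()):
    # Instances pinned through policy_instance_list never take part in the
    # automatic passes.
    nodes = [Node(h, []) for h in hostnames if h not in manual]

    # Pass 1: satisfy each group's policy_instance_minimum, always taking the
    # least-assigned instance first so load spreads evenly.
    for g in sorted(groups, key=lambda grp: len(grp.instances)):
        for n in sorted(nodes, key=lambda node: len(node.groups)):
            if len(g.instances) >= g.minimum:
                break
            g.instances.append(n.hostname)
            n.groups.append(g.name)

    # Pass 2: top each group up until it holds policy_instance_percentage
    # percent of the automatically managed instances, smallest group first.
    for g in sorted(groups, key=lambda grp: len(grp.instances)):
        for n in sorted(nodes, key=lambda node: len(node.groups)):
            if 100.0 * len(g.instances) / len(nodes) >= g.percentage:
                break
            g.instances.append(n.hostname)
            n.groups.append(g.name)
    return groups


groups = distribute(
    [Group('ig1', 2, 0, []), Group('ig2', 0, 50, []), Group('ig3', 0, 50, [])],
    ['i1', 'i2', 'i3', 'i4'])
# ig1 -> [i1, i2] (minimum), ig2 -> [i3, i4] and ig3 -> [i1, i2] (50% of 4 each);
# an instance may belong to more than one group.
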
Matthew Jones 2017-11-16 14:55:17 -05:00
parent 56abfa732e
commit d9e774c4b6
14 changed files with 159 additions and 68 deletions

View File

@ -216,13 +216,11 @@ init:
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \
$(MANAGEMENT_COMMAND) register_queue --queuename=tower --hostnames=$(COMPOSE_HOST);\
$(MANAGEMENT_COMMAND) register_queue --queuename=tower --instance_percent=100;\
if [ "$(AWX_GROUP_QUEUES)" == "tower,thepentagon" ]; then \
$(MANAGEMENT_COMMAND) provision_instance --hostname=isolated; \
$(MANAGEMENT_COMMAND) register_queue --queuename='thepentagon' --hostnames=isolated --controller=tower; \
$(MANAGEMENT_COMMAND) generate_isolated_key | ssh -o "StrictHostKeyChecking no" root@isolated 'cat > /root/.ssh/authorized_keys'; \
elif [ "$(AWX_GROUP_QUEUES)" != "tower" ]; then \
$(MANAGEMENT_COMMAND) register_queue --queuename=$(firstword $(subst $(comma), ,$(AWX_GROUP_QUEUES))) --hostnames=$(COMPOSE_HOST); \
fi;
# Refresh development environment after pulling new code.

View File

@ -4012,7 +4012,7 @@ class InstanceGroupSerializer(BaseSerializer):
fields = ("id", "type", "url", "related", "name", "created", "modified",
"capacity", "committed_capacity", "consumed_capacity",
"percent_capacity_remaining", "jobs_running", "instances", "controller",
"policy_instance_percentage", "policy_instance_minimum")
"policy_instance_percentage", "policy_instance_minimum", "policy_instance_list")
def get_related(self, obj):
res = super(InstanceGroupSerializer, self).get_related(obj)

View File

@ -57,7 +57,7 @@ import pytz
from wsgiref.util import FileWrapper
# AWX
from awx.main.tasks import send_notifications, handle_ha_toplogy_changes
from awx.main.tasks import send_notifications
from awx.main.access import get_user_queryset
from awx.main.ha import is_ha_environment
from awx.api.authentication import TokenGetAuthentication
@ -154,20 +154,32 @@ class InstanceGroupMembershipMixin(object):
'''
def attach(self, request, *args, **kwargs):
response = super(InstanceGroupMembershipMixin, self).attach(request, *args, **kwargs)
sub_id, res = self.attach_validate(request)
if status.is_success(response.status_code):
handle_ha_toplogy_changes.apply_async()
if self.parent_model is Instance:
ig_obj = get_object_or_400(self.model, pk=sub_id)
inst_name = ig_obj.hostname
else:
ig_obj = self.get_parent_object()
inst_name = get_object_or_400(self.model, pk=sub_id).hostname
if inst_name not in ig_obj.policy_instance_list:
ig_obj.policy_instance_list.append(inst_name)
ig_obj.save()
return response
def unattach(self, request, *args, **kwargs):
response = super(InstanceGroupMembershipMixin, self).unattach(request, *args, **kwargs)
sub_id, res = self.attach_validate(request)
if status.is_success(response.status_code):
handle_ha_toplogy_changes.apply_async()
return response
def destroy(self, request, *args, **kwargs):
response = super(InstanceGroupMembershipMixin, self).destroy(request, *args, **kwargs)
if status.is_success(response.status_code):
handle_ha_toplogy_changes.apply_async()
if self.parent_model is Instance:
ig_obj = get_object_or_400(self.model, pk=sub_id)
inst_name = self.get_parent_object().hostname
else:
ig_obj = self.get_parent_object()
inst_name = get_object_or_400(self.model, pk=sub_id).hostname
if inst_name in ig_obj.policy_instance_list:
ig_obj.policy_instance_list.pop(ig_obj.policy_instance_list.index(inst_name))
ig_obj.save()
return response
@ -589,7 +601,7 @@ class InstanceGroupList(ListCreateAPIView):
new_in_320 = True
class InstanceGroupDetail(InstanceGroupMembershipMixin, RetrieveDestroyAPIView):
class InstanceGroupDetail(RetrieveUpdateDestroyAPIView):
view_name = _("Instance Group Detail")
model = InstanceGroup

View File

@ -8,14 +8,15 @@ import awx.main.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0008_v320_drop_v1_credential_fields'),
('main', '0017_v330_move_deprecated_stdout'),
]
operations = [
migrations.AddField(
model_name='instancegroup',
name='policy_instance_list',
field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group', blank=True),
field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group',
blank=True),
),
migrations.AddField(
model_name='instancegroup',

View File

@ -2,7 +2,7 @@
# All Rights Reserved.
from django.db import models
from django.db.models.signals import post_save
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
@ -136,6 +136,32 @@ class JobOrigin(models.Model):
app_label = 'main'
@receiver(post_save, sender=InstanceGroup)
def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs):
if created:
from awx.main.tasks import apply_cluster_membership_policies
apply_cluster_membership_policies.apply_async(countdown=5)
@receiver(post_save, sender=Instance)
def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
if created:
from awx.main.tasks import apply_cluster_membership_policies
apply_cluster_membership_policies.apply_async(countdown=5)
@receiver(post_delete, sender=InstanceGroup)
def on_instance_group_deleted(sender, instance, using, **kwargs):
from awx.main.tasks import apply_cluster_membership_policies
apply_cluster_membership_policies.apply_async(countdown=5)
@receiver(post_delete, sender=Instance)
def on_instance_deleted(sender, instance, using, **kwargs):
from awx.main.tasks import apply_cluster_membership_policies
apply_cluster_membership_policies.apply_async(countdown=5)
# Unfortunately, the signal can't just be connected against UnifiedJob; it
# turns out that creating a model's subclass doesn't fire the signal for the
# superclass model.

View File

@ -26,7 +26,7 @@ except Exception:
psutil = None
# Celery
from celery import Task, shared_task
from celery import Task, shared_task, Celery
from celery.signals import celeryd_init, worker_process_init, worker_shutdown, worker_ready, celeryd_after_setup
# Django
@ -58,13 +58,14 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
ignore_inventory_computed_fields, ignore_inventory_group_removal,
get_type_for_model, extract_ansible_vars)
from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
from awx.main.utils.handlers import configure_external_logger
from awx.main.consumers import emit_channel_notification
from awx.conf import settings_registry
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
'RunAdHocCommand', 'handle_work_error', 'handle_work_success',
'RunAdHocCommand', 'handle_work_error', 'handle_work_success', 'apply_cluster_membership_policies',
'update_inventory_computed_fields', 'update_host_smart_inventory_memberships',
'send_notifications', 'run_administrative_checks', 'purge_old_stdout_files']
@ -132,41 +133,54 @@ def inform_cluster_of_shutdown(*args, **kwargs):
logger.exception('Encountered problem with normal shutdown signal.')
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
@shared_task(bind=True, queue='tower_instance_router', base=LogErrorsTask)
def apply_cluster_membership_policies(self):
considered_instances = Instance.objects.all().order_by('id').only('id')
total_instances = considered_instances.count()
actual_groups = []
actual_instances = []
Group = namedtuple('Group', ['obj', 'instances'])
Instance = namedtuple('Instance', ['obj', 'groups'])
# Process policy instance list first, these will represent manually managed instances
# that will not go through automatic policy determination
for ig in InstanceGroup.objects.all():
group_actual = Group(obj=ig, instances=[])
for i in ig.policy_instance_list:
group_actual.instances.append(i)
if i in considered_instances:
considered_instances.remove(i)
actual_groups.append(group_actual)
# Process Instance minimum policies next, since it represents a concrete lower bound to the
# number of instances to make available to instance groups
for i in considered_instances:
instance_actual = Instance(obj=i, groups=[])
with advisory_lock('cluster_policy_lock', wait=True):
considered_instances = Instance.objects.all().order_by('id')
total_instances = considered_instances.count()
filtered_instances = []
actual_groups = []
actual_instances = []
Group = namedtuple('Group', ['obj', 'instances'])
Node = namedtuple('Instance', ['obj', 'groups'])
# Process policy instance list first, these will represent manually managed instances
# that will not go through automatic policy determination
for ig in InstanceGroup.objects.all():
logger.info("Considering group {}".format(ig.name))
ig.instances.clear()
group_actual = Group(obj=ig, instances=[])
for i in ig.policy_instance_list:
inst = Instance.objects.filter(hostname=i)
if not inst.exists():
continue
inst = inst[0]
logger.info("Policy List, adding {} to {}".format(inst.hostname, ig.name))
group_actual.instances.append(inst.id)
ig.instances.add(inst)
filtered_instances.append(inst)
actual_groups.append(group_actual)
# Process Instance minimum policies next, since it represents a concrete lower bound to the
# number of instances to make available to instance groups
actual_instances = [Node(obj=i, groups=[]) for i in filter(lambda x: x not in filtered_instances, considered_instances)]
logger.info("Total instances not directly associated: {}".format(total_instances))
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
if len(g.instances) < g.obj.policy_instance_minimum:
g.instances.append(instance_actual.obj.id)
instance_actual.groups.append(g.obj.id)
break
actual_instances.append(instance_actual)
# Finally process instance policy percentages
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
if 100 * float(len(g.instances)) / total_instances < g.obj.policy_instance_percentage:
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
if len(g.instances) >= g.obj.policy_instance_minimum:
break
logger.info("Policy minimum, adding {} to {}".format(i.obj.hostname, g.obj.name))
g.obj.instances.add(i.obj)
g.instances.append(i.obj.id)
i.groups.append(g.obj.id)
break
# Next step
# Finally process instance policy percentages
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
break
logger.info("Policy percentage, adding {} to {}".format(i.obj.hostname, g.obj.name))
g.instances.append(i.obj.id)
g.obj.instances.add(i.obj)
i.groups.append(g.obj.id)
handle_ha_toplogy_changes.apply_async()
@shared_task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask)
@ -190,12 +204,14 @@ def handle_setting_changes(self, setting_keys):
def handle_ha_toplogy_changes(self):
instance = Instance.objects.me()
logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
(instance, removed_queues, added_queues) = register_celery_worker_queues(self.app, self.request.hostname)
awx_app = Celery('awx')
awx_app.config_from_object('django.conf:settings', namespace='CELERY')
(instance, removed_queues, added_queues) = register_celery_worker_queues(awx_app, self.request.hostname)
logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
.format(instance.hostname, removed_queues, added_queues))
updated_routes = update_celery_worker_routes(instance, settings)
logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
.format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
.format(instance.hostname, updated_routes, self.app.conf.CELERY_TASK_ROUTES))
@worker_ready.connect
@ -213,7 +229,7 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
instance = Instance.objects.me()
added_routes = update_celery_worker_routes(instance, conf)
logger.info("Workers on tower node '{}' added routes {} all routes are now {}"
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
.format(instance.hostname, added_routes, conf.CELERY_TASK_ROUTES))
@celeryd_after_setup.connect
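
Because the instance/group create and delete hooks can fire on any node, the policy
evaluation above is serialized with a Postgres advisory lock. A minimal sketch of
that pattern, assuming only the advisory_lock context manager and lock name visible
in the diff; the recompute_membership wrapper is illustrative:

from awx.main.utils.pglock import advisory_lock

def recompute_membership():
    # wait=True blocks a second caller until the in-flight evaluation on
    # another node or worker finishes, so overlapping triggers cannot
    # interleave their instance-group writes.
    with advisory_lock('cluster_policy_lock', wait=True):
        pass  # re-read the policy fields and rebuild InstanceGroup.instances here
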

View File

@ -35,8 +35,9 @@ def mk_instance(persisted=True, hostname='instance.example.org'):
return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname)[0]
def mk_instance_group(name='tower', instance=None):
ig, status = InstanceGroup.objects.get_or_create(name=name)
def mk_instance_group(name='tower', instance=None, minimum=0, percentage=0):
ig, status = InstanceGroup.objects.get_or_create(name=name, policy_instance_minimum=minimum,
policy_instance_percentage=percentage)
if instance is not None:
if type(instance) == list:
for i in instance:

View File

@ -135,8 +135,8 @@ def create_instance(name, instance_groups=None):
return mk_instance(hostname=name)
def create_instance_group(name, instances=None):
return mk_instance_group(name=name, instance=instances)
def create_instance_group(name, instances=None, minimum=0, percentage=0):
return mk_instance_group(name=name, instance=instances, minimum=minimum, percentage=percentage)
def create_survey_spec(variables=None, default_type='integer', required=True, min=None, max=None):

View File

@ -2,6 +2,8 @@ import pytest
import mock
from datetime import timedelta
from awx.main.scheduler import TaskManager
from awx.main.models import InstanceGroup
from awx.main.tasks import apply_cluster_membership_policies
@pytest.mark.django_db
@ -151,3 +153,34 @@ def test_failover_group_run(instance_factory, default_instance_group, mocker,
tm.schedule()
mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig2, [])])
assert mock_job.call_count == 2
@pytest.mark.django_db
def test_instance_group_basic_policies(instance_factory, instance_group_factory):
i0 = instance_factory("i0")
i1 = instance_factory("i1")
i2 = instance_factory("i2")
i3 = instance_factory("i3")
i4 = instance_factory("i4")
ig0 = instance_group_factory("ig0")
ig1 = instance_group_factory("ig1", minimum=2)
ig2 = instance_group_factory("ig2", percentage=50)
ig3 = instance_group_factory("ig3", percentage=50)
ig0.policy_instance_list.append(i0.hostname)
ig0.save()
apply_cluster_membership_policies()
ig0 = InstanceGroup.objects.get(id=ig0.id)
ig1 = InstanceGroup.objects.get(id=ig1.id)
ig2 = InstanceGroup.objects.get(id=ig2.id)
ig3 = InstanceGroup.objects.get(id=ig3.id)
assert len(ig0.instances.all()) == 1
assert i0 in ig0.instances.all()
assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2
assert i1 in ig1.instances.all()
assert i2 in ig1.instances.all()
assert len(InstanceGroup.objects.get(id=ig2.id).instances.all()) == 2
assert i3 in ig2.instances.all()
assert i4 in ig2.instances.all()
assert len(InstanceGroup.objects.get(id=ig3.id).instances.all()) == 2
assert i1 in ig3.instances.all()
assert i2 in ig3.instances.all()
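
Reading the expected counts: i0 is pinned to ig0 through policy_instance_list, so only
the remaining four instances take part in automatic placement. ig1's minimum of 2
claims i1 and i2 first, and each of ig2/ig3 is then topped up to 50% of the four
automatically managed instances, i.e. 2 apiece (100 * 2 / 4 = 50); that is why ig2
ends up with i3/i4 while ig3 reuses i1/i2, since membership in one group does not
exclude an instance from another.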

View File

@ -17,7 +17,7 @@ from awx.main.utils.ha import (
@pytest.fixture
def conf():
class Conf():
CELERY_ROUTES = dict()
CELERY_TASK_ROUTES = dict()
CELERYBEAT_SCHEDULE = dict()
return Conf()
@ -87,14 +87,14 @@ class TestUpdateCeleryWorkerRoutes():
instance.is_controller = mocker.MagicMock(return_value=is_controller)
assert update_celery_worker_routes(instance, conf) == expected_routes
assert conf.CELERY_ROUTES == expected_routes
assert conf.CELERY_TASK_ROUTES == expected_routes
def test_update_celery_worker_routes_deleted(self, mocker, conf):
instance = mocker.MagicMock()
instance.hostname = 'east-1'
instance.is_controller = mocker.MagicMock(return_value=False)
conf.CELERY_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
conf.CELERY_TASK_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
update_celery_worker_routes(instance, conf)
assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_ROUTES
assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_TASK_ROUTES

View File

@ -14,6 +14,7 @@ def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
removed_queues = []
added_queues = []
ig_names = set(instance.rampart_groups.values_list('name', flat=True))
ig_names.add("tower_instance_router")
worker_queue_names = set([q['name'] for q in worker_queues])
@ -47,12 +48,12 @@ def update_celery_worker_routes(instance, conf):
if instance.is_controller():
tasks.append('awx.main.tasks.awx_isolated_heartbeat')
else:
if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_ROUTES:
del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_TASK_ROUTES:
del conf.CELERY_TASK_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
for t in tasks:
conf.CELERY_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
routes_updated[t] = conf.CELERY_ROUTES[t]
conf.CELERY_TASK_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
routes_updated[t] = conf.CELERY_TASK_ROUTES[t]
return routes_updated

View File

@ -432,6 +432,7 @@ DEVSERVER_DEFAULT_PORT = '8013'
# Set default ports for live server tests.
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
BROKER_POOL_LIMIT = None
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_EVENT_QUEUE_TTL = 5
CELERY_TASK_DEFAULT_QUEUE = 'tower'
@ -452,7 +453,7 @@ CELERY_TASK_QUEUES = (
)
CELERY_TASK_ROUTES = {}
CELERYBEAT_SCHEDULE = {
CELERY_BEAT_SCHEDULE = {
'tower_scheduler': {
'task': 'awx.main.tasks.awx_periodic_scheduler',
'schedule': timedelta(seconds=30),
@ -1123,9 +1124,11 @@ LOGGING = {
},
'awx.main.tasks': {
'handlers': ['task_system'],
'propagate': False
},
'awx.main.scheduler': {
'handlers': ['task_system'],
'propagate': False
},
'awx.main.consumers': {
'handlers': ['null']
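
The CELERY_ROUTES -> CELERY_TASK_ROUTES and CELERYBEAT_SCHEDULE -> CELERY_BEAT_SCHEDULE
renames matter because the task app is now configured through a settings namespace (see
handle_ha_toplogy_changes above). With namespace='CELERY', Celery 4 maps each
CELERY_-prefixed Django setting onto its lowercase counterpart (task_routes,
beat_schedule), so only the new-style names are picked up. A minimal sketch of that
wiring; the standalone app below is illustrative, not the project's own celery setup:

from celery import Celery

app = Celery('awx')
# namespace='CELERY' means CELERY_TASK_ROUTES is read as task_routes and
# CELERY_BEAT_SCHEDULE as beat_schedule; the old un-namespaced names
# (CELERY_ROUTES, CELERYBEAT_SCHEDULE) are not mapped to those settings.
app.config_from_object('django.conf:settings', namespace='CELERY')
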

View File

@ -19,5 +19,5 @@ else
awx-manage create_preload_data
fi
awx-manage provision_instance --hostname=$(hostname)
awx-manage register_queue --queuename=tower --hostnames=$(hostname)
awx-manage register_queue --queuename=tower --instance_percent=100
supervisord -c /supervisor_task.conf

View File

@ -3,7 +3,7 @@ nodaemon = True
umask = 022
[program:celery]
command = /var/lib/awx/venv/awx/bin/celery worker -A awx -B -l debug --autoscale=4 -Ofair -s /var/lib/awx/beat.db -Q tower_broadcast_all -n celery@$(ENV_HOSTNAME)s
command = /var/lib/awx/venv/awx/bin/celery worker -A awx -B -l debug --autoscale=4 -Ofair -s /var/lib/awx/beat.db -Q tower_broadcast_all -n celery@%(ENV_HOSTNAME)s
directory = /var/lib/awx
environment = LANGUAGE="en_US.UTF-8",LANG="en_US.UTF-8",LC_ALL="en_US.UTF-8",LC_CTYPE="en_US.UTF-8"
#user = {{ aw_user }}