From 74fc0fef04474991c25e04705e661935cbb6e80d Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Fri, 24 Aug 2018 14:13:39 -0400
Subject: [PATCH] Manually pin reference list at start of pg_lock block

---
 awx/main/tasks.py | 53 +++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 44 insertions(+), 9 deletions(-)

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index a355a28e41..f4014e9521 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -147,9 +147,17 @@ def inform_cluster_of_shutdown(*args, **kwargs):
 
 @shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
 def apply_cluster_membership_policies(self):
+    started_waiting = time.time()
     with advisory_lock('cluster_policy_lock', wait=True):
+        lock_time = time.time() - started_waiting
+        if lock_time > 1.0:
+            to_log = logger.info
+        else:
+            to_log = logger.debug
+        to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time))
+        started_compute = time.time()
         all_instances = list(Instance.objects.order_by('id'))
-        all_groups = list(InstanceGroup.objects.all())
+        all_groups = list(InstanceGroup.objects.prefetch_related('instances'))
         iso_hostnames = set([])
         for ig in all_groups:
             if ig.controller_id is not None:
@@ -159,28 +167,32 @@ def apply_cluster_membership_policies(self):
         total_instances = len(considered_instances)
         actual_groups = []
         actual_instances = []
-        Group = namedtuple('Group', ['obj', 'instances'])
+        Group = namedtuple('Group', ['obj', 'instances', 'prior_instances'])
         Node = namedtuple('Instance', ['obj', 'groups'])
 
         # Process policy instance list first, these will represent manually managed memberships
         instance_hostnames_map = {inst.hostname: inst for inst in all_instances}
         for ig in all_groups:
-            group_actual = Group(obj=ig, instances=[])
+            group_actual = Group(obj=ig, instances=[], prior_instances=[
+                instance.pk for instance in ig.instances.all()  # obtained in prefetch
+            ])
             for hostname in ig.policy_instance_list:
                 if hostname not in instance_hostnames_map:
+                    logger.info(six.text_type("Unknown instance {} in {} policy list").format(hostname, ig.name))
                     continue
                 inst = instance_hostnames_map[hostname]
-                logger.info(six.text_type("Policy List, adding Instance {} to Group {}").format(inst.hostname, ig.name))
                 group_actual.instances.append(inst.id)
                 # NOTE: arguable behavior: policy-list-group is not added to
                 # instance's group count for consideration in minimum-policy rules
 
+            if group_actual.instances:
+                logger.info(six.text_type("Policy List, adding Instances {} to Group {}").format(group_actual.instances, ig.name))
             if ig.controller_id is None:
                 actual_groups.append(group_actual)
             else:
                 # For isolated groups, _only_ apply the policy_instance_list
                 # do not add to in-memory list, so minimum rules not applied
-                logger.info('Committing instances {} to isolated group {}'.format(group_actual.instances, ig.name))
+                logger.info('Committing instances to isolated group {}'.format(ig.name))
                 ig.instances.set(group_actual.instances)
 
         # Process Instance minimum policies next, since it represents a concrete lower bound to the
@@ -189,6 +201,7 @@ def apply_cluster_membership_policies(self):
         logger.info("Total non-isolated instances:{} available for policy: {}".format(
             total_instances, len(actual_instances)))
         for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
+            policy_min_added = []
             for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
                 if len(g.instances) >= g.obj.policy_instance_minimum:
                     break
@@ -196,12 +209,15 @@ def apply_cluster_membership_policies(self):
                     # If the instance is already _in_ the group, it was
                     # applied earlier via the policy list
                     continue
-                logger.info(six.text_type("Policy minimum, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name))
                 g.instances.append(i.obj.id)
                 i.groups.append(g.obj.id)
+                policy_min_added.append(i.obj.id)
+            if policy_min_added:
+                logger.info(six.text_type("Policy minimum, adding Instances {} to Group {}").format(policy_min_added, g.obj.name))
 
         # Finally, process instance policy percentages
         for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
+            policy_per_added = []
             for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
                 if i.obj.id in g.instances:
                     # If the instance is already _in_ the group, it was
@@ -209,15 +225,34 @@ def apply_cluster_membership_policies(self):
                     continue
                 if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
                     break
-                logger.info(six.text_type("Policy percentage, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name))
                 g.instances.append(i.obj.id)
                 i.groups.append(g.obj.id)
+                policy_per_added.append(i.obj.id)
+            if policy_per_added:
+                logger.info(six.text_type("Policy percentage, adding Instances {} to Group {}").format(policy_per_added, g.obj.name))
+
+        # Determine if any changes need to be made
+        needs_change = False
+        for g in actual_groups:
+            if set(g.instances) != set(g.prior_instances):
+                needs_change = True
+                break
+        if not needs_change:
+            logger.info('Cluster policy no-op finished in {} seconds'.format(time.time() - started_compute))
+            return
 
         # On a differential basis, apply instances to non-isolated groups
         with transaction.atomic():
             for g in actual_groups:
-                logger.info('Committing instances {} to group {}'.format(g.instances, g.obj.name))
-                g.obj.instances.set(g.instances)
+                instances_to_add = set(g.instances) - set(g.prior_instances)
+                instances_to_remove = set(g.prior_instances) - set(g.instances)
+                if instances_to_add:
+                    logger.info('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
+                    g.obj.instances.add(*instances_to_add)
+                if instances_to_remove:
+                    logger.info('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
+                    g.obj.instances.remove(*instances_to_remove)
+        logger.info('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
 
 
 @shared_task(exchange='tower_broadcast_all', bind=True)
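
Note for reviewers: the lock-wait instrumentation added at the top of the task is a generic pattern rather than anything AWX-specific. It records a timestamp before blocking on the advisory lock, then logs the wait at DEBUG normally, escalating to INFO once the wait exceeds one second. A minimal sketch of just that pattern, using a hypothetical fake_lock context manager in place of the real postgres advisory_lock:

import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

@contextmanager
def fake_lock(name):
    # Hypothetical stand-in for the advisory_lock used in the patch.
    time.sleep(0.01)  # simulate brief contention
    yield

started_waiting = time.time()
with fake_lock('cluster_policy_lock'):
    lock_time = time.time() - started_waiting
    # Quiet when uncontended, visible when another node held the lock a while.
    to_log = logger.info if lock_time > 1.0 else logger.debug
    to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time))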
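
The core change is replacing an unconditional instances.set(...) with a differential update against the membership list pinned when the lock was acquired, plus an early return when no group changed. A standalone sketch of that set-difference logic under assumed sample data (plain lists of ids standing in for the actual Django m2m managers):

def diff_membership(prior, desired):
    # Return (to_add, to_remove) so only the delta is written back.
    prior_set, desired_set = set(prior), set(desired)
    return desired_set - prior_set, prior_set - desired_set

prior = [1, 2, 3]    # membership pinned at the start of the locked block
desired = [2, 3, 4]  # membership computed by the policy passes

to_add, to_remove = diff_membership(prior, desired)
if not to_add and not to_remove:
    print('no-op: membership unchanged')  # mirrors the early return in the patch
else:
    print('add {}, remove {}'.format(sorted(to_add), sorted(to_remove)))
    # With a Django m2m manager this corresponds to:
    #   group.instances.add(*to_add)
    #   group.instances.remove(*to_remove)

The early no-op return skips the transaction entirely when nothing changed, and the per-group add/remove logging names only the instances actually affected instead of dumping every group's full membership.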