diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 8360ae82db..0a3fe95a44 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -148,38 +148,55 @@ def inform_cluster_of_shutdown(*args, **kwargs): @shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) def apply_cluster_membership_policies(self): with advisory_lock('cluster_policy_lock', wait=True): - considered_instances = Instance.objects.all_non_isolated().order_by('id') - total_instances = considered_instances.count() + all_instances = list(Instance.objects.order_by('id')) + all_groups = list(InstanceGroup.objects.all()) + iso_hostnames = set([]) + for ig in all_groups: + if ig.controller_id is not None: + iso_hostnames.update(ig.policy_instance_list) + + considered_instances = [inst for inst in all_instances if inst.hostname not in iso_hostnames] + total_instances = len(considered_instances) actual_groups = [] actual_instances = [] Group = namedtuple('Group', ['obj', 'instances']) Node = namedtuple('Instance', ['obj', 'groups']) # Process policy instance list first, these will represent manually managed memberships - for ig in InstanceGroup.objects.all(): - logger.info(six.text_type("Applying cluster policy instance list to Group {}").format(ig.name)) - ig.instances.clear() + instance_hostnames_map = {inst.hostname: inst for inst in all_instances} + for ig in all_groups: group_actual = Group(obj=ig, instances=[]) - for i in ig.policy_instance_list: - inst = Instance.objects.filter(hostname=i) - if not inst.exists(): + for hostname in ig.policy_instance_list: + if hostname not in instance_hostnames_map: continue - inst = inst[0] + inst = instance_hostnames_map[hostname] logger.info(six.text_type("Policy List, adding Instance {} to Group {}").format(inst.hostname, ig.name)) group_actual.instances.append(inst.id) - ig.instances.add(inst) - actual_groups.append(group_actual) + # NOTE: arguable behavior: policy-list-group is not added to + # instance's group count for consideration in minimum-policy rules + + if ig.controller_id is None: + actual_groups.append(group_actual) + else: + # For isolated groups, _only_ apply the policy_instance_list + # do not add to in-memory list, so minimum rules not applied + logger.info('Committing instances {} to isolated group {}'.format(group_actual.instances, ig.name)) + ig.instances.set(group_actual.instances) # Process Instance minimum policies next, since it represents a concrete lower bound to the # number of instances to make available to instance groups actual_instances = [Node(obj=i, groups=[]) for i in considered_instances if i.managed_by_policy] - logger.info("Total instances not directly associated: {}".format(total_instances)) + logger.info("Total non-isolated instances:{} available for policy: {}".format( + total_instances, len(actual_instances))) for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)): for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)): if len(g.instances) >= g.obj.policy_instance_minimum: break + if i.obj.id in g.instances: + # If the instance is already _in_ the group, it was + # applied earlier via the policy list + continue logger.info(six.text_type("Policy minimum, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name)) - g.obj.instances.add(i.obj) g.instances.append(i.obj.id) i.groups.append(g.obj.id) @@ -188,15 +205,20 @@ def apply_cluster_membership_policies(self): for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)): if i.obj.id in g.instances: # If the instance is already _in_ the group, it was - # probably applied earlier via a minimum policy + # applied earlier via a minimum policy or policy list continue if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage: break logger.info(six.text_type("Policy percentage, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name)) g.instances.append(i.obj.id) - g.obj.instances.add(i.obj) i.groups.append(g.obj.id) + # On a differential basis, apply instances to non-isolated groups + with transaction.atomic(): + for g in actual_groups: + logger.info('Committing instances {} to group {}'.format(g.instances, g.obj.name)) + g.obj.instances.set(g.instances) + @shared_task(exchange='tower_broadcast_all', bind=True) def handle_setting_changes(self, setting_keys):