mirror of
https://github.com/ansible/awx.git
synced 2026-02-27 07:56:06 -03:30
Manually pin reference list at start of pg_lock block
This commit is contained in:
@@ -147,9 +147,17 @@ def inform_cluster_of_shutdown(*args, **kwargs):
|
|||||||
|
|
||||||
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
|
@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE)
|
||||||
def apply_cluster_membership_policies(self):
|
def apply_cluster_membership_policies(self):
|
||||||
|
started_waiting = time.time()
|
||||||
with advisory_lock('cluster_policy_lock', wait=True):
|
with advisory_lock('cluster_policy_lock', wait=True):
|
||||||
|
lock_time = time.time() - started_waiting
|
||||||
|
if lock_time > 1.0:
|
||||||
|
to_log = logger.info
|
||||||
|
else:
|
||||||
|
to_log = logger.debug
|
||||||
|
to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time))
|
||||||
|
started_compute = time.time()
|
||||||
all_instances = list(Instance.objects.order_by('id'))
|
all_instances = list(Instance.objects.order_by('id'))
|
||||||
all_groups = list(InstanceGroup.objects.all())
|
all_groups = list(InstanceGroup.objects.prefetch_related('instances'))
|
||||||
iso_hostnames = set([])
|
iso_hostnames = set([])
|
||||||
for ig in all_groups:
|
for ig in all_groups:
|
||||||
if ig.controller_id is not None:
|
if ig.controller_id is not None:
|
||||||
@@ -159,28 +167,32 @@ def apply_cluster_membership_policies(self):
|
|||||||
total_instances = len(considered_instances)
|
total_instances = len(considered_instances)
|
||||||
actual_groups = []
|
actual_groups = []
|
||||||
actual_instances = []
|
actual_instances = []
|
||||||
Group = namedtuple('Group', ['obj', 'instances'])
|
Group = namedtuple('Group', ['obj', 'instances', 'prior_instances'])
|
||||||
Node = namedtuple('Instance', ['obj', 'groups'])
|
Node = namedtuple('Instance', ['obj', 'groups'])
|
||||||
|
|
||||||
# Process policy instance list first, these will represent manually managed memberships
|
# Process policy instance list first, these will represent manually managed memberships
|
||||||
instance_hostnames_map = {inst.hostname: inst for inst in all_instances}
|
instance_hostnames_map = {inst.hostname: inst for inst in all_instances}
|
||||||
for ig in all_groups:
|
for ig in all_groups:
|
||||||
group_actual = Group(obj=ig, instances=[])
|
group_actual = Group(obj=ig, instances=[], prior_instances=[
|
||||||
|
instance.pk for instance in ig.instances.all() # obtained in prefetch
|
||||||
|
])
|
||||||
for hostname in ig.policy_instance_list:
|
for hostname in ig.policy_instance_list:
|
||||||
if hostname not in instance_hostnames_map:
|
if hostname not in instance_hostnames_map:
|
||||||
|
logger.info(six.text_type("Unknown instance {} in {} policy list").format(hostname, ig.name))
|
||||||
continue
|
continue
|
||||||
inst = instance_hostnames_map[hostname]
|
inst = instance_hostnames_map[hostname]
|
||||||
logger.info(six.text_type("Policy List, adding Instance {} to Group {}").format(inst.hostname, ig.name))
|
|
||||||
group_actual.instances.append(inst.id)
|
group_actual.instances.append(inst.id)
|
||||||
# NOTE: arguable behavior: policy-list-group is not added to
|
# NOTE: arguable behavior: policy-list-group is not added to
|
||||||
# instance's group count for consideration in minimum-policy rules
|
# instance's group count for consideration in minimum-policy rules
|
||||||
|
if group_actual.instances:
|
||||||
|
logger.info(six.text_type("Policy List, adding Instances {} to Group {}").format(group_actual.instances, ig.name))
|
||||||
|
|
||||||
if ig.controller_id is None:
|
if ig.controller_id is None:
|
||||||
actual_groups.append(group_actual)
|
actual_groups.append(group_actual)
|
||||||
else:
|
else:
|
||||||
# For isolated groups, _only_ apply the policy_instance_list
|
# For isolated groups, _only_ apply the policy_instance_list
|
||||||
# do not add to in-memory list, so minimum rules not applied
|
# do not add to in-memory list, so minimum rules not applied
|
||||||
logger.info('Committing instances {} to isolated group {}'.format(group_actual.instances, ig.name))
|
logger.info('Committing instances to isolated group {}'.format(ig.name))
|
||||||
ig.instances.set(group_actual.instances)
|
ig.instances.set(group_actual.instances)
|
||||||
|
|
||||||
# Process Instance minimum policies next, since it represents a concrete lower bound to the
|
# Process Instance minimum policies next, since it represents a concrete lower bound to the
|
||||||
@@ -189,6 +201,7 @@ def apply_cluster_membership_policies(self):
|
|||||||
logger.info("Total non-isolated instances:{} available for policy: {}".format(
|
logger.info("Total non-isolated instances:{} available for policy: {}".format(
|
||||||
total_instances, len(actual_instances)))
|
total_instances, len(actual_instances)))
|
||||||
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
|
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
|
||||||
|
policy_min_added = []
|
||||||
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
||||||
if len(g.instances) >= g.obj.policy_instance_minimum:
|
if len(g.instances) >= g.obj.policy_instance_minimum:
|
||||||
break
|
break
|
||||||
@@ -196,12 +209,15 @@ def apply_cluster_membership_policies(self):
|
|||||||
# If the instance is already _in_ the group, it was
|
# If the instance is already _in_ the group, it was
|
||||||
# applied earlier via the policy list
|
# applied earlier via the policy list
|
||||||
continue
|
continue
|
||||||
logger.info(six.text_type("Policy minimum, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name))
|
|
||||||
g.instances.append(i.obj.id)
|
g.instances.append(i.obj.id)
|
||||||
i.groups.append(g.obj.id)
|
i.groups.append(g.obj.id)
|
||||||
|
policy_min_added.append(i.obj.id)
|
||||||
|
if policy_min_added:
|
||||||
|
logger.info(six.text_type("Policy minimum, adding Instances {} to Group {}").format(policy_min_added, g.obj.name))
|
||||||
|
|
||||||
# Finally, process instance policy percentages
|
# Finally, process instance policy percentages
|
||||||
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
|
for g in sorted(actual_groups, cmp=lambda x,y: len(x.instances) - len(y.instances)):
|
||||||
|
policy_per_added = []
|
||||||
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
||||||
if i.obj.id in g.instances:
|
if i.obj.id in g.instances:
|
||||||
# If the instance is already _in_ the group, it was
|
# If the instance is already _in_ the group, it was
|
||||||
@@ -209,15 +225,34 @@ def apply_cluster_membership_policies(self):
|
|||||||
continue
|
continue
|
||||||
if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
|
if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
|
||||||
break
|
break
|
||||||
logger.info(six.text_type("Policy percentage, adding Instance {} to Group {}").format(i.obj.hostname, g.obj.name))
|
|
||||||
g.instances.append(i.obj.id)
|
g.instances.append(i.obj.id)
|
||||||
i.groups.append(g.obj.id)
|
i.groups.append(g.obj.id)
|
||||||
|
policy_per_added.append(i.obj.id)
|
||||||
|
if policy_per_added:
|
||||||
|
logger.info(six.text_type("Policy percentage, adding Instances {} to Group {}").format(policy_per_added, g.obj.name))
|
||||||
|
|
||||||
|
# Determine if any changes need to be made
|
||||||
|
needs_change = False
|
||||||
|
for g in actual_groups:
|
||||||
|
if set(g.instances) != set(g.prior_instances):
|
||||||
|
needs_change = True
|
||||||
|
break
|
||||||
|
if not needs_change:
|
||||||
|
logger.info('Cluster policy no-op finished in {} seconds'.format(time.time() - started_compute))
|
||||||
|
return
|
||||||
|
|
||||||
# On a differential basis, apply instances to non-isolated groups
|
# On a differential basis, apply instances to non-isolated groups
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
for g in actual_groups:
|
for g in actual_groups:
|
||||||
logger.info('Committing instances {} to group {}'.format(g.instances, g.obj.name))
|
instances_to_add = set(g.instances) - set(g.prior_instances)
|
||||||
g.obj.instances.set(g.instances)
|
instances_to_remove = set(g.prior_instances) - set(g.instances)
|
||||||
|
if instances_to_add:
|
||||||
|
logger.info('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
|
||||||
|
g.obj.instances.add(*instances_to_add)
|
||||||
|
if instances_to_remove:
|
||||||
|
logger.info('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
|
||||||
|
g.obj.instances.remove(*instances_to_remove)
|
||||||
|
logger.info('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
|
||||||
|
|
||||||
|
|
||||||
@shared_task(exchange='tower_broadcast_all', bind=True)
|
@shared_task(exchange='tower_broadcast_all', bind=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user