mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
set capacity to 0 if instance has not checked in lately
This commit is contained in:
@@ -33,7 +33,7 @@ from celery.signals import celeryd_init, worker_process_init
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
from django.db import transaction, DatabaseError, IntegrityError
|
||||
from django.db import transaction, DatabaseError, IntegrityError, OperationalError
|
||||
from django.utils.timezone import now, timedelta
|
||||
from django.utils.encoding import smart_str
|
||||
from django.core.mail import send_mail
|
||||
@@ -184,32 +184,54 @@ def purge_old_stdout_files(self):
|
||||
def cluster_node_heartbeat(self):
|
||||
logger.debug("Cluster node heartbeat task.")
|
||||
nowtime = now()
|
||||
inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
|
||||
if inst.exists():
|
||||
inst = inst[0]
|
||||
inst.capacity = get_system_task_capacity()
|
||||
inst.version = awx_application_version
|
||||
inst.save()
|
||||
instance_list = list(Instance.objects.filter(rampart_groups__controller__isnull=True).distinct())
|
||||
this_inst = None
|
||||
lost_instances = []
|
||||
for inst in list(instance_list):
|
||||
if inst.hostname == settings.CLUSTER_HOST_ID:
|
||||
this_inst = inst
|
||||
instance_list.remove(inst)
|
||||
elif inst.is_lost(ref_time=nowtime):
|
||||
lost_instances.append(inst)
|
||||
instance_list.remove(inst)
|
||||
if this_inst:
|
||||
startup_event = this_inst.is_lost(ref_time=nowtime)
|
||||
if this_inst.capacity == 0:
|
||||
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
|
||||
this_inst.capacity = get_system_task_capacity()
|
||||
this_inst.version = awx_application_version
|
||||
this_inst.save(update_fields=['capacity', 'version', 'modified'])
|
||||
if startup_event:
|
||||
return
|
||||
else:
|
||||
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
|
||||
recent_inst = Instance.objects.filter(modified__gt=nowtime - timedelta(seconds=70)).exclude(hostname=settings.CLUSTER_HOST_ID)
|
||||
# IFF any node has a greater version than we do, then we'll shutdown services
|
||||
for other_inst in recent_inst:
|
||||
for other_inst in instance_list:
|
||||
if other_inst.version == "":
|
||||
continue
|
||||
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version) and not settings.DEBUG:
|
||||
logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
|
||||
other_inst.version,
|
||||
inst.hostname,
|
||||
inst.version))
|
||||
this_inst.hostname,
|
||||
this_inst.version))
|
||||
# Set the capacity to zero to ensure no Jobs get added to this instance.
|
||||
# The heartbeat task will reset the capacity to the system capacity after upgrade.
|
||||
inst.capacity = 0
|
||||
inst.save()
|
||||
this_inst.capacity = 0
|
||||
this_inst.save(update_fields=['capacity'])
|
||||
stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])
|
||||
# We wait for the Popen call inside stop_local_services above
|
||||
# so the line below will rarely if ever be executed.
|
||||
raise RuntimeError("Shutting down.")
|
||||
for other_inst in lost_instances:
|
||||
if other_inst.capacity == 0:
|
||||
continue
|
||||
try:
|
||||
other_inst.capacity = 0
|
||||
other_inst.save(update_fields=['capacity'])
|
||||
logger.error("Host {} last checked in at {}, marked as lost.".format(
|
||||
other_inst.hostname, other_inst.modified))
|
||||
except (IntegrityError, OperationalError):
|
||||
pass # another instance is updating the lost instance
|
||||
|
||||
|
||||
@task(bind=True, base=LogErrorsTask)
|
||||
|
||||
Reference in New Issue
Block a user