From 16812542f88ac2a14057d4e258e22103e847deab Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Thu, 17 Oct 2019 16:51:30 -0400 Subject: [PATCH] implement a simple periodic pod reaper for container groups see: https://github.com/ansible/awx/issues/4911 --- awx/main/scheduler/kubernetes.py | 28 +++++++++++++++++++++++++++- awx/main/tasks.py | 19 +++++++++++++++++++ awx/settings/defaults.py | 5 +++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/awx/main/scheduler/kubernetes.py b/awx/main/scheduler/kubernetes.py index 90f2849c3d..00f82a3859 100644 --- a/awx/main/scheduler/kubernetes.py +++ b/awx/main/scheduler/kubernetes.py @@ -1,3 +1,4 @@ +import collections import os import stat import time @@ -47,6 +48,27 @@ class PodManager(object): else: logger.warn(f"Pod {self.pod_name} did not start. Status is {pod.status.phase}.") + @classmethod + def list_active_jobs(self, instance_group): + task = collections.namedtuple('Task', 'id instance_group')( + id='', + instance_group=instance_group + ) + pm = PodManager(task) + try: + for pod in pm.kube_api.list_namespaced_pod( + pm.namespace, + label_selector='ansible-awx={}'.format(settings.INSTALL_UUID) + ).to_dict().get('items', []): + job = pod['metadata'].get('labels', {}).get('ansible-awx-job-id') + if job: + try: + yield int(job) + except ValueError: + pass + except Exception: + logger.exception('Failed to list pods for container group {}'.format(instance_group)) + def delete(self): return self.kube_api.delete_namespaced_pod(name=self.pod_name, namespace=self.namespace, @@ -71,7 +93,7 @@ class PodManager(object): @property def pod_name(self): - return f"job-{self.task.id}" + return f"awx-job-{self.task.id}" @property def pod_definition(self): @@ -102,6 +124,10 @@ class PodManager(object): if self.task: pod_spec['metadata']['name'] = self.pod_name + pod_spec['metadata']['labels'] = { + 'ansible-awx': settings.INSTALL_UUID, + 'ansible-awx-job-id': str(self.task.id) + } pod_spec['spec']['containers'][0]['name'] = self.pod_name return pod_spec diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 10aac79dd9..ff53cd00ac 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -458,6 +458,25 @@ def cluster_node_heartbeat(): logger.exception('Error marking {} as lost'.format(other_inst.hostname)) +@task(queue=get_local_queuename) +def awx_k8s_reaper(): + from awx.main.scheduler.kubernetes import PodManager # prevent circular import + for group in InstanceGroup.objects.filter(credential__isnull=False).iterator(): + if group.is_containerized: + logger.debug("Checking for orphaned k8s pods for {}.".format(group)) + for job in UnifiedJob.objects.filter( + pk__in=list(PodManager.list_active_jobs(group)) + ).exclude(status__in=ACTIVE_STATES): + logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format)) + try: + PodManager(job).delete() + except Exception: + logger.exception("Failed to delete orphaned pod {} from {}".format( + job.log_format, group + )) + + + @task(queue=get_local_queuename) def awx_isolated_heartbeat(): local_hostname = settings.CLUSTER_HOST_ID diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 07c76f8b01..658d41d6b3 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -479,6 +479,11 @@ CELERYBEAT_SCHEDULE = { 'schedule': timedelta(seconds=20), 'options': {'expires': 20} }, + 'k8s_reaper': { + 'task': 'awx.main.tasks.awx_k8s_reaper', + 'schedule': timedelta(seconds=60), + 'options': {'expires': 50,} + }, # 'isolated_heartbeat': set up at the end of production.py and development.py } AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3