From ccd46a1c0fb5d2eea4194882c7e59b6eef33ebad Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 27 Jul 2022 10:58:58 -0400 Subject: [PATCH] Move reaper logic into worker, avoiding bottlenecks --- awx/main/dispatch/pool.py | 29 ++++++++++++++--------------- awx/main/dispatch/publish.py | 12 +++++++++++- awx/main/dispatch/reaper.py | 7 ++++--- awx/main/tasks/system.py | 9 +++++++-- 4 files changed, 36 insertions(+), 21 deletions(-) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index ce6c297861..8c36fd7c57 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -16,6 +16,7 @@ from queue import Full as QueueFull, Empty as QueueEmpty from django.conf import settings from django.db import connection as django_connection, connections from django.core.cache import cache as django_cache +from django.utils.timezone import now as tz_now from django_guid import set_guid from jinja2 import Template import psutil @@ -437,18 +438,17 @@ class AutoscalePool(WorkerPool): idx = random.choice(range(len(self.workers))) self.write(idx, m) - # if the database says a job is running or queued on this node, but it's *not*, - # then reap it - running_uuids = [] - for worker in self.workers: - worker.calculate_managed_tasks() - running_uuids.extend(list(worker.managed_tasks.keys())) - - # if we are not in the dangerous situation of queue backup then clear old waiting jobs - if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: - reaper.reap_waiting(excluded_uuids=running_uuids) - - reaper.reap(excluded_uuids=running_uuids) + def add_bind_kwargs(self, body): + bind_kwargs = body.pop('bind_kwargs', []) + body.setdefault('kwargs', {}) + if 'dispatch_time' in bind_kwargs: + body['kwargs']['dispatch_time'] = tz_now().isoformat() + if 'active_task_ids' in bind_kwargs: + active_task_ids = [] + for worker in self.workers: + worker.calculate_managed_tasks() + active_task_ids.extend(list(worker.managed_tasks.keys())) + body['kwargs']['active_task_ids'] = active_task_ids def up(self): if self.full: @@ -463,9 +463,8 @@ class AutoscalePool(WorkerPool): if 'guid' in body: set_guid(body['guid']) try: - # when the cluster heartbeat occurs, clean up internally - if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']: - self.cleanup() + if isinstance(body, dict) and body.get('bind_kwargs'): + self.add_bind_kwargs(body) if self.should_grow: self.up() # we don't care about "preferred queue" round robin distribution, just diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index dd19c1338c..bc496496d5 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -50,13 +50,21 @@ class task: @task(queue='tower_broadcast') def announce(): print("Run this everywhere!") + + # The special parameter bind_kwargs tells the main dispatcher process to add certain kwargs + + @task(bind_kwargs=['dispatch_time']) + def print_time(dispatch_time=None): + print(f"Time I was dispatched: {dispatch_time}") """ - def __init__(self, queue=None): + def __init__(self, queue=None, bind_kwargs=None): self.queue = queue + self.bind_kwargs = bind_kwargs def __call__(self, fn=None): queue = self.queue + bind_kwargs = self.bind_kwargs class PublisherMixin(object): @@ -80,6 +88,8 @@ class task: guid = get_guid() if guid: obj['guid'] = guid + if bind_kwargs: + obj['bind_kwargs'] = bind_kwargs obj.update(**kw) if callable(queue): queue = queue() diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py index 7a0ae1b884..4248eac3f6 100644 --- a/awx/main/dispatch/reaper.py +++ b/awx/main/dispatch/reaper.py @@ -55,7 +55,7 @@ def reap_job(j, status, job_explanation=None): logger.error(f'{j.log_format} is no longer {status_before}; reaping') -def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None): +def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None, ref_time=None): """ Reap all jobs in waiting for this instance. """ @@ -69,8 +69,9 @@ def reap_waiting(instance=None, status='failed', job_explanation=None, grace_per except RuntimeError as e: logger.warning(f'Local instance is not registered, not running reaper: {e}') return - now = tz_now() - jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname) + if ref_time is None: + ref_time = tz_now() + jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=ref_time - timedelta(seconds=grace_period), controller_node=me.hostname) if excluded_uuids: jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index e36c502400..598b3a680f 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -10,6 +10,7 @@ from contextlib import redirect_stdout import shutil import time from distutils.version import LooseVersion as Version +from datetime import datetime # Django from django.conf import settings @@ -482,8 +483,8 @@ def inspect_execution_nodes(instance_list): execution_node_health_check.apply_async([hostname]) -@task(queue=get_local_queuename) -def cluster_node_heartbeat(): +@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'active_task_ids']) +def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None): logger.debug("Cluster node heartbeat task.") nowtime = now() instance_list = list(Instance.objects.all()) @@ -562,6 +563,10 @@ def cluster_node_heartbeat(): else: logger.exception('Error marking {} as lost'.format(other_inst.hostname)) + # Run local reaper + if active_task_ids is not None: + reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) + @task(queue=get_local_queuename) def awx_receptor_workunit_reaper():