Move reaper logic into worker, avoiding bottlenecks

This commit is contained in:
Alan Rominger
2022-07-27 10:58:58 -04:00
parent cc1e349ea8
commit ccd46a1c0f
4 changed files with 36 additions and 21 deletions

View File

@@ -16,6 +16,7 @@ from queue import Full as QueueFull, Empty as QueueEmpty
from django.conf import settings
from django.db import connection as django_connection, connections
from django.core.cache import cache as django_cache
from django.utils.timezone import now as tz_now
from django_guid import set_guid
from jinja2 import Template
import psutil
@@ -437,18 +438,17 @@ class AutoscalePool(WorkerPool):
idx = random.choice(range(len(self.workers)))
self.write(idx, m)
# if the database says a job is running or queued on this node, but it's *not*, def add_bind_kwargs(self, body):
# then reap it bind_kwargs = body.pop('bind_kwargs', [])
running_uuids = [] body.setdefault('kwargs', {})
for worker in self.workers: if 'dispatch_time' in bind_kwargs:
worker.calculate_managed_tasks() body['kwargs']['dispatch_time'] = tz_now().isoformat()
running_uuids.extend(list(worker.managed_tasks.keys())) if 'active_task_ids' in bind_kwargs:
active_task_ids = []
# if we are not in the dangerous situation of queue backup then clear old waiting jobs for worker in self.workers:
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: worker.calculate_managed_tasks()
reaper.reap_waiting(excluded_uuids=running_uuids) active_task_ids.extend(list(worker.managed_tasks.keys()))
body['kwargs']['active_task_ids'] = active_task_ids
reaper.reap(excluded_uuids=running_uuids)
def up(self):
if self.full:
@@ -463,9 +463,8 @@ class AutoscalePool(WorkerPool):
if 'guid' in body:
set_guid(body['guid'])
try:
# when the cluster heartbeat occurs, clean up internally if isinstance(body, dict) and body.get('bind_kwargs'):
if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']: self.add_bind_kwargs(body)
self.cleanup()
if self.should_grow:
self.up()
# we don't care about "preferred queue" round robin distribution, just

View File

@@ -50,13 +50,21 @@ class task:
@task(queue='tower_broadcast')
def announce():
print("Run this everywhere!")
# The special parameter bind_kwargs tells the main dispatcher process to add certain kwargs
@task(bind_kwargs=['dispatch_time'])
def print_time(dispatch_time=None):
print(f"Time I was dispatched: {dispatch_time}")
""" """
def __init__(self, queue=None): def __init__(self, queue=None, bind_kwargs=None):
self.queue = queue
self.bind_kwargs = bind_kwargs
def __call__(self, fn=None):
queue = self.queue
bind_kwargs = self.bind_kwargs
class PublisherMixin(object):
@@ -80,6 +88,8 @@ class task:
guid = get_guid()
if guid:
obj['guid'] = guid
if bind_kwargs:
obj['bind_kwargs'] = bind_kwargs
obj.update(**kw)
if callable(queue):
queue = queue()

View File

@@ -55,7 +55,7 @@ def reap_job(j, status, job_explanation=None):
logger.error(f'{j.log_format} is no longer {status_before}; reaping')
def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None): def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None, ref_time=None):
""" """
Reap all jobs in waiting for this instance. Reap all jobs in waiting for this instance.
""" """
@@ -69,8 +69,9 @@ def reap_waiting(instance=None, status='failed', job_explanation=None, grace_per
except RuntimeError as e:
logger.warning(f'Local instance is not registered, not running reaper: {e}')
return
now = tz_now() if ref_time is None:
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname) ref_time = tz_now()
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=ref_time - timedelta(seconds=grace_period), controller_node=me.hostname)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs:

View File

@@ -10,6 +10,7 @@ from contextlib import redirect_stdout
import shutil
import time
from distutils.version import LooseVersion as Version
from datetime import datetime
# Django
from django.conf import settings
@@ -482,8 +483,8 @@ def inspect_execution_nodes(instance_list):
execution_node_health_check.apply_async([hostname])
@task(queue=get_local_queuename) @task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'active_task_ids'])
def cluster_node_heartbeat(): def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None):
logger.debug("Cluster node heartbeat task.") logger.debug("Cluster node heartbeat task.")
nowtime = now() nowtime = now()
instance_list = list(Instance.objects.all()) instance_list = list(Instance.objects.all())
@@ -562,6 +563,10 @@ def cluster_node_heartbeat():
else:
logger.exception('Error marking {} as lost'.format(other_inst.hostname))
# Run local reaper
if active_task_ids is not None:
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
@task(queue=get_local_queuename)
def awx_receptor_workunit_reaper():