From 0d29bbfdc66f061e60231f70d582f5fc5ff12c01 Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Wed, 17 Oct 2018 13:36:19 -0400
Subject: [PATCH] make the dispatcher more fault-tolerant to prolonged
 database outages

---
 awx/main/dispatch/pool.py                   | 56 +++++++++++++------
 awx/main/dispatch/reaper.py                 |  4 +-
 awx/main/dispatch/worker/task.py            |  5 ++
 .../management/commands/run_dispatcher.py   |  6 +-
 awx/main/tests/functional/test_dispatch.py  | 26 +++++++++
 5 files changed, 78 insertions(+), 19 deletions(-)

diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index b03bbf4c92..e93d2ffc90 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -1,5 +1,6 @@
 import logging
 import os
+import sys
 import random
 import traceback
 from uuid import uuid4
@@ -10,7 +11,7 @@ from multiprocessing import Queue as MPQueue
 from Queue import Full as QueueFull, Empty as QueueEmpty
 
 from django.conf import settings
-from django.db import connection as django_connection
+from django.db import connection as django_connection, connections
 from django.core.cache import cache as django_cache
 from jinja2 import Template
 import psutil
@@ -319,6 +320,8 @@ class AutoscalePool(WorkerPool):
         1. Discover worker processes that exited, and recover messages they
            were handling.
         2. Clean up unnecessary, idle workers.
+        3. Check to see if the database says this node is running any tasks
+           that aren't actually running. If so, reap them.
         """
         orphaned = []
         for w in self.workers[::]:
@@ -354,6 +357,20 @@ class AutoscalePool(WorkerPool):
                 idx = random.choice(range(len(self.workers)))
                 self.write(idx, m)
 
+        # if the database says a job is running on this node, but it's *not*,
+        # then reap it
+        running_uuids = []
+        for worker in self.workers:
+            worker.calculate_managed_tasks()
+            running_uuids.extend(worker.managed_tasks.keys())
+        try:
+            reaper.reap(excluded_uuids=running_uuids)
+        except Exception:
+            # we _probably_ failed here due to DB connectivity issues, so
+            # don't use our logger (it accesses the database for configuration)
+            _, _, tb = sys.exc_info()
+            traceback.print_tb(tb)
+
     def up(self):
         if self.full:
             # if we can't spawn more workers, just toss this message into a
@@ -364,18 +381,25 @@ class AutoscalePool(WorkerPool):
             return super(AutoscalePool, self).up()
 
     def write(self, preferred_queue, body):
-        # when the cluster heartbeat occurs, clean up internally
-        if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:
-            self.cleanup()
-        if self.should_grow:
-            self.up()
-        # we don't care about "preferred queue" round robin distribution, just
-        # find the first non-busy worker and claim it
-        workers = self.workers[:]
-        random.shuffle(workers)
-        for w in workers:
-            if not w.busy:
-                w.put(body)
-                break
-        else:
-            return super(AutoscalePool, self).write(preferred_queue, body)
+        try:
+            # when the cluster heartbeat occurs, clean up internally
+            if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:
+                self.cleanup()
+            if self.should_grow:
+                self.up()
+            # we don't care about "preferred queue" round robin distribution, just
+            # find the first non-busy worker and claim it
+            workers = self.workers[:]
+            random.shuffle(workers)
+            for w in workers:
+                if not w.busy:
+                    w.put(body)
+                    break
+            else:
+                return super(AutoscalePool, self).write(preferred_queue, body)
+        except Exception:
+            for conn in connections.all():
+                # If the database connection has a hiccup, re-establish a new
+                # connection
+                conn.close_if_unusable_or_obsolete()
+            logger.exception('failed to write inbound message')
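The write() hunk above leans entirely on Django's stock connection-health API; nothing dispatcher-specific is involved. A minimal sketch of the reconnect-on-hiccup pattern, separate from the patch (the handle_with_reconnect name is hypothetical, for illustration only):

    # Sketch only: `handle_with_reconnect` is a made-up helper name that
    # illustrates the pattern the patched write() uses.
    from django.db import connections

    def handle_with_reconnect(handler, message):
        try:
            handler(message)
        except Exception:
            # A database outage surfaces here as an exception on the next
            # query. Closing unusable/obsolete connections discards the dead
            # socket; Django then lazily opens a fresh connection the next
            # time a query runs, instead of reusing the stale one forever.
            for conn in connections.all():
                conn.close_if_unusable_or_obsolete()
            raise

This is also why the patch calls close_if_unusable_or_obsolete() rather than close(): it is a no-op for healthy, in-max-age connections, so the common case costs nothing.
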
diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py
index 8e9a9d3d15..f37a62290f 100644
--- a/awx/main/dispatch/reaper.py
+++ b/awx/main/dispatch/reaper.py
@@ -26,7 +26,7 @@ def reap_job(j, status):
     )
 
 
-def reap(instance=None, status='failed'):
+def reap(instance=None, status='failed', excluded_uuids=[]):
     '''
     Reap all jobs in waiting|running for this instance.
     '''
@@ -41,6 +41,6 @@ def reap(instance=None, status='failed'):
         Q(execution_node=me.hostname) |
         Q(controller_node=me.hostname)
     ) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
-    )
+    ).exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
         reap_job(j, status)
diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py
index d1273749a1..89298384bb 100644
--- a/awx/main/dispatch/worker/task.py
+++ b/awx/main/dispatch/worker/task.py
@@ -5,6 +5,7 @@
 import sys
 import traceback
 
 import six
+from django import db
 
 from awx.main.tasks import dispatch_startup, inform_cluster_of_shutdown
@@ -74,6 +75,10 @@
             'task': u'awx.main.tasks.RunProjectUpdate'
         }
         '''
+        for conn in db.connections.all():
+            # If the database connection has a hiccup during a task, close it
+            # so we can establish a new connection
+            conn.close_if_unusable_or_obsolete()
         result = None
         try:
             result = self.run_callable(body)
diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py
index 53cfe05a29..312c146e20 100644
--- a/awx/main/management/commands/run_dispatcher.py
+++ b/awx/main/management/commands/run_dispatcher.py
@@ -7,7 +7,7 @@ from multiprocessing import Process
 from django.conf import settings
 from django.core.cache import cache as django_cache
 from django.core.management.base import BaseCommand
-from django.db import connection as django_connection
+from django.db import connection as django_connection, connections
 from kombu import Connection, Exchange, Queue
 
 from awx.main.dispatch import get_local_queuename, reaper
@@ -57,6 +57,10 @@ class Command(BaseCommand):
             return super(AWXScheduler, self).tick(*args, **kwargs)
 
         def apply_async(self, entry, producer=None, advance=True, **kwargs):
+            for conn in connections.all():
+                # If the database connection has a hiccup, re-establish a new
+                # connection
+                conn.close_if_unusable_or_obsolete()
             task = TaskWorker.resolve_callable(entry.task)
             result, queue = task.apply_async()
 
diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py
index b7dc158a9b..3567d7d37a 100644
--- a/awx/main/tests/functional/test_dispatch.py
+++ b/awx/main/tests/functional/test_dispatch.py
@@ -348,6 +348,32 @@ class TestJobReaper(object):
         else:
             assert job.status == status
 
+    @pytest.mark.parametrize('excluded_uuids, fail', [
+        (['abc123'], False),
+        ([], True),
+    ])
+    def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail):
+        i = Instance(hostname='awx')
+        i.save()
+        j = Job(
+            status='running',
+            execution_node='awx',
+            controller_node='',
+            start_args='SENSITIVE',
+            celery_task_id='abc123',
+        )
+        j.save()
+
+        # if the UUID is excluded, don't reap it
+        reaper.reap(i, excluded_uuids=excluded_uuids)
+        job = Job.objects.first()
+        if fail:
+            assert job.status == 'failed'
+            assert 'marked as failed' in job.job_explanation
+            assert job.start_args == ''
+        else:
+            assert job.status == 'running'
+
     def test_workflow_does_not_reap(self):
         i = Instance(hostname='awx')
         i.save()
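The new test pins down the reaper contract this patch introduces: jobs whose celery_task_id appears in excluded_uuids survive a reap pass. A minimal usage sketch under that assumption, in an environment where awx.main.dispatch is importable:

    # Reap orphaned jobs on this node, sparing tasks the worker pool is
    # actually executing.
    from awx.main.dispatch import reaper

    running_uuids = ['abc123']  # celery_task_ids of in-flight tasks
    reaper.reap(excluded_uuids=running_uuids)
    # Any waiting/running (non-workflow) job on this node whose
    # celery_task_id is NOT in running_uuids is marked 'failed' and has
    # its start_args scrubbed; 'abc123' is left running.

Note the interplay with cleanup() in pool.py: because excluded_uuids is built from each worker's managed_tasks, a worker that dies mid-job drops its UUID from the list, so the stranded job is failed on the next cluster heartbeat instead of sitting in 'running' forever.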