make the dispatcher more fault-tolerant to prolonged database outages

Ryan Petrello 2018-10-17 13:36:19 -04:00
parent ce8117ef19
commit 0d29bbfdc6
5 changed files with 78 additions and 19 deletions

View File

@@ -1,5 +1,6 @@
 import logging
 import os
+import sys
 import random
 import traceback
 from uuid import uuid4
@@ -10,7 +11,7 @@ from multiprocessing import Queue as MPQueue
 from Queue import Full as QueueFull, Empty as QueueEmpty

 from django.conf import settings
-from django.db import connection as django_connection
+from django.db import connection as django_connection, connections
 from django.core.cache import cache as django_cache
 from jinja2 import Template
 import psutil
@@ -319,6 +320,8 @@ class AutoscalePool(WorkerPool):
         1. Discover worker processes that exited, and recover messages they
            were handling.
         2. Clean up unnecessary, idle workers.
+        3. Check to see if the database says this node is running any tasks
+           that aren't actually running.  If so, reap them.
         """
         orphaned = []
         for w in self.workers[::]:
@@ -354,6 +357,20 @@ class AutoscalePool(WorkerPool):
                 idx = random.choice(range(len(self.workers)))
                 self.write(idx, m)

+        # if the database says a job is running on this node, but it's *not*,
+        # then reap it
+        running_uuids = []
+        for worker in self.workers:
+            worker.calculate_managed_tasks()
+            running_uuids.extend(worker.managed_tasks.keys())
+        try:
+            reaper.reap(excluded_uuids=running_uuids)
+        except Exception:
+            # we _probably_ failed here due to DB connectivity issues, so
+            # don't use our logger (it accesses the database for configuration)
+            _, _, tb = sys.exc_info()
+            traceback.print_tb(tb)
+
     def up(self):
         if self.full:
             # if we can't spawn more workers, just toss this message into a
@@ -364,18 +381,25 @@ class AutoscalePool(WorkerPool):
         return super(AutoscalePool, self).up()

     def write(self, preferred_queue, body):
-        # when the cluster heartbeat occurs, clean up internally
-        if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:
-            self.cleanup()
-        if self.should_grow:
-            self.up()
-        # we don't care about "preferred queue" round robin distribution, just
-        # find the first non-busy worker and claim it
-        workers = self.workers[:]
-        random.shuffle(workers)
-        for w in workers:
-            if not w.busy:
-                w.put(body)
-                break
-        else:
-            return super(AutoscalePool, self).write(preferred_queue, body)
+        try:
+            # when the cluster heartbeat occurs, clean up internally
+            if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:
+                self.cleanup()
+            if self.should_grow:
+                self.up()
+            # we don't care about "preferred queue" round robin distribution, just
+            # find the first non-busy worker and claim it
+            workers = self.workers[:]
+            random.shuffle(workers)
+            for w in workers:
+                if not w.busy:
+                    w.put(body)
+                    break
+            else:
+                return super(AutoscalePool, self).write(preferred_queue, body)
+        except Exception:
+            for conn in connections.all():
+                # If the database connection has a hiccup, re-establish a new
+                # connection
+                conn.close_if_unusable_or_obsolete()
+            logger.exception('failed to write inbound message')
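
For context: close_if_unusable_or_obsolete() is Django's standard hook for recycling a database connection that has either raised errors or outlived CONN_MAX_AGE; the next ORM query on that alias transparently opens a fresh connection. Below is a minimal standalone sketch of the guard pattern write() uses above — recycle_stale_connections and safe_write are illustrative names, not AWX's:

import logging

from django.db import connections

logger = logging.getLogger(__name__)


def recycle_stale_connections():
    for conn in connections.all():
        # a no-op for healthy connections; closes ones that errored or
        # aged past CONN_MAX_AGE, so the next query reconnects lazily
        conn.close_if_unusable_or_obsolete()


def safe_write(pool, queue, body):
    try:
        pool.write(queue, body)
    except Exception:
        # assume a DB hiccup: recycle connections, then surface the failure
        recycle_stale_connections()
        logger.exception('failed to write inbound message')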

View File

@@ -26,7 +26,7 @@ def reap_job(j, status):
     )


-def reap(instance=None, status='failed'):
+def reap(instance=None, status='failed', excluded_uuids=[]):
     '''
     Reap all jobs in waiting|running for this instance.
     '''
@@ -41,6 +41,6 @@ def reap(instance=None, status='failed'):
             Q(execution_node=me.hostname) |
             Q(controller_node=me.hostname)
         ) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
-    )
+    ).exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
         reap_job(j, status)
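
The effect of the new excluded_uuids argument is a simple set difference: rows the database attributes to this node are reaped unless their celery_task_id appears in the exclusion list. A framework-free sketch of those semantics (plain dicts stand in for UnifiedJob rows; the real code expresses this with .exclude(celery_task_id__in=excluded_uuids) on a queryset):

jobs_in_db = [
    {'celery_task_id': 'abc123', 'status': 'running'},  # really running here
    {'celery_task_id': 'def456', 'status': 'running'},  # orphaned record
]
running_uuids = ['abc123']  # reported by the local workers

to_reap = [j for j in jobs_in_db
           if j['celery_task_id'] not in running_uuids]
assert [j['celery_task_id'] for j in to_reap] == ['def456']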

View File

@@ -5,6 +5,7 @@ import sys
 import traceback

 import six
+from django import db

 from awx.main.tasks import dispatch_startup, inform_cluster_of_shutdown
@@ -74,6 +75,10 @@ class TaskWorker(BaseWorker):
             'task': u'awx.main.tasks.RunProjectUpdate'
         }
         '''
+        for conn in db.connections.all():
+            # If the database connection has a hiccup during a task, close it
+            # so we can establish a new connection
+            conn.close_if_unusable_or_obsolete()
         result = None
         try:
             result = self.run_callable(body)
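
Because the guard sits at the top of perform_work(), every dispatched task begins with a usable connection even if the previous task died mid-query. A hedged sketch of how it composes with a worker loop — the loop and the names here are illustrative, not AWX's actual worker implementation:

from django import db


def work_loop(worker, queue):
    while True:
        body = queue.get()
        # recycle anything the previous task left broken before this
        # task touches the ORM
        for conn in db.connections.all():
            conn.close_if_unusable_or_obsolete()
        worker.run_callable(body)  # run_callable as in the diff above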

View File

@@ -7,7 +7,7 @@ from multiprocessing import Process
 from django.conf import settings
 from django.core.cache import cache as django_cache
 from django.core.management.base import BaseCommand
-from django.db import connection as django_connection
+from django.db import connection as django_connection, connections
 from kombu import Connection, Exchange, Queue

 from awx.main.dispatch import get_local_queuename, reaper
@@ -57,6 +57,10 @@ class Command(BaseCommand):
         return super(AWXScheduler, self).tick(*args, **kwargs)

     def apply_async(self, entry, producer=None, advance=True, **kwargs):
+        for conn in connections.all():
+            # If the database connection has a hiccup, re-establish a new
+            # connection
+            conn.close_if_unusable_or_obsolete()
         task = TaskWorker.resolve_callable(entry.task)
         result, queue = task.apply_async()
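
A quick way to convince yourself the recycling behaves as intended is a Django shell session along these lines (hypothetical; it pokes at internals such as connection.errors_occurred, which a real failed query would set for you):

from django.db import connection
from django.contrib.contenttypes.models import ContentType

connection.ensure_connection()        # open a real connection
connection.connection.close()         # simulate the server hanging up
connection.errors_occurred = True     # what a failed query would record
connection.close_if_unusable_or_obsolete()  # probes the dead socket, closes the wrapper
ContentType.objects.exists()          # the next query reconnects transparently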

View File

@@ -348,6 +348,32 @@ class TestJobReaper(object):
         else:
             assert job.status == status

+    @pytest.mark.parametrize('excluded_uuids, fail', [
+        (['abc123'], False),
+        ([], True),
+    ])
+    def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail):
+        i = Instance(hostname='awx')
+        i.save()
+        j = Job(
+            status='running',
+            execution_node='awx',
+            controller_node='',
+            start_args='SENSITIVE',
+            celery_task_id='abc123',
+        )
+        j.save()
+
+        # if the UUID is excluded, don't reap it
+        reaper.reap(i, excluded_uuids=excluded_uuids)
+        job = Job.objects.first()
+        if fail:
+            assert job.status == 'failed'
+            assert 'marked as failed' in job.job_explanation
+            assert job.start_args == ''
+        else:
+            assert job.status == 'running'
+
     def test_workflow_does_not_reap(self):
         i = Instance(hostname='awx')
         i.save()