clean up some unnecessary dispatcher reaping code

This commit is contained in:
Ryan Petrello 2019-01-24 02:19:18 -05:00
parent 1becd4c39d
commit 4707dc2a05
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
3 changed files with 18 additions and 15 deletions

View File

@ -1,6 +1,5 @@
import logging
import os
import sys
import random
import traceback
from uuid import uuid4
@ -325,6 +324,11 @@ class AutoscalePool(WorkerPool):
2. Clean up unnecessary, idle workers.
3. Check to see if the database says this node is running any tasks
that aren't actually running. If so, reap them.
IMPORTANT: this function is one of the few places in the dispatcher
(aside from setting lookups) where we talk to the database. As such,
if there's an outage, this method _can_ throw various
django.db.utils.Error exceptions. Act accordingly.
"""
orphaned = []
for w in self.workers[::]:
@ -366,17 +370,7 @@ class AutoscalePool(WorkerPool):
for worker in self.workers:
worker.calculate_managed_tasks()
running_uuids.extend(list(worker.managed_tasks.keys()))
try:
reaper.reap(excluded_uuids=running_uuids)
except Exception:
# we _probably_ failed here due to DB connectivity issues, so
# don't use our logger (it accesses the database for configuration)
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
for conn in connections.all():
# If the database connection has a hiccup, re-establish a new
# connection
conn.close_if_unusable_or_obsolete()
reaper.reap(excluded_uuids=running_uuids)
def up(self):
if self.full:

View File

@ -81,7 +81,11 @@ class AWXConsumer(ConsumerMixin):
def process_task(self, body, message):
if 'control' in body:
return self.control(body, message)
try:
return self.control(body, message)
except Exception:
logger.exception("Exception handling control message:")
return
if len(self.pool):
if "uuid" in body and body['uuid']:
try:

View File

@ -3,6 +3,7 @@ import multiprocessing
import random
import signal
import time
from unittest import mock
from django.utils.timezone import now as tz_now
import pytest
@ -200,7 +201,9 @@ class TestAutoScaling:
assert len(self.pool) == 10
# cleanup should scale down to 8 workers
self.pool.cleanup()
with mock.patch('awx.main.dispatch.reaper.reap') as reap:
self.pool.cleanup()
reap.assert_called()
assert len(self.pool) == 2
def test_max_scale_up(self):
@ -248,7 +251,9 @@ class TestAutoScaling:
time.sleep(1) # wait a moment for sigterm
# clean up and the dead worker
self.pool.cleanup()
with mock.patch('awx.main.dispatch.reaper.reap') as reap:
self.pool.cleanup()
reap.assert_called()
assert len(self.pool) == 1
assert self.pool.workers[0].pid == alive_pid