Merge pull request #6071 from ryanpetrello/task-manager-hang-detection

Add code for detecting (and killing) a hung task manager task.

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot]
2020-02-26 19:05:36 +00:00
committed by GitHub

View File

@@ -1,7 +1,9 @@
import logging import logging
import os import os
import random import random
import signal
import sys import sys
import time
import traceback import traceback
from uuid import uuid4 from uuid import uuid4
@@ -244,7 +246,7 @@ class WorkerPool(object):
' qsize={{ w.managed_tasks|length }}' ' qsize={{ w.managed_tasks|length }}'
' rss={{ w.mb }}MB' ' rss={{ w.mb }}MB'
'{% for task in w.managed_tasks.values() %}' '{% for task in w.managed_tasks.values() %}'
'\n - {% if loop.index0 == 0 %}running {% else %}queued {% endif %}' '\n - {% if loop.index0 == 0 %}running {% if "age" in task %}for: {{ "%.1f" % task["age"] }}s {% endif %}{% else %}queued {% endif %}'
'{{ task["uuid"] }} ' '{{ task["uuid"] }} '
'{% if "task" in task %}' '{% if "task" in task %}'
'{{ task["task"].rsplit(".", 1)[-1] }}' '{{ task["task"].rsplit(".", 1)[-1] }}'
@@ -365,6 +367,26 @@ class AutoscalePool(WorkerPool):
logger.warn('scaling down worker pid:{}'.format(w.pid)) logger.warn('scaling down worker pid:{}'.format(w.pid))
w.quit() w.quit()
self.workers.remove(w) self.workers.remove(w)
if w.alive:
# if we discover a task manager invocation that's been running
# too long, reap it (because otherwise it'll just hold the postgres
# advisory lock forever); the goal of this code is to discover
# deadlocks or other serious issues in the task manager that cause
# the task manager to never do more work
current_task = w.current_task
if current_task and isinstance(current_task, dict):
if current_task.get('task', '').endswith('tasks.run_task_manager'):
if 'started' not in current_task:
w.managed_tasks[
current_task['uuid']
]['started'] = time.time()
age = time.time() - current_task['started']
w.managed_tasks[current_task['uuid']]['age'] = age
if age > (60 * 5):
logger.error(
f'run_task_manager has held the advisory lock for >5m, sending SIGTERM to {w.pid}'
) # noqa
os.kill(w.pid, signal.SIGTERM)
for m in orphaned: for m in orphaned:
# if all the workers are dead, spawn at least one # if all the workers are dead, spawn at least one