Push celery queue stats to memcached periodically

We'll piggyback off the task that checks for inconsistent jobs between
celery and Tower itself. These stats are read via /api/v1/ping.
This commit is contained in:
Matthew Jones
2016-11-03 12:09:40 -04:00
parent 08b4364211
commit 8e77deea27
3 changed files with 8 additions and 6 deletions

View File

@@ -16,6 +16,7 @@ from collections import OrderedDict
# Django # Django
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.cache import cache
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.core.exceptions import FieldError from django.core.exceptions import FieldError
from django.db.models import Q, Count from django.db.models import Q, Count
@@ -165,11 +166,12 @@ class ApiV1PingView(APIView):
Everything returned here should be considered public / insecure, as Everything returned here should be considered public / insecure, as
this requires no auth and is intended for use by the installer process. this requires no auth and is intended for use by the installer process.
""" """
# Most of this response is canned; just build the dictionary. active_tasks = cache.get("active_celery_tasks", None)
response = { response = {
'ha': is_ha_environment(), 'ha': is_ha_environment(),
'version': get_awx_version(), 'version': get_awx_version(),
'active_node': settings.CLUSTER_HOST_ID, 'active_node': settings.CLUSTER_HOST_ID,
'celery_active_tasks': json.loads(active_tasks) if active_tasks is not None else None
} }
response['instances'] = [] response['instances'] = []

View File

@@ -154,7 +154,7 @@ class TaskManager():
if not hasattr(settings, 'CELERY_UNIT_TEST'): if not hasattr(settings, 'CELERY_UNIT_TEST'):
return None return None
return active_tasks return (active_task_queues, active_tasks)
def start_task(self, task, dependent_tasks=[]): def start_task(self, task, dependent_tasks=[]):
from awx.main.tasks import handle_work_error, handle_work_success from awx.main.tasks import handle_work_error, handle_work_success
@@ -326,8 +326,6 @@ class TaskManager():
def _schedule(self): def _schedule(self):
all_sorted_tasks = self.get_tasks() all_sorted_tasks = self.get_tasks()
if len(all_sorted_tasks) > 0: if len(all_sorted_tasks) > 0:
#self.process_celery_tasks(active_tasks, all_sorted_tasks)
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
self.process_latest_project_updates(latest_project_updates) self.process_latest_project_updates(latest_project_updates)

View File

@@ -1,6 +1,7 @@
# Python # Python
import logging import logging
import json
# Django # Django
from django.db import transaction from django.db import transaction
@@ -12,6 +13,7 @@ from celery import task
# AWX # AWX
from awx.main.models import Instance from awx.main.models import Instance
from awx.main.scheduler import TaskManager from awx.main.scheduler import TaskManager
from django.core.cache import cache
logger = logging.getLogger('awx.main.scheduler') logger = logging.getLogger('awx.main.scheduler')
@@ -38,8 +40,8 @@ def run_fail_inconsistent_running_jobs():
try: try:
Instance.objects.select_for_update(nowait=True).all()[0] Instance.objects.select_for_update(nowait=True).all()[0]
scheduler = TaskManager() scheduler = TaskManager()
active_tasks = scheduler.get_active_tasks() active_task_queues, active_tasks = scheduler.get_active_tasks()
cache.set("active_celery_tasks", json.dumps(active_task_queues))
if active_tasks is None: if active_tasks is None:
# TODO: Failed to contact celery. We should surface this. # TODO: Failed to contact celery. We should surface this.
return None return None