Files
awx/awx/main/scheduler/tasks.py
Chris Meyers dd513621f0 Revert "Merge pull request #5553 from chrismeyersfsu/fix-waiting_blocked"
This reverts commit 9ba2122f4f85eecaeb6fa53ac92ea2811b05e83f, reversing
changes made to c3a5f2c96fd85dd1405a8f5c875ffc988dee16a4.
2017-02-27 09:38:45 -05:00

60 lines
1.5 KiB
Python

# Python
import logging
import json
# Django
from django.db import transaction
from django.db.utils import DatabaseError
# Celery
from celery import task
# AWX
from awx.main.models import Instance
from awx.main.scheduler import TaskManager
from django.core.cache import cache
logger = logging.getLogger('awx.main.scheduler')
# TODO: move logic to UnifiedJob model and use bind=True feature of celery.
# Would we need the request loop then? I think so. Even if we get the in-memory
# updated model, the call to schedule() may get stale data.
@task
def run_job_launch(job_id):
TaskManager().schedule()
@task
def run_job_complete(job_id):
TaskManager().schedule()
@task
def run_task_manager():
logger.debug("Running Tower task manager.")
TaskManager().schedule()
@task
def run_fail_inconsistent_running_jobs():
logger.debug("Running task to fail inconsistent running jobs.")
with transaction.atomic():
# Lock
try:
Instance.objects.select_for_update(nowait=True).all()[0]
scheduler = TaskManager()
active_task_queues, active_tasks = scheduler.get_active_tasks()
cache.set("active_celery_tasks", json.dumps(active_task_queues))
if active_tasks is None:
# TODO: Failed to contact celery. We should surface this.
return None
all_running_sorted_tasks = scheduler.get_running_tasks()
scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
except DatabaseError:
return