Merge branch 'feature-ha_task_manager' into devel

This commit is contained in:
Chris Meyers
2016-09-29 10:27:17 -04:00
14 changed files with 633 additions and 551 deletions

View File

@@ -47,7 +47,6 @@ from django.contrib.auth.models import User
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # noqa
from awx.main.models import UnifiedJob
from awx.main.queue import FifoQueue
from awx.main.task_engine import TaskEnhancer
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
emit_websocket_notification,
@@ -57,7 +56,7 @@ __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error',
'handle_work_success', 'update_inventory_computed_fields',
'send_notifications', 'run_administrative_checks',
'run_workflow_job']
'RunJobLaunch']
HIDDEN_PASSWORD = '**********'
@@ -177,14 +176,6 @@ def tower_periodic_scheduler(self):
new_unified_job.socketio_emit_status("failed")
emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id))
@task(queue='default')
def notify_task_runner(metadata_dict):
"""Add the given task into the Tower task manager's queue, to be consumed
by the task system.
"""
queue = FifoQueue('tower_task_manager')
queue.push(metadata_dict)
def _send_notification_templates(instance, status_str):
if status_str not in ['succeeded', 'failed']:
raise ValueError("status_str must be either succeeded or failed")
@@ -200,6 +191,7 @@ def _send_notification_templates(instance, status_str):
for n in all_notification_templates],
job_id=instance.id)
@task(bind=True, queue='default')
def handle_work_success(self, result, task_actual):
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@@ -208,6 +200,9 @@ def handle_work_success(self, result, task_actual):
_send_notification_templates(instance, 'succeeded')
from awx.main.scheduler.tasks import run_job_complete
run_job_complete.delay(instance.id)
@task(bind=True, queue='default')
def handle_work_error(self, task_id, subtasks=None):
print('Executing error task id %s, subtasks: %s' %
@@ -236,6 +231,15 @@ def handle_work_error(self, task_id, subtasks=None):
if first_instance:
_send_notification_templates(first_instance, 'failed')
# We only send 1 job complete message since all the job completion message
# handling does is trigger the scheduler. If we extend the functionality of
# what the job complete message handler does then we may want to send a
# completion event for each job here.
if first_instance:
from awx.main.scheduler.tasks import run_job_complete
run_job_complete.delay(first_instance.id)
pass
@task(queue='default')
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
@@ -301,10 +305,6 @@ class BaseTask(Task):
logger.error('Failed to update %s after %d retries.',
self.model._meta.object_name, _attempt)
def signal_finished(self, pk):
pass
# notify_task_runner(dict(complete=pk))
def get_path_to(self, *args):
'''
Return absolute path relative to this file.
@@ -1662,21 +1662,30 @@ class RunSystemJob(BaseTask):
def build_cwd(self, instance, **kwargs):
return settings.BASE_DIR
'''
class RunWorkflowJob(BaseTask):
name = 'awx.main.tasks.run_workflow_job'
model = WorkflowJob
def run(self, pk, **kwargs):
from awx.main.management.commands.run_task_system import WorkflowDAG
'''
Run the job/task and capture its output.
'''
pass
#Run the job/task and capture its output.
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
instance.socketio_emit_status("running")
# FIXME: Detect workflow run completion
# FIXME: Currently, the workflow job busy waits until the graph run is
# complete. Instead, the workflow job should return or never even run,
# because all of the "launch logic" can be done schedule().
# However, other aspects of our system depend on a 1-1 relationship
# between a Job and a Celery Task.
#
# * If we let the workflow job task (RunWorkflowJob.run()) complete
# then how do we trigger the handle_work_error and
# handle_work_success subtasks?
#
# * How do we handle the recovery process? (i.e. there is an entry in
# the database but not in celery).
while True:
dag = WorkflowDAG(instance)
if dag.is_workflow_done():
@@ -1686,4 +1695,4 @@ class RunWorkflowJob(BaseTask):
time.sleep(1)
instance.socketio_emit_status(instance.status)
# TODO: Handle cancel
'''