mirror of
https://github.com/ansible/awx.git
synced 2026-05-19 23:07:42 -02:30
Fully message-driven job execution
TODO: * Need a distributed lock (leverage Postgres) * Use a less memory-intensive graph representation * Maybe serialize/deserialize the graph to the database * Build the graph iteratively instead of doing a full rebuild.
This commit is contained in:
@@ -798,34 +798,43 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
status=self.status,
|
status=self.status,
|
||||||
traceback=self.result_traceback)
|
traceback=self.result_traceback)
|
||||||
|
|
||||||
def start(self, error_callback, success_callback, **kwargs):
|
def pre_start(self, **kwargs):
|
||||||
'''
|
|
||||||
Start the task running via Celery.
|
|
||||||
'''
|
|
||||||
task_class = self._get_task_class()
|
|
||||||
if not self.can_start:
|
if not self.can_start:
|
||||||
self.job_explanation = u'%s is not in a startable status: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting')))
|
self.job_explanation = u'%s is not in a startable status: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting')))
|
||||||
self.save(update_fields=['job_explanation'])
|
self.save(update_fields=['job_explanation'])
|
||||||
return False
|
return (False, None)
|
||||||
|
|
||||||
needed = self.get_passwords_needed_to_start()
|
needed = self.get_passwords_needed_to_start()
|
||||||
try:
|
try:
|
||||||
start_args = json.loads(decrypt_field(self, 'start_args'))
|
start_args = json.loads(decrypt_field(self, 'start_args'))
|
||||||
except Exception:
|
except Exception:
|
||||||
start_args = None
|
start_args = None
|
||||||
|
|
||||||
if start_args in (None, ''):
|
if start_args in (None, ''):
|
||||||
start_args = kwargs
|
start_args = kwargs
|
||||||
|
|
||||||
opts = dict([(field, start_args.get(field, '')) for field in needed])
|
opts = dict([(field, start_args.get(field, '')) for field in needed])
|
||||||
|
|
||||||
if not all(opts.values()):
|
if not all(opts.values()):
|
||||||
missing_fields = ', '.join([k for k,v in opts.items() if not v])
|
missing_fields = ', '.join([k for k,v in opts.items() if not v])
|
||||||
self.job_explanation = u'Missing needed fields: %s.' % missing_fields
|
self.job_explanation = u'Missing needed fields: %s.' % missing_fields
|
||||||
self.save(update_fields=['job_explanation'])
|
self.save(update_fields=['job_explanation'])
|
||||||
return False
|
return (False, None)
|
||||||
#extra_data = dict([(field, kwargs[field]) for field in kwargs
|
|
||||||
# if field not in needed])
|
|
||||||
if 'extra_vars' in kwargs:
|
if 'extra_vars' in kwargs:
|
||||||
self.handle_extra_data(kwargs['extra_vars'])
|
self.handle_extra_data(kwargs['extra_vars'])
|
||||||
task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
|
|
||||||
return True
|
return (True, opts)
|
||||||
|
|
||||||
|
def start(self, error_callback, success_callback, **kwargs):
    '''
    Start the task running via Celery.

    The task class is resolved first, then ``pre_start()`` performs the
    pre-launch checks and returns ``(ok, opts)``. Only when ``ok`` is
    truthy is the task queued with ``apply_async``; ``error_callback`` and
    ``success_callback`` are passed through as ``link_error`` and ``link``.

    Returns the first element of the ``pre_start()`` result unchanged.
    '''
    runner_cls = self._get_task_class()
    started, task_opts = self.pre_start(**kwargs)
    if not started:
        # Pre-start checks failed; nothing is queued.
        return started
    runner_cls().apply_async(
        (self.pk,),
        task_opts,
        link_error=error_callback,
        link=success_callback,
    )
    return started
|
||||||
|
|
||||||
def signal_start(self, **kwargs):
|
def signal_start(self, **kwargs):
|
||||||
"""Notify the task runner system to begin work on this task."""
|
"""Notify the task runner system to begin work on this task."""
|
||||||
@@ -852,6 +861,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
self.update_fields(start_args=json.dumps(kwargs), status='pending')
|
self.update_fields(start_args=json.dumps(kwargs), status='pending')
|
||||||
self.socketio_emit_status("pending")
|
self.socketio_emit_status("pending")
|
||||||
|
|
||||||
|
print("Running job launch for job %s" % self.name)
|
||||||
from awx.main.scheduler.tasks import run_job_launch
|
from awx.main.scheduler.tasks import run_job_launch
|
||||||
run_job_launch.delay(self.id)
|
run_job_launch.delay(self.id)
|
||||||
|
|
||||||
|
|||||||
@@ -240,3 +240,11 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow
|
|||||||
def get_notification_friendly_name(self):
|
def get_notification_friendly_name(self):
|
||||||
return "Workflow Job"
|
return "Workflow Job"
|
||||||
|
|
||||||
|
def start(self, *args, **kwargs):
    '''
    Mark this workflow job as running without queueing a Celery task.

    Runs ``pre_start()`` with the supplied keyword arguments. When it
    reports success the status is set to ``running``, the object is saved
    and a ``running`` status event is emitted. Positional arguments are
    accepted but not used.

    Returns the first element of the ``pre_start()`` result unchanged.
    '''
    started, _unused_opts = self.pre_start(**kwargs)
    if not started:
        # Pre-start checks failed; leave the status untouched.
        return started
    self.status = 'running'
    self.save()
    self.socketio_emit_status("running")
    return started
|
||||||
|
|
||||||
|
|||||||
@@ -4,13 +4,14 @@
|
|||||||
# Python
|
# Python
|
||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
|
import struct, fcntl, os
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.db import transaction
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import * # noqa
|
from awx.main.models import * # noqa
|
||||||
from awx.main.tasks import handle_work_error, handle_work_success
|
|
||||||
from awx.main.utils import get_system_task_capacity
|
from awx.main.utils import get_system_task_capacity
|
||||||
from awx.main.scheduler.dag_simple import SimpleDAG
|
from awx.main.scheduler.dag_simple import SimpleDAG
|
||||||
from awx.main.scheduler.dag_workflow import WorkflowDAG
|
from awx.main.scheduler.dag_workflow import WorkflowDAG
|
||||||
@@ -47,8 +48,8 @@ def get_running_workflow_jobs():
|
|||||||
WorkflowJob.objects.filter(status='running')]
|
WorkflowJob.objects.filter(status='running')]
|
||||||
return graph_workflow_jobs
|
return graph_workflow_jobs
|
||||||
|
|
||||||
def do_spawn_workflow_jobs():
|
def spawn_workflow_graph_jobs(workflow_jobs):
|
||||||
workflow_jobs = get_running_workflow_jobs()
|
# TODO: Consider using transaction.atomic
|
||||||
for workflow_job in workflow_jobs:
|
for workflow_job in workflow_jobs:
|
||||||
dag = WorkflowDAG(workflow_job)
|
dag = WorkflowDAG(workflow_job)
|
||||||
spawn_nodes = dag.bfs_nodes_to_run()
|
spawn_nodes = dag.bfs_nodes_to_run()
|
||||||
@@ -69,6 +70,16 @@ def do_spawn_workflow_jobs():
|
|||||||
# TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ?
|
# TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ?
|
||||||
#emit_websocket_notification('/socket.io/jobs', '', dict(id=))
|
#emit_websocket_notification('/socket.io/jobs', '', dict(id=))
|
||||||
|
|
||||||
|
# See comment in tasks.py::RunWorkflowJob::run()
|
||||||
|
def process_finished_workflow_jobs(workflow_jobs):
    """Mark every workflow job whose DAG reports completion as 'completed'.

    For each entry a ``WorkflowDAG`` is built; if ``is_workflow_done()`` is
    true the job's status is set to ``completed`` and saved inside an atomic
    block, after which a ``completed`` status event is emitted.

    Args:
        workflow_jobs: iterable of workflow job objects to inspect.
    """
    for workflow_job in workflow_jobs:
        dag = WorkflowDAG(workflow_job)
        if not dag.is_workflow_done():
            continue

        with transaction.atomic():
            # TODO: detect if wfj failed
            workflow_job.status = 'completed'
            workflow_job.save()

        # Emit only after the atomic block has exited, so listeners are not
        # told about a status that could still be rolled back, and so a
        # failing emit cannot undo the save. NOTE(review): if a caller wraps
        # this in an outer transaction the commit is still deferred to that
        # outer block.
        workflow_job.socketio_emit_status('completed')
|
||||||
|
|
||||||
def rebuild_graph():
|
def rebuild_graph():
|
||||||
"""Regenerate the task graph by refreshing known tasks from Tower, purging
|
"""Regenerate the task graph by refreshing known tasks from Tower, purging
|
||||||
@@ -88,8 +99,6 @@ def rebuild_graph():
|
|||||||
logger.warn("Ignoring celery task inspector")
|
logger.warn("Ignoring celery task inspector")
|
||||||
active_task_queues = None
|
active_task_queues = None
|
||||||
|
|
||||||
do_spawn_workflow_jobs()
|
|
||||||
|
|
||||||
all_sorted_tasks = get_tasks()
|
all_sorted_tasks = get_tasks()
|
||||||
if not len(all_sorted_tasks):
|
if not len(all_sorted_tasks):
|
||||||
return None
|
return None
|
||||||
@@ -106,12 +115,13 @@ def rebuild_graph():
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
|
running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
|
||||||
|
running_celery_tasks = filter(lambda t: type(t) != WorkflowJob, running_tasks)
|
||||||
waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
|
waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
|
||||||
new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)
|
new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)
|
||||||
|
|
||||||
# Check running tasks and make sure they are active in celery
|
# Check running tasks and make sure they are active in celery
|
||||||
logger.debug("Active celery tasks: " + str(active_tasks))
|
logger.debug("Active celery tasks: " + str(active_tasks))
|
||||||
for task in list(running_tasks):
|
for task in list(running_celery_tasks):
|
||||||
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
||||||
# NOTE: Pull status again and make sure it didn't finish in
|
# NOTE: Pull status again and make sure it didn't finish in
|
||||||
# the meantime?
|
# the meantime?
|
||||||
@@ -122,7 +132,7 @@ def rebuild_graph():
|
|||||||
))
|
))
|
||||||
task.save()
|
task.save()
|
||||||
task.socketio_emit_status("failed")
|
task.socketio_emit_status("failed")
|
||||||
running_tasks.pop(running_tasks.index(task))
|
running_tasks.pop(task)
|
||||||
logger.error("Task %s appears orphaned... marking as failed" % task)
|
logger.error("Task %s appears orphaned... marking as failed" % task)
|
||||||
|
|
||||||
# Create and process dependencies for new tasks
|
# Create and process dependencies for new tasks
|
||||||
@@ -171,6 +181,8 @@ def process_graph(graph, task_capacity):
|
|||||||
"""Given a task dependency graph, start and manage tasks given their
|
"""Given a task dependency graph, start and manage tasks given their
|
||||||
priority and weight.
|
priority and weight.
|
||||||
"""
|
"""
|
||||||
|
from awx.main.tasks import handle_work_error, handle_work_success
|
||||||
|
|
||||||
leaf_nodes = graph.get_leaf_nodes()
|
leaf_nodes = graph.get_leaf_nodes()
|
||||||
running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes)
|
running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes)
|
||||||
running_impact = sum([t['node_object'].task_impact for t in running_nodes])
|
running_impact = sum([t['node_object'].task_impact for t in running_nodes])
|
||||||
@@ -190,33 +202,57 @@ def process_graph(graph, task_capacity):
|
|||||||
node_dependencies = graph.get_dependents(node_obj)
|
node_dependencies = graph.get_dependents(node_obj)
|
||||||
# Allow other tasks to continue if a job fails, even if they are
|
# Allow other tasks to continue if a job fails, even if they are
|
||||||
# other jobs.
|
# other jobs.
|
||||||
if graph.get_node_type(node_obj) == 'job':
|
|
||||||
|
node_type = graph.get_node_type(node_obj)
|
||||||
|
if node_type == 'job':
|
||||||
|
# clear dependencies because a job can block (not necessarily
|
||||||
|
# depend) on other jobs that share the same job template
|
||||||
node_dependencies = []
|
node_dependencies = []
|
||||||
|
|
||||||
|
# Make the workflow_job look like it's started by setting status to
|
||||||
|
# running, but don't make a celery Task for it.
|
||||||
|
# Introduce jobs from the workflow so they are candidates to run.
|
||||||
|
# Call process_graph() again to allow choosing for run, the
|
||||||
|
# created candidate jobs.
|
||||||
|
elif node_type == 'workflow_job':
|
||||||
|
node_obj.start()
|
||||||
|
spawn_workflow_graph_jobs([node_obj])
|
||||||
|
return process_graph(graph, task_capacity)
|
||||||
|
|
||||||
dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \
|
dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \
|
||||||
[{'type': graph.get_node_type(n['node_object']),
|
[{'type': graph.get_node_type(n['node_object']),
|
||||||
'id': n['node_object'].id} for n in node_dependencies]
|
'id': n['node_object'].id} for n in node_dependencies]
|
||||||
error_handler = handle_work_error.s(subtasks=dependent_nodes)
|
error_handler = handle_work_error.s(subtasks=dependent_nodes)
|
||||||
success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj),
|
success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj),
|
||||||
'id': node_obj.id})
|
'id': node_obj.id})
|
||||||
start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler)
|
with transaction.atomic():
|
||||||
if not start_status:
|
start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler)
|
||||||
node_obj.status = 'failed'
|
if not start_status:
|
||||||
if node_obj.job_explanation:
|
node_obj.status = 'failed'
|
||||||
node_obj.job_explanation += ' '
|
if node_obj.job_explanation:
|
||||||
node_obj.job_explanation += 'Task failed pre-start check.'
|
node_obj.job_explanation += ' '
|
||||||
node_obj.save()
|
node_obj.job_explanation += 'Task failed pre-start check.'
|
||||||
continue
|
node_obj.save()
|
||||||
|
continue
|
||||||
remaining_volume -= impact
|
remaining_volume -= impact
|
||||||
running_impact += impact
|
running_impact += impact
|
||||||
logger.info('Started Node: %s (capacity hit: %s) '
|
logger.info('Started Node: %s (capacity hit: %s) '
|
||||||
'Remaining Capacity: %s' %
|
'Remaining Capacity: %s' %
|
||||||
(str(node_obj), str(impact), str(remaining_volume)))
|
(str(node_obj), str(impact), str(remaining_volume)))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def schedule():
    """Run one scheduling pass while holding the scheduler lock file.

    An exclusive ``fcntl`` lock is taken on ``/tmp/tower_scheduler.lock``
    for the duration of the pass. Under the lock, the pass:

    1. reads the system task capacity,
    2. finalizes finished workflow jobs and spawns jobs for running ones,
    3. rebuilds the task graph and, if one is returned, processes it.

    The lock is released and the lock file closed on every exit path,
    including when one of the steps raises.
    """
    # ``with`` guarantees the descriptor is closed; previously it was leaked.
    with open("/tmp/tower_scheduler.lock", "w") as lockfile:
        fcntl.lockf(lockfile, fcntl.LOCK_EX)
        try:
            task_capacity = get_system_task_capacity()

            workflow_jobs = get_running_workflow_jobs()
            process_finished_workflow_jobs(workflow_jobs)
            spawn_workflow_graph_jobs(workflow_jobs)

            graph = rebuild_graph()
            if graph:
                process_graph(graph, task_capacity)
        finally:
            # Release explicitly so an exception in any step above cannot
            # leave the lock held while the traceback keeps the file alive.
            fcntl.lockf(lockfile, fcntl.LOCK_UN)
|
||||||
|
|
||||||
|
|||||||
@@ -1665,21 +1665,30 @@ class RunSystemJob(BaseTask):
|
|||||||
def build_cwd(self, instance, **kwargs):
|
def build_cwd(self, instance, **kwargs):
|
||||||
return settings.BASE_DIR
|
return settings.BASE_DIR
|
||||||
|
|
||||||
|
'''
|
||||||
class RunWorkflowJob(BaseTask):
|
class RunWorkflowJob(BaseTask):
|
||||||
|
|
||||||
name = 'awx.main.tasks.run_workflow_job'
|
name = 'awx.main.tasks.run_workflow_job'
|
||||||
model = WorkflowJob
|
model = WorkflowJob
|
||||||
|
|
||||||
def run(self, pk, **kwargs):
|
def run(self, pk, **kwargs):
|
||||||
print("I'm a running a workflow job")
|
#Run the job/task and capture its output.
|
||||||
'''
|
|
||||||
Run the job/task and capture its output.
|
|
||||||
'''
|
|
||||||
pass
|
|
||||||
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
|
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
|
||||||
instance.socketio_emit_status("running")
|
instance.socketio_emit_status("running")
|
||||||
|
|
||||||
# FIXME: Detect workflow run completion
|
# FIXME: Currently, the workflow job busy waits until the graph run is
|
||||||
|
# complete. Instead, the workflow job should return or never even run,
|
||||||
|
# because all of the "launch logic" can be done schedule().
|
||||||
|
|
||||||
|
# However, other aspects of our system depend on a 1-1 relationship
|
||||||
|
# between a Job and a Celery Task.
|
||||||
|
#
|
||||||
|
# * If we let the workflow job task (RunWorkflowJob.run()) complete
|
||||||
|
# then how do we trigger the handle_work_error and
|
||||||
|
# handle_work_success subtasks?
|
||||||
|
#
|
||||||
|
# * How do we handle the recovery process? (i.e. there is an entry in
|
||||||
|
# the database but not in celery).
|
||||||
while True:
|
while True:
|
||||||
dag = WorkflowDAG(instance)
|
dag = WorkflowDAG(instance)
|
||||||
if dag.is_workflow_done():
|
if dag.is_workflow_done():
|
||||||
@@ -1689,4 +1698,4 @@ class RunWorkflowJob(BaseTask):
|
|||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
instance.socketio_emit_status(instance.status)
|
instance.socketio_emit_status(instance.status)
|
||||||
# TODO: Handle cancel
|
# TODO: Handle cancel
|
||||||
|
'''
|
||||||
|
|||||||
@@ -360,7 +360,7 @@ CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs',
|
|||||||
'routing_key': 'scheduler.job.launch'},
|
'routing_key': 'scheduler.job.launch'},
|
||||||
'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
|
'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
|
||||||
'routing_key': 'scheduler.job.complete'},})
|
'routing_key': 'scheduler.job.complete'},})
|
||||||
|
|
||||||
CELERYBEAT_SCHEDULE = {
|
CELERYBEAT_SCHEDULE = {
|
||||||
'tower_scheduler': {
|
'tower_scheduler': {
|
||||||
'task': 'awx.main.tasks.tower_periodic_scheduler',
|
'task': 'awx.main.tasks.tower_periodic_scheduler',
|
||||||
|
|||||||
Reference in New Issue
Block a user