mirror of
https://github.com/ansible/awx.git
synced 2026-05-14 12:57:40 -02:30
Implementing models for instance groups, updating task manager
* New InstanceGroup model and associative relationship with Instances * Associative instances between Organizations, Inventory, and Job Templates and InstanceGroups * Migrations for adding fields and tables for Instance Groups * Adding activity stream reference for instance groups * Task Manager Refactoring: ** Simplifying task manager relationships and move away from the interstitial hash tables ** Simplify dependency determination logic ** Reduce task manager runtime complexity by removing the partial references and moving the logic into the task manager directly or relying on Job model logic for determinism
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
#Copyright (c) 2015 Ansible, Inc.
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
@@ -11,6 +11,7 @@ from django.conf import settings
|
||||
from django.db import transaction, connection
|
||||
from django.db.utils import DatabaseError
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.timezone import now as tz_now
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
@@ -18,17 +19,6 @@ from awx.main.models import * # noqa
|
||||
from awx.main.scheduler.dag_workflow import WorkflowDAG
|
||||
|
||||
from awx.main.scheduler.dependency_graph import DependencyGraph
|
||||
from awx.main.scheduler.partial import (
|
||||
JobDict,
|
||||
ProjectUpdateDict,
|
||||
ProjectUpdateLatestDict,
|
||||
InventoryUpdateDict,
|
||||
InventoryUpdateLatestDict,
|
||||
InventorySourceDict,
|
||||
SystemJobDict,
|
||||
AdHocCommandDict,
|
||||
WorkflowJobDict,
|
||||
)
|
||||
from awx.main.tasks import _send_notification_templates
|
||||
|
||||
# Celery
|
||||
@@ -38,77 +28,104 @@ logger = logging.getLogger('awx.main.scheduler')
|
||||
|
||||
|
||||
class TaskManager():
|
||||
|
||||
def __init__(self):
|
||||
self.graph = DependencyGraph()
|
||||
self.capacity_total = Instance.objects.total_capacity()
|
||||
self.capacity_used = 0
|
||||
self.graph = dict()
|
||||
for rampart_group in InstanceGroup.objects.all():
|
||||
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
|
||||
capacity_total=rampart_group.capacity,
|
||||
capacity_used=0)
|
||||
|
||||
def get_tasks(self):
|
||||
status_list = ('pending', 'waiting', 'running')
|
||||
def is_job_blocked(self, task):
|
||||
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
|
||||
# in the old task manager this was handled as a method on each task object outside of the graph and
|
||||
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
|
||||
for g in self.graph:
|
||||
if self.graph[g]['graph'].is_job_blocked(task):
|
||||
return True
|
||||
return False
|
||||
|
||||
jobs = JobDict.filter_partial(status=status_list)
|
||||
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
|
||||
project_updates = ProjectUpdateDict.filter_partial(status=status_list)
|
||||
system_jobs = SystemJobDict.filter_partial(status=status_list)
|
||||
ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list)
|
||||
workflow_jobs = WorkflowJobDict.filter_partial(status=status_list)
|
||||
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
|
||||
jobs = [j for j in Job.objects.filter(status__in=status_list)]
|
||||
inventory_updates = [i for i in InventoryUpdate.objects.filter(status__in=status_list)]
|
||||
project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list)]
|
||||
system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list)]
|
||||
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list)]
|
||||
workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)]
|
||||
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs,
|
||||
key=lambda task: task.created)
|
||||
return all_tasks
|
||||
|
||||
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs,
|
||||
key=lambda task: task['created'])
|
||||
return all_actions
|
||||
@classmethod
|
||||
def get_node_type(cls, obj):
|
||||
if type(obj) == Job:
|
||||
return "job"
|
||||
elif type(obj) == AdHocCommand:
|
||||
return "ad_hoc_command"
|
||||
elif type(obj) == InventoryUpdate:
|
||||
return "inventory_update"
|
||||
elif type(obj) == ProjectUpdate:
|
||||
return "project_update"
|
||||
elif type(obj) == SystemJob:
|
||||
return "system_job"
|
||||
elif type(obj) == WorkflowJob:
|
||||
return "workflow_job"
|
||||
return "unknown"
|
||||
|
||||
'''
|
||||
Tasks that are running and SHOULD have a celery task.
|
||||
'''
|
||||
def get_running_tasks(self):
|
||||
status_list = ('running',)
|
||||
def get_running_tasks(self, all_tasks=None):
|
||||
if all_tasks is None:
|
||||
return self.get_tasks(status_list=('running',))
|
||||
return filter(lambda t: t.status == 'running', all_tasks)
|
||||
|
||||
jobs = JobDict.filter_partial(status=status_list)
|
||||
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
|
||||
project_updates = ProjectUpdateDict.filter_partial(status=status_list)
|
||||
system_jobs = SystemJobDict.filter_partial(status=status_list)
|
||||
ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list)
|
||||
'''
|
||||
Tasks that are currently running in celery
|
||||
'''
|
||||
def get_active_tasks(self):
|
||||
inspector = inspect()
|
||||
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
|
||||
active_task_queues = inspector.active()
|
||||
else:
|
||||
logger.warn("Ignoring celery task inspector")
|
||||
active_task_queues = None
|
||||
|
||||
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands,
|
||||
key=lambda task: task['created'])
|
||||
return all_actions
|
||||
active_tasks = set()
|
||||
if active_task_queues is not None:
|
||||
for queue in active_task_queues:
|
||||
map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
|
||||
else:
|
||||
if not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||
return (None, None)
|
||||
|
||||
return (active_task_queues, active_tasks)
|
||||
|
||||
# TODO: Consider a database query for this logic
|
||||
def get_latest_project_update_tasks(self, all_sorted_tasks):
|
||||
project_ids = Set()
|
||||
for task in all_sorted_tasks:
|
||||
if type(task) == JobDict:
|
||||
project_ids.add(task['project_id'])
|
||||
if isinstance(task, Job):
|
||||
project_ids.add(task.project_id)
|
||||
return ProjectUpdate.objects.filter(id__in=project_ids)
|
||||
|
||||
return ProjectUpdateLatestDict.filter_partial(list(project_ids))
|
||||
|
||||
# TODO: Consider a database query for this logic
|
||||
def get_latest_inventory_update_tasks(self, all_sorted_tasks):
|
||||
inventory_ids = Set()
|
||||
for task in all_sorted_tasks:
|
||||
if type(task) == JobDict:
|
||||
inventory_ids.add(task['inventory_id'])
|
||||
|
||||
return InventoryUpdateLatestDict.filter_partial(list(inventory_ids))
|
||||
|
||||
if isinstance(task, Job):
|
||||
inventory_ids.add(task.inventory_id)
|
||||
return InventoryUpdate.objects.filter(id__in=inventory_ids)
|
||||
|
||||
def get_running_workflow_jobs(self):
|
||||
graph_workflow_jobs = [wf for wf in
|
||||
WorkflowJob.objects.filter(status='running')]
|
||||
return graph_workflow_jobs
|
||||
|
||||
# TODO: Consider a database query for this logic
|
||||
def get_inventory_source_tasks(self, all_sorted_tasks):
|
||||
inventory_ids = Set()
|
||||
results = []
|
||||
for task in all_sorted_tasks:
|
||||
if type(task) is JobDict:
|
||||
inventory_ids.add(task['inventory_id'])
|
||||
|
||||
for inventory_id in inventory_ids:
|
||||
results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id)))
|
||||
|
||||
return results
|
||||
if isinstance(task, Job):
|
||||
inventory_ids.add(task.inventory_id)
|
||||
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids)]
|
||||
|
||||
def spawn_workflow_graph_jobs(self, workflow_jobs):
|
||||
for workflow_job in workflow_jobs:
|
||||
@@ -158,40 +175,21 @@ class TaskManager():
|
||||
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
||||
return result
|
||||
|
||||
def get_active_tasks(self):
|
||||
inspector = inspect()
|
||||
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
|
||||
active_task_queues = inspector.active()
|
||||
else:
|
||||
logger.warn("Ignoring celery task inspector")
|
||||
active_task_queues = None
|
||||
|
||||
active_tasks = set()
|
||||
if active_task_queues is not None:
|
||||
for queue in active_task_queues:
|
||||
map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
|
||||
else:
|
||||
if not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||
return (None, None)
|
||||
|
||||
return (active_task_queues, active_tasks)
|
||||
|
||||
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
|
||||
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
|
||||
|
||||
def start_task(self, task, dependent_tasks=[]):
|
||||
def start_task(self, task, rampart_group, dependent_tasks=[]):
|
||||
from awx.main.tasks import handle_work_error, handle_work_success
|
||||
|
||||
task_actual = {
|
||||
'type':task.get_job_type_str(),
|
||||
'id': task['id'],
|
||||
'type':self.get_node_type(task),
|
||||
'id': task.id,
|
||||
}
|
||||
dependencies = [{'type': t.get_job_type_str(), 'id': t['id']} for t in dependent_tasks]
|
||||
dependencies = [{'type': self.get_node_type(t), 'id': t.id} for t in dependent_tasks]
|
||||
|
||||
error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies)
|
||||
success_handler = handle_work_success.s(task_actual=task_actual)
|
||||
|
||||
job_obj = task.get_full()
|
||||
'''
|
||||
This is to account for when there isn't enough capacity to execute all
|
||||
dependent jobs (i.e. proj or inv update) within the same schedule()
|
||||
@@ -202,56 +200,51 @@ class TaskManager():
|
||||
failure dependency.
|
||||
'''
|
||||
if len(dependencies) == 0:
|
||||
dependencies = self.get_dependent_jobs_for_inv_and_proj_update(job_obj)
|
||||
job_obj.status = 'waiting'
|
||||
dependencies = self.get_dependent_jobs_for_inv_and_proj_update(task)
|
||||
task.status = 'waiting'
|
||||
|
||||
(start_status, opts) = job_obj.pre_start()
|
||||
(start_status, opts) = task.pre_start()
|
||||
if not start_status:
|
||||
job_obj.status = 'failed'
|
||||
if job_obj.job_explanation:
|
||||
job_obj.job_explanation += ' '
|
||||
job_obj.job_explanation += 'Task failed pre-start check.'
|
||||
job_obj.save()
|
||||
task.status = 'failed'
|
||||
if task.job_explanation:
|
||||
task.job_explanation += ' '
|
||||
task.job_explanation += 'Task failed pre-start check.'
|
||||
task.save()
|
||||
# TODO: run error handler to fail sub-tasks and send notifications
|
||||
else:
|
||||
if type(job_obj) is WorkflowJob:
|
||||
job_obj.status = 'running'
|
||||
if type(task) is WorkflowJob:
|
||||
task.status = 'running'
|
||||
task.instance_group = rampart_group
|
||||
task.save()
|
||||
|
||||
job_obj.save()
|
||||
|
||||
self.consume_capacity(task)
|
||||
self.consume_capacity(task, rampart_group.name)
|
||||
|
||||
def post_commit():
|
||||
job_obj.websocket_emit_status(job_obj.status)
|
||||
if job_obj.status != 'failed':
|
||||
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
|
||||
task.websocket_emit_status(task.status)
|
||||
if task.status != 'failed':
|
||||
task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=rampart_group.name)
|
||||
|
||||
connection.on_commit(post_commit)
|
||||
|
||||
def process_runnable_tasks(self, runnable_tasks):
|
||||
map(lambda task: self.graph.add_job(task), runnable_tasks)
|
||||
def process_running_tasks(self, running_tasks):
|
||||
map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task), running_tasks)
|
||||
|
||||
def create_project_update(self, task):
|
||||
dep = Project.objects.get(id=task['project_id']).create_project_update(launch_type='dependency')
|
||||
project_task = Project.objects.get(id=task.project_id).create_project_update(launch_type='dependency')
|
||||
|
||||
# Project created 1 seconds behind
|
||||
dep.created = task['created'] - timedelta(seconds=1)
|
||||
dep.status = 'pending'
|
||||
dep.save()
|
||||
|
||||
project_task = ProjectUpdateDict.get_partial(dep.id)
|
||||
|
||||
project_task.created = task.created - timedelta(seconds=1)
|
||||
project_task.status = 'pending'
|
||||
project_task.save()
|
||||
return project_task
|
||||
|
||||
def create_inventory_update(self, task, inventory_source_task):
|
||||
dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency')
|
||||
dep = InventorySource.objects.get(id=inventory_source_task.id).create_inventory_update(launch_type='dependency')
|
||||
|
||||
dep.created = task['created'] - timedelta(seconds=2)
|
||||
dep.created = task.created - timedelta(seconds=2)
|
||||
dep.status = 'pending'
|
||||
dep.save()
|
||||
|
||||
inventory_task = InventoryUpdateDict.get_partial(dep.id)
|
||||
|
||||
'''
|
||||
Update internal datastructures with the newly created inventory update
|
||||
'''
|
||||
@@ -261,104 +254,116 @@ class TaskManager():
|
||||
|
||||
inventory_sources = self.get_inventory_source_tasks([task])
|
||||
self.process_inventory_sources(inventory_sources)
|
||||
|
||||
self.graph.add_job(inventory_task)
|
||||
|
||||
return inventory_task
|
||||
|
||||
'''
|
||||
Since we are dealing with partial objects we don't get to take advantage
|
||||
of Django to resolve the type of related Many to Many field dependent_job.
|
||||
|
||||
Hence the, potentional, double query in this method.
|
||||
'''
|
||||
def get_related_dependent_jobs_as_patials(self, job_ids):
|
||||
dependent_partial_jobs = []
|
||||
for id in job_ids:
|
||||
if ProjectUpdate.objects.filter(id=id).exists():
|
||||
dependent_partial_jobs.append(ProjectUpdateDict({"id": id}).refresh_partial())
|
||||
elif InventoryUpdate.objects.filter(id=id).exists():
|
||||
dependent_partial_jobs.append(InventoryUpdateDict({"id": id}).refresh_partial())
|
||||
return dependent_partial_jobs
|
||||
return dep
|
||||
|
||||
def capture_chain_failure_dependencies(self, task, dependencies):
|
||||
for dep in dependencies:
|
||||
dep_obj = dep.get_full()
|
||||
dep_obj.dependent_jobs.add(task['id'])
|
||||
dep_obj.save()
|
||||
'''
|
||||
if not 'dependent_jobs__id' in task.data:
|
||||
task.data['dependent_jobs__id'] = [dep_obj.data['id']]
|
||||
else:
|
||||
task.data['dependent_jobs__id'].append(dep_obj.data['id'])
|
||||
'''
|
||||
dep.dependent_jobs.add(task.id)
|
||||
dep.save()
|
||||
|
||||
def should_update_inventory_source(self, job, inventory_source):
|
||||
now = tz_now()
|
||||
|
||||
# Already processed dependencies for this job
|
||||
if job.dependent_jobs.all():
|
||||
return False
|
||||
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("created")
|
||||
if not latest_inventory_update.exists():
|
||||
return True
|
||||
latest_inventory_update = latest_inventory_update.first()
|
||||
timeout_seconds = timedelta(seconds=latest_inventory_update.inventory_source.update_cache_timeout)
|
||||
if (latest_inventory_update.finished + timeout_seconds) < now:
|
||||
return True
|
||||
if latest_inventory_update.inventory_source.update_on_launch is True and \
|
||||
latest_inventory_update.status in ['failed', 'canceled', 'error']:
|
||||
return True
|
||||
return False
|
||||
|
||||
def should_update_related_project(self, job):
|
||||
now = tz_now()
|
||||
if job.dependent_jobs.all():
|
||||
return False
|
||||
latest_project_update = ProjectUpdate.objects.filter(project=job.project).order_by("created")
|
||||
if not latest_project_update.exists():
|
||||
return True
|
||||
latest_project_update = latest_project_update.first()
|
||||
if latest_project_update.status in ['failed', 'canceled']:
|
||||
return True
|
||||
|
||||
'''
|
||||
If the latest project update has a created time == job_created_time-1
|
||||
then consider the project update found. This is so we don't enter an infinite loop
|
||||
of updating the project when cache timeout is 0.
|
||||
'''
|
||||
if latest_project_update.project.scm_update_cache_timeout == 0 and \
|
||||
latest_project_update.launch_type == 'dependency' and \
|
||||
latest_project_update.created == job.created - timedelta(seconds=1):
|
||||
return False
|
||||
'''
|
||||
Normal Cache Timeout Logic
|
||||
'''
|
||||
timeout_seconds = timedelta(seconds=latest_project_update.project.scm_update_cache_timeout)
|
||||
if (latest_project_update.finished + timeout_seconds) < now:
|
||||
return True
|
||||
return False
|
||||
|
||||
def generate_dependencies(self, task):
|
||||
dependencies = []
|
||||
# TODO: What if the project is null ?
|
||||
if type(task) is JobDict:
|
||||
if type(task) is Job:
|
||||
|
||||
if task['project__scm_update_on_launch'] is True and \
|
||||
self.graph.should_update_related_project(task):
|
||||
if task.project.scm_update_on_launch is True and \
|
||||
self.should_update_related_project(task):
|
||||
project_task = self.create_project_update(task)
|
||||
dependencies.append(project_task)
|
||||
# Inventory created 2 seconds behind job
|
||||
|
||||
'''
|
||||
Inventory may have already been synced from a provision callback.
|
||||
'''
|
||||
inventory_sources_already_updated = task.get_inventory_sources_already_updated()
|
||||
|
||||
'''
|
||||
get_inventory_sources() only return update on launch sources
|
||||
'''
|
||||
for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']):
|
||||
if inventory_source_task['id'] in inventory_sources_already_updated:
|
||||
continue
|
||||
if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']):
|
||||
inventory_task = self.create_inventory_update(task, inventory_source_task)
|
||||
dependencies.append(inventory_task)
|
||||
|
||||
if task.launch_type != 'callback':
|
||||
for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]:
|
||||
if self.should_update_inventory_source(task, inventory_source):
|
||||
inventory_task = self.create_inventory_update(task, inventory_source)
|
||||
dependencies.append(inventory_task)
|
||||
self.capture_chain_failure_dependencies(task, dependencies)
|
||||
return dependencies
|
||||
|
||||
def process_latest_project_updates(self, latest_project_updates):
|
||||
map(lambda task: self.graph.add_latest_project_update(task), latest_project_updates)
|
||||
|
||||
def process_latest_inventory_updates(self, latest_inventory_updates):
|
||||
map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates)
|
||||
|
||||
def process_inventory_sources(self, inventory_id_sources):
|
||||
map(lambda (inventory_id, inventory_sources): self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources)
|
||||
|
||||
def process_dependencies(self, dependent_task, dependency_tasks):
|
||||
for task in dependency_tasks:
|
||||
# ProjectUpdate or InventoryUpdate may be blocked by another of
|
||||
# the same type.
|
||||
if not self.graph.is_job_blocked(task):
|
||||
self.graph.add_job(task)
|
||||
if not self.would_exceed_capacity(task):
|
||||
self.start_task(task, [dependent_task])
|
||||
else:
|
||||
self.graph.add_job(task)
|
||||
if self.is_job_blocked(task):
|
||||
logger.debug("Dependent task {} is blocked from running".format(task))
|
||||
continue
|
||||
preferred_instance_groups = task.preferred_instance_groups
|
||||
found_acceptable_queue = False
|
||||
for rampart_group in preferred_instance_groups:
|
||||
if self.get_remaining_capacity(rampart_group.name) <= 0:
|
||||
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
|
||||
continue
|
||||
if not self.would_exceed_capacity(task, rampart_group.name):
|
||||
logger.debug("Starting dependent task {} in group {}".format(task, rampart_group.name))
|
||||
self.graph[rampart_group.name]['graph'].add_job(task)
|
||||
self.start_task(task, rampart_group, dependency_tasks)
|
||||
found_acceptable_queue = True
|
||||
if not found_acceptable_queue:
|
||||
logger.debug("Dependent task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
|
||||
|
||||
def process_pending_tasks(self, pending_tasks):
|
||||
for task in pending_tasks:
|
||||
# Stop processing tasks if we know we are out of capacity
|
||||
if self.get_remaining_capacity() <= 0:
|
||||
self.process_dependencies(task, self.generate_dependencies(task))
|
||||
if self.is_job_blocked(task):
|
||||
logger.debug("Task {} is blocked from running".format(task))
|
||||
return
|
||||
|
||||
if not self.graph.is_job_blocked(task):
|
||||
dependencies = self.generate_dependencies(task)
|
||||
self.process_dependencies(task, dependencies)
|
||||
|
||||
# Spawning deps might have blocked us
|
||||
if not self.graph.is_job_blocked(task):
|
||||
self.graph.add_job(task)
|
||||
if not self.would_exceed_capacity(task):
|
||||
self.start_task(task)
|
||||
else:
|
||||
self.graph.add_job(task)
|
||||
preferred_instance_groups = task.preferred_instance_groups
|
||||
found_acceptable_queue = False
|
||||
for rampart_group in preferred_instance_groups:
|
||||
if self.get_remaining_capacity(rampart_group.name) <= 0:
|
||||
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
|
||||
continue
|
||||
if not self.would_exceed_capacity(task, rampart_group.name):
|
||||
logger.debug("Starting task {} in group {}".format(task, rampart_group.name))
|
||||
self.graph[rampart_group.name]['graph'].add_job(task)
|
||||
self.start_task(task, rampart_group)
|
||||
found_acceptable_queue = True
|
||||
break
|
||||
if not found_acceptable_queue:
|
||||
logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
|
||||
|
||||
def process_celery_tasks(self, active_tasks, all_running_sorted_tasks):
|
||||
'''
|
||||
@@ -366,66 +371,68 @@ class TaskManager():
|
||||
'''
|
||||
for task in all_running_sorted_tasks:
|
||||
|
||||
if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
||||
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
||||
# TODO: try catch the getting of the job. The job COULD have been deleted
|
||||
task_obj = task.get_full()
|
||||
# Ensure job did not finish running between the time we get the
|
||||
# list of task id's from celery and now.
|
||||
# Note: This is an actual fix, not a reduction in the time
|
||||
# window that this can happen.
|
||||
if task_obj.status != 'running':
|
||||
if task.status != 'running':
|
||||
continue
|
||||
task_obj.status = 'failed'
|
||||
task_obj.job_explanation += ' '.join((
|
||||
task.status = 'failed'
|
||||
task.job_explanation += ' '.join((
|
||||
'Task was marked as running in Tower but was not present in',
|
||||
'Celery, so it has been marked as failed.',
|
||||
))
|
||||
task_obj.save()
|
||||
_send_notification_templates(task_obj, 'failed')
|
||||
task_obj.websocket_emit_status('failed')
|
||||
|
||||
task.save()
|
||||
_send_notification_templates(task, 'failed')
|
||||
task.websocket_emit_status('failed')
|
||||
logger.error("Task %s appears orphaned... marking as failed" % task)
|
||||
|
||||
|
||||
def calculate_capacity_used(self, tasks):
|
||||
self.capacity_used = 0
|
||||
for rampart_group in self.graph:
|
||||
self.graph[rampart_group]['capacity_used'] = 0
|
||||
for t in tasks:
|
||||
self.capacity_used += t.task_impact()
|
||||
for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'):
|
||||
if group_actual[0] in self.graph:
|
||||
self.graph[group_actual[0]]['capacity_used'] += t.task_impact
|
||||
|
||||
def would_exceed_capacity(self, task):
|
||||
if self.capacity_used == 0:
|
||||
def would_exceed_capacity(self, task, instance_group):
|
||||
current_capacity = self.graph[instance_group]['capacity_used']
|
||||
capacity_total = self.graph[instance_group]['capacity_total']
|
||||
if current_capacity == 0:
|
||||
return False
|
||||
return (task.task_impact() + self.capacity_used > self.capacity_total)
|
||||
return (task.task_impact + current_capacity > capacity_total)
|
||||
|
||||
def consume_capacity(self, task):
|
||||
self.capacity_used += task.task_impact()
|
||||
def consume_capacity(self, task, instance_group):
|
||||
self.graph[instance_group]['capacity_used'] += task.task_impact
|
||||
|
||||
def get_remaining_capacity(self):
|
||||
return (self.capacity_total - self.capacity_used)
|
||||
def get_remaining_capacity(self, instance_group):
|
||||
return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['capacity_used'])
|
||||
|
||||
def process_tasks(self, all_sorted_tasks):
|
||||
running_tasks = filter(lambda t: t['status'] == 'running', all_sorted_tasks)
|
||||
runnable_tasks = filter(lambda t: t['status'] in ['waiting', 'running'], all_sorted_tasks)
|
||||
running_tasks = filter(lambda t: t.status in ['waiting', 'running'], all_sorted_tasks)
|
||||
|
||||
self.calculate_capacity_used(running_tasks)
|
||||
|
||||
self.process_runnable_tasks(runnable_tasks)
|
||||
self.process_running_tasks(running_tasks)
|
||||
|
||||
pending_tasks = filter(lambda t: t['status'] in 'pending', all_sorted_tasks)
|
||||
pending_tasks = filter(lambda t: t.status in 'pending', all_sorted_tasks)
|
||||
self.process_pending_tasks(pending_tasks)
|
||||
|
||||
def _schedule(self):
|
||||
finished_wfjs = []
|
||||
all_sorted_tasks = self.get_tasks()
|
||||
if len(all_sorted_tasks) > 0:
|
||||
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
||||
self.process_latest_project_updates(latest_project_updates)
|
||||
# TODO: Deal with
|
||||
# latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
||||
# self.process_latest_project_updates(latest_project_updates)
|
||||
|
||||
latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
|
||||
self.process_latest_inventory_updates(latest_inventory_updates)
|
||||
# latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
|
||||
# self.process_latest_inventory_updates(latest_inventory_updates)
|
||||
|
||||
inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks)
|
||||
self.process_inventory_sources(inventory_id_sources)
|
||||
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
|
||||
|
||||
running_workflow_tasks = self.get_running_workflow_jobs()
|
||||
finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
|
||||
|
||||
Reference in New Issue
Block a user