From 4ced911c00e984e622bd20874ac236935777651e Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Wed, 3 May 2017 15:28:31 -0400
Subject: [PATCH] Implementing models for instance groups, updating task
 manager

* New InstanceGroup model with an associative relationship to Instances
* Associate Organizations, Inventories, and Job Templates with InstanceGroups
* Migrations adding the fields and tables for Instance Groups
* Add an activity stream reference for instance groups
* Task Manager refactoring:
** Simplify task manager relationships and move away from the interstitial hash tables
** Simplify the dependency determination logic
** Reduce task manager runtime complexity by removing the partial references and moving the logic into the task manager directly, or relying on Job model logic for deterministic behavior
---
 Makefile | 2 +-
 awx/api/serializers.py | 2 +-
 .../migrations/0043_v320_rampartgroups.py | 50 ++
 awx/main/models/ha.py | 21 +-
 awx/main/models/inventory.py | 16 +-
 awx/main/models/jobs.py | 15 +-
 awx/main/models/organization.py | 4 +
 awx/main/models/projects.py | 9 +
 awx/main/models/unified_jobs.py | 27 +-
 awx/main/scheduler/__init__.py | 435 +++++++++---------
 awx/main/scheduler/dag_simple.py | 16 -
 awx/main/scheduler/dependency_graph.py | 159 ++-----
 awx/main/scheduler/partial.py | 274 -----------
 awx/main/tasks.py | 9 +-
 tools/docker-compose.yml | 2 +-
 15 files changed, 407 insertions(+), 634 deletions(-)
 create mode 100644 awx/main/migrations/0043_v320_rampartgroups.py
 delete mode 100644 awx/main/scheduler/partial.py

diff --git a/Makefile b/Makefile
index 00c54e7fc4..25654e4669 100644
--- a/Makefile
+++ b/Makefile
@@ -438,7 +438,7 @@ celeryd:
 	@if [ "$(VENV_BASE)" ]; then \
 		. $(VENV_BASE)/tower/bin/activate; \
 	fi; \
-	$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST)
+	$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q default,tower_scheduler,tower_broadcast_all,tower,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST)
 	#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)

 # Run to start the zeromq callback receiver
diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index 1084ac8bbd..af4629ee02 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -2263,7 +2263,7 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
         fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch',
                   'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch',
                   'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch',
-                  'allow_simultaneous', 'artifacts', 'scm_revision',)
+                  'allow_simultaneous', 'artifacts', 'scm_revision')

     def get_related(self, obj):
         res = super(JobSerializer, self).get_related(obj)
diff --git a/awx/main/migrations/0043_v320_rampartgroups.py b/awx/main/migrations/0043_v320_rampartgroups.py
new file mode 100644
index 0000000000..a89f64ba17
--- /dev/null
+++ b/awx/main/migrations/0043_v320_rampartgroups.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', 
'0042_v320_drop_v1_credential_fields'), + ] + + operations = [ + migrations.CreateModel( + name='InstanceGroup', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('name', models.CharField(unique=True, max_length=250)), + ('created', models.DateTimeField(auto_now_add=True)), + ('modified', models.DateTimeField(auto_now=True)), + ('instances', models.ManyToManyField(help_text='Instances that are members of this InstanceGroup', related_name='rampart_groups', editable=False, to='main.Instance')), + ], + ), + migrations.AddField( + model_name='inventory', + name='instance_groups', + field=models.ManyToManyField(to='main.InstanceGroup', blank=True), + ), + migrations.AddField( + model_name='unifiedjob', + name='instance_group', + field=models.ForeignKey(on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.InstanceGroup', help_text='The Rampart/Instance group the job was run under', null=True), + ), + migrations.AddField( + model_name='unifiedjobtemplate', + name='instance_groups', + field=models.ManyToManyField(to='main.InstanceGroup', blank=True), + ), + migrations.AddField( + model_name='organization', + name='instance_groups', + field=models.ManyToManyField(to='main.InstanceGroup', blank=True), + ), + migrations.AddField( + model_name='activitystream', + name='instance_group', + field=models.ManyToManyField(to='main.InstanceGroup', blank=True), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 2a75f1440a..275689d905 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -4,6 +4,7 @@ from django.db import models from django.db.models.signals import post_save from django.dispatch import receiver +from django.utils.translation import ugettext_lazy as _ from solo.models import SingletonModel @@ -17,9 +18,7 @@ __all__ = ('Instance', 'JobOrigin', 'TowerScheduleState',) class Instance(models.Model): - """A model representing an Ansible Tower instance, primary or secondary, - running against this database. 
-    """
+    """A model representing an Ansible Tower instance running against this database."""

     objects = InstanceManager()

     uuid = models.CharField(max_length=40)
@@ -41,6 +40,22 @@ class Instance(models.Model):
         return "tower"


+class InstanceGroup(models.Model):
+    """A model representing a Queue/Group of Tower Instances."""
+    name = models.CharField(max_length=250, unique=True)
+    created = models.DateTimeField(auto_now_add=True)
+    modified = models.DateTimeField(auto_now=True)
+    instances = models.ManyToManyField(
+        'Instance',
+        related_name='rampart_groups',
+        editable=False,
+        help_text=_('Instances that are members of this InstanceGroup'),
+    )
+
+    class Meta:
+        app_label = 'main'
+
+
 class TowerScheduleState(SingletonModel):
     schedule_last_run = models.DateTimeField(auto_now_add=True)
diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py
index 3cd94c3446..02f1d2acd2 100644
--- a/awx/main/models/inventory.py
+++ b/awx/main/models/inventory.py
@@ -121,7 +121,10 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin):
         default=None,
         help_text=_('Filter that will be applied to the hosts of this inventory.'),
     )
-
+    instance_groups = models.ManyToManyField(
+        'InstanceGroup',
+        blank=True,
+    )
     admin_role = ImplicitRoleField(
         parent_role='organization.admin_role',
     )
@@ -1379,6 +1382,19 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin):
     def get_notification_friendly_name(self):
         return "Inventory Update"

+    @property
+    def preferred_instance_groups(self):
+        if self.inventory is not None and self.inventory.organization is not None:
+            organization_groups = [x for x in self.inventory.organization.instance_groups.all()]
+        else:
+            organization_groups = []
+        if self.inventory is not None:
+            inventory_groups = [x for x in self.inventory.instance_groups.all()]
+        else:
+            inventory_groups = []
+        template_groups = [x for x in super(InventoryUpdate, self).preferred_instance_groups]
+        return template_groups + inventory_groups + organization_groups
+
     def _build_job_explanation(self):
         if not self.job_explanation:
             return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index c15ab663a3..d45db583e4 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -244,6 +244,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
         blank=True,
         default=False,
     )
+
     admin_role = ImplicitRoleField(
         parent_role=['project.organization.admin_role', 'inventory.organization.admin_role']
     )
@@ -467,7 +468,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
     )

-
     @classmethod
     def _get_parent_field_name(cls):
         return 'job_template'
@@ -627,6 +627,19 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
                 return "$hidden due to Ansible no_log flag$"
         return artifacts

+    @property
+    def preferred_instance_groups(self):
+        if self.project is not None and self.project.organization is not None:
+            organization_groups = [x for x in self.project.organization.instance_groups.all()]
+        else:
+            organization_groups = []
+        if self.inventory is not None:
+            inventory_groups = [x for x in self.inventory.instance_groups.all()]
+        else:
+            inventory_groups = []
+        template_groups = [x for x in super(Job, self).preferred_instance_groups]
+        return template_groups + inventory_groups + organization_groups
+
     # Job Credential required
     @property
     def can_start(self):
diff --git a/awx/main/models/organization.py b/awx/main/models/organization.py
index eccfdfba42..4679850120 100644
--- a/awx/main/models/organization.py
+++ b/awx/main/models/organization.py
@@ -36,6 +36,10 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin):
         app_label = 'main'
         ordering = ('name',)

+    instance_groups = models.ManyToManyField(
+        'InstanceGroup',
+        blank=True,
+    )
     admin_role = ImplicitRoleField(
         parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
     )
diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py
index 05125a8f8e..560c8aa81a 100644
--- a/awx/main/models/projects.py
+++ b/awx/main/models/projects.py
@@ -509,3 +509,12 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):

     def get_notification_friendly_name(self):
         return "Project Update"
+
+    @property
+    def preferred_instance_groups(self):
+        if self.project is not None and self.project.organization is not None:
+            organization_groups = [x for x in self.project.organization.instance_groups.all()]
+        else:
+            organization_groups = []
+        template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups]
+        return template_groups + organization_groups
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 23f98c6b7d..b849e2e8a9 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -152,6 +152,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
         blank=True,
         related_name='%(class)s_labels'
     )
+    instance_groups = models.ManyToManyField(
+        'InstanceGroup',
+        blank=True,
+    )

     def get_absolute_url(self, request=None):
         real_instance = self.get_real_instance()
@@ -561,6 +565,14 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         blank=True,
         related_name='%(class)s_labels'
     )
+    instance_group = models.ForeignKey(
+        'InstanceGroup',
+        blank=True,
+        null=True,
+        default=None,
+        on_delete=models.SET_NULL,
+        help_text=_('The Rampart/Instance group the job was run under'),
+    )

     def get_absolute_url(self, request=None):
         real_instance = self.get_real_instance()
@@ -936,9 +948,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique

         return (True, opts)

-    def start_celery_task(self, opts, error_callback, success_callback):
+    def start_celery_task(self, opts, error_callback, success_callback, queue):
         task_class = self._get_task_class()
-        task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
+        task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback, queue=queue)

     def start(self, error_callback, success_callback, **kwargs):
         '''
@@ -1046,3 +1058,15 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         if settings.BROKER_URL.startswith('amqp://'):
             self._force_cancel()
         return self.cancel_flag
+
+    @property
+    def preferred_instance_groups(self):
+        '''
+        Return the Instance/Rampart Groups preferred by this unified job's template
+        '''
+        from awx.main.models.ha import InstanceGroup
+        default_instance_group = InstanceGroup.objects.filter(name='tower')
+        template_groups = [x for x in self.unified_job_template.instance_groups.all()]
+        if not template_groups and default_instance_group.exists():
+            return [default_instance_group.first()]
+        return template_groups
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index dc0a4c82e0..9d71d40a8b 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -1,4 +1,4 @@
-#Copyright (c) 2015 Ansible, Inc.
+# Copyright (c) 2015 Ansible, Inc.
 # All Rights Reserved

 # Python
@@ -11,6 +11,7 @@
 from django.conf import settings
 from django.db import transaction, connection
 from django.db.utils import DatabaseError
 from django.utils.translation import ugettext_lazy as _
+from django.utils.timezone import now as tz_now

 # AWX
 from awx.main.models import * # noqa
@@ -18,17 +19,6 @@ from awx.main.models import * # noqa
 from awx.main.scheduler.dag_workflow import WorkflowDAG
 from awx.main.scheduler.dependency_graph import DependencyGraph
-from awx.main.scheduler.partial import (
-    JobDict,
-    ProjectUpdateDict,
-    ProjectUpdateLatestDict,
-    InventoryUpdateDict,
-    InventoryUpdateLatestDict,
-    InventorySourceDict,
-    SystemJobDict,
-    AdHocCommandDict,
-    WorkflowJobDict,
-)
 from awx.main.tasks import _send_notification_templates

 # Celery
@@ -38,77 +28,104 @@ logger = logging.getLogger('awx.main.scheduler')


 class TaskManager():
+
     def __init__(self):
-        self.graph = DependencyGraph()
-        self.capacity_total = Instance.objects.total_capacity()
-        self.capacity_used = 0
+        self.graph = dict()
+        for rampart_group in InstanceGroup.objects.all():
+            self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
+                                                  capacity_total=rampart_group.capacity,
+                                                  capacity_used=0)

-    def get_tasks(self):
-        status_list = ('pending', 'waiting', 'running')
+    def is_job_blocked(self, task):
+        # TODO: blocking behavior should probably be decided outside of the dependency graph.
+        #       In the old task manager it was handled as a method on each task object, outside
+        #       of the graph; restoring that would likely cut a lot of logic from this class.
+        for g in self.graph:
+            if self.graph[g]['graph'].is_job_blocked(task):
+                return True
+        return False

-        jobs = JobDict.filter_partial(status=status_list)
-        inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
-        project_updates = ProjectUpdateDict.filter_partial(status=status_list)
-        system_jobs = SystemJobDict.filter_partial(status=status_list)
-        ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list)
-        workflow_jobs = WorkflowJobDict.filter_partial(status=status_list)
+    def get_tasks(self, status_list=('pending', 'waiting', 'running')):
+        jobs = [j for j in Job.objects.filter(status__in=status_list)]
+        inventory_updates = [i for i in InventoryUpdate.objects.filter(status__in=status_list)]
+        project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list)]
+        system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list)]
+        ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list)]
+        workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)]
+        all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs,
+                           key=lambda task: task.created)
+        return all_tasks

-        all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs,
-                             key=lambda task: task['created'])
-        return all_actions
+    @classmethod
+    def get_node_type(cls, obj):
+        if type(obj) == Job:
+            return "job"
+        elif type(obj) == AdHocCommand:
+            return "ad_hoc_command"
+        elif type(obj) == InventoryUpdate:
+            return "inventory_update"
+        elif type(obj) == ProjectUpdate:
+            return "project_update"
+        elif type(obj) == SystemJob:
+            return "system_job"
+        elif type(obj) == WorkflowJob:
+            return "workflow_job"
+        return "unknown"

     '''
     Tasks that are running and SHOULD have a celery task. 
''' - def get_running_tasks(self): - status_list = ('running',) + def get_running_tasks(self, all_tasks=None): + if all_tasks is None: + return self.get_tasks(status_list=('running',)) + return filter(lambda t: t.status == 'running', all_tasks) - jobs = JobDict.filter_partial(status=status_list) - inventory_updates = InventoryUpdateDict.filter_partial(status=status_list) - project_updates = ProjectUpdateDict.filter_partial(status=status_list) - system_jobs = SystemJobDict.filter_partial(status=status_list) - ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list) + ''' + Tasks that are currently running in celery + ''' + def get_active_tasks(self): + inspector = inspect() + if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): + active_task_queues = inspector.active() + else: + logger.warn("Ignoring celery task inspector") + active_task_queues = None - all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands, - key=lambda task: task['created']) - return all_actions + active_tasks = set() + if active_task_queues is not None: + for queue in active_task_queues: + map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) + else: + if not hasattr(settings, 'CELERY_UNIT_TEST'): + return (None, None) + + return (active_task_queues, active_tasks) - # TODO: Consider a database query for this logic def get_latest_project_update_tasks(self, all_sorted_tasks): project_ids = Set() for task in all_sorted_tasks: - if type(task) == JobDict: - project_ids.add(task['project_id']) + if isinstance(task, Job): + project_ids.add(task.project_id) + return ProjectUpdate.objects.filter(id__in=project_ids) - return ProjectUpdateLatestDict.filter_partial(list(project_ids)) - - # TODO: Consider a database query for this logic def get_latest_inventory_update_tasks(self, all_sorted_tasks): inventory_ids = Set() for task in all_sorted_tasks: - if type(task) == JobDict: - inventory_ids.add(task['inventory_id']) - - return InventoryUpdateLatestDict.filter_partial(list(inventory_ids)) - + if isinstance(task, Job): + inventory_ids.add(task.inventory_id) + return InventoryUpdate.objects.filter(id__in=inventory_ids) def get_running_workflow_jobs(self): graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] return graph_workflow_jobs - # TODO: Consider a database query for this logic def get_inventory_source_tasks(self, all_sorted_tasks): inventory_ids = Set() - results = [] for task in all_sorted_tasks: - if type(task) is JobDict: - inventory_ids.add(task['inventory_id']) - - for inventory_id in inventory_ids: - results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id))) - - return results + if isinstance(task, Job): + inventory_ids.add(task.inventory_id) + return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids)] def spawn_workflow_graph_jobs(self, workflow_jobs): for workflow_job in workflow_jobs: @@ -158,40 +175,21 @@ class TaskManager(): connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) return result - def get_active_tasks(self): - inspector = inspect() - if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): - active_task_queues = inspector.active() - else: - logger.warn("Ignoring celery task inspector") - active_task_queues = None - - active_tasks = set() - if active_task_queues is not None: - for queue in active_task_queues: - map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) - else: - if not hasattr(settings, 
'CELERY_UNIT_TEST'): - return (None, None) - - return (active_task_queues, active_tasks) - def get_dependent_jobs_for_inv_and_proj_update(self, job_obj): return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()] - def start_task(self, task, dependent_tasks=[]): + def start_task(self, task, rampart_group, dependent_tasks=[]): from awx.main.tasks import handle_work_error, handle_work_success task_actual = { - 'type':task.get_job_type_str(), - 'id': task['id'], + 'type':self.get_node_type(task), + 'id': task.id, } - dependencies = [{'type': t.get_job_type_str(), 'id': t['id']} for t in dependent_tasks] + dependencies = [{'type': self.get_node_type(t), 'id': t.id} for t in dependent_tasks] error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies) success_handler = handle_work_success.s(task_actual=task_actual) - job_obj = task.get_full() ''' This is to account for when there isn't enough capacity to execute all dependent jobs (i.e. proj or inv update) within the same schedule() @@ -202,56 +200,51 @@ class TaskManager(): failure dependency. ''' if len(dependencies) == 0: - dependencies = self.get_dependent_jobs_for_inv_and_proj_update(job_obj) - job_obj.status = 'waiting' + dependencies = self.get_dependent_jobs_for_inv_and_proj_update(task) + task.status = 'waiting' - (start_status, opts) = job_obj.pre_start() + (start_status, opts) = task.pre_start() if not start_status: - job_obj.status = 'failed' - if job_obj.job_explanation: - job_obj.job_explanation += ' ' - job_obj.job_explanation += 'Task failed pre-start check.' - job_obj.save() + task.status = 'failed' + if task.job_explanation: + task.job_explanation += ' ' + task.job_explanation += 'Task failed pre-start check.' + task.save() # TODO: run error handler to fail sub-tasks and send notifications else: - if type(job_obj) is WorkflowJob: - job_obj.status = 'running' + if type(task) is WorkflowJob: + task.status = 'running' + task.instance_group = rampart_group + task.save() - job_obj.save() - - self.consume_capacity(task) + self.consume_capacity(task, rampart_group.name) def post_commit(): - job_obj.websocket_emit_status(job_obj.status) - if job_obj.status != 'failed': - job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler) + task.websocket_emit_status(task.status) + if task.status != 'failed': + task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=rampart_group.name) connection.on_commit(post_commit) - def process_runnable_tasks(self, runnable_tasks): - map(lambda task: self.graph.add_job(task), runnable_tasks) + def process_running_tasks(self, running_tasks): + map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task), running_tasks) def create_project_update(self, task): - dep = Project.objects.get(id=task['project_id']).create_project_update(launch_type='dependency') + project_task = Project.objects.get(id=task.project_id).create_project_update(launch_type='dependency') # Project created 1 seconds behind - dep.created = task['created'] - timedelta(seconds=1) - dep.status = 'pending' - dep.save() - - project_task = ProjectUpdateDict.get_partial(dep.id) - + project_task.created = task.created - timedelta(seconds=1) + project_task.status = 'pending' + project_task.save() return project_task def create_inventory_update(self, task, inventory_source_task): - dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency') + dep = 
InventorySource.objects.get(id=inventory_source_task.id).create_inventory_update(launch_type='dependency')

-        dep.created = task['created'] - timedelta(seconds=2)
+        dep.created = task.created - timedelta(seconds=2)
         dep.status = 'pending'
         dep.save()

-        inventory_task = InventoryUpdateDict.get_partial(dep.id)
-
         '''
         Update internal datastructures with the newly created inventory update
         '''
         inventory_sources = self.get_inventory_source_tasks([task])
         self.process_inventory_sources(inventory_sources)
-
-        self.graph.add_job(inventory_task)
-
-        return inventory_task
-
-    '''
-    Since we are dealing with partial objects we don't get to take advantage
-    of Django to resolve the type of related Many to Many field dependent_job.
-
-    Hence the, potentional, double query in this method.
-    '''
-    def get_related_dependent_jobs_as_patials(self, job_ids):
-        dependent_partial_jobs = []
-        for id in job_ids:
-            if ProjectUpdate.objects.filter(id=id).exists():
-                dependent_partial_jobs.append(ProjectUpdateDict({"id": id}).refresh_partial())
-            elif InventoryUpdate.objects.filter(id=id).exists():
-                dependent_partial_jobs.append(InventoryUpdateDict({"id": id}).refresh_partial())
-        return dependent_partial_jobs
+        return dep

     def capture_chain_failure_dependencies(self, task, dependencies):
         for dep in dependencies:
-            dep_obj = dep.get_full()
-            dep_obj.dependent_jobs.add(task['id'])
-            dep_obj.save()
-            '''
-            if not 'dependent_jobs__id' in task.data:
-                task.data['dependent_jobs__id'] = [dep_obj.data['id']]
-            else:
-                task.data['dependent_jobs__id'].append(dep_obj.data['id'])
-            '''
+            dep.dependent_jobs.add(task.id)
+            dep.save()
+
+    def should_update_inventory_source(self, job, inventory_source):
+        now = tz_now()
+
+        # Already processed dependencies for this job
+        if job.dependent_jobs.all():
+            return False
+        latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
+        if not latest_inventory_update.exists():
+            return True
+        latest_inventory_update = latest_inventory_update.first()
+        timeout_seconds = timedelta(seconds=latest_inventory_update.inventory_source.update_cache_timeout)
+        if (latest_inventory_update.finished + timeout_seconds) < now:
+            return True
+        if latest_inventory_update.inventory_source.update_on_launch is True and \
+                latest_inventory_update.status in ['failed', 'canceled', 'error']:
+            return True
+        return False
+
+    def should_update_related_project(self, job):
+        now = tz_now()
+        if job.dependent_jobs.all():
+            return False
+        latest_project_update = ProjectUpdate.objects.filter(project=job.project).order_by("-created")
+        if not latest_project_update.exists():
+            return True
+        latest_project_update = latest_project_update.first()
+        if latest_project_update.status in ['failed', 'canceled']:
+            return True
+
+        '''
+        If the latest project update has a created time == job_created_time-1
+        then consider the project update found. This is so we don't enter an infinite loop
+        of updating the project when cache timeout is 0. 
+ ''' + if latest_project_update.project.scm_update_cache_timeout == 0 and \ + latest_project_update.launch_type == 'dependency' and \ + latest_project_update.created == job.created - timedelta(seconds=1): + return False + ''' + Normal Cache Timeout Logic + ''' + timeout_seconds = timedelta(seconds=latest_project_update.project.scm_update_cache_timeout) + if (latest_project_update.finished + timeout_seconds) < now: + return True + return False def generate_dependencies(self, task): dependencies = [] # TODO: What if the project is null ? - if type(task) is JobDict: + if type(task) is Job: - if task['project__scm_update_on_launch'] is True and \ - self.graph.should_update_related_project(task): + if task.project.scm_update_on_launch is True and \ + self.should_update_related_project(task): project_task = self.create_project_update(task) dependencies.append(project_task) # Inventory created 2 seconds behind job - - ''' - Inventory may have already been synced from a provision callback. - ''' - inventory_sources_already_updated = task.get_inventory_sources_already_updated() - - ''' - get_inventory_sources() only return update on launch sources - ''' - for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']): - if inventory_source_task['id'] in inventory_sources_already_updated: - continue - if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']): - inventory_task = self.create_inventory_update(task, inventory_source_task) - dependencies.append(inventory_task) - + if task.launch_type != 'callback': + for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: + if self.should_update_inventory_source(task, inventory_source): + inventory_task = self.create_inventory_update(task, inventory_source) + dependencies.append(inventory_task) self.capture_chain_failure_dependencies(task, dependencies) return dependencies - def process_latest_project_updates(self, latest_project_updates): - map(lambda task: self.graph.add_latest_project_update(task), latest_project_updates) - - def process_latest_inventory_updates(self, latest_inventory_updates): - map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates) - - def process_inventory_sources(self, inventory_id_sources): - map(lambda (inventory_id, inventory_sources): self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources) - def process_dependencies(self, dependent_task, dependency_tasks): for task in dependency_tasks: - # ProjectUpdate or InventoryUpdate may be blocked by another of - # the same type. 
-            if not self.graph.is_job_blocked(task):
-                self.graph.add_job(task)
-                if not self.would_exceed_capacity(task):
-                    self.start_task(task, [dependent_task])
-            else:
-                self.graph.add_job(task)
+            if self.is_job_blocked(task):
+                logger.debug("Dependent task {} is blocked from running".format(task))
+                continue
+            preferred_instance_groups = task.preferred_instance_groups
+            found_acceptable_queue = False
+            for rampart_group in preferred_instance_groups:
+                if self.get_remaining_capacity(rampart_group.name) <= 0:
+                    logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
+                    continue
+                if not self.would_exceed_capacity(task, rampart_group.name):
+                    logger.debug("Starting dependent task {} in group {}".format(task, rampart_group.name))
+                    self.graph[rampart_group.name]['graph'].add_job(task)
+                    self.start_task(task, rampart_group, dependency_tasks)
+                    found_acceptable_queue = True
+                    break
+            if not found_acceptable_queue:
+                logger.debug("Dependent task {} couldn't be scheduled on graph, waiting for next cycle".format(task))

     def process_pending_tasks(self, pending_tasks):
         for task in pending_tasks:
-            # Stop processing tasks if we know we are out of capacity
-            if self.get_remaining_capacity() <= 0:
+            self.process_dependencies(task, self.generate_dependencies(task))
+            if self.is_job_blocked(task):
+                logger.debug("Task {} is blocked from running".format(task))
                 return
-
-            if not self.graph.is_job_blocked(task):
-                dependencies = self.generate_dependencies(task)
-                self.process_dependencies(task, dependencies)
-
-                # Spawning deps might have blocked us
-                if not self.graph.is_job_blocked(task):
-                    self.graph.add_job(task)
-                    if not self.would_exceed_capacity(task):
-                        self.start_task(task)
-                else:
-                    self.graph.add_job(task)
+            preferred_instance_groups = task.preferred_instance_groups
+            found_acceptable_queue = False
+            for rampart_group in preferred_instance_groups:
+                if self.get_remaining_capacity(rampart_group.name) <= 0:
+                    logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
+                    continue
+                if not self.would_exceed_capacity(task, rampart_group.name):
+                    logger.debug("Starting task {} in group {}".format(task, rampart_group.name))
+                    self.graph[rampart_group.name]['graph'].add_job(task)
+                    self.start_task(task, rampart_group)
+                    found_acceptable_queue = True
+                    break
+            if not found_acceptable_queue:
+                logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))

     def process_celery_tasks(self, active_tasks, all_running_sorted_tasks):
         '''
@@ -366,66 +371,68 @@
         '''
         for task in all_running_sorted_tasks:

-            if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
+            if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                 # TODO: try catch the getting of the job. The job COULD have been deleted
-                task_obj = task.get_full()
                 # Ensure job did not finish running between the time we get the
                 # list of task id's from celery and now.
                 # Note: This is an actual fix, not a reduction in the time
                 # window that this can happen. 
-                if task_obj.status != 'running':
+                if task.status != 'running':
                     continue

-                task_obj.status = 'failed'
-                task_obj.job_explanation += ' '.join((
+                task.status = 'failed'
+                task.job_explanation += ' '.join((
                     'Task was marked as running in Tower but was not present in',
                     'Celery, so it has been marked as failed.',
                 ))
-                task_obj.save()
-                _send_notification_templates(task_obj, 'failed')
-                task_obj.websocket_emit_status('failed')
-
+                task.save()
+                _send_notification_templates(task, 'failed')
+                task.websocket_emit_status('failed')
                 logger.error("Task %s appears orphaned... marking as failed" % task)

     def calculate_capacity_used(self, tasks):
-        self.capacity_used = 0
+        for rampart_group in self.graph:
+            self.graph[rampart_group]['capacity_used'] = 0
         for t in tasks:
-            self.capacity_used += t.task_impact()
+            for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'):
+                if group_actual[0] in self.graph:
+                    self.graph[group_actual[0]]['capacity_used'] += t.task_impact

-    def would_exceed_capacity(self, task):
-        if self.capacity_used == 0:
+    def would_exceed_capacity(self, task, instance_group):
+        current_capacity = self.graph[instance_group]['capacity_used']
+        capacity_total = self.graph[instance_group]['capacity_total']
+        if current_capacity == 0:
             return False
-        return (task.task_impact() + self.capacity_used > self.capacity_total)
+        return (task.task_impact + current_capacity > capacity_total)

-    def consume_capacity(self, task):
-        self.capacity_used += task.task_impact()
+    def consume_capacity(self, task, instance_group):
+        self.graph[instance_group]['capacity_used'] += task.task_impact

-    def get_remaining_capacity(self):
-        return (self.capacity_total - self.capacity_used)
+    def get_remaining_capacity(self, instance_group):
+        return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['capacity_used'])

     def process_tasks(self, all_sorted_tasks):
-        running_tasks = filter(lambda t: t['status'] == 'running', all_sorted_tasks)
-        runnable_tasks = filter(lambda t: t['status'] in ['waiting', 'running'], all_sorted_tasks)
+        running_tasks = filter(lambda t: t.status in ['waiting', 'running'], all_sorted_tasks)

         self.calculate_capacity_used(running_tasks)

-        self.process_runnable_tasks(runnable_tasks)
+        self.process_running_tasks(running_tasks)

-        pending_tasks = filter(lambda t: t['status'] in 'pending', all_sorted_tasks)
+        pending_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)

         self.process_pending_tasks(pending_tasks)

     def _schedule(self):
         finished_wfjs = []
         all_sorted_tasks = self.get_tasks()
         if len(all_sorted_tasks) > 0:
-            latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
-            self.process_latest_project_updates(latest_project_updates)
+            # TODO: Deal with
+            # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
+            # self.process_latest_project_updates(latest_project_updates)

-            latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
-            self.process_latest_inventory_updates(latest_inventory_updates)
+            # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
+            # self.process_latest_inventory_updates(latest_inventory_updates)

-            inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks)
-            self.process_inventory_sources(inventory_id_sources)
+            self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)

             running_workflow_tasks = self.get_running_workflow_jobs()
             finished_wfjs = 
self.process_finished_workflow_jobs(running_workflow_tasks)
diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py
index 1bfb387569..c7bde94101 100644
--- a/awx/main/scheduler/dag_simple.py
+++ b/awx/main/scheduler/dag_simple.py
@@ -5,7 +5,6 @@ from awx.main.models import (
     InventoryUpdate,
     ProjectUpdate,
     WorkflowJob,
-    SystemJob,
 )


@@ -86,21 +85,6 @@ class SimpleDAG(object):
                 return idx
         return None

-    def get_node_type(self, obj):
-        if type(obj) == Job:
-            return "job"
-        elif type(obj) == AdHocCommand:
-            return "ad_hoc_command"
-        elif type(obj) == InventoryUpdate:
-            return "inventory_update"
-        elif type(obj) == ProjectUpdate:
-            return "project_update"
-        elif type(obj) == SystemJob:
-            return "system_job"
-        elif type(obj) == WorkflowJob:
-            return "workflow_job"
-        return "unknown"
-
     def get_dependencies(self, obj, label=None):
         antecedents = []
         this_ord = self.find_ord(obj)
diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py
index 33e4dfada8..db21c322fa 100644
--- a/awx/main/scheduler/dependency_graph.py
+++ b/awx/main/scheduler/dependency_graph.py
@@ -1,13 +1,12 @@
-from datetime import timedelta
 from django.utils.timezone import now as tz_now

-from awx.main.scheduler.partial import (
-    JobDict,
-    ProjectUpdateDict,
-    InventoryUpdateDict,
-    SystemJobDict,
-    AdHocCommandDict,
-    WorkflowJobDict,
+from awx.main.models import (
+    Job,
+    ProjectUpdate,
+    InventoryUpdate,
+    SystemJob,
+    AdHocCommand,
+    WorkflowJob,
 )


@@ -28,7 +27,8 @@ class DependencyGraph(object):

     INVENTORY_SOURCES = 'inventory_source_ids'

-    def __init__(self, *args, **kwargs):
+    def __init__(self, queue):
+        self.queue = queue
         self.data = {}
         # project_id -> True / False
         self.data[self.PROJECT_UPDATES] = {}
@@ -53,7 +53,7 @@ class DependencyGraph(object):
         # workflow_job_template_id -> True / False
         self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS] = {}

-        # project_id -> latest ProjectUpdateLatestDict
+        # project_id -> latest ProjectUpdate
         self.data[self.LATEST_PROJECT_UPDATES] = {}
         # inventory_source_id -> latest InventoryUpdateLatestDict
         self.data[self.LATEST_INVENTORY_UPDATES] = {}
@@ -62,89 +62,16 @@ class DependencyGraph(object):
         self.data[self.INVENTORY_SOURCES] = {}

     def add_latest_project_update(self, job):
-        self.data[self.LATEST_PROJECT_UPDATES][job['project_id']] = job
-
-    def add_latest_inventory_update(self, job):
-        self.data[self.LATEST_INVENTORY_UPDATES][job['inventory_source_id']] = job
-
-    def add_inventory_sources(self, inventory_id, inventory_sources):
-        self.data[self.INVENTORY_SOURCES][inventory_id] = inventory_sources
-
-    def get_inventory_sources(self, inventory_id):
-        return self.data[self.INVENTORY_SOURCES].get(inventory_id, [])
+        self.data[self.LATEST_PROJECT_UPDATES][job.project_id] = job

     def get_now(self):
         return tz_now()
-
-    '''
-    JobDict
-
-    Presume that job is related to a project that is update on launch
-    '''
-    def should_update_related_project(self, job):
-        now = self.get_now()
-
-        # Already processed dependencies for this job
-        if job.data['dependent_jobs__id'] is not None:
-            return False
-
-        latest_project_update = self.data[self.LATEST_PROJECT_UPDATES].get(job['project_id'], None)
-        if not latest_project_update:
-            return True
-
-        # TODO: Other finished, failed cases? i.e. error ?
-        if latest_project_update['status'] in ['failed', 'canceled']:
-            return True
-
-        '''
-        This is a bit of fuzzy logic.
-        If the latest project update has a created time == job_created_time-1
-        then consider the project update found. 
This is so we don't enter an infinite loop - of updating the project when cache timeout is 0. - ''' - if latest_project_update['project__scm_update_cache_timeout'] == 0 and \ - latest_project_update['launch_type'] == 'dependency' and \ - latest_project_update['created'] == job['created'] - timedelta(seconds=1): - return False - - ''' - Normal, expected, cache timeout logic - ''' - timeout_seconds = timedelta(seconds=latest_project_update['project__scm_update_cache_timeout']) - if (latest_project_update['finished'] + timeout_seconds) < now: - return True - - return False - - def should_update_related_inventory_source(self, job, inventory_source_id): - now = self.get_now() - - # Already processed dependencies for this job - if job.data['dependent_jobs__id'] is not None: - return False - - latest_inventory_update = self.data[self.LATEST_INVENTORY_UPDATES].get(inventory_source_id, None) - if not latest_inventory_update: - return True - - ''' - Normal, expected, cache timeout logic - ''' - timeout_seconds = timedelta(seconds=latest_inventory_update['inventory_source__update_cache_timeout']) - if (latest_inventory_update['finished'] + timeout_seconds) < now: - return True - - if latest_inventory_update['inventory_source__update_on_launch'] is True and \ - latest_inventory_update['status'] in ['failed', 'canceled', 'error']: - return True - - return False - + def mark_system_job(self): self.data[self.SYSTEM_JOB] = False def mark_project_update(self, job): - self.data[self.PROJECT_UPDATES][job['project_id']] = False + self.data[self.PROJECT_UPDATES][job.project_id] = False def mark_inventory_update(self, inventory_id): self.data[self.INVENTORY_UPDATES][inventory_id] = False @@ -153,69 +80,69 @@ class DependencyGraph(object): self.data[self.INVENTORY_SOURCE_UPDATES][inventory_source_id] = False def mark_job_template_job(self, job): - self.data[self.JOB_INVENTORY_IDS][job['inventory_id']] = False - self.data[self.JOB_PROJECT_IDS][job['project_id']] = False - self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False + self.data[self.JOB_INVENTORY_IDS][job.inventory_id] = False + self.data[self.JOB_PROJECT_IDS][job.project_id] = False + self.data[self.JOB_TEMPLATE_JOBS][job.job_template_id] = False def mark_workflow_job(self, job): - self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job['workflow_job_template_id']] = False + self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job.workflow_job_template_id] = False def can_project_update_run(self, job): - return self.data[self.JOB_PROJECT_IDS].get(job['project_id'], True) and \ - self.data[self.PROJECT_UPDATES].get(job['project_id'], True) + return self.data[self.JOB_PROJECT_IDS].get(job.project_id, True) and \ + self.data[self.PROJECT_UPDATES].get(job.project_id, True) def can_inventory_update_run(self, job): - return self.data[self.JOB_INVENTORY_IDS].get(job['inventory_source__inventory_id'], True) and \ - self.data[self.INVENTORY_SOURCE_UPDATES].get(job['inventory_source_id'], True) + return self.data[self.JOB_INVENTORY_IDS].get(job.inventory_source.inventory_id, True) and \ + self.data[self.INVENTORY_SOURCE_UPDATES].get(job.inventory_source_id, True) def can_job_run(self, job): - if self.data[self.PROJECT_UPDATES].get(job['project_id'], True) is True and \ - self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True) is True: - if job['allow_simultaneous'] is False: - return self.data[self.JOB_TEMPLATE_JOBS].get(job['job_template_id'], True) + if self.data[self.PROJECT_UPDATES].get(job.project_id, True) is True and \ + 
self.data[self.INVENTORY_UPDATES].get(job.inventory_id, True) is True:
+            if job.allow_simultaneous is False:
+                return self.data[self.JOB_TEMPLATE_JOBS].get(job.job_template_id, True)
             else:
                 return True
         return False

     def can_workflow_job_run(self, job):
-        if job['allow_simultaneous'] is True:
+        if job.allow_simultaneous:
             return True
-        return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job['workflow_job_template_id'], True)
+        return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job.workflow_job_template_id, True)

     def can_system_job_run(self):
         return self.data[self.SYSTEM_JOB]

     def can_ad_hoc_command_run(self, job):
-        return self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True)
+        return self.data[self.INVENTORY_UPDATES].get(job.inventory_id, True)

     def is_job_blocked(self, job):
-        if type(job) is ProjectUpdateDict:
+        if type(job) is ProjectUpdate:
             return not self.can_project_update_run(job)
-        elif type(job) is InventoryUpdateDict:
+        elif type(job) is InventoryUpdate:
             return not self.can_inventory_update_run(job)
-        elif type(job) is JobDict:
+        elif type(job) is Job:
             return not self.can_job_run(job)
-        elif type(job) is SystemJobDict:
+        elif type(job) is SystemJob:
             return not self.can_system_job_run()
-        elif type(job) is AdHocCommandDict:
+        elif type(job) is AdHocCommand:
             return not self.can_ad_hoc_command_run(job)
-        elif type(job) is WorkflowJobDict:
+        elif type(job) is WorkflowJob:
             return not self.can_workflow_job_run(job)

     def add_job(self, job):
-        if type(job) is ProjectUpdateDict:
+        if type(job) is ProjectUpdate:
             self.mark_project_update(job)
-        elif type(job) is InventoryUpdateDict:
-            self.mark_inventory_update(job['inventory_source__inventory_id'])
-            self.mark_inventory_source_update(job['inventory_source_id'])
-        elif type(job) is JobDict:
+        elif type(job) is InventoryUpdate:
+            self.mark_inventory_update(job.inventory_source.inventory_id)
+            self.mark_inventory_source_update(job.inventory_source_id)
+        elif type(job) is Job:
             self.mark_job_template_job(job)
-        elif type(job) is WorkflowJobDict:
+        elif type(job) is WorkflowJob:
             self.mark_workflow_job(job)
-        elif type(job) is SystemJobDict:
+        elif type(job) is SystemJob:
             self.mark_system_job()
-        elif type(job) is AdHocCommandDict:
-            self.mark_inventory_update(job['inventory_id'])
+        elif type(job) is AdHocCommand:
+            self.mark_inventory_update(job.inventory_id)

     def add_jobs(self, jobs):
         map(lambda j: self.add_job(j), jobs)
diff --git a/awx/main/scheduler/partial.py b/awx/main/scheduler/partial.py
deleted file mode 100644
index 15c4797b85..0000000000
--- a/awx/main/scheduler/partial.py
+++ /dev/null
@@ -1,274 +0,0 @@
-
-# Python
-import json
-import itertools
-
-# AWX
-from awx.main.utils import decrypt_field_value
-from awx.main.models import (
-    Job,
-    ProjectUpdate,
-    InventoryUpdate,
-    InventorySource,
-    SystemJob,
-    AdHocCommand,
-    WorkflowJob,
-)
-
-
-class PartialModelDict(object):
-    FIELDS = ()
-    model = None
-    data = None
-
-    def __init__(self, data):
-        if type(data) is not dict:
-            raise RuntimeError("Expected data to be of type dict not %s" % type(data))
-        self.data = data
-
-    def __getitem__(self, index):
-        return self.data[index]
-
-    def __setitem__(self, key, value):
-        self.data[key] = value
-
-    def get(self, key, **kwargs):
-        return self.data.get(key, **kwargs)
-
-    def get_full(self):
-        return self.model.objects.get(id=self.data['id'])
-
-    def refresh_partial(self):
-        return self.__class__(self.model.objects.filter(id=self.data['id']).values(*self.__class__.get_db_values())[0])
-
-    @classmethod
-    def get_partial(cls, id): 
- return cls(cls.model.objects.filter(id=id).values(*cls.get_db_values())[0]) - - @classmethod - def get_db_values(cls): - return cls.FIELDS - - @classmethod - def filter_partial(cls, status=[]): - kv = { - 'status__in': status - } - return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] - - def get_job_type_str(self): - raise RuntimeError("Inherit and implement me") - - def task_impact(self): - raise RuntimeError("Inherit and implement me") - - @classmethod - def merge_values(cls, values): - grouped_results = itertools.groupby(values, key=lambda value: value['id']) - - merged_values = [] - for k, g in grouped_results: - groups = list(g) - merged_value = {} - for group in groups: - for key, val in group.iteritems(): - if not merged_value.get(key): - merged_value[key] = val - elif val != merged_value[key]: - if isinstance(merged_value[key], list): - if val not in merged_value[key]: - merged_value[key].append(val) - else: - old_val = merged_value[key] - merged_value[key] = [old_val, val] - merged_values.append(merged_value) - return merged_values - - -class JobDict(PartialModelDict): - FIELDS = ( - 'id', 'status', 'job_template_id', 'inventory_id', 'project_id', - 'launch_type', 'limit', 'allow_simultaneous', 'created', - 'job_type', 'celery_task_id', 'project__scm_update_on_launch', - 'forks', 'start_args', 'dependent_jobs__id', - ) - model = Job - - def get_job_type_str(self): - return 'job' - - def task_impact(self): - return (5 if self.data['forks'] == 0 else self.data['forks']) * 10 - - def get_inventory_sources_already_updated(self): - try: - start_args = json.loads(decrypt_field_value(self.data['id'], 'start_args', self.data['start_args'])) - except Exception: - return [] - start_args = start_args or {} - return start_args.get('inventory_sources_already_updated', []) - - @classmethod - def filter_partial(cls, status=[]): - kv = { - 'status__in': status - } - merged = PartialModelDict.merge_values(cls.model.objects.filter(**kv).values(*cls.get_db_values())) - return [cls(o) for o in merged] - - -class ProjectUpdateDict(PartialModelDict): - FIELDS = ( - 'id', 'status', 'project_id', 'created', 'celery_task_id', - 'launch_type', 'project__scm_update_cache_timeout', - 'project__scm_update_on_launch', - ) - model = ProjectUpdate - - def get_job_type_str(self): - return 'project_update' - - def task_impact(self): - return 10 - - @classmethod - def filter_partial(cls, status=[]): - kv = { - 'status__in': status, - 'job_type': 'check', - } - return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] - - -class ProjectUpdateLatestDict(ProjectUpdateDict): - FIELDS = ( - 'id', 'status', 'project_id', 'created', 'finished', - 'project__scm_update_cache_timeout', - 'launch_type', 'project__scm_update_on_launch', - ) - model = ProjectUpdate - - @classmethod - def filter_partial(cls, project_ids): - # TODO: This can shurley be made more efficient - # * shouldn't have to do a query per inventory_id - # * shouldn't have to call .values() on all the results, only to get the first result - results = [] - for project_id in project_ids: - qs = cls.model.objects.filter(project_id=project_id, status__in=['waiting', 'successful', 'failed']).order_by('-finished', '-started', '-created',) - if qs.count() > 0: - results.append(cls(cls.model.objects.filter(id=qs[0].id).values(*cls.get_db_values())[0])) - return results - - -class InventoryUpdateDict(PartialModelDict): - #'inventory_source__update_on_launch', - #'inventory_source__update_cache_timeout', - 
FIELDS = ( - 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', - 'inventory_source__inventory_id', - ) - model = InventoryUpdate - - def get_job_type_str(self): - return 'inventory_update' - - def task_impact(self): - return 20 - - -class InventoryUpdateLatestDict(InventoryUpdateDict): - #'inventory_source__update_on_launch', - #'inventory_source__update_cache_timeout', - FIELDS = ( - 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', - 'finished', 'inventory_source__update_cache_timeout', 'launch_type', - 'inventory_source__update_on_launch', - ) - model = InventoryUpdate - - @classmethod - def filter_partial(cls, inventory_ids): - # TODO: This can shurley be made more efficient - # * shouldn't have to do a query per inventory_id nor per inventory_source_id - # * shouldn't have to call .values() on all the results, only to get the first result - results = [] - for inventory_id in inventory_ids: - inventory_source_ids = InventorySource.objects.filter(inventory_id=inventory_id, - update_on_launch=True).values_list('id', flat=True) - # Find the most recent inventory update for each inventory source - for inventory_source_id in inventory_source_ids: - qs = cls.model.objects.filter(inventory_source_id=inventory_source_id, - status__in=['waiting', 'successful', 'failed'], - inventory_source__update_on_launch=True).order_by('-finished', '-started', '-created') - if qs.count() > 0: - results.append(cls(cls.model.objects.filter(id=qs[0].id).values(*cls.get_db_values())[0])) - return results - - -class InventorySourceDict(PartialModelDict): - FIELDS = ( - 'id', - ) - model = InventorySource - - def get_job_type_str(self): - return 'inventory_source' - - def task_impact(self): - return 20 - - @classmethod - # TODO: Optimize this to run the query once - def filter_partial(cls, inventory_id): - kv = { - 'inventory_id': inventory_id, - 'update_on_launch': True, - } - return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] - - -class SystemJobDict(PartialModelDict): - FIELDS = ( - 'id', 'created', 'status', 'celery_task_id', - ) - model = SystemJob - - def get_job_type_str(self): - return 'system_job' - - def task_impact(self): - return 20 - - @classmethod - def filter_partial(cls, status=[]): - kv = { - 'status__in': status - } - return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] - - -class AdHocCommandDict(PartialModelDict): - FIELDS = ( - 'id', 'created', 'status', 'inventory_id', 'dependent_jobs__id', 'celery_task_id', - ) - model = AdHocCommand - - def get_job_type_str(self): - return 'ad_hoc_command' - - def task_impact(self): - return 20 - - -class WorkflowJobDict(PartialModelDict): - FIELDS = ( - 'id', 'created', 'status', 'workflow_job_template_id', 'allow_simultaneous', - ) - model = WorkflowJob - - def get_job_type_str(self): - return 'workflow_job' - - def task_impact(self): - return 0 diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 3104527b96..e9d7e2ee39 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -201,7 +201,7 @@ def cluster_node_heartbeat(self): -@task(bind=True, queue='default') +@task(bind=True, queue='tower') def tower_periodic_scheduler(self): run_now = now() state = TowerScheduleState.get_solo() @@ -251,7 +251,7 @@ def _send_notification_templates(instance, status_str): job_id=instance.id) -@task(bind=True, queue='default') +@task(bind=True, queue='tower') def handle_work_success(self, result, task_actual): try: instance = 
UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@@ -267,7 +267,7 @@ def handle_work_success(self, result, task_actual):
     run_job_complete.delay(instance.id)


-@task(bind=True, queue='default')
+@task(bind=True, queue='tower')
 def handle_work_error(self, task_id, subtasks=None):
     print('Executing error task id %s, subtasks: %s' %
           (str(self.request.id), str(subtasks)))
@@ -307,7 +307,7 @@ def handle_work_error(self, task_id, subtasks=None):
         pass


-@task(queue='default')
+@task(queue='tower')
 def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
     '''
     Signal handler and wrapper around inventory.update_computed_fields to
@@ -1131,6 +1131,7 @@ class RunJob(BaseTask):
             _eager_fields=dict(
                 job_type='run',
                 status='running',
+                instance_group=job.instance_group,
                 celery_task_id=job_request_id))
         # save the associated job before calling run() so that a
         # cancel() call on the job can cancel the project update
diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml
index da7abb7472..bebe7c5bc7 100644
--- a/tools/docker-compose.yml
+++ b/tools/docker-compose.yml
@@ -33,7 +33,7 @@ services:
       dockerfile: Dockerfile-logstash
   # Postgres Database Container
   postgres:
-    image: postgres:9.4.1
+    image: postgres:9.6
  memcached:
    image: memcached:alpine
    ports:
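
Usage sketch (illustrative, not part of the patch): the snippet below shows how the
models introduced here are expected to fit together from a Django shell. The group
name 'east', the hostname 'awx-node-1', and the job template name 'Demo Job Template'
are hypothetical examples.

    from awx.main.models import Instance, InstanceGroup, JobTemplate

    # Create a queue/group and attach an existing Tower instance to it.
    group = InstanceGroup.objects.create(name='east')
    group.instances.add(Instance.objects.get(hostname='awx-node-1'))

    # Associate the group with a job template. Jobs launched from the template
    # will prefer this group; when neither the template, its inventory, nor its
    # organization has any groups assigned, preferred_instance_groups falls back
    # to the default 'tower' group.
    jt = JobTemplate.objects.get(name='Demo Job Template')
    jt.instance_groups.add(group)

    job = jt.create_unified_job()
    print([g.name for g in job.preferred_instance_groups])  # ['east']

The task manager then only considers the capacity buckets and dependency graphs of
these preferred groups when deciding where to queue the job, as in
process_pending_tasks() above.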