From 35e4bd477c7b8c364166909d5a0f89a46250fb9a Mon Sep 17 00:00:00 2001 From: Chris Church Date: Fri, 28 Mar 2014 22:07:24 -0400 Subject: [PATCH] AC-1040 Cleanup old code in tasks, combine methods from unified job subclasses into base classes. --- awx/api/views.py | 2 +- .../management/commands/run_task_system.py | 2 +- awx/main/models/inventory.py | 43 +---- awx/main/models/jobs.py | 53 +----- awx/main/models/projects.py | 42 +--- awx/main/models/schedules.py | 28 ++- awx/main/models/unified_jobs.py | 91 +++++++-- awx/main/tasks.py | 179 ++++++------------ 8 files changed, 172 insertions(+), 268 deletions(-) diff --git a/awx/api/views.py b/awx/api/views.py index 98d397469f..99fb82b9a2 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -491,7 +491,7 @@ class ProjectUpdateView(GenericAPIView): def post(self, request, *args, **kwargs): obj = self.get_object() if obj.can_update: - project_update = obj.update(**request.DATA) + project_update = obj.update() if not project_update: return Response({}, status=status.HTTP_400_BAD_REQUEST) else: diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 72c5c97515..0b5f2329b8 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -100,7 +100,7 @@ class SimpleDAG(object): def get_node_type(self, obj): if type(obj) == Job: - return "ansible_playbook" + return "job" elif type(obj) == InventoryUpdate: return "inventory_update" elif type(obj) == ProjectUpdate: diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 57ce5158c2..4f7eddaea8 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -655,11 +655,10 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions): return reverse('api:inventory_source_detail', args=(self.pk,)) def _can_update(self): - # FIXME: Prevent update when another one is active! - return bool(self.source) + return bool(self.source in CLOUD_INVENTORY_SOURCES) def create_inventory_update(self, **kwargs): - return self._create_unified_job_instance(**kwargs) + return self.create_unified_job(**kwargs) @property def needs_update_on_launch(self): @@ -670,21 +669,6 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions): return True return False - def update_signature(self, **kwargs): - if self.can_update: - inventory_update = self.create_inventory_update() - inventory_update_sig = inventory_update.start_signature() - return (inventory_update, inventory_update_sig) - - def update(self, schedule=None, **kwargs): - if self.can_update: - inventory_update = self.create_inventory_update() - if hasattr(settings, 'CELERY_UNIT_TEST'): - inventory_update.start(None, **kwargs) - else: - inventory_update.signal_start(schedule=schedule, **kwargs) - return inventory_update - class InventoryUpdate(UnifiedJob, InventorySourceOptions): ''' @@ -727,6 +711,8 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions): return reverse('api:inventory_update_detail', args=(self.pk,)) def is_blocked_by(self, obj): + # FIXME: Block update when any other update is touching the same inventory! + # FIXME: Block update when any job is running using this inventory! if type(obj) == InventoryUpdate: if self.inventory_source == obj.inventory_source: return True @@ -735,24 +721,3 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions): @property def task_impact(self): return 50 - - def signal_start(self, schedule=None, **kwargs): - from awx.main.tasks import notify_task_runner - if schedule: - self.schedule=schedule - self.save() - if not self.can_start: - return False - needed = self._get_passwords_needed_to_start() - opts = dict([(field, kwargs.get(field, '')) for field in needed]) - if not all(opts.values()): - return False - - json_args = json.dumps(kwargs) - self.start_args = json_args - self.save() - self.start_args = encrypt_field(self, 'start_args') - self.status = 'pending' - self.save() - # notify_task_runner.delay(dict(task_type="inventory_update", id=self.id, metadata=kwargs)) - return True diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index ec82aa45d6..87c5ec2f0a 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -159,7 +159,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions): ''' Create a new job based on this template. ''' - return self._create_unified_job_instance(**kwargs) + return self.create_unified_job(**kwargs) def get_absolute_url(self): return reverse('api:job_template_detail', args=(self.pk,)) @@ -178,6 +178,9 @@ class JobTemplate(UnifiedJobTemplate, JobOptions): needed.append(pw) return bool(self.credential and not len(needed)) + def _can_update(self): + return self.can_start_without_user_input() + class Job(UnifiedJob, JobOptions): ''' @@ -235,7 +238,7 @@ class Job(UnifiedJob, JobOptions): needed.append(pw) return needed - def _get_passwords_needed_to_start(self): + def get_passwords_needed_to_start(self): return self.passwords_needed_to_start def _get_hosts(self, **kwargs): @@ -319,52 +322,6 @@ class Job(UnifiedJob, JobOptions): dependencies.append(source.create_inventory_update(launch_type='dependency')) return dependencies - def signal_start(self, schedule=None, **kwargs): - from awx.main.tasks import notify_task_runner - if schedule: - self.schedule = schedule - self.save() - if hasattr(settings, 'CELERY_UNIT_TEST'): - return self.start(None, **kwargs) - if not self.can_start: - return False - needed = self._get_passwords_needed_to_start() - opts = dict([(field, kwargs.get(field, '')) for field in needed]) - if not all(opts.values()): - return False - - json_args = json.dumps(kwargs) - self.start_args = json_args - self.save() - self.start_args = encrypt_field(self, 'start_args') - self.status = 'pending' - self.save() - # notify_task_runner.delay(dict(task_type="ansible_playbook", id=self.id)) - return True - - def start(self, error_callback, **kwargs): - from awx.main.tasks import handle_work_error - task_class = self._get_task_class() - if not self.can_start: - self.result_traceback = "Job is not in a startable status: %s, expecting one of %s" % (self.status, str(('new', 'waiting'))) - self.save() - return False - needed = self._get_passwords_needed_to_start() - try: - stored_args = json.loads(decrypt_field(self, 'start_args')) - except Exception, e: - stored_args = None - if stored_args is None or stored_args == '': - opts = dict([(field, kwargs.get(field, '')) for field in needed]) - else: - opts = dict([(field, stored_args.get(field, '')) for field in needed]) - if not all(opts.values()): - self.result_traceback = "Missing needed fields: %s" % str(opts.values()) - self.save() - return False - task_class().apply_async((self.pk,), opts, link_error=error_callback) - return True - class JobHostSummary(CreatedModifiedModel): ''' diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 028f5e4c37..400c31f454 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -289,13 +289,12 @@ class Project(UnifiedJobTemplate, ProjectOptions): pass def _can_update(self): - # FIXME: Prevent update when another one is active! - return bool(self.scm_type)# and not self.current_update) + return bool(self.scm_type) def create_project_update(self, **kwargs): if self.scm_delete_on_next_update: kwargs['scm_delete_on_update'] = True - return self._create_unified_job_instance(**kwargs) + return self.create_unified_job(**kwargs) @property def needs_update_on_launch(self): @@ -306,21 +305,6 @@ class Project(UnifiedJobTemplate, ProjectOptions): return True return False - def update_signature(self, **kwargs): - if self.can_update: - project_update = self.create_project_update() - project_update_sig = project_update.start_signature() - return (project_update, project_update_sig) - - def update(self, schedule=None, **kwargs): - if self.can_update: - project_update = self.create_project_update() - if hasattr(settings, 'CELERY_UNIT_TEST'): - project_update.start(None, **kwargs) - else: - project_update.signal_start(schedule=schedule, **kwargs) - return project_update - def get_absolute_url(self): return reverse('api:project_detail', args=(self.pk,)) @@ -350,6 +334,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions): return RunProjectUpdate def is_blocked_by(self, obj): + # FIXME: Block update when any job is running using this project! if type(obj) == ProjectUpdate: if self.project == obj.project: return True @@ -359,27 +344,6 @@ class ProjectUpdate(UnifiedJob, ProjectOptions): def task_impact(self): return 20 - def signal_start(self, schedule=None, **kwargs): - from awx.main.tasks import notify_task_runner - if schedule: - self.schedule = schedule - self.save() - if not self.can_start: - return False - needed = self._get_passwords_needed_to_start() - opts = dict([(field, kwargs.get(field, '')) for field in needed]) - if not all(opts.values()): - return False - - json_args = json.dumps(kwargs) - self.start_args = json_args - self.save() - self.start_args = encrypt_field(self, 'start_args') - self.status = 'pending' - self.save() - # notify_task_runner.delay(dict(task_type="project_update", id=self.id, metadata=kwargs)) - return True - def get_absolute_url(self): return reverse('api:project_update_detail', args=(self.pk,)) diff --git a/awx/main/models/schedules.py b/awx/main/models/schedules.py index 14283e1cfe..ce6ac0f2f2 100644 --- a/awx/main/models/schedules.py +++ b/awx/main/models/schedules.py @@ -6,6 +6,7 @@ import dateutil.rrule # Django from django.db import models +from django.db.models.query import QuerySet from django.utils.timezone import now, make_aware, get_default_timezone # AWX @@ -16,9 +17,34 @@ logger = logging.getLogger('awx.main.models.schedule') __all__ = ['Schedule'] -class ScheduleManager(models.Manager): + +class ScheduleFilterMethods(object): + + def enabled(self, enabled=True): + return self.filter(enabled=enabled) + + def before(self, dt): + return self.filter(next_run__lt=dt) + + def after(self, dt): + return self.filter(next_run__gt=dt) + + def between(self, begin, end): + return self.after(begin).before(end) + + +class ScheduleQuerySet(ScheduleFilterMethods, QuerySet): pass + +class ScheduleManager(ScheduleFilterMethods, models.Manager): + + use_for_related_objects = True + + def get_query_set(self): + return ScheduleQuerySet(self.model, using=self._db) + + class Schedule(CommonModel): class Meta: diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index fc86540a5a..0ad399c5ea 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -33,6 +33,7 @@ from djcelery.models import TaskMeta # AWX from awx.main.models.base import * +from awx.main.utils import camelcase_to_underscore, encrypt_field, decrypt_field logger = logging.getLogger('awx.main.models.unified_jobs') @@ -224,11 +225,11 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique): def can_update(self): return self._can_update() - def update_signature(self, **kwargs): - raise NotImplementedError # Implement in subclass. - def update(self, **kwargs): - raise NotImplementedError # Implement in subclass. + if self.can_update: + unified_job = self.create_unified_job() + unified_job.signal_start(**kwargs) + return unified_job @classmethod def _get_unified_job_class(cls): @@ -244,7 +245,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique): ''' raise NotImplementedError # Implement in subclass. - def _create_unified_job_instance(self, **kwargs): + def create_unified_job(self, **kwargs): ''' Create a new unified job based on this unified job template. ''' @@ -278,6 +279,8 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique): ('dependency', _('Dependency')), ] + PASSWORD_FIELDS = ('start_args',) + class Meta: app_label = 'main' @@ -393,6 +396,9 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique): def _get_parent_field_name(cls): return 'unified_job_template' # Override in subclasses. + def _get_type(self): + return camelcase_to_underscore(self._meta.object_name) + def __unicode__(self): return u'%s-%s-%s' % (self.created, self.id, self.status) @@ -416,9 +422,23 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique): 'last_job_failed']) def save(self, *args, **kwargs): + new_instance = not bool(self.pk) # If update_fields has been specified, add our field names to it, # if it hasn't been specified, then we're just doing a normal save. update_fields = kwargs.get('update_fields', []) + # When first saving to the database, don't store any password field + # values, but instead save them until after the instance is created. + # Otherwise, store encrypted values to the database. + for field in self.PASSWORD_FIELDS: + if new_instance: + value = getattr(self, field, '') + setattr(self, '_saved_%s' % field, value) + setattr(self, field, '') + else: + encrypted = encrypt_field(self, field) + setattr(self, field, encrypted) + if field not in update_fields: + update_fields.append(field) # Get status before save... status_before = self.status or 'new' if self.pk: @@ -452,6 +472,15 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique): if 'unified_job_template' not in update_fields: update_fields.append('unified_job_template') super(UnifiedJob, self).save(*args, **kwargs) + # After saving a new instance for the first time, set the password + # fields and save again. + if new_instance: + update_fields = [] + for field in self.PASSWORD_FIELDS: + saved_value = getattr(self, '_saved_%s' % field, '') + setattr(self, field, saved_value) + update_fields.append(field) + self.save(update_fields=update_fields) # If status changed, update parent instance.... if self.status != status_before: self._update_parent_instance() @@ -483,48 +512,68 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique): except TaskMeta.DoesNotExist: pass + def get_passwords_needed_to_start(self): + return [] + @property def can_start(self): return bool(self.status in ('new', 'waiting')) @property def task_impact(self): - raise NotImplementedError - - def _get_passwords_needed_to_start(self): - return [] + raise NotImplementedError # Implement in subclass. def is_blocked_by(self, task_object): ''' Given another task object determine if this task would be blocked by it ''' - raise NotImplementedError + raise NotImplementedError # Implement in subclass. def generate_dependencies(self, active_tasks): ''' Generate any tasks that the current task might be dependent on given a list of active tasks that might preclude creating one''' return [] - def signal_start(self): - ''' Notify the task runner system to begin work on this task ''' - raise NotImplementedError - def start(self, error_callback, **kwargs): + ''' + Start the task running via Celery. + ''' task_class = self._get_task_class() if not self.can_start: + self.result_traceback = "Job is not in a startable status: %s, expecting one of %s" % (self.status, str(('new', 'waiting'))) + self.save() return False - needed = self._get_passwords_needed_to_start() + needed = self.get_passwords_needed_to_start() try: - stored_args = json.loads(decrypt_field(self, 'start_args')) + start_args = json.loads(decrypt_field(self, 'start_args')) except Exception, e: - stored_args = None - if stored_args is None or stored_args == '': - opts = dict([(field, kwargs.get(field, '')) for field in needed]) - else: - opts = dict([(field, stored_args.get(field, '')) for field in needed]) + start_args = None + if start_args in (None, ''): + start_args = kwargs + opts = dict([(field, start_args.get(field, '')) for field in needed]) if not all(opts.values()): + missing_fields = ', '.join([k for k,v in opts.items() if not v]) + self.result_traceback = "Missing needed fields: %s" % missing_fields + self.save() return False task_class().apply_async((self.pk,), opts, link_error=error_callback) return True + def signal_start(self, **kwargs): + ''' + Notify the task runner system to begin work on this task. + ''' + from awx.main.tasks import notify_task_runner + if hasattr(settings, 'CELERY_UNIT_TEST'): + return self.start(None, **kwargs) + if not self.can_start: + return False + needed = self.get_passwords_needed_to_start() + opts = dict([(field, kwargs.get(field, '')) for field in needed]) + if not all(opts.values()): + return False + self.update_fields(start_args=json.dumps(kwargs), status='pending') + # notify_task_runner.delay(dict(task_type=self._get_type(), id=self.id, metadata=kwargs)) + return True + @property def can_cancel(self): return bool(self.status in ('pending', 'waiting', 'running')) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index a67b397fea..69709d4e9b 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -4,15 +4,12 @@ # Python import ConfigParser import cStringIO -import datetime from distutils.version import StrictVersion as Version -import functools import json import logging import os import pipes import re -import subprocess import stat import tempfile import time @@ -26,21 +23,15 @@ import pexpect # ZMQ import zmq -# Kombu -from kombu import Connection, Exchange, Queue - # Celery -from celery import Celery, Task, task -from celery.execute import send_task -from djcelery.models import PeriodicTask, TaskMeta +from celery import Task, task +from djcelery.models import PeriodicTask # Django from django.conf import settings from django.db import transaction, DatabaseError from django.utils.datastructures import SortedDict -from django.utils.dateparse import parse_datetime from django.utils.timezone import now -from django.utils.tzinfo import FixedOffset # AWX from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate @@ -48,34 +39,28 @@ from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url __all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error'] +HIDDEN_PASSWORD = '**********' + logger = logging.getLogger('awx.main.tasks') # FIXME: Cleanly cancel task when celery worker is stopped. @task(bind=True) def tower_periodic_scheduler(self): - run_now = now() - periodic_task = PeriodicTask.objects.get(task='awx.main.tasks.tower_periodic_scheduler') - logger.debug("Last run was: " + str(periodic_task.last_run_at)) + run_now = now() + try: + periodic_task = PeriodicTask.objects.filter(task='awx.main.tasks.tower_periodic_scheduler')[0] + except IndexError: + logger.warning('No PeriodicTask found for tower_periodic_scheduler') + return + logger.debug("Last run was: %s", periodic_task.last_run_at) # TODO: Cleanup jobs that we missed - jobs_matching_schedules = Schedule.objects.filter(enabled=True, - next_run__gt=periodic_task.last_run_at, next_run__lte=run_now) - for match in jobs_matching_schedules: - template = match.unified_job_template - match.save() - if type(template) == Project: - new_project_update = template.create_project_update(launch_type="scheduled") - new_project_update.signal_start(schedule=match) - elif type(template) == InventorySource: - new_inventory_update = template.create_inventory_update(launch_type="scheduled") - new_inventory_update.signal_start(schedule=match) - elif type(template) == JobTemplate: - new_job = template.create_job() - new_job.launch_type = "scheduled" - new_job.save() - new_job.signal_start(schedule=match) - else: - logger.error("Unknown task type: " + str(type(template))) + schedules = Schedule.objects.enabled().between(periodic_task.last_run_at, run_now) + for schedule in schedules: + template = schedule.unified_job_template + schedule.save() # To update next_run timestamp. + new_unified_job = template.create_unified_job(launch_type='scheduled', schedule=schedule) + new_unified_job.signal_start() periodic_task.last_run_at = run_now periodic_task.save() @@ -101,7 +86,7 @@ def handle_work_error(self, task_id, subtasks=None): elif each_task['type'] == 'inventory_update': instance = InventoryUpdate.objects.get(id=each_task['id']) instance_name = instance.inventory_source.inventory.name - elif each_task['type'] == 'ansible_playbook': + elif each_task['type'] == 'job': instance = Job.objects.get(id=each_task['id']) instance_name = instance.job_template.name else: @@ -163,9 +148,6 @@ class BaseTask(Task): pass # notify_task_runner(dict(complete=pk)) - def get_model(self, pk): - return self.model.objects.get(pk=pk) - def get_path_to(self, *args): ''' Return absolute path relative to this file. @@ -201,7 +183,6 @@ class BaseTask(Task): 'yes': 'yes', 'no': 'no', '': '', - } def build_env(self, instance, **kwargs): @@ -228,25 +209,34 @@ class BaseTask(Task): return env def build_safe_env(self, instance, **kwargs): - hidden_re = re.compile(r'API|TOKEN|KEY|SECRET|PASS') + ''' + Build environment dictionary, hiding potentially sensitive information + such as passwords or keys. + ''' + hidden_re = re.compile(r'API|TOKEN|KEY|SECRET|PASS', re.I) urlpass_re = re.compile(r'^.*?://.?:(.*?)@.*?$') env = self.build_env(instance, **kwargs) for k,v in env.items(): - if k == 'BROKER_URL': - m = urlpass_re.match(v) - if m: - env[k] = urlpass_re.sub('*'*len(m.groups()[0]), v) - elif k in ('REST_API_URL', 'AWS_ACCESS_KEY', 'AWS_ACCESS_KEY_ID'): + if k in ('REST_API_URL', 'AWS_ACCESS_KEY', 'AWS_ACCESS_KEY_ID'): continue elif k.startswith('ANSIBLE_'): continue elif hidden_re.search(k): - env[k] = '*'*len(str(v)) + env[k] = HIDDEN_PASSWORD + elif urlpass_re.match(v): + env[k] = urlpass_re.sub(HIDDEN_PASSWORD, v) return env def args2cmdline(self, *args): return ' '.join([pipes.quote(a) for a in args]) + def wrap_args_with_ssh_agent(self, args, ssh_key_path): + if ssh_key_path: + cmd = ' && '.join([self.args2cmdline('ssh-add', ssh_key_path), + self.args2cmdline(*args)]) + args = ['ssh-agent', 'sh', '-c', cmd] + return args + def build_args(self, instance, **kwargs): raise NotImplementedError @@ -264,19 +254,19 @@ class BaseTask(Task): def get_password_prompts(self): ''' - Return a dictionary of prompt regular expressions and password lookup - keys. + Return a dictionary where keys are strings or regular expressions for + prompts, and values are password lookup keys (keys that are returned + from build_passwords). ''' return SortedDict() - def run_pexpect(self, instance, args, cwd, env, passwords, task_stdout_handle, + def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, output_replacements=None): ''' Run the given command using pexpect to capture output and provide passwords when requested. ''' - status, stdout = 'error', '' - logfile = task_stdout_handle + logfile = stdout_handle logfile_pos = logfile.tell() child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile @@ -299,7 +289,7 @@ class BaseTask(Task): if logfile_pos != logfile.tell(): logfile_pos = logfile.tell() last_stdout_update = time.time() - # NOTE: In case revoke doesn't have an affect + # Refresh model instance from the database (to check cancel flag). instance = self.update_model(instance.pk) if instance.cancel_flag: child.terminate(canceled) @@ -308,34 +298,28 @@ class BaseTask(Task): child.close(True) canceled = True if canceled: - status = 'canceled' + return 'canceled' elif child.exitstatus == 0: - status = 'successful' + return 'successful' else: - status = 'failed' - return status, stdout - - def pre_run_check(self, instance, **kwargs): - ''' - Hook for checking job/task before running. - ''' - if instance.status != 'running': - return False - # TODO: Check that we can write to the stdout data directory - return True + return 'failed' def post_run_hook(self, instance, **kwargs): - pass + ''' + Hook for any steps to run after job/task is complete. + ''' def run(self, pk, **kwargs): ''' Run the job/task and capture its output. ''' instance = self.update_model(pk, status='running', celery_task_id=self.request.id) - status, stdout, tb = 'error', '', '' + status, tb = 'error', '' output_replacements = [] try: - if not self.pre_run_check(instance, **kwargs): + if instance.cancel_flag: + instance = self.update_model(instance.pk, status='canceled') + if instance.status != 'running': if hasattr(settings, 'CELERY_UNIT_TEST'): return else: @@ -360,7 +344,7 @@ class BaseTask(Task): stdout_handle = open(stdout_filename, 'w') instance = self.update_model(pk, job_args=json.dumps(safe_args), job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) - status, stdout = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) + status = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) except Exception: if status != 'canceled': tb = traceback.format_exc() @@ -374,8 +358,7 @@ class BaseTask(Task): stdout_handle.close() except Exception: pass - instance = self.update_model(pk, status=status, - result_traceback=tb, + instance = self.update_model(pk, status=status, result_traceback=tb, output_replacements=output_replacements) self.post_run_hook(instance, **kwargs) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): @@ -388,6 +371,7 @@ class BaseTask(Task): if not hasattr(settings, 'CELERY_UNIT_TEST'): self.signal_finished(pk) + class RunJob(BaseTask): ''' Celery task to run a job using ansible-playbook. @@ -520,10 +504,8 @@ class RunJob(BaseTask): # If ssh unlock password is needed, run using ssh-agent. if ssh_key_path and use_ssh_agent: - cmd = ' '.join([self.args2cmdline('ssh-add', ssh_key_path), - '&&', self.args2cmdline(*args)]) - args = ['ssh-agent', 'sh', '-c', cmd] - + args = self.wrap_args_with_ssh_agent(args, ssh_key_path) + return args def build_cwd(self, job, **kwargs): @@ -547,18 +529,6 @@ class RunJob(BaseTask): d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password' return d - def pre_run_check(self, job, **kwargs): - ''' - Hook for checking job before running. - ''' - if job.cancel_flag: - job = self.update_model(job.pk, status='canceled') - return False - elif job.status == 'running': - return True - else: - return False - def post_run_hook(self, job, **kwargs): ''' Hook for actions to run after job/task has completed. @@ -570,6 +540,7 @@ class RunJob(BaseTask): for job_event in job.job_events.order_by('pk'): job_event.save(post_process=True) + class RunProjectUpdate(BaseTask): name = 'awx.main.tasks.run_project_update' @@ -624,7 +595,6 @@ class RunProjectUpdate(BaseTask): scm_password = scm_url_parts.password or scm_password or '' if scm_username: if scm_type == 'svn': - # FIXME: Need to somehow escape single/double quotes in username/password extra_vars['scm_username'] = scm_username extra_vars['scm_password'] = scm_password scm_password = False @@ -673,9 +643,8 @@ class RunProjectUpdate(BaseTask): # If using an SSH key, run using ssh-agent. ssh_key_path = kwargs.get('private_data_file', '') if ssh_key_path: - subcmds = [('ssh-add', ssh_key_path), args] - cmd = ' && '.join([self.args2cmdline(*x) for x in subcmds]) - args = ['ssh-agent', 'sh', '-c', cmd] + args = self.wrap_args_with_ssh_agent(args, ssh_key_path) + return args def build_safe_args(self, project_update, **kwargs): @@ -683,7 +652,7 @@ class RunProjectUpdate(BaseTask): for pw_name, pw_val in pwdict.items(): if pw_name in ('', 'yes', 'no', 'scm_username'): continue - pwdict[pw_name] = '*'*len(pw_val) + pwdict[pw_name] = HIDDEN_PASSWORD kwargs['passwords'] = pwdict return self.build_args(project_update, **kwargs) @@ -704,7 +673,7 @@ class RunProjectUpdate(BaseTask): for pw_name, pw_val in pwdict.items(): if pw_name in ('', 'yes', 'no', 'scm_username'): continue - pwdict[pw_name] = '*'*len(pw_val) + pwdict[pw_name] = HIDDEN_PASSWORD kwargs['passwords'] = pwdict after_url = self._build_scm_url_extra_vars(project_update, **kwargs)[0] @@ -717,7 +686,7 @@ class RunProjectUpdate(BaseTask): } d_after = { 'username': scm_username, - 'password': '*'*len(scm_password), + 'password': HIDDEN_PASSWORD, } pattern1 = "username=\"%(username)s\" password=\"%(password)s\"" pattern2 = "--username '%(username)s' --password '%(password)s'" @@ -740,18 +709,6 @@ class RunProjectUpdate(BaseTask): def get_idle_timeout(self): return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None) - def pre_run_check(self, project_update, **kwargs): - ''' - Hook for checking project update before running. - ''' - while True: - if project_update.cancel_flag: - project_update = self.update_model(project_update.pk, status='canceled') - return False - elif project_update.status == 'running': - return True - else: - return False class RunInventoryUpdate(BaseTask): @@ -828,7 +785,6 @@ class RunInventoryUpdate(BaseTask): elif inventory_update.source == 'file': # FIXME: Parse source_env to dict, update env. pass - #print env return env def build_args(self, inventory_update, **kwargs): @@ -869,16 +825,3 @@ class RunInventoryUpdate(BaseTask): def get_idle_timeout(self): return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', None) - - def pre_run_check(self, inventory_update, **kwargs): - ''' - Hook for checking inventory update before running. - ''' - while True: - if inventory_update.cancel_flag: - inventory_update = self.update_model(inventory_update.pk, status='canceled') - return False - elif inventory_update.status == 'running': - return True - else: - return False