AC-1040 Cleanup old code in tasks, combine methods from unified job subclasses into base classes.

This commit is contained in:
Chris Church
2014-03-28 22:07:24 -04:00
parent 352197c5d2
commit 35e4bd477c
8 changed files with 172 additions and 268 deletions

View File

@@ -491,7 +491,7 @@ class ProjectUpdateView(GenericAPIView):
def post(self, request, *args, **kwargs): def post(self, request, *args, **kwargs):
obj = self.get_object() obj = self.get_object()
if obj.can_update: if obj.can_update:
project_update = obj.update(**request.DATA) project_update = obj.update()
if not project_update: if not project_update:
return Response({}, status=status.HTTP_400_BAD_REQUEST) return Response({}, status=status.HTTP_400_BAD_REQUEST)
else: else:

View File

@@ -100,7 +100,7 @@ class SimpleDAG(object):
def get_node_type(self, obj): def get_node_type(self, obj):
if type(obj) == Job: if type(obj) == Job:
return "ansible_playbook" return "job"
elif type(obj) == InventoryUpdate: elif type(obj) == InventoryUpdate:
return "inventory_update" return "inventory_update"
elif type(obj) == ProjectUpdate: elif type(obj) == ProjectUpdate:

View File

@@ -655,11 +655,10 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions):
return reverse('api:inventory_source_detail', args=(self.pk,)) return reverse('api:inventory_source_detail', args=(self.pk,))
def _can_update(self): def _can_update(self):
# FIXME: Prevent update when another one is active! return bool(self.source in CLOUD_INVENTORY_SOURCES)
return bool(self.source)
def create_inventory_update(self, **kwargs): def create_inventory_update(self, **kwargs):
return self._create_unified_job_instance(**kwargs) return self.create_unified_job(**kwargs)
@property @property
def needs_update_on_launch(self): def needs_update_on_launch(self):
@@ -670,21 +669,6 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions):
return True return True
return False return False
def update_signature(self, **kwargs):
if self.can_update:
inventory_update = self.create_inventory_update()
inventory_update_sig = inventory_update.start_signature()
return (inventory_update, inventory_update_sig)
def update(self, schedule=None, **kwargs):
if self.can_update:
inventory_update = self.create_inventory_update()
if hasattr(settings, 'CELERY_UNIT_TEST'):
inventory_update.start(None, **kwargs)
else:
inventory_update.signal_start(schedule=schedule, **kwargs)
return inventory_update
class InventoryUpdate(UnifiedJob, InventorySourceOptions): class InventoryUpdate(UnifiedJob, InventorySourceOptions):
''' '''
@@ -727,6 +711,8 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions):
return reverse('api:inventory_update_detail', args=(self.pk,)) return reverse('api:inventory_update_detail', args=(self.pk,))
def is_blocked_by(self, obj): def is_blocked_by(self, obj):
# FIXME: Block update when any other update is touching the same inventory!
# FIXME: Block update when any job is running using this inventory!
if type(obj) == InventoryUpdate: if type(obj) == InventoryUpdate:
if self.inventory_source == obj.inventory_source: if self.inventory_source == obj.inventory_source:
return True return True
@@ -735,24 +721,3 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions):
@property @property
def task_impact(self): def task_impact(self):
return 50 return 50
def signal_start(self, schedule=None, **kwargs):
from awx.main.tasks import notify_task_runner
if schedule:
self.schedule=schedule
self.save()
if not self.can_start:
return False
needed = self._get_passwords_needed_to_start()
opts = dict([(field, kwargs.get(field, '')) for field in needed])
if not all(opts.values()):
return False
json_args = json.dumps(kwargs)
self.start_args = json_args
self.save()
self.start_args = encrypt_field(self, 'start_args')
self.status = 'pending'
self.save()
# notify_task_runner.delay(dict(task_type="inventory_update", id=self.id, metadata=kwargs))
return True

View File

@@ -159,7 +159,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions):
''' '''
Create a new job based on this template. Create a new job based on this template.
''' '''
return self._create_unified_job_instance(**kwargs) return self.create_unified_job(**kwargs)
def get_absolute_url(self): def get_absolute_url(self):
return reverse('api:job_template_detail', args=(self.pk,)) return reverse('api:job_template_detail', args=(self.pk,))
@@ -178,6 +178,9 @@ class JobTemplate(UnifiedJobTemplate, JobOptions):
needed.append(pw) needed.append(pw)
return bool(self.credential and not len(needed)) return bool(self.credential and not len(needed))
def _can_update(self):
return self.can_start_without_user_input()
class Job(UnifiedJob, JobOptions): class Job(UnifiedJob, JobOptions):
''' '''
@@ -235,7 +238,7 @@ class Job(UnifiedJob, JobOptions):
needed.append(pw) needed.append(pw)
return needed return needed
def _get_passwords_needed_to_start(self): def get_passwords_needed_to_start(self):
return self.passwords_needed_to_start return self.passwords_needed_to_start
def _get_hosts(self, **kwargs): def _get_hosts(self, **kwargs):
@@ -319,52 +322,6 @@ class Job(UnifiedJob, JobOptions):
dependencies.append(source.create_inventory_update(launch_type='dependency')) dependencies.append(source.create_inventory_update(launch_type='dependency'))
return dependencies return dependencies
def signal_start(self, schedule=None, **kwargs):
from awx.main.tasks import notify_task_runner
if schedule:
self.schedule = schedule
self.save()
if hasattr(settings, 'CELERY_UNIT_TEST'):
return self.start(None, **kwargs)
if not self.can_start:
return False
needed = self._get_passwords_needed_to_start()
opts = dict([(field, kwargs.get(field, '')) for field in needed])
if not all(opts.values()):
return False
json_args = json.dumps(kwargs)
self.start_args = json_args
self.save()
self.start_args = encrypt_field(self, 'start_args')
self.status = 'pending'
self.save()
# notify_task_runner.delay(dict(task_type="ansible_playbook", id=self.id))
return True
def start(self, error_callback, **kwargs):
from awx.main.tasks import handle_work_error
task_class = self._get_task_class()
if not self.can_start:
self.result_traceback = "Job is not in a startable status: %s, expecting one of %s" % (self.status, str(('new', 'waiting')))
self.save()
return False
needed = self._get_passwords_needed_to_start()
try:
stored_args = json.loads(decrypt_field(self, 'start_args'))
except Exception, e:
stored_args = None
if stored_args is None or stored_args == '':
opts = dict([(field, kwargs.get(field, '')) for field in needed])
else:
opts = dict([(field, stored_args.get(field, '')) for field in needed])
if not all(opts.values()):
self.result_traceback = "Missing needed fields: %s" % str(opts.values())
self.save()
return False
task_class().apply_async((self.pk,), opts, link_error=error_callback)
return True
class JobHostSummary(CreatedModifiedModel): class JobHostSummary(CreatedModifiedModel):
''' '''

View File

@@ -289,13 +289,12 @@ class Project(UnifiedJobTemplate, ProjectOptions):
pass pass
def _can_update(self): def _can_update(self):
# FIXME: Prevent update when another one is active! return bool(self.scm_type)
return bool(self.scm_type)# and not self.current_update)
def create_project_update(self, **kwargs): def create_project_update(self, **kwargs):
if self.scm_delete_on_next_update: if self.scm_delete_on_next_update:
kwargs['scm_delete_on_update'] = True kwargs['scm_delete_on_update'] = True
return self._create_unified_job_instance(**kwargs) return self.create_unified_job(**kwargs)
@property @property
def needs_update_on_launch(self): def needs_update_on_launch(self):
@@ -306,21 +305,6 @@ class Project(UnifiedJobTemplate, ProjectOptions):
return True return True
return False return False
def update_signature(self, **kwargs):
if self.can_update:
project_update = self.create_project_update()
project_update_sig = project_update.start_signature()
return (project_update, project_update_sig)
def update(self, schedule=None, **kwargs):
if self.can_update:
project_update = self.create_project_update()
if hasattr(settings, 'CELERY_UNIT_TEST'):
project_update.start(None, **kwargs)
else:
project_update.signal_start(schedule=schedule, **kwargs)
return project_update
def get_absolute_url(self): def get_absolute_url(self):
return reverse('api:project_detail', args=(self.pk,)) return reverse('api:project_detail', args=(self.pk,))
@@ -350,6 +334,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions):
return RunProjectUpdate return RunProjectUpdate
def is_blocked_by(self, obj): def is_blocked_by(self, obj):
# FIXME: Block update when any job is running using this project!
if type(obj) == ProjectUpdate: if type(obj) == ProjectUpdate:
if self.project == obj.project: if self.project == obj.project:
return True return True
@@ -359,27 +344,6 @@ class ProjectUpdate(UnifiedJob, ProjectOptions):
def task_impact(self): def task_impact(self):
return 20 return 20
def signal_start(self, schedule=None, **kwargs):
from awx.main.tasks import notify_task_runner
if schedule:
self.schedule = schedule
self.save()
if not self.can_start:
return False
needed = self._get_passwords_needed_to_start()
opts = dict([(field, kwargs.get(field, '')) for field in needed])
if not all(opts.values()):
return False
json_args = json.dumps(kwargs)
self.start_args = json_args
self.save()
self.start_args = encrypt_field(self, 'start_args')
self.status = 'pending'
self.save()
# notify_task_runner.delay(dict(task_type="project_update", id=self.id, metadata=kwargs))
return True
def get_absolute_url(self): def get_absolute_url(self):
return reverse('api:project_update_detail', args=(self.pk,)) return reverse('api:project_update_detail', args=(self.pk,))

View File

@@ -6,6 +6,7 @@ import dateutil.rrule
# Django # Django
from django.db import models from django.db import models
from django.db.models.query import QuerySet
from django.utils.timezone import now, make_aware, get_default_timezone from django.utils.timezone import now, make_aware, get_default_timezone
# AWX # AWX
@@ -16,9 +17,34 @@ logger = logging.getLogger('awx.main.models.schedule')
__all__ = ['Schedule'] __all__ = ['Schedule']
class ScheduleManager(models.Manager):
class ScheduleFilterMethods(object):
def enabled(self, enabled=True):
return self.filter(enabled=enabled)
def before(self, dt):
return self.filter(next_run__lt=dt)
def after(self, dt):
return self.filter(next_run__gt=dt)
def between(self, begin, end):
return self.after(begin).before(end)
class ScheduleQuerySet(ScheduleFilterMethods, QuerySet):
pass pass
class ScheduleManager(ScheduleFilterMethods, models.Manager):
use_for_related_objects = True
def get_query_set(self):
return ScheduleQuerySet(self.model, using=self._db)
class Schedule(CommonModel): class Schedule(CommonModel):
class Meta: class Meta:

View File

@@ -33,6 +33,7 @@ from djcelery.models import TaskMeta
# AWX # AWX
from awx.main.models.base import * from awx.main.models.base import *
from awx.main.utils import camelcase_to_underscore, encrypt_field, decrypt_field
logger = logging.getLogger('awx.main.models.unified_jobs') logger = logging.getLogger('awx.main.models.unified_jobs')
@@ -224,11 +225,11 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique):
def can_update(self): def can_update(self):
return self._can_update() return self._can_update()
def update_signature(self, **kwargs):
raise NotImplementedError # Implement in subclass.
def update(self, **kwargs): def update(self, **kwargs):
raise NotImplementedError # Implement in subclass. if self.can_update:
unified_job = self.create_unified_job()
unified_job.signal_start(**kwargs)
return unified_job
@classmethod @classmethod
def _get_unified_job_class(cls): def _get_unified_job_class(cls):
@@ -244,7 +245,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique):
''' '''
raise NotImplementedError # Implement in subclass. raise NotImplementedError # Implement in subclass.
def _create_unified_job_instance(self, **kwargs): def create_unified_job(self, **kwargs):
''' '''
Create a new unified job based on this unified job template. Create a new unified job based on this unified job template.
''' '''
@@ -278,6 +279,8 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique):
('dependency', _('Dependency')), ('dependency', _('Dependency')),
] ]
PASSWORD_FIELDS = ('start_args',)
class Meta: class Meta:
app_label = 'main' app_label = 'main'
@@ -393,6 +396,9 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique):
def _get_parent_field_name(cls): def _get_parent_field_name(cls):
return 'unified_job_template' # Override in subclasses. return 'unified_job_template' # Override in subclasses.
def _get_type(self):
return camelcase_to_underscore(self._meta.object_name)
def __unicode__(self): def __unicode__(self):
return u'%s-%s-%s' % (self.created, self.id, self.status) return u'%s-%s-%s' % (self.created, self.id, self.status)
@@ -416,9 +422,23 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique):
'last_job_failed']) 'last_job_failed'])
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
new_instance = not bool(self.pk)
# If update_fields has been specified, add our field names to it, # If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save. # if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', []) update_fields = kwargs.get('update_fields', [])
# When first saving to the database, don't store any password field
# values, but instead save them until after the instance is created.
# Otherwise, store encrypted values to the database.
for field in self.PASSWORD_FIELDS:
if new_instance:
value = getattr(self, field, '')
setattr(self, '_saved_%s' % field, value)
setattr(self, field, '')
else:
encrypted = encrypt_field(self, field)
setattr(self, field, encrypted)
if field not in update_fields:
update_fields.append(field)
# Get status before save... # Get status before save...
status_before = self.status or 'new' status_before = self.status or 'new'
if self.pk: if self.pk:
@@ -452,6 +472,15 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique):
if 'unified_job_template' not in update_fields: if 'unified_job_template' not in update_fields:
update_fields.append('unified_job_template') update_fields.append('unified_job_template')
super(UnifiedJob, self).save(*args, **kwargs) super(UnifiedJob, self).save(*args, **kwargs)
# After saving a new instance for the first time, set the password
# fields and save again.
if new_instance:
update_fields = []
for field in self.PASSWORD_FIELDS:
saved_value = getattr(self, '_saved_%s' % field, '')
setattr(self, field, saved_value)
update_fields.append(field)
self.save(update_fields=update_fields)
# If status changed, update parent instance.... # If status changed, update parent instance....
if self.status != status_before: if self.status != status_before:
self._update_parent_instance() self._update_parent_instance()
@@ -483,48 +512,68 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique):
except TaskMeta.DoesNotExist: except TaskMeta.DoesNotExist:
pass pass
def get_passwords_needed_to_start(self):
return []
@property @property
def can_start(self): def can_start(self):
return bool(self.status in ('new', 'waiting')) return bool(self.status in ('new', 'waiting'))
@property @property
def task_impact(self): def task_impact(self):
raise NotImplementedError raise NotImplementedError # Implement in subclass.
def _get_passwords_needed_to_start(self):
return []
def is_blocked_by(self, task_object): def is_blocked_by(self, task_object):
''' Given another task object determine if this task would be blocked by it ''' ''' Given another task object determine if this task would be blocked by it '''
raise NotImplementedError raise NotImplementedError # Implement in subclass.
def generate_dependencies(self, active_tasks): def generate_dependencies(self, active_tasks):
''' Generate any tasks that the current task might be dependent on given a list of active ''' Generate any tasks that the current task might be dependent on given a list of active
tasks that might preclude creating one''' tasks that might preclude creating one'''
return [] return []
def signal_start(self):
''' Notify the task runner system to begin work on this task '''
raise NotImplementedError
def start(self, error_callback, **kwargs): def start(self, error_callback, **kwargs):
'''
Start the task running via Celery.
'''
task_class = self._get_task_class() task_class = self._get_task_class()
if not self.can_start: if not self.can_start:
self.result_traceback = "Job is not in a startable status: %s, expecting one of %s" % (self.status, str(('new', 'waiting')))
self.save()
return False return False
needed = self._get_passwords_needed_to_start() needed = self.get_passwords_needed_to_start()
try: try:
stored_args = json.loads(decrypt_field(self, 'start_args')) start_args = json.loads(decrypt_field(self, 'start_args'))
except Exception, e: except Exception, e:
stored_args = None start_args = None
if stored_args is None or stored_args == '': if start_args in (None, ''):
opts = dict([(field, kwargs.get(field, '')) for field in needed]) start_args = kwargs
else: opts = dict([(field, start_args.get(field, '')) for field in needed])
opts = dict([(field, stored_args.get(field, '')) for field in needed])
if not all(opts.values()): if not all(opts.values()):
missing_fields = ', '.join([k for k,v in opts.items() if not v])
self.result_traceback = "Missing needed fields: %s" % missing_fields
self.save()
return False return False
task_class().apply_async((self.pk,), opts, link_error=error_callback) task_class().apply_async((self.pk,), opts, link_error=error_callback)
return True return True
def signal_start(self, **kwargs):
'''
Notify the task runner system to begin work on this task.
'''
from awx.main.tasks import notify_task_runner
if hasattr(settings, 'CELERY_UNIT_TEST'):
return self.start(None, **kwargs)
if not self.can_start:
return False
needed = self.get_passwords_needed_to_start()
opts = dict([(field, kwargs.get(field, '')) for field in needed])
if not all(opts.values()):
return False
self.update_fields(start_args=json.dumps(kwargs), status='pending')
# notify_task_runner.delay(dict(task_type=self._get_type(), id=self.id, metadata=kwargs))
return True
@property @property
def can_cancel(self): def can_cancel(self):
return bool(self.status in ('pending', 'waiting', 'running')) return bool(self.status in ('pending', 'waiting', 'running'))

View File

@@ -4,15 +4,12 @@
# Python # Python
import ConfigParser import ConfigParser
import cStringIO import cStringIO
import datetime
from distutils.version import StrictVersion as Version from distutils.version import StrictVersion as Version
import functools
import json import json
import logging import logging
import os import os
import pipes import pipes
import re import re
import subprocess
import stat import stat
import tempfile import tempfile
import time import time
@@ -26,21 +23,15 @@ import pexpect
# ZMQ # ZMQ
import zmq import zmq
# Kombu
from kombu import Connection, Exchange, Queue
# Celery # Celery
from celery import Celery, Task, task from celery import Task, task
from celery.execute import send_task from djcelery.models import PeriodicTask
from djcelery.models import PeriodicTask, TaskMeta
# Django # Django
from django.conf import settings from django.conf import settings
from django.db import transaction, DatabaseError from django.db import transaction, DatabaseError
from django.utils.datastructures import SortedDict from django.utils.datastructures import SortedDict
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now from django.utils.timezone import now
from django.utils.tzinfo import FixedOffset
# AWX # AWX
from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate
@@ -48,6 +39,8 @@ from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error'] __all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error']
HIDDEN_PASSWORD = '**********'
logger = logging.getLogger('awx.main.tasks') logger = logging.getLogger('awx.main.tasks')
# FIXME: Cleanly cancel task when celery worker is stopped. # FIXME: Cleanly cancel task when celery worker is stopped.
@@ -55,27 +48,19 @@ logger = logging.getLogger('awx.main.tasks')
@task(bind=True) @task(bind=True)
def tower_periodic_scheduler(self): def tower_periodic_scheduler(self):
run_now = now() run_now = now()
periodic_task = PeriodicTask.objects.get(task='awx.main.tasks.tower_periodic_scheduler') try:
logger.debug("Last run was: " + str(periodic_task.last_run_at)) periodic_task = PeriodicTask.objects.filter(task='awx.main.tasks.tower_periodic_scheduler')[0]
except IndexError:
logger.warning('No PeriodicTask found for tower_periodic_scheduler')
return
logger.debug("Last run was: %s", periodic_task.last_run_at)
# TODO: Cleanup jobs that we missed # TODO: Cleanup jobs that we missed
jobs_matching_schedules = Schedule.objects.filter(enabled=True, schedules = Schedule.objects.enabled().between(periodic_task.last_run_at, run_now)
next_run__gt=periodic_task.last_run_at, next_run__lte=run_now) for schedule in schedules:
for match in jobs_matching_schedules: template = schedule.unified_job_template
template = match.unified_job_template schedule.save() # To update next_run timestamp.
match.save() new_unified_job = template.create_unified_job(launch_type='scheduled', schedule=schedule)
if type(template) == Project: new_unified_job.signal_start()
new_project_update = template.create_project_update(launch_type="scheduled")
new_project_update.signal_start(schedule=match)
elif type(template) == InventorySource:
new_inventory_update = template.create_inventory_update(launch_type="scheduled")
new_inventory_update.signal_start(schedule=match)
elif type(template) == JobTemplate:
new_job = template.create_job()
new_job.launch_type = "scheduled"
new_job.save()
new_job.signal_start(schedule=match)
else:
logger.error("Unknown task type: " + str(type(template)))
periodic_task.last_run_at = run_now periodic_task.last_run_at = run_now
periodic_task.save() periodic_task.save()
@@ -101,7 +86,7 @@ def handle_work_error(self, task_id, subtasks=None):
elif each_task['type'] == 'inventory_update': elif each_task['type'] == 'inventory_update':
instance = InventoryUpdate.objects.get(id=each_task['id']) instance = InventoryUpdate.objects.get(id=each_task['id'])
instance_name = instance.inventory_source.inventory.name instance_name = instance.inventory_source.inventory.name
elif each_task['type'] == 'ansible_playbook': elif each_task['type'] == 'job':
instance = Job.objects.get(id=each_task['id']) instance = Job.objects.get(id=each_task['id'])
instance_name = instance.job_template.name instance_name = instance.job_template.name
else: else:
@@ -163,9 +148,6 @@ class BaseTask(Task):
pass pass
# notify_task_runner(dict(complete=pk)) # notify_task_runner(dict(complete=pk))
def get_model(self, pk):
return self.model.objects.get(pk=pk)
def get_path_to(self, *args): def get_path_to(self, *args):
''' '''
Return absolute path relative to this file. Return absolute path relative to this file.
@@ -201,7 +183,6 @@ class BaseTask(Task):
'yes': 'yes', 'yes': 'yes',
'no': 'no', 'no': 'no',
'': '', '': '',
} }
def build_env(self, instance, **kwargs): def build_env(self, instance, **kwargs):
@@ -228,25 +209,34 @@ class BaseTask(Task):
return env return env
def build_safe_env(self, instance, **kwargs): def build_safe_env(self, instance, **kwargs):
hidden_re = re.compile(r'API|TOKEN|KEY|SECRET|PASS') '''
Build environment dictionary, hiding potentially sensitive information
such as passwords or keys.
'''
hidden_re = re.compile(r'API|TOKEN|KEY|SECRET|PASS', re.I)
urlpass_re = re.compile(r'^.*?://.?:(.*?)@.*?$') urlpass_re = re.compile(r'^.*?://.?:(.*?)@.*?$')
env = self.build_env(instance, **kwargs) env = self.build_env(instance, **kwargs)
for k,v in env.items(): for k,v in env.items():
if k == 'BROKER_URL': if k in ('REST_API_URL', 'AWS_ACCESS_KEY', 'AWS_ACCESS_KEY_ID'):
m = urlpass_re.match(v)
if m:
env[k] = urlpass_re.sub('*'*len(m.groups()[0]), v)
elif k in ('REST_API_URL', 'AWS_ACCESS_KEY', 'AWS_ACCESS_KEY_ID'):
continue continue
elif k.startswith('ANSIBLE_'): elif k.startswith('ANSIBLE_'):
continue continue
elif hidden_re.search(k): elif hidden_re.search(k):
env[k] = '*'*len(str(v)) env[k] = HIDDEN_PASSWORD
elif urlpass_re.match(v):
env[k] = urlpass_re.sub(HIDDEN_PASSWORD, v)
return env return env
def args2cmdline(self, *args): def args2cmdline(self, *args):
return ' '.join([pipes.quote(a) for a in args]) return ' '.join([pipes.quote(a) for a in args])
def wrap_args_with_ssh_agent(self, args, ssh_key_path):
if ssh_key_path:
cmd = ' && '.join([self.args2cmdline('ssh-add', ssh_key_path),
self.args2cmdline(*args)])
args = ['ssh-agent', 'sh', '-c', cmd]
return args
def build_args(self, instance, **kwargs): def build_args(self, instance, **kwargs):
raise NotImplementedError raise NotImplementedError
@@ -264,19 +254,19 @@ class BaseTask(Task):
def get_password_prompts(self): def get_password_prompts(self):
''' '''
Return a dictionary of prompt regular expressions and password lookup Return a dictionary where keys are strings or regular expressions for
keys. prompts, and values are password lookup keys (keys that are returned
from build_passwords).
''' '''
return SortedDict() return SortedDict()
def run_pexpect(self, instance, args, cwd, env, passwords, task_stdout_handle, def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None): output_replacements=None):
''' '''
Run the given command using pexpect to capture output and provide Run the given command using pexpect to capture output and provide
passwords when requested. passwords when requested.
''' '''
status, stdout = 'error', '' logfile = stdout_handle
logfile = task_stdout_handle
logfile_pos = logfile.tell() logfile_pos = logfile.tell()
child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env)
child.logfile_read = logfile child.logfile_read = logfile
@@ -299,7 +289,7 @@ class BaseTask(Task):
if logfile_pos != logfile.tell(): if logfile_pos != logfile.tell():
logfile_pos = logfile.tell() logfile_pos = logfile.tell()
last_stdout_update = time.time() last_stdout_update = time.time()
# NOTE: In case revoke doesn't have an affect # Refresh model instance from the database (to check cancel flag).
instance = self.update_model(instance.pk) instance = self.update_model(instance.pk)
if instance.cancel_flag: if instance.cancel_flag:
child.terminate(canceled) child.terminate(canceled)
@@ -308,34 +298,28 @@ class BaseTask(Task):
child.close(True) child.close(True)
canceled = True canceled = True
if canceled: if canceled:
status = 'canceled' return 'canceled'
elif child.exitstatus == 0: elif child.exitstatus == 0:
status = 'successful' return 'successful'
else: else:
status = 'failed' return 'failed'
return status, stdout
def pre_run_check(self, instance, **kwargs):
'''
Hook for checking job/task before running.
'''
if instance.status != 'running':
return False
# TODO: Check that we can write to the stdout data directory
return True
def post_run_hook(self, instance, **kwargs): def post_run_hook(self, instance, **kwargs):
pass '''
Hook for any steps to run after job/task is complete.
'''
def run(self, pk, **kwargs): def run(self, pk, **kwargs):
''' '''
Run the job/task and capture its output. Run the job/task and capture its output.
''' '''
instance = self.update_model(pk, status='running', celery_task_id=self.request.id) instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
status, stdout, tb = 'error', '', '' status, tb = 'error', ''
output_replacements = [] output_replacements = []
try: try:
if not self.pre_run_check(instance, **kwargs): if instance.cancel_flag:
instance = self.update_model(instance.pk, status='canceled')
if instance.status != 'running':
if hasattr(settings, 'CELERY_UNIT_TEST'): if hasattr(settings, 'CELERY_UNIT_TEST'):
return return
else: else:
@@ -360,7 +344,7 @@ class BaseTask(Task):
stdout_handle = open(stdout_filename, 'w') stdout_handle = open(stdout_filename, 'w')
instance = self.update_model(pk, job_args=json.dumps(safe_args), instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename)
status, stdout = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) status = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle)
except Exception: except Exception:
if status != 'canceled': if status != 'canceled':
tb = traceback.format_exc() tb = traceback.format_exc()
@@ -374,8 +358,7 @@ class BaseTask(Task):
stdout_handle.close() stdout_handle.close()
except Exception: except Exception:
pass pass
instance = self.update_model(pk, status=status, instance = self.update_model(pk, status=status, result_traceback=tb,
result_traceback=tb,
output_replacements=output_replacements) output_replacements=output_replacements)
self.post_run_hook(instance, **kwargs) self.post_run_hook(instance, **kwargs)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
@@ -388,6 +371,7 @@ class BaseTask(Task):
if not hasattr(settings, 'CELERY_UNIT_TEST'): if not hasattr(settings, 'CELERY_UNIT_TEST'):
self.signal_finished(pk) self.signal_finished(pk)
class RunJob(BaseTask): class RunJob(BaseTask):
''' '''
Celery task to run a job using ansible-playbook. Celery task to run a job using ansible-playbook.
@@ -520,9 +504,7 @@ class RunJob(BaseTask):
# If ssh unlock password is needed, run using ssh-agent. # If ssh unlock password is needed, run using ssh-agent.
if ssh_key_path and use_ssh_agent: if ssh_key_path and use_ssh_agent:
cmd = ' '.join([self.args2cmdline('ssh-add', ssh_key_path), args = self.wrap_args_with_ssh_agent(args, ssh_key_path)
'&&', self.args2cmdline(*args)])
args = ['ssh-agent', 'sh', '-c', cmd]
return args return args
@@ -547,18 +529,6 @@ class RunJob(BaseTask):
d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password' d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password'
return d return d
def pre_run_check(self, job, **kwargs):
'''
Hook for checking job before running.
'''
if job.cancel_flag:
job = self.update_model(job.pk, status='canceled')
return False
elif job.status == 'running':
return True
else:
return False
def post_run_hook(self, job, **kwargs): def post_run_hook(self, job, **kwargs):
''' '''
Hook for actions to run after job/task has completed. Hook for actions to run after job/task has completed.
@@ -570,6 +540,7 @@ class RunJob(BaseTask):
for job_event in job.job_events.order_by('pk'): for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True) job_event.save(post_process=True)
class RunProjectUpdate(BaseTask): class RunProjectUpdate(BaseTask):
name = 'awx.main.tasks.run_project_update' name = 'awx.main.tasks.run_project_update'
@@ -624,7 +595,6 @@ class RunProjectUpdate(BaseTask):
scm_password = scm_url_parts.password or scm_password or '' scm_password = scm_url_parts.password or scm_password or ''
if scm_username: if scm_username:
if scm_type == 'svn': if scm_type == 'svn':
# FIXME: Need to somehow escape single/double quotes in username/password
extra_vars['scm_username'] = scm_username extra_vars['scm_username'] = scm_username
extra_vars['scm_password'] = scm_password extra_vars['scm_password'] = scm_password
scm_password = False scm_password = False
@@ -673,9 +643,8 @@ class RunProjectUpdate(BaseTask):
# If using an SSH key, run using ssh-agent. # If using an SSH key, run using ssh-agent.
ssh_key_path = kwargs.get('private_data_file', '') ssh_key_path = kwargs.get('private_data_file', '')
if ssh_key_path: if ssh_key_path:
subcmds = [('ssh-add', ssh_key_path), args] args = self.wrap_args_with_ssh_agent(args, ssh_key_path)
cmd = ' && '.join([self.args2cmdline(*x) for x in subcmds])
args = ['ssh-agent', 'sh', '-c', cmd]
return args return args
def build_safe_args(self, project_update, **kwargs): def build_safe_args(self, project_update, **kwargs):
@@ -683,7 +652,7 @@ class RunProjectUpdate(BaseTask):
for pw_name, pw_val in pwdict.items(): for pw_name, pw_val in pwdict.items():
if pw_name in ('', 'yes', 'no', 'scm_username'): if pw_name in ('', 'yes', 'no', 'scm_username'):
continue continue
pwdict[pw_name] = '*'*len(pw_val) pwdict[pw_name] = HIDDEN_PASSWORD
kwargs['passwords'] = pwdict kwargs['passwords'] = pwdict
return self.build_args(project_update, **kwargs) return self.build_args(project_update, **kwargs)
@@ -704,7 +673,7 @@ class RunProjectUpdate(BaseTask):
for pw_name, pw_val in pwdict.items(): for pw_name, pw_val in pwdict.items():
if pw_name in ('', 'yes', 'no', 'scm_username'): if pw_name in ('', 'yes', 'no', 'scm_username'):
continue continue
pwdict[pw_name] = '*'*len(pw_val) pwdict[pw_name] = HIDDEN_PASSWORD
kwargs['passwords'] = pwdict kwargs['passwords'] = pwdict
after_url = self._build_scm_url_extra_vars(project_update, after_url = self._build_scm_url_extra_vars(project_update,
**kwargs)[0] **kwargs)[0]
@@ -717,7 +686,7 @@ class RunProjectUpdate(BaseTask):
} }
d_after = { d_after = {
'username': scm_username, 'username': scm_username,
'password': '*'*len(scm_password), 'password': HIDDEN_PASSWORD,
} }
pattern1 = "username=\"%(username)s\" password=\"%(password)s\"" pattern1 = "username=\"%(username)s\" password=\"%(password)s\""
pattern2 = "--username '%(username)s' --password '%(password)s'" pattern2 = "--username '%(username)s' --password '%(password)s'"
@@ -740,18 +709,6 @@ class RunProjectUpdate(BaseTask):
def get_idle_timeout(self): def get_idle_timeout(self):
return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None) return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None)
def pre_run_check(self, project_update, **kwargs):
'''
Hook for checking project update before running.
'''
while True:
if project_update.cancel_flag:
project_update = self.update_model(project_update.pk, status='canceled')
return False
elif project_update.status == 'running':
return True
else:
return False
class RunInventoryUpdate(BaseTask): class RunInventoryUpdate(BaseTask):
@@ -828,7 +785,6 @@ class RunInventoryUpdate(BaseTask):
elif inventory_update.source == 'file': elif inventory_update.source == 'file':
# FIXME: Parse source_env to dict, update env. # FIXME: Parse source_env to dict, update env.
pass pass
#print env
return env return env
def build_args(self, inventory_update, **kwargs): def build_args(self, inventory_update, **kwargs):
@@ -869,16 +825,3 @@ class RunInventoryUpdate(BaseTask):
def get_idle_timeout(self): def get_idle_timeout(self):
return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', None) return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', None)
def pre_run_check(self, inventory_update, **kwargs):
'''
Hook for checking inventory update before running.
'''
while True:
if inventory_update.cancel_flag:
inventory_update = self.update_model(inventory_update.pk, status='canceled')
return False
elif inventory_update.status == 'running':
return True
else:
return False