update strings

This commit is contained in:
sundeep-co-in
2016-10-25 16:12:41 +05:30
parent 78a8ce9479
commit d7b3b90f4d
50 changed files with 892 additions and 450 deletions

View File

@@ -42,6 +42,7 @@ from django.utils.timezone import now
from django.utils.encoding import smart_str
from django.core.mail import send_mail
from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _
# AWX
from awx.main.constants import CLOUD_PROVIDERS
@@ -53,10 +54,9 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
from awx.main.consumers import emit_channel_notification
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error',
'RunAdHocCommand', 'handle_work_error',
'handle_work_success', 'update_inventory_computed_fields',
'send_notifications', 'run_administrative_checks',
'RunJobLaunch']
'send_notifications', 'run_administrative_checks']
HIDDEN_PASSWORD = '**********'
@@ -112,12 +112,12 @@ def run_administrative_checks(self):
tower_admin_emails = User.objects.filter(is_superuser=True).values_list('email', flat=True)
if (used_percentage * 100) > 90:
send_mail("Ansible Tower host usage over 90%",
"Ansible Tower host usage over 90%",
_("Ansible Tower host usage over 90%"),
tower_admin_emails,
fail_silently=True)
if validation_info.get('date_warning', False):
send_mail("Ansible Tower license will expire soon",
"Ansible Tower license will expire soon",
_("Ansible Tower license will expire soon"),
tower_admin_emails,
fail_silently=True)
@@ -161,12 +161,6 @@ def tower_periodic_scheduler(self):
logger.debug("Last run was: %s", last_run)
write_last_run(run_now)
# Sanity check: If this is a secondary machine, there is nothing
# on the schedule.
# TODO: Fix for clustering/ha
if Instance.objects.my_role() == 'secondary':
return
old_schedules = Schedule.objects.enabled().before(last_run)
for schedule in old_schedules:
schedule.save()
@@ -188,7 +182,7 @@ def tower_periodic_scheduler(self):
def _send_notification_templates(instance, status_str):
if status_str not in ['succeeded', 'failed']:
raise ValueError("status_str must be either succeeded or failed")
raise ValueError(_("status_str must be either succeeded or failed"))
notification_templates = instance.get_notification_templates()
if notification_templates:
all_notification_templates = set(notification_templates.get('success', []) + notification_templates.get('any', []))
@@ -234,8 +228,9 @@ def handle_work_error(self, task_id, subtasks=None):
if instance.celery_task_id != task_id:
instance.status = 'failed'
instance.failed = True
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
(first_instance_type, first_instance.name, first_instance.id)
if not instance.job_explanation:
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
(first_instance_type, first_instance.name, first_instance.id)
instance.save()
instance.websocket_emit_status("failed")
@@ -501,7 +496,7 @@ class BaseTask(Task):
return OrderedDict()
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None):
output_replacements=None, extra_update_fields=None):
'''
Run the given command using pexpect to capture output and provide
passwords when requested.
@@ -517,9 +512,17 @@ class BaseTask(Task):
if pexpect_sleep is not None:
logger.info("Suspending Job Execution for QA Work")
time.sleep(pexpect_sleep)
global_timeout = getattr(settings, 'DEFAULT_JOB_TIMEOUTS', {})
cls_name = instance.__class__.__name__
if cls_name in global_timeout:
local_timeout = getattr(instance, 'timeout', 0)
job_timeout = global_timeout[cls_name] if local_timeout == 0 else local_timeout
else:
job_timeout = 0
child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env)
child.logfile_read = logfile
canceled = False
timed_out = False
last_stdout_update = time.time()
idle_timeout = self.get_idle_timeout()
expect_list = []
@@ -530,7 +533,9 @@ class BaseTask(Task):
expect_passwords[n] = passwords.get(item[1], '') or ''
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
instance = self.update_model(instance.pk, status='running',
execution_node=settings.CLUSTER_HOST_ID,
output_replacements=output_replacements)
job_start = time.time()
while child.isalive():
result_id = child.expect(expect_list, timeout=pexpect_timeout)
if result_id in expect_passwords:
@@ -541,45 +546,65 @@ class BaseTask(Task):
# Refresh model instance from the database (to check cancel flag).
instance = self.update_model(instance.pk)
if instance.cancel_flag:
try:
if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance):
# NOTE: Refactor this once we get a newer psutil across the board
if not psutil:
os.kill(child.pid, signal.SIGKILL)
else:
try:
main_proc = psutil.Process(pid=child.pid)
if hasattr(main_proc, "children"):
child_procs = main_proc.children(recursive=True)
else:
child_procs = main_proc.get_children(recursive=True)
for child_proc in child_procs:
os.kill(child_proc.pid, signal.SIGKILL)
os.kill(main_proc.pid, signal.SIGKILL)
except TypeError:
os.kill(child.pid, signal.SIGKILL)
else:
os.kill(child.pid, signal.SIGTERM)
time.sleep(3)
canceled = True
except OSError:
logger.warn("Attempted to cancel already finished job, ignoring")
canceled = True
elif job_timeout != 0 and (time.time() - job_start) > job_timeout:
timed_out = True
if isinstance(extra_update_fields, dict):
extra_update_fields['job_explanation'] = "Job terminated due to timeout"
if canceled or timed_out:
self._handle_termination(instance, child, is_cancel=canceled)
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True)
canceled = True
if canceled:
return 'canceled', child.exitstatus
elif child.exitstatus == 0:
elif child.exitstatus == 0 and not timed_out:
return 'successful', child.exitstatus
else:
return 'failed', child.exitstatus
def _handle_termination(self, instance, job, is_cancel=True):
'''Helper function to properly terminate specified job.
Args:
instance: The corresponding model instance of this task.
job: The pexpect subprocess running the job.
is_cancel: Flag showing whether this termination is caused by instance's
cancel_flag.
Return:
None.
'''
try:
if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance):
# NOTE: Refactor this once we get a newer psutil across the board
if not psutil:
os.kill(job.pid, signal.SIGKILL)
else:
try:
main_proc = psutil.Process(pid=job.pid)
if hasattr(main_proc, "children"):
child_procs = main_proc.children(recursive=True)
else:
child_procs = main_proc.get_children(recursive=True)
for child_proc in child_procs:
os.kill(child_proc.pid, signal.SIGKILL)
os.kill(main_proc.pid, signal.SIGKILL)
except TypeError:
os.kill(job.pid, signal.SIGKILL)
else:
os.kill(job.pid, signal.SIGTERM)
time.sleep(3)
except OSError:
keyword = 'cancel' if is_cancel else 'timeout'
logger.warn("Attempted to %s already finished job, ignoring" % keyword)
def pre_run_hook(self, instance, **kwargs):
'''
Hook for any steps to run before the job/task starts
'''
def post_run_hook(self, instance, **kwargs):
def post_run_hook(self, instance, status, **kwargs):
'''
Hook for any steps to run after job/task is complete.
'''
@@ -588,11 +613,12 @@ class BaseTask(Task):
'''
Run the job/task and capture its output.
'''
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id)
instance.websocket_emit_status("running")
status, rc, tb = 'error', None, ''
output_replacements = []
extra_update_fields = {}
try:
self.pre_run_hook(instance, **kwargs)
if instance.cancel_flag:
@@ -636,7 +662,8 @@ class BaseTask(Task):
safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle,
extra_update_fields=extra_update_fields)
except Exception:
if status != 'canceled':
tb = traceback.format_exc()
@@ -657,8 +684,9 @@ class BaseTask(Task):
except Exception:
pass
instance = self.update_model(pk, status=status, result_traceback=tb,
output_replacements=output_replacements)
self.post_run_hook(instance, **kwargs)
output_replacements=output_replacements,
**extra_update_fields)
self.post_run_hook(instance, status, **kwargs)
instance.websocket_emit_status(status)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
# Raising an exception will mark the job as 'failed' in celery
@@ -749,6 +777,8 @@ class RunJob(BaseTask):
# callbacks to work.
env['JOB_ID'] = str(job.pk)
env['INVENTORY_ID'] = str(job.inventory.pk)
if job.project:
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or ''
@@ -882,6 +912,10 @@ class RunJob(BaseTask):
'tower_job_id': job.pk,
'tower_job_launch_type': job.launch_type,
}
if job.project:
extra_vars.update({
'tower_project_revision': job.project.scm_revision,
})
if job.job_template:
extra_vars.update({
'tower_job_template_id': job.job_template.pk,
@@ -958,11 +992,28 @@ class RunJob(BaseTask):
'''
return getattr(settings, 'AWX_PROOT_ENABLED', False)
def post_run_hook(self, job, **kwargs):
def pre_run_hook(self, job, **kwargs):
if job.project and job.project.scm_type:
local_project_sync = job.project.create_project_update()
local_project_sync.job_type = 'run'
local_project_sync.save()
project_update_task = local_project_sync._get_task_class()
try:
project_update_task().run(local_project_sync.id)
job.scm_revision = job.project.scm_revision
job.save()
except Exception:
job.status = 'failed'
job.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
('project_update', local_project_sync.name, local_project_sync.id)
job.save()
raise
def post_run_hook(self, job, status, **kwargs):
'''
Hook for actions to run after job/task has completed.
'''
super(RunJob, self).post_run_hook(job, **kwargs)
super(RunJob, self).post_run_hook(job, status, **kwargs)
try:
inventory = job.inventory
except Inventory.DoesNotExist:
@@ -1063,7 +1114,10 @@ class RunProjectUpdate(BaseTask):
args.append('-v')
scm_url, extra_vars = self._build_scm_url_extra_vars(project_update,
**kwargs)
scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
if project_update.project.scm_revision and project_update.job_type == 'run':
scm_branch = project_update.project.scm_revision
else:
scm_branch = project_update.scm_branch or {'hg': 'tip'}.get(project_update.scm_type, 'HEAD')
extra_vars.update({
'project_path': project_update.get_project_path(check_if_exists=False),
'scm_type': project_update.scm_type,
@@ -1071,6 +1125,8 @@ class RunProjectUpdate(BaseTask):
'scm_branch': scm_branch,
'scm_clean': project_update.scm_clean,
'scm_delete_on_update': project_update.scm_delete_on_update,
'scm_full_checkout': True if project_update.job_type == 'run' else False,
'scm_revision_output': '/tmp/_{}_syncrev'.format(project_update.id) # TODO: TempFile
})
args.extend(['-e', json.dumps(extra_vars)])
args.append('project_update.yml')
@@ -1144,6 +1200,18 @@ class RunProjectUpdate(BaseTask):
'''
return kwargs.get('private_data_files', {}).get('scm_credential', '')
def post_run_hook(self, instance, status, **kwargs):
if instance.job_type == 'check' and status not in ('failed', 'canceled',):
p = instance.project
fd = open('/tmp/_{}_syncrev'.format(instance.id), 'r')
lines = fd.readlines()
if lines:
p.scm_revision = lines[0].strip()
p.playbook_files = p.playbooks
p.save()
else:
logger.error("Could not find scm revision in check")
class RunInventoryUpdate(BaseTask):
name = 'awx.main.tasks.run_inventory_update'
@@ -1638,7 +1706,7 @@ class RunAdHocCommand(BaseTask):
'''
return getattr(settings, 'AWX_PROOT_ENABLED', False)
def post_run_hook(self, ad_hoc_command, **kwargs):
def post_run_hook(self, ad_hoc_command, status, **kwargs):
'''
Hook for actions to run after ad hoc command has completed.
'''
@@ -1675,38 +1743,3 @@ class RunSystemJob(BaseTask):
def build_cwd(self, instance, **kwargs):
return settings.BASE_DIR
'''
class RunWorkflowJob(BaseTask):
name = 'awx.main.tasks.run_workflow_job'
model = WorkflowJob
def run(self, pk, **kwargs):
#Run the job/task and capture its output.
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
instance.websocket_emit_status("running")
# FIXME: Currently, the workflow job busy waits until the graph run is
# complete. Instead, the workflow job should return or never even run,
# because all of the "launch logic" can be done schedule().
# However, other aspects of our system depend on a 1-1 relationship
# between a Job and a Celery Task.
#
# * If we let the workflow job task (RunWorkflowJob.run()) complete
# then how do we trigger the handle_work_error and
# handle_work_success subtasks?
#
# * How do we handle the recovery process? (i.e. there is an entry in
# the database but not in celery).
while True:
dag = WorkflowDAG(instance)
if dag.is_workflow_done():
# TODO: update with accurate finish status (i.e. canceled, error, etc.)
instance = self.update_model(instance.pk, status='successful')
break
time.sleep(1)
instance.websocket_emit_status(instance.status)
# TODO: Handle cancel
'''