Merge pull request #3502 from jangsutsr/2363_implement_job_timeout

Implement job timeout
This commit is contained in:
Aaron Tan
2016-10-19 23:54:51 -04:00
committed by GitHub
7 changed files with 131 additions and 33 deletions

View File

@@ -914,7 +914,7 @@ class ProjectSerializer(UnifiedJobTemplateSerializer, ProjectOptionsSerializer):
class Meta: class Meta:
model = Project model = Project
fields = ('*', 'organization', 'scm_delete_on_next_update', 'scm_update_on_launch', fields = ('*', 'organization', 'scm_delete_on_next_update', 'scm_update_on_launch',
'scm_update_cache_timeout') + \ 'scm_update_cache_timeout', 'timeout') + \
('last_update_failed', 'last_updated') # Backwards compatibility ('last_update_failed', 'last_updated') # Backwards compatibility
read_only_fields = ('scm_delete_on_next_update',) read_only_fields = ('scm_delete_on_next_update',)
@@ -1329,7 +1329,8 @@ class InventorySourceOptionsSerializer(BaseSerializer):
class Meta: class Meta:
fields = ('*', 'source', 'source_path', 'source_script', 'source_vars', 'credential', fields = ('*', 'source', 'source_path', 'source_script', 'source_vars', 'credential',
'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars') 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars',
'timeout')
def get_related(self, obj): def get_related(self, obj):
res = super(InventorySourceOptionsSerializer, self).get_related(obj) res = super(InventorySourceOptionsSerializer, self).get_related(obj)
@@ -1799,7 +1800,7 @@ class JobOptionsSerializer(LabelsListMixin, BaseSerializer):
fields = ('*', 'job_type', 'inventory', 'project', 'playbook', fields = ('*', 'job_type', 'inventory', 'project', 'playbook',
'credential', 'cloud_credential', 'network_credential', 'forks', 'limit', 'credential', 'cloud_credential', 'network_credential', 'forks', 'limit',
'verbosity', 'extra_vars', 'job_tags', 'force_handlers', 'verbosity', 'extra_vars', 'job_tags', 'force_handlers',
'skip_tags', 'start_at_task',) 'skip_tags', 'start_at_task', 'timeout')
def get_related(self, obj): def get_related(self, obj):
res = super(JobOptionsSerializer, self).get_related(obj) res = super(JobOptionsSerializer, self).get_related(obj)

View File

@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
    # Adds a ``timeout`` field to every job-run model and its corresponding
    # template/source model. The field is a non-negative integer number of
    # seconds; blank is allowed and the default of 0 means "no per-object
    # timeout configured" (a global default may still apply).

    dependencies = [
        # Must run after the artifacts migration from the same release line.
        ('main', '0040_v310_artifacts'),
    ]

    operations = [
        # Inventory sync: the source template and its update runs.
        migrations.AddField(
            model_name='inventorysource',
            name='timeout',
            field=models.PositiveIntegerField(default=0, blank=True),
        ),
        migrations.AddField(
            model_name='inventoryupdate',
            name='timeout',
            field=models.PositiveIntegerField(default=0, blank=True),
        ),
        # Playbook runs: the job template and its job runs.
        migrations.AddField(
            model_name='job',
            name='timeout',
            field=models.PositiveIntegerField(default=0, blank=True),
        ),
        migrations.AddField(
            model_name='jobtemplate',
            name='timeout',
            field=models.PositiveIntegerField(default=0, blank=True),
        ),
        # SCM sync: the project and its update runs.
        migrations.AddField(
            model_name='project',
            name='timeout',
            field=models.PositiveIntegerField(default=0, blank=True),
        ),
        migrations.AddField(
            model_name='projectupdate',
            name='timeout',
            field=models.PositiveIntegerField(default=0, blank=True),
        ),
    ]

View File

@@ -860,6 +860,10 @@ class InventorySourceOptions(BaseModel):
default=False, default=False,
help_text=_('Overwrite local variables from remote inventory source.'), help_text=_('Overwrite local variables from remote inventory source.'),
) )
timeout = models.PositiveIntegerField(
blank=True,
default=0,
)
@classmethod @classmethod
def get_ec2_region_choices(cls): def get_ec2_region_choices(cls):
@@ -1084,7 +1088,8 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions):
@classmethod @classmethod
def _get_unified_job_field_names(cls): def _get_unified_job_field_names(cls):
return ['name', 'description', 'source', 'source_path', 'source_script', 'source_vars', 'schedule', return ['name', 'description', 'source', 'source_path', 'source_script', 'source_vars', 'schedule',
'credential', 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars'] 'credential', 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars',
'timeout']
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it, # If update_fields has been specified, add our field names to it,

View File

@@ -143,6 +143,10 @@ class JobOptions(BaseModel):
allow_simultaneous = models.BooleanField( allow_simultaneous = models.BooleanField(
default=False, default=False,
) )
timeout = models.PositiveIntegerField(
blank=True,
default=0,
)
extra_vars_dict = VarsDictProperty('extra_vars', True) extra_vars_dict = VarsDictProperty('extra_vars', True)
@@ -253,7 +257,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule', 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule',
'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type',
'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled',
'labels', 'survey_passwords', 'allow_simultaneous',] 'labels', 'survey_passwords', 'allow_simultaneous', 'timeout']
def resource_validation_data(self): def resource_validation_data(self):
''' '''
@@ -1328,6 +1332,7 @@ class SystemJobOptions(BaseModel):
default='', default='',
) )
class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions): class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions):
class Meta: class Meta:

View File

@@ -106,6 +106,10 @@ class ProjectOptions(models.Model):
default=None, default=None,
on_delete=models.SET_NULL, on_delete=models.SET_NULL,
) )
timeout = models.PositiveIntegerField(
blank=True,
default=0,
)
def clean_scm_type(self): def clean_scm_type(self):
return self.scm_type or '' return self.scm_type or ''
@@ -251,7 +255,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
def _get_unified_job_field_names(cls): def _get_unified_job_field_names(cls):
return ['name', 'description', 'local_path', 'scm_type', 'scm_url', return ['name', 'description', 'local_path', 'scm_type', 'scm_url',
'scm_branch', 'scm_clean', 'scm_delete_on_update', 'scm_branch', 'scm_clean', 'scm_delete_on_update',
'credential', 'schedule'] 'credential', 'schedule', 'timeout']
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
new_instance = not bool(self.pk) new_instance = not bool(self.pk)

View File

@@ -501,7 +501,7 @@ class BaseTask(Task):
return OrderedDict() return OrderedDict()
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None): output_replacements=None, extra_update_fields=None):
''' '''
Run the given command using pexpect to capture output and provide Run the given command using pexpect to capture output and provide
passwords when requested. passwords when requested.
@@ -517,9 +517,17 @@ class BaseTask(Task):
if pexpect_sleep is not None: if pexpect_sleep is not None:
logger.info("Suspending Job Execution for QA Work") logger.info("Suspending Job Execution for QA Work")
time.sleep(pexpect_sleep) time.sleep(pexpect_sleep)
global_timeout = getattr(settings, 'DEFAULT_JOB_TIMEOUTS', {})
cls_name = instance.__class__.__name__
if cls_name in global_timeout:
local_timeout = getattr(instance, 'timeout', 0)
job_timeout = global_timeout[cls_name] if local_timeout == 0 else local_timeout
else:
job_timeout = 0
child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env)
child.logfile_read = logfile child.logfile_read = logfile
canceled = False canceled = False
timed_out = False
last_stdout_update = time.time() last_stdout_update = time.time()
idle_timeout = self.get_idle_timeout() idle_timeout = self.get_idle_timeout()
expect_list = [] expect_list = []
@@ -531,6 +539,7 @@ class BaseTask(Task):
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF]) expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
instance = self.update_model(instance.pk, status='running', instance = self.update_model(instance.pk, status='running',
output_replacements=output_replacements) output_replacements=output_replacements)
job_start = time.time()
while child.isalive(): while child.isalive():
result_id = child.expect(expect_list, timeout=pexpect_timeout) result_id = child.expect(expect_list, timeout=pexpect_timeout)
if result_id in expect_passwords: if result_id in expect_passwords:
@@ -541,39 +550,59 @@ class BaseTask(Task):
# Refresh model instance from the database (to check cancel flag). # Refresh model instance from the database (to check cancel flag).
instance = self.update_model(instance.pk) instance = self.update_model(instance.pk)
if instance.cancel_flag: if instance.cancel_flag:
try: canceled = True
if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): elif job_timeout != 0 and (time.time() - job_start) > job_timeout:
# NOTE: Refactor this once we get a newer psutil across the board timed_out = True
if not psutil: if isinstance(extra_update_fields, dict):
os.kill(child.pid, signal.SIGKILL) extra_update_fields['job_explanation'] = "Job terminated due to timeout"
else: if canceled or timed_out:
try: self._handle_termination(instance, child, is_cancel=canceled)
main_proc = psutil.Process(pid=child.pid)
if hasattr(main_proc, "children"):
child_procs = main_proc.children(recursive=True)
else:
child_procs = main_proc.get_children(recursive=True)
for child_proc in child_procs:
os.kill(child_proc.pid, signal.SIGKILL)
os.kill(main_proc.pid, signal.SIGKILL)
except TypeError:
os.kill(child.pid, signal.SIGKILL)
else:
os.kill(child.pid, signal.SIGTERM)
time.sleep(3)
canceled = True
except OSError:
logger.warn("Attempted to cancel already finished job, ignoring")
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True) child.close(True)
canceled = True canceled = True
if canceled: if canceled:
return 'canceled', child.exitstatus return 'canceled', child.exitstatus
elif child.exitstatus == 0: elif child.exitstatus == 0 and not timed_out:
return 'successful', child.exitstatus return 'successful', child.exitstatus
else: else:
return 'failed', child.exitstatus return 'failed', child.exitstatus
def _handle_termination(self, instance, job, is_cancel=True):
    '''Forcibly terminate the pexpect subprocess running the given job.

    Args:
        instance: The corresponding model instance of this task.
        job: The pexpect subprocess running the job.
        is_cancel: True when termination is due to the instance's
            cancel_flag; False when it is due to a timeout. Only affects
            the wording of the log message when the process is already gone.

    Return:
        None.
    '''
    try:
        if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance):
            # Inside a proot sandbox a plain SIGTERM to the leader may leave
            # orphaned descendants behind, so SIGKILL the whole tree.
            # NOTE: Refactor this once we get a newer psutil across the board
            if not psutil:
                os.kill(job.pid, signal.SIGKILL)
            else:
                try:
                    main_proc = psutil.Process(pid=job.pid)
                    # psutil 2.x renamed get_children() to children();
                    # support both APIs.
                    if hasattr(main_proc, "children"):
                        child_procs = main_proc.children(recursive=True)
                    else:
                        child_procs = main_proc.get_children(recursive=True)
                    # Kill descendants first, then the main process.
                    for child_proc in child_procs:
                        os.kill(child_proc.pid, signal.SIGKILL)
                    os.kill(main_proc.pid, signal.SIGKILL)
                except TypeError:
                    os.kill(job.pid, signal.SIGKILL)
        else:
            # Ask the process to exit gracefully, then give it a moment.
            os.kill(job.pid, signal.SIGTERM)
            time.sleep(3)
    except OSError:
        # The process exited on its own before we could signal it.
        keyword = 'cancel' if is_cancel else 'timeout'
        # logger.warn is a deprecated alias; use warning() with lazy %-args.
        logger.warning("Attempted to %s already finished job, ignoring", keyword)
def pre_run_hook(self, instance, **kwargs): def pre_run_hook(self, instance, **kwargs):
''' '''
Hook for any steps to run before the job/task starts Hook for any steps to run before the job/task starts
@@ -593,6 +622,7 @@ class BaseTask(Task):
instance.websocket_emit_status("running") instance.websocket_emit_status("running")
status, rc, tb = 'error', None, '' status, rc, tb = 'error', None, ''
output_replacements = [] output_replacements = []
extra_update_fields = {}
try: try:
self.pre_run_hook(instance, **kwargs) self.pre_run_hook(instance, **kwargs)
if instance.cancel_flag: if instance.cancel_flag:
@@ -636,7 +666,8 @@ class BaseTask(Task):
safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args), instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle,
extra_update_fields=extra_update_fields)
except Exception: except Exception:
if status != 'canceled': if status != 'canceled':
tb = traceback.format_exc() tb = traceback.format_exc()
@@ -657,7 +688,8 @@ class BaseTask(Task):
except Exception: except Exception:
pass pass
instance = self.update_model(pk, status=status, result_traceback=tb, instance = self.update_model(pk, status=status, result_traceback=tb,
output_replacements=output_replacements) output_replacements=output_replacements,
**extra_update_fields)
self.post_run_hook(instance, **kwargs) self.post_run_hook(instance, **kwargs)
instance.websocket_emit_status(status) instance.websocket_emit_status(status)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):

View File

@@ -276,3 +276,10 @@ TEST_OPENSTACK_PROJECT = ''
# Azure credentials. # Azure credentials.
TEST_AZURE_USERNAME = '' TEST_AZURE_USERNAME = ''
TEST_AZURE_KEY_DATA = '' TEST_AZURE_KEY_DATA = ''
# Example global job timeout settings (seconds, keyed by job class name)
# DEFAULT_JOB_TIMEOUTS = {
# 'Job': 10,
# 'InventoryUpdate': 15,
# 'ProjectUpdate': 20,
# }