diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 7be39a054d..2a22a6b04f 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -914,7 +914,7 @@ class ProjectSerializer(UnifiedJobTemplateSerializer, ProjectOptionsSerializer): class Meta: model = Project fields = ('*', 'organization', 'scm_delete_on_next_update', 'scm_update_on_launch', - 'scm_update_cache_timeout') + \ + 'scm_update_cache_timeout', 'timeout') + \ ('last_update_failed', 'last_updated') # Backwards compatibility read_only_fields = ('scm_delete_on_next_update',) @@ -1329,7 +1329,8 @@ class InventorySourceOptionsSerializer(BaseSerializer): class Meta: fields = ('*', 'source', 'source_path', 'source_script', 'source_vars', 'credential', - 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars') + 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars', + 'timeout') def get_related(self, obj): res = super(InventorySourceOptionsSerializer, self).get_related(obj) @@ -1799,7 +1800,7 @@ class JobOptionsSerializer(LabelsListMixin, BaseSerializer): fields = ('*', 'job_type', 'inventory', 'project', 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'limit', 'verbosity', 'extra_vars', 'job_tags', 'force_handlers', - 'skip_tags', 'start_at_task',) + 'skip_tags', 'start_at_task', 'timeout') def get_related(self, obj): res = super(JobOptionsSerializer, self).get_related(obj) diff --git a/awx/main/migrations/0041_v310_job_timeout.py b/awx/main/migrations/0041_v310_job_timeout.py new file mode 100644 index 0000000000..447ed6d38b --- /dev/null +++ b/awx/main/migrations/0041_v310_job_timeout.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0040_v310_artifacts'), + ] + + operations = [ + migrations.AddField( + model_name='inventorysource', + name='timeout', + 
field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='inventoryupdate', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='job', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='jobtemplate', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='project', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='projectupdate', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + ] diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 7dc7eae7b5..6fb3e2f992 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -860,6 +860,10 @@ class InventorySourceOptions(BaseModel): default=False, help_text=_('Overwrite local variables from remote inventory source.'), ) + timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) @classmethod def get_ec2_region_choices(cls): @@ -1084,7 +1088,8 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions): @classmethod def _get_unified_job_field_names(cls): return ['name', 'description', 'source', 'source_path', 'source_script', 'source_vars', 'schedule', - 'credential', 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars'] + 'credential', 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars', + 'timeout'] def save(self, *args, **kwargs): # If update_fields has been specified, add our field names to it, diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 27efff9cc7..9c216fc526 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -143,6 +143,10 @@ class JobOptions(BaseModel): allow_simultaneous = models.BooleanField( default=False, ) + 
timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) extra_vars_dict = VarsDictProperty('extra_vars', True) @@ -253,7 +257,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', - 'labels', 'survey_passwords', 'allow_simultaneous',] + 'labels', 'survey_passwords', 'allow_simultaneous', 'timeout'] def resource_validation_data(self): ''' @@ -1328,6 +1332,7 @@ class SystemJobOptions(BaseModel): default='', ) + class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions): class Meta: diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 59263fff6a..256809d4ac 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -106,6 +106,10 @@ class ProjectOptions(models.Model): default=None, on_delete=models.SET_NULL, ) + timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) def clean_scm_type(self): return self.scm_type or '' @@ -251,7 +255,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): def _get_unified_job_field_names(cls): return ['name', 'description', 'local_path', 'scm_type', 'scm_url', 'scm_branch', 'scm_clean', 'scm_delete_on_update', - 'credential', 'schedule'] + 'credential', 'schedule', 'timeout'] def save(self, *args, **kwargs): new_instance = not bool(self.pk) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5cbddbf210..65c687e1d4 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -501,7 +501,7 @@ class BaseTask(Task): return OrderedDict() def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, - output_replacements=None): + output_replacements=None, extra_update_fields=None): ''' Run the given command using pexpect to capture output and provide passwords when requested. 
@@ -517,9 +517,17 @@ class BaseTask(Task): if pexpect_sleep is not None: logger.info("Suspending Job Execution for QA Work") time.sleep(pexpect_sleep) + global_timeout = getattr(settings, 'DEFAULT_JOB_TIMEOUTS', {}) + cls_name = instance.__class__.__name__ + if cls_name in global_timeout: + local_timeout = getattr(instance, 'timeout', 0) + job_timeout = global_timeout[cls_name] if local_timeout == 0 else local_timeout + else: + job_timeout = 0 child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile canceled = False + timed_out = False last_stdout_update = time.time() idle_timeout = self.get_idle_timeout() expect_list = [] @@ -531,6 +539,7 @@ class BaseTask(Task): expect_list.extend([pexpect.TIMEOUT, pexpect.EOF]) instance = self.update_model(instance.pk, status='running', output_replacements=output_replacements) + job_start = time.time() while child.isalive(): result_id = child.expect(expect_list, timeout=pexpect_timeout) if result_id in expect_passwords: @@ -541,39 +550,59 @@ class BaseTask(Task): # Refresh model instance from the database (to check cancel flag). 
instance = self.update_model(instance.pk) if instance.cancel_flag: - try: - if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): - # NOTE: Refactor this once we get a newer psutil across the board - if not psutil: - os.kill(child.pid, signal.SIGKILL) - else: - try: - main_proc = psutil.Process(pid=child.pid) - if hasattr(main_proc, "children"): - child_procs = main_proc.children(recursive=True) - else: - child_procs = main_proc.get_children(recursive=True) - for child_proc in child_procs: - os.kill(child_proc.pid, signal.SIGKILL) - os.kill(main_proc.pid, signal.SIGKILL) - except TypeError: - os.kill(child.pid, signal.SIGKILL) - else: - os.kill(child.pid, signal.SIGTERM) - time.sleep(3) - canceled = True - except OSError: - logger.warn("Attempted to cancel already finished job, ignoring") + canceled = True + elif job_timeout != 0 and (time.time() - job_start) > job_timeout: + timed_out = True + if isinstance(extra_update_fields, dict): + extra_update_fields['job_explanation'] = "Job terminated due to timeout" + if canceled or timed_out: + self._handle_termination(instance, child, is_cancel=canceled) if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: child.close(True) canceled = True if canceled: return 'canceled', child.exitstatus - elif child.exitstatus == 0: + elif child.exitstatus == 0 and not timed_out: return 'successful', child.exitstatus else: return 'failed', child.exitstatus + def _handle_termination(self, instance, job, is_cancel=True): + '''Helper function to properly terminate specified job. + + Args: + instance: The corresponding model instance of this task. + job: The pexpect subprocess running the job. + is_cancel: Flag showing whether this termination is caused by instance's + cancel_flag. + + Return: + None. 
+ ''' + try: + if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): + # NOTE: Refactor this once we get a newer psutil across the board + if not psutil: + os.kill(job.pid, signal.SIGKILL) + else: + try: + main_proc = psutil.Process(pid=job.pid) + if hasattr(main_proc, "children"): + child_procs = main_proc.children(recursive=True) + else: + child_procs = main_proc.get_children(recursive=True) + for child_proc in child_procs: + os.kill(child_proc.pid, signal.SIGKILL) + os.kill(main_proc.pid, signal.SIGKILL) + except TypeError: + os.kill(job.pid, signal.SIGKILL) + else: + os.kill(job.pid, signal.SIGTERM) + time.sleep(3) + except OSError: + keyword = 'cancel' if is_cancel else 'timeout' + logger.warn("Attempted to %s already finished job, ignoring" % keyword) + def pre_run_hook(self, instance, **kwargs): ''' Hook for any steps to run before the job/task starts @@ -593,6 +622,7 @@ class BaseTask(Task): instance.websocket_emit_status("running") status, rc, tb = 'error', None, '' output_replacements = [] + extra_update_fields = {} try: self.pre_run_hook(instance, **kwargs) if instance.cancel_flag: @@ -636,7 +666,8 @@ class BaseTask(Task): safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) instance = self.update_model(pk, job_args=json.dumps(safe_args), job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) - status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) + status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle, + extra_update_fields=extra_update_fields) except Exception: if status != 'canceled': tb = traceback.format_exc() @@ -657,7 +688,8 @@ class BaseTask(Task): except Exception: pass instance = self.update_model(pk, status=status, result_traceback=tb, - output_replacements=output_replacements) + output_replacements=output_replacements, + **extra_update_fields) self.post_run_hook(instance, **kwargs) 
instance.websocket_emit_status(status) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index 088f651e77..c68476e3d0 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -276,3 +276,10 @@ TEST_OPENSTACK_PROJECT = '' # Azure credentials. TEST_AZURE_USERNAME = '' TEST_AZURE_KEY_DATA = '' + +# Example global job timeout settings +# DEFAULT_JOB_TIMEOUTS = { +# 'Job': 10, +# 'InventoryUpdate': 15, +# 'ProjectUpdate': 20, +# }