From 37139301b4317570eb76d8737450896eb82cc127 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Mon, 12 Sep 2016 16:41:18 -0400 Subject: [PATCH 01/13] Add job timeout mechanism to tasks. --- awx/main/tasks.py | 76 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5cbddbf210..34edb2e4a8 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -517,9 +517,12 @@ class BaseTask(Task): if pexpect_sleep is not None: logger.info("Suspending Job Execution for QA Work") time.sleep(pexpect_sleep) + # TODO: Update job_timeout using instance field instead. + job_timeout = 10 child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile canceled = False + self.timeouted = False last_stdout_update = time.time() idle_timeout = self.get_idle_timeout() expect_list = [] @@ -531,6 +534,7 @@ class BaseTask(Task): expect_list.extend([pexpect.TIMEOUT, pexpect.EOF]) instance = self.update_model(instance.pk, status='running', output_replacements=output_replacements) + job_start = time.time() while child.isalive(): result_id = child.expect(expect_list, timeout=pexpect_timeout) if result_id in expect_passwords: @@ -541,39 +545,60 @@ class BaseTask(Task): # Refresh model instance from the database (to check cancel flag). instance = self.update_model(instance.pk) if instance.cancel_flag: - try: - if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): - # NOTE: Refactor this once we get a newer psutil across the board - if not psutil: - os.kill(child.pid, signal.SIGKILL) - else: - try: - main_proc = psutil.Process(pid=child.pid) - if hasattr(main_proc, "children"): - child_procs = main_proc.children(recursive=True) - else: - child_procs = main_proc.get_children(recursive=True) - for child_proc in child_procs: - os.kill(child_proc.pid, signal.SIGKILL) - os.kill(main_proc.pid, signal.SIGKILL) - except TypeError: - os.kill(child.pid, signal.SIGKILL) - else: - os.kill(child.pid, signal.SIGTERM) - time.sleep(3) - canceled = True - except OSError: - logger.warn("Attempted to cancel already finished job, ignoring") + canceled = self.__handle_termination(instance, child) + elif job_timeout != 0 and (time.time() - job_start) > job_timeout: + self.__handle_termination(instance, child, is_cancel=False) if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: child.close(True) canceled = True if canceled: return 'canceled', child.exitstatus - elif child.exitstatus == 0: + elif child.exitstatus == 0 and not self.timeouted: return 'successful', child.exitstatus else: return 'failed', child.exitstatus + def __handle_termination(self, instance, job, is_cancel=True): + '''Helper function to properly terminate specified job and return correct + flags. + + Args: + instance: The corresponding model instance of this task. + job: The pexpect subprocess running the job. + is_cancel: Flag showing whether this termination is caused by instance's + cancel_flag. + + Return: + True if is_cancel is set or None. + ''' + try: + if tower_settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): + # NOTE: Refactor this once we get a newer psutil across the board + if not psutil: + os.kill(job.pid, signal.SIGKILL) + else: + try: + main_proc = psutil.Process(pid=job.pid) + if hasattr(main_proc, "children"): + child_procs = main_proc.children(recursive=True) + else: + child_procs = main_proc.get_children(recursive=True) + for child_proc in child_procs: + os.kill(child_proc.pid, signal.SIGKILL) + os.kill(main_proc.pid, signal.SIGKILL) + except TypeError: + os.kill(job.pid, signal.SIGKILL) + else: + os.kill(job.pid, signal.SIGTERM) + time.sleep(3) + if is_cancel: + return True + else: + self.timeouted = True + except OSError: + keyword = 'cancel' if is_cancel else 'timeout' + logger.warn("Attempted to %s already finished job, ignoring" % keyword) + def pre_run_hook(self, instance, **kwargs): ''' Hook for any steps to run before the job/task starts @@ -654,6 +679,9 @@ class BaseTask(Task): try: stdout_handle.flush() stdout_handle.close() + if getattr(self, 'timeouted', False): + with open(stdout_filename, 'a') as f: + f.write("\x1b[1;31m%s\x1b[0m" % "JOB FAILS DUE TO TIMEOUT!") except Exception: pass instance = self.update_model(pk, status=status, result_traceback=tb, From d52e237ea09338761ec2a7f8b0e762465f625686 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Tue, 13 Sep 2016 16:17:31 -0400 Subject: [PATCH 02/13] Implement timeout on job/job-template. --- awx/api/serializers.py | 2 +- awx/main/migrations/0033_v310_job_timeout.py | 24 ++++++++++++++++++++ awx/main/models/jobs.py | 8 +++++++ 3 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 awx/main/migrations/0033_v310_job_timeout.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index ded5d454c3..4ea799682d 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1789,7 +1789,7 @@ class JobOptionsSerializer(BaseSerializer): fields = ('*', 'job_type', 'inventory', 'project', 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'limit', 'verbosity', 'extra_vars', 'job_tags', 'force_handlers', - 'skip_tags', 'start_at_task',) + 'skip_tags', 'start_at_task', 'timeout') def get_related(self, obj): res = super(JobOptionsSerializer, self).get_related(obj) diff --git a/awx/main/migrations/0033_v310_job_timeout.py b/awx/main/migrations/0033_v310_job_timeout.py new file mode 100644 index 0000000000..df18030d47 --- /dev/null +++ b/awx/main/migrations/0033_v310_job_timeout.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0032_v302_credential_permissions_update'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='jobtemplate', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + ] diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 27efff9cc7..5d33feb533 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -143,6 +143,10 @@ class JobOptions(BaseModel): allow_simultaneous = models.BooleanField( default=False, ) + timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) extra_vars_dict = VarsDictProperty('extra_vars', True) @@ -253,7 +257,11 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', +<<<<<<< 887818cb8987d465e23f1ed0c5886473acdbfcda 'labels', 'survey_passwords', 'allow_simultaneous',] +======= + 'labels', 'survey_passwords', 'timeout'] +>>>>>>> Implement timeout on job/job-template. def resource_validation_data(self): ''' From 2eda6b95284379543223052def07a0a47a986316 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Wed, 14 Sep 2016 14:52:24 -0400 Subject: [PATCH 03/13] Implement timeout on other unified jobs/JTs. --- awx/api/serializers.py | 9 ++--- awx/main/migrations/0033_v310_job_timeout.py | 35 ++++++++++++++++++++ awx/main/models/ad_hoc_commands.py | 4 +++ awx/main/models/inventory.py | 7 +++- awx/main/models/jobs.py | 7 +++- awx/main/models/projects.py | 6 +++- 6 files changed, 61 insertions(+), 7 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 4ea799682d..d72a3e3730 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -914,7 +914,7 @@ class ProjectSerializer(UnifiedJobTemplateSerializer, ProjectOptionsSerializer): class Meta: model = Project fields = ('*', 'organization', 'scm_delete_on_next_update', 'scm_update_on_launch', - 'scm_update_cache_timeout') + \ + 'scm_update_cache_timeout', 'timeout') + \ ('last_update_failed', 'last_updated') # Backwards compatibility read_only_fields = ('scm_delete_on_next_update',) @@ -1329,7 +1329,8 @@ class InventorySourceOptionsSerializer(BaseSerializer): class Meta: fields = ('*', 'source', 'source_path', 'source_script', 'source_vars', 'credential', - 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars') + 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars', + 'timeout') def get_related(self, obj): res = super(InventorySourceOptionsSerializer, self).get_related(obj) @@ -2061,7 +2062,7 @@ class AdHocCommandSerializer(UnifiedJobSerializer): model = AdHocCommand fields = ('*', 'job_type', 'inventory', 'limit', 'credential', 'module_name', 'module_args', 'forks', 'verbosity', 'extra_vars', - 'become_enabled', '-unified_job_template', '-description') + 'become_enabled', 'timeout', '-unified_job_template', '-description') extra_kwargs = { 'name': { 'read_only': True, @@ -2159,7 +2160,7 @@ class SystemJobSerializer(UnifiedJobSerializer): class Meta: model = SystemJob - fields = ('*', 'system_job_template', 'job_type', 'extra_vars') + fields = ('*', 'system_job_template', 'job_type', 'extra_vars', 'timeout') def get_related(self, obj): res = super(SystemJobSerializer, self).get_related(obj) diff --git a/awx/main/migrations/0033_v310_job_timeout.py b/awx/main/migrations/0033_v310_job_timeout.py index df18030d47..5c0ddd1826 100644 --- a/awx/main/migrations/0033_v310_job_timeout.py +++ b/awx/main/migrations/0033_v310_job_timeout.py @@ -11,6 +11,21 @@ class Migration(migrations.Migration): ] operations = [ + migrations.AddField( + model_name='adhoccommand', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='inventorysource', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='inventoryupdate', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), migrations.AddField( model_name='job', name='timeout', @@ -21,4 +36,24 @@ class Migration(migrations.Migration): name='timeout', field=models.PositiveIntegerField(default=0, blank=True), ), + migrations.AddField( + model_name='project', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='projectupdate', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='systemjob', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), + migrations.AddField( + model_name='systemjobtemplate', + name='timeout', + field=models.PositiveIntegerField(default=0, blank=True), + ), ] diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index b03be56452..dce9767bd0 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -88,6 +88,10 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): blank=True, default='', ) + timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) extra_vars_dict = VarsDictProperty('extra_vars', True) diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 7dc7eae7b5..6fb3e2f992 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -860,6 +860,10 @@ class InventorySourceOptions(BaseModel): default=False, help_text=_('Overwrite local variables from remote inventory source.'), ) + timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) @classmethod def get_ec2_region_choices(cls): @@ -1084,7 +1088,8 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions): @classmethod def _get_unified_job_field_names(cls): return ['name', 'description', 'source', 'source_path', 'source_script', 'source_vars', 'schedule', - 'credential', 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars'] + 'credential', 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars', + 'timeout'] def save(self, *args, **kwargs): # If update_fields has been specified, add our field names to it, diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 5d33feb533..076a5c618f 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -1336,6 +1336,11 @@ class SystemJobOptions(BaseModel): default='', ) + timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) + class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions): class Meta: @@ -1347,7 +1352,7 @@ class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions): @classmethod def _get_unified_job_field_names(cls): - return ['name', 'description', 'job_type', 'extra_vars'] + return ['name', 'description', 'job_type', 'extra_vars', 'timeout'] def get_absolute_url(self): return reverse('api:system_job_template_detail', args=(self.pk,)) diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 59263fff6a..256809d4ac 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -106,6 +106,10 @@ class ProjectOptions(models.Model): default=None, on_delete=models.SET_NULL, ) + timeout = models.PositiveIntegerField( + blank=True, + default=0, + ) def clean_scm_type(self): return self.scm_type or '' @@ -251,7 +255,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): def _get_unified_job_field_names(cls): return ['name', 'description', 'local_path', 'scm_type', 'scm_url', 'scm_branch', 'scm_clean', 'scm_delete_on_update', - 'credential', 'schedule'] + 'credential', 'schedule', 'timeout'] def save(self, *args, **kwargs): new_instance = not bool(self.pk) From 69be94feb9206203d3f17975fc0ce4d6f6c54bd8 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Thu, 15 Sep 2016 14:56:27 -0400 Subject: [PATCH 04/13] Implement global job timeout mechanism. --- awx/main/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 34edb2e4a8..7461bf9edf 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -517,8 +517,9 @@ class BaseTask(Task): if pexpect_sleep is not None: logger.info("Suspending Job Execution for QA Work") time.sleep(pexpect_sleep) - # TODO: Update job_timeout using instance field instead. - job_timeout = 10 + local_timeout = getattr(instance, 'timeout', 0) + global_timeout = getattr(settings, 'JOB_TIMEOUT', 0) + job_timeout = global_timeout if local_timeout == 0 else local_timeout child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile canceled = False From 70481414c616a2c378acb8089ea9801417436a93 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Thu, 15 Sep 2016 15:43:29 -0400 Subject: [PATCH 05/13] Migration update. --- .../{0033_v310_job_timeout.py => 0036_v310_job_timeout.py} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename awx/main/migrations/{0033_v310_job_timeout.py => 0036_v310_job_timeout.py} (96%) diff --git a/awx/main/migrations/0033_v310_job_timeout.py b/awx/main/migrations/0036_v310_job_timeout.py similarity index 96% rename from awx/main/migrations/0033_v310_job_timeout.py rename to awx/main/migrations/0036_v310_job_timeout.py index 5c0ddd1826..5045985fb7 100644 --- a/awx/main/migrations/0033_v310_job_timeout.py +++ b/awx/main/migrations/0036_v310_job_timeout.py @@ -7,7 +7,7 @@ from django.db import migrations, models class Migration(migrations.Migration): dependencies = [ - ('main', '0032_v302_credential_permissions_update'), + ('main', '0035_v310_jobevent_uuid'), ] operations = [ From 33952808c45312c559ace0affb0ee3d9a78794a5 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Wed, 21 Sep 2016 11:37:41 -0400 Subject: [PATCH 06/13] Readability improvement and job explaination added. --- awx/main/tasks.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 7461bf9edf..065758fc0f 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -523,7 +523,7 @@ class BaseTask(Task): child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile canceled = False - self.timeouted = False + self.timed_out = False last_stdout_update = time.time() idle_timeout = self.get_idle_timeout() expect_list = [] @@ -554,7 +554,7 @@ class BaseTask(Task): canceled = True if canceled: return 'canceled', child.exitstatus - elif child.exitstatus == 0 and not self.timeouted: + elif child.exitstatus == 0 and not self.timed_out: return 'successful', child.exitstatus else: return 'failed', child.exitstatus @@ -594,8 +594,7 @@ class BaseTask(Task): time.sleep(3) if is_cancel: return True - else: - self.timeouted = True + self.timed_out = True except OSError: keyword = 'cancel' if is_cancel else 'timeout' logger.warn("Attempted to %s already finished job, ignoring" % keyword) @@ -641,6 +640,7 @@ class BaseTask(Task): args = self.build_args(instance, **kwargs) safe_args = self.build_safe_args(instance, **kwargs) output_replacements = self.build_output_replacements(instance, **kwargs) + job_explanation = "" cwd = self.build_cwd(instance, **kwargs) env = self.build_env(instance, **kwargs) safe_env = self.build_safe_env(instance, **kwargs) @@ -680,13 +680,15 @@ class BaseTask(Task): try: stdout_handle.flush() stdout_handle.close() - if getattr(self, 'timeouted', False): + if getattr(self, 'timed_out', False): + job_explanation = "Job terminated due to timeout" with open(stdout_filename, 'a') as f: f.write("\x1b[1;31m%s\x1b[0m" % "JOB FAILS DUE TO TIMEOUT!") except Exception: pass instance = self.update_model(pk, status=status, result_traceback=tb, - output_replacements=output_replacements) + output_replacements=output_replacements, + job_explanation=job_explanation) self.post_run_hook(instance, **kwargs) instance.websocket_emit_status(status) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): From 651a09f5fbedd466fd5a45d8847aad825bc94829 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Thu, 22 Sep 2016 11:12:24 -0400 Subject: [PATCH 07/13] Remove stdout indicator. --- awx/main/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 065758fc0f..e2cb50c737 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -682,8 +682,6 @@ class BaseTask(Task): stdout_handle.close() if getattr(self, 'timed_out', False): job_explanation = "Job terminated due to timeout" - with open(stdout_filename, 'a') as f: - f.write("\x1b[1;31m%s\x1b[0m" % "JOB FAILS DUE TO TIMEOUT!") except Exception: pass instance = self.update_model(pk, status=status, result_traceback=tb, From 96e3228c9d8abc654a6f1572edb248a3b52a80e1 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Sat, 24 Sep 2016 00:46:42 -0400 Subject: [PATCH 08/13] Change global settings structure and fix potential celery issue. --- awx/main/tasks.py | 41 +++++++++++-------- awx/settings/local_settings.py.docker_compose | 7 ++++ 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e2cb50c737..2e25ee39b4 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -501,7 +501,7 @@ class BaseTask(Task): return OrderedDict() def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, - output_replacements=None): + output_replacements=None, runtime_flags={}): ''' Run the given command using pexpect to capture output and provide passwords when requested. @@ -517,13 +517,17 @@ class BaseTask(Task): if pexpect_sleep is not None: logger.info("Suspending Job Execution for QA Work") time.sleep(pexpect_sleep) - local_timeout = getattr(instance, 'timeout', 0) - global_timeout = getattr(settings, 'JOB_TIMEOUT', 0) - job_timeout = global_timeout if local_timeout == 0 else local_timeout + global_timeout = getattr(settings, 'DEFAULT_TIMEOUT', {}) + cls_name = instance.__class__.__name__ + if cls_name in global_timeout: + local_timeout = getattr(instance, 'timeout', 0) + job_timeout = global_timeout[cls_name] if local_timeout == 0 else local_timeout + else: + job_timeout = 0 child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile - canceled = False - self.timed_out = False + runtime_flags['canceled'] = False + runtime_flags['timed_out'] = False last_stdout_update = time.time() idle_timeout = self.get_idle_timeout() expect_list = [] @@ -546,22 +550,23 @@ class BaseTask(Task): # Refresh model instance from the database (to check cancel flag). instance = self.update_model(instance.pk) if instance.cancel_flag: - canceled = self.__handle_termination(instance, child) + runtime_flags['canceled'] = True elif job_timeout != 0 and (time.time() - job_start) > job_timeout: - self.__handle_termination(instance, child, is_cancel=False) + runtime_flags['timed_out'] = True + if any(list(runtime_flags.values())): + self._handle_termination(instance, child, is_cancel=runtime_flags['canceled']) if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: child.close(True) - canceled = True - if canceled: + runtime_flags['canceled'] = True + if runtime_flags['canceled']: return 'canceled', child.exitstatus - elif child.exitstatus == 0 and not self.timed_out: + elif child.exitstatus == 0 and not runtime_flags['timed_out']: return 'successful', child.exitstatus else: return 'failed', child.exitstatus - def __handle_termination(self, instance, job, is_cancel=True): - '''Helper function to properly terminate specified job and return correct - flags. + def _handle_termination(self, instance, job, is_cancel=True): + '''Helper function to properly terminate specified job. Args: instance: The corresponding model instance of this task. @@ -570,7 +575,7 @@ class BaseTask(Task): cancel_flag. Return: - True if is_cancel is set or None. + None. ''' try: if tower_settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): @@ -618,6 +623,7 @@ class BaseTask(Task): instance.websocket_emit_status("running") status, rc, tb = 'error', None, '' output_replacements = [] + runtime_flags = {} try: self.pre_run_hook(instance, **kwargs) if instance.cancel_flag: @@ -662,7 +668,8 @@ class BaseTask(Task): safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) instance = self.update_model(pk, job_args=json.dumps(safe_args), job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) - status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) + status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle, + runtime_flags=runtime_flags) except Exception: if status != 'canceled': tb = traceback.format_exc() @@ -680,7 +687,7 @@ class BaseTask(Task): try: stdout_handle.flush() stdout_handle.close() - if getattr(self, 'timed_out', False): + if runtime_flags.get('timed_out', False): job_explanation = "Job terminated due to timeout" except Exception: pass diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index 088f651e77..5a2306bc17 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -276,3 +276,10 @@ TEST_OPENSTACK_PROJECT = '' # Azure credentials. TEST_AZURE_USERNAME = '' TEST_AZURE_KEY_DATA = '' + +# Exemplary job timeout settings +# DEFAULT_TIMEOUT = { +# 'Job': 10, +# 'InventoryUpdate': 15, +# 'ProjectUpdate': 20, +# } From 701c23171f4a5565611637b93a9a82f07e1118eb Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Sat, 24 Sep 2016 11:06:01 -0400 Subject: [PATCH 09/13] Remove timeout fields from sys job & adhoc cmd. --- awx/api/serializers.py | 4 ++-- awx/main/migrations/0036_v310_job_timeout.py | 15 --------------- awx/main/models/ad_hoc_commands.py | 4 ---- awx/main/models/jobs.py | 6 +----- awx/main/tasks.py | 3 --- 5 files changed, 3 insertions(+), 29 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index d72a3e3730..7eeaf1f92a 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2062,7 +2062,7 @@ class AdHocCommandSerializer(UnifiedJobSerializer): model = AdHocCommand fields = ('*', 'job_type', 'inventory', 'limit', 'credential', 'module_name', 'module_args', 'forks', 'verbosity', 'extra_vars', - 'become_enabled', 'timeout', '-unified_job_template', '-description') + 'become_enabled', '-unified_job_template', '-description') extra_kwargs = { 'name': { 'read_only': True, @@ -2160,7 +2160,7 @@ class SystemJobSerializer(UnifiedJobSerializer): class Meta: model = SystemJob - fields = ('*', 'system_job_template', 'job_type', 'extra_vars', 'timeout') + fields = ('*', 'system_job_template', 'job_type', 'extra_vars') def get_related(self, obj): res = super(SystemJobSerializer, self).get_related(obj) diff --git a/awx/main/migrations/0036_v310_job_timeout.py b/awx/main/migrations/0036_v310_job_timeout.py index 5045985fb7..7bc5868ff8 100644 --- a/awx/main/migrations/0036_v310_job_timeout.py +++ b/awx/main/migrations/0036_v310_job_timeout.py @@ -11,11 +11,6 @@ class Migration(migrations.Migration): ] operations = [ - migrations.AddField( - model_name='adhoccommand', - name='timeout', - field=models.PositiveIntegerField(default=0, blank=True), - ), migrations.AddField( model_name='inventorysource', name='timeout', @@ -46,14 +41,4 @@ class Migration(migrations.Migration): name='timeout', field=models.PositiveIntegerField(default=0, blank=True), ), - migrations.AddField( - model_name='systemjob', - name='timeout', - field=models.PositiveIntegerField(default=0, blank=True), - ), - migrations.AddField( - model_name='systemjobtemplate', - name='timeout', - field=models.PositiveIntegerField(default=0, blank=True), - ), ] diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index dce9767bd0..b03be56452 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -88,10 +88,6 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): blank=True, default='', ) - timeout = models.PositiveIntegerField( - blank=True, - default=0, - ) extra_vars_dict = VarsDictProperty('extra_vars', True) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 076a5c618f..ae83cf87ba 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -1336,10 +1336,6 @@ class SystemJobOptions(BaseModel): default='', ) - timeout = models.PositiveIntegerField( - blank=True, - default=0, - ) class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions): @@ -1352,7 +1348,7 @@ class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions): @classmethod def _get_unified_job_field_names(cls): - return ['name', 'description', 'job_type', 'extra_vars', 'timeout'] + return ['name', 'description', 'job_type', 'extra_vars'] def get_absolute_url(self): return reverse('api:system_job_template_detail', args=(self.pk,)) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 2e25ee39b4..f1e8daed78 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -597,9 +597,6 @@ class BaseTask(Task): else: os.kill(job.pid, signal.SIGTERM) time.sleep(3) - if is_cancel: - return True - self.timed_out = True except OSError: keyword = 'cancel' if is_cancel else 'timeout' logger.warn("Attempted to %s already finished job, ignoring" % keyword) From 2362f34d270c2ef9e468b35ba296d22df757ddef Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Mon, 26 Sep 2016 11:17:28 -0400 Subject: [PATCH 10/13] Implement pluggable job explanation. --- awx/main/tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index f1e8daed78..7afd245f7c 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -621,6 +621,7 @@ class BaseTask(Task): status, rc, tb = 'error', None, '' output_replacements = [] runtime_flags = {} + extra_update_fields = {} try: self.pre_run_hook(instance, **kwargs) if instance.cancel_flag: @@ -643,7 +644,6 @@ class BaseTask(Task): args = self.build_args(instance, **kwargs) safe_args = self.build_safe_args(instance, **kwargs) output_replacements = self.build_output_replacements(instance, **kwargs) - job_explanation = "" cwd = self.build_cwd(instance, **kwargs) env = self.build_env(instance, **kwargs) safe_env = self.build_safe_env(instance, **kwargs) @@ -685,12 +685,12 @@ class BaseTask(Task): stdout_handle.flush() stdout_handle.close() if runtime_flags.get('timed_out', False): - job_explanation = "Job terminated due to timeout" + extra_update_fields['job_explanation'] = "Job terminated due to timeout" except Exception: pass instance = self.update_model(pk, status=status, result_traceback=tb, output_replacements=output_replacements, - job_explanation=job_explanation) + **extra_update_fields) self.post_run_hook(instance, **kwargs) instance.websocket_emit_status(status) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): From 21feae7c11e96fb52ca50ab901cc629a21fa41c7 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Mon, 26 Sep 2016 15:00:37 -0400 Subject: [PATCH 11/13] Rename default timeout and remove runtime_flag. --- awx/main/tasks.py | 28 +++++++++---------- awx/settings/local_settings.py.docker_compose | 4 +-- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 7afd245f7c..fa69d1562a 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -501,7 +501,7 @@ class BaseTask(Task): return OrderedDict() def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, - output_replacements=None, runtime_flags={}): + output_replacements=None, extra_update_fields={}): ''' Run the given command using pexpect to capture output and provide passwords when requested. @@ -517,7 +517,7 @@ class BaseTask(Task): if pexpect_sleep is not None: logger.info("Suspending Job Execution for QA Work") time.sleep(pexpect_sleep) - global_timeout = getattr(settings, 'DEFAULT_TIMEOUT', {}) + global_timeout = getattr(settings, 'DEFAULT_JOB_TIMEOUTS', {}) cls_name = instance.__class__.__name__ if cls_name in global_timeout: local_timeout = getattr(instance, 'timeout', 0) @@ -526,8 +526,8 @@ class BaseTask(Task): job_timeout = 0 child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile - runtime_flags['canceled'] = False - runtime_flags['timed_out'] = False + canceled = False + timed_out = False last_stdout_update = time.time() idle_timeout = self.get_idle_timeout() expect_list = [] @@ -550,17 +550,18 @@ class BaseTask(Task): # Refresh model instance from the database (to check cancel flag). instance = self.update_model(instance.pk) if instance.cancel_flag: - runtime_flags['canceled'] = True + canceled = True elif job_timeout != 0 and (time.time() - job_start) > job_timeout: - runtime_flags['timed_out'] = True - if any(list(runtime_flags.values())): - self._handle_termination(instance, child, is_cancel=runtime_flags['canceled']) + timed_out = True + extra_update_fields['job_explanation'] = "Job terminated due to timeout" + if canceled or timed_out: + self._handle_termination(instance, child, is_cancel=canceled) if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: child.close(True) - runtime_flags['canceled'] = True - if runtime_flags['canceled']: + canceled = True + if canceled: return 'canceled', child.exitstatus - elif child.exitstatus == 0 and not runtime_flags['timed_out']: + elif child.exitstatus == 0 and not timed_out: return 'successful', child.exitstatus else: return 'failed', child.exitstatus @@ -620,7 +621,6 @@ class BaseTask(Task): instance.websocket_emit_status("running") status, rc, tb = 'error', None, '' output_replacements = [] - runtime_flags = {} extra_update_fields = {} try: self.pre_run_hook(instance, **kwargs) @@ -666,7 +666,7 @@ class BaseTask(Task): instance = self.update_model(pk, job_args=json.dumps(safe_args), job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle, - runtime_flags=runtime_flags) + extra_update_fields=extra_update_fields) except Exception: if status != 'canceled': tb = traceback.format_exc() @@ -684,8 +684,6 @@ class BaseTask(Task): try: stdout_handle.flush() stdout_handle.close() - if runtime_flags.get('timed_out', False): - extra_update_fields['job_explanation'] = "Job terminated due to timeout" except Exception: pass instance = self.update_model(pk, status=status, result_traceback=tb, diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index 5a2306bc17..c68476e3d0 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -277,8 +277,8 @@ TEST_OPENSTACK_PROJECT = '' TEST_AZURE_USERNAME = '' TEST_AZURE_KEY_DATA = '' -# Exemplary job timeout settings -# DEFAULT_TIMEOUT = { +# Exemplary global job timeout settings +# DEFAULT_JOB_TIMEOUTS = { # 'Job': 10, # 'InventoryUpdate': 15, # 'ProjectUpdate': 20, From c59549295077a74e800c0fc2ea6774941ac80da7 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Mon, 17 Oct 2016 17:08:05 -0400 Subject: [PATCH 12/13] Solve rebase conflicts. --- .../{0036_v310_job_timeout.py => 0041_v310_job_timeout.py} | 2 +- awx/main/models/jobs.py | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) rename awx/main/migrations/{0036_v310_job_timeout.py => 0041_v310_job_timeout.py} (96%) diff --git a/awx/main/migrations/0036_v310_job_timeout.py b/awx/main/migrations/0041_v310_job_timeout.py similarity index 96% rename from awx/main/migrations/0036_v310_job_timeout.py rename to awx/main/migrations/0041_v310_job_timeout.py index 7bc5868ff8..447ed6d38b 100644 --- a/awx/main/migrations/0036_v310_job_timeout.py +++ b/awx/main/migrations/0041_v310_job_timeout.py @@ -7,7 +7,7 @@ from django.db import migrations, models class Migration(migrations.Migration): dependencies = [ - ('main', '0035_v310_jobevent_uuid'), + ('main', '0040_v310_artifacts'), ] operations = [ diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index ae83cf87ba..9c216fc526 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -257,11 +257,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', -<<<<<<< 887818cb8987d465e23f1ed0c5886473acdbfcda - 'labels', 'survey_passwords', 'allow_simultaneous',] -======= - 'labels', 'survey_passwords', 'timeout'] ->>>>>>> Implement timeout on job/job-template. + 'labels', 'survey_passwords', 'allow_simultaneous', 'timeout'] def resource_validation_data(self): ''' From 5fdd44c3f4f5af33de9e5595dfea2457614b4852 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Tue, 18 Oct 2016 09:48:19 -0400 Subject: [PATCH 13/13] Fix mutable default argument issue. --- awx/main/tasks.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index fa69d1562a..65c687e1d4 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -501,7 +501,7 @@ class BaseTask(Task): return OrderedDict() def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, - output_replacements=None, extra_update_fields={}): + output_replacements=None, extra_update_fields=None): ''' Run the given command using pexpect to capture output and provide passwords when requested. @@ -553,7 +553,8 @@ class BaseTask(Task): canceled = True elif job_timeout != 0 and (time.time() - job_start) > job_timeout: timed_out = True - extra_update_fields['job_explanation'] = "Job terminated due to timeout" + if isinstance(extra_update_fields, dict): + extra_update_fields['job_explanation'] = "Job terminated due to timeout" if canceled or timed_out: self._handle_termination(instance, child, is_cancel=canceled) if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: @@ -579,7 +580,7 @@ class BaseTask(Task): None. ''' try: - if tower_settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): + if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): # NOTE: Refactor this once we get a newer psutil across the board if not psutil: os.kill(job.pid, signal.SIGKILL)