From 37139301b4317570eb76d8737450896eb82cc127 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Mon, 12 Sep 2016 16:41:18 -0400 Subject: [PATCH] Add job timeout mechanism to tasks. --- awx/main/tasks.py | 76 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5cbddbf210..34edb2e4a8 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -517,9 +517,12 @@ class BaseTask(Task): if pexpect_sleep is not None: logger.info("Suspending Job Execution for QA Work") time.sleep(pexpect_sleep) + # TODO: Update job_timeout using instance field instead. + job_timeout = 10 child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child.logfile_read = logfile canceled = False + self.timeouted = False last_stdout_update = time.time() idle_timeout = self.get_idle_timeout() expect_list = [] @@ -531,6 +534,7 @@ class BaseTask(Task): expect_list.extend([pexpect.TIMEOUT, pexpect.EOF]) instance = self.update_model(instance.pk, status='running', output_replacements=output_replacements) + job_start = time.time() while child.isalive(): result_id = child.expect(expect_list, timeout=pexpect_timeout) if result_id in expect_passwords: @@ -541,39 +545,60 @@ class BaseTask(Task): # Refresh model instance from the database (to check cancel flag). instance = self.update_model(instance.pk) if instance.cancel_flag: - try: - if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): - # NOTE: Refactor this once we get a newer psutil across the board - if not psutil: - os.kill(child.pid, signal.SIGKILL) - else: - try: - main_proc = psutil.Process(pid=child.pid) - if hasattr(main_proc, "children"): - child_procs = main_proc.children(recursive=True) - else: - child_procs = main_proc.get_children(recursive=True) - for child_proc in child_procs: - os.kill(child_proc.pid, signal.SIGKILL) - os.kill(main_proc.pid, signal.SIGKILL) - except TypeError: - os.kill(child.pid, signal.SIGKILL) - else: - os.kill(child.pid, signal.SIGTERM) - time.sleep(3) - canceled = True - except OSError: - logger.warn("Attempted to cancel already finished job, ignoring") + canceled = self.__handle_termination(instance, child) + elif job_timeout != 0 and (time.time() - job_start) > job_timeout: + self.__handle_termination(instance, child, is_cancel=False) if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: child.close(True) canceled = True if canceled: return 'canceled', child.exitstatus - elif child.exitstatus == 0: + elif child.exitstatus == 0 and not self.timeouted: return 'successful', child.exitstatus else: return 'failed', child.exitstatus + def __handle_termination(self, instance, job, is_cancel=True): + '''Helper function to properly terminate specified job and return correct + flags. + + Args: + instance: The corresponding model instance of this task. + job: The pexpect subprocess running the job. + is_cancel: Flag showing whether this termination is caused by instance's + cancel_flag. + + Return: + True if is_cancel is set or None. + ''' + try: + if tower_settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): + # NOTE: Refactor this once we get a newer psutil across the board + if not psutil: + os.kill(job.pid, signal.SIGKILL) + else: + try: + main_proc = psutil.Process(pid=job.pid) + if hasattr(main_proc, "children"): + child_procs = main_proc.children(recursive=True) + else: + child_procs = main_proc.get_children(recursive=True) + for child_proc in child_procs: + os.kill(child_proc.pid, signal.SIGKILL) + os.kill(main_proc.pid, signal.SIGKILL) + except TypeError: + os.kill(job.pid, signal.SIGKILL) + else: + os.kill(job.pid, signal.SIGTERM) + time.sleep(3) + if is_cancel: + return True + else: + self.timeouted = True + except OSError: + keyword = 'cancel' if is_cancel else 'timeout' + logger.warn("Attempted to %s already finished job, ignoring" % keyword) + def pre_run_hook(self, instance, **kwargs): ''' Hook for any steps to run before the job/task starts @@ -654,6 +679,9 @@ class BaseTask(Task): try: stdout_handle.flush() stdout_handle.close() + if getattr(self, 'timeouted', False): + with open(stdout_filename, 'a') as f: + f.write("\x1b[1;31m%s\x1b[0m" % "JOB FAILS DUE TO TIMEOUT!") except Exception: pass instance = self.update_model(pk, status=status, result_traceback=tb,