mirror of
https://github.com/ansible/awx.git
synced 2026-05-23 16:47:45 -02:30
Add job timeout mechanism to tasks.
This commit is contained in:
@@ -517,9 +517,12 @@ class BaseTask(Task):
|
|||||||
if pexpect_sleep is not None:
|
if pexpect_sleep is not None:
|
||||||
logger.info("Suspending Job Execution for QA Work")
|
logger.info("Suspending Job Execution for QA Work")
|
||||||
time.sleep(pexpect_sleep)
|
time.sleep(pexpect_sleep)
|
||||||
|
# TODO: Update job_timeout using instance field instead.
|
||||||
|
job_timeout = 10
|
||||||
child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env)
|
child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env)
|
||||||
child.logfile_read = logfile
|
child.logfile_read = logfile
|
||||||
canceled = False
|
canceled = False
|
||||||
|
self.timeouted = False
|
||||||
last_stdout_update = time.time()
|
last_stdout_update = time.time()
|
||||||
idle_timeout = self.get_idle_timeout()
|
idle_timeout = self.get_idle_timeout()
|
||||||
expect_list = []
|
expect_list = []
|
||||||
@@ -531,6 +534,7 @@ class BaseTask(Task):
|
|||||||
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
|
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
|
||||||
instance = self.update_model(instance.pk, status='running',
|
instance = self.update_model(instance.pk, status='running',
|
||||||
output_replacements=output_replacements)
|
output_replacements=output_replacements)
|
||||||
|
job_start = time.time()
|
||||||
while child.isalive():
|
while child.isalive():
|
||||||
result_id = child.expect(expect_list, timeout=pexpect_timeout)
|
result_id = child.expect(expect_list, timeout=pexpect_timeout)
|
||||||
if result_id in expect_passwords:
|
if result_id in expect_passwords:
|
||||||
@@ -541,14 +545,40 @@ class BaseTask(Task):
|
|||||||
# Refresh model instance from the database (to check cancel flag).
|
# Refresh model instance from the database (to check cancel flag).
|
||||||
instance = self.update_model(instance.pk)
|
instance = self.update_model(instance.pk)
|
||||||
if instance.cancel_flag:
|
if instance.cancel_flag:
|
||||||
|
canceled = self.__handle_termination(instance, child)
|
||||||
|
elif job_timeout != 0 and (time.time() - job_start) > job_timeout:
|
||||||
|
self.__handle_termination(instance, child, is_cancel=False)
|
||||||
|
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
|
||||||
|
child.close(True)
|
||||||
|
canceled = True
|
||||||
|
if canceled:
|
||||||
|
return 'canceled', child.exitstatus
|
||||||
|
elif child.exitstatus == 0 and not self.timeouted:
|
||||||
|
return 'successful', child.exitstatus
|
||||||
|
else:
|
||||||
|
return 'failed', child.exitstatus
|
||||||
|
|
||||||
|
def __handle_termination(self, instance, job, is_cancel=True):
|
||||||
|
'''Helper function to properly terminate specified job and return correct
|
||||||
|
flags.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instance: The corresponding model instance of this task.
|
||||||
|
job: The pexpect subprocess running the job.
|
||||||
|
is_cancel: Flag showing whether this termination is caused by instance's
|
||||||
|
cancel_flag.
|
||||||
|
|
||||||
|
Return:
|
||||||
|
True if is_cancel is set or None.
|
||||||
|
'''
|
||||||
try:
|
try:
|
||||||
if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance):
|
if tower_settings.AWX_PROOT_ENABLED and self.should_use_proot(instance):
|
||||||
# NOTE: Refactor this once we get a newer psutil across the board
|
# NOTE: Refactor this once we get a newer psutil across the board
|
||||||
if not psutil:
|
if not psutil:
|
||||||
os.kill(child.pid, signal.SIGKILL)
|
os.kill(job.pid, signal.SIGKILL)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
main_proc = psutil.Process(pid=child.pid)
|
main_proc = psutil.Process(pid=job.pid)
|
||||||
if hasattr(main_proc, "children"):
|
if hasattr(main_proc, "children"):
|
||||||
child_procs = main_proc.children(recursive=True)
|
child_procs = main_proc.children(recursive=True)
|
||||||
else:
|
else:
|
||||||
@@ -557,22 +587,17 @@ class BaseTask(Task):
|
|||||||
os.kill(child_proc.pid, signal.SIGKILL)
|
os.kill(child_proc.pid, signal.SIGKILL)
|
||||||
os.kill(main_proc.pid, signal.SIGKILL)
|
os.kill(main_proc.pid, signal.SIGKILL)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
os.kill(child.pid, signal.SIGKILL)
|
os.kill(job.pid, signal.SIGKILL)
|
||||||
else:
|
else:
|
||||||
os.kill(child.pid, signal.SIGTERM)
|
os.kill(job.pid, signal.SIGTERM)
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
canceled = True
|
if is_cancel:
|
||||||
except OSError:
|
return True
|
||||||
logger.warn("Attempted to cancel already finished job, ignoring")
|
|
||||||
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
|
|
||||||
child.close(True)
|
|
||||||
canceled = True
|
|
||||||
if canceled:
|
|
||||||
return 'canceled', child.exitstatus
|
|
||||||
elif child.exitstatus == 0:
|
|
||||||
return 'successful', child.exitstatus
|
|
||||||
else:
|
else:
|
||||||
return 'failed', child.exitstatus
|
self.timeouted = True
|
||||||
|
except OSError:
|
||||||
|
keyword = 'cancel' if is_cancel else 'timeout'
|
||||||
|
logger.warn("Attempted to %s already finished job, ignoring" % keyword)
|
||||||
|
|
||||||
def pre_run_hook(self, instance, **kwargs):
|
def pre_run_hook(self, instance, **kwargs):
|
||||||
'''
|
'''
|
||||||
@@ -654,6 +679,9 @@ class BaseTask(Task):
|
|||||||
try:
|
try:
|
||||||
stdout_handle.flush()
|
stdout_handle.flush()
|
||||||
stdout_handle.close()
|
stdout_handle.close()
|
||||||
|
if getattr(self, 'timeouted', False):
|
||||||
|
with open(stdout_filename, 'a') as f:
|
||||||
|
f.write("\x1b[1;31m%s\x1b[0m" % "JOB FAILS DUE TO TIMEOUT!")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
instance = self.update_model(pk, status=status, result_traceback=tb,
|
instance = self.update_model(pk, status=status, result_traceback=tb,
|
||||||
|
|||||||
Reference in New Issue
Block a user