Change global settings structure and fix potential Celery issue.

This commit is contained in:
Aaron Tan
2016-09-24 00:46:42 -04:00
parent 651a09f5fb
commit 96e3228c9d
2 changed files with 31 additions and 17 deletions

View File

@@ -501,7 +501,7 @@ class BaseTask(Task):
return OrderedDict() return OrderedDict()
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None): output_replacements=None, runtime_flags={}):
''' '''
Run the given command using pexpect to capture output and provide Run the given command using pexpect to capture output and provide
passwords when requested. passwords when requested.
@@ -517,13 +517,17 @@ class BaseTask(Task):
if pexpect_sleep is not None: if pexpect_sleep is not None:
logger.info("Suspending Job Execution for QA Work") logger.info("Suspending Job Execution for QA Work")
time.sleep(pexpect_sleep) time.sleep(pexpect_sleep)
local_timeout = getattr(instance, 'timeout', 0) global_timeout = getattr(settings, 'DEFAULT_TIMEOUT', {})
global_timeout = getattr(settings, 'JOB_TIMEOUT', 0) cls_name = instance.__class__.__name__
job_timeout = global_timeout if local_timeout == 0 else local_timeout if cls_name in global_timeout:
local_timeout = getattr(instance, 'timeout', 0)
job_timeout = global_timeout[cls_name] if local_timeout == 0 else local_timeout
else:
job_timeout = 0
child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env) child = pexpect.spawnu(args[0], args[1:], cwd=cwd, env=env)
child.logfile_read = logfile child.logfile_read = logfile
canceled = False runtime_flags['canceled'] = False
self.timed_out = False runtime_flags['timed_out'] = False
last_stdout_update = time.time() last_stdout_update = time.time()
idle_timeout = self.get_idle_timeout() idle_timeout = self.get_idle_timeout()
expect_list = [] expect_list = []
@@ -546,22 +550,23 @@ class BaseTask(Task):
# Refresh model instance from the database (to check cancel flag). # Refresh model instance from the database (to check cancel flag).
instance = self.update_model(instance.pk) instance = self.update_model(instance.pk)
if instance.cancel_flag: if instance.cancel_flag:
canceled = self.__handle_termination(instance, child) runtime_flags['canceled'] = True
elif job_timeout != 0 and (time.time() - job_start) > job_timeout: elif job_timeout != 0 and (time.time() - job_start) > job_timeout:
self.__handle_termination(instance, child, is_cancel=False) runtime_flags['timed_out'] = True
if any(list(runtime_flags.values())):
self._handle_termination(instance, child, is_cancel=runtime_flags['canceled'])
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True) child.close(True)
canceled = True runtime_flags['canceled'] = True
if canceled: if runtime_flags['canceled']:
return 'canceled', child.exitstatus return 'canceled', child.exitstatus
elif child.exitstatus == 0 and not self.timed_out: elif child.exitstatus == 0 and not runtime_flags['timed_out']:
return 'successful', child.exitstatus return 'successful', child.exitstatus
else: else:
return 'failed', child.exitstatus return 'failed', child.exitstatus
def __handle_termination(self, instance, job, is_cancel=True): def _handle_termination(self, instance, job, is_cancel=True):
'''Helper function to properly terminate specified job and return correct '''Helper function to properly terminate specified job.
flags.
Args: Args:
instance: The corresponding model instance of this task. instance: The corresponding model instance of this task.
@@ -570,7 +575,7 @@ class BaseTask(Task):
cancel_flag. cancel_flag.
Return: Return:
True if is_cancel is set or None. None.
''' '''
try: try:
if tower_settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): if tower_settings.AWX_PROOT_ENABLED and self.should_use_proot(instance):
@@ -618,6 +623,7 @@ class BaseTask(Task):
instance.websocket_emit_status("running") instance.websocket_emit_status("running")
status, rc, tb = 'error', None, '' status, rc, tb = 'error', None, ''
output_replacements = [] output_replacements = []
runtime_flags = {}
try: try:
self.pre_run_hook(instance, **kwargs) self.pre_run_hook(instance, **kwargs)
if instance.cancel_flag: if instance.cancel_flag:
@@ -662,7 +668,8 @@ class BaseTask(Task):
safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args), instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle,
runtime_flags=runtime_flags)
except Exception: except Exception:
if status != 'canceled': if status != 'canceled':
tb = traceback.format_exc() tb = traceback.format_exc()
@@ -680,7 +687,7 @@ class BaseTask(Task):
try: try:
stdout_handle.flush() stdout_handle.flush()
stdout_handle.close() stdout_handle.close()
if getattr(self, 'timed_out', False): if runtime_flags.get('timed_out', False):
job_explanation = "Job terminated due to timeout" job_explanation = "Job terminated due to timeout"
except Exception: except Exception:
pass pass

View File

@@ -276,3 +276,10 @@ TEST_OPENSTACK_PROJECT = ''
# Azure credentials. # Azure credentials.
TEST_AZURE_USERNAME = '' TEST_AZURE_USERNAME = ''
TEST_AZURE_KEY_DATA = '' TEST_AZURE_KEY_DATA = ''
# Example job timeout settings
# DEFAULT_TIMEOUT = {
# 'Job': 10,
# 'InventoryUpdate': 15,
# 'ProjectUpdate': 20,
# }