Files
awx/awxkit/awxkit/api/mixins/has_status.py
2023-02-01 14:59:38 -05:00

100 lines
4.2 KiB
Python

from datetime import datetime
import json
from awxkit.utils import poll_until
from awxkit.exceptions import WaitUntilTimeout
def bytes_to_str(obj):
    """Return a text representation of ``obj``.

    Objects exposing ``decode`` (e.g. ``bytes``) are decoded with the default
    UTF-8 codec; anything else is passed through ``str()``.

    Undecodable bytes are replaced with U+FFFD rather than raising
    ``UnicodeDecodeError``: this helper is used while building failure
    messages, where a decoding error would hide the failure being reported.
    """
    try:
        return obj.decode(errors='replace')
    except AttributeError:
        # No ``decode`` attribute: not a bytes-like object.
        return str(obj)
class HasStatus(object):
    """Mixin adding status inspection, polling and assertion helpers.

    The methods below read ``self.status`` and call ``self.get()`` (expected
    to return an object carrying a ``status`` attribute). ``assert_status``
    additionally reads ``self.type`` and ``self.id`` when building its error
    message, and calls ``self.walk()`` when a dependency failure is reported.
    """

    # Statuses that is_completed / wait_until_completed treat as completed.
    completed_statuses = ['successful', 'failed', 'error', 'canceled']
    # Statuses accepted by wait_until_started: pending, running, or completed.
    started_statuses = ['pending', 'running'] + completed_statuses

    @property
    def is_completed(self):
        """True when the lower-cased status is one of ``completed_statuses``."""
        return self.status.lower() in self.completed_statuses

    @property
    def is_successful(self):
        """True when the status is exactly ``'successful'`` (case-sensitive)."""
        return self.status == 'successful'

    def wait_until_status(self, status, interval=1, timeout=60, **kwargs):
        """Poll ``self.get()`` until its status is one of ``status``.

        :param status: a single status, or a list/tuple of acceptable statuses.
        :param interval: forwarded to ``poll_until``.
        :param timeout: forwarded to ``poll_until``.
        :param kwargs: extra keyword arguments forwarded to ``poll_until``.
        :returns: ``self``.
        :raises AssertionError: from ``assert_status`` when polling times out
            and ``self.status`` is still not acceptable.
        """
        status = status if isinstance(status, (list, tuple)) else [status]
        try:
            poll_until(lambda: self.get().status in status, interval=interval, timeout=timeout, **kwargs)
        except WaitUntilTimeout:
            # This will raise a more informative error than just "WaitUntilTimeout" error
            self.assert_status(status)
        return self

    def wait_until_completed(self, interval=5, timeout=60, **kwargs):
        """Wait for a completed status, then for event processing to finish.

        After a completed status is reached, if ``self`` exposes a falsy
        ``event_processing_finished`` attribute, polling continues on that
        attribute using whatever is left of ``timeout``.

        :param interval: forwarded to ``poll_until``.
        :param timeout: overall budget shared by both polling phases.
        :param kwargs: extra keyword arguments forwarded to ``poll_until``.
        :returns: ``self``.
        """
        start_time = datetime.utcnow()
        # Called through the class rather than ``self`` -- presumably so that
        # a subclass override of wait_until_status is bypassed; TODO confirm.
        HasStatus.wait_until_status(self, self.completed_statuses, interval=interval, timeout=timeout, **kwargs)
        if not getattr(self, 'event_processing_finished', True):
            elapsed = datetime.utcnow() - start_time
            time_left = timeout - elapsed.total_seconds()
            poll_until(lambda: getattr(self.get(), 'event_processing_finished', True), interval=interval, timeout=time_left, **kwargs)
        return self

    def wait_until_started(self, interval=1, timeout=60, **kwargs):
        """Wait until the status is one of ``started_statuses``.

        :param interval: forwarded to ``wait_until_status``.
        :param timeout: forwarded to ``wait_until_status``.
        :param kwargs: extra keyword arguments forwarded to
            ``wait_until_status``, matching the other ``wait_until_*`` helpers.
        :returns: ``self``.
        """
        return self.wait_until_status(self.started_statuses, interval=interval, timeout=timeout, **kwargs)

    def failure_output_details(self):
        """Return a string describing stdout, job explanation and traceback.

        Each section is included only when the corresponding attribute
        (``result_stdout``, ``job_explanation``, ``result_traceback``) exists
        and is truthy. Returns ``''`` when none of them is available.
        """
        msg = ''
        if getattr(self, 'result_stdout', ''):
            output = bytes_to_str(self.result_stdout)
            if output:
                msg = '\nstdout:\n{}'.format(output)
        if getattr(self, 'job_explanation', ''):
            msg += '\njob_explanation: {}'.format(bytes_to_str(self.job_explanation))
        if getattr(self, 'result_traceback', ''):
            msg += '\nresult_traceback:\n{}'.format(bytes_to_str(self.result_traceback))
        return msg

    def assert_status(self, status_list, msg=None):
        """Raise ``AssertionError`` unless ``self.status`` is in ``status_list``.

        When ``'successful'`` is one of the accepted statuses, ``is_successful``
        must also hold for the check to pass.

        :param status_list: a single status string or a collection of them.
        :param msg: optional text placed at the start of the error message.
        :returns: ``None`` when the status is acceptable.
        :raises AssertionError: with the status, execution environment details,
            job output and (when reported) dependency output.
        """
        if isinstance(status_list, str):
            status_list = [status_list]
        if self.status in status_list:
            # include corner cases in is_successful logic
            if 'successful' not in status_list or self.is_successful:
                return

        msg = '' if msg is None else msg + '\n'
        msg += '{0}-{1} has status of {2}, which is not in {3}.'.format(self.type.title(), self.id, self.status, status_list)

        if getattr(self, 'execution_environment', ''):
            msg += '\nexecution_environment: {}'.format(bytes_to_str(self.execution_environment))
            if getattr(self, 'related', False):
                # Best effort, like the dependency lookup below: a failure to
                # fetch the execution environment must not replace the
                # AssertionError this method exists to raise.
                try:
                    ee = self.related.execution_environment.get()
                    ee_details = (
                        f'\nee_image: {ee.image}'
                        f'\nee_credential: {ee.credential}'
                        f'\nee_pull_option: {ee.pull}'
                        f'\nee_summary_fields: {ee.summary_fields}'
                    )
                except Exception as e:
                    ee_details = '\nFailed to obtain execution environment details: {}'.format(e)
                msg += ee_details

        msg += self.failure_output_details()

        # job_explanation may be missing, None, or bytes; normalize to str so
        # the prefix check cannot raise AttributeError/TypeError.
        explanation = getattr(self, 'job_explanation', '') or ''
        if not isinstance(explanation, str):
            explanation = bytes_to_str(explanation)
        if explanation.startswith('Previous Task Failed'):
            try:
                data = json.loads(explanation.replace('Previous Task Failed: ', ''))
                dependency = self.walk('/api/v2/{0}s/{1}/'.format(data['job_type'], data['job_id']))
                if hasattr(dependency, 'failure_output_details'):
                    msg += '\nDependency output:\n{}'.format(dependency.failure_output_details())
                else:
                    msg += '\nDependency info:\n{}'.format(dependency)
            except Exception as e:
                msg += '\nFailed to obtain dependency stdout: {}'.format(e)

        msg += '\nTIME WHEN STATUS WAS FOUND: {} (UTC)\n'.format(datetime.utcnow())
        raise AssertionError(msg)

    def assert_successful(self, msg=None):
        """Shorthand for ``assert_status('successful', msg=msg)``."""
        return self.assert_status('successful', msg=msg)