Merge pull request #9005 from AlanCoding/assert_workflow_failure

Make workflow_job.assert_successful() give specifics

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot] 2021-01-05 17:28:28 +00:00 committed by GitHub
commit 96a7fe0035
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 6 deletions

View File

@ -47,6 +47,13 @@ class HasStatus(object):
def wait_until_started(self, interval=1, timeout=60):
return self.wait_until_status(self.started_statuses, interval=interval, timeout=timeout)
def failure_output_details(self):
if getattr(self, 'result_stdout', ''):
output = bytes_to_str(self.result_stdout)
if output:
return '\nstdout:\n{}'.format(output)
return ''
def assert_status(self, status_list, msg=None):
if isinstance(status_list, str):
status_list = [status_list]
@ -65,10 +72,9 @@ class HasStatus(object):
msg += '\njob_explanation: {}'.format(bytes_to_str(self.job_explanation))
if getattr(self, 'result_traceback', ''):
msg += '\nresult_traceback:\n{}'.format(bytes_to_str(self.result_traceback))
if getattr(self, 'result_stdout', ''):
output = bytes_to_str(self.result_stdout)
if output:
msg = msg + '\nstdout:\n{}'.format(output)
msg += self.failure_output_details()
if getattr(self, 'job_explanation', '').startswith('Previous Task Failed'):
try:
data = json.loads(self.job_explanation.replace('Previous Task Failed: ', ''))

View File

@ -13,11 +13,35 @@ class WorkflowJob(UnifiedJob):
result = self.related.relaunch.post(payload)
return self.walk(result.url)
def failure_output_details(self):
"""Special implementation of this part of assert_status so that
workflow_job.assert_successful() will give a breakdown of failure
"""
node_list = self.related.workflow_nodes.get().results
msg = '\nNode summary:'
for node in node_list:
msg += '\n{}: {}'.format(node.id, node.summary_fields.get('job'))
for rel in ('failure_nodes', 'always_nodes', 'success_nodes'):
val = getattr(node, rel, [])
if val:
msg += ' {} {}'.format(rel, val)
msg += '\n\nUnhandled individual job failures:\n'
for node in node_list:
# nodes without always or failure paths consider failures unhandled
if node.job and not (node.failure_nodes or node.always_nodes):
job = node.related.job.get()
try:
job.assert_successful()
except Exception as e:
msg += str(e)
return msg
@property
def result_stdout(self):
# workflow jobs do not have result_stdout
# which is problematic for the UnifiedJob.is_successful reliance on
# related stdout endpoint.
if 'result_stdout' not in self.json:
return 'Unprovided AWX field.'
else: