mirror of
https://github.com/ansible/awx.git
synced 2026-02-26 07:26:03 -03:30
Merge pull request #250 from AlanCoding/keep_on_truckin
pass over job-run exceptions & log ID
This commit is contained in:
@@ -912,14 +912,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
return websocket_data
|
return websocket_data
|
||||||
|
|
||||||
def _websocket_emit_status(self, status):
|
def _websocket_emit_status(self, status):
|
||||||
status_data = dict(unified_job_id=self.id, status=status)
|
try:
|
||||||
status_data.update(self.websocket_emit_data())
|
status_data = dict(unified_job_id=self.id, status=status)
|
||||||
status_data['group_name'] = 'jobs'
|
status_data.update(self.websocket_emit_data())
|
||||||
emit_channel_notification('jobs-status_changed', status_data)
|
status_data['group_name'] = 'jobs'
|
||||||
|
emit_channel_notification('jobs-status_changed', status_data)
|
||||||
|
|
||||||
if self.spawned_by_workflow:
|
if self.spawned_by_workflow:
|
||||||
status_data['group_name'] = "workflow_events"
|
status_data['group_name'] = "workflow_events"
|
||||||
emit_channel_notification('workflow_events-' + str(self.workflow_job_id), status_data)
|
emit_channel_notification('workflow_events-' + str(self.workflow_job_id), status_data)
|
||||||
|
except IOError: # includes socket errors
|
||||||
|
logger.exception('%s failed to emit channel msg about status change', self.log_format)
|
||||||
|
|
||||||
def websocket_emit_status(self, status):
|
def websocket_emit_status(self, status):
|
||||||
connection.on_commit(lambda: self._websocket_emit_status(status))
|
connection.on_commit(lambda: self._websocket_emit_status(status))
|
||||||
|
|||||||
@@ -55,7 +55,8 @@ from awx.main.isolated import run, isolated_manager
|
|||||||
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
|
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
|
||||||
check_proot_installed, build_proot_temp_dir, get_licenser,
|
check_proot_installed, build_proot_temp_dir, get_licenser,
|
||||||
wrap_args_with_proot, get_system_task_capacity, OutputEventFilter,
|
wrap_args_with_proot, get_system_task_capacity, OutputEventFilter,
|
||||||
parse_yaml_or_json, ignore_inventory_computed_fields, ignore_inventory_group_removal)
|
parse_yaml_or_json, ignore_inventory_computed_fields, ignore_inventory_group_removal,
|
||||||
|
get_type_for_model)
|
||||||
from awx.main.utils.reload import restart_local_services, stop_local_services
|
from awx.main.utils.reload import restart_local_services, stop_local_services
|
||||||
from awx.main.utils.handlers import configure_external_logger
|
from awx.main.utils.handlers import configure_external_logger
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
@@ -79,7 +80,12 @@ logger = logging.getLogger('awx.main.tasks')
|
|||||||
|
|
||||||
class LogErrorsTask(Task):
|
class LogErrorsTask(Task):
|
||||||
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
||||||
logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
|
if isinstance(self, BaseTask):
|
||||||
|
logger.exception(
|
||||||
|
'%s %s execution encountered exception.',
|
||||||
|
get_type_for_model(self.model), args[0])
|
||||||
|
else:
|
||||||
|
logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
|
||||||
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
|
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
|
||||||
|
|
||||||
|
|
||||||
@@ -352,12 +358,12 @@ def handle_work_error(self, task_id, subtasks=None):
|
|||||||
for each_task in subtasks:
|
for each_task in subtasks:
|
||||||
try:
|
try:
|
||||||
instance = UnifiedJob.get_instance_by_type(each_task['type'], each_task['id'])
|
instance = UnifiedJob.get_instance_by_type(each_task['type'], each_task['id'])
|
||||||
|
if not instance:
|
||||||
|
# Unknown task type
|
||||||
|
logger.warn("Unknown task type: {}".format(each_task['type']))
|
||||||
|
continue
|
||||||
except ObjectDoesNotExist:
|
except ObjectDoesNotExist:
|
||||||
logger.warning('Missing {} `{}` in success callback.'.format(each_task['type'], each_task['id']))
|
logger.warning('Missing {} `{}` in error callback.'.format(each_task['type'], each_task['id']))
|
||||||
instance = None
|
|
||||||
if not instance:
|
|
||||||
# Unknown task type
|
|
||||||
logger.warn("Unknown task type: {}".format(each_task['type']))
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if first_instance is None:
|
if first_instance is None:
|
||||||
@@ -866,7 +872,7 @@ class BaseTask(LogErrorsTask):
|
|||||||
if status != 'canceled':
|
if status != 'canceled':
|
||||||
tb = traceback.format_exc()
|
tb = traceback.format_exc()
|
||||||
if settings.DEBUG:
|
if settings.DEBUG:
|
||||||
logger.exception('exception occurred while running task')
|
logger.exception('%s Exception occurred while running task', instance.log_format)
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
stdout_handle.flush()
|
stdout_handle.flush()
|
||||||
@@ -877,7 +883,7 @@ class BaseTask(LogErrorsTask):
|
|||||||
try:
|
try:
|
||||||
self.post_run_hook(instance, status, **kwargs)
|
self.post_run_hook(instance, status, **kwargs)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('Post run hook of unified job {} errored.'.format(instance.pk))
|
logger.exception('{} Post run hook errored.'.format(instance.log_format))
|
||||||
instance = self.update_model(pk)
|
instance = self.update_model(pk)
|
||||||
if instance.cancel_flag:
|
if instance.cancel_flag:
|
||||||
status = 'canceled'
|
status = 'canceled'
|
||||||
@@ -885,16 +891,19 @@ class BaseTask(LogErrorsTask):
|
|||||||
instance = self.update_model(pk, status=status, result_traceback=tb,
|
instance = self.update_model(pk, status=status, result_traceback=tb,
|
||||||
output_replacements=output_replacements,
|
output_replacements=output_replacements,
|
||||||
**extra_update_fields)
|
**extra_update_fields)
|
||||||
self.final_run_hook(instance, status, **kwargs)
|
try:
|
||||||
|
self.final_run_hook(instance, status, **kwargs)
|
||||||
|
except:
|
||||||
|
logger.exception('%s Final run hook errored.', instance.log_format)
|
||||||
instance.websocket_emit_status(status)
|
instance.websocket_emit_status(status)
|
||||||
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||||
# Raising an exception will mark the job as 'failed' in celery
|
# Raising an exception will mark the job as 'failed' in celery
|
||||||
# and will stop a task chain from continuing to execute
|
# and will stop a task chain from continuing to execute
|
||||||
if status == 'canceled':
|
if status == 'canceled':
|
||||||
raise Exception("Task %s(pk:%s) was canceled (rc=%s)" % (str(self.model.__class__), str(pk), str(rc)))
|
raise Exception("%s was canceled (rc=%s)" % (instance.log_format, str(rc)))
|
||||||
else:
|
else:
|
||||||
raise Exception("Task %s(pk:%s) encountered an error (rc=%s), please see task stdout for details." %
|
raise Exception("%s encountered an error (rc=%s), please see task stdout for details." %
|
||||||
(str(self.model.__class__), str(pk), str(rc)))
|
(instance.log_format, str(rc)))
|
||||||
if not hasattr(settings, 'CELERY_UNIT_TEST'):
|
if not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||||
self.signal_finished(pk)
|
self.signal_finished(pk)
|
||||||
|
|
||||||
@@ -1503,7 +1512,8 @@ class RunProjectUpdate(BaseTask):
|
|||||||
task_instance.request.id = project_request_id
|
task_instance.request.id = project_request_id
|
||||||
task_instance.run(local_inv_update.id)
|
task_instance.run(local_inv_update.id)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('Encountered unhandled exception updating dependent SCM inventory sources.')
|
logger.exception('%s Unhandled exception updating dependent SCM inventory sources.',
|
||||||
|
project_update.log_format)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
project_update.refresh_from_db()
|
project_update.refresh_from_db()
|
||||||
@@ -1513,7 +1523,7 @@ class RunProjectUpdate(BaseTask):
|
|||||||
try:
|
try:
|
||||||
local_inv_update.refresh_from_db()
|
local_inv_update.refresh_from_db()
|
||||||
except InventoryUpdate.DoesNotExist:
|
except InventoryUpdate.DoesNotExist:
|
||||||
logger.warning('Inventory update deleted during execution.')
|
logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
|
||||||
continue
|
continue
|
||||||
if project_update.cancel_flag or local_inv_update.cancel_flag:
|
if project_update.cancel_flag or local_inv_update.cancel_flag:
|
||||||
if not project_update.cancel_flag:
|
if not project_update.cancel_flag:
|
||||||
@@ -1573,7 +1583,7 @@ class RunProjectUpdate(BaseTask):
|
|||||||
if lines:
|
if lines:
|
||||||
p.scm_revision = lines[0].strip()
|
p.scm_revision = lines[0].strip()
|
||||||
else:
|
else:
|
||||||
logger.info("Could not find scm revision in check")
|
logger.info("%s Could not find scm revision in check", instance.log_format)
|
||||||
p.playbook_files = p.playbooks
|
p.playbook_files = p.playbooks
|
||||||
p.inventory_files = p.inventories
|
p.inventory_files = p.inventories
|
||||||
p.save()
|
p.save()
|
||||||
@@ -2196,8 +2206,8 @@ class RunSystemJob(BaseTask):
|
|||||||
args.extend(['--older_than', str(json_vars['older_than'])])
|
args.extend(['--older_than', str(json_vars['older_than'])])
|
||||||
if 'granularity' in json_vars:
|
if 'granularity' in json_vars:
|
||||||
args.extend(['--granularity', str(json_vars['granularity'])])
|
args.extend(['--granularity', str(json_vars['granularity'])])
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error("Failed to parse system job: " + str(e))
|
logger.exception("%s Failed to parse system job", instance.log_format)
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def get_stdout_handle(self, instance):
|
def get_stdout_handle(self, instance):
|
||||||
|
|||||||
Reference in New Issue
Block a user