mirror of
https://github.com/ansible/awx.git
synced 2026-03-20 18:37:39 -02:30
handle unicode things in task logger
Related to https://github.com/ansible/ansible-tower/issues/7957 * Problem presented itself as Instances falling out of Instance Groups. This was due to the cluster membership policy decider erroring out on a logger message with unicode. * Fixed up potential other unicode logger unicode issues in tasks.py
This commit is contained in:
@@ -84,13 +84,13 @@ class LogErrorsTask(Task):
|
|||||||
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
||||||
if getattr(exc, 'is_awx_task_error', False):
|
if getattr(exc, 'is_awx_task_error', False):
|
||||||
# Error caused by user / tracked in job output
|
# Error caused by user / tracked in job output
|
||||||
logger.warning(str(exc))
|
logger.warning(six.text_type("{}").format(exc))
|
||||||
elif isinstance(self, BaseTask):
|
elif isinstance(self, BaseTask):
|
||||||
logger.exception(
|
logger.exception(six.text_type(
|
||||||
'%s %s execution encountered exception.',
|
'{!s} {!s} execution encountered exception.')
|
||||||
get_type_for_model(self.model), args[0])
|
.format(get_type_for_model(self.model), args[0]))
|
||||||
else:
|
else:
|
||||||
logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
|
logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc)
|
||||||
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
|
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
|
||||||
|
|
||||||
|
|
||||||
@@ -107,7 +107,7 @@ def celery_startup(conf=None, **kwargs):
|
|||||||
with disable_activity_stream():
|
with disable_activity_stream():
|
||||||
sch.save()
|
sch.save()
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to rebuild schedule {}.".format(sch))
|
logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))
|
||||||
|
|
||||||
|
|
||||||
@worker_process_init.connect
|
@worker_process_init.connect
|
||||||
@@ -126,8 +126,8 @@ def inform_cluster_of_shutdown(*args, **kwargs):
|
|||||||
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
|
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
|
||||||
this_inst.capacity = 0 # No thank you to new jobs while shut down
|
this_inst.capacity = 0 # No thank you to new jobs while shut down
|
||||||
this_inst.save(update_fields=['capacity', 'modified'])
|
this_inst.save(update_fields=['capacity', 'modified'])
|
||||||
logger.warning('Normal shutdown signal for instance {}, '
|
logger.warning(six.text_type('Normal shutdown signal for instance {}, '
|
||||||
'removed self from capacity pool.'.format(this_inst.hostname))
|
'removed self from capacity pool.').format(this_inst.hostname))
|
||||||
except Exception:
|
except Exception:
|
||||||
# General exception because LogErrorsTask not used with celery signals
|
# General exception because LogErrorsTask not used with celery signals
|
||||||
logger.exception('Encountered problem with normal shutdown signal.')
|
logger.exception('Encountered problem with normal shutdown signal.')
|
||||||
@@ -146,7 +146,7 @@ def apply_cluster_membership_policies(self):
|
|||||||
# Process policy instance list first, these will represent manually managed instances
|
# Process policy instance list first, these will represent manually managed instances
|
||||||
# that will not go through automatic policy determination
|
# that will not go through automatic policy determination
|
||||||
for ig in InstanceGroup.objects.all():
|
for ig in InstanceGroup.objects.all():
|
||||||
logger.info("Considering group {}".format(ig.name))
|
logger.info(six.text_type("Considering group {}").format(ig.name))
|
||||||
ig.instances.clear()
|
ig.instances.clear()
|
||||||
group_actual = Group(obj=ig, instances=[])
|
group_actual = Group(obj=ig, instances=[])
|
||||||
for i in ig.policy_instance_list:
|
for i in ig.policy_instance_list:
|
||||||
@@ -154,7 +154,7 @@ def apply_cluster_membership_policies(self):
|
|||||||
if not inst.exists():
|
if not inst.exists():
|
||||||
continue
|
continue
|
||||||
inst = inst[0]
|
inst = inst[0]
|
||||||
logger.info("Policy List, adding {} to {}".format(inst.hostname, ig.name))
|
logger.info(six.text_type("Policy List, adding {} to {}").format(inst.hostname, ig.name))
|
||||||
group_actual.instances.append(inst.id)
|
group_actual.instances.append(inst.id)
|
||||||
ig.instances.add(inst)
|
ig.instances.add(inst)
|
||||||
filtered_instances.append(inst)
|
filtered_instances.append(inst)
|
||||||
@@ -167,7 +167,7 @@ def apply_cluster_membership_policies(self):
|
|||||||
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
||||||
if len(g.instances) >= g.obj.policy_instance_minimum:
|
if len(g.instances) >= g.obj.policy_instance_minimum:
|
||||||
break
|
break
|
||||||
logger.info("Policy minimum, adding {} to {}".format(i.obj.hostname, g.obj.name))
|
logger.info(six.text_type("Policy minimum, adding {} to {}").format(i.obj.hostname, g.obj.name))
|
||||||
g.obj.instances.add(i.obj)
|
g.obj.instances.add(i.obj)
|
||||||
g.instances.append(i.obj.id)
|
g.instances.append(i.obj.id)
|
||||||
i.groups.append(g.obj.id)
|
i.groups.append(g.obj.id)
|
||||||
@@ -176,7 +176,7 @@ def apply_cluster_membership_policies(self):
|
|||||||
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
|
||||||
if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
|
if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
|
||||||
break
|
break
|
||||||
logger.info("Policy percentage, adding {} to {}".format(i.obj.hostname, g.obj.name))
|
logger.info(six.text_type("Policy percentage, adding {} to {}").format(i.obj.hostname, g.obj.name))
|
||||||
g.instances.append(i.obj.id)
|
g.instances.append(i.obj.id)
|
||||||
g.obj.instances.add(i.obj)
|
g.obj.instances.add(i.obj)
|
||||||
i.groups.append(g.obj.id)
|
i.groups.append(g.obj.id)
|
||||||
@@ -205,24 +205,24 @@ def handle_setting_changes(self, setting_keys):
|
|||||||
@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask)
|
@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask)
|
||||||
def handle_ha_toplogy_changes(self):
|
def handle_ha_toplogy_changes(self):
|
||||||
instance = Instance.objects.me()
|
instance = Instance.objects.me()
|
||||||
logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
|
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
|
||||||
awx_app = Celery('awx')
|
awx_app = Celery('awx')
|
||||||
awx_app.config_from_object('django.conf:settings')
|
awx_app.config_from_object('django.conf:settings')
|
||||||
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||||
for instance in instances:
|
for instance in instances:
|
||||||
logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
|
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
|
||||||
.format(instance.hostname, removed_queues, added_queues))
|
.format(instance.hostname, removed_queues, added_queues))
|
||||||
updated_routes = update_celery_worker_routes(instance, settings)
|
updated_routes = update_celery_worker_routes(instance, settings)
|
||||||
logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
|
logger.info(six.text_type("Worker on tower node '{}' updated celery routes {} all routes are now {}")
|
||||||
.format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
|
.format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
|
||||||
|
|
||||||
|
|
||||||
@worker_ready.connect
|
@worker_ready.connect
|
||||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||||
logger.debug("Configure celeryd queues task on host {}".format(sender.hostname))
|
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
||||||
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||||
for instance in instances:
|
for instance in instances:
|
||||||
logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}"
|
logger.info(six.text_type("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}")
|
||||||
.format(instance.hostname, removed_queues, added_queues))
|
.format(instance.hostname, removed_queues, added_queues))
|
||||||
|
|
||||||
# Expedite the first hearbeat run so a node comes online quickly.
|
# Expedite the first hearbeat run so a node comes online quickly.
|
||||||
@@ -233,10 +233,10 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
|||||||
@celeryd_init.connect
|
@celeryd_init.connect
|
||||||
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
||||||
conf = conf if conf else sender.app.conf
|
conf = conf if conf else sender.app.conf
|
||||||
logger.debug("Registering celery routes for {}".format(sender))
|
logger.debug(six.text_type("Registering celery routes for {}").format(sender))
|
||||||
instance = Instance.objects.me()
|
instance = Instance.objects.me()
|
||||||
added_routes = update_celery_worker_routes(instance, conf)
|
added_routes = update_celery_worker_routes(instance, conf)
|
||||||
logger.info("Workers on tower node '{}' added routes {} all routes are now {}"
|
logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}")
|
||||||
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
|
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
|
||||||
|
|
||||||
|
|
||||||
@@ -244,7 +244,7 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
|||||||
def handle_update_celery_hostname(sender, instance, **kwargs):
|
def handle_update_celery_hostname(sender, instance, **kwargs):
|
||||||
tower_instance = Instance.objects.me()
|
tower_instance = Instance.objects.me()
|
||||||
instance.hostname = 'celery@{}'.format(tower_instance.hostname)
|
instance.hostname = 'celery@{}'.format(tower_instance.hostname)
|
||||||
logger.warn("Set hostname to {}".format(instance.hostname))
|
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
|
||||||
|
|
||||||
|
|
||||||
@shared_task(queue='tower', base=LogErrorsTask)
|
@shared_task(queue='tower', base=LogErrorsTask)
|
||||||
@@ -264,7 +264,7 @@ def send_notifications(notification_list, job_id=None):
|
|||||||
notification.status = "successful"
|
notification.status = "successful"
|
||||||
notification.notifications_sent = sent
|
notification.notifications_sent = sent
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Send Notification Failed {}".format(e))
|
logger.error(six.text_type("Send Notification Failed {}").format(e))
|
||||||
notification.status = "failed"
|
notification.status = "failed"
|
||||||
notification.error = smart_str(e)
|
notification.error = smart_str(e)
|
||||||
finally:
|
finally:
|
||||||
@@ -299,7 +299,7 @@ def purge_old_stdout_files(self):
|
|||||||
for f in os.listdir(settings.JOBOUTPUT_ROOT):
|
for f in os.listdir(settings.JOBOUTPUT_ROOT):
|
||||||
if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT,f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
|
if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT,f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
|
||||||
os.unlink(os.path.join(settings.JOBOUTPUT_ROOT,f))
|
os.unlink(os.path.join(settings.JOBOUTPUT_ROOT,f))
|
||||||
logger.info("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
|
logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, base=LogErrorsTask)
|
@shared_task(bind=True, base=LogErrorsTask)
|
||||||
@@ -320,7 +320,7 @@ def cluster_node_heartbeat(self):
|
|||||||
if this_inst:
|
if this_inst:
|
||||||
startup_event = this_inst.is_lost(ref_time=nowtime)
|
startup_event = this_inst.is_lost(ref_time=nowtime)
|
||||||
if this_inst.capacity == 0 and this_inst.enabled:
|
if this_inst.capacity == 0 and this_inst.enabled:
|
||||||
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
|
logger.warning(six.text_type('Rejoining the cluster as instance {}.').format(this_inst.hostname))
|
||||||
if this_inst.enabled:
|
if this_inst.enabled:
|
||||||
this_inst.refresh_capacity()
|
this_inst.refresh_capacity()
|
||||||
handle_ha_toplogy_changes.apply_async()
|
handle_ha_toplogy_changes.apply_async()
|
||||||
@@ -337,10 +337,11 @@ def cluster_node_heartbeat(self):
|
|||||||
if other_inst.version == "":
|
if other_inst.version == "":
|
||||||
continue
|
continue
|
||||||
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
|
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
|
||||||
logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
|
logger.error(six.text_type("Host {} reports version {}, but this node {} is at {}, shutting down")
|
||||||
other_inst.version,
|
.format(other_inst.hostname,
|
||||||
this_inst.hostname,
|
other_inst.version,
|
||||||
this_inst.version))
|
this_inst.hostname,
|
||||||
|
this_inst.version))
|
||||||
# Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
|
# Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
|
||||||
# The heartbeat task will reset the capacity to the system capacity after upgrade.
|
# The heartbeat task will reset the capacity to the system capacity after upgrade.
|
||||||
stop_local_services(['uwsgi', 'celery', 'beat', 'callback'], communicate=False)
|
stop_local_services(['uwsgi', 'celery', 'beat', 'callback'], communicate=False)
|
||||||
@@ -357,17 +358,17 @@ def cluster_node_heartbeat(self):
|
|||||||
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
||||||
other_inst.capacity = 0
|
other_inst.capacity = 0
|
||||||
other_inst.save(update_fields=['capacity'])
|
other_inst.save(update_fields=['capacity'])
|
||||||
logger.error("Host {} last checked in at {}, marked as lost.".format(
|
logger.error(six.text_type("Host {} last checked in at {}, marked as lost.").format(
|
||||||
other_inst.hostname, other_inst.modified))
|
other_inst.hostname, other_inst.modified))
|
||||||
elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
||||||
deprovision_hostname = other_inst.hostname
|
deprovision_hostname = other_inst.hostname
|
||||||
other_inst.delete()
|
other_inst.delete()
|
||||||
logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
|
logger.info(six.text_type("Host {} Automatically Deprovisioned.").format(deprovision_hostname))
|
||||||
except DatabaseError as e:
|
except DatabaseError as e:
|
||||||
if 'did not affect any rows' in str(e):
|
if 'did not affect any rows' in str(e):
|
||||||
logger.debug('Another instance has marked {} as lost'.format(other_inst.hostname))
|
logger.debug(six.text_type('Another instance has marked {} as lost').format(other_inst.hostname))
|
||||||
else:
|
else:
|
||||||
logger.exception('Error marking {} as lost'.format(other_inst.hostname))
|
logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname))
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, base=LogErrorsTask)
|
@shared_task(bind=True, base=LogErrorsTask)
|
||||||
@@ -390,7 +391,7 @@ def awx_isolated_heartbeat(self):
|
|||||||
isolated_instance.save(update_fields=['last_isolated_check'])
|
isolated_instance.save(update_fields=['last_isolated_check'])
|
||||||
# Slow pass looping over isolated IGs and their isolated instances
|
# Slow pass looping over isolated IGs and their isolated instances
|
||||||
if len(isolated_instance_qs) > 0:
|
if len(isolated_instance_qs) > 0:
|
||||||
logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
|
logger.debug(six.text_type("Managing isolated instances {}.").format(','.join([inst.hostname for inst in isolated_instance_qs])))
|
||||||
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
|
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
|
||||||
|
|
||||||
|
|
||||||
@@ -551,7 +552,7 @@ def update_host_smart_inventory_memberships():
|
|||||||
changed_inventories.add(smart_inventory)
|
changed_inventories.add(smart_inventory)
|
||||||
SmartInventoryMembership.objects.bulk_create(memberships)
|
SmartInventoryMembership.objects.bulk_create(memberships)
|
||||||
except IntegrityError as e:
|
except IntegrityError as e:
|
||||||
logger.error("Update Host Smart Inventory Memberships failed due to an exception: " + str(e))
|
logger.error(six.text_type("Update Host Smart Inventory Memberships failed due to an exception: {}").format(e))
|
||||||
return
|
return
|
||||||
# Update computed fields for changed inventories outside atomic action
|
# Update computed fields for changed inventories outside atomic action
|
||||||
for smart_inventory in changed_inventories:
|
for smart_inventory in changed_inventories:
|
||||||
@@ -576,7 +577,7 @@ def delete_inventory(self, inventory_id, user_id):
|
|||||||
'inventories-status_changed',
|
'inventories-status_changed',
|
||||||
{'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'}
|
{'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'}
|
||||||
)
|
)
|
||||||
logger.debug('Deleted inventory %s as user %s.' % (inventory_id, user_id))
|
logger.debug(six.text_type('Deleted inventory {} as user {}.').format(inventory_id, user_id))
|
||||||
except Inventory.DoesNotExist:
|
except Inventory.DoesNotExist:
|
||||||
logger.exception("Delete Inventory failed due to missing inventory: " + str(inventory_id))
|
logger.exception("Delete Inventory failed due to missing inventory: " + str(inventory_id))
|
||||||
return
|
return
|
||||||
@@ -598,7 +599,7 @@ def with_path_cleanup(f):
|
|||||||
elif os.path.exists(p):
|
elif os.path.exists(p):
|
||||||
os.remove(p)
|
os.remove(p)
|
||||||
except OSError:
|
except OSError:
|
||||||
logger.exception("Failed to remove tmp file: {}".format(p))
|
logger.exception(six.text_type("Failed to remove tmp file: {}").format(p))
|
||||||
self.cleanup_paths = []
|
self.cleanup_paths = []
|
||||||
return _wrapped
|
return _wrapped
|
||||||
|
|
||||||
@@ -1011,7 +1012,7 @@ class BaseTask(LogErrorsTask):
|
|||||||
try:
|
try:
|
||||||
self.post_run_hook(instance, status, **kwargs)
|
self.post_run_hook(instance, status, **kwargs)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('{} Post run hook errored.'.format(instance.log_format))
|
logger.exception(six.text_type('{} Post run hook errored.').format(instance.log_format))
|
||||||
instance = self.update_model(pk)
|
instance = self.update_model(pk)
|
||||||
if instance.cancel_flag:
|
if instance.cancel_flag:
|
||||||
status = 'canceled'
|
status = 'canceled'
|
||||||
@@ -1023,7 +1024,7 @@ class BaseTask(LogErrorsTask):
|
|||||||
try:
|
try:
|
||||||
self.final_run_hook(instance, status, **kwargs)
|
self.final_run_hook(instance, status, **kwargs)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('%s Final run hook errored.', instance.log_format)
|
logger.exception(six.text_type('{} Final run hook errored.').format(instance.log_format))
|
||||||
instance.websocket_emit_status(status)
|
instance.websocket_emit_status(status)
|
||||||
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||||
# Raising an exception will mark the job as 'failed' in celery
|
# Raising an exception will mark the job as 'failed' in celery
|
||||||
@@ -1616,8 +1617,8 @@ class RunProjectUpdate(BaseTask):
|
|||||||
task_instance.request.id = project_request_id
|
task_instance.request.id = project_request_id
|
||||||
task_instance.run(local_inv_update.id)
|
task_instance.run(local_inv_update.id)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('%s Unhandled exception updating dependent SCM inventory sources.',
|
logger.exception(six.text_type('{} Unhandled exception updating dependent SCM inventory sources.')
|
||||||
project_update.log_format)
|
.format(project_update.log_format))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
project_update.refresh_from_db()
|
project_update.refresh_from_db()
|
||||||
@@ -1630,10 +1631,10 @@ class RunProjectUpdate(BaseTask):
|
|||||||
logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
|
logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
|
||||||
continue
|
continue
|
||||||
if project_update.cancel_flag:
|
if project_update.cancel_flag:
|
||||||
logger.info('Project update {} was canceled while updating dependent inventories.'.format(project_update.log_format))
|
logger.info(six.text_type('Project update {} was canceled while updating dependent inventories.').format(project_update.log_format))
|
||||||
break
|
break
|
||||||
if local_inv_update.cancel_flag:
|
if local_inv_update.cancel_flag:
|
||||||
logger.info('Continuing to process project dependencies after {} was canceled'.format(local_inv_update.log_format))
|
logger.info(six.text_type('Continuing to process project dependencies after {} was canceled').format(local_inv_update.log_format))
|
||||||
if local_inv_update.status == 'successful':
|
if local_inv_update.status == 'successful':
|
||||||
inv_src.scm_last_revision = scm_revision
|
inv_src.scm_last_revision = scm_revision
|
||||||
inv_src.save(update_fields=['scm_last_revision'])
|
inv_src.save(update_fields=['scm_last_revision'])
|
||||||
@@ -1642,7 +1643,7 @@ class RunProjectUpdate(BaseTask):
|
|||||||
try:
|
try:
|
||||||
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
|
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
|
logger.error(six.text_type("I/O error({0}) while trying to open lock file [{1}]: {2}").format(e.errno, instance.get_lock_file(), e.strerror))
|
||||||
os.close(self.lock_fd)
|
os.close(self.lock_fd)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@@ -1660,14 +1661,14 @@ class RunProjectUpdate(BaseTask):
|
|||||||
try:
|
try:
|
||||||
self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT)
|
self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
logger.error(six.text_type("I/O error({0}) while trying to open lock file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
try:
|
try:
|
||||||
fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
|
fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
os.close(self.lock_fd)
|
os.close(self.lock_fd)
|
||||||
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
logger.error(six.text_type("I/O error({0}) while trying to aquire lock on file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def pre_run_hook(self, instance, **kwargs):
|
def pre_run_hook(self, instance, **kwargs):
|
||||||
@@ -1687,7 +1688,7 @@ class RunProjectUpdate(BaseTask):
|
|||||||
if lines:
|
if lines:
|
||||||
p.scm_revision = lines[0].strip()
|
p.scm_revision = lines[0].strip()
|
||||||
else:
|
else:
|
||||||
logger.info("%s Could not find scm revision in check", instance.log_format)
|
logger.info(six.text_type("{} Could not find scm revision in check").format(instance.log_format))
|
||||||
p.playbook_files = p.playbooks
|
p.playbook_files = p.playbooks
|
||||||
p.inventory_files = p.inventories
|
p.inventory_files = p.inventories
|
||||||
p.save()
|
p.save()
|
||||||
@@ -2297,7 +2298,7 @@ class RunSystemJob(BaseTask):
|
|||||||
if 'granularity' in json_vars:
|
if 'granularity' in json_vars:
|
||||||
args.extend(['--granularity', str(json_vars['granularity'])])
|
args.extend(['--granularity', str(json_vars['granularity'])])
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("%s Failed to parse system job", system_job.log_format)
|
logger.exception(six.text_type("{} Failed to parse system job").format(system_job.log_format))
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def build_env(self, instance, **kwargs):
|
def build_env(self, instance, **kwargs):
|
||||||
@@ -2332,7 +2333,7 @@ def deep_copy_model_obj(
|
|||||||
self, model_module, model_name, obj_pk, new_obj_pk,
|
self, model_module, model_name, obj_pk, new_obj_pk,
|
||||||
user_pk, sub_obj_list, permission_check_func=None
|
user_pk, sub_obj_list, permission_check_func=None
|
||||||
):
|
):
|
||||||
logger.info('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
|
logger.info(six.text_type('Deep copy {} from {} to {}.').format(model_name, obj_pk, new_obj_pk))
|
||||||
from awx.api.generics import CopyAPIView
|
from awx.api.generics import CopyAPIView
|
||||||
model = getattr(importlib.import_module(model_module), model_name, None)
|
model = getattr(importlib.import_module(model_module), model_name, None)
|
||||||
if model is None:
|
if model is None:
|
||||||
|
|||||||
Reference in New Issue
Block a user