introduce log format for jobs inside of scheduler

This commit is contained in:
AlanCoding
2017-08-08 11:31:33 -04:00
parent 4f4293a8cd
commit 33df1d8c8b
3 changed files with 27 additions and 11 deletions

View File

@@ -33,7 +33,8 @@ from awx.main.models.schedules import Schedule
from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin
from awx.main.utils import ( from awx.main.utils import (
decrypt_field, _inventory_updates, decrypt_field, _inventory_updates,
copy_model_by_class, copy_m2m_relationships copy_model_by_class, copy_m2m_relationships,
get_type_for_model
) )
from awx.main.redact import UriCleaner, REPLACE_STR from awx.main.redact import UriCleaner, REPLACE_STR
from awx.main.consumers import emit_channel_notification from awx.main.consumers import emit_channel_notification
@@ -622,6 +623,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def __unicode__(self): def __unicode__(self):
return u'%s-%s-%s' % (self.created, self.id, self.status) return u'%s-%s-%s' % (self.created, self.id, self.status)
@property
def log_format(self):
return '{} {} ({})'.format(get_type_for_model(type(self)), self.id, self.status)
def _get_parent_instance(self): def _get_parent_instance(self):
return getattr(self, self._get_parent_field_name(), None) return getattr(self, self._get_parent_field_name(), None)

View File

@@ -212,11 +212,11 @@ class TaskManager():
if not task.supports_isolation() and rampart_group.controller_id: if not task.supports_isolation() and rampart_group.controller_id:
# non-Ansible jobs on isolated instances run on controller # non-Ansible jobs on isolated instances run on controller
task.instance_group = rampart_group.controller task.instance_group = rampart_group.controller
logger.info('Submitting isolated job {} to queue {} via {}.'.format( logger.info('Submitting isolated {} to queue {} via {}.'.format(
task.id, task.instance_group_id, rampart_group.controller_id)) task.log_format, task.instance_group_id, rampart_group.controller_id))
else: else:
task.instance_group = rampart_group task.instance_group = rampart_group
logger.info('Submitting job {} to instance group {}.'.format(task.id, task.instance_group_id)) logger.info('Submitting {} to instance group {}.'.format(task.log_format, task.instance_group_id))
with disable_activity_stream(): with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4()) task.celery_task_id = str(uuid.uuid4())
task.save() task.save()
@@ -342,7 +342,7 @@ class TaskManager():
def process_dependencies(self, dependent_task, dependency_tasks): def process_dependencies(self, dependent_task, dependency_tasks):
for task in dependency_tasks: for task in dependency_tasks:
if self.is_job_blocked(task): if self.is_job_blocked(task):
logger.debug("Dependent task {} is blocked from running".format(task)) logger.debug("Dependent {} is blocked from running".format(task.log_format))
continue continue
preferred_instance_groups = task.preferred_instance_groups preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False found_acceptable_queue = False
@@ -351,20 +351,20 @@ class TaskManager():
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name)) logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
continue continue
if not self.would_exceed_capacity(task, rampart_group.name): if not self.would_exceed_capacity(task, rampart_group.name):
logger.debug("Starting dependent task {} in group {}".format(task, rampart_group.name)) logger.debug("Starting dependent {} in group {}".format(task.log_format, rampart_group.name))
self.graph[rampart_group.name]['graph'].add_job(task) self.graph[rampart_group.name]['graph'].add_job(task)
tasks_to_fail = filter(lambda t: t != task, dependency_tasks) tasks_to_fail = filter(lambda t: t != task, dependency_tasks)
tasks_to_fail += [dependent_task] tasks_to_fail += [dependent_task]
self.start_task(task, rampart_group, tasks_to_fail) self.start_task(task, rampart_group, tasks_to_fail)
found_acceptable_queue = True found_acceptable_queue = True
if not found_acceptable_queue: if not found_acceptable_queue:
logger.debug("Dependent task {} couldn't be scheduled on graph, waiting for next cycle".format(task)) logger.debug("Dependent {} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
def process_pending_tasks(self, pending_tasks): def process_pending_tasks(self, pending_tasks):
for task in pending_tasks: for task in pending_tasks:
self.process_dependencies(task, self.generate_dependencies(task)) self.process_dependencies(task, self.generate_dependencies(task))
if self.is_job_blocked(task): if self.is_job_blocked(task):
logger.debug("Task {} is blocked from running".format(task)) logger.debug("{} is blocked from running".format(task.log_format))
continue continue
preferred_instance_groups = task.preferred_instance_groups preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False found_acceptable_queue = False
@@ -373,13 +373,13 @@ class TaskManager():
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name)) logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
continue continue
if not self.would_exceed_capacity(task, rampart_group.name): if not self.would_exceed_capacity(task, rampart_group.name):
logger.debug("Starting task {} in group {}".format(task, rampart_group.name)) logger.debug("Starting {} in group {}".format(task.log_format, rampart_group.name))
self.graph[rampart_group.name]['graph'].add_job(task) self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain()) self.start_task(task, rampart_group, task.get_jobs_fail_chain())
found_acceptable_queue = True found_acceptable_queue = True
break break
if not found_acceptable_queue: if not found_acceptable_queue:
logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task)) logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
def cleanup_inconsistent_celery_tasks(self): def cleanup_inconsistent_celery_tasks(self):
''' '''
@@ -415,7 +415,7 @@ class TaskManager():
task.save() task.save()
awx_tasks._send_notification_templates(task, 'failed') awx_tasks._send_notification_templates(task, 'failed')
task.websocket_emit_status('failed') task.websocket_emit_status('failed')
logger.error("Task %s appears orphaned... marking as failed" % task) logger.error("{} appears orphaned... marking as failed".format(task.log_format))
def calculate_capacity_used(self, tasks): def calculate_capacity_used(self, tasks):

View File

@@ -5,6 +5,7 @@ from awx.main.models import (
UnifiedJob, UnifiedJob,
WorkflowJob, WorkflowJob,
WorkflowJobNode, WorkflowJobNode,
Job
) )
@@ -50,3 +51,13 @@ def test_cancel_job_explanation(unified_job):
assert unified_job.job_explanation == job_explanation assert unified_job.job_explanation == job_explanation
unified_job.save.assert_called_with(update_fields=['cancel_flag', 'status', 'job_explanation']) unified_job.save.assert_called_with(update_fields=['cancel_flag', 'status', 'job_explanation'])
def test_log_representation():
'''
Common representation used inside of log messages
'''
uj = UnifiedJob(status='running', id=4)
job = Job(status='running', id=4)
assert job.log_format == 'job 4 (running)'
assert uj.log_format == 'unified_job 4 (running)'