Add job lifecycle logging

Various	points (e.g. created, running, processing events), are
structured into	json format and	output to /var/log/tower/job_lifecycle.log

As part	of this	work, the DependencyGraph is reworked to return
which job object is doing the blocking, rather than a boolean.
This commit is contained in:
Seth Foster
2021-02-03 14:33:25 -05:00
parent 47de2ddcb5
commit 41d0a2f7b9
12 changed files with 191 additions and 91 deletions

View File

@@ -828,6 +828,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
return self.inventory.hosts.only(*only)
def start_job_fact_cache(self, destination, modification_times, timeout=None):
self.log_lifecycle("start_job_fact_cache")
os.makedirs(destination, mode=0o700)
hosts = self._get_inventory_hosts()
if timeout is None:
@@ -852,6 +853,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
modification_times[filepath] = os.path.getmtime(filepath)
def finish_job_fact_cache(self, destination, modification_times):
self.log_lifecycle("finish_job_fact_cache")
for host in self._get_inventory_hosts():
filepath = os.sep.join(map(str, [destination, host.name]))
if not os.path.realpath(filepath).startswith(destination):

View File

@@ -55,7 +55,7 @@ from awx.main.fields import JSONField, AskForField, OrderedManyToManyField
__all__ = ['UnifiedJobTemplate', 'UnifiedJob', 'StdoutMaxBytesExceeded']
logger = logging.getLogger('awx.main.models.unified_jobs')
logger_job_lifecycle = logging.getLogger('awx.analytics.job_lifecycle')
# NOTE: ACTIVE_STATES moved to constants because it is used by parent modules
@@ -420,7 +420,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
# have been associated to the UJ
if unified_job.__class__ in activity_stream_registrar.models:
activity_stream_create(None, unified_job, True)
unified_job.log_lifecycle("created")
return unified_job
@classmethod
@@ -862,7 +862,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
self.unified_job_template = self._get_parent_instance()
if 'unified_job_template' not in update_fields:
update_fields.append('unified_job_template')
if self.cancel_flag and not self.canceled_on:
# Record the 'canceled' time.
self.canceled_on = now()
@@ -1010,6 +1010,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
event_qs = self.get_event_queryset()
except NotImplementedError:
return True # Model without events, such as WFJT
self.log_lifecycle("event_processing_finished")
return self.emitted_events == event_qs.count()
def result_stdout_raw_handle(self, enforce_max_bytes=True):
@@ -1318,6 +1319,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if 'extra_vars' in kwargs:
self.handle_extra_data(kwargs['extra_vars'])
# remove any job_explanations that may have been set while job was in pending
if self.job_explanation != "":
self.job_explanation = ""
return (True, opts)
def signal_start(self, **kwargs):
@@ -1484,3 +1489,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
@property
def is_containerized(self):
return False
def log_lifecycle(self, state, blocked_by=None):
extra={'type': self._meta.model_name,
'task_id': self.id,
'state': state}
if self.unified_job_template:
extra["template_name"] = self.unified_job_template.name
if state == "blocked" and blocked_by:
blocked_by_msg = f"{blocked_by._meta.model_name}-{blocked_by.id}"
msg = f"{self._meta.model_name}-{self.id} blocked by {blocked_by_msg}"
extra["blocked_by"] = blocked_by_msg
else:
msg = f"{self._meta.model_name}-{self.id} {state.replace('_', ' ')}"
logger_job_lifecycle.debug(msg, extra=extra)

View File

@@ -1,5 +1,3 @@
from django.utils.timezone import now as tz_now
from awx.main.models import (
Job,
ProjectUpdate,
@@ -20,119 +18,110 @@ class DependencyGraph(object):
INVENTORY_SOURCE_UPDATES = 'inventory_source_updates'
WORKFLOW_JOB_TEMPLATES_JOBS = 'workflow_job_template_jobs'
LATEST_PROJECT_UPDATES = 'latest_project_updates'
LATEST_INVENTORY_UPDATES = 'latest_inventory_updates'
INVENTORY_SOURCES = 'inventory_source_ids'
def __init__(self, queue):
self.queue = queue
def __init__(self):
self.data = {}
# project_id -> True / False
self.data[self.PROJECT_UPDATES] = {}
# inventory_id -> True / False
# The reason for tracking both inventory and inventory sources:
# Consider InvA, which has two sources, InvSource1, InvSource2.
# JobB might depend on InvA, which launches two updates, one for each source.
# To determine if JobB can run, we can just check InvA, which is marked in
# INVENTORY_UPDATES, instead of having to check for both entries in
# INVENTORY_SOURCE_UPDATES.
self.data[self.INVENTORY_UPDATES] = {}
# job_template_id -> True / False
self.data[self.JOB_TEMPLATE_JOBS] = {}
'''
Track runnable job related project and inventory to ensure updates
don't run while a job needing those resources is running.
'''
# inventory_source_id -> True / False
self.data[self.INVENTORY_SOURCE_UPDATES] = {}
# True / False
self.data[self.SYSTEM_JOB] = True
# workflow_job_template_id -> True / False
self.data[self.JOB_TEMPLATE_JOBS] = {}
self.data[self.SYSTEM_JOB] = {}
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS] = {}
# project_id -> latest ProjectUpdateLatestDict'
self.data[self.LATEST_PROJECT_UPDATES] = {}
# inventory_source_id -> latest InventoryUpdateLatestDict
self.data[self.LATEST_INVENTORY_UPDATES] = {}
def mark_if_no_key(self, job_type, id, job):
# only mark first occurrence of a task. If 10 of JobA are launched
# (concurrent disabled), the dependency graph should return that jobs
# 2 through 10 are blocked by job1
if id not in self.data[job_type]:
self.data[job_type][id] = job
# inventory_id -> [inventory_source_ids]
self.data[self.INVENTORY_SOURCES] = {}
def get_item(self, job_type, id):
return self.data[job_type].get(id, None)
def add_latest_project_update(self, job):
self.data[self.LATEST_PROJECT_UPDATES][job.project_id] = job
def get_now(self):
return tz_now()
def mark_system_job(self):
self.data[self.SYSTEM_JOB] = False
def mark_system_job(self, job):
# Don't track different types of system jobs, so that only one can run
# at a time. Therefore id in this case is just 'system_job'.
self.mark_if_no_key(self.SYSTEM_JOB, 'system_job', job)
def mark_project_update(self, job):
self.data[self.PROJECT_UPDATES][job.project_id] = False
self.mark_if_no_key(self.PROJECT_UPDATES, job.project_id, job)
def mark_inventory_update(self, inventory_id):
self.data[self.INVENTORY_UPDATES][inventory_id] = False
def mark_inventory_update(self, job):
if type(job) is AdHocCommand:
self.mark_if_no_key(self.INVENTORY_UPDATES, job.inventory_id, job)
else:
self.mark_if_no_key(self.INVENTORY_UPDATES, job.inventory_source.inventory_id, job)
def mark_inventory_source_update(self, inventory_source_id):
self.data[self.INVENTORY_SOURCE_UPDATES][inventory_source_id] = False
def mark_inventory_source_update(self, job):
self.mark_if_no_key(self.INVENTORY_SOURCE_UPDATES, job.inventory_source_id, job)
def mark_job_template_job(self, job):
self.data[self.JOB_TEMPLATE_JOBS][job.job_template_id] = False
self.mark_if_no_key(self.JOB_TEMPLATE_JOBS, job.job_template_id, job)
def mark_workflow_job(self, job):
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job.workflow_job_template_id] = False
self.mark_if_no_key(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id, job)
def can_project_update_run(self, job):
return self.data[self.PROJECT_UPDATES].get(job.project_id, True)
def project_update_blocked_by(self, job):
return self.get_item(self.PROJECT_UPDATES, job.project_id)
def can_inventory_update_run(self, job):
return self.data[self.INVENTORY_SOURCE_UPDATES].get(job.inventory_source_id, True)
def inventory_update_blocked_by(self, job):
return self.get_item(self.INVENTORY_SOURCE_UPDATES, job.inventory_source_id)
def can_job_run(self, job):
if self.data[self.PROJECT_UPDATES].get(job.project_id, True) is True and \
self.data[self.INVENTORY_UPDATES].get(job.inventory_id, True) is True:
if job.allow_simultaneous is False:
return self.data[self.JOB_TEMPLATE_JOBS].get(job.job_template_id, True)
else:
return True
return False
def job_blocked_by(self, job):
project_block = self.get_item(self.PROJECT_UPDATES, job.project_id)
inventory_block = self.get_item(self.INVENTORY_UPDATES, job.inventory_id)
if job.allow_simultaneous is False:
job_block = self.get_item(self.JOB_TEMPLATE_JOBS, job.job_template_id)
else:
job_block = None
return project_block or inventory_block or job_block
def can_workflow_job_run(self, job):
if job.allow_simultaneous:
return True
return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job.workflow_job_template_id, True)
def workflow_job_blocked_by(self, job):
if job.allow_simultaneous is False:
return self.get_item(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id)
return None
def can_system_job_run(self):
return self.data[self.SYSTEM_JOB]
def system_job_blocked_by(self, job):
return self.get_item(self.SYSTEM_JOB, 'system_job')
def can_ad_hoc_command_run(self, job):
return self.data[self.INVENTORY_UPDATES].get(job.inventory_id, True)
def ad_hoc_command_blocked_by(self, job):
return self.get_item(self.INVENTORY_UPDATES, job.inventory_id)
def is_job_blocked(self, job):
def task_blocked_by(self, job):
if type(job) is ProjectUpdate:
return not self.can_project_update_run(job)
return self.project_update_blocked_by(job)
elif type(job) is InventoryUpdate:
return not self.can_inventory_update_run(job)
return self.inventory_update_blocked_by(job)
elif type(job) is Job:
return not self.can_job_run(job)
return self.job_blocked_by(job)
elif type(job) is SystemJob:
return not self.can_system_job_run()
return self.system_job_blocked_by(job)
elif type(job) is AdHocCommand:
return not self.can_ad_hoc_command_run(job)
return self.ad_hoc_command_blocked_by(job)
elif type(job) is WorkflowJob:
return not self.can_workflow_job_run(job)
return self.workflow_job_blocked_by(job)
def add_job(self, job):
if type(job) is ProjectUpdate:
self.mark_project_update(job)
elif type(job) is InventoryUpdate:
self.mark_inventory_update(job.inventory_source.inventory_id)
self.mark_inventory_source_update(job.inventory_source_id)
self.mark_inventory_update(job)
self.mark_inventory_source_update(job)
elif type(job) is Job:
self.mark_job_template_job(job)
elif type(job) is WorkflowJob:
self.mark_workflow_job(job)
elif type(job) is SystemJob:
self.mark_system_job()
self.mark_system_job(job)
elif type(job) is AdHocCommand:
self.mark_inventory_update(job.inventory_id)
self.mark_inventory_update(job)
def add_jobs(self, jobs):
for j in jobs:

View File

@@ -64,6 +64,8 @@ class TaskManager():
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
def after_lock_init(self):
'''
Init AFTER we know this instance of the task manager will run because the lock is acquired.
@@ -80,7 +82,7 @@ class TaskManager():
instances_by_hostname = {i.hostname: i for i in instances_partial}
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
self.graph[rampart_group.name] = dict(graph=DependencyGraph(),
capacity_total=rampart_group.capacity,
consumed_capacity=0,
instances=[])
@@ -88,18 +90,21 @@ class TaskManager():
if instance.hostname in instances_by_hostname:
self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])
def is_job_blocked(self, task):
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
# in the old task manager this was handled as a method on each task object outside of the graph and
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
for g in self.graph:
if self.graph[g]['graph'].is_job_blocked(task):
return True
blocked_by = self.graph[g]['graph'].task_blocked_by(task)
if blocked_by:
return blocked_by
if not task.dependent_jobs_finished():
return True
blocked_by = task.dependent_jobs.first()
if blocked_by:
return blocked_by
return False
return None
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
@@ -312,6 +317,7 @@ class TaskManager():
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
task.log_lifecycle("waiting")
if rampart_group is not None:
self.consume_capacity(task, rampart_group.name)
@@ -450,6 +456,7 @@ class TaskManager():
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
task.log_lifecycle("acknowledged")
dependencies = []
if not type(task) is Job:
continue
@@ -489,11 +496,18 @@ class TaskManager():
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
tasks_to_update_job_explanation = []
for task in pending_tasks:
if self.start_task_limit <= 0:
break
if self.is_job_blocked(task):
logger.debug("{} is blocked from running".format(task.log_format))
blocked_by = self.job_blocked_by(task)
if blocked_by:
task.log_lifecycle("blocked", blocked_by=blocked_by)
job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish")
if task.job_explanation != job_explanation:
if task.created < (tz_now() - self.time_delta_job_explanation):
task.job_explanation = job_explanation
tasks_to_update_job_explanation.append(task)
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
@@ -539,7 +553,17 @@ class TaskManager():
logger.debug("No instance available in group {} to run job {} w/ capacity requirement {}".format(
rampart_group.name, task.log_format, task.task_impact))
if not found_acceptable_queue:
task.log_lifecycle("needs_capacity")
job_explanation = gettext_noop("This job is not ready to start because there is not enough available capacity.")
if task.job_explanation != job_explanation:
if task.created < (tz_now() - self.time_delta_job_explanation):
# Many launched jobs are immediately blocked, but most blocks will resolve in a few seconds.
# Therefore we should only update the job_explanation after some time has elapsed to
# prevent excessive task saves.
task.job_explanation = job_explanation
tasks_to_update_job_explanation.append(task)
logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
UnifiedJob.objects.bulk_update(tasks_to_update_job_explanation, ['job_explanation'])
def timeout_approval_node(self):
workflow_approvals = WorkflowApproval.objects.filter(status='pending')

View File

@@ -336,6 +336,7 @@ def send_notifications(notification_list, job_id=None):
sent = notification.notification_template.send(notification.subject, notification.body)
notification.status = "successful"
notification.notifications_sent = sent
job_actual.log_lifecycle("notifications_sent")
except Exception as e:
logger.exception("Send Notification Failed {}".format(e))
notification.status = "failed"
@@ -1186,16 +1187,19 @@ class BaseTask(object):
'''
Hook for any steps to run before the job/task starts
'''
instance.log_lifecycle("pre_run")
def post_run_hook(self, instance, status):
'''
Hook for any steps to run before job/task is marked as complete.
'''
instance.log_lifecycle("post_run")
def final_run_hook(self, instance, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
'''
Hook for any steps to run after job/task is marked as complete.
'''
instance.log_lifecycle("finalize_run")
job_profiling_dir = os.path.join(private_data_dir, 'artifacts/playbook_profiling')
awx_profiling_dir = '/var/log/tower/playbook_profiling/'
if not os.path.exists(awx_profiling_dir):
@@ -1358,7 +1362,6 @@ class BaseTask(object):
# self.instance because of the update_model pattern and when it's used in callback handlers
self.instance = self.update_model(pk, status='running',
start_args='') # blank field to remove encrypted passwords
self.instance.websocket_emit_status("running")
status, rc = 'error', None
extra_update_fields = {}
@@ -1383,6 +1386,7 @@ class BaseTask(object):
self.instance.send_notification_templates("running")
private_data_dir = self.build_private_data_dir(self.instance)
self.pre_run_hook(self.instance, private_data_dir)
self.instance.log_lifecycle("preparing_playbook")
if self.instance.cancel_flag:
self.instance = self.update_model(self.instance.pk, status='canceled')
if self.instance.status != 'running':
@@ -1510,6 +1514,7 @@ class BaseTask(object):
res = ansible_runner.interface.run(**params)
status = res.status
rc = res.rc
self.instance.log_lifecycle("running_playbook")
if status == 'timeout':
self.instance.job_explanation = "Job terminated due to timeout"
@@ -1868,6 +1873,7 @@ class RunJob(BaseTask):
return getattr(settings, 'AWX_PROOT_ENABLED', False)
def pre_run_hook(self, job, private_data_dir):
super(RunJob, self).pre_run_hook(job, private_data_dir)
if job.inventory is None:
error = _('Job could not start because it does not have a valid inventory.')
self.update_model(job.pk, status='failed', job_explanation=error)
@@ -2313,6 +2319,7 @@ class RunProjectUpdate(BaseTask):
'for path {}.'.format(instance.log_format, waiting_time, lock_path))
def pre_run_hook(self, instance, private_data_dir):
super(RunProjectUpdate, self).pre_run_hook(instance, private_data_dir)
# re-create root project folder if a natural disaster has destroyed it
if not os.path.exists(settings.PROJECTS_ROOT):
os.mkdir(settings.PROJECTS_ROOT)
@@ -2408,6 +2415,7 @@ class RunProjectUpdate(BaseTask):
logger.debug('{0} {1} prepared {2} from cache'.format(type(p).__name__, p.pk, dest_subpath))
def post_run_hook(self, instance, status):
super(RunProjectUpdate, self).post_run_hook(instance, status)
# To avoid hangs, very important to release lock even if errors happen here
try:
if self.playbook_new_revision:
@@ -2663,6 +2671,7 @@ class RunInventoryUpdate(BaseTask):
return inventory_update.get_extra_credentials()
def pre_run_hook(self, inventory_update, private_data_dir):
super(RunInventoryUpdate, self).pre_run_hook(inventory_update, private_data_dir)
source_project = None
if inventory_update.inventory_source:
source_project = inventory_update.inventory_source.source_project
@@ -2707,6 +2716,7 @@ class RunInventoryUpdate(BaseTask):
RunProjectUpdate.make_local_copy(source_project, private_data_dir)
def post_run_hook(self, inventory_update, status):
super(RunInventoryUpdate, self).post_run_hook(inventory_update, status)
if status != 'successful':
return # nothing to save, step out of the way to allow error reporting

View File

@@ -348,11 +348,11 @@ def test_job_not_blocking_project_update(default_instance_group, job_template_fa
project_update.instance_group = default_instance_group
project_update.status = "pending"
project_update.save()
assert not task_manager.is_job_blocked(project_update)
assert not task_manager.job_blocked_by(project_update)
dependency_graph = DependencyGraph(None)
dependency_graph = DependencyGraph()
dependency_graph.add_job(job)
assert not dependency_graph.is_job_blocked(project_update)
assert not dependency_graph.task_blocked_by(project_update)
@pytest.mark.django_db
@@ -378,11 +378,11 @@ def test_job_not_blocking_inventory_update(default_instance_group, job_template_
inventory_update.status = "pending"
inventory_update.save()
assert not task_manager.is_job_blocked(inventory_update)
assert not task_manager.job_blocked_by(inventory_update)
dependency_graph = DependencyGraph(None)
dependency_graph = DependencyGraph()
dependency_graph.add_job(job)
assert not dependency_graph.is_job_blocked(inventory_update)
assert not dependency_graph.task_blocked_by(inventory_update)
@pytest.mark.django_db

View File

@@ -3,6 +3,7 @@
from copy import copy
import json
import json_log_formatter
import logging
import traceback
import socket
@@ -14,6 +15,15 @@ from django.core.serializers.json import DjangoJSONEncoder
from django.conf import settings
class JobLifeCycleFormatter(json_log_formatter.JSONFormatter):
def json_record(self, message: str, extra: dict, record: logging.LogRecord):
if 'time' not in extra:
extra['time'] = now()
if record.exc_info:
extra['exc_info'] = self.formatException(record.exc_info)
return extra
class TimeFormatter(logging.Formatter):
'''
Custom log formatter used for inventory imports

View File

@@ -103,6 +103,15 @@ if settings.COLOR_LOGS is True:
from logutils.colorize import ColorizingStreamHandler
class ColorHandler(ColorizingStreamHandler):
def colorize(self, line, record):
# comment out this method if you don't like the job_lifecycle
# logs rendered with cyan text
previous_level_map = self.level_map.copy()
if record.name == "awx.analytics.job_lifecycle":
self.level_map[logging.DEBUG] = (None, 'cyan', True)
msg = super(ColorHandler, self).colorize(line, record)
self.level_map = previous_level_map
return msg
def format(self, record):
message = logging.StreamHandler.format(self, record)