From 9f8250bd4757f050b35f71c8b3bdbd464f694c1d Mon Sep 17 00:00:00 2001
From: chris meyers
Date: Fri, 15 Oct 2021 04:49:28 -0400
Subject: [PATCH] add events to job lifecycle

* Note in the job lifecycle when the controller_node and execution_node
  are chosen. These events occur most commonly in the task manager, with
  a couple of exceptions that happen when we dynamically create dependent
  jobs on the fly in tasks.py.
---
 awx/main/models/unified_jobs.py    | 5 +++++
 awx/main/scheduler/task_manager.py | 5 +++++
 awx/main/tasks.py                  | 9 +++++++++
 3 files changed, 19 insertions(+)

diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 2cb2fd28af..5281f25e6f 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -1506,6 +1506,11 @@ class UnifiedJob(
             extra["blocked_by"] = blocked_by_msg
         else:
             msg = f"{self._meta.model_name}-{self.id} {state.replace('_', ' ')}"
+
+        if state == "controller_node_chosen":
+            extra["controller_node"] = self.controller_node or "NOT_SET"
+        elif state == "execution_node_chosen":
+            extra["execution_node"] = self.execution_node or "NOT_SET"
         logger_job_lifecycle.debug(msg, extra=extra)
 
     @property
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 2944562723..ff48c5267c 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -291,6 +291,7 @@ class TaskManager:
                 # act as the controller for k8s API interaction
                 try:
                     task.controller_node = Instance.choose_online_control_plane_node()
+                    task.log_lifecycle("controller_node_chosen")
                 except IndexError:
                     logger.warning("No control plane nodes available to run containerized job {}".format(task.log_format))
                     return
@@ -298,19 +299,23 @@ class TaskManager:
                 # project updates and system jobs don't *actually* run in pods, so
                 # just pick *any* non-containerized host and use it as the execution node
                 task.execution_node = Instance.choose_online_control_plane_node()
+                task.log_lifecycle("execution_node_chosen")
                 logger.debug('Submitting containerized {} to queue {}.'.format(task.log_format, task.execution_node))
         else:
             task.instance_group = rampart_group
             task.execution_node = instance.hostname
+            task.log_lifecycle("execution_node_chosen")
             if instance.node_type == 'execution':
                 try:
                     task.controller_node = Instance.choose_online_control_plane_node()
+                    task.log_lifecycle("controller_node_chosen")
                 except IndexError:
                     logger.warning("No control plane nodes available to manage {}".format(task.log_format))
                     return
             else:
                 # control plane nodes will manage jobs locally for performance and resilience
                 task.controller_node = task.execution_node
+                task.log_lifecycle("controller_node_chosen")
             logger.debug('Submitting job {} to queue {} controlled by {}.'.format(task.log_format, task.execution_node, task.controller_node))
         with disable_activity_stream():
             task.celery_task_id = str(uuid.uuid4())
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 658cf19387..acc60fbf98 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -1914,6 +1914,7 @@ class RunJob(BaseTask):
                 status='running',
                 instance_group=pu_ig,
                 execution_node=pu_en,
+                controller_node=pu_en,
                 celery_task_id=job.celery_task_id,
             )
             if branch_override:
@@ -1922,6 +1923,8 @@ class RunJob(BaseTask):
             if 'update_' not in sync_metafields['job_tags']:
                 sync_metafields['scm_revision'] = job_revision
             local_project_sync = job.project.create_project_update(_eager_fields=sync_metafields)
+            local_project_sync.log_lifecycle("controller_node_chosen")
+            local_project_sync.log_lifecycle("execution_node_chosen")
             create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
             # save the associated job before calling run() so that a
             # cancel() call on the job can cancel the project update
@@ -2214,10 +2217,13 @@ class RunProjectUpdate(BaseTask):
                     status='running',
                     instance_group=instance_group,
                     execution_node=project_update.execution_node,
+                    controller_node=project_update.execution_node,
                     source_project_update=project_update,
                     celery_task_id=project_update.celery_task_id,
                 )
             )
+            local_inv_update.log_lifecycle("controller_node_chosen")
+            local_inv_update.log_lifecycle("execution_node_chosen")
             try:
                 create_partition(local_inv_update.event_class._meta.db_table, start=local_inv_update.created)
                 inv_update_class().run(local_inv_update.id)
@@ -2665,10 +2671,13 @@ class RunInventoryUpdate(BaseTask):
                     job_tags=','.join(sync_needs),
                     status='running',
                     execution_node=Instance.objects.me().hostname,
+                    controller_node=Instance.objects.me().hostname,
                     instance_group=inventory_update.instance_group,
                     celery_task_id=inventory_update.celery_task_id,
                 )
             )
+            local_project_sync.log_lifecycle("controller_node_chosen")
+            local_project_sync.log_lifecycle("execution_node_chosen")
             create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
             # associate the inventory update before calling run() so that a
             # cancel() call on the inventory update can cancel the project update
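
The sketch below is a standalone, hypothetical illustration of the pattern the unified_jobs.py hunk extends: when the state is one of the two new node-selection events, the chosen node (or "NOT_SET") is attached to the lifecycle logger's `extra` dict before the debug call. FakeJob, the logger name, and the sample values are assumptions for illustration only; this is not code from the patch or from AWX.

# Standalone sketch, not AWX code: FakeJob, the logger name, and the
# sample values are hypothetical, but log_lifecycle() mirrors the pattern
# the unified_jobs.py hunk adds.
import logging

logging.basicConfig(level=logging.DEBUG)
logger_job_lifecycle = logging.getLogger("job_lifecycle")


class FakeJob:
    def __init__(self, pk, controller_node=None, execution_node=None):
        self.pk = pk
        self.controller_node = controller_node
        self.execution_node = execution_node

    def log_lifecycle(self, state):
        # Structured fields travel in `extra`; the human-readable message
        # is just the state with underscores replaced.
        extra = {"type": "job", "task_id": self.pk, "state": state}
        msg = f"job-{self.pk} {state.replace('_', ' ')}"
        if state == "controller_node_chosen":
            extra["controller_node"] = self.controller_node or "NOT_SET"
        elif state == "execution_node_chosen":
            extra["execution_node"] = self.execution_node or "NOT_SET"
        logger_job_lifecycle.debug(msg, extra=extra)


job = FakeJob(pk=42, execution_node="exec-node-1")
job.log_lifecycle("execution_node_chosen")   # logs "job-42 execution node chosen"
job.log_lifecycle("controller_node_chosen")  # controller not assigned yet -> NOT_SET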