add events to job lifecycle

* Note in the job lifecycle when the controller_node and execution_node
  are chosen. These events most commonly occur in the task manager, with
  a couple of exceptions for dependent jobs that we dynamically create
  on the fly in tasks.py.
chris meyers 2021-10-15 04:49:28 -04:00 committed by Shane McDonald
parent 3a3fffb2dd
commit 9f8250bd47
3 changed files with 19 additions and 0 deletions
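
For context, a minimal sketch of the pattern this commit extends. This is
an illustration, not the actual UnifiedJob code; the logger name and the
simplified model below are assumptions:

import logging

# AWX routes job lifecycle events through a dedicated analytics logger.
logger_job_lifecycle = logging.getLogger("awx.analytics.job_lifecycle")

class SketchJob:
    """Stand-in for UnifiedJob with only the fields log_lifecycle touches."""

    def __init__(self, pk, controller_node="", execution_node=""):
        self.pk = pk
        self.controller_node = controller_node
        self.execution_node = execution_node

    def log_lifecycle(self, state):
        extra = {"type": "job", "task_id": self.pk, "state": state}
        msg = f"job-{self.pk} {state.replace('_', ' ')}"
        if state == "controller_node_chosen":
            extra["controller_node"] = self.controller_node or "NOT_SET"
        elif state == "execution_node_chosen":
            extra["execution_node"] = self.execution_node or "NOT_SET"
        logger_job_lifecycle.debug(msg, extra=extra)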


@@ -1506,6 +1506,11 @@ class UnifiedJob(
            extra["blocked_by"] = blocked_by_msg
        else:
            msg = f"{self._meta.model_name}-{self.id} {state.replace('_', ' ')}"
        if state == "controller_node_chosen":
            extra["controller_node"] = self.controller_node or "NOT_SET"
        elif state == "execution_node_chosen":
            extra["execution_node"] = self.execution_node or "NOT_SET"
        logger_job_lifecycle.debug(msg, extra=extra)

    @property
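
The new branches only enrich the record's extra dict, so a default
formatter will not render them. A hedged, consumer-side example of making
the chosen node visible in plain-text logs; the handler wiring below is
illustrative and not part of this change:

import logging

class NodeChoiceFilter(logging.Filter):
    # Append the chosen node, when present, to the log message.
    def filter(self, record):
        for field in ("controller_node", "execution_node"):
            node = getattr(record, field, None)
            if node:
                record.msg = f"{record.msg} ({field}={node})"
        return True

handler = logging.StreamHandler()
handler.addFilter(NodeChoiceFilter())
logging.getLogger("awx.analytics.job_lifecycle").addHandler(handler)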


@@ -291,6 +291,7 @@ class TaskManager:
                # act as the controller for k8s API interaction
                try:
                    task.controller_node = Instance.choose_online_control_plane_node()
                    task.log_lifecycle("controller_node_chosen")
                except IndexError:
                    logger.warning("No control plane nodes available to run containerized job {}".format(task.log_format))
                    return
@@ -298,19 +299,23 @@
                # project updates and system jobs don't *actually* run in pods, so
                # just pick *any* non-containerized host and use it as the execution node
                task.execution_node = Instance.choose_online_control_plane_node()
                task.log_lifecycle("execution_node_chosen")
                logger.debug('Submitting containerized {} to queue {}.'.format(task.log_format, task.execution_node))
        else:
            task.instance_group = rampart_group
            task.execution_node = instance.hostname
            task.log_lifecycle("execution_node_chosen")
            if instance.node_type == 'execution':
                try:
                    task.controller_node = Instance.choose_online_control_plane_node()
                    task.log_lifecycle("controller_node_chosen")
                except IndexError:
                    logger.warning("No control plane nodes available to manage {}".format(task.log_format))
                    return
            else:
                # control plane nodes will manage jobs locally for performance and resilience
                task.controller_node = task.execution_node
                task.log_lifecycle("controller_node_chosen")
            logger.debug('Submitting job {} to queue {} controlled by {}.'.format(task.log_format, task.execution_node, task.controller_node))
        with disable_activity_stream():
            task.celery_task_id = str(uuid.uuid4())
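
Condensed, the selection order above looks like the sketch below; the
function, its flag argument, and choose_control_node are stand-ins for
TaskManager internals, and the container-group branch is simplified to
the controller-node case:

def assign_nodes(task, is_container_group, instance, choose_control_node):
    if is_container_group:
        # A control plane node drives the k8s API interaction.
        task.controller_node = choose_control_node()  # may raise IndexError
        task.log_lifecycle("controller_node_chosen")
    else:
        task.execution_node = instance.hostname
        task.log_lifecycle("execution_node_chosen")
        if instance.node_type == 'execution':
            task.controller_node = choose_control_node()
        else:
            # Control plane nodes manage their own jobs locally.
            task.controller_node = task.execution_node
        task.log_lifecycle("controller_node_chosen")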


@@ -1914,6 +1914,7 @@ class RunJob(BaseTask):
                status='running',
                instance_group=pu_ig,
                execution_node=pu_en,
                controller_node=pu_en,
                celery_task_id=job.celery_task_id,
            )
            if branch_override:
@@ -1922,6 +1923,8 @@ class RunJob(BaseTask):
            if 'update_' not in sync_metafields['job_tags']:
                sync_metafields['scm_revision'] = job_revision
            local_project_sync = job.project.create_project_update(_eager_fields=sync_metafields)
            local_project_sync.log_lifecycle("controller_node_chosen")
            local_project_sync.log_lifecycle("execution_node_chosen")
            create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
            # save the associated job before calling run() so that a
            # cancel() call on the job can cancel the project update
@@ -2214,10 +2217,13 @@ class RunProjectUpdate(BaseTask):
                    status='running',
                    instance_group=instance_group,
                    execution_node=project_update.execution_node,
                    controller_node=project_update.execution_node,
                    source_project_update=project_update,
                    celery_task_id=project_update.celery_task_id,
                )
            )
            local_inv_update.log_lifecycle("controller_node_chosen")
            local_inv_update.log_lifecycle("execution_node_chosen")
            try:
                create_partition(local_inv_update.event_class._meta.db_table, start=local_inv_update.created)
                inv_update_class().run(local_inv_update.id)
@@ -2665,10 +2671,13 @@ class RunInventoryUpdate(BaseTask):
                    job_tags=','.join(sync_needs),
                    status='running',
                    execution_node=Instance.objects.me().hostname,
                    controller_node=Instance.objects.me().hostname,
                    instance_group=inventory_update.instance_group,
                    celery_task_id=inventory_update.celery_task_id,
                )
            )
            local_project_sync.log_lifecycle("controller_node_chosen")
            local_project_sync.log_lifecycle("execution_node_chosen")
            create_partition(local_project_sync.event_class._meta.db_table, start=local_project_sync.created)
            # associate the inventory update before calling run() so that a
            # cancel() call on the inventory update can cancel the project update
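
All three tasks.py hunks share one shape: a dependent update created on
the fly runs on the same node that manages it, so controller_node mirrors
execution_node and both lifecycle events fire immediately after creation.
A generic sketch of that pattern; create_update stands in for
create_project_update() / create_inventory_update() and is not an AWX API:

def create_dependent_update(create_update, node, **eager_fields):
    update = create_update(execution_node=node, controller_node=node, **eager_fields)
    # Both nodes were fixed at creation time, so the two events can be
    # logged back to back.
    update.log_lifecycle("controller_node_chosen")
    update.log_lifecycle("execution_node_chosen")
    return update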