mirror of
https://github.com/ansible/awx.git
synced 2026-02-12 07:04:45 -03:30
This is a simple sanity check, but it should help us avoid shooting
ourselves in the foot in complicated scenarios such as:
1. A dispatcher worker is running a job, and it's killed with `kill -9`
2. The dispatcher attempts to reap jobs with a matching celery_task_id
3. The associated sync project update has the *same* celery_task_id
(an implementation detail of how we implemented that), and it ends
up getting reaped _even though_ it's already finished and has
status=successful
50 lines
1.7 KiB
Python
50 lines
1.7 KiB
Python
from datetime import timedelta
|
|
import logging
|
|
|
|
from django.db.models import Q
|
|
from django.utils.timezone import now as tz_now
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
from awx.main.models import Instance, UnifiedJob, WorkflowJob
|
|
|
|
logger = logging.getLogger('awx.main.dispatch')
|
|
|
|
|
|
def reap_job(j, status):
    '''
    Mark a single job as reaped, unless the database says it is no longer
    running or waiting.

    The job's current status is re-read from the database (by ``j.id``)
    before anything is changed; if that status is not 'running' or
    'waiting', the job is left untouched.

    :param j: job instance to reap.
    :param status: status stored on the job and emitted over the websocket.
    :returns: None
    '''
    if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'):
        # just in case, don't reap jobs that aren't running
        return
    j.status = status
    j.start_args = ''  # blank field to remove encrypted passwords
    if j.job_explanation:
        # keep the appended message from running into any existing text
        j.job_explanation += ' '
    j.job_explanation += ' '.join((
        'Task was marked as running in Tower but was not present in',
        'the job queue, so it has been marked as failed.',
    ))
    # only write the fields changed above
    j.save(update_fields=['status', 'start_args', 'job_explanation'])
    if hasattr(j, 'send_notification_templates'):
        # the notification is always sent as 'failed', whatever ``status`` is
        j.send_notification_templates('failed')
    j.websocket_emit_status(status)
    logger.error(
        '{} is no longer running; reaping'.format(j.log_format)
    )
|
|
|
|
|
|
def reap(instance=None, status='failed', excluded_uuids=None):
    '''
    Reap all jobs in waiting|running for this instance.

    A job is selected when all of the following hold:

    * its status is 'running', or its status is 'waiting' and its
      ``modified`` timestamp is at least 60 seconds old;
    * its ``execution_node`` or ``controller_node`` equals the instance's
      hostname;
    * it is not a WorkflowJob;
    * its ``celery_task_id`` is not listed in ``excluded_uuids``.

    Every selected job is handed to ``reap_job`` together with ``status``.

    :param instance: instance whose jobs are reaped; when falsy,
                     ``Instance.objects.me()`` is used instead.
    :param status: status passed to ``reap_job`` for each selected job.
    :param excluded_uuids: iterable of celery_task_id values to leave alone;
                           ``None`` (the default) excludes nothing.
    :returns: None
    '''
    # build a fresh list per call rather than sharing a mutable default
    if excluded_uuids is None:
        excluded_uuids = []
    me = instance or Instance.objects.me()
    # 'waiting' jobs are only reaped once they are at least a minute old
    waiting_cutoff = tz_now() - timedelta(seconds=60)
    workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
    jobs = UnifiedJob.objects.filter(
        (
            Q(status='running') |
            Q(status='waiting', modified__lte=waiting_cutoff)
        ) & (
            Q(execution_node=me.hostname) |
            Q(controller_node=me.hostname)
        ) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
    ).exclude(celery_task_id__in=excluded_uuids)
    for j in jobs:
        reap_job(j, status)
|