diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index 9beb6b4da2..f836e0624c 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -14,18 +14,18 @@ class Control(object): services = ('dispatcher', 'callback_receiver') result = None - def __init__(self, service): + def __init__(self, service, host=None): if service not in self.services: raise RuntimeError('{} must be in {}'.format(service, self.services)) self.service = service - queuename = get_local_queuename() - self.queue = Queue(queuename, Exchange(queuename), routing_key=queuename) + self.queuename = host or get_local_queuename() + self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename) - def publish(self, msg, conn, host, **kwargs): + def publish(self, msg, conn, **kwargs): producer = Producer( exchange=self.queue.exchange, channel=conn, - routing_key=get_local_queuename() + routing_key=self.queuename ) producer.publish(msg, expiration=5, **kwargs) @@ -35,14 +35,13 @@ class Control(object): def running(self, *args, **kwargs): return self.control_with_reply('running', *args, **kwargs) - def control_with_reply(self, command, host=None, timeout=5): - host = host or settings.CLUSTER_HOST_ID - logger.warn('checking {} {} for {}'.format(self.service, command, host)) + def control_with_reply(self, command, timeout=5): + logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename)) reply_queue = Queue(name="amq.rabbitmq.reply-to") self.result = None with Connection(settings.BROKER_URL) as conn: with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True): - self.publish({'control': command}, conn, host, reply_to='amq.rabbitmq.reply-to') + self.publish({'control': command}, conn, reply_to='amq.rabbitmq.reply-to') try: conn.drain_events(timeout=timeout) except socket.timeout: @@ -50,10 +49,9 @@ class Control(object): raise return self.result - def control(self, msg, host=None, **kwargs): - host = host or settings.CLUSTER_HOST_ID + def control(self, msg, **kwargs): with Connection(settings.BROKER_URL) as conn: - self.publish(msg, conn, host) + self.publish(msg, conn) def process_message(self, body, message): self.result = body diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 082fa8951d..28f348d1e6 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -7,9 +7,11 @@ import json import logging import os import re +import socket import subprocess import tempfile from collections import OrderedDict +import six # Django from django.conf import settings @@ -29,6 +31,7 @@ from polymorphic.models import PolymorphicModel # AWX from awx.main.models.base import * # noqa +from awx.main.dispatch.control import Control as ControlDispatcher from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin from awx.main.utils import ( encrypt_dict, decrypt_field, _inventory_updates, @@ -1248,6 +1251,31 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique # Done! return True + + @property + def actually_running(self): + # returns True if the job is running in the appropriate dispatcher process + running = False + if all([ + self.status == 'running', + self.celery_task_id, + self.execution_node + ]): + # If the job is marked as running, but the dispatcher + # doesn't know about it (or the dispatcher doesn't reply), + # then cancel the job + timeout = 5 + try: + running = self.celery_task_id in ControlDispatcher( + 'dispatcher', self.execution_node + ).running(timeout=timeout) + except socket.timeout: + logger.error(six.text_type( + 'could not reach dispatcher on {} within {}s' + ).format(self.execution_node, timeout)) + running = False + return running + @property def can_cancel(self): return bool(self.status in CAN_CANCEL) @@ -1270,6 +1298,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique if self.status in ('pending', 'waiting', 'new'): self.status = 'canceled' cancel_fields.append('status') + if self.status == 'running' and not self.actually_running: + self.status = 'canceled' + cancel_fields.append('status') if job_explanation is not None: self.job_explanation = job_explanation cancel_fields.append('job_explanation') diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index d8698abada..357dd9eeb0 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -481,3 +481,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio @property def preferred_instance_groups(self): return [] + + @property + def actually_running(self): + # WorkflowJobs don't _actually_ run anything in the dispatcher, so + # there's no point in asking the dispatcher if it knows about this task + return self.status == 'running'