Merge pull request #2487 from ryanpetrello/improved-amqp-cancel

Fix a bug that breaks job cancellation for single-node jobs

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot]
2018-10-19 13:53:57 +00:00
committed by GitHub
3 changed files with 47 additions and 12 deletions

View File

@@ -14,18 +14,18 @@ class Control(object):
services = ('dispatcher', 'callback_receiver') services = ('dispatcher', 'callback_receiver')
result = None result = None
def __init__(self, service): def __init__(self, service, host=None):
if service not in self.services: if service not in self.services:
raise RuntimeError('{} must be in {}'.format(service, self.services)) raise RuntimeError('{} must be in {}'.format(service, self.services))
self.service = service self.service = service
queuename = get_local_queuename() self.queuename = host or get_local_queuename()
self.queue = Queue(queuename, Exchange(queuename), routing_key=queuename) self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename)
def publish(self, msg, conn, host, **kwargs): def publish(self, msg, conn, **kwargs):
producer = Producer( producer = Producer(
exchange=self.queue.exchange, exchange=self.queue.exchange,
channel=conn, channel=conn,
routing_key=get_local_queuename() routing_key=self.queuename
) )
producer.publish(msg, expiration=5, **kwargs) producer.publish(msg, expiration=5, **kwargs)
@@ -35,14 +35,13 @@ class Control(object):
def running(self, *args, **kwargs): def running(self, *args, **kwargs):
return self.control_with_reply('running', *args, **kwargs) return self.control_with_reply('running', *args, **kwargs)
def control_with_reply(self, command, host=None, timeout=5): def control_with_reply(self, command, timeout=5):
host = host or settings.CLUSTER_HOST_ID logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename))
logger.warn('checking {} {} for {}'.format(self.service, command, host))
reply_queue = Queue(name="amq.rabbitmq.reply-to") reply_queue = Queue(name="amq.rabbitmq.reply-to")
self.result = None self.result = None
with Connection(settings.BROKER_URL) as conn: with Connection(settings.BROKER_URL) as conn:
with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True): with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True):
self.publish({'control': command}, conn, host, reply_to='amq.rabbitmq.reply-to') self.publish({'control': command}, conn, reply_to='amq.rabbitmq.reply-to')
try: try:
conn.drain_events(timeout=timeout) conn.drain_events(timeout=timeout)
except socket.timeout: except socket.timeout:
@@ -50,10 +49,9 @@ class Control(object):
raise raise
return self.result return self.result
def control(self, msg, host=None, **kwargs): def control(self, msg, **kwargs):
host = host or settings.CLUSTER_HOST_ID
with Connection(settings.BROKER_URL) as conn: with Connection(settings.BROKER_URL) as conn:
self.publish(msg, conn, host) self.publish(msg, conn)
def process_message(self, body, message): def process_message(self, body, message):
self.result = body self.result = body

View File

@@ -7,9 +7,11 @@ import json
import logging import logging
import os import os
import re import re
import socket
import subprocess import subprocess
import tempfile import tempfile
from collections import OrderedDict from collections import OrderedDict
import six
# Django # Django
from django.conf import settings from django.conf import settings
@@ -29,6 +31,7 @@ from polymorphic.models import PolymorphicModel
# AWX # AWX
from awx.main.models.base import * # noqa from awx.main.models.base import * # noqa
from awx.main.dispatch.control import Control as ControlDispatcher
from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin
from awx.main.utils import ( from awx.main.utils import (
encrypt_dict, decrypt_field, _inventory_updates, encrypt_dict, decrypt_field, _inventory_updates,
@@ -1248,6 +1251,31 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
# Done! # Done!
return True return True
@property
def actually_running(self):
    """Return True if the job is running in the appropriate dispatcher process.

    The job only counts as running when its status is 'running', it has
    a ``celery_task_id`` and an ``execution_node``, and the dispatcher
    addressed by that execution node lists the task id in its reply to
    the 'running' control command.

    Returns False when any of those fields is unset, when the dispatcher
    does not answer within the timeout, or when it answers with an empty
    reply.
    """
    if not (self.status == 'running' and self.celery_task_id and self.execution_node):
        return False

    # If the job is marked as running, but the dispatcher
    # doesn't know about it (or the dispatcher doesn't reply),
    # then report it as not running so the caller can cancel the job
    timeout = 5
    try:
        running_tasks = ControlDispatcher(
            'dispatcher', self.execution_node
        ).running(timeout=timeout)
    except socket.timeout:
        logger.error(six.text_type(
            'could not reach dispatcher on {} within {}s'
        ).format(self.execution_node, timeout))
        return False

    # NOTE(review): the control helper appears to reset its result to None
    # before waiting for the reply; treat a missing/empty reply as "not
    # running" instead of letting `in None` raise TypeError.
    return self.celery_task_id in (running_tasks or ())
@property @property
def can_cancel(self): def can_cancel(self):
return bool(self.status in CAN_CANCEL) return bool(self.status in CAN_CANCEL)
@@ -1270,6 +1298,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if self.status in ('pending', 'waiting', 'new'): if self.status in ('pending', 'waiting', 'new'):
self.status = 'canceled' self.status = 'canceled'
cancel_fields.append('status') cancel_fields.append('status')
if self.status == 'running' and not self.actually_running:
self.status = 'canceled'
cancel_fields.append('status')
if job_explanation is not None: if job_explanation is not None:
self.job_explanation = job_explanation self.job_explanation = job_explanation
cancel_fields.append('job_explanation') cancel_fields.append('job_explanation')

View File

@@ -481,3 +481,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
@property @property
def preferred_instance_groups(self): def preferred_instance_groups(self):
return [] return []
@property
def actually_running(self):
    """Report whether this workflow job is running, based on its status alone.

    WorkflowJobs don't _actually_ run anything in the dispatcher, so
    there's no point in asking the dispatcher if it knows about this task.
    """
    is_running = (self.status == 'running')
    return is_running