mirror of
https://github.com/ansible/awx.git
synced 2026-03-18 09:27:31 -02:30
fix a bug that breaks job cancel on single node jobs
1. Install awx w/ a single node.
2. Start a long-running job.
3. Forcibly kill the `awx-manage run_dispatcher` process (e.g.,
SIGKILL) and do not start it again.
4. The job remains in running - without a second cluster to discover
the job, it is never reaped.
5. This PR allows you to cancel the job from the UI+API.
This commit is contained in:
@@ -14,18 +14,18 @@ class Control(object):
|
|||||||
services = ('dispatcher', 'callback_receiver')
|
services = ('dispatcher', 'callback_receiver')
|
||||||
result = None
|
result = None
|
||||||
|
|
||||||
def __init__(self, service):
|
def __init__(self, service, host=None):
|
||||||
if service not in self.services:
|
if service not in self.services:
|
||||||
raise RuntimeError('{} must be in {}'.format(service, self.services))
|
raise RuntimeError('{} must be in {}'.format(service, self.services))
|
||||||
self.service = service
|
self.service = service
|
||||||
queuename = get_local_queuename()
|
self.queuename = host or get_local_queuename()
|
||||||
self.queue = Queue(queuename, Exchange(queuename), routing_key=queuename)
|
self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename)
|
||||||
|
|
||||||
def publish(self, msg, conn, host, **kwargs):
|
def publish(self, msg, conn, **kwargs):
|
||||||
producer = Producer(
|
producer = Producer(
|
||||||
exchange=self.queue.exchange,
|
exchange=self.queue.exchange,
|
||||||
channel=conn,
|
channel=conn,
|
||||||
routing_key=get_local_queuename()
|
routing_key=self.queuename
|
||||||
)
|
)
|
||||||
producer.publish(msg, expiration=5, **kwargs)
|
producer.publish(msg, expiration=5, **kwargs)
|
||||||
|
|
||||||
@@ -35,14 +35,13 @@ class Control(object):
|
|||||||
def running(self, *args, **kwargs):
|
def running(self, *args, **kwargs):
|
||||||
return self.control_with_reply('running', *args, **kwargs)
|
return self.control_with_reply('running', *args, **kwargs)
|
||||||
|
|
||||||
def control_with_reply(self, command, host=None, timeout=5):
|
def control_with_reply(self, command, timeout=5):
|
||||||
host = host or settings.CLUSTER_HOST_ID
|
logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename))
|
||||||
logger.warn('checking {} {} for {}'.format(self.service, command, host))
|
|
||||||
reply_queue = Queue(name="amq.rabbitmq.reply-to")
|
reply_queue = Queue(name="amq.rabbitmq.reply-to")
|
||||||
self.result = None
|
self.result = None
|
||||||
with Connection(settings.BROKER_URL) as conn:
|
with Connection(settings.BROKER_URL) as conn:
|
||||||
with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True):
|
with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True):
|
||||||
self.publish({'control': command}, conn, host, reply_to='amq.rabbitmq.reply-to')
|
self.publish({'control': command}, conn, reply_to='amq.rabbitmq.reply-to')
|
||||||
try:
|
try:
|
||||||
conn.drain_events(timeout=timeout)
|
conn.drain_events(timeout=timeout)
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
@@ -50,10 +49,9 @@ class Control(object):
|
|||||||
raise
|
raise
|
||||||
return self.result
|
return self.result
|
||||||
|
|
||||||
def control(self, msg, host=None, **kwargs):
|
def control(self, msg, **kwargs):
|
||||||
host = host or settings.CLUSTER_HOST_ID
|
|
||||||
with Connection(settings.BROKER_URL) as conn:
|
with Connection(settings.BROKER_URL) as conn:
|
||||||
self.publish(msg, conn, host)
|
self.publish(msg, conn)
|
||||||
|
|
||||||
def process_message(self, body, message):
|
def process_message(self, body, message):
|
||||||
self.result = body
|
self.result = body
|
||||||
|
|||||||
@@ -7,9 +7,11 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import socket
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
import six
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -29,6 +31,7 @@ from polymorphic.models import PolymorphicModel
|
|||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models.base import * # noqa
|
from awx.main.models.base import * # noqa
|
||||||
|
from awx.main.dispatch.control import Control as ControlDispatcher
|
||||||
from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin
|
from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin
|
||||||
from awx.main.utils import (
|
from awx.main.utils import (
|
||||||
encrypt_dict, decrypt_field, _inventory_updates,
|
encrypt_dict, decrypt_field, _inventory_updates,
|
||||||
@@ -1248,6 +1251,31 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
# Done!
|
# Done!
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@property
|
||||||
|
def actually_running(self):
|
||||||
|
# returns True if the job is running in the appropriate dispatcher process
|
||||||
|
running = False
|
||||||
|
if all([
|
||||||
|
self.status == 'running',
|
||||||
|
self.celery_task_id,
|
||||||
|
self.execution_node
|
||||||
|
]):
|
||||||
|
# If the job is marked as running, but the dispatcher
|
||||||
|
# doesn't know about it (or the dispatcher doesn't reply),
|
||||||
|
# then cancel the job
|
||||||
|
timeout = 5
|
||||||
|
try:
|
||||||
|
running = self.celery_task_id in ControlDispatcher(
|
||||||
|
'dispatcher', self.execution_node
|
||||||
|
).running(timeout=timeout)
|
||||||
|
except socket.timeout:
|
||||||
|
logger.error(six.text_type(
|
||||||
|
'could not reach dispatcher on {} within {}s'
|
||||||
|
).format(self.execution_node, timeout))
|
||||||
|
running = False
|
||||||
|
return running
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def can_cancel(self):
|
def can_cancel(self):
|
||||||
return bool(self.status in CAN_CANCEL)
|
return bool(self.status in CAN_CANCEL)
|
||||||
@@ -1270,6 +1298,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
if self.status in ('pending', 'waiting', 'new'):
|
if self.status in ('pending', 'waiting', 'new'):
|
||||||
self.status = 'canceled'
|
self.status = 'canceled'
|
||||||
cancel_fields.append('status')
|
cancel_fields.append('status')
|
||||||
|
if self.status == 'running' and not self.actually_running:
|
||||||
|
self.status = 'canceled'
|
||||||
|
cancel_fields.append('status')
|
||||||
if job_explanation is not None:
|
if job_explanation is not None:
|
||||||
self.job_explanation = job_explanation
|
self.job_explanation = job_explanation
|
||||||
cancel_fields.append('job_explanation')
|
cancel_fields.append('job_explanation')
|
||||||
|
|||||||
@@ -481,3 +481,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
|
|||||||
@property
|
@property
|
||||||
def preferred_instance_groups(self):
|
def preferred_instance_groups(self):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
@property
|
||||||
|
def actually_running(self):
|
||||||
|
# WorkflowJobs don't _actually_ run anything in the dispatcher, so
|
||||||
|
# there's no point in asking the dispatcher if it knows about this task
|
||||||
|
return self.status == 'running'
|
||||||
|
|||||||
Reference in New Issue
Block a user