Make canceling view non-atomic to fix 500 errors with job bursts (#13072)

* Make canceling view non-atomic to fix 500 errors with job bursts

* Update test calls for cancel method changes
This commit is contained in:
Alan Rominger 2022-10-20 15:02:54 -04:00 committed by GitHub
parent e013d25e2d
commit 192f45bbd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 71 deletions

View File

@ -13,7 +13,7 @@ from django.contrib.auth import views as auth_views
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.core.exceptions import FieldDoesNotExist
from django.db import connection
from django.db import connection, transaction
from django.db.models.fields.related import OneToOneRel
from django.http import QueryDict
from django.shortcuts import get_object_or_404
@ -64,6 +64,7 @@ __all__ = [
'ParentMixin',
'SubListAttachDetachAPIView',
'CopyAPIView',
'GenericCancelView',
'BaseUsersList',
]
@ -985,6 +986,23 @@ class CopyAPIView(GenericAPIView):
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
class GenericCancelView(RetrieveAPIView):
    # Common base for cancel endpoints. Concrete subclasses are expected to
    # provide their own `model` and `serializer_class`.
    obj_permission_type = 'cancel'

    # dispatch is marked with transaction.non_atomic_requests so that requests
    # handled by this view are not wrapped in a per-request transaction.
    @transaction.non_atomic_requests
    def dispatch(self, *args, **kwargs):
        return super().dispatch(*args, **kwargs)

    def post(self, request, *args, **kwargs):
        # Look up the object first, then bail out early when it reports that
        # it cannot be canceled.
        target = self.get_object()
        if not target.can_cancel:
            return self.http_method_not_allowed(request, *args, **kwargs)

        # Cancelable: request the cancel and acknowledge with 202 Accepted.
        target.cancel()
        return Response(status=status.HTTP_202_ACCEPTED)
class BaseUsersList(SubListCreateAttachDetachAPIView):
def post(self, request, *args, **kwargs):
ret = super(BaseUsersList, self).post(request, *args, **kwargs)

View File

@ -69,6 +69,7 @@ from awx.api.generics import (
APIView,
BaseUsersList,
CopyAPIView,
GenericCancelView,
GenericAPIView,
ListAPIView,
ListCreateAPIView,
@ -976,20 +977,11 @@ class SystemJobEventsList(SubListAPIView):
return job.get_event_queryset()
class ProjectUpdateCancel(RetrieveAPIView):
class ProjectUpdateCancel(GenericCancelView):
model = models.ProjectUpdate
obj_permission_type = 'cancel'
serializer_class = serializers.ProjectUpdateCancelSerializer
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class ProjectUpdateNotificationsList(SubListAPIView):
@ -2262,20 +2254,11 @@ class InventoryUpdateCredentialsList(SubListAPIView):
relationship = 'credentials'
class InventoryUpdateCancel(RetrieveAPIView):
class InventoryUpdateCancel(GenericCancelView):
model = models.InventoryUpdate
obj_permission_type = 'cancel'
serializer_class = serializers.InventoryUpdateCancelSerializer
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class InventoryUpdateNotificationsList(SubListAPIView):
@ -3352,20 +3335,15 @@ class WorkflowJobWorkflowNodesList(SubListAPIView):
return super(WorkflowJobWorkflowNodesList, self).get_queryset().order_by('id')
class WorkflowJobCancel(RetrieveAPIView):
class WorkflowJobCancel(GenericCancelView):
model = models.WorkflowJob
obj_permission_type = 'cancel'
serializer_class = serializers.WorkflowJobCancelSerializer
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
ScheduleWorkflowManager().schedule()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
r = super().post(request, *args, **kwargs)
ScheduleWorkflowManager().schedule()
return r
class WorkflowJobNotificationsList(SubListAPIView):
@ -3521,20 +3499,11 @@ class JobActivityStreamList(SubListAPIView):
search_fields = ('changes',)
class JobCancel(RetrieveAPIView):
class JobCancel(GenericCancelView):
model = models.Job
obj_permission_type = 'cancel'
serializer_class = serializers.JobCancelSerializer
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class JobRelaunch(RetrieveAPIView):
@ -4005,20 +3974,11 @@ class AdHocCommandDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView):
serializer_class = serializers.AdHocCommandDetailSerializer
class AdHocCommandCancel(RetrieveAPIView):
class AdHocCommandCancel(GenericCancelView):
model = models.AdHocCommand
obj_permission_type = 'cancel'
serializer_class = serializers.AdHocCommandCancelSerializer
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class AdHocCommandRelaunch(GenericAPIView):
@ -4153,20 +4113,11 @@ class SystemJobDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView):
serializer_class = serializers.SystemJobSerializer
class SystemJobCancel(RetrieveAPIView):
class SystemJobCancel(GenericCancelView):
model = models.SystemJob
obj_permission_type = 'cancel'
serializer_class = serializers.SystemJobCancelSerializer
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class SystemJobNotificationsList(SubListAPIView):

View File

@ -3,6 +3,7 @@ import uuid
import json
from django.conf import settings
from django.db import connection
import redis
from awx.main.dispatch import get_local_queuename
@ -49,7 +50,10 @@ class Control(object):
reply_queue = Control.generate_reply_queue_name()
self.result = None
with pg_bus_conn(new_connection=True) as conn:
if not connection.get_autocommit():
raise RuntimeError('Control-with-reply messages can only be done in autocommit mode')
with pg_bus_conn() as conn:
conn.listen(reply_queue)
send_data = {'control': command, 'reply_to': reply_queue}
if extra_data:

View File

@ -1465,23 +1465,23 @@ class UnifiedJob(
self.job_explanation = job_explanation
cancel_fields.append('job_explanation')
# Important to save here before sending cancel signal to dispatcher to cancel because
# the job control process will use the cancel_flag to distinguish a shutdown from a cancel
self.save(update_fields=cancel_fields)
controller_notified = False
if self.celery_task_id:
controller_notified = self.cancel_dispatcher_process()
else:
# Avoid a race condition where we hold a stale model from the pending state but the job has already started:
# it is checking for the signal but not cancel_flag, so re-send the signal after this database commit
connection.on_commit(self.fallback_cancel)
# If a SIGTERM signal was sent to the control process, and acked by the dispatcher
# then we want to let its own cleanup change status, otherwise change status now
if not controller_notified:
if self.status != 'canceled':
self.status = 'canceled'
cancel_fields.append('status')
self.save(update_fields=cancel_fields)
self.save(update_fields=['status'])
# Avoid a race condition where we hold a stale model from the pending state but the job has already started:
# it is checking for the signal but not cancel_flag, so re-send the signal after updating the cancel fields
self.fallback_cancel()
return self.cancel_flag

View File

@ -50,7 +50,10 @@ def test_cancel(unified_job):
# Some more thought may want to go into only emitting canceled if/when the job record
# status is changed to canceled. Unlike, currently, where it's emitted unconditionally.
unified_job.websocket_emit_status.assert_called_with("canceled")
unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'status'])
assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [
((), {'update_fields': ['cancel_flag', 'start_args']}),
((), {'update_fields': ['status']}),
]
def test_cancel_job_explanation(unified_job):
@ -60,7 +63,10 @@ def test_cancel_job_explanation(unified_job):
unified_job.cancel(job_explanation=job_explanation)
assert unified_job.job_explanation == job_explanation
unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'job_explanation', 'status'])
assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [
((), {'update_fields': ['cancel_flag', 'start_args', 'job_explanation']}),
((), {'update_fields': ['status']}),
]
def test_organization_copy_to_jobs():