Rollback celery

This commit is contained in:
Wayne Witzel III
2018-02-22 09:32:20 -05:00
parent 90bb43ce74
commit 91c6d406c5
10 changed files with 40 additions and 46 deletions

View File

@@ -208,7 +208,7 @@ class Command(BaseCommand):
help = 'Launch the job callback receiver'
def handle(self, *arg, **options):
with Connection(settings.CELERY_BROKER_URL) as conn:
with Connection(settings.BROKER_URL) as conn:
try:
worker = CallbackBrokerWorker(conn)
worker.run()

View File

@@ -28,7 +28,7 @@ from rest_framework.exceptions import ParseError
from polymorphic.models import PolymorphicModel
# Django-Celery
from django_celery_results.models import TaskResult
from djcelery.models import TaskMeta
# AWX
from awx.main.models.base import * # noqa
@@ -1093,8 +1093,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def celery_task(self):
try:
if self.celery_task_id:
return TaskResult.objects.get(task_id=self.celery_task_id)
except TaskResult.DoesNotExist:
return TaskMeta.objects.get(task_id=self.celery_task_id)
except TaskMeta.DoesNotExist:
pass
def get_passwords_needed_to_start(self):
@@ -1335,7 +1335,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
cancel_fields.append('job_explanation')
self.save(update_fields=cancel_fields)
self.websocket_emit_status("canceled")
if settings.CELERY_BROKER_URL.startswith('amqp://'):
if settings.BROKER_URL.startswith('amqp://'):
self._force_cancel()
return self.cancel_flag

View File

@@ -19,7 +19,7 @@ __all__ = ['CallbackQueueDispatcher']
class CallbackQueueDispatcher(object):
def __init__(self):
self.callback_connection = getattr(settings, 'CELERY_BROKER_URL', None)
self.callback_connection = getattr(settings, 'BROKER_URL', None)
self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.connection = None
self.exchange = None

View File

@@ -133,7 +133,7 @@ class TaskManager():
def get_active_tasks(self):
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
app = Celery('awx')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.config_from_object('django.conf:settings')
inspector = Inspect(app=app)
active_task_queues = inspector.active()
else:

View File

@@ -48,7 +48,6 @@ import six
# AWX
from awx import __version__ as awx_application_version
from awx import celery_app
from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS
from awx.main.models import * # noqa
from awx.main.models.unified_jobs import ACTIVE_STATES
@@ -207,7 +206,7 @@ def handle_ha_toplogy_changes(self):
instance = Instance.objects.me()
logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
awx_app = Celery('awx')
awx_app.config_from_object('django.conf:settings', namespace='CELERY')
awx_app.config_from_object('django.conf:settings')
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
for instance in instances:
logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
@@ -2332,10 +2331,3 @@ def deep_copy_model_obj(
importlib.import_module(permission_check_func[0]), permission_check_func[1]
), permission_check_func[2])
permission_check_func(creater, copy_mapping.values())
celery_app.register_task(RunJob())
celery_app.register_task(RunProjectUpdate())
celery_app.register_task(RunInventoryUpdate())
celery_app.register_task(RunAdHocCommand())
celery_app.register_task(RunSystemJob())