Merge pull request #1372 from chrismeyersfsu/old-celery3

celery 4.x to 3.x roll back
This commit is contained in:
Chris Meyers
2018-02-27 15:26:46 -05:00
committed by GitHub
20 changed files with 70 additions and 78 deletions

View File

@@ -208,7 +208,7 @@ class Command(BaseCommand):
help = 'Launch the job callback receiver'
def handle(self, *arg, **options):
with Connection(settings.CELERY_BROKER_URL) as conn:
with Connection(settings.BROKER_URL) as conn:
try:
worker = CallbackBrokerWorker(conn)
worker.run()

View File

@@ -28,7 +28,7 @@ from rest_framework.exceptions import ParseError
from polymorphic.models import PolymorphicModel
# Django-Celery
from django_celery_results.models import TaskResult
from djcelery.models import TaskMeta
# AWX
from awx.main.models.base import * # noqa
@@ -1093,8 +1093,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def celery_task(self):
try:
if self.celery_task_id:
return TaskResult.objects.get(task_id=self.celery_task_id)
except TaskResult.DoesNotExist:
return TaskMeta.objects.get(task_id=self.celery_task_id)
except TaskMeta.DoesNotExist:
pass
def get_passwords_needed_to_start(self):
@@ -1335,7 +1335,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
cancel_fields.append('job_explanation')
self.save(update_fields=cancel_fields)
self.websocket_emit_status("canceled")
if settings.CELERY_BROKER_URL.startswith('amqp://'):
if settings.BROKER_URL.startswith('amqp://'):
self._force_cancel()
return self.cancel_flag

View File

@@ -19,7 +19,7 @@ __all__ = ['CallbackQueueDispatcher']
class CallbackQueueDispatcher(object):
def __init__(self):
self.callback_connection = getattr(settings, 'CELERY_BROKER_URL', None)
self.callback_connection = getattr(settings, 'BROKER_URL', None)
self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.connection = None
self.exchange = None

View File

@@ -133,7 +133,7 @@ class TaskManager():
def get_active_tasks(self):
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
app = Celery('awx')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.config_from_object('django.conf:settings')
inspector = Inspect(app=app)
active_task_queues = inspector.active()
else:

View File

@@ -47,7 +47,6 @@ from crum import impersonate
# AWX
from awx import __version__ as awx_application_version
from awx import celery_app
from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS
from awx.main.models import * # noqa
from awx.main.models.unified_jobs import ACTIVE_STATES
@@ -208,14 +207,14 @@ def handle_ha_toplogy_changes(self):
instance = Instance.objects.me()
logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
awx_app = Celery('awx')
awx_app.config_from_object('django.conf:settings', namespace='CELERY')
awx_app.config_from_object('django.conf:settings')
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
for instance in instances:
logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
.format(instance.hostname, removed_queues, added_queues))
updated_routes = update_celery_worker_routes(instance, settings)
logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
.format(instance.hostname, updated_routes, self.app.conf.CELERY_TASK_ROUTES))
.format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
@worker_ready.connect
@@ -234,7 +233,7 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
instance = Instance.objects.me()
added_routes = update_celery_worker_routes(instance, conf)
logger.info("Workers on tower node '{}' added routes {} all routes are now {}"
.format(instance.hostname, added_routes, conf.CELERY_TASK_ROUTES))
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
@celeryd_after_setup.connect
@@ -2359,10 +2358,3 @@ def deep_copy_model_obj(
importlib.import_module(permission_check_func[0]), permission_check_func[1]
), permission_check_func[2])
permission_check_func(creater, copy_mapping.values())
celery_app.register_task(RunJob())
celery_app.register_task(RunProjectUpdate())
celery_app.register_task(RunInventoryUpdate())
celery_app.register_task(RunAdHocCommand())
celery_app.register_task(RunSystemJob())

View File

@@ -73,7 +73,7 @@ def celery_memory_broker():
Allows django signal code to execute without the need for redis
'''
settings.CELERY_BROKER_URL='memory://localhost/'
settings.BROKER_URL='memory://localhost/'
@pytest.fixture

View File

@@ -8,11 +8,11 @@ from datetime import timedelta
('admin_checks', 'awx.main.tasks.run_administrative_checks'),
('tower_scheduler', 'awx.main.tasks.awx_periodic_scheduler'),
])
def test_CELERY_BEAT_SCHEDULE(mocker, job_name, function_path):
assert job_name in settings.CELERY_BEAT_SCHEDULE
assert 'schedule' in settings.CELERY_BEAT_SCHEDULE[job_name]
assert type(settings.CELERY_BEAT_SCHEDULE[job_name]['schedule']) is timedelta
assert settings.CELERY_BEAT_SCHEDULE[job_name]['task'] == function_path
def test_CELERYBEAT_SCHEDULE(mocker, job_name, function_path):
assert job_name in settings.CELERYBEAT_SCHEDULE
assert 'schedule' in settings.CELERYBEAT_SCHEDULE[job_name]
assert type(settings.CELERYBEAT_SCHEDULE[job_name]['schedule']) is timedelta
assert settings.CELERYBEAT_SCHEDULE[job_name]['task'] == function_path
# Ensures that the function exists
mocker.patch(function_path)

View File

@@ -17,7 +17,7 @@ from awx.main.utils.ha import (
@pytest.fixture
def conf():
class Conf():
CELERY_TASK_ROUTES = dict()
CELERY_ROUTES = dict()
CELERYBEAT_SCHEDULE = dict()
return Conf()
@@ -88,14 +88,14 @@ class TestUpdateCeleryWorkerRoutes():
instance.is_controller = mocker.MagicMock(return_value=is_controller)
assert update_celery_worker_routes(instance, conf) == expected_routes
assert conf.CELERY_TASK_ROUTES == expected_routes
assert conf.CELERY_ROUTES == expected_routes
def test_update_celery_worker_routes_deleted(self, mocker, conf):
instance = mocker.MagicMock()
instance.hostname = 'east-1'
instance.is_controller = mocker.MagicMock(return_value=False)
conf.CELERY_TASK_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
conf.CELERY_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
update_celery_worker_routes(instance, conf)
assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_TASK_ROUTES
assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_ROUTES

View File

@@ -48,12 +48,12 @@ def update_celery_worker_routes(instance, conf):
if instance.is_controller():
tasks.append('awx.main.tasks.awx_isolated_heartbeat')
else:
if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_TASK_ROUTES:
del conf.CELERY_TASK_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_ROUTES:
del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
for t in tasks:
conf.CELERY_TASK_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
routes_updated[t] = conf.CELERY_TASK_ROUTES[t]
conf.CELERY_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
routes_updated[t] = conf.CELERY_ROUTES[t]
return routes_updated