diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index fe77bd4c37..02fca647f6 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -8,7 +8,7 @@ from uuid import uuid4 from django.conf import settings from django.db import connection -from . import pg_bus_conn +from . import pg_bus_conn, get_local_queuename logger = logging.getLogger('awx.main.dispatch') @@ -73,9 +73,12 @@ class task: kwargs = kwargs or {} queue = ( queue or - getattr(cls.queue, 'im_func', cls.queue) or - settings.CELERY_DEFAULT_QUEUE + getattr(cls.queue, 'im_func', cls.queue) ) + if not queue: + msg = f'{cls.name}: Queue value required and may not me None' + logger.error(msg) + raise ValueError(msg) obj = { 'uuid': task_id, 'args': args, diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index c62dc6c732..b678797026 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -63,7 +63,7 @@ class Command(BaseCommand): # https://bugs.python.org/issue37429 AWXProxyHandler.disable() try: - queues = ['tower_broadcast_all'] + settings.AWX_CELERY_QUEUES_STATIC + [get_local_queuename()] + queues = ['tower_broadcast_all', get_local_queuename()] consumer = AWXConsumerPG( 'dispatcher', None, diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 253eb7b57f..6ab9ee6066 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -36,6 +36,7 @@ from awx.main.models.base import ( NotificationFieldsModel, prevent_search ) +from awx.main.dispatch import get_local_queuename from awx.main.dispatch.control import Control as ControlDispatcher from awx.main.registrar import activity_stream_registrar from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin @@ -1466,7 +1467,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique return r def get_queue_name(self): - return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE + return self.controller_node or self.execution_node or get_local_queuename() def is_isolated(self): return bool(self.controller_node) diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index c0d3dd842e..7da6a305a9 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -5,11 +5,12 @@ import logging # AWX from awx.main.scheduler import TaskManager from awx.main.dispatch.publish import task +from awx.main.dispatch import get_local_queuename logger = logging.getLogger('awx.main.scheduler') -@task() +@task(queue=get_local_queuename) def run_task_manager(): logger.debug("Running Tower task manager.") TaskManager().schedule() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e45f34cd66..4ab008cd31 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -151,7 +151,7 @@ def inform_cluster_of_shutdown(): logger.exception('Encountered problem with normal shutdown signal.') -@task() +@task(queue=get_local_queuename) def apply_cluster_membership_policies(): started_waiting = time.time() with advisory_lock('cluster_policy_lock', wait=True): @@ -307,7 +307,7 @@ def profile_sql(threshold=1, minutes=1): logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes)) -@task() +@task(queue=get_local_queuename) def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -336,7 +336,7 @@ def send_notifications(notification_list, job_id=None): logger.exception('Error saving notification {} result.'.format(notification.id)) -@task() +@task(queue=get_local_queuename) def gather_analytics(): from awx.conf.models import Setting from rest_framework.fields import DateTimeField @@ -492,7 +492,7 @@ def awx_isolated_heartbeat(): isolated_manager.IsolatedManager(CallbackQueueDispatcher.dispatch).health_check(isolated_instance_qs) -@task() +@task(queue=get_local_queuename) def awx_periodic_scheduler(): with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired: if acquired is False: @@ -549,7 +549,7 @@ def awx_periodic_scheduler(): state.save() -@task() +@task(queue=get_local_queuename) def handle_work_success(task_actual): try: instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -562,7 +562,7 @@ def handle_work_success(task_actual): schedule_task_manager() -@task() +@task(queue=get_local_queuename) def handle_work_error(task_id, *args, **kwargs): subtasks = kwargs.get('subtasks', None) logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks))) @@ -602,7 +602,7 @@ def handle_work_error(task_id, *args, **kwargs): pass -@task() +@task(queue=get_local_queuename) def update_inventory_computed_fields(inventory_id): ''' Signal handler and wrapper around inventory.update_computed_fields to @@ -644,7 +644,7 @@ def update_smart_memberships_for_inventory(smart_inventory): return False -@task() +@task(queue=get_local_queuename) def update_host_smart_inventory_memberships(): smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False) changed_inventories = set([]) @@ -660,7 +660,7 @@ def update_host_smart_inventory_memberships(): smart_inventory.update_computed_fields() -@task() +@task(queue=get_local_queuename) def delete_inventory(inventory_id, user_id, retries=5): # Delete inventory as user if user_id is None: @@ -1478,7 +1478,7 @@ class BaseTask(object): -@task() +@task(queue=get_local_queuename) class RunJob(BaseTask): ''' Run a job using ansible-playbook. @@ -1911,7 +1911,7 @@ class RunJob(BaseTask): update_inventory_computed_fields.delay(inventory.id) -@task() +@task(queue=get_local_queuename) class RunProjectUpdate(BaseTask): model = ProjectUpdate @@ -2321,7 +2321,7 @@ class RunProjectUpdate(BaseTask): return getattr(settings, 'AWX_PROOT_ENABLED', False) -@task() +@task(queue=get_local_queuename) class RunInventoryUpdate(BaseTask): model = InventoryUpdate @@ -2589,7 +2589,7 @@ class RunInventoryUpdate(BaseTask): ) -@task() +@task(queue=get_local_queuename) class RunAdHocCommand(BaseTask): ''' Run an ad hoc command using ansible. @@ -2779,7 +2779,7 @@ class RunAdHocCommand(BaseTask): isolated_manager_instance.cleanup() -@task() +@task(queue=get_local_queuename) class RunSystemJob(BaseTask): model = SystemJob @@ -2853,7 +2853,7 @@ def _reconstruct_relationships(copy_mapping): new_obj.save() -@task() +@task(queue=get_local_queuename) def deep_copy_model_obj( model_module, model_name, obj_pk, new_obj_pk, user_pk, sub_obj_list, permission_check_func=None diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index f6ad6f6d7c..63a5b74be6 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -423,7 +423,6 @@ BROKER_DURABILITY = True BROKER_POOL_LIMIT = None BROKER_URL = 'redis://localhost:6379' BROKER_TRANSPORT_OPTIONS = {} -CELERY_DEFAULT_QUEUE = 'awx_private_queue' CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.awx_periodic_scheduler', @@ -452,14 +451,6 @@ CELERYBEAT_SCHEDULE = { # 'isolated_heartbeat': set up at the end of production.py and development.py } -AWX_CELERY_QUEUES_STATIC = [ - CELERY_DEFAULT_QUEUE, -] - -AWX_CELERY_BCAST_QUEUES_STATIC = [ - 'tower_broadcast_all', -] - ASGI_AMQP = { 'INIT_FUNC': 'awx.prepare_env', 'MODEL': 'awx.main.models.channels.ChannelGroup',