remove support for multi-reader dispatch queue

* Under the new postgres backed notify/listen message queue, this never
actually worked. Without using the database to store state, we can not
provide a at-most-once delivery mechanism w/ multi-readers.
* With this change, work is done ONLY on the node that requested for the
work to be done. Under rabbitmq, the node that was first to get the
message off the queue would do the work; presumably the least busy node.
This commit is contained in:
chris meyers 2020-01-16 14:26:16 -05:00 committed by Ryan Petrello
parent 50b56aa8cb
commit dc6c353ecd
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
6 changed files with 26 additions and 30 deletions

View File

@ -8,7 +8,7 @@ from uuid import uuid4
from django.conf import settings
from django.db import connection
from . import pg_bus_conn
from . import pg_bus_conn, get_local_queuename
logger = logging.getLogger('awx.main.dispatch')
@ -73,9 +73,12 @@ class task:
kwargs = kwargs or {}
queue = (
queue or
getattr(cls.queue, 'im_func', cls.queue) or
settings.CELERY_DEFAULT_QUEUE
getattr(cls.queue, 'im_func', cls.queue)
)
if not queue:
msg = f'{cls.name}: Queue value required and may not me None'
logger.error(msg)
raise ValueError(msg)
obj = {
'uuid': task_id,
'args': args,

View File

@ -63,7 +63,7 @@ class Command(BaseCommand):
# https://bugs.python.org/issue37429
AWXProxyHandler.disable()
try:
queues = ['tower_broadcast_all'] + settings.AWX_CELERY_QUEUES_STATIC + [get_local_queuename()]
queues = ['tower_broadcast_all', get_local_queuename()]
consumer = AWXConsumerPG(
'dispatcher',
None,

View File

@ -36,6 +36,7 @@ from awx.main.models.base import (
NotificationFieldsModel,
prevent_search
)
from awx.main.dispatch import get_local_queuename
from awx.main.dispatch.control import Control as ControlDispatcher
from awx.main.registrar import activity_stream_registrar
from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin
@ -1466,7 +1467,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
return r
def get_queue_name(self):
return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE
return self.controller_node or self.execution_node or get_local_queuename()
def is_isolated(self):
return bool(self.controller_node)

View File

@ -5,11 +5,12 @@ import logging
# AWX
from awx.main.scheduler import TaskManager
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename
logger = logging.getLogger('awx.main.scheduler')
@task()
@task(queue=get_local_queuename)
def run_task_manager():
logger.debug("Running Tower task manager.")
TaskManager().schedule()

View File

@ -151,7 +151,7 @@ def inform_cluster_of_shutdown():
logger.exception('Encountered problem with normal shutdown signal.')
@task()
@task(queue=get_local_queuename)
def apply_cluster_membership_policies():
started_waiting = time.time()
with advisory_lock('cluster_policy_lock', wait=True):
@ -307,7 +307,7 @@ def profile_sql(threshold=1, minutes=1):
logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))
@task()
@task(queue=get_local_queuename)
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
@ -336,7 +336,7 @@ def send_notifications(notification_list, job_id=None):
logger.exception('Error saving notification {} result.'.format(notification.id))
@task()
@task(queue=get_local_queuename)
def gather_analytics():
from awx.conf.models import Setting
from rest_framework.fields import DateTimeField
@ -492,7 +492,7 @@ def awx_isolated_heartbeat():
isolated_manager.IsolatedManager(CallbackQueueDispatcher.dispatch).health_check(isolated_instance_qs)
@task()
@task(queue=get_local_queuename)
def awx_periodic_scheduler():
with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
if acquired is False:
@ -549,7 +549,7 @@ def awx_periodic_scheduler():
state.save()
@task()
@task(queue=get_local_queuename)
def handle_work_success(task_actual):
try:
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@ -562,7 +562,7 @@ def handle_work_success(task_actual):
schedule_task_manager()
@task()
@task(queue=get_local_queuename)
def handle_work_error(task_id, *args, **kwargs):
subtasks = kwargs.get('subtasks', None)
logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
@ -602,7 +602,7 @@ def handle_work_error(task_id, *args, **kwargs):
pass
@task()
@task(queue=get_local_queuename)
def update_inventory_computed_fields(inventory_id):
'''
Signal handler and wrapper around inventory.update_computed_fields to
@ -644,7 +644,7 @@ def update_smart_memberships_for_inventory(smart_inventory):
return False
@task()
@task(queue=get_local_queuename)
def update_host_smart_inventory_memberships():
smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
changed_inventories = set([])
@ -660,7 +660,7 @@ def update_host_smart_inventory_memberships():
smart_inventory.update_computed_fields()
@task()
@task(queue=get_local_queuename)
def delete_inventory(inventory_id, user_id, retries=5):
# Delete inventory as user
if user_id is None:
@ -1478,7 +1478,7 @@ class BaseTask(object):
@task()
@task(queue=get_local_queuename)
class RunJob(BaseTask):
'''
Run a job using ansible-playbook.
@ -1911,7 +1911,7 @@ class RunJob(BaseTask):
update_inventory_computed_fields.delay(inventory.id)
@task()
@task(queue=get_local_queuename)
class RunProjectUpdate(BaseTask):
model = ProjectUpdate
@ -2321,7 +2321,7 @@ class RunProjectUpdate(BaseTask):
return getattr(settings, 'AWX_PROOT_ENABLED', False)
@task()
@task(queue=get_local_queuename)
class RunInventoryUpdate(BaseTask):
model = InventoryUpdate
@ -2589,7 +2589,7 @@ class RunInventoryUpdate(BaseTask):
)
@task()
@task(queue=get_local_queuename)
class RunAdHocCommand(BaseTask):
'''
Run an ad hoc command using ansible.
@ -2779,7 +2779,7 @@ class RunAdHocCommand(BaseTask):
isolated_manager_instance.cleanup()
@task()
@task(queue=get_local_queuename)
class RunSystemJob(BaseTask):
model = SystemJob
@ -2853,7 +2853,7 @@ def _reconstruct_relationships(copy_mapping):
new_obj.save()
@task()
@task(queue=get_local_queuename)
def deep_copy_model_obj(
model_module, model_name, obj_pk, new_obj_pk,
user_pk, sub_obj_list, permission_check_func=None

View File

@ -423,7 +423,6 @@ BROKER_DURABILITY = True
BROKER_POOL_LIMIT = None
BROKER_URL = 'redis://localhost:6379'
BROKER_TRANSPORT_OPTIONS = {}
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
CELERYBEAT_SCHEDULE = {
'tower_scheduler': {
'task': 'awx.main.tasks.awx_periodic_scheduler',
@ -452,14 +451,6 @@ CELERYBEAT_SCHEDULE = {
# 'isolated_heartbeat': set up at the end of production.py and development.py
}
AWX_CELERY_QUEUES_STATIC = [
CELERY_DEFAULT_QUEUE,
]
AWX_CELERY_BCAST_QUEUES_STATIC = [
'tower_broadcast_all',
]
ASGI_AMQP = {
'INIT_FUNC': 'awx.prepare_env',
'MODEL': 'awx.main.models.channels.ChannelGroup',