From 807cced57133cefc4c24189e52b0667650f17fb8 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 9 Sep 2016 15:18:18 -0400 Subject: [PATCH] Implement a more dynamic celery queue system * Meant to be a starting point to more efficiently manage work routing and to balance work across all tower nodes * Integrate flower as a dev tool that starts alongside other nodes. Helpful for observing and monitoring the queues/exchanges * For the moment, force the task manager to only run on one node (not sure if this is needed) * Define queues and routes for all task work * Bump celery version to 3.1.23 * Expose flower through haproxy --- Makefile | 15 +++++++++++++-- Procfile | 1 + awx/main/tasks.py | 16 ++++++++-------- awx/settings/defaults.py | 20 ++++++++++++++++++++ requirements/requirements.txt | 2 +- requirements/requirements_dev.txt | 1 + tools/docker-compose-cluster.yml | 1 + tools/docker-compose.yml | 1 + tools/docker-compose/haproxy.cfg | 17 +++++++++++++++++ 9 files changed, 63 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index beea1eebd6..87a78a13d3 100644 --- a/Makefile +++ b/Makefile @@ -378,6 +378,12 @@ honcho: fi; \ honcho start +flower: + @if [ "$(VENV_BASE)" ]; then \ + . $(VENV_BASE)/tower/bin/activate; \ + fi; \ + $(PYTHON) manage.py celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:guest@$(RABBITMQ_HOST):5672// + # Run the built-in development webserver (by default on http://localhost:8013). runserver: @if [ "$(VENV_BASE)" ]; then \ @@ -390,7 +396,8 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,2 -Ofair --schedule=$(CELERY_SCHEDULE_FILE) + $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default + #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver receiver: @@ -403,7 +410,11 @@ taskmanager: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py run_task_system + if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \ + $(PYTHON) manage.py run_task_system; \ + else \ + while true; do sleep 2; done; \ + fi socketservice: @if [ "$(VENV_BASE)" ]; then \ diff --git a/Procfile b/Procfile index a301a6aa1a..433417f70b 100644 --- a/Procfile +++ b/Procfile @@ -4,3 +4,4 @@ taskmanager: make taskmanager receiver: make receiver socketservice: make socketservice factcacher: make factcacher +flower: make flower \ No newline at end of file diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 877ed4b2d2..806a819e3e 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -80,7 +80,7 @@ def celery_startup(conf=None, **kwargs): except Exception as e: logger.error("Failed to rebuild schedule {}: {}".format(sch, e)) -@task() +@task(queue='default') def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -101,7 +101,7 @@ def send_notifications(notification_list, job_id=None): if job_id is not None: job_actual.notifications.add(notification) -@task(bind=True) +@task(bind=True, queue='default') def run_administrative_checks(self): if not tower_settings.TOWER_ADMIN_ALERTS: return @@ -122,11 +122,11 @@ def run_administrative_checks(self): tower_admin_emails, fail_silently=True) -@task(bind=True) +@task(bind=True, queue='default') def cleanup_authtokens(self): AuthToken.objects.filter(expires__lt=now()).delete() -@task(bind=True) +@task(bind=True, queue='default') def tower_periodic_scheduler(self): def get_last_run(): if not os.path.exists(settings.SCHEDULE_METADATA_LOCATION): @@ -177,7 +177,7 @@ def tower_periodic_scheduler(self): new_unified_job.socketio_emit_status("failed") emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) -@task() +@task(queue='default') def notify_task_runner(metadata_dict): """Add the given task into the Tower task manager's queue, to be consumed by the task system. @@ -185,7 +185,7 @@ def notify_task_runner(metadata_dict): queue = FifoQueue('tower_task_manager') queue.push(metadata_dict) -@task(bind=True) +@task(bind=True, queue='default') def handle_work_success(self, result, task_actual): if task_actual['type'] == 'project_update': instance = ProjectUpdate.objects.get(id=task_actual['id']) @@ -227,7 +227,7 @@ def handle_work_success(self, result, task_actual): for n in all_notification_templates], job_id=task_actual['id']) -@task(bind=True) +@task(bind=True, queue='default') def handle_work_error(self, task_id, subtasks=None): print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) @@ -294,7 +294,7 @@ def handle_work_error(self, task_id, subtasks=None): job_id=first_task_id) -@task() +@task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 4c3d605f4c..325536b535 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -8,6 +8,9 @@ import ldap import djcelery from datetime import timedelta +from kombu import Queue, Exchange +from kombu.common import Broadcast + # Update this module's local settings from the global settings module. from django.conf import global_settings this_module = sys.modules[__name__] @@ -326,6 +329,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') djcelery.setup_loader() BROKER_URL = 'redis://localhost/' +CELERY_DEFAULT_QUEUE = 'default' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] @@ -335,6 +339,22 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' +CELERY_QUEUES = ( + Queue('default', Exchange('default'), routing_key='default'), + Queue('jobs', Exchange('jobs'), routing_key='jobs'), + Broadcast('projects'), +) +CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_project_update': {'queue': 'projects'}, + 'awx.main.tasks.run_inventory_update': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_system_job': {'queue': 'jobs', + 'routing_key': 'jobs'} +}) + CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.tower_periodic_scheduler', diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 1a3ba9e7f3..fb5872f572 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -6,7 +6,7 @@ azure==2.0.0rc2 Babel==2.2.0 billiard==3.3.0.16 boto==2.40.0 -celery==3.1.10 +celery==3.1.23 cliff==1.15.0 cmd2==0.6.8 d2to1==0.2.11 # TODO: Still needed? diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index d7906ce28f..f7fef4a0d4 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -10,3 +10,4 @@ pytest-cov pytest-django pytest-pythonpath pytest-mock +flower diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml index 86027f8849..1b1dee4041 100644 --- a/tools/docker-compose-cluster.yml +++ b/tools/docker-compose-cluster.yml @@ -11,6 +11,7 @@ services: ports: - "8013:8013" - "1936:1936" + - "5555:5555" tower_1: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} hostname: tower_1 diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index f34bb25766..08aec5babd 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -12,6 +12,7 @@ services: ports: - "8080:8080" - "8013:8013" + - "5555:5555" links: - postgres - memcached diff --git a/tools/docker-compose/haproxy.cfg b/tools/docker-compose/haproxy.cfg index cfbb3965f7..01d3c94a4a 100644 --- a/tools/docker-compose/haproxy.cfg +++ b/tools/docker-compose/haproxy.cfg @@ -17,6 +17,11 @@ frontend localnodes mode http default_backend nodes +frontend flower + bind *:5555 + mode http + default_backend flower_nodes + backend nodes mode http balance roundrobin @@ -29,6 +34,18 @@ backend nodes server tower_2 tower_2:8013 check server tower_3 tower_3:8013 check +backend flower_nodes + mode http + balance roundrobin + option forwardfor + option http-pretend-keepalive + http-request set-header X-Forwarded-Port %[dst_port] + http-request add-header X-Forwarded-Proto https if { ssl_fc } + #option httpchk HEAD / HTTP/1.1\r\nHost:localhost + server tower_1 tower_1:5555 + server tower_2 tower_2:5555 + server tower_3 tower_3:5555 + listen stats bind *:1936 stats enable